#!/usr/bin/env python3 """ EPUB Converter - Compiles translated HTML files into EPUB format Supports extraction of translated titles from chapter content """ import os import sys import io import json import mimetypes import re import zipfile import unicodedata import html as html_module from xml.etree import ElementTree as ET from typing import Dict, List, Tuple, Optional, Callable from ebooklib import epub, ITEM_DOCUMENT from bs4 import BeautifulSoup from metadata_batch_translator import enhance_epub_compiler from concurrent.futures import ThreadPoolExecutor, as_completed try: from unified_api_client import UnifiedClient except ImportError: UnifiedClient = None # Configure stdout for UTF-8 def configure_utf8_output(): """Configure stdout for UTF-8 encoding""" try: if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8', errors='ignore') except AttributeError: if sys.stdout is None: devnull = open(os.devnull, "wb") sys.stdout = io.TextIOWrapper(devnull, encoding='utf-8', errors='ignore') elif hasattr(sys.stdout, 'buffer'): try: sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='ignore') except: pass # Global configuration configure_utf8_output() _global_log_callback = None def set_global_log_callback(callback: Optional[Callable]): """Set the global log callback for module-level functions""" global _global_log_callback _global_log_callback = callback def log(message: str): """Module-level logging that works with or without callback""" if _global_log_callback: _global_log_callback(message) else: print(message) class HTMLEntityDecoder: """Handles comprehensive HTML entity decoding with full Unicode support""" # Comprehensive entity replacement dictionary ENTITY_MAP = { # Quotation marks and apostrophes '"': '"', '"': '"', ''': "'", '&APOS;': "'", '‘': '\u2018', '’': '\u2019', '“': '\u201c', '”': '\u201d', '‚': '‚', '„': '„', '‹': '‹', '›': '›', '«': '«', '»': '»', # Spaces and dashes ' ': ' ', '&NBSP;': ' ', ' ': ' ', ' ': ' ', ' ': ' ', '‌': '\u200c', '‍': '\u200d', '‎': '\u200e', '‏': '\u200f', '–': '–', '—': '—', '−': '−', '‐': '‐', # Common symbols '…': '…', '…': '…', '•': '•', '•': '•', '·': '·', '·': '·', '§': '§', '¶': '¶', '†': '†', '‡': '‡', '◊': '◊', '♦': '♦', '♣': '♣', '♥': '♥', '♠': '♠', # Currency symbols '¢': '¢', '£': '£', '¥': '¥', '€': '€', '¤': '¤', # Mathematical symbols '±': '±', '×': '×', '÷': '÷', '⁄': '⁄', '‰': '‰', '‱': '‱', '′': '\u2032', '″': '\u2033', '∞': '∞', '∅': '∅', '∇': '∇', '&partial;': '∂', '∑': '∑', '∏': '∏', '∫': '∫', '√': '√', '≈': '≈', '≠': '≠', '≡': '≡', '≤': '≤', '≥': '≥', '⊂': '⊂', '⊃': '⊃', '⊄': '⊄', '⊆': '⊆', '⊇': '⊇', # Intellectual property '©': '©', '©': '©', '®': '®', '®': '®', '™': '™', '™': '™', } # Common encoding fixes ENCODING_FIXES = { # UTF-8 decoded as Latin-1 '’': "'", 'â€Å"': '"', '�': '"', '–': '–', 'â€â€': '—', ' ': ' ', 'ÂÂ': '', 'â': 'â', 'é': 'é', 'è': 'è', 'ä': 'ä', 'ö': 'ö', 'ü': 'ü', 'ñ': 'ñ', 'ç': 'ç', # Common mojibake patterns '’': "'", '“': '"', 'â€': '"', 'â€"': '—', 'â€"': '–', '…': '…', '•': '•', 'â„¢': '™', '©': '©', '®': '®', # Windows-1252 interpreted as UTF-8 '‘': '\u2018', '’': '\u2019', '“': '\u201c', 'â€': '\u201d', '•': '•', 'â€"': '–', 'â€"': '—', } @classmethod def decode(cls, text: str) -> str: """Comprehensive HTML entity decoding - PRESERVES UNICODE""" if text is None: return "" if not isinstance(text, str): text = str(text) if not text: return text # Fix common encoding issues first for bad, good in cls.ENCODING_FIXES.items(): text = text.replace(bad, good) # Multiple passes to handle nested/double-encoded entities max_passes = 3 for _ in range(max_passes): prev_text = text # Use html module for standard decoding (this handles <, >, etc.) text = html_module.unescape(text) if text == prev_text: break # Apply any remaining entity replacements for entity, char in cls.ENTITY_MAP.items(): text = text.replace(entity, char) return text @staticmethod def _decode_decimal(match): """Decode decimal HTML entity""" try: code = int(match.group(1)) if XMLValidator.is_valid_char_code(code): return chr(code) except: pass return match.group(0) @staticmethod def _decode_hex(match): """Decode hexadecimal HTML entity""" try: code = int(match.group(1), 16) if XMLValidator.is_valid_char_code(code): return chr(code) except: pass return match.group(0) class XMLValidator: """Handles XML validation and character checking""" @staticmethod def is_valid_char_code(codepoint: int) -> bool: """Check if a codepoint is valid for XML""" return ( codepoint == 0x9 or codepoint == 0xA or codepoint == 0xD or (0x20 <= codepoint <= 0xD7FF) or (0xE000 <= codepoint <= 0xFFFD) or (0x10000 <= codepoint <= 0x10FFFF) ) @staticmethod def is_valid_char(c: str) -> bool: """Check if a character is valid for XML""" return XMLValidator.is_valid_char_code(ord(c)) @staticmethod def clean_for_xml(text: str) -> str: """Remove invalid XML characters""" return ''.join(c for c in text if XMLValidator.is_valid_char(c)) class ContentProcessor: """Handles content cleaning and processing - UPDATED WITH UNICODE PRESERVATION""" @staticmethod def safe_escape(text: str) -> str: """Escape XML special characters for use in XHTML titles/attributes""" if text is None: return "" if not isinstance(text, str): try: text = str(text) except Exception: return "" # Use html.escape to handle &, <, > and quotes; then escape single quotes escaped = html_module.escape(text, quote=True) escaped = escaped.replace("'", "'") return escaped class TitleExtractor: """Handles extraction of titles from HTML content - UPDATED WITH UNICODE PRESERVATION""" @staticmethod def extract_from_html(html_content: str, chapter_num: Optional[int] = None, filename: Optional[str] = None) -> Tuple[str, float]: """Extract title from HTML content with confidence score - KEEP ALL HEADERS INCLUDING NUMBERS""" try: # Decode entities first - PRESERVES UNICODE html_content = HTMLEntityDecoder.decode(html_content) soup = BeautifulSoup(html_content, 'lxml', from_encoding='utf-8') candidates = [] # Strategy 1: tag (highest confidence) title_tag = soup.find('title') if title_tag and title_tag.string: title_text = HTMLEntityDecoder.decode(title_tag.string.strip()) if title_text and len(title_text) > 0 and title_text.lower() not in ['untitled', 'chapter', 'document']: candidates.append((title_text, 0.95, "title_tag")) # Strategy 2: h1 tags (very high confidence) h1_tags = soup.find_all('h1') for i, h1 in enumerate(h1_tags[:3]): # Check first 3 h1 tags text = HTMLEntityDecoder.decode(h1.get_text(strip=True)) if text and len(text) < 300: # First h1 gets highest confidence confidence = 0.9 if i == 0 else 0.85 candidates.append((text, confidence, f"h1_tag_{i+1}")) # Strategy 3: h2 tags (high confidence) h2_tags = soup.find_all('h2') for i, h2 in enumerate(h2_tags[:3]): # Check first 3 h2 tags text = HTMLEntityDecoder.decode(h2.get_text(strip=True)) if text and len(text) < 250: # First h2 gets highest confidence among h2s confidence = 0.8 if i == 0 else 0.75 candidates.append((text, confidence, f"h2_tag_{i+1}")) # Strategy 4: h3 tags (moderate confidence) h3_tags = soup.find_all('h3') for i, h3 in enumerate(h3_tags[:3]): # Check first 3 h3 tags text = HTMLEntityDecoder.decode(h3.get_text(strip=True)) if text and len(text) < 200: confidence = 0.7 if i == 0 else 0.65 candidates.append((text, confidence, f"h3_tag_{i+1}")) # Strategy 5: Bold text in first elements (lower confidence) first_elements = soup.find_all(['p', 'div'])[:5] for elem in first_elements: for bold in elem.find_all(['b', 'strong'])[:2]: # Limit to first 2 bold items bold_text = HTMLEntityDecoder.decode(bold.get_text(strip=True)) if bold_text and 2 <= len(bold_text) <= 150: candidates.append((bold_text, 0.6, "bold_text")) # Strategy 6: Center-aligned text (common for chapter titles) center_elements = soup.find_all(['center', 'div', 'p'], attrs={'align': 'center'}) or \ soup.find_all(['div', 'p'], style=lambda x: x and 'text-align' in x and 'center' in x) for center in center_elements[:3]: # Check first 3 centered elements text = HTMLEntityDecoder.decode(center.get_text(strip=True)) if text and 2 <= len(text) <= 200: candidates.append((text, 0.65, "centered_text")) # Strategy 7: All-caps text (common for titles in older books) for elem in soup.find_all(['h1', 'h2', 'h3', 'p', 'div'])[:10]: text = elem.get_text(strip=True) # Check if text is mostly uppercase if text and len(text) > 2 and text.isupper(): decoded_text = HTMLEntityDecoder.decode(text) # Keep it as-is (don't convert to title case automatically) candidates.append((decoded_text, 0.55, "all_caps_text")) # Strategy 8: Patterns in first paragraph first_p = soup.find('p') if first_p: p_text = HTMLEntityDecoder.decode(first_p.get_text(strip=True)) # Look for "Chapter X: Title" patterns chapter_pattern = re.match( r'^(Chapter\s+[\dIVXLCDM]+\s*[:\-\u2013\u2014]\s*)(.{2,100})(?:\.|$)', p_text, re.IGNORECASE ) if chapter_pattern: # Extract just the title part after "Chapter X:" title_part = chapter_pattern.group(2).strip() if title_part: candidates.append((title_part, 0.8, "paragraph_pattern_title")) # Also add the full "Chapter X: Title" as a lower confidence option full_title = chapter_pattern.group(0).strip().rstrip('.') candidates.append((full_title, 0.75, "paragraph_pattern_full")) elif len(p_text) <= 100 and len(p_text) > 2: # Short first paragraph might be the title candidates.append((p_text, 0.4, "paragraph_standalone")) # Strategy 9: Filename if filename: filename_match = re.search(r'response_\d+_(.+?)\.html', filename) if filename_match: filename_title = filename_match.group(1).replace('_', ' ').title() if len(filename_title) > 2: candidates.append((filename_title, 0.3, "filename")) # Filter and rank candidates if candidates: unique_candidates = {} for title, confidence, source in candidates: # Clean the title but keep roman numerals and short titles title = TitleExtractor.clean_title(title) # Don't reject short titles (like "III", "IX") - they're valid! if title and len(title) > 0: # Don't apply is_valid_title check too strictly # Roman numerals and chapter numbers are valid titles if title not in unique_candidates or unique_candidates[title][1] < confidence: unique_candidates[title] = (title, confidence, source) if unique_candidates: sorted_candidates = sorted(unique_candidates.values(), key=lambda x: x[1], reverse=True) best_title, best_confidence, best_source = sorted_candidates[0] # Log what we found for debugging log(f"[DEBUG] Best title candidate: '{best_title}' (confidence: {best_confidence:.2f}, source: {best_source})") return best_title, best_confidence # Fallback - only use generic chapter number if we really found nothing if chapter_num: return f"Chapter {chapter_num}", 0.1 return "Untitled Chapter", 0.0 except Exception as e: log(f"[WARNING] Error extracting title: {e}") if chapter_num: return f"Chapter {chapter_num}", 0.1 return "Untitled Chapter", 0.0 @staticmethod def clean_title(title: str) -> str: """Clean and normalize extracted title - PRESERVE SHORT TITLES LIKE ROMAN NUMERALS""" if not title: return "" # Remove any [tag] patterns first #title = re.sub(r'\[(title|skill|ability|spell|detect|status|class|level|stat|buff|debuff|item|quest)[^\]]*?\]', '', title) # Decode entities - PRESERVES UNICODE title = HTMLEntityDecoder.decode(title) # Remove HTML tags title = re.sub(r'<[^>]+>', '', title) # Normalize spaces title = re.sub(r'[\xa0\u2000-\u200a\u202f\u205f\u3000]+', ' ', title) title = re.sub(r'\s+', ' ', title).strip() # Remove leading/trailing punctuation EXCEPT for roman numeral dots # Don't strip trailing dots from roman numerals like "III." or "IX." if not re.match(r'^[IVXLCDM]+\.?$', title, re.IGNORECASE): title = re.sub(r'^[][(){}\s\-\u2013\u2014:;,.|/\\]+', '', title).strip() title = re.sub(r'[][(){}\s\-\u2013\u2014:;,.|/\\]+$', '', title).strip() # Remove quotes if they wrap the entire title quote_pairs = [ ('"', '"'), ("'", "'"), ('\u201c', '\u201d'), ('\u2018', '\u2019'), # Smart quotes ('«', '»'), ('‹', '›'), # Guillemets ] for open_q, close_q in quote_pairs: if title.startswith(open_q) and title.endswith(close_q): title = title[len(open_q):-len(close_q)].strip() break # Normalize Unicode - PRESERVES READABILITY title = unicodedata.normalize('NFC', title) # Remove zero-width characters title = re.sub(r'[\u200b\u200c\u200d\u200e\u200f\ufeff]', '', title) # Final cleanup title = ' '.join(title.split()) # Truncate if too long if len(title) > 150: truncated = title[:147] last_space = truncated.rfind(' ') if last_space > 100: truncated = truncated[:last_space] title = truncated + "..." return title @staticmethod def is_valid_title(title: str) -> bool: """Check if extracted title is valid - ACCEPT SHORT TITLES LIKE ROMAN NUMERALS""" if not title: return False # Accept any non-empty title after cleaning # Don't reject roman numerals or short titles # Only reject truly invalid patterns invalid_patterns = [ r'^untitled$', # Just "untitled" r'^chapter$', # Just "chapter" without a number r'^document$', # Just "document" ] for pattern in invalid_patterns: if re.match(pattern, title.lower().strip()): return False # Skip obvious filler phrases filler_phrases = [ 'click here', 'read more', 'continue reading', 'next chapter', 'previous chapter', 'table of contents', 'back to top' ] title_lower = title.lower().strip() if any(phrase in title_lower for phrase in filler_phrases): return False # Accept everything else, including roman numerals and short titles return True class XHTMLConverter: """Handles XHTML conversion and compliance""" @staticmethod def ensure_compliance(html_content: str, title: str = "Chapter", css_links: Optional[List[str]] = None) -> str: """Ensure HTML content is XHTML-compliant while PRESERVING story tags""" try: import html import re # Add debug at the very start log(f"[DEBUG] Processing chapter: {title}") log(f"[DEBUG] Input HTML length: {len(html_content)}") # Unescape HTML entities but PRESERVE < and > so fake angle brackets in narrative # text don't become real tags (which breaks parsing across paragraphs like the sample). if any(ent in html_content for ent in ['&', '"', '&#', '<', '>']): log(f"[DEBUG] Unescaping HTML entities (preserving < and >)") # Temporarily protect < and > (both cases) from unescaping placeholder_lt = '\ue000' placeholder_gt = '\ue001' html_content = html_content.replace('<', placeholder_lt).replace('<', placeholder_lt) html_content = html_content.replace('>', placeholder_gt).replace('>', placeholder_gt) # Unescape remaining entities html_content = html.unescape(html_content) # Restore protected angle bracket entities html_content = html_content.replace(placeholder_lt, '<').replace(placeholder_gt, '>') # Strip out ANY existing DOCTYPE, XML declaration, or html wrapper # We only want the body content log(f"[DEBUG] Extracting body content") # Try to extract just body content body_match = re.search(r'<body[^>]*>(.*?)</body>', html_content, re.DOTALL | re.IGNORECASE) if body_match: html_content = body_match.group(1) log(f"[DEBUG] Extracted body content") else: # No body tags, strip any DOCTYPE/html tags if present html_content = re.sub(r'<\?xml[^>]*\?>', '', html_content) html_content = re.sub(r'<!DOCTYPE[^>]*>', '', html_content) html_content = re.sub(r'</?html[^>]*>', '', html_content) html_content = re.sub(r'<head[^>]*>.*?</head>', '', html_content, flags=re.DOTALL) log(f"[DEBUG] Stripped wrapper tags") # Now process the content normally # Fix broken attributes with ="" pattern def fix_broken_attributes_only(match): tag_content = match.group(0) if '=""' in tag_content and tag_content.count('=""') > 2: tag_match = re.match(r'<(\w+)', tag_content) if tag_match: tag_name = tag_match.group(1) words = re.findall(r'(\w+)=""', tag_content) if words: content = ' '.join(words) return f'<{tag_name}>{content}</{tag_name}>' return '' return tag_content html_content = re.sub(r'<[^>]*?=""[^>]*?>', fix_broken_attributes_only, html_content) # Sanitize attributes that contain a colon (:) but are NOT valid namespaces. # Example: <status effects:="" high="" temperature="" unconscious=""></status> # becomes: <status data-effects="" high="" temperature="" unconscious=""></status> def _sanitize_colon_attributes_in_tags(text: str) -> str: # Process only inside start tags; skip closing tags, comments, doctypes, processing instructions def _process_tag(tag_match): tag = tag_match.group(0) if tag.startswith('</') or tag.startswith('<!') or tag.startswith('<?'): return tag def _attr_repl(m): before, name, eqval = m.group(1), m.group(2), m.group(3) lname = name.lower() # Preserve known namespace attributes if ( lname.startswith('xml:') or lname.startswith('xlink:') or lname.startswith('epub:') or lname == 'xmlns' or lname.startswith('xmlns:') ): return m.group(0) if ':' not in name: return m.group(0) # Replace colon(s) with dashes and prefix with data- safe = re.sub(r'[:]+', '-', name).strip('-') safe = re.sub(r'[^A-Za-z0-9_.-]', '-', safe) or 'attr' if not safe.startswith('data-'): safe = 'data-' + safe return f'{before}{safe}{eqval}' # Replace attributes with colon in the name (handles both single and double quoted values) tag = re.sub(r'(\s)([A-Za-z_:][A-Za-z0-9_.:-]*:[A-Za-z0-9_.:-]*)(\s*=\s*(?:"[^"]*"|\'[^\']*\'))', _attr_repl, tag) return tag return re.sub(r'<[^>]+>', _process_tag, text) html_content = _sanitize_colon_attributes_in_tags(html_content) # Convert only "story tags" whose TAG NAME contains a colon (e.g., <System:Message>), # but DO NOT touch valid HTML/SVG tags where colons appear in attributes (e.g., style="color:red" or xlink:href) # and DO NOT touch namespaced tags like <svg:rect>. allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"} def _escape_story_tag(match): full_tag = match.group(0) # Entire <...> or </...> tag_name = match.group(1) # The tag name possibly containing ':' prefix = tag_name.split(':', 1)[0].lower() # If this is a known namespace prefix (e.g., svg:rect), leave it alone if prefix in allowed_ns_prefixes: return full_tag # Otherwise, treat as a story/fake tag and replace angle brackets with Chinese brackets return full_tag.replace('<', '《').replace('>', '》') # Escape invalid story tags (tag names containing ':') so they render literally with angle brackets. allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"} def _escape_story_tag_entities(m): tagname = m.group(1) prefix = tagname.split(':', 1)[0].lower() if prefix in allowed_ns_prefixes: return m.group(0) tag_text = m.group(0) return tag_text.replace('<', '<').replace('>', '>') # Apply in order: self-closing, opening, closing html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, html_content) html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, html_content) html_content = re.sub(r'</([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, html_content) # Parse with lxml from lxml import html as lxml_html, etree parser = lxml_html.HTMLParser(recover=True) doc = lxml_html.document_fromstring(f"<div>{html_content}</div>", parser=parser) # Get the content back body_xhtml = etree.tostring(doc, method='xml', encoding='unicode') # Remove the wrapper div we added body_xhtml = re.sub(r'^<div[^>]*>|</div>$', '', body_xhtml) # Optionally replace angle-bracket entities with Chinese brackets # Default behavior: keep them as entities (< >) so the output preserves the original text bracket_style = os.getenv('ANGLE_BRACKET_OUTPUT', 'entity').lower() if '<' in body_xhtml or '>' in body_xhtml: if bracket_style in ('cjk', 'chinese', 'cjk_brackets'): body_xhtml = body_xhtml.replace('<', '《').replace('>', '》') # else: keep as entities # Build our own clean XHTML document return XHTMLConverter._build_xhtml(title, body_xhtml, css_links) except Exception as e: log(f"[WARNING] Failed to ensure XHTML compliance: {e}") import traceback log(f"[DEBUG] Full traceback:\n{traceback.format_exc()}") log(f"[DEBUG] Failed chapter title: {title}") log(f"[DEBUG] First 500 chars of input: {html_content[:500] if html_content else 'EMPTY'}") return XHTMLConverter._build_fallback_xhtml(title) @staticmethod def _build_xhtml(title: str, body_content: str, css_links: Optional[List[str]] = None) -> str: """Build XHTML document""" if not body_content.strip(): body_content = '<p>Empty chapter</p>' title = ContentProcessor.safe_escape(title) body_content = XHTMLConverter._ensure_xml_safe_readable(body_content) xml_declaration = '<?xml version="1.0" encoding="utf-8"?>' doctype = '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">' xhtml_parts = [ xml_declaration, doctype, '<html xmlns="http://www.w3.org/1999/xhtml" xmlns:epub="http://www.idpf.org/2007/ops">', '<head>', '<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />', f'<title>{title}' ] if css_links: for css_link in css_links: if css_link.startswith('') xhtml_parts.extend([ '', '', body_content, '', '' ]) return '\n'.join(xhtml_parts) @staticmethod def _ensure_xml_safe_readable(content: str) -> str: """Ensure content is XML-safe""" content = re.sub( r'&(?!(?:' r'[a-zA-Z][a-zA-Z0-9]{0,30};|' r'#[0-9]{1,7};|' r'#x[0-9a-fA-F]{1,6};' r'))', '&', content ) return content @staticmethod def _build_fallback_xhtml(title: str) -> str: """Build minimal fallback XHTML""" safe_title = re.sub(r'[<>&"\']+', '', str(title)) if not safe_title: safe_title = "Chapter" return f''' {ContentProcessor.safe_escape(safe_title)}

Error processing content. Please check the source file.

''' @staticmethod def validate(content: str) -> str: """Validate and fix XHTML content - WITH DEBUGGING""" import re # Ensure XML declaration if not content.strip().startswith('\n' + content # Remove control characters content = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', content) # Fix unescaped ampersands content = re.sub( r'&(?!(?:' r'amp|lt|gt|quot|apos|' r'[a-zA-Z][a-zA-Z0-9]{1,31}|' r'#[0-9]{1,7}|' r'#x[0-9a-fA-F]{1,6}' r');)', '&', content ) # Fix unquoted attributes try: content = re.sub(r'<([^>]+)\s+(\w+)=([^\s"\'>]+)([>\s])', r'<\1 \2="\3"\4', content) except re.error: pass # Skip if regex fails # Sanitize invalid colon-containing attribute names (preserve XML/xlink/epub/xmlns) def _sanitize_colon_attrs_in_content(text: str) -> str: def _process_tag(m): tag = m.group(0) if tag.startswith(']+>', _process_tag, text) content = _sanitize_colon_attrs_in_content(content) # Escape invalid story tags so they render literally with angle brackets in output allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"} def _escape_story_tag_entities(m): tagname = m.group(1) prefix = tagname.split(':', 1)[0].lower() if prefix in allowed_ns_prefixes: return m.group(0) tag_text = m.group(0) return tag_text.replace('<', '<').replace('>', '>') # Apply in order: self-closing, opening, closing content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, content) content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, content) content = re.sub(r'', _escape_story_tag_entities, content) # Clean for XML content = XMLValidator.clean_for_xml(content) # Try to parse for validation try: ET.fromstring(content.encode('utf-8')) except ET.ParseError as e: log(f"[WARNING] XHTML validation failed: {e}") # DEBUG: Show what's at the error location import re match = re.search(r'line (\d+), column (\d+)', str(e)) if match: line_num = int(match.group(1)) col_num = int(match.group(2)) lines = content.split('\n') log(f"[DEBUG] Error at line {line_num}, column {col_num}") log(f"[DEBUG] Total lines in content: {len(lines)}") if line_num <= len(lines): problem_line = lines[line_num - 1] log(f"[DEBUG] Full problem line: {problem_line!r}") # Show the problem area if col_num <= len(problem_line): # Show 40 characters before and after start = max(0, col_num - 40) end = min(len(problem_line), col_num + 40) log(f"[DEBUG] Context around error: {problem_line[start:end]!r}") log(f"[DEBUG] Character at column {col_num}: {problem_line[col_num-1]!r} (U+{ord(problem_line[col_num-1]):04X})") # Show 5 characters before and after with hex for i in range(max(0, col_num-5), min(len(problem_line), col_num+5)): char = problem_line[i] marker = " <-- ERROR" if i == col_num-1 else "" log(f"[DEBUG] Col {i+1}: {char!r} (U+{ord(char):04X}){marker}") else: log(f"[DEBUG] Column {col_num} is beyond line length {len(problem_line)}") else: log(f"[DEBUG] Line {line_num} doesn't exist (only {len(lines)} lines)") # Show last few lines for i in range(max(0, len(lines)-3), len(lines)): log(f"[DEBUG] Line {i+1}: {lines[i][:100]!r}...") # Try to recover content = XHTMLConverter._attempt_recovery(content, e) return content @staticmethod def _attempt_recovery(content: str, error: ET.ParseError) -> str: """Attempt to recover from XML parse errors - ENHANCED""" try: # Use BeautifulSoup to fix structure soup = BeautifulSoup(content, 'lxml') # Ensure we have proper XHTML structure if not soup.find('html'): new_soup = BeautifulSoup('', 'lxml') html_tag = new_soup.html for child in list(soup.children): html_tag.append(child) soup = new_soup # Ensure we have head and body if not soup.find('head'): head = soup.new_tag('head') meta = soup.new_tag('meta') meta['http-equiv'] = 'Content-Type' meta['content'] = 'text/html; charset=utf-8' head.append(meta) title_tag = soup.new_tag('title') title_tag.string = 'Chapter' head.append(title_tag) if soup.html: soup.html.insert(0, head) if not soup.find('body'): body = soup.new_tag('body') if soup.html: for child in list(soup.html.children): if child.name not in ['head', 'body']: body.append(child.extract()) soup.html.append(body) # Convert back to string recovered = str(soup) # Ensure proper XML declaration if not recovered.strip().startswith('\n' + recovered # Add DOCTYPE if missing if '') recovered = '\n'.join(lines) # Final validation ET.fromstring(recovered.encode('utf-8')) log(f"[INFO] Successfully recovered XHTML") return recovered except Exception as recovery_error: log(f"[WARNING] Recovery attempt failed: {recovery_error}") # Last resort: use fallback return XHTMLConverter._build_fallback_xhtml("Chapter") class FileUtils: """File handling utilities""" @staticmethod def sanitize_filename(filename: str, allow_unicode: bool = False) -> str: """Sanitize filename for safety""" if allow_unicode: filename = unicodedata.normalize('NFC', filename) replacements = { '/': '_', '\\': '_', ':': '_', '*': '_', '?': '_', '"': '_', '<': '_', '>': '_', '|': '_', '\0': '_', } for old, new in replacements.items(): filename = filename.replace(old, new) filename = ''.join(char for char in filename if ord(char) >= 32 or ord(char) == 9) else: filename = unicodedata.normalize('NFKD', filename) try: filename = filename.encode('ascii', 'ignore').decode('ascii') except: filename = ''.join(c if ord(c) < 128 else '_' for c in filename) replacements = { '/': '_', '\\': '_', ':': '_', '*': '_', '?': '_', '"': '_', '<': '_', '>': '_', '|': '_', '\n': '_', '\r': '_', '\t': '_', '&': '_and_', '#': '_num_', ' ': '_', } for old, new in replacements.items(): filename = filename.replace(old, new) filename = ''.join(char for char in filename if ord(char) >= 32) filename = re.sub(r'_+', '_', filename) filename = filename.strip('_') # Limit length name, ext = os.path.splitext(filename) if len(name) > 100: name = name[:100] if not name or name == '_': name = 'file' return name + ext @staticmethod def ensure_bytes(content) -> bytes: """Ensure content is bytes""" if content is None: return b'' if isinstance(content, bytes): return content if not isinstance(content, str): content = str(content) return content.encode('utf-8') class EPUBCompiler: """Main EPUB compilation class""" def __init__(self, base_dir: str, log_callback: Optional[Callable] = None): self.base_dir = os.path.abspath(base_dir) self.log_callback = log_callback self.output_dir = self.base_dir self.images_dir = os.path.join(self.output_dir, "images") self.css_dir = os.path.join(self.output_dir, "css") self.fonts_dir = os.path.join(self.output_dir, "fonts") self.metadata_path = os.path.join(self.output_dir, "metadata.json") self.attach_css_to_chapters = os.getenv('ATTACH_CSS_TO_CHAPTERS', '0') == '1' # Default to '0' (disabled) self.max_workers = int(os.environ.get("EXTRACTION_WORKERS", "4")) self.log(f"[INFO] Using {self.max_workers} workers for parallel processing") # Track auxiliary (non-chapter) HTML files to include in spine but omit from TOC self.auxiliary_html_files: set[str] = set() # SVG rasterization settings self.rasterize_svg = os.getenv('RASTERIZE_SVG_FALLBACK', '1') == '1' try: import cairosvg # noqa: F401 self._cairosvg_available = True except Exception: self._cairosvg_available = False # Set global log callback set_global_log_callback(log_callback) # translation features self.html_dir = self.output_dir # For compatibility self.translate_titles = os.getenv('TRANSLATE_BOOK_TITLE', '1') == '1' # Initialize API client if needed self.api_client = None if self.translate_titles or os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1': model = os.getenv('MODEL') api_key = os.getenv('API_KEY') if model and api_key and UnifiedClient: self.api_client = UnifiedClient(api_key=api_key, model=model, output_dir=self.output_dir) elif model and api_key and not UnifiedClient: self.log("Warning: UnifiedClient module not available, translation features disabled") # Enhance with translation features enhance_epub_compiler(self) def log(self, message: str): """Log a message""" if self.log_callback: self.log_callback(message) else: print(message) def compile(self): """Main compilation method""" try: # Debug: Check what metadata enhancement was done self.log("[DEBUG] Checking metadata translation setup...") self.log(f"[DEBUG] Has api_client: {hasattr(self, 'api_client') and self.api_client is not None}") self.log(f"[DEBUG] Has metadata_translator: {hasattr(self, 'metadata_translator')}") self.log(f"[DEBUG] Has translate_metadata_fields: {hasattr(self, 'translate_metadata_fields')}") if hasattr(self, 'translate_metadata_fields'): self.log(f"[DEBUG] translate_metadata_fields content: {self.translate_metadata_fields}") enabled_fields = [k for k, v in self.translate_metadata_fields.items() if v] self.log(f"[DEBUG] Enabled metadata fields: {enabled_fields}") # Pre-flight check if not self._preflight_check(): return # Analyze chapters FIRST to get the structure chapter_titles_info = self._analyze_chapters() # Debug: Check if batch translation is enabled self.log(f"[DEBUG] Batch translation enabled: {getattr(self, 'batch_translate_headers', False)}") self.log(f"[DEBUG] Has header translator: {hasattr(self, 'header_translator')}") self.log(f"[DEBUG] EPUB_PATH env: {os.getenv('EPUB_PATH', 'NOT SET')}") self.log(f"[DEBUG] HTML dir: {self.html_dir}") # Extract source headers AND current titles if batch translation is enabled source_headers = {} current_titles = {} if (hasattr(self, 'batch_translate_headers') and self.batch_translate_headers and hasattr(self, 'header_translator') and self.header_translator): # Check if the extraction method exists if hasattr(self, '_extract_source_headers_and_current_titles'): # Use the new extraction method source_headers, current_titles = self._extract_source_headers_and_current_titles() self.log(f"[DEBUG] Extraction complete: {len(source_headers)} source, {len(current_titles)} current") else: self.log("⚠️ Missing _extract_source_headers_and_current_titles method!") # Batch translate headers if we have source headers translated_headers = {} if source_headers and hasattr(self, 'header_translator') and self.header_translator: # Check if translated_headers.txt already exists translations_file = os.path.join(self.output_dir, "translated_headers.txt") if os.path.exists(translations_file): # File exists - skip translation entirely self.log("📁 Found existing translated_headers.txt - skipping header translation") # No need to parse or do anything else else: # No existing file - proceed with translation self.log("🌐 Batch translating chapter headers...") try: # Check if the translator has been initialized properly if not hasattr(self.header_translator, 'client') or not self.header_translator.client: self.log("⚠️ Header translator not properly initialized, skipping batch translation") else: self.log(f"📚 Found {len(source_headers)} headers to translate") self.log(f"📚 Found {len(current_titles)} current titles in HTML files") # Debug: Show a few examples for num in list(source_headers.keys())[:3]: self.log(f" Example - Chapter {num}: {source_headers[num]}") # Translate headers with current titles info translated_headers = self.header_translator.translate_and_save_headers( html_dir=self.html_dir, headers_dict=source_headers, batch_size=getattr(self, 'headers_per_batch', 400), output_dir=self.output_dir, update_html=getattr(self, 'update_html_headers', True), save_to_file=getattr(self, 'save_header_translations', True), current_titles=current_titles # Pass current titles for exact replacement ) # Update chapter_titles_info with translations if translated_headers: self.log("\n📝 Updating chapter titles in EPUB structure...") for chapter_num, translated_title in translated_headers.items(): if chapter_num in chapter_titles_info: # Keep the original confidence and method, just update the title orig_title, confidence, method = chapter_titles_info[chapter_num] chapter_titles_info[chapter_num] = (translated_title, confidence, method) self.log(f"✓ Chapter {chapter_num}: {source_headers.get(chapter_num, 'Unknown')} → {translated_title}") else: # Add new entry if not in chapter_titles_info chapter_titles_info[chapter_num] = (translated_title, 1.0, 'batch_translation') self.log(f"✓ Added Chapter {chapter_num}: {translated_title}") except Exception as e: self.log(f"⚠️ Batch translation failed: {e}") import traceback self.log(traceback.format_exc()) # Continue with compilation even if translation fails else: if not source_headers: self.log("⚠️ No source headers found, skipping batch translation") elif not hasattr(self, 'header_translator'): self.log("⚠️ No header translator available") # Find HTML files html_files = self._find_html_files() if not html_files: raise Exception("No translated chapters found to compile into EPUB") # Load metadata metadata = self._load_metadata() # Translate metadata if configured if hasattr(self, 'metadata_translator') and self.metadata_translator: if hasattr(self, 'translate_metadata_fields') and any(self.translate_metadata_fields.values()): self.log("🌐 Translating metadata fields...") try: translated_metadata = self.metadata_translator.translate_metadata( metadata, self.translate_metadata_fields, mode=getattr(self, 'metadata_translation_mode', 'together') ) # Preserve original values for field in self.translate_metadata_fields: if field in metadata and field in translated_metadata: if metadata[field] != translated_metadata[field]: translated_metadata[f'original_{field}'] = metadata[field] metadata = translated_metadata except Exception as e: self.log(f"⚠️ Metadata translation failed: {e}") # Continue with original metadata # Create EPUB book book = self._create_book(metadata) # Process all components spine = [] toc = [] # Add CSS css_items = self._add_css_files(book) # Add fonts self._add_fonts(book) # Process images and cover processed_images, cover_file = self._process_images() # Add images to book self._add_images_to_book(book, processed_images, cover_file) # Add cover page if exists if cover_file: cover_page = self._create_cover_page(book, cover_file, processed_images, css_items, metadata) if cover_page: spine.insert(0, cover_page) # Process chapters with updated titles chapters_added = self._process_chapters( book, html_files, chapter_titles_info, css_items, processed_images, spine, toc, metadata ) if chapters_added == 0: raise Exception("No chapters could be added to the EPUB") # Add optional gallery (unless disabled) disable_gallery = os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1' if disable_gallery: self.log("📷 Image gallery disabled by user preference") else: gallery_images = [img for img in processed_images.values() if img != cover_file] if gallery_images: self.log(f"📷 Creating image gallery with {len(gallery_images)} images...") gallery_page = self._create_gallery_page(book, gallery_images, css_items, metadata) spine.append(gallery_page) toc.append(gallery_page) else: self.log("📷 No images found for gallery") # Finalize book self._finalize_book(book, spine, toc, cover_file) # Write EPUB self._write_epub(book, metadata) # Show summary self._show_summary(chapter_titles_info, css_items) except Exception as e: self.log(f"❌ EPUB compilation failed: {e}") raise def _fix_encoding_issues(self, content: str) -> str: """Convert smart quotes and other Unicode punctuation to ASCII.""" # Convert smart quotes to regular quotes and other punctuation fixes = { '’': "'", # Right single quotation mark '‘': "'", # Left single quotation mark '“': '"', # Left double quotation mark '”': '"', # Right double quotation mark '—': '-', # Em dash to hyphen '–': '-', # En dash to hyphen '…': '...', # Ellipsis to three dots } for bad, good in fixes.items(): if bad in content: content = content.replace(bad, good) #self.log(f"[DEBUG] Replaced {bad!r} with {good!r}") return content def _preflight_check(self) -> bool: """Pre-flight check before compilation with progressive fallback""" # Check if we have standard files if self._has_standard_files(): # Use original strict check return self._preflight_check_strict() else: # Use progressive check for non-standard files result = self._preflight_check_progressive() return result is not None def _has_standard_files(self) -> bool: """Check if directory contains standard response_ files""" if not os.path.exists(self.base_dir): return False html_exts = ('.html', '.xhtml', '.htm') html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)] response_files = [f for f in html_files if f.startswith('response_')] return len(response_files) > 0 def _preflight_check_strict(self) -> bool: """Original strict pre-flight check - for standard files""" self.log("\n📋 Pre-flight Check") self.log("=" * 50) issues = [] if not os.path.exists(self.base_dir): issues.append(f"Directory does not exist: {self.base_dir}") return False html_files = [f for f in os.listdir(self.base_dir) if f.endswith('.html')] response_files = [f for f in html_files if f.startswith('response_')] if not html_files: issues.append("No HTML files found in directory") elif not response_files: issues.append(f"Found {len(html_files)} HTML files but none start with 'response_'") else: self.log(f"✅ Found {len(response_files)} chapter files") if not os.path.exists(self.metadata_path): self.log("⚠️ No metadata.json found (will use defaults)") else: self.log("✅ Found metadata.json") for subdir in ['css', 'images', 'fonts']: path = os.path.join(self.base_dir, subdir) if os.path.exists(path): count = len(os.listdir(path)) self.log(f"✅ Found {subdir}/ with {count} files") if issues: self.log("\n❌ Pre-flight check FAILED:") for issue in issues: self.log(f" • {issue}") return False self.log("\n✅ Pre-flight check PASSED") return True def _preflight_check_progressive(self) -> dict: """Progressive pre-flight check for non-standard files""" self.log("\n📋 Starting Progressive Pre-flight Check") self.log("=" * 50) # Critical check - always required if not os.path.exists(self.base_dir): self.log(f"❌ CRITICAL: Directory does not exist: {self.base_dir}") return None # Phase 1: Try strict mode (response_ files) - already checked in caller # Phase 2: Try relaxed mode (any HTML files) self.log("\n[Phase 2] Checking for any HTML files...") html_exts = ('.html', '.xhtml', '.htm') html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)] if html_files: self.log(f"✅ Found {len(html_files)} HTML files:") # Show first 5 files as examples for i, f in enumerate(html_files[:5]): self.log(f" • {f}") if len(html_files) > 5: self.log(f" ... and {len(html_files) - 5} more") self._check_optional_resources() self.log("\n⚠️ Pre-flight check PASSED with warnings (relaxed mode)") return {'success': True, 'mode': 'relaxed'} # Phase 3: No HTML files at all self.log("❌ No HTML files found in directory") self.log("\n[Phase 3] Checking directory contents...") all_files = os.listdir(self.base_dir) self.log(f"📁 Directory contains {len(all_files)} total files") # Look for any potential content potential_content = [f for f in all_files if not f.startswith('.')] if potential_content: self.log("⚠️ Found non-HTML files:") for i, f in enumerate(potential_content[:5]): self.log(f" • {f}") if len(potential_content) > 5: self.log(f" ... and {len(potential_content) - 5} more") self.log("\n⚠️ BYPASSING standard checks - compilation may fail!") return {'success': True, 'mode': 'bypass'} self.log("\n❌ Directory appears to be empty") return None def _check_optional_resources(self): """Check for optional resources (metadata, CSS, images, fonts)""" self.log("\n📁 Checking optional resources:") if os.path.exists(self.metadata_path): self.log("✅ Found metadata.json") else: self.log("⚠️ No metadata.json found (will use defaults)") resources_found = False for subdir in ['css', 'images', 'fonts']: path = os.path.join(self.base_dir, subdir) if os.path.exists(path): items = os.listdir(path) if items: self.log(f"✅ Found {subdir}/ with {len(items)} files") resources_found = True else: self.log(f"📁 Found {subdir}/ (empty)") if not resources_found: self.log("⚠️ No resource directories found (CSS/images/fonts)") def _analyze_chapters(self) -> Dict[int, Tuple[str, float, str]]: """Analyze chapter files and extract titles using parallel processing""" self.log("\n📖 Extracting translated titles from chapter files...") chapter_info = {} sorted_files = self._find_html_files() if not sorted_files: self.log("⚠️ No translated chapter files found!") return chapter_info self.log(f"📖 Analyzing {len(sorted_files)} translated chapter files for titles...") self.log(f"🔧 Using {self.max_workers} parallel workers") def analyze_single_file(idx_filename): """Worker function to analyze a single file""" idx, filename = idx_filename file_path = os.path.join(self.output_dir, filename) try: # Read and process file with open(file_path, 'r', encoding='utf-8') as f: raw_html_content = f.read() # Decode HTML entities import html html_content = html.unescape(raw_html_content) html_content = self._fix_encoding_issues(html_content) html_content = HTMLEntityDecoder.decode(html_content) # Extract title title, confidence = TitleExtractor.extract_from_html( html_content, idx, filename ) return idx, (title, confidence, filename) except Exception as e: return idx, (f"Chapter {idx}", 0.0, filename), str(e) # Process files in parallel using environment variable worker count with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Submit all tasks futures = { executor.submit(analyze_single_file, (idx, filename)): idx for idx, filename in enumerate(sorted_files) } # Collect results as they complete completed = 0 for future in as_completed(futures): try: result = future.result() completed += 1 if len(result) == 2: # Success idx, info = result chapter_info[idx] = info # Log progress title, confidence, filename = info indicator = "✅" if confidence > 0.7 else "🟡" if confidence > 0.4 else "🔴" self.log(f" [{completed}/{len(sorted_files)}] {indicator} Chapter {idx}: '{title}' (confidence: {confidence:.2f})") else: # Error idx, info, error = result chapter_info[idx] = info self.log(f"❌ [{completed}/{len(sorted_files)}] Error processing chapter {idx}: {error}") except Exception as e: idx = futures[future] self.log(f"❌ Failed to process chapter {idx}: {e}") chapter_info[idx] = (f"Chapter {idx}", 0.0, sorted_files[idx]) return chapter_info def _process_chapters(self, book: epub.EpubBook, html_files: List[str], chapter_titles_info: Dict[int, Tuple[str, float, str]], css_items: List[epub.EpubItem], processed_images: Dict[str, str], spine: List, toc: List, metadata: dict) -> int: """Process chapters using parallel processing with AGGRESSIVE DEBUGGING""" chapters_added = 0 self.log(f"\n{'='*80}") self.log(f"📚 STARTING CHAPTER PROCESSING") self.log(f"📚 Total files to process: {len(html_files)}") self.log(f"🔧 Using {self.max_workers} parallel workers") self.log(f"📂 Output directory: {self.output_dir}") self.log(f"{'='*80}") # Debug chapter titles info self.log(f"\n[DEBUG] Chapter titles info has {len(chapter_titles_info)} entries") for num in list(chapter_titles_info.keys())[:5]: title, conf, method = chapter_titles_info[num] self.log(f" Chapter {num}: {title[:50]}... (conf: {conf}, method: {method})") # Prepare chapter data chapter_data = [] for idx, filename in enumerate(html_files): chapter_num = idx if chapter_num not in chapter_titles_info and (chapter_num + 1) in chapter_titles_info: chapter_num = idx + 1 chapter_data.append((chapter_num, filename)) # Debug specific problem chapters if 49 <= chapter_num <= 56: self.log(f"[DEBUG] Problem chapter found: {chapter_num} -> {filename}") def process_chapter_content(data): """Worker function to process chapter content with FULL DEBUGGING""" chapter_num, filename = data path = os.path.join(self.output_dir, filename) # Debug tracking for problem chapters is_problem_chapter = 49 <= chapter_num <= 56 try: if is_problem_chapter: self.log(f"\n[DEBUG] {'*'*60}") self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {chapter_num}: {filename}") self.log(f"[DEBUG] Full path: {path}") # Check file exists if not os.path.exists(path): error_msg = f"File does not exist: {path}" self.log(f"[ERROR] {error_msg}") raise FileNotFoundError(error_msg) # Get file size file_size = os.path.getsize(path) if is_problem_chapter: self.log(f"[DEBUG] File size: {file_size} bytes") # Read and decode raw_content = self._read_and_decode_html_file(path) if is_problem_chapter: self.log(f"[DEBUG] Raw content length after reading: {len(raw_content) if raw_content else 'NULL'}") if raw_content: self.log(f"[DEBUG] First 200 chars: {raw_content[:200]}") # Fix encoding raw_content = self._fix_encoding_issues(raw_content) if is_problem_chapter: self.log(f"[DEBUG] Content length after encoding fix: {len(raw_content) if raw_content else 'NULL'}") if not raw_content or not raw_content.strip(): error_msg = f"Empty content after reading/decoding: {filename}" if is_problem_chapter: self.log(f"[ERROR] {error_msg}") raise ValueError(error_msg) # Extract main content if not filename.startswith('response_'): before_len = len(raw_content) raw_content = self._extract_main_content(raw_content, filename) if is_problem_chapter: self.log(f"[DEBUG] Content extraction: {before_len} -> {len(raw_content)} chars") # Get title title = self._get_chapter_title(chapter_num, filename, raw_content, chapter_titles_info) if is_problem_chapter: self.log(f"[DEBUG] Chapter title: {title}") # Prepare CSS links css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items] if is_problem_chapter: self.log(f"[DEBUG] CSS links: {css_links}") # XHTML conversion - THE CRITICAL PART if is_problem_chapter: self.log(f"[DEBUG] Starting XHTML conversion...") xhtml_content = XHTMLConverter.ensure_compliance(raw_content, title, css_links) if is_problem_chapter: self.log(f"[DEBUG] XHTML content length: {len(xhtml_content) if xhtml_content else 'NULL'}") if xhtml_content: self.log(f"[DEBUG] XHTML first 300 chars: {xhtml_content[:300]}") # Process images xhtml_content = self._process_chapter_images(xhtml_content, processed_images) # Validate if is_problem_chapter: self.log(f"[DEBUG] Starting validation...") final_content = XHTMLConverter.validate(xhtml_content) if is_problem_chapter: self.log(f"[DEBUG] Final content length: {len(final_content)}") # Final XML validation try: ET.fromstring(final_content.encode('utf-8')) if is_problem_chapter: self.log(f"[DEBUG] XML validation PASSED") except ET.ParseError as e: if is_problem_chapter: self.log(f"[ERROR] XML validation FAILED: {e}") # Show the exact error location lines = final_content.split('\n') import re match = re.search(r'line (\d+), column (\d+)', str(e)) if match: line_num = int(match.group(1)) if line_num <= len(lines): self.log(f"[ERROR] Problem line {line_num}: {lines[line_num-1][:100]}") # Create fallback final_content = XHTMLConverter._build_fallback_xhtml(title) if is_problem_chapter: self.log(f"[DEBUG] Using fallback XHTML") if is_problem_chapter: self.log(f"[DEBUG] Chapter processing SUCCESSFUL") self.log(f"[DEBUG] {'*'*60}\n") return { 'num': chapter_num, 'filename': filename, 'title': title, 'content': final_content, 'success': True } except Exception as e: import traceback tb = traceback.format_exc() if is_problem_chapter: self.log(f"[ERROR] {'!'*60}") self.log(f"[ERROR] CHAPTER {chapter_num} PROCESSING FAILED") self.log(f"[ERROR] Exception type: {type(e).__name__}") self.log(f"[ERROR] Exception: {e}") self.log(f"[ERROR] Full traceback:\n{tb}") self.log(f"[ERROR] {'!'*60}\n") return { 'num': chapter_num, 'filename': filename, 'title': chapter_titles_info.get(chapter_num, (f"Chapter {chapter_num}", 0, ""))[0], 'error': str(e), 'traceback': tb, 'success': False } # Process in parallel processed_chapters = [] completed = 0 self.log(f"\n[DEBUG] Starting parallel processing...") with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = { executor.submit(process_chapter_content, data): data[0] for data in chapter_data } for future in as_completed(futures): try: result = future.result() if result: processed_chapters.append(result) completed += 1 # Extra logging for problem chapters if 49 <= result['num'] <= 56: if result['success']: self.log(f" [{completed}/{len(chapter_data)}] ✅ PROBLEM CHAPTER PROCESSED: {result['num']} - {result['title']}") else: self.log(f" [{completed}/{len(chapter_data)}] ❌ PROBLEM CHAPTER FAILED: {result['num']} - {result['filename']}") self.log(f" Error: {result['error']}") else: if result['success']: self.log(f" [{completed}/{len(chapter_data)}] ✅ Processed: {result['title']}") else: self.log(f" [{completed}/{len(chapter_data)}] ❌ Failed: {result['filename']} - {result['error']}") except Exception as e: completed += 1 chapter_num = futures[future] self.log(f" [{completed}/{len(chapter_data)}] ❌ Exception processing chapter {chapter_num}: {e}") import traceback self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}") # Sort by chapter number to maintain order processed_chapters.sort(key=lambda x: x['num']) # Debug what we have self.log(f"\n[DEBUG] Processed {len(processed_chapters)} chapters") failed_chapters = [c for c in processed_chapters if not c['success']] if failed_chapters: self.log(f"[WARNING] {len(failed_chapters)} chapters failed:") for fc in failed_chapters: self.log(f" - Chapter {fc['num']}: {fc['filename']} - {fc.get('error', 'Unknown error')}") # Add chapters to book in order (this must be sequential) self.log("\n📦 Adding chapters to EPUB structure...") for chapter_data in processed_chapters: # Debug for problem chapters if 49 <= chapter_data['num'] <= 56: self.log(f"[DEBUG] Adding problem chapter {chapter_data['num']} to EPUB...") if chapter_data['success']: try: # Create EPUB chapter import html chapter = epub.EpubHtml( title=html.unescape(chapter_data['title']), file_name=os.path.basename(chapter_data['filename']), lang=metadata.get("language", "en") ) chapter.content = FileUtils.ensure_bytes(chapter_data['content']) if self.attach_css_to_chapters: for css_item in css_items: chapter.add_item(css_item) # Add to book book.add_item(chapter) spine.append(chapter) # Include auxiliary files in spine but omit from TOC base_name = os.path.basename(chapter_data['filename']) if hasattr(self, 'auxiliary_html_files') and base_name in self.auxiliary_html_files: self.log(f" 🛈 Added auxiliary page to spine (not in TOC): {base_name}") else: toc.append(chapter) chapters_added += 1 if 49 <= chapter_data['num'] <= 56: self.log(f" ✅ ADDED PROBLEM CHAPTER {chapter_data['num']}: '{chapter_data['title']}'") else: if base_name in getattr(self, 'auxiliary_html_files', set()): self.log(f" ✅ Added auxiliary page (spine only): '{base_name}'") else: self.log(f" ✅ Added chapter {chapter_data['num']}: '{chapter_data['title']}'") except Exception as e: self.log(f" ❌ Failed to add chapter {chapter_data['num']} to book: {e}") import traceback self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}") # Add error placeholder self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata) chapters_added += 1 else: self.log(f" ⚠️ Adding error placeholder for chapter {chapter_data['num']}") # Add error placeholder self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata) chapters_added += 1 self.log(f"\n{'='*80}") self.log(f"✅ CHAPTER PROCESSING COMPLETE") self.log(f"✅ Added {chapters_added} chapters to EPUB") self.log(f"{'='*80}\n") return chapters_added def _add_error_chapter_from_data(self, book, chapter_data, spine, toc, metadata): """Helper to add an error placeholder chapter""" try: title = chapter_data.get('title', f"Chapter {chapter_data['num']}") chapter = epub.EpubHtml( title=title, file_name=f"chapter_{chapter_data['num']:03d}.xhtml", lang=metadata.get("language", "en") ) error_content = f""" {ContentProcessor.safe_escape(title)}

{ContentProcessor.safe_escape(title)}

Error loading chapter content.

File: {chapter_data.get('filename', 'unknown')}

Error: {chapter_data.get('error', 'unknown error')}

""" chapter.content = error_content.encode('utf-8') book.add_item(chapter) spine.append(chapter) toc.append(chapter) except Exception as e: self.log(f" ❌ Failed to add error placeholder: {e}") def _get_chapter_order_from_opf(self) -> Dict[str, int]: """Get chapter order from content.opf or source EPUB Returns dict mapping original_filename -> chapter_number """ # First, try to find content.opf in the current directory opf_path = os.path.join(self.output_dir, "content.opf") if os.path.exists(opf_path): self.log("✅ Found content.opf - using for chapter ordering") return self._parse_opf_file(opf_path) # If not found, try to extract from source EPUB source_epub = os.getenv('EPUB_PATH') if source_epub and os.path.exists(source_epub): self.log(f"📚 Extracting chapter order from source EPUB: {source_epub}") return self._extract_order_from_epub(source_epub) # Fallback to translation_progress.json if available progress_file = os.path.join(self.output_dir, "translation_progress.json") if os.path.exists(progress_file): self.log("📄 Using translation_progress.json for chapter order") return self._get_order_from_progress_file(progress_file) return None def _parse_opf_file(self, opf_path: str) -> Dict[str, int]: """Parse content.opf to get chapter order from spine Returns dict mapping original_filename -> chapter_number """ try: tree = ET.parse(opf_path) root = tree.getroot() # Handle namespaces ns = {'opf': 'http://www.idpf.org/2007/opf'} if root.tag.startswith('{'): # Extract default namespace default_ns = root.tag[1:root.tag.index('}')] ns = {'opf': default_ns} # Get manifest to map IDs to files manifest = {} for item in root.findall('.//opf:manifest/opf:item', ns): item_id = item.get('id') href = item.get('href') media_type = item.get('media-type', '') # Only include HTML/XHTML files if item_id and href and ('html' in media_type.lower() or href.endswith(('.html', '.xhtml', '.htm'))): # Get just the filename without path filename = os.path.basename(href) manifest[item_id] = filename # Get spine order filename_to_order = {} chapter_num = 0 # Start from 0 for array indexing spine = root.find('.//opf:spine', ns) if spine is not None: # Build dynamic skip list; allow cover when TRANSLATE_COVER_HTML is enabled skip_list = ['nav', 'toc', 'contents'] if os.environ.get('TRANSLATE_COVER_HTML', '0') != '1': skip_list.append('cover') for itemref in spine.findall('opf:itemref', ns): idref = itemref.get('idref') if idref and idref in manifest: filename = manifest[idref] # Skip navigation documents; optionally skip cover if not any(skip in filename.lower() for skip in skip_list): filename_to_order[filename] = chapter_num self.log(f" Chapter {chapter_num}: {filename}") chapter_num += 1 return filename_to_order except Exception as e: self.log(f"⚠️ Error parsing content.opf: {e}") import traceback self.log(traceback.format_exc()) return None def _extract_order_from_epub(self, epub_path: str) -> List[Tuple[int, str]]: """Extract chapter order from source EPUB file""" try: import zipfile with zipfile.ZipFile(epub_path, 'r') as zf: # Find content.opf (might be in different locations) opf_file = None for name in zf.namelist(): if name.endswith('content.opf'): opf_file = name break if not opf_file: # Try META-INF/container.xml to find content.opf try: container = zf.read('META-INF/container.xml') # Parse container.xml to find content.opf location container_tree = ET.fromstring(container) rootfile = container_tree.find('.//{urn:oasis:names:tc:opendocument:xmlns:container}rootfile') if rootfile is not None: opf_file = rootfile.get('full-path') except: pass if opf_file: opf_content = zf.read(opf_file) # Save temporarily and parse temp_opf = os.path.join(self.output_dir, "temp_content.opf") with open(temp_opf, 'wb') as f: f.write(opf_content) result = self._parse_opf_file(temp_opf) # Clean up temp file if os.path.exists(temp_opf): os.remove(temp_opf) return result except Exception as e: self.log(f"⚠️ Error extracting from EPUB: {e}") return None def _find_html_files(self) -> List[str]: """Find HTML files using OPF-based ordering when available""" self.log(f"\n[DEBUG] Scanning directory: {self.output_dir}") # Get all HTML files in directory all_files = os.listdir(self.output_dir) html_extensions = ('.html', '.htm', '.xhtml') html_files = [f for f in all_files if f.lower().endswith(html_extensions)] if not html_files: self.log("[ERROR] No HTML files found!") return [] # Try to get authoritative order from OPF/EPUB opf_order = self._get_chapter_order_from_opf() if opf_order: self.log("✅ Using authoritative chapter order from OPF/EPUB") self.log(f"[DEBUG] OPF entries (first 5): {list(opf_order.items())[:5]}") # Create mapping based on core filename (strip response_ and strip ALL extensions) ordered_files = [] unmapped_files = [] def strip_all_ext(name: str) -> str: # Remove all trailing known extensions core = name while True: parts = core.rsplit('.', 1) if len(parts) == 2 and parts[1].lower() in ['html', 'htm', 'xhtml', 'xml']: core = parts[0] else: break return core for output_file in html_files: core_name = output_file[9:] if output_file.startswith('response_') else output_file core_name = strip_all_ext(core_name) matched = False for opf_name, chapter_order in opf_order.items(): opf_file = opf_name.split('/')[-1] opf_core = strip_all_ext(opf_file) if core_name == opf_core: ordered_files.append((chapter_order, output_file)) self.log(f" Mapped: {output_file} -> {opf_name} (order: {chapter_order})") matched = True break if not matched: unmapped_files.append(output_file) self.log(f" ⚠️ Could not map: {output_file} (core: {core_name})") if ordered_files: # Sort by chapter order and extract just the filenames ordered_files.sort(key=lambda x: x[0]) final_order = [f for _, f in ordered_files] # Append any unmapped files at the end if unmapped_files: self.log(f"⚠️ Adding {len(unmapped_files)} unmapped files at the end") final_order.extend(sorted(unmapped_files)) # Mark non-response unmapped files as auxiliary (omit from TOC) aux = {f for f in unmapped_files if not f.startswith('response_')} # If skipping override is enabled, do NOT treat cover.html as auxiliary if os.environ.get('TRANSLATE_COVER_HTML', '0') == '1': aux = {f for f in aux if os.path.splitext(os.path.basename(f))[0].lower() not in ['cover']} self.auxiliary_html_files = aux else: self.auxiliary_html_files = set() self.log(f"✅ Successfully ordered {len(final_order)} chapters using OPF") return final_order else: self.log("⚠️ Could not map any files using OPF order, falling back to pattern matching") # Fallback to original pattern matching logic self.log("⚠️ No OPF/EPUB found or mapping failed, using filename pattern matching") # First, try to find response_ files response_files = [f for f in html_files if f.startswith('response_')] if response_files: # Sort response_ files as primary chapters main_files = list(response_files) self.log(f"[DEBUG] Found {len(response_files)} response_ files") # Check if files have -h- pattern if any('-h-' in f for f in response_files): # Use special sorting for -h- pattern def extract_h_number(filename): match = re.search(r'-h-(\d+)', filename) if match: return int(match.group(1)) return 999999 main_files.sort(key=extract_h_number) else: # Use numeric sorting for standard response_ files def extract_number(filename): match = re.match(r'response_(\d+)_', filename) if match: return int(match.group(1)) return 0 main_files.sort(key=extract_number) # Append non-response files as auxiliary pages (not in TOC) aux_files = sorted([f for f in html_files if not f.startswith('response_')]) if aux_files: aux_set = set(aux_files) # If skipping override is enabled, ensure cover.html is not marked auxiliary if os.environ.get('TRANSLATE_COVER_HTML', '0') == '1': aux_set = {f for f in aux_set if os.path.splitext(os.path.basename(f))[0].lower() != 'cover'} self.auxiliary_html_files = aux_set self.log(f"[DEBUG] Appending {len(aux_set)} auxiliary HTML file(s) (not in TOC): {list(aux_set)[:5]}") else: self.auxiliary_html_files = set() return main_files + aux_files else: # Progressive sorting for non-standard files html_files.sort(key=self.get_robust_sort_key) # No response_ files -> treat none as auxiliary self.auxiliary_html_files = set() return html_files def _read_and_decode_html_file(self, file_path: str) -> str: """Read HTML file and decode entities, preserving < and > as text. This prevents narrative angle-bracket text from becoming bogus tags.""" with open(file_path, 'r', encoding='utf-8') as f: content = f.read() if not content: return content import re import html # Placeholders for angle bracket entities LT_PLACEHOLDER = "\ue000" GT_PLACEHOLDER = "\ue001" # Patterns for common representations of < and > _lt_entity_patterns = [r'<', r'<', r'�*60;', r'�*3[cC];'] _gt_entity_patterns = [r'>', r'>', r'�*62;', r'�*3[eE];'] def protect_angle_entities(s: str) -> str: # Replace all forms of < and > with placeholders so unescape won't turn them into real < > for pat in _lt_entity_patterns: s = re.sub(pat, LT_PLACEHOLDER, s) for pat in _gt_entity_patterns: s = re.sub(pat, GT_PLACEHOLDER, s) return s max_iterations = 5 for _ in range(max_iterations): prev_content = content # Protect before each pass in case of double-encoded entities content = protect_angle_entities(content) # html.unescape handles all standard HTML entities (except our placeholders) content = html.unescape(content) if content == prev_content: break # Restore placeholders back to entities so they remain literal text in XHTML content = content.replace(LT_PLACEHOLDER, '<').replace(GT_PLACEHOLDER, '>') return content def _process_single_chapter(self, book: epub.EpubBook, num: int, filename: str, chapter_titles_info: Dict[int, Tuple[str, float, str]], css_items: List[epub.EpubItem], processed_images: Dict[str, str], spine: List, toc: List, metadata: dict) -> bool: """Process a single chapter with COMPREHENSIVE debugging""" path = os.path.join(self.output_dir, filename) # Flag for extra debugging on problem chapters is_problem_chapter = 49 <= num <= 56 is_response_file = filename.startswith('response_') try: if is_problem_chapter: self.log(f"\n{'='*70}") self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {num}") self.log(f"[DEBUG] Filename: {filename}") self.log(f"[DEBUG] Is response file: {is_response_file}") self.log(f"[DEBUG] Full path: {path}") # Check file exists and size if not os.path.exists(path): self.log(f"[ERROR] File does not exist: {path}") return False file_size = os.path.getsize(path) if is_problem_chapter: self.log(f"[DEBUG] File size: {file_size} bytes") if file_size == 0: self.log(f"[ERROR] File is empty (0 bytes): {filename}") return False # Read and decode if is_problem_chapter: self.log(f"[DEBUG] Reading and decoding file...") raw_content = self._read_and_decode_html_file(path) if is_problem_chapter: self.log(f"[DEBUG] Raw content length: {len(raw_content) if raw_content else 'NULL'}") if raw_content: # Show first and last parts self.log(f"[DEBUG] First 300 chars of raw content:") self.log(f" {raw_content[:300]!r}") self.log(f"[DEBUG] Last 300 chars of raw content:") self.log(f" {raw_content[-300:]!r}") # Check for common issues if '<' in raw_content[:500]: self.log(f"[DEBUG] Found < entities in content") if '>' in raw_content[:500]: self.log(f"[DEBUG] Found > entities in content") if ' {after_fix} chars") if before_fix != after_fix: self.log(f"[DEBUG] Content changed during encoding fix") if not raw_content or not raw_content.strip(): self.log(f"[WARNING] Chapter {num} is empty after decoding/encoding fix") if is_problem_chapter: self.log(f"[ERROR] Problem chapter {num} has no content!") return False # Extract main content if needed if not filename.startswith('response_'): if is_problem_chapter: self.log(f"[DEBUG] Extracting main content (not a response file)...") before_extract = len(raw_content) raw_content = self._extract_main_content(raw_content, filename) after_extract = len(raw_content) if is_problem_chapter: self.log(f"[DEBUG] Content extraction: {before_extract} -> {after_extract} chars") if after_extract < before_extract / 2: self.log(f"[WARNING] Lost more than 50% of content during extraction!") self.log(f"[DEBUG] Content after extraction (first 300 chars):") self.log(f" {raw_content[:300]!r}") else: if is_problem_chapter: self.log(f"[DEBUG] Skipping content extraction for response file") self.log(f"[DEBUG] Response file content structure:") # Check what's in a response file if '' in raw_content: self.log(f" Has tag") if '' in raw_content: self.log(f" Has tag") if ' str: """Get chapter title with fallbacks - uses position-based numbering""" title = None confidence = 0.0 # Primary source: pre-analyzed title using position-based number if num in chapter_titles_info: title, confidence, stored_filename = chapter_titles_info[num] # Re-extract if low confidence or missing if not title or confidence < 0.5: backup_title, backup_confidence = TitleExtractor.extract_from_html(content, num, filename) if backup_confidence > confidence: title = backup_title confidence = backup_confidence # Clean and validate if title: title = TitleExtractor.clean_title(title) if not TitleExtractor.is_valid_title(title): title = None # Fallback for non-standard files if not title and not filename.startswith('response_'): # Try enhanced extraction methods for web-scraped content title = self._fallback_title_extraction(content, filename, num) # Final fallback - use position-based chapter number if not title: title = f"Chapter {num}" return title def get_robust_sort_key(self, filename): """Extract chapter/sequence number using multiple patterns""" # Pattern 1: -h-NUMBER (your current pattern) match = re.search(r'-h-(\d+)', filename) if match: return (1, int(match.group(1))) # Pattern 2: chapter-NUMBER or chapter_NUMBER or chapterNUMBER match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE) if match: return (2, int(match.group(1))) # Pattern 3: ch-NUMBER or ch_NUMBER or chNUMBER match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE) if match: return (3, int(match.group(1))) # Pattern 4: response_NUMBER_ (if response_ prefix exists) if filename.startswith('response_'): match = re.match(r'response_(\d+)[-_]', filename) if match: return (4, int(match.group(1))) # Pattern 5: book_NUMBER, story_NUMBER, part_NUMBER, section_NUMBER match = re.search(r'(?:book|story|part|section)[-_\s]?(\d+)', filename, re.IGNORECASE) if match: return (5, int(match.group(1))) # Pattern 6: split_NUMBER (Calibre pattern) match = re.search(r'split_(\d+)', filename) if match: return (6, int(match.group(1))) # Pattern 7: Just NUMBER.html (like 1.html, 2.html) match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename) if match: return (7, int(match.group(1))) # Pattern 8: -NUMBER at end before extension match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename) if match: return (8, int(match.group(1))) # Pattern 9: _NUMBER at end before extension match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename) if match: return (9, int(match.group(1))) # Pattern 10: (NUMBER) in parentheses anywhere match = re.search(r'\((\d+)\)', filename) if match: return (10, int(match.group(1))) # Pattern 11: [NUMBER] in brackets anywhere match = re.search(r'\[(\d+)\]', filename) if match: return (11, int(match.group(1))) # Pattern 12: page-NUMBER or p-NUMBER or pg-NUMBER match = re.search(r'(?:page|pg?)[-_\s]?(\d+)', filename, re.IGNORECASE) if match: return (12, int(match.group(1))) # Pattern 13: Any file ending with NUMBER before extension match = re.search(r'(\d+)\.(?:html?|xhtml)$', filename) if match: return (13, int(match.group(1))) # Pattern 14: Roman numerals (I, II, III, IV, etc.) roman_pattern = r'\b(M{0,3}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3}))\b' match = re.search(roman_pattern, filename) if match: roman = match.group(1) # Convert roman to number roman_dict = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000} val = 0 for i in range(len(roman)): if i > 0 and roman_dict[roman[i]] > roman_dict[roman[i-1]]: val += roman_dict[roman[i]] - 2 * roman_dict[roman[i-1]] else: val += roman_dict[roman[i]] return (14, val) # Pattern 15: First significant number found numbers = re.findall(r'\d+', filename) if numbers: # Skip common year numbers (1900-2099) unless it's the only number significant_numbers = [int(n) for n in numbers if not (1900 <= int(n) <= 2099)] if significant_numbers: return (15, significant_numbers[0]) elif numbers: return (15, int(numbers[0])) # Final fallback: alphabetical return (99, filename) def _extract_chapter_number(self, filename: str, default_idx: int) -> int: """Extract chapter number using multiple patterns""" # FIXED: Pattern 1 - Check -h-NUMBER FIRST (YOUR FILES USE THIS!) match = re.search(r'-h-(\d+)', filename) if match: return int(match.group(1)) # Pattern 2: response_NUMBER_ (standard pattern) match = re.match(r"response_(\d+)_", filename) if match: return int(match.group(1)) # Pattern 3: chapter-NUMBER, chapter_NUMBER, chapterNUMBER match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE) if match: return int(match.group(1)) # Pattern 4: ch-NUMBER, ch_NUMBER, chNUMBER match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE) if match: return int(match.group(1)) # Pattern 5: Just NUMBER.html (like 127.html) match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename) if match: return int(match.group(1)) # Pattern 6: _NUMBER at end before extension match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename) if match: return int(match.group(1)) # Pattern 7: -NUMBER at end before extension match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename) if match: return int(match.group(1)) # Pattern 8: (NUMBER) in parentheses match = re.search(r'\((\d+)\)', filename) if match: return int(match.group(1)) # Pattern 9: [NUMBER] in brackets match = re.search(r'\[(\d+)\]', filename) if match: return int(match.group(1)) # Pattern 10: Use the sort key logic sort_key = self.get_robust_sort_key(filename) if isinstance(sort_key[1], int) and sort_key[1] > 0: return sort_key[1] # Final fallback: use position + 1 return default_idx + 1 def _extract_main_content(self, html_content: str, filename: str) -> str: """Extract main content from web-scraped HTML pages This method tries to find the actual chapter content within a full webpage """ try: # For web-scraped content, try to extract just the chapter part # Common patterns for chapter content containers content_patterns = [ # Look for specific class names commonly used for content (r']*class="[^"]*(?:chapter-content|entry-content|epcontent|post-content|content-area|main-content)[^"]*"[^>]*>(.*?)', re.DOTALL | re.IGNORECASE), # Look for article tags with content (r']*>(.*?)', re.DOTALL | re.IGNORECASE), # Look for main tags (r']*>(.*?)', re.DOTALL | re.IGNORECASE), # Look for specific id patterns (r']*id="[^"]*(?:content|chapter|post)[^"]*"[^>]*>(.*?)', re.DOTALL | re.IGNORECASE), ] for pattern, flags in content_patterns: match = re.search(pattern, html_content, flags) if match: extracted = match.group(1) # Make sure we got something substantial if len(extracted.strip()) > 100: self.log(f"📄 Extracted main content using pattern for {filename}") return extracted # If no patterns matched, check if this looks like a full webpage if ']*>(.*?)', html_content, re.DOTALL | re.IGNORECASE) if body_match: self.log(f"📄 Extracted body content for {filename}") return body_match.group(1) # If all else fails, return original content self.log(f"📄 Using original content for {filename}") return html_content except Exception as e: self.log(f"⚠️ Content extraction failed for {filename}: {e}") return html_content def _fallback_title_extraction(self, content: str, filename: str, num: int) -> Optional[str]: """Fallback title extraction for when TitleExtractor fails This handles web-scraped pages and other non-standard formats """ # Try filename-based extraction first (often more reliable for web scrapes) filename_title = self._extract_title_from_filename_fallback(filename, num) if filename_title: return filename_title # Try HTML content extraction with patterns TitleExtractor might miss html_title = self._extract_title_from_html_fallback(content, num) if html_title: return html_title return None def _extract_title_from_html_fallback(self, content: str, num: int) -> Optional[str]: """Fallback HTML title extraction for web-scraped content""" # Look for title patterns that TitleExtractor might miss # Specifically for web-scraped novel sites patterns = [ # Title tags with site separators r']*>([^|–\-]+?)(?:\s*[|–\-]\s*[^<]+)?', # Specific class patterns from novel sites r']*class="[^"]*cat-series[^"]*"[^>]*>([^<]+)', r']*class="[^"]*entry-title[^"]*"[^>]*>([^<]+)', r']*class="[^"]*chapter-title[^"]*"[^>]*>([^<]+)', # Meta property patterns r']*property="og:title"[^>]*content="([^"]+)"', ] for pattern in patterns: match = re.search(pattern, content, re.IGNORECASE) if match: title = match.group(1).strip() # Decode HTML entities title = HTMLEntityDecoder.decode(title) # Additional cleanup for web-scraped content title = re.sub(r'\s+', ' ', title) # Normalize whitespace title = title.strip() # Validate it's reasonable if 3 < len(title) < 200 and title.lower() != 'untitled': self.log(f"📝 Fallback extracted title from HTML: '{title}'") return title return None def _extract_title_from_filename_fallback(self, filename: str, num: int) -> Optional[str]: """Fallback filename title extraction""" # Remove extension base_name = re.sub(r'\.(html?|xhtml)$', '', filename, flags=re.IGNORECASE) # Web-scraped filename patterns patterns = [ # "theend-chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7" r'(?:theend|story|novel)[-_]chapter[-_](\d+)[-_](.+)', # "chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7" r'chapter[-_](\d+)[-_](.+)', # "ch127-title" -> "Chapter 127 - Title" r'ch[-_]?(\d+)[-_](.+)', # Just the title part after number r'^\d+[-_](.+)', ] for pattern in patterns: match = re.search(pattern, base_name, re.IGNORECASE) if match: if match.lastindex == 2: # Pattern with chapter number and title chapter_num = match.group(1) title_part = match.group(2) else: # Pattern with just title chapter_num = str(num) title_part = match.group(1) # Clean up the title part title_part = title_part.replace('-', ' ').replace('_', ' ') # Capitalize properly words = title_part.split() title_part = ' '.join(word.capitalize() if len(word) > 2 else word for word in words) title = f"Chapter {chapter_num} - {title_part}" self.log(f"📝 Fallback extracted title from filename: '{title}'") return title return None def _load_metadata(self) -> dict: """Load metadata from JSON file""" if os.path.exists(self.metadata_path): try: import html with open(self.metadata_path, 'r', encoding='utf-8') as f: metadata = json.load(f) self.log("[DEBUG] Metadata loaded successfully") return metadata except Exception as e: self.log(f"[WARNING] Failed to load metadata.json: {e}") else: self.log("[WARNING] metadata.json not found, using defaults") return {} def _create_book(self, metadata: dict) -> epub.EpubBook: """Create and configure EPUB book with complete metadata""" book = epub.EpubBook() # Set identifier book.set_identifier(metadata.get("identifier", f"translated-{os.path.basename(self.base_dir)}")) # Fix encoding issues in titles before using them if metadata.get('title'): metadata['title'] = self._fix_encoding_issues(metadata['title']) if metadata.get('original_title'): metadata['original_title'] = self._fix_encoding_issues(metadata['original_title']) # Determine title book_title = self._determine_book_title(metadata) book.set_title(book_title) # Set language book.set_language(metadata.get("language", "en")) # Store original title as alternative metadata (not as another dc:title) # This prevents EPUB readers from getting confused about which title to display if metadata.get('original_title') and metadata.get('original_title') != book_title: # Use 'alternative' field instead of 'title' to avoid display issues book.add_metadata('DC', 'alternative', metadata['original_title']) # Also store in a custom field for reference book.add_metadata('calibre', 'original_title', metadata['original_title']) self.log(f"[INFO] Stored original title as alternative: {metadata['original_title']}") # Set author/creator if metadata.get("creator"): book.add_author(metadata["creator"]) self.log(f"[INFO] Set author: {metadata['creator']}") # ADD DESCRIPTION - This is what Calibre looks for if metadata.get("description"): # Clean the description of any HTML entities description = HTMLEntityDecoder.decode(str(metadata["description"])) book.add_metadata('DC', 'description', description) self.log(f"[INFO] Set description: {description[:100]}..." if len(description) > 100 else f"[INFO] Set description: {description}") # Add publisher if metadata.get("publisher"): book.add_metadata('DC', 'publisher', metadata["publisher"]) self.log(f"[INFO] Set publisher: {metadata['publisher']}") # Add publication date if metadata.get("date"): book.add_metadata('DC', 'date', metadata["date"]) self.log(f"[INFO] Set date: {metadata['date']}") # Add rights/copyright if metadata.get("rights"): book.add_metadata('DC', 'rights', metadata["rights"]) self.log(f"[INFO] Set rights: {metadata['rights']}") # Add subject/genre/tags if metadata.get("subject"): if isinstance(metadata["subject"], list): for subject in metadata["subject"]: book.add_metadata('DC', 'subject', subject) self.log(f"[INFO] Added subject: {subject}") else: book.add_metadata('DC', 'subject', metadata["subject"]) self.log(f"[INFO] Set subject: {metadata['subject']}") # Add series information if available if metadata.get("series"): # Calibre uses a custom metadata field for series book.add_metadata('calibre', 'series', metadata["series"]) self.log(f"[INFO] Set series: {metadata['series']}") # Add series index if available if metadata.get("series_index"): book.add_metadata('calibre', 'series_index', str(metadata["series_index"])) self.log(f"[INFO] Set series index: {metadata['series_index']}") # Add custom metadata for translator info if metadata.get("translator"): book.add_metadata('DC', 'contributor', metadata["translator"], {'role': 'translator'}) self.log(f"[INFO] Set translator: {metadata['translator']}") # Add source information if metadata.get("source"): book.add_metadata('DC', 'source', metadata["source"]) self.log(f"[INFO] Set source: {metadata['source']}") # Add any ISBN if available if metadata.get("isbn"): book.add_metadata('DC', 'identifier', f"ISBN:{metadata['isbn']}", {'scheme': 'ISBN'}) self.log(f"[INFO] Set ISBN: {metadata['isbn']}") # Add coverage (geographic/temporal scope) if available if metadata.get("coverage"): book.add_metadata('DC', 'coverage', metadata["coverage"]) self.log(f"[INFO] Set coverage: {metadata['coverage']}") # Add any custom metadata that might be in the JSON # This handles any additional fields that might be present custom_metadata_fields = [ 'contributor', 'format', 'relation', 'type' ] for field in custom_metadata_fields: if metadata.get(field): book.add_metadata('DC', field, metadata[field]) self.log(f"[INFO] Set {field}: {metadata[field]}") return book def _determine_book_title(self, metadata: dict) -> str: """Determine the book title from metadata""" # Try translated title if metadata.get('title') and str(metadata['title']).strip(): title = str(metadata['title']).strip() self.log(f"✅ Using translated title: '{title}'") return title # Try original title if metadata.get('original_title') and str(metadata['original_title']).strip(): title = str(metadata['original_title']).strip() self.log(f"⚠️ Using original title: '{title}'") return title # Fallback to directory name title = os.path.basename(self.base_dir) self.log(f"📁 Using directory name: '{title}'") return title def _create_default_css(self) -> str: """Create default CSS for proper chapter formatting""" return """ /* Default EPUB CSS */ body { margin: 1em; padding: 0; font-family: serif; line-height: 1.6; } h1, h2, h3, h4, h5, h6 { font-weight: bold; margin-top: 1em; margin-bottom: 0.5em; page-break-after: avoid; } h1 { font-size: 1.5em; text-align: center; margin-top: 2em; margin-bottom: 2em; } p { margin: 1em 0; text-indent: 0; } img { max-width: 100%; height: auto; display: block; margin: 1em auto; } /* Prevent any overlay issues */ * { position: static !important; z-index: auto !important; } /* Remove any floating elements */ .title, [class*="title"] { position: static !important; float: none !important; background: transparent !important; } """ def _add_css_files(self, book: epub.EpubBook) -> List[epub.EpubItem]: """Add CSS files to book""" css_items = [] # First, add a default CSS to ensure proper formatting default_css = epub.EpubItem( uid="css_default", file_name="css/default.css", media_type="text/css", content=FileUtils.ensure_bytes(self._create_default_css()) ) book.add_item(default_css) css_items.append(default_css) self.log("✅ Added default CSS") # Then add user CSS files if not os.path.isdir(self.css_dir): return css_items css_files = [f for f in sorted(os.listdir(self.css_dir)) if f.endswith('.css')] self.log(f"[DEBUG] Found {len(css_files)} CSS files") for css_file in css_files: css_path = os.path.join(self.css_dir, css_file) try: import html with open(css_path, 'r', encoding='utf-8') as f: css_content = f.read() css_item = epub.EpubItem( uid=f"css_{css_file}", file_name=f"css/{css_file}", media_type="text/css", content=FileUtils.ensure_bytes(css_content) ) book.add_item(css_item) css_items.append(css_item) self.log(f"✅ Added CSS: {css_file}") except Exception as e: self.log(f"[WARNING] Failed to add CSS {css_file}: {e}") return css_items def _add_fonts(self, book: epub.EpubBook): """Add font files to book""" if not os.path.isdir(self.fonts_dir): return for font_file in os.listdir(self.fonts_dir): font_path = os.path.join(self.fonts_dir, font_file) if not os.path.isfile(font_path): continue try: mime_type = 'application/font-woff' if font_file.endswith('.ttf'): mime_type = 'font/ttf' elif font_file.endswith('.otf'): mime_type = 'font/otf' elif font_file.endswith('.woff2'): mime_type = 'font/woff2' with open(font_path, 'rb') as f: book.add_item(epub.EpubItem( uid=f"font_{font_file}", file_name=f"fonts/{font_file}", media_type=mime_type, content=f.read() )) self.log(f"✅ Added font: {font_file}") except Exception as e: self.log(f"[WARNING] Failed to add font {font_file}: {e}") def _process_images(self) -> Tuple[Dict[str, str], Optional[str]]: """Process images using parallel processing""" processed_images = {} cover_file = None try: # Find the images directory actual_images_dir = None possible_dirs = [ self.images_dir, os.path.join(self.base_dir, "images"), os.path.join(self.output_dir, "images"), ] for test_dir in possible_dirs: self.log(f"[DEBUG] Checking for images in: {test_dir}") if os.path.isdir(test_dir): files = os.listdir(test_dir) if files: self.log(f"[DEBUG] Found {len(files)} files in {test_dir}") actual_images_dir = test_dir break if not actual_images_dir: self.log("[WARNING] No images directory found or directory is empty") return processed_images, cover_file self.images_dir = actual_images_dir self.log(f"[INFO] Using images directory: {self.images_dir}") # Get list of files to process image_files = sorted(os.listdir(self.images_dir)) self.log(f"🖼️ Processing {len(image_files)} potential images with {self.max_workers} workers") def process_single_image(img): """Worker function to process a single image""" path = os.path.join(self.images_dir, img) if not os.path.isfile(path): return None # Check MIME type ctype, _ = mimetypes.guess_type(path) # If MIME type detection fails, check extension if not ctype: ext = os.path.splitext(img)[1].lower() mime_map = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp', '.svg': 'image/svg+xml' } ctype = mime_map.get(ext) if ctype and ctype.startswith("image"): safe_name = FileUtils.sanitize_filename(img, allow_unicode=False) # Ensure extension if not os.path.splitext(safe_name)[1]: ext = os.path.splitext(img)[1] if ext: safe_name += ext elif ctype == 'image/jpeg': safe_name += '.jpg' elif ctype == 'image/png': safe_name += '.png' # Special handling for SVG: rasterize to PNG fallback for reader compatibility if ctype == 'image/svg+xml' and self.rasterize_svg and self._cairosvg_available: try: from cairosvg import svg2png png_name = os.path.splitext(safe_name)[0] + '.png' png_path = os.path.join(self.images_dir, png_name) # Generate PNG only if not already present if not os.path.exists(png_path): svg2png(url=path, write_to=png_path) self.log(f" 🖼️ Rasterized SVG → PNG: {img} -> {png_name}") # Return the PNG as the image to include return (png_name, png_name, 'image/png') except Exception as e: self.log(f"[WARNING] SVG rasterization failed for {img}: {e}") # Fall back to adding the raw SVG return (img, safe_name, ctype) return (img, safe_name, ctype) else: return None # Process images in parallel with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(process_single_image, img) for img in image_files] completed = 0 for future in as_completed(futures): try: result = future.result() completed += 1 if result: original, safe, ctype = result processed_images[original] = safe self.log(f" [{completed}/{len(image_files)}] ✅ Processed: {original} -> {safe}") else: self.log(f" [{completed}/{len(image_files)}] ⏭️ Skipped non-image file") except Exception as e: completed += 1 self.log(f" [{completed}/{len(image_files)}] ❌ Failed to process image: {e}") # Find cover (sequential - quick operation) # Respect user preference to disable automatic cover creation disable_auto_cover = os.environ.get('DISABLE_AUTOMATIC_COVER_CREATION', '0') == '1' if processed_images and not disable_auto_cover: cover_prefixes = ['cover', 'front'] for original_name, safe_name in processed_images.items(): name_lower = original_name.lower() if any(name_lower.startswith(prefix) for prefix in cover_prefixes): cover_file = safe_name self.log(f"📔 Found cover image: {original_name} -> {cover_file}") break if not cover_file: cover_file = next(iter(processed_images.values())) self.log(f"📔 Using first image as cover: {cover_file}") self.log(f"✅ Processed {len(processed_images)} images successfully") except Exception as e: self.log(f"[ERROR] Error processing images: {e}") import traceback self.log(f"[DEBUG] Traceback: {traceback.format_exc()}") return processed_images, cover_file def _add_images_to_book(self, book: epub.EpubBook, processed_images: Dict[str, str], cover_file: Optional[str]): """Add images to book using parallel processing for reading files""" # Filter out cover image images_to_add = [(orig, safe) for orig, safe in processed_images.items() if safe != cover_file] if not images_to_add: self.log("No images to add (besides cover)") return self.log(f"📚 Adding {len(images_to_add)} images to EPUB with {self.max_workers} workers") def read_image_file(image_data): """Worker function to read image file""" original_name, safe_name = image_data img_path = os.path.join(self.images_dir, original_name) try: ctype, _ = mimetypes.guess_type(img_path) if not ctype: ctype = "image/jpeg" # Default fallback with open(img_path, 'rb') as f: content = f.read() return { 'original': original_name, 'safe': safe_name, 'ctype': ctype, 'content': content, 'success': True } except Exception as e: return { 'original': original_name, 'safe': safe_name, 'error': str(e), 'success': False } # Read all images in parallel image_data_list = [] with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(read_image_file, img_data) for img_data in images_to_add] completed = 0 for future in as_completed(futures): try: result = future.result() completed += 1 if result['success']: image_data_list.append(result) self.log(f" [{completed}/{len(images_to_add)}] ✅ Read: {result['original']}") else: self.log(f" [{completed}/{len(images_to_add)}] ❌ Failed: {result['original']} - {result['error']}") except Exception as e: completed += 1 self.log(f" [{completed}/{len(images_to_add)}] ❌ Exception reading image: {e}") # Add images to book sequentially (required by ebooklib) self.log("\n📦 Adding images to EPUB structure...") added = 0 for img_data in image_data_list: try: book.add_item(epub.EpubItem( uid=img_data['safe'], file_name=f"images/{img_data['safe']}", media_type=img_data['ctype'], content=img_data['content'] )) added += 1 self.log(f" ✅ Added: {img_data['original']}") except Exception as e: self.log(f" ❌ Failed to add {img_data['original']} to EPUB: {e}") self.log(f"✅ Successfully added {added}/{len(images_to_add)} images to EPUB") def _create_cover_page(self, book: epub.EpubBook, cover_file: str, processed_images: Dict[str, str], css_items: List[epub.EpubItem], metadata: dict) -> Optional[epub.EpubHtml]: """Create cover page""" # Find original filename original_cover = None for orig, safe in processed_images.items(): if safe == cover_file: original_cover = orig break if not original_cover: return None cover_path = os.path.join(self.images_dir, original_cover) try: with open(cover_path, 'rb') as f: cover_data = f.read() # Add cover image cover_img = epub.EpubItem( uid="cover-image", file_name=f"images/{cover_file}", media_type=mimetypes.guess_type(cover_path)[0] or "image/jpeg", content=cover_data ) book.add_item(cover_img) # Set cover metadata cover_img.properties = ["cover-image"] book.add_metadata('http://purl.org/dc/elements/1.1/', 'cover', 'cover-image') # Create cover page cover_page = epub.EpubHtml( title="Cover", file_name="cover.xhtml", lang=metadata.get("language", "en") ) # Build cover HTML directly without going through ensure_compliance # Since it's simple and controlled, we can build it directly cover_content = f''' Cover
Cover
''' cover_page.content = cover_content.encode('utf-8') # Associate CSS with cover page if needed if self.attach_css_to_chapters: for css_item in css_items: cover_page.add_item(css_item) book.add_item(cover_page) self.log(f"✅ Set cover image: {cover_file}") return cover_page except Exception as e: self.log(f"[WARNING] Failed to add cover: {e}") return None def _process_chapter_images(self, xhtml_content: str, processed_images: Dict[str, str]) -> str: """Process image paths and inline SVG in chapter content. - Rewrites to use images/ paths and prefers PNG fallback for SVGs. - Converts inline elements to when CairoSVG is available. """ try: soup = BeautifulSoup(xhtml_content, 'lxml') changed = False # Debug: Log what images we're looking for self.log(f"[DEBUG] Processing chapter images. Available images: {list(processed_images.keys())}") # 1) Handle tags that reference files for img in soup.find_all('img'): src = img.get('src', '') if not src: self.log(f"[WARNING] Image tag with no src attribute found") continue # Get the base filename - handle various path formats # Remove query parameters first clean_src = src.split('?')[0] basename = os.path.basename(clean_src) # Debug: Log what we're looking for self.log(f"[DEBUG] Looking for image: {basename} (from src: {src})") # Look up the safe name if basename in processed_images: safe_name = processed_images[basename] new_src = f"images/{safe_name}" if src != new_src: self.log(f"[DEBUG] Updating image src: {src} -> {new_src}") img['src'] = new_src changed = True else: # Try without extension variations name_without_ext = os.path.splitext(basename)[0] found = False for original_name, safe_name in processed_images.items(): if os.path.splitext(original_name)[0] == name_without_ext: new_src = f"images/{safe_name}" self.log(f"[DEBUG] Found image by name match: {src} -> {new_src}") img['src'] = new_src changed = True found = True break if not found: self.log(f"[WARNING] Image not found in processed_images: {basename}") # Still update the path to use images/ prefix if it doesn't have it if not src.startswith('images/'): img['src'] = f"images/{basename}" changed = True # Ensure alt attribute exists (required for XHTML) if not img.get('alt'): img['alt'] = '' changed = True # 2) Convert inline SVG wrappers that point to raster images into plain # Example: for svg_tag in soup.find_all('svg'): try: image_child = svg_tag.find('image') if image_child: href = ( image_child.get('xlink:href') or image_child.get('href') or image_child.get('{http://www.w3.org/1999/xlink}href') ) if href: clean_href = href.split('?')[0] basename = os.path.basename(clean_href) # Map to processed image name if basename in processed_images: safe_name = processed_images[basename] else: name_wo = os.path.splitext(basename)[0] safe_name = None for orig, safe in processed_images.items(): if os.path.splitext(orig)[0] == name_wo: safe_name = safe break new_src = f"images/{safe_name}" if safe_name else f"images/{basename}" new_img = soup.new_tag('img') new_img['src'] = new_src new_img['alt'] = svg_tag.get('aria-label') or svg_tag.get('title') or '' new_img['style'] = 'width:100%; height:auto; display:block;' svg_tag.replace_with(new_img) changed = True self.log(f"[DEBUG] Rewrote inline SVG to ") except Exception as e: self.log(f"[WARNING] Failed to rewrite inline SVG wrapper: {e}") # 3) Convert remaining inline (complex vector art) to PNG data URIs if possible if self.rasterize_svg and self._cairosvg_available: try: from cairosvg import svg2png import base64 for svg_tag in soup.find_all('svg'): try: svg_markup = str(svg_tag) png_bytes = svg2png(bytestring=svg_markup.encode('utf-8')) b64 = base64.b64encode(png_bytes).decode('ascii') alt_text = svg_tag.get('aria-label') or svg_tag.get('title') or '' new_img = soup.new_tag('img') new_img['src'] = f'data:image/png;base64,{b64}' new_img['alt'] = alt_text new_img['style'] = 'width:100%; height:auto; display:block;' svg_tag.replace_with(new_img) changed = True self.log("[DEBUG] Converted inline to PNG data URI") except Exception as e: self.log(f"[WARNING] Failed to rasterize inline SVG: {e}") except Exception: pass if changed: # Return the modified content return str(soup) return xhtml_content except Exception as e: self.log(f"[WARNING] Failed to process images in chapter: {e}") return xhtml_content def _create_gallery_page(self, book: epub.EpubBook, images: List[str], css_items: List[epub.EpubItem], metadata: dict) -> epub.EpubHtml: """Create image gallery page - FIXED to avoid escaping HTML tags""" gallery_page = epub.EpubHtml( title="Gallery", file_name="gallery.xhtml", lang=metadata.get("language", "en") ) # Build the gallery body content gallery_body_parts = ['

Image Gallery

'] for img in images: gallery_body_parts.append( f'
' f'{img}' f'
' ) gallery_body_content = '\n'.join(gallery_body_parts) # Build XHTML directly without going through ensure_compliance # which might escape our HTML tags css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items] # Build the complete XHTML document manually xhtml_content = f''' Gallery''' # Add CSS links for css_link in css_links: xhtml_content += f'\n' xhtml_content += f''' {gallery_body_content} ''' # Validate the XHTML validated_content = XHTMLConverter.validate(xhtml_content) # Set the content gallery_page.content = FileUtils.ensure_bytes(validated_content) # Associate CSS with gallery page if self.attach_css_to_chapters: for css_item in css_items: gallery_page.add_item(css_item) book.add_item(gallery_page) return gallery_page def _create_nav_content(self, toc_items, book_title="Book"): """Create navigation content manually""" nav_content = ''' Table of Contents ''' return nav_content def _get_order_from_progress_file(self, progress_file: str) -> Dict[str, int]: """Get chapter order from translation_progress.json Returns dict mapping original_filename -> chapter_number """ try: with open(progress_file, 'r', encoding='utf-8') as f: progress_data = json.load(f) filename_to_order = {} # Extract chapter order from progress data chapters = progress_data.get('chapters', {}) for chapter_key, chapter_info in chapters.items(): # Get the original basename from progress data original_basename = chapter_info.get('original_basename', '') if original_basename: # Map to chapter position (key is usually the chapter number) try: chapter_num = int(chapter_key) filename_to_order[original_basename] = chapter_num - 1 # Convert to 0-based self.log(f" Progress mapping: {original_basename} -> Chapter {chapter_num}") except (ValueError, TypeError): pass return filename_to_order if filename_to_order else None except Exception as e: self.log(f"⚠️ Error reading translation_progress.json: {e}") return None def _finalize_book(self, book: epub.EpubBook, spine: List, toc: List, cover_file: Optional[str]): """Finalize book structure""" # Check if we should use NCX-only use_ncx_only = os.environ.get('FORCE_NCX_ONLY', '0') == '1' # Check if first item in spine is a cover has_cover = False cover_item = None if spine and len(spine) > 0: first_item = spine[0] if hasattr(first_item, 'title') and first_item.title == "Cover": has_cover = True cover_item = first_item spine = spine[1:] # Remove cover from spine temporarily # DEBUG: Log what we have before sorting self.log("\n[DEBUG] Before sorting TOC:") self.log("Spine order:") for idx, item in enumerate(spine): if hasattr(item, 'file_name') and hasattr(item, 'title'): self.log(f" Spine[{idx}]: {item.file_name} -> {item.title}") #self.log("\nTOC order (before sorting):") for idx, item in enumerate(toc): if hasattr(item, 'file_name') and hasattr(item, 'title'): self.log(f" TOC[{idx}]: {item.file_name} -> {item.title}") # CRITICAL FIX: Sort TOC to match spine order # Create a mapping of file_name to spine position spine_order = {} for idx, item in enumerate(spine): if hasattr(item, 'file_name'): spine_order[item.file_name] = idx # Sort the TOC based on spine order sorted_toc = [] unsorted_items = [] for toc_item in toc: if hasattr(toc_item, 'file_name'): if toc_item.file_name in spine_order: sorted_toc.append((spine_order[toc_item.file_name], toc_item)) else: # Items not in spine (like gallery) go at the end unsorted_items.append(toc_item) else: unsorted_items.append(toc_item) # Sort by spine position sorted_toc.sort(key=lambda x: x[0]) # Extract just the items (remove the sort key) final_toc = [item for _, item in sorted_toc] # Add any unsorted items at the end (like gallery) final_toc.extend(unsorted_items) # DEBUG: Log after sorting self.log("\nTOC order (after sorting to match spine):") for idx, item in enumerate(final_toc): if hasattr(item, 'file_name') and hasattr(item, 'title'): self.log(f" TOC[{idx}]: {item.file_name} -> {item.title}") # Set the sorted TOC book.toc = final_toc # Add NCX ncx = epub.EpubNcx() book.add_item(ncx) if use_ncx_only: self.log(f"[INFO] NCX-only navigation forced - {len(final_toc)} chapters") # Build final spine: Cover (if exists) → Chapters final_spine = [] if has_cover: final_spine.append(cover_item) final_spine.extend(spine) book.spine = final_spine self.log("📖 Using EPUB 3.3 with NCX navigation only") if has_cover: self.log("📖 Reading order: Cover → Chapters") else: self.log("📖 Reading order: Chapters") else: # Normal EPUB3 processing with Nav self.log(f"[INFO] EPUB3 format - {len(final_toc)} chapters") # Create Nav with manual content using SORTED TOC nav = epub.EpubNav() nav.content = self._create_nav_content(final_toc, book.title).encode('utf-8') nav.uid = 'nav' nav.file_name = 'nav.xhtml' book.add_item(nav) # Build final spine: Cover (if exists) → Nav → Chapters final_spine = [] if has_cover: final_spine.append(cover_item) final_spine.append(nav) final_spine.extend(spine) book.spine = final_spine self.log("📖 Using EPUB3 format with full navigation") if has_cover: self.log("📖 Reading order: Cover → Table of Contents → Chapters") else: self.log("📖 Reading order: Table of Contents → Chapters") def _write_epub(self, book: epub.EpubBook, metadata: dict): """Write EPUB file with automatic format selection""" # Determine output filename book_title = book.title if book_title and book_title != os.path.basename(self.output_dir): safe_filename = FileUtils.sanitize_filename(book_title, allow_unicode=True) out_path = os.path.join(self.output_dir, f"{safe_filename}.epub") else: base_name = os.path.basename(self.output_dir) out_path = os.path.join(self.output_dir, f"{base_name}.epub") self.log(f"\n[DEBUG] Writing EPUB to: {out_path}") # Always write as EPUB3 try: opts = {'epub3': True} epub.write_epub(out_path, book, opts) self.log("[SUCCESS] Written as EPUB 3.3") except Exception as e: self.log(f"[ERROR] Write failed: {e}") raise # Verify the file if os.path.exists(out_path): file_size = os.path.getsize(out_path) if file_size > 0: self.log(f"✅ EPUB created: {out_path}") self.log(f"📊 File size: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)") self.log("📝 Format: EPUB 3.3") else: raise Exception("EPUB file is empty") else: raise Exception("EPUB file was not created") def _show_summary(self, chapter_titles_info: Dict[int, Tuple[str, float, str]], css_items: List[epub.EpubItem]): """Show compilation summary""" if chapter_titles_info: high = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if conf > 0.7) medium = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if 0.4 < conf <= 0.7) low = sum(1 for _, (_, conf, _) in chapter_titles_info.items() if conf <= 0.4) self.log(f"\n📊 Title Extraction Summary:") self.log(f" • High confidence: {high} chapters") self.log(f" • Medium confidence: {medium} chapters") self.log(f" • Low confidence: {low} chapters") if css_items: self.log(f"\n✅ Successfully embedded {len(css_items)} CSS files") # Gallery status if os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1': self.log("\n📷 Image Gallery: Disabled by user preference") self.log("\n📱 Compatibility Notes:") self.log(" • XHTML 1.1 compliant") self.log(" • All tags properly closed") self.log(" • Special characters escaped") self.log(" • Extracted translated titles") self.log(" • Enhanced entity decoding") # Main entry point def compile_epub(base_dir: str, log_callback: Optional[Callable] = None): """Compile translated HTML files into EPUB""" compiler = EPUBCompiler(base_dir, log_callback) compiler.compile() # Legacy alias fallback_compile_epub = compile_epub if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python epub_converter.py ") sys.exit(1) directory_path = sys.argv[1] try: compile_epub(directory_path) except Exception as e: print(f"Error: {e}") sys.exit(1)