# Strategy 1: <title> tag (highest confidence)
title_tag = soup.find('title')
if title_tag and title_tag.string:
title_text = HTMLEntityDecoder.decode(title_tag.string.strip())
if title_text and len(title_text) > 0 and title_text.lower() not in ['untitled', 'chapter', 'document']:
candidates.append((title_text, 0.95, "title_tag"))
# Strategy 2: h1 tags (very high confidence)
h1_tags = soup.find_all('h1')
for i, h1 in enumerate(h1_tags[:3]): # Check first 3 h1 tags
text = HTMLEntityDecoder.decode(h1.get_text(strip=True))
if text and len(text) < 300:
# First h1 gets highest confidence
confidence = 0.9 if i == 0 else 0.85
candidates.append((text, confidence, f"h1_tag_{i+1}"))
# Strategy 3: h2 tags (high confidence)
h2_tags = soup.find_all('h2')
for i, h2 in enumerate(h2_tags[:3]): # Check first 3 h2 tags
text = HTMLEntityDecoder.decode(h2.get_text(strip=True))
if text and len(text) < 250:
# First h2 gets highest confidence among h2s
confidence = 0.8 if i == 0 else 0.75
candidates.append((text, confidence, f"h2_tag_{i+1}"))
# Strategy 4: h3 tags (moderate confidence)
h3_tags = soup.find_all('h3')
for i, h3 in enumerate(h3_tags[:3]): # Check first 3 h3 tags
text = HTMLEntityDecoder.decode(h3.get_text(strip=True))
if text and len(text) < 200:
confidence = 0.7 if i == 0 else 0.65
candidates.append((text, confidence, f"h3_tag_{i+1}"))
# Strategy 5: Bold text in first elements (lower confidence)
first_elements = soup.find_all(['p', 'div'])[:5]
for elem in first_elements:
for bold in elem.find_all(['b', 'strong'])[:2]: # Limit to first 2 bold items
bold_text = HTMLEntityDecoder.decode(bold.get_text(strip=True))
if bold_text and 2 <= len(bold_text) <= 150:
candidates.append((bold_text, 0.6, "bold_text"))
# Strategy 6: Center-aligned text (common for chapter titles)
center_elements = soup.find_all(['center', 'div', 'p'],
attrs={'align': 'center'}) or \
soup.find_all(['div', 'p'],
style=lambda x: x and 'text-align' in x and 'center' in x)
for center in center_elements[:3]: # Check first 3 centered elements
text = HTMLEntityDecoder.decode(center.get_text(strip=True))
if text and 2 <= len(text) <= 200:
candidates.append((text, 0.65, "centered_text"))
# Strategy 7: All-caps text (common for titles in older books)
for elem in soup.find_all(['h1', 'h2', 'h3', 'p', 'div'])[:10]:
text = elem.get_text(strip=True)
# Check if text is mostly uppercase
if text and len(text) > 2 and text.isupper():
decoded_text = HTMLEntityDecoder.decode(text)
# Keep it as-is (don't convert to title case automatically)
candidates.append((decoded_text, 0.55, "all_caps_text"))
# Strategy 8: Patterns in first paragraph
first_p = soup.find('p')
if first_p:
p_text = HTMLEntityDecoder.decode(first_p.get_text(strip=True))
# Look for "Chapter X: Title" patterns
chapter_pattern = re.match(
r'^(Chapter\s+[\dIVXLCDM]+\s*[:\-\u2013\u2014]\s*)(.{2,100})(?:\.|$)',
p_text, re.IGNORECASE
)
if chapter_pattern:
# Extract just the title part after "Chapter X:"
title_part = chapter_pattern.group(2).strip()
if title_part:
candidates.append((title_part, 0.8, "paragraph_pattern_title"))
# Also add the full "Chapter X: Title" as a lower confidence option
full_title = chapter_pattern.group(0).strip().rstrip('.')
candidates.append((full_title, 0.75, "paragraph_pattern_full"))
elif len(p_text) <= 100 and len(p_text) > 2:
# Short first paragraph might be the title
candidates.append((p_text, 0.4, "paragraph_standalone"))
# Strategy 9: Filename
if filename:
filename_match = re.search(r'response_\d+_(.+?)\.html', filename)
if filename_match:
filename_title = filename_match.group(1).replace('_', ' ').title()
if len(filename_title) > 2:
candidates.append((filename_title, 0.3, "filename"))
# Filter and rank candidates
if candidates:
unique_candidates = {}
for title, confidence, source in candidates:
# Clean the title but keep roman numerals and short titles
title = TitleExtractor.clean_title(title)
# Don't reject short titles (like "III", "IX") - they're valid!
if title and len(title) > 0:
# Don't apply is_valid_title check too strictly
# Roman numerals and chapter numbers are valid titles
if title not in unique_candidates or unique_candidates[title][1] < confidence:
unique_candidates[title] = (title, confidence, source)
if unique_candidates:
sorted_candidates = sorted(unique_candidates.values(), key=lambda x: x[1], reverse=True)
best_title, best_confidence, best_source = sorted_candidates[0]
# Log what we found for debugging
log(f"[DEBUG] Best title candidate: '{best_title}' (confidence: {best_confidence:.2f}, source: {best_source})")
return best_title, best_confidence
# Fallback - only use generic chapter number if we really found nothing
if chapter_num:
return f"Chapter {chapter_num}", 0.1
return "Untitled Chapter", 0.0
except Exception as e:
log(f"[WARNING] Error extracting title: {e}")
if chapter_num:
return f"Chapter {chapter_num}", 0.1
return "Untitled Chapter", 0.0
@staticmethod
def clean_title(title: str) -> str:
"""Clean and normalize extracted title - PRESERVE SHORT TITLES LIKE ROMAN NUMERALS"""
if not title:
return ""
# Remove any [tag] patterns first
#title = re.sub(r'\[(title|skill|ability|spell|detect|status|class|level|stat|buff|debuff|item|quest)[^\]]*?\]', '', title)
# Decode entities - PRESERVES UNICODE
title = HTMLEntityDecoder.decode(title)
# Remove HTML tags
title = re.sub(r'<[^>]+>', '', title)
# Normalize spaces
title = re.sub(r'[\xa0\u2000-\u200a\u202f\u205f\u3000]+', ' ', title)
title = re.sub(r'\s+', ' ', title).strip()
# Remove leading/trailing punctuation EXCEPT for roman numeral dots
# Don't strip trailing dots from roman numerals like "III." or "IX."
if not re.match(r'^[IVXLCDM]+\.?$', title, re.IGNORECASE):
title = re.sub(r'^[][(){}\s\-\u2013\u2014:;,.|/\\]+', '', title).strip()
title = re.sub(r'[][(){}\s\-\u2013\u2014:;,.|/\\]+$', '', title).strip()
# Remove quotes if they wrap the entire title
quote_pairs = [
('"', '"'), ("'", "'"),
('\u201c', '\u201d'), ('\u2018', '\u2019'), # Smart quotes
('«', '»'), ('‹', '›'), # Guillemets
]
for open_q, close_q in quote_pairs:
if title.startswith(open_q) and title.endswith(close_q):
title = title[len(open_q):-len(close_q)].strip()
break
# Normalize Unicode - PRESERVES READABILITY
title = unicodedata.normalize('NFC', title)
# Remove zero-width characters
title = re.sub(r'[\u200b\u200c\u200d\u200e\u200f\ufeff]', '', title)
# Final cleanup
title = ' '.join(title.split())
# Truncate if too long
if len(title) > 150:
truncated = title[:147]
last_space = truncated.rfind(' ')
if last_space > 100:
truncated = truncated[:last_space]
title = truncated + "..."
return title
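# Illustrative behaviour (hypothetical inputs, not from a test suite):
#   clean_title('<b> Chapter 5: The Duel </b>')  ->  'Chapter 5: The Duel'
#   clean_title('— IX —')                        ->  'IX'    (surrounding dashes stripped)
#   clean_title('"III."')                        ->  'III.'  (roman-numeral dot preserved)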
@staticmethod
def is_valid_title(title: str) -> bool:
"""Check if extracted title is valid - ACCEPT SHORT TITLES LIKE ROMAN NUMERALS"""
if not title:
return False
# Accept any non-empty title after cleaning
# Don't reject roman numerals or short titles
# Only reject truly invalid patterns
invalid_patterns = [
r'^untitled$', # Just "untitled"
r'^chapter$', # Just "chapter" without a number
r'^document$', # Just "document"
]
for pattern in invalid_patterns:
if re.match(pattern, title.lower().strip()):
return False
# Skip obvious filler phrases
filler_phrases = [
'click here', 'read more', 'continue reading', 'next chapter',
'previous chapter', 'table of contents', 'back to top'
]
title_lower = title.lower().strip()
if any(phrase in title_lower for phrase in filler_phrases):
return False
# Accept everything else, including roman numerals and short titles
return True
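# Illustrative checks (hypothetical inputs): is_valid_title('IX') -> True,
# is_valid_title('Untitled') -> False, is_valid_title('Table of Contents') -> False.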
class XHTMLConverter:
"""Handles XHTML conversion and compliance"""
@staticmethod
def ensure_compliance(html_content: str, title: str = "Chapter",
css_links: Optional[List[str]] = None) -> str:
"""Ensure HTML content is XHTML-compliant while PRESERVING story tags"""
try:
import html
import re
# Add debug at the very start
log(f"[DEBUG] Processing chapter: {title}")
log(f"[DEBUG] Input HTML length: {len(html_content)}")
# Unescape HTML entities but PRESERVE &lt; and &gt; so fake angle brackets in narrative
# text don't become real tags (which breaks parsing across paragraphs like the sample).
if any(ent in html_content for ent in ['&amp;', '&quot;', '&#39;', '&lt;', '&gt;']):
log(f"[DEBUG] Unescaping HTML entities (preserving &lt; and &gt;)")
# Temporarily protect < and > (both cases) from unescaping
placeholder_lt = '\ue000'
placeholder_gt = '\ue001'
html_content = html_content.replace('&lt;', placeholder_lt).replace('&LT;', placeholder_lt)
html_content = html_content.replace('&gt;', placeholder_gt).replace('&GT;', placeholder_gt)
# Unescape remaining entities
html_content = html.unescape(html_content)
# Restore protected angle bracket entities
html_content = html_content.replace(placeholder_lt, '&lt;').replace(placeholder_gt, '&gt;')
# Strip out ANY existing DOCTYPE, XML declaration, or html wrapper
# We only want the body content
log(f"[DEBUG] Extracting body content")
# Try to extract just body content
body_match = re.search(r'<body[^>]*>(.*?)</body>', html_content, re.DOTALL | re.IGNORECASE)
if body_match:
html_content = body_match.group(1)
log(f"[DEBUG] Extracted body content")
else:
# No body tags, strip any DOCTYPE/html tags if present
html_content = re.sub(r'<\?xml[^>]*\?>', '', html_content)
html_content = re.sub(r'<!DOCTYPE[^>]*>', '', html_content)
html_content = re.sub(r'</?html[^>]*>', '', html_content)
html_content = re.sub(r'<head[^>]*>.*?</head>', '', html_content, flags=re.DOTALL)
log(f"[DEBUG] Stripped wrapper tags")
# Now process the content normally
# Fix broken attributes with ="" pattern
def fix_broken_attributes_only(match):
tag_content = match.group(0)
if '=""' in tag_content and tag_content.count('=""') > 2:
tag_match = re.match(r'<(\w+)', tag_content)
if tag_match:
tag_name = tag_match.group(1)
words = re.findall(r'(\w+)=""', tag_content)
if words:
content = ' '.join(words)
return f'<{tag_name}>{content}</{tag_name}>'
return ''
return tag_content
html_content = re.sub(r'<[^>]*?=""[^>]*?>', fix_broken_attributes_only, html_content)
# Sanitize attributes that contain a colon (:) but are NOT valid namespaces.
# Example: a stray attribute such as Skill:Active="" is dropped, while xlink:href is kept.
def _sanitize_colon_attributes_in_tags(text: str) -> str:
    # Process only inside start tags; skip closing tags, comments, doctypes, processing instructions
    allowed_attr_prefixes = ('xml', 'xmlns', 'xlink', 'epub')
    def _process_tag(tag_match):
        tag = tag_match.group(0)
        if tag.startswith('</') or tag.startswith('<!') or tag.startswith('<?'):
            return tag
        # Drop attributes whose name contains ':' unless the prefix is a known namespace
        return re.sub(r'\s+([\w.-]+:[\w.-]+)(="[^"]*"|=\'[^\']*\'|=[^\s>]+)?',
                      lambda m: m.group(0) if m.group(1).split(':', 1)[0].lower() in allowed_attr_prefixes else '',
                      tag)
    return re.sub(r'<[^>]+>', _process_tag, text)
html_content = _sanitize_colon_attributes_in_tags(html_content)
# Convert only "story tags" whose TAG NAME contains a colon (e.g., <Skill: Fireball>),
# but DO NOT touch valid HTML/SVG tags where colons appear in attributes (e.g., style="color:red" or xlink:href)
# and DO NOT touch namespaced tags like <svg:rect>.
allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}
def _escape_story_tag(match):
full_tag = match.group(0)  # Entire <...> or </...> tag
tag_name = match.group(1) # The tag name possibly containing ':'
prefix = tag_name.split(':', 1)[0].lower()
# If this is a known namespace prefix (e.g., svg:rect), leave it alone
if prefix in allowed_ns_prefixes:
return full_tag
# Otherwise, treat as a story/fake tag and replace angle brackets with Chinese brackets
return full_tag.replace('<', '《').replace('>', '》')
# Escape invalid story tags (tag names containing ':') so they render literally with angle brackets.
allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}
def _escape_story_tag_entities(m):
tagname = m.group(1)
prefix = tagname.split(':', 1)[0].lower()
if prefix in allowed_ns_prefixes:
return m.group(0)
tag_text = m.group(0)
return tag_text.replace('<', '&lt;').replace('>', '&gt;')
# Apply in order: self-closing, opening, closing
html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, html_content)
html_content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, html_content)
html_content = re.sub(r'([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, html_content)
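# For example (hypothetical story tag): '<Skill: Appraisal>' becomes '&lt;Skill: Appraisal&gt;',
# while a genuinely namespaced tag such as '<svg:rect x="1"/>' is left untouched.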
# Parse with lxml
from lxml import html as lxml_html, etree
parser = lxml_html.HTMLParser(recover=True)
doc = lxml_html.document_fromstring(f"<div>{html_content}</div>", parser=parser)
# Get the content back
body_xhtml = etree.tostring(doc, method='xml', encoding='unicode')
# Remove the wrapper div we added
body_xhtml = re.sub(r'^<div[^>]*>|</div>$', '', body_xhtml)
# Optionally replace angle-bracket entities with Chinese brackets
# Default behavior: keep them as entities (&lt; &gt;) so the output preserves the original text
bracket_style = os.getenv('ANGLE_BRACKET_OUTPUT', 'entity').lower()
if '&lt;' in body_xhtml or '&gt;' in body_xhtml:
    if bracket_style in ('cjk', 'chinese', 'cjk_brackets'):
        body_xhtml = body_xhtml.replace('&lt;', '《').replace('&gt;', '》')
# else: keep as entities
# Build our own clean XHTML document
return XHTMLConverter._build_xhtml(title, body_xhtml, css_links)
except Exception as e:
log(f"[WARNING] Failed to ensure XHTML compliance: {e}")
import traceback
log(f"[DEBUG] Full traceback:\n{traceback.format_exc()}")
log(f"[DEBUG] Failed chapter title: {title}")
log(f"[DEBUG] First 500 chars of input: {html_content[:500] if html_content else 'EMPTY'}")
return XHTMLConverter._build_fallback_xhtml(title)
@staticmethod
def _build_xhtml(title: str, body_content: str, css_links: Optional[List[str]] = None) -> str:
"""Build XHTML document"""
if not body_content.strip():
    body_content = '<p>Error processing content. Please check the source file.</p>'
css_html = ''.join(f'<link rel="stylesheet" type="text/css" href="{h}"/>' for h in (css_links or []))
return ('<?xml version="1.0" encoding="utf-8"?>\n'
        '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">\n'
        f'<html xmlns="http://www.w3.org/1999/xhtml"><head><title>{ContentProcessor.safe_escape(title)}</title>{css_html}</head>'
        f'<body>{body_content}</body></html>')
@staticmethod
def validate(content: str) -> str:
"""Validate and fix XHTML content - WITH DEBUGGING"""
import re
# Ensure XML declaration
if not content.strip().startswith('<?xml'):
    content = '<?xml version="1.0" encoding="utf-8"?>\n' + content
# Remove control characters
content = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', content)
# Fix unescaped ampersands
content = re.sub(
r'&(?!(?:'
r'amp|lt|gt|quot|apos|'
r'[a-zA-Z][a-zA-Z0-9]{1,31}|'
r'#[0-9]{1,7}|'
r'#x[0-9a-fA-F]{1,6}'
r');)',
'&amp;',
content
)
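# e.g. 'Fish & Chips &amp; more' -> 'Fish &amp; Chips &amp; more'; valid entities such as
# '&lt;' or '&#8212;' are left alone by the negative lookahead above.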
# Fix unquoted attributes
try:
content = re.sub(r'<([^>]+)\s+(\w+)=([^\s"\'>]+)([>\s])', r'<\1 \2="\3"\4', content)
except re.error:
pass # Skip if regex fails
# Sanitize invalid colon-containing attribute names (preserve XML/xlink/epub/xmlns)
def _sanitize_colon_attrs_in_content(text: str) -> str:
    allowed_attr_prefixes = ('xml', 'xmlns', 'xlink', 'epub')
    def _process_tag(m):
        tag = m.group(0)
        if tag.startswith('</') or tag.startswith('<!') or tag.startswith('<?'):
            return tag
        return re.sub(r'\s+([\w.-]+:[\w.-]+)(="[^"]*"|=\'[^\']*\'|=[^\s>]+)?',
                      lambda a: a.group(0) if a.group(1).split(':', 1)[0].lower() in allowed_attr_prefixes else '',
                      tag)
    return re.sub(r'<[^>]+>', _process_tag, text)
content = _sanitize_colon_attrs_in_content(content)
# Escape invalid story tags so they render literally with angle brackets in output
allowed_ns_prefixes = {"svg", "math", "xlink", "xml", "xmlns", "epub"}
def _escape_story_tag_entities(m):
tagname = m.group(1)
prefix = tagname.split(':', 1)[0].lower()
if prefix in allowed_ns_prefixes:
return m.group(0)
tag_text = m.group(0)
return tag_text.replace('<', '&lt;').replace('>', '&gt;')
# Apply in order: self-closing, opening, closing
content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)/>', _escape_story_tag_entities, content)
content = re.sub(r'<([A-Za-z][\w.-]*:[\w.-]*)\s*([^>]*)>', _escape_story_tag_entities, content)
content = re.sub(r'([A-Za-z][\w.-]*:[\w.-]*)\s*>', _escape_story_tag_entities, content)
# Clean for XML
content = XMLValidator.clean_for_xml(content)
# Try to parse for validation
try:
ET.fromstring(content.encode('utf-8'))
except ET.ParseError as e:
log(f"[WARNING] XHTML validation failed: {e}")
# DEBUG: Show what's at the error location
import re
match = re.search(r'line (\d+), column (\d+)', str(e))
if match:
line_num = int(match.group(1))
col_num = int(match.group(2))
lines = content.split('\n')
log(f"[DEBUG] Error at line {line_num}, column {col_num}")
log(f"[DEBUG] Total lines in content: {len(lines)}")
if line_num <= len(lines):
problem_line = lines[line_num - 1]
log(f"[DEBUG] Full problem line: {problem_line!r}")
# Show the problem area
if col_num <= len(problem_line):
# Show 40 characters before and after
start = max(0, col_num - 40)
end = min(len(problem_line), col_num + 40)
log(f"[DEBUG] Context around error: {problem_line[start:end]!r}")
log(f"[DEBUG] Character at column {col_num}: {problem_line[col_num-1]!r} (U+{ord(problem_line[col_num-1]):04X})")
# Show 5 characters before and after with hex
for i in range(max(0, col_num-5), min(len(problem_line), col_num+5)):
char = problem_line[i]
marker = " <-- ERROR" if i == col_num-1 else ""
log(f"[DEBUG] Col {i+1}: {char!r} (U+{ord(char):04X}){marker}")
else:
log(f"[DEBUG] Column {col_num} is beyond line length {len(problem_line)}")
else:
log(f"[DEBUG] Line {line_num} doesn't exist (only {len(lines)} lines)")
# Show last few lines
for i in range(max(0, len(lines)-3), len(lines)):
log(f"[DEBUG] Line {i+1}: {lines[i][:100]!r}...")
# Try to recover
content = XHTMLConverter._attempt_recovery(content, e)
return content
@staticmethod
def _attempt_recovery(content: str, error: ET.ParseError) -> str:
"""Attempt to recover from XML parse errors - ENHANCED"""
try:
# Use BeautifulSoup to fix structure
soup = BeautifulSoup(content, 'lxml')
# Ensure we have proper XHTML structure
if not soup.find('html'):
new_soup = BeautifulSoup('<html></html>', 'lxml')
html_tag = new_soup.html
for child in list(soup.children):
html_tag.append(child)
soup = new_soup
# Ensure we have head and body
if not soup.find('head'):
head = soup.new_tag('head')
meta = soup.new_tag('meta')
meta['http-equiv'] = 'Content-Type'
meta['content'] = 'text/html; charset=utf-8'
head.append(meta)
title_tag = soup.new_tag('title')
title_tag.string = 'Chapter'
head.append(title_tag)
if soup.html:
soup.html.insert(0, head)
if not soup.find('body'):
body = soup.new_tag('body')
if soup.html:
for child in list(soup.html.children):
if child.name not in ['head', 'body']:
body.append(child.extract())
soup.html.append(body)
# Convert back to string
recovered = str(soup)
# Ensure proper XML declaration
if not recovered.strip().startswith('<?xml'):
    recovered = '<?xml version="1.0" encoding="utf-8"?>\n' + recovered
# Add DOCTYPE if missing
if '<!DOCTYPE' not in recovered:
    lines = recovered.split('\n')
    lines.insert(1, '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">')
    recovered = '\n'.join(lines)
# Final validation
ET.fromstring(recovered.encode('utf-8'))
log(f"[INFO] Successfully recovered XHTML")
return recovered
except Exception as recovery_error:
log(f"[WARNING] Recovery attempt failed: {recovery_error}")
# Last resort: use fallback
return XHTMLConverter._build_fallback_xhtml("Chapter")
class FileUtils:
"""File handling utilities"""
@staticmethod
def sanitize_filename(filename: str, allow_unicode: bool = False) -> str:
"""Sanitize filename for safety"""
if allow_unicode:
filename = unicodedata.normalize('NFC', filename)
replacements = {
'/': '_', '\\': '_', ':': '_', '*': '_',
'?': '_', '"': '_', '<': '_', '>': '_',
'|': '_', '\0': '_',
}
for old, new in replacements.items():
filename = filename.replace(old, new)
filename = ''.join(char for char in filename if ord(char) >= 32 or ord(char) == 9)
else:
filename = unicodedata.normalize('NFKD', filename)
try:
filename = filename.encode('ascii', 'ignore').decode('ascii')
except:
filename = ''.join(c if ord(c) < 128 else '_' for c in filename)
replacements = {
'/': '_', '\\': '_', ':': '_', '*': '_',
'?': '_', '"': '_', '<': '_', '>': '_',
'|': '_', '\n': '_', '\r': '_', '\t': '_',
'&': '_and_', '#': '_num_', ' ': '_',
}
for old, new in replacements.items():
filename = filename.replace(old, new)
filename = ''.join(char for char in filename if ord(char) >= 32)
filename = re.sub(r'_+', '_', filename)
filename = filename.strip('_')
# Limit length
name, ext = os.path.splitext(filename)
if len(name) > 100:
name = name[:100]
if not name or name == '_':
name = 'file'
return name + ext
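# Illustrative (ASCII mode, hypothetical input):
#   sanitize_filename('Chapter 3: "The End?"')  ->  'Chapter_3_The_End'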
@staticmethod
def ensure_bytes(content) -> bytes:
"""Ensure content is bytes"""
if content is None:
return b''
if isinstance(content, bytes):
return content
if not isinstance(content, str):
content = str(content)
return content.encode('utf-8')
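# e.g. ensure_bytes(None) -> b'', ensure_bytes('café') -> b'caf\xc3\xa9', ensure_bytes(b'x') -> b'x'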
class EPUBCompiler:
"""Main EPUB compilation class"""
def __init__(self, base_dir: str, log_callback: Optional[Callable] = None):
self.base_dir = os.path.abspath(base_dir)
self.log_callback = log_callback
self.output_dir = self.base_dir
self.images_dir = os.path.join(self.output_dir, "images")
self.css_dir = os.path.join(self.output_dir, "css")
self.fonts_dir = os.path.join(self.output_dir, "fonts")
self.metadata_path = os.path.join(self.output_dir, "metadata.json")
self.attach_css_to_chapters = os.getenv('ATTACH_CSS_TO_CHAPTERS', '0') == '1' # Default to '0' (disabled)
self.max_workers = int(os.environ.get("EXTRACTION_WORKERS", "4"))
self.log(f"[INFO] Using {self.max_workers} workers for parallel processing")
# Track auxiliary (non-chapter) HTML files to include in spine but omit from TOC
self.auxiliary_html_files: set[str] = set()
# SVG rasterization settings
self.rasterize_svg = os.getenv('RASTERIZE_SVG_FALLBACK', '1') == '1'
try:
import cairosvg # noqa: F401
self._cairosvg_available = True
except Exception:
self._cairosvg_available = False
# Set global log callback
set_global_log_callback(log_callback)
# translation features
self.html_dir = self.output_dir # For compatibility
self.translate_titles = os.getenv('TRANSLATE_BOOK_TITLE', '1') == '1'
# Initialize API client if needed
self.api_client = None
if self.translate_titles or os.getenv('BATCH_TRANSLATE_HEADERS', '0') == '1':
model = os.getenv('MODEL')
api_key = os.getenv('API_KEY')
if model and api_key and UnifiedClient:
self.api_client = UnifiedClient(api_key=api_key, model=model, output_dir=self.output_dir)
elif model and api_key and not UnifiedClient:
self.log("Warning: UnifiedClient module not available, translation features disabled")
# Enhance with translation features
enhance_epub_compiler(self)
def log(self, message: str):
"""Log a message"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def compile(self):
"""Main compilation method"""
try:
# Debug: Check what metadata enhancement was done
self.log("[DEBUG] Checking metadata translation setup...")
self.log(f"[DEBUG] Has api_client: {hasattr(self, 'api_client') and self.api_client is not None}")
self.log(f"[DEBUG] Has metadata_translator: {hasattr(self, 'metadata_translator')}")
self.log(f"[DEBUG] Has translate_metadata_fields: {hasattr(self, 'translate_metadata_fields')}")
if hasattr(self, 'translate_metadata_fields'):
self.log(f"[DEBUG] translate_metadata_fields content: {self.translate_metadata_fields}")
enabled_fields = [k for k, v in self.translate_metadata_fields.items() if v]
self.log(f"[DEBUG] Enabled metadata fields: {enabled_fields}")
# Pre-flight check
if not self._preflight_check():
return
# Analyze chapters FIRST to get the structure
chapter_titles_info = self._analyze_chapters()
# Debug: Check if batch translation is enabled
self.log(f"[DEBUG] Batch translation enabled: {getattr(self, 'batch_translate_headers', False)}")
self.log(f"[DEBUG] Has header translator: {hasattr(self, 'header_translator')}")
self.log(f"[DEBUG] EPUB_PATH env: {os.getenv('EPUB_PATH', 'NOT SET')}")
self.log(f"[DEBUG] HTML dir: {self.html_dir}")
# Extract source headers AND current titles if batch translation is enabled
source_headers = {}
current_titles = {}
if (hasattr(self, 'batch_translate_headers') and self.batch_translate_headers and
hasattr(self, 'header_translator') and self.header_translator):
# Check if the extraction method exists
if hasattr(self, '_extract_source_headers_and_current_titles'):
# Use the new extraction method
source_headers, current_titles = self._extract_source_headers_and_current_titles()
self.log(f"[DEBUG] Extraction complete: {len(source_headers)} source, {len(current_titles)} current")
else:
self.log("⚠️ Missing _extract_source_headers_and_current_titles method!")
# Batch translate headers if we have source headers
translated_headers = {}
if source_headers and hasattr(self, 'header_translator') and self.header_translator:
# Check if translated_headers.txt already exists
translations_file = os.path.join(self.output_dir, "translated_headers.txt")
if os.path.exists(translations_file):
# File exists - skip translation entirely
self.log("📁 Found existing translated_headers.txt - skipping header translation")
# No need to parse or do anything else
else:
# No existing file - proceed with translation
self.log("🌐 Batch translating chapter headers...")
try:
# Check if the translator has been initialized properly
if not hasattr(self.header_translator, 'client') or not self.header_translator.client:
self.log("⚠️ Header translator not properly initialized, skipping batch translation")
else:
self.log(f"📚 Found {len(source_headers)} headers to translate")
self.log(f"📚 Found {len(current_titles)} current titles in HTML files")
# Debug: Show a few examples
for num in list(source_headers.keys())[:3]:
self.log(f" Example - Chapter {num}: {source_headers[num]}")
# Translate headers with current titles info
translated_headers = self.header_translator.translate_and_save_headers(
html_dir=self.html_dir,
headers_dict=source_headers,
batch_size=getattr(self, 'headers_per_batch', 400),
output_dir=self.output_dir,
update_html=getattr(self, 'update_html_headers', True),
save_to_file=getattr(self, 'save_header_translations', True),
current_titles=current_titles # Pass current titles for exact replacement
)
# Update chapter_titles_info with translations
if translated_headers:
self.log("\n📝 Updating chapter titles in EPUB structure...")
for chapter_num, translated_title in translated_headers.items():
if chapter_num in chapter_titles_info:
# Keep the original confidence and method, just update the title
orig_title, confidence, method = chapter_titles_info[chapter_num]
chapter_titles_info[chapter_num] = (translated_title, confidence, method)
self.log(f"✓ Chapter {chapter_num}: {source_headers.get(chapter_num, 'Unknown')} → {translated_title}")
else:
# Add new entry if not in chapter_titles_info
chapter_titles_info[chapter_num] = (translated_title, 1.0, 'batch_translation')
self.log(f"✓ Added Chapter {chapter_num}: {translated_title}")
except Exception as e:
self.log(f"⚠️ Batch translation failed: {e}")
import traceback
self.log(traceback.format_exc())
# Continue with compilation even if translation fails
else:
if not source_headers:
self.log("⚠️ No source headers found, skipping batch translation")
elif not hasattr(self, 'header_translator'):
self.log("⚠️ No header translator available")
# Find HTML files
html_files = self._find_html_files()
if not html_files:
raise Exception("No translated chapters found to compile into EPUB")
# Load metadata
metadata = self._load_metadata()
# Translate metadata if configured
if hasattr(self, 'metadata_translator') and self.metadata_translator:
if hasattr(self, 'translate_metadata_fields') and any(self.translate_metadata_fields.values()):
self.log("🌐 Translating metadata fields...")
try:
translated_metadata = self.metadata_translator.translate_metadata(
metadata,
self.translate_metadata_fields,
mode=getattr(self, 'metadata_translation_mode', 'together')
)
# Preserve original values
for field in self.translate_metadata_fields:
if field in metadata and field in translated_metadata:
if metadata[field] != translated_metadata[field]:
translated_metadata[f'original_{field}'] = metadata[field]
metadata = translated_metadata
except Exception as e:
self.log(f"⚠️ Metadata translation failed: {e}")
# Continue with original metadata
# Create EPUB book
book = self._create_book(metadata)
# Process all components
spine = []
toc = []
# Add CSS
css_items = self._add_css_files(book)
# Add fonts
self._add_fonts(book)
# Process images and cover
processed_images, cover_file = self._process_images()
# Add images to book
self._add_images_to_book(book, processed_images, cover_file)
# Add cover page if exists
if cover_file:
cover_page = self._create_cover_page(book, cover_file, processed_images, css_items, metadata)
if cover_page:
spine.insert(0, cover_page)
# Process chapters with updated titles
chapters_added = self._process_chapters(
book, html_files, chapter_titles_info,
css_items, processed_images, spine, toc, metadata
)
if chapters_added == 0:
raise Exception("No chapters could be added to the EPUB")
# Add optional gallery (unless disabled)
disable_gallery = os.environ.get('DISABLE_EPUB_GALLERY', '0') == '1'
if disable_gallery:
self.log("📷 Image gallery disabled by user preference")
else:
gallery_images = [img for img in processed_images.values() if img != cover_file]
if gallery_images:
self.log(f"📷 Creating image gallery with {len(gallery_images)} images...")
gallery_page = self._create_gallery_page(book, gallery_images, css_items, metadata)
spine.append(gallery_page)
toc.append(gallery_page)
else:
self.log("📷 No images found for gallery")
# Finalize book
self._finalize_book(book, spine, toc, cover_file)
# Write EPUB
self._write_epub(book, metadata)
# Show summary
self._show_summary(chapter_titles_info, css_items)
except Exception as e:
self.log(f"❌ EPUB compilation failed: {e}")
raise
def _fix_encoding_issues(self, content: str) -> str:
"""Convert smart quotes and other Unicode punctuation to ASCII."""
# Convert smart quotes to regular quotes and other punctuation
fixes = {
'’': "'", # Right single quotation mark
'‘': "'", # Left single quotation mark
'“': '"', # Left double quotation mark
'”': '"', # Right double quotation mark
'—': '-', # Em dash to hyphen
'–': '-', # En dash to hyphen
'…': '...', # Ellipsis to three dots
}
for bad, good in fixes.items():
if bad in content:
content = content.replace(bad, good)
#self.log(f"[DEBUG] Replaced {bad!r} with {good!r}")
return content
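# e.g. '“Hello…” — she said'  ->  '"Hello..." - she said'  (smart punctuation flattened to ASCII)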
def _preflight_check(self) -> bool:
"""Pre-flight check before compilation with progressive fallback"""
# Check if we have standard files
if self._has_standard_files():
# Use original strict check
return self._preflight_check_strict()
else:
# Use progressive check for non-standard files
result = self._preflight_check_progressive()
return result is not None
def _has_standard_files(self) -> bool:
"""Check if directory contains standard response_ files"""
if not os.path.exists(self.base_dir):
return False
html_exts = ('.html', '.xhtml', '.htm')
html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)]
response_files = [f for f in html_files if f.startswith('response_')]
return len(response_files) > 0
def _preflight_check_strict(self) -> bool:
"""Original strict pre-flight check - for standard files"""
self.log("\n📋 Pre-flight Check")
self.log("=" * 50)
issues = []
if not os.path.exists(self.base_dir):
issues.append(f"Directory does not exist: {self.base_dir}")
return False
html_files = [f for f in os.listdir(self.base_dir) if f.endswith('.html')]
response_files = [f for f in html_files if f.startswith('response_')]
if not html_files:
issues.append("No HTML files found in directory")
elif not response_files:
issues.append(f"Found {len(html_files)} HTML files but none start with 'response_'")
else:
self.log(f"✅ Found {len(response_files)} chapter files")
if not os.path.exists(self.metadata_path):
self.log("⚠️ No metadata.json found (will use defaults)")
else:
self.log("✅ Found metadata.json")
for subdir in ['css', 'images', 'fonts']:
path = os.path.join(self.base_dir, subdir)
if os.path.exists(path):
count = len(os.listdir(path))
self.log(f"✅ Found {subdir}/ with {count} files")
if issues:
self.log("\n❌ Pre-flight check FAILED:")
for issue in issues:
self.log(f" • {issue}")
return False
self.log("\n✅ Pre-flight check PASSED")
return True
def _preflight_check_progressive(self) -> dict:
"""Progressive pre-flight check for non-standard files"""
self.log("\n📋 Starting Progressive Pre-flight Check")
self.log("=" * 50)
# Critical check - always required
if not os.path.exists(self.base_dir):
self.log(f"❌ CRITICAL: Directory does not exist: {self.base_dir}")
return None
# Phase 1: Try strict mode (response_ files) - already checked in caller
# Phase 2: Try relaxed mode (any HTML files)
self.log("\n[Phase 2] Checking for any HTML files...")
html_exts = ('.html', '.xhtml', '.htm')
html_files = [f for f in os.listdir(self.base_dir) if f.lower().endswith(html_exts)]
if html_files:
self.log(f"✅ Found {len(html_files)} HTML files:")
# Show first 5 files as examples
for i, f in enumerate(html_files[:5]):
self.log(f" • {f}")
if len(html_files) > 5:
self.log(f" ... and {len(html_files) - 5} more")
self._check_optional_resources()
self.log("\n⚠️ Pre-flight check PASSED with warnings (relaxed mode)")
return {'success': True, 'mode': 'relaxed'}
# Phase 3: No HTML files at all
self.log("❌ No HTML files found in directory")
self.log("\n[Phase 3] Checking directory contents...")
all_files = os.listdir(self.base_dir)
self.log(f"📁 Directory contains {len(all_files)} total files")
# Look for any potential content
potential_content = [f for f in all_files if not f.startswith('.')]
if potential_content:
self.log("⚠️ Found non-HTML files:")
for i, f in enumerate(potential_content[:5]):
self.log(f" • {f}")
if len(potential_content) > 5:
self.log(f" ... and {len(potential_content) - 5} more")
self.log("\n⚠️ BYPASSING standard checks - compilation may fail!")
return {'success': True, 'mode': 'bypass'}
self.log("\n❌ Directory appears to be empty")
return None
def _check_optional_resources(self):
"""Check for optional resources (metadata, CSS, images, fonts)"""
self.log("\n📁 Checking optional resources:")
if os.path.exists(self.metadata_path):
self.log("✅ Found metadata.json")
else:
self.log("⚠️ No metadata.json found (will use defaults)")
resources_found = False
for subdir in ['css', 'images', 'fonts']:
path = os.path.join(self.base_dir, subdir)
if os.path.exists(path):
items = os.listdir(path)
if items:
self.log(f"✅ Found {subdir}/ with {len(items)} files")
resources_found = True
else:
self.log(f"📁 Found {subdir}/ (empty)")
if not resources_found:
self.log("⚠️ No resource directories found (CSS/images/fonts)")
def _analyze_chapters(self) -> Dict[int, Tuple[str, float, str]]:
"""Analyze chapter files and extract titles using parallel processing"""
self.log("\n📖 Extracting translated titles from chapter files...")
chapter_info = {}
sorted_files = self._find_html_files()
if not sorted_files:
self.log("⚠️ No translated chapter files found!")
return chapter_info
self.log(f"📖 Analyzing {len(sorted_files)} translated chapter files for titles...")
self.log(f"🔧 Using {self.max_workers} parallel workers")
def analyze_single_file(idx_filename):
"""Worker function to analyze a single file"""
idx, filename = idx_filename
file_path = os.path.join(self.output_dir, filename)
try:
# Read and process file
with open(file_path, 'r', encoding='utf-8') as f:
raw_html_content = f.read()
# Decode HTML entities
import html
html_content = html.unescape(raw_html_content)
html_content = self._fix_encoding_issues(html_content)
html_content = HTMLEntityDecoder.decode(html_content)
# Extract title
title, confidence = TitleExtractor.extract_from_html(
html_content, idx, filename
)
return idx, (title, confidence, filename)
except Exception as e:
return idx, (f"Chapter {idx}", 0.0, filename), str(e)
# Process files in parallel using environment variable worker count
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all tasks
futures = {
executor.submit(analyze_single_file, (idx, filename)): idx
for idx, filename in enumerate(sorted_files)
}
# Collect results as they complete
completed = 0
for future in as_completed(futures):
try:
result = future.result()
completed += 1
if len(result) == 2: # Success
idx, info = result
chapter_info[idx] = info
# Log progress
title, confidence, filename = info
indicator = "✅" if confidence > 0.7 else "🟡" if confidence > 0.4 else "🔴"
self.log(f" [{completed}/{len(sorted_files)}] {indicator} Chapter {idx}: '{title}' (confidence: {confidence:.2f})")
else: # Error
idx, info, error = result
chapter_info[idx] = info
self.log(f"❌ [{completed}/{len(sorted_files)}] Error processing chapter {idx}: {error}")
except Exception as e:
idx = futures[future]
self.log(f"❌ Failed to process chapter {idx}: {e}")
chapter_info[idx] = (f"Chapter {idx}", 0.0, sorted_files[idx])
return chapter_info
def _process_chapters(self, book: epub.EpubBook, html_files: List[str],
chapter_titles_info: Dict[int, Tuple[str, float, str]],
css_items: List[epub.EpubItem], processed_images: Dict[str, str],
spine: List, toc: List, metadata: dict) -> int:
"""Process chapters using parallel processing with AGGRESSIVE DEBUGGING"""
chapters_added = 0
self.log(f"\n{'='*80}")
self.log(f"📚 STARTING CHAPTER PROCESSING")
self.log(f"📚 Total files to process: {len(html_files)}")
self.log(f"🔧 Using {self.max_workers} parallel workers")
self.log(f"📂 Output directory: {self.output_dir}")
self.log(f"{'='*80}")
# Debug chapter titles info
self.log(f"\n[DEBUG] Chapter titles info has {len(chapter_titles_info)} entries")
for num in list(chapter_titles_info.keys())[:5]:
title, conf, method = chapter_titles_info[num]
self.log(f" Chapter {num}: {title[:50]}... (conf: {conf}, method: {method})")
# Prepare chapter data
chapter_data = []
for idx, filename in enumerate(html_files):
chapter_num = idx
if chapter_num not in chapter_titles_info and (chapter_num + 1) in chapter_titles_info:
chapter_num = idx + 1
chapter_data.append((chapter_num, filename))
# Debug specific problem chapters
if 49 <= chapter_num <= 56:
self.log(f"[DEBUG] Problem chapter found: {chapter_num} -> {filename}")
def process_chapter_content(data):
"""Worker function to process chapter content with FULL DEBUGGING"""
chapter_num, filename = data
path = os.path.join(self.output_dir, filename)
# Debug tracking for problem chapters
is_problem_chapter = 49 <= chapter_num <= 56
try:
if is_problem_chapter:
self.log(f"\n[DEBUG] {'*'*60}")
self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {chapter_num}: {filename}")
self.log(f"[DEBUG] Full path: {path}")
# Check file exists
if not os.path.exists(path):
error_msg = f"File does not exist: {path}"
self.log(f"[ERROR] {error_msg}")
raise FileNotFoundError(error_msg)
# Get file size
file_size = os.path.getsize(path)
if is_problem_chapter:
self.log(f"[DEBUG] File size: {file_size} bytes")
# Read and decode
raw_content = self._read_and_decode_html_file(path)
if is_problem_chapter:
self.log(f"[DEBUG] Raw content length after reading: {len(raw_content) if raw_content else 'NULL'}")
if raw_content:
self.log(f"[DEBUG] First 200 chars: {raw_content[:200]}")
# Fix encoding
raw_content = self._fix_encoding_issues(raw_content)
if is_problem_chapter:
self.log(f"[DEBUG] Content length after encoding fix: {len(raw_content) if raw_content else 'NULL'}")
if not raw_content or not raw_content.strip():
error_msg = f"Empty content after reading/decoding: {filename}"
if is_problem_chapter:
self.log(f"[ERROR] {error_msg}")
raise ValueError(error_msg)
# Extract main content
if not filename.startswith('response_'):
before_len = len(raw_content)
raw_content = self._extract_main_content(raw_content, filename)
if is_problem_chapter:
self.log(f"[DEBUG] Content extraction: {before_len} -> {len(raw_content)} chars")
# Get title
title = self._get_chapter_title(chapter_num, filename, raw_content, chapter_titles_info)
if is_problem_chapter:
self.log(f"[DEBUG] Chapter title: {title}")
# Prepare CSS links
css_links = [f"css/{item.file_name.split('/')[-1]}" for item in css_items]
if is_problem_chapter:
self.log(f"[DEBUG] CSS links: {css_links}")
# XHTML conversion - THE CRITICAL PART
if is_problem_chapter:
self.log(f"[DEBUG] Starting XHTML conversion...")
xhtml_content = XHTMLConverter.ensure_compliance(raw_content, title, css_links)
if is_problem_chapter:
self.log(f"[DEBUG] XHTML content length: {len(xhtml_content) if xhtml_content else 'NULL'}")
if xhtml_content:
self.log(f"[DEBUG] XHTML first 300 chars: {xhtml_content[:300]}")
# Process images
xhtml_content = self._process_chapter_images(xhtml_content, processed_images)
# Validate
if is_problem_chapter:
self.log(f"[DEBUG] Starting validation...")
final_content = XHTMLConverter.validate(xhtml_content)
if is_problem_chapter:
self.log(f"[DEBUG] Final content length: {len(final_content)}")
# Final XML validation
try:
ET.fromstring(final_content.encode('utf-8'))
if is_problem_chapter:
self.log(f"[DEBUG] XML validation PASSED")
except ET.ParseError as e:
if is_problem_chapter:
self.log(f"[ERROR] XML validation FAILED: {e}")
# Show the exact error location
lines = final_content.split('\n')
import re
match = re.search(r'line (\d+), column (\d+)', str(e))
if match:
line_num = int(match.group(1))
if line_num <= len(lines):
self.log(f"[ERROR] Problem line {line_num}: {lines[line_num-1][:100]}")
# Create fallback
final_content = XHTMLConverter._build_fallback_xhtml(title)
if is_problem_chapter:
self.log(f"[DEBUG] Using fallback XHTML")
if is_problem_chapter:
self.log(f"[DEBUG] Chapter processing SUCCESSFUL")
self.log(f"[DEBUG] {'*'*60}\n")
return {
'num': chapter_num,
'filename': filename,
'title': title,
'content': final_content,
'success': True
}
except Exception as e:
import traceback
tb = traceback.format_exc()
if is_problem_chapter:
self.log(f"[ERROR] {'!'*60}")
self.log(f"[ERROR] CHAPTER {chapter_num} PROCESSING FAILED")
self.log(f"[ERROR] Exception type: {type(e).__name__}")
self.log(f"[ERROR] Exception: {e}")
self.log(f"[ERROR] Full traceback:\n{tb}")
self.log(f"[ERROR] {'!'*60}\n")
return {
'num': chapter_num,
'filename': filename,
'title': chapter_titles_info.get(chapter_num, (f"Chapter {chapter_num}", 0, ""))[0],
'error': str(e),
'traceback': tb,
'success': False
}
# Process in parallel
processed_chapters = []
completed = 0
self.log(f"\n[DEBUG] Starting parallel processing...")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(process_chapter_content, data): data[0]
for data in chapter_data
}
for future in as_completed(futures):
try:
result = future.result()
if result:
processed_chapters.append(result)
completed += 1
# Extra logging for problem chapters
if 49 <= result['num'] <= 56:
if result['success']:
self.log(f" [{completed}/{len(chapter_data)}] ✅ PROBLEM CHAPTER PROCESSED: {result['num']} - {result['title']}")
else:
self.log(f" [{completed}/{len(chapter_data)}] ❌ PROBLEM CHAPTER FAILED: {result['num']} - {result['filename']}")
self.log(f" Error: {result['error']}")
else:
if result['success']:
self.log(f" [{completed}/{len(chapter_data)}] ✅ Processed: {result['title']}")
else:
self.log(f" [{completed}/{len(chapter_data)}] ❌ Failed: {result['filename']} - {result['error']}")
except Exception as e:
completed += 1
chapter_num = futures[future]
self.log(f" [{completed}/{len(chapter_data)}] ❌ Exception processing chapter {chapter_num}: {e}")
import traceback
self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}")
# Sort by chapter number to maintain order
processed_chapters.sort(key=lambda x: x['num'])
# Debug what we have
self.log(f"\n[DEBUG] Processed {len(processed_chapters)} chapters")
failed_chapters = [c for c in processed_chapters if not c['success']]
if failed_chapters:
self.log(f"[WARNING] {len(failed_chapters)} chapters failed:")
for fc in failed_chapters:
self.log(f" - Chapter {fc['num']}: {fc['filename']} - {fc.get('error', 'Unknown error')}")
# Add chapters to book in order (this must be sequential)
self.log("\n📦 Adding chapters to EPUB structure...")
for chapter_data in processed_chapters:
# Debug for problem chapters
if 49 <= chapter_data['num'] <= 56:
self.log(f"[DEBUG] Adding problem chapter {chapter_data['num']} to EPUB...")
if chapter_data['success']:
try:
# Create EPUB chapter
import html
chapter = epub.EpubHtml(
title=html.unescape(chapter_data['title']),
file_name=os.path.basename(chapter_data['filename']),
lang=metadata.get("language", "en")
)
chapter.content = FileUtils.ensure_bytes(chapter_data['content'])
if self.attach_css_to_chapters:
for css_item in css_items:
chapter.add_item(css_item)
# Add to book
book.add_item(chapter)
spine.append(chapter)
# Include auxiliary files in spine but omit from TOC
base_name = os.path.basename(chapter_data['filename'])
if hasattr(self, 'auxiliary_html_files') and base_name in self.auxiliary_html_files:
self.log(f" 🛈 Added auxiliary page to spine (not in TOC): {base_name}")
else:
toc.append(chapter)
chapters_added += 1
if 49 <= chapter_data['num'] <= 56:
self.log(f" ✅ ADDED PROBLEM CHAPTER {chapter_data['num']}: '{chapter_data['title']}'")
else:
if base_name in getattr(self, 'auxiliary_html_files', set()):
self.log(f" ✅ Added auxiliary page (spine only): '{base_name}'")
else:
self.log(f" ✅ Added chapter {chapter_data['num']}: '{chapter_data['title']}'")
except Exception as e:
self.log(f" ❌ Failed to add chapter {chapter_data['num']} to book: {e}")
import traceback
self.log(f"[ERROR] Traceback:\n{traceback.format_exc()}")
# Add error placeholder
self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata)
chapters_added += 1
else:
self.log(f" ⚠️ Adding error placeholder for chapter {chapter_data['num']}")
# Add error placeholder
self._add_error_chapter_from_data(book, chapter_data, spine, toc, metadata)
chapters_added += 1
self.log(f"\n{'='*80}")
self.log(f"✅ CHAPTER PROCESSING COMPLETE")
self.log(f"✅ Added {chapters_added} chapters to EPUB")
self.log(f"{'='*80}\n")
return chapters_added
def _add_error_chapter_from_data(self, book, chapter_data, spine, toc, metadata):
"""Helper to add an error placeholder chapter"""
try:
title = chapter_data.get('title', f"Chapter {chapter_data['num']}")
chapter = epub.EpubHtml(
title=title,
file_name=f"chapter_{chapter_data['num']:03d}.xhtml",
lang=metadata.get("language", "en")
)
error_content = f"""<?xml version="1.0" encoding="utf-8"?>
<html xmlns="http://www.w3.org/1999/xhtml">
<head><title>{ContentProcessor.safe_escape(title)}</title></head>
<body><h1>{ContentProcessor.safe_escape(title)}</h1>
<p>This chapter could not be processed.</p></body>
</html>"""
chapter.content = error_content.encode('utf-8')
book.add_item(chapter)
spine.append(chapter)
toc.append(chapter)
except Exception as e:
self.log(f" ❌ Failed to add error placeholder: {e}")
def _get_chapter_order_from_opf(self) -> Dict[str, int]:
"""Get chapter order from content.opf or source EPUB
Returns dict mapping original_filename -> chapter_number
"""
# First, try to find content.opf in the current directory
opf_path = os.path.join(self.output_dir, "content.opf")
if os.path.exists(opf_path):
self.log("✅ Found content.opf - using for chapter ordering")
return self._parse_opf_file(opf_path)
# If not found, try to extract from source EPUB
source_epub = os.getenv('EPUB_PATH')
if source_epub and os.path.exists(source_epub):
self.log(f"📚 Extracting chapter order from source EPUB: {source_epub}")
return self._extract_order_from_epub(source_epub)
# Fallback to translation_progress.json if available
progress_file = os.path.join(self.output_dir, "translation_progress.json")
if os.path.exists(progress_file):
self.log("📄 Using translation_progress.json for chapter order")
return self._get_order_from_progress_file(progress_file)
return None
def _parse_opf_file(self, opf_path: str) -> Dict[str, int]:
"""Parse content.opf to get chapter order from spine
Returns dict mapping original_filename -> chapter_number
"""
try:
tree = ET.parse(opf_path)
root = tree.getroot()
# Handle namespaces
ns = {'opf': 'http://www.idpf.org/2007/opf'}
if root.tag.startswith('{'):
# Extract default namespace
default_ns = root.tag[1:root.tag.index('}')]
ns = {'opf': default_ns}
# Get manifest to map IDs to files
manifest = {}
for item in root.findall('.//opf:manifest/opf:item', ns):
item_id = item.get('id')
href = item.get('href')
media_type = item.get('media-type', '')
# Only include HTML/XHTML files
if item_id and href and ('html' in media_type.lower() or href.endswith(('.html', '.xhtml', '.htm'))):
# Get just the filename without path
filename = os.path.basename(href)
manifest[item_id] = filename
# Get spine order
filename_to_order = {}
chapter_num = 0 # Start from 0 for array indexing
spine = root.find('.//opf:spine', ns)
if spine is not None:
# Build dynamic skip list; allow cover when TRANSLATE_COVER_HTML is enabled
skip_list = ['nav', 'toc', 'contents']
if os.environ.get('TRANSLATE_COVER_HTML', '0') != '1':
skip_list.append('cover')
for itemref in spine.findall('opf:itemref', ns):
idref = itemref.get('idref')
if idref and idref in manifest:
filename = manifest[idref]
# Skip navigation documents; optionally skip cover
if not any(skip in filename.lower() for skip in skip_list):
filename_to_order[filename] = chapter_num
self.log(f" Chapter {chapter_num}: {filename}")
chapter_num += 1
return filename_to_order
except Exception as e:
self.log(f"⚠️ Error parsing content.opf: {e}")
import traceback
self.log(traceback.format_exc())
return None
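# Illustrative result (hypothetical OPF): a manifest item id="ch1" href="Text/chapter1.xhtml"
# referenced by the first non-skipped spine itemref yields {'chapter1.xhtml': 0}.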
def _extract_order_from_epub(self, epub_path: str) -> List[Tuple[int, str]]:
"""Extract chapter order from source EPUB file"""
try:
import zipfile
with zipfile.ZipFile(epub_path, 'r') as zf:
# Find content.opf (might be in different locations)
opf_file = None
for name in zf.namelist():
if name.endswith('content.opf'):
opf_file = name
break
if not opf_file:
# Try META-INF/container.xml to find content.opf
try:
container = zf.read('META-INF/container.xml')
# Parse container.xml to find content.opf location
container_tree = ET.fromstring(container)
rootfile = container_tree.find('.//{urn:oasis:names:tc:opendocument:xmlns:container}rootfile')
if rootfile is not None:
opf_file = rootfile.get('full-path')
except:
pass
if opf_file:
opf_content = zf.read(opf_file)
# Save temporarily and parse
temp_opf = os.path.join(self.output_dir, "temp_content.opf")
with open(temp_opf, 'wb') as f:
f.write(opf_content)
result = self._parse_opf_file(temp_opf)
# Clean up temp file
if os.path.exists(temp_opf):
os.remove(temp_opf)
return result
except Exception as e:
self.log(f"⚠️ Error extracting from EPUB: {e}")
return None
def _find_html_files(self) -> List[str]:
"""Find HTML files using OPF-based ordering when available"""
self.log(f"\n[DEBUG] Scanning directory: {self.output_dir}")
# Get all HTML files in directory
all_files = os.listdir(self.output_dir)
html_extensions = ('.html', '.htm', '.xhtml')
html_files = [f for f in all_files if f.lower().endswith(html_extensions)]
if not html_files:
self.log("[ERROR] No HTML files found!")
return []
# Try to get authoritative order from OPF/EPUB
opf_order = self._get_chapter_order_from_opf()
if opf_order:
self.log("✅ Using authoritative chapter order from OPF/EPUB")
self.log(f"[DEBUG] OPF entries (first 5): {list(opf_order.items())[:5]}")
# Create mapping based on core filename (strip response_ and strip ALL extensions)
ordered_files = []
unmapped_files = []
def strip_all_ext(name: str) -> str:
# Remove all trailing known extensions
core = name
while True:
parts = core.rsplit('.', 1)
if len(parts) == 2 and parts[1].lower() in ['html', 'htm', 'xhtml', 'xml']:
core = parts[0]
else:
break
return core
for output_file in html_files:
core_name = output_file[9:] if output_file.startswith('response_') else output_file
core_name = strip_all_ext(core_name)
matched = False
for opf_name, chapter_order in opf_order.items():
opf_file = opf_name.split('/')[-1]
opf_core = strip_all_ext(opf_file)
if core_name == opf_core:
ordered_files.append((chapter_order, output_file))
self.log(f" Mapped: {output_file} -> {opf_name} (order: {chapter_order})")
matched = True
break
if not matched:
unmapped_files.append(output_file)
self.log(f" ⚠️ Could not map: {output_file} (core: {core_name})")
if ordered_files:
# Sort by chapter order and extract just the filenames
ordered_files.sort(key=lambda x: x[0])
final_order = [f for _, f in ordered_files]
# Append any unmapped files at the end
if unmapped_files:
self.log(f"⚠️ Adding {len(unmapped_files)} unmapped files at the end")
final_order.extend(sorted(unmapped_files))
# Mark non-response unmapped files as auxiliary (omit from TOC)
aux = {f for f in unmapped_files if not f.startswith('response_')}
# If skipping override is enabled, do NOT treat cover.html as auxiliary
if os.environ.get('TRANSLATE_COVER_HTML', '0') == '1':
aux = {f for f in aux if os.path.splitext(os.path.basename(f))[0].lower() not in ['cover']}
self.auxiliary_html_files = aux
else:
self.auxiliary_html_files = set()
self.log(f"✅ Successfully ordered {len(final_order)} chapters using OPF")
return final_order
else:
self.log("⚠️ Could not map any files using OPF order, falling back to pattern matching")
# Fallback to original pattern matching logic
self.log("⚠️ No OPF/EPUB found or mapping failed, using filename pattern matching")
# First, try to find response_ files
response_files = [f for f in html_files if f.startswith('response_')]
if response_files:
# Sort response_ files as primary chapters
main_files = list(response_files)
self.log(f"[DEBUG] Found {len(response_files)} response_ files")
# Check if files have -h- pattern
if any('-h-' in f for f in response_files):
# Use special sorting for -h- pattern
def extract_h_number(filename):
match = re.search(r'-h-(\d+)', filename)
if match:
return int(match.group(1))
return 999999
main_files.sort(key=extract_h_number)
else:
# Use numeric sorting for standard response_ files
def extract_number(filename):
match = re.match(r'response_(\d+)_', filename)
if match:
return int(match.group(1))
return 0
main_files.sort(key=extract_number)
# Append non-response files as auxiliary pages (not in TOC)
aux_files = sorted([f for f in html_files if not f.startswith('response_')])
if aux_files:
aux_set = set(aux_files)
# If skipping override is enabled, ensure cover.html is not marked auxiliary
if os.environ.get('TRANSLATE_COVER_HTML', '0') == '1':
aux_set = {f for f in aux_set if os.path.splitext(os.path.basename(f))[0].lower() != 'cover'}
self.auxiliary_html_files = aux_set
self.log(f"[DEBUG] Appending {len(aux_set)} auxiliary HTML file(s) (not in TOC): {list(aux_set)[:5]}")
else:
self.auxiliary_html_files = set()
return main_files + aux_files
else:
# Progressive sorting for non-standard files
html_files.sort(key=self.get_robust_sort_key)
# No response_ files -> treat none as auxiliary
self.auxiliary_html_files = set()
return html_files
def _read_and_decode_html_file(self, file_path: str) -> str:
"""Read HTML file and decode entities, preserving < and > as text.
This prevents narrative angle-bracket text from becoming bogus tags."""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content:
return content
import re
import html
# Placeholders for angle bracket entities
LT_PLACEHOLDER = "\ue000"
GT_PLACEHOLDER = "\ue001"
# Patterns for common representations of < and >
_lt_entity_patterns = [r'&lt;', r'&LT;', r'&#0*60;', r'&#x0*3[cC];']
_gt_entity_patterns = [r'&gt;', r'&GT;', r'&#0*62;', r'&#x0*3[eE];']
def protect_angle_entities(s: str) -> str:
# Replace all forms of < and > with placeholders so unescape won't turn them into real < >
for pat in _lt_entity_patterns:
s = re.sub(pat, LT_PLACEHOLDER, s)
for pat in _gt_entity_patterns:
s = re.sub(pat, GT_PLACEHOLDER, s)
return s
max_iterations = 5
for _ in range(max_iterations):
prev_content = content
# Protect before each pass in case of double-encoded entities
content = protect_angle_entities(content)
# html.unescape handles all standard HTML entities (except our placeholders)
content = html.unescape(content)
if content == prev_content:
break
# Restore placeholders back to entities so they remain literal text in XHTML
content = content.replace(LT_PLACEHOLDER, '&lt;').replace(GT_PLACEHOLDER, '&gt;')
return content
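# e.g. a double-encoded '&amp;lt;' collapses to the literal entity '&lt;' (kept as text),
# '&amp;nbsp;' becomes a real non-breaking space, and genuine '<p>' tags are unaffected.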
def _process_single_chapter(self, book: epub.EpubBook, num: int, filename: str,
chapter_titles_info: Dict[int, Tuple[str, float, str]],
css_items: List[epub.EpubItem], processed_images: Dict[str, str],
spine: List, toc: List, metadata: dict) -> bool:
"""Process a single chapter with COMPREHENSIVE debugging"""
path = os.path.join(self.output_dir, filename)
# Flag for extra debugging on problem chapters
is_problem_chapter = 49 <= num <= 56
is_response_file = filename.startswith('response_')
try:
if is_problem_chapter:
self.log(f"\n{'='*70}")
self.log(f"[DEBUG] PROCESSING PROBLEM CHAPTER {num}")
self.log(f"[DEBUG] Filename: {filename}")
self.log(f"[DEBUG] Is response file: {is_response_file}")
self.log(f"[DEBUG] Full path: {path}")
# Check file exists and size
if not os.path.exists(path):
self.log(f"[ERROR] File does not exist: {path}")
return False
file_size = os.path.getsize(path)
if is_problem_chapter:
self.log(f"[DEBUG] File size: {file_size} bytes")
if file_size == 0:
self.log(f"[ERROR] File is empty (0 bytes): {filename}")
return False
# Read and decode
if is_problem_chapter:
self.log(f"[DEBUG] Reading and decoding file...")
raw_content = self._read_and_decode_html_file(path)
if is_problem_chapter:
self.log(f"[DEBUG] Raw content length: {len(raw_content) if raw_content else 'NULL'}")
if raw_content:
# Show first and last parts
self.log(f"[DEBUG] First 300 chars of raw content:")
self.log(f" {raw_content[:300]!r}")
self.log(f"[DEBUG] Last 300 chars of raw content:")
self.log(f" {raw_content[-300:]!r}")
# Check for common issues
if '&lt;' in raw_content[:500]:
    self.log(f"[DEBUG] Found &lt; entities in content")
if '&gt;' in raw_content[:500]:
    self.log(f"[DEBUG] Found &gt; entities in content")
# Fix encoding (extra length logging only for problem chapters)
before_fix = len(raw_content)
raw_content = self._fix_encoding_issues(raw_content)
after_fix = len(raw_content)
if is_problem_chapter:
    self.log(f"[DEBUG] Encoding fix: {before_fix} -> {after_fix} chars")
if before_fix != after_fix:
self.log(f"[DEBUG] Content changed during encoding fix")
if not raw_content or not raw_content.strip():
self.log(f"[WARNING] Chapter {num} is empty after decoding/encoding fix")
if is_problem_chapter:
self.log(f"[ERROR] Problem chapter {num} has no content!")
return False
# Extract main content if needed
if not filename.startswith('response_'):
if is_problem_chapter:
self.log(f"[DEBUG] Extracting main content (not a response file)...")
before_extract = len(raw_content)
raw_content = self._extract_main_content(raw_content, filename)
after_extract = len(raw_content)
if is_problem_chapter:
self.log(f"[DEBUG] Content extraction: {before_extract} -> {after_extract} chars")
if after_extract < before_extract / 2:
self.log(f"[WARNING] Lost more than 50% of content during extraction!")
self.log(f"[DEBUG] Content after extraction (first 300 chars):")
self.log(f" {raw_content[:300]!r}")
else:
if is_problem_chapter:
self.log(f"[DEBUG] Skipping content extraction for response file")
self.log(f"[DEBUG] Response file content structure:")
# Check what's in a response file
if '<html' in raw_content:
self.log(f"  Has <html> tag")
if '<body' in raw_content:
self.log(f"  Has <body> tag")
if '<p' in raw_content:
self.log(f"  Has <p> tags")
# ... (rest of the chapter assembly: title lookup, XHTML construction, spine/TOC registration)
return True
except Exception as e:
self.log(f"[ERROR] Failed to process chapter {num}: {e}")
return False
def _get_chapter_title(self, num: int, filename: str, content: str,
chapter_titles_info: Dict[int, Tuple[str, float, str]]) -> str:
"""Get chapter title with fallbacks - uses position-based numbering"""
title = None
confidence = 0.0
# Primary source: pre-analyzed title using position-based number
if num in chapter_titles_info:
title, confidence, stored_filename = chapter_titles_info[num]
# Re-extract if low confidence or missing
if not title or confidence < 0.5:
backup_title, backup_confidence = TitleExtractor.extract_from_html(content, num, filename)
if backup_confidence > confidence:
title = backup_title
confidence = backup_confidence
# Clean and validate
if title:
title = TitleExtractor.clean_title(title)
if not TitleExtractor.is_valid_title(title):
title = None
# Fallback for non-standard files
if not title and not filename.startswith('response_'):
# Try enhanced extraction methods for web-scraped content
title = self._fallback_title_extraction(content, filename, num)
# Final fallback - use position-based chapter number
if not title:
title = f"Chapter {num}"
return title
def get_robust_sort_key(self, filename):
"""Extract chapter/sequence number using multiple patterns"""
# Pattern 1: -h-NUMBER (your current pattern)
match = re.search(r'-h-(\d+)', filename)
if match:
return (1, int(match.group(1)))
# Pattern 2: chapter-NUMBER or chapter_NUMBER or chapterNUMBER
match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE)
if match:
return (2, int(match.group(1)))
# Pattern 3: ch-NUMBER or ch_NUMBER or chNUMBER
match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE)
if match:
return (3, int(match.group(1)))
# Pattern 4: response_NUMBER_ (if response_ prefix exists)
if filename.startswith('response_'):
match = re.match(r'response_(\d+)[-_]', filename)
if match:
return (4, int(match.group(1)))
# Pattern 5: book_NUMBER, story_NUMBER, part_NUMBER, section_NUMBER
match = re.search(r'(?:book|story|part|section)[-_\s]?(\d+)', filename, re.IGNORECASE)
if match:
return (5, int(match.group(1)))
# Pattern 6: split_NUMBER (Calibre pattern)
match = re.search(r'split_(\d+)', filename)
if match:
return (6, int(match.group(1)))
# Pattern 7: Just NUMBER.html (like 1.html, 2.html)
match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename)
if match:
return (7, int(match.group(1)))
# Pattern 8: -NUMBER at end before extension
match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename)
if match:
return (8, int(match.group(1)))
# Pattern 9: _NUMBER at end before extension
match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename)
if match:
return (9, int(match.group(1)))
# Pattern 10: (NUMBER) in parentheses anywhere
match = re.search(r'\((\d+)\)', filename)
if match:
return (10, int(match.group(1)))
# Pattern 11: [NUMBER] in brackets anywhere
match = re.search(r'\[(\d+)\]', filename)
if match:
return (11, int(match.group(1)))
# Pattern 12: page-NUMBER or p-NUMBER or pg-NUMBER
match = re.search(r'(?:page|pg?)[-_\s]?(\d+)', filename, re.IGNORECASE)
if match:
return (12, int(match.group(1)))
# Pattern 13: Any file ending with NUMBER before extension
match = re.search(r'(\d+)\.(?:html?|xhtml)$', filename)
if match:
return (13, int(match.group(1)))
# Pattern 14: Roman numerals (I, II, III, IV, etc.)
roman_pattern = r'\b(M{0,3}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3}))\b'
match = re.search(roman_pattern, filename)
if match and match.group(1):  # guard against empty matches (every roman group is optional)
roman = match.group(1)
# Convert roman to number
roman_dict = {'I':1,'V':5,'X':10,'L':50,'C':100,'D':500,'M':1000}
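# Subtractive pairs are handled by adding the larger value and removing the smaller one twice,
# e.g. "XIV": 10 (X) + 1 (I) + (5 - 2*1) (V after I) = 14.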
val = 0
for i in range(len(roman)):
if i > 0 and roman_dict[roman[i]] > roman_dict[roman[i-1]]:
val += roman_dict[roman[i]] - 2 * roman_dict[roman[i-1]]
else:
val += roman_dict[roman[i]]
return (14, val)
# Pattern 15: First significant number found
numbers = re.findall(r'\d+', filename)
if numbers:
# Skip common year numbers (1900-2099) unless it's the only number
significant_numbers = [int(n) for n in numbers if not (1900 <= int(n) <= 2099)]
if significant_numbers:
return (15, significant_numbers[0])
elif numbers:
return (15, int(numbers[0]))
# Final fallback: alphabetical
return (99, filename)
def _extract_chapter_number(self, filename: str, default_idx: int) -> int:
"""Extract chapter number using multiple patterns"""
# FIXED: Pattern 1 - Check -h-NUMBER FIRST (YOUR FILES USE THIS!)
match = re.search(r'-h-(\d+)', filename)
if match:
return int(match.group(1))
# Pattern 2: response_NUMBER_ (standard pattern)
match = re.match(r"response_(\d+)_", filename)
if match:
return int(match.group(1))
# Pattern 3: chapter-NUMBER, chapter_NUMBER, chapterNUMBER
match = re.search(r'chapter[-_\s]?(\d+)', filename, re.IGNORECASE)
if match:
return int(match.group(1))
# Pattern 4: ch-NUMBER, ch_NUMBER, chNUMBER
match = re.search(r'\bch[-_\s]?(\d+)\b', filename, re.IGNORECASE)
if match:
return int(match.group(1))
# Pattern 5: Just NUMBER.html (like 127.html)
match = re.match(r'^(\d+)\.(?:html?|xhtml)$', filename)
if match:
return int(match.group(1))
# Pattern 6: _NUMBER at end before extension
match = re.search(r'_(\d+)\.(?:html?|xhtml)$', filename)
if match:
return int(match.group(1))
# Pattern 7: -NUMBER at end before extension
match = re.search(r'-(\d+)\.(?:html?|xhtml)$', filename)
if match:
return int(match.group(1))
# Pattern 8: (NUMBER) in parentheses
match = re.search(r'\((\d+)\)', filename)
if match:
return int(match.group(1))
# Pattern 9: [NUMBER] in brackets
match = re.search(r'\[(\d+)\]', filename)
if match:
return int(match.group(1))
# Pattern 10: Use the sort key logic
sort_key = self.get_robust_sort_key(filename)
if isinstance(sort_key[1], int) and sort_key[1] > 0:
return sort_key[1]
# Final fallback: use position + 1
return default_idx + 1
def _extract_main_content(self, html_content: str, filename: str) -> str:
"""Extract main content from web-scraped HTML pages
This method tries to find the actual chapter content within a full webpage
"""
try:
# For web-scraped content, try to extract just the chapter part
# Common patterns for chapter content containers
content_patterns = [
# Look for specific class names commonly used for content
(r'<div[^>]*class="[^"]*(?:chapter-content|entry-content|post-content|article-content)[^"]*"[^>]*>(.*?)</div>', re.DOTALL | re.IGNORECASE),
# Look for article tags with content
(r'<article[^>]*>(.*?)</article>', re.DOTALL | re.IGNORECASE),
# Look for main tags
(r'<main[^>]*>(.*?)</main>', re.DOTALL | re.IGNORECASE),
# Look for specific id patterns
(r'<div[^>]*id="[^"]*(?:content|chapter)[^"]*"[^>]*>(.*?)</div>', re.DOTALL | re.IGNORECASE),
]
for pattern, flags in content_patterns:
match = re.search(pattern, html_content, flags)
if match:
extracted = match.group(1)
# Make sure we got something substantial
if len(extracted.strip()) > 100:
self.log(f"📄 Extracted main content using pattern for {filename}")
return extracted
# If no patterns matched, check if this looks like a full webpage
if '<body' in html_content.lower():
body_match = re.search(r'<body[^>]*>(.*?)</body>', html_content, re.DOTALL | re.IGNORECASE)
if body_match:
self.log(f"📄 Extracted body content for {filename}")
return body_match.group(1)
# If all else fails, return original content
self.log(f"📄 Using original content for {filename}")
return html_content
except Exception as e:
self.log(f"⚠️ Content extraction failed for {filename}: {e}")
return html_content
def _fallback_title_extraction(self, content: str, filename: str, num: int) -> Optional[str]:
"""Fallback title extraction for when TitleExtractor fails
This handles web-scraped pages and other non-standard formats
"""
# Try filename-based extraction first (often more reliable for web scrapes)
filename_title = self._extract_title_from_filename_fallback(filename, num)
if filename_title:
return filename_title
# Try HTML content extraction with patterns TitleExtractor might miss
html_title = self._extract_title_from_html_fallback(content, num)
if html_title:
return html_title
return None
def _extract_title_from_html_fallback(self, content: str, num: int) -> Optional[str]:
"""Fallback HTML title extraction for web-scraped content"""
# Look for title patterns that TitleExtractor might miss
# Specifically for web-scraped novel sites
patterns = [
# Title tags with site separators
r'<title[^>]*>([^|–\-]+?)(?:\s*[|–\-]\s*[^<]+)?</title>',
# Specific class patterns from novel sites
r'<div[^>]*class="[^"]*cat-series[^"]*"[^>]*>([^<]+)</div>',
r'<h1[^>]*class="[^"]*entry-title[^"]*"[^>]*>([^<]+)</h1>',
r'<[^>]*class="[^"]*chapter-title[^"]*"[^>]*>([^<]+)',
# Meta property patterns
r'<meta[^>]*property="og:title"[^>]*content="([^"]+)"',
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
title = match.group(1).strip()
# Decode HTML entities
title = HTMLEntityDecoder.decode(title)
# Additional cleanup for web-scraped content
title = re.sub(r'\s+', ' ', title) # Normalize whitespace
title = title.strip()
# Validate it's reasonable
if 3 < len(title) < 200 and title.lower() != 'untitled':
self.log(f"📝 Fallback extracted title from HTML: '{title}'")
return title
return None
def _extract_title_from_filename_fallback(self, filename: str, num: int) -> Optional[str]:
"""Fallback filename title extraction"""
# Remove extension
base_name = re.sub(r'\.(html?|xhtml)$', '', filename, flags=re.IGNORECASE)
# Web-scraped filename patterns
patterns = [
# "theend-chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7"
r'(?:theend|story|novel)[-_]chapter[-_](\d+)[-_](.+)',
# "chapter-127-apocalypse-7" -> "Chapter 127 - Apocalypse 7"
r'chapter[-_](\d+)[-_](.+)',
# "ch127-title" -> "Chapter 127 - Title"
r'ch[-_]?(\d+)[-_](.+)',
# Just the title part after number
r'^\d+[-_](.+)',
]
for pattern in patterns:
match = re.search(pattern, base_name, re.IGNORECASE)
if match:
if match.lastindex == 2: # Pattern with chapter number and title
chapter_num = match.group(1)
title_part = match.group(2)
else: # Pattern with just title
chapter_num = str(num)
title_part = match.group(1)
# Clean up the title part
title_part = title_part.replace('-', ' ').replace('_', ' ')
# Capitalize properly
words = title_part.split()
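# Illustrative: "apocalypse 7" -> "Apocalypse 7"; words of two characters or fewer (e.g. "of") are left as-is.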
title_part = ' '.join(word.capitalize() if len(word) > 2 else word for word in words)
title = f"Chapter {chapter_num} - {title_part}"
self.log(f"📝 Fallback extracted title from filename: '{title}'")
return title
return None
def _load_metadata(self) -> dict:
"""Load metadata from JSON file"""
if os.path.exists(self.metadata_path):
try:
import html
with open(self.metadata_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
self.log("[DEBUG] Metadata loaded successfully")
return metadata
except Exception as e:
self.log(f"[WARNING] Failed to load metadata.json: {e}")
else:
self.log("[WARNING] metadata.json not found, using defaults")
return {}
def _create_book(self, metadata: dict) -> epub.EpubBook:
"""Create and configure EPUB book with complete metadata"""
book = epub.EpubBook()
# Set identifier
book.set_identifier(metadata.get("identifier", f"translated-{os.path.basename(self.base_dir)}"))
# Fix encoding issues in titles before using them
if metadata.get('title'):
metadata['title'] = self._fix_encoding_issues(metadata['title'])
if metadata.get('original_title'):
metadata['original_title'] = self._fix_encoding_issues(metadata['original_title'])
# Determine title
book_title = self._determine_book_title(metadata)
book.set_title(book_title)
# Set language
book.set_language(metadata.get("language", "en"))
# Store original title as alternative metadata (not as another dc:title)
# This prevents EPUB readers from getting confused about which title to display
if metadata.get('original_title') and metadata.get('original_title') != book_title:
# Use 'alternative' field instead of 'title' to avoid display issues
book.add_metadata('DC', 'alternative', metadata['original_title'])
# Also store in a custom field for reference
book.add_metadata('calibre', 'original_title', metadata['original_title'])
self.log(f"[INFO] Stored original title as alternative: {metadata['original_title']}")
# Set author/creator
if metadata.get("creator"):
book.add_author(metadata["creator"])
self.log(f"[INFO] Set author: {metadata['creator']}")
# ADD DESCRIPTION - This is what Calibre looks for
if metadata.get("description"):
# Clean the description of any HTML entities
description = HTMLEntityDecoder.decode(str(metadata["description"]))
book.add_metadata('DC', 'description', description)
self.log(f"[INFO] Set description: {description[:100]}..." if len(description) > 100 else f"[INFO] Set description: {description}")
# Add publisher
if metadata.get("publisher"):
book.add_metadata('DC', 'publisher', metadata["publisher"])
self.log(f"[INFO] Set publisher: {metadata['publisher']}")
# Add publication date
if metadata.get("date"):
book.add_metadata('DC', 'date', metadata["date"])
self.log(f"[INFO] Set date: {metadata['date']}")
# Add rights/copyright
if metadata.get("rights"):
book.add_metadata('DC', 'rights', metadata["rights"])
self.log(f"[INFO] Set rights: {metadata['rights']}")
# Add subject/genre/tags
if metadata.get("subject"):
if isinstance(metadata["subject"], list):
for subject in metadata["subject"]:
book.add_metadata('DC', 'subject', subject)
self.log(f"[INFO] Added subject: {subject}")
else:
book.add_metadata('DC', 'subject', metadata["subject"])
self.log(f"[INFO] Set subject: {metadata['subject']}")
# Add series information if available
if metadata.get("series"):
# Calibre uses a custom metadata field for series
book.add_metadata('calibre', 'series', metadata["series"])
self.log(f"[INFO] Set series: {metadata['series']}")
# Add series index if available
if metadata.get("series_index"):
book.add_metadata('calibre', 'series_index', str(metadata["series_index"]))
self.log(f"[INFO] Set series index: {metadata['series_index']}")
# Add custom metadata for translator info
if metadata.get("translator"):
book.add_metadata('DC', 'contributor', metadata["translator"], {'role': 'translator'})
self.log(f"[INFO] Set translator: {metadata['translator']}")
# Add source information
if metadata.get("source"):
book.add_metadata('DC', 'source', metadata["source"])
self.log(f"[INFO] Set source: {metadata['source']}")
# Add any ISBN if available
if metadata.get("isbn"):
book.add_metadata('DC', 'identifier', f"ISBN:{metadata['isbn']}", {'scheme': 'ISBN'})
self.log(f"[INFO] Set ISBN: {metadata['isbn']}")
# Add coverage (geographic/temporal scope) if available
if metadata.get("coverage"):
book.add_metadata('DC', 'coverage', metadata["coverage"])
self.log(f"[INFO] Set coverage: {metadata['coverage']}")
# Add any custom metadata that might be in the JSON
# This handles any additional fields that might be present
custom_metadata_fields = [
'contributor', 'format', 'relation', 'type'
]
for field in custom_metadata_fields:
if metadata.get(field):
book.add_metadata('DC', field, metadata[field])
self.log(f"[INFO] Set {field}: {metadata[field]}")
return book
def _determine_book_title(self, metadata: dict) -> str:
"""Determine the book title from metadata"""
# Try translated title
if metadata.get('title') and str(metadata['title']).strip():
title = str(metadata['title']).strip()
self.log(f"✅ Using translated title: '{title}'")
return title
# Try original title
if metadata.get('original_title') and str(metadata['original_title']).strip():
title = str(metadata['original_title']).strip()
self.log(f"⚠️ Using original title: '{title}'")
return title
# Fallback to directory name
title = os.path.basename(self.base_dir)
self.log(f"📁 Using directory name: '{title}'")
return title
def _create_default_css(self) -> str:
"""Create default CSS for proper chapter formatting"""
return """
/* Default EPUB CSS */
body {
margin: 1em;
padding: 0;
font-family: serif;
line-height: 1.6;
}
h1, h2, h3, h4, h5, h6 {
font-weight: bold;
margin-top: 1em;
margin-bottom: 0.5em;
page-break-after: avoid;
}
h1 {
font-size: 1.5em;
text-align: center;
margin-top: 2em;
margin-bottom: 2em;
}
p {
margin: 1em 0;
text-indent: 0;
}
img {
max-width: 100%;
height: auto;
display: block;
margin: 1em auto;
}
/* Prevent any overlay issues */
* {
position: static !important;
z-index: auto !important;
}
/* Remove any floating elements */
.title, [class*="title"] {
position: static !important;
float: none !important;
background: transparent !important;
}
"""
def _add_css_files(self, book: epub.EpubBook) -> List[epub.EpubItem]:
"""Add CSS files to book"""
css_items = []
# First, add a default CSS to ensure proper formatting
default_css = epub.EpubItem(
uid="css_default",
file_name="css/default.css",
media_type="text/css",
content=FileUtils.ensure_bytes(self._create_default_css())
)
book.add_item(default_css)
css_items.append(default_css)
self.log("✅ Added default CSS")
# Then add user CSS files
if not os.path.isdir(self.css_dir):
return css_items
css_files = [f for f in sorted(os.listdir(self.css_dir)) if f.endswith('.css')]
self.log(f"[DEBUG] Found {len(css_files)} CSS files")
for css_file in css_files:
css_path = os.path.join(self.css_dir, css_file)
try:
import html
with open(css_path, 'r', encoding='utf-8') as f:
css_content = f.read()
css_item = epub.EpubItem(
uid=f"css_{css_file}",
file_name=f"css/{css_file}",
media_type="text/css",
content=FileUtils.ensure_bytes(css_content)
)
book.add_item(css_item)
css_items.append(css_item)
self.log(f"✅ Added CSS: {css_file}")
except Exception as e:
self.log(f"[WARNING] Failed to add CSS {css_file}: {e}")
return css_items
def _add_fonts(self, book: epub.EpubBook):
"""Add font files to book"""
if not os.path.isdir(self.fonts_dir):
return
for font_file in os.listdir(self.fonts_dir):
font_path = os.path.join(self.fonts_dir, font_file)
if not os.path.isfile(font_path):
continue
try:
mime_type = 'application/font-woff'
if font_file.endswith('.ttf'):
mime_type = 'font/ttf'
elif font_file.endswith('.otf'):
mime_type = 'font/otf'
elif font_file.endswith('.woff2'):
mime_type = 'font/woff2'
with open(font_path, 'rb') as f:
book.add_item(epub.EpubItem(
uid=f"font_{font_file}",
file_name=f"fonts/{font_file}",
media_type=mime_type,
content=f.read()
))
self.log(f"✅ Added font: {font_file}")
except Exception as e:
self.log(f"[WARNING] Failed to add font {font_file}: {e}")
def _process_images(self) -> Tuple[Dict[str, str], Optional[str]]:
"""Process images using parallel processing"""
processed_images = {}
cover_file = None
try:
# Find the images directory
actual_images_dir = None
possible_dirs = [
self.images_dir,
os.path.join(self.base_dir, "images"),
os.path.join(self.output_dir, "images"),
]
for test_dir in possible_dirs:
self.log(f"[DEBUG] Checking for images in: {test_dir}")
if os.path.isdir(test_dir):
files = os.listdir(test_dir)
if files:
self.log(f"[DEBUG] Found {len(files)} files in {test_dir}")
actual_images_dir = test_dir
break
if not actual_images_dir:
self.log("[WARNING] No images directory found or directory is empty")
return processed_images, cover_file
self.images_dir = actual_images_dir
self.log(f"[INFO] Using images directory: {self.images_dir}")
# Get list of files to process
image_files = sorted(os.listdir(self.images_dir))
self.log(f"🖼️ Processing {len(image_files)} potential images with {self.max_workers} workers")
def process_single_image(img):
"""Worker function to process a single image"""
path = os.path.join(self.images_dir, img)
if not os.path.isfile(path):
return None
# Check MIME type
ctype, _ = mimetypes.guess_type(path)
# If MIME type detection fails, check extension
if not ctype:
ext = os.path.splitext(img)[1].lower()
mime_map = {
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.bmp': 'image/bmp',
'.webp': 'image/webp',
'.svg': 'image/svg+xml'
}
ctype = mime_map.get(ext)
if ctype and ctype.startswith("image"):
safe_name = FileUtils.sanitize_filename(img, allow_unicode=False)
# Ensure extension
if not os.path.splitext(safe_name)[1]:
ext = os.path.splitext(img)[1]
if ext:
safe_name += ext
elif ctype == 'image/jpeg':
safe_name += '.jpg'
elif ctype == 'image/png':
safe_name += '.png'
# Special handling for SVG: rasterize to PNG fallback for reader compatibility
if ctype == 'image/svg+xml' and self.rasterize_svg and self._cairosvg_available:
try:
from cairosvg import svg2png
png_name = os.path.splitext(safe_name)[0] + '.png'
png_path = os.path.join(self.images_dir, png_name)
# Generate PNG only if not already present
if not os.path.exists(png_path):
svg2png(url=path, write_to=png_path)
self.log(f" 🖼️ Rasterized SVG → PNG: {img} -> {png_name}")
# Return the PNG as the image to include
return (png_name, png_name, 'image/png')
except Exception as e:
self.log(f"[WARNING] SVG rasterization failed for {img}: {e}")
# Fall back to adding the raw SVG
return (img, safe_name, ctype)
return (img, safe_name, ctype)
else:
return None
# Process images in parallel
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(process_single_image, img) for img in image_files]
completed = 0
for future in as_completed(futures):
try:
result = future.result()
completed += 1
if result:
original, safe, ctype = result
processed_images[original] = safe
self.log(f" [{completed}/{len(image_files)}] ✅ Processed: {original} -> {safe}")
else:
self.log(f" [{completed}/{len(image_files)}] ⏭️ Skipped non-image file")
except Exception as e:
completed += 1
self.log(f" [{completed}/{len(image_files)}] ❌ Failed to process image: {e}")
# Find cover (sequential - quick operation)
# Respect user preference to disable automatic cover creation
disable_auto_cover = os.environ.get('DISABLE_AUTOMATIC_COVER_CREATION', '0') == '1'
if processed_images and not disable_auto_cover:
cover_prefixes = ['cover', 'front']
for original_name, safe_name in processed_images.items():
name_lower = original_name.lower()
if any(name_lower.startswith(prefix) for prefix in cover_prefixes):
cover_file = safe_name
self.log(f"📔 Found cover image: {original_name} -> {cover_file}")
break
if not cover_file:
cover_file = next(iter(processed_images.values()))
self.log(f"📔 Using first image as cover: {cover_file}")
self.log(f"✅ Processed {len(processed_images)} images successfully")
except Exception as e:
self.log(f"[ERROR] Error processing images: {e}")
import traceback
self.log(f"[DEBUG] Traceback: {traceback.format_exc()}")
return processed_images, cover_file
def _add_images_to_book(self, book: epub.EpubBook, processed_images: Dict[str, str],
cover_file: Optional[str]):
"""Add images to book using parallel processing for reading files"""
# Filter out cover image
images_to_add = [(orig, safe) for orig, safe in processed_images.items()
if safe != cover_file]
if not images_to_add:
self.log("No images to add (besides cover)")
return
self.log(f"📚 Adding {len(images_to_add)} images to EPUB with {self.max_workers} workers")
def read_image_file(image_data):
"""Worker function to read image file"""
original_name, safe_name = image_data
img_path = os.path.join(self.images_dir, original_name)
try:
ctype, _ = mimetypes.guess_type(img_path)
if not ctype:
ctype = "image/jpeg" # Default fallback
with open(img_path, 'rb') as f:
content = f.read()
return {
'original': original_name,
'safe': safe_name,
'ctype': ctype,
'content': content,
'success': True
}
except Exception as e:
return {
'original': original_name,
'safe': safe_name,
'error': str(e),
'success': False
}
# Read all images in parallel
image_data_list = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(read_image_file, img_data) for img_data in images_to_add]
completed = 0
for future in as_completed(futures):
try:
result = future.result()
completed += 1
if result['success']:
image_data_list.append(result)
self.log(f" [{completed}/{len(images_to_add)}] ✅ Read: {result['original']}")
else:
self.log(f" [{completed}/{len(images_to_add)}] ❌ Failed: {result['original']} - {result['error']}")
except Exception as e:
completed += 1
self.log(f" [{completed}/{len(images_to_add)}] ❌ Exception reading image: {e}")
# Add images to book sequentially (required by ebooklib)
self.log("\n📦 Adding images to EPUB structure...")
added = 0
for img_data in image_data_list:
try:
book.add_item(epub.EpubItem(
uid=img_data['safe'],
file_name=f"images/{img_data['safe']}",
media_type=img_data['ctype'],
content=img_data['content']
))
added += 1
self.log(f" ✅ Added: {img_data['original']}")
except Exception as e:
self.log(f" ❌ Failed to add {img_data['original']} to EPUB: {e}")
self.log(f"✅ Successfully added {added}/{len(images_to_add)} images to EPUB")
def _create_cover_page(self, book: epub.EpubBook, cover_file: str,
processed_images: Dict[str, str], css_items: List[epub.EpubItem],
metadata: dict) -> Optional[epub.EpubHtml]:
"""Create cover page"""
# Find original filename
original_cover = None
for orig, safe in processed_images.items():
if safe == cover_file:
original_cover = orig
break
if not original_cover:
return None
cover_path = os.path.join(self.images_dir, original_cover)
try:
with open(cover_path, 'rb') as f:
cover_data = f.read()
# Add cover image
cover_img = epub.EpubItem(
uid="cover-image",
file_name=f"images/{cover_file}",
media_type=mimetypes.guess_type(cover_path)[0] or "image/jpeg",
content=cover_data
)
book.add_item(cover_img)
# Set cover metadata
cover_img.properties = ["cover-image"]
book.add_metadata('http://purl.org/dc/elements/1.1/', 'cover', 'cover-image')
# Create cover page
cover_page = epub.EpubHtml(
title="Cover",
file_name="cover.xhtml",
lang=metadata.get("language", "en")
)
# Build cover HTML directly without going through ensure_compliance
# Since it's simple and controlled, we can build it directly
cover_content = f'''<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/xhtml">
<head><title>Cover</title></head>
<body>
<img src="images/{cover_file}" alt="Cover" style="max-width: 100%; height: auto;"/>
</body>
</html>'''
cover_page.content = cover_content.encode('utf-8')
# Associate CSS with cover page if needed
if self.attach_css_to_chapters:
for css_item in css_items:
cover_page.add_item(css_item)
book.add_item(cover_page)
self.log(f"✅ Set cover image: {cover_file}")
return cover_page
except Exception as e:
self.log(f"[WARNING] Failed to add cover: {e}")
return None
def _process_chapter_images(self, xhtml_content: str, processed_images: Dict[str, str]) -> str:
"""Process image paths and inline SVG in chapter content.
- Rewrites <img> tags to use images/ paths and prefers the PNG fallback for SVGs.
- Converts inline