diff --git "a/image_translator.py" "b/image_translator.py" new file mode 100644--- /dev/null +++ "b/image_translator.py" @@ -0,0 +1,2500 @@ +""" +Image Translation Module for EPUB Translator +Handles detection, extraction, and translation of images containing text +Includes support for web novel images and watermark handling +""" + +import os +import json +import base64 +import zipfile +from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter +import io +from typing import List, Dict, Optional, Tuple +import re +from bs4 import BeautifulSoup +import logging +import time +import queue +import threading +# OpenCV availability check +try: + import cv2 + import numpy as np + CV2_AVAILABLE = True +except ImportError: + CV2_AVAILABLE = False + print("โ ๏ธ OpenCV not available - advanced image processing disabled") +import numpy as np +from unified_api_client import UnifiedClientError + +logger = logging.getLogger(__name__) + +def requires_cv2(func): + """Decorator to skip methods that require OpenCV""" + def wrapper(self, *args, **kwargs): + if not CV2_AVAILABLE: + # Return sensible defaults based on the function + if func.__name__ == '_detect_watermark_pattern': + return False, None + elif func.__name__ in ['_remove_periodic_watermark', + '_adaptive_histogram_equalization', + '_bilateral_filter', + '_enhance_text_regions']: + # Return the image array unchanged + return args[0] if args else None + else: + return None + return func(self, *args, **kwargs) + return wrapper + +def send_image_with_interrupt(client, messages, image_data, temperature, max_tokens, stop_check_fn, chunk_timeout=None, context='image_translation'): + """Send image API request with interrupt capability and timeout retry""" + import queue + import threading + from unified_api_client import UnifiedClientError + + result_queue = queue.Queue() + + def api_call(): + try: + start_time = time.time() + result = client.send_image(messages, image_data, temperature=temperature, + max_tokens=max_tokens, context=context) + elapsed = time.time() - start_time + result_queue.put((result, elapsed)) + except Exception as e: + result_queue.put(e) + + api_thread = threading.Thread(target=api_call) + api_thread.daemon = True + api_thread.start() + + # Use chunk timeout if provided, otherwise use default + timeout = chunk_timeout if chunk_timeout else 300 + check_interval = 0.5 + elapsed = 0 + + while elapsed < timeout: + try: + result = result_queue.get(timeout=check_interval) + if isinstance(result, Exception): + raise result + if isinstance(result, tuple): + api_result, api_time = result + # Check if it took too long + if chunk_timeout and api_time > chunk_timeout: + raise UnifiedClientError(f"Image API call took {api_time:.1f}s (timeout: {chunk_timeout}s)") + return api_result + return result + except queue.Empty: + if stop_check_fn and stop_check_fn(): + raise UnifiedClientError("Image translation stopped by user") + elapsed += check_interval + + raise UnifiedClientError(f"Image API call timed out after {timeout} seconds") + +class ImageTranslator: + def __init__(self, client, output_dir: str, profile_name: str = "", system_prompt: str = "", + temperature: float = 0.3, log_callback=None, progress_manager=None, + history_manager=None, chunk_context_manager=None): + """ + Initialize the image translator + + Args: + client: UnifiedClient instance for API calls + output_dir: Directory to save translated images + profile_name: Source language for translation + system_prompt: System prompt from GUI to use for translation + 
temperature: Temperature for translation + log_callback: Optional callback function for logging + progress_manager: Shared ProgressManager instance for synchronization + """ + self.client = client + self.output_dir = output_dir + self.profile_name = profile_name + self.system_prompt = system_prompt + self.temperature = temperature + self.log_callback = log_callback + self.progress_manager = progress_manager # Use shared progress manager + self.images_dir = os.path.join(output_dir, "images") + self.translated_images_dir = os.path.join(output_dir, "translated_images") + os.makedirs(self.translated_images_dir, exist_ok=True) + self.api_delay = float(os.getenv("SEND_INTERVAL_SECONDS", "2")) + + # Track processed images to avoid duplicates + self.processed_images = {} + self.image_translations = {} + + # Configuration from environment + self.process_webnovel = os.getenv("PROCESS_WEBNOVEL_IMAGES", "1") == "1" + self.webnovel_min_height = int(os.getenv("WEBNOVEL_MIN_HEIGHT", "1000")) + self.image_max_tokens = int(os.getenv("MAX_OUTPUT_TOKENS", "8192")) + self.chunk_height = int(os.getenv("IMAGE_CHUNK_HEIGHT", "2000")) + + # Add context tracking for image chunks + self.contextual_enabled = os.getenv("CONTEXTUAL", "1") == "1" + self.history_manager = history_manager + self.chunk_context_manager = chunk_context_manager + self.remove_ai_artifacts = os.getenv("REMOVE_AI_ARTIFACTS", "0") == "1" + + + def extract_images_from_chapter(self, chapter_html: str) -> List[Dict]: + """ + Extract image references from chapter HTML + + Returns: + List of dicts with image info: {src, alt, width, height} + """ + soup = BeautifulSoup(chapter_html, 'html.parser') + images = [] + + for img in soup.find_all('img'): + img_info = { + 'src': img.get('src', ''), + 'alt': img.get('alt', ''), + 'width': img.get('width'), + 'height': img.get('height'), + 'style': img.get('style', '') + } + + if img_info['src']: + images.append(img_info) + + return images + + def compress_image(self, image_path): + """ + Compress an image based on settings from environment variables + + Args: + image_path: Path to the input image + + Returns: + Path to compressed image (temporary or saved) + """ + try: + # Check if compression is enabled + if os.getenv("ENABLE_IMAGE_COMPRESSION", "0") != "1": + return image_path # Return original if compression disabled + + print(f" ๐๏ธ Compressing image: {os.path.basename(image_path)}") + + # Load compression settings from environment + target_format = os.getenv("IMAGE_COMPRESSION_FORMAT", "auto") + max_dimension = int(os.getenv("MAX_IMAGE_DIMENSION", "2048")) + max_size_mb = float(os.getenv("MAX_IMAGE_SIZE_MB", "10")) + + quality_settings = { + 'webp': int(os.getenv("WEBP_QUALITY", "85")), + 'jpeg': int(os.getenv("JPEG_QUALITY", "85")), + 'png': int(os.getenv("PNG_COMPRESSION", "6")) + } + + auto_compress = os.getenv("AUTO_COMPRESS_ENABLED", "1") == "1" + preserve_transparency = os.getenv("PRESERVE_TRANSPARENCY", "0") == "1" # Default is now False + preserve_original_format = os.getenv("PRESERVE_ORIGINAL_FORMAT", "0") == "1" # New option + optimize_for_ocr = os.getenv("OPTIMIZE_FOR_OCR", "1") == "1" + progressive = os.getenv("PROGRESSIVE_ENCODING", "1") == "1" + save_compressed = os.getenv("SAVE_COMPRESSED_IMAGES", "0") == "1" + + # Open image + with Image.open(image_path) as img: + original_format = img.format.lower() if img.format else 'png' + has_transparency = img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info) + + # Special handling for GIF files + is_gif = original_format == 
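Review note: a minimal usage sketch of the <img> extraction above; the HTML snippet and variable names are invented for illustration.

    from bs4 import BeautifulSoup

    sample_html = '<p>intro</p><img src="images/ch01.gif" alt="chapter 1"><img alt="no src">'
    soup = BeautifulSoup(sample_html, 'html.parser')
    # Same shape as extract_images_from_chapter: one dict per <img>, skipping empty src
    images = [{'src': img.get('src', ''), 'alt': img.get('alt', '')}
              for img in soup.find_all('img') if img.get('src')]
    print(images)  # [{'src': 'images/ch01.gif', 'alt': 'chapter 1'}]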
'gif' + if is_gif and not preserve_original_format: + print(f" ๐๏ธ GIF detected - converting to static image for better compression") + # For animated GIFs, we'll take the first frame + # Convert to RGBA to preserve any transparency + if img.mode == 'P' and 'transparency' in img.info: + img = img.convert('RGBA') + elif img.mode not in ('RGB', 'RGBA'): + img = img.convert('RGB') + elif is_gif and preserve_original_format: + print(f" ๐๏ธ GIF detected - preserving original format as requested") + + # Calculate original size + original_size_mb = os.path.getsize(image_path) / (1024 * 1024) + print(f" ๐ Original: {img.width}x{img.height}, {original_size_mb:.2f}MB, format: {original_format}") + + # Get chunk height from environment - this comes from the GUI setting + chunk_height = int(os.getenv("IMAGE_CHUNK_HEIGHT", "1500")) + print(f" ๐ Using chunk height from settings: {chunk_height}px") + + # Check if resizing is needed - BUT NOT FOR TALL IMAGES THAT WILL BE CHUNKED! + needs_resize = img.width > max_dimension or img.height > max_dimension + + # CRITICAL: Check if this is a tall image that will be chunked + # If so, DO NOT resize the height! + is_tall_text_image = img.height > chunk_height + + if needs_resize: + if is_tall_text_image: + # Only resize width if needed, NEVER touch the height for tall images + if img.width > max_dimension: + # Keep aspect ratio but don't exceed max width + ratio = max_dimension / img.width + new_width = max_dimension + new_height = int(img.height * ratio) + print(f" โ ๏ธ Tall image ({img.height}px > chunk height {chunk_height}px)") + print(f" ๐ Resizing width only: {img.width} โ {new_width} (height: {img.height} โ {new_height})") + img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) + else: + print(f" โ Tall image ({img.height}px) - keeping dimensions (will be chunked into {(img.height + chunk_height - 1) // chunk_height} chunks)") + else: + # Normal resize for regular images (not tall enough to chunk) + ratio = min(max_dimension / img.width, max_dimension / img.height) + new_size = (int(img.width * ratio), int(img.height * ratio)) + img = img.resize(new_size, Image.Resampling.LANCZOS) + print(f" ๐ Regular image resized to: {new_size[0]}x{new_size[1]}") + + # Auto-select format if needed + if preserve_original_format and target_format == 'auto': + # Keep the original format + target_format = original_format + # Special handling for formats that might not be ideal + if original_format == 'bmp': + target_format = 'png' # Convert BMP to PNG as BMP is uncompressed + print(f" ๐ธ Preserving original format: {target_format}") + elif target_format == 'auto': + # For GIFs with text (web novel chapters), prefer PNG or WebP + if is_gif: + if has_transparency and preserve_transparency: + target_format = 'png' # Better for text with transparency + else: + target_format = 'webp' # Good compression for text + elif has_transparency and preserve_transparency: + target_format = 'webp' + elif optimize_for_ocr and img.width * img.height > 1000000: + target_format = 'webp' + elif original_size_mb > 5: + target_format = 'webp' + else: + target_format = 'jpeg' + print(f" ๐ฏ Auto-selected format: {target_format}") + + # Handle transparency conversion if needed + if target_format == 'jpeg' and (has_transparency or img.mode == 'RGBA'): + # Convert to RGB with white background + rgb_img = Image.new('RGB', img.size, (255, 255, 255)) + if img.mode == 'RGBA': + rgb_img.paste(img, mask=img.split()[3]) + else: + rgb_img.paste(img) + img = rgb_img + + # Apply OCR optimization if 
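Review note: the chunk count printed above is plain ceiling division; a worked example with assumed numbers.

    height, chunk_height = 4500, 2000   # assumed: a tall web-novel image
    num_chunks = (height + chunk_height - 1) // chunk_height
    print(num_chunks)                   # 3 chunks (2000 + 2000 + 500 px)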
enabled + if optimize_for_ocr: + # Skip OCR optimization for GIF files in palette mode when preserving format + if target_format == 'gif' and img.mode in ('P', 'L'): + print(f" โ ๏ธ Applying OCR optimization to GIF (converting modes temporarily)") + # Convert to RGB temporarily for enhancement, then convert back + original_mode = img.mode + transparency_info = None + + if img.mode == 'P': + # Preserve transparency info if present + transparency_info = img.info.get('transparency', None) + # Convert to RGBA if has transparency, otherwise RGB + if transparency_info is not None: + img = img.convert('RGBA') + else: + img = img.convert('RGB') + elif img.mode == 'L': + img = img.convert('RGB') + + # Apply enhancements + from PIL import ImageEnhance + enhancer = ImageEnhance.Contrast(img) + img = enhancer.enhance(1.2) + enhancer = ImageEnhance.Sharpness(img) + img = enhancer.enhance(1.1) + + # Extra sharpening for GIF text + img = enhancer.enhance(1.2) + + # Convert back to original mode for GIF saving + if original_mode == 'P': + # Quantize back to palette mode + img = img.quantize(colors=256, method=2) # MEDIANCUT + if transparency_info is not None: + img.info['transparency'] = transparency_info + elif original_mode == 'L': + img = img.convert('L') + else: + # Normal OCR optimization for non-GIF formats or RGB-mode images + from PIL import ImageEnhance + enhancer = ImageEnhance.Contrast(img) + img = enhancer.enhance(1.2) + enhancer = ImageEnhance.Sharpness(img) + img = enhancer.enhance(1.1) + + # Extra sharpening for GIF text which might be lower quality + if is_gif: + img = enhancer.enhance(1.2) + + # Prepare save parameters based on format + save_params = {} + + if target_format == 'webp': + # For WebP, decide whether to keep transparency + if has_transparency and preserve_transparency: + save_params = { + 'format': 'WEBP', + 'quality': quality_settings['webp'], + 'method': 6, + 'lossless': False, + 'exact': True # Preserve transparency + } + else: + # Convert to RGB with white background for WebP without transparency + if img.mode in ('RGBA', 'LA', 'P'): + rgb_img = Image.new('RGB', img.size, (255, 255, 255)) + if img.mode == 'RGBA': + rgb_img.paste(img, mask=img.split()[3]) + elif img.mode == 'LA': + rgb_img.paste(img, mask=img.split()[1]) + else: # P mode + if 'transparency' in img.info: + img = img.convert('RGBA') + rgb_img.paste(img, mask=img.split()[3]) + else: + rgb_img.paste(img) + img = rgb_img + + save_params = { + 'format': 'WEBP', + 'quality': quality_settings['webp'], + 'method': 6, + 'lossless': False + } + + elif target_format == 'jpeg': + save_params = { + 'format': 'JPEG', + 'quality': quality_settings['jpeg'], + 'optimize': True, + 'progressive': progressive + } + + elif target_format == 'png': + # For PNG, handle transparency properly + if not (has_transparency and preserve_transparency): + # Convert to RGB with white background if not preserving transparency + if img.mode in ('RGBA', 'LA', 'P'): + rgb_img = Image.new('RGB', img.size, (255, 255, 255)) + if img.mode == 'RGBA': + rgb_img.paste(img, mask=img.split()[3]) + elif img.mode == 'LA': + rgb_img.paste(img, mask=img.split()[1]) + else: # P mode + if 'transparency' in img.info: + img = img.convert('RGBA') + rgb_img.paste(img, mask=img.split()[3]) + else: + rgb_img.paste(img) + img = rgb_img + elif img.mode == 'P' and 'transparency' in img.info: + # Convert palette mode with transparency to RGBA + img = img.convert('RGBA') + + save_params = { + 'format': 'PNG', + 'compress_level': quality_settings['png'], + 'optimize': 
True + } + + elif target_format == 'gif': + # GIF format - limited but preserving original when requested + print(f" โ ๏ธ Warning: GIF format has limited colors (256) and may reduce text quality") + if img.mode not in ('P', 'L'): + # Convert to palette mode for GIF + img = img.quantize(colors=256, method=2) # MEDIANCUT method + + save_params = { + 'format': 'GIF', + 'optimize': True + } + + # Auto-compress to meet token target if specified + if auto_compress: + target_tokens = int(os.getenv("TARGET_IMAGE_TOKENS", "1000")) + # For text-heavy images (like web novel GIFs), be less aggressive + if is_gif or 'chapter' in os.path.basename(image_path).lower(): + target_mb = min(max_size_mb, 3.0) # Allow up to 3MB for text clarity + else: + target_mb = min(max_size_mb, 2.0) # Regular images + print(f" ๐ฏ Auto-compress target: {target_mb:.1f}MB for token efficiency") + max_size_mb = target_mb + + # Save compressed image + output_path = None + quality = save_params.get('quality', 85) + + # Try different quality levels to meet size target + while quality > 10: + from io import BytesIO + buffer = BytesIO() + + if 'quality' in save_params: + save_params['quality'] = quality + + img.save(buffer, **save_params) + compressed_size_mb = len(buffer.getvalue()) / (1024 * 1024) + + if compressed_size_mb <= max_size_mb or quality <= 10: + # Size is acceptable or we've reached minimum quality + if save_compressed: + # FIXED: Handle PyInstaller paths properly + try: + # Try to determine the proper output directory + # First check if self.output_dir is absolute and exists + if hasattr(self, 'output_dir') and self.output_dir and os.path.isabs(self.output_dir): + base_output_dir = self.output_dir + else: + # Fall back to using the directory of the source image + base_output_dir = os.path.dirname(image_path) + # Look for a typical output structure + if 'translated_images' not in base_output_dir: + # Try to find or create the translated_images directory + parent_dir = base_output_dir + while parent_dir and not os.path.exists(os.path.join(parent_dir, 'translated_images')): + new_parent = os.path.dirname(parent_dir) + if new_parent == parent_dir: # Reached root + break + parent_dir = new_parent + + if parent_dir and os.path.exists(os.path.join(parent_dir, 'translated_images')): + base_output_dir = parent_dir + else: + # Create translated_images in the same directory as the source + base_output_dir = os.path.dirname(image_path) + + compressed_dir = os.path.join(base_output_dir, "translated_images", "compressed") + + # Ensure the directory exists with proper error handling + try: + os.makedirs(compressed_dir, exist_ok=True) + except OSError as e: + print(f" โ ๏ธ Failed to create compressed directory: {e}") + # Fall back to source image directory + compressed_dir = os.path.join(os.path.dirname(image_path), "compressed") + os.makedirs(compressed_dir, exist_ok=True) + + base_name = os.path.basename(image_path) + name, original_ext = os.path.splitext(base_name) + + # Add source format info to filename if converting from GIF + if is_gif and target_format != 'gif': + name = f"{name}_from_gif" + + ext = '.webp' if target_format == 'webp' else f'.{target_format}' + output_path = os.path.join(compressed_dir, f"{name}_compressed{ext}") + + # Write the file with proper error handling + try: + with open(output_path, 'wb') as f: + f.write(buffer.getvalue()) + print(f" ๐พ Saved compressed image: {output_path}") + except OSError as e: + print(f" โ Failed to save compressed image: {e}") + # Fall back to temporary file + raise # This will 
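Review note: a self-contained sketch of the quality-stepping loop above, reduced to JPEG only. The 5-point step and the quality floor of 10 match the patch; the function name and defaults are ours.

    from io import BytesIO
    from PIL import Image

    def compress_to_target(img: Image.Image, max_size_mb: float, quality: int = 85) -> bytes:
        data = b''
        while quality > 10:
            buf = BytesIO()
            img.convert('RGB').save(buf, format='JPEG', quality=quality, optimize=True)
            data = buf.getvalue()
            if len(data) / (1024 * 1024) <= max_size_mb:
                break
            quality -= 5   # same step size as the patch
        return data        # best effort if the quality floor was reached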
trigger the temporary file fallback below + + except Exception as e: + print(f" โ ๏ธ Failed to save to permanent location: {e}") + # Fall back to temporary file + import tempfile + ext = '.webp' if target_format == 'webp' else f'.{target_format}' + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(buffer.getvalue()) + output_path = tmp.name + print(f" ๐ Created temp compressed image instead") + else: + # Save to temporary file + import tempfile + ext = '.webp' if target_format == 'webp' else f'.{target_format}' + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(buffer.getvalue()) + output_path = tmp.name + + print(f" ๐ Created temp compressed image") + + compression_ratio = (1 - compressed_size_mb / original_size_mb) * 100 + if compression_ratio > 0: + print(f" โ Compressed: {original_size_mb:.2f}MB โ {compressed_size_mb:.2f}MB " + f"({compression_ratio:.1f}% reduction, quality: {quality})") + else: + print(f" โ ๏ธ Compression increased size: {original_size_mb:.2f}MB โ {compressed_size_mb:.2f}MB " + f"({abs(compression_ratio):.1f}% larger, quality: {quality})") + + # Special note for GIF conversions + if is_gif: + print(f" ๐๏ธ GIF converted to {target_format.upper()} for better compression") + + return output_path + + # Reduce quality and try again + quality -= 5 + print(f" ๐ Size {compressed_size_mb:.2f}MB > target {max_size_mb:.2f}MB, " + f"reducing quality to {quality}") + + # If we couldn't meet the target, return the best we got + print(f" โ ๏ธ Could not meet size target, using minimum quality") + return output_path if output_path else image_path + + except Exception as e: + print(f" โ Compression failed: {e}") + import traceback + traceback.print_exc() + return image_path # Return original on error + + def _process_image_with_compression(self, image_path, context, check_stop_fn): + """Process image with optional compression before translation""" + try: + # Apply compression if enabled + if os.getenv("ENABLE_IMAGE_COMPRESSION", "0") == "1": + compressed_path = self.compress_image(image_path) + if compressed_path != image_path: + # Use compressed image for translation + result = self._process_single_image_original(compressed_path, context, check_stop_fn) + + # Clean up temp file if needed + if not os.getenv("SAVE_COMPRESSED_IMAGES", "0") == "1": + try: + os.unlink(compressed_path) + except: + pass + + return result + + # No compression, use original method + return self._process_single_image_original(image_path, context, check_stop_fn) + + except Exception as e: + print(f" โ Error in image processing: {e}") + return None + + def _process_image_chunks_single_api(self, img, width, height, context, check_stop_fn): + """Process all image chunks in a single API call with compression support""" + + num_chunks = (height + self.chunk_height - 1) // self.chunk_height + overlap_percentage = float(os.getenv('IMAGE_CHUNK_OVERLAP_PERCENT', '1')) + overlap = int(self.chunk_height * (overlap_percentage / 100)) + + print(" ๐ Using SINGLE API CALL mode for " + str(num_chunks) + " chunks") + print(f" ๐ Chunk overlap: {overlap_percentage}% ({overlap} pixels)") + #print(" ๐ This is more efficient and produces better translations") + #print(" โณ Estimated time: 30-90 seconds total") + + # Check for stop at the very beginning + if check_stop_fn and check_stop_fn(): + print(" โ Image translation stopped by user") + return None + + # Load progress for resumability + prog = self.load_progress() + image_basename = 
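Review note: a worked example of the overlap arithmetic in _process_image_chunks_single_api above, using the 1% default of IMAGE_CHUNK_OVERLAP_PERCENT; the numbers are assumed.

    chunk_height = 2000
    overlap = int(chunk_height * (1.0 / 100))   # 20 px

    def start_y(i):
        # chunks after the first start `overlap` pixels early, as in the patch
        return max(0, i * chunk_height - (overlap if i > 0 else 0))

    print([start_y(i) for i in range(3)])       # [0, 1980, 3980]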
os.path.basename(self.current_image_path) if hasattr(self, 'current_image_path') else str(hash(str(img))) + + # Detect original image format from filename or image + original_format = 'png' # default + if hasattr(self, 'current_image_path'): + ext = os.path.splitext(self.current_image_path)[1].lower() + if ext in ['.gif', '.jpg', '.jpeg', '.png', '.webp']: + original_format = ext[1:] # Remove the dot + if original_format == 'jpg': + original_format = 'jpeg' + + # Check if we should preserve original format + preserve_original_format = os.getenv("PRESERVE_ORIGINAL_FORMAT", "0") == "1" + + # Try to extract chapter number + chapter_num = None + if hasattr(self, 'current_chapter_num'): + chapter_num = self.current_chapter_num + else: + import re + match = re.search(r'ch(?:apter)?[\s_-]*(\d+)', image_basename, re.IGNORECASE) + if match: + chapter_num = match.group(1) + + # Create unique key + if chapter_num: + image_key = "ch" + str(chapter_num) + "_" + image_basename + else: + image_key = image_basename + + # Check if already processed + if "single_api_chunks" not in prog: + prog["single_api_chunks"] = {} + + if image_key in prog["single_api_chunks"] and prog["single_api_chunks"][image_key].get("completed"): + print(" โญ๏ธ Image already translated, using cached result") + return prog["single_api_chunks"][image_key]["translation"] + + # Prepare chunks + try: + content_parts = [] + + print(" ๐ฆ Preparing " + str(num_chunks) + " image chunks...") + + # Check if we should save debug images + save_cleaned = os.getenv('SAVE_CLEANED_IMAGES', '0') == '1' + if save_cleaned: + debug_dir = os.path.join(self.output_dir, "translated_images", "debug_chunks") + os.makedirs(debug_dir, exist_ok=True) + print(" ๐ Debug mode: Saving chunks to " + debug_dir) + + # Create subdirectory for compressed chunks + compressed_debug_dir = os.path.join(debug_dir, "compressed") + os.makedirs(compressed_debug_dir, exist_ok=True) + + # Check if compression is enabled + compression_enabled = os.getenv("ENABLE_IMAGE_COMPRESSION", "0") == "1" + total_uncompressed_size = 0 + total_compressed_size = 0 + + # Temporarily set the original format in environment for _image_to_bytes_with_compression + old_env_format = os.environ.get("ORIGINAL_IMAGE_FORMAT", "") + if preserve_original_format and original_format: + os.environ["ORIGINAL_IMAGE_FORMAT"] = original_format + + for i in range(num_chunks): + # Check for stop during preparation + if check_stop_fn and check_stop_fn(): + print(" โ Stopped while preparing chunk " + str(i+1) + "/" + str(num_chunks)) + # Restore environment + if old_env_format: + os.environ["ORIGINAL_IMAGE_FORMAT"] = old_env_format + elif "ORIGINAL_IMAGE_FORMAT" in os.environ: + del os.environ["ORIGINAL_IMAGE_FORMAT"] + return None + + # Calculate chunk boundaries with overlap + start_y = max(0, i * self.chunk_height - (overlap if i > 0 else 0)) + end_y = min(height, (i + 1) * self.chunk_height) + + # Crop the chunk + chunk = img.crop((0, start_y, width, end_y)) + + # Save uncompressed debug chunk if enabled + if save_cleaned: + # Use original format for debug chunks if preserving format + if preserve_original_format and original_format == 'gif': + chunk_ext = 'gif' + # Need to convert to palette mode for GIF + if chunk.mode not in ('P', 'L'): + chunk_to_save = chunk.quantize(colors=256, method=2) # MEDIANCUT + else: + chunk_to_save = chunk + else: + chunk_ext = 'png' + chunk_to_save = chunk + + chunk_filename = image_key + "_chunk_" + str(i+1) + "_of_" + str(num_chunks) + "_y" + str(start_y) + "-" + str(end_y) + "." 
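Review note: the chapter-number regex above in action; the filenames are invented.

    import re

    pattern = re.compile(r'ch(?:apter)?[\s_-]*(\d+)', re.IGNORECASE)
    for name in ('chapter_12.gif', 'Ch-03_image.png', 'cover.jpg'):
        m = pattern.search(name)
        print(name, '->', m.group(1) if m else None)
    # chapter_12.gif -> 12, Ch-03_image.png -> 03, cover.jpg -> None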
+ chunk_ext + chunk_path = os.path.join(debug_dir, chunk_filename) + + if chunk_ext == 'gif': + chunk_to_save.save(chunk_path, "GIF", optimize=True) + else: + chunk_to_save.save(chunk_path, "PNG") + + print(" ๐พ Saved debug chunk: " + chunk_filename) + + # Get uncompressed size + uncompressed_size = os.path.getsize(chunk_path) + total_uncompressed_size += uncompressed_size + + # Convert chunk to bytes with compression if enabled + if compression_enabled: + print(f" ๐๏ธ Compressing chunk {i+1}/{num_chunks}...") + + # Use the compression method + chunk_bytes = self._image_to_bytes_with_compression(chunk) + + # Determine format based on compression settings + format_setting = os.getenv("IMAGE_COMPRESSION_FORMAT", "auto") + if format_setting == "auto": + if preserve_original_format and original_format == 'gif': + # If original was GIF and we're preserving format, use GIF + format_used = 'gif' + else: + # Check if chunk has transparency + has_transparency = chunk.mode in ('RGBA', 'LA') or (chunk.mode == 'P' and 'transparency' in chunk.info) + preserve_transparency = os.getenv("PRESERVE_TRANSPARENCY", "0") == "1" + if has_transparency and preserve_transparency: + format_used = 'png' + else: + format_used = 'webp' # Default to WebP for best compression + else: + format_used = format_setting + + # Calculate compression stats + compressed_size = len(chunk_bytes) + if save_cleaned: + # Get the actual original size of the chunk before compression + original_chunk_buffer = io.BytesIO() + chunk.save(original_chunk_buffer, format='PNG') + actual_original_size = len(original_chunk_buffer.getvalue()) + compression_ratio = (1 - compressed_size / actual_original_size) * 100 + print(f" ๐ Chunk {i+1}: {uncompressed_size:,} โ {compressed_size:,} bytes ({compression_ratio:.1f}% reduction, format: {format_used.upper()})") + total_compressed_size += compressed_size + + # Save compressed chunk for debugging + compressed_chunk_filename = image_key + "_chunk_" + str(i+1) + "_compressed." 
+ format_used.lower() + compressed_chunk_path = os.path.join(compressed_debug_dir, compressed_chunk_filename) + with open(compressed_chunk_path, 'wb') as f: + f.write(chunk_bytes) + print(f" ๐พ Saved compressed chunk: {compressed_chunk_filename}") + else: + # No compression - use original format if preserving, otherwise PNG + if preserve_original_format and original_format == 'gif': + chunk_bytes = self._image_to_bytes(chunk, format='GIF') + format_used = 'gif' + else: + chunk_bytes = self._image_to_bytes(chunk, format='PNG') + format_used = 'png' + + if save_cleaned: + total_compressed_size += len(chunk_bytes) + + # Convert to base64 + chunk_base64 = base64.b64encode(chunk_bytes).decode('utf-8') + + # Add image to content with appropriate format + content_parts.append({ + "type": "image_url", + "image_url": { + "url": f"data:image/{format_used.lower()};base64," + chunk_base64 + } + }) + + # Restore original environment variable + if old_env_format: + os.environ["ORIGINAL_IMAGE_FORMAT"] = old_env_format + elif "ORIGINAL_IMAGE_FORMAT" in os.environ: + del os.environ["ORIGINAL_IMAGE_FORMAT"] + + # Count the number of images in content_parts + num_images = sum(1 for part in content_parts if part.get("type") == "image_url") + + # Show overall compression stats if enabled + if compression_enabled and save_cleaned and total_uncompressed_size > 0: + overall_compression = (1 - total_compressed_size / total_uncompressed_size) * 100 + print(f"\n ๐ Overall compression stats:") + print(f" Total uncompressed: {total_uncompressed_size:,} bytes ({total_uncompressed_size / 1024 / 1024:.2f} MB)") + print(f" Total compressed: {total_compressed_size:,} bytes ({total_compressed_size / 1024 / 1024:.2f} MB)") + print(f" Reduction: {overall_compression:.1f}%") + print(f" Savings: {(total_uncompressed_size - total_compressed_size):,} bytes\n") + + except Exception as e: + # Make sure to restore environment + if 'old_env_format' in locals(): + if old_env_format: + os.environ["ORIGINAL_IMAGE_FORMAT"] = old_env_format + elif "ORIGINAL_IMAGE_FORMAT" in os.environ: + del os.environ["ORIGINAL_IMAGE_FORMAT"] + + print(" โ Error preparing chunks: " + str(e)) + import traceback + traceback.print_exc() + print(" ๐ Falling back to sequential chunk processing...") + return self._process_image_chunks(img, width, height, context, check_stop_fn) + + # Calculate token estimate based on provider + if 'gemini' in self.client.model.lower(): + # Gemini charges flat 258 tokens per image + estimated_image_tokens = num_images * 258 + elif 'gpt-4' in self.client.model.lower() or 'gpt-4o' in self.client.model.lower(): + # GPT-4V uses ~85 tokens per 512x512 tile + # Adjust estimate based on compression + if compression_enabled: + # Compressed images use fewer tokens + tiles_per_chunk = max(1, (self.chunk_height * width * 0.7) // (512 * 512)) + else: + tiles_per_chunk = max(1, (self.chunk_height * width) // (512 * 512)) + estimated_image_tokens = num_images * tiles_per_chunk * 85 + elif 'claude' in self.client.model.lower(): + # Claude varies by resolution, estimate based on compression + if compression_enabled: + estimated_image_tokens = num_images * 1500 # Compressed images + else: + estimated_image_tokens = num_images * 2000 # Uncompressed + else: + # Default conservative estimate + estimated_image_tokens = num_images * 1000 + + # Calculate text tokens + text_tokens = sum(len(part.get("text", "")) for part in content_parts if part.get("type") == "text") // 4 + estimated_text_tokens = len(self.system_prompt) // 4 + text_tokens + 200 + 
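Review note: a minimal sketch of the content-part construction used above (OpenAI-style image_url blocks carrying a base64 data URL); the 1x1 image stands in for a real chunk.

    import base64, io
    from PIL import Image

    chunk = Image.new('RGB', (1, 1), 'white')   # stand-in for a cropped chunk
    buf = io.BytesIO()
    chunk.save(buf, format='PNG')
    chunk_base64 = base64.b64encode(buf.getvalue()).decode('utf-8')
    part = {"type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{chunk_base64}"}}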
total_estimated_tokens = estimated_image_tokens + estimated_text_tokens + + print(" ๐ Token estimate:") + print(" Number of images: " + str(num_images)) + print(" Image tokens: ~" + "{:,}".format(estimated_image_tokens) + " (model: " + self.client.model + ")") + if compression_enabled: + print(" Compression: ENABLED โ ") + print(" Text tokens: ~" + "{:,}".format(estimated_text_tokens)) + print(" Total: ~" + "{:,}".format(total_estimated_tokens) + " tokens") + + # Make the API call + try: + # Build messages + messages = [{"role": "system", "content": self.system_prompt}] + messages.append({ + "role": "user", + "content": content_parts + }) + + print("\n ๐ Sending " + str(num_chunks) + " chunks to API in single call...") + if compression_enabled: + print(" ๐๏ธ Using compressed chunks for efficient API usage") + + # Final stop check before API call + if check_stop_fn and check_stop_fn(): + print(" โ Stopped before API call") + return None + + # Use send_image_with_interrupt for interruptible API call + start_time = time.time() + + # Get timeout settings + chunk_timeout = int(os.getenv('CHUNK_TIMEOUT', '0')) + retry_timeout = os.getenv('RETRY_TIMEOUT', '0') == '1' + + # Make interruptible API call + # Since we already have images in content_parts, we need to use regular send, not send_image + try: + # Create a wrapper to make regular send interruptible + result_queue = queue.Queue() + + def api_call(): + try: + start = time.time() + result = self.client.send( + messages=messages, + temperature=self.temperature, + max_tokens=self.image_max_tokens + ) + elapsed_time = time.time() - start + result_queue.put((result, elapsed_time)) + except Exception as e: + result_queue.put(e) + + api_thread = threading.Thread(target=api_call) + api_thread.daemon = True + api_thread.start() + + # Check for completion or stop + timeout = chunk_timeout if chunk_timeout else 900 + check_interval = 0.5 + elapsed_check = 0 + + while elapsed_check < timeout: + try: + result = result_queue.get(timeout=check_interval) + if isinstance(result, Exception): + raise result + if isinstance(result, tuple): + response, elapsed_time = result + elapsed = elapsed_time + break + except queue.Empty: + if check_stop_fn and check_stop_fn(): + raise UnifiedClientError("Translation stopped by user") + elapsed_check += check_interval + else: + raise UnifiedClientError("API call timed out after " + str(timeout) + " seconds") + + except UnifiedClientError as e: + if "stopped by user" in str(e).lower(): + print(" โ Translation stopped by user during API call") + return None + elif "timed out" in str(e).lower(): + print(" โฑ๏ธ API call timed out: " + str(e)) + print(" ๐ Falling back to sequential chunk processing...") + return self._process_image_chunks(img, width, height, context, check_stop_fn) + else: + raise + + # Handle the result based on what's returned + if isinstance(response, tuple): + response, elapsed_time = response + # Handle case where elapsed_time might be 'stop' or other non-numeric + try: + elapsed = float(elapsed_time) + except (ValueError, TypeError): + elapsed = time.time() - start_time + + # Success! 
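Review note: a worked instance of the Gemini branch of the token estimate above (flat 258 tokens per image); all counts are assumed.

    num_images = 3
    estimated_image_tokens = num_images * 258                  # 774
    system_prompt_chars, text_chars = 1200, 400                # assumed lengths
    estimated_text_tokens = system_prompt_chars // 4 + text_chars // 4 + 200
    print(estimated_image_tokens + estimated_text_tokens)      # 774 + 600 = 1374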
+ print(" ๐ก API response received in " + "{:.1f}".format(elapsed) + "s") + + # Check if response is valid + if not response: + print(" โ No response from API") + print(" ๐ Falling back to sequential chunk processing...") + return self._process_image_chunks(img, width, height, context, check_stop_fn) + + # Extract content from UnifiedResponse + if hasattr(response, 'content'): + translation_response = response.content + elif hasattr(response, 'text'): + translation_response = response.text + else: + translation_response = str(response) + + # Unescape the response text if it has escaped characters + if '\\n' in translation_response or translation_response.startswith('('): + print(" ๐ง Detected escaped text, unescaping...") + translation_response = self._unescape_response_text(translation_response) + + # Check if we got actual content + if not translation_response or not translation_response.strip(): + print(" โ Empty response content from API") + print(" ๐ Falling back to sequential chunk processing...") + return self._process_image_chunks(img, width, height, context, check_stop_fn) + + # Process response + trans_finish = getattr(response, 'finish_reason', 'unknown') + + print(" ๐ก Finish reason: " + trans_finish) + print(" ๐ Response length: " + str(len(translation_response)) + " characters") + + if trans_finish in ["length", "max_tokens"]: + print(" โ ๏ธ Translation was TRUNCATED! Consider increasing Max tokens.") + translation_response += "\n\n[TRANSLATION TRUNCATED DUE TO TOKEN LIMIT]" + + # Clean translation based on REMOVE_AI_ARTIFACTS setting + if self.remove_ai_artifacts: + cleaned_translation = self._clean_translation_response(translation_response) + print(" ๐งน Cleaned translation (artifact removal enabled)") + else: + cleaned_translation = translation_response + print(" ๐ Using raw translation (artifact removal disabled)") + + # Normalize and sanitize to avoid squared/cubed glyphs + cleaned_translation = self._normalize_unicode_width(cleaned_translation) + cleaned_translation = self._sanitize_unicode_characters(cleaned_translation) + + if not cleaned_translation: + print(" โ No text extracted from response after cleaning") + print(" ๐ Falling back to sequential chunk processing...") + return self._process_image_chunks(img, width, height, context, check_stop_fn) + + # Save to progress + if "single_api_chunks" not in prog: + prog["single_api_chunks"] = {} + + prog["single_api_chunks"][image_key] = { + "completed": True, + "translation": cleaned_translation, + "chunks": num_chunks, + "overlap": overlap, + "compression_enabled": compression_enabled, + "original_format": original_format, + "timestamp": time.time() + } + self.save_progress(prog) + + print(" โ Translation complete (" + str(len(cleaned_translation)) + " chars)") + return cleaned_translation + + except Exception as e: + error_str = str(e) + error_msg = error_str.lower() + + # Log the full error + print(" โ API Error: " + error_str) + import traceback + traceback.print_exc() + + # Check for stop + if "stopped by user" in error_msg or (check_stop_fn and check_stop_fn()): + print(" โ Translation stopped by user") + return None + + # For any API error at this point, fall back to sequential + print(" ๐ Single API call failed, falling back to sequential chunk processing...") + return self._process_image_chunks(img, width, height, context, check_stop_fn) + + def should_translate_image(self, image_path: str, check_illustration: bool = True) -> bool: + """ + Determine if an image should be translated based on various heuristics + 
+ Args: + image_path: Path to the image file + check_illustration: Whether to check if it's likely an illustration + + Returns: + True if image likely contains translatable text + """ + # Skip if already processed + if image_path in self.processed_images: + return False + + # Check file extension - ADD GIF SUPPORT + ext = os.path.splitext(image_path)[1].lower() + if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']: + return False + + # Check file size (skip very small images) + if os.path.exists(image_path): + size = os.path.getsize(image_path) + if size < 5000: # Less than 5KB (lowered threshold for GIFs) + return False + + # For GIF files from web novels, always process them + if ext == '.gif' and 'chapter' in os.path.basename(image_path).lower(): + print(f" ๐ Web novel GIF detected: {os.path.basename(image_path)}") + return True + + # Check file size (skip very small images) + if os.path.exists(image_path): + size = os.path.getsize(image_path) + if size < 10000: # Less than 10KB + return False + + # Check image dimensions + try: + with Image.open(image_path) as img: + width, height = img.size + # Skip very small images (likely icons) + if width < 100 or height < 100: + return False + + # Calculate aspect ratio + aspect_ratio = width / height + + # Check for web novel/long text images (very tall, narrow images) + if self.process_webnovel and height > self.webnovel_min_height and aspect_ratio < 0.5: + # This is likely a web novel chapter or long text screenshot + print(f" ๐ Web novel/long text image detected: {os.path.basename(image_path)}") + return True + + # Skip OTHER extreme aspect ratios (but not tall text images) + if aspect_ratio > 5: # Very wide images + return False + + # Additional check for illustrations (typically larger, square-ish images) + if check_illustration: + # Large images with normal aspect ratios are often illustrations + if width > 800 and height > 600 and 0.5 < aspect_ratio < 2: + # Check filename for illustration indicators + filename = os.path.basename(image_path).lower() + illustration_indicators = [ + 'illust', 'illustration', 'art', 'artwork', 'drawing', + 'painting', 'sketch', 'design', 'visual', 'graphic', + 'image', 'picture', 'fig', 'figure', 'plate' + ] + + # If filename suggests it's an illustration, skip + for indicator in illustration_indicators: + if indicator in filename: + print(f" ๐ Skipping likely illustration: {filename}") + return False + + except Exception: + return False + + # Check filename patterns that suggest text content + filename = os.path.basename(image_path).lower() + + # Strong indicators of text content (including web novel patterns) + text_indicators = [ + 'text', 'title', 'chapter', 'page', 'dialog', 'dialogue', + 'bubble', 'sign', 'note', 'letter', 'message', 'notice', + 'banner', 'caption', 'subtitle', 'heading', 'label', + 'menu', 'interface', 'ui', 'screen', 'display', + 'novel', 'webnovel', 'lightnovel', 'wn', 'ln', # Web novel indicators + 'chap', 'ch', 'episode', 'ep' # Chapter indicators + ] + + # Strong indicators to skip + skip_indicators = [ + 'cover', 'logo', 'decoration', 'ornament', 'border', + 'background', 'wallpaper', 'texture', 'pattern', + 'icon', 'button', 'avatar', 'profile', 'portrait', + 'landscape', 'scenery', 'character', 'hero', 'heroine' + ] + + # Check for text indicators + for indicator in text_indicators: + if indicator in filename: + print(f" ๐ Text-likely image detected: {filename}") + return True + + # Check for skip indicators + for indicator in skip_indicators: + if indicator in 
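Review note: the web-novel heuristic above as a worked example; the dimensions are invented.

    width, height = 800, 2400            # assumed long text screenshot
    webnovel_min_height = 1000           # WEBNOVEL_MIN_HEIGHT default
    aspect_ratio = width / height        # ~0.33
    print(height > webnovel_min_height and aspect_ratio < 0.5)   # True -> translate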
filename: + print(f" ๐จ Skipping decorative/character image: {filename}") + return False + + # For ambiguous cases, if it's a tall image, assume it might be text + try: + with Image.open(image_path) as img: + width, height = img.size + if height > width * 2: # Height is more than twice the width + print(f" ๐ Tall image detected, assuming possible text content") + return True + except: + pass + + # Default to False to avoid processing regular illustrations + return False + + def load_progress(self): + """Load progress tracking for image chunks""" + if self.progress_manager: + # Use the shared progress manager's data + prog = self.progress_manager.prog.copy() + # Ensure image_chunks key exists + if "image_chunks" not in prog: + prog["image_chunks"] = {} + return prog + else: + # Fallback to original behavior if no progress manager provided + progress_file = os.path.join(self.output_dir, "translation_progress.json") + if os.path.exists(progress_file): + try: + with open(progress_file, 'r', encoding='utf-8') as f: + prog = json.load(f) + # Ensure image_chunks key exists + if "image_chunks" not in prog: + prog["image_chunks"] = {} + return prog + except Exception as e: + print(f"โ ๏ธ Warning: Could not load progress file: {e}") + # Return minimal structure to avoid breaking + return { + "chapters": {}, + "content_hashes": {}, + "chapter_chunks": {}, + "image_chunks": {}, + "version": "2.1" + } + # Return the same structure as TranslateKRtoEN expects + return { + "chapters": {}, + "content_hashes": {}, + "chapter_chunks": {}, + "image_chunks": {}, + "version": "2.1" + } + + def save_progress(self, prog): + """Save progress tracking - with safe writing""" + if self.progress_manager: + # Update the shared progress manager's data + self.progress_manager.prog["image_chunks"] = prog.get("image_chunks", {}) + # Save through the progress manager + self.progress_manager.save() + else: + # Fallback to original behavior if no progress manager provided + progress_file = os.path.join(self.output_dir, "translation_progress.json") + try: + # Write to a temporary file first + temp_file = progress_file + '.tmp' + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(prog, f, ensure_ascii=False, indent=2) + + # If successful, replace the original file + if os.path.exists(progress_file): + os.remove(progress_file) + os.rename(temp_file, progress_file) + except Exception as e: + print(f"โ ๏ธ Warning: Failed to save progress: {e}") + # Clean up temp file if it exists + if os.path.exists(temp_file): + try: + os.remove(temp_file) + except: + pass + + def preprocess_image_for_watermarks(self, image_path: str) -> str: + """ + Enhanced preprocessing for watermark removal and text clarity + Now returns path to processed image instead of bytes + + Args: + image_path: Path to the image file + + Returns: + Path to processed image (either cleaned permanent file or original) + """ + try: + # Check if watermark removal is enabled + if not os.getenv("ENABLE_WATERMARK_REMOVAL", "1") == "1": + return image_path # Return original path + + print(f" ๐งน Preprocessing image for watermark removal...") + + # Open image + img = Image.open(image_path) + + # Convert to RGB if necessary + if img.mode not in ('RGB', 'RGBA'): + img = img.convert('RGB') + + # Check if advanced watermark removal is enabled AND cv2 is available + if os.getenv("ADVANCED_WATERMARK_REMOVAL", "0") == "1": + if CV2_AVAILABLE: + print(f" ๐ฌ Using advanced watermark removal...") + + # Convert to numpy array for advanced processing + img_array = np.array(img) + 
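Review note: the non-manager fallback in save_progress above deletes and then renames; a sketch of the same idea with os.replace, which overwrites in one step on the same volume (function name is ours).

    import json, os

    def save_json_atomic(path: str, data: dict) -> None:
        tmp = path + '.tmp'
        with open(tmp, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)   # no prior os.remove needed, even on Windows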
+ # These will safely return defaults if cv2 is not available + has_pattern, pattern_mask = self._detect_watermark_pattern(img_array) + if has_pattern: + print(f" ๐ Detected watermark pattern in image") + img_array = self._remove_periodic_watermark(img_array, pattern_mask) + + img_array = self._adaptive_histogram_equalization(img_array) + img_array = self._bilateral_filter(img_array) + img_array = self._enhance_text_regions(img_array) + + # Convert back to PIL Image + img = Image.fromarray(img_array) + else: + print(f" โ ๏ธ Advanced watermark removal requested but OpenCV not available") + + # Apply basic PIL enhancements (always works) + enhancer = ImageEnhance.Contrast(img) + img = enhancer.enhance(1.5) + + enhancer = ImageEnhance.Brightness(img) + img = enhancer.enhance(1.1) + + img = img.filter(ImageFilter.SHARPEN) + + # Check if we should save cleaned images + save_cleaned = os.getenv("SAVE_CLEANED_IMAGES", "1") == "1" + + if save_cleaned: + # Save to permanent location + cleaned_dir = os.path.join(self.translated_images_dir, "cleaned") + os.makedirs(cleaned_dir, exist_ok=True) + + base_name = os.path.basename(image_path) + name, ext = os.path.splitext(base_name) + cleaned_path = os.path.join(cleaned_dir, f"{name}_cleaned{ext}") + + img.save(cleaned_path, optimize=True) + print(f" ๐พ Saved cleaned image: {cleaned_path}") + + return cleaned_path # Return path to cleaned image + else: + # Save to temporary file + import tempfile + _, ext = os.path.splitext(image_path) + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + img.save(tmp.name, optimize=False) + print(f" ๐ Created temp cleaned image") + return tmp.name # Return temp path + + except Exception as e: + logger.warning(f"Could not preprocess image: {e}") + return image_path # Return original on error + + @requires_cv2 + def _detect_watermark_pattern(self, img_array: np.ndarray) -> Tuple[bool, Optional[np.ndarray]]: + """Detect repeating watermark patterns using FFT""" + try: + # Convert to grayscale for pattern detection + if len(img_array.shape) == 3: + gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) + else: + gray = img_array + + # Apply FFT to detect periodicity + f_transform = np.fft.fft2(gray) + f_shift = np.fft.fftshift(f_transform) + magnitude = np.log(np.abs(f_shift) + 1) # Log scale for better visualization + + # Look for peaks that indicate repeating patterns + mean_mag = np.mean(magnitude) + std_mag = np.std(magnitude) + threshold = mean_mag + 2 * std_mag + + # Create binary mask of high-frequency components + pattern_mask = magnitude > threshold + + # Exclude center (DC component) - represents average brightness + center_y, center_x = pattern_mask.shape[0] // 2, pattern_mask.shape[1] // 2 + pattern_mask[center_y-10:center_y+10, center_x-10:center_x+10] = False + + # Count significant peaks + pattern_threshold = int(os.getenv("WATERMARK_PATTERN_THRESHOLD", "10")) + peak_count = np.sum(pattern_mask) + + # If we have significant peaks, there's likely a repeating pattern + has_pattern = peak_count > pattern_threshold + + return has_pattern, pattern_mask if has_pattern else None + + except Exception as e: + logger.warning(f"Pattern detection failed: {e}") + return False, None + + @requires_cv2 + def _remove_periodic_watermark(self, img_array: np.ndarray, pattern_mask: np.ndarray) -> np.ndarray: + """Remove periodic watermark using frequency domain filtering""" + try: + result = img_array.copy() + + # Process each color channel + for channel in range(img_array.shape[2] if len(img_array.shape) == 3 else 
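Review note: a toy, numpy-only demo of the notch-filter idea behind _detect_watermark_pattern and _remove_periodic_watermark; the flagged frequency bin is arbitrary.

    import numpy as np

    gray = np.random.rand(64, 64)
    f_shift = np.fft.fftshift(np.fft.fft2(gray))
    mask = np.zeros((64, 64), dtype=bool)
    mask[10, 10] = True    # pretend this bin is a periodic watermark
    f_shift[mask] = 0      # notch it out
    restored = np.real(np.fft.ifft2(np.fft.ifftshift(f_shift)))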
1): + if len(img_array.shape) == 3: + gray = img_array[:, :, channel] + else: + gray = img_array + + # Apply FFT + f_transform = np.fft.fft2(gray) + f_shift = np.fft.fftshift(f_transform) + + # Apply notch filter to remove periodic components + f_shift[pattern_mask] = 0 + + # Inverse FFT + f_ishift = np.fft.ifftshift(f_shift) + img_filtered = np.fft.ifft2(f_ishift) + img_filtered = np.real(img_filtered) + + # Ensure values are in valid range + img_filtered = np.clip(img_filtered, 0, 255) + + if len(img_array.shape) == 3: + result[:, :, channel] = img_filtered + else: + result = img_filtered + + return result.astype(np.uint8) + + except Exception as e: + logger.warning(f"Watermark removal failed: {e}") + return img_array + + @requires_cv2 + def _adaptive_histogram_equalization(self, img_array: np.ndarray) -> np.ndarray: + """Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)""" + try: + # Convert to LAB color space for better results + lab = cv2.cvtColor(img_array, cv2.COLOR_RGB2LAB) + + # Split channels + l, a, b = cv2.split(lab) + + # Apply CLAHE to L channel only + clahe_limit = float(os.getenv("WATERMARK_CLAHE_LIMIT", "3.0")) + clahe = cv2.createCLAHE(clipLimit=clahe_limit, tileGridSize=(8, 8)) + l = clahe.apply(l) + + # Merge channels back + lab = cv2.merge([l, a, b]) + + # Convert back to RGB + enhanced = cv2.cvtColor(lab, cv2.COLOR_LAB2RGB) + + return enhanced + + except Exception as e: + logger.warning(f"Adaptive histogram equalization failed: {e}") + return img_array + + @requires_cv2 + def _bilateral_filter(self, img_array: np.ndarray) -> np.ndarray: + """Apply bilateral filter for edge-preserving denoising""" + try: + # Bilateral filter removes noise while keeping edges sharp + filtered = cv2.bilateralFilter( + img_array, + d=9, + sigmaColor=75, + sigmaSpace=75 + ) + return filtered + + except Exception as e: + logger.warning(f"Bilateral filtering failed: {e}") + return img_array + + @requires_cv2 + def _enhance_text_regions(self, img_array: np.ndarray) -> np.ndarray: + """Specifically enhance regions likely to contain text""" + try: + # Convert to grayscale for text detection + gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) + + # Step 1: Detect text regions using gradient analysis + grad_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) + grad_y = cv2.Sobel(gray, cv2.CV_64F, 0, 1, ksize=3) + gradient_magnitude = np.sqrt(grad_x**2 + grad_y**2) + + # Normalize gradient + gradient_magnitude = (gradient_magnitude / gradient_magnitude.max() * 255).astype(np.uint8) + + # Step 2: Create text probability mask + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5)) + gradient_density = cv2.morphologyEx(gradient_magnitude, cv2.MORPH_CLOSE, kernel) + + # Threshold to get text regions + _, text_mask = cv2.threshold(gradient_density, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) + + # Dilate to connect text regions + text_mask = cv2.dilate(text_mask, kernel, iterations=2) + + # Step 3: Enhance contrast in text regions + enhanced = img_array.copy() + + # Create 3-channel mask + text_mask_3ch = cv2.cvtColor(text_mask, cv2.COLOR_GRAY2RGB) / 255.0 + + # Apply enhancement only to text regions + enhanced = enhanced.astype(np.float32) + enhanced = enhanced * (1 + (0.2 * text_mask_3ch)) # 20% enhancement in text regions + enhanced = np.clip(enhanced, 0, 255).astype(np.uint8) + + return enhanced + + except Exception as e: + logger.warning(f"Text region enhancement failed: {e}") + return img_array + + def translate_image(self, image_path: str, context: str = "", check_stop_fn=None) -> 
Optional[str]: + """ + Translate text in an image using vision API - with chunking for tall images and stop support + """ + processed_path = None + compressed_path = None + + try: + self.current_image_path = image_path + print(f" ๐ translate_image called for: {image_path}") + + # Check for stop at the beginning + if check_stop_fn and check_stop_fn(): + print(" โ Image translation stopped by user") + return None + + if not os.path.exists(image_path): + logger.warning(f"Image not found: {image_path}") + print(f" โ Image file does not exist!") + return None + + # Get configuration + hide_label = os.getenv("HIDE_IMAGE_TRANSLATION_LABEL", "0") == "1" + + # Apply compression FIRST if enabled + compressed_path = image_path + if os.getenv("ENABLE_IMAGE_COMPRESSION", "0") == "1": + compressed_path = self.compress_image(image_path) + # If compression produced a different file, use it + if compressed_path != image_path: + print(f" ๐๏ธ Using compressed image for translation") + + # Apply watermark preprocessing (on compressed image if applicable) + processed_path = self.preprocess_image_for_watermarks(compressed_path) + + # Open and process the image (now using processed_path) + with Image.open(processed_path) as img: + width, height = img.size + aspect_ratio = width / height if height > 0 else 1 + print(f" ๐ Image dimensions: {width}x{height}, aspect ratio: {aspect_ratio:.2f}") + + # Convert to RGB if necessary + if img.mode not in ('RGB', 'RGBA'): + img = img.convert('RGB') + + # Determine if it's a long text image + is_long_text = height > self.webnovel_min_height and aspect_ratio < 0.5 + + # Process chunks or single image + if height > self.chunk_height: + # Check if single API mode is enabled + if os.getenv("SINGLE_API_IMAGE_CHUNKS", "1") == "1": + translated_text = self._process_image_chunks_single_api(img, width, height, context, check_stop_fn) + else: + translated_text = self._process_image_chunks(img, width, height, context, check_stop_fn) + else: + translated_text = self._process_single_image(img, context, check_stop_fn) + + if not translated_text: + return None + + # Store the result for caching (use original path as key) + self.processed_images[image_path] = translated_text + + # Save translation for debugging + self._save_translation_debug(image_path, translated_text) + + # Create HTML output - use processed_path for the image reference + # Handle cross-drive paths on Windows + try: + img_rel_path = os.path.relpath(processed_path, self.output_dir) + except ValueError as e: + # This happens when paths are on different drives in Windows + print(f" โ ๏ธ Cross-drive path detected, copying image to output directory") + + # Copy the processed image to the output directory's images folder + import shutil + images_output_dir = os.path.join(self.output_dir, "images") + os.makedirs(images_output_dir, exist_ok=True) + + # Generate a unique filename to avoid conflicts + base_name = os.path.basename(processed_path) + dest_path = os.path.join(images_output_dir, base_name) + + # Handle potential naming conflicts + if os.path.exists(dest_path): + name, ext = os.path.splitext(base_name) + counter = 1 + while os.path.exists(dest_path): + dest_path = os.path.join(images_output_dir, f"{name}_{counter}{ext}") + counter += 1 + + # Copy the file + shutil.copy2(processed_path, dest_path) + print(f" ๐ Copied image to: {dest_path}") + + # Calculate relative path from the copied location + img_rel_path = os.path.relpath(dest_path, self.output_dir) + + # Update processed_path for cleanup logic + processed_path = 
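Review note: why the ValueError branch above exists; on Windows, os.path.relpath raises when the two paths sit on different drives (the paths here are invented, and on POSIX no error is raised).

    import os

    try:
        rel = os.path.relpath('D:\\out\\img.png', start='C:\\project')
    except ValueError:
        rel = None   # different drives -> copy into output_dir, as the patch does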
dest_path + + html_output = self._create_html_output(img_rel_path, translated_text, is_long_text, + hide_label, check_stop_fn and check_stop_fn()) + + return html_output + + except Exception as e: + logger.error(f"Error translating image {image_path}: {e}") + print(f" โ Exception in translate_image: {e}") + import traceback + traceback.print_exc() + return None + + finally: + # Clean up temp files if they were created + # Clean up compressed file if it's temporary + if compressed_path and compressed_path != image_path: + if not os.getenv("SAVE_COMPRESSED_IMAGES", "0") == "1": + try: + if os.path.exists(compressed_path): + os.unlink(compressed_path) + print(f" ๐งน Cleaned up temp compressed file") + except Exception as e: + logger.warning(f"Could not delete temp compressed file: {e}") + + # Clean up processed file if it's temporary + if processed_path and processed_path != image_path and processed_path != compressed_path: + if not os.getenv("SAVE_CLEANED_IMAGES", "0") == "1": + try: + if os.path.exists(processed_path): + os.unlink(processed_path) + print(f" ๐งน Cleaned up temp processed file") + except Exception as e: + logger.warning(f"Could not delete temp processed file: {e}") + + + def _process_single_image(self, img, context, check_stop_fn): + """Process a single image that doesn't need chunking""" + + # Clear any previous context + self.image_chunk_context = [] + + print(f" ๐ Image height OK ({img.height}px), processing as single image...") + + # Check for stop before processing + if check_stop_fn and check_stop_fn(): + print(" โ Image translation stopped by user") + return None + + # Convert image to bytes using compression settings + image_bytes = self._image_to_bytes_with_compression(img) + + # Call API + translation = self._call_vision_api(image_bytes, context, check_stop_fn) + + if translation: + if self.remove_ai_artifacts: + translation = self._clean_translation_response(translation) + # Normalize and sanitize output + translation = self._normalize_unicode_width(translation) + translation = self._sanitize_unicode_characters(translation) + return translation + else: + print(f" โ Translation returned empty result") + return None + + + def _image_to_bytes_with_compression(self, img): + """Convert PIL Image to bytes with compression settings applied""" + # Check if compression is enabled + if os.getenv("ENABLE_IMAGE_COMPRESSION", "0") == "1": + # Get compression settings + format_setting = os.getenv("IMAGE_COMPRESSION_FORMAT", "auto") + webp_quality = int(os.getenv("WEBP_QUALITY", "85")) + jpeg_quality = int(os.getenv("JPEG_QUALITY", "85")) + png_compression = int(os.getenv("PNG_COMPRESSION", "6")) + preserve_transparency = os.getenv("PRESERVE_TRANSPARENCY", "0") == "1" + optimize_for_ocr = os.getenv("OPTIMIZE_FOR_OCR", "1") == "1" + + # Store original mode for GIF handling + original_mode = img.mode + transparency_info = None + + # Check if this is a chunk from a GIF (palette mode) + is_gif_chunk = img.mode in ('P', 'L') + + # Apply OCR optimization if enabled + if optimize_for_ocr: + # Handle GIF chunks in palette mode + if is_gif_chunk: + print(f" ๐จ Chunk is in {img.mode} mode - converting for optimization") + + if img.mode == 'P': + # Preserve transparency info if present + transparency_info = img.info.get('transparency', None) + # Convert to RGBA if has transparency, otherwise RGB + if transparency_info is not None: + img = img.convert('RGBA') + else: + img = img.convert('RGB') + elif img.mode == 'L': + img = img.convert('RGB') + + # Apply enhancements (now safe for all 
+    def _image_to_bytes_with_compression(self, img):
+        """Convert PIL Image to bytes with compression settings applied"""
+        # Check if compression is enabled
+        if os.getenv("ENABLE_IMAGE_COMPRESSION", "0") == "1":
+            # Get compression settings
+            format_setting = os.getenv("IMAGE_COMPRESSION_FORMAT", "auto")
+            webp_quality = int(os.getenv("WEBP_QUALITY", "85"))
+            jpeg_quality = int(os.getenv("JPEG_QUALITY", "85"))
+            png_compression = int(os.getenv("PNG_COMPRESSION", "6"))
+            preserve_transparency = os.getenv("PRESERVE_TRANSPARENCY", "0") == "1"
+            optimize_for_ocr = os.getenv("OPTIMIZE_FOR_OCR", "1") == "1"
+
+            # Store the original mode for GIF handling
+            original_mode = img.mode
+            transparency_info = None
+
+            # Check if this is a chunk from a GIF (palette mode)
+            is_gif_chunk = img.mode in ('P', 'L')
+
+            # Apply OCR optimization if enabled
+            if optimize_for_ocr:
+                # Handle GIF chunks in palette mode
+                if is_gif_chunk:
+                    print(f" 🎨 Chunk is in {img.mode} mode - converting for optimization")
+
+                    if img.mode == 'P':
+                        # Preserve transparency info if present
+                        transparency_info = img.info.get('transparency', None)
+                        # Convert to RGBA if it has transparency, otherwise RGB
+                        if transparency_info is not None:
+                            img = img.convert('RGBA')
+                        else:
+                            img = img.convert('RGB')
+                    elif img.mode == 'L':
+                        img = img.convert('RGB')
+
+                # Apply enhancements (now safe for all modes);
+                # ImageEnhance is already imported at module level
+                enhancer = ImageEnhance.Contrast(img)
+                img = enhancer.enhance(1.2)
+                enhancer = ImageEnhance.Sharpness(img)
+                img = enhancer.enhance(1.1)
+
+                # Extra sharpening for GIF-sourced chunks
+                if is_gif_chunk:
+                    img = enhancer.enhance(1.2)
+                    print(f" ✨ Applied extra sharpening for GIF-sourced chunk")
+
+            # Auto-select a format if needed
+            if format_setting == "auto":
+                # Check if we should preserve the original format
+                preserve_original_format = os.getenv("PRESERVE_ORIGINAL_FORMAT", "0") == "1"
+                original_format = os.getenv("ORIGINAL_IMAGE_FORMAT", "").lower()
+
+                # If preserving the format and we know the original format
+                if preserve_original_format and original_format:
+                    if original_format == 'gif':
+                        format_setting = 'gif'
+                        print(f" 🎞️ Preserving GIF format for chunk")
+                    elif original_format in ['png', 'jpeg', 'jpg', 'webp']:
+                        format_setting = original_format.replace('jpg', 'jpeg')
+                        print(f" 📸 Preserving {format_setting.upper()} format for chunk")
+                    else:
+                        # Fall back to PNG for unknown formats
+                        format_setting = "png"
+                        print(f" 📸 Using PNG for chunk (unknown original format: {original_format})")
+                # Legacy fallback: if the chunk is in palette mode and preserve-format is on, assume GIF
+                elif preserve_original_format and is_gif_chunk:
+                    format_setting = 'gif'
+                    print(f" 🎞️ Preserving GIF format for chunk (palette mode detected)")
+                else:
+                    # Check image characteristics for auto-selection
+                    has_transparency = img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info)
+
+                    # For chunks, prefer WebP for best compression unless transparency is needed
+                    if has_transparency and preserve_transparency:
+                        format_setting = "png"  # PNG for transparency
+                    else:
+                        format_setting = "webp"  # WebP for best compression
+
+                    print(f" 🎯 Auto-selected format for chunk: {format_setting}")
+
+            # Use the selected format with compression
+            if format_setting == "webp":
+                print(f" 🗜️ Compressing chunk as WebP (quality: {webp_quality})")
+                return self._image_to_bytes(img, format='WEBP', quality=webp_quality)
+            elif format_setting == "jpeg":
+                print(f" 🗜️ Compressing chunk as JPEG (quality: {jpeg_quality})")
+                return self._image_to_bytes(img, format='JPEG', quality=jpeg_quality)
+            elif format_setting == "png":
+                # PNG uses a compression level, not quality
+                print(f" 🗜️ Compressing chunk as PNG (compression: {png_compression})")
+                img_bytes = io.BytesIO()
+                img.save(img_bytes, format='PNG', compress_level=png_compression, optimize=True)
+                img_bytes.seek(0)
+                data = img_bytes.read()
+
+                # Log compression info
+                print(f" 📊 Chunk size: {len(data) / 1024:.1f}KB")
+                return data
+            elif format_setting == "gif":
+                # GIF format for chunks
+                print(f" 🗜️ Saving chunk as GIF")
+                img_bytes = io.BytesIO()
+                # Convert to palette mode if needed
+                if img.mode not in ('P', 'L'):
+                    img = img.quantize(colors=256, method=2)  # MEDIANCUT
+                img.save(img_bytes, format='GIF', optimize=True)
+                img_bytes.seek(0)
+                data = img_bytes.read()
+
+                # Log compression info
+                print(f" 📊 Chunk size: {len(data) / 1024:.1f}KB")
+                return data
+
+        # Default: use the existing method without compression
+        print(f" ⚠️ Compression disabled, using default PNG format")
+        return self._image_to_bytes(img)
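+    # Hedged usage sketch for the byte converter below (illustrative, not part
+    # of the module; the quality values mirror the env defaults used above):
+    #
+    #   data = self._image_to_bytes(img, format='WEBP', quality=85)  # lossy WebP
+    #   data = self._image_to_bytes(img, format='JPEG', quality=85)  # RGBA flattened onto white
+    #   data = self._image_to_bytes(img, format='GIF')               # quantized to 256 colors
+    #   data = self._image_to_bytes(img)                             # PNG at PNG_COMPRESSION level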
os.getenv("PRESERVE_TRANSPARENCY", "0") == "1": + # Create white background + background = Image.new('RGB', img.size, (255, 255, 255)) + background.paste(img, mask=img.split()[3]) + img = background + elif img.mode not in ['RGB', 'L', 'RGBA']: + img = img.convert('RGB') + + if quality: + img.save(img_bytes, format='WEBP', quality=quality, method=6) + else: + img.save(img_bytes, format='WEBP', lossless=True) + elif format == 'JPEG': + # JPEG doesn't support transparency, so convert RGBA to RGB + if img.mode == 'RGBA': + # Create white background + background = Image.new('RGB', img.size, (255, 255, 255)) + background.paste(img, mask=img.split()[3]) + img = background + elif img.mode != 'RGB': + img = img.convert('RGB') + + # Save as JPEG with specified quality + if quality: + img.save(img_bytes, format='JPEG', quality=quality, optimize=True, + progressive=os.getenv("PROGRESSIVE_ENCODING", "1") == "1") + else: + img.save(img_bytes, format='JPEG', quality=85, optimize=True) + elif format == 'GIF': + # GIF format handling + if img.mode not in ('P', 'L'): + # Convert to palette mode for GIF + img = img.quantize(colors=256, method=2) # MEDIANCUT method + + # Save as GIF + img.save(img_bytes, format='GIF', optimize=True) + else: + # Default PNG format + compress_level = int(os.getenv("PNG_COMPRESSION", "6")) + img.save(img_bytes, format='PNG', compress_level=compress_level, optimize=True) + + img_bytes.seek(0) + data = img_bytes.read() + + # Log the size for debugging + size_kb = len(data) / 1024 + if size_kb > 500: # Warn if chunk is over 500KB + print(f" โ ๏ธ Large chunk detected: {size_kb:.1f}KB - consider enabling compression!") + + return data + + def _process_image_chunks(self, img, width, height, context, check_stop_fn): + """Process a tall image by splitting it into chunks with contextual support""" + num_chunks = (height + self.chunk_height - 1) // self.chunk_height + overlap = 100 # Pixels of overlap between chunks + + print(f" โ๏ธ Image too tall ({height}px), splitting into {num_chunks} chunks of {self.chunk_height}px...") + + # Clear context for new image + self.image_chunk_context = [] + + # Add retry info if enabled + if os.getenv("RETRY_TIMEOUT", "1") == "1": + timeout_seconds = int(os.getenv("CHUNK_TIMEOUT", "180")) + print(f" โฑ๏ธ Auto-retry enabled: Will retry if chunks take > {timeout_seconds}s") + + print(f" โณ This may take {num_chunks * 30}-{num_chunks * 60} seconds to complete") + print(f" โน๏ธ Stop will take effect after current chunk completes") + + # Check if we should save debug chunks + save_debug_chunks = os.getenv('SAVE_CLEANED_IMAGES', '0') == '1' + save_compressed_chunks = os.getenv('SAVE_COMPRESSED_IMAGES', '0') == '1' + + if save_debug_chunks or save_compressed_chunks: + debug_dir = os.path.join(self.output_dir, "translated_images", "debug_chunks") + os.makedirs(debug_dir, exist_ok=True) + print(f" ๐ Debug mode: Saving chunks to {debug_dir}") + + # Load progress - maintaining full structure + prog = self.load_progress() + + # Create unique key for this image - include chapter info if available + image_basename = os.path.basename(self.current_image_path) if hasattr(self, 'current_image_path') else str(hash(str(img))) + + # Try to extract chapter number from context or path + chapter_num = None + if hasattr(self, 'current_chapter_num'): + chapter_num = self.current_chapter_num + else: + # Try to extract from filename + import re + match = re.search(r'ch(?:apter)?[\s_-]*(\d+)', image_basename, re.IGNORECASE) + if match: + chapter_num = match.group(1) + + # Create a 
+        # Create a more unique key that includes chapter info
+        if chapter_num:
+            image_key = f"ch{chapter_num}_{image_basename}"
+        else:
+            image_key = image_basename
+
+        # Initialize image chunk tracking
+        if "image_chunks" not in prog:
+            prog["image_chunks"] = {}
+
+        if image_key not in prog["image_chunks"]:
+            prog["image_chunks"][image_key] = {
+                "total": num_chunks,
+                "completed": [],
+                "chunks": {},
+                "height": height,
+                "width": width,
+                "chapter": chapter_num,  # Store chapter association
+                "filename": image_basename
+            }
+
+        all_translations = []
+        was_stopped = False
+
+        # Process chunks
+        for i in range(num_chunks):
+            # Check if this chunk was already translated
+            if i in prog["image_chunks"][image_key]["completed"]:
+                saved_chunk = prog["image_chunks"][image_key]["chunks"].get(str(i))
+                if saved_chunk:
+                    all_translations.append(saved_chunk)
+                    print(f" ⏭️ Chunk {i+1}/{num_chunks} already translated, skipping")
+                    continue
+
+            # Check for stop before processing each chunk
+            if check_stop_fn and check_stop_fn():
+                print(f" ❌ Stopped at chunk {i+1}/{num_chunks}")
+                was_stopped = True
+                break
+
+            # Calculate chunk boundaries with overlap
+            start_y = max(0, i * self.chunk_height - (overlap if i > 0 else 0))
+            end_y = min(height, (i + 1) * self.chunk_height)
+
+            current_filename = os.path.basename(self.current_image_path) if hasattr(self, 'current_image_path') else 'unknown'
+            print(f" 📄 Processing chunk {i+1}/{num_chunks} (y: {start_y}-{end_y}) for {current_filename}")
+            if self.log_callback and hasattr(self.log_callback, '__self__') and hasattr(self.log_callback.__self__, 'append_chunk_progress'):
+                self.log_callback.__self__.append_chunk_progress(
+                    i + 1,
+                    num_chunks,
+                    "image",
+                    f"Image: {current_filename}"
+                )
+
+            print(f" ⏳ Estimated time: 30-60 seconds for this chunk")
+
+            # Crop and process the chunk
+            chunk = img.crop((0, start_y, width, end_y))
+
+            # Convert the chunk to bytes with compression
+            chunk_bytes = self._image_to_bytes_with_compression(chunk)
+
+            # Save debug chunks if enabled
+            if save_debug_chunks or save_compressed_chunks:
+                # Save the original chunk
+                if save_debug_chunks:
+                    chunk_path = os.path.join(debug_dir, f"chunk_{i+1}_original.png")
+                    chunk.save(chunk_path)
+                    print(f" 💾 Saved original chunk: {chunk_path}")
+
+                # Save the compressed chunk if enabled
+                if save_compressed_chunks and os.getenv("ENABLE_IMAGE_COMPRESSION", "0") == "1":
+                    compressed_dir = os.path.join(self.output_dir, "translated_images", "compressed", "chunks")
+                    os.makedirs(compressed_dir, exist_ok=True)
+
+                    # Use the compression settings to save the chunk
+                    format_setting = os.getenv("IMAGE_COMPRESSION_FORMAT", "auto")
+                    if format_setting == "auto":
+                        format_setting = "webp"  # Default to WebP for chunks
+
+                    # Create a temporary in-memory file for the compressed chunk
+                    # (io is imported at module level)
+                    compressed_buffer = io.BytesIO()
+
+                    if format_setting == "webp":
+                        quality = int(os.getenv("WEBP_QUALITY", "85"))
+                        chunk.save(compressed_buffer, format='WEBP', quality=quality, method=6)
+                        compressed_chunk_path = os.path.join(compressed_dir, f"chunk_{i+1}_compressed.webp")
+                    elif format_setting == "jpeg":
+                        quality = int(os.getenv("JPEG_QUALITY", "85"))
+                        # Convert RGBA to RGB for JPEG
+                        if chunk.mode == 'RGBA':
+                            rgb_chunk = Image.new('RGB', chunk.size, (255, 255, 255))
+                            rgb_chunk.paste(chunk, mask=chunk.split()[3])
+                            chunk_to_save = rgb_chunk
+                        else:
+                            chunk_to_save = chunk
+                        chunk_to_save.save(compressed_buffer, format='JPEG', quality=quality, optimize=True)
+                        compressed_chunk_path = os.path.join(compressed_dir, f"chunk_{i+1}_compressed.jpg")
+                    else:  # PNG
+                        compress_level = int(os.getenv("PNG_COMPRESSION", "6"))
+                        chunk.save(compressed_buffer, format='PNG', compress_level=compress_level, optimize=True)
+                        compressed_chunk_path = os.path.join(compressed_dir, f"chunk_{i+1}_compressed.png")
+
+                    # Write the compressed chunk to disk
+                    with open(compressed_chunk_path, 'wb') as f:
+                        f.write(compressed_buffer.getvalue())
+
+                    # Get the actual original chunk size before compression
+                    chunk_buffer = io.BytesIO()
+                    chunk.save(chunk_buffer, format='PNG')
+                    actual_original_size = len(chunk_buffer.getvalue()) / 1024  # KB
+
+                    # Log compression info
+                    compressed_size = len(compressed_buffer.getvalue()) / 1024  # KB
+                    compression_ratio = (1 - compressed_size / actual_original_size) * 100 if actual_original_size > 0 else 0
+
+                    print(f" 💾 Saved compressed chunk: {compressed_chunk_path}")
+                    print(f" 📊 Chunk compression: {actual_original_size:.1f}KB → {compressed_size:.1f}KB ({compression_ratio:.1f}% reduction)")
+
+            # Get the custom image chunk prompt template from the environment
+            image_chunk_prompt_template = os.getenv("IMAGE_CHUNK_PROMPT", "This is part {chunk_idx} of {total_chunks} of a longer image. You must maintain the narrative flow with the previous chunks while translating it and following all system prompt guidelines previously mentioned. {context}")
+
+            # Build the context for this chunk
+            chunk_context = image_chunk_prompt_template.format(
+                chunk_idx=i+1,
+                total_chunks=num_chunks,
+                context=context
+            )
+
+            # Translate the chunk WITH CONTEXT
+            translation = self._call_vision_api(chunk_bytes, chunk_context, check_stop_fn)
+
+            if translation:
+                # Clean AI artifacts from the chunk
+                if self.remove_ai_artifacts:
+                    chunk_text = self._clean_translation_response(translation)
+                else:
+                    chunk_text = translation
+                # Normalize and sanitize each chunk
+                chunk_text = self._normalize_unicode_width(chunk_text)
+                chunk_text = self._sanitize_unicode_characters(chunk_text)
+                all_translations.append(chunk_text)
+                print(f" 🔍 DEBUG: Chunk {i+1} length: {len(chunk_text)} chars")
+                if len(chunk_text) > 10000:  # Flag suspiciously large chunks
+                    print(f" ⚠️ WARNING: Chunk unusually large!")
+                    print(f" First 500 chars: {chunk_text[:500]}")
+                    print(f" Last 500 chars: {chunk_text[-500:]}")
+
+                # Store context for the next chunks
+                if self.contextual_enabled:
+                    self.image_chunk_context.append({
+                        "user": chunk_context,
+                        "assistant": chunk_text
+                    })
+
+                # Save chunk progress
+                prog["image_chunks"][image_key]["completed"].append(i)
+                prog["image_chunks"][image_key]["chunks"][str(i)] = chunk_text
+                self.save_progress(prog)
+
+                print(f" ✅ Chunk {i+1} translated and saved ({len(chunk_text)} chars)")
+            else:
+                print(f" ⚠️ Chunk {i+1} returned no text")
+
+            # Delay between chunks if not the last one
+            if i < num_chunks - 1 and not was_stopped:
+                self._api_delay_with_stop_check(check_stop_fn)
+                if check_stop_fn and check_stop_fn():
+                    was_stopped = True
+                    break
+
+        # Combine all chunk translations
+        if all_translations:
+            translated_text = "\n\n".join(all_translations)
+            if was_stopped:
+                translated_text += "\n\n[TRANSLATION STOPPED BY USER]"
+            print(f" ✅ Combined {len(all_translations)} chunks into final translation")
+            return translated_text
+        else:
+            print(f" ❌ No successful translations from any chunks")
+            return None
+
+    def set_current_chapter(self, chapter_num):
+        """Set the current chapter number for progress tracking"""
+        self.current_chapter_num = chapter_num
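+    # Shape of the message list assembled by _call_vision_api below when two
+    # chunks of the current image were already translated (sketch; the
+    # bracketed payloads are placeholders, not real content):
+    #
+    #   [{"role": "system",    "content": self.system_prompt},
+    #    {"role": "user",      "content": "This is part 1 of 3 of a longer image. ..."},
+    #    {"role": "assistant", "content": "[chunk 1 translation]"},
+    #    {"role": "user",      "content": "This is part 2 of 3 of a longer image. ..."},
+    #    {"role": "assistant", "content": "[chunk 2 translation]"},
+    #    {"role": "user",      "content": "This is part 3 of 3 of a longer image. ..."}]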
+    def _call_vision_api(self, image_data, context, check_stop_fn):
+        """Make the actual API call for vision translation with retry support"""
+        # Build messages - no hardcoded prompt; the GUI-supplied system prompt is used as-is
+        messages = [
+            {"role": "system", "content": self.system_prompt}
+        ]
+
+        # Add context from previous chunks if contextual mode is enabled
+        if self.contextual_enabled and getattr(self, 'image_chunk_context', None):
+            # Include ALL previous chunks from this image, not just the last two
+            print(f" 📚 Including all {len(self.image_chunk_context)} previous chunks as context")
+
+            for ctx in self.image_chunk_context:
+                messages.extend([
+                    {"role": "user", "content": ctx["user"]},
+                    {"role": "assistant", "content": ctx["assistant"]}
+                ])
+
+        # Add the current request
+        messages.append({
+            "role": "user",
+            "content": context if context else ""
+        })
+        if hasattr(self, 'current_chapter_num'):
+            chapter_num = self.current_chapter_num
+            image_idx = getattr(self, 'current_image_index', 0)
+            output_filename = f"response_{chapter_num:03d}_Chapter_{chapter_num}_image_{image_idx}.html"
+            self.client.set_output_filename(output_filename)
+
+        retry_timeout_enabled = os.getenv("RETRY_TIMEOUT", "1") == "1"
+        chunk_timeout = int(os.getenv("CHUNK_TIMEOUT", "180")) if retry_timeout_enabled else None
+        max_timeout_retries = 2
+
+        # Store the original values
+        original_max_tokens = self.image_max_tokens
+        original_temp = self.temperature
+
+        # Initialize the retry counter
+        timeout_retry_count = 0
+
+        while True:
+            try:
+                current_max_tokens = self.image_max_tokens
+                current_temp = self.temperature
+
+                print(f" 📡 Calling vision API...")
+                print(f" 🌡️ Using temperature: {current_temp}")
+                print(f" 📊 Output Token Limit: {current_max_tokens}")
+
+                if chunk_timeout:
+                    print(f" ⏱️ Timeout enabled: {chunk_timeout} seconds")
+
+                # Final stop check before the API call
+                if check_stop_fn and check_stop_fn():
+                    print(" ❌ Stopped before API call")
+                    return None
+
+                # Use the interruptible send function
+                translation_response, trans_finish = send_image_with_interrupt(
+                    self.client,
+                    messages,
+                    image_data,
+                    current_temp,
+                    current_max_tokens,
+                    check_stop_fn,
+                    chunk_timeout,
+                    'image_translation'
+                )
+
+                print(f" 📡 API response received, finish_reason: {trans_finish}")
+
+                # Check if the translation was truncated
+                if trans_finish in ["length", "max_tokens"]:
+                    print(f" ⚠️ Translation was TRUNCATED! Consider increasing Max tokens.")
Consider increasing Max tokens.") + translation_response += "\n\n[TRANSLATION TRUNCATED DUE TO TOKEN LIMIT]" + + # Success - restore original values if they were changed + if timeout_retry_count > 0: + self.image_max_tokens = original_max_tokens + self.temperature = original_temp + print(f" โ Restored original settings after successful retry") + + return translation_response.strip() + + except Exception as e: + from unified_api_client import UnifiedClientError + error_msg = str(e) + print(f"\n๐ DEBUG: Image Translation Failed") + print(f" Error: {error_msg}") + print(f" Error Type: {type(e).__name__}") + + # Handle user stop + if "stopped by user" in error_msg: + print(" โ Image translation stopped by user") + return None + # Handle timeout specifically + if "took" in error_msg and "timeout:" in error_msg: + if timeout_retry_count < max_timeout_retries: + timeout_retry_count += 1 + print(f" โฑ๏ธ Chunk took too long, retry {timeout_retry_count}/{max_timeout_retries}") + + print(f" ๐ Retrying") + + time.sleep(2) + continue + else: + print(f" โ Max timeout retries reached for image") + # Restore original values + self.image_max_tokens = original_max_tokens + self.temperature = original_temp + return f"[Image Translation Error: Timeout after {max_timeout_retries} retries]" + + # Handle other timeouts + elif "timed out" in error_msg and "timeout:" not in error_msg: + print(f" โ ๏ธ {error_msg}, retrying...") + time.sleep(5) + continue + + # For other errors, restore values and return error + if timeout_retry_count > 0: + self.image_max_tokens = original_max_tokens + self.temperature = original_temp + + print(f" โ Translation failed: {e}") + print(f" โ Error type: {type(e).__name__}") + return f"[Image Translation Error: {str(e)}]" + + + def _clean_translation_response(self, response): + """Clean AI artifacts from translation response while preserving content""" + if not response or not response.strip(): + return response + + # First, preserve the original response length for debugging + original_length = len(response) + + # Remove common AI prefixes - but be more careful + lines = response.split('\n') + + # Check if first line is just a prefix without content + if len(lines) > 1 and lines[0].strip() and lines[0].strip().lower() in [ + 'sure', 'here', "i'll translate", 'certainly', 'okay', + 'here is the translation:', 'translation:', "here's the translation:", + "i'll translate the text from the image:", "let me translate that for you:" + ]: + # Remove only the first line if it's just a prefix + response = '\n'.join(lines[1:]).strip() + elif len(lines) > 1 and lines[0].strip() and any( + lines[0].strip().lower().startswith(prefix) + for prefix in ['sure,', 'here,', "i'll translate", 'certainly,', 'okay,'] + ): + # Check if the first line contains actual translation content after the prefix + first_line = lines[0].strip() + # Look for a colon or period that might separate prefix from content + for sep in [':', '.', ',']: + if sep in first_line: + parts = first_line.split(sep, 1) + if len(parts) > 1 and parts[1].strip(): + # There's content after the separator, keep it + lines[0] = parts[1].strip() + response = '\n'.join(lines).strip() + break + else: + # No separator found with content, remove the whole first line + response = '\n'.join(lines[1:]).strip() + + # Log if we removed significant content + cleaned_length = len(response) + if cleaned_length == 0 and original_length > 0: + print(f" โ ๏ธ WARNING: Cleaning removed all content! 
Original: {original_length} chars") + print(f" โ ๏ธ First 200 chars were: {response[:200]}") + elif cleaned_length < original_length * 0.5: + print(f" โ ๏ธ WARNING: Cleaning removed >50% of content! {original_length} โ {cleaned_length}") + + return response + + def _save_translation_debug(self, image_path, translated_text): + """Save translation to file for debugging""" + trans_filename = f"translated_{os.path.basename(image_path)}.txt" + trans_filepath = os.path.join(self.translated_images_dir, trans_filename) + + try: + with open(trans_filepath, 'w', encoding='utf-8') as f: + f.write(translated_text) + print(f" ๐พ Saved translation to: {trans_filename}") + except Exception as e: + print(f" โ ๏ธ Could not save translation file: {e}") + + def _remove_http_links(self, text: str) -> str: + """Remove HTTP/HTTPS URLs from text while preserving other content""" + # Pattern to match URLs + url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+(?:\.[^\s<>"{}|\\^`\[\]]+)*' + + # Replace URLs with empty string + cleaned_text = re.sub(url_pattern, '', text) + + # Clean up extra whitespace that may result from URL removal + cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip() + + return cleaned_text + + def _normalize_unicode_width(self, text: str) -> str: + """Normalize Unicode width and compatibility forms using NFKC""" + if not text: + return text + try: + import unicodedata + original = text + text = unicodedata.normalize('NFKC', text) + if text != original: + try: + if self.log_callback: + self.log_callback(f"๐ค Normalized width/compat: '{original[:30]}...' โ '{text[:30]}...'") + except Exception: + pass + return text + except Exception: + return text + + def _sanitize_unicode_characters(self, text: str) -> str: + """Remove invalid Unicode characters and common fallback boxes""" + if not text: + return text + import re + original = text + # Replacement character and common geometric fallbacks + text = text.replace('\ufffd', '') + for ch in ['โก','โ','โ','โ ','โข','โฃ','โค','โฅ','โฆ','โง','โจ','โฉ']: + text = text.replace(ch, '') + text = re.sub(r'[\u200b-\u200f\u2028-\u202f\u205f-\u206f\ufeff]', '', text) + text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F-\x9F]', '', text) + try: + text = text.encode('utf-8', errors='ignore').decode('utf-8') + except UnicodeError: + pass + # Normalize whitespace + text = re.sub(r'\s+', ' ', text).strip() + return text + + def _create_html_output(self, img_rel_path, translated_text, is_long_text, hide_label, was_stopped): + print(f" ๐ DEBUG: Creating HTML output") + print(f" Total translation length: {len(translated_text)} chars") + if len(translated_text) > 50000: + print(f" โ ๏ธ WARNING: Translation suspiciously large!") + """Create the final HTML output""" + # Check if the translation is primarily a URL (only a URL and nothing else) + url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+(?:\.[^\s<>"{}|\\^`\[\]]+)*' + + # Check if the entire content is just a URL + url_match = re.match(r'^\s*' + url_pattern + r'\s*$', translated_text.strip()) + is_only_url = bool(url_match) + + # Build the label HTML if needed + if hide_label: + label_html = "" + # Remove URLs from the text, but keep other content + cleaned_text = self._remove_http_links(translated_text) + + # If after removing URLs there's no content left, and original was only URL + if not cleaned_text and is_only_url: + translated_text = "[Image contains only URL]" + else: + # Use the cleaned text (URLs removed, other content preserved) + translated_text = cleaned_text + else: + if was_stopped: + label_html = f'
+    def _create_html_output(self, img_rel_path, translated_text, is_long_text, hide_label, was_stopped):
+        """Create the final HTML output"""
+        print(f" 🔍 DEBUG: Creating HTML output")
+        print(f" Total translation length: {len(translated_text)} chars")
+        if len(translated_text) > 50000:
+            print(f" ⚠️ WARNING: Translation suspiciously large!")
+
+        # Check if the translation is primarily a URL (only a URL and nothing else)
+        url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+(?:\.[^\s<>"{}|\\^`\[\]]+)*'
+
+        # Check if the entire content is just a URL
+        url_match = re.match(r'^\s*' + url_pattern + r'\s*$', translated_text.strip())
+        is_only_url = bool(url_match)
+
+        # Build the label HTML if needed
+        if hide_label:
+            label_html = ""
+            # Remove URLs from the text, but keep other content
+            cleaned_text = self._remove_http_links(translated_text)
+
+            # If after removing URLs there's no content left, and the original was only a URL
+            if not cleaned_text and is_only_url:
+                translated_text = "[Image contains only URL]"
+            else:
+                # Use the cleaned text (URLs removed, other content preserved)
+                translated_text = cleaned_text
+        else:
+            if was_stopped:
+                label_html = f'<p class="image-translation-label"><em>(partial)</em></p>\n'
+            else:
+                label_html = ""
+
+        # Build the image HTML based on type - or skip it entirely if hide_label is enabled
+        if hide_label:
+            # Don't include the image at all when hide_label is enabled
+            image_html = ""
+            css_class = "translated-text-only"
+        elif is_long_text:
+            image_html = f"""<div class="image-container">
+                <img src="{img_rel_path}" alt="Original web novel image"/>
+            </div>"""
+            css_class = "webnovel-translation"
+        else:
+            image_html = f"""<img src="{img_rel_path}" alt="Original image"/>"""
+            css_class = "image-translation"
+
+        # Format the translated text into paragraphs and assemble the block
+        text_html = self._format_translation_as_html(translated_text)
+        return f"""<div class="{css_class}">
+{image_html}
+{label_html}
+{text_html}
+</div>"""
+
+    def _format_translation_as_html(self, text):
+        """Split translated text into simple HTML paragraphs"""
+        html_parts = []
+        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
+
+        for para in paragraphs:
+            if para.startswith('[') and para.endswith(']'):
+                # Status notes such as [TRANSLATION STOPPED BY USER]
+                html_parts.append(f'<p class="translation-note">{para}</p>')
+            else:
+                html_parts.append(f'<p>{para}</p>')
+
+        # If no paragraphs were created (single line), wrap it
+        if not html_parts and text.strip():
+            html_parts.append(f'<p>{text.strip()}</p>')
+
+        result = '\n'.join(html_parts)
+
+        # Debug output
+        print(f" 📑 Created {len(html_parts)} paragraphs from text")
+
+        return result
+
+    def _unescape_response_text(self, text):
+        """Unescape text that comes back with literal \n characters"""
+        if not text:
+            return text
+
+        # Convert to string if needed
+        text = str(text)
+
+        # Remove tuple wrapping if present (e.g., ('text') or ("text"))
+        if text.startswith('("') and text.endswith('")'):
+            text = text[2:-2]
+        elif text.startswith("('") and text.endswith("')"):
+            text = text[2:-2]
+        elif text.startswith('(') and text.endswith(')') and len(text) > 2:
+            # Check if it's a single-item tuple representation
+            inner = text[1:-1].strip()
+            if (inner.startswith('"') and inner.endswith('"')) or (inner.startswith("'") and inner.endswith("'")):
+                text = inner[1:-1]
+
+        # Handle escaped characters - convert literal \n to actual newlines
+        text = text.replace('\\n', '\n')
+        text = text.replace('\\t', '\t')
+        text = text.replace('\\"', '"')
+        text = text.replace("\\'", "'")
+        text = text.replace('\\\\', '\\')
+
+        return text
+
+    def update_chapter_with_translated_images(self, chapter_html: str, image_translations: Dict[str, str]) -> str:
+        """
+        Update chapter HTML to include image translations
+
+        Args:
+            chapter_html: Original chapter HTML
+            image_translations: Dict mapping original image paths to translation HTML
+
+        Returns:
+            Updated HTML
+        """
+        soup = BeautifulSoup(chapter_html, 'html.parser')
+
+        for img in soup.find_all('img'):
+            src = img.get('src', '')
+            if src in image_translations:
+                # Replace the img tag with the translation HTML
+                translation_html = image_translations[src]
+                new_element = BeautifulSoup(translation_html, 'html.parser')
+                img.replace_with(new_element)
+
+        return str(soup)
+
+    def save_translation_log(self, chapter_num: int, translations: Dict[str, str]):
+        """
+        Save a log of all translations for a chapter
+
+        Args:
+            chapter_num: Chapter number
+            translations: Dict of image path to translated text
+        """
+        if not translations:
+            return
+
+        log_dir = os.path.join(self.translated_images_dir, 'logs')
+        os.makedirs(log_dir, exist_ok=True)
+
+        log_file = os.path.join(log_dir, f'chapter_{chapter_num}_translations.json')
+
+        log_data = {
+            'chapter': chapter_num,
+            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
+            'translations': {}
+        }
+
+        for img_path, translation in translations.items():
+            # Extract just the text from HTML if needed
+            if '