""" Vector embeddings utilities for semantic search. """ import os import threading from typing import List, Optional, Union, Dict import numpy as np from pathlib import Path try: from sentence_transformers import SentenceTransformer SENTENCE_TRANSFORMERS_AVAILABLE = True except ImportError: SENTENCE_TRANSFORMERS_AVAILABLE = False SentenceTransformer = None # Available embedding models (ordered by preference for Vietnamese) # Models are ordered from fastest to best quality AVAILABLE_MODELS = { # Fast models (384 dim) - Good for production "paraphrase-multilingual": "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", # Fast, 384 dim # High quality models (768 dim) - Better accuracy "multilingual-mpnet": "sentence-transformers/paraphrase-multilingual-mpnet-base-v2", # High quality, 768 dim, recommended "vietnamese-sbert": "keepitreal/vietnamese-sbert-v2", # Vietnamese-specific (may require auth) # Very high quality models (1024+ dim) - Best accuracy but slower "bge-m3": "BAAI/bge-m3", # Best for Vietnamese, 1024 dim, supports dense+sparse+multi-vector "multilingual-e5-large": "intfloat/multilingual-e5-large", # Very high quality, 1024 dim, large model "multilingual-e5-base": "intfloat/multilingual-e5-base", # High quality, 768 dim, balanced # Vietnamese-specific models (if available) "vietnamese-embedding": "dangvantuan/vietnamese-embedding", # Vietnamese-specific (if available) "vietnamese-bi-encoder": "bkai-foundation-models/vietnamese-bi-encoder", # Vietnamese bi-encoder (if available) } # Default embedding model for Vietnamese (can be overridden via env var) # Use bge-m3 as default - best for Vietnamese legal documents (1024 dim) # Fallback to multilingual-e5-base if bge-m3 not available (768 dim, good balance) # Can be set via EMBEDDING_MODEL env var (supports both short names and full model paths) # Examples: # - EMBEDDING_MODEL=bge-m3 (uses short name, recommended for Vietnamese) # - EMBEDDING_MODEL=multilingual-e5-base (uses short name) # - EMBEDDING_MODEL=intfloat/multilingual-e5-base (full path) # - EMBEDDING_MODEL=/path/to/local/model (local model path) # - EMBEDDING_MODEL=username/private-model (private HF model, requires HF_TOKEN) DEFAULT_MODEL_NAME = os.environ.get( "EMBEDDING_MODEL", AVAILABLE_MODELS.get("bge-m3", "BAAI/bge-m3") # BGE-M3 is default, no fallback ) FALLBACK_MODEL_NAME = AVAILABLE_MODELS.get("paraphrase-multilingual", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") # Thread-safe singleton for model caching class EmbeddingModelManager: """Thread-safe singleton manager for embedding models.""" _instance: Optional["EmbeddingModelManager"] = None _lock = threading.Lock() _model: Optional[SentenceTransformer] = None _model_name: Optional[str] = None _model_lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def get_model( self, model_name: Optional[str] = None, force_reload: bool = False, ) -> Optional[SentenceTransformer]: """ Get or load embedding model instance with thread-safe caching. Args: model_name: Name of the model to load. force_reload: Force reload model even if cached. Returns: SentenceTransformer instance or None if not available. """ if not SENTENCE_TRANSFORMERS_AVAILABLE: print( "Warning: sentence-transformers not installed. 
" "Install with: pip install sentence-transformers" ) return None resolved_model_name = model_name or DEFAULT_MODEL_NAME if resolved_model_name in AVAILABLE_MODELS: resolved_model_name = AVAILABLE_MODELS[resolved_model_name] if ( not force_reload and self._model is not None and self._model_name == resolved_model_name ): return self._model with self._model_lock: if ( not force_reload and self._model is not None and self._model_name == resolved_model_name ): return self._model return self._load_model(resolved_model_name) def _load_model(self, resolved_model_name: str) -> Optional[SentenceTransformer]: """Internal method to load model (must be called with lock held).""" try: print(f"Loading embedding model: {resolved_model_name}") model_path = Path(resolved_model_name) if model_path.exists() and model_path.is_dir(): print(f"Loading local model from: {resolved_model_name}") self._model = SentenceTransformer(str(model_path)) else: hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN") model_kwargs = {} if hf_token: print(f"Using Hugging Face token for model: {resolved_model_name}") model_kwargs["token"] = hf_token self._model = SentenceTransformer(resolved_model_name, **model_kwargs) self._model_name = resolved_model_name try: test_embedding = self._model.encode("test", show_progress_bar=False) dim = len(test_embedding) print(f"✅ Successfully loaded model: {resolved_model_name} (dimension: {dim})") except Exception: print(f"✅ Successfully loaded model: {resolved_model_name}") return self._model except Exception as exc: print(f"❌ Error loading model {resolved_model_name}: {exc}") if resolved_model_name != FALLBACK_MODEL_NAME: print(f"Trying fallback model: {FALLBACK_MODEL_NAME}") try: self._model = SentenceTransformer(FALLBACK_MODEL_NAME) self._model_name = FALLBACK_MODEL_NAME test_embedding = self._model.encode("test", show_progress_bar=False) dim = len(test_embedding) print( f"✅ Successfully loaded fallback model: {FALLBACK_MODEL_NAME} " f"(dimension: {dim})" ) return self._model except Exception as fallback_exc: print(f"❌ Error loading fallback model: {fallback_exc}") return None # Global manager instance _embedding_manager = EmbeddingModelManager() def get_embedding_model(model_name: Optional[str] = None, force_reload: bool = False) -> Optional[SentenceTransformer]: """ Get or load embedding model instance with thread-safe caching. Args: model_name: Name of the model to load. Can be: - Full model name (e.g., "keepitreal/vietnamese-sbert-v2") - Short name (e.g., "vietnamese-sbert") - None (uses DEFAULT_MODEL_NAME from env or default) force_reload: Force reload model even if cached. Returns: SentenceTransformer instance or None if not available. """ return _embedding_manager.get_model(model_name, force_reload) def list_available_models() -> Dict[str, str]: """ List all available embedding models. Returns: Dictionary mapping short names to full model names. """ return AVAILABLE_MODELS.copy() def compare_models(texts: List[str], model_names: Optional[List[str]] = None) -> Dict[str, Dict[str, float]]: """ Compare different embedding models on sample texts. Args: texts: List of sample texts to test. model_names: List of model names to compare. If None, compares all available models. 


def compare_models(
    texts: List[str],
    model_names: Optional[List[str]] = None,
) -> Dict[str, Dict[str, Any]]:
    """
    Compare different embedding models on sample texts.

    Args:
        texts: List of sample texts to test.
        model_names: List of model names to compare. If None, compares all
            available models.

    Returns:
        Dictionary with comparison results including:
            - dimension: Embedding dimension
            - encoding_time: Time to encode texts (seconds)
            - avg_similarity: Average pairwise similarity between texts
    """
    import time

    if model_names is None:
        model_names = list(AVAILABLE_MODELS.keys())

    results = {}
    for model_key in model_names:
        if model_key not in AVAILABLE_MODELS:
            continue

        model_name = AVAILABLE_MODELS[model_key]
        try:
            model = get_embedding_model(model_name, force_reload=True)
            if model is None:
                continue

            # Get dimension
            dim = get_embedding_dimension(model_name)

            # Measure encoding time
            start_time = time.time()
            embeddings = generate_embeddings_batch(texts, model=model)
            encoding_time = time.time() - start_time

            # Calculate average pairwise similarity
            similarities = []
            for i in range(len(embeddings)):
                for j in range(i + 1, len(embeddings)):
                    if embeddings[i] is not None and embeddings[j] is not None:
                        sim = cosine_similarity(embeddings[i], embeddings[j])
                        similarities.append(sim)
            avg_similarity = sum(similarities) / len(similarities) if similarities else 0.0

            results[model_key] = {
                "model_name": model_name,
                "dimension": dim,
                "encoding_time": encoding_time,
                "avg_similarity": avg_similarity,
            }
        except Exception as e:
            print(f"Error comparing model {model_key}: {e}")
            results[model_key] = {"error": str(e)}

    return results


def generate_embedding(
    text: str,
    model: Optional["SentenceTransformer"] = None,
) -> Optional[np.ndarray]:
    """
    Generate embedding vector for a single text.

    Args:
        text: Input text to embed.
        model: SentenceTransformer instance. If None, uses default model.

    Returns:
        Numpy array of embedding vector or None if error.
    """
    if not text or not text.strip():
        return None

    if model is None:
        model = get_embedding_model()
        if model is None:
            return None

    try:
        import sys

        # Temporarily raise the recursion limit: model.encode can recurse
        # deeply on some inputs.
        old_limit = sys.getrecursionlimit()
        try:
            sys.setrecursionlimit(5000)
            embedding = model.encode(
                text,
                normalize_embeddings=True,
                show_progress_bar=False,
                convert_to_numpy=True,
            )
            return embedding
        finally:
            sys.setrecursionlimit(old_limit)  # Restore original limit
    except RecursionError as e:
        print(f"Error generating embedding (recursion): {e}", flush=True)
        return None
    except Exception as e:
        print(f"Error generating embedding: {e}", flush=True)
        return None


def generate_embeddings_batch(
    texts: List[str],
    model: Optional["SentenceTransformer"] = None,
    batch_size: Optional[int] = None,
) -> List[Optional[np.ndarray]]:
    """
    Generate embeddings for a batch of texts.

    Args:
        texts: List of input texts.
        model: SentenceTransformer instance. If None, uses default model.
        batch_size: Batch size for processing. If None, read from the
            EMBEDDING_BATCH_SIZE env var (default 128).

    Returns:
        List of numpy arrays (embeddings) or None for failed texts.
    """
    # Read batch_size from the env var or use the default. Larger batches
    # improve throughput but use more RAM; 128 balances the two.
    if batch_size is None:
        batch_size = int(os.environ.get("EMBEDDING_BATCH_SIZE", "128"))

    if not texts:
        return []

    if model is None:
        model = get_embedding_model()
        if model is None:
            return [None] * len(texts)

    try:
        import sys

        # Temporarily raise the recursion limit (see generate_embedding).
        old_limit = sys.getrecursionlimit()
        try:
            sys.setrecursionlimit(5000)
            embeddings = model.encode(
                texts,
                batch_size=batch_size,
                normalize_embeddings=True,
                show_progress_bar=False,
                convert_to_numpy=True,
            )
            return list(embeddings)
        finally:
            sys.setrecursionlimit(old_limit)  # Restore original limit
    except RecursionError as e:
        print(f"Error generating batch embeddings (recursion): {e}", flush=True)
        return [None] * len(texts)
    except Exception as e:
        print(f"Error generating batch embeddings: {e}", flush=True)
        return [None] * len(texts)
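

# Example (illustrative sketch): embedding a small corpus in one call. The
# texts and env var value below are placeholders; failed texts come back as
# None and should be filtered out before indexing.
#
#     os.environ["EMBEDDING_BATCH_SIZE"] = "64"   # optional override
#     docs = ["Điều 1. Phạm vi điều chỉnh", "Điều 2. Đối tượng áp dụng"]
#     vectors = [v for v in generate_embeddings_batch(docs) if v is not None]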
""" if not texts: return [] if model is None: model = get_embedding_model() if model is None: return [None] * len(texts) try: import sys # Increase recursion limit temporarily for model.encode old_limit = sys.getrecursionlimit() try: sys.setrecursionlimit(5000) # Increase limit for model.encode embeddings = model.encode( texts, batch_size=batch_size, normalize_embeddings=True, show_progress_bar=False, convert_to_numpy=True ) return [emb for emb in embeddings] finally: sys.setrecursionlimit(old_limit) # Restore original limit except RecursionError as e: print(f"Error generating batch embeddings (recursion): {e}", flush=True) return [None] * len(texts) except Exception as e: print(f"Error generating batch embeddings: {e}", flush=True) return [None] * len(texts) def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float: """ Calculate cosine similarity between two vectors. Args: vec1: First vector. vec2: Second vector. Returns: Cosine similarity score (0-1). """ if vec1 is None or vec2 is None: return 0.0 dot_product = np.dot(vec1, vec2) norm1 = np.linalg.norm(vec1) norm2 = np.linalg.norm(vec2) if norm1 == 0 or norm2 == 0: return 0.0 return float(dot_product / (norm1 * norm2)) def get_embedding_dimension(model_name: Optional[str] = None) -> int: """ Get embedding dimension for a model. Args: model_name: Model name. If None, uses default. Returns: Embedding dimension or 0 if unknown. """ model = get_embedding_model(model_name) if model is None: return 0 # Get dimension by encoding a dummy text try: dummy_embedding = model.encode("test", show_progress_bar=False) return len(dummy_embedding) except Exception: return 0