|
|
from datetime import datetime |
|
|
import json |
|
|
import time |
|
|
import numpy as np |
|
|
from sentence_transformers import SentenceTransformer |
|
|
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request |
|
|
from fastapi.responses import StreamingResponse, Response |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel |
|
|
from typing import Optional, Union, Dict, Any |
|
|
from llama_cpp import Llama |
|
|
from huggingface_hub import login, hf_hub_download |
|
|
import logging |
|
|
import os |
|
|
import faiss |
|
|
import asyncio |
|
|
import psutil |
|
|
from google import genai |
|
|
from google.genai import types |
|
|
import httpx |
|
|
import wave |
|
|
import io |
|
|
from elevenlabs import ElevenLabs, VoiceSettings |
|
|
from slowapi import Limiter, _rate_limit_exceeded_handler |
|
|
from slowapi.util import get_remote_address |
|
|
from slowapi.errors import RateLimitExceeded |
|
|
|
|
|
|
|
|
# Process-wide logging configuration plus this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The ASGI application served by this module.
app = FastAPI()

# Rate limiter keyed on the caller's remote address; it is also registered on
# app.state, where slowapi expects to find it.
app.state.limiter = limiter = Limiter(key_func=get_remote_address)
|
|
|
|
|
|
|
|
async def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Turn a slowapi ``RateLimitExceeded`` into a JSON 429 response.

    The client address and request path are logged as a warning. The client is
    told to retry after a fixed 60 seconds, both in the JSON body and in the
    ``Retry-After`` header.

    Args:
        request: The request that tripped the limit.
        exc: The rate-limit exception (not inspected).

    Returns:
        A ``Response`` with status 429 and a JSON body.
    """
    retry_after_seconds = 60
    logger.warning(
        f"Rate limit exceeded for IP {get_remote_address(request)} on endpoint {request.url.path}"
    )

    payload = {
        "error": "rate_limit_exceeded",
        "message": "Too many requests. Please wait a moment before trying again.",
        "retry_after": retry_after_seconds,
    }
    headers = {
        "Content-Type": "application/json",
        "Retry-After": str(retry_after_seconds),
    }
    return Response(content=json.dumps(payload), status_code=429, headers=headers)
|
|
|
|
|
# Route slowapi's RateLimitExceeded through the JSON 429 handler.
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)

# Fully permissive CORS: every origin, method and header, with credentials.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; consider pinning allow_origins to the frontend domain(s).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Held around local-model generation so only one completion runs at a time.
model_lock = asyncio.Lock()
|
|
|
|
|
|
|
|
# A Hugging Face token is mandatory: without it startup aborts. Otherwise the
# process authenticates with the Hub before any model download happens.
hf_token = os.getenv("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    logger.error("HF_TOKEN environment variable not set.")
    raise ValueError("HF_TOKEN not set")
|
|
|
|
|
|
|
|
# Chat backend switch: only the string "true" (any letter case) in USE_GEMINI
# routes chat to the Gemini API; anything else uses the local GGUF model.
USE_GEMINI = os.getenv("USE_GEMINI", "false").lower() == "true"

# Embedding model used to embed the FAQ questions (and queries for retrieval).
sentence_transformer_model = "all-MiniLM-L6-v2"

# Hugging Face repository and file name of the quantised local chat model.
repo_id, filename = "unsloth/Qwen3-1.7B-GGUF", "Qwen3-1.7B-Q4_K_M.gguf"
|
|
|
|
|
|
|
|
# Gemini configuration.
# The client is created whenever an API key is present, so Gemini TTS
# (TTS_PROVIDER=gemini) also works while chat runs on the local model.
# USE_GEMINI only decides which backend answers chat requests. When
# USE_GEMINI is on, both the key and the model name are required up front.
# Without a model name, every chat request would be sent with model=None.
gemini_api_key = os.getenv("GEMINI_API_KEY")
gemini_model = os.getenv("GEMINI_MODEL")

if USE_GEMINI:
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY environment variable not set but USE_GEMINI is true.")
        raise ValueError("GEMINI_API_KEY not set")
    if not gemini_model:
        logger.error("GEMINI_MODEL environment variable not set but USE_GEMINI is true.")
        raise ValueError("GEMINI_MODEL not set")

if gemini_api_key:
    gemini_client = genai.Client(api_key=gemini_api_key)
    logger.info("Gemini API client initialized")
else:
    gemini_client = None

if not USE_GEMINI:
    logger.info("Using local model (Gemini disabled)")
|
|
|
|
|
|
|
|
# ElevenLabs TTS is optional.
# Only when ELEVENLABS_API_KEY is set are a client and a voice id configured.
# Otherwise elevenlabs_client is None and tts_voice_id stays undefined.
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
if not elevenlabs_api_key:
    elevenlabs_client = None
    logger.info("ElevenLabs TTS disabled (no API key provided)")
else:
    elevenlabs_client = ElevenLabs(api_key=elevenlabs_api_key)
    # The voice can be overridden via env; otherwise the hard-coded default is used.
    tts_voice_id = os.getenv("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM")
    logger.info("ElevenLabs TTS client initialized")
|
|
|
|
|
|
|
|
# TTS backend for /api/tts: "gemini" selects Gemini; any other value falls
# through to ElevenLabs (the default). Also sets the Gemini TTS model and the
# prebuilt voice name.
tts_provider = os.getenv("TTS_PROVIDER", "elevenlabs").lower()
gemini_tts_model, gemini_tts_voice = (
    os.getenv("GEMINI_TTS_MODEL", "gemini-2.5-flash-preview-tts"),
    os.getenv("GEMINI_TTS_VOICE", "Kore"),
)
|
|
|
|
|
|
|
|
# Canned question/answer pairs about Tim, each a {"question", "answer"} dict.
# The questions are embedded at startup.
# NOTE(review): the chat system prompts refer to "the CV and FAQs below", but
# only the CV text is appended to them; these answers never reach the model.
faqs = [
    {"question": question, "answer": answer}
    for question, answer in (
        ("What is your name?", "My name is Tim Luka Horstmann."),
        ("Where do you live?", "I live in Paris, France."),
        ("What is your education?", "I am currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris. I have an MPhil in Advanced Computer Science from the University of Cambridge, and a BSc in Business Informatics from RheinMain University of Applied Sciences."),
        ("What are your skills?", "I am proficient in Python, Java, SQL, Cypher, SPARQL, VBA, JavaScript, HTML/CSS, and Ruby. I also use tools like PyTorch, Hugging Face, Scikit-Learn, NumPy, Pandas, Matplotlib, Jupyter, Git, Bash, IoT, Ansible, QuickSight, and Wordpress."),
        ("How are you?", "I’m doing great, thanks for asking! I’m enjoying life in Paris and working on some exciting AI projects."),
        ("What do you do?", "I’m a Computer Scientist and AI enthusiast, currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris and interning as a Machine Learning Research Engineer at Hi! PARIS."),
        ("How’s it going?", "Things are going well, thanks! I’m busy with my studies and research, but I love the challenges and opportunities I get to explore."),
    )
]
|
|
|
|
|
# Heavy startup work, done once at import time:
#   1. build a FAISS inner-product index over the precomputed CV chunk embeddings,
#   2. load the sentence embedder and embed the FAQ questions,
#   3. unless Gemini is enabled, download and load the local GGUF chat model.
# Any failure is logged with a traceback and re-raised, so startup aborts.
try:
    logger.info("Loading CV embeddings from cv_embeddings.json")
    with open("cv_embeddings.json", "r", encoding="utf-8") as f:
        cv_data = json.load(f)

    cv_chunks = [entry["chunk"] for entry in cv_data]
    cv_embeddings = np.array([entry["embedding"] for entry in cv_data]).astype('float32')
    # After L2 normalisation, inner product equals cosine similarity.
    faiss.normalize_L2(cv_embeddings)
    faiss_index = faiss.IndexFlatIP(cv_embeddings.shape[1])
    faiss_index.add(cv_embeddings)
    logger.info("FAISS index built successfully")

    logger.info("Loading SentenceTransformer model")
    embedder = SentenceTransformer(sentence_transformer_model, device="cpu")
    logger.info("SentenceTransformer model loaded")

    faq_questions = [entry["question"] for entry in faqs]
    faq_embeddings = embedder.encode(faq_questions, convert_to_numpy=True).astype("float32")
    faiss.normalize_L2(faq_embeddings)

    if USE_GEMINI:
        generator = None
        logger.info("Skipping local model loading (using Gemini API)")
    else:
        logger.info(f"Loading {filename} model")
        model_path = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            local_dir="/app/cache" if os.getenv("HF_HOME") else None,
            token=hf_token,
        )
        # CPU-only llama.cpp instance with a 3072-token context window.
        generator = Llama(
            model_path=model_path,
            n_ctx=3072,
            n_threads=2,
            n_batch=64,
            n_gpu_layers=0,
            use_mlock=True,
            f16_kv=True,
            verbose=True,
            batch_prefill=True,
            prefill_logits=False,
        )
        logger.info(f"{filename} model loaded")
except Exception as e:
    logger.error(f"Startup error: {str(e)}", exc_info=True)
    raise
|
|
|
|
|
def retrieve_context(query, top_k=2):
    """Return the CV chunks most similar to ``query``, joined with newlines.

    The query is embedded with the shared SentenceTransformer, L2-normalised
    and searched in the inner-product FAISS index of CV chunks (i.e. cosine
    similarity). Results come back best match first.

    Args:
        query: Text to look up.
        top_k: Maximum number of chunks to return. Values larger than the
            index are clamped to its size; non-positive values yield "".

    Returns:
        The matching chunks separated by "\\n", or "" when nothing matches.

    Raises:
        Any embedding/search error, re-raised after logging.
    """
    try:
        k = min(int(top_k), faiss_index.ntotal)
        if k <= 0:
            return ""
        query_embedding = (
            embedder.encode(query, convert_to_numpy=True).astype("float32").reshape(1, -1)
        )
        faiss.normalize_L2(query_embedding)
        _, indices = faiss_index.search(query_embedding, k)
        # FAISS pads missing results with -1; skip them rather than letting
        # cv_chunks[-1] silently return the last chunk.
        return "\n".join(cv_chunks[i] for i in indices[0] if i >= 0)
    except Exception as e:
        logger.error(f"Error in retrieve_context: {str(e)}")
        raise
|
|
|
|
|
|
|
|
# Full CV text, appended verbatim to every chat system prompt.
# Startup aborts (after logging) if cv_text.txt cannot be read.
try:
    with open("cv_text.txt", "r", encoding="utf-8") as f:
        full_cv_text = str(f.read())
    logger.info("CV text loaded successfully")
except Exception as e:
    logger.error(f"Error loading cv_text.txt: {str(e)}")
    raise
|
|
|
|
|
|
|
|
# Optional admin-provided notes per game station.
# Keys are lower-cased station names / ids; they are consulted when the
# game-mode prompt is built. stations_extra.json (next to this file) may be:
#   * a dict with optional "by_name" / "by_id" maps ({key: note}) and/or a
#     "stations" list of {"name", "id", "extra"} items, or
#   * a bare list of such items.
# Loading is best-effort: problems are logged as a warning and never abort startup.
STATION_EXTRAS_BY_NAME: Dict[str, str] = {}
STATION_EXTRAS_BY_ID: Dict[str, str] = {}


def _station_key(value: Any) -> str:
    """Normalise a station name or id into a lookup key ("" if unusable).

    Strings are stripped and lower-cased. Integers (e.g. numeric ids in the
    JSON) are stringified first. Any other type yields "".
    """
    if isinstance(value, bool) or not isinstance(value, (str, int)):
        return ""
    return str(value).strip().lower()


def _register_station_map(target: Dict[str, str], mapping: Any) -> None:
    """Copy string-to-string pairs of ``mapping`` into ``target`` (keys normalised)."""
    if not isinstance(mapping, dict):
        return
    for key, note in mapping.items():
        if isinstance(key, str) and isinstance(note, str):
            target[key.strip().lower()] = note.strip()


def _register_station_items(items: Any) -> None:
    """Register notes from a list of {"name", "id", "extra"} dicts; malformed entries are skipped."""
    if not isinstance(items, list):
        return
    for item in items:
        if not isinstance(item, dict):
            continue
        extra = item.get('extra')
        extra = extra.strip() if isinstance(extra, str) else ''
        if not extra:
            continue
        name = _station_key(item.get('name'))
        sid = _station_key(item.get('id'))
        if name:
            STATION_EXTRAS_BY_NAME[name] = extra
        if sid:
            STATION_EXTRAS_BY_ID[sid] = extra


try:
    extra_path = os.path.join(os.path.dirname(__file__), 'stations_extra.json')
    if os.path.exists(extra_path):
        with open(extra_path, 'r', encoding='utf-8') as f:
            extra_data = json.load(f)
        if isinstance(extra_data, dict):
            _register_station_map(STATION_EXTRAS_BY_NAME, extra_data.get('by_name'))
            _register_station_map(STATION_EXTRAS_BY_ID, extra_data.get('by_id'))
            # "stations" entries are applied last and override the maps above.
            _register_station_items(extra_data.get('stations'))
        elif isinstance(extra_data, list):
            _register_station_items(extra_data)
        logger.info(f"Loaded station extras: {len(STATION_EXTRAS_BY_NAME)} by name, {len(STATION_EXTRAS_BY_ID)} by id")
    else:
        logger.info("No stations_extra.json found; skipping extras")
except Exception as e:
    logger.warning(f"Failed to load stations_extra.json: {e}")
|
|
|
|
|
async def stream_response(query, history, game_context=None, mode: Optional[str] = None):
    """Main streaming response function that routes to either Gemini or local model.

    Picks ``stream_response_gemini`` when ``USE_GEMINI`` is true, otherwise
    ``stream_response_local``. Every chunk they produce is re-yielded unchanged.
    """
    backend = stream_response_gemini if USE_GEMINI else stream_response_local
    async for chunk in backend(query, history, game_context, mode):
        yield chunk
|
|
|
|
|
def _format_game_context_for_prompt(game_context: Optional[Union[str, Dict[str, Any]]]) -> str: |
|
|
"""Return a concise text snippet to inject into the system prompt from game context. |
|
|
|
|
|
Key changes: |
|
|
- Prefer current station (id/name) and filter transcript strictly to this station to avoid confusion. |
|
|
- Include admin-provided extras when available. |
|
|
- Provide a short visited list for flavor only. |
|
|
""" |
|
|
if not game_context: |
|
|
return "" |
|
|
try: |
|
|
if isinstance(game_context, str): |
|
|
return f"\nGAME CONTEXT: The player is currently at a station about {game_context}." |
|
|
if isinstance(game_context, dict): |
|
|
current = (game_context.get('current_station') or game_context.get('station') or '').strip() |
|
|
current_id = (game_context.get('current_station_id') or '').strip() |
|
|
visited = game_context.get('visited_stations') or [] |
|
|
context = (game_context.get('context') or game_context.get('current_context') or '').strip() |
|
|
|
|
|
parts = ["\nGAME CONTEXT:"] |
|
|
if current or current_id: |
|
|
parts.append(f"Current station: {current} (id: {current_id}).") |
|
|
if context: |
|
|
parts.append(f"Station details: {context}.") |
|
|
|
|
|
|
|
|
extra_text = '' |
|
|
if current_id: |
|
|
extra_text = STATION_EXTRAS_BY_ID.get(current_id.lower(), '') |
|
|
if not extra_text and current: |
|
|
extra_text = STATION_EXTRAS_BY_NAME.get(current.lower(), '') |
|
|
if extra_text: |
|
|
parts.append(f"Additional station notes (must consider): {extra_text}") |
|
|
|
|
|
|
|
|
if visited: |
|
|
try: |
|
|
uniq = [] |
|
|
for v in visited: |
|
|
if v and v not in uniq: |
|
|
uniq.append(v) |
|
|
if uniq: |
|
|
parts.append(f"Visited stations so far: {', '.join(uniq)}.") |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
mem = game_context.get('__memory') if isinstance(game_context, dict) else None |
|
|
if isinstance(mem, dict): |
|
|
try: |
|
|
transcript = mem.get('transcript') or [] |
|
|
if transcript and (current or current_id): |
|
|
def belongs_here(m: Dict[str, Any]) -> bool: |
|
|
sid = (m.get('stationId') or '').strip() |
|
|
sname = (m.get('stationName') or '').strip() |
|
|
if current_id and sid: |
|
|
if sid.strip().lower() == current_id.lower(): |
|
|
return True |
|
|
if current and sname: |
|
|
if sname.strip().lower() == current.lower(): |
|
|
return True |
|
|
return False |
|
|
|
|
|
filtered = [m for m in transcript if belongs_here(m)] |
|
|
|
|
|
if filtered: |
|
|
lines = [] |
|
|
for m in filtered[-20:]: |
|
|
role = (m.get('role') or '').strip() |
|
|
src = (m.get('source') or '').strip() |
|
|
sta = (m.get('stationName') or '').strip() |
|
|
txt = (m.get('content') or '').replace('\n', ' ').strip() |
|
|
if len(txt) > 2000: |
|
|
txt = txt[:2000] + '…' |
|
|
label = role if role else 'msg' |
|
|
if src or sta: |
|
|
label += f"[{src}{'/' + sta if sta else ''}]" |
|
|
lines.append(f"- {label}: {txt}") |
|
|
if lines: |
|
|
parts.append("Recent exchanges at this station:\n" + "\n".join(lines)) |
|
|
except Exception: |
|
|
pass |
|
|
return " ".join(parts) |
|
|
except Exception: |
|
|
return "" |
|
|
|
|
|
async def stream_response_gemini(query, history, game_context=None, mode: Optional[str] = None):
    """Stream a chat answer from the Gemini API as Server-Sent-Event lines.

    The system instruction is a persona prompt: the in-game "Station Guide"
    when ``mode == 'game'`` (with the formatted game context appended),
    otherwise the general chat persona. Both include today's date and the
    full CV text. ``history`` is converted into Gemini ``Content`` objects.

    The SDK's async client (``client.aio``) is used, so waiting on Gemini
    does not block the event loop, and with it every other request, for the
    duration of the stream.

    Args:
        query: The new user message.
        history: Prior messages as dicts with "role" and "content". Roles
            "user", "assistant" and "model" are accepted; others are skipped
            with a warning.
        game_context: Optional game state (only used in game mode).
        mode: 'game' selects the Station Guide persona.

    Yields:
        "data: <text>\\n\\n" chunks, then "data: [DONE]\\n\\n". On API errors,
        an apology chunk containing the error message precedes [DONE].
    """
    logger.info(f"Processing query with Gemini: {query}")
    start_time = time.time()
    first_token_logged = False

    current_date = datetime.now().strftime("%Y-%m-%d")
    game_context_text = _format_game_context_for_prompt(game_context)

    if mode == 'game':
        system_prompt = (
            "You are Tim Luka Horstmann as a friendly in-game 'Station Guide'. "
            "Stay in first person. The current station is the primary focus—treat it as 'now' even when revisiting. "
            "Use the CV for personal facts, roles and dates; include 'Additional station notes' if present and relevant. "
            "Keep answers concise (2–4 sentences). If off-topic, answer briefly, then gently steer back to the current station. "
            "When revisiting a station, optionally acknowledge the revisit in one short clause. "
            "End with a tiny hint or nudge (optional) about what to explore here or next. "
            f"Today's date is {current_date}. CV: {full_cv_text}" + game_context_text
        )
    else:
        system_prompt = (
            "You are Tim Luka Horstmann, a Computer Scientist. "
            "Respond as yourself in the first person with a warm, upbeat tone—feel free to be lighthearted, witty, and encouraging. "
            "Rely on the CV and FAQs below for personal facts and never invent new personal details, achievements, or opinions that are not documented there. "
            "You may comfortably discuss well-established general knowledge, your fields of interest, and everyday small talk, and you can acknowledge known facts about yourself such as living in Paris or working in AI. "
            "When a user seeks private or highly specific personal information that is missing from the CV or FAQs, politely explain that you do not have more to share and try to keep the conversation engaging by offering related general insights or friendly banter. "
            "Always remain kind, respectful, and helpful, and lean into genuinely helpful answers before declining. "
            f"Today's date is {current_date}. CV: {full_cv_text}"
        )

    # Gemini only has "user" and "model" roles; assistant turns are sent as "model".
    role_map = {"user": "user", "assistant": "model", "model": "model"}
    contents = []
    for msg in history:
        role = msg["role"]
        api_role = role_map.get(role) if isinstance(role, str) else None
        if api_role is None:
            logger.warning(f"Unrecognized role '{role}' in history. Skipping message.")
            continue
        contents.append(
            types.Content(role=api_role, parts=[types.Part.from_text(text=msg["content"])])
        )
    contents.append(types.Content(role="user", parts=[types.Part.from_text(text=query)]))

    try:
        stream = await gemini_client.aio.models.generate_content_stream(
            model=gemini_model,
            contents=contents,
            config=types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=0.3,
                top_p=0.7,
                max_output_tokens=1024,
                response_mime_type="text/plain",
            ),
        )
        async for chunk in stream:
            text = chunk.text
            if not text:
                continue
            if not first_token_logged:
                logger.info(f"First token time (Gemini): {time.time() - start_time:.2f}s")
                first_token_logged = True
            # NOTE(review): newlines inside `text` are not re-prefixed with
            # "data:", so a spec-compliant SSE parser would mis-split them; the
            # frontend presumably parses this custom framing — confirm before changing.
            yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        logger.error(f"Gemini API error: {str(e)}")
        yield f"data: Sorry, I encountered an error with Gemini API: {str(e)}\n\n"
        yield "data: [DONE]\n\n"
|
|
|
|
|
|
|
|
async def stream_response_local(query, history, game_context=None, mode: Optional[str] = None):
    """Stream a chat answer from the local llama.cpp model as Server-Sent-Event lines.

    Builds the persona system prompt: the Station Guide when ``mode == 'game'``,
    otherwise the general persona. Both carry the Qwen3 "/no_think" switch,
    today's date and the full CV.

    History is normalised before use:
    - it is copied, so the caller's list is never mutated;
    - Gemini-style "model" turns become "assistant";
    - other roles are skipped with a warning.

    The oldest turns are then dropped until the estimated prompt size fits in
    ``n_ctx - 512 (generation) - 100 (margin)`` tokens. Generation runs while
    holding ``model_lock``.

    NOTE(review): iterating llama.cpp's synchronous stream blocks the event
    loop while each token is computed. Offloading it to a thread needs careful
    cancellation handling so the lock is never released while a thread is
    still using the model.

    Args:
        query: The new user message.
        history: Prior messages as dicts with "role" and "content".
        game_context: Optional game state (only used in game mode).
        mode: 'game' selects the Station Guide persona.

    Yields:
        "data: <token>\\n\\n" chunks, then "data: [DONE]\\n\\n". On tokenization
        or generation errors, an apology chunk precedes [DONE].
    """
    logger.info(f"Processing query with local model: {query}")
    start_time = time.time()
    first_token_logged = False

    current_date = datetime.now().strftime("%Y-%m-%d")
    game_context_text = _format_game_context_for_prompt(game_context)

    if mode == 'game':
        system_prompt = (
            "/no_think You are Tim Luka Horstmann as a friendly in-game 'Station Guide'. "
            "Stay in first person. The current station is the primary focus—treat it as 'now' even when revisiting. "
            "Use the CV for personal facts, roles and dates; include 'Additional station notes' if present (do not ignore them). "
            "Keep answers concise (2–4 sentences). If off-topic, answer briefly, then gently steer back to the current station. "
            "When revisiting a station, optionally acknowledge the revisit in one short clause. "
            "End with a tiny hint or nudge (optional) about what to explore here or next. "
            f"Today's date is {current_date}. CV: {full_cv_text}" + game_context_text
        )
    else:
        system_prompt = (
            "/no_think You are Tim Luka Horstmann, a Computer Scientist. Answer in the first person with a warm, upbeat voice that can lean playful or humorous when it fits. "
            "Use the CV and FAQs below as the authoritative source for personal facts and never fabricate additional personal details, stories, or opinions. "
            "Feel free to chat about general knowledge, your interests, or universally known truths, and you may mention well-established facts about yourself such as being based in Paris or working in AI. "
            "If someone requests private or highly specific personal information that is not covered by the CV or FAQs, explain politely that you do not have more to share, then keep the conversation inviting by offering related general insights or light banter. "
            "Always stay kind, respectful, and genuinely helpful, prioritising helpful engagement before declining. "
            f"Today's date is {current_date}. CV: {full_cv_text}"
        )
    logger.info(f"System prompt type: {type(system_prompt)}, length: {len(system_prompt)}")

    max_new_tokens = 512          # generation budget, also reserved in the context window
    per_message_overhead = 10     # presumably an allowance for chat-template tokens per message

    def count_tokens(text, add_bos=False):
        return len(generator.tokenize(text.encode('utf-8'), add_bos=add_bos, special=True))

    role_map = {"user": "user", "assistant": "assistant", "model": "assistant"}
    try:
        turns = []
        for msg in history:
            role = msg["role"]
            mapped = role_map.get(role) if isinstance(role, str) else None
            if mapped is None:
                logger.warning(f"Unrecognized role '{role}' in history. Skipping message.")
                continue
            turns.append({"role": mapped, "content": msg["content"]})
        system_tokens = count_tokens(system_prompt, add_bos=True)
        query_tokens = count_tokens(query)
        turn_tokens = [count_tokens(turn["content"]) for turn in turns]
    except Exception as e:
        logger.error(f"Tokenization error: {str(e)}")
        yield f"data: Sorry, I encountered a tokenization error: {str(e)}\n\n"
        yield "data: [DONE]\n\n"
        return

    total_tokens = (
        system_tokens + query_tokens + sum(turn_tokens)
        + len(turns) * per_message_overhead + 10
    )
    max_allowed_tokens = generator.n_ctx() - max_new_tokens - 100

    # Drop the oldest turns first, reusing the per-turn counts computed above.
    first_kept = 0
    while total_tokens > max_allowed_tokens and first_kept < len(turns):
        total_tokens -= turn_tokens[first_kept] + per_message_overhead
        first_kept += 1
    if first_kept:
        logger.info(f"Trimmed {first_kept} oldest history message(s) to fit the context window")

    messages = (
        [{"role": "system", "content": system_prompt}]
        + turns[first_kept:]
        + [{"role": "user", "content": query}]
    )

    async with model_lock:
        try:
            for chunk in generator.create_chat_completion(
                messages=messages,
                max_tokens=max_new_tokens,
                stream=True,
                temperature=0.3,
                top_p=0.7,
                repeat_penalty=1.2,
            ):
                token = chunk['choices'][0]['delta'].get('content', '')
                if not token:
                    continue
                if not first_token_logged:
                    logger.info(f"First token time (local): {time.time() - start_time:.2f}s")
                    first_token_logged = True
                yield f"data: {token}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Generation error: {str(e)}")
            yield f"data: Sorry, I encountered an error during generation: {str(e)}\n\n"
            yield "data: [DONE]\n\n"
|
|
|
|
|
class QueryRequest(BaseModel):
    """Request body for ``POST /api/predict``."""

    # The new user message.
    query: str
    # Prior chat turns; not validated here (presumably dicts with "role" and
    # "content" — the streaming backends index those keys).
    history: list
    # Optional game state: a station topic string or a structured dict.
    game_context: Optional[Union[str, Dict[str, Any]]] = None
    # "game" (compared case-insensitively) selects the Station Guide persona.
    mode: Optional[str] = None
    # Optional game memory; attached to game_context only in game mode.
    game_memory: Optional[Dict[str, Any]] = None
|
|
|
|
|
class TTSRequest(BaseModel):
    """Request body for ``POST /api/tts``."""

    # Text to speak. The endpoint strips asterisks and newlines and truncates
    # it before sending it to the TTS provider.
    text: str
|
|
|
|
|
|
|
|
def get_ram_usage():
    """Return a snapshot of system memory usage from psutil.

    Returns:
        dict with ``total_ram_gb``, ``used_ram_gb`` and ``free_ram_gb`` in GiB,
        rounded to 2 decimals ("free" is psutil's *available* memory), plus
        the raw ``percent_used``.
    """
    vm = psutil.virtual_memory()
    gib = 1024 ** 3
    return {
        "total_ram_gb": round(vm.total / gib, 2),
        "used_ram_gb": round(vm.used / gib, 2),
        "free_ram_gb": round(vm.available / gib, 2),
        "percent_used": vm.percent,
    }
|
|
|
|
|
@app.post("/api/predict")
@limiter.limit("5/minute")
async def predict(request: Request, query_request: QueryRequest):
    """Stream a chat answer as ``text/event-stream`` (5 requests/minute per client).

    ``mode`` is lower-cased, and an empty value becomes None. In game mode, a
    supplied ``game_memory`` is attached to the game context under
    ``__memory``:
    - dict contexts are shallow-copied first;
    - any other context is wrapped as ``{'context': <value>, '__memory': ...}``.
    """
    mode = (query_request.mode or '').lower() or None
    game_context = query_request.game_context
    memory = query_request.game_memory

    if mode == 'game' and memory is not None:
        if isinstance(game_context, dict):
            game_context = {**game_context, '__memory': memory}
        else:
            game_context = {'context': game_context, '__memory': memory}

    return StreamingResponse(
        stream_response(query_request.query, query_request.history, game_context, mode),
        media_type="text/event-stream",
    )
|
|
|
|
|
def _synthesize_gemini_wav(text: str) -> bytes:
    """Synthesise ``text`` with Gemini TTS and return it wrapped as WAV bytes.

    Blocking network call: run it off the event loop.
    """
    response = gemini_client.models.generate_content(
        model=gemini_tts_model,
        contents=text,
        config=types.GenerateContentConfig(
            response_modalities=["AUDIO"],
            speech_config=types.SpeechConfig(
                voice_config=types.VoiceConfig(
                    prebuilt_voice_config=types.PrebuiltVoiceConfig(
                        voice_name=gemini_tts_voice,
                    )
                )
            ),
        ),
    )
    pcm_data = response.candidates[0].content.parts[0].inline_data.data

    # The WAV header declares mono, 16-bit samples at 24 kHz for the raw payload.
    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(24000)
        wf.writeframes(pcm_data)
    return wav_buffer.getvalue()


def _synthesize_elevenlabs_mp3(text: str) -> bytes:
    """Synthesise ``text`` with ElevenLabs and return the concatenated audio bytes.

    Blocking: the SDK returns an iterable of byte chunks. Draining it here
    keeps all network I/O inside the calling worker thread.
    """
    audio_chunks = elevenlabs_client.text_to_speech.convert(
        voice_id=tts_voice_id,
        model_id="eleven_flash_v2_5",
        text=text,
        voice_settings=VoiceSettings(
            stability=0.7,
            similarity_boost=0.9,
            style=0.2,
            use_speaker_boost=True
        )
    )
    return b"".join(audio_chunks)


@app.post("/api/tts")
@limiter.limit("5/minute")
async def text_to_speech(request: Request, tts_request: TTSRequest):
    """Convert text to speech using ElevenLabs or Gemini API.

    Asterisks and newlines are stripped. Text longer than 1000 characters is
    truncated with "...". ``TTS_PROVIDER == "gemini"`` returns WAV audio;
    otherwise ElevenLabs returns MP3. The provider call runs in a worker
    thread so it does not stall the event loop.

    Raises:
        HTTPException: 400 for empty text, 503 when the provider is not
            configured, 500 when synthesis fails.
    """
    clean_text = tts_request.text.replace("**", "").replace("*", "").replace("\n", " ").strip()
    if not clean_text:
        raise HTTPException(status_code=400, detail="No text provided for TTS")
    if len(clean_text) > 1000:
        clean_text = clean_text[:1000] + "..."

    if tts_provider == "gemini":
        if not gemini_client:
            raise HTTPException(status_code=503, detail="Gemini TTS service not available (API key missing)")
        try:
            audio_bytes = await asyncio.to_thread(_synthesize_gemini_wav, clean_text)
        except Exception as e:
            logger.error(f"Gemini TTS error: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Gemini TTS conversion failed: {str(e)}")
        return Response(
            content=audio_bytes,
            media_type="audio/wav",
            headers={
                "Content-Disposition": "inline; filename=tts_audio.wav",
                "Cache-Control": "no-cache"
            }
        )

    if not elevenlabs_client:
        raise HTTPException(status_code=503, detail="TTS service not available")
    try:
        audio_bytes = await asyncio.to_thread(_synthesize_elevenlabs_mp3, clean_text)
    except Exception as e:
        logger.error(f"TTS error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"TTS conversion failed: {str(e)}")
    return Response(
        content=audio_bytes,
        media_type="audio/mpeg",
        headers={
            "Content-Disposition": "inline; filename=tts_audio.mp3",
            "Cache-Control": "no-cache"
        }
    )
|
|
|
|
|
@app.get("/health")
@limiter.limit("30/minute")
async def health_check(request: Request):
    """Liveness probe: always reports healthy (30 requests/minute per client)."""
    status = {"status": "healthy"}
    return status
|
|
|
|
|
@app.get("/model_info")
@limiter.limit("10/minute")
async def model_info(request: Request):
    """Describe the retrieval setup, the chat backend and TTS availability.

    ``tts_available`` follows the configured ``TTS_PROVIDER``: it checks the
    Gemini client for "gemini" and the ElevenLabs client otherwise, matching
    how ``/api/tts`` dispatches. The provider name itself is reported as
    ``tts_provider``.
    """
    if tts_provider == "gemini":
        tts_available = gemini_client is not None
    else:
        tts_available = elevenlabs_client is not None

    base_info = {
        "embedding_model": sentence_transformer_model,
        "faiss_index_size": len(cv_chunks),
        "faiss_index_dim": cv_embeddings.shape[1],
        "tts_available": tts_available,
        "tts_provider": tts_provider,
    }

    if USE_GEMINI:
        base_info.update({
            "model_type": "gemini",
            "model_name": gemini_model,
            "provider": "Google Gemini API",
        })
    else:
        base_info.update({
            "model_type": "local",
            "model_name": filename,
            "repo_id": repo_id,
            "model_size": "1.7B",
            "quantization": "Q4_K_M",
        })

    return base_info
|
|
|
|
|
@app.get("/ram_usage")
@limiter.limit("20/minute")
async def ram_usage(request: Request):
    """Endpoint to get current RAM usage.

    Returns ``get_ram_usage()`` directly. Any failure is logged and reported
    as HTTP 500 (20 requests/minute per client).
    """
    try:
        return get_ram_usage()
    except Exception as e:
        logger.error(f"Error retrieving RAM usage: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error retrieving RAM usage: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup")
async def setup_periodic_tasks():
    """Schedule the background warm-up loop when the local model is in use.

    The task is kept on ``app.state.warmup_task`` because the event loop only
    holds weak references to tasks. An unreferenced task can be
    garbage-collected mid-execution.
    """
    if USE_GEMINI:
        logger.info("Gemini API in use - no warm-up needed")
        return
    app.state.warmup_task = asyncio.create_task(keep_model_warm())
    logger.info("Periodic model warm-up task scheduled for local model")
|
|
|
|
|
async def keep_model_warm():
    """Background task that keeps the local model warm by sending periodic requests.

    Forever:
    1. Run a tiny prompt through ``stream_response`` with an empty history,
       discarding the output.
    2. Sleep 13 minutes.

    Errors are logged and do not end the loop.
    """
    warmup_interval_seconds = 13 * 60
    while True:
        try:
            logger.info("Performing periodic local model warm-up")
            async for _chunk in stream_response("Say only the word 'ok.'", []):
                pass
            logger.info("Periodic warm-up completed")
        except Exception as e:
            logger.error(f"Error in periodic warm-up: {str(e)}")
        await asyncio.sleep(warmup_interval_seconds)
|
|
|