File size: 32,868 Bytes
b77d28c cb8303f 293413b cb8303f 7ee4aae 1292878 cb8303f a3b349c aa6b888 588cb6a a2d5223 58d2235 687de1a ae2bc6b 1292878 964084b 1292878 7ee4aae 588cb6a cb8303f 7ee4aae ee37147 7ee4aae 1292878 58d2235 54039cd 588cb6a a29c4ff 588cb6a 58272f8 588cb6a 54039cd ae2bc6b a29c4ff e47a0a3 9c89db3 ae2bc6b 8d04f0d ae2bc6b 1292878 964084b 58d2235 9c89db3 48a65b5 9c89db3 e54e8f7 9c89db3 a29c4ff 588cb6a 58d2235 588cb6a a2d5223 a29c4ff a2d5223 cb8303f 54039cd 588cb6a a29c4ff 588cb6a cb8303f 54039cd 9c89db3 ae2bc6b cb8303f 588cb6a cb8303f 293413b 588cb6a a2d5223 588cb6a 54039cd 3bbf0cd cb8303f 7ee889e a3b349c ae2bc6b a3b349c ae2bc6b a3b349c ae2bc6b a3b349c f18ae9b a3b349c f18ae9b a3b349c f18ae9b a3b349c f18ae9b a3b349c f18ae9b 7ee889e f18ae9b 7ee889e f18ae9b 7ee889e f18ae9b a3b349c f18ae9b 2f6b259 f18ae9b 2f6b259 a3b349c 16a3faa ae2bc6b 43799fd ae2bc6b a3b349c 1817d6e a3b349c f18ae9b a3b349c d6f1c5d a3b349c ae2bc6b 43799fd fedc11d 43799fd 16a3faa fedc11d 5bf6ded 16a3faa 43799fd 16a3faa 5bf6ded 16a3faa 43799fd ae2bc6b 43799fd ae2bc6b 16a3faa ae2bc6b 16a3faa 43799fd 9737d81 a9456f8 ae2bc6b 16a3faa ae2bc6b 16a3faa ae2bc6b 5bf6ded a3b349c ae2bc6b 293413b 8583b57 b77d28c 293413b a3b349c 1817d6e a3b349c f18ae9b a3b349c d6f1c5d a3b349c 6f6e59d 3bbf0cd 8356d0c 6f6e59d 3bbf0cd 687de1a 6f6e59d 3bbf0cd 6f6e59d 58d2235 3bbf0cd ae2bc6b 3bbf0cd 8583b57 cb8303f 6f6e59d 1292878 a3b349c 2f6b259 1292878 0e9cc30 cb8303f 687de1a cb8303f 245f2cf 7ee4aae a3b349c 1817d6e 2f6b259 a3b349c 588cb6a 1292878 245f2cf 7ee4aae 964084b 1292878 964084b 1292878 964084b b8961cc 964084b 1292878 964084b 1292878 588cb6a 7ee4aae a2d5223 a29c4ff 7ee4aae 1292878 ae2bc6b 1292878 ae2bc6b 1292878 ae2bc6b 1292878 ae2bc6b 1292878 a29c4ff 687de1a 7ee4aae 687de1a e8ba1ec 703cd97 ae2bc6b 703cd97 ae2bc6b 703cd97 ae2bc6b 703cd97 a3b349c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 |
from datetime import datetime
import json
import time
import numpy as np
from sentence_transformers import SentenceTransformer
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import StreamingResponse, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Union, Dict, Any
from llama_cpp import Llama
from huggingface_hub import login, hf_hub_download
import logging
import os
import faiss
import asyncio
import psutil # Added for RAM tracking
from google import genai
from google.genai import types
import httpx
import wave
import io
from elevenlabs import ElevenLabs, VoiceSettings
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Custom rate limit exceeded handler with logging
async def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
client_ip = get_remote_address(request)
logger.warning(f"Rate limit exceeded for IP {client_ip} on endpoint {request.url.path}")
# Return a proper JSON response for rate limiting
return Response(
content=json.dumps({
"error": "rate_limit_exceeded",
"message": "Too many requests. Please wait a moment before trying again.",
"retry_after": 60 # seconds
}),
status_code=429,
headers={
"Content-Type": "application/json",
"Retry-After": "60"
}
)
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)
# Add CORS middleware to handle cross-origin requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your domain
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global lock for model access
model_lock = asyncio.Lock()
# Authenticate with Hugging Face
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
logger.error("HF_TOKEN environment variable not set.")
raise ValueError("HF_TOKEN not set")
login(token=hf_token)
# Models Configuration
USE_GEMINI = os.getenv("USE_GEMINI", "false").lower() == "true"
sentence_transformer_model = "all-MiniLM-L6-v2"
repo_id = "unsloth/Qwen3-1.7B-GGUF" # "bartowski/deepcogito_cogito-v1-preview-llama-3B-GGUF" # "bartowski/deepcogito_cogito-v1-preview-llama-8B-GGUF"
filename = "Qwen3-1.7B-Q4_K_M.gguf" # "deepcogito_cogito-v1-preview-llama-3B-Q4_K_M.gguf"
# Gemini Configuration
if USE_GEMINI:
gemini_api_key = os.getenv("GEMINI_API_KEY")
gemini_model = os.getenv("GEMINI_MODEL")
if not gemini_api_key:
logger.error("GEMINI_API_KEY environment variable not set but USE_GEMINI is true.")
raise ValueError("GEMINI_API_KEY not set")
gemini_client = genai.Client(api_key=gemini_api_key)
logger.info("Gemini API client initialized")
else:
gemini_client = None
logger.info("Using local model (Gemini disabled)")
# ElevenLabs Configuration
elevenlabs_api_key = os.getenv("ELEVENLABS_API_KEY")
if elevenlabs_api_key:
elevenlabs_client = ElevenLabs(api_key=elevenlabs_api_key)
# You can set a specific voice ID here or use the default voice
# Get your voice ID from ElevenLabs dashboard after cloning your voice
tts_voice_id = os.getenv("ELEVENLABS_VOICE_ID", "21m00Tcm4TlvDq8ikWAM") # Default voice, replace with your cloned voice ID
logger.info("ElevenLabs TTS client initialized")
else:
elevenlabs_client = None
logger.info("ElevenLabs TTS disabled (no API key provided)")
# TTS Configuration
tts_provider = os.getenv("TTS_PROVIDER", "elevenlabs").lower()
gemini_tts_model = os.getenv("GEMINI_TTS_MODEL", "gemini-2.5-flash-preview-tts")
gemini_tts_voice = os.getenv("GEMINI_TTS_VOICE", "Kore")
# Define FAQs
faqs = [
{"question": "What is your name?", "answer": "My name is Tim Luka Horstmann."},
{"question": "Where do you live?", "answer": "I live in Paris, France."},
{"question": "What is your education?", "answer": "I am currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris. I have an MPhil in Advanced Computer Science from the University of Cambridge, and a BSc in Business Informatics from RheinMain University of Applied Sciences."},
{"question": "What are your skills?", "answer": "I am proficient in Python, Java, SQL, Cypher, SPARQL, VBA, JavaScript, HTML/CSS, and Ruby. I also use tools like PyTorch, Hugging Face, Scikit-Learn, NumPy, Pandas, Matplotlib, Jupyter, Git, Bash, IoT, Ansible, QuickSight, and Wordpress."},
{"question": "How are you?", "answer": "I’m doing great, thanks for asking! I’m enjoying life in Paris and working on some exciting AI projects."},
{"question": "What do you do?", "answer": "I’m a Computer Scientist and AI enthusiast, currently pursuing a MSc in Data and AI at Institut Polytechnique de Paris and interning as a Machine Learning Research Engineer at Hi! PARIS."},
{"question": "How’s it going?", "answer": "Things are going well, thanks! I’m busy with my studies and research, but I love the challenges and opportunities I get to explore."},
]
try:
# Load CV embeddings and build FAISS index
logger.info("Loading CV embeddings from cv_embeddings.json")
with open("cv_embeddings.json", "r", encoding="utf-8") as f:
cv_data = json.load(f)
cv_chunks = [item["chunk"] for item in cv_data]
cv_embeddings = np.array([item["embedding"] for item in cv_data]).astype('float32')
faiss.normalize_L2(cv_embeddings)
faiss_index = faiss.IndexFlatIP(cv_embeddings.shape[1])
faiss_index.add(cv_embeddings)
logger.info("FAISS index built successfully")
# Load embedding model
logger.info("Loading SentenceTransformer model")
embedder = SentenceTransformer(sentence_transformer_model, device="cpu")
logger.info("SentenceTransformer model loaded")
# Compute FAQ embeddings
faq_questions = [faq["question"] for faq in faqs]
faq_embeddings = embedder.encode(faq_questions, convert_to_numpy=True).astype("float32")
faiss.normalize_L2(faq_embeddings)
# Load the local model only if not using Gemini
if not USE_GEMINI:
logger.info(f"Loading {filename} model")
model_path = hf_hub_download(
repo_id=repo_id,
filename=filename,
local_dir="/app/cache" if os.getenv("HF_HOME") else None,
token=hf_token,
)
generator = Llama(
model_path=model_path,
n_ctx=3072,
n_threads=2,
n_batch=64,
n_gpu_layers=0,
use_mlock=True,
f16_kv=True,
verbose=True,
batch_prefill=True,
prefill_logits=False,
)
logger.info(f"{filename} model loaded")
else:
generator = None
logger.info("Skipping local model loading (using Gemini API)")
except Exception as e:
logger.error(f"Startup error: {str(e)}", exc_info=True)
raise
def retrieve_context(query, top_k=2):
try:
query_embedding = embedder.encode(query, convert_to_numpy=True).astype("float32")
query_embedding = query_embedding.reshape(1, -1)
faiss.normalize_L2(query_embedding)
distances, indices = faiss_index.search(query_embedding, top_k)
return "\n".join([cv_chunks[i] for i in indices[0]])
except Exception as e:
logger.error(f"Error in retrieve_context: {str(e)}")
raise
# Load the full CV at startup with explicit UTF-8 handling
try:
with open("cv_text.txt", "r", encoding="utf-8") as f:
full_cv_text = f.read()
if not isinstance(full_cv_text, str):
full_cv_text = str(full_cv_text)
logger.info("CV text loaded successfully")
except Exception as e:
logger.error(f"Error loading cv_text.txt: {str(e)}")
raise
# Optional: Load additional station info (admin-editable) from JSON
STATION_EXTRAS_BY_NAME: Dict[str, str] = {}
STATION_EXTRAS_BY_ID: Dict[str, str] = {}
try:
extra_path = os.path.join(os.path.dirname(__file__), 'stations_extra.json')
if os.path.exists(extra_path):
with open(extra_path, 'r', encoding='utf-8') as f:
extra_data = json.load(f)
# Accept either mapping format or list of entries
if isinstance(extra_data, dict):
# by_name or by_id mapping
by_name = extra_data.get('by_name') or {}
by_id = extra_data.get('by_id') or {}
if isinstance(by_name, dict):
for k, v in by_name.items():
if isinstance(k, str) and isinstance(v, str):
STATION_EXTRAS_BY_NAME[k.strip().lower()] = v.strip()
if isinstance(by_id, dict):
for k, v in by_id.items():
if isinstance(k, str) and isinstance(v, str):
STATION_EXTRAS_BY_ID[k.strip().lower()] = v.strip()
# Also support {"stations": [{"id":..., "name":..., "extra":...}]}
stations = extra_data.get('stations') or []
if isinstance(stations, list):
for item in stations:
if not isinstance(item, dict):
continue
name = (item.get('name') or '').strip().lower()
sid = (item.get('id') or '').strip().lower()
extra = (item.get('extra') or '').strip()
if name and extra:
STATION_EXTRAS_BY_NAME[name] = extra
if sid and extra:
STATION_EXTRAS_BY_ID[sid] = extra
elif isinstance(extra_data, list):
for item in extra_data:
if not isinstance(item, dict):
continue
name = (item.get('name') or '').strip().lower()
sid = (item.get('id') or '').strip().lower()
extra = (item.get('extra') or '').strip()
if name and extra:
STATION_EXTRAS_BY_NAME[name] = extra
if sid and extra:
STATION_EXTRAS_BY_ID[sid] = extra
logger.info(f"Loaded station extras: {len(STATION_EXTRAS_BY_NAME)} by name, {len(STATION_EXTRAS_BY_ID)} by id")
else:
logger.info("No stations_extra.json found; skipping extras")
except Exception as e:
logger.warning(f"Failed to load stations_extra.json: {e}")
async def stream_response(query, history, game_context=None, mode: Optional[str] = None):
"""Main streaming response function that routes to either Gemini or local model"""
if USE_GEMINI:
async for chunk in stream_response_gemini(query, history, game_context, mode):
yield chunk
else:
async for chunk in stream_response_local(query, history, game_context, mode):
yield chunk
def _format_game_context_for_prompt(game_context: Optional[Union[str, Dict[str, Any]]]) -> str:
"""Return a concise text snippet to inject into the system prompt from game context.
Key changes:
- Prefer current station (id/name) and filter transcript strictly to this station to avoid confusion.
- Include admin-provided extras when available.
- Provide a short visited list for flavor only.
"""
if not game_context:
return ""
try:
if isinstance(game_context, str):
return f"\nGAME CONTEXT: The player is currently at a station about {game_context}."
if isinstance(game_context, dict):
current = (game_context.get('current_station') or game_context.get('station') or '').strip()
current_id = (game_context.get('current_station_id') or '').strip()
visited = game_context.get('visited_stations') or []
context = (game_context.get('context') or game_context.get('current_context') or '').strip()
parts = ["\nGAME CONTEXT:"]
if current or current_id:
parts.append(f"Current station: {current} (id: {current_id}).")
if context:
parts.append(f"Station details: {context}.")
# Look up admin-provided extras
extra_text = ''
if current_id:
extra_text = STATION_EXTRAS_BY_ID.get(current_id.lower(), '')
if not extra_text and current:
extra_text = STATION_EXTRAS_BY_NAME.get(current.lower(), '')
if extra_text:
parts.append(f"Additional station notes (must consider): {extra_text}")
# Visited list (names only, de-duplicated)
if visited:
try:
uniq = []
for v in visited:
if v and v not in uniq:
uniq.append(v)
if uniq:
parts.append(f"Visited stations so far: {', '.join(uniq)}.")
except Exception:
pass
# Memory transcript: include only messages tied to this station to reduce confusion
mem = game_context.get('__memory') if isinstance(game_context, dict) else None
if isinstance(mem, dict):
try:
transcript = mem.get('transcript') or []
if transcript and (current or current_id):
def belongs_here(m: Dict[str, Any]) -> bool:
sid = (m.get('stationId') or '').strip()
sname = (m.get('stationName') or '').strip()
if current_id and sid:
if sid.strip().lower() == current_id.lower():
return True
if current and sname:
if sname.strip().lower() == current.lower():
return True
return False
filtered = [m for m in transcript if belongs_here(m)]
# if nothing matches, don't include cross-station chatter
if filtered:
lines = []
for m in filtered[-20:]:
role = (m.get('role') or '').strip()
src = (m.get('source') or '').strip()
sta = (m.get('stationName') or '').strip()
txt = (m.get('content') or '').replace('\n', ' ').strip()
if len(txt) > 2000:
txt = txt[:2000] + '…'
label = role if role else 'msg'
if src or sta:
label += f"[{src}{'/' + sta if sta else ''}]"
lines.append(f"- {label}: {txt}")
if lines:
parts.append("Recent exchanges at this station:\n" + "\n".join(lines))
except Exception:
pass
return " ".join(parts)
except Exception:
return ""
async def stream_response_gemini(query, history, game_context=None, mode: Optional[str] = None):
"""Stream response using Gemini API with a proper system_instruction."""
logger.info(f"Processing query with Gemini: {query}")
start_time = time.time()
first_token_logged = False
# 1) Build your system prompt once
current_date = datetime.now().strftime("%Y-%m-%d")
game_context_text = _format_game_context_for_prompt(game_context)
# Only enable game persona when explicitly requested via mode=='game'.
# Do NOT infer game mode from presence of game_context or memory alone.
is_game_mode = (mode == 'game')
if is_game_mode:
system_prompt = (
"You are Tim Luka Horstmann as a friendly in-game 'Station Guide'. "
"Stay in first person. The current station is the primary focus—treat it as 'now' even when revisiting. "
"Use the CV for personal facts, roles and dates; include 'Additional station notes' if present and relevant. "
"Keep answers concise (2–4 sentences). If off-topic, answer briefly, then gently steer back to the current station. "
"When revisiting a station, optionally acknowledge the revisit in one short clause. "
"End with a tiny hint or nudge (optional) about what to explore here or next. "
f"Today's date is {current_date}. CV: {full_cv_text}" + game_context_text
)
else:
system_prompt = (
"You are Tim Luka Horstmann, a Computer Scientist. "
"Respond as yourself in the first person with a warm, upbeat tone—feel free to be lighthearted, witty, and encouraging. "
"Rely on the CV and FAQs below for personal facts and never invent new personal details, achievements, or opinions that are not documented there. "
"You may comfortably discuss well-established general knowledge, your fields of interest, and everyday small talk, and you can acknowledge known facts about yourself such as living in Paris or working in AI. "
"When a user seeks private or highly specific personal information that is missing from the CV or FAQs, politely explain that you do not have more to share and try to keep the conversation engaging by offering related general insights or friendly banter. "
"Always remain kind, respectful, and helpful, and lean into genuinely helpful answers before declining. "
f"Today's date is {current_date}. CV: {full_cv_text}"
)
# 2) Build only user/model history as `contents`
contents = []
for msg in history:
# Ensure the role is compatible with Gemini API ('user' or 'model')
api_role = ""
if msg["role"] == "user":
api_role = "user"
elif msg["role"] == "assistant": # Map "assistant" from client to "model" for API
api_role = "model"
elif msg["role"] == "model": # Already correct
api_role = "model"
else:
# Log a warning or handle unrecognized roles as needed
logger.warning(f"Unrecognized role '{msg['role']}' in history. Skipping message.")
continue
contents.append(
types.Content(
role=api_role,
parts=[ types.Part.from_text(text=msg["content"]) ]
)
)
# finally append the new user question
contents.append(
types.Content(
role="user",
parts=[ types.Part.from_text(text=query) ]
)
)
# 3) Call Gemini with `system_instruction`
try:
response = gemini_client.models.generate_content_stream(
model=gemini_model,
contents=contents,
config=types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=0.3,
top_p=0.7,
max_output_tokens=1024,
response_mime_type="text/plain",
)
)
for chunk in response:
if chunk.text:
if not first_token_logged:
logger.info(f"First token time (Gemini): {time.time() - start_time:.2f}s")
first_token_logged = True
yield f"data: {chunk.text}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Gemini API error: {str(e)}")
yield f"data: Sorry, I encountered an error with Gemini API: {str(e)}\n\n"
yield "data: [DONE]\n\n"
async def stream_response_local(query, history, game_context=None, mode: Optional[str] = None):
"""Stream response using local model"""
logger.info(f"Processing query with local model: {query}")
start_time = time.time()
first_token_logged = False
current_date = datetime.now().strftime("%Y-%m-%d")
game_context_text = _format_game_context_for_prompt(game_context)
# Only enable game persona when explicitly requested via mode=='game'.
# Do NOT infer game mode from presence of game_context or memory alone.
is_game_mode = (mode == 'game')
if is_game_mode:
system_prompt = (
"/no_think You are Tim Luka Horstmann as a friendly in-game 'Station Guide'. "
"Stay in first person. The current station is the primary focus—treat it as 'now' even when revisiting. "
"Use the CV for personal facts, roles and dates; include 'Additional station notes' if present (do not ignore them). "
"Keep answers concise (2–4 sentences). If off-topic, answer briefly, then gently steer back to the current station. "
"When revisiting a station, optionally acknowledge the revisit in one short clause. "
"End with a tiny hint or nudge (optional) about what to explore here or next. "
f"Today's date is {current_date}. CV: {full_cv_text}" + game_context_text
)
else:
system_prompt = (
"/no_think You are Tim Luka Horstmann, a Computer Scientist. Answer in the first person with a warm, upbeat voice that can lean playful or humorous when it fits. "
"Use the CV and FAQs below as the authoritative source for personal facts and never fabricate additional personal details, stories, or opinions. "
"Feel free to chat about general knowledge, your interests, or universally known truths, and you may mention well-established facts about yourself such as being based in Paris or working in AI. "
"If someone requests private or highly specific personal information that is not covered by the CV or FAQs, explain politely that you do not have more to share, then keep the conversation inviting by offering related general insights or light banter. "
"Always stay kind, respectful, and genuinely helpful, prioritising helpful engagement before declining. "
f"Today's date is {current_date}. CV: {full_cv_text}"
)
if not isinstance(system_prompt, str):
system_prompt = str(system_prompt)
logger.info(f"System prompt type: {type(system_prompt)}, length: {len(system_prompt)}")
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": "/no_think" + query}]
try:
system_tokens = len(generator.tokenize(system_prompt.encode('utf-8'), add_bos=True, special=True))
query_tokens = len(generator.tokenize(query.encode('utf-8'), add_bos=False, special=True))
history_tokens = [len(generator.tokenize(msg["content"].encode('utf-8'), add_bos=False, special=True)) for msg in history]
except Exception as e:
logger.error(f"Tokenization error: {str(e)}")
yield f"data: Sorry, I encountered a tokenization error: {str(e)}\n\n"
yield "data: [DONE]\n\n"
return
total_tokens = system_tokens + query_tokens + sum(history_tokens) + len(history) * 10 + 10
max_allowed_tokens = generator.n_ctx() - 512 - 100
while total_tokens > max_allowed_tokens and history:
removed_msg = history.pop(0)
removed_tokens = len(generator.tokenize(removed_msg["content"].encode('utf-8'), add_bos=False, special=True))
total_tokens -= (removed_tokens + 10)
messages = [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": query}]
async with model_lock:
try:
for chunk in generator.create_chat_completion(
messages=messages,
max_tokens=512,
stream=True,
temperature=0.3,
top_p=0.7,
repeat_penalty=1.2
):
token = chunk['choices'][0]['delta'].get('content', '')
if token:
if not first_token_logged:
logger.info(f"First token time (local): {time.time() - start_time:.2f}s")
first_token_logged = True
yield f"data: {token}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Generation error: {str(e)}")
yield f"data: Sorry, I encountered an error during generation: {str(e)}\n\n"
yield "data: [DONE]\n\n"
class QueryRequest(BaseModel):
query: str
history: list
game_context: Optional[Union[str, Dict[str, Any]]] = None
mode: Optional[str] = None
game_memory: Optional[Dict[str, Any]] = None
class TTSRequest(BaseModel):
text: str
# RAM Usage Tracking Function
def get_ram_usage():
memory = psutil.virtual_memory()
total_ram = memory.total / (1024 ** 3) # Convert to GB
used_ram = memory.used / (1024 ** 3) # Convert to GB
free_ram = memory.available / (1024 ** 3) # Convert to GB
percent_used = memory.percent
return {
"total_ram_gb": round(total_ram, 2),
"used_ram_gb": round(used_ram, 2),
"free_ram_gb": round(free_ram, 2),
"percent_used": percent_used
}
@app.post("/api/predict")
@limiter.limit("5/minute") # Allow 5 chat requests per minute per IP
async def predict(request: Request, query_request: QueryRequest):
query = query_request.query
history = query_request.history
game_context = query_request.game_context
mode = (query_request.mode or '').lower() or None
# Attach optional game_memory only when in explicit game mode
if (mode == 'game') and (query_request.game_memory is not None):
if isinstance(game_context, dict):
game_context = dict(game_context)
game_context['__memory'] = query_request.game_memory
else:
game_context = { 'context': game_context, '__memory': query_request.game_memory }
return StreamingResponse(stream_response(query, history, game_context, mode), media_type="text/event-stream")
@app.post("/api/tts")
@limiter.limit("5/minute") # Allow 5 TTS requests per minute per IP
async def text_to_speech(request: Request, tts_request: TTSRequest):
"""Convert text to speech using ElevenLabs or Gemini API"""
# Clean the text for TTS (remove markdown and special characters)
clean_text = tts_request.text.replace("**", "").replace("*", "").replace("\n", " ").strip()
if not clean_text:
raise HTTPException(status_code=400, detail="No text provided for TTS")
if len(clean_text) > 1000: # Limit text length to avoid long processing times
clean_text = clean_text[:1000] + "..."
if tts_provider == "gemini":
if not gemini_client:
raise HTTPException(status_code=503, detail="Gemini TTS service not available (API key missing)")
try:
response = gemini_client.models.generate_content(
model=gemini_tts_model,
contents=clean_text,
config=types.GenerateContentConfig(
response_modalities=["AUDIO"],
speech_config=types.SpeechConfig(
voice_config=types.VoiceConfig(
prebuilt_voice_config=types.PrebuiltVoiceConfig(
voice_name=gemini_tts_voice,
)
)
),
)
)
# Get raw PCM data
pcm_data = response.candidates[0].content.parts[0].inline_data.data
# Convert PCM to WAV
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(24000)
wf.writeframes(pcm_data)
audio_bytes = wav_buffer.getvalue()
return Response(
content=audio_bytes,
media_type="audio/wav",
headers={
"Content-Disposition": "inline; filename=tts_audio.wav",
"Cache-Control": "no-cache"
}
)
except Exception as e:
logger.error(f"Gemini TTS error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Gemini TTS conversion failed: {str(e)}")
else:
if not elevenlabs_client:
raise HTTPException(status_code=503, detail="TTS service not available")
try:
# Generate speech
response = elevenlabs_client.text_to_speech.convert(
voice_id=tts_voice_id,
model_id="eleven_flash_v2_5",
text=clean_text,
voice_settings=VoiceSettings(
stability=0.7, # More stability = less variability; best: 0.7–0.85
similarity_boost=0.9, # Boost similarity to original voice
style=0.2, # Keep subtle emotion; increase for expressive output
use_speaker_boost=True # Helps preserve speaker identity better
)
)
# Convert generator to bytes
audio_bytes = b"".join(response)
return Response(
content=audio_bytes,
media_type="audio/mpeg",
headers={
"Content-Disposition": "inline; filename=tts_audio.mp3",
"Cache-Control": "no-cache"
}
)
except Exception as e:
logger.error(f"TTS error: {str(e)}")
raise HTTPException(status_code=500, detail=f"TTS conversion failed: {str(e)}")
@app.get("/health")
@limiter.limit("30/minute") # Allow frequent health checks
async def health_check(request: Request):
return {"status": "healthy"}
@app.get("/model_info")
@limiter.limit("10/minute") # Limit model info requests
async def model_info(request: Request):
base_info = {
"embedding_model": sentence_transformer_model,
"faiss_index_size": len(cv_chunks),
"faiss_index_dim": cv_embeddings.shape[1],
"tts_available": elevenlabs_client is not None,
}
if USE_GEMINI:
base_info.update({
"model_type": "gemini",
"model_name": gemini_model,
"provider": "Google Gemini API",
})
else:
base_info.update({
"model_type": "local",
"model_name": filename,
"repo_id": repo_id,
"model_size": "1.7B",
"quantization": "Q4_K_M",
})
return base_info
@app.get("/ram_usage")
@limiter.limit("20/minute") # Allow moderate monitoring requests
async def ram_usage(request: Request):
"""Endpoint to get current RAM usage."""
try:
ram_stats = get_ram_usage()
return ram_stats
except Exception as e:
logger.error(f"Error retrieving RAM usage: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error retrieving RAM usage: {str(e)}")
# @app.on_event("startup")
# async def warm_up_model():
# logger.info("Warming up the model...")
# dummy_query = "Hello"
# dummy_history = []
# async for _ in stream_response(dummy_query, dummy_history):
# pass
# logger.info("Model warm-up completed.")
# # Log initial RAM usage
# ram_stats = get_ram_usage()
# logger.info(f"Initial RAM usage after startup: {ram_stats}")
# Add a background task to keep the model warm
@app.on_event("startup")
async def setup_periodic_tasks():
if not USE_GEMINI: # Only warm up local models
asyncio.create_task(keep_model_warm())
logger.info("Periodic model warm-up task scheduled for local model")
else:
logger.info("Gemini API in use - no warm-up needed")
async def keep_model_warm():
"""Background task that keeps the local model warm by sending periodic requests"""
while True:
try:
logger.info("Performing periodic local model warm-up")
dummy_query = "Say only the word 'ok.'"
dummy_history = []
# Process a dummy query through the generator to keep it warm
async for _ in stream_response(dummy_query, dummy_history):
pass
logger.info("Periodic warm-up completed")
except Exception as e:
logger.error(f"Error in periodic warm-up: {str(e)}")
# Wait for 13 minutes before the next warm-up
await asyncio.sleep(13 * 60)
|