import argparse
import os
import re
import time
from typing import Generator

import numpy as np
from cartesia import Cartesia
from dotenv import load_dotenv
from fastrtc import (
    AlgoOptions,
    ReplyOnPause,
    Stream,
)
from loguru import logger

# Load environment variables before importing the agent, which may read them at import time
load_dotenv()

from websearch_agent import agent, agent_config
logger.remove()
logger.add(
    lambda msg: print(msg, end=""),  # loguru messages already end with a newline
    colorize=True,
    format="<green>{time:HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>",
)
# Initialize Cartesia with API key
cartesia_client = Cartesia(api_key=os.getenv("CARTESIA_API_KEY"))
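# Defensive check: warn early if the key is missing so failures are easy to diagnose
if not os.getenv("CARTESIA_API_KEY"):
    logger.warning("⚠️ CARTESIA_API_KEY is not set; Cartesia STT/TTS requests will fail")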
# Cartesia Sonic 3 TTS Configuration
logger.info("🎀 Initializing Cartesia Sonic 3 TTS...")
CARTESIA_TTS_CONFIG = {
"model_id": "sonic-3", # Latest streaming TTS model
"voice": {
"mode": "id",
"id": "f786b574-daa5-4673-aa0c-cbe3e8534c02", # Katie - stable, realistic voice for voice agents
},
"output_format": {
"container": "raw",
"sample_rate": 24000,
"encoding": "pcm_f32le",
},
}
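# Note: pcm_f32le means each sample in the TTS byte stream is a little-endian float32,
# which maps directly onto the np.float32 arrays FastRTC plays back below.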
logger.info("βœ… Cartesia Sonic 3 TTS configured successfully")
def response(
    audio: tuple[int, np.ndarray],
) -> Generator[tuple[int, np.ndarray], None, None]:
"""
Process audio input, transcribe it, generate a response using LangGraph, and deliver TTS audio.
Optimized for ultra-low latency with Cartesia STT and Sonic 3 TTS.
Args:
audio: Tuple containing sample rate and audio data
Yields:
Tuples of (sample_rate, audio_array) for audio playback
"""
start_time = time.time()
logger.info("πŸŽ™οΈ Received audio input")
# ============ STT (Speech-to-Text) with Cartesia ============
stt_start = time.time()
logger.debug("πŸ”„ Transcribing audio with Cartesia...")
sample_rate, audio_data = audio
# Convert audio to PCM format for Cartesia
# Cartesia expects 16kHz, 16-bit PCM
target_sample_rate = 16000
    # Resample if needed
    if sample_rate != target_sample_rate:
        import librosa  # imported lazily; only needed on the resampling path

        # Normalize integer PCM to float32 in [-1, 1] before resampling
        if audio_data.dtype != np.float32:
            audio_float = audio_data.astype(np.float32) / np.iinfo(audio_data.dtype).max
        else:
            audio_float = audio_data
        # Flatten to 1-D; FastRTC typically delivers mono audio shaped (1, n_samples)
        audio_resampled = librosa.resample(
            audio_float.T.flatten() if audio_float.ndim > 1 else audio_float,
            orig_sr=sample_rate,
            target_sr=target_sample_rate,
        )
        audio_data = audio_resampled
        sample_rate = target_sample_rate
    # Convert to 16-bit PCM bytes, clipping to avoid int16 overflow on out-of-range samples
    if audio_data.dtype == np.float32:
        audio_int16 = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
    else:
        audio_int16 = audio_data.astype(np.int16)
audio_bytes = audio_int16.tobytes()
# Create websocket connection with optimized endpointing
ws = cartesia_client.stt.websocket(
model="ink-whisper",
language="en",
encoding="pcm_s16le",
sample_rate=target_sample_rate,
min_volume=0.1, # Low threshold for voice detection
max_silence_duration_secs=0.3, # Quick endpointing
)
    # Send audio in 20 ms chunks (16-bit PCM = 2 bytes per sample)
    chunk_size = int(target_sample_rate * 0.02 * 2)
for i in range(0, len(audio_bytes), chunk_size):
chunk = audio_bytes[i:i + chunk_size]
if chunk:
ws.send(chunk)
# Finalize transcription
ws.send("finalize")
ws.send("done")
    # Receive transcription results, accumulating final segments until the server signals done
    final_segments = []
    for result in ws.receive():
        if result['type'] == 'transcript':
            if result['is_final']:
                final_segments.append(result['text'])
        elif result['type'] == 'done':
            break
    transcript = " ".join(final_segments).strip()
ws.close()
stt_time = time.time() - stt_start
    logger.info(f'👂 Transcribed in {stt_time:.2f}s: "{transcript}"')
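    # Defensive guard: if no speech was recognized, skip the agent and TTS calls
    if not transcript:
        logger.warning("👂 Empty transcript; skipping response generation")
        return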
# ============ LLM (Language Model) ============
llm_start = time.time()
logger.debug("🧠 Running agent...")
agent_response = agent.invoke(
{"messages": [{"role": "user", "content": transcript}]}, config=agent_config
)
response_text = agent_response["messages"][-1].content
llm_time = time.time() - llm_start
    logger.info(f'💬 Response in {llm_time:.2f}s: "{response_text}"')
# ============ TTS (Text-to-Speech) with Cartesia Sonic 3 ============
tts_start = time.time()
logger.debug("πŸ”Š Generating speech with Cartesia Sonic 3...")
# Clean markdown formatting for better TTS output
clean_text = response_text
# Remove asterisks (bold/italic markdown)
clean_text = re.sub(r'\*+', '', clean_text)
# Remove other common markdown symbols (including table separators)
clean_text = re.sub(r'[#_`]', '', clean_text)
# Remove dashes/hyphens used in tables and horizontal rules
clean_text = re.sub(r'-{2,}', ' ', clean_text) # Replace multiple dashes with space
# Remove pipe symbols used in markdown tables
clean_text = re.sub(r'\|', ' ', clean_text)
# Remove extra whitespace
clean_text = re.sub(r'\s+', ' ', clean_text).strip()
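    # Illustrative example of the cleaning above (not from a real response):
    #   "**Result:** | col --- 42"  ->  "Result: col 42"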
if clean_text != response_text:
logger.debug(f"Cleaned text for TTS: {clean_text}")
try:
# Generate speech using Cartesia Sonic 3 TTS (streaming)
chunk_count = 0
chunk_iter = cartesia_client.tts.bytes(
model_id=CARTESIA_TTS_CONFIG["model_id"],
transcript=clean_text,
voice=CARTESIA_TTS_CONFIG["voice"],
output_format=CARTESIA_TTS_CONFIG["output_format"],
)
# Buffer to accumulate partial chunks
buffer = b""
element_size = 4 # float32 is 4 bytes
# Stream audio chunks and convert to FastRTC format
for chunk in chunk_iter:
# Accumulate chunks in buffer
buffer += chunk
# Process complete float32 samples
num_complete_samples = len(buffer) // element_size
if num_complete_samples > 0:
# Extract complete samples
complete_bytes = num_complete_samples * element_size
complete_buffer = buffer[:complete_bytes]
buffer = buffer[complete_bytes:] # Keep remainder for next iteration
# Convert to numpy array
audio_array = np.frombuffer(complete_buffer, dtype=np.float32)
chunk_count += 1
# Yield in FastRTC format: (sample_rate, audio_array)
yield (CARTESIA_TTS_CONFIG["output_format"]["sample_rate"], audio_array)
# Process any remaining bytes in buffer
if len(buffer) > 0:
# Pad to complete sample if needed
remainder = len(buffer) % element_size
if remainder != 0:
buffer += b'\x00' * (element_size - remainder)
if len(buffer) >= element_size:
audio_array = np.frombuffer(buffer, dtype=np.float32)
chunk_count += 1
yield (CARTESIA_TTS_CONFIG["output_format"]["sample_rate"], audio_array)
tts_time = time.time() - tts_start
total_time = time.time() - start_time
        logger.info(f'⚡ Performance: STT={stt_time:.2f}s | LLM={llm_time:.2f}s | TTS={tts_time:.2f}s | Total={total_time:.2f}s | Chunks={chunk_count}')
except Exception as e:
logger.error(f"Error in Cartesia TTS generation: {str(e)}")
raise
def create_stream() -> Stream:
"""
Create and configure a Stream instance with audio capabilities.
Optimized for low latency with RTC configuration for cloud deployment.
Returns:
Stream: Configured FastRTC Stream instance
"""
# RTC Configuration for Hugging Face Spaces deployment
rtc_config = {
"iceServers": [
{
"urls": ["stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"]
}
]
}
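    # STUN alone covers most networks; clients behind strict or symmetric NATs may
    # also need TURN servers (with credentials) in "iceServers" to connect reliably.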
return Stream(
modality="audio",
mode="send-receive",
handler=ReplyOnPause(
response,
algo_options=AlgoOptions(
speech_threshold=0.4, # Slightly lower for faster detection
),
),
rtc_configuration=rtc_config, # Required for Hugging Face Spaces
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="FastRTC Cartesia Voice Agent (Ultra-Low Latency)")
parser.add_argument(
"--phone",
action="store_true",
help="Launch with FastRTC phone interface (get a temp phone number)",
)
args = parser.parse_args()
stream = create_stream()
logger.info("🎧 Stream handler configured")
if args.phone:
logger.info("🌈 Launching with FastRTC phone interface...")
stream.fastphone()
else:
logger.info("🌈 Launching with Gradio UI...")
# Configure for both local and Hugging Face Spaces deployment
stream.ui.launch(
server_name="0.0.0.0", # Bind to all interfaces for cloud deployment
server_port=int(os.getenv("PORT", 7860)), # Use PORT env var if available, default to 7860
)
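
# Usage (assumes CARTESIA_API_KEY and any agent credentials are set, e.g. in .env):
#   python app.py            # Gradio UI, http://localhost:7860 by default
#   python app.py --phone    # temporary phone number via FastRTC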