Spaces:
Sleeping
Sleeping
File size: 9,372 Bytes
40d9fe6 aeb59c2 40d9fe6 aeb59c2 40d9fe6 aeb59c2 40d9fe6 84913ca 40d9fe6 84913ca 40d9fe6 84913ca 40d9fe6 aeb59c2 40d9fe6 84913ca |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
import argparse
import io
import time
import re
from typing import Generator, Tuple, Union
import numpy as np
import soundfile as sf
from fastrtc import (
AlgoOptions,
ReplyOnPause,
Stream,
)
from cartesia import Cartesia
from loguru import logger
from dotenv import load_dotenv
import os
load_dotenv()
from websearch_agent import agent, agent_config
# Route loguru output through plain print() with a colored, timestamped format.
logger.remove()
logger.add(
    lambda msg: print(msg),
    colorize=True,
    format="<green>{time:HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>",
)

# Initialize Cartesia client with API key from the environment (.env loaded above).
cartesia_client = Cartesia(api_key=os.getenv("CARTESIA_API_KEY"))

# Cartesia Sonic 3 TTS configuration, reused for every synthesis request.
logger.info("Initializing Cartesia Sonic 3 TTS...")
CARTESIA_TTS_CONFIG = {
    "model_id": "sonic-3",  # Latest streaming TTS model
    "voice": {
        "mode": "id",
        "id": "f786b574-daa5-4673-aa0c-cbe3e8534c02",  # Katie - stable, realistic voice for voice agents
    },
    "output_format": {
        "container": "raw",       # raw PCM frames, no container header
        "sample_rate": 24000,
        "encoding": "pcm_f32le",  # little-endian float32 samples (4 bytes each)
    },
}
logger.info("Cartesia Sonic 3 TTS configured successfully")
def response(
    audio: tuple[int, np.ndarray],
) -> Generator[Tuple[int, np.ndarray], None, None]:
    """
    Process audio input, transcribe it, generate a response using LangGraph,
    and deliver TTS audio.

    Optimized for ultra-low latency with Cartesia STT and Sonic 3 TTS.

    Args:
        audio: Tuple of (sample_rate, audio_data) as delivered by FastRTC.

    Yields:
        Tuples of (sample_rate, audio_array) for audio playback.
    """
    start_time = time.time()
    logger.info("Received audio input")

    # ============ STT (Speech-to-Text) with Cartesia ============
    stt_start = time.time()
    logger.debug("Transcribing audio with Cartesia...")
    sample_rate, audio_data = audio

    # Cartesia STT expects 16 kHz, 16-bit PCM; resample when the capture rate differs.
    target_sample_rate = 16000
    if sample_rate != target_sample_rate:
        import librosa  # local import: only needed on the resample path

        # Normalize to float32 in [-1, 1] before resampling.
        # np.iinfo is only valid for integer dtypes; float inputs are assumed
        # to already be normalized (as FastRTC delivers) — TODO confirm.
        if np.issubdtype(audio_data.dtype, np.integer):
            audio_float = audio_data.astype(np.float32) / np.iinfo(audio_data.dtype).max
        else:
            audio_float = audio_data.astype(np.float32)
        audio_data = librosa.resample(
            audio_float.T.flatten() if audio_float.ndim > 1 else audio_float,
            orig_sr=sample_rate,
            target_sr=target_sample_rate,
        )
        sample_rate = target_sample_rate

    # Convert to 16-bit PCM bytes for the websocket.
    if np.issubdtype(audio_data.dtype, np.floating):
        audio_int16 = (audio_data * 32767).astype(np.int16)
    else:
        audio_int16 = audio_data.astype(np.int16)
    audio_bytes = audio_int16.tobytes()

    # Websocket STT connection with aggressive endpointing for low latency.
    ws = cartesia_client.stt.websocket(
        model="ink-whisper",
        language="en",
        encoding="pcm_s16le",
        sample_rate=target_sample_rate,
        min_volume=0.1,  # low threshold for voice detection
        max_silence_duration_secs=0.3,  # quick endpointing
    )
    transcript = ""
    try:
        # Stream audio in 20ms chunks: 16000 Hz * 0.02 s * 2 bytes/sample.
        chunk_size = int(target_sample_rate * 0.02 * 2)
        for i in range(0, len(audio_bytes), chunk_size):
            chunk = audio_bytes[i:i + chunk_size]
            if chunk:
                ws.send(chunk)
        # Tell the server we are done so it flushes the final transcript.
        ws.send("finalize")
        ws.send("done")
        # Take the first final transcript, or stop on the server's 'done'.
        for result in ws.receive():
            if result['type'] == 'transcript':
                if result['is_final']:
                    transcript = result['text']
                    break
            elif result['type'] == 'done':
                break
    finally:
        # Always release the websocket, even if send/receive raised.
        ws.close()
    stt_time = time.time() - stt_start
    logger.info(f'Transcribed in {stt_time:.2f}s: "{transcript}"')

    # ============ LLM (Language Model) ============
    llm_start = time.time()
    logger.debug("Running agent...")
    agent_response = agent.invoke(
        {"messages": [{"role": "user", "content": transcript}]}, config=agent_config
    )
    response_text = agent_response["messages"][-1].content
    llm_time = time.time() - llm_start
    logger.info(f'Response in {llm_time:.2f}s: "{response_text}"')

    # ============ TTS (Text-to-Speech) with Cartesia Sonic 3 ============
    tts_start = time.time()
    logger.debug("Generating speech with Cartesia Sonic 3...")

    # Strip markdown so the TTS doesn't read table pipes and asterisks aloud.
    clean_text = _clean_markdown_for_tts(response_text)
    if clean_text != response_text:
        logger.debug(f"Cleaned text for TTS: {clean_text}")

    try:
        # Generate speech with Cartesia Sonic 3 (streaming byte chunks).
        chunk_count = 0
        chunk_iter = cartesia_client.tts.bytes(
            model_id=CARTESIA_TTS_CONFIG["model_id"],
            transcript=clean_text,
            voice=CARTESIA_TTS_CONFIG["voice"],
            output_format=CARTESIA_TTS_CONFIG["output_format"],
        )
        tts_sample_rate = CARTESIA_TTS_CONFIG["output_format"]["sample_rate"]
        buffer = b""
        element_size = 4  # float32 is 4 bytes per sample

        # Stream chunks, yielding only whole float32 samples; carry any
        # partial trailing bytes over to the next iteration.
        for chunk in chunk_iter:
            buffer += chunk
            complete_bytes = (len(buffer) // element_size) * element_size
            if complete_bytes > 0:
                audio_array = np.frombuffer(buffer[:complete_bytes], dtype=np.float32)
                buffer = buffer[complete_bytes:]  # keep remainder for next iteration
                chunk_count += 1
                # Yield in FastRTC format: (sample_rate, audio_array).
                yield (tts_sample_rate, audio_array)

        # Flush any trailing partial sample, zero-padded to a full float32.
        if len(buffer) > 0:
            remainder = len(buffer) % element_size
            if remainder != 0:
                buffer += b'\x00' * (element_size - remainder)
            audio_array = np.frombuffer(buffer, dtype=np.float32)
            chunk_count += 1
            yield (tts_sample_rate, audio_array)

        tts_time = time.time() - tts_start
        total_time = time.time() - start_time
        logger.info(
            f'Performance: STT={stt_time:.2f}s | LLM={llm_time:.2f}s | '
            f'TTS={tts_time:.2f}s | Total={total_time:.2f}s | Chunks={chunk_count}'
        )
    except Exception as e:
        logger.error(f"Error in Cartesia TTS generation: {str(e)}")
        raise


def _clean_markdown_for_tts(text: str) -> str:
    """Strip markdown formatting (emphasis, tables, rules) so TTS reads plain prose."""
    clean = re.sub(r'\*+', '', text)       # bold/italic asterisks
    clean = re.sub(r'[#_`]', '', clean)    # headings, underscores, code ticks
    clean = re.sub(r'-{2,}', ' ', clean)   # table separators / horizontal rules
    clean = re.sub(r'\|', ' ', clean)      # markdown table pipes
    return re.sub(r'\s+', ' ', clean).strip()  # collapse whitespace
def create_stream() -> Stream:
    """
    Build a FastRTC Stream configured for bidirectional audio.

    Tuned for low latency, with an RTC configuration suitable for cloud
    deployment (e.g. Hugging Face Spaces).

    Returns:
        Stream: Configured FastRTC Stream instance.
    """
    # Google STUN servers so WebRTC can traverse NAT on hosted deployments.
    google_stun = {
        "urls": ["stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"]
    }

    # Pause-detection handler: replies once the speaker stops talking.
    pause_handler = ReplyOnPause(
        response,
        algo_options=AlgoOptions(
            speech_threshold=0.4,  # Slightly lower for faster detection
        ),
    )

    return Stream(
        modality="audio",
        mode="send-receive",
        handler=pause_handler,
        rtc_configuration={"iceServers": [google_stun]},  # Required for Hugging Face Spaces
    )
if __name__ == "__main__":
    # CLI: choose between the phone interface and the Gradio web UI.
    parser = argparse.ArgumentParser(description="FastRTC Cartesia Voice Agent (Ultra-Low Latency)")
    parser.add_argument(
        "--phone",
        action="store_true",
        help="Launch with FastRTC phone interface (get a temp phone number)",
    )
    args = parser.parse_args()

    stream = create_stream()
    logger.info("Stream handler configured")

    if args.phone:
        logger.info("Launching with FastRTC phone interface...")
        stream.fastphone()
    else:
        logger.info("Launching with Gradio UI...")
        # Configure for both local and Hugging Face Spaces deployment.
        stream.ui.launch(
            server_name="0.0.0.0",  # Bind to all interfaces for cloud deployment
            server_port=int(os.getenv("PORT", 7860)),  # Use PORT env var if set, default 7860
        )