# Flask web app: chat frontend for a local Ollama server, with Piper text-to-speech.
import json
import os
import random
import string
import subprocess
import threading
import time
from datetime import datetime
from queue import Queue

import markdown
import requests
from bs4 import BeautifulSoup
from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
| app = Flask(__name__) | |
| # Define directories | |
| file_folder = os.path.dirname(os.path.abspath(__file__)) | |
| temp_audio_folder = os.path.join(file_folder, 'temp_audio') | |
| model_folder = None | |
| piper_binary_path = os.path.join(file_folder, 'piper') | |
| # Create necessary directories | |
| os.makedirs(temp_audio_folder, exist_ok=True) | |
| # Check default user folder | |
| default_user_folder = "./" | |
| if os.path.exists(default_user_folder) and any(f.endswith('.onnx') for f in os.listdir(default_user_folder)): | |
| model_folder = default_user_folder | |
| # Global settings | |
| DEFAULT_BASE_HOST = "http://localhost:11434" | |
| SETTINGS = { | |
| 'speaker': 0, | |
| 'noise_scale': 0.667, | |
| 'length_scale': 1.0, | |
| 'noise_w': 0.8, | |
| 'sentence_silence': 0.2 | |
| } | |
| def get_available_models(): | |
| if not model_folder: | |
| return [] | |
| return [os.path.splitext(model)[0] for model in os.listdir(model_folder) if model.endswith('.onnx')] | |
| def get_ollama_models(base_host=DEFAULT_BASE_HOST): | |
| try: | |
| response = requests.get(f"{base_host}/api/tags") | |
| if response.status_code == 200: | |
| return [model['name'] for model in response.json().get('models', [])] | |
| return [] | |
| except: | |
| return [] | |
| def remove_markdown(text): | |
| html_content = markdown.markdown(text) | |
| soup = BeautifulSoup(html_content, 'html.parser') | |
| return soup.get_text().strip() | |
| def convert_to_speech(text, model_name, remove_md=False): | |
| if model_name not in get_available_models(): | |
| return None | |
| if remove_md: | |
| text = remove_markdown(text) | |
| random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + '.wav' | |
| output_file = os.path.join(temp_audio_folder, random_name) | |
| # Clean old audio files | |
| for file in os.listdir(temp_audio_folder): | |
| if file.endswith('.wav'): | |
| os.remove(os.path.join(temp_audio_folder, file)) | |
| model_path = os.path.join(model_folder, model_name + '.onnx') | |
| try: | |
| # Use the text directly in the command | |
| command = ( | |
| f'"{piper_binary_path}" -m "{model_path}" -f "{output_file}" ' | |
| f'--speaker {SETTINGS["speaker"]} --noise_scale {SETTINGS["noise_scale"]} ' | |
| f'--length_scale {SETTINGS["length_scale"]} --noise_w {SETTINGS["noise_w"]} ' | |
| f'--sentence_silence {SETTINGS["sentence_silence"]}' | |
| ) | |
| # Pass the text as input to the command | |
| result = subprocess.run(command, input=text.encode('utf-8'), shell=True, check=True) | |
| if os.path.exists(output_file): | |
| return output_file | |
| except Exception as e: | |
| print(f"Error during text-to-speech conversion: {e}") | |
| return None | |
| def set_default_models(): | |
| tts_models = get_available_models() | |
| ollama_models = get_ollama_models() | |
| default_tts_model = "RecomendacionesConMiau" if "RecomendacionesConMiau" in tts_models else None | |
| default_ollama_model = "llama3.2:1b" if "llama3.2:1b" in ollama_models else None | |
| return default_tts_model, default_ollama_model | |
| def index(): | |
| tts_models = get_available_models() | |
| default_tts_model, default_ollama_model = set_default_models() | |
| return render_template('index.html', tts_models=tts_models, default_tts_model=default_tts_model, default_ollama_model=default_ollama_model) | |
| def list_ollama_models(): | |
| base_host = request.args.get('base_host', DEFAULT_BASE_HOST) | |
| return jsonify(models=get_ollama_models(base_host)) | |
| def chat(): | |
| data = request.json | |
| base_host = data.get('base_host', DEFAULT_BASE_HOST) | |
| model = data.get('model') | |
| messages = data.get('messages', []) | |
| def generate(): | |
| queue = Queue() | |
| thread = threading.Thread( | |
| target=stream_ollama_response, | |
| args=(base_host, model, messages, queue) | |
| ) | |
| thread.start() | |
| complete_response = "" | |
| while True: | |
| msg_type, content = queue.get() | |
| if msg_type == "error": | |
| yield f"data: {json.dumps({'error': content})}\n\n" | |
| break | |
| elif msg_type == "chunk": | |
| complete_response = content | |
| yield f"data: {json.dumps({'chunk': content})}\n\n" | |
| elif msg_type == "done": | |
| yield f"data: {json.dumps({'done': complete_response})}\n\n" | |
| break | |
| return Response(stream_with_context(generate()), mimetype='text/event-stream') | |
| def stream_ollama_response(base_host, model, messages, queue): | |
| url = f"{base_host}/api/chat" | |
| data = { | |
| "model": model, | |
| "messages": messages, | |
| "stream": True | |
| } | |
| try: | |
| with requests.post(url, json=data, stream=True) as response: | |
| if response.status_code == 200: | |
| complete_response = "" | |
| for line in response.iter_lines(): | |
| if line: | |
| try: | |
| json_response = json.loads(line) | |
| chunk = json_response.get("message", {}).get("content", "") | |
| if chunk: | |
| complete_response += chunk | |
| queue.put(("chunk", complete_response)) | |
| except json.JSONDecodeError: | |
| continue | |
| queue.put(("done", complete_response)) | |
| else: | |
| queue.put(("error", f"Error: {response.status_code}")) | |
| except Exception as e: | |
| queue.put(("error", f"Error: {str(e)}")) | |
| def text_to_speech(): | |
| data = request.json | |
| text = data.get('text', '') | |
| model = data.get('model') | |
| remove_md = data.get('remove_markdown', False) | |
| if not text or not model: | |
| return jsonify(error="Missing text or model"), 400 | |
| audio_file = convert_to_speech(text, model, remove_md) | |
| if not audio_file: | |
| return jsonify(error="Failed to convert text to speech"), 500 | |
| return jsonify(audio_file=os.path.basename(audio_file)) | |
| def serve_audio(filename): | |
| return send_file(os.path.join(temp_audio_folder, filename)) | |
| if __name__ == '__main__': | |
| app.run(debug=True, port=7860, host='0.0.0.0') | |