# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import os
from typing import Any, List, Optional, Union

from openai import OpenAI, _legacy_response

from camel.types import AudioModelType, VoiceType


class OpenAIAudioModels:
    r"""Provides access to OpenAI's Text-to-Speech (TTS) and Speech-to-Text
    (STT) models."""
    def __init__(
        self,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
    ) -> None:
        r"""Initialize an instance of OpenAIAudioModels."""
        self._url = url or os.environ.get("OPENAI_API_BASE_URL")
        self._api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self._client = OpenAI(
            timeout=120,
            max_retries=3,
            base_url=self._url,
            api_key=self._api_key,
        )

    def text_to_speech(
        self,
        input: str,
        model_type: AudioModelType = AudioModelType.TTS_1,
        voice: VoiceType = VoiceType.ALLOY,
        storage_path: Optional[str] = None,
        **kwargs: Any,
    ) -> Union[
        List[_legacy_response.HttpxBinaryResponseContent],
        _legacy_response.HttpxBinaryResponseContent,
    ]:
| r"""Convert text to speech using OpenAI's TTS model. This method | |
| converts the given input text to speech using the specified model and | |
| voice. | |
| Args: | |
| input (str): The text to be converted to speech. | |
| model_type (AudioModelType, optional): The TTS model to use. | |
| Defaults to `AudioModelType.TTS_1`. | |
| voice (VoiceType, optional): The voice to be used for generating | |
| speech. Defaults to `VoiceType.ALLOY`. | |
| storage_path (str, optional): The local path to store the | |
| generated speech file if provided, defaults to `None`. | |
| **kwargs (Any): Extra kwargs passed to the TTS API. | |
| Returns: | |
| Union[List[_legacy_response.HttpxBinaryResponseContent], | |
| _legacy_response.HttpxBinaryResponseContent]: List of response | |
| content object from OpenAI if input charaters more than 4096, | |
| single response content if input charaters less than 4096. | |
| Raises: | |
| Exception: If there's an error during the TTS API call. | |
| """ | |
        try:
            # The model supports at most 4096 characters per request.
            max_chunk_size = 4096
            audio_chunks = []
            chunk_index = 0
            if len(input) > max_chunk_size:
                while input:
                    if len(input) <= max_chunk_size:
                        chunk = input
                        input = ''
                    else:
                        # Split at the last period before the chunk size
                        # limit, falling back to a hard cut if none exists.
                        split_at = input.rfind('.', 0, max_chunk_size) + 1
                        if split_at <= 0:
                            split_at = max_chunk_size
                        chunk = input[:split_at]
                        input = input[split_at:].lstrip()
                    response = self._client.audio.speech.create(
                        model=model_type.value,
                        voice=voice.value,
                        input=chunk,
                        **kwargs,
                    )
                    if storage_path:
                        try:
                            # Create a new storage path for each chunk
                            file_name, file_extension = os.path.splitext(
                                storage_path
                            )
                            new_storage_path = (
                                f"{file_name}_{chunk_index}{file_extension}"
                            )
                            response.write_to_file(new_storage_path)
                            chunk_index += 1
                        except Exception as e:
                            raise Exception(
                                "Error while writing the audio chunk"
                            ) from e
                    audio_chunks.append(response)
                return audio_chunks
            else:
                response = self._client.audio.speech.create(
                    model=model_type.value,
                    voice=voice.value,
                    input=input,
                    **kwargs,
                )
                if storage_path:
                    try:
                        response.write_to_file(storage_path)
                    except Exception as e:
                        raise Exception(
                            "Error while writing the audio file"
                        ) from e
                return response
        except Exception as e:
            raise Exception("Error during TTS API call") from e

    def _split_audio(
        self, audio_file_path: str, chunk_size_mb: int = 24
    ) -> list:
        r"""Split the audio file into smaller chunks, since the Whisper API
        only supports files smaller than 25 MB.

        Args:
            audio_file_path (str): Path to the input audio file.
            chunk_size_mb (int, optional): Size of each chunk in megabytes.
                Defaults to `24`.

        Returns:
            list: List of paths to the split audio files.
        """
        from pydub import AudioSegment

        audio = AudioSegment.from_file(audio_file_path)
        audio_format = os.path.splitext(audio_file_path)[1][1:].lower()
        # Calculate chunk size in bytes
        chunk_size_bytes = chunk_size_mb * 1024 * 1024
        # Number of chunks needed
        num_chunks = os.path.getsize(audio_file_path) // chunk_size_bytes + 1
        # Create a directory to store the chunks
        output_dir = os.path.splitext(audio_file_path)[0] + "_chunks"
        os.makedirs(output_dir, exist_ok=True)
        # Get audio chunk length in milliseconds
        chunk_size_milliseconds = len(audio) // num_chunks
        # Split the audio into chunks
        split_files = []
        for i in range(num_chunks):
            start = i * chunk_size_milliseconds
            end = (i + 1) * chunk_size_milliseconds
            if i + 1 == num_chunks:
                # The last chunk absorbs any trailing remainder
                chunk = audio[start:]
            else:
                chunk = audio[start:end]
            # Create new chunk path
            chunk_path = os.path.join(output_dir, f"chunk_{i}.{audio_format}")
            chunk.export(chunk_path, format=audio_format)
            split_files.append(chunk_path)
        return split_files

    def speech_to_text(
        self,
        audio_file_path: str,
        translate_into_english: bool = False,
        **kwargs: Any,
    ) -> str:
        r"""Convert speech audio to text.

        Args:
            audio_file_path (str): The audio file path, supporting one of
                these formats: flac, mp3, mp4, mpeg, mpga, m4a, ogg, wav, or
                webm.
            translate_into_english (bool, optional): Whether to translate the
                speech into English. Defaults to `False`.
            **kwargs (Any): Extra keyword arguments passed to the
                Speech-to-Text (STT) API.

        Returns:
            str: The output text.

        Raises:
            ValueError: If the audio file format is not supported.
            Exception: If there's an error during the STT API call.
        """
        supported_formats = [
            "flac",
            "mp3",
            "mp4",
            "mpeg",
            "mpga",
            "m4a",
            "ogg",
            "wav",
            "webm",
        ]
        file_format = audio_file_path.split(".")[-1].lower()
        if file_format not in supported_formats:
            raise ValueError(f"Unsupported audio file format: {file_format}")
        try:
            if os.path.getsize(audio_file_path) > 24 * 1024 * 1024:
                # Split audio into chunks small enough for the Whisper API
                audio_chunks = self._split_audio(audio_file_path)
                texts = []
                for chunk_path in audio_chunks:
                    # Use a context manager so each file handle is closed
                    with open(chunk_path, "rb") as audio_data:
                        if translate_into_english:
                            translation = (
                                self._client.audio.translations.create(
                                    model="whisper-1",
                                    file=audio_data,
                                    **kwargs,
                                )
                            )
                            texts.append(translation.text)
                        else:
                            transcription = (
                                self._client.audio.transcriptions.create(
                                    model="whisper-1",
                                    file=audio_data,
                                    **kwargs,
                                )
                            )
                            texts.append(transcription.text)
                    os.remove(chunk_path)  # Delete temporary chunk file
                return " ".join(texts)
            else:
                # Process the entire audio file in a single request
                with open(audio_file_path, "rb") as audio_data:
                    if translate_into_english:
                        translation = self._client.audio.translations.create(
                            model="whisper-1", file=audio_data, **kwargs
                        )
                        return translation.text
                    else:
                        transcription = (
                            self._client.audio.transcriptions.create(
                                model="whisper-1", file=audio_data, **kwargs
                            )
                        )
                        return transcription.text
        except Exception as e:
            raise Exception("Error during STT API call") from e