Spaces:
Runtime error
Runtime error
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| import os | |
| from typing import Any, Dict, List, Optional, Union | |
| import httpx | |
# Module-level httpx timeout settings, one limit per phase of a request.
# NOTE(review): httpx interprets these values as seconds, so 5000 is roughly
# 83 minutes per phase -- confirm this was not meant to be milliseconds.
# NOTE(review): none of the classes visible in this file reference this
# name (they build their own timeouts) -- confirm whether other modules
# import it before removing or changing it.
timeout = httpx.Timeout(
    connect=5000,  # max time to establish TCP connection
    write=5000,  # max time per chunk sent
    read=5000,  # max time per chunk received
    pool=5000  # max time to get a connection from the pool
)
| from openai import OpenAI, Stream | |
| from camel.configs import QWEN_API_PARAMS, QwenConfig | |
| from camel.messages import OpenAIMessage | |
| from camel.models import BaseModelBackend | |
| from camel.types import ( | |
| ChatCompletion, | |
| ChatCompletionChunk, | |
| ModelType, | |
| ) | |
| from camel.utils import ( | |
| BaseTokenCounter, | |
| OpenAITokenCounter, | |
| api_keys_required, | |
| ) | |
class QwenModel(BaseModelBackend):
    r"""Qwen API in a unified BaseModelBackend interface.

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created, one of Qwen series.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`QwenConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating with
            the Qwen service. If not provided, the ``QWEN_API_KEY``
            environment variable is used. (default: :obj:`None`)
        url (Optional[str], optional): The url to the Qwen service. If not
            provided, the ``QWEN_API_BASE_URL`` environment variable is used,
            falling back to
            :obj:`https://dashscope.aliyuncs.com/compatible-mode/v1`.
            (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter to
            use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used.
            (default: :obj:`None`)
    """

    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
    ) -> None:
        if model_config_dict is None:
            model_config_dict = QwenConfig().as_dict()
        # Explicit arguments win; otherwise fall back to the environment.
        api_key = api_key or os.environ.get("QWEN_API_KEY")
        url = url or os.environ.get(
            "QWEN_API_BASE_URL",
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter
        )
        # NOTE(review): httpx timeouts are in seconds, so each phase may wait
        # up to 5000 s -- confirm this is intentional.
        self._client = OpenAI(
            timeout=httpx.Timeout(
                connect=5000,  # DNS + TCP + TLS
                read=5000,  # waiting for the model
                write=5000,  # sending the request body
                pool=5000,  # waiting for a free pooled connection
            ),
            max_retries=3,
            api_key=self._api_key,
            base_url=self._url,
        )

    def run(
        self,
        messages: List[OpenAIMessage],
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Runs inference of Qwen chat completion.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `Stream[ChatCompletionChunk]` in the stream mode.
        """
        response = self._client.chat.completions.create(
            messages=messages,
            model=self.model_type,
            **self.model_config_dict,
        )
        return response

    # Exposed as a property (not a plain method) so that attribute-style
    # access such as ``backend.token_counter.count_tokens_from_messages(...)``
    # works, matching the BaseModelBackend contract.
    @property
    def token_counter(self) -> BaseTokenCounter:
        r"""Return the token counter, creating the default one on first use.

        Returns:
            BaseTokenCounter: The counter supplied at construction time, or
                an :obj:`OpenAITokenCounter(ModelType.GPT_4O_MINI)` created
                lazily when none was supplied.
        """
        if not self._token_counter:
            self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI)
        return self._token_counter

    def check_model_config(self):
        r"""Check whether the model configuration contains any
        unexpected arguments to Qwen API.

        Raises:
            ValueError: If the model configuration dictionary contains any
                unexpected arguments to Qwen API.
        """
        for param in self.model_config_dict:
            if param not in QWEN_API_PARAMS:
                raise ValueError(
                    f"Unexpected argument `{param}` is "
                    "input into Qwen model backend."
                )

    # Property for the same reason as ``token_counter``: a bound method is
    # always truthy, so ``if backend.stream:`` would otherwise always pass.
    @property
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time.

        Returns:
            bool: The ``stream`` entry of the model configuration, or
                :obj:`False` when it is absent.
        """
        return self.model_config_dict.get('stream', False)
class DeepInfraQwenModel(BaseModelBackend):
    r"""Qwen-configured backend that talks to the DeepInfra endpoint through
    the OpenAI client, in a unified BaseModelBackend interface.

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`QwenConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating with
            the DeepInfra service. If not provided, the
            ``DEEPINFRA_API_KEY`` environment variable is used.
            (default: :obj:`None`)
        url (Optional[str], optional): The url to the service. If not
            provided, the ``QWEN_API_BASE_URL`` environment variable is used,
            falling back to :obj:`https://api.deepinfra.com/v1/openai`.
            (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter to
            use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used.
            (default: :obj:`None`)
    """

    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
    ) -> None:
        if model_config_dict is None:
            model_config_dict = QwenConfig().as_dict()
        api_key = api_key or os.environ.get("DEEPINFRA_API_KEY")
        # NOTE(review): this override variable is the same one QwenModel
        # reads, so setting QWEN_API_BASE_URL redirects both backends --
        # confirm that sharing it is intended.
        url = url or os.environ.get(
            "QWEN_API_BASE_URL",
            "https://api.deepinfra.com/v1/openai",
        )
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter
        )
        self._client = OpenAI(
            timeout=180,
            max_retries=3,
            api_key=self._api_key,
            base_url=self._url,
        )

    def run(
        self,
        messages: List[OpenAIMessage],
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Runs inference of Qwen chat completion.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `Stream[ChatCompletionChunk]` in the stream mode.
        """
        response = self._client.chat.completions.create(
            messages=messages,
            model=self.model_type,
            **self.model_config_dict,
        )
        return response

    # Exposed as a property (not a plain method) so that attribute-style
    # access such as ``backend.token_counter.count_tokens_from_messages(...)``
    # works, matching the BaseModelBackend contract.
    @property
    def token_counter(self) -> BaseTokenCounter:
        r"""Return the token counter, creating the default one on first use.

        Returns:
            BaseTokenCounter: The counter supplied at construction time, or
                an :obj:`OpenAITokenCounter(ModelType.GPT_4O_MINI)` created
                lazily when none was supplied.
        """
        if not self._token_counter:
            self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI)
        return self._token_counter

    def check_model_config(self):
        r"""Check whether the model configuration contains any
        unexpected arguments to Qwen API.

        Raises:
            ValueError: If the model configuration dictionary contains any
                key that is not listed in :obj:`QWEN_API_PARAMS`.
        """
        for param in self.model_config_dict:
            if param not in QWEN_API_PARAMS:
                raise ValueError(
                    f"Unexpected argument `{param}` is "
                    "input into Qwen model backend."
                )

    # Property for the same reason as ``token_counter``: a bound method is
    # always truthy, so ``if backend.stream:`` would otherwise always pass.
    @property
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time.

        Returns:
            bool: The ``stream`` entry of the model configuration, or
                :obj:`False` when it is absent.
        """
        return self.model_config_dict.get('stream', False)
class DeepInfraPhi4Model(BaseModelBackend):
    r"""Phi-4 backend that talks to the DeepInfra endpoint through the OpenAI
    client, in a unified BaseModelBackend interface.

    The configuration defaults and validation reuse the Qwen settings
    (:obj:`QwenConfig` and :obj:`QWEN_API_PARAMS`).

    Args:
        model_type (Union[ModelType, str]): Model for which a backend is
            created.
        model_config_dict (Optional[Dict[str, Any]], optional): A dictionary
            that will be fed into :obj:`openai.ChatCompletion.create()`. If
            :obj:`None`, :obj:`QwenConfig().as_dict()` will be used.
            (default: :obj:`None`)
        api_key (Optional[str], optional): The API key for authenticating with
            the DeepInfra service. If not provided, the
            ``DEEPINFRA_API_KEY`` environment variable is used.
            (default: :obj:`None`)
        url (Optional[str], optional): The url to the service. If not
            provided, the ``PHI4_API_BASE_URL`` environment variable is used,
            falling back to :obj:`https://api.deepinfra.com/v1/openai`.
            (default: :obj:`None`)
        token_counter (Optional[BaseTokenCounter], optional): Token counter to
            use for the model. If not provided, :obj:`OpenAITokenCounter(
            ModelType.GPT_4O_MINI)` will be used.
            (default: :obj:`None`)
    """

    def __init__(
        self,
        model_type: Union[ModelType, str],
        model_config_dict: Optional[Dict[str, Any]] = None,
        api_key: Optional[str] = None,
        url: Optional[str] = None,
        token_counter: Optional[BaseTokenCounter] = None,
    ) -> None:
        if model_config_dict is None:
            model_config_dict = QwenConfig().as_dict()
        # Explicit arguments win; otherwise fall back to the environment.
        api_key = api_key or os.environ.get("DEEPINFRA_API_KEY")
        url = url or os.environ.get(
            "PHI4_API_BASE_URL",
            "https://api.deepinfra.com/v1/openai",
        )
        super().__init__(
            model_type, model_config_dict, api_key, url, token_counter
        )
        self._client = OpenAI(
            timeout=5000,
            max_retries=3,
            api_key=self._api_key,
            base_url=self._url,
        )

    def run(
        self,
        messages: List[OpenAIMessage],
    ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]:
        r"""Runs inference of chat completion against the configured
        endpoint.

        Args:
            messages (List[OpenAIMessage]): Message list with the chat history
                in OpenAI API format.

        Returns:
            Union[ChatCompletion, Stream[ChatCompletionChunk]]:
                `ChatCompletion` in the non-stream mode, or
                `Stream[ChatCompletionChunk]` in the stream mode.
        """
        response = self._client.chat.completions.create(
            messages=messages,
            model=self.model_type,
            **self.model_config_dict,
        )
        return response

    # Exposed as a property (not a plain method) so that attribute-style
    # access such as ``backend.token_counter.count_tokens_from_messages(...)``
    # works, matching the BaseModelBackend contract.
    @property
    def token_counter(self) -> BaseTokenCounter:
        r"""Return the token counter, creating the default one on first use.

        Returns:
            BaseTokenCounter: The counter supplied at construction time, or
                an :obj:`OpenAITokenCounter(ModelType.GPT_4O_MINI)` created
                lazily when none was supplied.
        """
        if not self._token_counter:
            self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI)
        return self._token_counter

    def check_model_config(self):
        r"""Check whether the model configuration contains any
        arguments that are not listed in :obj:`QWEN_API_PARAMS`.

        Raises:
            ValueError: If the model configuration dictionary contains any
                key that is not listed in :obj:`QWEN_API_PARAMS`.
        """
        for param in self.model_config_dict:
            if param not in QWEN_API_PARAMS:
                raise ValueError(
                    f"Unexpected argument `{param}` is "
                    "input into Qwen model backend."
                )

    # Property for the same reason as ``token_counter``: a bound method is
    # always truthy, so ``if backend.stream:`` would otherwise always pass.
    @property
    def stream(self) -> bool:
        r"""Returns whether the model is in stream mode, which sends partial
        results each time.

        Returns:
            bool: The ``stream`` entry of the model configuration, or
                :obj:`False` when it is absent.
        """
        return self.model_config_dict.get('stream', False)
| # class DeepInfraGeminiModel(BaseModelBackend): | |
| # @api_keys_required( | |
| # [ | |
| # ("api_key", "DEEPINFRA_API_KEY"), | |
| # ] | |
| # ) | |
| # def __init__( | |
| # self, | |
| # model_type: Union[ModelType, str], | |
| # model_config_dict: Optional[Dict[str, Any]] = None, | |
| # api_key: Optional[str] = None, | |
| # url: Optional[str] = None, | |
| # token_counter: Optional[BaseTokenCounter] = None, | |
| # ) -> None: | |
| # if model_config_dict is None: | |
| # model_config_dict = QwenConfig().as_dict() | |
| # api_key = api_key or os.environ.get("DEEPINFRA_API_KEY") | |
| # url = url or os.environ.get( | |
| # "GEMINI_API_BASE_URL", | |
| # "https://api.deepinfra.com/v1/openai", | |
| # ) | |
| # super().__init__( | |
| # model_type, model_config_dict, api_key, url, token_counter | |
| # ) | |
| # self._client = OpenAI( | |
| # timeout=5000, | |
| # max_retries=3, | |
| # api_key=self._api_key, | |
| # base_url=self._url, | |
| # ) | |
| # def run( | |
| # self, | |
| # messages: List[OpenAIMessage], | |
| # ) -> Union[ChatCompletion, Stream[ChatCompletionChunk]]: | |
| # r"""Runs inference of Qwen chat completion. | |
| # Args: | |
| # messages (List[OpenAIMessage]): Message list with the chat history | |
| # in OpenAI API format. | |
| # Returns: | |
| # Union[ChatCompletion, Stream[ChatCompletionChunk]]: | |
| # `ChatCompletion` in the non-stream mode, or | |
| # `Stream[ChatCompletionChunk]` in the stream mode. | |
| # """ | |
| # response = self._client.chat.completions.create( | |
| # messages=messages, | |
| # model=self.model_type, | |
| # **self.model_config_dict, | |
| # ) | |
| # return response | |
| # @property | |
| # def token_counter(self) -> BaseTokenCounter: | |
| # r"""Initialize the token counter for the model backend. | |
| # Returns: | |
| # OpenAITokenCounter: The token counter following the model's | |
| # tokenization style. | |
| # """ | |
| # if not self._token_counter: | |
| # self._token_counter = OpenAITokenCounter(ModelType.GPT_4O_MINI) | |
| # return self._token_counter | |
| # def check_model_config(self): | |
| # r"""Check whether the model configuration contains any | |
| # unexpected arguments to Qwen API. | |
| # Raises: | |
| # ValueError: If the model configuration dictionary contains any | |
| # unexpected arguments to Qwen API. | |
| # """ | |
| # for param in self.model_config_dict: | |
| # if param not in QWEN_API_PARAMS: | |
| # raise ValueError( | |
| # f"Unexpected argument `{param}` is " | |
| # "input into Qwen model backend." | |
| # ) | |
| # @property | |
| # def stream(self) -> bool: | |
| # r"""Returns whether the model is in stream mode, which sends partial | |
| # results each time. | |
| # Returns: | |
| # bool: Whether the model is in stream mode. | |
| # """ | |
| # return self.model_config_dict.get('stream', False) | |