Spaces:
Runtime error
Runtime error
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| import ast | |
| import json | |
| import logging | |
| import os | |
| import uuid | |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union | |
| if TYPE_CHECKING: | |
| from cohere.types import ChatMessageV2, ChatResponse | |
| from camel.configs import COHERE_API_PARAMS, CohereConfig | |
| from camel.messages import OpenAIMessage | |
| from camel.models import BaseModelBackend | |
| from camel.types import ChatCompletion, ModelType | |
| from camel.utils import ( | |
| BaseTokenCounter, | |
| OpenAITokenCounter, | |
| api_keys_required, | |
| ) | |
| try: | |
| if os.getenv("AGENTOPS_API_KEY") is not None: | |
| from agentops import LLMEvent, record | |
| else: | |
| raise ImportError | |
| except (ImportError, AttributeError): | |
| LLMEvent = None | |
| class CohereModel(BaseModelBackend): | |
| r"""Cohere API in a unified BaseModelBackend interface.""" | |
| def __init__( | |
| self, | |
| model_type: Union[ModelType, str], | |
| model_config_dict: Optional[Dict[str, Any]] = None, | |
| api_key: Optional[str] = None, | |
| url: Optional[str] = None, | |
| token_counter: Optional[BaseTokenCounter] = None, | |
| ): | |
| import cohere | |
| if model_config_dict is None: | |
| model_config_dict = CohereConfig().as_dict() | |
| api_key = api_key or os.environ.get("COHERE_API_KEY") | |
| url = url or os.environ.get("COHERE_API_BASE_URL") | |
| super().__init__( | |
| model_type, model_config_dict, api_key, url, token_counter | |
| ) | |
| self._client = cohere.ClientV2(api_key=self._api_key) | |
| def _to_openai_response(self, response: 'ChatResponse') -> ChatCompletion: | |
| if response.usage and response.usage.tokens: | |
| input_tokens = response.usage.tokens.input_tokens or 0 | |
| output_tokens = response.usage.tokens.output_tokens or 0 | |
| usage = { | |
| "prompt_tokens": input_tokens, | |
| "completion_tokens": output_tokens, | |
| "total_tokens": input_tokens + output_tokens, | |
| } | |
| else: | |
| usage = {} | |
| tool_calls = response.message.tool_calls | |
| choices = [] | |
| if tool_calls: | |
| for tool_call in tool_calls: | |
| openai_tool_calls = [ | |
| dict( | |
| id=tool_call.id, | |
| function={ | |
| "name": tool_call.function.name, | |
| "arguments": tool_call.function.arguments, | |
| } | |
| if tool_call.function | |
| else {}, | |
| type=tool_call.type, | |
| ) | |
| ] | |
| choice = dict( | |
| index=None, | |
| message={ | |
| "role": "assistant", | |
| "content": response.message.tool_plan, | |
| "tool_calls": openai_tool_calls, | |
| }, | |
| finish_reason=response.finish_reason | |
| if response.finish_reason | |
| else None, | |
| ) | |
| choices.append(choice) | |
| else: | |
| openai_tool_calls = None | |
| choice = dict( | |
| index=None, | |
| message={ | |
| "role": "assistant", | |
| "content": response.message.content[0].text, # type: ignore[union-attr,index] | |
| "tool_calls": openai_tool_calls, | |
| }, | |
| finish_reason=response.finish_reason | |
| if response.finish_reason | |
| else None, | |
| ) | |
| choices.append(choice) | |
| obj = ChatCompletion.construct( | |
| id=response.id, | |
| choices=choices, | |
| created=None, | |
| model=self.model_type, | |
| object="chat.completion", | |
| usage=usage, | |
| ) | |
| return obj | |
| def _to_cohere_chatmessage( | |
| self, messages: List[OpenAIMessage] | |
| ) -> List["ChatMessageV2"]: | |
| from cohere.types import ToolCallV2Function | |
| from cohere.types.chat_message_v2 import ( | |
| AssistantChatMessageV2, | |
| SystemChatMessageV2, | |
| ToolCallV2, | |
| ToolChatMessageV2, | |
| UserChatMessageV2, | |
| ) | |
| tool_call_id = None | |
| new_messages = [] | |
| for msg in messages: | |
| role = msg.get("role") | |
| content = msg.get("content") | |
| function_call = msg.get("function_call") | |
| if role == "user": | |
| new_message = UserChatMessageV2(role="user", content=content) # type: ignore[arg-type] | |
| elif role in {"tool", "function"}: | |
| new_message = ToolChatMessageV2( | |
| role="tool", | |
| tool_call_id=tool_call_id, # type: ignore[arg-type] | |
| content=content, # type: ignore[assignment,arg-type] | |
| ) | |
| elif role == "assistant": | |
| if not function_call: | |
| new_message = AssistantChatMessageV2( # type: ignore[assignment] | |
| role="assistant", | |
| content=content, # type: ignore[arg-type] | |
| ) | |
| else: | |
| arguments = function_call.get("arguments") # type: ignore[attr-defined] | |
| arguments_dict = ast.literal_eval(arguments) | |
| arguments_json = json.dumps(arguments_dict) | |
| assis_tool_call_id = str(uuid.uuid4()) | |
| tool_call_id = assis_tool_call_id | |
| new_message = AssistantChatMessageV2( # type: ignore[assignment] | |
| role="assistant", | |
| tool_calls=[ | |
| ToolCallV2( | |
| id=assis_tool_call_id, | |
| type="function", | |
| function=ToolCallV2Function( | |
| name=function_call.get("name"), # type: ignore[attr-defined] | |
| arguments=arguments_json, # type: ignore[attr-defined] | |
| ), | |
| ) | |
| ], | |
| content=content, # type: ignore[arg-type] | |
| ) | |
| elif role == "system": | |
| new_message = SystemChatMessageV2( # type: ignore[assignment] | |
| role="system", | |
| content=content, # type: ignore[arg-type] | |
| ) | |
| else: | |
| raise ValueError(f"Unsupported message role: {role}") | |
| new_messages.append(new_message) | |
| return new_messages # type: ignore[return-value] | |
| def token_counter(self) -> BaseTokenCounter: | |
| r"""Initialize the token counter for the model backend. | |
| Returns: | |
| BaseTokenCounter: The token counter following the model's | |
| tokenization style. | |
| """ | |
| if not self._token_counter: | |
| self._token_counter = OpenAITokenCounter( | |
| model=ModelType.GPT_4O_MINI | |
| ) | |
| return self._token_counter | |
| def run(self, messages: List[OpenAIMessage]) -> ChatCompletion: | |
| r"""Runs inference of Cohere chat completion. | |
| Args: | |
| messages (List[OpenAIMessage]): Message list with the chat history | |
| in OpenAI API format. | |
| Returns: | |
| ChatCompletion. | |
| """ | |
| from cohere.core.api_error import ApiError | |
| cohere_messages = self._to_cohere_chatmessage(messages) | |
| # Removing 'strict': True from the dictionary for | |
| # cohere client | |
| if self.model_config_dict.get('tools') is not None: | |
| for tool in self.model_config_dict.get('tools', []): | |
| function_dict = tool.get('function', {}) | |
| if 'strict' in function_dict: | |
| del function_dict['strict'] | |
| try: | |
| response = self._client.chat( | |
| messages=cohere_messages, | |
| model=self.model_type, | |
| **self.model_config_dict, | |
| ) | |
| except ApiError as e: | |
| logging.error(f"Cohere API Error: {e.status_code}") | |
| logging.error(f"Error body: {e.body}") | |
| raise | |
| except Exception as e: | |
| logging.error(f"Unexpected error when calling Cohere API: {e!s}") | |
| raise | |
| openai_response = self._to_openai_response(response) | |
| # Add AgentOps LLM Event tracking | |
| if LLMEvent: | |
| llm_event = LLMEvent( | |
| thread_id=openai_response.id, | |
| prompt=" ".join( | |
| [message.get("content") for message in messages] # type: ignore[misc] | |
| ), | |
| prompt_tokens=openai_response.usage.prompt_tokens, # type: ignore[union-attr] | |
| completion=openai_response.choices[0].message.content, | |
| completion_tokens=openai_response.usage.completion_tokens, # type: ignore[union-attr] | |
| model=self.model_type, | |
| ) | |
| record(llm_event) | |
| return openai_response | |
| def check_model_config(self): | |
| r"""Check whether the model configuration contains any unexpected | |
| arguments to Cohere API. | |
| Raises: | |
| ValueError: If the model configuration dictionary contains any | |
| unexpected arguments to Cohere API. | |
| """ | |
| for param in self.model_config_dict: | |
| if param not in COHERE_API_PARAMS: | |
| raise ValueError( | |
| f"Unexpected argument `{param}` is " | |
| "input into Cohere model backend." | |
| ) | |
| def stream(self) -> bool: | |
| r"""Returns whether the model is in stream mode, which sends partial | |
| results each time. Current it's not supported. | |
| Returns: | |
| bool: Whether the model is in stream mode. | |
| """ | |
| return False | |