Spaces:
Runtime error
Runtime error
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= | |
| import atexit | |
| import json | |
| import logging | |
| import subprocess | |
| import time | |
| from functools import wraps | |
| from pathlib import Path | |
| from subprocess import Popen | |
| from typing import Any, Dict, List, Optional, Union | |
| import requests | |
| from pydantic import BaseModel | |
| from camel.runtime import BaseRuntime | |
| from camel.toolkits.function_tool import FunctionTool | |
| logger = logging.getLogger(__name__) | |
| class RemoteHttpRuntime(BaseRuntime): | |
| r"""A runtime that runs functions in a remote HTTP server. | |
| You need to run the API server in the remote server first. | |
| Args: | |
| host (str): The host of the remote server. | |
| port (int): The port of the remote server. (default: :obj: `8000`) | |
| python_exec (str): The python executable to run the API server. | |
| (default: :obj: `python3`) | |
| """ | |
| def __init__( | |
| self, host: str, port: int = 8000, python_exec: str = "python3" | |
| ): | |
| super().__init__() | |
| self.host = host | |
| self.port = port | |
| self.python_exec = python_exec | |
| self.api_path = Path(__file__).parent / "api.py" | |
| self.entrypoint: Dict[str, str] = dict() | |
| self.process: Optional[Popen] = None | |
| def build(self) -> "RemoteHttpRuntime": | |
| r"""Build the API server. | |
| Returns: | |
| RemoteHttpRuntime: The current runtime. | |
| """ | |
| self.process = subprocess.Popen( | |
| [ | |
| self.python_exec, | |
| str(self.api_path), | |
| *list(self.entrypoint.values()), | |
| ] | |
| ) | |
| atexit.register(self._cleanup) | |
| return self | |
| def _cleanup(self): | |
| r"""Clean up the API server when exiting.""" | |
| if self.process and self.process.poll() is None: | |
| self.process.terminate() | |
| self.process.wait() | |
| self.process = None | |
| def add( # type: ignore[override] | |
| self, | |
| funcs: Union[FunctionTool, List[FunctionTool]], | |
| entrypoint: str, | |
| redirect_stdout: bool = False, | |
| arguments: Optional[Dict[str, Any]] = None, | |
| ) -> "RemoteHttpRuntime": | |
| r"""Add a function or list of functions to the runtime. | |
| Args: | |
| funcs (Union[FunctionTool, List[FunctionTool]]): The function or | |
| list of functions to add. | |
| entrypoint (str): The entrypoint for the function. | |
| redirect_stdout (bool): Whether to return the stdout of | |
| the function. (default: :obj: `False`) | |
| arguments (Optional[Dict[str, Any]]): The arguments for the | |
| function. (default: :obj: `None`) | |
| Returns: | |
| RemoteHttpRuntime: The current runtime. | |
| """ | |
| if not isinstance(funcs, list): | |
| funcs = [funcs] | |
| if arguments is not None: | |
| entrypoint += json.dumps(arguments) | |
| for func in funcs: | |
| inner_func = func.func | |
| # Create a wrapper that explicitly binds `func` | |
| def wrapper( | |
| *args, func=func, redirect_stdout=redirect_stdout, **kwargs | |
| ): | |
| for key, value in kwargs.items(): | |
| if isinstance(value, BaseModel): | |
| kwargs[key] = value.model_dump() | |
| resp = requests.post( | |
| f"http://{self.host}:{self.port}/{func.get_function_name()}", | |
| json=dict( | |
| args=args, | |
| kwargs=kwargs, | |
| redirect_stdout=redirect_stdout, | |
| ), | |
| ) | |
| if resp.status_code != 200: | |
| logger.error( | |
| f"""ailed to execute function: | |
| {func.get_function_name()}, | |
| status code: {resp.status_code}, | |
| response: {resp.text}""" | |
| ) | |
| return { | |
| "error": f"""Failed to execute function: | |
| {func.get_function_name()}, | |
| response: {resp.text}""" | |
| } | |
| data = resp.json() | |
| if redirect_stdout: | |
| print(data["stdout"]) | |
| return json.loads(data["output"]) | |
| func.func = wrapper | |
| self.tools_map[func.get_function_name()] = func | |
| self.entrypoint[func.get_function_name()] = entrypoint | |
| return self | |
| def ok(self) -> bool: | |
| r"""Check if the API Server is running. | |
| Returns: | |
| bool: Whether the API Server is running. | |
| """ | |
| try: | |
| _ = requests.get(f"http://{self.host}:{self.port}") | |
| return True | |
| except requests.exceptions.ConnectionError: | |
| return False | |
| def wait(self, timeout: int = 10) -> bool: | |
| r"""Wait for the API Server to be ready. | |
| Args: | |
| timeout (int): The number of seconds to wait. (default: :obj: `10`) | |
| Returns: | |
| bool: Whether the API Server is ready. | |
| """ | |
| for _ in range(timeout): | |
| if self.ok: | |
| return True | |
| time.sleep(1) | |
| return False | |
| def __del__(self): | |
| r"""Clean up the API server when the object is deleted.""" | |
| self._cleanup() | |
| def stop(self) -> "RemoteHttpRuntime": | |
| r"""Stop the API server. | |
| Returns: | |
| RemoteHttpRuntime: The current runtime. | |
| """ | |
| self._cleanup() | |
| return self | |
| def reset(self) -> "RemoteHttpRuntime": | |
| r"""Reset the API server. | |
| Returns: | |
| RemoteHttpRuntime: The current runtime. | |
| """ | |
| return self.stop().build() | |
| def docs(self) -> str: | |
| r"""Get the URL for the API documentation. | |
| Returns: | |
| str: The URL for the API documentation. | |
| """ | |
| return f"http://{self.host}:{self.port}/docs" | |