# Hivenet_ComputeAgent / Compute_MCP / api_data_structure.py
# carraraig's picture
# Token (#12)
# 5d5ee81 verified
import logging
from typing import List, Optional, Dict, Any, Union
from pydantic import BaseModel, Field
from typing import Any
from enum import Enum
from constant import Constants
import requests
# Enum for instance status
class InstanceStatus(Enum):
    """Status codes of a compute instance.

    Each member maps a symbolic name to the integer code used when a raw
    status value is converted with ``InstanceStatus(value)``.
    NOTE(review): the integers presumably mirror the status codes returned by
    the remote API -- confirm against the API specification.
    """

    CREATED = 0
    DEPLOYED = 1
    STARTING = 2
    RUNNING = 3
    ERRORED = 4
    TERMINATING = 5
    TERMINATED = 6
    STOPPING = 7
    STOPPED = 8
class Timestamp(BaseModel):
    """Point in time split into whole seconds and a nanosecond remainder.

    NOTE(review): presumably seconds since the Unix epoch, in the style of a
    protobuf timestamp -- confirm against the API specification.
    """

    seconds: int
    nanos: int
# GPU model
class GPUInfo(BaseModel):
    """Description of a single GPU attached to an instance."""

    # GPU model name (the config tables in this module use e.g. "RTX 4090").
    model: str
# Port info
class PortInfo(BaseModel):
    """Mapping of one exposed container port to a port on the node."""

    # Protocol name as reported by the API.
    protocol: str
    # Port number inside the container.
    container_port: int
    # Port number on the hosting node.
    node_port: int
class InstanceSpending(BaseModel):
    """Spending figures reported for one instance.

    NOTE(review): the currency and units of the price fields are not visible
    in this module -- confirm against the API specification.
    """

    # Identifier of the instance these figures belong to.
    instance_id: str
    hourly_price: float
    total_spend: float
# InstanceInfo for GET method
class InstanceInfo(BaseModel):
    """Description of a compute instance as parsed from an API response.

    Every field is optional and defaults to ``None``, so a partial payload
    can still be parsed. When ``status_string`` is not supplied it is derived
    from ``status`` (the enum member's name) right after construction.
    """

    # Identity and ownership.
    id: Optional[str] = None
    deployment_id: Optional[str] = None
    name: Optional[str] = None
    user_id: Optional[str] = None
    container_image: Optional[str] = None

    # Lifecycle state.
    status: Optional[InstanceStatus] = None
    status_string: Optional[str] = None
    additional_info: Optional[str] = None
    type: Optional[int] = None
    created_at: Optional[Timestamp] = None
    updated_at: Optional[Timestamp] = None
    ready_at: Optional[Timestamp] = None
    stopped_at: Optional[Timestamp] = None

    # Resources. NOTE(review): units are not visible here -- confirm with the API.
    cpu: Optional[int] = None
    memory: Optional[int] = None
    gpu: Optional[List[GPUInfo]] = None
    disk: Optional[int] = None
    bandwidth: Optional[int] = None

    # Access and placement.
    ssh_key_id: Optional[str] = None
    location: Optional[str] = None
    ports: Optional[Dict[str, PortInfo]] = None

    # Environment and accounting.
    hive_environment_variables: Optional[Dict[str, Any]] = None
    environment_variables: Optional[Dict[str, Any]] = None
    runtime: Optional[int] = None
    spending: Optional[InstanceSpending] = None

    def __init__(self, **data: Any) -> None:
        """Validate ``data`` and fill ``status_string`` from ``status`` if absent."""
        super().__init__(**data)
        # Only derive the label when the caller did not provide one.
        if self.status_string is None and isinstance(self.status, InstanceStatus):
            self.status_string = self.status.name
# Spending info model
# NOTE: InstanceSpending is declared once, earlier in this module (ahead of
# InstanceInfo, whose `spending` field is annotated with it). An identical
# second declaration used to follow here; it rebound the name to a new, distinct
# class, so objects built later via InstanceSpending(...) were not instances of
# the class referenced by InstanceInfo.spending. Keep the single definition.
# Configuration mappings based on the UI tables
# Predefined GPU instance shapes, keyed "<count>x <model>".
# Every resource except bandwidth scales linearly with the GPU count:
# 8 CPUs, 48 memory units and 250 disk units per GPU; bandwidth is a flat 1000.
GPU_CONFIGS = {
    f"{gpu_count}x {gpu_model}": {
        "gpu": [gpu_model] * gpu_count,
        "cpu": 8 * gpu_count,
        "memory": 48 * gpu_count,
        "disk": 250 * gpu_count,
        "bandwidth": 1000,
    }
    for gpu_model in ("RTX 4090", "RTX 5090")
    for gpu_count in (1, 2, 4, 8)
}
# Predefined CPU-only instance shapes, keyed "<cores>vCPU".
# Memory is 2 units and disk 25 units per core; bandwidth does not scale
# linearly, so it is listed explicitly next to each core count.
VCPU_CONFIGS = {
    f"{core_count}vCPU": {
        "gpu": [],
        "cpu": core_count,
        "memory": 2 * core_count,
        "disk": 25 * core_count,
        "bandwidth": link_speed,
    }
    for core_count, link_speed in (
        (2, 250),
        (4, 250),
        (8, 500),
        (16, 1000),
        (32, 1000),
    )
}
# Location-GPU validation map (using API format - lowercase)
# Location name (lowercase API format) -> GPU models that can be requested there.
# Written from the GPU's point of view and inverted, so each model's
# locations sit on a single line.
LOCATION_GPU_MAP = {
    site: [gpu_model]
    for gpu_model, sites in (
        ("RTX 4090", ("france", "uae")),
        ("RTX 5090", ("texas", "uae-2")),
    )
    for site in sites
}
class HiveComputeAPI:
    """
    A wrapper class that provides methods to interact with the Hive Compute API.

    All requests carry a bearer token and share one timeout, so a stalled
    server cannot block the caller indefinitely.
    """

    def __init__(
        self,
        base_url: str = Constants.HIVE_COMPUTE_BASE_API_URL,
        token: str = Constants.HIVE_COMPUTE_DEFAULT_API_TOKEN,
        timeout: float = 30.0,
    ):
        """
        Initializes the HiveComputeAPI handler.

        Args:
            base_url (str): The base URL of the Hive Compute API. Trailing
                slashes are removed so paths can be appended with a single "/".
            token (str): The authentication token for the Hive Compute API.
            timeout (float): Seconds to wait for each HTTP request before
                giving up. Defaults to 30.0.
        """
        # Only trailing slashes are trimmed; the start of the URL is left as is.
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def _auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header shared by every request."""
        return {"Authorization": f"Bearer {self.token}"}

    def __fetch_instance_structure(self, instance_json) -> InstanceInfo:
        """
        Builds an InstanceInfo from one raw instance entry of an API response.

        The input mapping is not modified; conversion works on a shallow copy.

        Args:
            instance_json: The raw instance entry. Anything that is not a dict
                yields an empty InstanceInfo.

        Returns:
            InstanceInfo: The parsed instance description.
        """
        if not isinstance(instance_json, dict):
            # Keep the declared return type: an InstanceInfo with all defaults.
            return InstanceInfo()

        # Work on a copy so the caller's payload is left untouched.
        data = dict(instance_json)

        if "status" in data and not isinstance(data["status"], InstanceStatus):
            try:
                data["status"] = InstanceStatus(data["status"])
            except (ValueError, TypeError):
                # Unknown status codes fall back to CREATED.
                data["status"] = InstanceStatus.CREATED

        for field in ("created_at", "updated_at", "ready_at", "stopped_at"):
            value = data.get(field)
            if isinstance(value, dict):
                data[field] = Timestamp(**value)
            elif not isinstance(value, Timestamp):
                # Anything that is neither a mapping nor already parsed is dropped.
                data[field] = None

        if "gpu" in data:
            # `or []` tolerates an explicit null in the payload.
            data["gpu"] = [
                GPUInfo(**gpu) for gpu in (data["gpu"] or []) if isinstance(gpu, dict)
            ]

        if "ports" in data:
            # `or {}` tolerates an explicit null in the payload.
            data["ports"] = {
                key: PortInfo(**port)
                for key, port in (data["ports"] or {}).items()
                if isinstance(port, dict)
            }

        return InstanceInfo(**data)

    def get_all_instances(self) -> List[InstanceInfo]:
        """
        Fetches all compute instances for the authenticated user.

        Returns:
            List[InstanceInfo]: A list of InstanceInfo objects representing the
            user's compute instances. An empty list is returned when the
            request fails or the response body is not a JSON object.
        """
        try:
            response = requests.get(
                f"{self.base_url}/instances",
                headers=self._auth_headers(),
                timeout=self.timeout,
            )
            response.raise_for_status()
            response_json = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            self.logger.error("Failed to fetch instances: %s", e)
            return []

        if not isinstance(response_json, dict):
            self.logger.error("Failed to fetch instances: unexpected response format")
            return []

        spending_map = response_json.get("spending")
        if not isinstance(spending_map, dict):
            spending_map = {}

        instances = []
        for inst in response_json.get("instances") or []:
            if not isinstance(inst, dict):
                self.logger.warning("Skipping malformed instance entry: %r", inst)
                continue
            inst_struct = self.__fetch_instance_structure(inst)
            # Spending figures are delivered separately, keyed by instance id.
            spend = spending_map.get(inst.get("id"))
            if spend:
                inst_struct.spending = InstanceSpending(**spend)
            instances.append(inst_struct)
        return instances

    def create_instance(
        self,
        name: str = "default",
        location: str = "uae",
        config: str = "1x RTX 4090",
        container_image: str = "Dockerfile.vulkan",
        tcp_ports: Optional[List[int]] = None,
        https_ports: Optional[List[int]] = None,
        udp_ports: Optional[List[int]] = None,
        launch_jupyter_notebook: bool = False,
        instance_type: int = 0,
        custom_config: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Creates a new compute instance using predefined configurations or custom settings.

        Args:
            name (str): Name of the instance. Defaults to "default".
            location (str): Location where the instance will be deployed. Defaults to "uae".
                Valid locations: france, uae, texas, uae-2
            config (str): Predefined configuration. Options:
                GPU configs: "1x RTX 4090", "2x RTX 4090", "4x RTX 4090", "8x RTX 4090",
                "1x RTX 5090", "2x RTX 5090", "4x RTX 5090", "8x RTX 5090"
                vCPU configs: "2vCPU", "4vCPU", "8vCPU", "16vCPU", "32vCPU"
                Defaults to "1x RTX 4090".
            container_image (str): Docker container image to use. Defaults to "Dockerfile.vulkan".
            tcp_ports (List[int], optional): List of TCP ports to expose. Defaults to none.
            https_ports (List[int], optional): List of HTTPS ports to expose. Defaults to [8888].
            udp_ports (List[int], optional): List of UDP ports to expose. Defaults to none.
            launch_jupyter_notebook (bool): Whether to launch Jupyter notebook. Defaults to False.
            instance_type (int): Type of instance. Defaults to 0.
            custom_config (Dict[str, Any], optional): Custom configuration to override defaults.
                Keys: cpu, memory, disk, bandwidth, gpu

        Returns:
            Optional[Dict[str, Any]]: A dictionary with 'id' and 'status' keys if successful,
            None if the request fails or the response cannot be decoded.

        Raises:
            ValueError: If configuration is invalid or GPU type not available in location.
        """
        all_configs = {**GPU_CONFIGS, **VCPU_CONFIGS}

        if config not in all_configs:
            available_configs = list(all_configs.keys())
            raise ValueError(
                f"Invalid config: {config}. Available configs: {available_configs}"
            )

        # Copy so overrides never leak into the shared module-level tables.
        instance_config = dict(all_configs[config])
        if custom_config:
            instance_config.update(custom_config)

        if location not in LOCATION_GPU_MAP:
            raise ValueError(
                f"Invalid location: {location}. Valid locations: {list(LOCATION_GPU_MAP.keys())}"
            )

        # Every requested GPU must be offered at the location; a custom
        # config may mix models, so checking only the first is not enough.
        # CPU-only configs carry an empty list and skip this check.
        for gpu_type in instance_config["gpu"] or []:
            if gpu_type not in LOCATION_GPU_MAP[location]:
                raise ValueError(
                    f"GPU type '{gpu_type}' not available in location '{location}'. "
                    f"Available GPUs: {LOCATION_GPU_MAP[location]}"
                )

        payload = {
            "bandwidth": instance_config["bandwidth"],
            "container_image": container_image,
            "cpu": instance_config["cpu"],
            "disk": instance_config["disk"],
            "gpu": instance_config["gpu"],
            "https_ports": https_ports if https_ports is not None else [8888],
            "launch_jupyter_notebook": launch_jupyter_notebook,
            "location": location,
            "memory": instance_config["memory"],
            "name": name,
            "tcp_ports": tcp_ports if tcp_ports is not None else [],
            "type": instance_type,
            "udp_ports": udp_ports if udp_ports is not None else []
        }

        self.logger.info("Creating instance with payload: %s", payload)

        try:
            response = requests.post(
                f"{self.base_url}/instances/instance",
                headers={
                    **self._auth_headers(),
                    "Content-Type": "application/json"
                },
                json=payload,
                timeout=self.timeout,
            )
            self.logger.info("Response status code: %s", response.status_code)
            # Log the body for real failures only; other 2xx codes are successes.
            if not response.ok:
                self.logger.error("Response body: %s", response.text)
            response.raise_for_status()
            response_data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a success response whose body is not valid JSON.
            self.logger.error("Failed to create instance: %s", e)
            failed_response = getattr(e, "response", None)
            if failed_response is not None:
                self.logger.error("Response content: %s", failed_response.text)
            return None

        instance_data = response_data.get("instance") if isinstance(response_data, dict) else None
        if not isinstance(instance_data, dict):
            instance_data = {}
        return {
            "id": instance_data.get("id"),
            "status": instance_data.get("status")
        }

    def get_available_locations(self, gpu_type: Optional[str] = None) -> List[str]:
        """
        Get available locations, optionally filtered by GPU type.

        Args:
            gpu_type (str, optional): GPU model to filter locations by.

        Returns:
            List[str]: List of available locations.
        """
        if gpu_type:
            return [loc for loc, gpus in LOCATION_GPU_MAP.items() if gpu_type in gpus]
        return list(LOCATION_GPU_MAP.keys())