Spaces:
Running
Running
| import logging | |
| logging.basicConfig( | |
| level=logging.DEBUG, | |
| format="%(asctime)s %(levelname)s %(name)s: %(message)s", | |
| handlers=[logging.StreamHandler()] | |
| ) | |
| logging.info("Test log from app.py startup") | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import List | |
| import re | |
| from src.agents.config import Config | |
| from src.agents.supervisor.agent import Supervisor | |
| from src.models.chatMessage import ChatMessage | |
| from src.routes.chat_manager_routes import router as chat_manager_router | |
| from src.service.chat_manager import chat_manager_instance | |
| from src.agents.crypto_data.tools import get_coingecko_id, get_tradingview_symbol | |
| from src.agents.metadata import metadata | |
| # Initialize FastAPI app | |
| app = FastAPI(title="Zico Agent API", version="1.0") | |
| # Enable CORS for local/frontend dev | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Instantiate Supervisor agent (singleton LLM) | |
| supervisor = Supervisor(Config.get_llm()) | |
| logger = logging.getLogger(__name__) | |
| class ChatRequest(BaseModel): | |
| message: ChatMessage | |
| chain_id: str = "default" | |
| wallet_address: str = "default" | |
| conversation_id: str = "default" | |
| user_id: str = "anonymous" | |
| # Lightweight in-memory agent config for frontend integrations | |
| AVAILABLE_AGENTS = [ | |
| {"name": "default", "human_readable_name": "Default General Purpose", "description": "General chat and meta-queries about agents."}, | |
| {"name": "crypto data", "human_readable_name": "Crypto Data Fetcher", "description": "Real-time cryptocurrency prices, market cap, FDV, TVL."}, | |
| {"name": "token swap", "human_readable_name": "Token Swap Agent", "description": "Swap tokens using supported DEX APIs."}, | |
| {"name": "realtime search", "human_readable_name": "Real-Time Search", "description": "Search the web for recent information."}, | |
| {"name": "dexscreener", "human_readable_name": "DexScreener Analyst", "description": "Fetches and analyzes DEX trading data."}, | |
| {"name": "rugcheck", "human_readable_name": "Token Safety Analyzer", "description": "Analyzes token safety and trends (Solana)."}, | |
| {"name": "imagen", "human_readable_name": "Image Generator", "description": "Generate images from text prompts."}, | |
| {"name": "rag", "human_readable_name": "Document Assistant", "description": "Answer questions about uploaded documents."}, | |
| {"name": "tweet sizzler", "human_readable_name": "Tweet / X-Post Generator", "description": "Generate engaging tweets."}, | |
| {"name": "dca", "human_readable_name": "DCA Strategy Manager", "description": "Plan and manage DCA strategies."}, | |
| {"name": "base", "human_readable_name": "Base Transaction Manager", "description": "Handle transactions on Base network."}, | |
| {"name": "mor rewards", "human_readable_name": "MOR Rewards Tracker", "description": "Track MOR rewards and balances."}, | |
| {"name": "mor claims", "human_readable_name": "MOR Claims Agent", "description": "Claim MOR tokens."}, | |
| {"name": "lending", "human_readable_name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."}, | |
| ] | |
| # Default to a small, reasonable subset | |
| SELECTED_AGENTS = [agent["name"] for agent in AVAILABLE_AGENTS[:6]] | |
| # Commands exposed to the ChatInput autocomplete | |
| AGENT_COMMANDS = [ | |
| {"command": "morpheus", "name": "Default General Purpose", "description": "General assistant for simple queries and meta-questions."}, | |
| {"command": "crypto", "name": "Crypto Data Fetcher", "description": "Get prices, market cap, FDV, TVL and more."}, | |
| {"command": "document", "name": "Document Assistant", "description": "Ask questions about uploaded documents."}, | |
| {"command": "tweet", "name": "Tweet / X-Post Generator", "description": "Create engaging tweets about crypto and web3."}, | |
| {"command": "search", "name": "Real-Time Search", "description": "Search the web for recent events or updates."}, | |
| {"command": "dexscreener", "name": "DexScreener Analyst", "description": "Analyze DEX trading data on supported chains."}, | |
| {"command": "rugcheck", "name": "Token Safety Analyzer", "description": "Check token safety and view trending tokens."}, | |
| {"command": "dca", "name": "DCA Strategy Manager", "description": "Plan a dollar-cost averaging strategy."}, | |
| {"command": "base", "name": "Base Transaction Manager", "description": "Send tokens and swap on Base."}, | |
| {"command": "rewards", "name": "MOR Rewards Tracker", "description": "Check rewards balance and accrual."}, | |
| {"command": "lending", "name": "Lending Agent", "description": "Supply, borrow, repay, or withdraw assets."}, | |
| ] | |
| # Agents endpoints expected by the frontend | |
| def get_available_agents(): | |
| return { | |
| "selected_agents": SELECTED_AGENTS, | |
| "available_agents": AVAILABLE_AGENTS, | |
| } | |
| async def set_selected_agents(request: Request): | |
| global SELECTED_AGENTS | |
| data = await request.json() | |
| agents = data.get("agents", []) | |
| # Validate provided names against available agents | |
| available_names = {a["name"] for a in AVAILABLE_AGENTS} | |
| valid_agents = [a for a in agents if a in available_names] | |
| if not valid_agents: | |
| # Keep previous selection if nothing valid provided | |
| return {"status": "no_change", "agents": SELECTED_AGENTS} | |
| # Update selection | |
| SELECTED_AGENTS = valid_agents[:6] | |
| return {"status": "success", "agents": SELECTED_AGENTS} | |
| def get_agent_commands(): | |
| return {"commands": AGENT_COMMANDS} | |
| # Map agent runtime names to high-level types for storage/analytics | |
| def _map_agent_type(agent_name: str) -> str: | |
| mapping = { | |
| "crypto_agent": "crypto data", | |
| "default_agent": "default", | |
| "database_agent": "analysis", | |
| "search_agent": "realtime search", | |
| "swap_agent": "token swap", | |
| "lending_agent": "lending", | |
| "staking_agent": "staking", | |
| "supervisor": "supervisor", | |
| } | |
| return mapping.get(agent_name, "supervisor") | |
| def _sanitize_user_message_content(content: str | None) -> str | None: | |
| """Strip wrapper prompts (e.g., 'User Message: ...') the frontend might send.""" | |
| if not content: | |
| return content | |
| text = content.strip() | |
| marker = "user message:" | |
| lowered = text.lower() | |
| idx = lowered.rfind(marker) | |
| if idx != -1: | |
| candidate = text[idx + len(marker):].strip() | |
| if candidate: | |
| return candidate | |
| return text | |
| def _resolve_identity(request: ChatRequest) -> tuple[str, str]: | |
| """Ensure each request has a stable user and conversation identifier.""" | |
| user_id = (request.user_id or "").strip() | |
| if not user_id or user_id.lower() == "anonymous": | |
| wallet = (request.wallet_address or "").strip() | |
| if wallet and wallet.lower() != "default": | |
| user_id = f"wallet::{wallet.lower()}" | |
| else: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="A stable 'user_id' or wallet_address is required for swap operations.", | |
| ) | |
| conversation_id = (request.conversation_id or "").strip() or "default" | |
| return user_id, conversation_id | |
| def health_check(): | |
| return {"status": "ok"} | |
| def get_messages(request: Request): | |
| params = request.query_params | |
| conversation_id = params.get("conversation_id", "default") | |
| user_id = params.get("user_id", "anonymous") | |
| return {"messages": chat_manager_instance.get_messages(conversation_id, user_id)} | |
| def get_conversations(request: Request): | |
| params = request.query_params | |
| user_id = params.get("user_id", "anonymous") | |
| return {"conversation_ids": chat_manager_instance.get_all_conversation_ids(user_id)} | |
| def chat(request: ChatRequest): | |
| user_id: str | None = None | |
| conversation_id: str | None = None | |
| try: | |
| logger.debug("Received chat payload: %s", request.model_dump()) | |
| user_id, conversation_id = _resolve_identity(request) | |
| logger.debug( | |
| "Resolved chat identity user=%s conversation=%s wallet=%s", | |
| user_id, | |
| conversation_id, | |
| (request.wallet_address or "").strip() if request.wallet_address else None, | |
| ) | |
| wallet = request.wallet_address.strip() if request.wallet_address else None | |
| if wallet and wallet.lower() == "default": | |
| wallet = None | |
| display_name = None | |
| if isinstance(request.message.metadata, dict): | |
| display_name = request.message.metadata.get("display_name") | |
| chat_manager_instance.ensure_session( | |
| user_id, | |
| conversation_id, | |
| wallet_address=wallet, | |
| display_name=display_name, | |
| ) | |
| if request.message.role == "user": | |
| clean_content = _sanitize_user_message_content(request.message.content) | |
| if clean_content is not None: | |
| request.message.content = clean_content | |
| # Add the user message to the conversation | |
| chat_manager_instance.add_message( | |
| message=request.message.dict(), | |
| conversation_id=conversation_id, | |
| user_id=user_id | |
| ) | |
| # Get all messages from the conversation to pass to the agent | |
| conversation_messages = chat_manager_instance.get_messages( | |
| conversation_id=conversation_id, | |
| user_id=user_id | |
| ) | |
| # Invoke the supervisor agent with the conversation | |
| result = supervisor.invoke( | |
| conversation_messages, | |
| conversation_id=conversation_id, | |
| user_id=user_id, | |
| ) | |
| logger.debug( | |
| "Supervisor returned result for user=%s conversation=%s: %s", | |
| user_id, | |
| conversation_id, | |
| result, | |
| ) | |
| # Add the agent's response to the conversation | |
| if result and isinstance(result, dict): | |
| agent_name = result.get("agent", "supervisor") | |
| agent_name = _map_agent_type(agent_name) | |
| # Build response metadata and enrich with coin info for crypto price queries | |
| response_metadata = {"supervisor_result": result} | |
| swap_meta_snapshot = None | |
| # Prefer supervisor-provided metadata | |
| if isinstance(result, dict) and result.get("metadata"): | |
| response_metadata.update(result.get("metadata") or {}) | |
| elif agent_name == "token swap": | |
| swap_meta = metadata.get_swap_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if swap_meta: | |
| response_metadata.update(swap_meta) | |
| swap_meta_snapshot = swap_meta | |
| elif agent_name == "lending": | |
| lending_meta = metadata.get_lending_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if lending_meta: | |
| response_metadata.update(lending_meta) | |
| elif agent_name == "staking": | |
| staking_meta = metadata.get_staking_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if staking_meta: | |
| response_metadata.update(staking_meta) | |
| logger.debug( | |
| "Response metadata for user=%s conversation=%s: %s", | |
| user_id, | |
| conversation_id, | |
| response_metadata, | |
| ) | |
| # Create a ChatMessage from the supervisor response | |
| response_message = ChatMessage( | |
| role="assistant", | |
| content=result.get("response", "No response available"), | |
| agent_name=agent_name, | |
| agent_type=_map_agent_type(agent_name), | |
| metadata=result.get("metadata", {}), | |
| conversation_id=conversation_id, | |
| user_id=user_id, | |
| requires_action=True if agent_name in ["token swap", "lending", "staking"] else False, | |
| action_type="swap" if agent_name == "token swap" else "lending" if agent_name == "lending" else "staking" if agent_name == "staking" else None | |
| ) | |
| # Add the response message to the conversation | |
| chat_manager_instance.add_message( | |
| message=response_message.dict(), | |
| conversation_id=conversation_id, | |
| user_id=user_id | |
| ) | |
| # Return only the clean response | |
| response_payload = { | |
| "response": result.get("response", "No response available"), | |
| "agentName": agent_name, | |
| } | |
| response_meta = result.get("metadata") or {} | |
| if agent_name == "token swap" and not response_meta: | |
| if swap_meta_snapshot: | |
| response_meta = swap_meta_snapshot | |
| else: | |
| swap_meta = metadata.get_swap_agent( | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if swap_meta: | |
| response_meta = swap_meta | |
| if response_meta: | |
| response_payload["metadata"] = response_meta | |
| if agent_name == "token swap": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "swap_intent_ready" | |
| if should_clear: | |
| metadata.set_swap_agent( | |
| {}, | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if agent_name == "lending": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "lending_intent_ready" | |
| if should_clear: | |
| metadata.set_lending_agent( | |
| {}, | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| if agent_name == "staking": | |
| should_clear = False | |
| if response_meta: | |
| status = response_meta.get("status") if isinstance(response_meta, dict) else None | |
| event = response_meta.get("event") if isinstance(response_meta, dict) else None | |
| should_clear = status == "ready" or event == "staking_intent_ready" | |
| if should_clear: | |
| metadata.set_staking_agent( | |
| {}, | |
| user_id=user_id, | |
| conversation_id=conversation_id, | |
| ) | |
| return response_payload | |
| return {"response": "No response available", "agent": "supervisor"} | |
| except Exception as e: | |
| logger.exception( | |
| "Chat handler failed for user=%s conversation=%s", | |
| user_id, | |
| conversation_id, | |
| ) | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| # Include chat manager router | |
| app.include_router(chat_manager_router) | |