ColettoG's picture
add staking + registry swap
21d8407
"""Gateway-backed storage for staking intents with local fallback."""
from __future__ import annotations
import copy
import time
import logging
from datetime import datetime, timezone
from threading import Lock
from typing import Any, Dict, List, Optional
from src.integrations.panorama_gateway import (
PanoramaGatewayClient,
PanoramaGatewayError,
PanoramaGatewaySettings,
get_panorama_settings,
)
STAKING_SESSION_ENTITY = "staking-sessions"
STAKING_HISTORY_ENTITY = "staking-histories"
def _utc_now_iso() -> str:
return datetime.utcnow().replace(tzinfo=timezone.utc).isoformat()
def _identifier(user_id: str, conversation_id: str) -> str:
return f"{user_id}:{conversation_id}"
def _as_float(value: Any) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
class StakingStateRepository:
"""Stores staking agent state via Panorama's gateway or an in-memory fallback."""
_instance: "StakingStateRepository" | None = None
_instance_lock: Lock = Lock()
def __init__(
self,
*,
client: PanoramaGatewayClient | None = None,
settings: PanoramaGatewaySettings | None = None,
history_limit: int = 10,
) -> None:
self._logger = logging.getLogger(__name__)
self._history_limit = history_limit
try:
self._settings = settings or get_panorama_settings()
self._client = client or PanoramaGatewayClient(self._settings)
self._use_gateway = True
except ValueError:
# PANORAMA_GATEWAY_URL or JWT secrets not configured - fall back to local store.
self._settings = None
self._client = None
self._use_gateway = False
self._init_local_store()
def _init_local_store(self) -> None:
if not hasattr(self, "_state"):
self._state = {"intents": {}, "metadata": {}, "history": {}}
def _tenant_id(self) -> str:
return self._settings.tenant_id if self._settings else "tenant-agent"
def _fallback_to_local_store(self) -> None:
if self._use_gateway:
self._logger.warning("Panorama gateway unavailable for staking state; switching to in-memory fallback.")
self._use_gateway = False
self._init_local_store()
def _handle_gateway_failure(self, exc: PanoramaGatewayError) -> None:
self._logger.warning(
"Panorama gateway error (%s) for staking repository: %s",
getattr(exc, "status_code", "unknown"),
getattr(exc, "payload", exc),
)
self._fallback_to_local_store()
# ---- Singleton helpers -----------------------------------------------
@classmethod
def instance(cls) -> "StakingStateRepository":
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
@classmethod
def reset(cls) -> None:
with cls._instance_lock:
cls._instance = None
# ---- Core API ---------------------------------------------------------
def load_intent(self, user_id: str, conversation_id: str) -> Optional[Dict[str, Any]]:
if not self._use_gateway:
self._init_local_store()
record = self._state["intents"].get(_identifier(user_id, conversation_id))
if not record:
return None
return copy.deepcopy(record.get("intent"))
session = self._get_session(user_id, conversation_id)
if not self._use_gateway:
return self.load_intent(user_id, conversation_id)
if not session:
return None
return session.get("intent") or None
def persist_intent(
self,
user_id: str,
conversation_id: str,
intent: Dict[str, Any],
metadata: Dict[str, Any],
done: bool,
summary: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
if not self._use_gateway:
self._init_local_store()
key = _identifier(user_id, conversation_id)
now = time.time()
if done:
self._state["intents"].pop(key, None)
else:
self._state["intents"][key] = {"intent": copy.deepcopy(intent), "updated_at": now}
if metadata:
meta_copy = copy.deepcopy(metadata)
meta_copy["updated_at"] = now
self._state["metadata"][key] = meta_copy
if done and summary:
history = self._state["history"].setdefault(key, [])
summary_copy = copy.deepcopy(summary)
summary_copy.setdefault("timestamp", now)
history.append(summary_copy)
self._state["history"][key] = history[-self._history_limit:]
return self.get_history(user_id, conversation_id)
try:
if done:
if summary:
self._create_history_entry(user_id, conversation_id, summary)
self._delete_session(user_id, conversation_id)
else:
payload = self._session_payload(intent, metadata)
self._upsert_session(user_id, conversation_id, payload)
return self.get_history(user_id, conversation_id)
except PanoramaGatewayError as exc:
self._handle_gateway_failure(exc)
return self.persist_intent(user_id, conversation_id, intent, metadata, done, summary)
def set_metadata(
self,
user_id: str,
conversation_id: str,
metadata: Dict[str, Any],
) -> None:
if not self._use_gateway:
self._init_local_store()
key = _identifier(user_id, conversation_id)
if metadata:
meta_copy = copy.deepcopy(metadata)
meta_copy["updated_at"] = time.time()
self._state["metadata"][key] = meta_copy
else:
self._state["metadata"].pop(key, None)
return
try:
if not metadata:
self._delete_session(user_id, conversation_id)
return
session = self._get_session(user_id, conversation_id)
if not self._use_gateway:
return self.set_metadata(user_id, conversation_id, metadata)
intent = session.get("intent") if session else {}
payload = self._session_payload(intent or {}, metadata)
self._upsert_session(user_id, conversation_id, payload)
except PanoramaGatewayError as exc:
self._handle_gateway_failure(exc)
self.set_metadata(user_id, conversation_id, metadata)
def clear_metadata(self, user_id: str, conversation_id: str) -> None:
self.set_metadata(user_id, conversation_id, {})
def clear_intent(self, user_id: str, conversation_id: str) -> None:
if not self._use_gateway:
self._init_local_store()
self._state["intents"].pop(_identifier(user_id, conversation_id), None)
self._state["metadata"].pop(_identifier(user_id, conversation_id), None)
return
try:
self._delete_session(user_id, conversation_id)
except PanoramaGatewayError as exc:
self._handle_gateway_failure(exc)
self.clear_intent(user_id, conversation_id)
def get_metadata(self, user_id: str, conversation_id: str) -> Dict[str, Any]:
if not self._use_gateway:
self._init_local_store()
record = self._state["metadata"].get(_identifier(user_id, conversation_id))
if not record:
return {}
entry = copy.deepcopy(record)
ts = entry.pop("updated_at", None)
if ts is not None:
entry["updated_at"] = datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
return entry
session = self._get_session(user_id, conversation_id)
if not self._use_gateway:
return self.get_metadata(user_id, conversation_id)
if not session:
return {}
intent = session.get("intent") or {}
metadata: Dict[str, Any] = {
"event": session.get("event"),
"status": session.get("status"),
"missing_fields": session.get("missingFields") or [],
"next_field": session.get("nextField"),
"pending_question": session.get("pendingQuestion"),
"choices": session.get("choices") or [],
"error": session.get("errorMessage"),
"user_id": user_id,
"conversation_id": conversation_id,
}
metadata["action"] = intent.get("action")
metadata["amount"] = intent.get("amount")
metadata["network"] = intent.get("network")
metadata["protocol"] = intent.get("protocol")
metadata["input_token"] = intent.get("input_token")
metadata["output_token"] = intent.get("output_token")
history = self.get_history(user_id, conversation_id)
if history:
metadata["history"] = history
updated_at = session.get("updatedAt")
if updated_at:
metadata["updated_at"] = updated_at
return metadata
def get_history(
self,
user_id: str,
conversation_id: str,
limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
if not self._use_gateway:
key = _identifier(user_id, conversation_id)
history = self._state["history"].get(key, [])
effective = limit or self._history_limit
result: List[Dict[str, Any]] = []
for item in sorted(history, key=lambda entry: entry.get("timestamp", 0), reverse=True)[:effective]:
entry = copy.deepcopy(item)
ts = entry.get("timestamp")
if ts is not None:
entry["timestamp"] = datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat()
result.append(entry)
return result
effective_limit = limit or self._history_limit
try:
result = self._client.list(
STAKING_HISTORY_ENTITY,
{
"where": {"userId": user_id, "conversationId": conversation_id},
"orderBy": {"recordedAt": "desc"},
"take": effective_limit,
},
)
except PanoramaGatewayError as exc:
if exc.status_code == 404:
return []
self._handle_gateway_failure(exc)
return self.get_history(user_id, conversation_id, limit)
except ValueError:
self._logger.warning("Invalid staking history response from gateway; falling back to local store.")
self._fallback_to_local_store()
return self.get_history(user_id, conversation_id, limit)
data = result.get("data", []) if isinstance(result, dict) else []
history: List[Dict[str, Any]] = []
for entry in data:
history.append(
{
"status": entry.get("status"),
"action": entry.get("action"),
"amount": entry.get("amount"),
"network": entry.get("network"),
"protocol": entry.get("protocol"),
"input_token": entry.get("inputToken"),
"output_token": entry.get("outputToken"),
"error": entry.get("errorMessage"),
"timestamp": entry.get("recordedAt"),
}
)
return history
# ---- Gateway helpers --------------------------------------------------
def _get_session(self, user_id: str, conversation_id: str) -> Optional[Dict[str, Any]]:
identifier = _identifier(user_id, conversation_id)
try:
return self._client.get(STAKING_SESSION_ENTITY, identifier)
except PanoramaGatewayError as exc:
if exc.status_code == 404:
return None
self._handle_gateway_failure(exc)
return None
def _delete_session(self, user_id: str, conversation_id: str) -> None:
identifier = _identifier(user_id, conversation_id)
try:
self._client.delete(STAKING_SESSION_ENTITY, identifier)
except PanoramaGatewayError as exc:
if exc.status_code != 404:
self._handle_gateway_failure(exc)
raise
def _upsert_session(
self,
user_id: str,
conversation_id: str,
data: Dict[str, Any],
) -> None:
identifier = _identifier(user_id, conversation_id)
payload = {**data, "updatedAt": _utc_now_iso()}
try:
self._client.update(STAKING_SESSION_ENTITY, identifier, payload)
except PanoramaGatewayError as exc:
if exc.status_code != 404:
self._handle_gateway_failure(exc)
raise
create_payload = {
"userId": user_id,
"conversationId": conversation_id,
"tenantId": self._tenant_id(),
**payload,
}
try:
self._client.create(STAKING_SESSION_ENTITY, create_payload)
except PanoramaGatewayError as create_exc:
if create_exc.status_code == 409:
return
if create_exc.status_code == 404:
self._handle_gateway_failure(create_exc)
raise
self._handle_gateway_failure(create_exc)
raise
def _create_history_entry(
self,
user_id: str,
conversation_id: str,
summary: Dict[str, Any],
) -> None:
history_payload = {
"userId": user_id,
"conversationId": conversation_id,
"status": summary.get("status"),
"action": summary.get("action"),
"amount": _as_float(summary.get("amount")),
"network": summary.get("network"),
"protocol": summary.get("protocol"),
"inputToken": summary.get("input_token"),
"outputToken": summary.get("output_token"),
"errorMessage": summary.get("error"),
"recordedAt": _utc_now_iso(),
"tenantId": self._tenant_id(),
}
if self._logger.isEnabledFor(logging.DEBUG):
self._logger.debug(
"Persisting staking history for user=%s conversation=%s payload=%s",
user_id,
conversation_id,
history_payload,
)
try:
self._client.create(STAKING_HISTORY_ENTITY, history_payload)
except PanoramaGatewayError as exc:
if exc.status_code == 404:
self._handle_gateway_failure(exc)
raise
elif exc.status_code != 409:
self._handle_gateway_failure(exc)
raise
@staticmethod
def _session_payload(intent: Dict[str, Any], metadata: Dict[str, Any]) -> Dict[str, Any]:
missing = metadata.get("missing_fields") or []
if not isinstance(missing, list):
missing = list(missing)
return {
"status": metadata.get("status"),
"event": metadata.get("event"),
"intent": intent,
"missingFields": missing,
"nextField": metadata.get("next_field"),
"pendingQuestion": metadata.get("pending_question"),
"choices": metadata.get("choices"),
"errorMessage": metadata.get("error"),
"historyCursor": metadata.get("history_cursor") or 0,
}