wwieerrz's picture
🎨 Launch DimensioDepth - Advanced AI Depth Estimation
463afdd
from fastapi import FastAPI, UploadFile, File, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional, Literal
import asyncio
import time
import hashlib
import io
# Import our utilities
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
from config import get_settings
from utils.model_loader import ModelManager
from utils.image_processing import (
load_image_from_bytes,
load_image_from_base64,
array_to_base64,
depth_to_colormap,
create_side_by_side
)
from utils.demo_depth import generate_smart_depth
# Initialize FastAPI app
app = FastAPI(
title="Dimensio API",
description="Add Dimension to Everything - High-performance depth estimation and 3D visualization API",
version="1.0.0"
)
settings = get_settings()
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global model manager
model_manager = ModelManager()
DEMO_MODE = False # Will be set to True if no models available
# Request/Response models
class DepthRequest(BaseModel):
"""Request model for depth estimation"""
image: str # Base64 encoded image
model: Literal["small", "large"] = "small"
output_format: Literal["grayscale", "colormap", "both"] = "colormap"
colormap: Literal["inferno", "viridis", "plasma", "turbo"] = "inferno"
class DepthResponse(BaseModel):
"""Response model for depth estimation"""
depth_map: str # Base64 encoded depth map
metadata: dict
processing_time_ms: float
# Startup/shutdown events
@app.on_event("startup")
async def startup_event():
"""Initialize models on startup"""
print(">> Starting Dimensio API...")
try:
# Load small model (fast preview)
small_model_path = Path(settings.MODEL_CACHE_DIR) / settings.DEPTH_MODEL_SMALL
if small_model_path.exists():
model_manager.load_model(
"small",
str(small_model_path),
use_gpu=settings.USE_GPU,
use_tensorrt=settings.TRT_OPTIMIZATION
)
print("[+] Small model loaded")
else:
print(f"[!] Small model not found: {small_model_path}")
# Load large model (high quality)
large_model_path = Path(settings.MODEL_CACHE_DIR) / settings.DEPTH_MODEL_LARGE
if large_model_path.exists():
model_manager.load_model(
"large",
str(large_model_path),
use_gpu=settings.USE_GPU,
use_tensorrt=settings.TRT_OPTIMIZATION
)
print("[+] Large model loaded")
else:
print(f"[!] Large model not found: {large_model_path}")
if not model_manager.models:
global DEMO_MODE
DEMO_MODE = True
print("\n[!] No models loaded - Running in DEMO MODE")
print("Demo mode uses synthetic depth maps for testing the UI.")
print("\nTo use real AI models:")
print("1. Run: python download_models.py")
print("2. Place ONNX models in models/cache/")
print("3. Restart the server")
except Exception as e:
print(f"[X] Error loading models: {e}")
print("Server will start but depth estimation will not work.")
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup on shutdown"""
print(">> Shutting down Depth Flow Pro API...")
# Health check
@app.get("/")
async def root():
"""API health check"""
return {
"name": "Depth Flow Pro API",
"version": "1.0.0",
"status": "online",
"models_loaded": list(model_manager.models.keys())
}
@app.get("/health")
async def health_check():
"""Detailed health check"""
return {
"status": "healthy",
"models": {
name: "loaded" for name in model_manager.models.keys()
},
"gpu_enabled": settings.USE_GPU,
"tensorrt_enabled": settings.TRT_OPTIMIZATION
}
# Depth estimation endpoints
@app.post("/api/v1/depth/preview", response_model=DepthResponse)
async def estimate_depth_preview(file: UploadFile = File(...)):
"""
Fast depth estimation using small model (preview quality)
Optimized for speed, ~50-100ms on GPU
"""
try:
start_time = time.time()
# Load image
image_bytes = await file.read()
image = load_image_from_bytes(image_bytes)
# Check if demo mode or use real model
if DEMO_MODE:
# Use synthetic depth for demo
depth = generate_smart_depth(image)
model_name = "demo"
else:
# Get small model
model = model_manager.get_model("small")
if model is None:
raise HTTPException(
status_code=503,
detail="Small model not loaded. Please check server logs."
)
# Run depth estimation
depth = model.predict(image)
model_name = "small"
# Convert to colormap
depth_colored = depth_to_colormap(depth)
# Encode to base64
depth_base64 = array_to_base64(depth_colored, format='PNG')
processing_time = (time.time() - start_time) * 1000
return DepthResponse(
depth_map=depth_base64,
metadata={
"model": model_name,
"input_size": image.shape[:2],
"output_size": depth.shape[:2],
"demo_mode": DEMO_MODE
},
processing_time_ms=round(processing_time, 2)
)
except Exception as e:
print(f"❌ Error: {type(e).__name__}: {str(e)}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/depth/hq", response_model=DepthResponse)
async def estimate_depth_hq(file: UploadFile = File(...)):
"""
High-quality depth estimation using large model
Slower but more accurate, ~500-1500ms on GPU
"""
try:
start_time = time.time()
# Load image
image_bytes = await file.read()
image = load_image_from_bytes(image_bytes)
# Check if demo mode or use real model
if DEMO_MODE:
# Use synthetic depth for demo
depth = generate_smart_depth(image)
model_name = "demo (HQ)"
else:
# Get large model
model = model_manager.get_model("large")
if model is None:
# Fallback to small model if large not available
model = model_manager.get_model("small")
if model is None:
raise HTTPException(
status_code=503,
detail="No models loaded. Please check server logs."
)
model_name = "small (fallback)"
else:
model_name = "large"
# Run depth estimation
depth = model.predict(image)
# Convert to colormap
depth_colored = depth_to_colormap(depth)
# Encode to base64
depth_base64 = array_to_base64(depth_colored, format='PNG')
processing_time = (time.time() - start_time) * 1000
return DepthResponse(
depth_map=depth_base64,
metadata={
"model": model_name,
"input_size": image.shape[:2],
"output_size": depth.shape[:2],
"demo_mode": DEMO_MODE
},
processing_time_ms=round(processing_time, 2)
)
except Exception as e:
print(f"❌ Error: {type(e).__name__}: {str(e)}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/depth/estimate")
async def estimate_depth(request: DepthRequest):
"""
Depth estimation with custom options
Accepts base64 encoded image
"""
try:
start_time = time.time()
# Load image from base64
image = load_image_from_base64(request.image)
# Get model
model = model_manager.get_model(request.model)
if model is None:
raise HTTPException(
status_code=503,
detail=f"Model '{request.model}' not loaded"
)
# Run depth estimation
depth = model.predict(image)
# Process output based on format
if request.output_format == "grayscale":
output = (depth * 255).astype('uint8')
depth_base64 = array_to_base64(output, format='PNG')
elif request.output_format == "colormap":
import cv2
colormap_dict = {
"inferno": cv2.COLORMAP_INFERNO,
"viridis": cv2.COLORMAP_VIRIDIS,
"plasma": cv2.COLORMAP_PLASMA,
"turbo": cv2.COLORMAP_TURBO
}
depth_colored = depth_to_colormap(depth, colormap_dict[request.colormap])
depth_base64 = array_to_base64(depth_colored, format='PNG')
else: # both
side_by_side = create_side_by_side(image, depth, colormap=True)
depth_base64 = array_to_base64(side_by_side, format='PNG')
processing_time = (time.time() - start_time) * 1000
return DepthResponse(
depth_map=depth_base64,
metadata={
"model": request.model,
"output_format": request.output_format,
"colormap": request.colormap,
"input_size": image.shape[:2],
"output_size": depth.shape[:2]
},
processing_time_ms=round(processing_time, 2)
)
except Exception as e:
print(f"❌ Error: {type(e).__name__}: {str(e)}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
# WebSocket for streaming
@app.websocket("/api/v1/stream")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for real-time depth estimation
Supports streaming multiple images
"""
await websocket.accept()
try:
while True:
# Receive image data
data = await websocket.receive_json()
if data.get("action") == "estimate":
start_time = time.time()
# Load image
image = load_image_from_base64(data["image"])
# Get model
model_name = data.get("model", "small")
model = model_manager.get_model(model_name)
if model is None:
await websocket.send_json({
"error": f"Model '{model_name}' not loaded"
})
continue
# Send progress update
await websocket.send_json({
"status": "processing",
"progress": 50
})
# Run depth estimation
depth = model.predict(image)
# Convert to colormap
depth_colored = depth_to_colormap(depth)
depth_base64 = array_to_base64(depth_colored, format='PNG')
processing_time = (time.time() - start_time) * 1000
# Send result
await websocket.send_json({
"status": "complete",
"depth_map": depth_base64,
"processing_time_ms": round(processing_time, 2)
})
except WebSocketDisconnect:
print("WebSocket disconnected")
except Exception as e:
print(f"WebSocket error: {e}")
await websocket.send_json({"error": str(e)})
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host=settings.HOST,
port=settings.PORT,
reload=settings.DEBUG
)