Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| MarioGPT MCP Server | |
| Provides MCP-compatible interface for generating Mario levels via HuggingChat | |
| """ | |
| import asyncio | |
| import logging | |
| from typing import Any, Optional, List, Union | |
| from mcp.server import Server | |
| from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource | |
| from pydantic import BaseModel, Field | |
| import base64 | |
| import io | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize MCP Server | |
| mcp_server = Server("mariogpt-server") | |
| # Model will be initialized on first use | |
| mario_lm = None | |
| device = None | |
| def initialize_model(): | |
| """Lazy initialization of the Mario model""" | |
| global mario_lm, device | |
| if mario_lm is None: | |
| try: | |
| import torch | |
| from supermariogpt.lm import MarioLM | |
| mario_lm = MarioLM() | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| mario_lm = mario_lm.to(device) | |
| logger.info(f"MarioGPT model loaded on {device}") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize model: {e}") | |
| raise | |
| class GenerateLevelParams(BaseModel): | |
| """Parameters for generating a Mario level""" | |
| prompt: str = Field( | |
| description="Text description of the level (e.g., 'many pipes, some enemies, low elevation')" | |
| ) | |
| temperature: float = Field( | |
| default=2.0, | |
| ge=0.1, | |
| le=2.0, | |
| description="Generation temperature (0.1-2.0). Higher = more diverse but lower quality" | |
| ) | |
| level_size: int = Field( | |
| default=1399, | |
| ge=100, | |
| le=2799, | |
| description="Size of the level in tokens (100-2799)" | |
| ) | |
| async def list_tools() -> List[Tool]: | |
| """List available MCP tools""" | |
| return [ | |
| Tool( | |
| name="generate_mario_level", | |
| description="Generate a playable Super Mario level from text description. " | |
| "Returns both a visual representation and level data. " | |
| "Example prompts: 'many pipes, some enemies, high elevation', " | |
| "'no pipes, many enemies, some blocks, low elevation'", | |
| inputSchema=GenerateLevelParams.model_json_schema() | |
| ), | |
| Tool( | |
| name="get_level_suggestions", | |
| description="Get example prompts and suggestions for creating interesting Mario levels", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": {}, | |
| } | |
| ) | |
| ] | |
| async def call_tool(name: str, arguments: Any) -> List[Union[TextContent, ImageContent, EmbeddedResource]]: | |
| """Handle tool calls""" | |
| if name == "generate_mario_level": | |
| try: | |
| # Initialize model if needed | |
| initialize_model() | |
| # Parse and validate parameters | |
| params = GenerateLevelParams(**arguments) | |
| logger.info(f"Generating level with prompt: {params.prompt}") | |
| logger.info(f"Temperature: {params.temperature}, Size: {params.level_size}") | |
| # Import required modules | |
| from supermariogpt.utils import view_level, convert_level_to_png | |
| TILE_DIR = "data/tiles" | |
| # Generate level | |
| prompts = [params.prompt] | |
| generated_level = mario_lm.sample( | |
| prompts=prompts, | |
| num_steps=params.level_size, | |
| temperature=float(params.temperature), | |
| use_tqdm=False | |
| ) | |
| # Convert to text representation | |
| level_lines = view_level(generated_level, mario_lm.tokenizer) | |
| level_text = '\n'.join(level_lines) | |
| # Generate PNG image | |
| try: | |
| img = convert_level_to_png( | |
| generated_level.squeeze(), | |
| TILE_DIR, | |
| mario_lm.tokenizer | |
| )[0] | |
| # Convert PIL Image to base64 | |
| buffered = io.BytesIO() | |
| img.save(buffered, format="PNG") | |
| img_base64 = base64.b64encode(buffered.getvalue()).decode() | |
| return [ | |
| TextContent( | |
| type="text", | |
| text=f"Successfully generated Mario level!\n\n" | |
| f"Prompt: {params.prompt}\n" | |
| f"Temperature: {params.temperature}\n" | |
| f"Level size: {params.level_size}\n\n" | |
| f"Level representation:\n{level_text[:500] + '...' if len(level_text) > 500 else level_text}" | |
| ), | |
| ImageContent( | |
| type="image", | |
| data=img_base64, | |
| mimeType="image/png" | |
| ) | |
| ] | |
| except Exception as img_error: | |
| logger.warning(f"Could not generate image: {img_error}") | |
| return [ | |
| TextContent( | |
| type="text", | |
| text=f"Successfully generated Mario level!\n\n" | |
| f"Prompt: {params.prompt}\n\n" | |
| f"Level representation:\n{level_text}" | |
| ) | |
| ] | |
| except Exception as e: | |
| logger.error(f"Error generating level: {e}") | |
| return [ | |
| TextContent( | |
| type="text", | |
| text=f"Error generating Mario level: {str(e)}" | |
| ) | |
| ] | |
| elif name == "get_level_suggestions": | |
| suggestions = """ | |
| # Mario Level Generation Suggestions | |
| ## Pipe Variations: | |
| - "no pipes, many enemies, some blocks, low elevation" | |
| - "many pipes, few enemies, many blocks, high elevation" | |
| - "some pipes, some enemies, some blocks, low elevation" | |
| ## Enemy Focused: | |
| - "little pipes, many enemies, little blocks, low elevation" | |
| - "no pipes, many enemies, many blocks, high elevation" | |
| ## Platform Challenges: | |
| - "some pipes, few enemies, many blocks, high elevation" | |
| - "no pipes, some enemies, many blocks, high elevation" | |
| ## Balanced Levels: | |
| - "some pipes, some enemies, some blocks, low elevation" | |
| - "many pipes, some enemies, some blocks, high elevation" | |
| ## Tips: | |
| - Use "high elevation" for more vertical platforming challenges | |
| - Use "low elevation" for more horizontal levels | |
| - "many enemies" creates more combat-focused levels | |
| - "many blocks" creates more platform-jumping challenges | |
| - Temperature 1.0-1.5: More consistent, quality levels | |
| - Temperature 1.5-2.0: More diverse, experimental levels | |
| """ | |
| return [ | |
| TextContent( | |
| type="text", | |
| text=suggestions | |
| ) | |
| ] | |
| else: | |
| raise ValueError(f"Unknown tool: {name}") | |
| async def run_server(): | |
| """Run the MCP server""" | |
| from mcp.server.stdio import stdio_server | |
| logger.info("Starting MarioGPT MCP Server...") | |
| async with stdio_server() as (read_stream, write_stream): | |
| await mcp_server.run( | |
| read_stream, | |
| write_stream, | |
| mcp_server.create_initialization_options() | |
| ) | |
| if __name__ == "__main__": | |
| asyncio.run(run_server()) | |