Spaces:
Running
Running
| """ | |
| Professional WhatsApp Bot using Green-API | |
| Author: Assistant | |
| Description: A comprehensive WhatsApp bot with a professional, class-based structure. | |
| Features include image generation, image editing, voice replies, | |
| and various utility functions, all handled by an asynchronous task queue. | |
| """ | |
| import os | |
| import threading | |
| import requests | |
| import logging | |
| import queue | |
| import json | |
| import base64 | |
| from typing import List, Optional, Union, Literal, Dict, Any, Tuple | |
| from collections import defaultdict, deque | |
| from concurrent.futures import ThreadPoolExecutor | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse, PlainTextResponse | |
| from pydantic import BaseModel, Field, ValidationError | |
| import uvicorn | |
| # Assume these are your custom libraries for AI functionalities | |
| from FLUX import generate_image | |
| from VoiceReply import generate_voice_reply | |
| from polLLM import generate_llm, LLMBadRequestError | |
| import flux_kontext_lib | |
| # --- Configuration --------------------------------------------------------- | |
class BotConfig:
    """Bot configuration loaded from environment variables.

    The four Green-API / webhook settings are required and read from the
    environment variables of the same name. ``LOG_LEVEL`` is optional and
    defaults to ``"INFO"``. The remaining attributes are fixed class-level
    defaults.

    Raises:
        ValueError: If a required variable is unset, empty or whitespace-only,
            or if ``LOG_LEVEL`` is not a level name known to ``logging``.
    """

    # Required settings, populated from the environment in __init__.
    GREEN_API_URL: str
    GREEN_API_TOKEN: str
    GREEN_API_ID_INSTANCE: str
    WEBHOOK_AUTH_TOKEN: str

    # Working directories for generated / downloaded media.
    IMAGE_DIR: str = "/tmp/whatsapp_images"
    AUDIO_DIR: str = "/tmp/whatsapp_audio"
    TEMP_DIR: str = "/tmp/whatsapp_edit"

    DEFAULT_IMAGE_COUNT: int = 4
    MAX_HISTORY_SIZE: int = 20
    WORKER_THREADS: int = 4
    LOG_LEVEL: str = "INFO"

    _REQUIRED_VARS = (
        "GREEN_API_URL",
        "GREEN_API_TOKEN",
        "GREEN_API_ID_INSTANCE",
        "WEBHOOK_AUTH_TOKEN",
    )

    def __init__(self):
        for name in self._REQUIRED_VARS:
            # Strip so a whitespace-only value is treated as missing.
            setattr(self, name, (os.getenv(name) or "").strip())
        # The base URL is later joined with "/waInstance..."; drop a trailing
        # slash so the joined URL does not contain "//".
        self.GREEN_API_URL = self.GREEN_API_URL.rstrip("/")
        self.LOG_LEVEL = (os.getenv("LOG_LEVEL") or "INFO").strip().upper()
        self._validate()

    def _validate(self):
        """Raise ``ValueError`` if the loaded configuration is unusable."""
        missing = [name for name in self._REQUIRED_VARS if not getattr(self, name)]
        if missing:
            raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
        # logging.getLevelName maps a known level *name* to its integer value
        # and returns a "Level X" string for anything it does not recognise.
        if not isinstance(logging.getLevelName(self.LOG_LEVEL), int):
            raise ValueError(f"Invalid LOG_LEVEL: {self.LOG_LEVEL!r}")
| # --- Logging Setup --------------------------------------------------------- | |
class LoggerSetup:
    """Builds the application's ``whatsapp_bot`` logger."""

    @staticmethod
    def setup(level: str) -> logging.Logger:
        """Configure and return the ``whatsapp_bot`` logger.

        Any handlers already attached to the logger are removed first, so
        calling this repeatedly leaves exactly one stream handler.

        Args:
            level: Level passed to ``Logger.setLevel``.

        Returns:
            The configured logger.
        """
        logger = logging.getLogger("whatsapp_bot")
        logger.setLevel(level)
        logger.handlers.clear()

        class ContextFilter(logging.Filter):
            """Injects the current thread's chat id into every record."""

            def filter(self, record):
                # get_context() always contains the "chat_id" key (possibly
                # None), so a dict default would never apply; use `or`.
                record.chat_id = ThreadContext.get_context().get("chat_id") or "-"
                return True

        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                "%(asctime)s [%(levelname)s] [%(chat_id)s] %(funcName)s:%(lineno)d - %(message)s"
            )
        )
        # The filter must run before formatting because the format string
        # references %(chat_id)s.
        handler.addFilter(ContextFilter())
        logger.addHandler(handler)
        return logger
| # --- Thread Context Management --------------------------------------------- | |
class ThreadContext:
    """Thread-local storage for the chat and message ids being handled."""

    _context = threading.local()

    @classmethod
    def set_context(cls, chat_id: str, message_id: str):
        """Record the chat and message ids for the calling thread."""
        cls._context.chat_id = chat_id
        cls._context.message_id = message_id

    @classmethod
    def get_context(cls) -> Dict[str, Optional[str]]:
        """Return the calling thread's ids.

        Both keys are always present; a value is ``None`` when nothing has
        been set on this thread.
        """
        return {
            "chat_id": getattr(cls._context, "chat_id", None),
            "message_id": getattr(cls._context, "message_id", None),
        }
| # --- Conversation History ------------------------------------------------- | |
class ConversationManager:
    """Keeps a bounded transcript of recent messages for every chat."""

    def __init__(self, max_size: int):
        def new_transcript():
            # Each chat gets its own deque capped at max_size entries.
            return deque(maxlen=max_size)

        self.history = defaultdict(new_transcript)

    def _record(self, chat_id: str, speaker: str, message: str):
        """Append one ``"<speaker>: <message>"`` line to the chat's transcript."""
        transcript = self.history[chat_id]
        transcript.append(f"{speaker}: {message}")

    def add_user_message(self, chat_id: str, message: str):
        """Store a message written by the user."""
        self._record(chat_id, "User", message)

    def add_bot_message(self, chat_id: str, message: str):
        """Store a message written by the bot."""
        self._record(chat_id, "Assistant", message)

    def get_history_text(self, chat_id: str) -> str:
        """Return the chat's transcript as newline-separated lines."""
        lines = self.history[chat_id]
        return "\n".join(lines)

    def clear_history(self, chat_id: str):
        """Drop every stored line for the chat."""
        transcript = self.history[chat_id]
        transcript.clear()
| # --- Green-API Client ----------------------------------------------------- | |
class GreenApiClient:
    """HTTP client for the Green-API WhatsApp endpoints used by the bot."""

    # Number of times _request tries a call before giving up.
    MAX_ATTEMPTS = 3

    def __init__(self, config: BotConfig, logger: logging.Logger):
        self.config = config
        self.logger = logger
        self.session = requests.Session()
        self.base_url = (
            f"{self.config.GREEN_API_URL}/waInstance"
            f"{self.config.GREEN_API_ID_INSTANCE}"
        )

    def _redact(self, text: str) -> str:
        """Mask the API token in text that is about to be logged."""
        token = self.config.GREEN_API_TOKEN
        return text.replace(token, "***") if token else text

    def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Call a Green-API endpoint, retrying on transport/HTTP errors.

        Args:
            method: HTTP method name.
            endpoint: Endpoint name placed between the instance URL and token.
            **kwargs: Forwarded to ``requests.Session.request``.

        Returns:
            The decoded JSON body, or ``None`` if every attempt failed or the
            response body was not valid JSON.
        """
        url = f"{self.base_url}/{endpoint}/{self.config.GREEN_API_TOKEN}"
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                response = self.session.request(method, url, timeout=20, **kwargs)
                response.raise_for_status()
            except requests.RequestException as e:
                # The token is part of the URL and requests includes the URL in
                # its error text, so redact before logging.
                self.logger.warning(
                    f"API request to {endpoint} failed (attempt {attempt}): {self._redact(str(e))}"
                )
                continue
            try:
                return response.json()
            except ValueError:
                # The HTTP call itself succeeded; retrying would repeat the
                # action (e.g. send the message again), so stop here.
                self.logger.error(f"API request to {endpoint} returned a non-JSON body.")
                return None
        self.logger.error(f"API request to {endpoint} failed after all retries.")
        return None

    def send_message(self, chat_id: str, text: str, quoted_message_id: Optional[str] = None):
        """Send a text message, optionally quoting an earlier message."""
        payload = {"chatId": chat_id, "message": text}
        if quoted_message_id:
            payload["quotedMessageId"] = quoted_message_id
        return self._request("POST", "sendMessage", json=payload)

    def send_file(
        self,
        chat_id: str,
        file_path: str,
        caption: str = "",
        quoted_message_id: Optional[str] = None,
    ):
        """Upload a local file and send it to the chat.

        Raises:
            OSError: If ``file_path`` cannot be read.
        """
        filename = os.path.basename(file_path)
        payload = {"chatId": chat_id, "caption": caption}
        if quoted_message_id:
            payload["quotedMessageId"] = quoted_message_id
        # Read the bytes up front: a file object would be left at EOF after the
        # first attempt, so a retry would upload an empty body.
        with open(file_path, "rb") as f:
            content = f.read()
        files = {"file": (filename, content)}
        return self._request("POST", "sendFileByUpload", data=payload, files=files)

    def download_file(self, url: str) -> Optional[bytes]:
        """Fetch ``url`` and return the response body, or ``None`` on failure."""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return response.content
        except requests.RequestException as e:
            self.logger.error(f"Failed to download file from {url}: {self._redact(str(e))}")
            return None
| # --- Pydantic Models for Intent Recognition -------------------------------- | |
# Common base for every intent: all intents carry an ``action`` name.
class BaseIntent(BaseModel):
    action: str
# One model per dispatchable action. The Literal on ``action`` is what lets
# validation tell the intents apart.
class SummarizeIntent(BaseIntent):
    action: Literal["summarize"]
    text: str


class TranslateIntent(BaseIntent):
    action: Literal["translate"]
    lang: str
    text: str


class JokeIntent(BaseIntent):
    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    action: Literal["inspire"]


class MemeIntent(BaseIntent):
    action: Literal["meme"]
    text: str


class EditImageIntent(BaseIntent):
    action: Literal["edit_image"]
    prompt: str
# Image generation request. Derives from BaseIntent like every other intent so
# that values annotated as BaseIntent really are BaseIntent instances.
class GenerateImageIntent(BaseIntent):
    action: Literal["generate_image"]
    prompt: str
    # Number of images to produce; validated to the range 1..10.
    count: int = Field(default=1, ge=1, le=10)
    # Optional output size in pixels; each side validated to 512..2048.
    width: Optional[int] = Field(default=None, ge=512, le=2048)
    height: Optional[int] = Field(default=None, ge=512, le=2048)
# Plain text reply; also used as the fallback intent by the router.
class SendTextIntent(BaseIntent):
    action: Literal["send_text"]
    message: str
| # --- Intent Router -------------------------------------------------------- | |
class IntentRouter:
    """Asks the LLM which action a user message maps to and parses the answer."""

    # Tried in order; the first model that validates the parsed JSON wins.
    INTENT_MODELS = [
        SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent,
        InspireIntent, MemeIntent, GenerateImageIntent, EditImageIntent, SendTextIntent
    ]

    def __init__(self, conv_manager: ConversationManager, logger: logging.Logger):
        self.conv_manager = conv_manager
        self.logger = logger

    def get_intent(self, user_input: str, chat_id: str) -> BaseIntent:
        """Return the intent the LLM selects for ``user_input``.

        If the LLM call raises ``LLMBadRequestError`` the chat's history is
        cleared and a ``send_text`` intent with a fixed apology is returned.
        """
        history_text = self.conv_manager.get_history_text(chat_id)
        system_prompt = self._build_system_prompt(history_text, user_input)
        try:
            raw_response = generate_llm(system_prompt)
        except LLMBadRequestError:
            self.conv_manager.clear_history(chat_id)
            # NOTE(review): the trailing glyph looks like a mis-decoded emoji;
            # the intended character cannot be recovered from this file.
            return SendTextIntent(action="send_text", message="Oops! Let's start fresh! π")
        return self._parse_response(raw_response)

    def _build_system_prompt(self, history: str, user_input: str) -> str:
        """Build the dispatcher prompt: function list, examples, history, message."""
        return (
            "You are a function dispatcher. You only invoke functions by returning a single JSON object.\n"
            "Available functions:\n"
            "- summarize(text): Summarize given text\n"
            "- translate(lang, text): Translate text to a language\n"
            "- joke(): Tell a random joke\n"
            "- weather(location): Get weather for a location\n"
            "- inspire(): Get an inspirational quote\n"
            "- meme(text): Generate a meme from text\n"
            "- generate_image(prompt, count, width, height): Generate images\n"
            "- edit_image(prompt): Edit an image (requires replying to an image)\n"
            "- send_text(message): Send a plain text response\n\n"
            "Return only raw JSON. Examples:\n"
            '{"action":"generate_image","prompt":"a red fox","count":2}\n'
            '{"action":"edit_image","prompt":"make the sky purple"}\n'
            '{"action":"send_text","message":"Hello there!"}\n\n'
            f"Conversation history:\n{history}\n\n"
            f"Current message: User: {user_input}"
        )

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return ``text`` without a surrounding Markdown ``` fence, if any."""
        stripped = text.strip()
        if len(stripped) < 6 or not (stripped.startswith("```") and stripped.endswith("```")):
            return stripped
        body = stripped[3:-3]
        first_line, newline, remainder = body.partition("\n")
        # Drop a language tag such as "json" on the opening fence line, but
        # keep the line when the JSON itself starts there.
        if newline and not first_line.strip().startswith(("{", "[")):
            body = remainder
        return body.strip()

    def _parse_response(self, raw_response: str) -> BaseIntent:
        """Turn the LLM's reply into an intent.

        The reply is parsed as JSON (after removing an optional Markdown code
        fence) and validated against each model in ``INTENT_MODELS``. When the
        reply is not JSON, or matches no model, it is returned unchanged as the
        message of a ``send_text`` intent.
        """
        try:
            parsed = json.loads(self._strip_code_fence(raw_response))
        except json.JSONDecodeError:
            parsed = None
        # Only a JSON object can describe an intent.
        if isinstance(parsed, dict):
            for model in self.INTENT_MODELS:
                try:
                    return model.model_validate(parsed)
                except ValidationError:
                    continue
        # Fallback for non-JSON or unparsable responses
        return SendTextIntent(action="send_text", message=raw_response)
| # --- Main Application Class ------------------------------------------------ | |
class WhatsAppBot:
    """Webhook-driven WhatsApp bot built on Green-API.

    Incoming webhooks are authenticated and handed to a thread pool, which
    parses the message and either runs a slash command or asks the intent
    router what to do. Longer-running work (image generation/editing, voice
    replies, routed intents) is placed on ``task_queue`` and executed by
    daemon worker threads via the ``_task_<type>`` methods.
    """

    # NOTE(review): the route paths are not visible in this file (the handlers
    # were defined but never registered). Confirm these against the webhook
    # URL configured in the Green-API console before deploying.
    WEBHOOK_PATH = "/whatsapp"
    HEALTH_PATH = "/health"

    def __init__(self, config: BotConfig):
        self.config = config
        self.logger = LoggerSetup.setup(config.LOG_LEVEL)
        self.api_client = GreenApiClient(config, self.logger)
        self.conv_manager = ConversationManager(config.MAX_HISTORY_SIZE)
        self.intent_router = IntentRouter(self.conv_manager, self.logger)
        self.task_queue = queue.Queue()
        # Owned by the instance so the webhook also works when this module is
        # imported rather than run as a script.
        self.executor = ThreadPoolExecutor(max_workers=config.WORKER_THREADS * 2)
        self.fastapi_app = FastAPI(title="WhatsApp Eve Bot", version="2.0.0")
        self._setup_routes()
        self._start_workers()

    def _setup_routes(self):
        """Register the webhook and health-check endpoints on the FastAPI app."""
        import hmac

        expected_auth = f"Bearer {self.config.WEBHOOK_AUTH_TOKEN}".encode("utf-8")

        async def webhook(request: Request):
            supplied_auth = (request.headers.get("Authorization") or "").encode("utf-8")
            # Constant-time comparison so response timing does not reveal how
            # much of the token matched.
            if not hmac.compare_digest(supplied_auth, expected_auth):
                raise HTTPException(403, "Unauthorized")
            try:
                payload = await request.json()
            except ValueError:
                raise HTTPException(400, "Invalid JSON body") from None
            self.logger.debug(f"Incoming webhook: {json.dumps(payload)}")
            # Process valid incoming messages in the background
            if isinstance(payload, dict) and payload.get("typeWebhook") == "incomingMessageReceived":
                self.executor.submit(self._process_incoming_message, payload)
            return JSONResponse(content={"status": "received"})

        def health_check():
            return JSONResponse(content={"status": "healthy"})

        self.fastapi_app.add_api_route(self.WEBHOOK_PATH, webhook, methods=["POST"])
        self.fastapi_app.add_api_route(self.HEALTH_PATH, health_check, methods=["GET"])

    def _start_workers(self):
        """Start ``WORKER_THREADS`` daemon threads that drain the task queue."""
        for i in range(self.config.WORKER_THREADS):
            threading.Thread(target=self._worker, name=f"Worker-{i}", daemon=True).start()
        self.logger.info(f"Started {self.config.WORKER_THREADS} worker threads.")

    def _worker(self):
        """Worker thread to process tasks from the queue."""
        while True:
            task = self.task_queue.get()
            task_type = None
            try:
                task_type = task.get("type")
                # The logging context is thread-local, so it has to be set
                # again on the worker thread for log lines to carry the chat.
                ThreadContext.set_context(task.get("chat_id"), task.get("message_id"))
                handler = getattr(self, f"_task_{task_type}", None) if task_type else None
                if handler:
                    handler(task)
                else:
                    self.logger.warning(f"Unknown task type: {task_type}")
            except Exception as e:
                # Never let one bad task terminate the worker thread.
                self.logger.error(f"Error processing task {task_type}: {e}", exc_info=True)
            finally:
                self.task_queue.task_done()

    @staticmethod
    def _remove_quietly(path: Optional[str]):
        """Delete ``path`` if it is set, ignoring filesystem errors."""
        if not path:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    def _process_incoming_message(self, payload: Dict[str, Any]):
        """Main logic for handling an incoming message payload."""
        try:
            chat_id = payload["senderData"]["chatId"]
            message_id = payload["idMessage"]
            ThreadContext.set_context(chat_id, message_id)

            message_data = payload.get("messageData", {})
            type_message = message_data.get("typeMessage")
            text = ""
            if type_message == "textMessage":
                text = message_data["textMessageData"]["textMessage"]
            elif type_message == "extendedTextMessage":
                text = message_data["extendedTextMessageData"]["text"]
            text = text.strip()
            if not text:
                # Other message types, or empty text, are ignored.
                return

            self.conv_manager.add_user_message(chat_id, text)
            if text.startswith('/'):
                # Handle direct commands
                self._handle_command(chat_id, message_id, text, payload)
            else:
                # Handle natural language and replies
                self._handle_natural_language(chat_id, message_id, text, payload)
        except Exception as e:
            self.logger.error(f"Failed to process message payload: {e}", exc_info=True)

    def _handle_command(self, chat_id, message_id, text, payload):
        """Processes direct slash commands."""
        # text is already stripped and starts with "/", so pieces[0] exists.
        pieces = text.split(maxsplit=1)
        command = pieces[0].lower()
        args = pieces[1] if len(pieces) > 1 else ""

        if command == "/help":
            help_text = (
                "*🤖 Eve's Command Center:*\n\n"
                "🔹 `/help` - Show this help message\n"
                "🔹 `/gen <prompt>` - Generate an image\n"
                "🔹 `/edit <prompt>` - Reply to an image to edit it\n"
                "🔹 `/joke` - Get a random joke\n"
                "🔹 `/inspire` - Receive an inspirational quote\n"
                "🔹 `/weather <location>` - Check the weather\n\n"
                "You can also just chat with me naturally!"
            )
            self.api_client.send_message(chat_id, help_text, message_id)
        elif command == "/gen":
            if not args:
                # Do not queue a generation job with an empty prompt.
                self.api_client.send_message(chat_id, "Please provide a prompt: /gen <prompt>", message_id)
                return
            self.task_queue.put({"type": "generate_image", "chat_id": chat_id, "message_id": message_id, "prompt": args})
        elif command == "/edit":
            self._dispatch_edit_image(chat_id, message_id, args, payload)
        elif command == "/joke":
            self._task_joke({"chat_id": chat_id, "message_id": message_id})
        elif command == "/inspire":
            self._task_inspire({"chat_id": chat_id, "message_id": message_id})
        elif command == "/weather":
            self._task_weather({"chat_id": chat_id, "message_id": message_id, "location": args})
        else:
            self.api_client.send_message(chat_id, "Unknown command. Type /help for options.", message_id)

    def _handle_natural_language(self, chat_id, message_id, text, payload):
        """Processes natural language using the intent router."""
        intent = self.intent_router.get_intent(text, chat_id)
        task_data = {
            "chat_id": chat_id,
            "message_id": message_id,
            **intent.model_dump()
        }
        if intent.action == "edit_image":
            # This action needs the original payload to find the replied-to image
            self._dispatch_edit_image(chat_id, message_id, intent.prompt, payload)
        elif hasattr(self, f"_task_{intent.action}"):
            self.task_queue.put({"type": intent.action, **task_data})
        else:
            self.logger.warning(f"No handler found for intent action: {intent.action}")
            self.api_client.send_message(chat_id, "Sorry, I'm not sure how to handle that.", message_id)

    def _dispatch_edit_image(self, chat_id, message_id, prompt, payload):
        """Checks for a replied-to image and dispatches the edit task."""
        quoted_message = payload.get("messageData", {}).get("quotedMessage")
        if not quoted_message or quoted_message.get("typeMessage") != "imageMessage":
            self.api_client.send_message(chat_id, "To edit an image, please reply to it with your instructions.", message_id)
            return
        download_url = (quoted_message.get("imageMessage") or {}).get("downloadUrl")
        if not download_url:
            # Tell the user instead of failing silently on an unexpected payload.
            self.logger.warning("Quoted image message has no downloadUrl.")
            self.api_client.send_message(chat_id, "😢 Sorry, I failed to edit the image.", message_id)
            return
        self.task_queue.put({
            "type": "edit_image",
            "chat_id": chat_id,
            "message_id": message_id,
            "prompt": prompt,
            "download_url": download_url
        })

    # --- Task Handler Methods ---
    def _task_send_text(self, task: Dict[str, Any]):
        """Send a text reply, record it in the history and queue a voice reply."""
        chat_id, message_id, message = task["chat_id"], task["message_id"], task["message"]
        self.api_client.send_message(chat_id, message, message_id)
        self.conv_manager.add_bot_message(chat_id, message)
        self.task_queue.put({"type": "voice_reply", "chat_id": chat_id, "message_id": message_id, "text": message})

    def _task_generate_image(self, task: Dict[str, Any]):
        """Generate ``count`` images for the prompt and send each one as a file."""
        chat_id, mid, prompt, count = task["chat_id"], task["message_id"], task["prompt"], task.get("count", 1)
        self.api_client.send_message(chat_id, f"🎨 Generating {count} image(s) for: \"{prompt}\"...", mid)
        for i in range(count):
            path = None
            try:
                _, path, _, _ = generate_image(prompt, mid, str(i), self.config.IMAGE_DIR, width=task.get("width"), height=task.get("height"))
                caption = f"✨ Image {i+1}/{count}: {prompt}"
                self.api_client.send_file(chat_id, path, caption, mid)
            except Exception as e:
                self.logger.error(f"Image generation {i+1} failed: {e}")
                self.api_client.send_message(chat_id, f"😢 Failed to generate image {i+1}.", mid)
            finally:
                # Clean up even when sending failed, so files do not pile up.
                self._remove_quietly(path)

    def _task_edit_image(self, task: Dict[str, Any]):
        """Download the quoted image, edit it with the prompt and send the result."""
        chat_id, mid, prompt, url = task["chat_id"], task["message_id"], task["prompt"], task["download_url"]
        self.api_client.send_message(chat_id, f"🎨 Editing image with prompt: \"{prompt}\"...", mid)
        input_path, output_path = None, None
        try:
            image_data = self.api_client.download_file(url)
            if not image_data:
                raise ValueError("Failed to download image.")
            input_path = os.path.join(self.config.TEMP_DIR, f"input_{mid}.jpg")
            output_path = os.path.join(self.config.TEMP_DIR, f"output_{mid}.jpg")
            with open(input_path, 'wb') as f:
                f.write(image_data)
            flux_kontext_lib.generate_image(prompt, input_path, download_path=output_path)
            if not os.path.exists(output_path):
                raise ValueError("Edited image file not found.")
            caption = f"✨ Edited: {prompt}"
            self.api_client.send_file(chat_id, output_path, caption, mid)
        except Exception as e:
            self.logger.error(f"Image editing task failed: {e}")
            self.api_client.send_message(chat_id, "😢 Sorry, I failed to edit the image.", mid)
        finally:
            for path in (input_path, output_path):
                self._remove_quietly(path)

    def _task_voice_reply(self, task: Dict[str, Any]):
        """Generate a spoken version of ``task["text"]`` and send it as a file."""
        text = task["text"]
        prompt = f"Say this in a friendly, playful, and slightly clumsy-cute way: {text}"
        path = None
        try:
            result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=self.config.AUDIO_DIR)
            if result and result[0]:
                path, _ = result
                self.api_client.send_file(task["chat_id"], path, quoted_message_id=task["message_id"])
        except Exception as e:
            # Voice is best-effort: the text reply has already been sent.
            self.logger.warning(f"Voice reply generation failed: {e}")
        finally:
            self._remove_quietly(path)

    def _task_joke(self, task: Dict[str, Any]):
        """Send a joke from the public joke API, falling back to the LLM."""
        try:
            j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json()
            joke = f"{j['setup']}\n\n{j['punchline']}"
        except Exception:
            joke = generate_llm("Tell me a short, clean joke.")
        # NOTE(review): the leading glyph looks like a mis-decoded emoji; the
        # intended character cannot be recovered from this file.
        self._task_send_text({"type": "send_text", **task, "message": f"π {joke}"})

    def _task_inspire(self, task: Dict[str, Any]):
        """Send an LLM-generated inspirational quote."""
        quote = generate_llm("Give me a unique, short, uplifting inspirational quote with attribution.")
        self._task_send_text({"type": "send_text", **task, "message": f"✨ {quote}"})

    def _task_weather(self, task: Dict[str, Any]):
        """Fetch weather data from wttr.in and send an LLM-written report."""
        from urllib.parse import quote_plus

        # `or` rather than a dict default: "/weather" with no argument stores
        # an empty string under the key, which a default would not replace.
        location = (task.get("location") or "").strip() or "New York"
        try:
            # The location is user-supplied; encode it so it cannot alter the
            # URL path or query.
            raw = requests.get(f"https://wttr.in/{quote_plus(location)}?format=4", timeout=10).text
            report = generate_llm(f"Create a friendly weather report in Celsius from this data:\n\n{raw}")
            self._task_send_text({"type": "send_text", **task, "message": f"🌤️ Weather for {location}:\n{report}"})
        except Exception as e:
            self.logger.error(f"Weather task failed: {e}")
            self.api_client.send_message(task["chat_id"], "Sorry, I couldn't get the weather.", task["message_id"])

    def run(
        self,
        host: str = "0.0.0.0",
        port: int = 7860,
        startup_chat_id: Optional[str] = "120363312903494448@g.us",
    ):
        """Starts the bot and FastAPI server.

        Args:
            host: Interface uvicorn binds to.
            port: Port uvicorn listens on.
            startup_chat_id: Chat that receives the "online" announcement;
                pass ``None`` to skip the announcement.
        """
        self.logger.info("Starting Eve WhatsApp Bot...")
        for d in (self.config.IMAGE_DIR, self.config.AUDIO_DIR, self.config.TEMP_DIR):
            os.makedirs(d, exist_ok=True)
            self.logger.info(f"Ensured directory exists: {d}")
        if startup_chat_id:
            # NOTE(review): the leading glyph looks like a mis-decoded emoji;
            # the intended character cannot be recovered from this file.
            self.api_client.send_message(
                startup_chat_id,
                "π Eve is online and ready to help! Type /help to see commands."
            )
        uvicorn.run(self.fastapi_app, host=host, port=port)
if __name__ == "__main__":
    import sys
    import traceback

    # Exit status reported to the shell / process supervisor; non-zero on any
    # failure so a crashed bot is not mistaken for a clean shutdown.
    exit_code = 0
    try:
        config = BotConfig()
        # Module-level on purpose: the webhook handler looks this name up as a
        # global when it schedules message processing.
        executor = ThreadPoolExecutor(max_workers=config.WORKER_THREADS * 2)
        bot = WhatsAppBot(config)
        bot.run()
    except ValueError as e:
        # Catch config validation errors
        print(f"β CONFIGURATION ERROR: {e}")
        exit_code = 1
    except KeyboardInterrupt:
        print("\nπ Bot stopped by user.")
    except Exception as e:
        print(f"β A fatal error occurred: {e}")
        # Keep the stack trace; the one-line message alone is rarely enough
        # to diagnose a crash.
        traceback.print_exc()
        exit_code = 1
    sys.exit(exit_code)