Spaces:
Sleeping
Sleeping
# Standard library
import os
import sys
import asyncio
import logging
import threading
import queue
# Third-party
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List
# -------------------- Configuration --------------------
# Root-logger setup: timestamped, INFO-level messages for the whole app.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
| # -------------------- External Model Call -------------------- | |
# -------------------- External Model Call --------------------
async def call_model(prompt: str, model: str = "gpt-4o-mini", api_key: str = None) -> str:
    """
    Send a single-turn chat prompt to the OpenAI Chat Completions API.

    Args:
        prompt: The user message to send.
        model: Model name to request (defaults to "gpt-4o-mini").
        api_key: OpenAI API key; when None, falls back to the
            OPENAI_API_KEY environment variable.

    Returns:
        The assistant's reply text.

    Raises:
        httpx.HTTPStatusError: If the API responds with a non-2xx status.
    """
    # Use the provided API key or fall back to the environment variable.
    if api_key is None:
        api_key = os.getenv("OPENAI_API_KEY")
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    # BUGFIX: honor the caller-supplied model instead of hard-coding
    # "gpt-4o-mini" in the payload (the default keeps prior behavior for
    # all existing callers, which never pass a model).
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    # Generous timeout: code-generation completions can take minutes.
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        response = await client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        response_json = response.json()
        return response_json["choices"][0]["message"]["content"]
| # -------------------- Agent Classes -------------------- | |
class PromptOptimizerAgent:
    """Agent that rewrites the user's raw prompt into a sharper one."""

    async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
        """Return an enhanced version of ``user_prompt`` produced by the model.

        The enhancement instructions ask for precise wording up front,
        concrete details (context, outcome, length, format, style),
        examples where possible, positive guidance instead of bare
        prohibitions — while keeping the user's original intent intact.
        """
        instructions = (
            "Given the user's initial prompt below the ### characters please enhance it. "
            "1. Start with clear, precise instructions placed at the beginning of the prompt. "
            "2. Include specific details about the desired context, outcome, length, format, and style. "
            "3. Provide examples of the desired output format, if possible. "
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
            "5. Avoid any vague or imprecise language. "
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. "
            "Remember to ensure the revised prompt remains true to the user's original intent. "
            "###User initial prompt ###"
        )
        return await call_model(f"{instructions}\n{user_prompt}\n<<<", api_key=api_key)
class OrchestratorAgent:
    """Agent that turns an (optimized) task into a step-by-step plan."""

    def __init__(self, log_queue: queue.Queue) -> None:
        # Queue shared with the UI thread for streaming progress messages.
        self.log_queue = log_queue

    async def generate_plan(self, task: str, api_key: str) -> str:
        """Return a bullet-point plan for completing ``task``."""
        request = (
            f"You are an orchestrator agent. The user has provided the task: '{task}'.\n"
            "Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, "
            "a code reviewer agent, and a documentation agent. List the steps as bullet points."
        )
        return await call_model(request, api_key=api_key)
class CoderAgent:
    """Agent that produces code (and only code) from written instructions."""

    async def generate_code(self, instructions: str, api_key: str) -> str:
        """Return the model's code output implementing ``instructions``."""
        request = (
            "You are a coder agent. Based on the following instructions, generate the requested code. "
            "Only output the generated code, never any explanations or any other information besides the actual code!\n"
            f"{instructions}\n"
        )
        return await call_model(request, api_key=api_key)
class CodeReviewerAgent:
    """Agent that judges generated code against the task specification."""

    async def review_code(self, code: str, task: str, api_key: str) -> str:
        """Return review feedback, or a reply containing 'APPROVE' on success.

        The prompt forbids the reviewer from writing code itself and from
        using the word 'APPROVE' prematurely (the caller scans for it).
        """
        request = (
            "You are a code reviewing agent highly skilled in evaluating code quality. "
            "Review the provided code and check if it meets the task specifications and properly addresses any provided feedback. "
            "NEVER generate any code yourself! Respond only with feedback or with 'APPROVE' if everything is correct. "
            "Do not mention 'APPROVE' before actually approving! Do not request documentation or user guides:\n"
            f"Task: {task}\n"
            f"Code:\n{code}\n\n"
        )
        return await call_model(request, api_key=api_key)
class DocumentationAgent:
    """Agent that writes short documentation plus a --help blurb for code."""

    async def generate_documentation(self, code: str, api_key: str) -> str:
        """Return concise documentation and a --help message for ``code``."""
        request = (
            "You are a documentation agent. Generate a brief, clear and concise documentation for the following approved code. "
            "Keep it short and compact, focusing on the main elements, do not include unnecessary extras that limit readability. "
            "Additionally, generate a brief and concise --help message for the code:\n"
            f"{code}\n"
            "Briefly explain what the code does and how it works. Make sure to be clear and concise, do not include unnecessary extras that limit readability."
        )
        return await call_model(request, api_key=api_key)
| # -------------------- Multi-Agent Conversation -------------------- | |
# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str) -> None:
    """
    Run the full multi-agent pipeline for ``task_message``.

    Pipeline: prompt optimization -> planning -> coding -> review loop
    (at most 5 revision rounds) -> documentation. Human-readable progress
    strings are pushed onto ``log_queue`` as the run proceeds; on completion
    (or on giving up) the transcript is pushed as ``("result", conversation)``.
    """
    conversation: List[Dict[str, str]] = []  # Ordered transcript of agent messages

    # Step 0: Use Prompt Optimizer to enhance the user's initial prompt.
    log_queue.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")
    prompt_optimizer = PromptOptimizerAgent()
    optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
    conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
    log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")

    # Step 1: Orchestrator generates a plan based on the optimized task.
    log_queue.put("[Orchestrator]: Received optimized task. Generating plan...")
    orchestrator = OrchestratorAgent(log_queue)
    plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
    conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
    log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")

    # Step 2: Coder generates code based on the plan.
    coder = CoderAgent()
    coder_instructions = f"Implement the task as described in the following plan:\n{plan}"
    log_queue.put("[Coder]: Received coding task from the Orchestrator.")
    code = await coder.generate_code(coder_instructions, api_key=api_key)
    conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
    log_queue.put(f"[Coder]: Code generated:\n{code}")

    # Step 3: Review/revise loop until approval or the revision cap is hit.
    reviewer = CodeReviewerAgent()
    approval_keyword = "approve"
    revision_iteration = 0
    while True:
        if revision_iteration == 0:
            log_queue.put("[Code Reviewer]: Starting review of the generated code...")
        else:
            log_queue.put(f"[Code Reviewer]: Reviewing the revised code (Iteration {revision_iteration})...")
        review = await reviewer.review_code(code, optimized_task, api_key=api_key)
        conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
        log_queue.put(f"[Code Reviewer]: Review feedback (Iteration {revision_iteration}):\n{review}")
        # Check if the code has been approved.
        if approval_keyword in review.lower():
            log_queue.put("[Code Reviewer]: Code approved.")
            break
        revision_iteration += 1
        # Kill-switch after 5 failed revision rounds.
        # BUGFIX: the original called sys.exit() here. Inside a worker
        # thread SystemExit only terminates that thread (the application
        # keeps running) and the ("result", ...) message was never sent,
        # so the UI never received the transcript. Deliver the partial
        # transcript and stop cleanly instead.
        if revision_iteration >= 5:
            log_queue.put("Unable to solve your task to full satisfaction :(")
            log_queue.put(("result", conversation))
            return
        # Under the limit: instruct the coder to revise the code.
        log_queue.put(f"[Orchestrator]: Code not approved. Instructing coder to revise the code (Iteration {revision_iteration}).")
        update_instructions = f"Please revise the code according to the following feedback. Feedback: {review}"
        revised_code = await coder.generate_code(update_instructions, api_key=api_key)
        conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
        log_queue.put(f"[Coder]: Revised code submitted (Iteration {revision_iteration}):\n{revised_code}")
        code = revised_code  # Feed the revision into the next review round

    # Step 4: Documentation Agent generates documentation for the approved code.
    doc_agent = DocumentationAgent()
    log_queue.put("[Documentation Agent]: Generating documentation for the approved code.")
    documentation = await doc_agent.generate_documentation(code, api_key=api_key)
    conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
    log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")
    log_queue.put("Multi-agent conversation complete.")
    log_queue.put(("result", conversation))
| # -------------------- Process Generator for Streaming -------------------- | |
# -------------------- Process Generator for Streaming --------------------
def process_conversation_generator(task_message: str, api_key: str) -> Generator[str, None, None]:
    """
    Bridge the async multi-agent conversation into a synchronous generator.

    Runs the conversation in a background thread, yields log messages as
    they arrive on the internal queue, and yields the formatted full
    transcript last (when the conversation produced one).
    """
    log_q: queue.Queue = queue.Queue()

    def run_conversation() -> None:
        # BUGFIX: surface failures (bad API key, network errors, ...) to
        # the UI instead of letting the exception die silently in this
        # thread, which previously ended the stream with no feedback.
        try:
            asyncio.run(multi_agent_conversation(task_message, log_q, api_key))
        except Exception as exc:  # boundary: report, don't crash the UI
            log_q.put(f"[Error]: Conversation failed: {exc}")

    thread = threading.Thread(target=run_conversation)
    thread.start()

    final_result = None
    # Drain the queue while the worker is alive, then flush any leftovers.
    while thread.is_alive() or not log_q.empty():
        try:
            msg = log_q.get(timeout=0.1)
        except queue.Empty:
            continue
        if isinstance(msg, tuple) and msg[0] == "result":
            final_result = msg[1]
            yield "Final conversation complete."
        else:
            yield msg
    thread.join()

    if final_result:
        # Format the final conversation log.
        conv_text = "\n========== Multi-Agent Conversation ==========\n"
        for entry in final_result:
            conv_text += f"[{entry['agent']}]: {entry['message']}\n\n"
        yield conv_text
| # -------------------- Chat Function for Gradio -------------------- | |
# -------------------- Chat Function for Gradio --------------------
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
    """
    Gradio chat entry point: ``message`` is treated as the task description.

    Streams the multi-agent conversation log. ``openai_api_key`` is optional;
    when blank, the OPENAI_API_KEY environment variable is used instead.
    ``history`` is supplied by Gradio and intentionally unused.
    """
    effective_key = openai_api_key or os.getenv("OPENAI_API_KEY")
    yield from process_conversation_generator(message, effective_key)
| # -------------------- Launch the Chatbot -------------------- | |
| # Use Gradio's ChatInterface with an additional input field for the OpenAI API key. | |
# -------------------- Launch the Chatbot --------------------
# Gradio ChatInterface with an extra password textbox for the OpenAI API key.
# NOTE(review): the stray '"' characters inside the description below are part
# of the displayed text (a single triple-quoted literal) — left untouched here.
iface = gr.ChatInterface(
    fn=multi_agent_chat,
    additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
    type="messages",
    title="Actual Multi-Agent Conversation Chatbot",
    description="""Collaborative workflow between Primary, Critic, and Documentation agents:
"1. Enter a task and observe as your prompt gets magically improved by the Prompt-Enhancing agent before reaching the
2. Orchestrator, who then devises a plan and enlists the assistance of a
3. Coder agent that writes code which is iteratively improved upon thanks to the
4. Code Reviewer who finally decides if the code should be approved and get sent off to the
5. Code Documentation Generator that will write documentation for your freshly generated code!"
"NOTE: The full conversation log will be displayed at the end, showing all the steps taken!"
"NOTE2: If the Coder is unable to satisfactorily complete the task after five attempts, the script will terminate to prevent endless iterations.
"NOTE3: You will have to input your OPENAI_API_KEY at the bottom of the page for this to work!"""
)
# Start the web UI only when run as a script (not on import).
if __name__ == "__main__":
    iface.launch()