Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import asyncio | |
| import logging | |
| import threading | |
| import queue | |
| import gradio as gr | |
| import httpx | |
| from typing import Generator, Any, Dict, List | |
| logger = logging.getLogger(__name__) | |
| class Agent(ABC): | |
| async def generate_response(self, prompt: str, api_key: str) -> str: | |
| pass | |
| class PromptOptimizerAgent(Agent): | |
| async def generate_response(self, prompt: str, api_key: str) -> str: | |
| system_prompt = ( | |
| "Given the user's initial prompt below the ### characters please enhance it. " | |
| "1. Start with clear, precise instructions placed at the beginning of the prompt. " | |
| "2. Include specific details about the desired context, outcome, length, format, and style. " | |
| "3. Provide examples of the desired output format, if possible. " | |
| "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. " | |
| "5. Avoid any vague or imprecise language. " | |
| "6. Rather than only stating what not to do, provide guidance on what should be done instead. " | |
| "Remember to ensure the revised prompt remains true to the user's original intent. " | |
| ###User initial prompt### | |
| ) | |
| return await call_openai(system_prompt, api_key) | |
| class OrchestratorAgent(Agent): | |
| async def generate_response(self, task_message: str, api_key: str) -> str: | |
| plan = f"You are an orchestrator agent. The user has provided the task: '{task_message}'. Generate a detailed, step-by-step plan for completing this task by coordinating a coder agent, a code reviewer agent, and a documentation agent. List the steps as bullet points." | |
| return await call_openai(plan, api_key) | |
| class CoderAgent(Agent): | |
| async def generate_response(self, instructions: str, api_key: str) -> str: | |
| prompt = f"Implement the task as described in the following plan:\n{instructions}" | |
| return await call_openai(prompt, api_key) | |
| class CodeReviewerAgent(Agent): | |
| async def generate_response(self, code: str, task: str, api_key: str) -> str: | |
| feedback = await call_openai( | |
| f"You are a code reviewer agent. Review the provided code: '{code}' and check if it meets the task specifications.", | |
| api_key=api_key | |
| ) | |
| return feedback | |
| class DocumentationAgent(Agent): | |
| async def generate_response(self, code: str, api_key: str) -> str: | |
| prompt = f"You are a documentation agent. Generate a brief documentation for the code:\nCode:\n{code}" | |
| return await call_openai(prompt, api_key) | |
| async def process_conversation_generator(conversation: list, log_q: queue.Queue, api_key: str) -> Generator[str, None, None]: | |
| try: | |
| while True: | |
| if not conversation or not log_q.get(timeout=0.1): | |
| continue | |
| msg = log_q.get(timeout=0.1) | |
| if isinstance(msg, tuple) and msg[0] == "result": | |
| final_result = msg[1] | |
| break | |
| yield msg | |
| except asyncio.CancelledError: | |
| pass | |
| finally: | |
| if log_q.empty(): | |
| log_q.put("Final conversation complete.") | |
| async def multi_agent_conversation( | |
| task_message: str, | |
| log_q: queue.Queue, | |
| api_key: str, | |
| additional_inputs=None | |
| ) -> None: | |
| if additional_inputs is None: | |
| additional_inputs = [gr.Textbox(label="OpenAI API Key (optional)")] | |
| agents = [ | |
| PromptOptimizerAgent(), | |
| OrchestratorAgent(), | |
| CoderAgent(), | |
| CodeReviewerAgent(), | |
| DocumentationAgent() | |
| ] | |
| log_queue = queue.Queue() | |
| run_conversation = None | |
| async def run_conversation_thread() -> None: | |
| nonlocal run_conversation | |
| try: | |
| if run_conversation is not None: | |
| await run_conversation | |
| except asyncio.CancelledError: | |
| pass | |
| thread = asyncio.to_thread(run_conversation_thread) | |
| thread.start() | |
| try: | |
| conversation = [] | |
| log_q.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...") | |
| # Step 0: Use Prompt Optimizer | |
| optimized_task = await agents[0].generate_response(task_message, api_key) | |
| conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"}) | |
| log_q.put(f"[Prompt Optimizer]: Optimized Task:\n{optimized_task}") | |
| # Step 1: Generate Plan | |
| plan = await agents[1].generate_response(optimized_task, api_key) | |
| conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"}) | |
| log_q.put(f"[Orchestrator]: Plan generated:\n{plan}") | |
| # Step 2: Generate Code | |
| code = await agents[2].generate_response(plan, api_key) | |
| conversation.append({"agent": "Coder", "message": f"Code:\n{code}"}) | |
| log_q.put(f"[Coder]: Code generated:\n{code}") | |
| # Step 3: Code Review | |
| code_review = None | |
| iteration = 0 | |
| while True: | |
| if iteration >= 5: | |
| log_q.put("[Code Reviewer]: Code not approved after 5 iterations: terminating.") | |
| break | |
| code_review = await agents[3].generate_response(code, plan, api_key) | |
| revised_code = await agents[2].generate_response( | |
| f"Please revise the code according to the following feedback: {code_review}", | |
| api_key | |
| ) | |
| code = revised_code | |
| iteration += 1 | |
| if code == revised_code: | |
| break | |
| log_q.put(f"[Code Reviewer]: Feedback received:\n{code_review}") | |
| log_q.put(f"[Code Reviewer]: Revised code:\n{revised_code}") | |
| # Step 4: Documentation | |
| doc = await agents[4].generate_response(code, api_key) | |
| conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{doc}"}) | |
| log_q.put(f"[Documentation Agent]: Documentation generated:\n{doc}") | |
| except Exception as e: | |
| log_q.put(f"[All Agents]: An error occurred: {str(e)}") | |
| logger.error(f"Error in multi_agent_conversation: {str(e)}") | |
| finally: | |
| thread.join() | |
| async def multi_agent_conversation_wrapper(task_message: str, api_key: str) -> None: | |
| await multi_agent_conversation( | |
| task_message, | |
| log_q=queue.Queue(), | |
| api_key=api_key, | |
| additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)") if api_key is None else None] | |
| ) | |
| if __name__ == "__main__": | |
| iface = gr.ChatInterface( | |
| fn=multi_agent_conversation_wrapper, | |
| additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)")], | |
| type="messages", | |
| title="Actual Multi-Agent Conversation Chatbot", | |
| description=""" | |
| - Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Agent agents. | |
| - Enter a task description to observe the iterative workflow between the agents. | |
| - NOTE: The kill-switch mechanism will terminate after five code rejection iterations to prevent endless loops. | |
| - NOTE3: You can input your OPENAI_API_KEY at the bottom of the page for this to work! | |
| """, | |
| ) | |
| iface.launch() | |