import gradio as gr
from huggingface_hub import InferenceClient
from huggingface_hub.utils import HfHubHTTPError  # HfHubHTTPError is exposed via huggingface_hub.utils
import os
import re
import traceback
# --- Configuration ---
API_TOKEN = os.getenv("HF_TOKEN", None)
# Qwen/Qwen3-32B is a very large model and might require a specific inference
# endpoint or hardware. For initial testing, a smaller, generally available
# model can be used instead, e.g.:
# MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
# Qwen3 is used here because it is quite compatible with this setup; keep it
# only if you are sure it is available and configured for your Space/token.
MODEL = "Qwen/Qwen3-32B"
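
# Note: when running locally, the token can be supplied via the environment, e.g.
# (illustrative) `export HF_TOKEN=hf_xxx` before starting the app; on a Space, set
# it as a Secret instead.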
# --- Hugging Face Client Initialization ---
print("--- App Start ---")
if not API_TOKEN:
    print("Warning: HF_TOKEN environment variable not set. Using anonymous access.")
    print("Certain models might require a token for access.")
else:
    print(f"HF_TOKEN found (length={len(API_TOKEN)}).")  # Don't print the token itself.

try:
    print(f"Initializing Inference Client for model: {MODEL}")
    # Explicitly pass token=None if not found, though InferenceClient handles it.
    client = InferenceClient(model=MODEL, token=API_TOKEN if API_TOKEN else None)
    print("Inference Client Initialized Successfully.")
    # Optional: add a quick test call if feasible, but be mindful of potential costs/rate limits:
    # try:
    #     client.text_generation("test", max_new_tokens=1)
    #     print("Test generation successful.")
    # except Exception as test_e:
    #     print(f"Warning: Test generation failed. The client is initialized, but model access could be problematic. Error: {test_e}")
except HfHubHTTPError as http_err:
    # Specific handling for HTTP errors (e.g., 401 Unauthorized, 403 Forbidden, 404 Not Found).
    error_message = (
        f"Failed to initialize model client for {MODEL} due to an HTTP error.\n"
        f"Status Code: {http_err.response.status_code}\n"
        f"Error: {http_err}\n"
        f"Check:\n"
        f"1. If '{MODEL}' is a valid model ID on the Hugging Face Hub.\n"
        f"2. If the model requires gating or specific permissions.\n"
        f"3. If your HF_TOKEN is correct and has the necessary permissions (set as a Secret in your Space).\n"
        f"4. If the default Inference API supports this model or if a dedicated Inference Endpoint is needed."
    )
    print(f"ERROR: {error_message}")
    raise gr.Error(error_message)
except Exception as e:
    error_message = (
        f"An unexpected error occurred while initializing the model client for {MODEL}.\n"
        f"Error Type: {type(e).__name__}\n"
        f"Error: {e}\n"
        f"Traceback:\n{traceback.format_exc()}\n"
        f"Check HF_TOKEN, model availability, network connection, and Space resources."
    )
    print(f"ERROR: {error_message}")
    raise gr.Error(error_message)
# --- Helper Functions ---

# Map file extensions to syntax-highlighting languages for gr.Code.
# Add more extensions as needed.
EXTENSION_TO_LANGUAGE = {
    ".py": "python",
    ".js": "javascript",
    ".html": "html",
    ".css": "css",
    ".json": "json",
    ".md": "markdown",
    ".sh": "bash",
    ".bash": "bash",
    ".java": "java",
}

# Parse all ```filename.ext\n<code>``` blocks.
def parse_code_blocks(response: str) -> list:
    pattern = r"```([^\n]+)\n(.*?)```"
    blocks = re.findall(pattern, response, re.DOTALL)
    files = []
    for filename, code in blocks:
        filename = filename.strip()
        code = code.strip()
        # Basic language detection from the file extension (can be expanded).
        _, ext = os.path.splitext(filename)
        lang = EXTENSION_TO_LANGUAGE.get(ext)
        files.append({
            "filename": filename,
            "language": lang,
            "code": code,
        })
    # Uncomment to log what was parsed:
    # print(f"Parsed {len(files)} code blocks.")
    # for i, f in enumerate(files):
    #     print(f"  Block {i}: filename='{f['filename']}', lang='{f['language']}', code_len={len(f['code'])}")
    return files
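
# Illustrative example of the parser's contract: a response containing
#   "```index.html\n<h1>Hello</h1>\n```"
# parses to
#   [{"filename": "index.html", "language": "html", "code": "<h1>Hello</h1>"}]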
def strip_think_tags(text: str) -> str:
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

def extract_thoughts(text: str) -> str:
    matches = re.findall(r"<think>(.*?)</think>", text, flags=re.DOTALL)
    # Join all thought blocks; returning only the last one would also give a cleaner display.
    return "\n---\n".join(match.strip() for match in matches).strip()
# --- System Message ---
system_message = (
    "You are a helpful AI assistant specialized in generating website code. "
    "Generate all the necessary files based on the user's request. "
    "Output each file within a separate markdown code block formatted exactly like this:\n"
    "```filename.ext\n"
    "<code>\n"
    "```\n"
    "Do not add any explanatory text outside the code blocks. Ensure the filenames have appropriate extensions. "
    "If you need to think step-by-step, use <think>...</think> tags. These tags will be hidden from the final user output but help guide your generation process."
)
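
# With that instruction, the model's reply is expected to look like (illustrative):
#   ```index.html
#   <!DOCTYPE html>...
#   ```
#   ```style.css
#   body { margin: 0; }
#   ```
# which parse_code_blocks() above turns into one entry per file.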
# --- Code Generation Function ---
def generate_code(prompt, backend_choice, max_tokens, temperature, top_p):
    if not prompt:
        # Handle the empty-prompt case.
        yield [], gr.update(value="Please enter a description for the website.", visible=True)
        return
    # Use f-string formatting for clarity.
    user_prompt = f"USER_PROMPT: {prompt}\nUSER_BACKEND_PREFERENCE: {backend_choice}"
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_prompt},
    ]
    full_response = ""
    current_thoughts = ""
    accumulated_error = ""  # Accumulate errors encountered during the stream.
    # Reset outputs: clear previous code blocks by yielding an empty list to the
    # gr.Column, and make the thinking box visible but empty.
    yield [], gr.update(visible=True, value="Generating code...")
    print("\n--- Generating Code ---")
    print(f"Prompt: {prompt[:100]}...")  # Log a truncated prompt.
    print(f"Backend: {backend_choice}, Max Tokens: {max_tokens}, Temp: {temperature}, Top-P: {top_p}")
    try:
        stream = client.chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature if temperature > 0 else 0.01,  # Ensure temperature is positive.
            top_p=top_p,
            # Consider adding stop sequences if the model tends to run on:
            # stop=["```\n\n", "\n\nHuman:", "\n\nUSER:"]
        )
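        # Each chunk is expected to follow the usual chat-completion stream shape,
        # with the new text in message.choices[0].delta.content; the extraction
        # below is deliberately defensive in case a provider deviates from that.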
        code_updates = []  # The list of gr.Code components to yield.
        for i, message in enumerate(stream):
            # Check for errors in the stream message (some providers include error info).
            if hasattr(message, 'error') and message.error:
                accumulated_error += f"Error in stream chunk {i}: {message.error}\n"
                print(f"ERROR in stream chunk {i}: {message.error}")
                continue  # Skip this chunk; it is an error indicator.
            # Make sure the path to the content is correct.
            try:
                # Common path: message.choices[0].delta.content
                token = message.choices[0].delta.content
                # Handle a potential None token at the end of the stream or in error cases.
                if token is None:
                    token = ""
                # print(f"Token {i}: '{token}'")  # DEBUG: print each token.
            except (AttributeError, IndexError, TypeError) as e:
                # Handle an unexpected message structure.
                print(f"Warning: Could not extract token from stream message {i}. Structure: {message}. Error: {e}")
                token = ""  # Use an empty string to avoid breaking accumulation.
            if isinstance(token, str):
                full_response += token
                # Update the thinking box periodically (every 10 tokens, or whenever think tags appear).
                if i % 10 == 0 or "<think>" in token or "</think>" in token:
                    thoughts = extract_thoughts(full_response)
                    if thoughts != current_thoughts:
                        current_thoughts = thoughts
                        # Do not yield new code here; only update the thoughts box.
                        yield code_updates, gr.update(value=current_thoughts if current_thoughts else "Thinking...", visible=True)
                # Update code blocks less frequently, or when a block looks complete.
                # Heuristic: update when the latest token ends with ```.
                if token.strip().endswith("```") or i % 20 == 0:  # Adjust the frequency as needed.
                    cleaned_response = strip_think_tags(full_response)
                    parsed_files = parse_code_blocks(cleaned_response)
                    # Compare with the existing code_updates to avoid redundant updates
                    # when the content has not changed significantly.
                    changed = False
                    if len(parsed_files) != len(code_updates):
                        changed = True
                    else:
                        # Quick check whether filenames or code lengths differ.
                        for idx, f in enumerate(parsed_files):
                            if (idx >= len(code_updates) or
                                    f["filename"] != code_updates[idx].label or
                                    len(f["code"]) != len(code_updates[idx].value)):  # Simple length check.
                                changed = True
                                break
                    if changed or not code_updates:  # Update if changed, or on the first pass.
                        code_updates = []
                        for f in parsed_files:
                            code_updates.append(
                                gr.Code(
                                    value=f["code"],
                                    label=f["filename"],
                                    language=f["language"],
                                )
                            )
                        # Yield the list of gr.Code components to the gr.Column, along with
                        # the thoughts (these may be slightly out of sync, which is acceptable).
                        yield code_updates, gr.update(value=current_thoughts if current_thoughts else "Thinking...", visible=True)
        # --- Final Update after Stream Ends ---
        print("Stream finished.")
        if accumulated_error:
            print(f"Errors occurred during stream:\n{accumulated_error}")
            # Surface the errors to the user by appending them to the thoughts box.
            current_thoughts += f"\n\n**Streaming Errors:**\n{accumulated_error}"
        cleaned_response = strip_think_tags(full_response)
        final_files = parse_code_blocks(cleaned_response)
        print(f"Final parsed files: {len(final_files)}")
        final_code_updates = []
        if not final_files and not accumulated_error:
            # Handle the case where no code blocks were generated.
            final_code_updates.append(gr.Markdown("No code blocks were generated. The model might have responded with text instead, or the format was incorrect."))
            print("Warning: No code blocks found in the final response.")
            # Optionally show the raw response for debugging:
            # final_code_updates.append(gr.Code(label="Raw Response", value=cleaned_response, language="text"))
        elif not final_files and accumulated_error:
            final_code_updates.append(gr.Markdown(f"**Error during generation:**\n{accumulated_error}"))
        else:
            for f in final_files:
                final_code_updates.append(
                    gr.Code(
                        value=f["code"],
                        label=f["filename"],
                        language=f["language"],
                    )
                )
        # Yield the final code blocks; hide the thinking box unless there are thoughts/errors to show.
        final_thought_update = gr.update(visible=bool(current_thoughts), value=current_thoughts)
        yield final_code_updates, final_thought_update
    except HfHubHTTPError as http_err:
        # Handle errors raised during the streaming call itself.
        error_message = (
            f"**Error during code generation (HTTP Error):**\n"
            f"Status Code: {http_err.response.status_code}\n"
            f"Error: {http_err}\n"
            f"This could be due to rate limits, invalid input, model errors, or token issues.\n"
            f"Check the Hugging Face Space logs for more details."
        )
        print(f"ERROR: {error_message}")
        print(traceback.format_exc())
        # Show the error message in the output area and hide the thinking box.
        yield [gr.Markdown(error_message)], gr.update(visible=False)
    except Exception as e:
        error_message = (
            f"**An unexpected error occurred during code generation:**\n"
            f"Error Type: {type(e).__name__}\n"
            f"Error: {e}\n\n"
            f"**Traceback:**\n```\n{traceback.format_exc()}\n```\n"
            f"Check the Hugging Face Space logs for more details."
        )
        print(f"ERROR: {error_message}")
        # Show the error message in the output area and hide the thinking box.
        yield [gr.Markdown(error_message)], gr.update(visible=False)
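
# Each yield from generate_code() maps positionally onto the click() outputs defined
# below: the first value goes to the file_outputs Column, the second to thinking_box.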
# --- Gradio Interface ---
with gr.Blocks(css=".gradio-container { max-width: 90% !important; }") as demo:
    gr.Markdown("# ✨ Website Code Generator ✨")
    gr.Markdown(f"Describe the website you want. Code files will appear below. Uses `{MODEL}` by default (edit the code to change it).")
    with gr.Row():
        with gr.Column(scale=2):
            prompt_input = gr.Textbox(label="Website Description", lines=6, placeholder="e.g., A simple landing page with a title, a paragraph, and a button linking to example.com")
            backend_radio = gr.Radio(["Static (HTML/CSS/JS)", "Flask", "Node.js"], label="Backend Preference (Influences AI)", value="Static (HTML/CSS/JS)")
            generate_button = gr.Button("✨ Generate Website Code", variant="primary")
            with gr.Accordion("Advanced Settings", open=False):
                max_tokens_slider = gr.Slider(512, 8192, value=4096, step=256, label="Max New Tokens")
                temperature_slider = gr.Slider(0.0, 1.2, value=0.6, step=0.05, label="Temperature (0 = deterministic, >1 = more creative)")
                top_p_slider = gr.Slider(0.1, 1.0, value=0.9, step=0.05, label="Top-P (Nucleus Sampling)")
        with gr.Column(scale=3):
            thinking_box = gr.Textbox(label="Model Activity / Thoughts", visible=False, interactive=False, lines=2)
            # Use a gr.Column to hold the dynamically generated code blocks.
            file_outputs = gr.Column(elem_id="code-output-area")
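            # NOTE (assumption): relying on a plain gr.Column output to receive lists
            # of components may not work on every Gradio version; for dynamically
            # created components, recent Gradio documents the @gr.render pattern.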
    generate_button.click(
        fn=generate_code,
        inputs=[prompt_input, backend_radio, max_tokens_slider, temperature_slider, top_p_slider],
        # Output to the Column and the Textbox.
        outputs=[file_outputs, thinking_box],
        # api_name="generate_code"  # Optional: for API access.
    )
# --- Launch ---
if __name__ == "__main__":
    print("Starting Gradio App...")
    # Use queue() to handle multiple users and streaming.
    # Keep share=False unless you specifically want a public link from local execution.
    # Set debug=True for more detailed Gradio errors locally (set False for production).
    demo.queue().launch(debug=False, share=False)
    print("Gradio App Launched.")  # launch() blocks, so this prints only after shutdown.