from smolagents import CodeAgent, tool
import datetime
import pytz
import yaml
import os
import re
import numpy as np
from typing import List, Optional, Dict, Any
import io

from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI

# Text Analyzer Tool
@tool
def text_analyzer(text: str) -> str:
    """Analyzes text and returns statistics about it.

    Args:
        text: The text to analyze.
    """
    try:
        # Simple word count
        words = text.split()
        word_count = len(words)

        # Character count
        char_count = len(text)

        # Unique words (case-insensitive)
        unique_words = len(set(word.lower() for word in words))

        # Average word length
        avg_word_length = sum(len(word) for word in words) / max(1, word_count)

        # Most common words (top 5)
        word_freq = {}
        for word in words:
            word_lower = word.lower()
            word_freq[word_lower] = word_freq.get(word_lower, 0) + 1
        common_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]
        common_words_str = ", ".join(f"{word} ({count})" for word, count in common_words)

        return f"""Text Analysis Results:
- Word count: {word_count}
- Character count: {char_count}
- Unique words: {unique_words}
- Average word length: {avg_word_length:.2f}
- Most common words: {common_words_str}
"""
    except Exception as e:
        return f"Error analyzing text: {str(e)}"

# Timezone Tool
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """A tool that fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York').
    """
    try:
        # Create the timezone object
        tz = pytz.timezone(timezone)
        # Get the current time in that timezone
        local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
        return f"The current local time in {timezone} is: {local_time}"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"

# Simple vector embedding helpers based on bag-of-words counts
def get_embedding(text: str, vocabulary: Optional[Dict[str, int]] = None, normalize: bool = True) -> np.ndarray:
    """Create a simple bag-of-words embedding for the text.

    Pass a shared word-to-index vocabulary whenever vectors from different
    texts need to be compared, so that all vectors have the same dimensions.
    If no vocabulary is given, one is built from the text itself (very simplified).
    """
    # Lowercase and tokenize the text
    words = re.findall(r'\b\w+\b', text.lower())

    # Build a vocabulary from this text if none was provided
    if vocabulary is None:
        vocabulary = {}
        for word in words:
            if word not in vocabulary:
                vocabulary[word] = len(vocabulary)

    # Create the count vector
    vector = np.zeros(max(1, len(vocabulary)))
    for word in words:
        if word in vocabulary:
            vector[vocabulary[word]] += 1

    # Normalize to unit length if requested
    if normalize and np.sum(vector) > 0:
        vector = vector / np.linalg.norm(vector)
    return vector


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Calculate the cosine similarity between two vectors of the same length."""
    # Handle zero vectors to avoid division by zero
    if np.linalg.norm(a) == 0 or np.linalg.norm(b) == 0:
        return 0.0
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
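

# Illustrative sketch (not used by the agent): how the two helpers above fit
# together. Building one shared vocabulary over both texts keeps the vectors
# the same length, so the cosine similarity is meaningful. The sentences are
# made-up examples.
def _similarity_demo() -> float:
    text_a = "the cat sat on the mat"
    text_b = "a cat was sitting on a mat"
    vocabulary: Dict[str, int] = {}
    for piece in (text_a, text_b):
        for word in re.findall(r'\b\w+\b', piece.lower()):
            if word not in vocabulary:
                vocabulary[word] = len(vocabulary)
    return cosine_similarity(
        get_embedding(text_a, vocabulary),
        get_embedding(text_b, vocabulary),
    )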


def extract_text_from_pdf_bytes(pdf_bytes: bytes) -> str:
    """Extract text from PDF bytes."""
    try:
        # PyPDF2 is an optional dependency
        try:
            import PyPDF2
        except ImportError:
            return "PDF processing requires PyPDF2 library which is not available."

        with io.BytesIO(pdf_bytes) as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
            return text
    except Exception as e:
        return f"Error extracting text from PDF: {str(e)}"


def extract_text_from_pdf(file_path: str) -> str:
    """Extract text from a PDF file on disk."""
    try:
        # PyPDF2 is an optional dependency
        try:
            import PyPDF2
        except ImportError:
            return "PDF processing requires PyPDF2 library which is not available."

        with open(file_path, 'rb') as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() + "\n"
            return text
    except Exception as e:
        return f"Error extracting text from PDF: {str(e)}"


# Semantic Search Tool
@tool
def semantic_search(corpus: str, query: str, top_k: int = 3, file_path: Optional[str] = None) -> str:
    """Performs semantic search on a corpus of text or an uploaded PDF.

    Args:
        corpus: The text corpus to search within (a large text or concatenated documents).
            If empty and file_path is provided, text will be extracted from the file instead.
        query: The search query.
        top_k: Number of top results to return.
        file_path: Optional path to a PDF or text file to extract text from.
    """
    try:
        final_corpus = corpus

        # Fall back to reading from a file if no corpus was passed in
        if not corpus and file_path:
            if os.path.exists(file_path):
                # Check if this is a PDF by extension
                if file_path.lower().endswith('.pdf'):
                    pdf_text = extract_text_from_pdf(file_path)
                    if pdf_text.startswith("Error") or pdf_text.startswith("PDF processing requires"):
                        return pdf_text
                    final_corpus = pdf_text
                else:
                    # If not a PDF, try to read it as plain text
                    try:
                        with open(file_path, 'r', encoding='utf-8') as f:
                            final_corpus = f.read()
                    except Exception as e:
                        return f"Error reading file: {str(e)}"
            else:
                return f"File not found: {file_path}"

        if not final_corpus:
            return "Error: No text corpus provided for search."

        # Split the corpus into sentence-like chunks for searching.
        # This is a simple approach; a real system would use a more
        # sophisticated chunking method.
        chunks = re.split(r'(?<=[.!?])\s+', final_corpus)
        chunks = [chunk.strip() for chunk in chunks if len(chunk.strip()) > 10]

        if not chunks:
            return "No valid text chunks found in the corpus."

        # Build a shared vocabulary over the query and all chunks so that
        # every embedding has the same dimensions and can be compared
        vocabulary: Dict[str, int] = {}
        for piece in [query] + chunks:
            for word in re.findall(r'\b\w+\b', piece.lower()):
                if word not in vocabulary:
                    vocabulary[word] = len(vocabulary)

        # Get the query embedding
        query_embedding = get_embedding(query, vocabulary)

        # Get embeddings for each chunk and calculate similarity
        results = []
        for i, chunk in enumerate(chunks):
            chunk_embedding = get_embedding(chunk, vocabulary)
            similarity = cosine_similarity(query_embedding, chunk_embedding)
            results.append((i, chunk, similarity))

        # Sort by similarity score (descending)
        results.sort(key=lambda x: x[2], reverse=True)

        # Format results
        output = f"Search results for: '{query}'\n\n"
        for i, (chunk_idx, chunk, score) in enumerate(results[:top_k]):
            # Truncate long chunks for display
            display_chunk = chunk
            if len(display_chunk) > 200:
                display_chunk = display_chunk[:197] + "..."
            output += f"{i+1}. [Score: {score:.2f}] {display_chunk}\n\n"

        if not results:
            output += "No matching results found."

        return output
    except Exception as e:
        return f"Error performing semantic search: {str(e)}"


# Tool Listing / Help Tool
@tool
def list_available_tools() -> str:
    """Lists all available tools and provides usage examples for each."""
    tools_documentation = """
# Available Tools

This agent has the following tools available:

## 1. Text Analyzer
Analyzes text and provides statistics including word count, character count, unique word count, average word length, and most common words.

**Example usage:**
- "Analyze this text: The quick brown fox jumps over the lazy dog."
- "Give me statistics about this paragraph: Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."

## 2. Current Time in Timezone
Fetches the current local time for a specified timezone.

**Example usage:**
- "What time is it in Tokyo?"
- "Get the current time in America/New_York"
- "Tell me the time in UTC"

## 3. Semantic Search
Performs semantic search on a corpus of text or an uploaded PDF document to find the sections most relevant to a query.

**Example usage:**
- "Search for 'climate change' in this text: Global warming is the long-term heating of Earth's surface observed since the pre-industrial period due to human activities, primarily fossil fuel burning, which increases heat-trapping greenhouse gas levels in Earth's atmosphere."
- "If I have uploaded a PDF file called 'research.pdf', search for 'vaccination' in it"
- "Find information about 'neural networks' in this text: [your long text here]"

## How to Use This Agent
1. Type your request in the chat box below
2. The agent will process your request and use the appropriate tool
3. Results will be displayed in this conversation area

For complex tasks, you may need to provide additional context or data. Be as specific as possible in your requests.
"""
    return tools_documentation


# Set up the agent with our tools
final_answer = FinalAnswerTool()

with open("prompts.yaml", 'r') as stream:
    prompt_templates = yaml.safe_load(stream)

from smolagents import HfApiModel

model = HfApiModel(
    max_tokens=2096,
    temperature=0.5,
    model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
    custom_role_conversions=None,
)

# Create the agent with our tools (including the new list_available_tools)
agent = CodeAgent(
    model=model,
    tools=[text_analyzer, get_current_time_in_timezone, semantic_search, list_available_tools, final_answer],
    max_steps=6,
    verbosity_level=1,
    grammar=None,
    planning_interval=None,
    name=None,
    description=None,
    prompt_templates=prompt_templates,
)
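

# Illustrative sketch (not called here): the agent can also be driven
# programmatically instead of through the Gradio UI, e.g. from a test script.
# The task string below is just an example prompt.
def _run_agent_example() -> str:
    return agent.run("What time is it in Europe/Paris right now?")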


# Launch the Gradio UI
GradioUI(agent).launch()