Spaces:
Running
Running
| import PyPDF2 | |
| from openpyxl import load_workbook | |
| from pptx import Presentation | |
| import gradio as gr | |
| import io | |
| from huggingface_hub import InferenceClient | |
| import re | |
| import zipfile | |
| import xml.etree.ElementTree as ET | |
# Constants
# Maximum number of characters per document chunk: split_content() slices
# strings by this size and chat_document() truncates content to it.
CHUNK_SIZE = 32000
# Passed as max_new_tokens to every client.text_generation() call.
MAX_NEW_TOKENS = 4096
# Initialize the Mistral chat model
# Single module-level client used by generate_mistral_response().
client = InferenceClient("mistralai/Mistral-Nemo-Instruct-2407")
| # --- Utility Functions --- | |
def xml2text(xml):
    """Return the text of every element in an XML document, space-separated.

    The parsed tree is walked in document order. Each element that carries
    text contributes that text followed by one space; elements without text
    contribute nothing, and tail text is not collected.
    """
    document = ET.fromstring(xml)
    pieces = [node.text + " " for node in document.iter() if node.text is not None]
    return "".join(pieces)
def clean_text(content, clean=True):
    """Normalize the whitespace in *content*.

    Args:
        content: Text to clean.
        clean: When true (the default), newlines, carriage returns and tabs
            become spaces, every run of spaces collapses to a single space,
            and leading/trailing whitespace is stripped. When false the text
            is returned unchanged.

    Returns:
        The cleaned (or untouched) string.
    """
    # `clean` used to be read as an undefined name here, which raised
    # NameError on every call; it is now an explicit, defaulted parameter so
    # existing one-argument callers keep working.
    if not clean:
        return content
    # One pass maps each line break / tab to a space.
    content = content.translate(str.maketrans("\n\r\t", "   "))
    # Collapse runs of any length, not just pairs, so "a    b" becomes "a b".
    content = re.sub(r" {2,}", " ", content)
    return content.strip()
def split_content(content, chunk_size=CHUNK_SIZE):
    """Break *content* into consecutive slices of at most *chunk_size* items.

    The last slice holds whatever remains and may be shorter than the
    others; empty content produces an empty list.
    """
    offsets = range(0, len(content), chunk_size)
    return [content[start:start + chunk_size] for start in offsets]
| # --- Document Reading Functions --- | |
| def extract_text_from_docx(docx_data, clean=True): | |
| """Extracts text from DOCX files.""" | |
| text = u'' | |
| zipf = zipfile.ZipFile(io.BytesIO(docx_data)) | |
| filelist = zipf.namelist() | |
| header_xmls = 'word/header[0-9]*.xml' | |
| for fname in filelist: | |
| if re.match(header_xmls, fname): | |
| text += xml2text(zipf.read(fname)) | |
| doc_xml = 'word/document.xml' | |
| text += xml2text(zipf.read(doc_xml)) | |
| footer_xmls = 'word/footer[0-9]*.xml' | |
| for fname in filelist: | |
| if re.match(footer_xmls, fname): | |
| text += xml2text(zipf.read(fname)) | |
| zipf.close() | |
| if clean | |
| text = clean_text(text) | |
| return text, len(text) | |
def _pdf_text(data):
    """Return the text of every page of a PDF, concatenated in page order."""
    reader = PyPDF2.PdfReader(io.BytesIO(data))
    # "or ''" is defensive: a page that yields no text must not break the join.
    return ''.join(page.extract_text() or '' for page in reader.pages)


def _xlsx_text(data):
    """Return every non-empty cell value of a workbook, each followed by a space."""
    workbook = load_workbook(io.BytesIO(data))
    values = []
    for sheet in workbook.worksheets:
        for row in sheet.rows:
            values.extend(
                str(cell.value) + ' ' for cell in row if cell.value is not None
            )
    return ''.join(values)


def _pptx_text(data):
    """Return the text of every slide shape that has a ``text`` attribute."""
    presentation = Presentation(io.BytesIO(data))
    return ''.join(
        shape.text + ' '
        for slide in presentation.slides
        for shape in slide.shapes
        if hasattr(shape, "text")
    )


def _plain_text(data):
    """Decode raw bytes as UTF-8 text."""
    return data.decode('utf-8')


def read_document(file, clean=True):
    """Read the text content of an uploaded document.

    The format is chosen from the file extension: ``pdf``, ``xlsx``, ``pptx``
    and ``doc``/``docx`` have dedicated readers; anything else is decoded as
    UTF-8 text.

    Args:
        file: Either a path string or an object whose ``name`` attribute is
            the path of the file on disk. ``None`` is reported as an error.
        clean: When true, the extracted text is passed through clean_text().

    Returns:
        A ``(content, length)`` tuple. On success ``length`` is
        ``len(content)``. On failure ``content`` is an "Error reading ..."
        message and ``length`` is 0.

    Raises:
        OSError: If the file cannot be opened; only parsing errors are
            converted into an error message.
    """
    if file is None:
        # Nothing was supplied; report it the same way as other read failures.
        return "Error reading file: no file provided", 0

    file_path = file if isinstance(file, str) else file.name
    file_extension = file_path.split('.')[-1].lower()
    with open(file_path, "rb") as f:
        file_content = f.read()

    if file_extension in ('doc', 'docx'):
        # The DOCX extractor applies cleaning itself and already returns the
        # (text, length) pair.
        try:
            return extract_text_from_docx(file_content, clean)
        except Exception as e:
            return f"Error reading DOC/DOCX: {e}", 0

    # extension -> (label used in the error message, reader returning raw text)
    readers = {
        'pdf': ('PDF', _pdf_text),
        'xlsx': ('XLSX', _xlsx_text),
        'pptx': ('PPTX', _pptx_text),
    }
    label, reader = readers.get(file_extension, ('file', _plain_text))
    try:
        content = reader(file_content)
        if clean:
            content = clean_text(content)
        return content, len(content)
    except Exception as e:
        # UI boundary: surface the failure as text instead of crashing the app.
        return f"Error reading {label}: {e}", 0
| # --- Chat Functions --- | |
def generate_mistral_response(message):
    """Stream a completion for *message*, yielding the text accumulated so far.

    Every yielded value is the whole response up to that point rather than
    only the newest token. The "</s>" token is never added to the output.
    """
    token_stream = client.text_generation(
        message,
        max_new_tokens=MAX_NEW_TOKENS,
        stream=True,
        details=True,
        return_full_text=False,
    )
    generated = ""
    for event in token_stream:
        piece = event.token.text
        if piece == "</s>":
            continue
        generated += piece
        yield generated
def chat_document(file, question, clean=True):
    """Answer *question* about a document with a single Mistral call.

    Only the first CHUNK_SIZE characters of the document are sent to the
    model.

    Args:
        file: Uploaded file, passed straight to read_document().
        question: The user's question about the document.
        clean: Forwarded to read_document() to control text cleaning.

    Yields:
        The progressively longer answer produced by
        generate_mistral_response(), or a single message when the document
        could not be read or contains no text.
    """
    content, length = read_document(file, clean)
    if length == 0:
        # read_document() reports failures as (message, 0). Show that message
        # (or an empty-document notice) instead of asking the model about it.
        yield content or "The document is empty."
        return

    content = content[:CHUNK_SIZE]  # Limit to max chunk size

    # Prompt text is kept flush-left so no source indentation leaks into it.
    system_prompt = """
You are a helpful and informative assistant that can answer questions based on the content of documents.
You will receive the content of a document and a question about it.
Your task is to provide a concise and accurate answer to the question based solely on the provided document content.
If the document does not contain enough information to answer the question, simply state that you cannot answer the question based on the provided information.
"""
    message = f"""[INST] [SYSTEM] {system_prompt}
Document Content: {content}
Question: {question}
Answer:"""
    yield from generate_mistral_response(message)
def chat_document_v2(file, question, clean=True):
    """Answer *question* chunk by chunk, then stream a summary of the answers.

    The document is split with split_content(); each chunk is queried
    separately and only the final text of each chunk's response is kept. The
    collected answers are then sent back to the model to be summarized.

    Args:
        file: Uploaded file, passed straight to read_document().
        question: The user's question about the document.
        clean: Forwarded to read_document() to control text cleaning.

    Yields:
        The progressively longer summary produced by
        generate_mistral_response(), or a single message when the document
        could not be read or contains no text.
    """
    content, length = read_document(file, clean)
    if length == 0:
        # read_document() reports failures as (message, 0). An empty document
        # would otherwise trigger a summary request over zero answers.
        yield content or "The document is empty."
        return

    # Prompt text is kept flush-left so no source indentation leaks into it.
    system_prompt = """
You are a helpful and informative assistant that can answer questions based on the content of documents.
You will receive the content of a document and a question about it.
Your task is to provide a concise and accurate answer to the question based solely on the provided document content.
If the document does not contain enough information to answer the question, simply state that you cannot answer the question based on the provided information.
"""
    all_answers = []
    # split_content() already caps each chunk at CHUNK_SIZE characters, so no
    # further slicing is needed here.
    for chunk in split_content(content):
        message = f"""[INST] [SYSTEM] {system_prompt}
Document Content: {chunk}
Question: {question}
Answer:"""
        answer = ""
        for partial in generate_mistral_response(message):
            answer = partial  # each item is the full text so far; keep the last
        all_answers.append(answer)

    # Summarize all answers using Mistral
    summary_prompt = """
You are a helpful and informative assistant that can summarize multiple answers related to the same question.
You will receive a list of answers to a question, and your task is to generate a concise and comprehensive summary that incorporates the key information from all the answers.
Avoid repeating information unnecessarily and focus on providing the most relevant and accurate summary based on the provided answers.
Answers:
"""
    max_answer_chars = 30000  # cap on the joined answers sent for summarizing
    all_answers_str = "\n".join(all_answers)[:max_answer_chars]
    summary_message = f"""[INST] [SYSTEM] {summary_prompt}
{all_answers_str}
Summary:"""
    yield from generate_mistral_response(summary_message)
| # --- Gradio Interface --- | |
# Three-tab UI: a plain document reader plus two chat variants.
with gr.Blocks() as demo:
    with gr.Tabs():
        # Tab 1: show the extracted text and its length.
        with gr.TabItem("Document Reader"):
            iface1 = gr.Interface(
                fn=read_document,
                # Inputs map positionally to read_document(file, clean).
                inputs=[
                    gr.File(label="Upload a Document"),
                    gr.Checkbox(label="Clean Text", value=True),
                ],
                # Outputs map to the (content, length) tuple it returns.
                outputs=[
                    gr.Textbox(label="Document Content"),
                    gr.Number(label="Document Length (characters)"),
                ],
                title="Document Reader",
                description="Upload a document (PDF, XLSX, PPTX, TXT, CSV, DOC, DOCX and Code or text file) to read its content."
            )
        # Tab 2: single-call chat; chat_document is a generator function.
        with gr.TabItem("Document Chat"):
            iface2 = gr.Interface(
                fn=chat_document,
                # Inputs map positionally to chat_document(file, question, clean).
                inputs=[
                    gr.File(label="Upload a Document"),
                    gr.Textbox(label="Question"),
                    gr.Checkbox(label="Clean and Compress Text", value=True),
                ],
                outputs=gr.Markdown(label="Answer"),
                title="Document Chat",
                description="Upload a document and ask questions about its content."
            )
        # Tab 3: chunk-based chat; chat_document_v2 is a generator function.
        with gr.TabItem("Document Chat V2"):
            iface3 = gr.Interface(
                fn=chat_document_v2,
                # Inputs map positionally to chat_document_v2(file, question, clean).
                inputs=[
                    gr.File(label="Upload a Document"),
                    gr.Textbox(label="Question"),
                    gr.Checkbox(label="Clean Text", value=True),
                ],
                outputs=gr.Markdown(label="Answer"),
                title="Document Chat V2",
                description="Upload a document and ask questions about its content (using chunk-based approach)."
            )
# Runs at import time: the app is started as soon as this module executes.
demo.launch()