import os
import re
from datetime import date, datetime
from typing import List, Union
import pandas as pd
from openpyxl import Workbook
from openpyxl.cell.rich_text import CellRichText, TextBlock
from openpyxl.cell.text import InlineFont
from openpyxl.styles import Alignment, Font
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
from tools.config import OUTPUT_FOLDER
from tools.config import model_name_map as global_model_name_map
from tools.helper_functions import (
clean_column_name,
convert_reference_table_to_pivot_table,
ensure_model_in_map,
get_basic_response_data,
load_in_data_file,
)
def markdown_to_richtext(
text: Union[str, float, int, None],
) -> Union[CellRichText, str, float, int, None]:
"""
Convert markdown formatting in text to Excel RichText formatting.
Supports:
- **text** or __text__ for bold
- *text* or _text_ for italic (when not bold)
- ***text*** or ___text___ for bold+italic
    Also removes HTML <br> tags (and variants such as <br/> and <br />).
Args:
text: The text to convert (can be string, number, or None)
Returns:
CellRichText object if markdown is found, otherwise returns the original value
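    Example (illustrative):
        markdown_to_richtext("a **bold** word") returns a CellRichText made of
        a plain "a " segment, a bold "bold" segment and a plain " word" segment.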
"""
# Return non-string values as-is
if not isinstance(text, str):
return text
    # Remove <br> tags (case-insensitive, handles <br>, <br/>, <br />)
    text = re.sub(r"<br\s*/?>", "", text, flags=re.IGNORECASE)
# Check if text contains markdown formatting
if not re.search(r"(\*\*|__|\*|_)(?=\S)", text):
return text
# Create RichText object
rich_text = CellRichText()
    # Process in order: triple markers first, then double, then single
    # This prevents conflicts (e.g., ***text*** being matched as **text** + *text*)
    # Pattern order matters: longer patterns first
    # Use lookarounds so markers are not part of words (e.g., filenames with underscores):
    # (?<!\w) / (?!\w) require a non-word character (or string boundary) around the marker
    patterns = [
        (r"(?<!\w)\*\*\*(?=\S)(.+?)(?<=\S)\*\*\*(?!\w)", True, True),  # ***bold italic***
        (r"(?<!\w)___(?=\S)(.+?)(?<=\S)___(?!\w)", True, True),  # ___bold italic___
        (r"(?<!\w)\*\*(?=\S)(.+?)(?<=\S)\*\*(?!\w)", True, False),  # **bold**
        (r"(?<!\w)__(?=\S)(.+?)(?<=\S)__(?!\w)", True, False),  # __bold__
        (r"(?<!\w)\*(?=\S)(.+?)(?<=\S)\*(?!\w)", False, True),  # *italic*
        (r"(?<!\w)_(?=\S)(.+?)(?<=\S)_(?!\w)", False, True),  # _italic_
    ]
    # Collect matches from every pattern, then walk them left to right,
    # skipping any match that overlaps a segment already consumed
    matches = []
    for pattern, is_bold, is_italic in patterns:
        for m in re.finditer(pattern, text):
            matches.append((m.start(), m.end(), m.group(1), is_bold, is_italic))
    matches.sort(key=lambda t: t[0])
    last_pos = 0
    for start, end, content, is_bold, is_italic in matches:
        if start < last_pos:
            continue  # Overlaps an earlier (longer) match
        # Add plain text before this formatted segment
        if start > last_pos:
plain_text = text[last_pos:start]
if plain_text:
rich_text.append(plain_text)
# Create font for this segment (use InlineFont for RichText)
font = InlineFont(b=is_bold, i=is_italic)
rich_text.append(TextBlock(font, content))
last_pos = end
# Add remaining plain text
if last_pos < len(text):
remaining = text[last_pos:]
if remaining:
rich_text.append(remaining)
# If we didn't add anything, return original text
if len(rich_text) == 0:
return text
return rich_text
def add_cover_sheet(
wb: Workbook,
intro_paragraphs: list[str],
model_name: str,
analysis_date: str,
analysis_cost: str,
number_of_responses: int,
number_of_responses_with_text: int,
number_of_responses_with_text_five_plus_words: int,
llm_call_number: int,
input_tokens: int,
output_tokens: int,
time_taken: float,
file_name: str,
column_name: str,
number_of_responses_with_topic_assignment: int,
custom_title: str = "Cover sheet",
):
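    """
    Insert a cover sheet at the front of the workbook containing a title, the
    intro paragraphs (markdown rendered as rich text) and a labelled metadata
    block covering file details, response counts and LLM call/token totals.
    """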
ws = wb.create_sheet(title=custom_title, index=0)
# Freeze top row
ws.freeze_panes = "A2"
# Write title
ws["A1"] = "Large Language Model thematic analysis"
ws["A1"].font = Font(size=14, bold=True)
ws["A1"].alignment = Alignment(wrap_text=True, vertical="top")
# Add intro paragraphs
row = 3
for paragraph in intro_paragraphs:
ws.merge_cells(start_row=row, start_column=1, end_row=row, end_column=2)
formatted_paragraph = markdown_to_richtext(paragraph)
cell = ws.cell(row=row, column=1, value=formatted_paragraph)
cell.alignment = Alignment(wrap_text=True, vertical="top")
ws.row_dimensions[row].height = 60 # Adjust height as needed
row += 2
# Add metadata
meta_start = row + 1
metadata = {
"Date Excel file created": date.today().strftime("%Y-%m-%d"),
"File name": file_name,
"Column name": column_name,
"Model name": model_name,
"Analysis date": analysis_date,
# "Analysis cost": analysis_cost,
"Number of responses": number_of_responses,
"Number of responses with text": number_of_responses_with_text,
"Number of responses with text five plus words": number_of_responses_with_text_five_plus_words,
"Number of responses with at least one assigned topic": number_of_responses_with_topic_assignment,
"Number of LLM calls": llm_call_number,
"Total number of input tokens from LLM calls": input_tokens,
"Total number of output tokens from LLM calls": output_tokens,
"Total time taken for all LLM calls (seconds)": round(float(time_taken), 1),
}
# Define which metadata fields should have number formatting with thousand separators
number_format_fields = {
"Number of responses": "#,##0",
"Number of responses with text": "#,##0",
"Number of responses with text five plus words": "#,##0",
"Number of responses with at least one assigned topic": "#,##0",
"Number of LLM calls": "#,##0",
"Total number of input tokens from LLM calls": "#,##0",
"Total number of output tokens from LLM calls": "#,##0",
"Total time taken for all LLM calls (seconds)": "#,##0.0",
}
for i, (label, value) in enumerate(metadata.items()):
row_num = meta_start + i
ws[f"A{row_num}"] = label
ws[f"A{row_num}"].font = Font(bold=True)
cell = ws[f"B{row_num}"]
# Convert markdown to RichText if applicable
formatted_value = markdown_to_richtext(value)
cell.value = formatted_value
# Set left alignment for all metadata values (including numbers)
cell.alignment = Alignment(horizontal="left", wrap_text=True)
# Apply number formatting with thousand separators for numeric fields
if label in number_format_fields:
# Only apply formatting if value is not RichText (numbers can't be RichText)
if not isinstance(cell.value, CellRichText):
cell.number_format = number_format_fields[label]
# Optional: Adjust column widths
ws.column_dimensions["A"].width = 50
ws.column_dimensions["B"].width = 75
# Ensure first row cells are wrapped on the cover sheet
for col_idx in range(1, ws.max_column + 1):
header_cell = ws.cell(row=1, column=col_idx)
header_cell.alignment = Alignment(wrap_text=True, vertical="center")
def csvs_to_excel(
csv_files: list[str],
output_filename: str,
sheet_names: list[str] = None,
column_widths: dict = None, # Dict of {sheet_name: {col_letter: width}}
wrap_text_columns: dict = None, # Dict of {sheet_name: [col_letters]}
intro_text: list[str] = None,
model_name: str = "",
analysis_date: str = "",
analysis_cost: str = "",
llm_call_number: int = 0,
input_tokens: int = 0,
output_tokens: int = 0,
time_taken: float = 0,
number_of_responses: int = 0,
number_of_responses_with_text: int = 0,
number_of_responses_with_text_five_plus_words: int = 0,
column_name: str = "",
number_of_responses_with_topic_assignment: int = 0,
file_name: str = "",
    unique_reference_numbers: list = None,
):
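    """
    Write a list of CSV files into a single formatted Excel workbook, one sheet
    per CSV. Markdown in cell values is converted to rich text, header rows are
    bolded, wrapped and frozen, optional column widths and wrapping are applied,
    and a cover sheet summarising the analysis is prepended. Returns the path of
    the saved workbook.
    """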
    if intro_text is None:
        intro_text = list()
    # Avoid a mutable default argument: normalise None to an empty list
    if unique_reference_numbers is None:
        unique_reference_numbers = list()
wb = Workbook()
# Remove default sheet
wb.remove(wb.active)
for idx, csv_path in enumerate(csv_files):
# Use provided sheet name or derive from file name
sheet_name = (
sheet_names[idx]
if sheet_names and idx < len(sheet_names)
else os.path.splitext(os.path.basename(csv_path))[0]
)
df = pd.read_csv(csv_path)
if sheet_name == "Original data":
try:
# Create a copy to avoid modifying the original
df_copy = df.copy()
# Insert the Reference column at position 0 (first column)
df_copy.insert(0, "Reference", unique_reference_numbers)
df = df_copy
except Exception as e:
print("Could not add reference number to original data due to:", e)
ws = wb.create_sheet(title=sheet_name)
for r_idx, row in enumerate(
dataframe_to_rows(df, index=False, header=True), start=1
):
ws.append(row)
for col_idx, value in enumerate(row, start=1):
cell = ws.cell(row=r_idx, column=col_idx)
                # Convert markdown to RichText if applicable
                cell.value = markdown_to_richtext(value)
# Bold header row
if r_idx == 1:
# If cell already has RichText, we need to apply bold to all segments
if isinstance(cell.value, CellRichText):
# Create new RichText with bold applied to all segments
bold_rich_text = CellRichText()
for segment in cell.value:
if isinstance(segment, TextBlock):
# Preserve italic if present, add bold
is_italic = segment.font.i if segment.font else False
bold_font = InlineFont(b=True, i=is_italic)
bold_rich_text.append(
TextBlock(bold_font, segment.text)
)
else:
bold_rich_text.append(
TextBlock(InlineFont(b=True), str(segment))
)
cell.value = bold_rich_text
else:
cell.font = Font(bold=True)
# Set vertical alignment to middle by default
cell.alignment = Alignment(vertical="center")
# Apply wrap text if needed
if wrap_text_columns and sheet_name in wrap_text_columns:
for col_letter in wrap_text_columns[sheet_name]:
cell = ws[f"{col_letter}{r_idx}"]
cell.alignment = Alignment(vertical="center", wrap_text=True)
# Freeze top row for all data sheets
ws.freeze_panes = "A2"
# Ensure all header cells (first row) are wrapped
for col_idx in range(1, ws.max_column + 1):
header_cell = ws.cell(row=1, column=col_idx)
header_cell.alignment = Alignment(vertical="center", wrap_text=True)
# Set column widths
if column_widths and sheet_name in column_widths:
for col_letter, width in column_widths[sheet_name].items():
ws.column_dimensions[col_letter].width = width
add_cover_sheet(
wb,
intro_paragraphs=intro_text,
model_name=model_name,
analysis_date=analysis_date,
analysis_cost=analysis_cost,
number_of_responses=number_of_responses,
number_of_responses_with_text=number_of_responses_with_text,
number_of_responses_with_text_five_plus_words=number_of_responses_with_text_five_plus_words,
llm_call_number=llm_call_number,
input_tokens=input_tokens,
output_tokens=output_tokens,
time_taken=time_taken,
file_name=file_name,
column_name=column_name,
number_of_responses_with_topic_assignment=number_of_responses_with_topic_assignment,
)
wb.save(output_filename)
print(f"Output xlsx summary saved as '{output_filename}'")
return output_filename
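# Minimal usage sketch for csvs_to_excel (hypothetical paths and sheet names):
#
# csvs_to_excel(
#     csv_files=["output/topic_summary_for_xlsx.csv"],
#     output_filename="output/theme_analysis.xlsx",
#     sheet_names=["Topic summary"],
#     column_widths={"Topic summary": {"A": 25, "F": 100}},
#     wrap_text_columns={"Topic summary": ["F"]},
#     intro_text=["This workbook contains outputs from the LLM thematic analysis."],
# )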
###
# Run the functions
###
def collect_output_csvs_and_create_excel_output(
in_data_files: List,
chosen_cols: list[str],
reference_data_file_name_textbox: str,
in_group_col: str,
model_choice: str,
master_reference_df_state: pd.DataFrame,
master_unique_topics_df_state: pd.DataFrame,
summarised_output_df: pd.DataFrame,
missing_df_state: pd.DataFrame,
excel_sheets: str = "",
usage_logs_location: str = "",
model_name_map: dict = dict(),
output_folder: str = OUTPUT_FOLDER,
structured_summaries: str = "No",
):
"""
Collect together output CSVs from various output boxes and combine them into a single output Excel file.
Args:
in_data_files (List): A list of paths to the input data files.
chosen_cols (list[str]): A list of column names selected for analysis.
reference_data_file_name_textbox (str): The name of the reference data file.
in_group_col (str): The column used for grouping the data.
model_choice (str): The LLM model chosen for the analysis.
master_reference_df_state (pd.DataFrame): The master DataFrame containing reference data.
master_unique_topics_df_state (pd.DataFrame): The master DataFrame containing unique topics data.
summarised_output_df (pd.DataFrame): DataFrame containing the summarised output.
missing_df_state (pd.DataFrame): DataFrame containing information about missing data.
        excel_sheets (str, optional): The Excel sheet name(s) to read from the input file, passed through to load_in_data_file and pd.read_excel. Defaults to "".
usage_logs_location (str, optional): Path to the usage logs CSV file. Defaults to "".
model_name_map (dict, optional): A dictionary mapping model choices to their display names. Defaults to {}.
output_folder (str, optional): The directory where the output Excel file will be saved. Defaults to OUTPUT_FOLDER.
structured_summaries (str, optional): Indicates whether structured summaries are being produced ("Yes" or "No"). Defaults to "No".
Returns:
tuple: A tuple containing:
- list: A list of paths to the generated Excel output files.
- list: A duplicate of the list of paths to the generated Excel output files (for UI compatibility).
"""
# Use passed model_name_map if provided and not empty, otherwise use global one
if not model_name_map:
model_name_map = global_model_name_map
# Ensure custom model_choice is registered in model_name_map
ensure_model_in_map(model_choice, model_name_map)
    # Convert the "Yes"/"No" flag to a boolean
    structured_summaries = structured_summaries == "Yes"
if not chosen_cols:
raise Exception("Could not find chosen column")
today_date = datetime.today().strftime("%Y-%m-%d")
original_data_file_path = os.path.abspath(in_data_files[0])
csv_files = list()
sheet_names = list()
column_widths = dict()
wrap_text_columns = dict()
short_file_name = os.path.basename(reference_data_file_name_textbox)
reference_pivot_table = pd.DataFrame()
reference_table_csv_path = ""
reference_pivot_table_csv_path = ""
unique_topic_table_csv_path = ""
missing_df_state_csv_path = ""
overall_summary_csv_path = ""
    number_of_responses_with_topic_assignment = 0
    # Defaults for values that are otherwise only assigned when reference data
    # exists, but are used unconditionally further down
    unique_reference_numbers = list()
    basic_response_data = pd.DataFrame(columns=["Reference", "Response"])
    file_name = reference_data_file_name_textbox
if in_group_col:
group = in_group_col
else:
group = "All"
overall_summary_csv_path = output_folder + "overall_summary_for_xlsx.csv"
if structured_summaries is True and not master_unique_topics_df_state.empty:
print("Producing overall summary based on structured summaries.")
# Create structured summary from master_unique_topics_df_state
structured_summary_data = list()
# Group by 'Group' column
for group_name, group_df in master_unique_topics_df_state.groupby("Group"):
group_summary = f"## {group_name}\n\n"
# Group by 'General topic' within each group
for general_topic, topic_df in group_df.groupby("General topic"):
group_summary += f"### {general_topic}\n\n"
# Add subtopics under each general topic
for _, row in topic_df.iterrows():
subtopic = row["Subtopic"]
summary = row["Summary"]
# sentiment = row.get('Sentiment', '')
# num_responses = row.get('Number of responses', '')
# Create subtopic entry
subtopic_entry = f"**{subtopic}**"
# if sentiment:
# subtopic_entry += f" ({sentiment})"
# if num_responses:
# subtopic_entry += f" - {num_responses} responses"
subtopic_entry += "\n\n"
if summary and pd.notna(summary):
subtopic_entry += f"{summary}\n\n"
group_summary += subtopic_entry
# Add to structured summary data
structured_summary_data.append(
{"Group": group_name, "Summary": group_summary.strip()}
)
# Create DataFrame for structured summary
structured_summary_df = pd.DataFrame(structured_summary_data)
structured_summary_df.to_csv(overall_summary_csv_path, index=False)
else:
# Use original summarised_output_df
structured_summary_df = summarised_output_df
structured_summary_df.to_csv(overall_summary_csv_path, index=None)
if not structured_summary_df.empty:
csv_files.append(overall_summary_csv_path)
sheet_names.append("Overall summary")
column_widths["Overall summary"] = {"A": 15, "B": 120}
wrap_text_columns["Overall summary"] = ["A", "B"]
if not master_reference_df_state.empty:
# Simplify table to just responses column and the Response reference number
file_data, file_name, num_batches = load_in_data_file(
in_data_files, chosen_cols, 1, in_excel_sheets=excel_sheets
)
basic_response_data = get_basic_response_data(
file_data, chosen_cols, verify_titles="No"
)
reference_pivot_table = convert_reference_table_to_pivot_table(
master_reference_df_state, basic_response_data
)
unique_reference_numbers = basic_response_data["Reference"].tolist()
try:
master_reference_df_state.rename(
columns={"Topic_number": "Topic number"}, inplace=True, errors="ignore"
)
master_reference_df_state.drop(
columns=["1", "2", "3"], inplace=True, errors="ignore"
)
except Exception as e:
print("Could not rename Topic_number due to", e)
number_of_responses_with_topic_assignment = len(
master_reference_df_state["Response References"].unique()
)
reference_table_csv_path = output_folder + "reference_df_for_xlsx.csv"
master_reference_df_state.to_csv(reference_table_csv_path, index=None)
reference_pivot_table_csv_path = (
output_folder + "reference_pivot_df_for_xlsx.csv"
)
reference_pivot_table.to_csv(reference_pivot_table_csv_path, index=None)
short_file_name = os.path.basename(file_name)
if not master_unique_topics_df_state.empty:
master_unique_topics_df_state.drop(
columns=["1", "2", "3"], inplace=True, errors="ignore"
)
unique_topic_table_csv_path = (
output_folder + "unique_topic_table_df_for_xlsx.csv"
)
master_unique_topics_df_state.to_csv(unique_topic_table_csv_path, index=None)
if unique_topic_table_csv_path:
csv_files.append(unique_topic_table_csv_path)
sheet_names.append("Topic summary")
column_widths["Topic summary"] = {
"A": 25,
"B": 25,
"C": 12,
"D": 15,
"E": 10,
"F": 100,
}
wrap_text_columns["Topic summary"] = ["A", "B", "D", "F"]
else:
print("Relevant unique topic files not found, excluding from xlsx output.")
if reference_table_csv_path:
if structured_summaries:
print(
"Structured summaries are being produced, excluding response level data from xlsx output."
)
else:
csv_files.append(reference_table_csv_path)
sheet_names.append("Response level data")
column_widths["Response level data"] = {
"A": 12,
"B": 30,
"C": 40,
"D": 10,
"H": 100,
}
wrap_text_columns["Response level data"] = ["C", "G"]
else:
print("Relevant reference files not found, excluding from xlsx output.")
if reference_pivot_table_csv_path:
if structured_summaries:
print(
"Structured summaries are being produced, excluding topic response pivot table from xlsx output."
)
else:
csv_files.append(reference_pivot_table_csv_path)
sheet_names.append("Topic response pivot table")
if reference_pivot_table.empty:
reference_pivot_table = pd.read_csv(reference_pivot_table_csv_path)
# Base widths and wrap
column_widths["Topic response pivot table"] = {"A": 12, "B": 100, "C": 12}
wrap_text_columns["Topic response pivot table"] = ["B", "C"]
num_cols = len(reference_pivot_table.columns)
col_letters = [get_column_letter(i) for i in range(4, num_cols + 1)]
for col_letter in col_letters:
column_widths["Topic response pivot table"][col_letter] = 20
wrap_text_columns["Topic response pivot table"].extend(col_letters)
else:
print(
"Relevant reference pivot table files not found, excluding from xlsx output."
)
if not missing_df_state.empty:
missing_df_state_csv_path = output_folder + "missing_df_state_df_for_xlsx.csv"
missing_df_state.to_csv(missing_df_state_csv_path, index=None)
if missing_df_state_csv_path:
if structured_summaries:
print(
"Structured summaries are being produced, excluding missing responses from xlsx output."
)
else:
csv_files.append(missing_df_state_csv_path)
sheet_names.append("Missing responses")
column_widths["Missing responses"] = {"A": 25, "B": 30, "C": 50}
wrap_text_columns["Missing responses"] = ["C"]
else:
print("Relevant missing responses files not found, excluding from xlsx output.")
new_csv_files = csv_files.copy()
# Original data file
original_ext = os.path.splitext(original_data_file_path)[1].lower()
if original_ext == ".csv":
csv_files.append(original_data_file_path)
else:
# Read and convert to CSV
if original_ext == ".xlsx":
if excel_sheets:
df = pd.read_excel(original_data_file_path, sheet_name=excel_sheets)
else:
df = pd.read_excel(original_data_file_path)
elif original_ext == ".parquet":
df = pd.read_parquet(original_data_file_path)
else:
raise Exception(f"Unsupported file type for original data: {original_ext}")
# Save as CSV in output folder
original_data_csv_path = os.path.join(
output_folder,
os.path.splitext(os.path.basename(original_data_file_path))[0]
+ "_for_xlsx.csv",
)
df.to_csv(original_data_csv_path, index=False)
csv_files.append(original_data_csv_path)
sheet_names.append("Original data")
column_widths["Original data"] = {"A": 10, "B": 20, "C": 20}
wrap_text_columns["Original data"] = ["C"]
if isinstance(chosen_cols, list) and chosen_cols:
chosen_cols = chosen_cols[0]
else:
chosen_cols = str(chosen_cols) if chosen_cols else ""
# Intro page text
    intro_text = [
        "This workbook contains outputs from the Large Language Model (LLM) thematic analysis of open text data. Each sheet corresponds to a different CSV report included in the analysis.",
        f"The file analysed was {short_file_name}, the column analysed was '{chosen_cols}' and the data was grouped by column '{group}'."
        " Please contact the app administrator if you need any explanation of how to use the results.",
        "LLMs are not 100% accurate and may produce biased or harmful outputs. All outputs from this analysis **need to be checked by a human** for harmful content, false information, and bias.",
    ]
# Get values for number of rows, number of responses, and number of responses longer than five words
number_of_responses = basic_response_data.shape[0]
# number_of_responses_with_text = basic_response_data["Response"].str.strip().notnull().sum()
number_of_responses_with_text = (
basic_response_data["Response"].str.strip().notnull()
& (basic_response_data["Response"].str.split().str.len() >= 1)
).sum()
number_of_responses_with_text_five_plus_words = (
basic_response_data["Response"].str.strip().notnull()
& (basic_response_data["Response"].str.split().str.len() >= 5)
).sum()
# Get number of LLM calls, input and output tokens
if usage_logs_location:
try:
usage_logs = pd.read_csv(usage_logs_location)
relevant_logs = usage_logs.loc[
(
usage_logs["Reference data file name"]
== reference_data_file_name_textbox
)
& (
usage_logs[
"Large language model for topic extraction and summarisation"
]
== model_choice
)
& (
usage_logs[
"Select the open text column of interest. In an Excel file, this shows columns across all sheets."
]
                # chosen_cols was reduced to a single column name string above
                == chosen_cols
),
:,
]
llm_call_number = sum(relevant_logs["Total LLM calls"].astype(int))
input_tokens = sum(relevant_logs["Total input tokens"].astype(int))
output_tokens = sum(relevant_logs["Total output tokens"].astype(int))
time_taken = sum(
relevant_logs["Estimated time taken (seconds)"].astype(float)
)
except Exception as e:
print("Could not obtain usage logs due to:", e)
usage_logs = pd.DataFrame()
llm_call_number = 0
input_tokens = 0
output_tokens = 0
time_taken = 0
else:
print("LLM call logs location not provided")
usage_logs = pd.DataFrame()
llm_call_number = 0
input_tokens = 0
output_tokens = 0
time_taken = 0
# Create short filename:
model_choice_clean_short = clean_column_name(
model_name_map[model_choice]["short_name"],
max_length=20,
front_characters=False,
)
    # chosen_cols has already been reduced to a single column name string above
    chosen_col_str = chosen_cols
in_column_cleaned = clean_column_name(chosen_col_str, max_length=20)
file_name_cleaned = clean_column_name(
file_name, max_length=20, front_characters=True
)
# Save outputs for each batch. If master file created, label file as master
file_path_details = (
f"{file_name_cleaned}_col_{in_column_cleaned}_{model_choice_clean_short}"
)
output_xlsx_filename = (
output_folder
+ file_path_details
+ ("_structured_summaries" if structured_summaries else "_theme_analysis")
+ ".xlsx"
)
xlsx_output_filename = csvs_to_excel(
csv_files=csv_files,
output_filename=output_xlsx_filename,
sheet_names=sheet_names,
column_widths=column_widths,
wrap_text_columns=wrap_text_columns,
intro_text=intro_text,
model_name=model_choice,
analysis_date=today_date,
analysis_cost="",
llm_call_number=llm_call_number,
input_tokens=input_tokens,
output_tokens=output_tokens,
time_taken=time_taken,
number_of_responses=number_of_responses,
number_of_responses_with_text=number_of_responses_with_text,
number_of_responses_with_text_five_plus_words=number_of_responses_with_text_five_plus_words,
column_name=chosen_col_str,
number_of_responses_with_topic_assignment=number_of_responses_with_topic_assignment,
file_name=short_file_name,
unique_reference_numbers=unique_reference_numbers,
)
xlsx_output_filenames = [xlsx_output_filename]
# Delete intermediate csv files
for csv_file in new_csv_files:
os.remove(csv_file)
return xlsx_output_filenames, xlsx_output_filenames