#!/usr/bin/env python """ Parrot OSINT MCP – Gradio Frontend Modes: - OSINT Dashboard (deterministic intelligence) - MCP Bridge (raw tool access) - Analyst Copilot (LLM interpretive intelligence) """ import json import traceback from typing import Any, Dict import gradio as gr from huggingface_hub import InferenceClient # --------------------------------------------------------------------- # Task Registry (auto-loads MCP tasks dynamically) # --------------------------------------------------------------------- TASK_REGISTRY: Dict[str, Any] = {} def _register_tasks(): """ Import tasks.* modules dynamically and pull their run() functions. """ def _try(name, module): try: m = __import__(f"tasks.{module}", fromlist=["*"]) fn = getattr(m, "run", None) if callable(fn): TASK_REGISTRY[name] = fn except Exception: pass # In HF Spaces, missing tasks won't break startup. _try("lookup_ip", "lookup_ip") _try("lookup_domain", "lookup_domain") _try("lookup_hash", "lookup_hash") _try("correlate_iocs", "correlate_iocs") _try("generate_report", "generate_report") _try("enrich_entity", "enrich_entity") _try("mitre_map", "mitre_map") _try("quickscan", "quickscan") _register_tasks() # --------------------------------------------------------------------- # Task Execution + Normalization # --------------------------------------------------------------------- def call_task(name: str, payload: Dict[str, Any]): fn = TASK_REGISTRY.get(name) if not fn: return {"error": f"Unknown tool '{name}'."} try: res = fn(**payload) if not isinstance(res, dict): res = {"result": res} return res except Exception as e: return {"error": str(e), "traceback": traceback.format_exc()} def normalize_result(res: Dict[str, Any]): """Ensures consistent UI formatting.""" pretty = json.dumps(res, indent=2, default=str) summary = res.get("summary", "") markdown = res.get("markdown") or res.get("report") or "" if not markdown and summary: markdown = f"## Summary\n\n{summary}" safe_json = lambda v: json.dumps(v, indent=2, default=str) if v else "" return { "summary": summary, "markdown": markdown, "json": pretty, "mitre": safe_json(res.get("mitre")), "stix": safe_json(res.get("stix")), "sarif": safe_json(res.get("sarif")), } # --------------------------------------------------------------------- # Analyst Copilot LLM # --------------------------------------------------------------------- def respond( message, history, system_prompt, model_name, hf_token, temperature, top_p, max_tokens, ): """ Streaming LLM output using WhiteRabbit Neo or Cybertron. hf_token is a raw string entered by user. """ client = InferenceClient( model=model_name, token=hf_token, # Direct string, no OAuthToken object ) msgs = [{"role": "system", "content": system_prompt}] msgs.extend(history) msgs.append({"role": "user", "content": message}) buffer = "" for chunk in client.chat_completion( messages=msgs, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stream=True, ): delta = chunk.choices[0].delta.content if delta: buffer += delta yield buffer def inject_osint(history, osint_obj): """Inject OSINT result JSON into copilot context.""" pretty = json.dumps(osint_obj, indent=2, default=str) history.append({ "role": "system", "content": f"### Injected OSINT Result\n```\n{pretty}\n```" }) return history # --------------------------------------------------------------------- # OSINT Dashboard Callbacks # --------------------------------------------------------------------- def ui_lookup_ip(ip, enrich, mitre): raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw def ui_lookup_domain(domain, enrich, mitre): raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw def ui_lookup_hash(h, ht, enrich, mitre): raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw def ui_correlate_iocs(iocs): lst = [x.strip() for x in iocs.splitlines() if x.strip()] raw = call_task("correlate_iocs", {"iocs": lst}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw def ui_quickscan(target): raw = call_task("quickscan", {"target": target}) norm = normalize_result(raw) return norm["summary"], norm["markdown"], norm["json"], raw # --------------------------------------------------------------------- # MCP Bridge # --------------------------------------------------------------------- def ui_bridge(tool, args_json): try: payload = json.loads(args_json) except Exception as e: return json.dumps({"error": str(e)}, indent=2), "", {} raw = call_task(tool, payload) norm = normalize_result(raw) return norm["json"], norm["markdown"], raw # --------------------------------------------------------------------- # UI Layout # --------------------------------------------------------------------- def build_interface(): with gr.Blocks(title="Parrot OSINT MCP Console") as demo: gr.Markdown("# 🦜 Parrot OSINT MCP Console\nMulti-mode Intelligence Workstation.") osint_state = gr.State({}) # ------------------------- # OSINT Dashboard # ------------------------- with gr.Tab("OSINT Dashboard"): # IP Lookup with gr.Tab("IP Lookup"): ip = gr.Textbox(label="IP Address", placeholder="8.8.8.8") enrich = gr.Checkbox(value=True, label="Enrich data") mitre = gr.Checkbox(value=True, label="MITRE ATT&CK Mapping") run = gr.Button("Run IP Lookup") out_s = gr.Textbox(label="Summary") out_md = gr.Markdown() out_json = gr.Code(language="json") out_mitre = gr.Code(language="json") out_stix = gr.Code(language="json") run.click( ui_lookup_ip, [ip, enrich, mitre], [out_s, out_md, out_json, out_mitre, out_stix, osint_state] ) # ------------------------- # MCP Bridge # ------------------------- with gr.Tab("MCP Bridge"): tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()), label="Tool") args = gr.Code(language="json", label="Args JSON") btn = gr.Button("Run Tool") out_bridge_json = gr.Code(language="json") out_bridge_md = gr.Markdown() btn.click( ui_bridge, [tool, args], [out_bridge_json, out_bridge_md, osint_state] ) # ------------------------- # Analyst Copilot # ------------------------- with gr.Tab("Analyst Copilot"): gr.Markdown("### WhiteRabbit Neo + Cybertron Threat Intelligence Assistant") system_prompt = gr.Textbox( label="System Prompt", value=( "You are a threat intelligence analyst. " "You classify TTPs, extract indicators, map MITRE ATT&CK, " "and provide investigation guidance." ), ) model_select = gr.Dropdown( label="LLM Model", choices=[ "berkeley-nest/WhiteRabbitNeo-8B", "cybertronai/cybertron-1.1-1b", "cybertronai/cybertron-1.1-7b", "cybertronai/cybertron-1.1-32b" ], value="berkeley-nest/WhiteRabbitNeo-8B", ) gr.Markdown("### HuggingFace API Token (required for LLM inference)") hf_token = gr.Textbox( label="HF Token", type="password", placeholder="hf_xxx...", ) chatbot = gr.ChatInterface( respond, type="messages", additional_inputs=[ system_prompt, model_select, hf_token, gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"), gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"), gr.Slider(32, 4096, value=512, step=32, label="Max Tokens"), ], ) inject_btn = gr.Button("Inject Last OSINT Result into Copilot") inject_btn.click( inject_osint, [chatbot._chatbot_state, osint_state], [chatbot._chatbot_state], ) return demo # --------------------------------------------------------------------- # MAIN ENTRY # --------------------------------------------------------------------- if __name__ == "__main__": demo = build_interface() demo.launch()