OSINTMCPServer / app.py
S-Dreamer's picture
Update app.py
6988b5e verified
raw
history blame
11.1 kB
#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend
Modes:
- OSINT Dashboard (deterministic intelligence)
- MCP Bridge (raw tool access)
- Analyst Copilot (LLM interpretive intelligence)
"""
import importlib
import json
import logging
import traceback
from typing import Any, Dict

import gradio as gr
from huggingface_hub import InferenceClient
# ---------------------------------------------------------------------
# Task Registry (auto-loads MCP tasks dynamically)
# ---------------------------------------------------------------------
logger = logging.getLogger(__name__)

# Maps a tool name to the ``run`` callable exported by ``tasks.<name>``.
TASK_REGISTRY: Dict[str, Any] = {}

# Tool names the console tries to load; each one is looked up as the
# module ``tasks.<name>``.
_TASK_MODULES = (
    "lookup_ip",
    "lookup_domain",
    "lookup_hash",
    "correlate_iocs",
    "generate_report",
    "enrich_entity",
    "mitre_map",
    "quickscan",
)


def _register_tasks():
    """
    Import tasks.* modules dynamically and pull their run() functions.

    Every name in ``_TASK_MODULES`` is imported as ``tasks.<name>``; when the
    module exposes a callable ``run`` attribute it is stored in
    ``TASK_REGISTRY`` under that name.

    Modules that fail to import are skipped so the UI can still boot, but
    each skip is logged so a genuinely broken task module is not mistaken
    for one that simply is not deployed.
    """
    for name in _TASK_MODULES:
        try:
            module = importlib.import_module(f"tasks.{name}")
        except Exception as exc:
            # In Spaces, you might not have all tasks yet; that's fine.
            # Best-effort by design: any import-time failure only disables
            # this one tool.
            logger.warning(
                "Task %r unavailable (%s: %s)", name, type(exc).__name__, exc
            )
            continue

        run = getattr(module, "run", None)
        if callable(run):
            TASK_REGISTRY[name] = run
        else:
            logger.warning("Task %r has no callable run(); skipping", name)


_register_tasks()
# ---------------------------------------------------------------------
# Task Execution + Normalization
# ---------------------------------------------------------------------
def call_task(name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run the registered tool `name` with `payload` expanded as keyword arguments.

    Always returns a dict:
    - ``{"error": ...}`` when no tool is registered under `name`;
    - ``{"error": ..., "traceback": ...}`` when the tool raises;
    - the tool's own result when it is a dict, otherwise ``{"result": value}``.
    """
    handler = TASK_REGISTRY.get(name)
    if not handler:
        return {"error": f"Unknown tool '{name}'."}

    try:
        outcome = handler(**payload)
    except Exception as exc:
        # Tool failures are reported as data so the UI can display them.
        return {"error": str(exc), "traceback": traceback.format_exc()}

    return outcome if isinstance(outcome, dict) else {"result": outcome}
def normalize_result(res: Dict[str, Any]) -> Dict[str, str]:
    """
    Flatten a task result into the fixed set of fields the UI renders.

    Returned keys:
    - ``summary``: the result's ``summary`` value (``""`` when the key is absent);
    - ``markdown``: ``markdown``, else ``report``, else a "## Summary" section
      built from the summary, else ``""``;
    - ``json``: the whole result, pretty-printed;
    - ``mitre`` / ``stix`` / ``sarif``: that section pretty-printed, or ``""``
      when it is missing or falsy.
    """

    def _dump(value: Any) -> str:
        # default=str keeps non-JSON-native values from raising.
        return json.dumps(value, indent=2, default=str)

    full_json = _dump(res)
    summary_text = res.get("summary", "")

    body = res.get("markdown") or res.get("report") or ""
    if summary_text and not body:
        body = f"## Summary\n\n{summary_text}"

    normalized = {
        "summary": summary_text,
        "markdown": body,
        "json": full_json,
    }
    for section in ("mitre", "stix", "sarif"):
        section_value = res.get(section)
        normalized[section] = _dump(section_value) if section_value else ""
    return normalized
# ---------------------------------------------------------------------
# Analyst Copilot LLM
# ---------------------------------------------------------------------
def respond(
    message,
    history,
    system_prompt,
    model_name,
    hf_token,
    temperature,
    top_p,
    max_tokens,
):
    """
    Stream a chat completion from the selected model.

    Builds the conversation as: the system prompt, then the prior `history`
    turns, then `message` as the final user turn. Yields the accumulated
    reply text after every non-empty streamed fragment, so each yielded value
    replaces the previous one.

    `hf_token` is a raw string entered by the user; a blank value is passed
    to the client as ``None`` rather than as an empty token.
    """
    client = InferenceClient(
        model=model_name,
        # NOTE(review): with token=None the client is expected to fall back
        # to its own default token lookup — confirm against huggingface_hub.
        token=(hf_token or "").strip() or None,
    )

    msgs = [{"role": "system", "content": system_prompt}]
    for turn in history or []:
        # Forward only the chat fields. NOTE(review): Gradio's messages-format
        # history may carry extra UI-only keys (e.g. metadata) — confirm.
        msgs.append(
            {key: turn[key] for key in ("role", "content") if key in turn}
        )
    msgs.append({"role": "user", "content": message})

    buffer = ""
    for chunk in client.chat_completion(
        messages=msgs,
        # UI sliders can hand back floats; the token budget must be integral.
        max_tokens=int(max_tokens),
        temperature=temperature,
        top_p=top_p,
        stream=True,
    ):
        # A stream chunk can arrive without any choices; indexing [0] on it
        # would raise IndexError and abort the reply mid-stream.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            buffer += delta
            yield buffer
def inject_osint(history, osint_obj):
    """
    Return the chat history with an OSINT result appended as a system message.

    `history` is a list of chat messages (``None`` is treated as empty) and is
    never modified; a new list is returned. `osint_obj` is pretty-printed as
    JSON inside a fenced block. When `osint_obj` is empty there is nothing to
    inject, so an unchanged copy of the history is returned.
    """
    updated = list(history or [])
    if not osint_obj:
        # Avoid feeding the model a context message that only contains "{}".
        return updated

    pretty = json.dumps(osint_obj, indent=2, default=str)
    updated.append(
        {
            "role": "system",
            "content": f"### Injected OSINT Result\n```\n{pretty}\n```",
        }
    )
    return updated
# ---------------------------------------------------------------------
# OSINT Dashboard Callbacks
# ---------------------------------------------------------------------
def ui_lookup_ip(ip, enrich, mitre):
    """
    Run the ``lookup_ip`` task and unpack its normalized result for the UI.

    Returns a 6-tuple: summary, markdown, JSON text, MITRE text, STIX text,
    and the raw task result.
    """
    result = call_task(
        "lookup_ip", dict(ip=ip, enrich=enrich, map_mitre=mitre)
    )
    view = normalize_result(result)
    display_fields = ("summary", "markdown", "json", "mitre", "stix")
    return (*(view[field] for field in display_fields), result)
def ui_lookup_domain(domain, enrich, mitre):
    """
    Run the ``lookup_domain`` task and unpack its normalized result for the UI.

    Returns a 6-tuple: summary, markdown, JSON text, MITRE text, STIX text,
    and the raw task result.
    """
    arguments = dict(domain=domain, enrich=enrich, map_mitre=mitre)
    result = call_task("lookup_domain", arguments)
    view = normalize_result(result)
    display_fields = ("summary", "markdown", "json", "mitre", "stix")
    return (*(view[field] for field in display_fields), result)
def ui_lookup_hash(h, ht, enrich, mitre):
    """
    Run the ``lookup_hash`` task and unpack its normalized result for the UI.

    `h` is sent as ``hash`` and `ht` as ``hash_type``. Returns a 6-tuple:
    summary, markdown, JSON text, MITRE text, STIX text, and the raw task
    result.
    """
    arguments = {
        "hash": h,
        "hash_type": ht,
        "enrich": enrich,
        "map_mitre": mitre,
    }
    result = call_task("lookup_hash", arguments)
    view = normalize_result(result)
    display_fields = ("summary", "markdown", "json", "mitre", "stix")
    return (*(view[field] for field in display_fields), result)
def ui_correlate_iocs(iocs):
    """
    Run the ``correlate_iocs`` task on a newline-separated list of indicators.

    Each line of `iocs` is stripped and blank lines are dropped before the
    call. Returns a 5-tuple: summary, markdown, JSON text, MITRE text, and
    the raw task result.
    """
    stripped_lines = (line.strip() for line in iocs.splitlines())
    indicators = [entry for entry in stripped_lines if entry]
    result = call_task("correlate_iocs", {"iocs": indicators})
    view = normalize_result(result)
    return (
        view["summary"],
        view["markdown"],
        view["json"],
        view["mitre"],
        result,
    )
def ui_quickscan(target):
    """
    Run the ``quickscan`` task against `target`.

    Returns a 4-tuple: summary, markdown, JSON text, and the raw task result.
    """
    result = call_task("quickscan", dict(target=target))
    view = normalize_result(result)
    return (*(view[field] for field in ("summary", "markdown", "json")), result)
# ---------------------------------------------------------------------
# MCP Bridge
# ---------------------------------------------------------------------
def ui_bridge(tool, args_json):
    """
    Run an arbitrary registered tool with arguments supplied as JSON text.

    `args_json` must decode to a JSON object; its members become the tool's
    keyword arguments. Returns a 3-tuple: JSON text, markdown, and the raw
    task result. When the arguments cannot be used, the tool is not called
    and the tuple is ``(error JSON text, "", {})``.
    """

    def _rejected(reason):
        # Same shape as a successful return so the UI outputs line up.
        return json.dumps({"error": reason}, indent=2), "", {}

    try:
        payload = json.loads(args_json)
    except (TypeError, ValueError, RecursionError) as e:
        # TypeError: no text at all (None); ValueError: malformed JSON;
        # RecursionError: pathologically nested input.
        return _rejected(str(e))

    if not isinstance(payload, dict):
        # Valid JSON such as a list or number cannot be expanded into keyword
        # arguments; report that here instead of as an internal traceback.
        return _rejected(
            f"Args JSON must be an object, got {type(payload).__name__}."
        )

    raw = call_task(tool, payload)
    norm = normalize_result(raw)
    return norm["json"], norm["markdown"], raw
# ---------------------------------------------------------------------
# UI Layout
# ---------------------------------------------------------------------
def build_interface():
    """
    Build the Gradio console and return the ``gr.Blocks`` app (not launched).

    Layout:
    - "OSINT Dashboard": an IP lookup form wired to ``ui_lookup_ip``.
    - "MCP Bridge": run any tool in ``TASK_REGISTRY`` with JSON arguments.
    - "Analyst Copilot": a streaming chat backed by ``respond``.

    Both the dashboard and the bridge store their raw result in a shared
    state; the copilot's "Inject" button turns that result into a system
    message that is included in every later copilot request.
    """
    with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
        gr.Markdown(
            "# 🦜 Parrot OSINT MCP Console\n"
            "Multi-mode Intelligence Workstation."
        )
        # Holds the last OSINT result (dict) to inject into the copilot
        osint_state = gr.State({})

        # -------------------------
        # OSINT Dashboard
        # -------------------------
        with gr.Tab("OSINT Dashboard"):
            # IP Lookup
            with gr.Tab("IP Lookup"):
                ip = gr.Textbox(label="IP Address", placeholder="8.8.8.8")
                enrich = gr.Checkbox(value=True, label="Enrich data")
                mitre = gr.Checkbox(value=True, label="MITRE ATT&CK Mapping")
                run = gr.Button("Run IP Lookup")
                out_s = gr.Textbox(label="Summary")
                out_md = gr.Markdown()
                out_json = gr.Code(language="json")
                out_mitre = gr.Code(language="json")
                out_stix = gr.Code(language="json")
                # Output order must match the tuple returned by ui_lookup_ip.
                run.click(
                    ui_lookup_ip,
                    [ip, enrich, mitre],
                    [out_s, out_md, out_json, out_mitre, out_stix, osint_state],
                )
            # You can add more tabs here: Domain Lookup, Hash Lookup, Correlate IOCs, Quickscan

        # -------------------------
        # MCP Bridge
        # -------------------------
        with gr.Tab("MCP Bridge"):
            tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()), label="Tool")
            args = gr.Code(language="json", label="Args JSON")
            btn = gr.Button("Run Tool")
            out_bridge_json = gr.Code(language="json")
            out_bridge_md = gr.Markdown()
            btn.click(
                ui_bridge,
                [tool, args],
                [out_bridge_json, out_bridge_md, osint_state],
            )

        # -------------------------
        # Analyst Copilot
        # -------------------------
        with gr.Tab("Analyst Copilot"):
            gr.Markdown(
                "### WhiteRabbit Neo + Cybertron Threat Intelligence Assistant"
            )
            system_prompt = gr.Textbox(
                label="System Prompt",
                value=(
                    "You are a threat intelligence analyst. "
                    "You classify TTPs, extract indicators, map MITRE ATT&CK, "
                    "and provide investigation guidance."
                ),
            )
            model_select = gr.Dropdown(
                label="LLM Model",
                choices=[
                    "berkeley-nest/WhiteRabbitNeo-8B",
                    "cybertronai/cybertron-1.1-1b",
                    "cybertronai/cybertron-1.1-7b",
                    "cybertronai/cybertron-1.1-32b",
                ],
                value="berkeley-nest/WhiteRabbitNeo-8B",
            )
            gr.Markdown(
                "### HuggingFace API Token (required for LLM inference)"
            )
            hf_token = gr.Textbox(
                label="HF Token",
                type="password",
                placeholder="hf_xxx...",
            )
            temperature = gr.Slider(
                0.1,
                2.0,
                value=0.7,
                step=0.1,
                label="Temperature",
            )
            top_p = gr.Slider(
                0.1,
                1.0,
                value=0.95,
                step=0.05,
                label="Top-p",
            )
            max_tokens = gr.Slider(
                32,
                4096,
                value=512,
                step=32,
                label="Max Tokens",
            )

            # Injected OSINT context for the copilot (list of system messages)
            chat_state = gr.State([])

            def copilot(
                message,
                history,
                injected,
                prompt_text,
                model_name,
                token,
                temp,
                nucleus,
                token_budget,
            ):
                """Stream a reply with the injected OSINT context placed ahead of the chat history."""
                context = list(injected or []) + list(history or [])
                yield from respond(
                    message,
                    context,
                    prompt_text,
                    model_name,
                    token,
                    temp,
                    nucleus,
                    token_budget,
                )

            # gr.ChatInterface takes no `state` keyword, so the injected
            # context reaches the model as the first additional input.
            # NOTE(review): relies on gr.State being accepted in
            # additional_inputs — confirm on the deployed Gradio version.
            gr.ChatInterface(
                copilot,
                type="messages",
                additional_inputs=[
                    chat_state,
                    system_prompt,
                    model_select,
                    hf_token,
                    temperature,
                    top_p,
                    max_tokens,
                ],
            )
            inject_btn = gr.Button("Inject Last OSINT Result into Copilot")
            inject_btn.click(
                inject_osint,
                inputs=[chat_state, osint_state],
                outputs=[chat_state],
            )
    return demo
# ---------------------------------------------------------------------
# MAIN ENTRY
# ---------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: build the console, keep it bound to the
    # module-level name ``demo``, and start the Gradio server.
    demo = build_interface()
    demo.launch()