OSINTMCPServer / app.py
S-Dreamer's picture
Update app.py
7bec021 verified
raw
history blame
9.73 kB
#!/usr/bin/env python
"""
Parrot OSINT MCP – Gradio Frontend
Modes:
- OSINT Dashboard (deterministic intelligence)
- MCP Bridge (raw tool access)
- Analyst Copilot (LLM interpretive intelligence)
"""
import json
import traceback
from typing import Any, Dict
import gradio as gr
from huggingface_hub import InferenceClient
# ---------------------------------------------------------------------
# Task Registry (auto-loads MCP tasks dynamically)
# ---------------------------------------------------------------------
TASK_REGISTRY: Dict[str, Any] = {}
def _register_tasks():
"""
Import tasks.* modules dynamically and pull their run() functions.
"""
def _try(name, module):
try:
m = __import__(f"tasks.{module}", fromlist=["*"])
fn = getattr(m, "run", None)
if callable(fn):
TASK_REGISTRY[name] = fn
except Exception:
pass # In HF Spaces, missing tasks won't break startup.
_try("lookup_ip", "lookup_ip")
_try("lookup_domain", "lookup_domain")
_try("lookup_hash", "lookup_hash")
_try("correlate_iocs", "correlate_iocs")
_try("generate_report", "generate_report")
_try("enrich_entity", "enrich_entity")
_try("mitre_map", "mitre_map")
_try("quickscan", "quickscan")
_register_tasks()
# ---------------------------------------------------------------------
# Task Execution + Normalization
# ---------------------------------------------------------------------
def call_task(name: str, payload: Dict[str, Any]):
fn = TASK_REGISTRY.get(name)
if not fn:
return {"error": f"Unknown tool '{name}'."}
try:
res = fn(**payload)
if not isinstance(res, dict):
res = {"result": res}
return res
except Exception as e:
return {"error": str(e), "traceback": traceback.format_exc()}
def normalize_result(res: Dict[str, Any]):
"""Ensures consistent UI formatting."""
pretty = json.dumps(res, indent=2, default=str)
summary = res.get("summary", "")
markdown = res.get("markdown") or res.get("report") or ""
if not markdown and summary:
markdown = f"## Summary\n\n{summary}"
safe_json = lambda v: json.dumps(v, indent=2, default=str) if v else ""
return {
"summary": summary,
"markdown": markdown,
"json": pretty,
"mitre": safe_json(res.get("mitre")),
"stix": safe_json(res.get("stix")),
"sarif": safe_json(res.get("sarif")),
}
# ---------------------------------------------------------------------
# Analyst Copilot LLM
# ---------------------------------------------------------------------
def respond(
message,
history,
system_prompt,
model_name,
hf_token,
temperature,
top_p,
max_tokens,
):
"""
Streaming LLM output using WhiteRabbit Neo or Cybertron.
hf_token is a raw string entered by user.
"""
client = InferenceClient(
model=model_name,
token=hf_token, # Direct string, no OAuthToken object
)
msgs = [{"role": "system", "content": system_prompt}]
msgs.extend(history)
msgs.append({"role": "user", "content": message})
buffer = ""
for chunk in client.chat_completion(
messages=msgs,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True,
):
delta = chunk.choices[0].delta.content
if delta:
buffer += delta
yield buffer
def inject_osint(history, osint_obj):
"""Inject OSINT result JSON into copilot context."""
pretty = json.dumps(osint_obj, indent=2, default=str)
history.append({
"role": "system",
"content": f"### Injected OSINT Result\n```\n{pretty}\n```"
})
return history
# ---------------------------------------------------------------------
# OSINT Dashboard Callbacks
# ---------------------------------------------------------------------
def ui_lookup_ip(ip, enrich, mitre):
raw = call_task("lookup_ip", {"ip": ip, "enrich": enrich, "map_mitre": mitre})
norm = normalize_result(raw)
return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
def ui_lookup_domain(domain, enrich, mitre):
raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre})
norm = normalize_result(raw)
return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
def ui_lookup_hash(h, ht, enrich, mitre):
raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre})
norm = normalize_result(raw)
return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
def ui_correlate_iocs(iocs):
lst = [x.strip() for x in iocs.splitlines() if x.strip()]
raw = call_task("correlate_iocs", {"iocs": lst})
norm = normalize_result(raw)
return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw
def ui_quickscan(target):
raw = call_task("quickscan", {"target": target})
norm = normalize_result(raw)
return norm["summary"], norm["markdown"], norm["json"], raw
# ---------------------------------------------------------------------
# MCP Bridge
# ---------------------------------------------------------------------
def ui_bridge(tool, args_json):
try:
payload = json.loads(args_json)
except Exception as e:
return json.dumps({"error": str(e)}, indent=2), "", {}
raw = call_task(tool, payload)
norm = normalize_result(raw)
return norm["json"], norm["markdown"], raw
# ---------------------------------------------------------------------
# UI Layout
# ---------------------------------------------------------------------
def build_interface():
with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
gr.Markdown("# 🦜 Parrot OSINT MCP Console\nMulti-mode Intelligence Workstation.")
osint_state = gr.State({})
# -------------------------
# OSINT Dashboard
# -------------------------
with gr.Tab("OSINT Dashboard"):
# IP Lookup
with gr.Tab("IP Lookup"):
ip = gr.Textbox(label="IP Address", placeholder="8.8.8.8")
enrich = gr.Checkbox(value=True, label="Enrich data")
mitre = gr.Checkbox(value=True, label="MITRE ATT&CK Mapping")
run = gr.Button("Run IP Lookup")
out_s = gr.Textbox(label="Summary")
out_md = gr.Markdown()
out_json = gr.Code(language="json")
out_mitre = gr.Code(language="json")
out_stix = gr.Code(language="json")
run.click(
ui_lookup_ip,
[ip, enrich, mitre],
[out_s, out_md, out_json, out_mitre, out_stix, osint_state]
)
# -------------------------
# MCP Bridge
# -------------------------
with gr.Tab("MCP Bridge"):
tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()), label="Tool")
args = gr.Code(language="json", label="Args JSON")
btn = gr.Button("Run Tool")
out_bridge_json = gr.Code(language="json")
out_bridge_md = gr.Markdown()
btn.click(
ui_bridge,
[tool, args],
[out_bridge_json, out_bridge_md, osint_state]
)
# -------------------------
# Analyst Copilot
# -------------------------
with gr.Tab("Analyst Copilot"):
gr.Markdown("### WhiteRabbit Neo + Cybertron Threat Intelligence Assistant")
system_prompt = gr.Textbox(
label="System Prompt",
value=(
"You are a threat intelligence analyst. "
"You classify TTPs, extract indicators, map MITRE ATT&CK, "
"and provide investigation guidance."
),
)
model_select = gr.Dropdown(
label="LLM Model",
choices=[
"berkeley-nest/WhiteRabbitNeo-8B",
"cybertronai/cybertron-1.1-1b",
"cybertronai/cybertron-1.1-7b",
"cybertronai/cybertron-1.1-32b"
],
value="berkeley-nest/WhiteRabbitNeo-8B",
)
gr.Markdown("### HuggingFace API Token (required for LLM inference)")
hf_token = gr.Textbox(
label="HF Token",
type="password",
placeholder="hf_xxx...",
)
chatbot = gr.ChatInterface(
respond,
type="messages",
additional_inputs=[
system_prompt,
model_select,
hf_token,
gr.Slider(0.1, 2.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
gr.Slider(32, 4096, value=512, step=32, label="Max Tokens"),
],
)
inject_btn = gr.Button("Inject Last OSINT Result into Copilot")
inject_btn.click(
inject_osint,
[chatbot._chatbot_state, osint_state],
[chatbot._chatbot_state],
)
return demo
# ---------------------------------------------------------------------
# MAIN ENTRY
# ---------------------------------------------------------------------
if __name__ == "__main__":
demo = build_interface()
demo.launch()