S-Dreamer commited on
Commit
7bec021
·
verified ·
1 Parent(s): eb43500

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +80 -40
app.py CHANGED
@@ -16,12 +16,15 @@ import gradio as gr
16
  from huggingface_hub import InferenceClient
17
 
18
  # ---------------------------------------------------------------------
19
- # Task Registry (auto-loads your MCP tasks)
20
  # ---------------------------------------------------------------------
21
 
22
  TASK_REGISTRY: Dict[str, Any] = {}
23
 
24
  def _register_tasks():
 
 
 
25
  def _try(name, module):
26
  try:
27
  m = __import__(f"tasks.{module}", fromlist=["*"])
@@ -29,7 +32,7 @@ def _register_tasks():
29
  if callable(fn):
30
  TASK_REGISTRY[name] = fn
31
  except Exception:
32
- pass
33
 
34
  _try("lookup_ip", "lookup_ip")
35
  _try("lookup_domain", "lookup_domain")
@@ -42,8 +45,9 @@ def _register_tasks():
42
 
43
  _register_tasks()
44
 
 
45
  # ---------------------------------------------------------------------
46
- # Core Task Execution
47
  # ---------------------------------------------------------------------
48
 
49
  def call_task(name: str, payload: Dict[str, Any]):
@@ -61,24 +65,28 @@ def call_task(name: str, payload: Dict[str, Any]):
61
 
62
 
63
  def normalize_result(res: Dict[str, Any]):
64
- """Formats UI fields cleanly."""
65
  pretty = json.dumps(res, indent=2, default=str)
66
  summary = res.get("summary", "")
67
  markdown = res.get("markdown") or res.get("report") or ""
 
68
  if not markdown and summary:
69
  markdown = f"## Summary\n\n{summary}"
70
 
 
 
71
  return {
72
  "summary": summary,
73
  "markdown": markdown,
74
  "json": pretty,
75
- "mitre": json.dumps(res.get("mitre", ""), indent=2, default=str) if res.get("mitre") else "",
76
- "stix": json.dumps(res.get("stix", ""), indent=2, default=str) if res.get("stix") else "",
77
- "sarif": json.dumps(res.get("sarif", ""), indent=2, default=str) if res.get("sarif") else "",
78
  }
79
 
 
80
  # ---------------------------------------------------------------------
81
- # ANALYST COPILOT (LLM)
82
  # ---------------------------------------------------------------------
83
 
84
  def respond(
@@ -91,14 +99,21 @@ def respond(
91
  top_p,
92
  max_tokens,
93
  ):
94
- """Streaming response from WhiteRabbit Neo or Cybertron."""
95
- client = InferenceClient(model=model_name, token=hf_token.token)
 
 
 
 
 
 
96
 
97
  msgs = [{"role": "system", "content": system_prompt}]
98
  msgs.extend(history)
99
  msgs.append({"role": "user", "content": message})
100
 
101
- buf = ""
 
102
  for chunk in client.chat_completion(
103
  messages=msgs,
104
  max_tokens=max_tokens,
@@ -108,12 +123,12 @@ def respond(
108
  ):
109
  delta = chunk.choices[0].delta.content
110
  if delta:
111
- buf += delta
112
- yield buf
113
 
114
 
115
  def inject_osint(history, osint_obj):
116
- """Inject raw JSON results into the chat context."""
117
  pretty = json.dumps(osint_obj, indent=2, default=str)
118
  history.append({
119
  "role": "system",
@@ -121,6 +136,7 @@ def inject_osint(history, osint_obj):
121
  })
122
  return history
123
 
 
124
  # ---------------------------------------------------------------------
125
  # OSINT Dashboard Callbacks
126
  # ---------------------------------------------------------------------
@@ -130,27 +146,32 @@ def ui_lookup_ip(ip, enrich, mitre):
130
  norm = normalize_result(raw)
131
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
132
 
 
133
  def ui_lookup_domain(domain, enrich, mitre):
134
  raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre})
135
  norm = normalize_result(raw)
136
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
137
 
 
138
  def ui_lookup_hash(h, ht, enrich, mitre):
139
  raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre})
140
  norm = normalize_result(raw)
141
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
142
 
 
143
  def ui_correlate_iocs(iocs):
144
  lst = [x.strip() for x in iocs.splitlines() if x.strip()]
145
  raw = call_task("correlate_iocs", {"iocs": lst})
146
  norm = normalize_result(raw)
147
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw
148
 
 
149
  def ui_quickscan(target):
150
  raw = call_task("quickscan", {"target": target})
151
  norm = normalize_result(raw)
152
  return norm["summary"], norm["markdown"], norm["json"], raw
153
 
 
154
  # ---------------------------------------------------------------------
155
  # MCP Bridge
156
  # ---------------------------------------------------------------------
@@ -160,17 +181,19 @@ def ui_bridge(tool, args_json):
160
  payload = json.loads(args_json)
161
  except Exception as e:
162
  return json.dumps({"error": str(e)}, indent=2), "", {}
 
163
  raw = call_task(tool, payload)
164
  norm = normalize_result(raw)
165
  return norm["json"], norm["markdown"], raw
166
 
 
167
  # ---------------------------------------------------------------------
168
- # BUILD UI
169
  # ---------------------------------------------------------------------
170
 
171
  def build_interface():
172
  with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
173
- gr.Markdown("# Parrot OSINT MCP Console\nA multi-mode intelligence workstation.")
174
 
175
  osint_state = gr.State({})
176
 
@@ -178,43 +201,55 @@ def build_interface():
178
  # OSINT Dashboard
179
  # -------------------------
180
  with gr.Tab("OSINT Dashboard"):
181
- with gr.Tab("IP"):
182
- ip = gr.Textbox(label="IP Address")
183
- enrich = gr.Checkbox(value=True, label="Enrich")
184
- mitre = gr.Checkbox(value=True, label="MITRE Map")
 
 
185
  run = gr.Button("Run IP Lookup")
186
 
187
- s = gr.Textbox(label="Summary")
188
- md = gr.Markdown()
189
- js = gr.Code(language="json")
190
- mt = gr.Code(language="json")
191
- st = gr.Code(language="json")
192
 
193
- run.click(ui_lookup_ip, [ip, enrich, mitre], [s, md, js, mt, st, osint_state])
 
 
 
 
194
 
195
  # -------------------------
196
  # MCP Bridge
197
  # -------------------------
198
  with gr.Tab("MCP Bridge"):
199
- tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()))
200
- args = gr.Code(language="json")
201
- btn = gr.Button("Call Tool")
202
- out_js = gr.Code(language="json")
203
- out_md = gr.Markdown()
204
 
205
- btn.click(ui_bridge, [tool, args], [out_js, out_md, osint_state])
 
 
 
 
 
 
 
206
 
207
  # -------------------------
208
  # Analyst Copilot
209
  # -------------------------
210
  with gr.Tab("Analyst Copilot"):
211
- gr.Markdown("### WhiteRabbit Neo + Cybertron TI Assistant")
212
 
213
  system_prompt = gr.Textbox(
214
  label="System Prompt",
215
  value=(
216
  "You are a threat intelligence analyst. "
217
- "You classify TTPs, map MITRE ATT&CK, and provide investigation guidance."
 
218
  ),
219
  )
220
 
@@ -229,8 +264,12 @@ def build_interface():
229
  value="berkeley-nest/WhiteRabbitNeo-8B",
230
  )
231
 
232
- gr.Markdown("### Provide your HuggingFace API Token for the LLM:")
233
- hf_token = gr.OAuthToken()
 
 
 
 
234
 
235
  chatbot = gr.ChatInterface(
236
  respond,
@@ -245,17 +284,18 @@ def build_interface():
245
  ],
246
  )
247
 
248
- inject_btn = gr.Button("Inject Last OSINT Result into Chat")
249
  inject_btn.click(
250
  inject_osint,
251
- inputs=[chatbot._chatbot_state, osint_state],
252
- outputs=[chatbot._chatbot_state],
253
  )
254
 
255
  return demo
256
 
 
257
  # ---------------------------------------------------------------------
258
- # MAIN
259
  # ---------------------------------------------------------------------
260
 
261
  if __name__ == "__main__":
 
16
  from huggingface_hub import InferenceClient
17
 
18
  # ---------------------------------------------------------------------
19
+ # Task Registry (auto-loads MCP tasks dynamically)
20
  # ---------------------------------------------------------------------
21
 
22
  TASK_REGISTRY: Dict[str, Any] = {}
23
 
24
  def _register_tasks():
25
+ """
26
+ Import tasks.* modules dynamically and pull their run() functions.
27
+ """
28
  def _try(name, module):
29
  try:
30
  m = __import__(f"tasks.{module}", fromlist=["*"])
 
32
  if callable(fn):
33
  TASK_REGISTRY[name] = fn
34
  except Exception:
35
+ pass # In HF Spaces, missing tasks won't break startup.
36
 
37
  _try("lookup_ip", "lookup_ip")
38
  _try("lookup_domain", "lookup_domain")
 
45
 
46
  _register_tasks()
47
 
48
+
49
  # ---------------------------------------------------------------------
50
+ # Task Execution + Normalization
51
  # ---------------------------------------------------------------------
52
 
53
  def call_task(name: str, payload: Dict[str, Any]):
 
65
 
66
 
67
  def normalize_result(res: Dict[str, Any]):
68
+ """Ensures consistent UI formatting."""
69
  pretty = json.dumps(res, indent=2, default=str)
70
  summary = res.get("summary", "")
71
  markdown = res.get("markdown") or res.get("report") or ""
72
+
73
  if not markdown and summary:
74
  markdown = f"## Summary\n\n{summary}"
75
 
76
+ safe_json = lambda v: json.dumps(v, indent=2, default=str) if v else ""
77
+
78
  return {
79
  "summary": summary,
80
  "markdown": markdown,
81
  "json": pretty,
82
+ "mitre": safe_json(res.get("mitre")),
83
+ "stix": safe_json(res.get("stix")),
84
+ "sarif": safe_json(res.get("sarif")),
85
  }
86
 
87
+
88
  # ---------------------------------------------------------------------
89
+ # Analyst Copilot LLM
90
  # ---------------------------------------------------------------------
91
 
92
  def respond(
 
99
  top_p,
100
  max_tokens,
101
  ):
102
+ """
103
+ Streaming LLM output using WhiteRabbit Neo or Cybertron.
104
+ hf_token is a raw string entered by user.
105
+ """
106
+ client = InferenceClient(
107
+ model=model_name,
108
+ token=hf_token, # Direct string, no OAuthToken object
109
+ )
110
 
111
  msgs = [{"role": "system", "content": system_prompt}]
112
  msgs.extend(history)
113
  msgs.append({"role": "user", "content": message})
114
 
115
+ buffer = ""
116
+
117
  for chunk in client.chat_completion(
118
  messages=msgs,
119
  max_tokens=max_tokens,
 
123
  ):
124
  delta = chunk.choices[0].delta.content
125
  if delta:
126
+ buffer += delta
127
+ yield buffer
128
 
129
 
130
  def inject_osint(history, osint_obj):
131
+ """Inject OSINT result JSON into copilot context."""
132
  pretty = json.dumps(osint_obj, indent=2, default=str)
133
  history.append({
134
  "role": "system",
 
136
  })
137
  return history
138
 
139
+
140
  # ---------------------------------------------------------------------
141
  # OSINT Dashboard Callbacks
142
  # ---------------------------------------------------------------------
 
146
  norm = normalize_result(raw)
147
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
148
 
149
+
150
  def ui_lookup_domain(domain, enrich, mitre):
151
  raw = call_task("lookup_domain", {"domain": domain, "enrich": enrich, "map_mitre": mitre})
152
  norm = normalize_result(raw)
153
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
154
 
155
+
156
  def ui_lookup_hash(h, ht, enrich, mitre):
157
  raw = call_task("lookup_hash", {"hash": h, "hash_type": ht, "enrich": enrich, "map_mitre": mitre})
158
  norm = normalize_result(raw)
159
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], norm["stix"], raw
160
 
161
+
162
  def ui_correlate_iocs(iocs):
163
  lst = [x.strip() for x in iocs.splitlines() if x.strip()]
164
  raw = call_task("correlate_iocs", {"iocs": lst})
165
  norm = normalize_result(raw)
166
  return norm["summary"], norm["markdown"], norm["json"], norm["mitre"], raw
167
 
168
+
169
  def ui_quickscan(target):
170
  raw = call_task("quickscan", {"target": target})
171
  norm = normalize_result(raw)
172
  return norm["summary"], norm["markdown"], norm["json"], raw
173
 
174
+
175
  # ---------------------------------------------------------------------
176
  # MCP Bridge
177
  # ---------------------------------------------------------------------
 
181
  payload = json.loads(args_json)
182
  except Exception as e:
183
  return json.dumps({"error": str(e)}, indent=2), "", {}
184
+
185
  raw = call_task(tool, payload)
186
  norm = normalize_result(raw)
187
  return norm["json"], norm["markdown"], raw
188
 
189
+
190
  # ---------------------------------------------------------------------
191
+ # UI Layout
192
  # ---------------------------------------------------------------------
193
 
194
  def build_interface():
195
  with gr.Blocks(title="Parrot OSINT MCP Console") as demo:
196
+ gr.Markdown("# 🦜 Parrot OSINT MCP Console\nMulti-mode Intelligence Workstation.")
197
 
198
  osint_state = gr.State({})
199
 
 
201
  # OSINT Dashboard
202
  # -------------------------
203
  with gr.Tab("OSINT Dashboard"):
204
+
205
+ # IP Lookup
206
+ with gr.Tab("IP Lookup"):
207
+ ip = gr.Textbox(label="IP Address", placeholder="8.8.8.8")
208
+ enrich = gr.Checkbox(value=True, label="Enrich data")
209
+ mitre = gr.Checkbox(value=True, label="MITRE ATT&CK Mapping")
210
  run = gr.Button("Run IP Lookup")
211
 
212
+ out_s = gr.Textbox(label="Summary")
213
+ out_md = gr.Markdown()
214
+ out_json = gr.Code(language="json")
215
+ out_mitre = gr.Code(language="json")
216
+ out_stix = gr.Code(language="json")
217
 
218
+ run.click(
219
+ ui_lookup_ip,
220
+ [ip, enrich, mitre],
221
+ [out_s, out_md, out_json, out_mitre, out_stix, osint_state]
222
+ )
223
 
224
  # -------------------------
225
  # MCP Bridge
226
  # -------------------------
227
  with gr.Tab("MCP Bridge"):
228
+ tool = gr.Dropdown(sorted(TASK_REGISTRY.keys()), label="Tool")
229
+ args = gr.Code(language="json", label="Args JSON")
230
+ btn = gr.Button("Run Tool")
 
 
231
 
232
+ out_bridge_json = gr.Code(language="json")
233
+ out_bridge_md = gr.Markdown()
234
+
235
+ btn.click(
236
+ ui_bridge,
237
+ [tool, args],
238
+ [out_bridge_json, out_bridge_md, osint_state]
239
+ )
240
 
241
  # -------------------------
242
  # Analyst Copilot
243
  # -------------------------
244
  with gr.Tab("Analyst Copilot"):
245
+ gr.Markdown("### WhiteRabbit Neo + Cybertron Threat Intelligence Assistant")
246
 
247
  system_prompt = gr.Textbox(
248
  label="System Prompt",
249
  value=(
250
  "You are a threat intelligence analyst. "
251
+ "You classify TTPs, extract indicators, map MITRE ATT&CK, "
252
+ "and provide investigation guidance."
253
  ),
254
  )
255
 
 
264
  value="berkeley-nest/WhiteRabbitNeo-8B",
265
  )
266
 
267
+ gr.Markdown("### HuggingFace API Token (required for LLM inference)")
268
+ hf_token = gr.Textbox(
269
+ label="HF Token",
270
+ type="password",
271
+ placeholder="hf_xxx...",
272
+ )
273
 
274
  chatbot = gr.ChatInterface(
275
  respond,
 
284
  ],
285
  )
286
 
287
+ inject_btn = gr.Button("Inject Last OSINT Result into Copilot")
288
  inject_btn.click(
289
  inject_osint,
290
+ [chatbot._chatbot_state, osint_state],
291
+ [chatbot._chatbot_state],
292
  )
293
 
294
  return demo
295
 
296
+
297
  # ---------------------------------------------------------------------
298
+ # MAIN ENTRY
299
  # ---------------------------------------------------------------------
300
 
301
  if __name__ == "__main__":