import pandas as pd
import streamlit as st
import cv2
import numpy as np
import mediapipe as mp
from pathlib import Path
from scipy.signal import find_peaks, savgol_filter
import json
import subprocess
import os
import soundfile as sf
from datetime import datetime
import tempfile
from ultralytics import YOLO
from sound_agent import main_sound
from qsec import extract_second_audio_librosa
import threading
import queue
import time
from PIL import Image
import io
from agent2 import process_video_for_footstep_audio
# Suppress TensorFlow warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import absl.logging
absl.logging.set_verbosity(absl.logging.ERROR)
def get_ffmpeg_path():
"""Get FFmpeg path with multiple fallback options"""
possible_paths = [
"ffmpeg", # Try system ffmpeg first (Docker/Linux)
r"C:\Users\abhiv\OneDrive\Desktop\agentic ai\SoundFeet\ffmpeg-7.1-essentials_build\bin\ffmpeg.exe", # Local Windows path
"./ffmpeg-7.1-essentials_build/bin/ffmpeg.exe", # Relative path
]
for path in possible_paths:
if path == "ffmpeg":
try:
result = subprocess.run([path, '-version'], capture_output=True, timeout=5)
if result.returncode == 0:
return path
            except (OSError, subprocess.SubprocessError):
                continue
else:
if os.path.exists(path):
return path
return "ffmpeg" # Default to system ffmpeg
FFMPEG_PATH = get_ffmpeg_path()
# Streamlit Configuration
st.set_page_config(
page_title="Hybrid YOLO-MediaPipe Footstep Detection",
page_icon="🎬",
layout="wide",
initial_sidebar_state="expanded"
)
st.markdown("""
<style>
.main-header {
font-size: 2.5rem;
font-weight: 700;
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin-bottom: 2rem;
}
.metric-card {
background: #f0f2f6;
padding: 1rem;
border-radius: 0.5rem;
border-left: 4px solid #667eea;
}
.success-box {
padding: 1rem;
background: #d4edda;
border: 1px solid #c3e6cb;
border-radius: 0.5rem;
color: #155724;
}
.hybrid-badge {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 0.5rem 1rem;
border-radius: 20px;
display: inline-block;
font-weight: 600;
margin: 1rem 0;
}
.live-indicator {
background: #dc3545;
color: white;
padding: 0.5rem 1rem;
border-radius: 20px;
display: inline-block;
font-weight: 600;
animation: pulse 1.5s infinite;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
.ready-badge {
background: #28a745;
color: white;
padding: 0.5rem 1rem;
border-radius: 20px;
display: inline-block;
font-weight: 600;
}
</style>
""", unsafe_allow_html=True)
class LiveFootstepDetector:
"""Real-time footstep detection for live camera feed"""
def __init__(self, audio_path, sensitivity='medium', yolo_conf=0.5):
self.audio_path = audio_path
self.sensitivity = sensitivity
self.yolo_conf = yolo_conf
self.running = False
self.audio_ready = False
# Load footstep audio
try:
self.footstep_audio, self.sample_rate = extract_second_audio_librosa(
file_path=audio_path,
target_second=0,
sample_rate=44100
)
self.audio_ready = True
except Exception as e:
st.error(f"Failed to load audio: {str(e)}")
self.audio_ready = False
# Initialize detection models
try:
self.yolo_model = YOLO('yolov8n.pt')
self.mp_pose = mp.solutions.pose
self.pose = self.mp_pose.Pose(
static_image_mode=False,
model_complexity=1,
smooth_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
except Exception as e:
st.error(f"Failed to initialize models: {str(e)}")
return
# Landmark indices
self.LEFT_HEEL = 29
self.RIGHT_HEEL = 30
# Detection thresholds
self.thresholds = {
'low': {'prominence': 0.02, 'velocity_threshold': 0.015},
'medium': {'prominence': 0.015, 'velocity_threshold': 0.012},
'high': {'prominence': 0.01, 'velocity_threshold': 0.010}
}[sensitivity]
# Tracking state
self.prev_left_y = None
self.prev_right_y = None
self.prev_time = None
self.left_buffer = []
self.right_buffer = []
self.buffer_size = 10
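        # Keep the last ~10 heel heights (roughly a third of a second at ~30 FPS) for velocity estimation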
# Audio playback
self.audio_queue = queue.Queue()
self.audio_thread = None
def start_audio_playback(self):
"""Start audio playback thread"""
if not self.audio_ready:
return
def play_audio():
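            # PyAudio is imported lazily inside the playback thread so the app still loads without it; live playback simply fails if it is missing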
import pyaudio
p = pyaudio.PyAudio()
stream = p.open(
format=pyaudio.paFloat32,
channels=1,
rate=self.sample_rate,
output=True
)
while self.running:
try:
foot = self.audio_queue.get(timeout=0.1)
# Play footstep sound
stream.write(self.footstep_audio.astype(np.float32).tobytes())
except queue.Empty:
continue
except Exception as e:
print(f"Audio playback error: {e}")
stream.stop_stream()
stream.close()
p.terminate()
self.audio_thread = threading.Thread(target=play_audio, daemon=True)
self.audio_thread.start()
def detect_heel_strike(self, current_y, prev_y, foot_buffer):
"""Detect heel strike based on vertical velocity and position"""
if prev_y is None:
return False
# Calculate vertical velocity (downward is positive)
velocity = current_y - prev_y
# Add to buffer
foot_buffer.append(current_y)
if len(foot_buffer) > self.buffer_size:
foot_buffer.pop(0)
if len(foot_buffer) < 5:
return False
# Detect strike: downward movement followed by stabilization
# Current position is low (heel on ground)
# Recent movement was downward
# Velocity is slowing (strike impact)
recent_velocities = [foot_buffer[i + 1] - foot_buffer[i]
for i in range(len(foot_buffer) - 1)]
avg_velocity = np.mean(recent_velocities[-3:]) if len(recent_velocities) >= 3 else 0
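        # Normalized y grows toward the bottom of the frame, so current_y > 0.7 places the heel in the lower ~30% of the image (near the floor)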
is_strike = (
current_y > 0.7 and # Heel is low in frame
velocity > self.thresholds['velocity_threshold'] and # Moving down
avg_velocity < velocity * 0.5 # Velocity decreasing (impact)
)
return is_strike
def process_frame(self, frame):
"""Process single frame and detect footsteps"""
if not self.audio_ready:
return frame, None
detected_foot = None
try:
# YOLO detection
results = self.yolo_model(frame, conf=self.yolo_conf, classes=[0], verbose=False)
person_detected = False
bbox = None
for result in results:
boxes = result.boxes
if len(boxes) > 0:
person_detected = True
box = boxes[0] # Take first person
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
bbox = (int(x1), int(y1), int(x2), int(y2))
# Draw YOLO bbox
cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]),
(255, 255, 0), 2)
break
# MediaPipe pose estimation
if person_detected and bbox:
# Crop to person region with padding
x1, y1, x2, y2 = bbox
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
cropped = frame[y1:y2, x1:x2]
rgb_frame = cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)
pose_results = self.pose.process(rgb_frame)
if pose_results.pose_landmarks:
landmarks = pose_results.pose_landmarks.landmark
# Get heel positions (adjusted to full frame)
left_heel = landmarks[self.LEFT_HEEL]
right_heel = landmarks[self.RIGHT_HEEL]
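                    # Heel landmarks are normalized to the cropped ROI: scale to crop pixels, add the crop offset, then renormalize to full-frame height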
left_y = (left_heel.y * (y2 - y1) + y1) / frame.shape[0]
right_y = (right_heel.y * (y2 - y1) + y1) / frame.shape[0]
# Detect strikes
left_strike = self.detect_heel_strike(
left_y, self.prev_left_y, self.left_buffer
)
right_strike = self.detect_heel_strike(
right_y, self.prev_right_y, self.right_buffer
)
if left_strike:
detected_foot = 'LEFT'
self.audio_queue.put('LEFT')
elif right_strike:
detected_foot = 'RIGHT'
self.audio_queue.put('RIGHT')
# Update previous positions
self.prev_left_y = left_y
self.prev_right_y = right_y
# Draw skeleton on full frame
for landmark in landmarks:
x = int((landmark.x * (x2 - x1) + x1))
y = int((landmark.y * (y2 - y1) + y1))
cv2.circle(frame, (x, y), 3, (0, 255, 0), -1)
# Highlight heels
left_heel_x = int((left_heel.x * (x2 - x1) + x1))
left_heel_y = int((left_heel.y * (y2 - y1) + y1))
right_heel_x = int((right_heel.x * (x2 - x1) + x1))
right_heel_y = int((right_heel.y * (y2 - y1) + y1))
cv2.circle(frame, (left_heel_x, left_heel_y), 8, (0, 255, 0), -1)
cv2.circle(frame, (right_heel_x, right_heel_y), 8, (0, 100, 255), -1)
if detected_foot:
# Show strike indicator
heel_x = left_heel_x if detected_foot == 'LEFT' else right_heel_x
heel_y = left_heel_y if detected_foot == 'LEFT' else right_heel_y
color = (0, 255, 0) if detected_foot == 'LEFT' else (0, 100, 255)
cv2.circle(frame, (heel_x, heel_y), 30, color, 3)
cv2.putText(frame, f"{detected_foot} STRIKE!",
(heel_x - 50, heel_y - 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
# Draw status
status_text = "READY" if self.audio_ready else "NO AUDIO"
status_color = (0, 255, 0) if self.audio_ready else (0, 0, 255)
cv2.rectangle(frame, (10, 10), (150, 50), (0, 0, 0), -1)
cv2.putText(frame, status_text, (20, 35),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2)
except Exception as e:
print(f"Frame processing error: {e}")
return frame, detected_foot
def start(self):
"""Start the detector"""
self.running = True
self.start_audio_playback()
def stop(self):
"""Stop the detector"""
self.running = False
if self.audio_thread:
self.audio_thread.join(timeout=2)
class HybridFootstepDetectionPipeline:
"""
Hybrid Detection Pipeline for video files:
1. YOLO detects person bounding boxes
2. MediaPipe estimates pose on detected regions
3. Track footsteps with improved accuracy
"""
def __init__(self, fps=30, sensitivity='medium', yolo_conf=0.5):
self.fps = fps
self.sensitivity = sensitivity
self.yolo_conf = yolo_conf
# Initialize YOLO detector
try:
self.yolo_model = YOLO('yolov8n.pt')
            st.success("✅ YOLO detector loaded successfully")
except Exception as e:
st.warning(f"⚠️ YOLO loading issue: {str(e)}. Downloading model...")
try:
self.yolo_model = YOLO('yolov8n.pt')
                st.success("✅ YOLO detector loaded successfully")
except Exception as e2:
st.error(f"❌ Failed to load YOLO: {str(e2)}")
self.yolo_model = None
# Initialize MediaPipe pose estimator
try:
self.mp_pose = mp.solutions.pose
self.pose = self.mp_pose.Pose(
static_image_mode=False,
model_complexity=1,
smooth_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
            st.success("✅ MediaPipe pose estimator loaded successfully")
except Exception as e:
st.error(f"❌ Failed to initialize MediaPipe: {str(e)}")
self.pose = None
# Landmark indices
self.LEFT_HEEL = 29
self.RIGHT_HEEL = 30
self.LEFT_ANKLE = 27
self.RIGHT_ANKLE = 28
# Detection thresholds
self.thresholds = {
'low': {'prominence': 0.02, 'min_interval': 0.4},
'medium': {'prominence': 0.015, 'min_interval': 0.3},
'high': {'prominence': 0.01, 'min_interval': 0.25}
}[sensitivity]
# Tracking state
self.person_tracker = PersonTracker()
def detect_person_yolo(self, frame):
"""Detect person using YOLO"""
if self.yolo_model is None:
return []
try:
results = self.yolo_model(frame, conf=self.yolo_conf, classes=[0], verbose=False)
person_boxes = []
for result in results:
boxes = result.boxes
for box in boxes:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
conf = box.conf[0].cpu().numpy()
person_boxes.append((int(x1), int(y1), int(x2), int(y2), float(conf)))
return person_boxes
except Exception as e:
st.warning(f"YOLO detection failed: {str(e)}")
return []
def estimate_pose_mediapipe(self, frame, bbox=None):
"""Estimate pose using MediaPipe on specified region"""
if self.pose is None:
return None
try:
if bbox is not None:
x1, y1, x2, y2 = bbox
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
cropped = frame[y1:y2, x1:x2]
if cropped.size == 0:
return None
rgb_frame = cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB)
results = self.pose.process(rgb_frame)
if results.pose_landmarks:
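                    # Remap landmarks in place from crop-normalized to full-frame-normalized coordinates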
for landmark in results.pose_landmarks.landmark:
landmark.x = (landmark.x * (x2 - x1) + x1) / frame.shape[1]
landmark.y = (landmark.y * (y2 - y1) + y1) / frame.shape[0]
return results
else:
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return self.pose.process(rgb_frame)
        except Exception:
            # Pose estimation failures are non-fatal; treat the frame as having no landmarks
            return None
def process_video(self, video_path, progress_callback=None):
"""Process video with hybrid YOLO-MediaPipe pipeline"""
if self.yolo_model is None or self.pose is None:
st.error("❌ Detection models not available")
return None
cap = cv2.VideoCapture(str(video_path))
if not cap.isOpened():
st.error("❌ Could not open video file")
return None
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if fps <= 0 or total_frames <= 0:
st.error("❌ Invalid video properties")
cap.release()
return None
left_positions = []
right_positions = []
detection_confidence = []
frame_idx = 0
yolo_detections = 0
pose_detections = 0
        st.info(f"🔄 Processing with Hybrid Pipeline: {total_frames} frames")
try:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
person_boxes = self.detect_person_yolo(frame)
if person_boxes:
yolo_detections += 1
best_box = self.person_tracker.select_best_person(person_boxes, frame_idx)
bbox = best_box[:4]
results = self.estimate_pose_mediapipe(frame, bbox)
if results and results.pose_landmarks:
pose_detections += 1
landmarks = results.pose_landmarks.landmark
left_y = landmarks[self.LEFT_HEEL].y
right_y = landmarks[self.RIGHT_HEEL].y
conf = (landmarks[self.LEFT_HEEL].visibility +
landmarks[self.RIGHT_HEEL].visibility) / 2
left_positions.append(left_y)
right_positions.append(right_y)
detection_confidence.append(conf)
else:
left_positions.append(np.nan)
right_positions.append(np.nan)
detection_confidence.append(0.0)
else:
results = self.estimate_pose_mediapipe(frame, bbox=None)
if results and results.pose_landmarks:
pose_detections += 1
landmarks = results.pose_landmarks.landmark
left_positions.append(landmarks[self.LEFT_HEEL].y)
right_positions.append(landmarks[self.RIGHT_HEEL].y)
detection_confidence.append(0.5)
else:
left_positions.append(np.nan)
right_positions.append(np.nan)
detection_confidence.append(0.0)
frame_idx += 1
if progress_callback and frame_idx % 10 == 0:
progress = min(frame_idx / total_frames, 1.0)
progress_callback(progress)
except Exception as e:
st.error(f"❌ Video processing error: {str(e)}")
cap.release()
return None
cap.release()
        st.info(
            f"📊 YOLO detections: {yolo_detections}/{total_frames} frames ({yolo_detections / total_frames * 100:.1f}%)")
        st.info(
            f"📊 Pose detections: {pose_detections}/{total_frames} frames ({pose_detections / total_frames * 100:.1f}%)")
if len(left_positions) == 0:
st.error("❌ No frames processed successfully")
return None
try:
left_series = pd.Series(left_positions).interpolate(method='linear')
left_series = left_series.bfill().ffill()
left_positions = left_series.values
right_series = pd.Series(right_positions).interpolate(method='linear')
right_series = right_series.bfill().ffill()
right_positions = right_series.values
if len(left_positions) > 5:
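                # Savitzky-Golay smoothing needs an odd window no longer than the series: cap at 11 frames, shrinking (and skipping) for very short sequences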
window = min(11, len(left_positions) if len(left_positions) % 2 == 1 else len(left_positions) - 1)
if window >= 3:
left_positions = savgol_filter(left_positions, window, 2)
right_positions = savgol_filter(right_positions, window, 2)
left_strikes = self._detect_strikes(left_positions, fps)
right_strikes = self._detect_strikes(right_positions, fps)
events = []
for frame in left_strikes:
events.append({
'frame': int(frame),
'timecode': self._frames_to_smpte(frame, fps),
'foot': 'LEFT',
'event': 'HEEL_STRIKE',
'time_seconds': frame / fps,
'confidence': detection_confidence[int(frame)] if int(frame) < len(detection_confidence) else 0.5
})
for frame in right_strikes:
events.append({
'frame': int(frame),
'timecode': self._frames_to_smpte(frame, fps),
'foot': 'RIGHT',
'event': 'HEEL_STRIKE',
'time_seconds': frame / fps,
'confidence': detection_confidence[int(frame)] if int(frame) < len(detection_confidence) else 0.5
})
events = sorted(events, key=lambda x: x['frame'])
return {
'events': events,
'fps': fps,
'total_frames': total_frames,
'width': width,
'height': height,
'left_positions': left_positions.tolist() if hasattr(left_positions, 'tolist') else left_positions,
'right_positions': right_positions.tolist() if hasattr(right_positions, 'tolist') else right_positions,
'detection_stats': {
'yolo_detections': yolo_detections,
'pose_detections': pose_detections,
'total_frames': total_frames
}
}
except Exception as e:
st.error(f"❌ Data processing error: {str(e)}")
return None
def _detect_strikes(self, positions, fps):
"""Detect heel strikes from position data"""
try:
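            # Heel y increases toward the bottom of the frame, so local maxima above 0.7 mark ground contact; 'distance' enforces the minimum time between strikes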
peaks, _ = find_peaks(
positions,
prominence=self.thresholds['prominence'],
distance=int(fps * self.thresholds['min_interval']),
height=0.7
)
return peaks
except Exception as e:
st.warning(f"Peak detection failed: {str(e)}")
return np.array([])
def _frames_to_smpte(self, frame, fps):
"""Convert frame number to SMPTE timecode"""
total_seconds = frame / fps
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = int(total_seconds % 60)
frames = int((total_seconds * fps) % fps)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}:{frames:02d}"
class PersonTracker:
"""Track person across frames for consistency"""
def __init__(self, iou_threshold=0.3):
self.tracked_box = None
self.last_frame = -1
self.iou_threshold = iou_threshold
def calculate_iou(self, box1, box2):
"""Calculate IoU between two bounding boxes"""
x1_1, y1_1, x2_1, y2_1 = box1[:4]
x1_2, y1_2, x2_2, y2_2 = box2[:4]
xi1 = max(x1_1, x1_2)
yi1 = max(y1_1, y1_2)
xi2 = min(x2_1, x2_2)
yi2 = min(y2_1, y2_2)
inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
box1_area = (x2_1 - x1_1) * (y2_1 - y1_1)
box2_area = (x2_2 - x1_2) * (y2_2 - y1_2)
union_area = box1_area + box2_area - inter_area
return inter_area / union_area if union_area > 0 else 0
def select_best_person(self, person_boxes, frame_idx):
"""Select best person box for tracking consistency"""
if not person_boxes:
return None
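        # If a person was tracked within the last 10 frames, prefer the detection that best overlaps the previous box (IoU)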
if self.tracked_box is not None and frame_idx - self.last_frame < 10:
max_iou = 0
best_box = None
for box in person_boxes:
iou = self.calculate_iou(self.tracked_box, box)
if iou > max_iou:
max_iou = iou
best_box = box
if max_iou > self.iou_threshold:
self.tracked_box = best_box
self.last_frame = frame_idx
return best_box
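        # No recent track (or overlap below the IoU threshold): fall back to the largest, most confident detection (area x confidence)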
best_box = max(person_boxes, key=lambda x: (x[2] - x[0]) * (x[3] - x[1]) * x[4])
self.tracked_box = best_box
self.last_frame = frame_idx
return best_box
class AudioGenerator:
"""Generate footstep audio"""
def __init__(self, sample_rate=44100):
self.sample_rate = sample_rate
def generate_footstep(self, aud_path):
arr, rate = extract_second_audio_librosa(
file_path=aud_path,
target_second=0,
sample_rate=self.sample_rate
)
# If extraction failed, create a fallback footstep sound
if arr is None:
print(f"[WARNING] Failed to extract audio from {aud_path}, generating fallback sound")
# Create a simple footstep-like sound (dampened click)
t = np.linspace(0, 0.2, int(0.2 * self.sample_rate))
arr = np.sin(2 * np.pi * 100 * t) * np.exp(-10 * t)
arr = arr.astype(np.float32)
return arr
def create_audio_track(self, events, aud_path, duration=0.3):
total_samples = int(duration * self.sample_rate)
audio_track = np.zeros(total_samples, dtype=np.float32)
for i, event in enumerate(events):
step_sound = self.generate_footstep(aud_path)
# Safety check: ensure step_sound is valid
if step_sound is None or len(step_sound) == 0:
print(f"[WARNING] Invalid step_sound for event {i}, skipping")
continue
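            # Vary playback rate by up to ±6% in a 5-step cycle (crude index resampling) so consecutive footsteps don't sound identical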
pitch_shift = 1.0 + (i % 5 - 2) * 0.03
indices = np.arange(len(step_sound)) * pitch_shift
indices = np.clip(indices, 0, len(step_sound) - 1).astype(int)
step_sound = step_sound[indices]
start_sample = int(event['time_seconds'] * self.sample_rate)
end_sample = min(start_sample + len(step_sound), total_samples)
sound_len = end_sample - start_sample
if sound_len > 0:
audio_track[start_sample:end_sample] += step_sound[:sound_len]
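        # Peak-normalize the mix to 80% of full scale to leave headroom where footsteps overlap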
max_val = np.max(np.abs(audio_track))
if max_val > 0:
audio_track = audio_track / max_val * 0.8
return audio_track
def create_annotated_video(input_path, events, output_path, use_hybrid=True, progress_callback=None):
"""Create annotated video with hybrid detection visualization"""
try:
cap = cv2.VideoCapture(str(input_path))
if not cap.isOpened():
st.error("❌ Could not open input video file")
return False
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
if not out.isOpened():
st.error("❌ Could not create output video file")
cap.release()
return False
event_frames = {e['frame']: e for e in events}
        yolo_model = YOLO('yolov8n.pt') if use_hybrid else None
        mp_pose = mp.solutions.pose
        pose = mp_pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            smooth_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
frame_idx = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
try:
if use_hybrid and yolo_model:
results = yolo_model(frame, conf=0.5, classes=[0], verbose=False)
for result in results:
boxes = result.boxes
for box in boxes:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
conf = box.conf[0].cpu().numpy()
cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)),
(255, 255, 0), 2)
cv2.putText(frame, f'YOLO: {conf:.2f}',
(int(x1), int(y1) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if results.pose_landmarks:
mp.solutions.drawing_utils.draw_landmarks(
frame,
results.pose_landmarks,
mp_pose.POSE_CONNECTIONS,
landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
color=(0, 255, 0), thickness=2, circle_radius=2
),
connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(
color=(255, 255, 255), thickness=2
)
)
if frame_idx in event_frames:
event = event_frames[frame_idx]
banner_height = 100
cv2.rectangle(frame, (0, 0), (width, banner_height), (0, 0, 0), -1)
text = f"{event['foot']} HEEL STRIKE"
color = (0, 255, 0) if event['foot'] == 'LEFT' else (0, 100, 255)
cv2.putText(frame, text, (50, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1.5, color, 3)
if 'confidence' in event:
conf_text = f"Conf: {event['confidence']:.2f}"
cv2.putText(frame, conf_text, (50, 85),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
circle_x = 50 if event['foot'] == 'LEFT' else width - 50
cv2.circle(frame, (circle_x, height - 100), 40, color, -1)
if use_hybrid:
cv2.rectangle(frame, (width - 250, 10), (width - 10, 50), (102, 126, 234), -1)
cv2.putText(frame, "HYBRID MODE", (width - 240, 35),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
time_seconds = frame_idx / fps
hours = int(time_seconds // 3600)
minutes = int((time_seconds % 3600) // 60)
seconds = int(time_seconds % 60)
frame_num = int((time_seconds * fps) % fps)
timecode = f"TC: {hours:02d}:{minutes:02d}:{seconds:02d}:{frame_num:02d}"
cv2.rectangle(frame, (0, height - 80), (400, height), (0, 0, 0), -1)
cv2.putText(frame, timecode, (10, height - 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(frame, f"Frame: {frame_idx}/{total_frames}", (10, height - 55),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
out.write(frame)
frame_idx += 1
if progress_callback and frame_idx % 5 == 0:
progress = min(frame_idx / total_frames, 1.0)
progress_callback(progress)
except Exception as e:
st.warning(f"⚠️ Error processing frame {frame_idx}: {str(e)}")
frame_idx += 1
continue
cap.release()
out.release()
pose.close()
return True
except Exception as e:
st.error(f"❌ Video annotation failed: {str(e)}")
try:
cap.release()
out.release()
pose.close()
        except Exception:
            pass
return False
def merge_audio_with_video(video_path, audio_track, sample_rate, output_path):
"""Merge audio with video using FFmpeg"""
temp_audio = tempfile.mktemp(suffix='.wav')
sf.write(temp_audio, audio_track, sample_rate)
ffmpeg_cmd = FFMPEG_PATH if FFMPEG_PATH else "ffmpeg"
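    # '-map 0:v' keeps the annotated video stream, '-map 1:a' takes the generated footstep track, and '-shortest' trims the output to the shorter of the two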
cmd = [
ffmpeg_cmd, '-y',
'-i', str(video_path),
'-i', temp_audio,
'-map', '0:v', '-map', '1:a',
'-c:v', 'libx264', '-preset', 'medium',
'-c:a', 'aac', '-b:a', '192k',
'-shortest',
str(output_path)
]
try:
if FFMPEG_PATH is None:
st.warning("FFmpeg not found. Using fallback method.")
return None
result = subprocess.run(cmd, check=True, capture_output=True, text=True, timeout=30)
return True
except subprocess.CalledProcessError as e:
st.error(f"FFmpeg error: {e.stderr}")
return False
except subprocess.TimeoutExpired:
st.error("FFmpeg timed out")
return False
finally:
if os.path.exists(temp_audio):
os.remove(temp_audio)
def live_streaming_mode():
"""Live streaming mode with frame capture and real-time detection"""
    st.markdown('<h2>📹 Live Streaming Mode</h2>', unsafe_allow_html=True)
    st.info("🎥 This mode allows real-time footstep detection with your device camera")
# Initialize session state
if 'floor_frame_captured' not in st.session_state:
st.session_state.floor_frame_captured = False
if 'audio_downloaded' not in st.session_state:
st.session_state.audio_downloaded = False
if 'live_audio_path' not in st.session_state:
st.session_state.live_audio_path = None
if 'live_detector' not in st.session_state:
st.session_state.live_detector = None
if 'camera_active' not in st.session_state:
st.session_state.camera_active = False
# Step 1: Capture floor frame
    st.markdown("### Step 1: Capture Floor Frame 📸")
st.write("Capture a single frame showing the floor surface for audio analysis")
col1, col2 = st.columns([2, 1])
with col1:
# Camera input for frame capture
camera_image = st.camera_input("Capture floor image", key="floor_capture")
if camera_image is not None and not st.session_state.floor_frame_captured:
# Save captured frame
image = Image.open(camera_image)
temp_frame_path = tempfile.mktemp(suffix='.jpg')
image.save(temp_frame_path)
st.session_state.floor_frame_path = temp_frame_path
# Display captured frame
st.image(image, caption="Captured Floor Frame", use_container_width=True)
            if st.button("✅ Confirm Floor Capture", type="primary", use_container_width=True):
                st.session_state.floor_frame_captured = True
                st.success("✅ Floor frame captured successfully!")
                st.rerun()
    with col2:
        if st.session_state.floor_frame_captured:
            st.markdown('<div class="success-box">✅ Floor Captured</div>', unsafe_allow_html=True)
        else:
            st.info("📸 Capture floor frame to proceed")
# Step 2: Analyze and download audio
if st.session_state.floor_frame_captured and not st.session_state.audio_downloaded:
st.markdown("---")
        st.markdown("### Step 2: Analyze Floor & Download Audio 🔊")
col1, col2 = st.columns([2, 1])
with col1:
            if st.button("🔍 Analyze Floor & Generate Audio", type="primary", use_container_width=True):
                with st.spinner("🔄 Analyzing floor surface and generating audio..."):
try:
# Create temporary video from frame for processing
temp_video = tempfile.mktemp(suffix='.mp4')
# Create 1-second video from the captured frame
img = cv2.imread(st.session_state.floor_frame_path)
height, width = img.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(temp_video, fourcc, 30, (width, height))
# Write 30 frames (1 second at 30fps)
for _ in range(30):
out.write(img)
out.release()
# Process video for footstep audio
                        st.info("🎵 Generating footstep audio based on floor analysis...")
aud_path = process_video_for_footstep_audio(temp_video)
st.session_state.live_audio_path = aud_path
st.session_state.audio_downloaded = True
# Clean up temp video
if os.path.exists(temp_video):
os.remove(temp_video)
                        st.success("✅ Audio generated successfully!")
st.balloons()
st.rerun()
except Exception as e:
st.error(f"❌ Error generating audio: {str(e)}")
with col2:
            st.info("🎵 Audio will be generated based on floor type")
# Step 3: Initialize live detector
if st.session_state.audio_downloaded and st.session_state.live_detector is None:
st.markdown("---")
        st.markdown("### Step 3: Initialize Live Detection 🚀")
col1, col2 = st.columns([2, 1])
with col1:
sensitivity = st.select_slider(
"Detection Sensitivity",
options=['low', 'medium', 'high'],
value='medium'
)
yolo_conf = st.slider(
"YOLO Confidence",
min_value=0.1,
max_value=0.9,
value=0.5,
step=0.05
)
if st.button("🎬 Initialize Live Detector", type="primary", use_container_width=True):
                with st.spinner("⚙️ Initializing detector..."):
try:
detector = LiveFootstepDetector(
audio_path=st.session_state.live_audio_path,
sensitivity=sensitivity,
yolo_conf=yolo_conf
)
st.session_state.live_detector = detector
                        st.success("✅ Live detector initialized!")
st.rerun()
except Exception as e:
st.error(f"❌ Failed to initialize detector: {str(e)}")
with col2:
            st.info("🤖 Configure detection parameters")
# Step 4: Start live detection
if st.session_state.live_detector is not None:
st.markdown("---")
        st.markdown('<div class="ready-badge">✅ SYSTEM READY</div>', unsafe_allow_html=True)
st.markdown("### Step 4: Live Detection 🎯")
col1, col2 = st.columns([3, 1])
with col1:
            st.write("📹 **Camera is ready for live footstep detection**")
            st.write("🚶 Walk in front of the camera and hear footsteps in real-time!")
# Start/Stop controls
col_a, col_b = st.columns(2)
with col_a:
if not st.session_state.camera_active:
                    if st.button("▶️ Start Live Detection", type="primary", use_container_width=True):
st.session_state.camera_active = True
st.session_state.live_detector.start()
st.rerun()
with col_b:
if st.session_state.camera_active:
if st.button("⏹️ Stop Detection", type="secondary", use_container_width=True):
st.session_state.camera_active = False
st.session_state.live_detector.stop()
st.rerun()
with col2:
if st.session_state.camera_active:
                st.markdown('<div class="live-indicator">🔴 LIVE</div>', unsafe_allow_html=True)
else:
st.info("⏸️ Paused")
# Live video feed
if st.session_state.camera_active:
st.markdown("---")
FRAME_WINDOW = st.image([])
cap = cv2.VideoCapture(0)
if not cap.isOpened():
st.error("❌ Cannot access camera. Please check permissions.")
st.session_state.camera_active = False
else:
            st.info("📹 Live feed active - Walk to generate footsteps!")
# Statistics
step_counter = st.empty()
left_steps = 0
right_steps = 0
try:
while st.session_state.camera_active:
ret, frame = cap.read()
if not ret:
st.error("❌ Failed to read from camera")
break
# Process frame
processed_frame, detected_foot = st.session_state.live_detector.process_frame(frame)
# Update counters
if detected_foot == 'LEFT':
left_steps += 1
elif detected_foot == 'RIGHT':
right_steps += 1
# Display frame
FRAME_WINDOW.image(cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB))
# Update statistics
step_counter.metric("Total Steps Detected", left_steps + right_steps,
f"L: {left_steps} | R: {right_steps}")
# Check if user stopped
if not st.session_state.camera_active:
break
time.sleep(0.033) # ~30 FPS
except Exception as e:
st.error(f"❌ Error during live detection: {str(e)}")
finally:
cap.release()
st.session_state.live_detector.stop()
# Reset button
st.markdown("---")
    if st.button("🔄 Reset All", use_container_width=True):
st.session_state.floor_frame_captured = False
st.session_state.audio_downloaded = False
st.session_state.live_audio_path = None
st.session_state.live_detector = None
st.session_state.camera_active = False
st.rerun()
def video_upload_mode():
"""Original video upload mode"""
    st.markdown('<h2>📤 Video Upload Mode</h2>', unsafe_allow_html=True)
# Sidebar configuration
sensitivity = st.sidebar.select_slider(
"Footstep Sensitivity",
options=['low', 'medium', 'high'],
value='medium',
help="Higher sensitivity detects more subtle footsteps"
)
yolo_conf = st.sidebar.slider(
"YOLO Confidence",
min_value=0.1,
max_value=0.9,
value=0.5,
step=0.05,
help="Confidence threshold for YOLO person detection"
)
surface_type = st.sidebar.selectbox(
"Surface Type",
['concrete', 'wood', 'grass', 'gravel', 'metal'],
help="Select surface for audio generation"
)
use_hybrid = st.sidebar.checkbox(
"Enable Hybrid Mode",
value=True,
help="Use YOLO for person detection + MediaPipe for pose estimation"
)
create_annotated = st.sidebar.checkbox("Create Annotated Video", value=True)
add_audio = st.sidebar.checkbox("Add Footstep Audio", value=True)
# File uploader
    uploaded_file = st.file_uploader(
        "📤 Upload Video File",
type=['mp4', 'avi', 'mov', 'mkv'],
help="Upload a video file to detect footsteps"
)
if uploaded_file:
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file:
tmp_file.write(uploaded_file.read())
video_path = tmp_file.name
col1, col2 = st.columns([2, 1])
with col1:
            st.subheader("📹 Input Video")
st.video(video_path)
with col2:
st.subheader("ℹ️ Video Info")
cap = cv2.VideoCapture(video_path)
video_info = {
"Duration": f"{cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS):.2f}s",
"FPS": f"{cap.get(cv2.CAP_PROP_FPS):.2f}",
"Resolution": f"{int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}",
"Frames": int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
}
cap.release()
for key, value in video_info.items():
st.metric(key, value)
if use_hybrid:
                st.success("🤖 Hybrid Mode Active")
            else:
                st.info("📊 MediaPipe Only")
st.markdown("---")
        if st.button("🚀 Process Video", type="primary", use_container_width=True):
            st.info("🔄 Running Hybrid YOLO-MediaPipe Pipeline..." if use_hybrid
                    else "🔄 Running MediaPipe-Only Pipeline...")
            pipeline = HybridFootstepDetectionPipeline(
                fps=float(video_info["FPS"]),
                sensitivity=sensitivity,
                yolo_conf=yolo_conf
            )
            with st.spinner("🔍 Detecting footsteps..."):
progress_bar = st.progress(0)
status_text = st.empty()
def update_progress(val):
progress_bar.progress(val)
status_text.text(f"Processing: {int(val * 100)}%")
results = pipeline.process_video(video_path, update_progress)
st.session_state['results'] = results
st.session_state['video_path'] = video_path
st.session_state['use_hybrid'] = use_hybrid
progress_bar.empty()
status_text.empty()
if results:
                    st.markdown('<div class="success-box">✅ Footstep detection complete!</div>',
                                unsafe_allow_html=True)
st.success(f"Detected **{len(results['events'])}** footstep events")
if 'detection_stats' in results:
stats = results['detection_stats']
col1, col2, col3 = st.columns(3)
col1.metric("YOLO Detections",
f"{stats['yolo_detections']}/{stats['total_frames']}")
col2.metric("Pose Detections",
f"{stats['pose_detections']}/{stats['total_frames']}")
col3.metric("Success Rate",
f"{stats['pose_detections'] / stats['total_frames'] * 100:.1f}%")
# Display results (existing code continues...)
if 'results' in st.session_state:
results = st.session_state['results']
st.markdown("---")
        st.subheader("📊 Detection Results")
col1, col2, col3, col4 = st.columns(4)
left_count = len([e for e in results['events'] if e['foot'] == 'LEFT'])
right_count = len([e for e in results['events'] if e['foot'] == 'RIGHT'])
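        # Cadence = detected strikes divided by clip duration in seconds (total_frames / fps), scaled to steps per minute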
avg_cadence = len(results['events']) / (results['total_frames'] / results['fps']) * 60
avg_conf = np.mean([e.get('confidence', 0.5) for e in results['events']])
col1.metric("Total Events", len(results['events']))
col2.metric("Left Foot", left_count)
col3.metric("Right Foot", right_count)
col4.metric("Avg Confidence", f"{avg_conf:.2f}")
st.metric("Average Cadence", f"{avg_cadence:.1f} steps/min")
        st.subheader("📋 Detected Events")
events_df = pd.DataFrame(results['events'])
if not events_df.empty:
st.dataframe(
events_df.style.apply(
lambda x: ['background-color: #e8f5e9' if x.foot == 'LEFT'
else 'background-color: #fff3e0' for _ in x],
axis=1
),
use_container_width=True,
height=300
)
        st.subheader("💾 Export Options")
col1, col2, col3 = st.columns(3)
with col1:
csv = events_df.to_csv(index=False)
            st.download_button(
                "📄 Download CSV",
csv,
f"footsteps_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
"text/csv",
use_container_width=True
)
with col2:
json_data = json.dumps(results['events'], indent=2)
            st.download_button(
                "📋 Download JSON",
json_data,
f"footsteps_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
"application/json",
use_container_width=True
)
with col3:
timecode_text = "\n".join([
f"{e['timecode']}\t{e['foot']}\t{e['event']}\t{e.get('confidence', 0.5):.2f}"
for e in results['events']
])
st.download_button(
"⏱️ Download Timecode",
timecode_text,
f"timecode_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
"text/plain",
use_container_width=True
)
st.markdown("---")
        st.subheader("🎥 Generate Output Video")
col1, col2 = st.columns(2)
with col1:
if create_annotated and st.button("Create Annotated Video", use_container_width=True):
with st.spinner("Creating annotated video..."):
annotated_path = tempfile.mktemp(suffix='_annotated.mp4')
progress_bar = st.progress(0)
success = create_annotated_video(
st.session_state['video_path'],
results['events'],
annotated_path,
use_hybrid=st.session_state.get('use_hybrid', False),
progress_callback=lambda v: progress_bar.progress(v)
)
if success:
st.session_state['annotated_video'] = annotated_path
progress_bar.empty()
                        st.success("✅ Annotated video ready!")
else:
st.error("❌ Failed to create annotated video")
with col2:
if add_audio and st.button("Generate with Audio", use_container_width=True):
with st.spinner("Generating audio and merging..."):
audio_gen = AudioGenerator()
                    aud_path = process_video_for_footstep_audio(st.session_state['video_path'])  # analyze the uploaded video's floor surface
duration = results['total_frames'] / results['fps']
audio_track = audio_gen.create_audio_track(
results['events'],
aud_path,
duration
)
temp_video = tempfile.mktemp(suffix='_temp.mp4')
progress_bar = st.progress(0)
create_annotated_video(
st.session_state['video_path'],
results['events'],
temp_video,
use_hybrid=st.session_state.get('use_hybrid', False),
progress_callback=lambda v: progress_bar.progress(v * 0.7)
)
final_output = tempfile.mktemp(suffix='_final.mp4')
success = merge_audio_with_video(
temp_video,
audio_track,
44100,
final_output
)
progress_bar.progress(1.0)
progress_bar.empty()
if success:
st.session_state['final_video'] = final_output
                        st.success("✅ Video with audio ready!")
else:
st.error("❌ Failed to merge audio")
if 'annotated_video' in st.session_state:
st.markdown("---")
        st.subheader("📺 Annotated Video")
st.video(st.session_state['annotated_video'])
with open(st.session_state['annotated_video'], 'rb') as f:
            st.download_button(
                "📥 Download Annotated Video",
f,
f"annotated_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4",
"video/mp4",
use_container_width=True
)
if 'final_video' in st.session_state:
st.markdown("---")
        st.subheader("🔊 Final Video with Audio")
st.video(st.session_state['final_video'])
with open(st.session_state['final_video'], 'rb') as f:
            st.download_button(
                "📥 Download Final Video",
f,
f"final_with_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4",
"video/mp4",
use_container_width=True
)
def main():
st.markdown('<h1 class="main-header">🎬 Hybrid YOLO-MediaPipe Footstep Detection</h1>',
unsafe_allow_html=True)
    st.markdown('<div class="hybrid-badge">🚀 YOLO Person Detection + MediaPipe Pose Estimation</div>',
unsafe_allow_html=True)
st.markdown("### Advanced AI-Powered Foley Tool with Dual-Stage Detection Pipeline")
# Mode selection
st.markdown("---")
st.markdown("## 🎯 Select Mode")
col1, col2 = st.columns(2)
with col1:
        if st.button("📤 Video Upload Mode", use_container_width=True, type="primary"):
st.session_state.mode = 'upload'
with col2:
        if st.button("📹 Live Streaming Mode", use_container_width=True, type="primary"):
st.session_state.mode = 'live'
# Initialize mode
if 'mode' not in st.session_state:
st.session_state.mode = 'upload'
st.markdown("---")
# Display selected mode
if st.session_state.mode == 'upload':
video_upload_mode()
else:
live_streaming_mode()
# Sidebar info
with st.sidebar:
st.markdown("---")
st.markdown(f"### 🎯 Current Mode: **{st.session_state.mode.upper()}**")
if st.session_state.mode == 'live':
st.markdown("---")
            st.markdown("### 📹 Live Mode Guide")
st.markdown("""
**Steps:**
1. 📸 **Capture Floor Frame**
- Point camera at floor
- Capture clear image
2. 🔊 **Generate Audio**
- AI analyzes floor type
- Downloads matching sound
3. ✅ **System Ready**
- Real-time detection active
- Walk and hear footsteps!
**Tips:**
- Good lighting needed
- Clear floor view
- Stand 2-3 meters away
- Walk naturally
""")
st.markdown("---")
        st.markdown("### 🤖 Hybrid Pipeline")
st.markdown("""
**Stage 1: YOLO Detection**
- Detects person in frame
- Provides bounding box
- Tracks across frames
**Stage 2: MediaPipe Pose**
- Estimates pose on detected region
- Extracts heel landmarks
- Higher accuracy & speed
**Benefits:**
- ✅ More robust detection
- ✅ Better occlusion handling
- ✅ Faster processing
- ✅ Improved accuracy
""")
st.markdown("---")
st.markdown("### ℹ️ System Info")
st.markdown("""
**Detection Engines:**
- YOLOv8 (Person Detection)
- MediaPipe Pose v2 (Pose Estimation)
**Features:**
- Dual-stage AI pipeline
- Person tracking
- Frame-accurate timing
- Confidence scoring
- Real-time live detection
- Autonomous audio generation
""")