|
|
from langchain_core.runnables import RunnablePassthrough |
|
|
from langchain_core.output_parsers import PydanticOutputParser |
|
|
from langchain_core.prompts import ChatPromptTemplate |
|
|
from dotenv import load_dotenv |
|
|
from custom_wrapper import OpenRouterChat |
|
|
from pydantic import BaseModel, Field |
|
|
from typing import List |
|
|
import os |
|
|
import json |
|
|
import cv2 |
|
|
import base64 |
|
|
from PIL import Image |
|
|
import io |
|
|
import numpy as np |
|
|
from scipy.io.wavfile import write |
|
|
|
|
|
# Populate the process environment via python-dotenv before reading any
# settings from it.
load_dotenv()

# OpenRouter credential; this is None when the variable is not set.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
|
|
|
|
|
|
|
|
|
|
|
# Structured result the LLM is asked to return; it is the target model of the
# PydanticOutputParser built later in this file.
# NOTE: documented with comments rather than a class docstring, because
# pydantic copies a class docstring into the generated JSON schema and that
# schema is what the parser's format instructions are built from.
class AudioArrayOutput(BaseModel):
    # Raw audio samples. The prompt in this file asks for 50-80 floats in
    # [-1.0, 1.0]; that range is not enforced by this model.
    arr: List[float] = Field(description="Array for the audio waves")
    # Free-text description of the scene.
    environment_description: str = Field(description="Description of the environment")
    # Free-text justification for the generated samples.
    reasoning: str = Field(description="Reasoning behind the audio generation")
|
|
|
|
|
|
|
|
# Chat-model settings, collected in one mapping so they can be scanned at a
# glance and passed to the wrapper in a single call.
_LLM_SETTINGS = {
    "api_key": OPENROUTER_API_KEY,
    "model": "meta-llama/llama-3.2-90b-vision-instruct",
    "temperature": 0.7,
    "max_tokens": 2048,
}
llm = OpenRouterChat(**_LLM_SETTINGS)

# Parses the model's reply into an AudioArrayOutput instance.
parser = PydanticOutputParser(pydantic_object=AudioArrayOutput)
|
|
|
|
|
|
|
|
def extract_first_frame(video_path):
    """Extract the first frame from a video file.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        The frame returned by ``VideoCapture.read()``, or ``None`` when the
        video cannot be opened or read. Failures are printed, not raised.
    """
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {video_path}")

        success, frame = cap.read()
        if not success:
            raise ValueError("Cannot read the first frame from video")

        return frame
    except Exception as e:
        print(f"Error extracting first frame: {e}")
        return None
    finally:
        # Release on every path (open failure, read failure, unexpected
        # exception), not only after a successful read.
        if cap is not None:
            cap.release()
|
|
|
|
|
|
|
|
def image_to_base64(image):
    """Encode an OpenCV image as a base64 JPEG string.

    The image is converted from BGR to RGB channel order, compressed to JPEG
    (quality 85) in memory, and the resulting bytes are base64-encoded.

    Returns:
        The base64 text, or ``None`` if any step fails (the error is printed).
    """
    try:
        rgb_pixels = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        with io.BytesIO() as jpeg_buffer:
            Image.fromarray(rgb_pixels).save(jpeg_buffer, format="JPEG", quality=85)
            jpeg_bytes = jpeg_buffer.getvalue()
        return base64.b64encode(jpeg_bytes).decode()
    except Exception as e:
        print(f"Error converting image to base64: {e}")
        return None
|
|
|
|
|
|
|
|
def save_audio_from_array(audio_array, sample_rate=44100, output_path="generated_audio.wav"):
    """Save a sequence of float samples as a 16-bit PCM WAV file.

    The samples are peak-normalised to [-1.0, 1.0] (an all-zero signal is left
    as is), scaled to the int16 range and written with
    ``scipy.io.wavfile.write``.

    Args:
        audio_array: Sequence of numeric samples.
        sample_rate: Sample rate in Hz written to the WAV header.
        output_path: Destination file path.

    Returns:
        ``output_path`` on success, or ``None`` if the input is empty or the
        file cannot be written (the error is printed, not raised).
    """
    try:
        samples = np.asarray(audio_array, dtype=np.float32)
        if samples.size == 0:
            # Fail with a clear message instead of an obscure numpy
            # "zero-size array" reduction error.
            raise ValueError("audio array is empty")

        # NaN/inf would make the peak NaN/inf and lead to an undefined int16
        # cast, so non-finite samples are replaced with silence first.
        samples = np.nan_to_num(samples, nan=0.0, posinf=0.0, neginf=0.0)

        peak = np.max(np.abs(samples))
        if peak > 0:
            samples = samples / peak

        samples = np.clip(samples, -1.0, 1.0)
        pcm = (samples * 32767).astype(np.int16)

        write(output_path, sample_rate, pcm)
        return output_path
    except Exception as e:
        print(f"Error saving audio: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
# Prompt sent to the model. It has two template variables:
#   {image_data}          - the value passed to chain.invoke()
#   {format_instructions} - text produced by parser.get_format_instructions()
# NOTE(review): the image is interpolated into the prompt as base64 *text*;
# whether OpenRouterChat forwards it to the vision model as an actual image
# input is not visible in this file - confirm.
prompt = ChatPromptTemplate.from_template("""
You are an expert sound designer and acoustic AI generator.
Analyze the provided image and generate a footstep sound array.

Image Data (base64): {image_data}

**CRITICAL INSTRUCTIONS:**
- Output ONLY valid JSON format, nothing else
- No explanations, no code, no markdown formatting
- No additional text before or after the JSON

Generate a JSON object with exactly these three fields:
1. "arr": Array of 50-80 float values between -1.0 and 1.0
2. "environment_description": Brief description of the environment
3. "reasoning": Brief explanation of sound design choices

{format_instructions}

Output ONLY the JSON:

""")

# Pipeline: build the prompt variables -> render the prompt -> call the LLM ->
# parse the reply with `parser`. The leading dict maps the chain input to
# "image_data" unchanged and fills "format_instructions" from the parser
# (the lambda ignores its argument).
chain = (
    {"image_data": RunnablePassthrough(), "format_instructions": lambda x: parser.get_format_instructions()}
    | prompt
    | llm
    | parser
)
|
|
|
|
|
|
|
|
def _fallback_tone_samples():
    """Return the 50-sample sine placeholder used when the LLM output is unusable."""
    return np.sin(2 * np.pi * 440 * np.linspace(0, 1, 50)).tolist()


def analyze_image_and_generate_audio(image_base64):
    """Ask the LLM chain for a footstep sample array and save it as a WAV file.

    The chain result is dumped to ``ss.txt`` for debugging. If the returned
    array is empty or shorter than 10 samples, a placeholder sine tone is
    saved instead. If anything raises, the placeholder is written to
    ``./audio/fallback_footstep.wav``.

    Args:
        image_base64: Value passed to ``chain.invoke`` (the base64 image text).

    Returns:
        The path returned by ``save_audio_from_array`` (which may be ``None``
        if saving failed).
    """
    try:
        os.makedirs("./audio", exist_ok=True)

        result = chain.invoke(image_base64)

        # Debug dump of the parsed result. The context manager closes the
        # file even if the write raises.
        with open("ss.txt", "w", encoding="utf-8") as dump_file:
            dump_file.write(str(result))

        print("Generated array:", result.arr)
        print("Array length:", len(result.arr))

        if not result.arr or len(result.arr) < 10:
            print("Warning: Generated audio array is too short or empty")
            samples = _fallback_tone_samples()
        else:
            samples = result.arr

        audio_path = save_audio_from_array(
            audio_array=samples,
            output_path="./audio/footstep_from_image.wav",
        )

        print("🎧 Environment Description:", result.environment_description)
        print("🎧 Reasoning:", result.reasoning)
        # Only claim success when the file was actually written.
        if audio_path is not None:
            print(f"✅ Audio saved at: {audio_path}")
        return audio_path

    except Exception as e:
        print("Error during LLM audio generation:", e)

        audio_path = save_audio_from_array(
            audio_array=_fallback_tone_samples(),
            output_path="./audio/fallback_footstep.wav",
        )
        return audio_path
|
|
|
|
|
|
|
|
def process_video_for_footstep_audio(video_path):
    """Generate footstep audio for a video from its first frame.

    Steps: extract the first frame, encode it as base64, ask the LLM pipeline
    for audio, and check that the resulting file exists. Any failed step
    falls back to ``create_fallback_audio()``.

    Args:
        video_path: Path of the video passed to ``extract_first_frame``.

    Returns:
        Path of the generated audio file, or whatever
        ``create_fallback_audio()`` returns when a step fails.
    """
    try:
        print("🎥 Extracting first frame...")
        first_frame = extract_first_frame(video_path)
        if first_frame is None:
            print("❌ Failed to extract frame from video")
            return create_fallback_audio()

        image_base64 = image_to_base64(first_frame)
        if image_base64 is None:
            print("❌ Failed to convert image to base64")
            return create_fallback_audio()

        print("🤖 Generating footstep audio from LLM...")
        audio_path = analyze_image_and_generate_audio(image_base64)

        if audio_path and os.path.exists(audio_path):
            print(f"✅ Audio generated successfully at: {audio_path}")
            return audio_path

        print("❌ Generated audio file not found")
        return create_fallback_audio()

    except Exception as e:
        print(f"❌ Error in process_video_for_footstep_audio: {e}")
        # Imported here because it is only needed on this error path.
        import traceback

        traceback.print_exc()
        return create_fallback_audio()
|
|
|
|
|
|
|
|
def create_fallback_audio():
    """Create a simple synthetic fallback audio file and return a usable path.

    A one-second clip (decaying 80 Hz sine plus decaying noise) is written to
    ``./audio/fallback_footstep.wav``. If the pre-recorded clip
    ``./audio/Footsteps on Gravel Path Outdoor.mp3`` exists it is returned,
    as before; otherwise the freshly written WAV path is returned, so the
    caller never receives a path to a missing file.

    Returns:
        Path to an existing audio file, or ``None`` if creation failed (the
        error is printed, not raised).
    """
    try:
        os.makedirs("./audio", exist_ok=True)
        fallback_path = "./audio/fallback_footstep.wav"
        prerecorded_path = r"./audio/Footsteps on Gravel Path Outdoor.mp3"

        sample_rate = 44100
        duration = 1.0
        t = np.linspace(0, duration, int(sample_rate * duration))

        # Low-frequency thump plus a faster-decaying burst of noise.
        footstep = (
            np.sin(2 * np.pi * 80 * t) * np.exp(-8 * t) +
            np.random.normal(0, 0.1, len(t)) * np.exp(-15 * t)
        )

        # Peak-normalise, leaving 20% headroom.
        footstep = footstep / np.max(np.abs(footstep)) * 0.8

        write(fallback_path, sample_rate, np.int16(footstep * 32767))
        print(f"✅ Created fallback audio at: {fallback_path}")

        # Keep returning the pre-recorded clip when it is present, but never
        # return its path when the file is missing.
        if os.path.exists(prerecorded_path):
            return prerecorded_path
        return fallback_path

    except Exception as e:
        print(f"❌ Failed to create fallback audio: {e}")
        return None
|
|
|
|
|
|