SmartHeal-Agentic-AI / src /wound_analysis.py
SmartHeal's picture
Upload 33 files
185c377 verified
import logging
import cv2
import numpy as np
from PIL import Image
import json
from datetime import datetime
import os
# Try to import AI libraries (graceful fallback if not available)
try:
from transformers import pipeline
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
logging.warning("Transformers not available")
try:
from ultralytics import YOLO
YOLO_AVAILABLE = True
except ImportError:
YOLO_AVAILABLE = False
logging.warning("YOLO not available")
try:
import tensorflow as tf
TF_AVAILABLE = True
except ImportError:
TF_AVAILABLE = False
logging.warning("TensorFlow not available")
class WoundAnalyzer:
"""AI-powered wound analysis system"""
def __init__(self, config):
"""Initialize wound analyzer with configuration"""
self.config = config
self.models_loaded = False
self.load_models()
def load_models(self):
"""Load AI models for wound analysis"""
try:
# Load models if libraries are available
if TRANSFORMERS_AVAILABLE:
try:
# Load a general image classification model
self.image_classifier = pipeline(
"image-classification",
model="google/vit-base-patch16-224",
token=self.config.HF_TOKEN
)
logging.info("✅ Image classification model loaded")
except Exception as e:
logging.warning(f"Could not load image classifier: {e}")
self.image_classifier = None
if YOLO_AVAILABLE:
try:
# Try to load YOLO model (will download if not present)
self.yolo_model = YOLO('yolov8n.pt')
logging.info("✅ YOLO model loaded")
except Exception as e:
logging.warning(f"Could not load YOLO model: {e}")
self.yolo_model = None
self.models_loaded = True
logging.info("✅ Wound analyzer initialized")
except Exception as e:
logging.error(f"Error loading models: {e}")
self.models_loaded = False
def analyze_wound(self, image, questionnaire_id):
"""Analyze wound image and return comprehensive results"""
start_time = datetime.now()
try:
if not image:
return self._create_error_result("No image provided")
# Get questionnaire data for context
questionnaire_data = self._get_questionnaire_data(questionnaire_id)
# Convert image to various formats for analysis
cv_image = self._pil_to_cv2(image)
np_image = np.array(image)
# Perform basic image analysis
basic_analysis = self._basic_image_analysis(cv_image, np_image)
# Perform AI analysis if models are available
ai_analysis = self._ai_image_analysis(image)
# Use enhanced AI processor if available
try:
from .ai_processor import AIProcessor
ai_processor = AIProcessor()
# Perform visual analysis using AI processor
visual_results = ai_processor.perform_visual_analysis(image)
# Query clinical guidelines
query = f"wound care {questionnaire_data.get('wound_location', '')} {questionnaire_data.get('diabetic_status', '')}"
guideline_context = ai_processor.query_guidelines(query)
# Generate comprehensive report
comprehensive_report = ai_processor.generate_final_report(
questionnaire_data, visual_results, guideline_context, image
)
# Merge AI processor results with basic analysis
ai_analysis.update({
'visual_analysis': visual_results,
'clinical_guidelines': guideline_context,
'comprehensive_report': comprehensive_report
})
logging.info("Enhanced AI analysis completed")
except Exception as e:
logging.warning(f"Enhanced AI processor not available: {e}")
# Combine results
analysis_result = self._combine_analysis_results(
basic_analysis,
ai_analysis,
questionnaire_id
)
# Calculate processing time
processing_time = (datetime.now() - start_time).total_seconds()
analysis_result['processing_time'] = processing_time
logging.info(f"Wound analysis completed in {processing_time:.2f} seconds")
return analysis_result
except Exception as e:
logging.error(f"Wound analysis error: {e}")
return self._create_error_result(f"Analysis failed: {str(e)}")
def _get_questionnaire_data(self, questionnaire_id):
"""Get questionnaire data for analysis context"""
try:
# This should connect to the database to get questionnaire data
# For now, return empty dict as fallback
return {}
except Exception as e:
logging.warning(f"Could not fetch questionnaire data: {e}")
return {}
def _pil_to_cv2(self, pil_image):
"""Convert PIL image to OpenCV format"""
try:
# Convert PIL to RGB if not already
if pil_image.mode != 'RGB':
pil_image = pil_image.convert('RGB')
# Convert to numpy array and then to OpenCV format
np_array = np.array(pil_image)
cv_image = cv2.cvtColor(np_array, cv2.COLOR_RGB2BGR)
return cv_image
except Exception as e:
logging.error(f"Error converting PIL to CV2: {e}")
return None
def _basic_image_analysis(self, cv_image, np_image):
"""Perform basic image analysis using OpenCV"""
try:
analysis = {}
if cv_image is not None:
# Image properties
height, width = cv_image.shape[:2]
analysis['dimensions'] = f"{width}x{height}"
analysis['image_quality'] = self._assess_image_quality(cv_image)
# Color analysis
analysis['color_analysis'] = self._analyze_colors(cv_image)
# Texture analysis
analysis['texture_analysis'] = self._analyze_texture(cv_image)
# Edge detection for wound boundaries
analysis['edge_analysis'] = self._analyze_edges(cv_image)
return analysis
except Exception as e:
logging.error(f"Basic image analysis error: {e}")
return {}
def _assess_image_quality(self, cv_image):
"""Assess image quality metrics"""
try:
# Calculate sharpness using Laplacian variance
gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
# Calculate brightness
brightness = np.mean(cv_image)
# Calculate contrast
contrast = np.std(cv_image)
# Determine quality rating
if sharpness > 500 and 50 < brightness < 200 and contrast > 30:
quality = "Good"
elif sharpness > 100 and 30 < brightness < 230 and contrast > 15:
quality = "Fair"
else:
quality = "Poor"
return {
'sharpness': float(sharpness),
'brightness': float(brightness),
'contrast': float(contrast),
'overall_quality': quality
}
except Exception as e:
logging.error(f"Image quality assessment error: {e}")
return {'overall_quality': 'Unknown'}
def _analyze_colors(self, cv_image):
"""Analyze color properties of the wound"""
try:
# Convert to HSV for better color analysis
hsv = cv2.cvtColor(cv_image, cv2.COLOR_BGR2HSV)
# Calculate color statistics
mean_hue = np.mean(hsv[:, :, 0])
mean_saturation = np.mean(hsv[:, :, 1])
mean_value = np.mean(hsv[:, :, 2])
# Detect dominant colors
dominant_colors = self._get_dominant_colors(cv_image)
return {
'mean_hue': float(mean_hue),
'mean_saturation': float(mean_saturation),
'mean_value': float(mean_value),
'dominant_colors': dominant_colors
}
except Exception as e:
logging.error(f"Color analysis error: {e}")
return {}
def _get_dominant_colors(self, cv_image, k=3):
"""Get dominant colors in the image"""
try:
# Reshape image to be a list of pixels
data = cv_image.reshape((-1, 3))
data = np.float32(data)
# Apply k-means clustering
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0)
_, labels, centers = cv2.kmeans(data, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
# Convert back to uint8 and get color names
centers = np.uint8(centers)
dominant_colors = []
for center in centers:
color_name = self._classify_color(center)
dominant_colors.append({
'rgb': center.tolist(),
'name': color_name
})
return dominant_colors
except Exception as e:
logging.error(f"Dominant colors error: {e}")
return []
def _classify_color(self, rgb_color):
"""Classify RGB color into medical color categories"""
r, g, b = rgb_color
# Simple color classification for wound assessment
if r > 150 and g < 100 and b < 100:
return "Red/Inflammatory"
elif r > 150 and g > 150 and b < 100:
return "Yellow/Exudate"
elif r < 100 and g < 100 and b < 100:
return "Dark/Necrotic"
elif r > 200 and g > 200 and b > 200:
return "White/Pale"
elif r > 100 and g > 50 and b < 100:
return "Pink/Healthy"
else:
return "Mixed/Other"
def _analyze_texture(self, cv_image):
"""Analyze texture properties"""
try:
gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
# Calculate Local Binary Pattern (simplified)
texture_variance = np.var(gray)
texture_mean = np.mean(gray)
# Determine texture category
if texture_variance > 1000:
texture_type = "Rough/Irregular"
elif texture_variance > 500:
texture_type = "Moderate"
else:
texture_type = "Smooth"
return {
'variance': float(texture_variance),
'mean': float(texture_mean),
'type': texture_type
}
except Exception as e:
logging.error(f"Texture analysis error: {e}")
return {}
def _analyze_edges(self, cv_image):
"""Analyze edges for wound boundary detection"""
try:
gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
# Apply Canny edge detection
edges = cv2.Canny(gray, 50, 150)
# Count edge pixels
edge_count = np.sum(edges > 0)
total_pixels = edges.shape[0] * edges.shape[1]
edge_ratio = edge_count / total_pixels
# Determine wound boundary clarity
if edge_ratio > 0.1:
boundary_clarity = "Well-defined"
elif edge_ratio > 0.05:
boundary_clarity = "Moderately-defined"
else:
boundary_clarity = "Poorly-defined"
return {
'edge_count': int(edge_count),
'edge_ratio': float(edge_ratio),
'boundary_clarity': boundary_clarity
}
except Exception as e:
logging.error(f"Edge analysis error: {e}")
return {}
def _ai_image_analysis(self, image):
"""Perform AI-based image analysis"""
try:
ai_results = {}
# Use image classifier if available
if hasattr(self, 'image_classifier') and self.image_classifier:
try:
classification = self.image_classifier(image)
ai_results['classification'] = classification[:3] # Top 3 results
except Exception as e:
logging.warning(f"Image classification failed: {e}")
# Use YOLO for object detection if available
if hasattr(self, 'yolo_model') and self.yolo_model:
try:
detection_results = self.yolo_model(image)
ai_results['object_detection'] = self._process_yolo_results(detection_results)
except Exception as e:
logging.warning(f"YOLO detection failed: {e}")
return ai_results
except Exception as e:
logging.error(f"AI image analysis error: {e}")
return {}
def _process_yolo_results(self, results):
"""Process YOLO detection results"""
try:
processed_results = []
for result in results:
if hasattr(result, 'boxes') and result.boxes:
for box in result.boxes:
processed_results.append({
'confidence': float(box.conf.item()) if hasattr(box, 'conf') else 0.0,
'class_name': result.names.get(int(box.cls.item()), 'unknown') if hasattr(box, 'cls') else 'unknown'
})
return processed_results
except Exception as e:
logging.error(f"YOLO results processing error: {e}")
return []
def _combine_analysis_results(self, basic_analysis, ai_analysis, questionnaire_id):
"""Combine all analysis results into a comprehensive report"""
try:
# Create comprehensive analysis result
result = {
'questionnaire_id': questionnaire_id,
'basic_analysis': basic_analysis,
'ai_analysis': ai_analysis,
'model_version': 'SmartHeal-v1.0'
}
# Generate summary
result['summary'] = self._generate_summary(basic_analysis, ai_analysis)
# Generate recommendations
result['recommendations'] = self._generate_recommendations(basic_analysis, ai_analysis)
# Calculate risk assessment
result['risk_assessment'] = self._calculate_risk_assessment(basic_analysis, ai_analysis)
result['risk_level'] = result['risk_assessment']['level']
result['risk_score'] = result['risk_assessment']['score']
# Determine wound type
result['wound_type'] = self._determine_wound_type(basic_analysis, ai_analysis)
# Extract wound dimensions
result['wound_dimensions'] = basic_analysis.get('dimensions', 'Unknown')
return result
except Exception as e:
logging.error(f"Results combination error: {e}")
return self._create_error_result("Failed to combine analysis results")
def _generate_summary(self, basic_analysis, ai_analysis):
"""Generate analysis summary"""
try:
summary_parts = []
# Image quality assessment
if 'image_quality' in basic_analysis:
quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown')
summary_parts.append(f"Image quality: {quality}")
# Color analysis summary
if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
colors = basic_analysis['color_analysis']['dominant_colors']
if colors:
color_names = [color['name'] for color in colors[:2]]
summary_parts.append(f"Dominant colors: {', '.join(color_names)}")
# Texture summary
if 'texture_analysis' in basic_analysis:
texture_type = basic_analysis['texture_analysis'].get('type', 'Unknown')
summary_parts.append(f"Texture: {texture_type}")
# Boundary clarity
if 'edge_analysis' in basic_analysis:
boundary = basic_analysis['edge_analysis'].get('boundary_clarity', 'Unknown')
summary_parts.append(f"Wound boundaries: {boundary}")
# AI classification summary
if 'classification' in ai_analysis and ai_analysis['classification']:
top_class = ai_analysis['classification'][0]
summary_parts.append(f"AI classification: {top_class.get('label', 'Unknown')}")
summary = "Wound Analysis Summary: " + "; ".join(summary_parts) if summary_parts else "Basic wound analysis completed."
return summary
except Exception as e:
logging.error(f"Summary generation error: {e}")
return "Wound analysis completed with limited information due to processing constraints."
def _generate_recommendations(self, basic_analysis, ai_analysis):
"""Generate treatment recommendations based on analysis"""
try:
recommendations = []
# Image quality recommendations
if 'image_quality' in basic_analysis:
quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown')
if quality == 'Poor':
recommendations.append("Consider retaking the image with better lighting and focus for more accurate analysis.")
# Color-based recommendations
if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
colors = basic_analysis['color_analysis']['dominant_colors']
for color in colors:
color_name = color.get('name', '')
if 'Red/Inflammatory' in color_name:
recommendations.append("Red coloration may indicate inflammation. Monitor for infection signs.")
elif 'Yellow/Exudate' in color_name:
recommendations.append("Yellow areas suggest possible exudate. Consider wound cleansing.")
elif 'Dark/Necrotic' in color_name:
recommendations.append("Dark areas may indicate necrotic tissue. Consult for debridement evaluation.")
elif 'Pink/Healthy' in color_name:
recommendations.append("Pink coloration suggests healthy granulation tissue - positive healing sign.")
# Texture-based recommendations
if 'texture_analysis' in basic_analysis:
texture_type = basic_analysis['texture_analysis'].get('type', '')
if 'Rough/Irregular' in texture_type:
recommendations.append("Irregular texture may require specialized wound care approach.")
# Boundary-based recommendations
if 'edge_analysis' in basic_analysis:
boundary = basic_analysis['edge_analysis'].get('boundary_clarity', '')
if 'Poorly-defined' in boundary:
recommendations.append("Poorly defined wound edges may indicate ongoing tissue breakdown.")
# General recommendations
recommendations.extend([
"Continue regular wound monitoring and documentation.",
"Maintain appropriate wound hygiene and dressing protocols.",
"Consult healthcare provider for persistent or worsening symptoms.",
"Follow established wound care guidelines for optimal healing."
])
return "; ".join(recommendations) if recommendations else "Standard wound care protocols recommended."
except Exception as e:
logging.error(f"Recommendations generation error: {e}")
return "Consult healthcare provider for appropriate wound care recommendations."
def _calculate_risk_assessment(self, basic_analysis, ai_analysis):
"""Calculate risk assessment based on analysis"""
try:
risk_score = 0
risk_factors = []
# Image quality factor
if 'image_quality' in basic_analysis:
quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown')
if quality == 'Poor':
risk_score += 10
risk_factors.append("Poor image quality")
# Color-based risk factors
if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
colors = basic_analysis['color_analysis']['dominant_colors']
for color in colors:
color_name = color.get('name', '')
if 'Dark/Necrotic' in color_name:
risk_score += 30
risk_factors.append("Possible necrotic tissue")
elif 'Red/Inflammatory' in color_name:
risk_score += 20
risk_factors.append("Signs of inflammation")
elif 'Yellow/Exudate' in color_name:
risk_score += 15
risk_factors.append("Possible exudate")
# Texture risk factors
if 'texture_analysis' in basic_analysis:
texture_type = basic_analysis['texture_analysis'].get('type', '')
if 'Rough/Irregular' in texture_type:
risk_score += 10
risk_factors.append("Irregular texture")
# Boundary risk factors
if 'edge_analysis' in basic_analysis:
boundary = basic_analysis['edge_analysis'].get('boundary_clarity', '')
if 'Poorly-defined' in boundary:
risk_score += 15
risk_factors.append("Poorly defined boundaries")
# Determine risk level
if risk_score >= 50:
risk_level = "High"
elif risk_score >= 25:
risk_level = "Moderate"
elif risk_score >= 10:
risk_level = "Low"
else:
risk_level = "Minimal"
return {
'score': min(risk_score, 100), # Cap at 100
'level': risk_level,
'factors': risk_factors
}
except Exception as e:
logging.error(f"Risk assessment error: {e}")
return {
'score': 0,
'level': 'Unknown',
'factors': ['Assessment error']
}
def _determine_wound_type(self, basic_analysis, ai_analysis):
"""Determine wound type based on analysis"""
try:
# This is a simplified wound type determination
# In a real system, this would use more sophisticated ML models
wound_characteristics = []
# Color-based characteristics
if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']:
colors = basic_analysis['color_analysis']['dominant_colors']
for color in colors:
color_name = color.get('name', '')
if 'Red/Inflammatory' in color_name:
wound_characteristics.append("inflammatory")
elif 'Pink/Healthy' in color_name:
wound_characteristics.append("granulating")
elif 'Yellow/Exudate' in color_name:
wound_characteristics.append("exudative")
elif 'Dark/Necrotic' in color_name:
wound_characteristics.append("necrotic")
# Determine primary wound type
if "necrotic" in wound_characteristics:
return "Necrotic wound"
elif "inflammatory" in wound_characteristics and "exudative" in wound_characteristics:
return "Infected/Inflammatory wound"
elif "granulating" in wound_characteristics:
return "Healing/Granulating wound"
elif "exudative" in wound_characteristics:
return "Exudative wound"
else:
return "Acute wound"
except Exception as e:
logging.error(f"Wound type determination error: {e}")
return "Undetermined wound type"
def _create_error_result(self, error_message):
"""Create error result structure"""
return {
'error': True,
'summary': f"Analysis Error: {error_message}",
'recommendations': "Please ensure image quality is adequate and try again. Consult healthcare provider if issues persist.",
'risk_level': 'Unknown',
'risk_score': 0,
'wound_type': 'Unknown',
'wound_dimensions': 'Unknown',
'processing_time': 0.0,
'model_version': 'SmartHeal-v1.0'
}