Multi-modal Service
Multi-modal Service
Table of Contents
- Introduction
- Core Architecture
- Multi-modal Service Implementation
- Action Registry Integration
- Data Transformation Pipeline
- YouTube Transcription Integration
- LLM Integration for Cross-modal Understanding
- Configuration Parameters
- Error Handling and Fallback Strategies
- Performance Considerations
Introduction
The Multi-modal Service is a core component of the RAVANA system designed to process and coordinate non-textual data such as images, audio, and video. This service enables the system to understand and analyze multi-modal inputs, transforming them into structured text representations that can be used for knowledge integration, decision making, and response generation. The service integrates with the Action Registry to enable multi-modal actions and works with LLM providers for cross-modal understanding.
Core Architecture
The Multi-modal Service operates as a centralized processing unit that handles various media types and coordinates with other system components. It follows a modular design with clear separation between media processing, action execution, and knowledge integration.
Diagram sources
Section sources
Multi-modal Service Implementation
The MultiModalService class provides comprehensive functionality for processing various media types and performing cross-modal analysis.
Service Initialization
The service initializes with configuration for supported formats and creates a temporary directory for processing artifacts.
def __init__(self):
self.supported_image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}
self.supported_audio_formats = {'.mp3', '.wav', '.m4a', '.ogg', '.flac'}
self.temp_dir = Path(tempfile.gettempdir()) / "agi_multimodal"
self.temp_dir.mkdir(exist_ok=True)
Image Processing
The service processes images using the Gemini API for image captioning and analysis.
Diagram sources
Section sources
Audio Processing
Audio files are processed using the Gemini API for audio description and analysis.
Diagram sources
Section sources
Cross-modal Analysis
The service can analyze multiple content types together to identify patterns and relationships.
async def cross_modal_analysis(self, content_list: List[Dict[str, Any]], analysis_prompt: str = None) -> Dict[str, Any]:
# Prepare content descriptions
descriptions = []
content_types = []
for content in content_list:
if content.get('success', False):
descriptions.append(content.get('description', ''))
content_types.append(content.get('type', 'unknown'))
# Create analysis prompt
if not analysis_prompt:
analysis_prompt = f"""
Perform a comprehensive cross-modal analysis of the following content:
Content types: {', '.join(set(content_types))}
Content descriptions:
{chr(10).join["f"{i+1}. {desc}" for i, desc in enumerate(descriptions)"]}
Please provide:
1. Common themes and patterns across all content
2. Relationships and connections between different modalities
3. Insights that emerge from combining these different types of information
4. Potential applications or implications
5. Any contradictions or interesting contrasts
"""
# Use LLM for cross-modal analysis
loop = asyncio.get_event_loop()
from core.llm import safe_call_llm
analysis = await loop.run_in_executor(
None,
safe_call_llm,
analysis_prompt
)
Section sources
Content Summary Generation
The service generates comprehensive summaries of processed multi-modal content.
async def generate_content_summary(self, processed_content: List[Dict[str, Any]]) -> str:
# Create summary header
summary_parts.append(f"Multi-Modal Content Summary ({len(processed_content)} items processed)")
summary_parts.append("=" * 50)
# Add successful content
if successful_content:
summary_parts.append(f"\nSuccessfully Processed ({len(successful_content)} items):")
for i, content in enumerate(successful_content, 1):
content_type = content.get('type', 'unknown').title()
description = content.get('description', 'No description')[:200]
summary_parts.append(f"\n{i}. {content_type}: {description}...")
# Add failed content
if failed_content:
summary_parts.append(f"\n\nFailed to Process ({len(failed_content)} items):")
for i, content in enumerate(failed_content, 1):
content_type = content.get('type', 'unknown').title()
error = content.get('error', 'Unknown error')
summary_parts.append(f"\n{i}. {content_type}: {error}")
# Add cross-modal insights if multiple successful items
if len(successful_content) > 1:
cross_modal = await self.cross_modal_analysis(successful_content)
if cross_modal.get('success', False):
summary_parts.append(f"\n\nCross-Modal Analysis:")
summary_parts.append(cross_modal.get('analysis', 'No analysis available'))
Section sources
Action Registry Integration
The Multi-modal Service integrates with the Action Registry through the EnhancedActionManager, which registers multi-modal actions and provides execution methods.
Action Registration
The EnhancedActionManager registers multi-modal actions during initialization.
def register_enhanced_actions(self):
"""Register new multi-modal actions as Action instances."""
self.action_registry.register_action(ProcessImageAction(self.system, self.data_service))
self.action_registry.register_action(ProcessAudioAction(self.system, self.data_service))
self.action_registry.register_action(AnalyzeDirectoryAction(self.system, self.data_service))
self.action_registry.register_action(CrossModalAnalysisAction(self.system, self.data_service))
Action Implementation
Each multi-modal action is implemented as a class that inherits from the Action base class.
Diagram sources
Section sources
Action Execution Flow
Multi-modal actions are executed through a coordinated process that involves the service and knowledge integration.
Diagram sources
Section sources
Data Transformation Pipeline
The service implements a comprehensive pipeline that transforms raw media into structured text representations.
Pipeline Architecture
Diagram sources
Section sources
Directory Processing
The service can process all supported media files in a directory.
async def process_directory(self, directory_path: str, recursive: bool = False) -> List[Dict[str, Any]]:
# Get all files
if recursive:
files = list(directory.rglob("*"))
else:
files = list(directory.iterdir())
# Filter for supported files
supported_files = []
for file_path in files:
if file_path.is_file():
ext = file_path.suffix.lower()
if ext in self.supported_image_formats or ext in self.supported_audio_formats:
supported_files.append(file_path)
# Process each file
for file_path in supported_files:
if ext in self.supported_image_formats:
result = await self.process_image(str(file_path))
elif ext in self.supported_audio_formats:
result = await self.process_audio(str(file_path))
Section sources
YouTube Transcription Integration
The system includes a dedicated module for YouTube video transcription that uses multiple methods for reliability.
Transcription Process
Diagram sources
Section sources
Implementation Details
The YouTube transcription module uses a fallback strategy to ensure transcription success.
def transcribe_youtube_video(url):
# Fallback to audio-to-text method
yt = YouTube(url)
# Get the audio stream
audio_stream = yt.streams.filter(only_audio=True).first()
# Download the audio stream
output_path = "YoutubeAudios"
filename = "audio.mp3"
audio_stream.download(output_path=output_path, filename=filename)
# Load the base model and transcribe the audio
model = whisper.load_model("large")
result = model.transcribe("YoutubeAudios/audio.mp3")
transcribed_text = result["text"]
# Detect the language
language = detect(transcribed_text)
Section sources
LLM Integration for Cross-modal Understanding
The Multi-modal Service integrates with LLM providers to enable cross-modal understanding and response generation.
LLM Function Calls
The service uses specific LLM functions for different media types.
from core.llm import call_gemini_image_caption, call_gemini_audio_description, call_gemini_with_function_calling
# Image processing uses image captioning
description = await loop.run_in_executor(
None,
call_gemini_image_caption,
image_path,
prompt
)
# Audio processing uses audio description
description = await loop.run_in_executor(
None,
call_gemini_audio_description,
audio_path,
prompt
)
# Cross-modal analysis uses general LLM calling
from core.llm import safe_call_llm
analysis = await loop.run_in_executor(
None,
safe_call_llm,
analysis_prompt
)
Section sources
Cross-modal Analysis Prompt
The service generates comprehensive prompts for cross-modal analysis.
analysis_prompt = f"""
Perform a comprehensive cross-modal analysis of the following content:
Content types: {', '.join(set(content_types))}
Content descriptions:
{chr(10).join["f"{i+1}. {desc}" for i, desc in enumerate(descriptions)"]}
Please provide:
1. Common themes and patterns across all content
2. Relationships and connections between different modalities
3. Insights that emerge from combining these different types of information
4. Potential applications or implications
5. Any contradictions or interesting contrasts
"""
Section sources
Configuration Parameters
The system includes various configuration parameters for media processing and LLM integration.
Media Codecs and Formats
The service defines supported media formats for validation.
self.supported_image_formats = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}
self.supported_audio_formats = {'.mp3', '.wav', '.m4a', '.ogg', '.flac'}
LLM Routing
The system uses different LLM functions based on the media type and processing requirements.
# Different LLM functions for different tasks
call_gemini_image_caption - For image analysis
call_gemini_audio_description - For audio analysis
safe_call_llm - For general text processing and cross-modal analysis
Multi-modal Service Configuration
The episodic memory module includes configuration for the multi-modal service.
class MultiModalMemoryService:
def __init__(self,
database_url: str,
text_model_name: str = "all-MiniLM-L6-v2",
whisper_model_size: str = "base",
device: Optional[str] = None):
Section sources
Error Handling and Fallback Strategies
The system implements comprehensive error handling and fallback strategies for multi-modal capabilities.
Error Handling in Media Processing
Each processing method includes try-except blocks to handle failures gracefully.
try:
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
file_ext = Path(image_path).suffix.lower()
if file_ext not in self.supported_image_formats:
raise ValueError(f"Unsupported image format: {file_ext}")
# Processing logic
...
except Exception as e:
logger.error(f"Failed to process image {image_path}: {e}")
return {
"type": "image",
"path": image_path,
"success": False,
"error": str(e),
"description": f"Failed to process image: {e}"
}
Fallback Strategies
The system implements multiple fallback strategies:
- YouTube Transcription: Uses YouTube Transcript API as primary method, falls back to audio download and Whisper transcription
- Action Caching: Caches successful action results to avoid reprocessing
- Parallel Execution Limiting: Limits concurrent actions to prevent resource exhaustion
# Action caching
cache_key = f"{action_name}_{hash(str(params))}"
if action_name not in non_cacheable and cache_key in self.action_cache:
logger.info(f"Using cached result for action: {action_name}")
return self.action_cache[cache_key]
# Parallel execution limiting
semaphore = asyncio.Semaphore(self.parallel_limit)
Section sources
Performance Considerations
The system addresses several performance considerations for multi-modal processing.
Processing Latency
The system manages processing latency through:
- Asynchronous Processing: Uses asyncio for non-blocking operations
- Executor Pool: Runs CPU-intensive LLM calls in executor pool
- Parallel Execution: Supports parallel action execution with configurable limits
# Asynchronous processing with executor
loop = asyncio.get_event_loop()
description = await loop.run_in_executor(
None,
call_gemini_image_caption,
image_path,
prompt
)
Quality Degradation
The system addresses quality degradation through:
- Format Validation: Validates input formats before processing
- Error Reporting: Provides detailed error information
- Fallback Methods: Implements alternative processing methods
Resource Management
The system manages resources through:
- Temporary File Cleanup: Regularly cleans up temporary files
- Cache Management: Limits cache size and clears old entries
- Memory Management: Uses efficient data structures
def cleanup_temp_files(self, max_age_hours: int = 24):
"""Clean up temporary files older than specified age."""
import time
current_time = time.time()
max_age_seconds = max_age_hours * 3600
cleaned_count = 0
for file_path in self.temp_dir.iterdir():
if file_path.is_file():
file_age = current_time - file_path.stat().st_mtime
if file_age > max_age_seconds:
file_path.unlink()
cleaned_count += 1
Section sources
Referenced Files in This Document