Search Result Integration
Table of Contents
- Introduction
- Core Architecture
- Search Result Manager Implementation
- Integration with Decision-Making Workflow
- Data Flow and Processing Pipeline
- Caching and State Management
- Error Handling and Resilience
- Configuration and Extensibility
- Troubleshooting Guide
Introduction
The SearchResultManager component is a critical subsystem within the RAVANA AGI framework responsible for handling external information retrieval and integration into the decision-making process. It enables real-time augmentation of cognitive processes by managing search results from external sources, primarily through LLM-powered web searches. This document details its architecture, workflow, integration points, and operational characteristics, providing a comprehensive understanding of how external knowledge is retrieved, processed, and utilized to enhance autonomous decision-making.
Core Architecture
The search result integration system follows a producer-consumer pattern with asynchronous processing and shared state coordination. External information is retrieved via LLM providers with search capabilities, placed into a thread-safe queue by the SearchResultManager, and consumed by the core system loop for integration into the AGI's cognitive context. The architecture emphasizes loose coupling, resilience to API failures, and seamless integration with memory and knowledge systems.
Search Result Manager Implementation
The SearchResultManager is implemented as a simple yet effective thread-safe queue manager that decouples search execution from result consumption. It uses Python's built-in queue.Queue for thread safety, allowing search operations to run in background threads while the main event loop safely retrieves results.
Class Interface and Methods
The SearchResultManager class provides a minimal but robust interface for managing search results:
import queue

class SearchResultManager:
    def __init__(self):
        self.result_queue = queue.Queue()

    def add_result(self, result):
        # Thread-safe enqueue; safe to call from background search threads
        self.result_queue.put(result)

    def get_result(self):
        # Non-blocking dequeue; returns None when no result is pending
        try:
            return self.result_queue.get_nowait()
        except queue.Empty:
            return None
Key Methods:
- add_result(result): Thread-safe method to enqueue a search result. Called by background search threads.
- get_result(): Non-blocking method to retrieve a result. Returns None if the queue is empty, preventing the main loop from hanging.
The module also instantiates a global singleton instance:
search_result_manager = SearchResultManager()
This singleton pattern ensures all components interact with the same result queue.
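As an illustration, the intended producer-consumer interaction looks roughly like this (a minimal sketch; the thread body and polling loop are simplified stand-ins for the real search and main-loop code):

import threading
import time

def background_search():
    # Stand-in for a real search call; the actual code calls the Gemini API
    time.sleep(1)
    search_result_manager.add_result("example result")

threading.Thread(target=background_search, daemon=True).start()

# Main-loop side: poll without blocking
while True:
    result = search_result_manager.get_result()
    if result is not None:
        print(f"Got result: {result}")
        break
    time.sleep(0.1)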
Integration with Decision-Making Workflow
The search result system is tightly integrated with the decision-making workflow through Retrieval-Augmented Generation (RAG) patterns. Search results are incorporated into the decision context, allowing the LLM-based decision maker to leverage up-to-date external information when formulating responses and actions.
Query Generation and Execution
Search queries are generated implicitly through the LLM's natural language processing capabilities. When the call_gemini_with_search function is invoked with a prompt, the Gemini API automatically determines if a web search is needed:
import threading

def call_gemini_with_search(prompt):
    def search_thread():
        try:
            # Gemini API automatically performs a search based on the prompt
            response = client.models.generate_content(
                model=model_id,
                contents=prompt,
                config=GenerateContentConfig(tools=[google_search_tool])
            )
            result = process_response(response)
            search_result_manager.add_result(result)
        except Exception as e:
            search_result_manager.add_result(f"[Gemini with search failed: {e}]")

    thread = threading.Thread(target=search_thread)
    thread.start()
    return "Search started in the background. Check for results later."
This asynchronous design allows the system to continue processing while waiting for search results.
Result Parsing and Relevance Scoring
The system does not perform explicit relevance scoring through traditional information retrieval methods. Instead, relevance is implicitly handled by:
- The LLM's ability to extract relevant information during search
- The grounding metadata provided by Gemini API
- Subsequent processing in the decision loop
Search results include grounding metadata from the Gemini API that indicates which web sources were used.
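For illustration, the sources behind a grounded response can be read from the response object roughly as follows (a hedged sketch; the exact attribute layout depends on the google-genai SDK version in use and should be verified against the installed SDK):

def extract_sources(response):
    # Walk the grounding metadata attached to each candidate, if present.
    # Attribute names follow the google-genai SDK's GroundingMetadata type;
    # treat this shape as an assumption.
    sources = []
    for candidate in getattr(response, "candidates", None) or []:
        metadata = getattr(candidate, "grounding_metadata", None)
        if metadata is None:
            continue
        for chunk in getattr(metadata, "grounding_chunks", None) or []:
            web = getattr(chunk, "web", None)
            if web is not None:
                sources.append((web.title, web.uri))
    return sources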
Data Flow and Processing Pipeline
The search result processing pipeline follows a well-defined sequence from query initiation to cognitive integration, ensuring that external information is properly contextualized and made available for decision-making.
Workflow Sequence
- Query Initiation: A component calls call_gemini_with_search() with a natural language prompt
- Asynchronous Search: The search executes in a background thread using Gemini's search tool
- Result Queuing: The search result is added to the SearchResultManager's queue
- Result Retrieval: The main loop periodically checks for new results via _check_for_search_results()
- State Integration: Results are added to shared_state.search_results
- Memory Storage: Results are processed into structured memories
- Knowledge Base Storage: Results are added to the long-term knowledge base
- Decision Context: Results are provided as RAG context to the decision maker (a consolidated sketch of this consumer side follows the list)
Real-Time Data Augmentation
The DecisionMaker receives search results through the RAG (Retrieval-Augmented Generation) context parameter:
decision_context = {
    'situation': situation,
    'mood': self.shared_state.mood,
    'memory': self.shared_state.recent_memories,
    'rag_context': getattr(self.shared_state, 'search_results', [])
}
The decision_maker_loop function formats this context and incorporates it into the LLM prompt:
# Format RAG context safely
rag_text = ""
if rag_context:
    if isinstance(rag_context, list):
        rag_text = "\n".join([str(item) for item in rag_context[:5]])
    else:
        rag_text = str(rag_context)
This allows the decision maker to consider recent search results when making decisions, effectively augmenting its knowledge with current external information.
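For example, the formatted rag_text can then be folded into the decision prompt along the following lines (an illustrative sketch; the actual prompt template used by decision_maker_loop may differ):

# Continuing from the excerpt above: embed rag_text in the decision prompt.
# The exact template is an assumption for illustration.
prompt = (
    f"Current situation: {situation}\n"
    f"Recent external information (from web searches):\n{rag_text}\n"
    "Based on the above, decide on the next action."
)
response = call_llm(prompt)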
Caching and State Management
The system employs a multi-layered approach to caching and state management, ensuring efficient use of search results while preventing resource exhaustion.
Caching Strategies
The primary caching mechanism is the in-memory storage of search results in the shared state:
# Add to shared state for immediate use
if not hasattr(self.shared_state, 'search_results'):
    self.shared_state.search_results = []
self.shared_state.search_results.append(search_result)

# Limit search results to prevent memory bloat
if len(self.shared_state.search_results) > 10:
    self.shared_state.search_results = self.shared_state.search_results[-10:]
This implements a sliding window cache that retains only the 10 most recent search results, preventing unbounded memory growth.
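As a design note, the same sliding-window behavior could be expressed more compactly with a bounded deque, which evicts the oldest entry automatically (an alternative sketch, not what the codebase currently does):

from collections import deque

# maxlen=10 drops the oldest result on overflow; no manual trimming needed
search_results = deque(maxlen=10)
search_results.append("newest result")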
Shared State Integration
The SharedState class explicitly includes a search_results attribute:
class SharedState:
    def __init__(self, initial_mood: Dict[str, float]):
        # ... other state
        self.search_results: List[str] = []
        # ... other state
This design ensures that search results are treated as first-class citizens in the AGI's cognitive state, available for inspection and manipulation by various components.
Long-Term Storage
Beyond immediate caching, search results are persisted through two mechanisms:
- Memory Service Integration:
memory_summary = f"Search result retrieved: {search_result[:300]}..."
memories_to_save = await self.memory_service.extract_memories(memory_summary, "")
await self.memory_service.save_memories(memories_to_save.memories)
- Knowledge Service Integration:
knowledge_result = await asyncio.to_thread(
    self.knowledge_service.add_knowledge,
    content=search_result,
    source="web_search",
    category="search_result"
)
This dual-storage approach ensures that important information from search results is retained long-term while immediate results are available for current decision cycles.
Error Handling and Resilience
The search integration system includes comprehensive error handling to maintain system stability despite the inherent unreliability of external APIs and network services.
Rate Limiting Strategy
The system does not implement explicit rate limiting at the SearchResultManager level. Instead, it relies on:
- The asynchronous nature of search operations
- The background thread execution model
- Natural throttling through the main loop cycle time
This approach prevents search operations from overwhelming external APIs while allowing the system to continue functioning when searches are delayed.
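If explicit throttling ever becomes necessary, a minimal guard could enforce a minimum interval between search launches. This is a hypothetical sketch; no such mechanism exists in the current code:

import time

class SearchThrottle:
    def __init__(self, min_interval_seconds: float = 5.0):
        self.min_interval = min_interval_seconds
        self._last_launch = 0.0

    def allow(self) -> bool:
        # Permit a new search only if enough time has passed since the last one
        now = time.monotonic()
        if now - self._last_launch >= self.min_interval:
            self._last_launch = now
            return True
        return False

A call site would then check throttle.allow() before invoking call_gemini_with_search.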
Error Handling Mechanisms
The system employs a robust error handling strategy at multiple levels:
- Search Execution Level:
except Exception as e:
    search_result_manager.add_result(f"[Gemini with search failed: {e}]")
Even when searches fail, an error message is added to the result queue, ensuring consumers are informed of the failure.
- Result Processing Level:
async def _check_for_search_results(self):
    try:
        search_result = search_result_manager.get_result()
        # ... processing
    except Exception as e:
        logger.error(f"Error in search result processing: {e}", exc_info=True)
The main loop wraps result processing in try-catch blocks to prevent search-related errors from crashing the system.
- Memory and Knowledge Integration:
try:
    ...  # Save to memory
except Exception as e:
    logger.warning(f"Failed to save search result to memory: {e}")

try:
    ...  # Save to knowledge base
except Exception as e:
    logger.warning(f"Failed to add search result to knowledge base: {e}")
Secondary storage operations are wrapped in try-catch blocks with warnings, ensuring that failures in one system don't affect others.
Common Issues and Mitigation
- API Failures: Mitigated by adding error messages to the result queue and continuing system operation.
- Irrelevant Results: Addressed through the LLM's natural filtering capabilities and the grounding metadata that shows sources.
- Excessive Latency: Handled through the asynchronous design, which prevents the main loop from blocking on search operations.
- Memory Bloat: Prevented by limiting the search_results list to 10 items and processing results into structured memories.
Configuration and Extensibility
The current implementation has limited explicit configuration options for search functionality, relying instead on environment variables and provider-specific settings.
Configuration Options
While not explicitly defined in configuration files, the system respects several environment variables that affect search behavior:
- GEMINI_API_KEY: Authentication for the Gemini API
- Provider-specific API keys (A4F_API_KEY, ZUKIJOURNEY_API_KEY, etc.)
Search Provider Integration
The system is designed to work primarily with Gemini's search capabilities, but the architecture allows for potential integration with other search providers through the LLM abstraction layer. The call_llm function provides a fallback mechanism:
def call_llm(prompt, preferred_provider=None, model=None):
    providers = [
        (call_zuki, 'zuki'),
        (call_electronhub, 'electronhub'),
        (call_zanity, 'zanity'),
        (call_a4f, 'a4f'),
    ]
    # Try providers in order
    # Fallback to Gemini
    return call_gemini(prompt)
This design ensures that search functionality remains available even if preferred providers fail.
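From a caller's perspective, the provider that ultimately answers is invisible (illustrative usage only):

# The caller is insulated from provider failures; a response comes back
# from the first provider that succeeds, or from the Gemini fallback.
answer = call_llm("Summarize today's AI news.", preferred_provider='zuki')
print(answer)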
Result Limits and Freshness
The system implements implicit limits on search results:
- Quantity Limit: Maximum of 10 recent search results stored in shared state
- Freshness: Results are consumed in FIFO order, with older results eventually being removed from the cache
There is no explicit freshness threshold based on time, as the system assumes that new searches will be performed when current information is insufficient.
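If a time-based freshness threshold were ever desired, results could be stored with timestamps and filtered on read. This is a hypothetical extension, not present in the current implementation:

import time

FRESHNESS_SECONDS = 600  # hypothetical 10-minute freshness window

def add_timestamped_result(state, result):
    state.search_results.append((time.time(), result))

def fresh_results(state):
    # Keep only results younger than the freshness window
    cutoff = time.time() - FRESHNESS_SECONDS
    return [r for ts, r in state.search_results if ts >= cutoff]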
Troubleshooting Guide
This section addresses common issues encountered with the search result integration system and provides diagnostic steps and solutions.
Common Issues
Issue: Search results not appearing in decision context
- Diagnosis: Check if call_gemini_with_search is being called and if results are being added to the queue
- Solution: Verify that _check_for_search_results is being called in the main loop and that the shared state has the search_results attribute
Issue: System hanging during search operations
- Diagnosis: The system should not hang due to the asynchronous design
- Solution: If hanging occurs, check for exceptions in the search thread that might prevent proper thread completion
Issue: Memory bloat from search results
- Diagnosis: Monitor the size of shared_state.search_results
- Solution: The system automatically limits results to 10 items, but ensure that the memory and knowledge services are properly processing and deduplicating content
Issue: Repeated API failures
- Diagnosis: Check logs for consistent error messages from the Gemini API
- Solution: Verify API key validity and network connectivity; consider implementing exponential backoff in future versions (a sketch follows below)
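As suggested above, an exponential backoff wrapper around the search call might look like this (a hypothetical sketch; nothing like it exists in the codebase today):

import time

def with_backoff(fn, max_retries=4, base_delay=1.0):
    # Retry fn with exponentially growing delays: 1s, 2s, 4s, then give up
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Usage: with_backoff(lambda: call_gemini_with_search("query"))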
Diagnostic Commands
# Check current search results
print(agi_system.shared_state.search_results)

# Check if search result manager has pending results
result = search_result_manager.get_result()
if result:
    print(f"Pending result: {result[:100]}...")
else:
    print("No pending search results")

# View recent memory entries related to searches
recent_memories = [m for m in agi_system.shared_state.recent_memories
                   if "Search result retrieved" in str(m)]
print(f"Found {len(recent_memories)} search-related memories")