Complete RAG System Architecture: From Document Indexing to Production Queries
Building a production-ready Retrieval-Augmented Generation (RAG) system requires understanding two distinct phases: indexing (processing and storing documents) and querying (retrieving and generating responses). Most tutorials focus on simple end-to-end examples, but real-world RAG systems separate these concerns for better performance, scalability and maintainability.
In this comprehensive guide, we’ll explore both phases using a local stack with LlamaIndex, Qdrant and Ollama. You’ll learn how to build a tweet analysis system that can answer questions like “What does the author think about Star Trek?” while maintaining production-grade performance and reliability.
Why Separate Indexing from Querying?
Traditional RAG tutorials often show a single script that loads documents, creates embeddings and answers queries all in one go. While this works for demos, production systems benefit from separation:
Indexing Phase Benefits:
- Batch processing: Handle large document sets efficiently
- Scheduled updates: Refresh data during low-traffic periods
- Resource optimization: Use powerful machines for embedding generation
- Data validation: Ensure quality before indexing
Querying Phase Benefits:
- Fast startup: No document processing delays
- Horizontal scaling: Multiple query services accessing same data
- Specialized optimization: Tune for query performance
- High availability: Independent of indexing pipeline
Architecture Overview
Our complete RAG system consists of two main components:
┌─────────────────┐      ┌─────────────────┐
│    Indexing     │      │    Querying     │
│    Pipeline     │─────▶│    Service      │
│                 │      │                 │
│ • Load docs     │      │ • Connect to    │
│ • Generate      │      │   vector store  │
│   embeddings    │      │ • Process       │
│ • Store vectors │      │   queries       │
└─────────────────┘      │ • Generate      │
                         │   responses     │
                         └─────────────────┘
Phase 1: Document Indexing Pipeline
The indexing pipeline processes raw documents and creates a searchable vector database. Here’s the complete implementation:
from pathlib import Path
import qdrant_client
from llama_index.core import VectorStoreIndex, StorageContext, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.readers.json import JSONReader
from llama_index.vector_stores.qdrant import QdrantVectorStore
# Configure models
Settings.llm = Ollama(model="mistral", request_timeout=120.0)
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5"
)
# Load documents
loader = JSONReader()
documents = loader.load_data(Path('tinytweets.json'))
# Set up vector store
client = qdrant_client.QdrantClient(path="./qdrant_data")
vector_store = QdrantVectorStore(client=client, collection_name="tweets")
storage_context = StorageContext.from_defaults(vector_store=vector_store)
# Create index
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
print(f"Indexed {len(documents)} documents")
Key Components Explained
LLM Configuration
Settings.llm = Ollama(model="mistral", request_timeout=120.0)
The LLM is configured here so both phases share a single settings block, but it is only exercised at query time; indexing itself needs only the embedding model. The 120-second timeout accommodates complex queries on slower hardware.
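Before kicking off a long indexing run, it can be worth a quick pre-flight check that the Ollama server is actually reachable. A minimal sketch, assuming `ollama serve` is running locally and the mistral model has been pulled:
# Pre-flight sanity check: a tiny completion round-trip against Ollama.
probe = Settings.llm.complete("Reply with the single word: ready")
print(probe.text)  # raises an error if the Ollama server is unreachable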
Embedding Model Selection
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5"
)
Critical: This model choice largely determines your system’s retrieval quality. BGE-small-en-v1.5 offers an excellent balance:
- 384-dimensional vectors: Compact yet expressive
- Strong retrieval performance: Ranks well on English retrieval benchmarks
- Reasonable resource usage: Small memory footprint and fast inference, even on CPU
- English-focused: The “en” variant targets English text; choose a multilingual embedding model if you need other languages
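A quick way to confirm the model is wired up correctly is to embed a sample string and check the dimensionality, which the Qdrant collection will inherit. A minimal sketch:
# Embed one string and verify the expected 384-dimensional output.
sample_vector = Settings.embed_model.get_text_embedding("hello world")
print(len(sample_vector))  # expected: 384 for bge-small-en-v1.5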
Document Processing
loader = JSONReader()
documents = loader.load_data(Path('tinytweets.json'))
The JSONReader converts the JSON file into LlamaIndex Document objects:
- JSON parsing: Flattens the structured tweet data into document text
- Metadata preservation: Keeps any reader-attached metadata alongside each document
Note that the reader does not chunk; documents are split into nodes later, when the index is built.
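Before indexing, it is worth inspecting what the reader actually produced. A small sketch using the documents list from above:
# Inspect the loaded Document objects before building the index.
print(f"Loaded {len(documents)} documents")
print(documents[0].text[:200])   # first 200 characters of the first document
print(documents[0].metadata)     # any metadata the reader attached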
Vector Storage
client = qdrant_client.QdrantClient(path="./qdrant_data")
vector_store = QdrantVectorStore(client=client, collection_name="tweets")
Qdrant provides enterprise-grade vector storage with:
- Persistence: Data survives application restarts
- Scalability: Handles millions of vectors efficiently
- Performance: Optimized similarity search algorithms
- Metadata filtering: Advanced query capabilities
Note that path="./qdrant_data" runs Qdrant in embedded, local mode, which is ideal for development; the scalability and availability properties above come into their own when you run a Qdrant server and connect to it by URL.
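Once indexing has run, you can verify the collection through the same client. A minimal sketch (the collection name matches the code above):
# Confirm the collection exists and see how many points (vectors) it holds.
info = client.get_collection("tweets")
print(info.points_count)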
Indexing Process Deep Dive
When you call VectorStoreIndex.from_documents(), several operations occur:
- Document Chunking: Documents are split into retrieval-sized chunks (nodes) by the configured node parser
- Embedding Generation: Each chunk is converted to 384-dimensional vectors
- Vector Storage: Embeddings are stored in Qdrant with metadata
- Index Creation: Search-optimized data structures are built
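Chunking is controlled by the global Settings rather than by the reader. A sketch of how you might tune it; the chunk sizes here are assumptions to adjust for your data:
from llama_index.core.node_parser import SentenceSplitter

# Smaller chunks give more precise retrieval; larger chunks give more context per hit.
Settings.node_parser = SentenceSplitter(chunk_size=512, chunk_overlap=64)
For larger corpora you may also want progress feedback and timing while the index builds, which the helper below adds: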
# Enhanced indexing with monitoring
import time
from typing import List

def index_documents_with_monitoring(documents: List, storage_context: StorageContext):
    print(f"Starting indexing of {len(documents)} documents...")
    start_time = time.time()

    # Create index with progress tracking
    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        show_progress=True
    )

    duration = time.time() - start_time
    print(f"Indexing completed in {duration:.2f} seconds")
    print(f"Average time per document: {duration/len(documents):.2f} seconds")
    return index
Phase 2: Production Query Service
The query service connects to the existing vector store and handles user queries efficiently:
import qdrant_client
from llama_index.core import VectorStoreIndex, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.vector_stores.qdrant import QdrantVectorStore
# Configure models (must match indexing phase)
Settings.llm = Ollama(model="mistral", request_timeout=120.0)
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5"
)
# Connect to existing vector store
client = qdrant_client.QdrantClient(path="./qdrant_data")
vector_store = QdrantVectorStore(client=client, collection_name="tweets")
# Reconstruct index from vector store
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
query_engine = index.as_query_engine(similarity_top_k=20)
# Process queries
response = query_engine.query("What does the author think about Star Trek? In one line")
print(response)
Key Differences from Indexing Phase
Direct Vector Store Connection
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
Using from_vector_store() instead of from_documents() provides:
- Faster startup: No document processing
- Memory efficiency: Only loads index metadata
- Immediate availability: Ready to serve queries instantly
Enhanced Retrieval Configuration
query_engine = index.as_query_engine(similarity_top_k=20)
The similarity_top_k=20 parameter retrieves 20 similar chunks instead of the typical 3-5, providing:
- Comprehensive context: More information for the LLM
- Better accuracy: Reduces chance of missing relevant information
- Trade-off consideration: Longer processing time and potential noise
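To see the trade-off concretely, run the same question at a few top_k settings and compare latency and the number of source chunks used. A rough sketch:
import time

for k in (3, 10, 20):
    engine = index.as_query_engine(similarity_top_k=k)
    start = time.time()
    answer = engine.query("What does the author think about Star Trek? In one line")
    print(f"top_k={k}: {time.time() - start:.1f}s, {len(answer.source_nodes)} source chunks")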
Critical Consistency Requirement
Important: The embedding model must be identical to the one used during indexing. A mismatched model embeds queries into a different vector space (possibly with a different dimensionality), so retrieval will silently return irrelevant results or fail outright:
# Correct: Same model as indexing
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5"
)
# Incorrect: Different model will break search
# Settings.embed_model = HuggingFaceEmbedding(
# model_name="sentence-transformers/all-MiniLM-L6-v2"
# )
Production Implementation Patterns
Robust Query Service
import logging
from typing import Optional
import time
class ProductionQueryService:
    def __init__(self, vector_store_path: str, collection_name: str):
        self.logger = logging.getLogger(__name__)
        self.setup_time = time.time()
        self.query_count = 0
        self.total_query_time = 0

        try:
            self.query_engine = self._setup_query_engine(vector_store_path, collection_name)
            self.logger.info(f"Query service initialized in {time.time() - self.setup_time:.2f}s")
        except Exception as e:
            self.logger.error(f"Failed to initialize query service: {e}")
            raise

    def _setup_query_engine(self, path: str, collection: str):
        # Configure models
        Settings.llm = Ollama(model="mistral", request_timeout=120.0)
        Settings.embed_model = HuggingFaceEmbedding(
            model_name="BAAI/bge-small-en-v1.5"
        )

        # Connect to vector store
        client = qdrant_client.QdrantClient(path=path)
        vector_store = QdrantVectorStore(client=client, collection_name=collection)
        index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
        return index.as_query_engine(similarity_top_k=20)

    def query(self, question: str) -> Optional[dict]:
        """Process a query and return structured response"""
        start_time = time.time()

        try:
            response = self.query_engine.query(question)
            duration = time.time() - start_time

            self.query_count += 1
            self.total_query_time += duration

            result = {
                'question': question,
                'answer': response.response,
                'sources_count': len(response.source_nodes),
                'processing_time': duration,
                'confidence': self._calculate_confidence(response)
            }

            self.logger.info(f"Query processed in {duration:.2f}s")
            return result

        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            return {
                'question': question,
                'answer': "I'm sorry, I couldn't process your query right now.",
                'error': str(e),
                'processing_time': time.time() - start_time
            }

    def _calculate_confidence(self, response) -> float:
        """Calculate confidence score based on source similarity"""
        if not response.source_nodes:
            return 0.0
        # Some nodes may carry a None score, so filter those out before averaging
        scores = [node.score for node in response.source_nodes if node.score is not None]
        return sum(scores) / len(scores) if scores else 0.0

    def get_statistics(self) -> dict:
        """Return service statistics"""
        avg_query_time = self.total_query_time / self.query_count if self.query_count > 0 else 0
        return {
            'uptime': time.time() - self.setup_time,
            'total_queries': self.query_count,
            'average_query_time': avg_query_time,
            'queries_per_second': self.query_count / (time.time() - self.setup_time)
        }
Batch Processing Pipeline
from concurrent.futures import ThreadPoolExecutor
import json
class BatchQueryProcessor:
    def __init__(self, query_service: ProductionQueryService):
        self.query_service = query_service
        self.logger = logging.getLogger(__name__)

    def process_batch(self, questions: list, max_workers: int = 4) -> list:
        """Process multiple queries in parallel"""
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all queries
            future_to_question = {
                executor.submit(self.query_service.query, q): q
                for q in questions
            }

            # Collect results
            for future in future_to_question:
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    self.logger.error(f"Batch query failed: {e}")
                    results.append({
                        'question': future_to_question[future],
                        'error': str(e)
                    })

        return results

    def process_from_file(self, input_file: str, output_file: str):
        """Process queries from JSON file"""
        with open(input_file, 'r') as f:
            questions = json.load(f)

        results = self.process_batch(questions)

        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)

        self.logger.info(f"Processed {len(questions)} queries, saved to {output_file}")
Web API Integration
from flask import Flask, request, jsonify
from flask_cors import CORS
app = Flask(__name__)
CORS(app)
# Initialize query service
query_service = ProductionQueryService("./qdrant_data", "tweets")
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    stats = query_service.get_statistics()
    return jsonify({
        'status': 'healthy',
        'uptime': stats['uptime'],
        'total_queries': stats['total_queries']
    })

@app.route('/query', methods=['POST'])
def handle_query():
    """Main query endpoint"""
    try:
        data = request.get_json()
        question = data.get('question')

        if not question:
            return jsonify({'error': 'Question is required'}), 400

        # Optional parameters (top_k is accepted but not rethreaded here; the
        # engine above is fixed at similarity_top_k=20)
        top_k = data.get('top_k', 20)
        include_sources = data.get('include_sources', False)

        # Process query
        result = query_service.query(question)

        # Format response
        response = {
            'answer': result['answer'],
            'processing_time': result['processing_time'],
            'confidence': result.get('confidence', 0.0)
        }

        if include_sources:
            response['sources_count'] = result.get('sources_count', 0)

        return jsonify(response)

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/batch', methods=['POST'])
def handle_batch():
    """Batch query endpoint"""
    try:
        data = request.get_json()
        questions = data.get('questions', [])

        if not questions:
            return jsonify({'error': 'Questions list is required'}), 400

        # Process batch
        batch_processor = BatchQueryProcessor(query_service)
        results = batch_processor.process_batch(questions)

        return jsonify({
            'results': results,
            'total_processed': len(results)
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
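With the service running on port 5000, a client can exercise the /query endpoint like this (a sketch using the requests library):
import requests

resp = requests.post(
    "http://localhost:5000/query",
    json={"question": "What does the author think about Star Trek?", "include_sources": True},
    timeout=120,
)
print(resp.json())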
Advanced Configuration and Optimization
Retrieval Strategy Optimization
# Different retrieval strategies for different use cases
class AdaptiveQueryEngine:
    def __init__(self, index: VectorStoreIndex):
        self.index = index
        self.engines = {
            'precise': index.as_query_engine(similarity_top_k=3),
            'balanced': index.as_query_engine(similarity_top_k=10),
            'comprehensive': index.as_query_engine(similarity_top_k=20)
        }

    def query(self, question: str, strategy: str = 'balanced'):
        """Query with adaptive strategy"""
        if strategy not in self.engines:
            strategy = 'balanced'
        return self.engines[strategy].query(question)

    def auto_select_strategy(self, question: str) -> str:
        """Automatically select strategy based on question complexity"""
        if len(question.split()) < 5:
            return 'precise'
        elif any(word in question.lower() for word in ['compare', 'analyze', 'comprehensive']):
            return 'comprehensive'
        else:
            return 'balanced'
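A short usage sketch, reusing the index loaded earlier; the example question is illustrative:
adaptive = AdaptiveQueryEngine(index)
question = "Analyze the author's overall tone when tweeting about science fiction."
strategy = adaptive.auto_select_strategy(question)  # 'comprehensive', triggered by "analyze"
print(adaptive.query(question, strategy))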
Custom Response Synthesis
from llama_index.core.prompts import PromptTemplate
# Custom prompt for tweet analysis
TWEET_ANALYSIS_PROMPT = PromptTemplate(
"""
You are analyzing tweets to answer questions about the author's opinions and thoughts.
Context from tweets:
{context_str}
Question: {query_str}
Instructions:
1. Base your answer only on the provided tweet context
2. If the question asks for a one-line answer, provide exactly one sentence
3. If the author's opinion is unclear, state that explicitly
4. Quote specific tweets when relevant
5. Maintain the conversational tone of the original tweets
Answer:
"""
)
# Apply custom prompt
query_engine.update_prompts({
    "response_synthesizer:text_qa_template": TWEET_ANALYSIS_PROMPT
})
Performance Monitoring and Analytics
import time
from collections import defaultdict, deque
from datetime import datetime
class QueryAnalytics:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.query_times = deque(maxlen=window_size)
        self.query_types = defaultdict(int)
        self.hourly_counts = defaultdict(int)
        self.error_count = 0

    def record_query(self, question: str, response_time: float, error: bool = False):
        """Record query metrics"""
        self.query_times.append(response_time)

        if error:
            self.error_count += 1
        else:
            # Categorize queries
            self._categorize_query(question)

        # Record hourly stats
        hour = datetime.now().hour
        self.hourly_counts[hour] += 1

    def _categorize_query(self, question: str):
        """Categorize queries by type"""
        question_lower = question.lower()

        if any(word in question_lower for word in ['think', 'opinion', 'believe']):
            self.query_types['opinion'] += 1
        elif any(word in question_lower for word in ['what', 'topic', 'about']):
            self.query_types['factual'] += 1
        elif any(word in question_lower for word in ['how', 'why', 'explain']):
            self.query_types['analytical'] += 1
        else:
            self.query_types['other'] += 1

    def get_performance_metrics(self) -> dict:
        """Get current performance metrics"""
        if not self.query_times:
            return {'error': 'No queries recorded yet'}

        return {
            'average_response_time': sum(self.query_times) / len(self.query_times),
            'median_response_time': sorted(self.query_times)[len(self.query_times)//2],
            'total_queries': len(self.query_times),
            # every query, including errors, is appended to query_times above
            'error_rate': self.error_count / len(self.query_times),
            'query_distribution': dict(self.query_types),
            'peak_hour': max(self.hourly_counts.items(), key=lambda x: x[1])[0] if self.hourly_counts else None
        }
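Wiring this into the query path is one call per request. A sketch with made-up timings:
analytics = QueryAnalytics(window_size=100)
analytics.record_query("What does the author think about Star Trek?", response_time=1.8)
analytics.record_query("Why does the author like sci-fi?", response_time=2.4)
print(analytics.get_performance_metrics())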
Deployment Architecture
Single-Node Production Setup
# docker-compose.yml equivalent in code
import subprocess
import os
def setup_production_environment():
    """Set up production environment"""
    # Create necessary directories
    os.makedirs('./qdrant_data', exist_ok=True)
    os.makedirs('./logs', exist_ok=True)

    # Start Ollama in the background (assuming it's installed and on PATH)
    try:
        subprocess.Popen(['ollama', 'serve'])
        subprocess.run(['ollama', 'pull', 'mistral'], check=True)
        print("Ollama service started successfully")
    except (subprocess.CalledProcessError, FileNotFoundError):
        print("Failed to start Ollama - ensure it's installed")

    # Setup logging
    import logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('./logs/rag_system.log'),
            logging.StreamHandler()
        ]
    )

    print("Production environment setup complete")
Multi-Node Scaling Strategy
# Load balancer configuration
class LoadBalancedQueryService:
    def __init__(self, nodes: list):
        self.nodes = nodes
        self.current_node = 0

    def query(self, question: str):
        """Distribute queries across nodes"""
        node = self.nodes[self.current_node]
        self.current_node = (self.current_node + 1) % len(self.nodes)
        return node.query(question)

    def health_check(self):
        """Check health of all nodes"""
        healthy_nodes = []
        for i, node in enumerate(self.nodes):
            try:
                # ProductionQueryService.query returns an 'error' key rather than
                # raising, so inspect the result of a simple probe query
                result = node.query("test")
                if result and 'error' not in result:
                    healthy_nodes.append(i)
            except Exception:
                pass
        return len(healthy_nodes) == len(self.nodes)
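A usage sketch; the two paths are placeholders for replicas of the same indexed data, since an embedded Qdrant directory should not be shared by multiple processes:
nodes = [
    ProductionQueryService("./qdrant_data_replica_1", "tweets"),
    ProductionQueryService("./qdrant_data_replica_2", "tweets"),
]
balancer = LoadBalancedQueryService(nodes)
print(balancer.query("What does the author think about Star Trek?"))
print("All nodes healthy:", balancer.health_check())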
Best Practices and Common Pitfalls
Embedding Model Consistency
Critical Rule: Always use the same embedding model for indexing and querying.
# Good: Centralized model configuration
def get_embedding_model():
    return HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

# Use in both indexing and querying
Settings.embed_model = get_embedding_model()
Error Handling and Graceful Degradation
def robust_query_handler(query_engine, question: str) -> dict:
    """Handle queries with comprehensive error handling"""
    try:
        response = query_engine.query(question)
        return {
            'success': True,
            'answer': response.response,
            'sources': len(response.source_nodes)
        }
    except ConnectionError:
        return {
            'success': False,
            'error': 'Vector store connection failed',
            'fallback': 'Please try again later'
        }
    except TimeoutError:
        return {
            'success': False,
            'error': 'Query timeout',
            'fallback': 'Try a simpler question'
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'fallback': 'Please contact support'
        }
Performance Optimization
# Optimize for different query patterns
def optimize_query_engine(index: VectorStoreIndex, query_type: str):
    """Create optimized query engines for different use cases"""
    configs = {
        'fast': {
            'similarity_top_k': 3,
            'response_mode': 'compact'
        },
        'accurate': {
            'similarity_top_k': 20,
            'response_mode': 'tree_summarize'
        },
        'streaming': {
            'similarity_top_k': 10,
            'streaming': True
        }
    }

    config = configs.get(query_type, configs['fast'])
    return index.as_query_engine(**config)
Testing and Validation
Unit Tests for RAG Components
import unittest
from unittest.mock import Mock, patch
class TestRAGSystem(unittest.TestCase):
    def setUp(self):
        self.mock_query_service = Mock()

    def test_query_response_format(self):
        """Test that query responses have correct format"""
        mock_response = {
            'answer': 'Test answer',
            'sources_count': 5,
            'processing_time': 1.5
        }
        self.mock_query_service.query.return_value = mock_response

        result = self.mock_query_service.query("test question")

        self.assertIn('answer', result)
        self.assertIn('sources_count', result)
        self.assertIn('processing_time', result)

    def test_error_handling(self):
        """Test error handling in query service"""
        self.mock_query_service.query.side_effect = Exception("Test error")

        with self.assertRaises(Exception):
            self.mock_query_service.query("test question")

    @patch('qdrant_client.QdrantClient')
    def test_vector_store_connection(self, mock_client):
        """Test vector store connection"""
        mock_client.return_value = Mock()

        # Test connection logic
        client = qdrant_client.QdrantClient(path="./test_data")
        self.assertIsNotNone(client)

if __name__ == '__main__':
    unittest.main()
Integration Tests
def test_end_to_end_workflow():
    """Test complete indexing and querying workflow"""
    # Create test data
    test_documents = [
        {'id': '1', 'text': 'I love Star Trek, it\'s the best sci-fi show ever!'},
        {'id': '2', 'text': 'Star Trek has great characters and storylines.'},
        {'id': '3', 'text': 'The technology in Star Trek is fascinating.'}
    ]

    # Test indexing
    with open('test_tweets.json', 'w') as f:
        json.dump(test_documents, f)

    # Run indexing pipeline (create_index_from_documents is assumed to be a thin
    # wrapper around the Phase 1 indexing script)
    index = create_index_from_documents('test_tweets.json')

    # Test querying
    query_service = ProductionQueryService('./qdrant_data', 'tweets')
    result = query_service.query("What does the author think about Star Trek?")

    # Validate results
    assert 'love' in result['answer'].lower() or 'best' in result['answer'].lower()
    assert result['sources_count'] > 0
    assert result['processing_time'] < 30  # Should be fast

    print("End-to-end test passed!")
Monitoring and Maintenance
Health Monitoring
class SystemHealthMonitor:
    def __init__(self, query_service: ProductionQueryService):
        self.query_service = query_service
        self.alerts = []

    def check_system_health(self) -> dict:
        """Comprehensive system health check"""
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'status': 'healthy',
            'components': {}
        }

        # Check vector store
        try:
            test_result = self.query_service.query("test")
            health_report['components']['vector_store'] = 'healthy'
        except Exception as e:
            health_report['components']['vector_store'] = f'unhealthy: {e}'
            health_report['status'] = 'degraded'

        # Check response times
        stats = self.query_service.get_statistics()
        if stats['average_query_time'] > 10:  # 10 second threshold
            health_report['components']['performance'] = 'slow'
            health_report['status'] = 'degraded'
        else:
            health_report['components']['performance'] = 'healthy'

        return health_report

    def setup_alerts(self):
        """Set up alerting for system issues"""
        # This would integrate with your monitoring system
        pass
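A minimal usage sketch; in production the check would typically run on a schedule or behind a monitoring endpoint:
monitor = SystemHealthMonitor(query_service)
report = monitor.check_system_health()
print(report['status'], report['components'])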
Conclusion
Building a production-ready RAG system requires careful consideration of both the indexing and querying phases. By separating these concerns, you can:
- Scale independently: Optimize each phase for its specific requirements
- Maintain efficiently: Update indexes without affecting query performance
- Deploy flexibly: Run multiple query services against the same data
- Monitor effectively: Track performance metrics for each component
The architecture presented here provides a solid foundation for enterprise RAG systems. Key takeaways:
- Consistency is critical: Use identical embedding models across phases
- Separation of concerns: Keep indexing and querying independent
- Error handling: Implement comprehensive error handling and fallbacks
- Performance tuning: Optimize retrieval parameters for your use case
- Production readiness: Include monitoring, logging and health checks
Whether you’re building a customer support system, internal knowledge base, or data analysis platform, this architecture scales from prototype to production while maintaining the flexibility to adapt to changing requirements.
The combination of LlamaIndex’s powerful abstractions, Qdrant’s robust vector storage and Ollama’s local LLM deployment creates a privacy-preserving, cost-effective solution that puts you in complete control of your data and AI capabilities.