
Complete RAG System Architecture: From Document Indexing to Production Queries

Building a production-ready Retrieval-Augmented Generation (RAG) system requires understanding two distinct phases: indexing (processing and storing documents) and querying (retrieving and generating responses). Most tutorials focus on simple end-to-end examples, but real-world RAG systems separate these concerns for better performance, scalability and maintainability.

In this comprehensive guide, we’ll explore both phases using a local stack with LlamaIndex, Qdrant and Ollama. You’ll learn how to build a tweet analysis system that can answer questions like “What does the author think about Star Trek?” while maintaining production-grade performance and reliability.

Why Separate Indexing from Querying?

Traditional RAG tutorials often show a single script that loads documents, creates embeddings and answers queries all in one go. While this works for demos, production systems benefit from separation:

Indexing Phase Benefits:

  • Batch processing: Handle large document sets efficiently
  • Scheduled updates: Refresh data during low-traffic periods
  • Resource optimization: Use powerful machines for embedding generation
  • Data validation: Ensure quality before indexing

Querying Phase Benefits:

  • Fast startup: No document processing delays
  • Horizontal scaling: Multiple query services accessing same data
  • Specialized optimization: Tune for query performance
  • High availability: Independent of indexing pipeline

Architecture Overview

Our complete RAG system consists of two main components:

┌─────────────────┐      ┌─────────────────┐
│    Indexing     │      │    Querying     │
│    Pipeline     │ ───▶ │     Service     │
│                 │      │                 │
│ • Load docs     │      │ • Connect to    │
│ • Generate      │      │   vector store  │
│   embeddings    │      │ • Process       │
│ • Store vectors │      │   queries       │
└─────────────────┘      │ • Generate      │
                         │   responses     │
                         └─────────────────┘

Phase 1: Document Indexing Pipeline

The indexing pipeline processes raw documents and creates a searchable vector database. Here’s the complete implementation:

from pathlib import Path

import qdrant_client
from llama_index.core import VectorStoreIndex, StorageContext, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.readers.json import JSONReader
from llama_index.vector_stores.qdrant import QdrantVectorStore

# Configure models
Settings.llm = Ollama(model="mistral", request_timeout=120.0)
Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5"
)

# Load documents
loader = JSONReader()
documents = loader.load_data(Path('tinytweets.json'))

# Set up vector store
client = qdrant_client.QdrantClient(path="./qdrant_data")
vector_store = QdrantVectorStore(client=client, collection_name="tweets")
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# Create index
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)

print(f"Indexed {len(documents)} documents")

Key Components Explained

LLM Configuration

Settings.llm = Ollama(model="mistral", request_timeout=120.0)

The LLM is configured here for consistency, but it is not used while building the index; it only comes into play when generating answers at query time. The 120-second timeout accommodates complex queries on slower hardware.

Embedding Model Selection

Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5"
)

Critical: This model choice determines your entire system’s performance. BGE-small-en-v1.5 offers an excellent balance:

  • 384-dimensional vectors: Compact yet expressive
  • Strong retrieval performance: Excellent on English text benchmarks
  • Reasonable resource usage: ~100MB RAM, fast inference
  • English-focused: The "en" variant is tuned for English text; choose a multilingual embedding model if your corpus spans languages
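
If you later swap models, a quick dimension check catches accidental mismatches before anything is written to the vector store. A minimal sketch using the embedding interface configured above:

# Sanity-check the embedding model before indexing (bge-small-en-v1.5 should yield 384 dims)
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
vector = embed_model.get_text_embedding("What does the author think about Star Trek?")
print(f"Embedding dimension: {len(vector)}")  # expected: 384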

Document Processing

loader = JSONReader()
documents = loader.load_data(Path('tinytweets.json'))

The JSONReader handles:

  • JSON parsing: Converts the structured JSON data into LlamaIndex Document objects
  • Text flattening: Serializes nested fields into plain text suitable for embedding

Chunking and metadata handling happen later: when the index is built, LlamaIndex's node parser splits each document into searchable pieces and keeps metadata linking every chunk back to its source document.
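
If you want explicit control over how documents are split before they are embedded, you can configure the node parser yourself. A minimal sketch; the chunk sizes are illustrative, not values from this article, and documents refers to the output of the JSONReader above:

from llama_index.core import Settings
from llama_index.core.node_parser import SentenceSplitter

# Illustrative sizes; tune chunk_size/chunk_overlap for your data
splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
Settings.node_parser = splitter

# Optionally split explicitly to inspect what will be embedded
nodes = splitter.get_nodes_from_documents(documents)
print(f"{len(documents)} documents -> {len(nodes)} chunks")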

Vector Storage

client = qdrant_client.QdrantClient(path="./qdrant_data")
vector_store = QdrantVectorStore(client=client, collection_name="tweets")

Qdrant provides enterprise-grade vector storage with:

  • Persistence: Data survives application restarts
  • Scalability: Handles millions of vectors efficiently
  • Performance: Optimized similarity search algorithms
  • Metadata filtering: Advanced query capabilities
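
Once indexing has finished (and the indexing process has released the store; the embedded, path-based Qdrant instance allows only one client at a time), you can verify what was written using the Qdrant client directly. A small check, assuming the path and collection name used above:

import qdrant_client

client = qdrant_client.QdrantClient(path="./qdrant_data")
info = client.get_collection("tweets")
print(f"Collection 'tweets' contains {info.points_count} vectors")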

Indexing Process Deep Dive

When you call VectorStoreIndex.from_documents(), several operations occur:

  • Document Chunking: Large documents are split into optimal sizes
  • Embedding Generation: Each chunk is converted to 384-dimensional vectors
  • Vector Storage: Embeddings are stored in Qdrant with metadata
  • Index Creation: Search-optimized data structures are built
# Enhanced indexing with monitoring
import time
from typing import List


def index_documents_with_monitoring(documents: List, storage_context: StorageContext):
    print(f"Starting indexing of {len(documents)} documents...")
    start_time = time.time()

    # Create index with progress tracking
    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        show_progress=True
    )

    duration = time.time() - start_time
    print(f"Indexing completed in {duration:.2f} seconds")
    print(f"Average time per document: {duration/len(documents):.2f} seconds")

    return index

Phase 2: Production Query Service

The query service connects to the existing vector store and handles user queries efficiently:

import qdrant_client
from llama_index.core import VectorStoreIndex, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.vector_stores.qdrant import QdrantVectorStore

# Configure models (must match indexing phase)
Settings.llm = Ollama(model="mistral", request_timeout=120.0)
Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5"
)

# Connect to existing vector store
client = qdrant_client.QdrantClient(path="./qdrant_data")
vector_store = QdrantVectorStore(client=client, collection_name="tweets")

# Reconstruct index from vector store
index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
query_engine = index.as_query_engine(similarity_top_k=20)

# Process queries
response = query_engine.query("What does the author think about Star Trek? In one line")
print(response)

Key Differences from Indexing Phase

Direct Vector Store Connection

index = VectorStoreIndex.from_vector_store(vector_store=vector_store)

Using from_vector_store() instead of from_documents() provides:

  • Faster startup: No document processing
  • Memory efficiency: Only loads index metadata
  • Immediate availability: Ready to serve queries instantly

Enhanced Retrieval Configuration

query_engine = index.as_query_engine(similarity_top_k=20)

The similarity_top_k=20 parameter retrieves 20 similar chunks instead of the typical 3-5, providing:

  • Comprehensive context: More information for the LLM
  • Better accuracy: Reduces chance of missing relevant information
  • Trade-off consideration: Longer processing time and potential noise
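
Whether 20 is the right number depends on your data; inspecting the retrieved chunks and their similarity scores makes the trade-off concrete. A quick sketch using the retriever interface on the index built above:

# Look at what a top_k of 20 actually pulls back before settling on it
retriever = index.as_retriever(similarity_top_k=20)
results = retriever.retrieve("What does the author think about Star Trek?")

for item in results[:5]:
    score = item.score if item.score is not None else 0.0
    print(f"{score:.3f}  {item.node.text[:80]}")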

Critical Consistency Requirement

Important: The embedding model must be identical to the one used during indexing. Mismatched models will result in poor search performance:

# Correct: same model as indexing
Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5"
)

# Incorrect: a different model will break search
# Settings.embed_model = HuggingFaceEmbedding(
#     model_name="sentence-transformers/all-MiniLM-L6-v2"
# )

Production Implementation Patterns

Robust Query Service

import logging
import time
from typing import Optional

# (Reuses the LlamaIndex/Qdrant imports from the query service above)


class ProductionQueryService:
    def __init__(self, vector_store_path: str, collection_name: str):
        self.logger = logging.getLogger(__name__)
        self.setup_time = time.time()
        self.query_count = 0
        self.total_query_time = 0

        try:
            self.query_engine = self._setup_query_engine(vector_store_path, collection_name)
            self.logger.info(f"Query service initialized in {time.time() - self.setup_time:.2f}s")
        except Exception as e:
            self.logger.error(f"Failed to initialize query service: {e}")
            raise

    def _setup_query_engine(self, path: str, collection: str):
        # Configure models
        Settings.llm = Ollama(model="mistral", request_timeout=120.0)
        Settings.embed_model = HuggingFaceEmbedding(
            model_name="BAAI/bge-small-en-v1.5"
        )

        # Connect to vector store
        client = qdrant_client.QdrantClient(path=path)
        vector_store = QdrantVectorStore(client=client, collection_name=collection)

        index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
        return index.as_query_engine(similarity_top_k=20)

    def query(self, question: str) -> Optional[dict]:
        """Process a query and return a structured response"""
        start_time = time.time()

        try:
            response = self.query_engine.query(question)
            duration = time.time() - start_time

            self.query_count += 1
            self.total_query_time += duration

            result = {
                'question': question,
                'answer': response.response,
                'sources_count': len(response.source_nodes),
                'processing_time': duration,
                'confidence': self._calculate_confidence(response)
            }

            self.logger.info(f"Query processed in {duration:.2f}s")
            return result

        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            return {
                'question': question,
                'answer': "I'm sorry, I couldn't process your query right now.",
                'error': str(e),
                'processing_time': time.time() - start_time
            }

    def _calculate_confidence(self, response) -> float:
        """Calculate a confidence score based on source similarity"""
        if not response.source_nodes:
            return 0.0
        scores = [node.score for node in response.source_nodes if hasattr(node, 'score')]
        return sum(scores) / len(scores) if scores else 0.0

    def get_statistics(self) -> dict:
        """Return service statistics"""
        avg_query_time = self.total_query_time / self.query_count if self.query_count > 0 else 0
        return {
            'uptime': time.time() - self.setup_time,
            'total_queries': self.query_count,
            'average_query_time': avg_query_time,
            'queries_per_second': self.query_count / (time.time() - self.setup_time)
        }

Batch Processing Pipeline

import json
import logging
from concurrent.futures import ThreadPoolExecutor


class BatchQueryProcessor:
    def __init__(self, query_service: ProductionQueryService):
        self.query_service = query_service
        self.logger = logging.getLogger(__name__)

    def process_batch(self, questions: list, max_workers: int = 4) -> list:
        """Process multiple queries in parallel"""
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all queries
            future_to_question = {
                executor.submit(self.query_service.query, q): q
                for q in questions
            }

            # Collect results
            for future in future_to_question:
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:
                    self.logger.error(f"Batch query failed: {e}")
                    results.append({
                        'question': future_to_question[future],
                        'error': str(e)
                    })

        return results

    def process_from_file(self, input_file: str, output_file: str):
        """Process queries from a JSON file"""
        with open(input_file, 'r') as f:
            questions = json.load(f)

        results = self.process_batch(questions)

        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)

        self.logger.info(f"Processed {len(questions)} queries, saved to {output_file}")

Web API Integration

from flask import Flask, request, jsonify
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

# Initialize query service
query_service = ProductionQueryService("./qdrant_data", "tweets")


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    stats = query_service.get_statistics()
    return jsonify({
        'status': 'healthy',
        'uptime': stats['uptime'],
        'total_queries': stats['total_queries']
    })


@app.route('/query', methods=['POST'])
def handle_query():
    """Main query endpoint"""
    try:
        data = request.get_json()
        question = data.get('question')

        if not question:
            return jsonify({'error': 'Question is required'}), 400

        # Optional parameters
        top_k = data.get('top_k', 20)
        include_sources = data.get('include_sources', False)

        # Process query
        result = query_service.query(question)

        # Format response
        response = {
            'answer': result['answer'],
            'processing_time': result['processing_time'],
            'confidence': result.get('confidence', 0.0)
        }

        if include_sources:
            response['sources_count'] = result['sources_count']

        return jsonify(response)

    except Exception as e:
        return jsonify({'error': str(e)}), 500


@app.route('/batch', methods=['POST'])
def handle_batch():
    """Batch query endpoint"""
    try:
        data = request.get_json()
        questions = data.get('questions', [])

        if not questions:
            return jsonify({'error': 'Questions list is required'}), 400

        # Process batch
        batch_processor = BatchQueryProcessor(query_service)
        results = batch_processor.process_batch(questions)

        return jsonify({
            'results': results,
            'total_processed': len(results)
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
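
With the service running, any HTTP client can call the /query endpoint. For example, with the requests library (host and port as configured above; the payload fields match the handler's expectations):

import requests

resp = requests.post(
    "http://localhost:5000/query",
    json={
        "question": "What does the author think about Star Trek?",
        "include_sources": True,
    },
    timeout=130,  # allow for the 120-second LLM timeout
)
print(resp.json())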

Advanced Configuration and Optimization

Retrieval Strategy Optimization

# Different retrieval strategies for different use cases
class AdaptiveQueryEngine:
    def __init__(self, index: VectorStoreIndex):
        self.index = index
        self.engines = {
            'precise': index.as_query_engine(similarity_top_k=3),
            'balanced': index.as_query_engine(similarity_top_k=10),
            'comprehensive': index.as_query_engine(similarity_top_k=20)
        }

    def query(self, question: str, strategy: str = 'balanced'):
        """Query with an adaptive strategy"""
        if strategy not in self.engines:
            strategy = 'balanced'
        return self.engines[strategy].query(question)

    def auto_select_strategy(self, question: str) -> str:
        """Automatically select a strategy based on question complexity"""
        if len(question.split()) < 5:
            return 'precise'
        elif any(word in question.lower() for word in ['compare', 'analyze', 'comprehensive']):
            return 'comprehensive'
        else:
            return 'balanced'

Custom Response Synthesis

from llama_index.core.prompts import PromptTemplate

# Custom prompt for tweet analysis
TWEET_ANALYSIS_PROMPT = PromptTemplate(
    """You are analyzing tweets to answer questions about the author's opinions and thoughts.

Context from tweets:
{context_str}

Question: {query_str}

Instructions:
1. Base your answer only on the provided tweet context
2. If the question asks for a one-line answer, provide exactly one sentence
3. If the author's opinion is unclear, state that explicitly
4. Quote specific tweets when relevant
5. Maintain the conversational tone of the original tweets

Answer: """
)

# Apply the custom prompt
query_engine.update_prompts(
    {"response_synthesizer:text_qa_template": TWEET_ANALYSIS_PROMPT}
)

Performance Monitoring and Analytics

from collections import defaultdict, deque
from datetime import datetime


class QueryAnalytics:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.query_times = deque(maxlen=window_size)
        self.query_types = defaultdict(int)
        self.hourly_counts = defaultdict(int)
        self.error_count = 0

    def record_query(self, question: str, response_time: float, error: bool = False):
        """Record query metrics"""
        self.query_times.append(response_time)

        if error:
            self.error_count += 1
        else:
            # Categorize queries
            self._categorize_query(question)

        # Record hourly stats
        hour = datetime.now().hour
        self.hourly_counts[hour] += 1

    def _categorize_query(self, question: str):
        """Categorize queries by type"""
        question_lower = question.lower()

        if any(word in question_lower for word in ['think', 'opinion', 'believe']):
            self.query_types['opinion'] += 1
        elif any(word in question_lower for word in ['what', 'topic', 'about']):
            self.query_types['factual'] += 1
        elif any(word in question_lower for word in ['how', 'why', 'explain']):
            self.query_types['analytical'] += 1
        else:
            self.query_types['other'] += 1

    def get_performance_metrics(self) -> dict:
        """Get current performance metrics"""
        if not self.query_times:
            return {'error': 'No queries recorded yet'}

        return {
            'average_response_time': sum(self.query_times) / len(self.query_times),
            'median_response_time': sorted(self.query_times)[len(self.query_times) // 2],
            'total_queries': len(self.query_times),
            'error_rate': self.error_count / (len(self.query_times) + self.error_count),
            'query_distribution': dict(self.query_types),
            'peak_hour': max(self.hourly_counts.items(), key=lambda x: x[1])[0] if self.hourly_counts else None
        }

Deployment Architecture

Single-Node Production Setup

# docker-compose.yml equivalent in code
import logging
import os
import subprocess


def setup_production_environment():
    """Set up the production environment"""

    # Create necessary directories
    os.makedirs('./qdrant_data', exist_ok=True)
    os.makedirs('./logs', exist_ok=True)

    # Start Ollama in the background (assuming it's installed) and pull the model
    try:
        subprocess.Popen(['ollama', 'serve'])
        subprocess.run(['ollama', 'pull', 'mistral'], check=True)
        print("Ollama service started successfully")
    except (OSError, subprocess.CalledProcessError):
        print("Failed to start Ollama - ensure it's installed")

    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('./logs/rag_system.log'),
            logging.StreamHandler()
        ]
    )

    print("Production environment setup complete")

Multi-Node Scaling Strategy

# Load balancer configuration
class LoadBalancedQueryService:
    def __init__(self, nodes: list):
        self.nodes = nodes
        self.current_node = 0

    def query(self, question: str):
        """Distribute queries across nodes (round-robin)"""
        node = self.nodes[self.current_node]
        self.current_node = (self.current_node + 1) % len(self.nodes)
        return node.query(question)

    def health_check(self):
        """Check the health of all nodes"""
        healthy_nodes = []
        for i, node in enumerate(self.nodes):
            try:
                # Simple health check
                node.query("test")
                healthy_nodes.append(i)
            except Exception:
                pass
        return len(healthy_nodes) == len(self.nodes)

Best Practices and Common Pitfalls

Embedding Model Consistency

Critical Rule: Always use the same embedding model for indexing and querying.

# Good: centralized model configuration
def get_embedding_model():
    return HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

# Use the same helper in both indexing and querying
Settings.embed_model = get_embedding_model()

Error Handling and Graceful Degradation

def robust_query_handler(query_engine, question: str) -> dict:
    """Handle queries with comprehensive error handling"""
    try:
        response = query_engine.query(question)
        return {
            'success': True,
            'answer': response.response,
            'sources': len(response.source_nodes)
        }
    except ConnectionError:
        return {
            'success': False,
            'error': 'Vector store connection failed',
            'fallback': 'Please try again later'
        }
    except TimeoutError:
        return {
            'success': False,
            'error': 'Query timeout',
            'fallback': 'Try a simpler question'
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Unexpected error: {str(e)}',
            'fallback': 'Please contact support'
        }

Performance Optimization

# Optimize for different query patterns
def optimize_query_engine(index: VectorStoreIndex, query_type: str):
    """Create optimized query engines for different use cases"""
    configs = {
        'fast': {
            'similarity_top_k': 3,
            'response_mode': 'compact'
        },
        'accurate': {
            'similarity_top_k': 20,
            'response_mode': 'tree_summarize'
        },
        'streaming': {
            'similarity_top_k': 10,
            'streaming': True
        }
    }

    config = configs.get(query_type, configs['fast'])
    return index.as_query_engine(**config)

Testing and Validation

Unit Tests for RAG Components

import unittest
from unittest.mock import Mock, patch


class TestRAGSystem(unittest.TestCase):
    def setUp(self):
        self.mock_query_service = Mock()

    def test_query_response_format(self):
        """Test that query responses have the correct format"""
        mock_response = {
            'answer': 'Test answer',
            'sources_count': 5,
            'processing_time': 1.5
        }
        self.mock_query_service.query.return_value = mock_response

        result = self.mock_query_service.query("test question")

        self.assertIn('answer', result)
        self.assertIn('sources_count', result)
        self.assertIn('processing_time', result)

    def test_error_handling(self):
        """Test error handling in the query service"""
        self.mock_query_service.query.side_effect = Exception("Test error")

        with self.assertRaises(Exception):
            self.mock_query_service.query("test question")

    @patch('qdrant_client.QdrantClient')
    def test_vector_store_connection(self, mock_client):
        """Test the vector store connection"""
        mock_client.return_value = Mock()

        # Test connection logic
        client = qdrant_client.QdrantClient(path="./test_data")
        self.assertIsNotNone(client)


if __name__ == '__main__':
    unittest.main()

Integration Tests

import json


def test_end_to_end_workflow():
    """Test the complete indexing and querying workflow"""
    # Create test data
    test_documents = [
        {'id': '1', 'text': "I love Star Trek, it's the best sci-fi show ever!"},
        {'id': '2', 'text': 'Star Trek has great characters and storylines.'},
        {'id': '3', 'text': 'The technology in Star Trek is fascinating.'}
    ]

    # Write test data to disk for indexing
    with open('test_tweets.json', 'w') as f:
        json.dump(test_documents, f)

    # Run the indexing pipeline (create_index_from_documents wraps the Phase 1 code above)
    index = create_index_from_documents('test_tweets.json')

    # Test querying
    query_service = ProductionQueryService('./qdrant_data', 'tweets')
    result = query_service.query("What does the author think about Star Trek?")

    # Validate results
    assert 'love' in result['answer'].lower() or 'best' in result['answer'].lower()
    assert result['sources_count'] > 0
    assert result['processing_time'] < 30  # Should be reasonably fast

    print("End-to-end test passed!")

Monitoring and Maintenance

Health Monitoring

from datetime import datetime


class SystemHealthMonitor:
    def __init__(self, query_service: ProductionQueryService):
        self.query_service = query_service
        self.alerts = []

    def check_system_health(self) -> dict:
        """Comprehensive system health check"""
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'status': 'healthy',
            'components': {}
        }

        # Check the vector store
        try:
            test_result = self.query_service.query("test")
            health_report['components']['vector_store'] = 'healthy'
        except Exception as e:
            health_report['components']['vector_store'] = f'unhealthy: {e}'
            health_report['status'] = 'degraded'

        # Check response times
        stats = self.query_service.get_statistics()
        if stats['average_query_time'] > 10:  # 10-second threshold
            health_report['components']['performance'] = 'slow'
            health_report['status'] = 'degraded'
        else:
            health_report['components']['performance'] = 'healthy'

        return health_report

    def setup_alerts(self):
        """Set up alerting for system issues"""
        # This would integrate with your monitoring system
        pass

Conclusion

Building a production-ready RAG system requires careful consideration of both the indexing and querying phases. By separating these concerns, you can:

  • Scale independently: Optimize each phase for its specific requirements
  • Maintain efficiently: Update indexes without affecting query performance
  • Deploy flexibly: Run multiple query services against the same data
  • Monitor effectively: Track performance metrics for each component

The architecture presented here provides a solid foundation for enterprise RAG systems. Key takeaways:

  • Consistency is critical: Use identical embedding models across phases
  • Separation of concerns: Keep indexing and querying independent
  • Error handling: Implement comprehensive error handling and fallbacks
  • Performance tuning: Optimize retrieval parameters for your use case
  • Production readiness: Include monitoring, logging and health checks

Whether you’re building a customer support system, internal knowledge base, or data analysis platform, this architecture scales from prototype to production while maintaining the flexibility to adapt to changing requirements.

The combination of LlamaIndex’s powerful abstractions, Qdrant’s robust vector storage and Ollama’s local LLM deployment creates a privacy-preserving, cost-effective solution that puts you in complete control of your data and AI capabilities.
