Multi-Agent SQL Assistant, Part 2: Building a RAG Manager

In my previous blog post, I explored how to build a multi-agent SQL assistant using CrewAI and Streamlit. The user can query a SQLite database in natural language: AI agents generate a SQL query from the user's input, review it, and check it for compliance before running it against the database to get results. I also added a human-in-the-loop user interface to maintain control and display the LLM costs associated with each question, keeping spending transparent. While the prototype worked well on my small demo database, I knew it wouldn't be enough for real data. In the previous version, I was sending the entire database schema as context along with the user input. As database schemas grow larger, passing the full schema to the LLM increases token usage, slows response times, and makes hallucinations more likely. I needed a way to feed only the relevant parts of the schema to the LLM. That's where RAG (retrieval-augmented generation) comes in.
In this tutorial, I build a RAG manager that adds multiple RAG techniques to my SQL assistant and compares their performance on metrics like response time and token usage. The assistant now supports four RAG techniques:
- No RAG: sends the complete schema (the baseline for comparison)
- Keyword RAG: uses predefined keyword matching to select relevant tables
- FAISS RAG: semantic vector similarity search with FAISS and all-MiniLM-L6-v2 embeddings
- Chroma RAG: a persistent vector-store solution with ChromaDB for semantic search
For this project, I focused only on practical, lightweight, and inexpensive (free) RAG techniques. You can add any number of approaches on top and choose the best one for your case. To make testing and analysis easier, I built a performance comparison tool that reports token reduction, table count, response time, and query accuracy for all four strategies.
Building a RAG Manager
The rag_manager.py file contains the entire implementation of the RAG manager. First, I created a BaseRAG class, the template for all of my RAG strategies. It ensures that every strategy follows the same structure. Any new strategy implements two methods: one that fetches the relevant schema for the user's query, and another that describes what the approach does. By using an abstract base class (ABC), I keep the code clean, modular, and easy to extend over time.
import sqlite3
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List

# DB_PATH and get_structured_schema are defined elsewhere in the project:
# the database file path and a helper that returns the full schema as a string.
class BaseRAG(ABC):
"""Base class for all RAG implementations."""
def __init__(self, db_path: str = DB_PATH):
self.db_path = db_path
self.name = self.__class__.__name__
@abstractmethod
def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
"""Get relevant schema for the user query."""
pass
@abstractmethod
def get_approach_info(self) -> Dict[str, Any]:
"""Get information about this RAG approach."""
pass
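As a standalone sketch (with a simplified stand-in for BaseRAG), the ABC contract guarantees that a strategy missing either required method cannot even be instantiated:

```python
from abc import ABC, abstractmethod

class BaseRAGSketch(ABC):
    """Simplified stand-in for the BaseRAG contract above."""
    @abstractmethod
    def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str: ...
    @abstractmethod
    def get_approach_info(self) -> dict: ...

class IncompleteRAG(BaseRAGSketch):
    # Implements only one of the two required methods.
    def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
        return ""

try:
    IncompleteRAG()
except TypeError as e:
    print(f"Rejected: {e}")  # names the missing abstract method
```

This is why new strategies can be plugged in safely: if a subclass forgets one of the two methods, Python raises a TypeError at instantiation time instead of failing later at query time.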
No RAG strategy
This is essentially the approach I used before: send the whole database schema to the LLM without any filtering or optimization. It's fine for very small schemas (roughly fewer than 10 tables).
class NoRAG(BaseRAG):
"""No RAG - returns full schema."""
def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
return get_structured_schema(self.db_path)
def get_approach_info(self) -> Dict[str, Any]:
return {
"name": "No RAG (Full Schema)",
"description": "Uses complete database schema",
"pros": ["Simple", "Always complete", "No setup required"],
"cons": ["High token usage", "Slower for large schemas"],
"best_for": "Small schemas (< 10 tables)"
}
Keyword RAG strategy
In the keyword approach, I use a set of predefined keywords mapped to each table in the schema. When a user asks something, the system checks for keyword matches in the query and pulls out the most relevant tables. This way, I don't send the whole schema to the LLM, which saves tokens and speeds things up. It works best when your schema is familiar and your queries are business related or follow common patterns.
The _build_table_keywords() method is the core of the keyword logic. It hard-codes a keyword mapping for each table in the schema, which is used to match user query terms (such as "sales", "brand", "customer") to the most relevant tables.
class KeywordRAG(BaseRAG):
"""Keyword-based RAG using business context matching."""
def __init__(self, db_path: str = DB_PATH):
super().__init__(db_path)
self.table_keywords = self._build_table_keywords()
def _build_table_keywords(self) -> Dict[str, List[str]]:
"""Build keyword mappings for each table."""
return {
'products': ['product', 'item', 'catalog', 'price', 'category', 'brand', 'sales', 'sold'],
'product_variants': ['variant', 'product', 'sku', 'color', 'size', 'brand', 'sales', 'sold'],
'customers': ['customer', 'user', 'client', 'buyer', 'person', 'email', 'name'],
'orders': ['order', 'purchase', 'transaction', 'sale', 'buy', 'total', 'amount', 'sales'],
'order_items': ['item', 'product', 'quantity', 'line', 'detail', 'sales', 'sold', 'brand'],
'payments': ['payment', 'pay', 'money', 'revenue', 'amount'],
'inventory': ['inventory', 'stock', 'quantity', 'warehouse', 'available'],
'reviews': ['review', 'rating', 'feedback', 'comment', 'opinion'],
'suppliers': ['supplier', 'vendor', 'procurement', 'purchase'],
'categories': ['category', 'type', 'classification', 'group'],
'brands': ['brand', 'manufacturer', 'company', 'sales', 'sold', 'quantity', 'total'],
'addresses': ['address', 'location', 'shipping', 'billing'],
'shipments': ['shipment', 'delivery', 'shipping', 'tracking'],
'discounts': ['discount', 'coupon', 'promotion', 'offer'],
'warehouses': ['warehouse', 'facility', 'location', 'storage'],
'employees': ['employee', 'staff', 'worker', 'person'],
'departments': ['department', 'division', 'team'],
'product_images': ['image', 'photo', 'picture', 'media'],
'purchase_orders': ['purchase', 'procurement', 'supplier', 'order'],
'purchase_order_items': ['purchase', 'procurement', 'supplier', 'item'],
'order_discounts': ['discount', 'coupon', 'promotion', 'order'],
'shipment_items': ['shipment', 'delivery', 'item', 'tracking']
}
def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
import re
# Score tables by keyword relevance
query_words = set(re.findall(r'\b\w+\b', user_query.lower()))
table_scores = {}
for table_name, keywords in self.table_keywords.items():
score = 0
# Count keyword matches
for keyword in keywords:
if keyword in query_words:
score += 3
# Partial matches
for query_word in query_words:
if keyword in query_word or query_word in keyword:
score += 1
# Bonus for exact table name match
if table_name.lower() in query_words:
score += 10
table_scores[table_name] = score
# Get top scoring tables
sorted_tables = sorted(table_scores.items(), key=lambda x: x[1], reverse=True)
relevant_tables = [table for table, score in sorted_tables[:max_tables] if score > 0]
# Fallback to default tables if no matches
if not relevant_tables:
relevant_tables = self._get_default_tables(user_query)[:max_tables]
# Build schema for selected tables
return self._build_schema(relevant_tables)
def _get_default_tables(self, user_query: str) -> List[str]:
"""Get default tables based on query patterns."""
query_lower = user_query.lower()
# Sales/revenue queries
if any(word in query_lower for word in ['revenue', 'sales', 'total', 'amount', 'brand']):
return ['orders', 'order_items', 'product_variants', 'products', 'brands']
# Product queries
if any(word in query_lower for word in ['product', 'item', 'catalog']):
return ['products', 'product_variants', 'categories', 'brands']
# Customer queries
if any(word in query_lower for word in ['customer', 'user', 'buyer']):
return ['customers', 'orders', 'addresses']
# Default
return ['products', 'customers', 'orders', 'order_items']
def _build_schema(self, table_names: List[str]) -> str:
"""Build schema string for specified tables."""
if not table_names:
return get_structured_schema(self.db_path)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
schema_lines = ["Available tables and columns:"]
try:
for table_name in table_names:
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
if columns:
col_names = [col[1] for col in columns]
schema_lines.append(f"- {table_name}: {', '.join(col_names)}")
finally:
conn.close()
return '\n'.join(schema_lines)
def get_approach_info(self) -> Dict[str, Any]:
return {
"name": "Keyword RAG",
"description": "Uses business context keywords to match relevant tables",
"pros": ["Fast", "No external dependencies", "Good for business queries"],
"cons": ["Limited by predefined keywords", "May miss complex relationships"],
"best_for": "Business queries with clear domain terms"
}
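To see the scoring in action without a database, here is a standalone sketch of the same keyword-scoring idea on a toy keyword map (the tables and keywords below are illustrative, not the full mapping above, and partial matching is omitted for brevity):

```python
import re

# Toy subset of a table-keyword map (illustrative values only).
table_keywords = {
    'orders': ['order', 'purchase', 'sale', 'total', 'amount', 'sales'],
    'products': ['product', 'item', 'catalog', 'price'],
    'reviews': ['review', 'rating', 'feedback'],
}

def score_tables(user_query: str):
    """Score each table by keyword overlap with the query, highest first."""
    words = set(re.findall(r'\b\w+\b', user_query.lower()))
    scores = {}
    for table, keywords in table_keywords.items():
        score = sum(3 for kw in keywords if kw in words)  # +3 per exact keyword hit
        if table in words:                                # +10 for a table-name mention
            score += 10
        scores[table] = score
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)

print(score_tables("What is the total sales amount?"))
# → [('orders', 9), ('products', 0), ('reviews', 0)]
```

With "total", "sales", and "amount" all hitting the orders keywords, that table wins cleanly, which is exactly how the full implementation decides which schemas to send.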
FAISS RAG strategy
The FAISS RAG strategy is where things get more interesting. Instead of dumping the entire schema, I embed each table's metadata (columns, relationships, business context) into vectors using a SentenceTransformer model. When the user asks a question, that question is embedded too, and FAISS finds the closest-matching tables. I like FAISS because it's free, works locally, and gives accurate results while saving tokens.
The only catch is that the setup takes more steps and uses more memory than the simpler approaches. LLMs and embedding models don't know what your tables mean unless you tell them, so in the _get_business_context() method I manually write a brief description of what each table represents in the business.
In the _extract_table_info() method, I pull table names, column names, and foreign-key relationships from SQLite's PRAGMA queries to build a dictionary of structured information about each table. Finally, the _create_table_description() method assembles a complete description of each table for the SentenceTransformer to embed.
class FAISSVectorRAG(BaseRAG):
"""FAISS-based vector RAG using sentence transformers."""
def __init__(self, db_path: str = DB_PATH):
super().__init__(db_path)
self.model = None
self.index = None
self.table_info = {}
self.table_names = []
self._initialize()
def _initialize(self):
"""Initialize the FAISS vector store and embeddings."""
try:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
print("🔄 Initializing FAISS Vector RAG...")
# Load embedding model
self.model = SentenceTransformer('all-MiniLM-L6-v2')
print("✅ Loaded embedding model: all-MiniLM-L6-v2")
# Extract table information and create embeddings
self.table_info = self._extract_table_info()
# Create embeddings for each table
table_descriptions = []
self.table_names = []
for table_name, info in self.table_info.items():
description = self._create_table_description(table_name, info)
table_descriptions.append(description)
self.table_names.append(table_name)
# Generate embeddings
print(f"🔄 Generating embeddings for {len(table_descriptions)} tables...")
embeddings = self.model.encode(table_descriptions)
# Create FAISS index
dimension = embeddings.shape[1]
self.index = faiss.IndexFlatIP(dimension) # Inner product for cosine similarity
# Normalize embeddings for cosine similarity
faiss.normalize_L2(embeddings)
self.index.add(embeddings.astype('float32'))
print(f"✅ FAISS Vector RAG initialized with {len(table_descriptions)} tables")
except Exception as e:
print(f"❌ Error initializing FAISS Vector RAG: {e}")
self.model = None
self.index = None
def _extract_table_info(self) -> Dict[str, Dict]:
"""Extract detailed information about each table."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
table_info = {}
try:
# Get all table names
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
for (table_name,) in tables:
info = {
'columns': [],
'foreign_keys': [],
'business_context': self._get_business_context(table_name)
}
# Get column information
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
for col in columns:
info['columns'].append({
'name': col[1],
'type': col[2],
'primary_key': bool(col[5])
})
# Get foreign key information
cursor.execute(f"PRAGMA foreign_key_list({table_name});")
fks = cursor.fetchall()
for fk in fks:
info['foreign_keys'].append({
'column': fk[3],
'references_table': fk[2],
'references_column': fk[4]
})
table_info[table_name] = info
finally:
conn.close()
return table_info
def _get_business_context(self, table_name: str) -> str:
"""Get business context description for tables."""
contexts = {
'products': 'Product catalog with items, prices, categories, and brand information. Core inventory data.',
'product_variants': 'Product variations like colors, sizes, SKUs. Links products to specific sellable items.',
'customers': 'Customer profiles with personal information, contact details, and account status.',
'orders': 'Purchase transactions with totals, dates, status, and customer relationships.',
'order_items': 'Individual line items within orders. Contains quantities, prices, and product references.',
'payments': 'Payment processing records with methods, amounts, and transaction status.',
'inventory': 'Stock levels and warehouse quantities for product variants.',
'reviews': 'Customer feedback, ratings, and product reviews.',
'suppliers': 'Vendor information for procurement and supply chain management.',
'categories': 'Product categorization hierarchy for organizing catalog.',
'brands': 'Brand information for products and marketing purposes.',
'addresses': 'Customer shipping and billing address information.',
'shipments': 'Delivery tracking and shipping status information.',
'discounts': 'Promotional codes, coupons, and discount campaigns.',
'warehouses': 'Storage facility locations and warehouse management.',
'employees': 'Staff information and organizational structure.',
'departments': 'Organizational divisions and team structure.',
'product_images': 'Product photography and media assets.',
'purchase_orders': 'Procurement orders from suppliers.',
'purchase_order_items': 'Line items for supplier purchase orders.',
'order_discounts': 'Applied discounts and promotions on orders.',
'shipment_items': 'Individual items within shipment packages.'
}
return contexts.get(table_name, f'Database table for {table_name} related operations.')
def _create_table_description(self, table_name: str, info: Dict) -> str:
"""Create a comprehensive description for embedding."""
description = f"Table: {table_name}\n"
description += f"Purpose: {info['business_context']}\n"
# Add column information
description += "Columns: "
col_names = [col['name'] for col in info['columns']]
description += ", ".join(col_names) + "\n"
# Add relationship information
if info['foreign_keys']:
description += "Relationships: "
relationships = []
for fk in info['foreign_keys']:
relationships.append(f"links to {fk['references_table']} via {fk['column']}")
description += "; ".join(relationships) + "\n"
# Add common use cases based on table type
use_cases = self._get_use_cases(table_name)
if use_cases:
description += f"Common queries: {use_cases}"
return description
def _get_use_cases(self, table_name: str) -> str:
"""Get common use cases for each table."""
use_cases = {
'products': 'product searches, catalog listings, price queries, inventory checks',
'customers': 'customer lookup, registration analysis, geographic distribution',
'orders': 'sales analysis, revenue tracking, order history, status monitoring',
'order_items': 'product sales performance, revenue by product, order composition',
'payments': 'payment processing, revenue reconciliation, payment method analysis',
'brands': 'brand performance, sales by brand, brand comparison',
'categories': 'category analysis, product organization, catalog structure'
}
return use_cases.get(table_name, 'general data queries and analysis')
def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
"""Get relevant schema using vector similarity search."""
if self.model is None or self.index is None:
print("⚠️ FAISS not initialized, falling back to full schema")
return get_structured_schema(self.db_path)
try:
import faiss
import numpy as np
# Generate query embedding
query_embedding = self.model.encode([user_query])
faiss.normalize_L2(query_embedding)
# Search for similar tables
scores, indices = self.index.search(query_embedding.astype('float32'), max_tables)
# Get relevant table names
relevant_tables = []
for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
if idx < len(self.table_names) and score > 0.1: # Minimum similarity threshold
relevant_tables.append(self.table_names[idx])
# Fallback if no relevant tables found
if not relevant_tables:
print("⚠️ No relevant tables found, using defaults")
relevant_tables = self._get_default_tables(user_query)[:max_tables]
# Build schema for selected tables
return self._build_schema(relevant_tables)
except Exception as e:
print(f"⚠️ Vector search failed: {e}, falling back to full schema")
return get_structured_schema(self.db_path)
def _get_default_tables(self, user_query: str) -> List[str]:
"""Get default tables based on query patterns."""
query_lower = user_query.lower()
if any(word in query_lower for word in ['revenue', 'sales', 'total', 'amount', 'brand']):
return ['orders', 'order_items', 'product_variants', 'products', 'brands']
elif any(word in query_lower for word in ['product', 'item', 'catalog']):
return ['products', 'product_variants', 'categories', 'brands']
elif any(word in query_lower for word in ['customer', 'user', 'buyer']):
return ['customers', 'orders', 'addresses']
else:
return ['products', 'customers', 'orders', 'order_items']
def _build_schema(self, table_names: List[str]) -> str:
"""Build schema string for specified tables."""
if not table_names:
return get_structured_schema(self.db_path)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
schema_lines = ["Available tables and columns:"]
try:
for table_name in table_names:
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
if columns:
col_names = [col[1] for col in columns]
schema_lines.append(f"- {table_name}: {', '.join(col_names)}")
finally:
conn.close()
return '\n'.join(schema_lines)
def get_approach_info(self) -> Dict[str, Any]:
return {
"name": "FAISS Vector RAG",
"description": "Uses semantic embeddings and vector similarity search",
"pros": ["Semantic understanding", "Handles complex queries", "No API costs"],
"cons": ["Requires model download", "Higher memory usage", "Setup complexity"],
"best_for": "Complex queries, large schemas, semantic relationships"
}
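A quick side note on the IndexFlatIP trick used above: after L2-normalizing vectors, the inner product equals cosine similarity. A tiny NumPy check with toy 2-D vectors (real all-MiniLM-L6-v2 embeddings have 384 dimensions):

```python
import numpy as np

# Toy "embeddings" with easy norms (|a| = |b| = 5).
a = np.array([3.0, 4.0])
b = np.array([4.0, 3.0])

def l2_normalize(v: np.ndarray) -> np.ndarray:
    return v / np.linalg.norm(v)

# Inner product of the normalized vectors...
ip = float(l2_normalize(a) @ l2_normalize(b))
# ...equals the textbook cosine similarity: (a · b) / (|a| |b|) = 24 / 25
cos = float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))
print(ip, cos)  # both 0.96
```

This is why the code calls faiss.normalize_L2() on both the table embeddings and the query embedding before searching: the plain inner-product index then ranks tables by cosine similarity.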
Chroma RAG strategy
Chroma RAG is a more operations-friendly version of FAISS because it offers persistent storage. Instead of keeping the index in memory, Chroma saves it locally, so even if I restart the application, the vector index is still there. As with FAISS, I still need to manually describe what each table does in business terms (via _get_business_context()). I embed the schema descriptions and store them in ChromaDB. On startup, the SentenceTransformer (MiniLM) is loaded; if the collection already exists, it is reused, otherwise _populate_collection() extracts the table information, generates the descriptions, and stores the vectors. This only needs to happen once, or whenever the schema changes.
It's fast, consistent across sessions, and easy to set up. I chose it because it's free, doesn't require external services, and works well for real-world use cases where you want to scale without losing the vector index or rebuilding everything every time.
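The get-or-create persistence pattern that Chroma's PersistentClient gives us can be sketched standalone with a plain JSON file (the file name and the build payload below are illustrative):

```python
import json
import os
import tempfile

def load_or_build_index(path: str, build):
    """Get-or-create: reuse a persisted index if present, else build and save it."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f), "loaded"
    index = build()
    with open(path, "w") as f:
        json.dump(index, f)
    return index, "created"

with tempfile.TemporaryDirectory() as d:
    p = os.path.join(d, "index.json")
    # First call builds and persists; second call reuses the saved file.
    _, first = load_or_build_index(p, lambda: {"tables": ["orders", "products"]})
    _, second = load_or_build_index(p, lambda: {"tables": ["orders", "products"]})
    print(first, second)  # created loaded
```

The class below does the same thing with chromadb: try get_collection() first, and only create and populate the collection when it doesn't exist yet.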
class ChromaVectorRAG(BaseRAG):
"""Chroma-based vector RAG using sentence transformers with persistent storage."""
def __init__(self, db_path: str = DB_PATH):
super().__init__(db_path)
self.model = None
self.chroma_client = None
self.collection = None
self.table_info = {}
self.table_names = []
self._initialize()
def _initialize(self):
"""Initialize the Chroma vector store and embeddings."""
try:
import chromadb
from sentence_transformers import SentenceTransformer
print("🔄 Initializing Chroma Vector RAG...")
# Load embedding model
self.model = SentenceTransformer('all-MiniLM-L6-v2')
print("✅ Loaded embedding model: all-MiniLM-L6-v2")
# Initialize Chroma client (persistent storage)
self.chroma_client = chromadb.PersistentClient(path="./data/chroma_db")
# Get or create collection
collection_name = "schema_tables"
try:
self.collection = self.chroma_client.get_collection(collection_name)
print("✅ Loaded existing Chroma collection")
except Exception:
# Create new collection if it doesn't exist
self.collection = self.chroma_client.create_collection(
name=collection_name,
metadata={"description": "Database schema table embeddings"}
)
print("✅ Created new Chroma collection")
# Extract table information and create embeddings
self.table_info = self._extract_table_info()
self._populate_collection()
# Load table names for reference
self._load_table_names()
print(f"✅ Chroma Vector RAG initialized with {len(self.table_names)} tables")
except Exception as e:
print(f"❌ Error initializing Chroma Vector RAG: {e}")
self.model = None
self.chroma_client = None
self.collection = None
def _extract_table_info(self) -> Dict[str, Dict]:
"""Extract detailed information about each table."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
table_info = {}
try:
# Get all table names
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
for (table_name,) in tables:
info = {
'columns': [],
'foreign_keys': [],
'business_context': self._get_business_context(table_name)
}
# Get column information
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
for col in columns:
info['columns'].append({
'name': col[1],
'type': col[2],
'primary_key': bool(col[5])
})
# Get foreign key information
cursor.execute(f"PRAGMA foreign_key_list({table_name});")
fks = cursor.fetchall()
for fk in fks:
info['foreign_keys'].append({
'column': fk[3],
'references_table': fk[2],
'references_column': fk[4]
})
table_info[table_name] = info
finally:
conn.close()
return table_info
def _get_business_context(self, table_name: str) -> str:
"""Get business context description for tables."""
contexts = {
'products': 'Product catalog with items, prices, categories, and brand information. Core inventory data.',
'product_variants': 'Product variations like colors, sizes, SKUs. Links products to specific sellable items.',
'customers': 'Customer profiles with personal information, contact details, and account status.',
'orders': 'Purchase transactions with totals, dates, status, and customer relationships.',
'order_items': 'Individual line items within orders. Contains quantities, prices, and product references.',
'payments': 'Payment processing records with methods, amounts, and transaction status.',
'inventory': 'Stock levels and warehouse quantities for product variants.',
'reviews': 'Customer feedback, ratings, and product reviews.',
'suppliers': 'Vendor information for procurement and supply chain management.',
'categories': 'Product categorization hierarchy for organizing catalog.',
'brands': 'Brand information for products and marketing purposes.',
'addresses': 'Customer shipping and billing address information.',
'shipments': 'Delivery tracking and shipping status information.',
'discounts': 'Promotional codes, coupons, and discount campaigns.',
'warehouses': 'Storage facility locations and warehouse management.',
'employees': 'Staff information and organizational structure.',
'departments': 'Organizational divisions and team structure.',
'product_images': 'Product photography and media assets.',
'purchase_orders': 'Procurement orders from suppliers.',
'purchase_order_items': 'Line items for supplier purchase orders.',
'order_discounts': 'Applied discounts and promotions on orders.',
'shipment_items': 'Individual items within shipment packages.'
}
return contexts.get(table_name, f'Database table for {table_name} related operations.')
def _populate_collection(self):
"""Populate Chroma collection with table embeddings."""
if not self.collection or not self.table_info:
return
documents = []
metadatas = []
ids = []
for table_name, info in self.table_info.items():
# Create comprehensive description
description = self._create_table_description(table_name, info)
documents.append(description)
metadatas.append({
'table_name': table_name,
'column_count': len(info['columns']),
'has_foreign_keys': len(info['foreign_keys']) > 0,
'business_context': info['business_context']
})
ids.append(f"table_{table_name}")
# Add to collection
self.collection.add(
documents=documents,
metadatas=metadatas,
ids=ids
)
print(f"✅ Added {len(documents)} table embeddings to Chroma collection")
def _create_table_description(self, table_name: str, info: Dict) -> str:
"""Create a comprehensive description for embedding."""
description = f"Table: {table_name}\n"
description += f"Purpose: {info['business_context']}\n"
# Add column information
description += "Columns: "
col_names = [col['name'] for col in info['columns']]
description += ", ".join(col_names) + "\n"
# Add relationship information
if info['foreign_keys']:
description += "Relationships: "
relationships = []
for fk in info['foreign_keys']:
relationships.append(f"links to {fk['references_table']} via {fk['column']}")
description += "; ".join(relationships) + "\n"
# Add common use cases
use_cases = self._get_use_cases(table_name)
if use_cases:
description += f"Common queries: {use_cases}"
return description
def _get_use_cases(self, table_name: str) -> str:
"""Get common use cases for each table."""
use_cases = {
'products': 'product searches, catalog listings, price queries, inventory checks',
'customers': 'customer lookup, registration analysis, geographic distribution',
'orders': 'sales analysis, revenue tracking, order history, status monitoring',
'order_items': 'product sales performance, revenue by product, order composition',
'payments': 'payment processing, revenue reconciliation, payment method analysis',
'brands': 'brand performance, sales by brand, brand comparison',
'categories': 'category analysis, product organization, catalog structure'
}
return use_cases.get(table_name, 'general data queries and analysis')
def _load_table_names(self):
"""Load table names from the collection."""
if not self.collection:
return
try:
# Get all items from collection
results = self.collection.get()
self.table_names = [metadata['table_name'] for metadata in results['metadatas']]
except Exception as e:
print(f"⚠️ Could not load table names from Chroma: {e}")
self.table_names = []
def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
"""Get relevant schema using Chroma vector similarity search."""
if not self.collection:
print("⚠️ Chroma not initialized, falling back to full schema")
return get_structured_schema(self.db_path)
try:
# Search for similar tables
results = self.collection.query(
query_texts=[user_query],
n_results=max_tables
)
# Extract relevant table names
relevant_tables = []
if results['metadatas'] and len(results['metadatas']) > 0:
for metadata in results['metadatas'][0]:
relevant_tables.append(metadata['table_name'])
# Fallback if no relevant tables found
if not relevant_tables:
print("⚠️ No relevant tables found, using defaults")
relevant_tables = self._get_default_tables(user_query)[:max_tables]
# Build schema for selected tables
return self._build_schema(relevant_tables)
except Exception as e:
print(f"⚠️ Chroma search failed: {e}, falling back to full schema")
return get_structured_schema(self.db_path)
def _get_default_tables(self, user_query: str) -> List[str]:
"""Get default tables based on query patterns."""
query_lower = user_query.lower()
if any(word in query_lower for word in ['revenue', 'sales', 'total', 'amount', 'brand']):
return ['orders', 'order_items', 'product_variants', 'products', 'brands']
elif any(word in query_lower for word in ['product', 'item', 'catalog']):
return ['products', 'product_variants', 'categories', 'brands']
elif any(word in query_lower for word in ['customer', 'user', 'buyer']):
return ['customers', 'orders', 'addresses']
else:
return ['products', 'customers', 'orders', 'order_items']
def _build_schema(self, table_names: List[str]) -> str:
"""Build schema string for specified tables."""
if not table_names:
return get_structured_schema(self.db_path)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
schema_lines = ["Available tables and columns:"]
try:
for table_name in table_names:
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
if columns:
col_names = [col[1] for col in columns]
schema_lines.append(f"- {table_name}: {', '.join(col_names)}")
finally:
conn.close()
return '\n'.join(schema_lines)
def get_approach_info(self) -> Dict[str, Any]:
return {
"name": "Chroma Vector RAG",
"description": "Uses Chroma DB for persistent vector storage with semantic search",
"pros": ["Persistent storage", "Fast queries", "Scalable", "Easy management"],
"cons": ["Requires disk space", "Initial setup time", "Additional dependency"],
"best_for": "Production environments, persistent workflows, team collaboration"
}
Comparing the RAG techniques
The RAGManager class is the control center that switches between the different RAG techniques. Based on the user's query, it routes to the selected strategy, fetches the relevant part of the schema, and tracks performance metrics such as response time, token savings, and table count. It also has a function to benchmark all the RAG approaches side by side, and it stores historical metrics so you can analyze how each one performs over time. Super handy for testing and keeping things honest.
All the RAG technique classes are instantiated and stored in self.approaches. Each one inherits from BaseRAG, so they all expose a consistent interface (get_relevant_schema() and get_approach_info()). This means you can easily plug in a new technique (say, Pinecone or another vector store) as long as it extends BaseRAG.
The get_relevant_schema() method returns the relevant schema for the query using the selected strategy. If an invalid strategy is passed, or the chosen one fails for some reason, it falls back to the keyword RAG strategy.
The compare_approaches() method runs the same question through all the RAG techniques. It measures:
- The length of the resulting schema
- Token reduction (%) vs. the full schema
- Response time
- Number of tables returned
This is really useful for benchmarking the techniques side by side and choosing the one that best suits your use case.
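As a standalone illustration of those metrics, here is the same arithmetic on toy schema strings (character counts stand in as a rough proxy for tokens, just as in the implementation):

```python
# Toy schema strings standing in for get_structured_schema() output.
full_schema = (
    "Available tables and columns:\n"
    "- products: id, name, price, category_id, brand_id\n"
    "- customers: id, email, name\n"
    "- orders: id, customer_id, total, status\n"
    "- reviews: id, product_id, rating, comment\n"
)
reduced_schema = (
    "Available tables and columns:\n"
    "- orders: id, customer_id, total, status\n"
)

# Same formulas as compare_approaches(): length-based reduction and table count.
token_reduction = (len(full_schema) - len(reduced_schema)) / len(full_schema) * 100
table_count = len([line for line in reduced_schema.split('\n') if line.startswith('- ')])
print(f"{token_reduction:.1f}% smaller, {table_count} table(s) returned")
```

A RAG strategy that trims four tables down to one cuts the context by well over half here; on a realistic schema with dozens of wide tables, the savings are correspondingly larger.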
class RAGManager:
    """Manager for multiple RAG approaches."""

    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path
        self.approaches = {
            'no_rag': NoRAG(db_path),
            'keyword': KeywordRAG(db_path),
            'faiss': FAISSVectorRAG(db_path),
            'chroma': ChromaVectorRAG(db_path)
        }
        self.performance_metrics = {}

    def get_available_approaches(self) -> Dict[str, Dict[str, Any]]:
        """Get information about all available RAG approaches."""
        return {
            approach_id: approach.get_approach_info()
            for approach_id, approach in self.approaches.items()
        }

    def get_relevant_schema(self, user_query: str, approach: str = 'keyword', max_tables: int = 5) -> str:
        """Get relevant schema using specified approach."""
        if approach not in self.approaches:
            print(f"⚠️ Unknown approach '{approach}', falling back to keyword")
            approach = 'keyword'
        start_time = time.time()
        try:
            schema = self.approaches[approach].get_relevant_schema(user_query, max_tables)
            # Record performance metrics
            end_time = time.time()
            self._record_performance(approach, user_query, schema, end_time - start_time)
            return schema
        except Exception as e:
            print(f"⚠️ Error with {approach} approach: {e}")
            # Fall back to the keyword approach
            if approach != 'keyword':
                return self.get_relevant_schema(user_query, 'keyword', max_tables)
            else:
                return get_structured_schema(self.db_path)

    def compare_approaches(self, user_query: str, max_tables: int = 5) -> Dict[str, Any]:
        """Compare all approaches for a given query."""
        results = {}
        full_schema = get_structured_schema(self.db_path)
        full_schema_length = len(full_schema)
        for approach_id, approach in self.approaches.items():
            start_time = time.time()
            try:
                schema = approach.get_relevant_schema(user_query, max_tables)
                end_time = time.time()
                results[approach_id] = {
                    'schema': schema,
                    'schema_length': len(schema),
                    'token_reduction': ((full_schema_length - len(schema)) / full_schema_length) * 100,
                    'response_time': end_time - start_time,
                    'table_count': len([line for line in schema.split('\n') if line.startswith('- ')]),
                    'success': True
                }
            except Exception as e:
                results[approach_id] = {
                    'schema': '',
                    'schema_length': 0,
                    'token_reduction': 0,
                    'response_time': 0,
                    'table_count': 0,
                    'success': False,
                    'error': str(e)
                }
        return results

    def _record_performance(self, approach: str, query: str, schema: str, response_time: float):
        """Record performance metrics for analysis."""
        if approach not in self.performance_metrics:
            self.performance_metrics[approach] = []
        full_schema_length = len(get_structured_schema(self.db_path))
        schema_length = len(schema)
        metrics = {
            'query': query,
            'schema_length': schema_length,
            'token_reduction': ((full_schema_length - schema_length) / full_schema_length) * 100,
            'response_time': response_time,
            'table_count': len([line for line in schema.split('\n') if line.startswith('- ')]),
            'timestamp': time.time()
        }
        self.performance_metrics[approach].append(metrics)

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary for all approaches."""
        summary = {}
        for approach, metrics_list in self.performance_metrics.items():
            if not metrics_list:
                continue
            avg_token_reduction = sum(m['token_reduction'] for m in metrics_list) / len(metrics_list)
            avg_response_time = sum(m['response_time'] for m in metrics_list) / len(metrics_list)
            avg_table_count = sum(m['table_count'] for m in metrics_list) / len(metrics_list)
            summary[approach] = {
                'queries_processed': len(metrics_list),
                'avg_token_reduction': round(avg_token_reduction, 1),
                'avg_response_time': round(avg_response_time, 3),
                'avg_table_count': round(avg_table_count, 1)
            }
        return summary

# Convenience function for backward compatibility
def get_rag_enhanced_schema(user_query: str, db_path: str = DB_PATH, approach: str = 'keyword') -> str:
    """Get RAG-enhanced schema using specified approach."""
    manager = RAGManager(db_path)
    return manager.get_relevant_schema(user_query, approach)

# Global cached instance
_rag_manager_instance = None

def get_cached_rag_manager(db_path: str = DB_PATH) -> RAGManager:
    """Get cached RAG manager instance."""
    global _rag_manager_instance
    if _rag_manager_instance is None:
        _rag_manager_instance = RAGManager(db_path)
    return _rag_manager_instance
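The module-level cache at the end matters because building a RAGManager constructs all four approaches, including loading vector indexes. Here is a minimal standalone sketch of that singleton pattern, using a stand-in class (FakeRAGManager is mine) so the snippet runs on its own:

```python
# Stand-in for RAGManager so this sketch is self-contained; in the real
# module, construction is expensive (embeddings, vector stores), which is
# why the instance is created once and reused.
class FakeRAGManager:
    def __init__(self, db_path: str):
        self.db_path = db_path

_instance = None

def get_cached_manager(db_path: str = "data.db") -> FakeRAGManager:
    """Create the manager on first call, then reuse it."""
    global _instance
    if _instance is None:
        _instance = FakeRAGManager(db_path)
    return _instance

a = get_cached_manager()
b = get_cached_manager()
print(a is b)  # → True: both calls share one manager instance
```

One caveat of this pattern (true of the original as well): later calls with a different db_path silently get the first instance, so it fits apps that only ever use one database.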
The Streamlit app is fully integrated with this manager, so users can choose the strategy they want and see real results. You can check out the complete code on GitHub here. Here's a working demo of the new app in action:

Final thoughts
This is not the end; there is still a lot to improve. I need to stress-test the assistant against various attacks and strengthen the guardrails to reduce hallucinations and ensure data security. It would also be good to build a role-based access system for data governance. Perhaps replacing Streamlit with a frontend framework like React would make the app more suitable for real-world deployment. All that, next time.
Before you go…
Follow me so you don't miss any new posts I write in the future; you can find my other articles on my profile page. You can also connect with me on LinkedIn or X!



