How to Build an Effective Knowledge Base for AI Models

AI models are only as strong as their knowledge base. An accurate, well-curated knowledge base improves both model speed and accuracy, two areas where current models often fall short. In fact, recent research suggests that even leading AI chatbots get many secondary, follow-up questions wrong.
In this article, I will cover how to build a knowledge base with detailed steps and mistakes to avoid.
6 steps to building an effective knowledge base
Taking a systematic approach to building a knowledge base helps you create one that is standardized, scalable, and self-explanatory. Any new developer can easily add to or update the knowledge base over time to keep it current and reliable.
To make sure you get there, you can follow these six steps whenever you start building a knowledge base:
1. Collect data
The main misconception about collecting knowledge base data is thinking that more is better. That mindset leads straight into the classic “garbage in, garbage out” problem.
Prioritize value over volume and collect all the data relevant to your model. It can be in the form of:
- Authentic and educational content that includes facts and practices
- Problem-solving content in the form of informative text or videos
- Historical data showing past problems or log files
- Real-time data covering live system status or the latest news feeds
- Model domain data for more context
It is important to understand that your system does not need all the information. For example, if you are building a chatbot for customer support, your model may only need factual and educational content that explains company policies and procedures. This ensures your model doesn't generate invalid or out-of-scope responses and sticks to the information it's given.
Tip: There is a growing trend of feeding AI-generated data into knowledge bases for new AI models. I see this trend as a double-edged sword: it offers speed, but you have to test the output for reliability and fluff. Always validate AI-generated content before adding it to the knowledge base.
2. Clean and segment the data
Once the raw data is collected, clean it. The cleaning process usually includes:
- Removing duplicate and outdated content
- Removing unnecessary information such as headers, footers, and page numbers
- Normalizing both format and content (consistent terminology)
This cleaned data is then divided into logical chunks, where each chunk contains one clear idea or topic.
Every chunk is also assigned metadata that provides quick context about its content. This metadata helps AI models browse the knowledge base and jump straight to the chunks with relevant information.
You can also set up role-based access at the chunk level to control which roles can see the information in each chunk. Although many roles may have access to the model, not everyone should be able to access all the data. Chunking is where you can set up security and access control within the model.
Tip: The best practice I always follow is to chunk data based on user queries instead of document structure. For example, if you have an access control document, you can map it to common user questions like 'How do I change my password?' or 'What is the password policy?'. You can then verify those chunks against the actual questions; a set of 10-12 questions is a safe starting point.
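To make this concrete, here is a minimal sketch of query-oriented chunking with metadata and role tags. The helper names, metadata fields, and the mapped_queries idea are my own illustrative assumptions, not a specific library's API.

# Illustrative sketch: split cleaned text into chunks and attach metadata.
# Field names (allowed_roles, mapped_queries, last_reviewed) are assumptions for illustration.
import re
from datetime import date

def split_into_chunks(text: str, max_chars: int = 800) -> list[str]:
    """Split cleaned text into paragraph-based chunks of roughly one idea each."""
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    chunks, current = [], ""
    for para in paragraphs:
        if current and len(current) + len(para) > max_chars:
            chunks.append(current.strip())
            current = ""
        current += para + "\n\n"
    if current.strip():
        chunks.append(current.strip())
    return chunks

def build_chunk_records(doc_name: str, text: str, mapped_queries: list[str], allowed_roles: list[str]) -> list[dict]:
    """Attach metadata (source, roles, mapped user queries) to every chunk."""
    return [
        {
            "chunk_id": f"{doc_name}-{i}",
            "text": chunk,
            "metadata": {
                "source": doc_name,
                "allowed_roles": allowed_roles,      # role-based access control
                "mapped_queries": mapped_queries,    # user questions this chunk should answer
                "last_reviewed": date.today().isoformat(),
            },
        }
        for i, chunk in enumerate(split_into_chunks(text))
    ]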
3. Organize and vectorize the data
Text chunks are converted into arrays of numbers called vectors using an embedding model such as OpenAI's text-embedding-3-large, BGE-M3, etc.
AI models can search vectors much faster than large blocks of raw text. After vectorization, the metadata attached to the chunk is attached to the vector as well. The final chunk will look like this:
[ Vector (numbers) ] + [ Original text ] + [ Metadata ]
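As a rough sketch of this step, building on the chunk records sketched earlier and assuming the official OpenAI Python client (any embedding model would follow the same pattern):

# Illustrative sketch: embed a chunk and package it as vector + original text + metadata.
# Assumes the OpenAI Python client and an OPENAI_API_KEY in the environment.
from openai import OpenAI

client = OpenAI()

def embed_chunk(chunk: dict) -> dict:
    response = client.embeddings.create(
        model="text-embedding-3-large",
        input=chunk["text"],
    )
    return {
        "id": chunk["chunk_id"],
        "values": response.data[0].embedding,   # the vector (numbers)
        "text": chunk["text"],                  # the original text
        "metadata": chunk["metadata"],          # the attached metadata
    }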
4. Select a data storage platform
You can save this vector output to a vector database such as Pinecone, Milvus, or Weaviate for later retrieval. You can load vector data by writing simple Python code.
import math
import time
import json
from dataclasses import dataclass, field
from typing import Any
import numpy as np
# Vector Normalization + Metadata
def normalize_l2(vector: list[float]) -> list[float]:
"""
    Return an L2-normalized copy of `vector`.
Many vector stores use dot-product similarity. If you normalize vectors to
unit length, dot-product becomes equivalent to cosine similarity.
"""
arr = np.array(vector, dtype=np.float32)
norm = np.linalg.norm(arr)
if norm == 0:
return vector
return (arr / norm).tolist()
def prepare_record(
doc_id: str,
embedding: list[float],
text: str,
source: str,
extra_metadata: dict[str, Any] | None = None,
) -> dict:
"""
Prepare a single record for vector DB upsert.
Metadata serves two purposes:
- Filtering: narrow down search to a subset
"""
metadata = {
"source": source,
"text_preview": text[:500],
"char_count": len(text),
}
if extra_metadata:
metadata.update(extra_metadata)
return {
"id": doc_id,
"values": normalize_l2(embedding),
"metadata": metadata,
}
# Vector Quantization
# Scalar Quantization / SQ
def scalar_quantization(input_vec) -> dict:
    """
    This function demonstrates
    how to compress a float32 input_vec to uint8.
    """
    input_arr = np.array(input_vec, dtype=np.float32)
    vec_min, vec_max = input_arr.min(), input_arr.max()
    vec_range = vec_max - vec_min
    if vec_range == 0:
        quantized = np.zeros_like(input_arr, dtype=np.uint8)
    else:
        quantized = ((input_arr - vec_min) / vec_range * 255).astype(np.uint8)
    return {
        "quantized": quantized.tolist(),
        "min": float(vec_min),
        "max": float(vec_max),
    }
def scalar_dequantization(record: dict) -> list[float]:
    """
    Reconstruct an approximate float32 vector
    from the stored uint8 codes.
    """
arr = np.array(record["quantized"], dtype=np.float32)
return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()
# Product Quantization / PQ
def train_product_quantizer(vectors, num_subvectors: int = 8, num_centroids: int = 256, max_iterations: int = 20) -> list:
    """
    This function demonstrates product quantization:
    split each vector into subvectors and cluster each group independently.
    """
from sklearn.cluster import KMeans
dim = vectors.shape[1]
assert dim % num_subvectors == 0, "dim must be divisible by num_subvectors"
sub_dim = dim // num_subvectors
codebooks = []
for i in range(num_subvectors):
sub_vectors = vectors[:, i * sub_dim : (i + 1) * sub_dim]
kmeans = KMeans(n_clusters=num_centroids, max_iter=max_iterations, n_init=1)
kmeans.fit(sub_vectors)
codebooks.append(kmeans.cluster_centers_)
return codebooks
def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
"""
Encode a single vector into PQ codes (one uint8 per subvector)
"""
num_subvectors = len(codebooks)
sub_dim = len(vector) // num_subvectors
codes = []
for i, codebook in enumerate(codebooks):
sub_vec = vector[i * sub_dim : (i + 1) * sub_dim]
distances = np.linalg.norm(codebook - sub_vec, axis=1)
codes.append(int(np.argmin(distances)))
return codes
def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
"""
Reconstruct approximate vector from PQ codes
"""
return np.concatenate(
[codebook[code] for code, codebook in zip(codes, codebooks)]
)
Tip: To increase loading speed, I suggest using the batch upsert option. You can also normalize the vectors (scale them all to unit length) during the loading phase. After normalization, quantize (compress) them to save storage. This extra normalization and quantization step strengthens later retrieval.
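A minimal sketch of the batched upsert idea, assuming a Pinecone-style index object (other vector databases expose similar bulk APIs); the batch size of 100 is just a common default:

# Illustrative sketch: upsert records in batches instead of one at a time.
def batch_upsert(index, records: list[dict], batch_size: int = 100) -> None:
    for start in range(0, len(records), batch_size):
        batch = records[start : start + batch_size]
        index.upsert(vectors=[
            {"id": r["id"], "values": r["values"], "metadata": r["metadata"]}
            for r in batch
        ])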
5. Prepare for retrieval
To enable retrieval from a vector database, you can use orchestration frameworks such as LlamaIndex and LangChain.
LlamaIndex can quickly search the vector database and reach the nodes whose content relates to the user's query.
LangChain then takes the data from the retrieved chunk and transforms it according to the user's query, for example by summarizing the text or drafting an email from it.
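As a rough sketch of the retrieval side with LlamaIndex (the directory path, top-k value, and reliance on a default OpenAI backend are placeholder assumptions):

# Illustrative sketch: build a LlamaIndex vector index over local documents and query it.
# Assumes documents live in ./kb_docs and OPENAI_API_KEY is set; adjust for your setup.
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

documents = SimpleDirectoryReader("./kb_docs").load_data()
index = VectorStoreIndex.from_documents(documents)

query_engine = index.as_query_engine(similarity_top_k=3)
response = query_engine.query("What is the password policy?")
print(response)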
"""
Hybrid Retrieval: Take benefits from both keyword search and vector similarity
Where each approach shines:
- Keywords: looks for exact matches, but will miss searches with synonym
- Embeddings: has advantage of capturing the meaning, but there is possibility of missing exact keyword
Hybrid is a combination of both to get the best of each.
"""
import math
from collections import defaultdict
from dataclasses import dataclass
import numpy as np
@dataclass
class Document:
id: str
text: str
embedding: list[float]
class BestMatching25Index:
def __init__(self, k1: float = 1.5, b: float = 0.75):
        # k1 controls term-frequency saturation
        # and b controls document-length normalization
self.k1 = k1
self.b = b
self.doc_lengths: dict[str, int] = {}
self.avg_doc_length: float = 0
self.doc_freqs: dict[str, int] = {}
self.term_freqs: dict[str, dict[str, int]] = {}
self.corpus_size: int = 0
def _tokenize(self, text: str) -> list[str]:
return text.lower().split()
def index(self, documents: list[Document]) -> None:
self.corpus_size = len(documents)
for doc in documents:
tokens = self._tokenize(doc.text)
self.doc_lengths[doc.id] = len(tokens)
self.term_freqs[doc.id] = {}
seen_terms: set[str] = set()
for token in tokens:
self.term_freqs[doc.id][token] = self.term_freqs[doc.id].get(token, 0) + 1
if token not in seen_terms:
self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1
seen_terms.add(token)
self.avg_doc_length = sum(self.doc_lengths.values()) / self.corpus_size
def score(self, query: str, doc_id: str) -> float:
query_terms = self._tokenize(query)
doc_len = self.doc_lengths[doc_id]
score = 0.0
for term in query_terms:
if term not in self.doc_freqs or term not in self.term_freqs.get(doc_id, {}):
continue
tf = self.term_freqs[doc_id][term]
df = self.doc_freqs[term]
idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
tf_norm = (tf * (self.k1 + 1)) / (
tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length)
)
score += idf * tf_norm
return score
def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
scores = [
(doc_id, self.score(query, doc_id))
for doc_id in self.doc_lengths
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
class VectorIndex:
"""This class implements the smart search using the hybrid search.
The index function normalize and stores the document
search implements a cosine similarity search
hybrid_search_weighted merges BM25 index and vector index using weighted average
Reciprocal_rank_fusion Combines the results in an efficient way
"""
def __init__(self):
self.documents: dict[str, np.ndarray] = {}
def index(self, documents: list[Document]) -> None:
for doc in documents:
arr = np.array(doc.embedding, dtype=np.float32)
norm = np.linalg.norm(arr)
self.documents[doc.id] = arr / norm if norm > 0 else arr
def search(self, query_embedding: list[float], top_k: int = 10) -> list[tuple[str, float]]:
q = np.array(query_embedding, dtype=np.float32)
        norm = np.linalg.norm(q)
        q = q / norm if norm > 0 else q
scores = [
(doc_id, float(np.dot(q, emb)))
for doc_id, emb in self.documents.items()
]
scores.sort(key=lambda x: x[1], reverse=True)
return scores[:top_k]
def hybrid_search_weighted(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
alpha: float = 0.5,
top_k: int = 10,
) -> list[dict]:
"""Combine keyword and vector scores with a tunable weight.
alpha = 1.0 → pure vector search
alpha = 0.0 → pure keyword search
alpha = 0.5 → equal weight (good starting point)
"""
keyword_results = bm25_index.search(query, top_k=top_k * 2)
vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
# Normalize (min-max) each score list to [0, 1]
def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
if not results:
return {}
scores = [s for _, s in results]
min_s, max_s = min(scores), max(scores)
rng = max_s - min_s
if rng == 0:
return {doc_id: 1.0 for doc_id, _ in results}
return {doc_id: (s - min_s) / rng for doc_id, s in results}
keyword_scores = normalize_scores(keyword_results)
vector_scores = normalize_scores(vector_results)
# Merge
all_doc_ids = set(keyword_scores) | set(vector_scores)
combined = []
for doc_id in all_doc_ids:
ks = keyword_scores.get(doc_id, 0.0)
vs = vector_scores.get(doc_id, 0.0)
combined.append({
"id": doc_id,
"score": alpha * vs + (1 - alpha) * ks,
"keyword_score": ks,
"vector_score": vs,
})
combined.sort(key=lambda x: x["score"], reverse=True)
return combined[:top_k]
def reciprocal_rank_fusion(
*ranked_lists: list[tuple[str, float]],
k: int = 60,
top_n: int = 10,
) -> list[dict]:
"""
    Merge multiple ranked lists using RRF (Reciprocal Rank Fusion).
RRF score = sum over all lists of: 1 / (k + rank)
Why RRF over weighted combination?
- No score normalization needed (works on ranks, not raw scores)
- No alpha tuning needed
- Robust across different score distributions
- Used by Elasticsearch, Pinecone, Weaviate under the hood
"""
rrf_scores: dict[str, float] = defaultdict(float)
doc_details: dict[str, dict] = {}
for list_idx, ranked_list in enumerate(ranked_lists):
for rank, (doc_id, raw_score) in enumerate(ranked_list, start=1):
rrf_scores[doc_id] += 1.0 / (k + rank)
if doc_id not in doc_details:
doc_details[doc_id] = {}
doc_details[doc_id][f"list_{list_idx}_rank"] = rank
doc_details[doc_id][f"list_{list_idx}_score"] = raw_score
results = []
for doc_id, rrf_score in rrf_scores.items():
results.append({
"id": doc_id,
"rrf_score": round(rrf_score, 6),
**doc_details[doc_id],
})
results.sort(key=lambda x: x["rrf_score"], reverse=True)
return results[:top_n]
def hybrid_search_rrf(
query: str,
query_embedding: list[float],
bm25_index: BestMatching25Index,
vector_index: VectorIndex,
top_k: int = 10,
) -> list[dict]:
keyword_results = bm25_index.search(query, top_k=top_k * 2)
vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
return reciprocal_rank_fusion(keyword_results, vector_results, top_n=top_k)
Tip: I recommend hybrid retrieval based on both keywords and embeddings for more reliable results. Keyword retrieval is good for specific terms (“password policy”), while embeddings are better for concept- or description-based matches. LlamaIndex works well for hybrid retrieval, where it can match both exact terms and the meaning of a query.
6. Set up an automated refresh and evaluation schedule
The last step is to make sure the knowledge base stays up to date. To do this, you can use selective forgetting: the process of overwriting or removing outdated and redundant data to keep the model accurate.
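A minimal sketch of selective forgetting, assuming each record's metadata carries a last_reviewed date (as in the chunking sketch earlier) and the vector store supports delete-by-ID, as Pinecone, Milvus, and Weaviate do through their own APIs; the 90-day cutoff is an arbitrary example:

# Illustrative sketch: remove chunks that haven't been reviewed within a cutoff window.
from datetime import date, timedelta

def forget_stale_chunks(index, records: list[dict], max_age_days: int = 90) -> list[str]:
    cutoff = date.today() - timedelta(days=max_age_days)
    stale_ids = [
        r["id"]
        for r in records
        if date.fromisoformat(r["metadata"]["last_reviewed"]) < cutoff
    ]
    if stale_ids:
        index.delete(ids=stale_ids)  # assumes a delete-by-ID API
    return stale_ids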
How can you find out which data to delete? Evaluation and observability platforms can help. You can define test rules and questions in the DeepEval framework that continuously check whether your AI model stays accurate. If answers are incorrect, the TruLens platform helps you trace them back to the exact chunk the answer was retrieved from.
"""
Knowledge Base Quality Monitoring
Monitor knowledge base health with automated checks:
1. Retrieval quality — is it finding the right documents?
2. Freshness detection — Are documents stale or embeddings drifting?
3. Unified pipeline — Scheduled monitoring with alerts
"""
import time
import json
import logging
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Any, Callable
import numpy as np
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kb_monitor")
def setup_deepeval_metrics():
"""Define retrieval quality metrics using DeepEval.
DeepEval provides LLM-evaluated metrics — it uses a judge LLM to score
whether retrieved context actually helps answer the question.
"""
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
from deepeval.test_case import LLMTestCase
metrics = {
# Does the answer address the question?
"relevancy": AnswerRelevancyMetric(threshold=0.7),
# Is the answer grounded in the retrieved context (no hallucination)?
"faithfulness": FaithfulnessMetric(threshold=0.7),
# Are the top-ranked retrieved docs actually relevant?
"context_precision": ContextualPrecisionMetric(threshold=0.7),
# Did we retrieve all the docs needed to answer?
"context_recall": ContextualRecallMetric(threshold=0.7),
}
return metrics, LLMTestCase
def evaluate_retrieval_quality(
rag_pipeline: Callable,
test_cases: list[dict],
) -> list[dict]:
"""Run a set of test queries through your RAG pipeline and score them.
Each test case should have:
- query: the user question
- expected_answer: ground truth answer (for recall/relevancy)
"""
from deepeval import evaluate
from deepeval.test_case import LLMTestCase
from deepeval.metrics import (
AnswerRelevancyMetric,
FaithfulnessMetric,
ContextualPrecisionMetric,
ContextualRecallMetric,
)
results = []
for tc in test_cases:
# Run your actual RAG pipeline
response = rag_pipeline(tc["query"])
test_case = LLMTestCase(
input=tc["query"],
actual_output=response["answer"],
expected_output=tc["expected_answer"],
retrieval_context=response["retrieved_contexts"],
)
metrics = [
AnswerRelevancyMetric(threshold=0.7),
FaithfulnessMetric(threshold=0.7),
ContextualPrecisionMetric(threshold=0.7),
ContextualRecallMetric(threshold=0.7),
]
for metric in metrics:
metric.measure(test_case)
results.append({
"query": tc["query"],
"scores": {m.__class__.__name__: m.score for m in metrics},
"passed": all(m.is_successful() for m in metrics),
})
return results
def setup_trulens_monitoring(rag_pipeline: Callable, app_name: str = "my_kb"):
"""Wrap your RAG pipeline with TruLens for continuous feedback logging.
TruLens records every query + response + retrieved context, then
runs feedback functions asynchronously to score each interaction.
"""
from trulens.core import TruSession, Feedback, Select
from trulens.providers.openai import OpenAI as TruLensOpenAI
from trulens.apps.custom import TruCustomApp, instrument
session = TruSession()
# Feedback provider (uses an LLM to judge quality)
provider = TruLensOpenAI()
feedbacks = [
# Is the response relevant to the query?
Feedback(provider.relevance)
.on_input()
.on_output(),
# Is the response grounded in retrieved context?
Feedback(provider.groundedness_measure_with_cot_reasons)
.on(Select.RecordCalls.retrieve.rets)
.on_output(),
# Is the retrieved context relevant to the query?
Feedback(provider.context_relevance)
.on_input()
.on(Select.RecordCalls.retrieve.rets),
]
# Wrap your pipeline — every call is now logged and scored
@instrument
class InstrumentedRAG:
def __init__(self, pipeline):
self._pipeline = pipeline
@instrument
def retrieve(self, query: str) -> list[str]:
result = self._pipeline(query)
return result["retrieved_contexts"]
@instrument
def query(self, query: str) -> str:
result = self._pipeline(query)
return result["answer"]
instrumented = InstrumentedRAG(rag_pipeline)
tru_app = TruCustomApp(
instrumented,
app_name=app_name,
feedbacks=feedbacks,
)
return tru_app, session
def get_trulens_dashboard_url(session) -> str:
"""Launch the TruLens dashboard to visualize quality over time."""
session.run_dashboard(port=8501)
return "
@dataclass
class DocumentFreshness:
doc_id: str
last_updated: datetime
last_embedded: datetime
source_hash: str # hash of source content at embedding time
class FreshnessMonitor:
"""Detect stale documents and embedding drift."""
def __init__(self, staleness_threshold_days: int = 30):
self.threshold = timedelta(days=staleness_threshold_days)
self.freshness_records: dict[str, DocumentFreshness] = {}
def register(self, doc_id: str, source_hash: str) -> None:
now = datetime.utcnow()
self.freshness_records[doc_id] = DocumentFreshness(
doc_id=doc_id,
last_updated=now,
last_embedded=now,
source_hash=source_hash,
)
def check_staleness(self) -> dict:
"""Find documents that haven't been re-embedded recently."""
now = datetime.utcnow()
stale, fresh = [], []
for doc_id, record in self.freshness_records.items():
age = now - record.last_embedded
if age > self.threshold:
stale.append({"id": doc_id, "days_stale": age.days})
else:
fresh.append(doc_id)
return {
"total": len(self.freshness_records),
"fresh": len(fresh),
"stale": len(stale),
"stale_documents": stale,
}
def check_content_drift(
self, doc_id: str, current_source_hash: str
) -> bool:
"""Check if source content changed since last embedding."""
record = self.freshness_records.get(doc_id)
if not record:
return True # unknown doc, treat as drifted
return record.source_hash != current_source_hash
def detect_embedding_drift(
old_embeddings: dict[str, list[float]],
new_embeddings: dict[str, list[float]],
drift_threshold: float = 0.1,
) -> dict:
"""Compare old vs new embeddings for the same documents.
If your embedding model gets updated (or you switch models),
existing vectors may no longer be compatible. This detects that.
"""
drifted = []
common_ids = set(old_embeddings) & set(new_embeddings)
for doc_id in common_ids:
old = np.array(old_embeddings[doc_id])
new = np.array(new_embeddings[doc_id])
# cosine distance: 0 = identical, 2 = opposite
cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
cos_dist = 1 - cos_sim
if cos_dist > drift_threshold:
drifted.append({
"id": doc_id,
"cosine_distance": round(float(cos_dist), 4),
})
return {
"documents_compared": len(common_ids),
"drifted": len(drifted),
"drift_threshold": drift_threshold,
"drifted_documents": sorted(drifted, key=lambda x: x["cosine_distance"], reverse=True),
}
Using DeepEval in conjunction with TruLens enables periodic evaluation of your knowledge base.
Top challenges in building a knowledge base (+ solutions)
Here are some common problems I've seen with knowledge bases:
1. Creeping data quality errors
AI models built over the years, even by well-known companies with strong teams, have given misleading answers. Air Canada's famous chatbot error is one example, where the model promised a customer a refund under a policy that never existed.
While most developers try to put the right content into the knowledge base, the output can still be problematic. In my experience, a lack of domain knowledge causes mistakes in deciding what is right. Take off the tech hat and put on the domain expert's hat to identify outdated, conflicting, and irrelevant information in your knowledge base.
2. Slow retrieval
An AI model that gives the right answer is not enough. Users hate waiting and want answers near-instantly.
Developers tend to focus on functionality and treat performance as if it were negotiable. It isn't. Use the following tips to solve common latency problems:
- Use HNSW (Hierarchical Navigable Small World) or IVF indexes instead of flat indexes; they group similar vectors together so nearest neighbors can be found quickly (see the sketch after this list).
- Compress (quantize) the stored vectors and break long documents into smaller chunks so queries take up less memory.
- Store your database and AI service in the same cloud environment for faster access.
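To show the indexing point concretely, here is a small sketch comparing a flat index with an HNSW index using FAISS. The dimension, the HNSW parameter M=32, and the random data are placeholder values, not tuned recommendations.

# Illustrative sketch: flat (brute-force) index versus graph-based HNSW index in FAISS.
import numpy as np
import faiss

dim = 1024
vectors = np.random.rand(10_000, dim).astype(np.float32)

flat_index = faiss.IndexFlatL2(dim)          # exact search, scans every vector
hnsw_index = faiss.IndexHNSWFlat(dim, 32)    # approximate, graph-based, much faster at scale

flat_index.add(vectors)
hnsw_index.add(vectors)

query = np.random.rand(1, dim).astype(np.float32)
distances, ids = hnsw_index.search(query, k=5)  # top-5 nearest neighbors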
3. Scaling bottlenecks
To rush implementation, developers often make poor design decisions that hurt scalability over time. One such problem is a monolithic architecture where all data storage and query processing happen in a single, tightly coupled cluster. As model usage grows, CPU/RAM usage spikes because that one cluster has to handle every query. I suggest horizontal partitioning (splitting data across multiple smaller servers) to manage scale effectively.
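As a toy sketch of the horizontal-partitioning idea (the shard count and the hash-based routing rule are illustrative assumptions, not a specific database feature):

# Illustrative sketch: route each chunk to one of several smaller shards by hashing its ID.
import hashlib

NUM_SHARDS = 4

def shard_for(doc_id: str, num_shards: int = NUM_SHARDS) -> int:
    """Deterministically map a document ID to a shard number."""
    digest = hashlib.sha256(doc_id.encode()).hexdigest()
    return int(digest, 16) % num_shards

def route_records(records: list[dict]) -> dict[int, list[dict]]:
    """Group records by target shard so each shard can be loaded and queried independently."""
    shards: dict[int, list[dict]] = {i: [] for i in range(NUM_SHARDS)}
    for record in records:
        shards[shard_for(record["id"])].append(record)
    return shards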
Another problem is the growing cost of scaling, which usually happens if you don't shrink or compress vectors to optimize storage. Engineers skip the quantization step to ship faster. The downside is not apparent at first, but soon slow queries and a growing cloud bill expose the gap.
A knowledge base is not a data dump but a curated asset
Building a knowledge base is not a one-time project. It is a living asset that requires regular maintenance. The structure you create today will reveal gaps tomorrow. Every failed query is a lesson, and each successful retrieval confirms a design choice.
I suggest starting small: choose ten common model questions, create clean chunks, and then test whether your model can give the right answers in a reasonable time. Once you start getting the expected output, repeat the process to expand the knowledge base.
The difference between a model that merely predicts and one that genuinely knows comes down to this deliberate maintenance work. Continuous refinement makes subsequent retrievals easier and results more reliable.



