
How to Build an Effective Knowledge Base for AI Models

AI models are only as strong as their knowledge base. An accurate, curated knowledge base improves both model speed and accuracy, two areas where current models often fall short. In fact, recent research suggests that major AI models get almost every second question wrong.

In this article, I will cover how to build a knowledge base with detailed steps and mistakes to avoid.

6 steps to building an effective knowledge base

Steps to build a knowledge base | Photo by the author

Taking a systematic approach to building a knowledge base helps you create one that is standardized, scalable, and self-explanatory. Any new developer can easily add to or update the knowledge base over time to keep it current and reliable.

To make sure you get there, you can follow these six steps whenever you start building a knowledge base:

1. Collect data

The main misconception about collecting knowledge base data is that more is better. This leads straight into the classic “garbage in, garbage out” problem.

Prioritize value over volume and collect only the data relevant to your model. It can be in the form of:

  • Authentic and educational content that includes facts and practices
  • Problem-solving content in the form of informative text or videos
  • Historical data showing past problems or log files
  • Real-time data covering live system status or the latest news feeds
  • Model domain data for more context

It is important to understand that your system does not need all the information. For example, if you are building a chatbot for customer support, your model may only need factual and educational content that explains company policy and procedures. This ensures that your model doesn't generate an invalid or out-of-scope response and sticks to what it is given.

Tip: There is a growing trend of feeding AI-generated data into knowledge bases for new AI models. I see this trend as a double-edged sword. It offers speed, but you have to test the output for reliability and fluff. Always prompt for concise responses and validate the output before adding it to the knowledge base.

2. Clean and segment the data

Once the raw data is ready, clean it first. The cleaning process will usually include:

  • Removing duplicate and outdated content
  • Removing unnecessary information such as headers, footers, and page numbers
  • Standardizing content, in both format and wording (consistent terms)
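The cleaning steps above can be sketched in a few lines of standard-library Python. This is a minimal illustration, not a complete cleaner: the `PAGE_NUMBER` pattern and the `boilerplate_lines` set are assumptions you would adapt to your own documents.

```python
import hashlib
import re

# Hypothetical pattern for page-number lines such as "12" or "Page 3 of 10".
# Note: it also drops any line that consists only of digits.
PAGE_NUMBER = re.compile(r"^\s*(page\s+)?\d+(\s+of\s+\d+)?\s*$", re.IGNORECASE)


def clean_document(text: str, boilerplate_lines: set[str]) -> str:
    """Drop page numbers and known header/footer lines, then collapse blank runs."""
    kept = []
    for line in text.splitlines():
        stripped = line.strip()
        if PAGE_NUMBER.match(stripped):
            continue
        if stripped in boilerplate_lines:
            continue
        kept.append(stripped)
    cleaned = "\n".join(kept)
    # Collapse three or more newlines into a single paragraph break.
    return re.sub(r"\n{3,}", "\n\n", cleaned).strip()


def deduplicate(documents: list[str]) -> list[str]:
    """Keep the first copy of each document, ignoring case and whitespace differences."""
    seen: set[str] = set()
    unique = []
    for doc in documents:
        normalized = " ".join(doc.lower().split())
        fingerprint = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(doc)
    return unique
```

Exact-match hashing only catches verbatim duplicates; near-duplicates (for example, two versions of the same policy) still need a manual or similarity-based review.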

This cleaned data is then divided into logical chunks, where each chunk contains one clear idea or topic.

Every chunk is also assigned metadata that provides quick context about its content. This metadata helps AI models search the knowledge base quickly and reach the parts with relevant information.
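As a rough sketch of chunking with metadata, the example below groups paragraphs into chunks and attaches a small metadata dictionary to each one. The `Chunk` class, the `chunk_by_paragraph` function, and the metadata keys (`source`, `topic`, `position`) are illustrative names, not part of any particular library.

```python
from dataclasses import dataclass, field


@dataclass
class Chunk:
    chunk_id: str
    text: str
    metadata: dict = field(default_factory=dict)


def chunk_by_paragraph(
    doc_id: str, text: str, source: str, topic: str, max_chars: int = 800
) -> list[Chunk]:
    """Group consecutive paragraphs into chunks that aim to stay under max_chars.

    A single paragraph longer than max_chars is kept whole as its own chunk.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    bodies: list[str] = []
    buffer = ""
    for para in paragraphs:
        candidate = f"{buffer}\n\n{para}" if buffer else para
        if buffer and len(candidate) > max_chars:
            # Adding this paragraph would overflow, so close the current chunk.
            bodies.append(buffer)
            buffer = para
        else:
            buffer = candidate
    if buffer:
        bodies.append(buffer)

    return [
        Chunk(
            chunk_id=f"{doc_id}-{i}",
            text=body,
            metadata={
                "source": source,
                "topic": topic,
                "position": i,
                "char_count": len(body),
            },
        )
        for i, body in enumerate(bodies)
    ]
```

Splitting on paragraph boundaries is only one heuristic; the point is that every chunk leaves this step with its own ID and metadata.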

You can also set up role-based access for chunks to control which roles can see the information in each chunk. Although many roles may have access to the model, not everyone should be able to access all the data. Chunking is where you can set up security and access control within the model.
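One simple way to picture role-based access is a metadata filter applied before retrieval results reach the model. The sketch below assumes each chunk carries a hypothetical `allowed_roles` list in its metadata and denies access when that list is missing.

```python
def visible_chunks(chunks: list[dict], user_roles: set[str]) -> list[dict]:
    """Return only the chunks whose allowed_roles overlap the user's roles.

    Chunks without an allowed_roles entry are hidden (deny by default).
    """
    return [
        chunk
        for chunk in chunks
        if user_roles & set(chunk["metadata"].get("allowed_roles", []))
    ]


chunks = [
    {"id": "pw-policy", "metadata": {"allowed_roles": ["employee", "admin"]}},
    {"id": "salary-bands", "metadata": {"allowed_roles": ["hr"]}},
    {"id": "untagged", "metadata": {}},
]
employee_view = visible_chunks(chunks, {"employee"})
```

In a real deployment the same rule is usually expressed as a metadata filter in the vector database query, so restricted chunks never leave the store.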

Tip: The best practice I always follow is to chunk data based on user queries instead of document structure. For example, say you have an access control document. You can chunk it around common user questions such as 'How do I change my password?' and 'What is the password policy?'. You can then verify these chunks by testing them against real questions. A safe set would be 10-12 questions.
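The verification step in this tip could look something like the sketch below. It assumes a hypothetical `retrieve` callable that takes a question and returns the IDs of the chunks your system would fetch, plus a hand-written mapping from each test question to the chunk that should answer it.

```python
from typing import Callable


def check_chunks_against_questions(
    retrieve: Callable[[str], list[str]],
    expected: dict[str, str],
) -> dict:
    """Report the hit rate and the questions whose expected chunk was not retrieved."""
    misses = [
        question
        for question, chunk_id in expected.items()
        if chunk_id not in retrieve(question)
    ]
    total = len(expected)
    hit_rate = (total - len(misses)) / total if total else 0.0
    return {"hit_rate": hit_rate, "misses": misses}


# Stand-in retriever for illustration only; replace with your real retrieval call.
fake_index = {
    "How do I change my password?": ["access-0", "access-2"],
    "What is the password policy?": ["access-3"],
}
report = check_chunks_against_questions(
    retrieve=lambda q: fake_index.get(q, []),
    expected={
        "How do I change my password?": "access-0",
        "What is the password policy?": "access-1",
    },
)
```

Each miss points to a chunk boundary worth revisiting: either the answer is split across chunks or it is buried in a chunk about something else.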

3. Vectorize and index the data

Text chunks are converted into lists of numbers called vectors using an embedding model such as OpenAI's text-embedding-3-large or BGE-M3.

Comparing vectors is much faster than scanning large blocks of text. After vectorization, the metadata assigned to each chunk is attached to its vector. The final record will look like this:

[ Vector (numbers) ] + [ Original text ] + [ Metadata ]
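To make that layout concrete, here is a small sketch of the record as a Python dataclass. `toy_embed` is a placeholder I made up so the example runs without an API key; it hashes the text and carries no semantic meaning, so swap in a real embedding model in practice. `VectorRecord` and `build_record` are likewise illustrative names.

```python
import hashlib
from dataclasses import dataclass, field


def toy_embed(text: str, dim: int = 8) -> list[float]:
    """Placeholder embedding: hash the text into `dim` floats in [0, 1].

    NOT a real embedding model. It exists only to make the example runnable.
    """
    if not 1 <= dim <= 32:
        raise ValueError("dim must be between 1 and 32 for this toy function")
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return [digest[i] / 255 for i in range(dim)]


@dataclass
class VectorRecord:
    vector: list[float]   # [ Vector (numbers) ]
    text: str             # [ Original text ]
    metadata: dict = field(default_factory=dict)  # [ Metadata ]


def build_record(text: str, metadata: dict) -> VectorRecord:
    """Embed a chunk and bundle the vector with its text and metadata."""
    return VectorRecord(vector=toy_embed(text), text=text, metadata=dict(metadata))


record = build_record(
    "Passwords must be at least 12 characters long.",
    {"source": "access-control.pdf", "topic": "password policy"},
)
```

Keeping the original text next to the vector means the retrieval layer can return readable content without a second lookup.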

4. Select a data storage platform

You can save this vector output to a vector database such as Pinecone, Milvus, or Weaviate for retrieval. You can prepare and load the vector data with simple Python code.

  from typing import Any

  import numpy as np


  # Vector Normalization + Metadata

  def normalize_l2(vector: list[float]) -> list[float]:
      """
      Return an L2-normalized copy of `vector`.
      Many vector stores use dot-product similarity. If you normalize vectors to
      unit length, dot-product becomes equivalent to cosine similarity.
      """
      arr = np.array(vector, dtype=np.float32)
      norm = np.linalg.norm(arr)
      if norm == 0:
          # A zero vector has no direction, so return a copy unchanged.
          return list(vector)
      return (arr / norm).tolist()


  def prepare_record(
      doc_id: str,
      embedding: list[float],
      text: str,
      source: str,
      extra_metadata: dict[str, Any] | None = None,
  ) -> dict:
      """
      Prepare a single record for vector DB upsert.
      Metadata serves two purposes:
      - Filtering: narrow down search to a subset
      - Context: return the source and a text preview with each match
      """
      metadata = {
          "source": source,
          "text_preview": text[:500],
          "char_count": len(text),
      }
      if extra_metadata:
          metadata.update(extra_metadata)

      return {
          "id": doc_id,
          "values": normalize_l2(embedding),
          "metadata": metadata,
      }


  # Vector Quantization

  # Scalar Quantization / SQ

  def scalar_quantization(input_vec) -> dict:
      """
      Compress a float32 vector to uint8 (one byte per dimension).
      The min and max are stored so the vector can be approximately restored.
      """
      input_arr = np.array(input_vec, dtype=np.float32)
      # Avoid shadowing the built-ins min, max, and range.
      min_val, max_val = input_arr.min(), input_arr.max()
      value_range = max_val - min_val
      if value_range == 0:
          # All values are identical, so every code is 0.
          quantized = np.zeros_like(input_arr, dtype=np.uint8)
      else:
          quantized = ((input_arr - min_val) / value_range * 255).astype(np.uint8)

      return {
          "quantized": quantized.tolist(),
          "min": float(min_val),
          "max": float(max_val),
      }


  def scalar_dequantization(record: dict) -> list[float]:
      """
      Reconstruct an approximate float32 vector from its uint8 codes.
      """
      arr = np.array(record["quantized"], dtype=np.float32)
      return (arr / 255 * (record["max"] - record["min"]) + record["min"]).tolist()


  # Product Quantization / PQ

  def train_product_quantizer(
      vectors: np.ndarray,
      num_subvectors: int = 8,
      num_centroids: int = 256,
      max_iterations: int = 20,
  ) -> list[np.ndarray]:
      """
      Split each vector into subvectors and cluster each subspace independently.
      Returns one codebook (an array of centroids) per subvector.
      """
      from sklearn.cluster import KMeans

      dim = vectors.shape[1]
      assert dim % num_subvectors == 0, "dim must be divisible by num_subvectors"
      sub_dim = dim // num_subvectors

      codebooks = []
      for i in range(num_subvectors):
          sub_vectors = vectors[:, i * sub_dim : (i + 1) * sub_dim]
          kmeans = KMeans(n_clusters=num_centroids, max_iter=max_iterations, n_init=1)
          kmeans.fit(sub_vectors)
          codebooks.append(kmeans.cluster_centers_)

      return codebooks


  def pq_encode(vector: np.ndarray, codebooks: list[np.ndarray]) -> list[int]:
      """
      Encode a single vector into PQ codes (one uint8 per subvector)
      """
      num_subvectors = len(codebooks)
      sub_dim = len(vector) // num_subvectors
      codes = []

      for i, codebook in enumerate(codebooks):
          sub_vec = vector[i * sub_dim : (i + 1) * sub_dim]
          distances = np.linalg.norm(codebook - sub_vec, axis=1)
          codes.append(int(np.argmin(distances)))

      return codes


  def pq_decode(codes: list[int], codebooks: list[np.ndarray]) -> np.ndarray:
      """
      Reconstruct approximate vector from PQ codes
      """
      return np.concatenate(
        [codebook[code] for code, codebook in zip(codes, codebooks)]
      )

Tip: To increase the loading speed, I suggest using the batch upsert option. You can also normalize the vectors (scale them all to the same length) during the loading phase. After normalization, quantize (compress) them to reduce storage. This extra normalization and quantization step strengthens later retrieval.
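Batching is easy to sketch independently of any particular database. In the example below, `upsert_fn` is a stand-in for whatever bulk-write call your vector database client provides; its name and signature are assumptions for illustration.

```python
from typing import Callable, Iterator


def batched(records: list[dict], batch_size: int) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


def upsert_in_batches(
    records: list[dict],
    upsert_fn: Callable[[list[dict]], None],
    batch_size: int = 100,
) -> int:
    """Send records to the store one batch at a time and return how many were sent."""
    sent = 0
    for batch in batched(records, batch_size):
        upsert_fn(batch)  # one network round trip per batch instead of per record
        sent += len(batch)
    return sent


# Illustration with a fake store that only remembers batch sizes.
batch_sizes: list[int] = []
records = [{"id": f"doc-{i}", "values": [0.0], "metadata": {}} for i in range(250)]
total_sent = upsert_in_batches(records, lambda batch: batch_sizes.append(len(batch)))
```

The right batch size depends on the database's request-size limits and your vector dimension, so treat 100 as a starting point rather than a recommendation.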

5. Prepare for retrieval

To enable retrieval from a vector database, you can use orchestration frameworks such as LlamaIndex and LangChain.

LlamaIndex can search the vector database quickly and reach the nodes whose content relates to the user's query.

LangChain then takes the data from the chunk and transforms it according to the user's query, for example by summarizing the text or drafting an email from it.

  """
  Hybrid Retrieval: combine the benefits of keyword search and vector similarity

  Where each approach shines:
  - Keywords: find exact matches, but miss queries that use synonyms
  - Embeddings: capture meaning, but can miss an exact keyword
  Hybrid is a combination of both to get the best of each.
  """

  import math
  from collections import defaultdict
  from dataclasses import dataclass
  import numpy as np

  @dataclass
  class Document:
      id: str
      text: str
      embedding: list[float]


  class BestMatching25Index:
      def __init__(self, k1: float = 1.5, b: float = 0.75):
          # k1 controls term-frequency saturation
          # and b controls document-length normalization
          self.k1 = k1
          self.b = b
          self.doc_lengths: dict[str, int] = {}
          self.avg_doc_length: float = 0
          self.doc_freqs: dict[str, int] = {} 
          self.term_freqs: dict[str, dict[str, int]] = {} 
          self.corpus_size: int = 0

      def _tokenize(self, text: str) -> list[str]:
          return text.lower().split()

      def index(self, documents: list[Document]) -> None:
          self.corpus_size = len(documents)

          for doc in documents:
              tokens = self._tokenize(doc.text)
              self.doc_lengths[doc.id] = len(tokens)
              self.term_freqs[doc.id] = {}

              seen_terms: set[str] = set()
              for token in tokens:
                  self.term_freqs[doc.id][token] = self.term_freqs[doc.id].get(token, 0) + 1
                  if token not in seen_terms:
                      self.doc_freqs[token] = self.doc_freqs.get(token, 0) + 1
                      seen_terms.add(token)

          # Guard against an empty corpus to avoid dividing by zero.
          self.avg_doc_length = (
              sum(self.doc_lengths.values()) / self.corpus_size
              if self.corpus_size
              else 0.0
          )

      def score(self, query: str, doc_id: str) -> float:
          query_terms = self._tokenize(query)
          doc_len = self.doc_lengths[doc_id]
          score = 0.0

          for term in query_terms:
              if term not in self.doc_freqs or term not in self.term_freqs.get(doc_id, {}):
                  continue

              tf = self.term_freqs[doc_id][term]
              df = self.doc_freqs[term]
              idf = math.log((self.corpus_size - df + 0.5) / (df + 0.5) + 1)
              tf_norm = (tf * (self.k1 + 1)) / (
                  tf + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_length)
              )
              score += idf * tf_norm

          return score

      def search(self, query: str, top_k: int = 10) -> list[tuple[str, float]]:
          scores = [
              (doc_id, self.score(query, doc_id))
              for doc_id in self.doc_lengths
          ]
          scores.sort(key=lambda x: x[1], reverse=True)
          return scores[:top_k]


  class VectorIndex:
      """In-memory vector index that ranks documents by cosine similarity.

      index() L2-normalizes and stores each document embedding.
      search() scores every stored document against the query embedding.
      The module-level functions hybrid_search_weighted() and
      hybrid_search_rrf() merge these results with the BM25 results.
      """

      def __init__(self):
          self.documents: dict[str, np.ndarray] = {}

      def index(self, documents: list[Document]) -> None:
          for doc in documents:
              arr = np.array(doc.embedding, dtype=np.float32)
              norm = np.linalg.norm(arr)
              self.documents[doc.id] = arr / norm if norm > 0 else arr

      def search(self, query_embedding: list[float], top_k: int = 10) -> list[tuple[str, float]]:
          q = np.array(query_embedding, dtype=np.float32)
          q_norm = np.linalg.norm(q)
          if q_norm > 0:
              # Skip normalization for a zero vector to avoid dividing by zero.
              q = q / q_norm

          scores = [
              (doc_id, float(np.dot(q, emb)))
              for doc_id, emb in self.documents.items()
          ]
          scores.sort(key=lambda x: x[1], reverse=True)
          return scores[:top_k]

  def hybrid_search_weighted(
      query: str,
      query_embedding: list[float],
      bm25_index: BestMatching25Index,
      vector_index: VectorIndex,
      alpha: float = 0.5,
      top_k: int = 10,
  ) -> list[dict]:
      """Combine keyword and vector scores with a tunable weight.

      alpha = 1.0 → pure vector search
      alpha = 0.0 → pure keyword search
      alpha = 0.5 → equal weight (good starting point)
      """
      keyword_results = bm25_index.search(query, top_k=top_k * 2)
      vector_results = vector_index.search(query_embedding, top_k=top_k * 2)

      # Normalize (min-max) each score list to [0, 1]
      def normalize_scores(results: list[tuple[str, float]]) -> dict[str, float]:
          if not results:
              return {}
          scores = [s for _, s in results]
          min_s, max_s = min(scores), max(scores)
          rng = max_s - min_s
          if rng == 0:
              return {doc_id: 1.0 for doc_id, _ in results}
          return {doc_id: (s - min_s) / rng for doc_id, s in results}

      keyword_scores = normalize_scores(keyword_results)
      vector_scores = normalize_scores(vector_results)

      # Merge
      all_doc_ids = set(keyword_scores) | set(vector_scores)
      combined = []
      for doc_id in all_doc_ids:
          ks = keyword_scores.get(doc_id, 0.0)
          vs = vector_scores.get(doc_id, 0.0)
          combined.append({
              "id": doc_id,
              "score": alpha * vs + (1 - alpha) * ks,
              "keyword_score": ks,
              "vector_score": vs,
          })

      combined.sort(key=lambda x: x["score"], reverse=True)
      return combined[:top_k]

  def reciprocal_rank_fusion(
      *ranked_lists: list[tuple[str, float]],
      k: int = 60,
      top_n: int = 10,
  ) -> list[dict]:
      """
      Merge multiple ranked lists using Reciprocal Rank Fusion (RRF).

      RRF score = sum over all lists of: 1 / (k + rank)

      Why RRF over weighted combination?
      - No score normalization needed (works on ranks, not raw scores)
      - No alpha tuning needed
      - Robust across different score distributions
      - Available as a built-in fusion method in search engines such as Elasticsearch
      """
      rrf_scores: dict[str, float] = defaultdict(float)
      doc_details: dict[str, dict] = {}

      for list_idx, ranked_list in enumerate(ranked_lists):
          for rank, (doc_id, raw_score) in enumerate(ranked_list, start=1):
              rrf_scores[doc_id] += 1.0 / (k + rank)
              if doc_id not in doc_details:
                  doc_details[doc_id] = {}
              doc_details[doc_id][f"list_{list_idx}_rank"] = rank
              doc_details[doc_id][f"list_{list_idx}_score"] = raw_score

      results = []
      for doc_id, rrf_score in rrf_scores.items():
          results.append({
              "id": doc_id,
              "rrf_score": round(rrf_score, 6),
              **doc_details[doc_id],
          })

      results.sort(key=lambda x: x["rrf_score"], reverse=True)
      return results[:top_n]


  def hybrid_search_rrf(
      query: str,
      query_embedding: list[float],
      bm25_index: BestMatching25Index,
      vector_index: VectorIndex,
      top_k: int = 10,
  ) -> list[dict]:
      keyword_results = bm25_index.search(query, top_k=top_k * 2)
      vector_results = vector_index.search(query_embedding, top_k=top_k * 2)

      return reciprocal_rank_fusion(keyword_results, vector_results, top_n=top_k)

Tip: I recommend hybrid retrieval based on both keywords and embeddings for faster discovery. Keyword retrieval is good for specific terms (“Password Policy”). Embedding retrieval is better for concept-based or description-based matches. LlamaIndex is well suited to hybrid retrieval, where it can search for both specific words and context in a query.

6. Set up an automated refresh and update schedule

The last step is to make sure you keep the knowledge base up to date. To do this, you can use selective forgetting. It is the process of overwriting or removing outdated and redundant data to keep the model accurate.
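A minimal sketch of selective forgetting might flag records that are either older than a cutoff or explicitly replaced by a newer record. The metadata keys `updated_at` and `supersedes` are hypothetical; use whatever fields your own records carry.

```python
from datetime import datetime, timedelta


def select_for_forgetting(
    records: list[dict], now: datetime, max_age_days: int = 180
) -> list[str]:
    """Return IDs of records that are too old or have been superseded."""
    cutoff = now - timedelta(days=max_age_days)
    # IDs that some newer record declares it replaces.
    superseded = {
        r["metadata"]["supersedes"]
        for r in records
        if r["metadata"].get("supersedes")
    }
    return [
        r["id"]
        for r in records
        if r["id"] in superseded or r["metadata"]["updated_at"] < cutoff
    ]


now = datetime(2025, 1, 1)
records = [
    {"id": "policy-v1", "metadata": {"updated_at": datetime(2024, 12, 1)}},
    {"id": "policy-v2", "metadata": {"updated_at": datetime(2024, 12, 20),
                                     "supersedes": "policy-v1"}},
    {"id": "old-faq", "metadata": {"updated_at": datetime(2023, 1, 1)}},
    {"id": "fresh", "metadata": {"updated_at": datetime(2024, 11, 1)}},
]
to_delete = select_for_forgetting(records, now)
```

The returned IDs can then be passed to your vector database's delete call, ideally after a human has reviewed the list.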

How can you find out what data to delete? Evaluation and observability platforms can help. You can define test rules and questions in the DeepEval framework, which then repeatedly checks whether your AI model is accurate. If the answers are incorrect, the TruLens platform helps you trace them back to the exact chunk the answer was drawn from.

  """
  Knowledge Base Quality Monitoring

  Knowledge base health with the help of automated checks:
  1. Retrieval quality: is it finding the right documents?
  2. Freshness detection: are documents stale or embeddings drifting?
  3. Unified pipeline: scheduled monitoring with alerts
  """

  import time
  import json
  import logging
  from datetime import datetime, timedelta
  from dataclasses import dataclass, field
  from typing import Any, Callable

  import numpy as np

  logging.basicConfig(level=logging.INFO)
  logger = logging.getLogger("kb_monitor")


  def setup_deepeval_metrics():
      """Define retrieval quality metrics using DeepEval.

      DeepEval provides LLM-evaluated metrics — it uses a judge LLM to score
      whether retrieved context actually helps answer the question.
      """
      from deepeval.metrics import (
          AnswerRelevancyMetric,
          FaithfulnessMetric,
          ContextualPrecisionMetric,
          ContextualRecallMetric,
      )
      from deepeval.test_case import LLMTestCase

      metrics = {
          # Does the answer address the question?
          "relevancy": AnswerRelevancyMetric(threshold=0.7),
          # Is the answer grounded in the retrieved context (no hallucination)?
          "faithfulness": FaithfulnessMetric(threshold=0.7),
          # Are the top-ranked retrieved docs actually relevant?
          "context_precision": ContextualPrecisionMetric(threshold=0.7),
          # Did we retrieve all the docs needed to answer?
          "context_recall": ContextualRecallMetric(threshold=0.7),
      }

      return metrics, LLMTestCase


  def evaluate_retrieval_quality(
      rag_pipeline: Callable,
      test_cases: list[dict],
  ) -> list[dict]:
      """Run a set of test queries through your RAG pipeline and score them.

      Each test case should have:
      - query: the user question
      - expected_answer: ground truth answer (for recall/relevancy)
      """
      from deepeval import evaluate
      from deepeval.test_case import LLMTestCase
      from deepeval.metrics import (
          AnswerRelevancyMetric,
          FaithfulnessMetric,
          ContextualPrecisionMetric,
          ContextualRecallMetric,
      )

      results = []

      for tc in test_cases:
          # Run your actual RAG pipeline
          response = rag_pipeline(tc["query"])

          test_case = LLMTestCase(
              input=tc["query"],
              actual_output=response["answer"],
              expected_output=tc["expected_answer"],
              retrieval_context=response["retrieved_contexts"],
          )

          metrics = [
              AnswerRelevancyMetric(threshold=0.7),
              FaithfulnessMetric(threshold=0.7),
              ContextualPrecisionMetric(threshold=0.7),
              ContextualRecallMetric(threshold=0.7),
          ]

          for metric in metrics:
              metric.measure(test_case)

          results.append({
              "query": tc["query"],
              "scores": {m.__class__.__name__: m.score for m in metrics},
              "passed": all(m.is_successful() for m in metrics),
          })

      return results


  def setup_trulens_monitoring(rag_pipeline: Callable, app_name: str = "my_kb"):
      """Wrap your RAG pipeline with TruLens for continuous feedback logging.

      TruLens records every query + response + retrieved context, then
      runs feedback functions asynchronously to score each interaction.
      """
      from trulens.core import TruSession, Feedback, Select
      from trulens.providers.openai import OpenAI as TruLensOpenAI
      from trulens.apps.custom import TruCustomApp, instrument

      session = TruSession()

      # Feedback provider (uses an LLM to judge quality)
      provider = TruLensOpenAI()

      feedbacks = [
          # Is the response relevant to the query?
          Feedback(provider.relevance)
          .on_input()
          .on_output(),

          # Is the response grounded in retrieved context?
          Feedback(provider.groundedness_measure_with_cot_reasons)
          .on(Select.RecordCalls.retrieve.rets)
          .on_output(),

          # Is the retrieved context relevant to the query?
          Feedback(provider.context_relevance)
          .on_input()
          .on(Select.RecordCalls.retrieve.rets),
      ]

      # Wrap your pipeline so every call is logged and scored.
      # Only the methods are decorated; @instrument is meant for methods.
      class InstrumentedRAG:
          def __init__(self, pipeline):
              self._pipeline = pipeline

          @instrument
          def retrieve(self, query: str) -> list[str]:
              result = self._pipeline(query)
              return result["retrieved_contexts"]

          @instrument
          def query(self, query: str) -> str:
              result = self._pipeline(query)
              return result["answer"]

      instrumented = InstrumentedRAG(rag_pipeline)

      tru_app = TruCustomApp(
          instrumented,
          app_name=app_name,
          feedbacks=feedbacks,
      )

      return tru_app, session


  def get_trulens_dashboard_url(session) -> str:
      """Launch the TruLens dashboard to visualize quality over time."""
      session.run_dashboard(port=8501)
      return "

  @dataclass
  class DocumentFreshness:
      doc_id: str
      last_updated: datetime
      last_embedded: datetime
      source_hash: str  # hash of source content at embedding time


  class FreshnessMonitor:
      """Detect stale documents and embedding drift."""

      def __init__(self, staleness_threshold_days: int = 30):
          self.threshold = timedelta(days=staleness_threshold_days)
          self.freshness_records: dict[str, DocumentFreshness] = {}

      def register(self, doc_id: str, source_hash: str) -> None:
          now = datetime.utcnow()
          self.freshness_records[doc_id] = DocumentFreshness(
              doc_id=doc_id,
              last_updated=now,
              last_embedded=now,
              source_hash=source_hash,
          )

      def check_staleness(self) -> dict:
          """Find documents that haven't been re-embedded recently."""
          now = datetime.utcnow()
          stale, fresh = [], []

          for doc_id, record in self.freshness_records.items():
              age = now - record.last_embedded
              if age > self.threshold:
                  stale.append({"id": doc_id, "days_stale": age.days})
              else:
                  fresh.append(doc_id)

          return {
              "total": len(self.freshness_records),
              "fresh": len(fresh),
              "stale": len(stale),
              "stale_documents": stale,
          }

      def check_content_drift(
          self, doc_id: str, current_source_hash: str
      ) -> bool:
          """Check if source content changed since last embedding."""
          record = self.freshness_records.get(doc_id)
          if not record:
              return True  # unknown doc, treat as drifted
          return record.source_hash != current_source_hash


  def detect_embedding_drift(
      old_embeddings: dict[str, list[float]],
      new_embeddings: dict[str, list[float]],
      drift_threshold: float = 0.1,
  ) -> dict:
      """Compare old vs new embeddings for the same documents.

      If your embedding model gets updated (or you switch models),
      existing vectors may no longer be compatible. This detects that.
      """
      drifted = []
      common_ids = set(old_embeddings) & set(new_embeddings)

      for doc_id in common_ids:
          old = np.array(old_embeddings[doc_id])
          new = np.array(new_embeddings[doc_id])

          # cosine distance: 0 = identical, 2 = opposite
          cos_sim = np.dot(old, new) / (np.linalg.norm(old) * np.linalg.norm(new))
          cos_dist = 1 - cos_sim

          if cos_dist > drift_threshold:
              drifted.append({
                  "id": doc_id,
                  "cosine_distance": round(float(cos_dist), 4),
              })

      return {
          "documents_compared": len(common_ids),
          "drifted": len(drifted),
          "drift_threshold": drift_threshold,
          "drifted_documents": sorted(drifted, key=lambda x: x["cosine_distance"], reverse=True),
      }

Using DeepEval in conjunction with TruLens enables periodic evaluation of your knowledge base.
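For the scheduled part, one possible approach is to reduce each evaluation run to a single pass rate and raise an alert when it drops. The sketch below consumes the list shape returned by `evaluate_retrieval_quality` above (dictionaries with `query` and `passed` keys); the function name and the 0.9 threshold are my own assumptions.

```python
def summarize_evaluation(results: list[dict], min_pass_rate: float = 0.9) -> dict:
    """Collapse per-query results into a pass rate and an alert flag.

    An empty result list yields a pass rate of 0.0 and therefore an alert,
    because a run that evaluated nothing should not look healthy.
    """
    total = len(results)
    failed = [r["query"] for r in results if not r["passed"]]
    pass_rate = (total - len(failed)) / total if total else 0.0
    return {
        "pass_rate": pass_rate,
        "failed_queries": failed,
        "alert": pass_rate < min_pass_rate,
    }


nightly = summarize_evaluation([
    {"query": "How do I change my password?", "passed": True},
    {"query": "What is the refund policy?", "passed": False},
    {"query": "Who approves access requests?", "passed": True},
    {"query": "What is the password policy?", "passed": True},
])
```

Running this from a nightly job and logging the summary gives you a simple trend line before you invest in a full dashboard.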

Top challenges in building a knowledge base (+ solutions)

Here are some common problems I've seen with knowledge bases:

1. Increasing data quality errors

AI models built over the years, even by well-known companies with strong teams, have given misleading answers. Air Canada's well-known chatbot error is one example, where the model promised a customer a refund under a policy that never existed.

Although developers try to put the right content in the knowledge base, the output can still be problematic. In my experience, a lack of domain knowledge leads to mistakes in judging what is right. Take off the tech hat and put on the domain hat to identify outdated, conflicting, and irrelevant information in your knowledge base.

2. Slow retrieval

An AI model that gives the right answer is not enough. Users hate loading screens and slowness and want answers instantly, at least from a machine.

Developers tend to focus on functionality and neglect optimization, which is non-negotiable. Use the following tips to solve common slowness problems:

  • Use HNSW (Hierarchical Navigable Small World) or IVF (inverted file) indexes instead of flat indexes, as these organize similar vectors close together so you can find them quickly.
  • Apply quantization (compressing vectors so they take up less memory) or sharding (splitting the index into smaller pieces) to reduce the memory each query has to touch.
  • Store your database and AI service in the same cloud environment for faster access.
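To show why an IVF-style index is faster than a flat one, here is a toy sketch in pure Python. It assigns each vector to its nearest centroid and, at query time, scans only the `nprobe` closest cells instead of every vector. The `TinyIVF` class is illustrative, and the centroids are supplied by hand; real IVF indexes learn them with k-means.

```python
import math


class TinyIVF:
    """Toy inverted-file index: vectors are bucketed by nearest centroid."""

    def __init__(self, centroids: list[list[float]]):
        self.centroids = centroids
        self.cells: list[list[tuple[str, list[float]]]] = [[] for _ in centroids]

    def _nearest_cells(self, vector: list[float], n: int) -> list[int]:
        """Indices of the n centroids closest to the vector (Euclidean distance)."""
        order = sorted(
            range(len(self.centroids)),
            key=lambda i: math.dist(vector, self.centroids[i]),
        )
        return order[:n]

    def add(self, doc_id: str, vector: list[float]) -> None:
        cell = self._nearest_cells(vector, 1)[0]
        self.cells[cell].append((doc_id, vector))

    def search(self, query: list[float], top_k: int = 3, nprobe: int = 1) -> list[str]:
        """Scan only the nprobe nearest cells, then rank those candidates exactly."""
        candidates = []
        for cell in self._nearest_cells(query, nprobe):
            candidates.extend(self.cells[cell])
        candidates.sort(key=lambda item: math.dist(query, item[1]))
        return [doc_id for doc_id, _ in candidates[:top_k]]


index = TinyIVF(centroids=[[0.0, 0.0], [10.0, 10.0]])
index.add("a", [0.0, 1.0])
index.add("b", [1.0, 0.0])
index.add("c", [9.0, 10.0])
index.add("d", [10.0, 9.0])
near_origin = index.search([0.2, 0.9], top_k=2, nprobe=1)
```

The trade-off is recall: a low `nprobe` is fast but can miss a neighbor that sits just across a cell boundary, so this value is usually tuned against a test set.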

3. Poor scalability

In the rush to implement, developers often make poor design decisions that hurt scalability over time. One such problem is a monolithic architecture, where all data storage and query processing occurs in a single, tightly coupled cluster. As model usage grows, CPU and RAM usage rises sharply because that one cluster has to serve every query. I suggest horizontal partitioning (splitting data across multiple smaller servers) to manage scale effectively.
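Horizontal partitioning has two moving parts: deciding which shard owns a document, and merging the per-shard search results. The sketch below shows both under simple assumptions (hash-based routing, and scores that are comparable across shards); `shard_for` and `merge_shard_results` are illustrative names.

```python
import hashlib
import heapq


def shard_for(doc_id: str, num_shards: int) -> int:
    """Map a document ID to a shard with a stable hash.

    Python's built-in hash() is randomized per process for strings,
    so a cryptographic digest is used to keep routing consistent.
    """
    digest = hashlib.sha256(doc_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards


def merge_shard_results(
    per_shard: list[list[tuple[str, float]]], top_k: int
) -> list[tuple[str, float]]:
    """Combine (doc_id, score) hits from every shard and keep the global top_k."""
    all_hits = [hit for shard_hits in per_shard for hit in shard_hits]
    return heapq.nlargest(top_k, all_hits, key=lambda hit: hit[1])


merged = merge_shard_results(
    [
        [("a", 0.9), ("b", 0.2)],  # hits from shard 0
        [("c", 0.7)],              # hits from shard 1
    ],
    top_k=2,
)
```

Simple modulo routing reshuffles most documents when the shard count changes, which is why production systems often use consistent hashing or the database's built-in sharding instead.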

Another problem is the rising cost of scaling, which usually happens if you don't quantize or compress vectors to optimize storage. Engineers skip the quantization step to get the model out faster. The downside is not apparent at first, but the slowly growing cloud bill soon exposes the gap.
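A quick back-of-the-envelope calculation shows how much storage quantization can save. The sketch below counts raw vector bytes only: it ignores index overhead, metadata, the per-vector min and max that scalar quantization stores, and the codebooks that product quantization needs. The corpus size and dimension are example numbers, not measurements.

```python
def storage_bytes(
    num_vectors: int, dim: int, scheme: str, num_subvectors: int = 8
) -> int:
    """Approximate raw storage for the vectors alone, by encoding scheme."""
    bytes_per_vector = {
        "float32": dim * 4,      # 4 bytes per dimension
        "uint8": dim,            # scalar quantization: 1 byte per dimension
        "pq": num_subvectors,    # product quantization: 1 byte per subvector code
    }[scheme]
    return num_vectors * bytes_per_vector


# Example: one million 1,024-dimensional vectors.
full = storage_bytes(1_000_000, 1024, "float32")
scalar = storage_bytes(1_000_000, 1024, "uint8")
product = storage_bytes(1_000_000, 1024, "pq", num_subvectors=8)
```

Here the float32 vectors take about 4.1 GB, scalar quantization brings that to about 1 GB, and 8-byte PQ codes to about 8 MB, at the cost of some retrieval accuracy that you should measure on your own queries.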

A knowledge base is not a data dump but a curated asset

Building a knowledge base is not a one-time project. It is a living asset that requires regular upkeep. The structure you create today will reveal gaps tomorrow. Every failed query is a lesson, and every successful retrieval confirms your design choices.

I suggest starting small: choose ten common questions for your model, create clear chunks, and then test whether your model can provide the right answers in a reasonable time. Once you start getting the expected output, you can repeat the process to expand the knowledge base.

The difference between a model that guesses and one that knows comes down to this deliberate maintenance work. Continuous refinement makes subsequent searches easier and results more reliable.
