RAG Pipeline Architecture: Chunking Strategies, Hybrid Search, Reranking, and Evaluation Frameworks
Why Most RAG Pipelines Fail
The default RAG tutorial — split documents, embed them, retrieve top-k, stuff into prompt — works for demos. It fails in production for predictable reasons:
- Chunking destroys context. Fixed-size splits break sentences mid-thought, separate headers from their content, and lose document structure.
- Vector search misses keywords. Embeddings capture semantic similarity but fail on exact entity matches, acronyms, and domain-specific terminology.
- Top-k retrieval is noisy. Retrieving 5 chunks by cosine similarity often returns 2-3 irrelevant results that dilute the prompt context.
- No evaluation means no iteration. Without systematic metrics, teams optimize by vibes and ship pipelines that degrade silently.
This article covers the architecture that addresses each failure mode.
Document Ingestion and Preprocessing
Before chunking, document preprocessing determines pipeline quality more than most teams realize.
Format Normalization
Different source formats require different extraction strategies:
# PDF extraction with layout awareness
from unstructured.partition.pdf import partition_pdf
elements = partition_pdf(
    filename="report.pdf",
    strategy="hi_res",               # OCR + layout detection
    infer_table_structure=True,      # Extract tables as HTML
    extract_images_in_pdf=True,      # Extract embedded images
)
# Separate element types for different processing
tables = [e for e in elements if e.category == "Table"]
narratives = [e for e in elements if e.category == "NarrativeText"]
titles = [e for e in elements if e.category == "Title"]
Key decisions at this stage:
- Tables: Convert to markdown or HTML, never split across chunks. A table fragment is worse than no table.
- Code blocks: Keep intact. Splitting a function definition across chunks makes both halves useless.
- Images: Extract captions and surrounding text. For diagrams, consider vision model descriptions.
- Headers: Preserve hierarchy — they become metadata that improves retrieval.
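For the table rule in particular, one workable approach is to render each extracted table's HTML as markdown and store it as a standalone chunk. A minimal sketch, assuming unstructured's Table elements and that pandas (with lxml and tabulate installed) is an acceptable conversion path:

from io import StringIO
import pandas as pd

def table_to_markdown_chunk(table_element):
    """Render an extracted table (HTML from unstructured) as one standalone markdown chunk."""
    html = table_element.metadata.text_as_html
    frame = pd.read_html(StringIO(html))[0]
    return {"content": frame.to_markdown(index=False), "type": "table"}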
Metadata Extraction
Attach metadata at ingestion time, not after:
chunk_metadata = {
    "source": "engineering-handbook-v3.pdf",
    "section": "Chapter 4: Database Operations",
    "page_numbers": [42, 43],
    "document_type": "handbook",
    "last_updated": "2026-03-15",
    "audience": "backend-engineers",
}
This metadata enables filtered retrieval — "find chunks about database operations from the engineering handbook" — which dramatically improves precision.
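A sketch of what filtered retrieval looks like in practice, here with Chroma purely as an illustration (any vector store with metadata predicates works the same way):

import chromadb

client = chromadb.PersistentClient(path="./rag_index")
collection = client.get_or_create_collection("handbook")

# Metadata predicates narrow the candidate pool before similarity ranking,
# so a "database operations" question only hits the relevant handbook.
results = collection.query(
    query_texts=["How do we configure point-in-time recovery?"],
    n_results=5,
    where={"$and": [
        {"document_type": {"$eq": "handbook"}},
        {"audience": {"$eq": "backend-engineers"}},
    ]},
)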
Chunking Strategies
Chunking is where most pipelines silently lose quality. The right strategy depends on document structure.
Fixed-Size Chunking
The baseline. Split every N tokens with M-token overlap.
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""],
    length_function=len,  # or a tiktoken-based counter for token-accurate sizes
)
chunks = splitter.split_text(document_text)
When it works: Uniform text without structure — transcripts, plain-text logs, chat histories.
When it fails: Structured documents where splitting at arbitrary positions destroys meaning.
Semantic Chunking
Groups sentences by embedding similarity, splitting where topics shift:
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
chunker = SemanticChunker(
    OpenAIEmbeddings(model="text-embedding-3-small"),
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=85,  # Split at the 85th percentile of embedding distance
)
chunks = chunker.split_text(document_text)
Semantic chunking produces variable-size chunks that respect topic boundaries. The tradeoff is speed — embedding every sentence during ingestion is 10-50x slower than fixed-size splitting.
Document-Structure-Aware Chunking
The most effective approach for structured content. Use document hierarchy to define chunk boundaries:
def structure_aware_chunk(elements, max_tokens=512):
    """Chunk by document structure, respecting headers and sections.

    Assumes count_tokens() is a tokenizer helper (e.g. tiktoken-based).
    """
    chunks = []
    current_chunk = []
    current_tokens = 0
    current_header_chain = []
    for element in elements:
        if element.category == "Title":
            # New section — flush current chunk
            if current_chunk:
                chunks.append({
                    "content": "\n".join(current_chunk),
                    "headers": list(current_header_chain),
                    "token_count": current_tokens,
                })
                current_chunk = []
                current_tokens = 0
            # Simplification: keep every title seen so far; with heading-level
            # metadata you would truncate the chain to the current depth instead.
            current_header_chain.append(element.text)
        elif element.category == "Table":
            # Tables go in their own chunk, never split
            if current_chunk:
                chunks.append({
                    "content": "\n".join(current_chunk),
                    "headers": list(current_header_chain),
                    "token_count": current_tokens,
                })
                current_chunk = []
                current_tokens = 0
            chunks.append({
                "content": element.metadata.text_as_html,
                "headers": list(current_header_chain),
                "token_count": count_tokens(element.text),
                "type": "table",
            })
        else:
            elem_tokens = count_tokens(element.text)
            if current_tokens + elem_tokens > max_tokens and current_chunk:
                chunks.append({
                    "content": "\n".join(current_chunk),
                    "headers": list(current_header_chain),
                    "token_count": current_tokens,
                })
                current_chunk = []
                current_tokens = 0
            current_chunk.append(element.text)
            current_tokens += elem_tokens
    if current_chunk:
        chunks.append({
            "content": "\n".join(current_chunk),
            "headers": list(current_header_chain),
            "token_count": current_tokens,
        })
    return chunks
Contextual Chunk Headers
Prepend the section hierarchy to each chunk before embedding. This single technique improved retrieval recall by 8-12% in our benchmarks:
## Chapter 4: Database Operations > Backup Strategies > Point-in-Time Recovery
To perform point-in-time recovery, configure continuous WAL archiving...
The embedding now captures both the content and its position in the document hierarchy.
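A minimal sketch of the technique, assuming the chunk dicts produced by structure_aware_chunk above (a headers list plus content):

def with_contextual_header(chunk):
    """Prepend the section hierarchy so the embedding captures document position."""
    header_line = " > ".join(chunk["headers"])
    if not header_line:
        return chunk["content"]
    return f"## {header_line}\n{chunk['content']}"

# Embed the contextualized text; keep the original content for prompt construction.
texts_to_embed = [with_contextual_header(c) for c in chunks]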
Embedding Strategy
Model Selection
The embedding model determines the ceiling of retrieval quality.
| Model | Dimensions | MTEB Score | Latency (p50) | Cost |
|---|---|---|---|---|
| text-embedding-3-small | 1536 | 62.3 | 12ms | $0.02/1M tokens |
| text-embedding-3-large | 3072 | 64.6 | 18ms | $0.13/1M tokens |
| voyage-3 | 1024 | 67.1 | 15ms | $0.06/1M tokens |
| bge-m3 (self-hosted) | 1024 | 66.8 | 8ms | Infrastructure cost |
| Cohere embed-v4 | 1024 | 67.5 | 14ms | $0.10/1M tokens |
Practical recommendation: For English-only workloads, voyage-3 or Cohere embed-v4 deliver the best retrieval quality. For multilingual or cost-sensitive deployments, bge-m3 self-hosted on a single GPU is hard to beat.
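Whichever model wins, batch the ingestion-time calls; most embedding APIs accept a list of inputs per request. A minimal sketch with the OpenAI client (the voyage, Cohere, or self-hosted bge-m3 clients drop in the same way):

from openai import OpenAI

client = OpenAI()

def embed_chunks(texts, model="text-embedding-3-small", batch_size=128):
    """Embed chunk texts in batches to keep ingestion fast and cheap."""
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = client.embeddings.create(model=model, input=batch)
        vectors.extend(item.embedding for item in response.data)
    return vectors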
Late Interaction Models (ColBERT)
ColBERT produces per-token embeddings instead of a single vector, enabling more fine-grained matching:
from ragatouille import RAGPretrainedModel
rag = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
rag.index(
    collection=documents,
    index_name="knowledge-base",
    max_document_length=512,
)
results = rag.search(query="PITR backup configuration", k=10)
ColBERT consistently outperforms single-vector models on retrieval quality but requires 10-50x more storage per document. Use it when retrieval quality is the bottleneck and storage is cheap.
Hybrid Search Architecture
Vector search alone is not enough. Combining semantic and lexical search covers each method's blind spots.
BM25 + Vector with Reciprocal Rank Fusion
from rank_bm25 import BM25Okapi
import numpy as np
def hybrid_search(query, corpus_chunks, embeddings_index, k=20, alpha=0.5):
    """
    Combine BM25 and vector search using Reciprocal Rank Fusion.
    alpha controls the balance: 0.5 = equal weight.
    Assumes embed() returns the query vector and embeddings_index.search()
    returns an object whose .ids are positions into corpus_chunks.
    """
    # Vector search
    query_embedding = embed(query)
    vector_scores = embeddings_index.search(query_embedding, k=k * 2)

    # BM25 search (built per call here for brevity; build it once at
    # ingestion time in production)
    tokenized_corpus = [chunk.split() for chunk in corpus_chunks]
    bm25 = BM25Okapi(tokenized_corpus)
    bm25_scores = bm25.get_scores(query.split())
    bm25_top = np.argsort(bm25_scores)[-k * 2:][::-1]

    # Reciprocal Rank Fusion
    rrf_scores = {}
    rrf_k = 60  # Standard RRF constant
    for rank, doc_id in enumerate(vector_scores.ids):
        rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + alpha / (rrf_k + rank + 1)
    for rank, doc_id in enumerate(bm25_top):
        rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + (1 - alpha) / (rrf_k + rank + 1)

    # Sort by fused score
    ranked = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
    return ranked[:k]
Why Hybrid Works
Consider the query: "What is the SLA for the EU-WEST-1 Redis cluster?"
- Vector search finds chunks about SLAs and Redis clusters in general — semantically similar but potentially wrong region.
- BM25 finds chunks containing the exact string "EU-WEST-1" — lexically matched.
- Hybrid surfaces the chunk that mentions both the SLA policy and the specific region.
Database-Native Hybrid Search
PostgreSQL with pgvector 0.8+ supports both in a single query:
-- Hybrid search with pgvector + tsvector
WITH vector_results AS (
SELECT id, content, 1 - (embedding <=> $1::vector) AS vector_score
FROM documents
WHERE metadata->>'department' = 'engineering'
ORDER BY embedding <=> $1::vector
LIMIT 40
),
text_results AS (
SELECT id, content, ts_rank(search_vector, plainto_tsquery($2)) AS text_score
FROM documents
WHERE search_vector @@ plainto_tsquery($2)
AND metadata->>'department' = 'engineering'
ORDER BY text_score DESC
LIMIT 40
),
combined AS (
SELECT
COALESCE(v.id, t.id) AS id,
COALESCE(v.content, t.content) AS content,
COALESCE(v.vector_score, 0) * 0.5 + COALESCE(t.text_score, 0) * 0.5 AS hybrid_score
FROM vector_results v
FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT * FROM combined ORDER BY hybrid_score DESC LIMIT 10;
Reranking
Reranking is the highest-leverage improvement you can make to a RAG pipeline. Bi-encoder embeddings are fast but imprecise. Cross-encoder rerankers are slow but accurate. The architecture uses both:
- Retrieve 50-100 candidates with fast hybrid search (~10ms)
- Rerank the top 20-30 with a cross-encoder (~50-80ms)
- Select the top 5-8 for the LLM context
Cross-Encoder Reranking
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("BAAI/bge-reranker-v2-m3", max_length=512)
def rerank(query, candidates, top_k=5):
    """Rerank candidates using cross-encoder."""
    pairs = [[query, candidate["content"]] for candidate in candidates]
    scores = reranker.predict(pairs)
    for i, candidate in enumerate(candidates):
        candidate["rerank_score"] = float(scores[i])
    ranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
    return ranked[:top_k]
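Putting the three stages together, as a sketch that assumes the hybrid_search and rerank functions defined above and that the returned ids index into corpus_chunks:

# Stage 1: wide, fast retrieval
fused = hybrid_search(query, corpus_chunks, embeddings_index, k=50)
candidates = [{"id": doc_id, "content": corpus_chunks[doc_id]} for doc_id, _ in fused]

# Stage 2: narrow, accurate cross-encoder reranking
top_chunks = rerank(query, candidates[:30], top_k=5)

# Stage 3: only the survivors reach the LLM context (see Prompt Construction below)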
Cohere Rerank API
For teams that prefer managed infrastructure:
import cohere
co = cohere.ClientV2()
results = co.rerank(
    model="rerank-v3.5",
    query="PITR backup configuration for PostgreSQL",
    documents=[chunk["content"] for chunk in candidates],
    top_n=5,
    return_documents=True,
)
reranked = [
    {"content": r.document.text, "relevance_score": r.relevance_score}
    for r in results.results
]
Impact Measurement
In our production pipeline (2.3M chunks, 10K test queries):
| Configuration | Recall@5 | MRR | Answer Relevance |
|---|---|---|---|
| Vector only | 0.62 | 0.54 | 0.71 |
| Hybrid (BM25 + vector) | 0.74 | 0.63 | 0.78 |
| Hybrid + reranker | 0.83 | 0.76 | 0.89 |
| Hybrid + reranker + contextual headers | 0.87 | 0.81 | 0.91 |
The reranker alone accounts for a 9-point recall improvement over hybrid search. Combined with contextual headers, the pipeline recovers 87% of relevant chunks in the top 5 results.
Prompt Construction
After retrieval and reranking, how you construct the LLM prompt matters more than most teams realize.
Context Window Management
def build_rag_prompt(query, retrieved_chunks, max_context_tokens=6000):
    """Build prompt with token-budget-aware context injection."""
    context_parts = []
    token_count = 0
    for chunk in retrieved_chunks:
        chunk_tokens = count_tokens(chunk["content"])
        if token_count + chunk_tokens > max_context_tokens:
            break
        context_parts.append(
            f"[Source: {chunk['metadata']['source']}, "
            f"Section: {chunk['metadata'].get('section', 'N/A')}]\n"
            f"{chunk['content']}"
        )
        token_count += chunk_tokens
    context = "\n\n---\n\n".join(context_parts)
    return f"""Answer the following question based on the provided context.
If the context does not contain enough information to answer fully, say so explicitly.
Cite the source document for each claim.
Context:
{context}
Question: {query}
Answer:"""
Citation Grounding
Force the model to cite sources by requiring inline references:
For each factual claim in your answer, include a citation in the format [Source: filename, Section: section_name].
If you cannot find supporting evidence in the context for a claim, prefix it with [Unsupported].
This makes hallucination detection trivial — any claim marked [Unsupported] or missing a citation is suspect.
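A minimal version of that check (a sketch; the citation pattern matches the format requested above):

import re

CITATION = re.compile(r"\[Source:\s*[^\]]+\]")

def flag_suspect_claims(answer):
    """Return sentences that are marked [Unsupported] or carry no citation."""
    suspect = []
    for sentence in re.split(r"(?<=[.!?])\s+", answer.strip()):
        if sentence and ("[Unsupported]" in sentence or not CITATION.search(sentence)):
            suspect.append(sentence)
    return suspect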
Evaluation Framework
A RAG pipeline without evaluation is a demo, not a product. Evaluation happens at three levels.
Level 1: Retrieval Quality
Measured independently from the LLM:
def evaluate_retrieval(test_queries, ground_truth, retriever, k=5):
    """Evaluate retrieval with standard IR metrics (Recall@k and MRR)."""
    recalls, mrrs = [], []
    for query, relevant_ids in zip(test_queries, ground_truth):
        retrieved = retriever.search(query, k=k)
        retrieved_ids = [r["id"] for r in retrieved]
        # Recall@k
        hits = len(set(retrieved_ids) & set(relevant_ids))
        recalls.append(hits / len(relevant_ids))
        # MRR
        for rank, rid in enumerate(retrieved_ids, 1):
            if rid in relevant_ids:
                mrrs.append(1.0 / rank)
                break
        else:
            mrrs.append(0.0)
    return {
        "recall@k": np.mean(recalls),
        "mrr": np.mean(mrrs),
    }
Level 2: Generation Quality with RAGAS
RAGAS evaluates the full pipeline — retrieval and generation together:
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)

results = evaluate(
    dataset=eval_dataset,  # Questions + ground truth answers + contexts
    metrics=[
        faithfulness,       # Does the answer stick to retrieved context?
        answer_relevancy,   # Is the answer relevant to the question?
        context_precision,  # Are retrieved chunks relevant?
        context_recall,     # Were all necessary chunks retrieved?
    ],
)
print(results)
# {'faithfulness': 0.89, 'answer_relevancy': 0.91,
#  'context_precision': 0.82, 'context_recall': 0.87}
Faithfulness is the most important metric — it measures hallucination. A faithfulness score below 0.85 means the pipeline is generating claims not supported by retrieved context.
Level 3: Human Evaluation
Automate collection, evaluate manually:
import random

def sample_for_human_eval(production_queries, n=200):
    """Sample queries stratified by difficulty and topic."""
    # Classify queries by estimated difficulty
    easy = [q for q in production_queries if q["retrieval_confidence"] > 0.8]
    medium = [q for q in production_queries if 0.5 < q["retrieval_confidence"] <= 0.8]
    hard = [q for q in production_queries if q["retrieval_confidence"] <= 0.5]
    sample = (
        random.sample(easy, min(80, len(easy))) +
        random.sample(medium, min(80, len(medium))) +
        random.sample(hard, min(40, len(hard)))
    )
    return sample
Human evaluators rate on three axes: correctness (factually accurate), completeness (covers the question fully), and groundedness (cites context, doesn't hallucinate).
Production Architecture
Pipeline Overview
Ingestion:   Documents → Preprocessing → Chunking → Embedding → Vector Store
Query path:  User Query → Query Expansion → Hybrid Search (over the Vector Store) → Reranker → Prompt Builder → LLM → Response
Feedback:    Response → Evaluation Logger
Query Expansion
Rewrite the user query before retrieval to improve recall:
def expand_query(query, llm):
    """Generate alternative phrasings to improve retrieval."""
    expansion_prompt = f"""Generate 3 alternative phrasings of this search query.
Each should capture the same intent but use different terminology.
Return only the queries, one per line.
Query: {query}"""
    alternatives = llm.generate(expansion_prompt).strip().split("\n")
    return [query] + alternatives[:3]
Run hybrid search on all expanded queries and merge the results with RRF. This consistently adds a 3-5% recall improvement.
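A sketch of that merge step, reusing expand_query and hybrid_search from above:

def expanded_search(query, corpus_chunks, embeddings_index, llm, k=10):
    """Run hybrid search for each phrasing and fuse the result lists with RRF."""
    rrf_k = 60
    fused = {}
    for phrasing in expand_query(query, llm):
        results = hybrid_search(phrasing, corpus_chunks, embeddings_index, k=k * 2)
        for rank, (doc_id, _score) in enumerate(results):
            fused[doc_id] = fused.get(doc_id, 0) + 1.0 / (rrf_k + rank + 1)
    return sorted(fused.items(), key=lambda x: x[1], reverse=True)[:k]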
Caching Layer
Cache at two levels to reduce latency and cost:
- Embedding cache: Hash the input text, cache the embedding vector. Avoids re-embedding identical queries.
- Semantic cache: For queries with cosine similarity > 0.95 to a cached query, return the cached response. Reduces LLM calls by 20-40% in support use cases where similar questions recur.
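A minimal in-process sketch of both layers (production versions usually back these with Redis; embed() is the same embedding call used in hybrid_search above):

import hashlib
import numpy as np

embedding_cache = {}   # sha256(text) -> embedding vector
semantic_cache = []    # list of (query_vector, cached_response) pairs

def cached_embed(text):
    """Embedding cache: identical text never hits the embedding model twice."""
    key = hashlib.sha256(text.encode()).hexdigest()
    if key not in embedding_cache:
        embedding_cache[key] = embed(text)
    return embedding_cache[key]

def semantic_lookup(query, threshold=0.95):
    """Semantic cache: reuse a stored response for near-duplicate queries."""
    qv = np.asarray(cached_embed(query), dtype=float)
    for cv, response in semantic_cache:
        cv = np.asarray(cv, dtype=float)
        sim = float(np.dot(qv, cv) / (np.linalg.norm(qv) * np.linalg.norm(cv)))
        if sim > threshold:
            return response
    return None  # cache miss: run the full pipeline, then append to semantic_cache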
Monitoring
Track these metrics in production:
- Retrieval latency (p50, p95, p99) — target < 100ms for hybrid + rerank
- Faithfulness score on a rolling sample — alert if it drops below 0.85
- Empty retrieval rate — queries where no chunk scores above the relevance threshold
- User feedback signals — thumbs up/down, copy events, follow-up questions
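One way to wire the faithfulness alert, as a sketch; alert() stands in for whatever pager or chat hook you already use:

from collections import deque

faithfulness_window = deque(maxlen=500)  # rolling sample of scored responses

def record_faithfulness(score, alert_threshold=0.85, min_samples=100):
    """Track faithfulness on a rolling window and alert when the mean degrades."""
    faithfulness_window.append(score)
    if len(faithfulness_window) >= min_samples:
        mean = sum(faithfulness_window) / len(faithfulness_window)
        if mean < alert_threshold:
            alert(f"Rolling faithfulness dropped to {mean:.2f}")  # hypothetical hook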
When RAG Is Not Enough
RAG has well-defined limits:
- Multi-hop reasoning: When the answer requires synthesizing information from 5+ documents with inferential steps between them, RAG retrieval often misses intermediate documents.
- Temporal reasoning: "What changed between Q3 and Q4?" requires retrieving and comparing two time-specific document sets — standard top-k retrieval is not designed for this.
- Computation over data: "What is the average SLA across all regions?" requires structured query, not text retrieval.
For these cases, consider agentic RAG — an agent that plans retrieval steps, executes multiple searches, and synthesizes results programmatically — or hybrid approaches that combine RAG with SQL queries and API calls.