Architettura delle Pipeline RAG: Strategie di Chunking, Ricerca Ibrida, Reranking e Framework di Valutazione
Perché la Maggior Parte delle Pipeline RAG Fallisce
Il tutorial RAG predefinito — dividere documenti, generare embedding, recuperare top-k, inserire nel prompt — funziona per le demo. Fallisce in produzione per ragioni prevedibili:
- Il chunking distrugge il contesto. I tagli a dimensione fissa spezzano le frasi a metà pensiero, separano le intestazioni dal contenuto e perdono la struttura del documento.
- La ricerca vettoriale manca le keyword. Gli embedding catturano la similarità semantica ma falliscono sulle corrispondenze esatte di entità, acronimi e terminologia specifica del dominio.
- Il retrieval top-k è rumoroso. Recuperare 5 chunk per similarità coseno spesso restituisce 2-3 risultati irrilevanti che diluiscono il contesto del prompt.
- Senza valutazione non c'è iterazione. Senza metriche sistematiche, i team ottimizzano a intuito e rilasciano pipeline che si degradano silenziosamente.
Questo articolo copre l'architettura che affronta ciascuna di queste modalità di fallimento.
Ingestione e Pre-elaborazione dei Documenti
Prima del chunking, la pre-elaborazione dei documenti determina la qualità della pipeline più di quanto la maggior parte dei team creda.
Normalizzazione dei Formati
Diversi formati sorgente richiedono diverse strategie di estrazione:
# Estrazione PDF con rilevamento del layout
from unstructured.partition.pdf import partition_pdf
elements = partition_pdf(
filename="report.pdf",
strategy="hi_res", # OCR + rilevamento layout
infer_table_structure=True, # Estrarre tabelle come HTML
extract_images_in_pdf=True, # Estrarre immagini incorporate
)
# Separare i tipi di elementi per elaborazione differenziata
tables = [e for e in elements if e.category == "Table"]
narratives = [e for e in elements if e.category == "NarrativeText"]
titles = [e for e in elements if e.category == "Title"]
Decisioni chiave in questa fase:
- Tabelle: Convertire in markdown o HTML, mai dividere tra chunk. Un frammento di tabella è peggio di nessuna tabella.
- Blocchi di codice: Mantenere intatti. Dividere la definizione di una funzione tra chunk rende entrambe le metà inutili.
- Immagini: Estrarre didascalie e testo circostante. Per i diagrammi, considerare descrizioni con modelli di visione.
- Intestazioni: Preservare la gerarchia — diventano metadati che migliorano il retrieval.
Estrazione dei Metadati
Allegare metadati al momento dell'ingestione, non dopo:
chunk_metadata = {
"source": "engineering-handbook-v3.pdf",
"section": "Capitolo 4: Operazioni Database",
"page_numbers": [42, 43],
"document_type": "handbook",
"last_updated": "2026-03-15",
"audience": "backend-engineers",
}
Questi metadati abilitano il retrieval filtrato — "trovare chunk sulle operazioni database dall'handbook di ingegneria" — il che migliora drasticamente la precisione.
Strategie di Chunking
Il chunking è dove la maggior parte delle pipeline perde qualità silenziosamente. La strategia corretta dipende dalla struttura del documento.
Chunking a Dimensione Fissa
La baseline. Dividere ogni N token con M token di overlap.
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=50,
separators=["\n\n", "\n", ". ", " ", ""],
length_function=len, # o tiktoken per conteggio token preciso
)
chunks = splitter.split_text(document_text)
Quando funziona: Testo uniforme senza struttura — trascrizioni, log in testo semplice, cronologie chat.
Quando fallisce: Documenti strutturati dove dividere in posizioni arbitrarie distrugge il significato.
Chunking Semantico
Raggruppa le frasi per similarità degli embedding, dividendo dove i temi cambiano:
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
chunker = SemanticChunker(
OpenAIEmbeddings(model="text-embedding-3-small"),
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=85, # Dividere al percentile 85 della distanza
)
chunks = chunker.split_text(document_text)
Il chunking semantico produce chunk di dimensione variabile che rispettano i confini tematici. Il tradeoff è la velocità — generare embedding di ogni frase durante l'ingestione è 10-50x più lento della divisione a dimensione fissa.
Chunking Consapevole della Struttura del Documento
L'approccio più efficace per contenuti strutturati. Usa la gerarchia del documento per definire i confini dei chunk:
def structure_aware_chunk(elements, max_tokens=512):
"""Dividere per struttura del documento, rispettando intestazioni e sezioni."""
chunks = []
current_chunk = []
current_tokens = 0
current_header_chain = []
for element in elements:
if element.category == "Title":
# Nuova sezione — svuotare il chunk corrente
if current_chunk:
chunks.append({
"content": "\n".join(current_chunk),
"headers": list(current_header_chain),
"token_count": current_tokens,
})
current_chunk = []
current_tokens = 0
current_header_chain.append(element.text)
elif element.category == "Table":
# Le tabelle vanno nel proprio chunk, mai divise
if current_chunk:
chunks.append({
"content": "\n".join(current_chunk),
"headers": list(current_header_chain),
"token_count": current_tokens,
})
current_chunk = []
current_tokens = 0
chunks.append({
"content": element.metadata.text_as_html,
"headers": list(current_header_chain),
"token_count": count_tokens(element.text),
"type": "table",
})
else:
elem_tokens = count_tokens(element.text)
if current_tokens + elem_tokens > max_tokens and current_chunk:
chunks.append({
"content": "\n".join(current_chunk),
"headers": list(current_header_chain),
"token_count": current_tokens,
})
current_chunk = []
current_tokens = 0
current_chunk.append(element.text)
current_tokens += elem_tokens
if current_chunk:
chunks.append({
"content": "\n".join(current_chunk),
"headers": list(current_header_chain),
"token_count": current_tokens,
})
return chunks
Intestazioni Contestuali nei Chunk
Anteporre la gerarchia delle sezioni a ogni chunk prima di generare l'embedding. Questa singola tecnica ha migliorato il recall di retrieval dell'8-12% nei nostri benchmark:
## Capitolo 4: Operazioni Database > Strategie di Backup > Recupero Point-in-Time
Per eseguire il recupero point-in-time, configurare l'archiviazione continua dei WAL...
L'embedding ora cattura sia il contenuto che la sua posizione nella gerarchia del documento.
Strategia di Embedding
Selezione del Modello
Il modello di embedding determina il tetto della qualità di retrieval.
| Modello | Dimensioni | MTEB Score | Latenza (p50) | Costo | |---|---|---|---|---| | text-embedding-3-small | 1536 | 62.3 | 12ms | $0.02/1M token | | text-embedding-3-large | 3072 | 64.6 | 18ms | $0.13/1M token | | voyage-3 | 1024 | 67.1 | 15ms | $0.06/1M token | | bge-m3 (self-hosted) | 1024 | 66.8 | 8ms | Costo infrastruttura | | Cohere embed-v4 | 1024 | 67.5 | 14ms | $0.10/1M token |
Raccomandazione pratica: Per workload solo in inglese, voyage-3 o Cohere embed-v4 offrono la migliore qualità di retrieval. Per deployment multilingue o sensibili ai costi, bge-m3 self-hosted su una singola GPU è difficile da battere.
Modelli a Interazione Tardiva (ColBERT)
ColBERT produce embedding per-token invece di un singolo vettore, consentendo un matching più granulare:
from ragatouille import RAGPretrainedModel
rag = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
rag.index(
collection=documents,
index_name="knowledge-base",
max_document_length=512,
)
results = rag.search(query="configurazione backup PITR", k=10)
ColBERT supera consistentemente i modelli a vettore singolo in qualità di retrieval ma richiede 10-50x più spazio per documento. Usalo quando la qualità del retrieval è il collo di bottiglia e lo storage è economico.
Architettura di Ricerca Ibrida
La ricerca vettoriale da sola non è sufficiente. Combinare ricerca semantica e lessicale copre i punti ciechi di ciascun metodo.
BM25 + Vettoriale con Reciprocal Rank Fusion
from rank_bm25 import BM25Okapi
import numpy as np
def hybrid_search(query, corpus_chunks, embeddings_index, k=20, alpha=0.5):
"""
Combinare BM25 e ricerca vettoriale usando Reciprocal Rank Fusion.
alpha controlla il bilanciamento: 0.5 = peso uguale.
"""
# Ricerca vettoriale
query_embedding = embed(query)
vector_scores = embeddings_index.search(query_embedding, k=k * 2)
# Ricerca BM25
tokenized_corpus = [chunk.split() for chunk in corpus_chunks]
bm25 = BM25Okapi(tokenized_corpus)
bm25_scores = bm25.get_scores(query.split())
bm25_top = np.argsort(bm25_scores)[-k * 2:][::-1]
# Reciprocal Rank Fusion
rrf_scores = {}
rrf_k = 60 # Costante RRF standard
for rank, doc_id in enumerate(vector_scores.ids):
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + alpha / (rrf_k + rank + 1)
for rank, doc_id in enumerate(bm25_top):
rrf_scores[doc_id] = rrf_scores.get(doc_id, 0) + (1 - alpha) / (rrf_k + rank + 1)
# Ordinare per score fuso
ranked = sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)
return ranked[:k]
Perché la Ricerca Ibrida Funziona
Considera la query: "Qual è l'SLA del cluster Redis in EU-WEST-1?"
- Ricerca vettoriale trova chunk su SLA e cluster Redis in generale — semanticamente simili ma potenzialmente della regione sbagliata.
- BM25 trova chunk contenenti la stringa esatta "EU-WEST-1" — corrispondenza lessicale.
- Ibrida porta in superficie il chunk che menziona sia la policy SLA che la regione specifica.
Ricerca Ibrida Nativa nel Database
PostgreSQL con pgvector 0.8+ supporta entrambe in una singola query:
-- Ricerca ibrida con pgvector + tsvector
WITH vector_results AS (
SELECT id, content, 1 - (embedding <=> $1::vector) AS vector_score
FROM documents
WHERE metadata->>'department' = 'engineering'
ORDER BY embedding <=> $1::vector
LIMIT 40
),
text_results AS (
SELECT id, content, ts_rank(search_vector, plainto_tsquery($2)) AS text_score
FROM documents
WHERE search_vector @@ plainto_tsquery($2)
AND metadata->>'department' = 'engineering'
ORDER BY text_score DESC
LIMIT 40
),
combined AS (
SELECT
COALESCE(v.id, t.id) AS id,
COALESCE(v.content, t.content) AS content,
COALESCE(v.vector_score, 0) * 0.5 + COALESCE(t.text_score, 0) * 0.5 AS hybrid_score
FROM vector_results v
FULL OUTER JOIN text_results t ON v.id = t.id
)
SELECT * FROM combined ORDER BY hybrid_score DESC LIMIT 10;
Reranking
Il reranking è il miglioramento con il maggiore impatto che puoi fare a una pipeline RAG. Gli embedding bi-encoder sono veloci ma imprecisi. I reranker cross-encoder sono lenti ma accurati. L'architettura usa entrambi:
- Recuperare 50-100 candidati con ricerca ibrida veloce (~10ms)
- Rerankare i top 20-30 con un cross-encoder (~50-80ms)
- Selezionare i top 5-8 per il contesto del LLM
Reranking con Cross-Encoder
from sentence_transformers import CrossEncoder
reranker = CrossEncoder("BAAI/bge-reranker-v2-m3", max_length=512)
def rerank(query, candidates, top_k=5):
"""Rerankare candidati usando cross-encoder."""
pairs = [[query, candidate["content"]] for candidate in candidates]
scores = reranker.predict(pairs)
for i, candidate in enumerate(candidates):
candidate["rerank_score"] = float(scores[i])
ranked = sorted(candidates, key=lambda x: x["rerank_score"], reverse=True)
return ranked[:top_k]
API di Rerank di Cohere
Per i team che preferiscono infrastruttura gestita:
import cohere
co = cohere.ClientV2()
results = co.rerank(
model="rerank-v3.5",
query="configurazione backup PITR per PostgreSQL",
documents=[chunk["content"] for chunk in candidates],
top_n=5,
return_documents=True,
)
reranked = [
{"content": r.document.text, "relevance_score": r.relevance_score}
for r in results.results
]
Misurazione dell'Impatto
Nella nostra pipeline di produzione (2.3M chunk, 10K query di test):
| Configurazione | Recall@5 | MRR | Rilevanza Risposta | |---|---|---|---| | Solo vettoriale | 0.62 | 0.54 | 0.71 | | Ibrida (BM25 + vettoriale) | 0.74 | 0.63 | 0.78 | | Ibrida + reranker | 0.83 | 0.76 | 0.89 | | Ibrida + reranker + intestazioni contestuali | 0.87 | 0.81 | 0.91 |
Il reranker da solo rappresenta un miglioramento di 12 punti nel recall. Combinato con ricerca ibrida e intestazioni contestuali, la pipeline recupera l'87% dei chunk rilevanti nei top 5 risultati.
Costruzione del Prompt
Dopo il retrieval e il reranking, come costruisci il prompt del LLM conta più di quanto la maggior parte dei team creda.
Gestione della Finestra di Contesto
def build_rag_prompt(query, retrieved_chunks, max_context_tokens=6000):
"""Costruire prompt con iniezione di contesto consapevole del budget di token."""
context_parts = []
token_count = 0
for chunk in retrieved_chunks:
chunk_tokens = count_tokens(chunk["content"])
if token_count + chunk_tokens > max_context_tokens:
break
context_parts.append(
f"[Fonte: {chunk['metadata']['source']}, "
f"Sezione: {chunk['metadata'].get('section', 'N/A')}]\n"
f"{chunk['content']}"
)
token_count += chunk_tokens
context = "\n\n---\n\n".join(context_parts)
return f"""Rispondi alla seguente domanda basandoti sul contesto fornito.
Se il contesto non contiene informazioni sufficienti per rispondere completamente, dichiaralo esplicitamente.
Cita il documento sorgente per ogni affermazione.
Contesto:
{context}
Domanda: {query}
Risposta:"""
Grounding con Citazioni
Forza il modello a citare le fonti richiedendo riferimenti inline:
Per ogni affermazione fattuale nella tua risposta, includi una citazione nel formato [Fonte: nome_file, Sezione: nome_sezione].
Se non riesci a trovare evidenza di supporto nel contesto per un'affermazione, prefissala con [Non supportato].
Questo rende la rilevazione delle allucinazioni banale — qualsiasi affermazione contrassegnata come [Non supportato] o senza citazione è sospetta.
Framework di Valutazione
Una pipeline RAG senza valutazione è una demo, non un prodotto. La valutazione avviene su tre livelli.
Livello 1: Qualità del Retrieval
Misurata indipendentemente dal LLM:
def evaluate_retrieval(test_queries, ground_truth, retriever, k=5):
"""Valutare il retrieval con metriche IR standard."""
recalls, mrrs, ndcgs = [], [], []
for query, relevant_ids in zip(test_queries, ground_truth):
retrieved = retriever.search(query, k=k)
retrieved_ids = [r["id"] for r in retrieved]
# Recall@k
hits = len(set(retrieved_ids) & set(relevant_ids))
recalls.append(hits / len(relevant_ids))
# MRR
for rank, rid in enumerate(retrieved_ids, 1):
if rid in relevant_ids:
mrrs.append(1.0 / rank)
break
else:
mrrs.append(0.0)
return {
"recall@k": np.mean(recalls),
"mrr": np.mean(mrrs),
}
Livello 2: Qualità della Generazione con RAGAS
RAGAS valuta la pipeline completa — retrieval e generazione insieme:
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
results = evaluate(
dataset=eval_dataset, # Domande + risposte ground truth + contesti
metrics=[
faithfulness, # La risposta rimane fedele al contesto recuperato?
answer_relevancy, # La risposta è rilevante alla domanda?
context_precision, # I chunk recuperati sono rilevanti?
context_recall, # Sono stati recuperati tutti i chunk necessari?
],
)
print(results)
# {'faithfulness': 0.89, 'answer_relevancy': 0.91,
# 'context_precision': 0.82, 'context_recall': 0.87}
Faithfulness è la metrica più importante — misura l'allucinazione. Un punteggio di faithfulness sotto 0.85 significa che la pipeline sta generando affermazioni non supportate dal contesto recuperato.
Livello 3: Valutazione Umana
Automatizzare la raccolta, valutare manualmente:
def sample_for_human_eval(production_queries, n=200):
"""Campionare query stratificate per difficoltà e tema."""
# Classificare query per difficoltà stimata
easy = [q for q in production_queries if q["retrieval_confidence"] > 0.8]
medium = [q for q in production_queries if 0.5 < q["retrieval_confidence"] <= 0.8]
hard = [q for q in production_queries if q["retrieval_confidence"] <= 0.5]
sample = (
random.sample(easy, min(80, len(easy))) +
random.sample(medium, min(80, len(medium))) +
random.sample(hard, min(40, len(hard)))
)
return sample
I valutatori umani giudicano su tre assi: correttezza (fattualmente accurata), completezza (copre la domanda integralmente) e fondatezza (cita il contesto, non allucina).
Architettura di Produzione
Panoramica della Pipeline
Documenti → Pre-elaborazione → Chunking → Embedding → Vector Store
↓
Query Utente → Espansione Query → Ricerca Ibrida → Reranker → Costruttore Prompt → LLM → Risposta
↓
Logger di Valutazione
Espansione delle Query
Riscrivere la query dell'utente prima del retrieval per migliorare il recall:
def expand_query(query, llm):
"""Generare riformulazioni alternative per migliorare il retrieval."""
expansion_prompt = f"""Genera 3 riformulazioni alternative di questa query di ricerca.
Ciascuna deve catturare la stessa intenzione ma usare terminologia diversa.
Restituisci solo le query, una per riga.
Query: {query}"""
alternatives = llm.generate(expansion_prompt).strip().split("\n")
return [query] + alternatives[:3]
Eseguire ricerca ibrida su tutte le query espanse e unire i risultati con RRF. Questo aggiunge consistentemente un miglioramento del 3-5% nel recall.
Layer di Cache
Cache su due livelli per ridurre latenza e costi:
- Cache degli embedding: Hash del testo di input, cache del vettore embedding. Evita di rigenerare embedding per query identiche.
- Cache semantica: Per query con similarità coseno > 0.95 rispetto a una query in cache, restituire la risposta in cache. Riduce le chiamate al LLM del 20-40% nei casi d'uso di supporto dove domande simili si ripetono.
Monitoraggio
Tracciare queste metriche in produzione:
- Latenza di retrieval (p50, p95, p99) — obiettivo < 100ms per ibrida + rerank
- Punteggio di faithfulness su un campione rotativo — allarme se scende sotto 0.85
- Tasso di retrieval vuoto — query dove nessun chunk supera la soglia di rilevanza
- Segnali di feedback utente — thumbs up/down, eventi di copia, domande di follow-up
Quando RAG Non È Sufficiente
RAG ha limiti ben definiti:
- Ragionamento multi-hop: Quando la risposta richiede di sintetizzare informazioni da 5+ documenti con passaggi inferenziali tra di essi, il retrieval RAG spesso manca i documenti intermedi.
- Ragionamento temporale: "Cosa è cambiato tra Q3 e Q4?" richiede di recuperare e confrontare due set di documenti specifici nel tempo — il retrieval top-k standard non è progettato per questo.
- Computazione sui dati: "Qual è l'SLA medio in tutte le regioni?" richiede una query strutturata, non retrieval testuale.
Per questi casi, considera il RAG agentico — un agente che pianifica i passaggi di retrieval, esegue ricerche multiple e sintetizza i risultati programmaticamente — o approcci ibridi che combinano RAG con query SQL e chiamate API.