Observability for Retrieval
Series: Building Production RAG

Traditional service observability — latency percentiles, error rates, uptime — is necessary but nowhere near sufficient for a RAG system. You can have 99.9% uptime, sub-200ms p99 latency, and zero HTTP 500s while silently returning hallucinated answers to 20% of queries. None of your infrastructure dashboards will show it.
Observability for retrieval means instrumenting the semantic layer, not just the infrastructure layer. This post covers what to log, what to dashboard, and what to alert on.
The Three Visibility Gaps
Standard observability covers compute. RAG introduces three gaps that require additional instrumentation:
Gap 1: Retrieval quality drift — Your embedding model doesn't change, but your corpus does. Documents get stale, new document types get added, terminology shifts. Retrieval quality degrades slowly and invisibly.
Gap 2: Context utilization failure — The right chunks are retrieved but the LLM ignores them or contradicts them. This shows up as hallucinations that look plausible, not as errors.
Gap 3: Distribution shift in queries — Users start asking questions your system wasn't designed for. The query distribution drifts from your golden set, and your evaluation metrics no longer reflect production behavior.
What to Log
Log everything at query time. Storage is cheap; missing data when you need to debug is not. Structure your logs so they're queryable:
import time
import uuid
from dataclasses import dataclass, asdict, field
from typing import Optional
import json

@dataclass
class RetrievalSpan:
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    query: str = ""
    query_embedding_latency_ms: float = 0.0
    embedding_model: str = ""
    bm25_latency_ms: float = 0.0
    vector_latency_ms: float = 0.0
    rerank_latency_ms: float = 0.0
    total_retrieval_latency_ms: float = 0.0
    candidates_fetched: int = 0
    chunks_returned: int = 0
    chunk_ids: list[str] = field(default_factory=list)
    chunk_scores: list[float] = field(default_factory=list)
    rerank_model: Optional[str] = None
    generation_model: str = ""
    prompt_tokens: int = 0
    output_tokens: int = 0
    generation_latency_ms: float = 0.0
    total_latency_ms: float = 0.0
    cache_hit: bool = False
    cache_type: Optional[str] = None  # "exact" | "semantic" | None
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    feedback_signal: Optional[str] = None  # "thumbs_up" | "thumbs_down" | None

class RAGTracer:
    def __init__(self, log_sink):
        """log_sink: callable that accepts a dict — write to BigQuery, ClickHouse, S3, etc."""
        self._sink = log_sink

    def emit(self, span: RetrievalSpan) -> None:
        record = asdict(span)
        record["ts"] = int(time.time())
        self._sink(record)

def stdout_sink(record: dict) -> None:
    print(json.dumps(record))

tracer = RAGTracer(log_sink=stdout_sink)

Never log raw user queries without reviewing your privacy policy and data retention requirements. If your system handles PII, hash or redact query content before it reaches your log store.
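One way to implement that redaction is a wrapper around the sink. This is a minimal sketch: the salt value is a placeholder, and keeping only a hash plus query length is an assumed policy choice, not a prescription.

```python
import hashlib
import json

SALT = "replace-with-a-real-secret"  # placeholder: load from your secret store

def redact_query(record: dict, salt: str = SALT) -> dict:
    """Replace the raw query text with a salted hash plus a coarse length signal."""
    redacted = dict(record)
    query = redacted.pop("query", "")
    redacted["query_hash"] = hashlib.sha256((salt + query).encode()).hexdigest()
    redacted["query_len"] = len(query)  # keeps a debugging signal without content
    return redacted

def redacting_stdout_sink(record: dict) -> None:
    print(json.dumps(redact_query(record)))
```

The hash still lets you group repeated queries and join feedback to spans; only the plaintext is gone.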
What to Dashboard
Three categories of dashboards, one per audience:
Engineering dashboard — latency breakdown, error rates, cache hit rates:
# SQL templates (ClickHouse syntax; adapt the quantile functions for BigQuery)
LATENCY_BY_COMPONENT = """
SELECT
    toStartOfHour(FROM_UNIXTIME(ts)) AS hour,
    quantile(0.50)(query_embedding_latency_ms) AS p50_embed,
    quantile(0.99)(query_embedding_latency_ms) AS p99_embed,
    quantile(0.50)(total_retrieval_latency_ms) AS p50_retrieval,
    quantile(0.99)(total_retrieval_latency_ms) AS p99_retrieval,
    quantile(0.50)(generation_latency_ms) AS p50_gen,
    quantile(0.99)(generation_latency_ms) AS p99_gen,
    countIf(cache_hit = 1) / count() AS cache_hit_rate
FROM rag_spans
WHERE ts > now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour
"""

COST_BY_HOUR = """
SELECT
    toStartOfHour(FROM_UNIXTIME(ts)) AS hour,
    sum(prompt_tokens) / 1e6 * 5.00 AS llm_input_cost_usd,   -- $5 per 1M input tokens; substitute your model's pricing
    sum(output_tokens) / 1e6 * 15.00 AS llm_output_cost_usd, -- $15 per 1M output tokens
    count() AS query_count
FROM rag_spans
WHERE ts > now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour
"""

Quality dashboard — retrieval score distributions, LLM judge scores:
Track the distribution of top-1 rerank scores over time. A shift in the score distribution (scores dropping from median 0.85 to median 0.71) is an early signal of retrieval quality degradation before users complain.
SCORE_DISTRIBUTION = """
SELECT
    toStartOfDay(FROM_UNIXTIME(ts)) AS day,
    quantile(0.25)(chunk_scores[1]) AS p25_top_score,
    quantile(0.50)(chunk_scores[1]) AS median_top_score,
    quantile(0.75)(chunk_scores[1]) AS p75_top_score,
    countIf(chunk_scores[1] < 0.5) / count() AS low_confidence_rate
FROM rag_spans
WHERE length(chunk_scores) > 0
GROUP BY day
ORDER BY day
"""

Product dashboard — for non-engineers: queries per day, user satisfaction rate, top query categories.
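A satisfaction-rate query in the same template style might look like the following. It is a sketch in ClickHouse syntax, assuming the `rag_spans` schema and `feedback_signal` values shown earlier; the 30-day window is an arbitrary dashboard default.

```python
SATISFACTION_RATE = """
SELECT
    toStartOfDay(FROM_UNIXTIME(ts)) AS day,
    count() AS query_count,
    countIf(feedback_signal IS NOT NULL) / count() AS feedback_rate,
    countIf(feedback_signal = 'thumbs_up')
        / greatest(countIf(feedback_signal IS NOT NULL), 1) AS satisfaction_rate
FROM rag_spans
WHERE ts > now() - INTERVAL 30 DAY
GROUP BY day
ORDER BY day
"""
```

Reporting `feedback_rate` alongside `satisfaction_rate` matters: a high satisfaction rate computed over a 1% feedback sample is mostly noise.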
What to Alert On
Alerts should be specific and actionable. Never alert on something you can't do anything about within 15 minutes.
ALERT_RULES = [
    {
        "name": "high_p99_latency",
        "query": "SELECT quantile(0.99)(total_latency_ms) FROM rag_spans WHERE ts > now() - INTERVAL 5 MINUTE",
        "threshold": 5000,  # 5 seconds
        "comparison": "gt",
        "severity": "page",
        "runbook": "Check vector DB latency, check if GPU is saturated for reranker",
    },
    {
        "name": "low_cache_hit_rate",
        "query": "SELECT countIf(cache_hit) / count() FROM rag_spans WHERE ts > now() - INTERVAL 1 HOUR",
        "threshold": 0.10,  # below 10% — may indicate cache invalidation bug
        "comparison": "lt",
        "severity": "warn",
        "runbook": "Check Redis connectivity, check if cache TTL was accidentally set to 0",
    },
    {
        "name": "retrieval_score_drop",
        "query": "SELECT median(chunk_scores[1]) FROM rag_spans WHERE ts > now() - INTERVAL 1 HOUR",
        "threshold": 0.55,  # tune to your system's normal operating range
        "comparison": "lt",
        "severity": "warn",
        "runbook": "Check if corpus was updated with low-quality documents, check embedding model version",
    },
    {
        "name": "high_error_rate",
        # cache hits legitimately skip generation, so exclude them from this failure proxy
        "query": "SELECT countIf(generation_latency_ms = 0 AND cache_hit = 0) / count() FROM rag_spans WHERE ts > now() - INTERVAL 5 MINUTE",
        "threshold": 0.05,  # >5% of queries failing
        "comparison": "gt",
        "severity": "page",
        "runbook": "Check LLM API status page, check API key rate limits",
    },
]

Detecting Query Distribution Shift
If your production queries drift away from your golden set, your evaluation metrics stop being meaningful. Detect this by comparing the embedding distribution of recent production queries against your golden set queries:
import numpy as np
from scipy.stats import ks_2samp
from sklearn.decomposition import PCA

def detect_query_drift(
    golden_query_embeddings: np.ndarray,  # shape: (n_golden, dim)
    recent_query_embeddings: np.ndarray,  # shape: (n_recent, dim)
    significance: float = 0.05,
) -> dict:
    """
    Use a Kolmogorov-Smirnov test on PCA-projected embeddings
    to detect distribution shift.
    """
    pca = PCA(n_components=10)
    pca.fit(golden_query_embeddings)
    golden_proj = pca.transform(golden_query_embeddings)
    recent_proj = pca.transform(recent_query_embeddings)

    # Bonferroni-correct across the 10 component tests so the overall
    # false-positive rate stays near the requested significance level.
    corrected_alpha = significance / 10

    drift_detected = False
    component_results = {}
    for i in range(10):
        stat, p_value = ks_2samp(golden_proj[:, i], recent_proj[:, i])
        if p_value < corrected_alpha:
            drift_detected = True
        component_results[f"pc{i}"] = {"ks_stat": round(stat, 4), "p_value": round(p_value, 4)}

    return {
        "drift_detected": drift_detected,
        "n_golden": len(golden_query_embeddings),
        "n_recent": len(recent_query_embeddings),
        "components": component_results,
    }

Run this weekly on the last 7 days of production queries. When drift is detected, sample 20 recent queries, inspect them manually, and decide if the golden set needs expansion or if query routing needs adjustment.
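Before trusting the detector in production, it is worth a synthetic sanity check that the per-component KS test flags a known shift. A sketch with simulated one-dimensional projections standing in for a single principal component:

```python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(42)

# Simulated projections onto one principal component:
golden_pc = rng.normal(0.0, 1.0, 2000)  # golden-set queries
recent_pc = rng.normal(0.6, 1.0, 2000)  # recent queries with an injected mean shift

# A 0.6-sigma mean shift at this sample size should be flagged decisively.
stat, p_value = ks_2samp(golden_pc, recent_pc)
```

Running the same check with no injected shift tells you how often the detector false-alarms at your chosen significance level.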
The Feedback Loop Architecture
The most valuable observability signal is explicit user feedback — thumbs up/down, ratings, corrections. Build an endpoint to receive it and wire it back to your span store:
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

class FeedbackRequest(BaseModel):
    span_id: str
    signal: str  # "thumbs_up" | "thumbs_down" | "correction"
    correction_text: str | None = None

@app.post("/feedback")
async def record_feedback(
    req: FeedbackRequest,
    background_tasks: BackgroundTasks,
):
    background_tasks.add_task(
        update_span_feedback,
        span_id=req.span_id,
        signal=req.signal,
        correction=req.correction_text,
    )
    return {"status": "ok"}

async def update_span_feedback(span_id: str, signal: str, correction: str | None):
    # UPDATE rag_spans SET feedback_signal = ? WHERE span_id = ?
    pass  # implement against your log store

Aim for a feedback rate above 5% — below that, signals are too sparse to act on. Prompt users proactively for feedback on responses where the retrieval confidence was low (top-1 rerank score below 0.6).
Key Takeaways
- Standard infrastructure metrics (latency, uptime, error rate) are necessary but blind to semantic quality degradation — instrument the retrieval layer separately.
- Log every span with chunk IDs, scores, latencies, and token counts at query time; storage is cheap, and missing data when you need to debug is not.
- Track retrieval score distributions over time — a shift in median top-1 score is an early warning of corpus or embedding quality problems.
- Alert on retrieval score drops and high p99 latency, not just HTTP error rates; include a runbook URL in every alert definition.
- Run KS-based query distribution drift detection weekly to catch when production queries have moved beyond your golden set's coverage.
- Explicit user feedback (thumbs down, corrections) is your most valuable signal; build the feedback endpoint on day one, not as a post-launch feature.