Caching, Batching, and Cost Control
Series: Building Production RAG

The cost profile of a production RAG system surprises most teams after launch. They budget for LLM token costs and ignore embedding costs. They optimize the hot path and miss the fact that 40% of queries are exact duplicates from impatient users hitting refresh. They batch offline jobs but call embedding APIs one document at a time in the indexing pipeline.
This post is about the concrete cost math and the three levers that move it: caching, batching, and model selection per operation type.
The Real Cost Breakdown
Before optimizing, measure. A typical RAG query touches three billable operations:
| Operation | Model | Pricing | Tokens per query | Cost per query |
|---|---|---|---|---|
| Query embedding | text-embedding-3-small | $0.02 / 1M tokens | ~50 | $0.000001 |
| Re-ranking (if API) | Cohere rerank-v3 | $1.00 / 1K calls | — | $0.001 |
| LLM generation | gpt-4o | $5.00 / 1M input, $15.00 / 1M output | ~3K in / ~500 out | $0.022 |
At 10K queries/day: embedding cost is negligible ($0.30/month), re-ranking is ~$300/month, and LLM generation is ~$6,600/month. Generation dominates: it's more than 20x the re-ranking cost and over 10,000x the embedding cost. This tells you where to focus.
```python
from dataclasses import dataclass


@dataclass
class CostModel:
    embedding_cost_per_1m_tokens: float = 0.02    # text-embedding-3-small
    rerank_cost_per_1k_calls: float = 1.00        # Cohere rerank-v3
    llm_input_cost_per_1m_tokens: float = 5.00    # gpt-4o
    llm_output_cost_per_1m_tokens: float = 15.00  # gpt-4o

    def estimate_daily_cost(
        self,
        daily_queries: int,
        avg_query_tokens: int = 50,
        avg_context_tokens: int = 3000,
        avg_output_tokens: int = 500,
        rerank_enabled: bool = True,
    ) -> dict[str, float]:
        embed_cost = (daily_queries * avg_query_tokens / 1_000_000) * self.embedding_cost_per_1m_tokens
        rerank_cost = (daily_queries / 1000) * self.rerank_cost_per_1k_calls if rerank_enabled else 0
        llm_input_cost = (daily_queries * avg_context_tokens / 1_000_000) * self.llm_input_cost_per_1m_tokens
        llm_output_cost = (daily_queries * avg_output_tokens / 1_000_000) * self.llm_output_cost_per_1m_tokens
        total = embed_cost + rerank_cost + llm_input_cost + llm_output_cost
        return {
            "embedding": round(embed_cost, 4),
            "reranking": round(rerank_cost, 4),
            "llm_input": round(llm_input_cost, 4),
            "llm_output": round(llm_output_cost, 4),
            "total_per_day": round(total, 4),
            "total_per_month": round(total * 30, 2),
        }


# 10K queries/day:
model = CostModel()
print(model.estimate_daily_cost(daily_queries=10_000))
```

Lever 1: Query-Result Caching
The highest-ROI cache is the simplest: cache the full RAG response for identical or near-identical queries. In most production systems, 20–40% of queries are repeats within a rolling 24-hour window.
```python
import hashlib
import json
import re
from typing import Optional

import redis


class QueryCache:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    @staticmethod
    def _normalize(query: str) -> str:
        query = query.lower().strip()
        query = re.sub(r"[^\w\s]", " ", query)
        return re.sub(r"\s+", " ", query)

    def _cache_key(self, query: str) -> str:
        normalized = self._normalize(query)
        return f"rag:query:{hashlib.sha256(normalized.encode()).hexdigest()}"

    def get(self, query: str) -> Optional[dict]:
        key = self._cache_key(query)
        value = self.redis.get(key)
        if value:
            result = json.loads(value)
            result["_cache_hit"] = True
            return result
        return None

    def set(self, query: str, response: dict, ttl: Optional[int] = None) -> None:
        key = self._cache_key(query)
        payload = {k: v for k, v in response.items() if k != "_cache_hit"}
        self.redis.setex(key, ttl or self.default_ttl, json.dumps(payload))
```

Cache invalidation strategy: set TTL based on how often your corpus changes. If your documentation updates daily, use a 6-hour TTL. If it updates weekly, a 48-hour TTL is fine. For time-sensitive queries (anything involving dates or "current"), detect them and skip the cache or set a 15-minute TTL.
Lever 2: Semantic Query Deduplication
Exact-match caching misses paraphrases. "How do I cancel my subscription" and "where do I go to unsubscribe" are the same query. A semantic cache uses embedding similarity to find near-duplicate queries:
```python
import time
from dataclasses import dataclass
from typing import Optional

import numpy as np


@dataclass
class SemanticCacheEntry:
    query: str
    embedding: np.ndarray
    response: dict
    created_at: float
    ttl: int


class SemanticQueryCache:
    def __init__(self, similarity_threshold: float = 0.92, max_entries: int = 10_000):
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self._entries: list[SemanticCacheEntry] = []

    def _embed(self, query: str) -> np.ndarray:
        # plug in your embedding model here; return a unit-normalized vector
        raise NotImplementedError

    def get(self, query: str) -> Optional[dict]:
        q_emb = self._embed(query)
        now = time.time()
        # Drop expired entries
        self._entries = [e for e in self._entries if now - e.created_at < e.ttl]
        if not self._entries:
            return None
        # Dot product equals cosine similarity when embeddings are unit-normalized
        emb_matrix = np.stack([e.embedding for e in self._entries])
        sims = emb_matrix @ q_emb
        best_idx = int(np.argmax(sims))
        if sims[best_idx] >= self.threshold:
            result = dict(self._entries[best_idx].response)
            result["_cache_hit"] = True
            result["_cache_similarity"] = float(sims[best_idx])
            return result
        return None

    def set(self, query: str, response: dict, ttl: int = 3600) -> None:
        q_emb = self._embed(query)
        entry = SemanticCacheEntry(
            query=query, embedding=q_emb,
            response=response, created_at=time.time(), ttl=ttl,
        )
        self._entries.append(entry)
        if len(self._entries) > self.max_entries:
            self._entries = self._entries[-self.max_entries:]
```

At scale, use a vector database (Qdrant, Redis with vector search) as the backing store instead of an in-memory list.
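If you go that route, a sketch of a Qdrant-backed variant might look like the following; the collection name, vector size (1536 for text-embedding-3-small), and payload fields are assumptions to adapt, not a prescribed schema:

```python
import time
import uuid

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

client = QdrantClient(url="http://localhost:6333")  # assumed local Qdrant instance
COLLECTION = "semantic_query_cache"                 # illustrative name

# Create the collection once (cosine distance, 1536-dim vectors assumed).
existing = {c.name for c in client.get_collections().collections}
if COLLECTION not in existing:
    client.create_collection(
        collection_name=COLLECTION,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )


def cache_set(query: str, q_emb: list[float], response: dict, ttl: int = 3600) -> None:
    client.upsert(
        collection_name=COLLECTION,
        points=[PointStruct(
            id=str(uuid.uuid4()),
            vector=q_emb,
            payload={"query": query, "response": response, "expires_at": time.time() + ttl},
        )],
    )


def cache_get(q_emb: list[float], threshold: float = 0.92) -> dict | None:
    hits = client.search(collection_name=COLLECTION, query_vector=q_emb, limit=1)
    if hits and hits[0].score >= threshold and hits[0].payload["expires_at"] > time.time():
        return hits[0].payload["response"]
    return None
```

Note that expired entries are only filtered on read in this sketch; a periodic cleanup job would keep the collection from growing without bound.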
Lever 3: Embedding Cache for Indexing
If you're re-embedding documents after minor edits, you're wasting money and time. Hash document content and cache embeddings keyed by content hash:
```python
import hashlib
import sqlite3
import time
from typing import Optional

import numpy as np


class EmbeddingCache:
    """Persistent embedding cache backed by SQLite. Survives restarts."""

    def __init__(self, db_path: str = "/tmp/embedding_cache.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                content_hash TEXT PRIMARY KEY,
                model_name TEXT NOT NULL,
                embedding BLOB NOT NULL,
                created_at INTEGER NOT NULL
            )
        """)
        self.conn.commit()

    @staticmethod
    def _hash(text: str, model: str) -> str:
        payload = f"{model}::{text}"
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, text: str, model: str) -> Optional[np.ndarray]:
        h = self._hash(text, model)
        row = self.conn.execute(
            "SELECT embedding FROM embeddings WHERE content_hash = ?", (h,)
        ).fetchone()
        if row:
            return np.frombuffer(row[0], dtype=np.float32)
        return None

    def set(self, text: str, model: str, embedding: np.ndarray) -> None:
        h = self._hash(text, model)
        self.conn.execute(
            "INSERT OR REPLACE INTO embeddings VALUES (?, ?, ?, ?)",
            (h, model, embedding.astype(np.float32).tobytes(), int(time.time())),
        )
        self.conn.commit()
```

This is especially valuable during iterative pipeline development when you're re-running indexing frequently.
Lever 4: Batching API Calls
One-document-at-a-time embedding API calls are an expensive habit. Most embedding APIs support batches of 256–2048 texts per request; throughput is 10–100x higher in batch mode.
```python
from typing import Optional

import numpy as np
import openai


def embed_in_batches(
    texts: list[str],
    model: str = "text-embedding-3-small",
    batch_size: int = 512,
    cache: Optional[EmbeddingCache] = None,  # EmbeddingCache from the previous snippet
) -> np.ndarray:
    client = openai.OpenAI()
    all_embeddings = [None] * len(texts)
    uncached_indices = []

    # Check cache first
    for i, text in enumerate(texts):
        if cache:
            cached = cache.get(text, model)
            if cached is not None:
                all_embeddings[i] = cached
                continue
        uncached_indices.append(i)

    # Batch-embed uncached texts
    uncached_texts = [texts[i] for i in uncached_indices]
    for batch_start in range(0, len(uncached_texts), batch_size):
        batch = uncached_texts[batch_start : batch_start + batch_size]
        response = client.embeddings.create(model=model, input=batch)
        for j, embedding_data in enumerate(response.data):
            original_idx = uncached_indices[batch_start + j]
            emb = np.array(embedding_data.embedding, dtype=np.float32)
            all_embeddings[original_idx] = emb
            if cache:
                cache.set(texts[original_idx], model, emb)

    return np.stack(all_embeddings)
```

Lever 5: Model Tiering
Not all queries deserve the same LLM. Route by complexity:
```python
def route_query_to_model(query: str, context_chunks: list[dict]) -> str:
    """
    Simple routing: use a cheaper model for simple factual lookups.
    Complex/long queries get the full model.
    """
    query_words = len(query.split())
    context_tokens = sum(len(c["text"].split()) for c in context_chunks)
    has_comparison = any(w in query.lower() for w in ["compare", "difference", "vs", "versus", "better"])
    has_reasoning = any(w in query.lower() for w in ["why", "explain", "how does", "analyze"])

    if context_tokens < 1500 and query_words < 20 and not has_comparison and not has_reasoning:
        return "gpt-4o-mini"  # ~10x cheaper than gpt-4o
    return "gpt-4o"
```

In practice, 60–70% of simple RAG queries (single factual lookups, definitions, brief summaries) can be routed to gpt-4o-mini with imperceptible quality loss. That alone cuts LLM cost by 40–50% at the system level.
The Cost Dashboard You Need
Track these metrics weekly:
- Cost per query (broken down by component)
- Cache hit rate (target: >30% for query cache, >60% for embedding cache)
- Model distribution (what % of queries hit each tier)
- Cost per DAU (rising cost per active user is a warning sign)
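A minimal sketch of computing these from per-query log records follows; the record fields (`cost_usd`, `cache_hit`, `model`, `user_id`) are assumptions about what you log, not a fixed schema:

```python
from collections import Counter


def summarize_costs(query_log: list[dict]) -> dict:
    """Weekly rollup over per-query records with cost_usd, cache_hit, model, user_id."""
    n = len(query_log)
    if n == 0:
        return {}
    total_cost = sum(r["cost_usd"] for r in query_log)
    cache_hits = sum(1 for r in query_log if r.get("cache_hit"))
    # Model distribution over queries that actually reached an LLM
    llm_calls = [r for r in query_log if not r.get("cache_hit")]
    model_counts = Counter(r["model"] for r in llm_calls)
    active_users = len({r["user_id"] for r in query_log})
    return {
        "cost_per_query": total_cost / n,
        "cache_hit_rate": cache_hits / n,
        "model_distribution": {m: c / len(llm_calls) for m, c in model_counts.items()} if llm_calls else {},
        "cost_per_dau": total_cost / active_users if active_users else 0.0,
    }
```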
At 10K queries/day, model tiering alone (65% of queries on mini at roughly a tenth of the price) cuts monthly LLM cost from ~$6,600 to ~$2,800; adding good caching (a 35% hit rate) on top brings it near $1,800. That's a meaningful margin improvement without touching model quality.
Key Takeaways
- LLM generation dominates the bill (roughly 20x re-ranking and over 10,000x embedding at this volume), so optimize there first, not on embeddings.
- Exact-match query caching with a 6–48h TTL eliminates 20–40% of queries before they hit the LLM; it's the highest-ROI single optimization.
- Semantic query caching catches paraphrase duplicates that exact hashing misses — worth adding after exact caching is working.
- Embedding caches (content-hash keyed) prevent re-embedding unchanged documents during iterative indexing pipeline development.
- Batch your embedding API calls; one-at-a-time calls are 10–100x less throughput-efficient than full batches.
- Model tiering (routing simple queries to cheaper models) typically reduces LLM cost by 40–50% with negligible quality impact.