Building Production RAG

Caching, Batching, and Cost Control

Ravinder · 8 min read
Tags: RAG, AI, LLM, Cost Optimization, Caching, MLOps

The cost profile of a production RAG system surprises most teams after launch. They budget for LLM token costs and ignore embedding costs. They optimize the hot path and miss the fact that 40% of queries are exact duplicates from impatient users hitting refresh. They batch offline jobs but call embedding APIs one document at a time in the indexing pipeline.

This post is about the concrete cost math and the levers that move it: caching (of responses, of near-duplicate queries, and of embeddings), batching, and model selection per operation type.

The Real Cost Breakdown

Before optimizing, measure. A typical RAG query touches three billable operations:

| Operation | Model | Unit price | Tokens per query | Cost per query |
|---|---|---|---|---|
| Query embedding | text-embedding-3-small | $0.02 per 1M tokens | ~50 | $0.000001 |
| Re-ranking (if API) | Cohere rerank-v3 | $1.00 per 1K calls | n/a (billed per call) | $0.001 |
| LLM generation | gpt-4o | $5.00 input / $15.00 output per 1M tokens | ~3K in / ~500 out | $0.022 |

At 10K queries/day: embedding cost is negligible ($0.30/month), re-ranking is ~$300/month, and LLM generation is ~$6,600/month (closer to $6,750 if you don't round the per-query figure). Generation costs roughly 20x more than re-ranking and four orders of magnitude more than embedding. This tells you where to focus.

from dataclasses import dataclass
 
@dataclass
class CostModel:
    embedding_cost_per_1m_tokens: float = 0.02     # text-embedding-3-small
    rerank_cost_per_1k_calls: float = 1.00          # Cohere rerank-v3
    llm_input_cost_per_1m_tokens: float = 5.00      # gpt-4o
    llm_output_cost_per_1m_tokens: float = 15.00    # gpt-4o
 
    def estimate_daily_cost(
        self,
        daily_queries: int,
        avg_query_tokens: int = 50,
        avg_context_tokens: int = 3000,
        avg_output_tokens: int = 500,
        rerank_enabled: bool = True,
    ) -> dict[str, float]:
        embed_cost = (daily_queries * avg_query_tokens / 1_000_000) * self.embedding_cost_per_1m_tokens
        rerank_cost = (daily_queries / 1000) * self.rerank_cost_per_1k_calls if rerank_enabled else 0
        llm_input_cost = (daily_queries * avg_context_tokens / 1_000_000) * self.llm_input_cost_per_1m_tokens
        llm_output_cost = (daily_queries * avg_output_tokens / 1_000_000) * self.llm_output_cost_per_1m_tokens
        total = embed_cost + rerank_cost + llm_input_cost + llm_output_cost
        return {
            "embedding": round(embed_cost, 4),
            "reranking": round(rerank_cost, 4),
            "llm_input": round(llm_input_cost, 4),
            "llm_output": round(llm_output_cost, 4),
            "total_per_day": round(total, 4),
            "total_per_month": round(total * 30, 2),
        }
 
# 10K queries/day:
model = CostModel()
print(model.estimate_daily_cost(daily_queries=10_000))

Lever 1: Query-Result Caching

The highest-ROI cache is the simplest: cache the full RAG response for identical or near-identical queries. In most production systems, 20–40% of queries are repeats within a rolling 24-hour window.

flowchart LR
    Q[Incoming Query] --> NORM[Normalize\nlowercase, strip punct]
    NORM --> HASH[SHA-256 hash]
    HASH --> CACHE{Cache hit?}
    CACHE -->|Hit| RESP[Return cached response\n~1ms]
    CACHE -->|Miss| RAG[Full RAG pipeline\n~1-2s]
    RAG --> STORE[Store in cache\nwith TTL]
    STORE --> RESP

import hashlib
import json
import re
import time
from typing import Optional
import redis
 
class QueryCache:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl
 
    @staticmethod
    def _normalize(query: str) -> str:
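        # Lowercase, turn punctuation into spaces, then collapse whitespace.
        # Strip last: a trailing "?" becomes a space, which would otherwise
        # give "cancel plan?" and "cancel plan" different cache keys.
        query = re.sub(r"[^\w\s]", " ", query.lower())
        return re.sub(r"\s+", " ", query).strip()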
 
    def _cache_key(self, query: str) -> str:
        normalized = self._normalize(query)
        return f"rag:query:{hashlib.sha256(normalized.encode()).hexdigest()}"
 
    def get(self, query: str) -> Optional[dict]:
        key = self._cache_key(query)
        value = self.redis.get(key)
        if value:
            result = json.loads(value)
            result["_cache_hit"] = True
            return result
        return None
 
    def set(self, query: str, response: dict, ttl: Optional[int] = None) -> None:
        key = self._cache_key(query)
        payload = {k: v for k, v in response.items() if k != "_cache_hit"}
        self.redis.setex(key, ttl or self.default_ttl, json.dumps(payload))
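
The flow in the diagram above can be wired up as a small read-through wrapper. This is a minimal sketch, not the author's implementation: `answer_with_cache`, `run_pipeline`, and the `InMemoryCache` stand-in are hypothetical names introduced here so the example runs without Redis. Any object with the same `get`/`set` shape as `QueryCache` should drop in.

```python
from typing import Callable, Optional, Protocol


class ResponseCache(Protocol):
    """Anything exposing the same get/set shape as QueryCache."""

    def get(self, query: str) -> Optional[dict]: ...
    def set(self, query: str, response: dict, ttl: Optional[int] = None) -> None: ...


class InMemoryCache:
    """Hypothetical stand-in for QueryCache (no Redis, TTL ignored)."""

    def __init__(self) -> None:
        self._store: dict[str, dict] = {}

    def get(self, query: str) -> Optional[dict]:
        hit = self._store.get(query.lower().strip())
        return {**hit, "_cache_hit": True} if hit is not None else None

    def set(self, query: str, response: dict, ttl: Optional[int] = None) -> None:
        self._store[query.lower().strip()] = dict(response)


def answer_with_cache(
    query: str,
    cache: ResponseCache,
    run_pipeline: Callable[[str], dict],
    ttl: Optional[int] = None,
) -> dict:
    """Return a cached response if present, else run the pipeline and store it."""
    cached = cache.get(query)
    if cached is not None:
        return cached
    response = run_pipeline(query)  # the expensive path: retrieve, rerank, generate
    cache.set(query, response, ttl)
    return {**response, "_cache_hit": False}
```

Tagging every response with `_cache_hit` on both paths makes it easy to compute the hit rate from request logs later.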

Cache invalidation strategy: set TTL based on how often your corpus changes. If your documentation updates daily, use a 6-hour TTL. If it updates weekly, a 48-hour TTL is fine. For time-sensitive queries (anything involving dates or "current"), detect them and skip the cache or set a 15-minute TTL.
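
One way to encode that policy is a small TTL picker. This is a sketch under stated assumptions: the keyword pattern for "time-sensitive" queries and the cutoff between the 6-hour and 48-hour tiers are illustrative choices made here, not rules from a measured system. `choose_ttl` is a hypothetical helper name.

```python
import re

# Assumed heuristic: words and year-like numbers that suggest the answer may go stale quickly.
TIME_SENSITIVE = re.compile(
    r"\b(today|now|current|currently|latest|recent|this (week|month|year)|20\d{2})\b",
    re.IGNORECASE,
)


def choose_ttl(query: str, corpus_update_hours: float) -> int:
    """Return a cache TTL in seconds for this query."""
    if TIME_SENSITIVE.search(query):
        return 15 * 60       # time-sensitive: 15 minutes
    if corpus_update_hours <= 24:
        return 6 * 3600      # corpus changes daily or faster: 6 hours
    return 48 * 3600         # slower-moving corpus: 48 hours
```

The result can be passed straight through as the `ttl` argument of `QueryCache.set`. A keyword heuristic like this will misfire on some queries, so it is worth logging which branch was taken and reviewing a sample.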

Lever 2: Semantic Query Deduplication

Exact-match caching misses paraphrases. "How do I cancel my subscription" and "where do I go to unsubscribe" are the same query. A semantic cache uses embedding similarity to find near-duplicate queries:

import time
from dataclasses import dataclass
from typing import Optional

import numpy as np
 
@dataclass
class SemanticCacheEntry:
    query: str
    embedding: np.ndarray
    response: dict
    created_at: float
    ttl: int
 
class SemanticQueryCache:
    def __init__(self, similarity_threshold: float = 0.92, max_entries: int = 10_000):
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self._entries: list[SemanticCacheEntry] = []
 
    def _embed(self, query: str) -> np.ndarray:
        # plug in your embedding model here
        raise NotImplementedError
 
    def get(self, query: str) -> Optional[dict]:
        q_emb = self._embed(query)
        now = time.time()
 
        # Remove expired entries
        self._entries = [e for e in self._entries if now - e.created_at < e.ttl]
 
        if not self._entries:
            return None
 
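        # The dot product equals cosine similarity only if _embed returns
        # unit-length vectors; normalize there (v / np.linalg.norm(v)) if not.
        emb_matrix = np.stack([e.embedding for e in self._entries])
        sims = emb_matrix @ q_emb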
        best_idx = int(np.argmax(sims))
 
        if sims[best_idx] >= self.threshold:
            result = dict(self._entries[best_idx].response)
            result["_cache_hit"] = True
            result["_cache_similarity"] = float(sims[best_idx])
            return result
        return None
 
    def set(self, query: str, response: dict, ttl: int = 3600) -> None:
        q_emb = self._embed(query)
        entry = SemanticCacheEntry(
            query=query, embedding=q_emb,
            response=response, created_at=time.time(), ttl=ttl,
        )
        self._entries.append(entry)
        if len(self._entries) > self.max_entries:
            self._entries = self._entries[-self.max_entries:]

At scale, use a vector database (Qdrant, Redis with vector search) as the backing store instead of an in-memory list.

Lever 3: Embedding Cache for Indexing

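If you're re-embedding unchanged content every time a document gets a minor edit, you're wasting money and time. Hash the text you actually embed (typically each chunk, so an edit only invalidates the chunks it touches) and cache embeddings keyed by that content hash: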

import hashlib
import sqlite3
import time
from typing import Optional

import numpy as np
 
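class EmbeddingCache:
    """Persistent embedding cache backed by SQLite. Survives process restarts.

    The default path lives in /tmp, which many systems clear on reboot; point
    db_path somewhere durable if the cache must outlive the machine's uptime.
    """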
 
    def __init__(self, db_path: str = "/tmp/embedding_cache.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                content_hash TEXT PRIMARY KEY,
                model_name TEXT NOT NULL,
                embedding BLOB NOT NULL,
                created_at INTEGER NOT NULL
            )
        """)
        self.conn.commit()
 
    @staticmethod
    def _hash(text: str, model: str) -> str:
        payload = f"{model}::{text}"
        return hashlib.sha256(payload.encode()).hexdigest()
 
    def get(self, text: str, model: str) -> Optional[np.ndarray]:
        h = self._hash(text, model)
        row = self.conn.execute(
            "SELECT embedding FROM embeddings WHERE content_hash = ?", (h,)
        ).fetchone()
        if row:
            return np.frombuffer(row[0], dtype=np.float32)
        return None
 
    def set(self, text: str, model: str, embedding: np.ndarray) -> None:
        h = self._hash(text, model)
        self.conn.execute(
            "INSERT OR REPLACE INTO embeddings VALUES (?, ?, ?, ?)",
            (h, model, embedding.astype(np.float32).tobytes(), int(time.time())),
        )
        self.conn.commit()

This is especially valuable during iterative pipeline development when you're re-running indexing frequently.
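
To see why hashing granularity matters, here is a minimal sketch that finds which chunks of an edited document still need an embedding call. It reuses the same `model::text` hashing scheme as `EmbeddingCache._hash`; the helper names and the sample chunks are hypothetical, and a plain set of known hashes stands in for the SQLite table.

```python
import hashlib


def content_hash(text: str, model: str) -> str:
    """Same scheme as EmbeddingCache._hash: model name plus text."""
    return hashlib.sha256(f"{model}::{text}".encode()).hexdigest()


def chunks_needing_embedding(
    chunks: list[str], model: str, known_hashes: set[str]
) -> list[int]:
    """Return indices of chunks whose content hash is not cached yet."""
    return [
        i for i, chunk in enumerate(chunks)
        if content_hash(chunk, model) not in known_hashes
    ]


model = "text-embedding-3-small"
v1 = ["Intro", "Install steps", "FAQ"]
known = {content_hash(c, model) for c in v1}   # state after the first indexing run

v2 = ["Intro", "Install steps (updated)", "FAQ"]
stale = chunks_needing_embedding(v2, model, known)  # only index 1 changed
```

Hashing the whole document instead would miss the cache on any edit, however small, so the savings depend on keying at the same unit you embed.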

Lever 4: Batching API Calls

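One-document-at-a-time embedding API calls are an expensive habit: the per-token price is usually the same, but you pay in wall-clock time, request overhead, and rate-limit headroom. Most embedding APIs support batches of 256–2048 texts per request, and throughput can be 10–100x higher in batch mode.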

from typing import Optional

import numpy as np
import openai

# EmbeddingCache is the class defined in the previous section.
 
def embed_in_batches(
    texts: list[str],
    model: str = "text-embedding-3-small",
    batch_size: int = 512,
    cache: Optional[EmbeddingCache] = None,
) -> np.ndarray:
    client = openai.OpenAI()
    all_embeddings = [None] * len(texts)
    uncached_indices = []
 
    # Check cache first
    for i, text in enumerate(texts):
        if cache:
            cached = cache.get(text, model)
            if cached is not None:
                all_embeddings[i] = cached
                continue
        uncached_indices.append(i)
 
    # Batch embed uncached texts
    uncached_texts = [texts[i] for i in uncached_indices]
    for batch_start in range(0, len(uncached_texts), batch_size):
        batch = uncached_texts[batch_start : batch_start + batch_size]
        response = client.embeddings.create(model=model, input=batch)
        for j, embedding_data in enumerate(response.data):
            original_idx = uncached_indices[batch_start + j]
            emb = np.array(embedding_data.embedding, dtype=np.float32)
            all_embeddings[original_idx] = emb
            if cache:
                cache.set(texts[original_idx], model, emb)
 
    return np.stack(all_embeddings)
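
The request-count arithmetic behind batching is simple enough to check directly. The sketch below is illustrative only: `request_count` is a hypothetical helper and the 100,000-chunk corpus is an assumed example size, not a figure from a real pipeline.

```python
import math


def request_count(num_texts: int, batch_size: int) -> int:
    """Number of API requests needed to embed num_texts at a given batch size."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return math.ceil(num_texts / batch_size)


corpus_chunks = 100_000  # assumed corpus size for illustration
one_at_a_time = request_count(corpus_chunks, 1)   # 100,000 requests
batched = request_count(corpus_chunks, 512)       # 196 requests
```

Fewer round trips is where the throughput gain comes from. The token bill is the same either way, assuming the provider charges per token rather than per request.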

Lever 5: Model Tiering

Not all queries deserve the same LLM. Route by complexity:

def route_query_to_model(query: str, context_chunks: list[dict]) -> str:
    """
    Simple routing: use a cheaper model for simple factual lookups.
    Complex/long queries get the full model.
    """
    query_lower = query.lower()
    query_words = len(query.split())
    # Word count is a rough stand-in for token count; use a real tokenizer if accuracy matters.
    context_tokens = sum(len(c["text"].split()) for c in context_chunks)
    # Substring matching is crude ("vs" also matches inside longer words) but cheap.
    has_comparison = any(w in query_lower for w in ["compare", "difference", "vs", "versus", "better"])
    has_reasoning = any(w in query_lower for w in ["why", "explain", "how does", "analyze"])
 
    if context_tokens < 1500 and query_words < 20 and not has_comparison and not has_reasoning:
        return "gpt-4o-mini"  # at least ~10x cheaper than gpt-4o
    return "gpt-4o"

In practice, 60–70% of RAG queries are simple (single factual lookups, definitions, brief summaries) and can be routed to gpt-4o-mini with little noticeable quality loss. That alone cuts LLM cost by 40–50% at the system level.

The Cost Dashboard You Need

Track these metrics weekly:

  • Cost per query (broken down by component)
  • Cache hit rate (target: >30% for query cache, >60% for embedding cache)
  • Model distribution (what % of queries hit each tier)
  • Cost per DAU (rising cost per active user is a warning sign)
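
These metrics can all be derived from per-query logs. The following is a minimal sketch, assuming you record one row per served query: the `QueryLog` fields and the `summarize` helper are hypothetical names, not an existing schema. Feeding it a single day's rows turns the last metric into cost per DAU.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class QueryLog:
    """One served query. Field names are illustrative."""
    user_id: str
    cache_hit: bool
    model: Optional[str] = None   # None when the response came from cache
    embedding_usd: float = 0.0
    rerank_usd: float = 0.0
    llm_usd: float = 0.0


def summarize(logs: list[QueryLog]) -> dict:
    """Aggregate the dashboard metrics over one reporting window."""
    if not logs:
        raise ValueError("no queries in window")
    n = len(logs)
    components = {
        "embedding": sum(q.embedding_usd for q in logs),
        "reranking": sum(q.rerank_usd for q in logs),
        "llm": sum(q.llm_usd for q in logs),
    }
    total = sum(components.values())
    misses = [q for q in logs if not q.cache_hit]
    tiers = Counter(q.model for q in misses)   # only cache misses reach a model
    users = {q.user_id for q in logs}
    return {
        "cost_per_query": {name: cost / n for name, cost in components.items()},
        "cache_hit_rate": (n - len(misses)) / n,
        "model_distribution": {m: c / len(misses) for m, c in tiers.items()},
        "cost_per_active_user": total / len(users),
    }
```

Model distribution is computed over cache misses only, since cached responses never reach a model; computing it over all queries would understate each tier's share.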

At 10K queries/day with good caching (35% hit rate) and model tiering (65% on mini): monthly LLM cost drops from ~$6,600 to ~$2,800. That's a meaningful margin improvement with little visible effect on answer quality.
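
A projection like this is sensitive to its inputs, so it helps to make them explicit. The sketch below is a rough model, not a reconstruction of how the figure above was derived: `routed_spend_share` (the fraction of post-cache LLM spend that comes from queries routed to the cheaper model) and `cheap_price_ratio` (cheaper model price relative to the full model) are assumptions introduced here.

```python
def projected_llm_cost(
    baseline_monthly_usd: float,
    cache_hit_rate: float,
    routed_spend_share: float,
    cheap_price_ratio: float,
) -> float:
    """Estimate monthly LLM spend after caching and model tiering."""
    # Cached queries never reach the LLM.
    after_cache = baseline_monthly_usd * (1 - cache_hit_rate)
    # Routed traffic is billed at the cheaper rate; the rest is unchanged.
    blended = (1 - routed_spend_share) + routed_spend_share * cheap_price_ratio
    return after_cache * blended


# Assumed inputs: routed queries carry ~39% of spend, cheap model at 1/10 the price.
estimate = projected_llm_cost(6_600, 0.35, 0.39, 0.10)
```

With those assumed inputs the estimate comes out at roughly $2,800. Routed queries are the short-context ones, so their share of spend is plausibly well below their 65% share of query count. If they carried a full 65% of spend, the same formula would give a noticeably lower figure, so measure the split before quoting a number.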

Key Takeaways

  • LLM generation cost dominates: roughly 20x the re-ranking bill and several orders of magnitude above embedding. Optimize there first, not on embeddings.
  • Exact-match query caching with a 6–48h TTL eliminates 20–40% of queries before they hit the LLM; it's the highest-ROI single optimization.
  • Semantic query caching catches paraphrase duplicates that exact hashing misses — worth adding after exact caching is working.
  • Embedding caches (content-hash keyed) prevent re-embedding unchanged documents during iterative indexing pipeline development.
  • Batch your embedding API calls; one-at-a-time calls are 10–100x less throughput-efficient than full batches.
  • Model tiering (routing simple queries to cheaper models) typically reduces LLM cost by 40–50% with negligible quality impact.