Agent Loops That Don't Run Away
The $500 Incident
An agent loop with a subtle bug — a tool that returned an error JSON that the model interpreted as success — ran for 6 hours overnight. By morning it had made 4,800 LLM calls, consumed 12M tokens, and generated $487 in API charges while accomplishing exactly nothing. The team discovered it via a Slack alert from their cloud billing dashboard, not their agent monitoring.
This is not a hypothetical. It happens to teams building agents for the first time. The fix is not better prompting — it is budget enforcement at the infrastructure level.
The Four Budget Dimensions
A production agent loop needs hard limits across four independent dimensions: step count, token usage, wall-clock time, and dollar cost. Any one of them can save you from a runaway loop that the others miss.
Implementation: The Budget Context
Wrap all budget state in a single object that gets passed into every loop iteration.
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class AgentBudget:
# Hard limits
max_steps: int = 25
max_input_tokens: int = 500_000
max_output_tokens: int = 50_000
max_cost_usd: float = 5.00
max_wall_seconds: float = 300.0 # 5 minutes
# Human checkpoint triggers
checkpoint_every_n_steps: Optional[int] = None
checkpoint_on_tool: Optional[list[str]] = None # e.g. ["write_file", "send_email"]
# State (tracked during execution)
steps_used: int = 0
input_tokens_used: int = 0
output_tokens_used: int = 0
cost_usd_used: float = 0.0
started_at: float = field(default_factory=time.time)
def check(self) -> Optional[str]:
"""Return a halt reason if any budget is exceeded, else None."""
if self.steps_used >= self.max_steps:
return f"step limit reached ({self.steps_used}/{self.max_steps})"
if self.input_tokens_used >= self.max_input_tokens:
return f"input token limit ({self.input_tokens_used}/{self.max_input_tokens})"
if self.output_tokens_used >= self.max_output_tokens:
return f"output token limit"
if self.cost_usd_used >= self.max_cost_usd:
return f"cost limit (${self.cost_usd_used:.2f}/${self.max_cost_usd:.2f})"
elapsed = time.time() - self.started_at
if elapsed >= self.max_wall_seconds:
return f"wall-clock limit ({elapsed:.0f}s/{self.max_wall_seconds:.0f}s)"
return None
def record_step(self, usage: dict, cost: float):
self.steps_used += 1
self.input_tokens_used += usage.get("input_tokens", 0)
self.output_tokens_used += usage.get("output_tokens", 0)
self.cost_usd_used += cost
def needs_checkpoint(self, tool_used: Optional[str] = None) -> bool:
if self.checkpoint_every_n_steps and self.steps_used % self.checkpoint_every_n_steps == 0:
return True
if tool_used and self.checkpoint_on_tool and tool_used in self.checkpoint_on_tool:
return True
        return False

The Agent Loop
import anthropic
from typing import Callable
client = anthropic.Anthropic()
def run_agent(
task: str,
tools: list,
tool_executor: Callable,
human_approver: Optional[Callable] = None,
budget: Optional[AgentBudget] = None,
) -> dict:
budget = budget or AgentBudget()
messages = [{"role": "user", "content": task}]
result = {"status": "unknown", "halt_reason": None, "steps": 0, "cost": 0.0}
while True:
# Check budgets before each iteration
halt_reason = budget.check()
if halt_reason:
result["status"] = "budget_exceeded"
result["halt_reason"] = halt_reason
escalate(task, messages, halt_reason)
break
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4096,
tools=tools,
messages=messages,
)
cost = compute_cost("claude-opus-4-5", response.usage.model_dump())
budget.record_step(response.usage.model_dump(), cost)
result["steps"] = budget.steps_used
result["cost"] = budget.cost_usd_used
# Natural completion
if response.stop_reason == "end_turn":
result["status"] = "complete"
result["output"] = extract_text(response)
break
# Process tool calls
if response.stop_reason == "tool_use":
tool_results = []
last_tool = None
for block in response.content:
if block.type == "tool_use":
last_tool = block.name
# Human checkpoint before executing high-stakes tools
if budget.needs_checkpoint(tool_used=block.name):
if human_approver:
approved = human_approver(
task=task,
tool=block.name,
input=block.input,
budget=budget,
)
if not approved:
result["status"] = "human_rejected"
break
tool_result = tool_executor(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(tool_result),
})
if result["status"] == "human_rejected":
escalate(task, messages, "human rejected tool execution")
break
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
return result
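
The loop leans on two helpers that are not shown: compute_cost, which turns a usage dict into dollars, and extract_text, which pulls the final text out of the response. A minimal sketch, assuming illustrative per-token rates (placeholders, not published pricing; substitute your provider's current price sheet) and the Anthropic SDK's content-block shape:

# Hypothetical per-million-token rates -- placeholders, not published pricing.
PRICES_PER_MTOK = {"claude-opus-4-5": {"input": 15.00, "output": 75.00}}

def compute_cost(model: str, usage: dict) -> float:
    """Convert a usage dict ({'input_tokens': ..., 'output_tokens': ...}) to USD."""
    rates = PRICES_PER_MTOK[model]
    return (
        usage.get("input_tokens", 0) / 1_000_000 * rates["input"]
        + usage.get("output_tokens", 0) / 1_000_000 * rates["output"]
    )

def extract_text(response) -> str:
    """Concatenate the text blocks from an Anthropic Messages API response."""
    return "\n".join(block.text for block in response.content if block.type == "text")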
def escalate(task: str, messages: list, reason: str):
"""Notify humans and log for post-mortem."""
logger.error("agent_escalated", extra={
"task": task[:200],
"reason": reason,
"message_count": len(messages),
})
# Send to Slack, PagerDuty, email, etc.
notify_on_call(f"Agent halted: {reason}\nTask: {task[:100]}")Real Failure Modes and How to Catch Them
1. Tool Always Returns Error JSON
The model interprets {"error": "file not found"} as a result and keeps trying variants. The fix: return errors as is_error: true in the tool result, and include loop-breaking instructions in your system prompt.
# Tool executor wrapper
def safe_tool_executor(name: str, input: dict) -> dict:
try:
result = execute_tool(name, input)
return {"success": True, "result": result}
except Exception as e:
return {
"success": False,
"error": str(e),
"is_error": True, # Some providers use this flag
        }

Add to system prompt: "If a tool returns is_error: true more than twice for the same operation, stop and explain what you tried and why it failed."
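
The tool_result block itself can also carry the error flag, which the model treats as a failure rather than data. A small wrapper sketch (execute_and_wrap is a hypothetical helper, not part of the article's loop; the is_error field on tool_result blocks is supported by the Anthropic Messages API) that could replace the tool-execution lines in run_agent's tool_use branch:

def execute_and_wrap(block) -> dict:
    """Run one tool_use block via safe_tool_executor and build a tool_result
    block whose is_error flag mirrors the success field."""
    tool_result = safe_tool_executor(block.name, block.input)
    return {
        "type": "tool_result",
        "tool_use_id": block.id,
        "content": str(tool_result),
        "is_error": not tool_result.get("success", True),
    }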
2. Infinite Reflection Loop
The model uses a think or scratchpad tool to reason, then calls it again on the output of the previous call. This is a loop the step counter catches — but only if you set max_steps low enough. Default to 25 steps. Raise it only when you have evidence a specific task needs more.
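
If you want to catch the pattern before the step budget trips, one optional extra guard (an assumption on my part, not part of the budget object above) is to halt when the same tool is called with identical input several times in a row. The names and threshold below are illustrative:

import json

_last_call: Optional[str] = None
_repeat_count = 0
MAX_IDENTICAL_CALLS = 3  # illustrative threshold

def is_repeat_call(name: str, input: dict) -> bool:
    """Return True once the same (tool, input) pair repeats too many times in a row."""
    global _last_call, _repeat_count
    sig = name + json.dumps(input, sort_keys=True, default=str)
    _repeat_count = _repeat_count + 1 if sig == _last_call else 1
    _last_call = sig
    return _repeat_count > MAX_IDENTICAL_CALLS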
3. Context Window Overflow
After 20+ tool calls, conversation history grows large. Without truncation the next call fails with a context limit error. Handle it:
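
The handler below relies on an estimate_tokens function that is not shown; a crude character-count stand-in (assuming roughly four characters per token, not an exact tokenizer) is enough for a guardrail:

import json

def estimate_tokens(messages: list) -> int:
    """Rough token estimate: ~4 characters per token of serialized content."""
    total_chars = sum(len(json.dumps(m, default=str)) for m in messages)
    return total_chars // 4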
def truncate_messages_if_needed(messages: list, max_tokens: int = 150_000) -> list:
estimated = estimate_tokens(messages)
if estimated <= max_tokens:
return messages
# Keep system context and last N turns
# Never truncate the initial task message (messages[0])
initial = messages[:1]
recent = messages[-10:] # last 10 messages
return initial + [{"role": "user", "content": "[...earlier context truncated...]"}] + recent4. Silent Cost Accumulation
The model calls a cheap tool (web search, calculator) 200 times inside one apparent "step." The step counter does not catch this. Add per-tool call limits:
tool_call_counts: dict[str, int] = {}
MAX_TOOL_CALLS = {"web_search": 10, "calculator": 20, "_default": 50}
def guarded_tool_executor(name: str, input: dict) -> dict:
count = tool_call_counts.get(name, 0) + 1
tool_call_counts[name] = count
limit = MAX_TOOL_CALLS.get(name, MAX_TOOL_CALLS["_default"])
if count > limit:
raise RuntimeError(f"Tool {name} call limit ({limit}) exceeded")
    return safe_tool_executor(name, input)

Observability
Every agent execution should emit a structured trace you can query post-mortem.
@dataclass
class AgentTrace:
task_id: str
task_summary: str
status: str
halt_reason: Optional[str]
steps: int
input_tokens: int
output_tokens: int
cost_usd: float
wall_seconds: float
tool_calls: list[dict] # {step, tool, input_summary, success}
escalated: bool
# Emit to your observability stack on every completion.
# (Example uses the Datadog DogStatsd client; swap in your own metrics library.)
from datadog import statsd

def emit_trace(trace: AgentTrace):
    statsd.distribution("agent.cost_usd", trace.cost_usd, tags=["env:prod"])
    statsd.distribution("agent.steps", trace.steps)
    statsd.increment("agent.status", tags=[f"status:{trace.status}"])
    if trace.escalated:
        statsd.increment("agent.escalations")

Key metrics to alert on:
- agent.cost_usd p99 > $1.00 → budget limit too high or runaway pattern
- agent.status{status:budget_exceeded} rate > 5% → tasks are systematically exceeding budgets
- agent.steps p99 > 20 → tasks are more complex than expected or loops are forming
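
To tie the pieces together, a sketch of building a trace from a run_agent result and its budget. The wiring here is an assumption: run_and_trace is a hypothetical wrapper, the loop above does not collect per-tool-call records for you (hence the empty tool_calls list), and the escalated flag is derived from the result status.

import time
import uuid

def run_and_trace(task: str, tools: list, tool_executor) -> AgentTrace:
    budget = AgentBudget()
    result = run_agent(task, tools, tool_executor, budget=budget)
    trace = AgentTrace(
        task_id=str(uuid.uuid4()),
        task_summary=task[:200],
        status=result["status"],
        halt_reason=result["halt_reason"],
        steps=budget.steps_used,
        input_tokens=budget.input_tokens_used,
        output_tokens=budget.output_tokens_used,
        cost_usd=budget.cost_usd_used,
        wall_seconds=time.time() - budget.started_at,
        tool_calls=[],  # populate from your tool executor if you record them
        escalated=result["status"] in ("budget_exceeded", "human_rejected"),
    )
    emit_trace(trace)
    return trace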
Setting Budgets for New Tasks
When deploying a new agent task type, run 20–50 test cases and measure:
- Median steps, p95 steps, max steps
- Median cost, p95 cost, max cost
- Wall time distribution
Set your limits at 3× the p95 for steps and cost, and never lower than the observed p99. Never guess; measure.
# Budget profiler for new task types
def profile_task(task_fn, inputs: list, n_samples: int = 20):
results = [task_fn(i) for i in inputs[:n_samples]]
steps = sorted(r["steps"] for r in results)
costs = sorted(r["cost"] for r in results)
p95_idx = int(len(steps) * 0.95)
print(f"Steps: median={steps[len(steps)//2]}, p95={steps[p95_idx]}, max={steps[-1]}")
print(f"Cost: median={costs[len(costs)//2]:.3f}, p95={costs[p95_idx]:.3f}, max={costs[-1]:.3f}")
print(f"Suggested max_steps: {steps[p95_idx] * 3}")
print(f"Suggested max_cost: ${costs[p95_idx] * 3:.2f}")Key Takeaways
- Budget enforcement must happen at the infrastructure level, not the prompt level — the model cannot reliably police itself.
- Enforce four independent dimensions: step count, token usage, wall-clock time, and dollar cost. Any one of them can catch a runaway loop the others miss.
- Human checkpoints should trigger on step count intervals and on specific high-stakes tool calls (file writes, emails, API mutations), not just on failures.
- Per-tool call limits catch silent cost accumulation from tools the model calls dozens of times within a single logical step.
- Set budget limits at 3× the p95 of observed behavior from profiling runs — never guess at limits for a new task type.
- Emit structured traces for every agent execution and alert on escalation rate and p99 cost — you need telemetry to distinguish deliberate high-cost runs from runaway loops.