Chapter 3 — Query Engine & Conversation Loop

The query loop is the heartbeat — it calls the model, executes tools, handles errors, and decides when to stop.

Why this system exists

Every coding agent lives or dies by its conversation loop. This is the single piece of code that transforms a stateless LLM API into an autonomous agent: it sends messages, inspects whether the model wants to call a tool or emit final text, executes tool calls, feeds results back, and repeats until the task is done or a budget gate fires. Get this loop wrong and the agent either halts too early, burns money in infinite cycles, or loses context mid-task.

The core loop (shared architecture)

Regardless of which provider you use, every query engine follows the same shape:

while True:
    response = call_model(messages, tools)
    if no tool calls in response:
        break                              # model is done
    for each tool_call in response:
        result = execute(tool_call)
        append result to next input
    if budget_exceeded(turns, cost, tokens):
        break                              # safety stop

The differences between providers are structural (how messages and tool results are formatted) rather than architectural (the loop itself is identical).

Python implementation

First, with the Anthropic Messages API:

import anthropic


async def query_loop(
    client: anthropic.Anthropic,
    messages: list[dict],
    system: str,
    tools: list[dict],
    tool_executor,
    max_turns: int = 10,
):
    """Run the agentic loop using Anthropic Messages API."""
    response = None

    for turn in range(max_turns):
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=8192,
            system=system,
            messages=messages,
            tools=tools,
        )

        # Append assistant response
        messages.append({"role": "assistant", "content": response.content})

        # Check if model wants to use tools
        tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
        if not tool_use_blocks:
            break  # Model is done — no more tool calls

        # Execute each tool and collect results
        tool_results = []
        for block in tool_use_blocks:
            result = await tool_executor.run(block.name, block.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": str(result),
            })

        # Append tool results as user message
        messages.append({"role": "user", "content": tool_results})

    # Return final text
    return next(
        (b.text for b in response.content if hasattr(b, "text")),
        "",
    )
The equivalent loop with the OpenAI Responses API, which chains turns by ID instead of resending the full history:

import json

from openai import OpenAI


def query_loop(
    client: OpenAI,
    user_input: str,
    instructions: str,
    tools: list[dict],
    tool_executor,
    max_turns: int = 10,
    previous_response_id: str | None = None,
):
    """Run the agentic loop using OpenAI Responses API."""
    current_input = user_input
    response = None

    for turn in range(max_turns):
        response = client.responses.create(
            model="gpt-4.1",
            instructions=instructions,
            input=current_input,
            tools=tools,
            previous_response_id=previous_response_id,
        )
        previous_response_id = response.id

        # Check for function calls
        function_calls = [
            item for item in response.output
            if item.type == "function_call"
        ]
        if not function_calls:
            break  # Model is done

        # Execute and build next input
        results = []
        for call in function_calls:
            result = tool_executor.run(call.name, json.loads(call.arguments))
            results.append({
                "type": "function_call_output",
                "call_id": call.call_id,
                "output": str(result),
            })
        current_input = results

    # Return final text (output_text joins all text items in the output)
    return response.output_text if response else ""

Conversation state ownership

Anthropic requires you to manage the full messages list yourself — every assistant turn and tool result must be appended explicitly. OpenAI's Responses API chains turns via previous_response_id, so the server holds prior context. Both approaches work; the tradeoff is control vs. convenience.

Streaming

For interactive agents, streaming lets users see partial output as it arrives instead of waiting for the full response. With Anthropic, client.messages.stream() is a context manager that accumulates the full message while you consume events:

import anthropic


def stream_response(
    client: anthropic.Anthropic,
    messages: list[dict],
    system: str,
    tools: list[dict],
):
    """Stream a single model turn with Anthropic."""
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=8192,
        system=system,
        messages=messages,
        tools=tools,
    ) as stream:
        for event in stream:
            match event.type:
                case "content_block_start":
                    # A new content block (text or tool_use) is starting
                    pass
                case "content_block_delta":
                    if hasattr(event.delta, "text"):
                        print(event.delta.text, end="", flush=True)
                case "message_stop":
                    # Full message received — inspect for tool calls
                    pass

    # After stream closes, get the accumulated message
    return stream.get_final_message()
With OpenAI, pass stream=True and iterate over events:

from openai import OpenAI


def stream_response(
    client: OpenAI,
    current_input,
    instructions: str,
    tools: list[dict],
):
    """Stream a single model turn with OpenAI."""
    stream = client.responses.create(
        model="gpt-4.1",
        instructions=instructions,
        input=current_input,
        tools=tools,
        stream=True,
    )
    for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)

Streaming + tool loops

In a full agent, you stream each turn individually, then check the accumulated response for tool calls. The outer while loop stays the same — only the inner call_model step switches from blocking to streaming.
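This decoupling can be sketched by parameterizing the outer loop on a call_model callable (a hypothetical signature, not from either SDK). The same loop works whether call_model blocks or streams internally:

```python
def run_agent(call_model, execute_tool, messages, max_turns=10):
    """Outer agent loop using Anthropic-style message shapes.

    `call_model` is any callable taking the messages list and returning an
    object with a `.content` list of blocks; it may call the API directly
    or stream and accumulate -- the loop does not care.
    """
    response = None
    for _ in range(max_turns):
        response = call_model(messages)
        messages.append({"role": "assistant", "content": response.content})

        tool_uses = [b for b in response.content if b.type == "tool_use"]
        if not tool_uses:
            break  # final text, no tool calls requested

        # Feed each tool result back as a tool_result block
        results = [
            {
                "type": "tool_result",
                "tool_use_id": b.id,
                "content": str(execute_tool(b.name, b.input)),
            }
            for b in tool_uses
        ]
        messages.append({"role": "user", "content": results})
    return response
```

In production, call_model would wrap client.messages.create(...) for the blocking path or the stream_response(...) helper followed by get_final_message() for the streaming one.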

Recovery mechanisms

Things go wrong. The query engine is where you catch and recover from failures instead of letting them crash the session.

Scenario          | Detection                           | Recovery                                    | Limit
Prompt too long   | API error (context length exceeded) | Compact messages — summarize older turns    | 1 attempt
Max output tokens | stop_reason == "max_tokens"         | Truncate partial output, continue next turn | 3 attempts
Rate limit        | HTTP 429                            | Exponential backoff with jitter             | 5 retries
Model error       | HTTP 500                            | Retry with exponential backoff              | 3 retries
A reusable backoff helper for the retry rows:

import time
import random


def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 1.0):
    """Retry a callable with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:  # in practice, catch only retryable errors (429, 500)
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            time.sleep(delay)
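The "prompt too long" row needs a compaction step. A minimal sketch, assuming Anthropic-style message dicts and a summarize callable you supply (in practice, a cheap model call; the function name is illustrative):

```python
def compact_messages(messages, summarize, keep_recent=4):
    """Replace older turns with a single summary message.

    The most recent `keep_recent` messages survive verbatim so the agent
    keeps its working context; everything older is collapsed into one
    summary string produced by `summarize`.
    """
    if len(messages) <= keep_recent:
        return messages  # nothing to compact
    older, recent = messages[:-keep_recent], messages[-keep_recent:]
    summary = {
        "role": "user",
        "content": f"[Summary of {len(older)} earlier turns] {summarize(older)}",
    }
    return [summary] + recent
```

Call this once when the API rejects a request for context length, then retry the same turn with the compacted list.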

Budget gates

Budget gates prevent runaway agents. Check them after every turn.

from dataclasses import dataclass


@dataclass(frozen=True)
class BudgetGates:
    max_turns: int = 10
    max_cost_usd: float = 5.0
    max_tokens: int = 200_000


def check_budget(
    gates: BudgetGates,
    turn_count: int,
    total_cost: float,
    total_tokens: int,
) -> str | None:
    """Return a reason string if any budget gate is hit, else None."""
    if turn_count >= gates.max_turns:
        return "max_turns_reached"
    if total_cost >= gates.max_cost_usd:
        return "max_cost_reached"
    if total_tokens >= gates.max_tokens:
        return "max_tokens_reached"
    return None

Always enforce budget gates

An agent without budget gates is a credit card attached to a while True loop. Even during development, set conservative defaults and log when gates fire.
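check_budget needs a running total_cost, which you accumulate from per-turn token usage. A sketch; the default per-million-token prices are placeholders, not real rates, so substitute your model's actual pricing:

```python
def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float = 3.0,    # illustrative USD per 1M input tokens
    output_price_per_mtok: float = 15.0,  # illustrative USD per 1M output tokens
) -> float:
    """Dollar cost of one turn, computed from the response's token counts."""
    return (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000
```

After each turn, add estimate_cost(response.usage.input_tokens, response.usage.output_tokens) to the running total, then call check_budget.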

Transcript persistence

Crash recovery depends on writing state to disk at the right moments. The pattern is asymmetric on purpose — block before the expensive call, fire-and-forget after it:

Write-before-query pattern:
  User message → save transcript [BLOCKING]    ← crash recovery point
  API call starts...
  Assistant response → save transcript [async]  ← non-blocking
  Turn complete → flush [if eager mode]

The blocking write before the API call ensures that if the process crashes mid-request, you can resume from the last user message without re-executing tools. The async write after the response is safe because the worst case (crash before write completes) just means you re-call the model for one turn.
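A minimal sketch of the asymmetric writes, using an append-only JSONL file and a background thread for the non-blocking side (the Transcript class and its method names are illustrative, not from any library):

```python
import json
import threading
from pathlib import Path


class Transcript:
    """Append-only JSONL transcript with blocking and fire-and-forget writes."""

    def __init__(self, path):
        self.path = Path(path)
        self._lock = threading.Lock()

    def save_blocking(self, message: dict) -> None:
        """Write-before-query: returns only after the line is on disk."""
        with self._lock, self.path.open("a") as f:
            f.write(json.dumps(message) + "\n")
            f.flush()

    def save_async(self, message: dict) -> threading.Thread:
        """Write-after-response: returns immediately, writes in the background."""
        t = threading.Thread(target=self.save_blocking, args=(message,))
        t.start()
        return t

    def load(self) -> list[dict]:
        """Rebuild the messages list for crash recovery."""
        if not self.path.exists():
            return []
        with self.path.open() as f:
            return [json.loads(line) for line in f if line.strip()]
```

Call save_blocking before the API call and save_async after the response; on restart, load() recovers the conversation up to the last durable write.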

Build it yourself

A checklist for implementing your own query engine:

  • [ ] One central turn loop — all model calls flow through a single function
  • [ ] Tool result normalization — convert executor output to the provider's expected format before re-injection
  • [ ] Budget gates on every turn — check cost, token count, and turn count after each iteration
  • [ ] Persist before calling — write user messages to disk before the API call, not after
  • [ ] Retry with backoff — handle 429s and 500s in the loop, not in calling code

Key differences: Anthropic vs OpenAI

Concept            | Anthropic                                   | OpenAI
Conversation state | messages list (user/assistant)              | previous_response_id chain
Tool call signal   | stop_reason == "tool_use"                   | Output contains function_call items
Tool result format | tool_result content block in user message   | function_call_output in next input
Streaming          | client.messages.stream() context manager    | stream=True parameter
System prompt      | system parameter                            | instructions parameter
Token counting     | response.usage.input_tokens / output_tokens | response.usage.input_tokens / output_tokens