Chapter 2 -- Tool Execution Pipeline
The tool system from Chapter 1 defines what the model can do. This chapter defines how those tools actually run -- the six-phase pipeline that turns a model-emitted tool call into a validated, permitted, scheduled execution with its result reinjected into the conversation.
Why this system exists
The model asking for a tool is not the same thing as a tool being allowed to run. Without a structured pipeline, validation happens in random order, permission checks get skipped, parallel execution races against serial tools, and result formats drift between providers. One executor object owns all of this.
The six-phase pipeline
Every tool call passes through six phases in strict order:
| Phase | Purpose | Fails with |
|---|---|---|
| 1. Validate | Check inputs against input_schema | ValidationError |
| 2. Semantic check | Domain-specific input rules (path traversal, size limits) | SemanticError |
| 3. Pre-hooks | Transform or log before execution | Hook error |
| 4. Permissions | Authorization gate (check_permissions) | PermissionError |
| 5. Execute | Run the tool, serial or concurrent | ExecutionError |
| 6. Post-hooks | Auto-format output, record metrics | Hook error |
Phases 1--4 are fail-fast: any failure short-circuits the remaining phases and returns an error result. Phase 5 is where concurrency decisions happen. Phase 6 runs even when execution in phase 5 fails, so failures are still logged; a call rejected in phases 1--4 never reaches it.
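Phase 2 covers rules that a JSON Schema cannot express. A minimal sketch of a path-traversal and size-limit check follows; `check_write_request`, `MAX_BYTES`, the `workspace_root` argument, and the `"path"`/`"content"` payload keys are all hypothetical names chosen for illustration, not part of the tool interface from Chapter 1.

```python
from pathlib import Path

MAX_BYTES = 1_000_000  # hypothetical size limit, for illustration only


class SemanticError(ValueError):
    """Raised when a schema-valid payload violates a domain rule."""


def check_write_request(payload: dict, workspace_root: str) -> Path:
    """Reject paths outside the workspace and oversized content.

    Hypothetical helper: the payload is assumed to carry "path" and "content".
    """
    root = Path(workspace_root).resolve()
    # Resolve ".." segments and symlinks before comparing against the root.
    target = (root / payload["path"]).resolve()
    if not target.is_relative_to(root):
        raise SemanticError(f"path escapes workspace: {payload['path']}")
    size = len(payload["content"].encode("utf-8"))
    if size > MAX_BYTES:
        raise SemanticError(f"content is {size} bytes, limit is {MAX_BYTES}")
    return target
```

A tool's semantic check could call a helper like this and let the exception propagate; the executor turns it into an error result.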
ToolResult -- frozen dataclass
Every tool execution returns the same shape. Frozen so nothing downstream can mutate a result after the fact.
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToolResult:
    """Immutable result from any tool execution."""

    tool_name: str
    output: Any
    error: str | None = None
    duration_ms: float = 0.0
    was_concurrent: bool = False
ToolExecutor -- the single execution path
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

import jsonschema


@dataclass(frozen=True)
class HookContext:
    """Read-only snapshot passed to pre/post hooks."""

    tool_name: str
    payload: dict[str, Any]
    phase: str


# Type alias for hook callables
Hook = Callable[[HookContext], Awaitable[None]]


class ToolExecutor:
    """Six-phase pipeline that owns all tool execution."""

    def __init__(
        self,
        registry: "ToolRegistry",
        pre_hooks: list[Hook] | None = None,
        post_hooks: list[Hook] | None = None,
        on_progress: Callable[[str, float], None] | None = None,
    ) -> None:
        self._registry = registry
        self._pre_hooks = pre_hooks or []
        self._post_hooks = post_hooks or []
        self._on_progress = on_progress

    async def run(
        self,
        name: str,
        payload: dict[str, Any],
        context: "ToolContext",
    ) -> ToolResult:
        """Execute one tool call through all six phases."""
        start = time.monotonic()
        tool = self._registry.get(name)

        # Phase 1: Validate against JSON Schema
        try:
            jsonschema.validate(payload, tool.input_schema)
        except jsonschema.ValidationError as exc:
            return ToolResult(
                tool_name=name,
                output=None,
                error=f"Validation failed: {exc.message}",
            )

        # Phase 2: Semantic check (tool-specific rules)
        try:
            await tool.semantic_check(payload, context)
        except Exception as exc:
            return ToolResult(
                tool_name=name,
                output=None,
                error=f"Semantic check failed: {exc}",
            )

        # Phase 3: Pre-hooks (fail-fast: a failing hook aborts the call)
        hook_ctx = HookContext(
            tool_name=name, payload=payload, phase="pre"
        )
        try:
            for hook in self._pre_hooks:
                await hook(hook_ctx)
        except Exception as exc:
            return ToolResult(
                tool_name=name,
                output=None,
                error=f"Pre-hook failed: {exc}",
            )

        # Phase 4: Permissions
        try:
            await tool.check_permissions(payload, context)
        except PermissionError as exc:
            return ToolResult(
                tool_name=name,
                output=None,
                error=f"Permission denied: {exc}",
            )

        # Phase 5: Execute; a raised exception becomes an error result
        try:
            self._report_progress(name, 0.0)
            result = await tool.execute(payload, context)
            self._report_progress(name, 1.0)
        except Exception as exc:
            result = ToolResult(
                tool_name=name,
                output=None,
                error=f"Execution failed: {exc}",
            )

        # Phase 6: Post-hooks (run whether or not phase 5 succeeded)
        post_ctx = HookContext(
            tool_name=name, payload=payload, phase="post"
        )
        for hook in self._post_hooks:
            await hook(post_ctx)

        elapsed = (time.monotonic() - start) * 1000
        return ToolResult(
            tool_name=name,
            output=result.output,
            error=result.error,
            duration_ms=elapsed,
        )

    def _report_progress(self, name: str, fraction: float) -> None:
        if self._on_progress is not None:
            self._on_progress(name, fraction)
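Hooks are plain async callables, which keeps cross-cutting concerns such as metrics out of tool code. The sketch below shows one way a post-hook could count completed calls per tool. It assumes the `HookContext` shape above (repeated so the snippet runs alone); `CallCounter` is a hypothetical name, not part of the executor.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class HookContext:
    """Repeated from above so the snippet runs on its own."""

    tool_name: str
    payload: dict[str, Any]
    phase: str


class CallCounter:
    """Hypothetical post-hook that counts completed calls per tool."""

    def __init__(self) -> None:
        self.counts: Counter[str] = Counter()

    async def __call__(self, ctx: HookContext) -> None:
        # Matches the Hook alias: takes a HookContext, returns None.
        if ctx.phase == "post":
            self.counts[ctx.tool_name] += 1


async def demo() -> Counter[str]:
    counter = CallCounter()
    for name in ("bash", "search", "bash"):
        await counter(HookContext(tool_name=name, payload={}, phase="post"))
    # Pre-phase contexts are ignored by this hook.
    await counter(HookContext(tool_name="bash", payload={}, phase="pre"))
    return counter.counts


counts = asyncio.run(demo())
```

An instance like this would be passed as `post_hooks=[counter]` when constructing the executor.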
Concurrent vs serial execution
When the model emits multiple tool calls in one turn, the executor must decide: run them in parallel or serialize? The answer comes from is_concurrency_safe() on each tool.
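from dataclasses import replace


async def run_batch(
    executor: ToolExecutor,
    calls: list[tuple[str, dict[str, Any]]],
    context: "ToolContext",
    registry: "ToolRegistry",
) -> list[ToolResult]:
    """Run a batch of tool calls, parallel when safe.

    Results come back in the same order as ``calls`` so callers can
    zip them against the model's tool call IDs.
    """
    # Remember each call's position so results can be put back in order.
    parallel: list[tuple[int, str, dict[str, Any]]] = []
    serial: list[tuple[int, str, dict[str, Any]]] = []

    for index, (name, payload) in enumerate(calls):
        tool = registry.get(name)
        if tool.is_concurrency_safe():
            parallel.append((index, name, payload))
        else:
            serial.append((index, name, payload))

    results: list[ToolResult | None] = [None] * len(calls)

    # Parallel-safe tools run under a TaskGroup
    if parallel:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                (index, tg.create_task(executor.run(name, payload, context)))
                for index, name, payload in parallel
            ]
        for index, task in tasks:
            # Mark the result so downstream code can tell how it ran.
            results[index] = replace(task.result(), was_concurrent=True)

    # Serial tools run one at a time, in order
    for index, name, payload in serial:
        results[index] = await executor.run(name, payload, context)

    # Every slot has been filled; the filter only narrows the type.
    return [r for r in results if r is not None]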
asyncio.TaskGroup (Python 3.11+) is the right primitive here. If any parallel tool raises an unhandled exception, the group cancels the sibling tasks, waits for them to finish, and re-raises the failure as an ExceptionGroup -- no orphaned tasks.
Progress callbacks for long-running tools
Tools like bash or search can take seconds. The on_progress callback lets the UI layer show incremental feedback without coupling tool logic to display code.
def terminal_progress(tool_name: str, fraction: float) -> None:
    """Example progress callback for terminal UI."""
    bar_width = 30
    filled = int(bar_width * fraction)
    bar = "#" * filled + "-" * (bar_width - filled)
    print(f"\r  {tool_name} [{bar}] {fraction:.0%}", end="", flush=True)
    if fraction >= 1.0:
        print()


executor = ToolExecutor(
    registry=registry,
    on_progress=terminal_progress,
)
For finer-grained progress inside a tool, the tool itself can accept the callback through the context and call it at intermediate steps.
Reinjection -- feeding results back to the model
After execution, every result must be sent back to the model in its provider-specific format. The two SDKs differ in shape but not in concept: the model needs the tool call ID paired with the output content.
Anthropic: tool results go back as tool_result content blocks inside a user message. Each block references the tool_use_id from the model's response.
import anthropic

# Async client, so API calls do not block the event loop inside the tool loop
client = anthropic.AsyncAnthropic()


def build_tool_result_message(
    tool_use_id: str,
    result: ToolResult,
) -> dict:
    """Build an Anthropic tool_result content block."""
    if result.error is not None:
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "is_error": True,
            "content": result.error,
        }
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": str(result.output),
    }


# Full tool loop
async def anthropic_tool_loop(
    messages: list[dict],
    registry: "ToolRegistry",
    executor: ToolExecutor,
    context: "ToolContext",
) -> str:
    while True:
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=messages,
            tools=registry.to_anthropic_tools(),
        )

        if response.stop_reason != "tool_use":
            # Model is done calling tools; join its text blocks
            return "".join(
                b.text for b in response.content if b.type == "text"
            )

        # Collect tool_use blocks and execute them
        tool_uses = [
            b for b in response.content if b.type == "tool_use"
        ]
        calls = [(b.name, b.input) for b in tool_uses]
        results = await run_batch(
            executor, calls, context, registry
        )

        # Build result blocks and continue
        result_blocks = [
            build_tool_result_message(tu.id, r)
            for tu, r in zip(tool_uses, results)
        ]
        messages.append(
            {"role": "assistant", "content": response.content}
        )
        messages.append(
            {"role": "user", "content": result_blocks}
        )
OpenAI: tool results go back as function_call_output items in the next input list. Each item references the call_id from the model's function_call output.
import json

from openai import AsyncOpenAI

# Async client, so API calls do not block the event loop inside the tool loop
client = AsyncOpenAI()


def build_function_output(
    call_id: str,
    result: ToolResult,
) -> dict:
    """Build an OpenAI function_call_output item."""
    output = (
        result.error
        if result.error is not None
        # default=str keeps non-JSON-native outputs from raising
        else json.dumps(result.output, default=str)
    )
    return {
        "type": "function_call_output",
        "call_id": call_id,
        "output": output,
    }


# Full tool loop
async def openai_tool_loop(
    input_items: list[dict],
    registry: "ToolRegistry",
    executor: ToolExecutor,
    context: "ToolContext",
) -> str:
    while True:
        response = await client.responses.create(
            model="gpt-4.1",
            input=input_items,
            tools=registry.to_openai_tools(),
        )

        fn_calls = [
            item for item in response.output
            if item.type == "function_call"
        ]

        if not fn_calls:
            # Model is done calling tools
            text_items = [
                item for item in response.output
                if item.type == "message"
            ]
            return text_items[0].content[0].text

        # Execute all function calls
        calls = [
            (fc.name, json.loads(fc.arguments))
            for fc in fn_calls
        ]
        results = await run_batch(
            executor, calls, context, registry
        )

        # Reinject results and continue
        input_items = [
            *input_items,
            *[item.model_dump() for item in response.output],
            *[
                build_function_output(fc.call_id, r)
                for fc, r in zip(fn_calls, results)
            ],
        ]
Tool result format comparison
| Aspect | Anthropic | OpenAI |
|---|---|---|
| Container | tool_result content block in user message | function_call_output item in input list |
| ID field | tool_use_id (matches tool_use block) | call_id (matches function_call item) |
| Error flag | "is_error": True with error in content | Error string in output (no dedicated flag) |
| Content type | String or list of content blocks | JSON string |
| Batching | Multiple tool_result blocks in one user message | Multiple function_call_output items in input |
Build it yourself
- Define a frozen ToolResult dataclass -- one shape for every tool, every provider.
- Build a ToolExecutor with the six phases in strict order: validate, semantic check, pre-hooks, permissions, execute, post-hooks.
- Split incoming tool call batches by is_concurrency_safe() -- parallel-safe tools run under asyncio.TaskGroup, serial tools run sequentially.
- Wire a progress callback into the executor so the UI can show feedback without coupling to tool internals.
- Write one build_tool_result_message (Anthropic) and one build_function_output (OpenAI) function that maps ToolResult to the provider format.
- Implement the tool loop: send messages, check for tool calls in the response, execute through the pipeline, reinject results, repeat until the model stops requesting tools.