# Agent runtime

Runtime behaviour of the agent process. For primitive semantics and configuration, see processes.md. For the multiplexed I/O envelope format, see processes.md § Multiplexed I/O.
## Environment variables

| Variable | Description |
|---|---|
| `PLUMB_PROVIDER` | Default LLM provider for agents without an explicit provider in config. Accepted values: `"anthropic"`, `"openai"`. Set via the `--provider` flag on chat/plumb. |
| `PLUMB_MODEL` | Default LLM model for agents without an explicit model in config. Set via the `--model` flag on chat/plumb. |
| `ANTHROPIC_API_KEY` | Required when provider = `"anthropic"`. API key for the Anthropic Messages API. |
| `OPENAI_API_KEY` | Required when provider = `"openai"`. API key for the OpenAI Chat Completions API. |
| `PIPELINE_DEBUG` | Set to `1` to enable debug logging. Per-message conversation transcript and API metadata are written as JSON Lines to stderr. Default: off (only warnings and errors). |
| `PLUMB_MCP_TOOLS` | Path to a temp JSON file containing MCP tool schemas. Set by the plumb runtime when MCP servers are configured. Internal — not user-facing. |
| `PLUMB_ENDPOINT_INPUT` | ZMQ URI for the `data_in` socket (e.g. `tcp://127.0.0.1:5550`). If set, the agent uses ZMQ transport mode instead of stdio. |
| `PLUMB_ENDPOINT_OUTPUT` | ZMQ URI for the `data_out` socket. Required when `PLUMB_ENDPOINT_INPUT` is set. |
| `PLUMB_ENDPOINT_CTRL_IN` | ZMQ URI for the `ctrl_in` socket. Optional; omit to disable the control channel in ZMQ mode. |
| `PLUMB_ENDPOINT_CTRL_OUT` | ZMQ URI for the `ctrl_out` socket. Optional; omit to disable control output in ZMQ mode. |
| `PLUMB_ENDPOINT_TELEMETRY` | ZMQ URI for the `telemetry` socket. Optional; omit to suppress telemetry in ZMQ mode. |
| `PLUMB_ENDPOINT_TOOL_REQ` | ZMQ URI for the `tool_req` socket. Required when MCP tools are used in ZMQ mode; otherwise optional. |
| `PLUMB_ENDPOINT_TOOL_RESP` | ZMQ URI for the `tool_resp` socket. Required when MCP tools are used in ZMQ mode; otherwise optional. |
When `PIPELINE_DEBUG=1`, the agent logs each new message in the conversation (including retry messages that don't carry forward into history) and API call metadata:

```
echo '"hello"' | PIPELINE_DEBUG=1 agent spec.plumb
```
Each log line is a JSON object with `"log"` (level) and `"event"` (name) fields, distinct from both error output and thinking output. Example:

```
{"log":"debug","event":"message","role":"user","content":"\"hello\""}
{"log":"debug","event":"api_request","model":"claude-sonnet-4-5-20250514","max_tokens":8192,"thinking_budget":null,"message_count":1}
{"log":"debug","event":"message","role":"assistant","content":[{"type":"text","text":"{\"greeting\":\"hi\"}"}]}
{"log":"debug","event":"api_response","status":200,"content_blocks":1,"stop_reason":"end_turn","input_tokens":150,"output_tokens":42}
```
Retry messages are tagged with `"retry":true` so they can be distinguished from messages that persist in history.

`PIPELINE_DEBUG` propagates through pipelines — all processes in a composed pipeline inherit it.
## ZMQ transport mode

When `PLUMB_ENDPOINT_INPUT` is set, the agent switches from stdio to ZMQ transport. All data, control, telemetry, and tool traffic flows through dedicated ZMQ sockets instead of multiplexed stdin/stdout envelopes.
### Socket summary

| Name | ZMQ type | Bind/connect | Envelope format | Payload format |
|---|---|---|---|---|
| `data_in` | PULL | agent connects → fabric PUSH | single-frame: 4-byte big-endian topic length + topic bytes + payload bytes | v2 traced (provenance + JSON) |
| `data_out` | PUSH | agent binds / fabric PULL connects | same envelope format | v2 traced |
| `ctrl_in` | PULL | agent connects → fabric PUSH | same envelope format | v2 traced or bare JSON (receiver tolerates both) |
| `ctrl_out` | PUSH | agent binds / fabric PULL connects | same envelope format | bare JSON |
| `telemetry` | PUSH | agent binds / fabric PULL connects | same envelope format | v2 traced |
| `tool_req` | PUSH | agent binds / fabric PULL connects | same envelope format | bare JSON |
| `tool_resp` | PULL | agent connects → fabric PUSH | same envelope format | bare JSON |
Data channels (`data_in`, `data_out`, `telemetry`) carry v2 traced payloads as defined in protocols.md § v2 wire format.

Control and tool channels (`ctrl_out`, `tool_req`, `tool_resp`) carry bare JSON. `ctrl_in` accepts both formats: the receiver decodes v2 traced payloads and extracts the application JSON (necessary because inline morphisms on the `ctrl_in` path always emit traced payloads), and it also accepts bare JSON for backward compatibility.
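The single-frame envelope shared by all of these sockets can be sketched in a few lines. This is an illustration of the framing described in the socket table (length-prefixed topic, then payload), not the fabric's actual OCaml implementation:

```python
import struct

def encode_envelope(topic: str, payload: bytes) -> bytes:
    """Single frame: 4-byte big-endian topic length, topic bytes, payload bytes."""
    t = topic.encode("utf-8")
    return struct.pack(">I", len(t)) + t + payload

def decode_envelope(frame: bytes) -> tuple[str, bytes]:
    """Inverse of encode_envelope: recover (topic, payload) from one frame."""
    (tlen,) = struct.unpack_from(">I", frame, 0)
    topic = frame[4 : 4 + tlen].decode("utf-8")
    return topic, frame[4 + tlen :]

frame = encode_envelope("summarizer", b'{"greeting":"hi"}')
assert decode_envelope(frame) == ("summarizer", b'{"greeting":"hi"}')
```

An EOF frame is simply an envelope whose payload is empty.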
All agent sockets have ZMQ_SNDHWM = 0 (unlimited send queue). The
fabric is responsible for flow control at higher levels; back-pressure
within the ZMQ layer is not relied upon.
### Endpoint environment variables

| Variable | Socket | Required |
|---|---|---|
| `PLUMB_ENDPOINT_INPUT` | data_in | yes (triggers ZMQ mode) |
| `PLUMB_ENDPOINT_OUTPUT` | data_out | yes |
| `PLUMB_ENDPOINT_CTRL_IN` | ctrl_in | optional |
| `PLUMB_ENDPOINT_CTRL_OUT` | ctrl_out | optional |
| `PLUMB_ENDPOINT_TELEMETRY` | telemetry | optional |
| `PLUMB_ENDPOINT_TOOL_REQ` | tool_req | optional |
| `PLUMB_ENDPOINT_TOOL_RESP` | tool_resp | optional |
Detection logic: if PLUMB_ENDPOINT_INPUT is present in the environment,
the agent initialises a ZMQ context and opens all configured sockets.
Otherwise it falls back to stdio.
### Ready handshake

After connecting and binding all sockets, the agent sends a ready signal on ctrl_out:

```
{"status":"ready"}
```

The fabric waits up to 30 seconds for this message before sending any data. The reaper fibre (which monitors for unexpected child exit) is started before the fabric enters its ready-wait, so a crash during socket setup is detected promptly.
### ctrl_out PULL closure

The fabric creates a PULL socket to receive the agent's `{"status":"ready"}` handshake on ctrl_out. If this socket was synthesised by the handler (not from the routing table), it is closed immediately after the ready handshake completes. This prevents `zmq_ctx_term` from blocking on the open socket during shutdown.
### Shutdown protocol

1. Fabric sends an EOF frame (empty payload) on data_in.
2. Agent processes any remaining queued data messages.
3. Agent sends an EOF frame on data_out.
4. Agent handles any pending control messages on ctrl_in.
5. Fabric sends `{"stop":true}` followed by an EOF frame on ctrl_in.
6. Agent sends EOF frames on ctrl_out, telemetry, and tool_req.
7. Agent closes all sockets, destroys the ZMQ context, and exits.
8. Fabric detects EOF on its PULL sockets and reaps the child process.

Step 3 precedes step 4 deliberately. In topologies where ctrl_in EOF depends on data_out EOF propagating through the graph (e.g. a feedback cycle where data_out → ... → ctrl_in), the agent must signal data_out EOF before blocking on the ctrl drain.

For simple agent types (`!T → !T`) with no control channel, steps 4–5 are skipped.
### Drain markers in feedback loops

Feedback cycles (e.g. the heat pipeline's advocate/skeptic loop) use merge nodes to combine the forward input with the feedback path. When the forward input reaches EOF, the merge must determine when the feedback path has quiesced — no more messages will arrive from the cycle.

Wire format: `0x01` + merge_name (UTF-8). Unambiguous: no JSON payload starts with `0x01`, and it is distinct from `0x02` (v2 trace) and empty (EOF). Defined in lib/fabric/protocol.ml.
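The three frame classes on a channel can therefore be told apart by the leading payload byte alone. A minimal classifier sketch (illustrative, not the fabric's code; the `0x02` tag for v2 traced payloads is taken from the paragraph above):

```python
def classify_payload(payload: bytes) -> str:
    """Classify a payload by its leading byte, per the wire format above."""
    if payload == b"":
        return "eof"  # empty payload signals EOF
    if payload[0] == 0x01:
        # Drain marker: the rest of the payload is the merge name (UTF-8).
        return "drain:" + payload[1:].decode("utf-8")
    if payload[0] == 0x02:
        return "v2-traced"  # traced payload (provenance + JSON)
    return "bare-json"  # no JSON payload starts with 0x01 or 0x02

assert classify_payload(b"") == "eof"
assert classify_payload(b"\x01merge7") == "drain:merge7"
assert classify_payload(b'{"x":1}') == "bare-json"
```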
Zero-flow optimisation: if the merge forwarded zero messages before EOF, no data entered the cycle and the feedback input is force-closed immediately.
Drain protocol (≥1 message forwarded):
- Merge injects a drain marker (ID = its own name) on its output.
- The marker traverses the cycle: each morphism and agent forwards it transparently (before JSON parse, outside slot logic).
- The marker returns on the merge's feedback input (input 1).
- If no data messages arrived since the marker was sent: quiescent → force-close feedback input, merge exits.
- If data arrived: re-probe (send another marker, reset flag).
- Drain markers with non-matching IDs are silently ignored (they belong to another merge in the topology).
Convergence: at most k+1 marker rounds for k remaining feedback iterations after EOF. In practice, one round after the last feedback message.
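The quiescence rule above can be simulated with a toy loop. This is a sketch of the protocol's decision logic only (not the fabric's merge implementation); each "round" is one marker injected and returned, with `data_between` standing for the data messages that arrived on the feedback input before the marker came back:

```python
def drain_rounds(feedback_events):
    """Simulate the merge's drain loop on its feedback input.

    feedback_events: one entry per marker round, listing the data messages
    that arrive before that round's marker returns. The merge closes when
    a marker round sees no intervening data.
    """
    rounds = 0
    for data_between in feedback_events:
        rounds += 1                # marker injected on the output
        if not data_between:       # marker returned with no data since send
            return rounds          # quiescent: force-close feedback input
        # data arrived while the marker was in flight: re-probe
    raise AssertionError("feedback path never quiesced")

# One feedback message still in flight at EOF: marker 1 races it,
# marker 2 sees nothing, so two rounds.
assert drain_rounds([["msg"], []]) == 2
# Nothing in flight: one round suffices.
assert drain_rounds([[]]) == 1
```

This matches the convergence bound: with k feedback iterations remaining after EOF, at most k+1 rounds occur.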
Agent passthrough: the agent binary detects drain markers (0x01
prefix) before JSON parse and forwards them on data_out without
LLM processing — analogous to EOF detection.
Boundary strip: drain markers that leak to the output boundary
(e.g. via a copy that feeds both a feedback path and the output) are
silently discarded in lib/plumb/run.ml.
Structural vs feedback merges: only feedback merges (user-written
merge spawns at cycle boundaries) run the drain protocol. Structural
merges from n-ary fan-in desugaring use merge_all, which forwards
drain markers transparently and closes when both inputs reach EOF. The
desugarer detects cycles in the wiring graph: fan-in targets that
participate in a cycle get merge (with drain); acyclic fan-in gets
merge_all (no drain). This prevents orphan drain markers in
topologies where structural merges have no feedback return path.
### ZMQ I/O model
ZMQ's signalling fd (ZMQ_FD, obtained via Zmq.Socket.get_fd) is
edge-triggered and unreliable when combined with Eio's
await_readable — particularly in connect-before-bind topology where
internal ZMQ state transitions (reconnect, connection establishment)
can occur while await_readable is already registered. The fd
transition is consumed by ZMQ internally and never reaches Eio,
causing recv to block indefinitely.
recv_raw and send_raw in
lib/transport/transport_zmq.ml
use Eio_unix.run_in_systhread with ZMQ's blocking recv/send
instead. The system thread blocks on ZMQ's own internal polling,
which handles edge cases correctly. Thread pool overhead is negligible
for LLM-speed workloads.
Socket closure during run_in_systhread raises
Unix.Unix_error(ENOTSOCK, ...) or Zmq.ZMQ_exception. Both are
caught and translated to Transport_error(Socket_closed, ...) so the
fabric's inline morphism loop terminates cleanly.
### Extra boundary outputs (observe pass)

Run.run supports extra boundary output ports created by the observe compiler pass (lib/language/observe.ml). When a binding's Plumb body contains agent ports (e.g. telemetry) consumed only by empty spawns, observe removes those empty spawns and merges the channels into a synthetic boundary output port (e.g. `"__telemetry"`).

After calling Run.run, callers access the extra output with:

```
Run.extra_output running "__telemetry"     (* returns in_channel option *)
Run.extra_output_fd running "__telemetry"  (* returns Unix.file_descr option for Unix.select *)
```
Wire format on extra output channels: each line is topic\tpayload\n —
a tab-separated pair of the ZMQ topic string and the JSON payload.
Agent names are identifiers (no tabs); JSON escapes any literal tab
characters in payload values. The topic carries the originating agent
name, allowing a single merged port to carry telemetry from multiple
agents.
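A reader for this channel only needs to split each line at the first tab, since JSON escapes any literal tab inside payload values. A hedged sketch of such a reader (hypothetical helper, not part of the runtime):

```python
import json

def parse_extra_output_line(line: str) -> tuple[str, object]:
    """Split one 'topic<TAB>payload' record into (agent name, decoded JSON)."""
    topic, _, payload = line.rstrip("\n").partition("\t")
    return topic, json.loads(payload)

# A literal tab inside a value arrives JSON-escaped, so the first
# tab on the line is always the separator.
topic, value = parse_extra_output_line('composer\t{"note":"a\\tb"}\n')
assert topic == "composer"
assert value == {"note": "a\tb"}
```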
observe is idempotent: a second application to the same port label
finds no further empty-consumed channels and returns the binding
unchanged.
### Fabric shutdown

Run.run (lib/plumb/run.ml) is the production entry point wrapping Fabric.execute. Callers see in_channel/out_channel; ZMQ is internal. Non-obvious shutdown details:

- In-switch `fab.cancel()` (cooperative cancel). Daemon fibres blocked in `run_in_systhread (T.recv)` prevent the switch from exiting. The main fibre calls `cancel()` before the switch body returns, unblocking those fibres so the switch can exit cleanly.
- Post-switch `release_resources` (defensive cancel). After the Eio switch exits (or if an exception escapes it), `release_resources` calls `cancel()` again before `Fabric.cleanup` calls `T.destroy_context`, which blocks on open sockets. `cancel` is idempotent — the second call is a no-op if sockets were already closed. `release_resources` also closes the input pipe fd as a safety net.
- `has_tools` check. Use `tool_bindings <> []` (a syntactic property of the spec), not `tool_dispatch_fn <> None` (a runtime capability). An agent with no declared tools must not get tool dispatch fibres, regardless of whether a dispatch callback exists.
- `morphisms_done` before `cancel()`. Inline morphisms run as daemon fibres; their completion is tracked by a counter-backed promise. The main fibre awaits this promise after `done_p` resolves (with a 2-second timeout) to ensure all morphism output is flushed before sockets are force-closed. If the timeout fires, it is logged as a warning and shutdown proceeds.
### Testing ZMQ agents

ZMQ agent tests use sequential send-then-poll, not `Eio.Fiber.both`. `Unix.sleepf` must never be called inside `Eio.Fiber.both` — it blocks the entire Eio domain, preventing the other fibre from running.

The recommended pattern:

1. `T.send` on PUSH (effectively non-blocking with `run_in_systhread`)
2. `Bridge.send_eof` to signal data EOF
3. `Unix.sleepf` for agent processing time (the agent is a separate process — sleeping the test's OS thread is safe)
4. Poll with `T.try_recv` in a deadline loop

See the zmq-agent test group in test/test_fabric.ml.
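The deadline loop in step 4 is ordinary polling with a cutoff. A language-neutral sketch (Python standing in for the OCaml test code; `try_recv` here is a stub passed in by the caller, not the real transport):

```python
import time

def poll_until(try_recv, deadline_s: float, interval_s: float = 0.01):
    """Poll a non-blocking receive until a message arrives or the deadline passes."""
    deadline = time.monotonic() + deadline_s
    while time.monotonic() < deadline:
        msg = try_recv()  # returns None when nothing is queued
        if msg is not None:
            return msg
        # Safe here: the test is a plain OS thread, not an Eio fibre.
        time.sleep(interval_s)
    raise TimeoutError("no message before deadline")

# Stub transport: the third poll succeeds.
queue = iter([None, None, b"ready"])
assert poll_until(lambda: next(queue), deadline_s=1.0) == b"ready"
```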
## Telemetry

In ZMQ mode, telemetry events are sent as frames on the dedicated telemetry PUSH socket rather than as `__port: "telemetry"` envelopes on stdout. The event payload format is identical in both modes.

In stdio mode, the agent writes structured telemetry as `__port: "telemetry"` envelopes on stdout (see processes.md § Multiplexed I/O).
Every telemetry event has a `kind` field that identifies the event type. The event kinds are:

| Kind | Description | Key fields |
|---|---|---|
| `config` | Agent configuration at startup | model, provider, max_tokens, thinking_budget, tools |
| `usage` | Token usage per API call | prompt_tokens, completion_tokens, cache_read_tokens, cache_creation_tokens |
| `thinking` | Extended thinking block | thinking, signature |
| `tool_call` | Tool invocation summary | id, name, arguments, summary, is_error |
| `pin_state` | Absolute pin state after pin/unpin | action, count, keys |
| `output` | Validated output value | content |
Example events:

```
{"kind":"config","model":"claude-haiku-4-5-20251001","provider":"anthropic","max_tokens":8192}
{"kind":"usage","prompt_tokens":150,"completion_tokens":42}
{"kind":"thinking","thinking":"Let me consider...","signature":"abc123"}
{"kind":"tool_call","id":"toolu_01abc","name":"doc_read","arguments":"{\"path\":\"notes.md\"}","summary":"ok","is_error":false}
{"kind":"pin_state","action":"pin","count":2,"keys":["doc_read({\"path\":\"notes.md\"})","search({\"query\":\"cats\"})"]}
{"kind":"output","content":{"greeting":"hello"}}
```
### The type → kind rewrite

The LLM API returns thinking blocks with a `"type": "thinking"` field. Since `type` is a reserved keyword in the plumbing language (used for type declarations), it cannot appear in filter expressions like `filter(type = "thinking")`. The agent binary rewrites `"type"` to `"kind"` on all thinking blocks before emitting them as telemetry, so filters and observers can reference the field:

```
agent.telemetry ; filter(kind = "thinking") ; observer
```

All other telemetry events (usage, config, tool_call, output) are constructed by the agent binary and use `"kind"` natively — no rewriting needed.
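The rewrite itself is a one-field rename applied to thinking blocks only. A sketch of the transformation (illustrative; the agent binary's actual implementation is OCaml):

```python
def rewrite_thinking_block(block: dict) -> dict:
    """Rename 'type' to 'kind' on thinking blocks before telemetry emission."""
    if block.get("type") == "thinking":
        out = {k: v for k, v in block.items() if k != "type"}
        out["kind"] = "thinking"
        return out
    return block  # other telemetry events carry 'kind' natively

raw = {"type": "thinking", "thinking": "Let me consider...", "signature": "abc123"}
assert rewrite_thinking_block(raw) == {
    "kind": "thinking", "thinking": "Let me consider...", "signature": "abc123"}
```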
### Wiring telemetry

In the plumbing language, telemetry maps to the agent's `.telemetry` port. By default this port is implicitly discarded (`; discard`). To observe an agent's reasoning, wire the telemetry stream to an observer:

```
composer.telemetry ; angel ; mux(source: angel) ; composer
```

Example config with thinking:

```
summarizer : Article → Summary = agent {
  model: "claude-sonnet-4-5-20250514"
  prompt: "Summarize the article."
  max_tokens: 16384
  thinking_budget: 10000
}
```
Thinking blocks are preserved in conversation history for multi-turn
consistency. Each thinking block includes a signature field required
by the API for multi-turn pass-through.
## stderr

Stderr (fd 2) carries debug logs and errors, distinguished by format:

- Debug logs: `{"log": "debug", "event": "...", ...}` — conversation messages, API metadata. Only emitted when `PIPELINE_DEBUG=1`.
- Errors: `{"error": "...", "code": "...", ...}` — structured error output. Always emitted.

Thinking blocks go to the telemetry port, not stderr. Debug logs and errors go to stderr.
## Control channel

Agents with product input types receive a control stream on a second input channel:

```
type Control = { set_temp: float } | { set_model: string } | { stop: bool }

let composer : (!Draft, !Control) → !Verdict = agent {
  prompt: "Review the draft."
}
```

In stdio mode, the control stream arrives as `__port: "ctrl_in"` envelopes on stdin. A background thread reads from the internal ctrl pipe (fed by the stdin demux) and updates the agent's mutable state under a mutex. The agent writes responses (memory dumps, acknowledgements) as `__port: "ctrl_out"` envelopes on stdout.

In ZMQ mode, control messages arrive on the dedicated ctrl_in PULL socket instead of muxed stdin envelopes. The control reader runs as an Eio fibre rather than a thread; responses are sent on the ctrl_out PUSH socket.
| Field | Type | Effect |
|---|---|---|
| `set_temp` | float \| null | Set or clear temperature override |
| `set_model` | string \| null | Set or clear model override |
| `stop` | bool | Signal clean exit after current message |
| `pause` | bool | Stop reading data from stdin; control and telemetry stay live |
| `resume` | bool | Resume reading data from stdin |
| `get_memory` | bool | Emit conversation history on ctrl_out (kind: memory) |
| `format` | string | Optional alongside get_memory. `"openai"` translates messages to OpenAI Chat Completions format; omit or any other value → Anthropic-style content blocks |
| `set_memory` | array | Replace conversation history; emit ack on ctrl_out (kind: memory_set) |
Sending `null` clears an override, reverting to the spec default:

```
{"set_temp": 0.9}
{"set_temp": null}
{"set_model": "claude-opus-4-20250514"}
{"set_model": null}
{"stop": true}
{"pause": true}
{"resume": true}
{"get_memory": true}
{"get_memory": true, "format": "openai"}
{"set_memory": [{"role":"user","content":"\"hello\""},{"role":"assistant","content":"\"world\""}]}
```
Unknown fields are silently ignored (forward compatibility).
### Control output (fd 7)

The agent writes JSON Lines to fd 7 in response to memory operations.

get_memory response:

```
{"kind":"memory","messages":[{"role":"user","content":"\"hello\""},{"role":"assistant","content":"{\"greeting\":\"hi\"}"}],"pinned":[]}
```

The `pinned` array contains tool results the agent has marked for preservation across compaction cycles. Each entry has `tool`, `arguments`, and `content` fields. See Document pinning.

set_memory acknowledgement:

```
{"kind":"memory_set","old_messages":42,"new_messages":2}
```

pause acknowledgement:

```
{"kind":"pause_ack"}
```

resume acknowledgement:

```
{"kind":"resume_ack"}
```
Note: pause_ack and resume_ack are emitted on fd 7 (ctrl_out), which flows through the session tagger if a <-> wire is active. The tagger wraps all ctrl_out messages in {__step: N, payload: msg} envelopes. For linear protocols (terminated by end), the protocol structure provides one-shot trigger semantics — no additional gating is needed.
### When changes take effect
Model and temperature changes take effect on the next API call — not just the next input message. The conversation loop reads the current values via closures on every iteration, including retries and tool-use rounds within a single input. No HTTP connection restart is needed: model and temperature are per-request body parameters, not connection-level config.
The stop signal is checked between input messages. It does not interrupt a running API call.
### Pause and memory protocol
When paused, the agent stops reading from stdin but continues handling control messages. Memory requests (get_memory, set_memory) are processed at message boundaries — between input lines when not paused, or immediately when paused.
The set_memory operation replaces conversation history. The system prompt is never affected. If any message in the set_memory array lacks role or content, the entire payload is rejected with a warning.
When pins are active, set_memory automatically prepends a user message containing all pinned tool results in <pinned-documents> blocks. This happens transparently — the compactor does not need to preserve pinned content in its output.
Protocol constraint: memory_request is a single slot. The controller must wait for a response on fd 7 before sending the next memory request. If two requests arrive before the main loop handles either, the first is silently overwritten.
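The single-slot constraint means a second request replaces an unhandled first one rather than queueing behind it. A toy model of the slot (hypothetical class name; the real slot lives in the agent's control state):

```python
class MemoryRequestSlot:
    """Single-slot mailbox: writing twice before a read drops the first request."""

    def __init__(self):
        self._req = None

    def put(self, req):
        self._req = req  # overwrite: no queueing, no error

    def take(self):
        req, self._req = self._req, None
        return req

slot = MemoryRequestSlot()
slot.put({"get_memory": True})
slot.put({"get_memory": True, "format": "openai"})  # first request is lost
assert slot.take() == {"get_memory": True, "format": "openai"}
assert slot.take() is None
```

This is why a controller must wait for the fd 7 response before issuing its next memory request.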
## System prompt construction

The system prompt is assembled from the agent's config:

1. Inline prompt (`prompt: "..."`) — added first as a plain text block.

2. Prompt files (`prompts: ["system.md", "./role.md"]`) — each file is read at startup and wrapped in document tags:

   ```
   <doc id="system.md">
   file contents here
   </doc>
   ```

   File paths are resolved by the executor before the agent starts:

   - Bare name (no `/`) — searched via the resource path (`PLUMB_RESOURCES` + installed resource directories). Use for shared files: `"system.md"`, `"grammar.peg"`.
   - Path (contains `/`) — resolved relative to the `.plumb` file that defines the agent. Use for local files: `"./composer.md"`, `"./heat.plumb"`.

3. Output instruction — appended automatically. Describes the expected JSON output type and instructs the model to respond with only the JSON value.

Each part becomes a separate system block. The output instruction has `cache_break: true` for prompt caching efficiency.
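Under the ordering above, assembly can be sketched as follows. This is illustrative Python, not the agent's OCaml code; the block dictionaries are an assumed shape modelled loosely on Anthropic-style system blocks:

```python
def build_system_blocks(inline_prompt, prompt_files, output_instruction):
    """Assemble system blocks: inline prompt, <doc>-wrapped files, output instruction."""
    blocks = []
    if inline_prompt:
        blocks.append({"text": inline_prompt})
    for name, contents in prompt_files:  # (file name, file contents) pairs
        blocks.append({"text": f'<doc id="{name}">\n{contents}\n</doc>'})
    # Output instruction goes last, with a cache break for prompt caching.
    blocks.append({"text": output_instruction, "cache_break": True})
    return blocks

blocks = build_system_blocks("Review the draft.",
                             [("role.md", "Be terse.")],
                             "Respond with only the JSON value.")
assert blocks[1]["text"] == '<doc id="role.md">\nBe terse.\n</doc>'
assert blocks[-1]["cache_break"] is True
```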
## Streaming
API responses are received via Server-Sent Events (SSE). This prevents read timeouts during extended thinking and avoids buffering large responses in memory. Streaming is an internal transport detail — the process interface remains synchronous JSON Lines.
In ZMQ mode, stdin and stdout are not used for process I/O. All data flows through ZMQ sockets as described in ZMQ transport mode.
## Document pinning
Agents can mark tool results as important so they survive context
compaction. The agent calls the synthetic pin tool with the name and
arguments of a previous tool call; the result is stored in a session-local
pin set. On the next compaction cycle, pinned content is excluded from
the compressible conversation, provided to the compactor as read-only
reference, and reinjected at the top of the post-compaction history.
### Pin and unpin tools

The agent binary injects two synthetic tools into every agent's tool list:

- `pin {tool: string, arguments: object}` — pin the result of a previous tool call. If the pin limit (20) is reached, the oldest pin is evicted (FIFO). Idempotent — pinning an already-pinned call is a no-op.
- `unpin {tool: string, arguments: object}` — remove a pin, allowing the result to be compacted normally. Idempotent.
### Canonical keys

Pins are identified by a canonical form of (tool_name, sorted_arguments_json). Object keys are recursively sorted; integer-valued floats are normalised to integers. This means `{"b":1,"a":2}` and `{"a":2,"b":1}` produce the same pin key.
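A sketch of the canonicalisation (illustrative; the `tool({...})` key string format is inferred from the pin_state telemetry example, and the exact normalisation rules may differ in detail):

```python
import json

def canonical_key(tool: str, arguments) -> str:
    """Canonical pin key: tool name plus arguments with keys sorted
    recursively and integer-valued floats normalised to integers."""
    def norm(v):
        if isinstance(v, dict):
            return {k: norm(v[k]) for k in sorted(v)}
        if isinstance(v, list):
            return [norm(x) for x in v]
        if isinstance(v, float) and v.is_integer():
            return int(v)
        return v
    return f"{tool}({json.dumps(norm(arguments), separators=(',', ':'))})"

# Key order and 1.0-vs-1 do not matter: both forms pin the same result.
assert canonical_key("doc_read", {"b": 1.0, "a": 2}) == \
       canonical_key("doc_read", {"a": 2, "b": 1})
```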
### Cache-hit interception

When the agent calls a tool whose canonical key matches an active pin, the request is intercepted. Instead of dispatching to the tool, the agent returns a note telling the LLM where to find the pinned content:

```
{"pinned":true,"note":"This tool result is already pinned in your context. Look for <pinned key=\"fetch({\\\"path\\\":\\\"foo.md\\\"})\"> in the pinned-documents block at the top of your conversation."}
```
### Reinjection format

After compaction, pinned content is prepended to the new history as a single user message:

```
<pinned-documents>
The following tool results are pinned in your context. They survive
compaction and are always available. Do not re-fetch them.

<pinned key="fetch({"path":"foo.md"})">
file contents here
</pinned>
</pinned-documents>
```

Each `<pinned>` element carries a `key` attribute — the canonical tool call form — so the agent can cross-reference it when told a result is already in context.
### Persistence
Pins are session-local. They survive compaction but not process restart.
## Runtime context

When `runtime_context: true` is set in the agent config, the agent appends a dynamic system block before each API call:

```
[Runtime context]
Time: 2026-03-08T14:23:17Z
Session input tokens: 82341
Session output tokens: 12045
API calls: 23
Max output tokens per turn: 8192
History messages: 47
Pinned documents: 3
```
This is not cached (no cache_break) — it changes on every call.
Token counts are cumulative for the session. The agent uses this to
plan: how much context budget remains, whether to be concise, when to
summarise.
## Context window
Unbounded accumulation will eventually exceed the model's context window. The compaction protocol provides automated context management via pause/get_memory/set_memory/resume. Document pinning ensures critical tool results survive compaction. Runtime context injection gives the agent visibility into its own resource consumption.