Agent runtime

Runtime behaviour of the agent process. For primitive semantics and configuration, see processes.md. For the multiplexed I/O envelope format, see processes.md § Multiplexed I/O.

Environment variables

| Variable | Description |
| --- | --- |
| PLUMB_PROVIDER | Default LLM provider for agents without explicit provider in config. Accepted values: "anthropic", "openai". Set via --provider flag on chat/plumb. |
| PLUMB_MODEL | Default LLM model for agents without explicit model in config. Set via --model flag on chat/plumb. |
| ANTHROPIC_API_KEY | Required when provider = "anthropic". API key for the Anthropic Messages API. |
| OPENAI_API_KEY | Required when provider = "openai". API key for the OpenAI Chat Completions API. |
| PIPELINE_DEBUG | Set to 1 to enable debug logging. Per-message conversation transcript and API metadata are written as JSON Lines to stderr. Default: off (only warnings and errors). |
| PLUMB_MCP_TOOLS | Path to temp JSON file containing MCP tool schemas. Set by plumb runtime when MCP servers are configured. Internal; not user-facing. |
| PLUMB_ENDPOINT_INPUT | ZMQ URI for the data_in socket (e.g. tcp://127.0.0.1:5550). If set, the agent uses ZMQ transport mode instead of stdio. |
| PLUMB_ENDPOINT_OUTPUT | ZMQ URI for the data_out socket. Required when PLUMB_ENDPOINT_INPUT is set. |
| PLUMB_ENDPOINT_CTRL_IN | ZMQ URI for the ctrl_in socket. Optional; omit to disable control channel in ZMQ mode. |
| PLUMB_ENDPOINT_CTRL_OUT | ZMQ URI for the ctrl_out socket. Optional; omit to disable control output in ZMQ mode. |
| PLUMB_ENDPOINT_TELEMETRY | ZMQ URI for the telemetry socket. Optional; omit to suppress telemetry in ZMQ mode. |
| PLUMB_ENDPOINT_TOOL_REQ | ZMQ URI for the tool_req socket. Optional; required when MCP tools are used in ZMQ mode. |
| PLUMB_ENDPOINT_TOOL_RESP | ZMQ URI for the tool_resp socket. Optional; required when MCP tools are used in ZMQ mode. |

When PIPELINE_DEBUG=1, the agent logs each new message in the conversation (including retry messages that don't carry forward into history) and API call metadata:

echo '"hello"' | PIPELINE_DEBUG=1 agent spec.plumb

Each log line is a JSON object with "log" (level) and "event" (name) fields, distinct from both error output and thinking output. Example:

{"log":"debug","event":"message","role":"user","content":"\"hello\""}
{"log":"debug","event":"api_request","model":"claude-sonnet-4-5-20250514","max_tokens":8192,"thinking_budget":null,"message_count":1}
{"log":"debug","event":"message","role":"assistant","content":[{"type":"text","text":"{\"greeting\":\"hi\"}"}]}
{"log":"debug","event":"api_response","status":200,"content_blocks":1,"stop_reason":"end_turn","input_tokens":150,"output_tokens":42}

Retry messages are tagged with "retry":true so they can be distinguished from messages that persist in history.

PIPELINE_DEBUG propagates through pipelines: every process in a composed pipeline inherits it.

ZMQ transport mode

When PLUMB_ENDPOINT_INPUT is set, the agent switches from stdio to ZMQ transport. All data, control, telemetry, and tool traffic flows through dedicated ZMQ sockets instead of multiplexed stdin/stdout envelopes.

Socket summary

| Name | ZMQ type | Bind/connect | Envelope format | Payload format |
| --- | --- | --- | --- | --- |
| data_in | PULL | agent connects → fabric PUSH | single-frame: 4-byte big-endian topic length + topic bytes + payload bytes | v2 traced (provenance + JSON) |
| data_out | PUSH | agent binds / fabric PULL connects | same envelope format | v2 traced |
| ctrl_in | PULL | agent connects → fabric PUSH | same envelope format | v2 traced or bare JSON (receiver tolerates both) |
| ctrl_out | PUSH | agent binds / fabric PULL connects | same envelope format | bare JSON |
| telemetry | PUSH | agent binds / fabric PULL connects | same envelope format | v2 traced |
| tool_req | PUSH | agent binds / fabric PULL connects | same envelope format | bare JSON |
| tool_resp | PULL | agent connects → fabric PUSH | same envelope format | bare JSON |

Data channels (data_in, data_out, telemetry) carry v2 traced payloads as defined in protocols.md § v2 wire format. Control and tool channels (ctrl_out, tool_req, tool_resp) carry bare JSON. ctrl_in accepts both formats: the receiver decodes v2 traced payloads and extracts the application JSON, but also accepts bare JSON for backward compatibility. This tolerance is necessary because inline morphisms on the ctrl_in path always emit traced payloads.

All agent sockets have ZMQ_SNDHWM = 0 (unlimited send queue). The fabric is responsible for flow control at higher levels; back-pressure within the ZMQ layer is not relied upon.
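
As an illustration of the single-frame envelope listed in the socket summary, the following OCaml sketch packs and unpacks a frame using only the standard library. The names encode_envelope and decode_envelope are hypothetical and are not taken from lib/transport; the handling of malformed frames is an assumption.

```ocaml
(* Envelope layout: 4-byte big-endian topic length, topic bytes, payload bytes.
   Function names and error messages are illustrative, not the project's API. *)

let encode_envelope ~topic ~payload =
  let tlen = String.length topic in
  let plen = String.length payload in
  let buf = Bytes.create (4 + tlen + plen) in
  Bytes.set_int32_be buf 0 (Int32.of_int tlen);
  Bytes.blit_string topic 0 buf 4 tlen;
  Bytes.blit_string payload 0 buf (4 + tlen) plen;
  Bytes.to_string buf

let decode_envelope frame =
  let n = String.length frame in
  if n < 4 then Error "frame shorter than the 4-byte length header"
  else
    let header = Bytes.of_string (String.sub frame 0 4) in
    let tlen = Int32.to_int (Bytes.get_int32_be header 0) in
    if tlen < 0 || tlen > n - 4 then Error "topic length exceeds frame size"
    else
      let topic = String.sub frame 4 tlen in
      (* An empty payload is how the shutdown protocol represents EOF. *)
      let payload = String.sub frame (4 + tlen) (n - 4 - tlen) in
      Ok (topic, payload)
```

Under this layout an EOF frame is simply an envelope whose payload part is empty, so the same decoder serves both data and EOF frames.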

Endpoint environment variables

| Variable | Socket | Required |
| --- | --- | --- |
| PLUMB_ENDPOINT_INPUT | data_in | yes (triggers ZMQ mode) |
| PLUMB_ENDPOINT_OUTPUT | data_out | yes |
| PLUMB_ENDPOINT_CTRL_IN | ctrl_in | optional |
| PLUMB_ENDPOINT_CTRL_OUT | ctrl_out | optional |
| PLUMB_ENDPOINT_TELEMETRY | telemetry | optional |
| PLUMB_ENDPOINT_TOOL_REQ | tool_req | optional |
| PLUMB_ENDPOINT_TOOL_RESP | tool_resp | optional |

Detection logic: if PLUMB_ENDPOINT_INPUT is present in the environment, the agent initialises a ZMQ context and opens all configured sockets. Otherwise it falls back to stdio.
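
The detection rule can be sketched as a pure function over an environment lookup. This is a minimal sketch, not the agent's actual code: the transport type, detect_transport, and the choice to report a missing PLUMB_ENDPOINT_OUTPUT as an error are all assumptions made for illustration.

```ocaml
(* Hypothetical model of transport selection; only the variable names
   come from the table above. *)
type transport =
  | Stdio
  | Zmq of { input : string; output : string; optional : (string * string) list }

let optional_sockets = [ "CTRL_IN"; "CTRL_OUT"; "TELEMETRY"; "TOOL_REQ"; "TOOL_RESP" ]

let detect_transport (getenv : string -> string option) =
  match getenv "PLUMB_ENDPOINT_INPUT" with
  | None -> Ok Stdio (* no input endpoint: fall back to stdio *)
  | Some input -> (
      match getenv "PLUMB_ENDPOINT_OUTPUT" with
      | None -> Error "PLUMB_ENDPOINT_OUTPUT is required in ZMQ mode"
      | Some output ->
          (* Keep only the optional endpoints that are actually configured. *)
          let lookup suffix =
            Option.map (fun uri -> (suffix, uri)) (getenv ("PLUMB_ENDPOINT_" ^ suffix))
          in
          Ok (Zmq { input; output; optional = List.filter_map lookup optional_sockets }))
```

Passing the lookup as an argument keeps the function testable; against the real environment it would be called as detect_transport Sys.getenv_opt.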

Ready handshake

After connecting and binding all sockets, the agent sends a ready signal on ctrl_out:

{"status":"ready"}

The fabric waits up to 30 seconds for this message before sending any data. The reaper fibre (which monitors for unexpected child exit) is started before the fabric enters its ready-wait, so a crash during socket setup is detected promptly.

ctrl_out PULL closure

The fabric creates a PULL socket to receive the agent's {"status":"ready"} handshake on ctrl_out. If this socket was synthesised by the handler (not from the routing table), it is closed immediately after the ready handshake completes. This prevents zmq_ctx_term from blocking on the open socket during shutdown.

Shutdown protocol

  1. Fabric sends an EOF frame (empty payload) on data_in.
  2. Agent processes any remaining queued data messages.
  3. Agent sends an EOF frame on data_out.
  4. Agent handles any pending control messages on ctrl_in.
  5. Fabric sends {"stop":true} followed by an EOF frame on ctrl_in.
  6. Agent sends EOF frames on ctrl_out, telemetry, tool_req.
  7. Agent closes all sockets, destroys the ZMQ context, and exits.
  8. Fabric detects EOF on its PULL sockets and reaps the child process.

Step 3 precedes step 4 deliberately. In topologies where ctrl_in EOF depends on data_out EOF propagating through the graph (e.g. a feedback cycle where data_out → ... → ctrl_in), the agent must signal data_out EOF before blocking on the ctrl drain.

For simple agent types (!T → !T) with no control channel, steps 4–5 are skipped.

Drain markers in feedback loops

Feedback cycles (e.g. the heat pipeline's advocate/skeptic loop) use merge nodes to combine the forward input with the feedback path. When the forward input reaches EOF, the merge must determine when the feedback path has quiesced — no more messages will arrive from the cycle.

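Quiescence is detected with drain markers: probe frames that the merge sends around the cycle and waits to see return. Drain marker wire format: 0x01 + merge_name (UTF-8). This is unambiguous: no JSON payload starts with 0x01, and it is distinct from 0x02 (v2 trace) and from the empty payload (EOF). Defined in lib/fabric/protocol.ml.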

Zero-flow optimisation: if the merge forwarded zero messages before EOF, no data entered the cycle and the feedback input is force-closed immediately.

Drain protocol (≥1 message forwarded):

  1. Merge injects a drain marker (ID = its own name) on its output.
  2. The marker traverses the cycle: each morphism and agent forwards it transparently (before JSON parse, outside slot logic).
  3. The marker returns on the merge's feedback input (input 1).
  4. If no data messages arrived since the marker was sent: quiescent → force-close feedback input, merge exits.
  5. If data arrived: re-probe (send another marker, reset flag).
  6. Drain markers with non-matching IDs are silently ignored (they belong to another merge in the topology).

Convergence: at most k+1 marker rounds for k remaining feedback iterations after EOF. In practice, one round after the last feedback message.
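
The zero-flow check and the drain steps above amount to a small state machine. The sketch below models only the merge's decisions; the record fields, function names, and action type are hypothetical and do not mirror the implementation in lib/fabric.

```ocaml
(* Decisions a feedback merge makes once its forward input has reached EOF. *)
type action = Ignore | Send_marker | Close_feedback

type merge = {
  name : string;                     (* also used as the drain marker ID *)
  mutable forwarded : int;           (* messages forwarded so far *)
  mutable data_since_marker : bool;  (* feedback data seen since the last probe *)
}

(* Called for every data message the merge forwards. *)
let on_data m ~feedback =
  m.forwarded <- m.forwarded + 1;
  if feedback then m.data_since_marker <- true

(* Forward input reached EOF: zero-flow shortcut or first probe. *)
let on_forward_eof m =
  if m.forwarded = 0 then Close_feedback
  else begin
    m.data_since_marker <- false;
    Send_marker
  end

(* A drain marker came back on the feedback input. *)
let on_marker m id =
  if id <> m.name then Ignore (* belongs to another merge *)
  else if m.data_since_marker then begin
    m.data_since_marker <- false;
    Send_marker (* re-probe *)
  end
  else Close_feedback (* quiescent *)
```

Each Send_marker result corresponds to one marker round, so a cycle that produces one more feedback message after EOF takes two rounds before Close_feedback.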

Agent passthrough: the agent binary detects drain markers (0x01 prefix) before JSON parse and forwards them on data_out without LLM processing — analogous to EOF detection.
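
A minimal sketch of the pre-parse check, assuming the payload has already been separated from its envelope. The frame type and the function names are illustrative; decoding the v2 trace itself is out of scope, so the traced case keeps the payload untouched.

```ocaml
(* Classify a payload by its first byte before any JSON parsing. *)
type frame =
  | Eof                     (* empty payload *)
  | Drain_marker of string  (* 0x01 + merge name *)
  | Traced of string        (* 0x02-prefixed v2 payload, left undecoded here *)
  | Bare_json of string

let classify payload =
  if payload = "" then Eof
  else
    match payload.[0] with
    | '\x01' -> Drain_marker (String.sub payload 1 (String.length payload - 1))
    | '\x02' -> Traced payload
    | _ -> Bare_json payload

(* Build the marker a merge with the given name would inject. *)
let drain_marker merge_name = "\x01" ^ merge_name
```

An agent following this scheme would forward Eof and Drain_marker frames on data_out unchanged and hand only the remaining cases to the JSON parser.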

Boundary strip: drain markers that leak to the output boundary (e.g. via a copy that feeds both a feedback path and the output) are silently discarded in lib/plumb/run.ml.

Structural vs feedback merges: only feedback merges (user-written merge spawns at cycle boundaries) run the drain protocol. Structural merges from n-ary fan-in desugaring use merge_all, which forwards drain markers transparently and closes when both inputs reach EOF. The desugarer detects cycles in the wiring graph: fan-in targets that participate in a cycle get merge (with drain); acyclic fan-in gets merge_all (no drain). This prevents orphan drain markers in topologies where structural merges have no feedback return path.

ZMQ I/O model

ZMQ's signalling fd (ZMQ_FD, obtained via Zmq.Socket.get_fd) is edge-triggered and unreliable when combined with Eio's await_readable, particularly in a connect-before-bind topology, where internal ZMQ state transitions (reconnect, connection establishment) can occur while await_readable is already registered. In that case the fd transition is consumed by ZMQ internally and never reaches Eio, so recv blocks indefinitely.

recv_raw and send_raw in lib/transport/transport_zmq.ml use Eio_unix.run_in_systhread with ZMQ's blocking recv/send instead. The system thread blocks on ZMQ's own internal polling, which handles edge cases correctly. Thread pool overhead is negligible for LLM-speed workloads.

Socket closure during run_in_systhread raises Unix.Unix_error(ENOTSOCK, ...) or Zmq.ZMQ_exception. Both are caught and translated to Transport_error(Socket_closed, ...) so the fabric's inline morphism loop terminates cleanly.

Extra boundary outputs (observe pass)

Run.run supports extra boundary output ports created by the observe compiler pass (lib/language/observe.ml). When a binding's Plumb body contains agent ports (e.g. telemetry) consumed only by empty spawns, observe removes those empty spawns and merges the channels into a synthetic boundary output port (e.g. "__telemetry").

After calling Run.run, callers access the extra output with:

Run.extra_output    running "__telemetry"  (* returns in_channel option *)
Run.extra_output_fd running "__telemetry"  (* returns Unix.file_descr option for Unix.select *)

Wire format on extra output channels: each line is topic\tpayload\n — a tab-separated pair of the ZMQ topic string and the JSON payload. Agent names are identifiers (no tabs); JSON escapes any literal tab characters in payload values. The topic carries the originating agent name, allowing a single merged port to carry telemetry from multiple agents.
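
A reader of an extra output channel can split each line at the first tab, which is safe because topics contain no tabs. The helper below is a sketch with a hypothetical name; it expects a line as returned by input_line, that is, without the trailing newline.

```ocaml
(* Split "topic\tpayload" into its two parts; None if the line has no tab. *)
let parse_extra_line line =
  match String.index_opt line '\t' with
  | None -> None
  | Some i ->
      let topic = String.sub line 0 i in
      let payload = String.sub line (i + 1) (String.length line - i - 1) in
      Some (topic, payload)
```

The payload half is still raw JSON text at this point and can be passed to whichever JSON parser the caller uses.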

observe is idempotent: a second application to the same port label finds no further empty-consumed channels and returns the binding unchanged.

Fabric shutdown

Run.run (lib/plumb/run.ml) is the production entry point wrapping Fabric.execute. Callers see in_channel/out_channel; ZMQ is internal. Non-obvious shutdown details:

  1. In-switch fab.cancel() (cooperative cancel). Daemon fibres blocked in run_in_systhread(T.recv) prevent the switch from exiting. The main fibre calls cancel() before the switch body returns, unblocking those fibres so the switch can exit cleanly.
  2. Post-switch release_resources (defensive cancel). After the Eio switch exits (or if an exception escapes it), release_resources calls cancel() again before Fabric.cleanup calls T.destroy_context, which blocks on open sockets. cancel is idempotent — the second call is a no-op if sockets were already closed. release_resources also closes the input pipe fd as a safety net.
  3. has_tools check. Use tool_bindings <> [] (syntactic property of the spec), not tool_dispatch_fn <> None (runtime capability). An agent with no declared tools must not get tool dispatch fibres, regardless of whether a dispatch callback exists.
  4. morphisms_done before cancel(). Inline morphisms run as daemon fibres; their completion is tracked by a counter-backed promise. The main fibre awaits this promise after done_p resolves (with a 2-second timeout) to ensure all morphism output is flushed before sockets are force-closed. If the timeout fires, it is logged as a warning and shutdown proceeds.

Testing ZMQ agents

ZMQ agent tests use sequential send-then-poll, not Eio.Fiber.both. Unix.sleepf must never be called inside Eio.Fiber.both — it blocks the entire Eio domain, preventing the other fibre from running.

The recommended pattern:

  1. T.send on PUSH (effectively non-blocking with run_in_systhread)
  2. Bridge.send_eof to signal data EOF
  3. Unix.sleepf for agent processing time (the agent is a separate process — sleeping the test's OS thread is safe)
  4. Poll with T.try_recv in a deadline loop

See the zmq-agent test group in test/test_fabric.ml.
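
Step 4 of the pattern can be sketched as a generic deadline loop. Because the signature of T.try_recv is not given here, the sketch takes the receive attempt, the clock, and the sleep function as arguments; all parameter names are assumptions.

```ocaml
(* Poll a non-blocking receive until it yields a message or the deadline
   passes. [now] returns the current time in seconds, [sleep] pauses for
   the given number of seconds. *)
let poll_until ~now ~sleep ~deadline ~interval try_recv =
  let rec loop () =
    match try_recv () with
    | Some msg -> Some msg
    | None ->
        if now () >= deadline then None
        else begin
          sleep interval;
          loop ()
        end
  in
  loop ()
```

In a real test the clock and sleep arguments would typically be Unix.gettimeofday and Unix.sleepf, called from sequential test code rather than inside Eio.Fiber.both, and try_recv would wrap the transport's non-blocking receive.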

Telemetry

In ZMQ mode, telemetry events are sent as frames on the dedicated telemetry PUSH socket rather than as __port: "telemetry" envelopes on stdout. The event payload format is identical in both modes.

In stdio mode, the agent writes structured telemetry as __port: "telemetry" envelopes on stdout (see processes.md § Multiplexed I/O). Every telemetry event has a kind field that identifies the event type. The event kinds are:

| Kind | Description | Key fields |
| --- | --- | --- |
| config | Agent configuration at startup | model, provider, max_tokens, thinking_budget, tools |
| usage | Token usage per API call | prompt_tokens, completion_tokens, cache_read_tokens, cache_creation_tokens |
| thinking | Extended thinking block | thinking, signature |
| tool_call | Tool invocation summary | id, name, arguments, summary, is_error |
| pin_state | Absolute pin state after pin/unpin | action, count, keys |
| output | Validated output value | content |

Example events:

{"kind":"config","model":"claude-haiku-4-5-20251001","provider":"anthropic","max_tokens":8192}
{"kind":"usage","prompt_tokens":150,"completion_tokens":42}
{"kind":"thinking","thinking":"Let me consider...","signature":"abc123"}
{"kind":"tool_call","id":"toolu_01abc","name":"doc_read","arguments":"{\"path\":\"notes.md\"}","summary":"ok","is_error":false}
{"kind":"pin_state","action":"pin","count":2,"keys":["doc_read({\"path\":\"notes.md\"})","search({\"query\":\"cats\"})"]}
{"kind":"output","content":{"greeting":"hello"}}

The type → kind rewrite

The LLM API returns thinking blocks with a "type": "thinking" field. Since type is a reserved keyword in the plumbing language (used for type declarations), it cannot appear in filter expressions like filter(type = "thinking"). The agent binary rewrites "type" to "kind" on all thinking blocks before emitting them as telemetry, so filters and observers can reference the field:

agent.telemetry ; filter(kind = "thinking") ; observer

All other telemetry events (usage, config, tool_call, pin_state, output) are constructed by the agent binary and use "kind" natively, so they need no rewriting.

Wiring telemetry

In the plumbing language, telemetry maps to the agent's .telemetry port. By default this port is implicitly discarded (; discard). To observe an agent's reasoning, wire the telemetry stream to an observer:

composer.telemetry ; angel ; mux(source: angel) ; composer

Example config with thinking:

summarizer : Article → Summary = agent {
  model: "claude-sonnet-4-5-20250514"
  prompt: "Summarize the article."
  max_tokens: 16384
  thinking_budget: 10000
}

Thinking blocks are preserved in conversation history for multi-turn consistency. Each thinking block includes a signature field required by the API for multi-turn pass-through.

stderr

Stderr (fd 2) carries debug logs and errors, distinguished by format:

  • Debug logs: {"log": "debug", "event": "...", ...} — conversation messages, API metadata. Only emitted when PIPELINE_DEBUG=1.
  • Errors: {"error": "...", "code": "...", ...} — structured error output. Always emitted.

Thinking blocks are never written to stderr; they go to the telemetry port.

Control channel

Agents with product input types receive a control stream on a second input channel:

type Control = { set_temp: float } | { set_model: string } | { stop: bool }

let composer : (!Draft, !Control) → !Verdict = agent {
  prompt: "Review the draft."
}

In stdio mode, the control stream arrives as __port: "ctrl_in" envelopes on stdin. A background thread reads from the internal ctrl pipe (fed by the stdin demux) and updates the agent's mutable state under a mutex. The agent writes responses (memory dumps, acknowledgements) as __port: "ctrl_out" envelopes on stdout.

In ZMQ mode, control messages arrive on the dedicated ctrl_in PULL socket instead of muxed stdin envelopes. The control reader runs as an Eio fibre rather than a thread; responses are sent on the ctrl_out PUSH socket.

| Field | Type | Effect |
| --- | --- | --- |
| set_temp | float or null | Set or clear temperature override |
| set_model | string or null | Set or clear model override |
| stop | bool | Signal clean exit after current message |
| pause | bool | Stop reading data from stdin; control and telemetry stay live |
| resume | bool | Resume reading data from stdin |
| get_memory | bool | Emit conversation history on ctrl_out (kind: memory). Optional format field: "openai" translates messages to OpenAI Chat Completions format; default is Anthropic-style content blocks. |
| format | string | Optional alongside get_memory. "openai" → OpenAI format; omit or any other value → Anthropic format. |
| set_memory | array | Replace conversation history; emit ack on ctrl_out (kind: memory_set) |

Sending null clears an override, reverting to the spec default. Example control messages:

{"set_temp": 0.9}
{"set_temp": null}
{"set_model": "claude-opus-4-20250514"}
{"set_model": null}
{"stop": true}
{"pause": true}
{"get_memory": true}
{"get_memory": true, "format": "openai"}
{"set_memory": [{"role":"user","content":"\"hello\""},{"role":"assistant","content":"\"world\""}]}
{"resume": true}

Unknown fields are silently ignored (forward compatibility).

Control output (fd 7)

The agent writes JSON Lines to fd 7 in response to memory, pause, and resume operations:

get_memory response:

{"kind":"memory","messages":[{"role":"user","content":"\"hello\""},{"role":"assistant","content":"{\"greeting\":\"hi\"}"}],"pinned":[]}

The pinned array contains tool results the agent has marked for preservation across compaction cycles. Each entry has tool, arguments, and content fields. See Document pinning.

set_memory acknowledgement:

{"kind":"memory_set","old_messages":42,"new_messages":2}

pause acknowledgement:

{"kind":"pause_ack"}

resume acknowledgement:

{"kind":"resume_ack"}

Note: pause_ack and resume_ack are emitted on fd 7 (ctrl_out), which flows through the session tagger if a <-> wire is active. The tagger wraps all ctrl_out messages in {__step: N, payload: msg} envelopes. For linear protocols (terminated by end), the protocol structure provides one-shot trigger semantics — no additional gating is needed.

When changes take effect

Model and temperature changes take effect on the next API call — not just the next input message. The conversation loop reads the current values via closures on every iteration, including retries and tool-use rounds within a single input. No HTTP connection restart is needed: model and temperature are per-request body parameters, not connection-level config.

The stop signal is checked between input messages. It does not interrupt a running API call.

Pause and memory protocol

When paused, the agent stops reading from stdin but continues handling control messages. Memory requests (get_memory, set_memory) are processed at message boundaries — between input lines when not paused, or immediately when paused.

The set_memory operation replaces conversation history. The system prompt is never affected. If any message in the set_memory array lacks role or content, the entire payload is rejected with a warning.

When pins are active, set_memory automatically prepends a user message containing all pinned tool results in <pinned-documents> blocks. This happens transparently — the compactor does not need to preserve pinned content in its output.

Protocol constraint: memory_request is a single slot. The controller must wait for a response on fd 7 before sending the next memory request. If two requests arrive before the main loop handles either, the first is silently overwritten.

System prompt construction

The system prompt is assembled from the agent's config:

  1. Inline prompt (prompt: "...") — added first as a plain text block.
  2. Prompt files (prompts: ["system.md", "./role.md"]) — each file is read at startup and wrapped in document tags:

<doc id="system.md"> file contents here </doc>

File paths are resolved by the executor before the agent starts:

  • Bare name (no /) — searched via the resource path (PLUMB_RESOURCES + installed resource directories). Use for shared files: "system.md", "grammar.peg".
  • Path (contains /) — resolved relative to the .plumb file that defines the agent. Use for local files: "./composer.md", "./heat.plumb".

  3. Output instruction: appended automatically. Describes the expected JSON output type and instructs the model to respond with only the JSON value.

Each part becomes a separate system block. The output instruction has cache_break: true for prompt caching efficiency.

Streaming

API responses are received via Server-Sent Events (SSE). This prevents read timeouts during extended thinking and avoids buffering large responses in memory. Streaming is an internal transport detail — the process interface remains synchronous JSON Lines.

In ZMQ mode, stdin and stdout are not used for process I/O. All data flows through ZMQ sockets as described in ZMQ transport mode.

Document pinning

Agents can mark tool results as important so they survive context compaction. The agent calls the synthetic pin tool with the name and arguments of a previous tool call; the result is stored in a session-local pin set. On the next compaction cycle, pinned content is excluded from the compressible conversation, provided to the compactor as read-only reference, and reinjected at the top of the post-compaction history.

Pin and unpin tools

The agent binary injects two synthetic tools into every agent's tool list:

  • pin {tool: string, arguments: object} — pin the result of a previous tool call. If the pin limit (20) is reached, the oldest pin is evicted (FIFO). Idempotent — pinning an already-pinned call is a no-op.

  • unpin {tool: string, arguments: object} — remove a pin, allowing the result to be compacted normally. Idempotent.
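
The pin and unpin semantics described above (limit of 20, FIFO eviction, idempotence) can be modelled with an ordered association list. This is a sketch under those stated rules only; the representation and function names are assumptions, not the agent's data structure.

```ocaml
let pin_limit = 20

(* Pins are kept oldest first as (canonical key, content) pairs. *)
let pin pins key content =
  if List.mem_assoc key pins then pins (* already pinned: no-op *)
  else
    let pins = pins @ [ (key, content) ] in
    (* Over the limit: evict the oldest pin (FIFO). *)
    if List.length pins > pin_limit then List.tl pins else pins

(* Removing a key that is not pinned leaves the set unchanged. *)
let unpin pins key = List.remove_assoc key pins
```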

Canonical keys

Pins are identified by a canonical form of (tool_name, sorted_arguments_json). Object keys are recursively sorted; integer-valued floats are normalised to integers. This means {"b":1,"a":2} and {"a":2,"b":1} produce the same pin key.
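
The canonicalisation can be sketched over a small JSON type. The json type below is a stand-in for whatever JSON library the agent uses, the tool(arguments) key shape is inferred from the pin_state example earlier in this document, and the string escaping covers only quotes and backslashes; treat all three as assumptions.

```ocaml
type json =
  | Null
  | Bool of bool
  | Int of int
  | Real of float
  | Str of string
  | Arr of json list
  | Obj of (string * json) list

(* Recursively sort object keys and turn integer-valued floats into ints. *)
let rec canonical = function
  | Real f when Float.is_integer f -> Int (int_of_float f)
  | Arr xs -> Arr (List.map canonical xs)
  | Obj kvs ->
      let kvs = List.map (fun (k, v) -> (k, canonical v)) kvs in
      Obj (List.sort (fun (a, _) (b, _) -> String.compare a b) kvs)
  | v -> v

(* Simplified escaping: quotes and backslashes only. *)
let escape s =
  let b = Buffer.create (String.length s) in
  String.iter
    (function
      | '"' -> Buffer.add_string b "\\\""
      | '\\' -> Buffer.add_string b "\\\\"
      | c -> Buffer.add_char b c)
    s;
  Buffer.contents b

let rec to_string = function
  | Null -> "null"
  | Bool b -> string_of_bool b
  | Int i -> string_of_int i
  | Real f -> Printf.sprintf "%.17g" f
  | Str s -> "\"" ^ escape s ^ "\""
  | Arr xs -> "[" ^ String.concat "," (List.map to_string xs) ^ "]"
  | Obj kvs ->
      let field (k, v) = "\"" ^ escape k ^ "\":" ^ to_string v in
      "{" ^ String.concat "," (List.map field kvs) ^ "}"

let pin_key tool args = tool ^ "(" ^ to_string (canonical args) ^ ")"
```

With this sketch, arguments that differ only in key order or in writing 1.0 instead of 1 map to the same key, which is what makes pin lookups and cache-hit interception independent of how the model serialised its arguments.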

Cache-hit interception

When the agent calls a tool whose canonical key matches an active pin, the request is intercepted. Instead of dispatching to the tool, the agent returns a note telling the LLM where to find the pinned content:

{"pinned":true,"note":"This tool result is already pinned in your context. Look for <pinned key=\"fetch({\\\"path\\\":\\\"foo.md\\\"})\"> in the pinned-documents block at the top of your conversation."}

Reinjection format

After compaction, pinned content is prepended to the new history as a single user message:

<pinned-documents>
The following tool results are pinned in your context. They survive
compaction and are always available. Do not re-fetch them.

<pinned key="fetch({&quot;path&quot;:&quot;foo.md&quot;})">
file contents here
</pinned>
</pinned-documents>

Each <pinned> element carries a key attribute — the canonical tool call form — so the agent can cross-reference it when told a result is already in context.

Persistence

Pins are session-local. They survive compaction but not process restart.

Runtime context

When runtime_context: true is set in the agent config, the agent appends a dynamic system block before each API call:

[Runtime context]
Time: 2026-03-08T14:23:17Z
Session input tokens: 82341
Session output tokens: 12045
API calls: 23
Max output tokens per turn: 8192
History messages: 47
Pinned documents: 3

This is not cached (no cache_break) — it changes on every call. Token counts are cumulative for the session. The agent uses this to plan: how much context budget remains, whether to be concise, when to summarise.

Context window

Unbounded accumulation will eventually exceed the model's context window. The compaction protocol provides automated context management via pause/get_memory/set_memory/resume. Document pinning ensures critical tool results survive compaction. Runtime context injection gives the agent visibility into its own resource consumption.