Processes
Processes are the executable units of the plumbing language. Each process is a native binary that reads a .plumb spec file, validates I/O against the declared types, and communicates via JSON Lines on stdin/stdout.
See tools.md for tool semantics and the dispatch protocol. See agent-runtime.md for environment variables, control channel, and telemetry.
Stream transducers
A process with type A → B consumes a stream of A-typed messages and produces a stream of B-typed messages. The type signature governs the per-message schema, not the inter-message dependency.
An agent is a stateful stream transducer: the n-th output may depend on all prior inputs, not just the n-th. Conversation context accumulates across the input stream by default. This is essential for pipelines with review loops — a composer receiving a revision request must remember the original task.
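The difference between the per-message schema and the inter-message dependency can be modelled with Python generators. This is an illustrative sketch, not the plumbing runtime; the names `stateless_upper` and `stateful_agent` are hypothetical, and the "reply" text merely stands in for a model response.

```python
from typing import Iterable, Iterator, List

def stateless_upper(inputs: Iterable[str]) -> Iterator[str]:
    """Stateless: the n-th output depends only on the n-th input."""
    for msg in inputs:
        yield msg.upper()

def stateful_agent(inputs: Iterable[str]) -> Iterator[str]:
    """Stateful: the n-th output may depend on every input seen so far."""
    history: List[str] = []  # accumulated conversation context
    for msg in inputs:
        history.append(msg)
        yield f"reply to {msg!r} with {len(history)} message(s) of context"

outputs = list(stateful_agent(["draft a summary", "shorten it"]))
```

Both functions have the same shape (a stream of strings in, a stream of strings out), which is all the type signature records; only the second carries context from one message to the next.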
The structural processes (id, copy, merge, discard, empty, barrier) are stateless.
Algebraic structure
The category is a copy-discard (CD) category, not a Markov category. Every object is equipped with a commutative comonoid structure (copy and discard) compatible with tensor products. Discard is not natural over our morphisms — agents are nondeterministic and filters are not total — so the stricter Markov structure does not hold.
Merge is additional structure: a monoid (many to one) interacting with copy's comonoid (one to many). Whether this interaction forms a bialgebra or something weaker remains open.
Session types. Protocol declarations (see protocols.md) add a specification layer for channel ordering constraints. The session type primitives correspond to linear logic connectives: send T to ⊗ (tensor), recv T to ⅋ (par), select to ⊕ (sum), offer to & (with), end to the monoidal unit. The duality of session types (send ↔ recv, select ↔ offer) maps to the duality between copy/filter (selection) and merge (being-selected-for) in the existing algebra. This situates the plumbing category within *-autonomous categories.
!(A, B) vs (!A, !B): !(A, B) is a stream of synchronised pairs (one channel, barrier output). (!A, !B) is a pair of independent streams on separate file descriptors (agent telemetry, control channels). The distinction is load-bearing for understanding barrier, project, and agent port decomposition.
Composition
Sequential composition (;)
f ; g pipes the output stream of f into the input stream of g. Each process maintains independent internal state. Type-checked: the output type of f must match the input type of g. Associative: (f ; g) ; h = f ; (g ; h). Unit: f ; id = id ; f = f.
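The associativity and unit laws can be checked on a small model in which a process is a function from an input iterator to an output iterator. This is a sketch under that assumption; `seq`, `identity`, `double`, `running_sum` and `incr` are hypothetical names, and the type check performed by the real composition is omitted.

```python
from typing import Callable, Iterable, Iterator

Process = Callable[[Iterable], Iterator]

def seq(f: Process, g: Process) -> Process:
    """f ; g: feed f's output stream into g's input stream."""
    return lambda xs: g(f(xs))

def identity(xs):
    yield from xs

def double(xs):
    for x in xs:
        yield x * 2

def running_sum(xs):
    total = 0  # internal state, private to this process
    for x in xs:
        total += x
        yield total

def incr(xs):
    for x in xs:
        yield x + 1

data = [1, 2, 3]
left = list(seq(seq(double, running_sum), incr)(data))
right = list(seq(double, seq(running_sum, incr))(data))
```

Both groupings produce the same stream, and composing with `identity` on either side leaves a process unchanged, even though `running_sum` is stateful.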
Tensor product (⊗)
f ⊗ g runs f and g on independent channels with independent state. No shared state, no interference. (A → B) ⊗ (C → D) = A ⊗ C → B ⊗ D. Associative, with unit I (the monoidal unit).
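In the same iterator model, the tensor product pairs two processes without letting them interact. The sketch below is illustrative only; `tensor`, `count_seen` and `shout` are hypothetical names, and real channels would be separate streams rather than Python lists.

```python
from typing import Callable, Iterable, Iterator, Tuple

Process = Callable[[Iterable], Iterator]

def tensor(f: Process, g: Process):
    """f ⊗ g: run f and g side by side on separate channels."""
    def run(channels: Tuple[Iterable, Iterable]) -> Tuple[Iterator, Iterator]:
        left, right = channels
        return f(left), g(right)  # no state is shared between the two sides
    return run

def count_seen(xs):
    seen = 0  # state local to this instance
    for _ in xs:
        seen += 1
        yield seen

def shout(xs):
    for x in xs:
        yield x.upper()

out_left, out_right = tensor(count_seen, shout)((["a", "b", "c"], ["x", "y"]))
left_result, right_result = list(out_left), list(out_right)
```

The two sides may have different lengths and different types; neither output depends on the other input.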
Primitives
id : A → A
The identity morphism. Validates each input line against type A and writes it to stdout. Stateless — each line is independent. Total.
agent : A → (T, B)
An LLM conversation loop. Each input line becomes a user message; the model's response is validated against type B. If validation fails, the error is fed back to the model for retry (up to max_retries). The agent also produces a telemetry stream T — reasoning traces from extended thinking, timing data, etc. The .telemetry port is silently discarded unless explicitly wired to an observer.
By default, conversation context accumulates: each input builds on the prior exchange. The model sees the full history of user inputs and its own successful responses.
Retry messages (validation failures and re-prompts) are internal reductions — they are visible to the model during the current exchange but do not carry forward into the history. The clean history contains only: user input, successful assistant response.
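One way to picture the split between retry traffic and the clean history is a working copy that is thrown away after each exchange. The sketch below is an assumption-laden model, not the agent's implementation: `run_exchange`, `call_model` and `validate` are hypothetical, `max_retries` is read as "retries after the first attempt", and raising on exhaustion is a guess at behaviour the text does not specify.

```python
from typing import Callable, List, Optional, Tuple

Message = Tuple[str, str]  # (role, content)

def run_exchange(
    history: List[Message],
    user_input: str,
    call_model: Callable[[List[Message]], str],  # hypothetical model call
    validate: Callable[[str], Optional[str]],    # error text, or None if valid
    max_retries: int = 3,
) -> str:
    scratch = history + [("user", user_input)]   # working copy for this exchange
    for _ in range(max_retries + 1):
        reply = call_model(scratch)
        error = validate(reply)
        if error is None:
            # Only the clean pair is committed to the long-lived history.
            history.append(("user", user_input))
            history.append(("assistant", reply))
            return reply
        # Retry traffic is visible to the model now, but lives only in scratch.
        scratch.append(("assistant", reply))
        scratch.append(("user", f"validation failed: {error}"))
    raise RuntimeError("validation retries exhausted")  # assumed failure mode

replies = iter(["not a number", "42"])
history: List[Message] = []
result = run_exchange(
    history,
    "give me a number",
    call_model=lambda msgs: next(replies),
    validate=lambda r: None if r.isdigit() else "expected digits",
)
```

After the call, `history` holds just the user input and the successful response; the failed attempt and its re-prompt have disappeared with `scratch`.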
Config options:
| Key | Type | Default | Description |
|---|---|---|---|
| provider | string | — | LLM provider: "anthropic" or "openai". Falls back to PLUMB_PROVIDER env var. Required (no default). |
| model | string | — | LLM model identifier. Falls back to PLUMB_MODEL env var. Required (no default). |
| prompt | string | — | Inline system prompt |
| prompts | [string] | — | System prompt files (read at startup, wrapped in <doc> tags) |
| max_retries | number | 3 | Validation retry limit |
| max_tokens | number | 8192 | Max tokens per response (includes thinking + output) |
| max_tool_calls | number | — | Maximum total tool calls per input. Exceeding this limit is an error. |
| thinking_budget | number | — | Token budget for extended thinking. Must be < max_tokens. Anthropic only — ignored for OpenAI. |
| endpoint | string | per provider | API endpoint. Falls back to PLUMB_ENDPOINT env var. Default: "https://api.anthropic.com" (Anthropic), "https://api.openai.com" (OpenAI) |
| amnesiac | bool | false | When true, each input starts a fresh conversation |
| max_messages | number | — | Cap on input messages before clean exit |
| runtime_context | bool | false | When true, inject runtime state (time, token counts, pin count) into the system prompt before each API call |
| tools | [ident] | — | Tool bindings available to the agent (see tools.md) |
| mcp | [ident\|obj] | — | MCP server configs (see tools.md § MCP tools) |
Total (one output per input). Lowerable to a tool.
discard : A → Unit
The terminal morphism. Validates each input line against type A, produces no output. Useful for sinks and side-effect processes. Total.
Categorically, discard is the counit ε : A → I.
empty : Unit → A
The zero morphism. Produces an empty stream — sends EOF immediately,
causing downstream morphisms to terminate cleanly. The categorical dual
of discard: where discard consumes a stream and produces nothing,
empty consumes nothing and produces an empty stream.
In a plumb body, spawn with one positional channel (the output):
spawn empty(ch_out)
The desugarer auto-inserts empty for declared channels that are never
referenced in any spawn argument. Users can also write spawn empty(ch)
explicitly when an intentionally empty stream is needed.
Categorically, empty is the unit η : I → A.
copy : A → A
Fan-out. Duplicates each input message to two output channels. Stateless — each message is independent. Total.
In a plumb body, spawn with three positional channels — one input, two outputs:
spawn fan(ch_in, ch_out0, ch_out1)
Copy validates each message against type A before forwarding. Close semantics: when the input stream closes, both output streams close.
The binding declares the message type, not the multiplicity. Multiplicity is structural — it's in the wiring (three channels), not the type signature:
let fan : Message → Message = copy
Categorically, copy is the comonoid diagonal Δ : A → A ⊗ A. The tensor product appears in the channel structure, not the type annotation.
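The fan-out and its close semantics can be modelled with in-memory queues. This is a minimal sketch, assuming a sentinel object stands in for stream close and that a validation failure raises; `copy_process`, `is_message` and `drain` are hypothetical names.

```python
import queue

EOF = object()  # sentinel standing in for stream close

def copy_process(ch_in, ch_out0, ch_out1, validate):
    """Forward every valid message to both outputs; close both when input closes."""
    while True:
        msg = ch_in.get()
        if msg is EOF:
            ch_out0.put(EOF)
            ch_out1.put(EOF)
            return
        validate(msg)  # assumed to raise on a type error
        ch_out0.put(msg)
        ch_out1.put(msg)

def is_message(msg):
    if not isinstance(msg, dict) or "text" not in msg:
        raise ValueError(f"not a Message: {msg!r}")

def drain(ch):
    items = []
    while (m := ch.get()) is not EOF:
        items.append(m)
    return items

ch_in, ch_out0, ch_out1 = queue.Queue(), queue.Queue(), queue.Queue()
for m in ({"text": "a"}, {"text": "b"}, EOF):
    ch_in.put(m)
copy_process(ch_in, ch_out0, ch_out1, is_message)
first, second = drain(ch_out0), drain(ch_out1)
```

The function takes one message type and three channels, mirroring the point that multiplicity lives in the wiring rather than in the type.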
merge : (A, B) → A | B
Fan-in. Interleaves messages from two input channels onto one output channel. The output type is the sum (coproduct) of the input types — when the two inputs carry different types, each message is tagged by its variant. Stateless. Not total (requires interleaving) — not lowerable.
In a plumb body, spawn with three positional channels — two inputs, one output:
spawn join(ch_in0, ch_in1, ch_out)
Merge validates each message against the declared type before forwarding. Close semantics: the output stream closes only when both input streams have closed. Message ordering between the two inputs is non-deterministic.
When both inputs carry the same type (A ⊗ A → A), merge is the monoid fold ∇:
let join : Message → Message = merge
When the inputs differ, the binding type is a sum. Each input validates against its matching variant:
type QA = { text: string } | { count: int }
let join : QA → QA = merge
Categorically, merge is the copairing [ι₁, ι₂] : A + B → A | B.
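The all-EOF close rule can be sketched with two forwarding threads and a shared counter. This models only the interleaving and close behaviour described above; validation and variant tagging are left out, a sentinel stands in for EOF, and `merge_process` is a hypothetical name.

```python
import queue
import threading

EOF = object()  # sentinel standing in for stream close

def merge_process(ch_in0, ch_in1, ch_out):
    """Interleave two inputs; close the output only after both inputs close."""
    remaining = 2
    lock = threading.Lock()

    def forward(ch_in):
        nonlocal remaining
        while (msg := ch_in.get()) is not EOF:
            ch_out.put(msg)
        with lock:
            remaining -= 1
            if remaining == 0:  # the last reader to finish closes the output
                ch_out.put(EOF)

    threads = [threading.Thread(target=forward, args=(ch,)) for ch in (ch_in0, ch_in1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

a, b, out = queue.Queue(), queue.Queue(), queue.Queue()
for m in (1, 2, EOF):
    a.put(m)
for m in ("x", EOF):
    b.put(m)
merge_process(a, b, out)

merged = []
while (m := out.get()) is not EOF:
    merged.append(m)
```

The position of `"x"` relative to `1` and `2` varies between runs, which is the non-determinism the text refers to; in this sketch each input's own order is preserved.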
map : A → B
Pure transform. Evaluates an expression on each message. Validates both the input (against A) and the output (against B) at runtime. Cannot be used inline in wiring chains — arbitrary map expressions require type annotation, so they must be named bindings. Total.
filter : A → A
Predicate filter. Passes messages where the expression evaluates to true, drops the rest. Input type = output type, enforced at load time. Can be used inline in wiring chains: filter(score >= 85). Not total (may produce 0 outputs) — not lowerable.
Lenient evaluation: The filter evaluates the predicate expression on each parsed JSON message. If the message validates against the channel type, the cleaned value is used; if validation fails, the raw parsed JSON is used instead. If the expression raises an evaluation error (missing field, type mismatch in comparison), the message is silently dropped — it doesn't match the predicate. JSON parse errors are fatal.
This makes filter usable on heterogeneous streams where only some messages have the fields referenced by the predicate. For example, filtering telemetry for usage events:
telemetry ; filter(kind = "usage" && prompt_tokens > 150000)
Messages without kind or prompt_tokens fields are silently dropped
(missing field → Eval_error → treated as false), while usage events
matching the threshold pass through.
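The lenient evaluation rule can be modelled as follows. This is a sketch of the behaviour described above, not the filter's implementation: `EvalError` stands in for the Eval_error mentioned in the text, `usage_over_threshold` hand-codes the example predicate, and the validate-then-fall-back-to-raw step is omitted.

```python
import json

class EvalError(Exception):
    """The predicate could not be evaluated on this message."""

def usage_over_threshold(msg):
    # Mirrors: kind = "usage" && prompt_tokens > 150000
    try:
        return msg["kind"] == "usage" and msg["prompt_tokens"] > 150000
    except (KeyError, TypeError) as exc:
        raise EvalError(str(exc)) from exc

def lenient_filter(lines, predicate):
    for line in lines:
        msg = json.loads(line)  # a JSON parse error propagates: fatal
        try:
            keep = predicate(msg)
        except EvalError:
            continue  # evaluation error: treated as "no match"
        if keep:
            yield msg

lines = [
    '{"kind": "usage", "prompt_tokens": 200000}',
    '{"kind": "usage", "prompt_tokens": 10}',
    '{"kind": "thinking"}',
    '{"prompt_tokens": 999999}',
    '{"kind": "usage", "prompt_tokens": "many"}',
]
kept = list(lenient_filter(lines, usage_over_threshold))
```

Only the first line survives: the second fails the threshold, and the rest are dropped because a field is missing or has the wrong type.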
barrier : (A, B) → (A, B)
Binary synchronised fan-in. Two concurrent reader threads with mutex + condvar; the last to arrive emits the product. Closes on any-EOF (opposite of merge's all-EOF). Ports: in0, in1, output. Not total (requires synchronisation) — not lowerable.
Categorically, barrier is !A ⊗ !B → !(A × B) — it synchronises two independent streams into a stream of correlated pairs.
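Ignoring concurrency, the pairing and any-EOF behaviour reduce to something very close to Python's `zip`. The sketch below models only that input/output relation; it does not reproduce the two reader threads, mutex, or condition variable, and `barrier` here is a hypothetical stand-in that encodes each pair as a two-element list.

```python
from typing import Iterable, Iterator, List

def barrier(in0: Iterable, in1: Iterable) -> Iterator[List]:
    """Emit one pair per round; stop as soon as either input is exhausted."""
    it0, it1 = iter(in0), iter(in1)
    while True:
        try:
            a = next(it0)
            b = next(it1)
        except StopIteration:
            return  # any-EOF: the first exhausted input closes the output
        yield [a, b]

pairs = list(barrier(["q1", "q2", "q3"], [10, 20]))
```

The third message on the first input never appears in the output because the second input closed first.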
project(n) : (A₀, …, Aₖ) → Aₙ
Product projection. Extracts the n-th component from a product stream. The input must be !(A₀, …, Aₖ) (a stream of tuples); the output is !Aₙ.
let fst_msg : !(Message, Message) -> !Message = project(0)
let snd_msg : !(Message, Message) -> !Message = project(1)
The index is zero-based. Out-of-bounds indices are rejected at load time: for example, project(5) on a 2-element product type fails to load.
Ports: input, output. Not lowerable (requires product stream).
Categorically, project(n) is the canonical projection πₙ. The key law is barrier ; project(i) = πᵢ — barrier introduces a product, projection eliminates it.
Fatal conditions:
- Input message is not a JSON array (malformed product)
- Index out of bounds at runtime (should not occur if types are correct)
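The load-time bounds check and the two fatal conditions can be sketched together. This assumes, as the fatal conditions suggest, that a product message travels as a JSON array; `make_project` and its `arity` parameter are hypothetical, and the exception types are placeholders for whatever the runtime actually does on a fatal error.

```python
import json

def make_project(n: int, arity: int):
    """Build project(n) for a product with the given number of components."""
    if not 0 <= n < arity:
        # Stands in for the load-time rejection.
        raise ValueError(f"project({n}) out of bounds for {arity}-element product")

    def project(lines):
        for line in lines:
            value = json.loads(line)
            if not isinstance(value, list):
                raise RuntimeError("fatal: input message is not a JSON array")
            if n >= len(value):
                raise RuntimeError("fatal: index out of bounds at runtime")
            yield value[n]

    return project

snd_msg = make_project(1, arity=2)
seconds = list(snd_msg(['["hello", "world"]', "[1, 2]"]))
```

Calling `make_project(5, arity=2)` fails before any message is read, which corresponds to rejecting the spec at load time.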
_format_json : !json → !string
JSON serialisation. Serialises each input JSON value to its compact textual form and writes that text to stdout as a JSON string value (JSON-escaped, one per line, JSON Lines).
Forms an adjunction pair with _parse_json:
- _format_json ; _parse_json = id (on the nose)
- _parse_json ; _format_json ≅ id (up to isomorphism: whitespace normalisation)
Executes inline (no subprocess). Ports: input, output. Total. Lowerable.
The standard library provides clean names via use json:
use json
solo@telemetry ; Json.format ; output
_parse_json : !string → !json
JSON deserialisation. Parses each input JSON string value and writes the
parsed JSON value to stdout. Inverse of _format_json.
Executes inline (no subprocess). Ports: input, output. Total. Lowerable.
Available as Json.parse via use json.
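The two round-trip laws can be checked with Python's `json` module standing in for the built-ins. This is an analogy, not the plumbing implementation; `format_json` and `parse_json` are hypothetical names, and compact output is obtained here with `separators=(",", ":")`.

```python
import json

def format_json(value):
    """JSON value -> string holding its compact serialisation."""
    return json.dumps(value, separators=(",", ":"))

def parse_json(text):
    """String holding JSON text -> the JSON value it denotes."""
    return json.loads(text)

value = {"a": [1, 2], "b": None}
text = format_json(value)

# On the wire the string is itself a JSON value, so it is escaped once more.
wire_line = json.dumps(text)

# Parsing then formatting normalises whitespace rather than preserving it.
normalised = format_json(parse_json('{ "a" : 1 }'))
```

Formatting then parsing returns exactly the original value, while parsing then formatting returns an equivalent string that may differ in whitespace from the input.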
log(label) (compiler-internal)
Debug tap: !A → !Unit. Reads JSON values, emits a debug log line to stderr (when PIPELINE_DEBUG=1), and drops the value. No output data is produced.
Not available in surface syntax. Created by the desugarer when PIPELINE_DEBUG=1 to instrument session protocol steps. Paired with copy to create non-intrusive taps:
send_ch → Copy → (send wrapper, Log("protocol:session:send:Pause"))
No type validation — internal channels always carry valid JSON. Parse errors log the raw string.
Seeded channels
A seeded channel is a channel pre-loaded with one null JSON line
("null\n") by the executor before any thread starts. This is the
Petri net concept of an initial marking: a place with one token.
Seeded channels are declared in pb_seeded on plumb_body. The
executor writes the seed value using Unix.write on the raw fd
before spawning forwarding threads. The compiler uses seeded
channels for the token-ring construction that serialises terminating
protocol rounds (see protocols.md).
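The "write before any thread starts" ordering can be demonstrated with an ordinary pipe. This is a sketch using Python's `os.write` as an analogue of the Unix.write call mentioned above; the `consumer` thread is hypothetical and stands in for a forwarding thread.

```python
import os
import threading

read_fd, write_fd = os.pipe()

# Seed the channel on the raw fd before any thread exists.
os.write(write_fd, b"null\n")

received = []

def consumer():
    with os.fdopen(read_fd, "r") as reader:
        received.append(reader.readline())  # the token is already waiting

t = threading.Thread(target=consumer)
t.start()
t.join()
os.close(write_fd)
```

Because the token is in the pipe buffer before the reader starts, the first read succeeds immediately, which is what lets a ring of processes that each wait for a token begin moving.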
merge_all (compiler-internal)
All-EOF merge (structural coproduct): !A ⊗ !A → !A. Both inputs
forward to the output; EOF is sent when both inputs close. There is no
drain protocol: drain markers are forwarded unchanged, so chains stay
transparent.
Not available in surface syntax. Emitted by the compiler for n-ary
fan-in desugaring (build_homo, build_hetero) when the fan-in target
is not in a feedback cycle. Using merge (with drain) in this position
causes the drain state machine to inject orphan markers that never
return, blocking shutdown.
The desugarer detects cycles in the wiring graph: if the fan-in target
participates in a cycle, it emits merge (with drain). Otherwise it
emits merge_all (structural coproduct, no drain).
Ports: in0, in1, output. Same as merge.
merge_any (compiler-internal)
Any-EOF merge: !A ⊗ !A → !A. Identical to merge except the output
closes when the first input reaches EOF — the other reader gets a
broken pipe and terminates cleanly.
Not available in surface syntax. Emitted by the <-> desugarer for the
session envelope merge tree, where tagger EOF (agent lifecycle end) is
the natural shutdown signal. Using merge in this position creates a
circular EOF dependency: the merge waits for send wrappers, send wrappers
wait for barriers, barriers wait for the merge output (see PLMB-033).
Ports: in0, in1, output. Same as merge.
Instance naming
Process names in the routing table are instance-numbered: each spawn
gets a unique name formed from the binding name and a per-binding
counter ({binding}{N}). For example, a pipeline with two id
spawns produces processes id0 and id1. A single copy spawn
produces copy0.
This is a compiler detail — the surface language uses binding names without suffixes. Instance numbering guarantees unique process names in the routing table, which is required for unique socket endpoints in the fabric and unambiguous supervisor lookup.
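The numbering scheme amounts to one counter per binding name. A minimal sketch, assuming spawns are numbered in the order they are encountered (the text does not state the order); `instance_names` is a hypothetical helper.

```python
from collections import defaultdict

def instance_names(spawned_bindings):
    """Assign {binding}{N} names using a separate counter for each binding."""
    counters = defaultdict(int)
    names = []
    for binding in spawned_bindings:
        names.append(f"{binding}{counters[binding]}")
        counters[binding] += 1
    return names

names = instance_names(["id", "copy", "id"])
```

Every resulting name is unique as long as no binding name itself ends in a digit sequence that collides with another binding's numbering, a corner case the source does not discuss.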
Inline execution
The structural morphisms (id, copy, merge, discard, empty, map, filter, barrier) execute inline as forwarding threads within the plumb process. They do not spawn subprocesses, synthesise spec files, or require binary resolution. The compiler-internal morphisms (tagger, log, first, merge_all, merge_any) also execute inline. Only agents and nested plumbs spawn subprocesses — they need isolation for API credentials, file permissions, and failure domains.
Multiplexed I/O
Agent subprocesses use exactly three file descriptors: stdin, stdout, and stderr. All logical channels (data, control, telemetry, tool dispatch) are multiplexed onto stdin/stdout using tagged JSON envelopes:
{"__port": "input", "msg": <json>} ← runtime → agent (stdin)
{"__port": "ctrl_in", "msg": <json>} ← runtime → agent (stdin)
{"__port": "tool_resp", "msg": <json>} ← runtime → agent (stdin)
{"__port": "output", "msg": <json>} ← agent → runtime (stdout)
{"__port": "ctrl_out", "msg": <json>} ← agent → runtime (stdout)
{"__port": "telemetry", "msg": <json>} ← agent → runtime (stdout)
{"__port": "tool_req", "msg": <json>} ← agent → runtime (stdout)
Port-specific EOF is signalled with {"__port": "input", "__eof": true}.
This closes the logical data channel while keeping control alive (required
for the drain loop — see executor.md).
stderr carries only debug logs and errors — it is not part of the algebra.
The __port envelope is transport-layer only and was used by the legacy
stdio bridge (removed in PLMB-061). Agents now speak ZMQ directly
(zmq_agent.ml); the agent binary retains stdio fallback support for
standalone use. Internal channels within a pipeline never carry
__port tags.
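Demultiplexing the envelope format shown above comes down to routing on the `__port` field and treating `__eof` as a per-port close. The sketch below is illustrative only; `demultiplex` is a hypothetical name and the control message payloads are invented for the example.

```python
import json
from collections import defaultdict

def demultiplex(lines):
    """Split tagged envelopes into per-port message lists and track closed ports."""
    ports = defaultdict(list)
    closed = set()
    for line in lines:
        envelope = json.loads(line)
        port = envelope["__port"]
        if envelope.get("__eof"):
            closed.add(port)  # this logical channel is done; others stay open
        else:
            ports[port].append(envelope["msg"])
    return dict(ports), closed

stdin_lines = [
    '{"__port": "input", "msg": {"text": "hi"}}',
    '{"__port": "ctrl_in", "msg": {"pause": true}}',   # hypothetical payload
    '{"__port": "input", "__eof": true}',
    '{"__port": "ctrl_in", "msg": {"resume": true}}',  # hypothetical payload
]
ports, closed = demultiplex(stdin_lines)
```

The control message that arrives after the data EOF is still delivered, showing how the data channel can close while control stays alive.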
Port names
The port count depends on the input/output decomposition:
| Input | Output | Ports |
|---|---|---|
| !A | !B | [input, output] |
| !A | (!B, !T) | [input, output, telemetry] |
| (!A, !C) | !B | [input, ctrl_in, output, ctrl_out] |
| (!A, !C) | (!B, !T) | [input, ctrl_in, output, ctrl_out, telemetry] |
The ctrl_out port carries the agent's control responses (memory dumps, acknowledgements) as a composable stream. It can be wired into downstream processes using port addressing:
worker@ctrl_out ; filter(kind = "memory") ; memory_sink
When ctrl_out is not explicitly wired, it is silently drained (messages discarded).