Processes

Processes are the executable units of the plumbing language. Each process is a native binary that reads a .plumb spec file, validates I/O against the declared types, and communicates via JSON Lines on stdin/stdout.

See tools.md for tool semantics and the dispatch protocol. See agent-runtime.md for environment variables, control channel, and telemetry.

Stream transducers

A process with type A → B consumes a stream of A-typed messages and produces a stream of B-typed messages. The type signature governs the per-message schema, not the inter-message dependency.

An agent is a stateful stream transducer: the n-th output may depend on all prior inputs, not just the n-th. Conversation context accumulates across the input stream by default. This is essential for pipelines with review loops — a composer receiving a revision request must remember the original task.

The structural processes (id, copy, merge, discard, empty, barrier) are stateless.
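The difference between a stateless and a stateful transducer can be sketched in Python, with generators standing in for streams. This is an illustration only: the names `identity` and `accumulating_agent` are hypothetical, and the "agent" here joins its history instead of calling a model.

```python
from typing import Iterable, Iterator


def identity(stream: Iterable[str]) -> Iterator[str]:
    """Stateless: the n-th output depends only on the n-th input."""
    for msg in stream:
        yield msg


def accumulating_agent(stream: Iterable[str]) -> Iterator[str]:
    """Stateful: the n-th output may depend on every input seen so far.

    A real agent would call a model; joining the history just makes the
    dependency on earlier inputs visible.
    """
    history = []
    for msg in stream:
        history.append(msg)
        yield " | ".join(history)
```

Both functions have the same per-message signature (string in, string out); only the second carries context from one message to the next.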

Algebraic structure

The category is a copy-discard (CD) category, not a Markov category. Every object is equipped with a commutative comonoid structure (copy and discard) compatible with tensor products. Discard is not natural over our morphisms — agents are nondeterministic and filters are not total — so the stricter Markov structure does not hold.

Merge is additional structure: a monoid (many to one) interacting with copy's comonoid (one to many). Whether this interaction forms a bialgebra or something weaker remains open.

Session types. Protocol declarations (see protocols.md) add a specification layer for channel ordering constraints. The session type primitives correspond to linear logic connectives: send T to ⊗ (tensor), recv T to ⅋ (par), select to ⊕ (sum), offer to & (with), end to the monoidal unit. The duality of session types (send ↔ recv, select ↔ offer) maps to the duality between copy/filter (selection) and merge (being-selected-for) in the existing algebra. This situates the plumbing category within *-autonomous categories.

!(A, B) vs (!A, !B): !(A, B) is a stream of synchronised pairs (one channel, barrier output). (!A, !B) is a pair of independent streams on separate file descriptors (agent telemetry, control channels). The distinction is load-bearing for understanding barrier, project, and agent port decomposition.

Composition

Sequential composition (;)

f ; g pipes the output stream of f into the input stream of g. Each process maintains independent internal state. Type-checked: the output type of f must match the input type of g. Associative: (f ; g) ; h = f ; (g ; h). Unit: f ; id = id ; f = f.
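As a rough model of these laws, sequential composition can be sketched with Python generators. The helper `seq` and the stages `double` and `inc` are hypothetical names for illustration; type checking between stages is omitted.

```python
from functools import reduce


def seq(*stages):
    """Model of f ; g ; ...: feed each stage's output stream into the next."""
    def composed(stream):
        return reduce(lambda s, stage: stage(s), stages, stream)
    return composed


def identity(stream):
    """Model of id: forward every message unchanged."""
    yield from stream


def double(stream):
    for x in stream:
        yield x * 2


def inc(stream):
    for x in stream:
        yield x + 1


# Associativity: both groupings describe the same pipeline.
left = seq(seq(double, inc), identity)
right = seq(double, seq(inc, identity))
```

Each stage keeps its own local state inside its generator frame, which mirrors the claim that composed processes do not share state.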

Tensor product (⊗)

f ⊗ g runs f and g on independent channels with independent state. No shared state, no interference. (A → B) ⊗ (C → D) = A ⊗ C → B ⊗ D. Associative, with unit I (the monoidal unit).

Primitives

id : A → A

The identity morphism. Validates each input line against type A and writes it to stdout. Stateless — each line is independent. Total.

agent : A → (T, B)

An LLM conversation loop. Each input line becomes a user message; the model's response is validated against type B. If validation fails, the error is fed back to the model for retry (up to max_retries). The agent also produces a telemetry stream T — reasoning traces from extended thinking, timing data, etc. The .telemetry port is silently discarded unless explicitly wired to an observer.

By default, conversation context accumulates: each input builds on the prior exchange. The model sees the full history of user inputs and its own successful responses.

Retry messages (validation failures and re-prompts) are internal reductions — they are visible to the model during the current exchange but do not carry forward into the history. The clean history contains only: user input, successful assistant response.
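The separation between the working transcript and the clean history might look like the following Python sketch. It is not the agent's implementation: `call_model` is a hypothetical callable, `json_error` is a stand-in validator, `max_retries` is assumed to count retries after the first attempt, and raising on exhaustion is an assumption.

```python
import json


def json_error(text):
    """Stand-in validator: return None if text parses as JSON, else an error string."""
    try:
        json.loads(text)
        return None
    except ValueError as exc:
        return str(exc)


def run_exchange(history, user_input, call_model, validate=json_error, max_retries=3):
    """Run one exchange and return (response, new_clean_history)."""
    # Scratch transcript: visible to the model during this exchange only.
    scratch = history + [{"role": "user", "content": user_input}]
    for _attempt in range(max_retries + 1):
        reply = call_model(scratch)
        error = validate(reply)
        if error is None:
            # Only the user input and the successful response carry forward.
            clean = history + [
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": reply},
            ]
            return reply, clean
        # Feed the failure back for the next attempt; never stored in history.
        scratch = scratch + [
            {"role": "assistant", "content": reply},
            {"role": "user", "content": f"Validation failed: {error}"},
        ]
    raise RuntimeError("validation retries exhausted")  # assumed behaviour
```

Passing the returned history into the next call models the default accumulating mode; passing an empty list every time would model `amnesiac`.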

Config options:

| Key | Type | Default | Description |
|---|---|---|---|
| provider | string | | LLM provider: "anthropic" or "openai". Falls back to PLUMB_PROVIDER env var. Required (no default). |
| model | string | | LLM model identifier. Falls back to PLUMB_MODEL env var. Required (no default). |
| prompt | string | | Inline system prompt |
| prompts | [string] | | System prompt files (read at startup, wrapped in <doc> tags) |
| max_retries | number | 3 | Validation retry limit |
| max_tokens | number | 8192 | Max tokens per response (includes thinking + output) |
| max_tool_calls | number | | Maximum total tool calls per input. Exceeding this limit is an error. |
| thinking_budget | number | | Token budget for extended thinking. Must be < max_tokens. Anthropic only; ignored for OpenAI. |
| endpoint | string | per provider | API endpoint. Falls back to PLUMB_ENDPOINT env var. Default: "https://api.anthropic.com" (Anthropic), "https://api.openai.com" (OpenAI) |
| amnesiac | bool | false | When true, each input starts a fresh conversation |
| max_messages | number | | Cap on input messages before clean exit |
| runtime_context | bool | false | When true, inject runtime state (time, token counts, pin count) into the system prompt before each API call |
| tools | [ident] | | Tool bindings available to the agent (see tools.md) |
| mcp | [ident\|obj] | | MCP server configs (see tools.md § MCP tools) |

Total (one output per input). Lowerable to a tool.
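The defaults, environment fallbacks, and the thinking_budget constraint listed in the config options can be sketched as a small resolver. This is a Python illustration under assumptions: the function name `resolve_agent_config` is hypothetical, the precedence (config, then environment, then built-in default) is inferred from the "falls back to" wording, and the real loader may report errors differently.

```python
DEFAULT_ENDPOINTS = {
    "anthropic": "https://api.anthropic.com",
    "openai": "https://api.openai.com",
}


def resolve_agent_config(config, env):
    """Return a config dict with defaults and env fallbacks applied."""
    resolved = {
        "max_retries": 3,
        "max_tokens": 8192,
        "amnesiac": False,
        "runtime_context": False,
    }
    resolved.update(config)

    # provider and model are required: config first, then environment.
    for key, var in (("provider", "PLUMB_PROVIDER"), ("model", "PLUMB_MODEL")):
        if key not in resolved:
            if var not in env:
                raise ValueError(f"{key} is required: set it in config or via {var}")
            resolved[key] = env[var]

    if resolved["provider"] not in DEFAULT_ENDPOINTS:
        raise ValueError(f"unknown provider: {resolved['provider']!r}")

    # endpoint: config, then PLUMB_ENDPOINT, then the per-provider default.
    if "endpoint" not in resolved:
        resolved["endpoint"] = env.get(
            "PLUMB_ENDPOINT", DEFAULT_ENDPOINTS[resolved["provider"]]
        )

    # thinking_budget shares the max_tokens allowance, so it must be smaller.
    budget = resolved.get("thinking_budget")
    if budget is not None and budget >= resolved["max_tokens"]:
        raise ValueError("thinking_budget must be < max_tokens")
    return resolved
```

Passing the environment as an explicit mapping (for example `os.environ`) keeps the sketch easy to exercise without touching the real process environment.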

discard : A → Unit

The terminal morphism. Validates each input line against type A, produces no output. Useful for sinks and side-effect processes. Total.

Categorically, discard is the counit ε : A → I.

empty : Unit → A

The zero morphism. Produces an empty stream — sends EOF immediately, causing downstream morphisms to terminate cleanly. The categorical dual of discard: where discard consumes a stream and produces nothing, empty consumes nothing and produces an empty stream.

In a plumb body, spawn with one positional channel (the output):

spawn empty(ch_out)

The desugarer auto-inserts empty for declared channels that are never referenced in any spawn argument. Users can also write spawn empty(ch) explicitly when an intentionally empty stream is needed.

Categorically, empty is the unit η : I → A.

copy : A → A

Fan-out. Duplicates each input message to two output channels. Stateless — each message is independent. Total.

In a plumb body, spawn with three positional channels — one input, two outputs:

spawn fan(ch_in, ch_out0, ch_out1)

Copy validates each message against type A before forwarding. Close semantics: when the input stream closes, both output streams close.

The binding declares the message type, not the multiplicity. Multiplicity is structural — it's in the wiring (three channels), not the type signature:

let fan : Message → Message = copy

Categorically, copy is the comonoid diagonal Δ : A → A ⊗ A. The tensor product appears in the channel structure, not the type annotation.

merge : (A, B) → A | B

Fan-in. Interleaves messages from two input channels onto one output channel. The output type is the sum (coproduct) of the input types — when the two inputs carry different types, each message is tagged by its variant. Stateless. Not total (requires interleaving) — not lowerable.

In a plumb body, spawn with three positional channels — two inputs, one output:

spawn join(ch_in0, ch_in1, ch_out)

Merge validates each message against the declared type before forwarding. Close semantics: the output stream closes only when both input streams have closed. Message ordering between the two inputs is non-deterministic.
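The all-EOF close rule can be modelled with two reader threads and queues. This Python sketch makes several assumptions: `queue.Queue` stands in for a channel, the `EOF` sentinel stands in for stream close, and validation and the drain protocol are left out.

```python
import queue
import threading

EOF = object()  # sentinel standing in for a closed stream


def merge(ch_in0, ch_in1, ch_out):
    """Forward from both inputs; close the output only after both have closed."""
    remaining = 2
    lock = threading.Lock()

    def reader(ch_in):
        nonlocal remaining
        while True:
            msg = ch_in.get()
            if msg is EOF:
                break
            ch_out.put(msg)  # interleaving across readers is not deterministic
        with lock:
            remaining -= 1
            if remaining == 0:
                ch_out.put(EOF)  # the last reader to finish closes the output

    threads = [
        threading.Thread(target=reader, args=(ch,)) for ch in (ch_in0, ch_in1)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

Because each reader forwards all of its messages before decrementing the counter, the close marker is always the final item on the output, whatever the interleaving.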

When both inputs carry the same type (A ⊗ A → A), merge is the monoid fold:

let join : Message → Message = merge

When the inputs differ, the binding type is a sum. Each input validates against its matching variant:

type QA = { text: string } | { count: int }
let join : QA → QA = merge

Categorically, merge is the copairing [ι₁, ι₂] : A + B → A | B.

map : A → B

Pure transform. Evaluates an expression on each message. Validates both the input (against A) and the output (against B) at runtime. Cannot be used inline in wiring chains — arbitrary map expressions require type annotation, so they must be named bindings. Total.

filter : A → A

Predicate filter. Passes messages where the expression evaluates to true, drops the rest. Input type = output type, enforced at load time. Can be used inline in wiring chains: filter(score >= 85). Not total (may produce 0 outputs) — not lowerable.

Lenient evaluation: The filter evaluates the predicate expression on each parsed JSON message. If the message validates against the channel type, the cleaned value is used; if validation fails, the raw parsed JSON is used instead. If the expression raises an evaluation error (missing field, type mismatch in comparison), the message is silently dropped — it doesn't match the predicate. JSON parse errors are fatal.

This makes filter usable on heterogeneous streams where only some messages have the fields referenced by the predicate. For example, filtering telemetry for usage events:

telemetry ; filter(kind = "usage" && prompt_tokens > 150000)

Messages without kind or prompt_tokens fields are silently dropped (missing field → Eval_error → treated as false), while usage events matching the threshold pass through.
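The lenient rule can be approximated in Python, where a missing field raises `KeyError` and a mismatched comparison raises `TypeError`. This is a sketch, not the evaluator: `lenient_filter` is a hypothetical name, the predicate is an ordinary Python callable, and the validate-then-fall-back-to-raw step is omitted.

```python
import json


def lenient_filter(lines, predicate):
    """Yield parsed messages for which predicate(msg) is true.

    JSON parse errors propagate (fatal); evaluation errors drop the message.
    """
    for line in lines:
        msg = json.loads(line)  # a malformed line raises and stops the stream
        try:
            keep = predicate(msg)
        except (KeyError, TypeError):
            continue  # missing field or type mismatch: treated as false
        if keep is True:
            yield msg


def is_large_usage(msg):
    """Python rendering of: kind = "usage" && prompt_tokens > 150000."""
    return msg["kind"] == "usage" and msg["prompt_tokens"] > 150000
```

On a mixed stream, only usage events above the threshold survive; messages lacking either field, or carrying a non-numeric prompt_tokens, disappear without an error.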

barrier : (A, B) → (A, B)

Binary synchronised fan-in. Two concurrent reader threads with mutex + condvar; the last to arrive emits the product. Closes on any-EOF (opposite of merge's all-EOF). Ports: in0, in1, output. Not total (requires synchronisation) — not lowerable.

Categorically, barrier is !A ⊗ !B → !(A × B) — it synchronises two independent streams into a stream of correlated pairs.

project(n) : (A₀, …, Aₖ) → Aₙ

Product projection. Extracts the n-th component from a product stream. The input must be !(A₀, …, Aₖ) (a stream of tuples); the output is !Aₙ.

let fst_msg : !(Message, Message) -> !Message = project(0)
let snd_msg : !(Message, Message) -> !Message = project(1)

The index is zero-based. Out-of-bounds indices are rejected at load time; for example, project(5) on a 2-element product type fails to load.

Ports: input, output. Not lowerable (requires product stream).

Categorically, project(n) is the canonical projection πₙ. The key law is barrier ; project(i) = πᵢ — barrier introduces a product, projection eliminates it.
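A sequential Python model of the law, under stated assumptions: iterators stand in for streams, products are represented as lists (JSON arrays), and `zip` replaces the threaded mutex + condvar implementation because it has the same any-EOF behaviour. Load-time index checking is not modelled, so the bounds check here happens at runtime.

```python
def barrier(stream_a, stream_b):
    """Pair the n-th messages of two streams; stop when either one ends."""
    for a, b in zip(stream_a, stream_b):
        yield [a, b]  # a product travels as a JSON array


def project(n):
    """Return a transducer that extracts the n-th component of each product."""
    def run(stream):
        for msg in stream:
            if not isinstance(msg, list):
                raise ValueError("malformed product: message is not a JSON array")
            if not 0 <= n < len(msg):
                raise IndexError(f"project({n}) on a {len(msg)}-element product")
            yield msg[n]
    return run
```

Composing `barrier` with `project(0)` returns the first stream, truncated to the length of the shorter input, since barrier only emits once both sides have contributed a message.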

Fatal conditions:

  • Input message is not a JSON array (malformed product)
  • Index out of bounds at runtime (should not occur if types are correct)

_format_json : !json → !string

JSON serialisation. Serialises each input JSON value to its compact representation and emits that text as a JSON-escaped string value on stdout (one per line, JSON Lines).

Forms an adjunction pair with _parse_json:

  • _format_json ; _parse_json = id — on the nose
  • _parse_json ; _format_json ≅ id — up to isomorphism (whitespace normalisation)
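Both round trips can be checked with Python's `json` module. The helper names `format_json` and `parse_json` are illustrative stand-ins for the two morphisms, and the compact form is assumed to mean no whitespace between tokens.

```python
import json


def format_json(value):
    """json -> string: the compact serialisation of a JSON value."""
    return json.dumps(value, separators=(",", ":"))


def parse_json(text):
    """string -> json: parse the text back into a JSON value."""
    return json.loads(text)


value = {"a": [1, 2]}
spaced = '{ "a": [1, 2] }'

# format ; parse recovers the original value exactly.
round_trip_value = parse_json(format_json(value))

# parse ; format recovers the text only up to whitespace normalisation.
round_trip_text = format_json(parse_json(spaced))

# On the wire the formatted text is itself a JSON string, so it is escaped.
wire_line = json.dumps(format_json(value))
```

Here `round_trip_value` equals `value`, while `round_trip_text` is `{"a":[1,2]}` and differs from `spaced` only in whitespace.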

Executes inline (no subprocess). Ports: input, output. Total. Lowerable.

The standard library provides clean names via use json:

use json
solo@telemetry ; Json.format ; output

_parse_json : !string → !json

JSON deserialisation. Parses each input JSON string value and writes the parsed JSON value to stdout. Inverse of _format_json.

Executes inline (no subprocess). Ports: input, output. Total. Lowerable.

Available as Json.parse via use json.

log(label) (compiler-internal)

Debug tap: !A → !Unit. Reads JSON values, emits a debug log line to stderr (when PIPELINE_DEBUG=1), and drops the value. No output data is produced.

Not available in surface syntax. Created by the desugarer when PIPELINE_DEBUG=1 to instrument session protocol steps. Paired with copy to create non-intrusive taps:

send_ch → Copy → (send wrapper, Log("protocol:session:send:Pause"))

No type validation — internal channels always carry valid JSON. Parse errors log the raw string.

Seeded channels

A seeded channel is a channel pre-loaded with one null JSON line ("null\n") by the executor before any thread starts. This is the Petri net concept of an initial marking: a place with one token.

Seeded channels are declared in pb_seeded on plumb_body. The executor writes the seed value using Unix.write on the raw fd before spawning forwarding threads. The compiler uses seeded channels for the token-ring construction that serialises terminating protocol rounds (see protocols.md).
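The ordering that matters here (seed first, threads second) can be shown with a Unix pipe in Python. This is a sketch of the idea, not the executor: `run_with_seed` is a hypothetical name, and `os.write` on the raw descriptor plays the role of `Unix.write`.

```python
import os
import threading


def run_with_seed():
    """Create a channel, seed it with one token, then start the reader."""
    read_fd, write_fd = os.pipe()

    # Initial marking: the token is in the pipe before any thread exists.
    os.write(write_fd, b"null\n")

    received = []

    def reader():
        with os.fdopen(read_fd, "r") as stream:
            for line in stream:
                received.append(line)

    thread = threading.Thread(target=reader)
    thread.start()

    os.close(write_fd)  # no further tokens in this sketch, so the reader sees EOF
    thread.join()
    return received
```

The reader finds the token already waiting when it starts, so no process has to produce the first message of the ring.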

merge_all (compiler-internal)

All-EOF merge (structural coproduct): !A ⊗ !A → !A. Both inputs forward to the output; EOF is sent when both inputs close. There is no drain protocol: drain markers are forwarded unchanged like any other message, so the merge stays transparent within a chain.

Not available in surface syntax. Emitted by the compiler for n-ary fan-in desugaring (build_homo, build_hetero) when the fan-in target is not in a feedback cycle. Using merge (with drain) in this position causes the drain state machine to inject orphan markers that never return, blocking shutdown.

The desugarer detects cycles in the wiring graph: if the fan-in target participates in a cycle, it emits merge (with drain). Otherwise it emits merge_all (structural coproduct, no drain).

Ports: in0, in1, output. Same as merge.

merge_any (compiler-internal)

Any-EOF merge: !A ⊗ !A → !A. Identical to merge except the output closes when the first input reaches EOF — the other reader gets a broken pipe and terminates cleanly.

Not available in surface syntax. Emitted by the <-> desugarer for the session envelope merge tree, where tagger EOF (agent lifecycle end) is the natural shutdown signal. Using merge in this position creates a circular EOF dependency: the merge waits for send wrappers, send wrappers wait for barriers, barriers wait for the merge output (see PLMB-033).

Ports: in0, in1, output. Same as merge.

Instance naming

Process names in the routing table are instance-numbered: each spawn gets a unique name formed from the binding name and a per-binding counter ({binding}{N}). For example, a pipeline with two id spawns produces processes id0 and id1. A single copy spawn produces copy0.
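A per-binding counter is enough to produce names of this shape. The Python sketch below assumes counters start at zero and advance in spawn order; the function name `instance_names` is hypothetical.

```python
from collections import defaultdict


def instance_names(spawned_bindings):
    """Map a sequence of spawned binding names to unique instance names."""
    counters = defaultdict(int)
    names = []
    for binding in spawned_bindings:
        names.append(f"{binding}{counters[binding]}")
        counters[binding] += 1
    return names
```

For spawns of `id`, `copy`, `id` in that order, the result is `id0`, `copy0`, `id1`.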

This is a compiler detail — the surface language uses binding names without suffixes. Instance numbering guarantees unique process names in the routing table, which is required for unique socket endpoints in the fabric and unambiguous supervisor lookup.

Inline execution

The structural morphisms (id, copy, merge, discard, empty, map, filter, barrier) execute inline as forwarding threads within the plumb process. They do not spawn subprocesses, synthesise spec files, or require binary resolution. The compiler-internal morphisms (tagger, log, first, merge_all, merge_any) also execute inline. Only agents and nested plumbs spawn subprocesses — they need isolation for API credentials, file permissions, and failure domains.

Multiplexed I/O

Agent subprocesses use exactly three file descriptors: stdin, stdout, and stderr. All logical channels (data, control, telemetry, tool dispatch) are multiplexed onto stdin/stdout using tagged JSON envelopes:

{"__port": "input",     "msg": <json>}     ← runtime → agent (stdin)
{"__port": "ctrl_in",   "msg": <json>}     ← runtime → agent (stdin)
{"__port": "tool_resp",  "msg": <json>}     ← runtime → agent (stdin)
{"__port": "output",    "msg": <json>}     ← agent → runtime (stdout)
{"__port": "ctrl_out",  "msg": <json>}     ← agent → runtime (stdout)
{"__port": "telemetry", "msg": <json>}     ← agent → runtime (stdout)
{"__port": "tool_req",  "msg": <json>}     ← agent → runtime (stdout)

Port-specific EOF is signalled with {"__port": "input", "__eof": true}. This closes the logical data channel while keeping control alive (required for the drain loop — see executor.md).
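Demultiplexing such a stream amounts to grouping by `__port` and tracking which ports have seen an EOF marker. The Python sketch below is illustrative: `demux` is a hypothetical name, the message payloads in the usage note are made up, and the handling of malformed envelopes is not specified here.

```python
import json
from collections import defaultdict


def demux(lines):
    """Split tagged envelopes into per-port message lists.

    Returns (messages_by_port, closed_ports).
    """
    by_port = defaultdict(list)
    closed = set()
    for line in lines:
        envelope = json.loads(line)
        port = envelope["__port"]
        if envelope.get("__eof") is True:
            closed.add(port)  # closes this logical channel only
        else:
            by_port[port].append(envelope["msg"])
    return dict(by_port), closed
```

Feeding it an input message, an input EOF, and then a ctrl_in message yields a closed `input` port while `ctrl_in` still receives its message, which is the property the drain loop relies on.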

stderr carries only debug logs and errors — it is not part of the algebra.

The __port envelope is transport-layer only and was used by the legacy stdio bridge (removed in PLMB-061). Agents now speak ZMQ directly (zmq_agent.ml); the agent binary retains stdio fallback support for standalone use. Internal channels within a pipeline never carry __port tags.

Port names

The port count depends on the input/output decomposition:

| Input | Output | Ports |
|---|---|---|
| !A | !B | [input, output] |
| !A | (!B, !T) | [input, output, telemetry] |
| (!A, !C) | !B | [input, ctrl_in, output, ctrl_out] |
| (!A, !C) | (!B, !T) | [input, ctrl_in, output, ctrl_out, telemetry] |
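The four rows follow one rule: a control input adds ctrl_in and ctrl_out, and a telemetry output adds telemetry. A Python sketch of that rule, with the hypothetical function `port_names` taking two flags in place of the actual type decomposition:

```python
def port_names(has_control, has_telemetry):
    """Return the ordered port list for an agent's I/O decomposition."""
    ports = ["input"]
    if has_control:
        ports.append("ctrl_in")
    ports.append("output")
    if has_control:
        ports.append("ctrl_out")
    if has_telemetry:
        ports.append("telemetry")
    return ports
```

Each combination of the two flags reproduces one row of the port table.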

The ctrl_out port carries the agent's control responses (memory dumps, acknowledgements) as a composable stream. It can be wired into downstream processes using port addressing:

worker@ctrl_out ; filter(kind = "memory") ; memory_sink

When ctrl_out is not explicitly wired, it is silently drained (messages discarded).