# Python Bindings

The `persevere.plumbing` Python package provides a typed interface to the
plumbing runtime. It calls the compiled OCaml binaries (`plumb`,
`plumb-check`) as subprocesses and communicates via JSON over
stdin/stdout.
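The JSON-over-stdio pattern can be sketched with a stand-in child process. The real wire format used by `plumb` is an assumption here; this only illustrates the request/response shape, with a tiny Python script playing the role of the binary:

```python
# Minimal sketch of JSON-lines over stdin/stdout between a parent and a
# child process. The child below is a stand-in for the real `plumb`
# binary; the actual envelope format is an assumption.
import json
import subprocess
import sys

# Stand-in child: reads one JSON value per line, echoes it back wrapped
# in an {"output": ...} envelope.
CHILD = r"""
import json, sys
for line in sys.stdin:
    value = json.loads(line)
    print(json.dumps({"output": value}), flush=True)
"""

def call_once(value):
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
    )
    # communicate() writes the input, closes stdin, and collects stdout.
    out, _ = proc.communicate(json.dumps(value) + "\n")
    return json.loads(out)

print(call_once("hello"))  # {'output': 'hello'}
```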
## Installation

Pre-built wheels are available for macOS arm64 and Linux x86_64. They
are not yet published to PyPI — install from the local wheel file
produced by `make python-dev` or `make linux-release`:

```shell
pip install dist/python/persevere_plumbing-*.whl              # core API
pip install "dist/python/persevere_plumbing-*.whl[pydantic]"  # adds Pydantic AI integration
```

The package bundles the `plumb` and `plumb-check` binaries. If they
are not bundled (e.g. during development), the library searches the
dune build output and then `PATH`.
## Type checking — `check()`

Validate a `.plumb` program and inspect its types, bindings, and
protocols.

```python
import persevere.plumbing as pb
from pathlib import Path

result = pb.check(Path("pipeline.plumb"))
print(result.version)    # language version
print(result.bindings)   # list of Binding objects
print(result.types)      # list of TypeDecl objects
print(result.protocols)  # list of ProtocolDecl objects
```
Each `Binding` has:

- `name` — process name (e.g. `"main"`)
- `input`, `output` — type strings
- `input_type`, `output_type` — type AST dicts (for `map_type()`)
- `impl` — implementation kind (`"agent"`, `"pipeline"`, etc.)
- `attrs` — configuration dict

Raises `CheckError` with structured fields (`code`, `category`,
`detail`, `line`, `column`) on validation failure.
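The shape of a check result can be sketched by parsing a `plumb-check`-style JSON payload into typed objects. The field names mirror the attributes listed above, but the exact JSON schema emitted by `plumb-check` is an assumption:

```python
# Illustrative sketch: turn a check-result payload into Binding objects.
# Field names follow the documented attributes; the JSON schema itself
# is an assumption, not the library's actual output format.
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Binding:
    name: str
    input: str
    output: str
    impl: str
    attrs: dict[str, Any] = field(default_factory=dict)

def parse_bindings(payload: dict[str, Any]) -> list[Binding]:
    # Missing "bindings" key is treated as an empty program.
    return [Binding(**b) for b in payload.get("bindings", [])]

payload = {
    "version": "0.1",
    "bindings": [{"name": "main", "input": "!string", "output": "!string",
                  "impl": "pipeline", "attrs": {}}],
}
bindings = parse_bindings(payload)
print(bindings[0].name, bindings[0].impl)  # main pipeline
```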
## One-shot execution — `call_sync()` / `call()`

Send a single input and collect all outputs.

```python
results = pb.call_sync(Path("pipeline.plumb"), "hello")
# results is a list of output values
```

`call_sync()` is a synchronous wrapper around the async `call()`.
Use `call()` directly in async code:

```python
results = await pb.call(Path("pipeline.plumb"), "hello")
```
Parameters:

- `spec` — path to the `.plumb` file
- `input` — any JSON-serialisable value
- `auto_approve` — bypass tool approval prompts (default `False`)
- `search_path` — additional module search paths
- `binary` — override binary path
- `env` — additional environment variables for the subprocess

Raises `PipelineError` if the pipeline exits with a non-zero code.
The error includes stderr diagnostics.
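The sync/async pairing follows a standard pattern: the synchronous entry point is a thin `asyncio.run` wrapper over the coroutine. A sketch of that pattern, with `_collect` standing in for the real `call()` (names here are illustrative, not the library's internals):

```python
# Sketch of the sync-wrapper-around-async pattern used by call_sync().
# `_collect` is a stand-in for the real async call(); it pretends each
# value is an output arriving asynchronously.
import asyncio
from typing import Any

async def _collect(values: list[Any]) -> list[Any]:
    results = []
    for v in values:
        await asyncio.sleep(0)  # yield to the event loop
        results.append(v)
    return results

def collect_sync(values: list[Any]) -> list[Any]:
    # Only valid when no event loop is already running, which is why
    # async code should await the coroutine directly instead.
    return asyncio.run(_collect(values))

print(collect_sync(["hello", "world"]))  # ['hello', 'world']
```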
## Streaming execution — `run()` / `Pipeline`

Launch a long-lived pipeline for multi-turn conversations or streaming output.

```python
import asyncio
import persevere.plumbing as pb
from pathlib import Path

async def main():
    async with await pb.run(Path("doctor.plumb")) as pipeline:
        await pipeline.send("I feel anxious")
        response = await pipeline.recv()
        print(response)

asyncio.run(main())
```
### Pipeline methods

| Method | Returns | Description |
|---|---|---|
| `send(message)` | `None` | Send JSON to the pipeline's stdin |
| `recv()` | `Any \| None` | Next output value (skips telemetry) |
| `recv_event()` | `Event \| None` | Next output or telemetry event |
| `recv_telemetry()` | `Telemetry \| None` | Next telemetry (skips output) |
| `close_input()` | `None` | Close stdin (idempotent) |
| `shutdown(timeout)` | `int` | Graceful shutdown, returns exit code |

The pipeline is also an async context manager (`async with`) and an
async iterator (`async for event in pipeline`).
### Event types

- `Output(data)` — primary output from the pipeline
- `Telemetry(agent, payload)` — agent telemetry (thinking, usage, tool calls, etc.)
- `Event = Output | Telemetry`
### Parameters

- `spec` — path to the `.plumb` file
- `auto_approve` — bypass tool approval prompts (default `False`)
- `verbose` — enable telemetry streaming (default `True`)
- `search_path` — additional module search paths
- `binary` — override binary path
- `env` — additional environment variables
## Pydantic AI integration — `PlumbingModel`

`PlumbingModel` is an interop adapter that wraps a plumbing pipeline
as a Pydantic AI `Model`. It is not the primary interface — use
`call_sync()` or `run()` for direct pipeline access.

```python
import asyncio
from pathlib import Path
from pydantic_ai import Agent
from persevere.plumbing.provider import PlumbingModel

async def main():
    async with PlumbingModel(Path("pipeline.plumb")) as model:
        agent = Agent(model)
        result = await agent.run("hello")
        print(result.output)

asyncio.run(main())
```
### Constructor parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `program` | `str \| Path` | required | Path to `.plumb` file |
| `pass_history` | `bool` | `False` | Send full conversation or just the last message |
| `auto_approve` | `bool` | `False` | Bypass tool approval |
| `search_path` | `list[Path] \| None` | `None` | Module search paths |
| `binary` | `str \| None` | `None` | Override binary path |
| `env` | `dict[str, str] \| None` | `None` | Extra environment variables |

### Constraints

- **`result_type=str` only.** Structured output types inject output tools that `PlumbingModel` cannot handle. Use the bare API with type mapping (see below) for typed output.
- **No system prompts.** Do not set `system_prompt` or `instructions` on the `Agent` — they are silently discarded with `pass_history=False`, or contaminate the pipeline's prompt structure with `pass_history=True`.
- **Stateful.** The same pipeline process persists across turns. The pipeline maintains its own conversation history internally.
- **Lifecycle.** Always use `async with` to ensure the pipeline process is shut down cleanly.
### Multi-turn usage

With `pass_history=False` (the default), only the last user message
is sent to the pipeline on each turn. The pipeline maintains its own
state — prior messages are already in the agent's context.

```python
async with PlumbingModel(Path("doctor.plumb")) as model:
    agent = Agent(model)
    r1 = await agent.run("I feel anxious")
    r2 = await agent.run("Tell me more", message_history=r1.all_messages())
```

The `message_history` is passed for Pydantic AI's bookkeeping, not
because the pipeline uses it.
## Type mapping

The `persevere.plumbing._types` module maps plumbing type ASTs to Python types.
This is used internally by `PlumbingModel` and can be used directly
for typed validation.

| Plumbing type | Python type |
|---|---|
| `string` | `str` |
| `int` | `int` |
| `number` | `float` |
| `bool` | `bool` |
| `json` | `Any` |
| `unit` | `NoneType` |
| `[T]` | `list[T]` |
| `{ f: T, ... }` | Pydantic `BaseModel` |
| `A \| B` | `Union[A, B]` |
| `(A, B)` | `tuple[A, B]` |
| `!T` | `T` (stream wrapper removed) |

```python
from persevere.plumbing._types import input_type, output_type

result = pb.check(Path("pipeline.plumb"))
in_type = input_type(result)    # Python type for main binding input
out_type = output_type(result)  # Python type for main binding output
```
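The mapping rules in the table can be sketched as a small recursive function over a hypothetical dict-based type AST (`{"kind": ..., ...}`). The real AST shape used by `persevere.plumbing._types` is an assumption here; only the mapping targets come from the table:

```python
# Miniature version of the type-mapping table over an assumed dict AST.
# Covers primitives, lists, tuples, and stream-unwrapping; records and
# sums are omitted for brevity.
from typing import Any

PRIMITIVES = {"string": str, "int": int, "number": float,
              "bool": bool, "json": Any, "unit": type(None)}

def map_type(ast: dict) -> Any:
    kind = ast["kind"]
    if kind in PRIMITIVES:
        return PRIMITIVES[kind]
    if kind == "list":
        return list[map_type(ast["elem"])]          # [T] -> list[T]
    if kind == "tuple":
        return tuple[tuple(map_type(e) for e in ast["elems"])]  # (A, B)
    if kind == "stream":
        return map_type(ast["elem"])                # !T -> T (unwrapped)
    raise ValueError(f"unmapped type kind: {kind}")

print(map_type({"kind": "stream",
                "elem": {"kind": "list", "elem": {"kind": "string"}}}))
# list[str]
```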
## Error hierarchy

```
PlumbError (base)
├── CheckError — type-check failure (code, category, detail, line, column)
├── RunError — process launch failure (detail, returncode)
├── PipelineError — non-zero exit during execution (detail, returncode, stderr)
├── BinaryNotFoundError — plumb/plumb-check binary not found
└── PlumbingModelError — Pydantic AI integration error (detail, cause)
```
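A practical consequence of the shared base class is that `except PlumbError` catches every binding error. A sketch of a subset of the hierarchy, where the constructor signatures are assumptions and only the class names and field names come from the tree above:

```python
# Subset of the error hierarchy as plain Python exception classes.
# Constructor signatures are assumed; class/field names follow the tree.
class PlumbError(Exception):
    """Base class for all plumbing binding errors."""

class CheckError(PlumbError):
    def __init__(self, code, category, detail, line=None, column=None):
        super().__init__(detail)
        self.code, self.category, self.detail = code, category, detail
        self.line, self.column = line, column

class PipelineError(PlumbError):
    def __init__(self, detail, returncode, stderr=""):
        super().__init__(detail)
        self.detail, self.returncode, self.stderr = detail, returncode, stderr

try:
    raise CheckError("E042", "type", "port type mismatch", line=3, column=7)
except PlumbError as err:  # base class catches any subclass
    print(type(err).__name__, err.line)  # CheckError 3
```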
## Program construction — DSL

The `persevere.plumbing.dsl` subpackage provides Python combinators that
construct plumbing programs as an AST and render them as valid
`.plumb` source text. You write Python; the OCaml toolchain
type-checks and executes the result.

```python
from pydantic import BaseModel
from persevere.plumbing.dsl import Program, Agent, Filter, Map, F, PlumbBody, SumType, Record

class Verdict(BaseModel):
    verdict: bool
    commentary: str
    draft: str

prog = Program()
prog.let("composer", input=str, output=str,
         impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))
prog.let("checker", input=str, output=Verdict,
         impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))

body = PlumbBody("input", "output")
body.wire("input", "composer", "checker")
body.wire("checker", Filter(F.verdict == True).project("draft"), "output")
prog.let("main", input=str, output=str, impl=body)

source = prog.render()  # .plumb source text
result = prog.check()   # CheckResult (validates via plumb-check)
```
### Terminology mapping

| DSL class | `.plumb` syntax | Reference |
|---|---|---|
| `Agent(model=, ...)` | `agent { ... }` | processes.md § Agent |
| `Tool(process=)` | `tool { process: name }` | processes.md § Tool |
| `Filter(F.x >= 85)` | `filter(x >= 85)` | processes.md § Filter |
| `Map(F.x, F.y)` | `map({x: x, y: y})` | processes.md § Map |
| `Id()` / `ID` | `id` | processes.md § Identity |
| `Copy()` / `COPY` | `copy` | processes.md § Copy |
| `Merge()` / `MERGE` | `merge` | processes.md § Merge |
| `Discard()` / `DISCARD` | `discard` | processes.md § Discard |
| `Barrier()` / `BARRIER` | `barrier` | processes.md § Barrier |
| `Pipe(a, b)` / `a >> b` | `a ; b` | Sequential composition |
| `Tensor(a, b)` / `a * b` | `(a * b)` | Parallel composition |
| `PlumbBody("in", "out")` | `plumb(in, out) { ... }` | Wiring body |
| `PortRef("p", "ctrl")` | `p@ctrl` | Port addressing |
| `Record([("f", str)])` | `{ f: string }` | Record type |
| `SumType([A, B])` | `A \| B` | Sum type |
| `ProductType([A, B])` | `(A, B)` | Product type |
| `StreamType(T)` | `!T` | Stream type |
| `ListType(T)` | `[T]` | List type |
### Native Python type shortcuts

Python types are accepted wherever a `PlumbType` is expected:

| Python | `.plumb` |
|---|---|
| `str` | `string` |
| `int` | `int` |
| `float` | `number` |
| `bool` | `bool` |
| `dict` | `json` |
| `None` | `unit` |
| `list[T]` | `[T]` |
| `BaseModel` subclass | `TypeName` (auto-registered) |
### Auto stream wrapping

Port types on `let()` are automatically wrapped in `StreamType` —
almost every port is a stream. Use `Bare(some_type)` to suppress:

```python
prog.let("tool_binding", input=Bare(str), output=Bare(int), impl=...)
# renders: let tool_binding : string -> int = ...
```
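The wrapping rule itself is simple. A sketch with illustrative stand-ins for the DSL's `StreamType` and `Bare` (these classes are not the library's actual implementation):

```python
# Sketch of the auto-wrapping rule: every port type becomes a stream
# unless marked Bare. StreamType/Bare here are illustrative stand-ins.
from dataclasses import dataclass
from typing import Any

@dataclass
class StreamType:
    elem: Any

@dataclass
class Bare:
    elem: Any

PRIMS = {str: "string", int: "int", float: "number", bool: "bool"}

def render(t) -> str:
    # Render a (possibly stream-wrapped) primitive as .plumb syntax.
    if isinstance(t, StreamType):
        return "!" + render(t.elem)
    return PRIMS[t]

def wrap(t):
    # Bare suppresses wrapping; everything else becomes a stream.
    return t.elem if isinstance(t, Bare) else StreamType(t)

print(render(wrap(str)))        # !string
print(render(wrap(Bare(int))))  # int
```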
### F proxy

The `F` object creates field references for filter/map expressions:

```python
Filter(F.score >= 85)            # filter(score >= 85)
Filter(F.verdict == False)       # filter(verdict = false)
Filter((F.a > 0) & (F.b < 10))   # filter(a > 0 && b < 10)
Map(F.score, F.review)           # map({score: score, review: review})
Map(quality=F.score)             # map({quality: score})
```
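A proxy like `F` can be built with `__getattr__` plus operator overloading. The rendering details below are assumptions based on the filter output shown above, not the DSL's internals:

```python
# Minimal sketch of an F-style proxy: attribute access yields a Field,
# comparisons yield renderable Expr nodes, & combines them with &&.
class Expr:
    def __init__(self, text: str):
        self.text = text
    def __and__(self, other: "Expr") -> "Expr":
        return Expr(f"{self.text} && {other.text}")

class Field:
    def __init__(self, name: str):
        self.name = name
    def _cmp(self, op: str, value) -> Expr:
        if isinstance(value, bool):
            value = "true" if value else "false"  # plumb-style booleans
        return Expr(f"{self.name} {op} {value}")
    def __eq__(self, value):  # returns an Expr, not a bool
        return self._cmp("=", value)
    def __ge__(self, value):
        return self._cmp(">=", value)
    def __gt__(self, value):
        return self._cmp(">", value)
    def __lt__(self, value):
        return self._cmp("<", value)

class _F:
    def __getattr__(self, name: str) -> Field:
        return Field(name)

F = _F()

print((F.score >= 85).text)           # score >= 85
print((F.verdict == False).text)      # verdict = false
print(((F.a > 0) & (F.b < 10)).text)  # a > 0 && b < 10
```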
### Running constructed programs

```python
# Type-check
result = prog.check()

# One-shot synchronous call
results = prog.call_sync("hello")

# Async streaming
async with await prog.run() as pipeline:
    await pipeline.send("hello")
    response = await pipeline.recv()
```
### Debugging check() errors

When `check()` raises `CheckError`, the error references line/column
in the generated `.plumb` source. To debug:

```python
source = prog.render()
print(source)  # inspect the generated source
prog.check()   # error will reference lines in the above
```
### Escape hatches

Use `Raw("verbatim text")` for syntax the DSL does not model
(protocols, session types, complex builtins). `PlumbBody.raw()`
adds raw statements to a wiring body.

## Examples

See `examples/README.md` for runnable scripts demonstrating each API pattern.