Python Bindings

The persevere.plumbing Python package provides a typed interface to the plumbing runtime. It calls the compiled OCaml binaries (plumb, plumb-check) as subprocesses and communicates via JSON over stdin/stdout.

Installation

Pre-built wheels are available for macOS arm64 and Linux x86_64. They are not yet published to PyPI — install from the local wheel file produced by make python-dev or make linux-release:

pip install dist/python/persevere_plumbing-*.whl           # core API
pip install "dist/python/plumbing-*.whl[pydantic]"  # adds Pydantic AI integration

The package bundles the plumb and plumb-check binaries. If they are not bundled (e.g. during development), the library searches the dune build output and then PATH.

Type checking — check()

Validate a .plumb program and inspect its types, bindings, and protocols.

import persevere.plumbing as pb
from pathlib import Path

result = pb.check(Path("pipeline.plumb"))
print(result.version)      # language version
print(result.bindings)     # list of Binding objects
print(result.types)        # list of TypeDecl objects
print(result.protocols)    # list of ProtocolDecl objects

Each Binding has: - name — process name (e.g. "main") - input, output — type strings - input_type, output_type — type AST dicts (for map_type()) - impl — implementation kind ("agent", "pipeline", etc.) - attrs — configuration dict

Raises CheckError with structured fields (code, category, detail, line, column) on validation failure.

One-shot execution — call_sync() / call()

Send a single input and collect all outputs.

results = pb.call_sync(Path("pipeline.plumb"), "hello")
# results is a list of output values

call_sync() is a synchronous wrapper around the async call(). Use call() directly in async code:

results = await pb.call(Path("pipeline.plumb"), "hello")

Parameters: - spec — path to the .plumb file - input — any JSON-serialisable value - auto_approve — bypass tool approval prompts (default False) - search_path — additional module search paths - binary — override binary path - env — additional environment variables for the subprocess

Raises PipelineError if the pipeline exits with a non-zero code. The error includes stderr diagnostics.

Streaming execution — run() / Pipeline

Launch a long-lived pipeline for multi-turn conversations or streaming output.

import asyncio
import persevere.plumbing as pb
from pathlib import Path

async def main():
    async with await pb.run(Path("doctor.plumb")) as pipeline:
        await pipeline.send("I feel anxious")
        response = await pipeline.recv()
        print(response)

asyncio.run(main())

Pipeline methods

Method Returns Description
send(message) None Send JSON to the pipeline's stdin
recv() Any \| None Next output value (skips telemetry)
recv_event() Event \| None Next output or telemetry event
recv_telemetry() Telemetry \| None Next telemetry (skips output)
close_input() None Close stdin (idempotent)
shutdown(timeout) int Graceful shutdown, returns exit code

The pipeline is also an async context manager (async with) and an async iterator (async for event in pipeline).

Event types

  • Output(data) — primary output from the pipeline
  • Telemetry(agent, payload) — agent telemetry (thinking, usage, tool calls, etc.)
  • Event = Output | Telemetry

Parameters

  • spec — path to the .plumb file
  • auto_approve — bypass tool approval prompts (default False)
  • verbose — enable telemetry streaming (default True)
  • search_path — additional module search paths
  • binary — override binary path
  • env — additional environment variables

Pydantic AI integration — PlumbingModel

PlumbingModel is an interop adapter that wraps a plumbing pipeline as a Pydantic AI Model. It is not the primary interface — use call_sync() or run() for direct pipeline access.

from pathlib import Path
from pydantic_ai import Agent
from persevere.plumbing.provider import PlumbingModel

async def main():
    async with PlumbingModel(Path("pipeline.plumb")) as model:
        agent = Agent(model)
        result = await agent.run("hello")
        print(result.output)

Constructor parameters

Parameter Type Default Description
program str \| Path required Path to .plumb file
pass_history bool False Send full conversation or just last message
auto_approve bool False Bypass tool approval
search_path list[Path] \| None None Module search paths
binary str \| None None Override binary path
env dict[str, str] \| None None Extra environment variables

Constraints

  • result_type=str only. Structured output types inject output tools that PlumbingModel cannot handle. Use the bare API with type mapping (see below) for typed output.
  • No system prompts. Do not set system_prompt or instructions on the Agent — they are silently discarded with pass_history=False, or contaminate the pipeline's prompt structure with pass_history=True.
  • Stateful. The same pipeline process persists across turns. The pipeline maintains its own conversation history internally.
  • Lifecycle. Always use async with to ensure the pipeline process is shut down cleanly.

Multi-turn usage

With pass_history=False (the default), only the last user message is sent to the pipeline on each turn. The pipeline maintains its own state — prior messages are already in the agent's context.

async with PlumbingModel(Path("doctor.plumb")) as model:
    agent = Agent(model)
    r1 = await agent.run("I feel anxious")
    r2 = await agent.run("Tell me more", message_history=r1.all_messages())

The message_history is passed for Pydantic AI's bookkeeping, not because the pipeline uses it.

Type mapping

The persevere.plumbing._types module maps plumbing type ASTs to Python types. This is used internally by PlumbingModel and can be used directly for typed validation.

Plumbing type Python type
string str
int int
number float
bool bool
json Any
unit NoneType
[T] list[T]
{ f: T, ... } Pydantic BaseModel
A \| B Union[A, B]
(A, B) tuple[A, B]
!T T (stream wrapper removed)
from persevere.plumbing._types import input_type, output_type

result = pb.check(Path("pipeline.plumb"))
in_type = input_type(result)   # Python type for main binding input
out_type = output_type(result) # Python type for main binding output

Error hierarchy

PlumbError (base)
├── CheckError          — type-check failure (code, category, detail, line, column)
├── RunError            — process launch failure (detail, returncode)
├── PipelineError       — non-zero exit during execution (detail, returncode, stderr)
├── BinaryNotFoundError — plumb/plumb-check binary not found
└── PlumbingModelError  — Pydantic AI integration error (detail, cause)

Program construction — DSL

The persevere.plumbing.dsl subpackage provides Python combinators that construct plumbing programs as an AST and render them as valid .plumb source text. You write Python; the OCaml toolchain type-checks and executes the result.

from pydantic import BaseModel
from persevere.plumbing.dsl import Program, Agent, Filter, Map, F, PlumbBody, SumType, Record

class Verdict(BaseModel):
    verdict: bool
    commentary: str
    draft: str

prog = Program()
prog.let("composer", input=str, output=str,
    impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))
prog.let("checker", input=str, output=Verdict,
    impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))

body = PlumbBody("input", "output")
body.wire("input", "composer", "checker")
body.wire("checker", Filter(F.verdict == True).project("draft"), "output")
prog.let("main", input=str, output=str, impl=body)

source = prog.render()   # .plumb source text
result = prog.check()    # CheckResult (validates via plumb-check)

Terminology mapping

DSL class .plumb syntax Reference
Agent(model=, ...) agent { ... } processes.md § Agent
Tool(process=) tool { process: name } processes.md § Tool
Filter(F.x >= 85) filter(x >= 85) processes.md § Filter
Map(F.x, F.y) map({x: x, y: y}) processes.md § Map
Id() / ID id processes.md § Identity
Copy() / COPY copy processes.md § Copy
Merge() / MERGE merge processes.md § Merge
Discard() / DISCARD discard processes.md § Discard
Barrier() / BARRIER barrier processes.md § Barrier
Pipe(a, b) / a >> b a ; b Sequential composition
Tensor(a, b) / a * b (a * b) Parallel composition
PlumbBody("in", "out") plumb(in, out) { ... } Wiring body
PortRef("p", "ctrl") p@ctrl Port addressing
Record([("f", str)]) { f: string } Record type
SumType([A, B]) A \| B Sum type
ProductType([A, B]) (A, B) Product type
StreamType(T) !T Stream type
ListType(T) [T] List type

Native Python type shortcuts

Python types are accepted wherever a PlumbType is expected:

Python .plumb
str string
int int
float number
bool bool
dict json
None unit
list[T] [T]
BaseModel subclass TypeName (auto-registered)

Auto stream wrapping

Port types on let() are automatically wrapped in StreamType — almost every port is a stream. Use Bare(some_type) to suppress:

prog.let("tool_binding", input=Bare(str), output=Bare(int), impl=...)
# renders: let tool_binding : string -> int = ...

F proxy

The F object creates field references for filter/map expressions:

Filter(F.score >= 85)              # filter(score >= 85)
Filter(F.verdict == False)         # filter(verdict = false)
Filter((F.a > 0) & (F.b < 10))    # filter(a > 0 && b < 10)
Map(F.score, F.review)             # map({score: score, review: review})
Map(quality=F.score)               # map({quality: score})

Running constructed programs

# Type-check
result = prog.check()

# One-shot synchronous call
results = prog.call_sync("hello")

# Async streaming
async with await prog.run() as pipeline:
    await pipeline.send("hello")
    response = await pipeline.recv()

Debugging check() errors

When check() raises CheckError, the error references line/column in the generated .plumb source. To debug:

source = prog.render()
print(source)          # inspect the generated source
prog.check()           # error will reference lines in the above

Escape hatches

Use Raw("verbatim text") for syntax the DSL does not model (protocols, session types, complex builtins). PlumbBody.raw() adds raw statements to a wiring body.

Examples

See examples/README.md for runnable scripts demonstrating each API pattern.