Python DSL Reference
Construct plumbing programs in Python. The DSL renders valid .plumb
source text; the OCaml toolchain type-checks and executes it.
from persevere.plumbing.dsl import Program, Agent, Filter, Map, F, PlumbBody
from persevere.plumbing.dsl import Record, SumType, ProductType, ListType, NamedType, Bare
from persevere.plumbing.dsl import pipe, tensor, PortRef, Raw
Types
Python types map to plumbing types automatically:
| Python | Plumbing | DSL class |
|---|---|---|
str |
string |
Primitive("string") |
int |
int |
Primitive("int") |
float |
number |
Primitive("number") |
bool |
bool |
Primitive("bool") |
dict |
json |
Primitive("json") |
None |
unit |
Primitive("unit") |
list[T] |
[T] |
ListType(T) |
Pydantic BaseModel |
TypeName |
NamedType("TypeName") |
Compound types:
| DSL | Renders as |
|---|---|
Record([("name", str), ("age?", int, True)]) |
{ name: string, age?: int } |
SumType([str, Record([("x", int)])]) |
string \| { x: int } |
ProductType([str, int]) |
(string, int) |
StreamType(T) |
!T |
NamedType("Foo") |
Foo |
Pydantic BaseModel subclasses used in type positions are
auto-registered as type declarations. No explicit prog.type() needed.
Processes
| DSL | Renders as |
|---|---|
Agent(model="claude-haiku-4-5", prompts=[...]) |
agent { model: "claude-haiku-4-5", prompts: [...] } |
Tool(process="helper") |
tool { process: "helper" } |
Filter(F.score >= 85) |
filter(score >= 85) |
Filter(F.verdict == False) |
filter(verdict = false) |
Filter(expr).project("field") |
filter(expr).field |
Map(F.score, F.review) |
map({score: score, review: review}) |
Map(quality=F.score) |
map({quality: score}) |
Id() / ID |
id |
Copy() / COPY |
copy |
Merge() / MERGE |
merge |
Discard() / DISCARD |
discard |
Barrier() / BARRIER |
barrier |
Project(n) |
project(n) |
Raw("text") |
text (verbatim) |
Agent accepts: model (required), provider (default "anthropic"),
prompt, prompts, tools, mcp, plus any extra keys as **kwargs.
Composition
| DSL | Renders as | Meaning |
|---|---|---|
Pipe(a, b, c) / a >> b >> c |
a ; b ; c |
Sequential |
Tensor(a, b) / a * b |
(a * b) |
Parallel |
PortRef("p", "ctrl_in") |
p@ctrl_in |
Port address |
PlumbBody
Wiring topology for plumb(ports) { ... } definitions:
body = PlumbBody("input", "output")
body.channel("ch", str) # let ch : !string = channel
body.spawn("proc", "in", "out") # spawn proc(in, out)
body.wire("input", "composer", "checker") # input ; composer ; checker
body.wire("judge", "cool", # judge ; cool ; (a@ctrl * b@ctrl)
tensor(PortRef("a", "ctrl"),
PortRef("b", "ctrl")))
Wire elements: strings (names), Process/Expr instances (rendered), PortRef (port addressing). The compiler validates wiring — no Python-side port checking.
Program
prog = Program()
prog.module("Name") # module Name
prog.use("fs") # use fs
prog.type("Foo", Record([...])) # type Foo = { ... }
prog.let("name", # let name : !In -> !Out = impl
input=str, output=str, # auto stream-wrapped
impl=Agent(...), # or PlumbBody, or any Process
budget=100000, oitl="confirm") # annotations as **kwargs
source = prog.render() # .plumb source text
result = prog.check() # CheckResult via plumb-check
pipeline = await prog.run() # async Pipeline context manager
results = prog.call_sync(data) # synchronous one-shot call
Port types are auto-wrapped in StreamType. Use Bare(T) to suppress.
ProductType factors are wrapped individually: ProductType([str, int])
becomes (!string, !int).
Example: Composer-Checker-Critic
from pydantic import BaseModel
from persevere.plumbing.dsl import *
class Verdict(BaseModel):
verdict: bool
commentary: str
draft: str
class Review(BaseModel):
score: int
review: str
draft: str
prog = Program()
prog.let("composer",
input=SumType([str,
Record([("verdict", bool), ("commentary", str)]),
Record([("score", int), ("review", str)])]),
output=str,
impl=Agent(model="claude-haiku-4-5", max_messages=4,
thinking_budget=1024, prompts=["system.md"]))
prog.let("checker", input=str, output=Verdict,
impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))
prog.let("critic", input=Verdict, output=Review,
impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))
body = PlumbBody("input", "output")
body.wire("input", "composer", "checker")
body.wire("checker", Filter(F.verdict == False),
Map(F.verdict, F.commentary), "composer")
body.wire("checker", Filter(F.verdict == True), "critic")
body.wire("critic", Filter(F.score < 85),
Map(F.score, F.review), "composer")
body.wire("critic", Filter(F.score >= 85).project("draft"), "output")
prog.let("main", input=str, output=str, impl=body)
Example: Heated Debate (tensor + control ports)
prog = Program()
prog.type("Verdict", Record([("resolved", bool), ("verdict", str),
("topic", str), ("heat", float)]))
prog.type("Control", Record([("set_temp", float)]))
prog.let("advocate", input=ProductType([str, NamedType("Control")]),
output=str, impl=Agent(model="claude-haiku-4-5", max_messages=8))
prog.let("skeptic", input=ProductType([str, NamedType("Control")]),
output=str, impl=Agent(model="claude-haiku-4-5", max_messages=8))
prog.let("judge", input=ProductType([str, str]),
output=NamedType("Verdict"),
impl=Agent(model="claude-haiku-4-5", max_messages=8))
prog.let("cool", input=NamedType("Verdict"),
output=NamedType("Control"), impl=Map({"set_temp": "heat"}))
body = PlumbBody("input", "output")
body.wire("input", tensor(Raw("advocate"), Raw("skeptic")),
"barrier", "judge")
body.wire("judge", Filter(F.resolved == False).project("topic"),
tensor(Raw("advocate"), Raw("skeptic")))
body.wire("judge", Filter(F.resolved == True).project("verdict"), "output")
body.wire("judge", "cool",
tensor(PortRef("advocate", "ctrl_in"),
PortRef("skeptic", "ctrl_in")))
prog.let("main", input=str, output=str, impl=body)