Python DSL Reference

Construct plumbing programs in Python. The DSL renders valid .plumb source text; the OCaml toolchain type-checks and executes it.

from persevere.plumbing.dsl import Program, Agent, Filter, Map, F, PlumbBody
from persevere.plumbing.dsl import Record, SumType, ProductType, ListType, NamedType, Bare
from persevere.plumbing.dsl import pipe, tensor, PortRef, Raw

Types

Python types map to plumbing types automatically:

Python              Plumbing  DSL class
str                 string    Primitive("string")
int                 int       Primitive("int")
float               number    Primitive("number")
bool                bool      Primitive("bool")
dict                json      Primitive("json")
None                unit      Primitive("unit")
list[T]             [T]       ListType(T)
Pydantic BaseModel  TypeName  NamedType("TypeName")
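
The mapping above can be pictured as a small lookup plus a recursive case for lists. The following is a minimal sketch under stated assumptions, not the DSL's actual implementation: `PRIMITIVES`, `plumbing_type_name`, and the stand-in `Verdict` class are hypothetical names, and the sketch does not import `persevere` at all.

```python
from typing import get_args, get_origin

# Hypothetical lookup table mirroring the mapping listed above.
PRIMITIVES = {
    str: "string",
    int: "int",
    float: "number",
    bool: "bool",
    dict: "json",
    type(None): "unit",
}


def plumbing_type_name(py_type):
    """Return plumbing type text for a Python type (illustrative only)."""
    if py_type is None:                  # None is usually written as a value
        py_type = type(None)
    if py_type in PRIMITIVES:
        return PRIMITIVES[py_type]
    if get_origin(py_type) is list:      # list[T] -> [T]
        (item,) = get_args(py_type)
        return f"[{plumbing_type_name(item)}]"
    if isinstance(py_type, type):        # any other class: refer to it by name
        return py_type.__name__
    raise TypeError(f"unsupported type: {py_type!r}")


class Verdict:
    """Stand-in for a Pydantic BaseModel subclass (assumption for this sketch)."""


print(plumbing_type_name(list[float]))
print(plumbing_type_name(Verdict))
```

Named classes are rendered by name only here; registering the corresponding type declaration is left to the real DSL.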

Compound types:

DSL                                            Renders as
Record([("name", str), ("age?", int, True)])   { name: string, age?: int }
SumType([str, Record([("x", int)])])           string | { x: int }
ProductType([str, int])                        (string, int)
StreamType(T)                                  !T
NamedType("Foo")                               Foo

Pydantic BaseModel subclasses used in type positions are auto-registered as type declarations, so no explicit prog.type() call is needed.
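
To make the "Renders as" column concrete, here is a rough sketch of string rendering for the compound forms. The helper names (`render_record`, `render_sum`, `render_product`, `render_stream`) are hypothetical and operate on already-rendered type text; the real classes may build their output differently.

```python
def render_record(fields):
    """fields: list of (name, type_text) pairs, e.g. [("name", "string")]."""
    inner = ", ".join(f"{name}: {type_text}" for name, type_text in fields)
    return "{ " + inner + " }"


def render_sum(variants):
    """Join alternative types with ' | '."""
    return " | ".join(variants)


def render_product(factors):
    """Render a tuple-like product type."""
    return "(" + ", ".join(factors) + ")"


def render_stream(type_text):
    """Prefix a type with '!' to mark it as a stream."""
    return "!" + type_text


print(render_record([("name", "string"), ("age?", "int")]))
print(render_sum(["string", render_record([("x", "int")])]))
print(render_product(["string", "int"]))
print(render_stream("Foo"))
```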

Processes

DSL                                              Renders as
Agent(model="claude-haiku-4-5", prompts=[...])   agent { model: "claude-haiku-4-5", prompts: [...] }
Tool(process="helper")                           tool { process: "helper" }
Filter(F.score >= 85)                            filter(score >= 85)
Filter(F.verdict == False)                       filter(verdict = false)
Filter(expr).project("field")                    filter(expr).field
Map(F.score, F.review)                           map({score: score, review: review})
Map(quality=F.score)                             map({quality: score})
Id() / ID                                        id
Copy() / COPY                                    copy
Merge() / MERGE                                  merge
Discard() / DISCARD                              discard
Barrier() / BARRIER                              barrier
Project(n)                                       project(n)
Raw("text")                                      text (verbatim)

Agent accepts: model (required), provider (default "anthropic"), prompt, prompts, tools, mcp, plus any extra keys as **kwargs.
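
Expressions such as `F.score >= 85` read like ordinary Python comparisons but render to plumbing text. One plausible way to get that behaviour is operator overloading on a field proxy. The sketch below is an assumption about the general technique, not the library's code: `ExprSketch`, `FieldsSketch`, `render_filter`, and `render_map` are hypothetical names, and only numeric and boolean literals are handled.

```python
def _literal(value):
    """Render a Python literal as plumbing text (numbers and booleans only)."""
    if isinstance(value, ExprSketch):
        return value.text
    if isinstance(value, bool):          # check bool before falling back to str()
        return "true" if value else "false"
    return str(value)


class ExprSketch:
    """Holds expression text; comparisons build longer text, not booleans."""

    def __init__(self, text):
        self.text = text

    def _binary(self, op, other):
        return ExprSketch(f"{self.text} {op} {_literal(other)}")

    def __ge__(self, other):
        return self._binary(">=", other)

    def __lt__(self, other):
        return self._binary("<", other)

    def __eq__(self, other):             # rendered with a single '=' as in the table
        return self._binary("=", other)


class FieldsSketch:
    """Attribute access yields an expression naming that field."""

    def __getattr__(self, name):
        return ExprSketch(name)


F = FieldsSketch()                       # local stand-in, not the DSL's F


def render_filter(expr):
    return f"filter({expr.text})"


def render_map(*fields, **renamed):
    pairs = [(f.text, f.text) for f in fields]
    pairs += [(key, f.text) for key, f in renamed.items()]
    return "map({" + ", ".join(f"{k}: {v}" for k, v in pairs) + "})"


print(render_filter(F.score >= 85))
print(render_filter(F.verdict == False))
print(render_map(F.score, F.review))
print(render_map(quality=F.score))
```

Because `==` is overloaded to build text, `F.verdict == False` has to be written out in full; `not F.verdict` would be evaluated by Python itself rather than captured.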

Composition

DSL                           Renders as   Meaning
Pipe(a, b, c) / a >> b >> c   a ; b ; c    Sequential
Tensor(a, b) / a * b          (a * b)      Parallel
PortRef("p", "ctrl_in")       p@ctrl_in    Port address
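
The `>>` and `*` shorthands suggest operator overloading on process objects. The sketch below shows how such operators could produce the rendered text in the table; `NodeSketch` is a hypothetical class and says nothing about how the real `Pipe` and `Tensor` are implemented.

```python
class NodeSketch:
    """A named process or an already-composed expression (illustrative)."""

    def __init__(self, text):
        self.text = text

    def __rshift__(self, other):         # a >> b  ->  a ; b
        return NodeSketch(f"{self.text} ; {other.text}")

    def __mul__(self, other):            # a * b   ->  (a * b)
        return NodeSketch(f"({self.text} * {other.text})")


a, b, c = NodeSketch("a"), NodeSketch("b"), NodeSketch("c")

print((a >> b >> c).text)
print((a * b).text)
print((a >> b * c).text)
```

In Python, `*` binds tighter than `>>`, so in this sketch `a >> b * c` groups as `a >> (b * c)` without extra parentheses.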

PlumbBody

Wiring topology for plumb(ports) { ... } definitions:

body = PlumbBody("input", "output")
body.channel("ch", str)                    # let ch : !string = channel
body.spawn("proc", "in", "out")           # spawn proc(in, out)
body.wire("input", "composer", "checker")  # input ; composer ; checker
body.wire("judge", "cool",                # judge ; cool ; (a@ctrl * b@ctrl)
          tensor(PortRef("a", "ctrl"),
                 PortRef("b", "ctrl")))

Wire elements: strings (names), Process/Expr instances (rendered), PortRef (port addressing). The compiler validates the wiring; the DSL does no port checking on the Python side.
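
The comments in the snippet above show the statement each call is expected to produce. As a rough model of that behaviour, the sketch below collects one line of text per call and dispatches on the element kind inside `wire`. All names (`BodySketch`, `PortRefSketch`, `TensorSketch`) are hypothetical, channel types are passed as plumbing text rather than Python types, and the layout of the final `plumb(...) { ... }` block is not modelled.

```python
class PortRefSketch:
    def __init__(self, process, port):
        self.process, self.port = process, port

    def render(self):
        return f"{self.process}@{self.port}"


class TensorSketch:
    def __init__(self, *parts):
        self.parts = parts

    def render(self):
        return "(" + " * ".join(_render(p) for p in self.parts) + ")"


def _render(element):
    """Strings are names and pass through; other elements render themselves."""
    if isinstance(element, str):
        return element
    if hasattr(element, "render"):
        return element.render()
    raise TypeError(f"cannot wire element: {element!r}")


class BodySketch:
    def __init__(self, *ports):
        self.ports = ports
        self.lines = []

    def channel(self, name, type_text):
        self.lines.append(f"let {name} : !{type_text} = channel")

    def spawn(self, process, *args):
        self.lines.append(f"spawn {process}({', '.join(args)})")

    def wire(self, *elements):
        self.lines.append(" ; ".join(_render(e) for e in elements))


body = BodySketch("input", "output")
body.channel("ch", "string")
body.spawn("proc", "in", "out")
body.wire("input", "composer", "checker")
body.wire("judge", "cool",
          TensorSketch(PortRefSketch("a", "ctrl"), PortRefSketch("b", "ctrl")))
print("\n".join(body.lines))
```

No name or port is checked here, which mirrors the division of labour described above: validation belongs to the compiler.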

Program

prog = Program()
prog.module("Name")                    # module Name
prog.use("fs")                         # use fs
prog.type("Foo", Record([...]))        # type Foo = { ... }
prog.let("name",                       # let name : !In -> !Out = impl
    input=str, output=str,             # auto stream-wrapped
    impl=Agent(...),                   # or PlumbBody, or any Process
    budget=100000, oitl="confirm")     # annotations as **kwargs

source = prog.render()                 # .plumb source text
result = prog.check()                  # CheckResult via plumb-check
pipeline = await prog.run()            # async Pipeline context manager
results = prog.call_sync(data)         # synchronous one-shot call

Port types are auto-wrapped in StreamType. Use Bare(T) to suppress. ProductType factors are wrapped individually: ProductType([str, int]) becomes (!string, !int).
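
The wrapping rule can be stated as a three-case function. This is a minimal sketch assuming type text as input; `BareSketch`, `ProductSketch`, `wrap_port`, and `render_signature` are hypothetical stand-ins, not the DSL's own classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BareSketch:
    """Marks a port type that must not be stream-wrapped."""
    inner: str


@dataclass(frozen=True)
class ProductSketch:
    """A product of port types; each factor is wrapped on its own."""
    factors: tuple


def wrap_port(port_type):
    if isinstance(port_type, BareSketch):        # suppress wrapping
        return port_type.inner
    if isinstance(port_type, ProductSketch):     # wrap factors individually
        return "(" + ", ".join(wrap_port(f) for f in port_type.factors) + ")"
    return "!" + port_type                       # default: stream of T


def render_signature(name, input_type, output_type):
    return f"let {name} : {wrap_port(input_type)} -> {wrap_port(output_type)}"


print(wrap_port("string"))
print(wrap_port(BareSketch("string")))
print(wrap_port(ProductSketch(("string", "int"))))
print(render_signature("composer", "string", "string"))
```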

Example: Composer-Checker-Critic

from pydantic import BaseModel
from persevere.plumbing.dsl import *

class Verdict(BaseModel):
    verdict: bool
    commentary: str
    draft: str

class Review(BaseModel):
    score: int
    review: str
    draft: str

prog = Program()

prog.let("composer",
    input=SumType([str,
        Record([("verdict", bool), ("commentary", str)]),
        Record([("score", int), ("review", str)])]),
    output=str,
    impl=Agent(model="claude-haiku-4-5", max_messages=4,
               thinking_budget=1024, prompts=["system.md"]))

prog.let("checker", input=str, output=Verdict,
    impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))

prog.let("critic", input=Verdict, output=Review,
    impl=Agent(model="claude-haiku-4-5", thinking_budget=1024))

body = PlumbBody("input", "output")
body.wire("input", "composer", "checker")
body.wire("checker", Filter(F.verdict == False),
          Map(F.verdict, F.commentary), "composer")
body.wire("checker", Filter(F.verdict == True), "critic")
body.wire("critic", Filter(F.score < 85),
          Map(F.score, F.review), "composer")
body.wire("critic", Filter(F.score >= 85).project("draft"), "output")
prog.let("main", input=str, output=str, impl=body)

Example: Heated Debate (tensor + control ports)

from persevere.plumbing.dsl import *

prog = Program()
prog.type("Verdict", Record([("resolved", bool), ("verdict", str),
                              ("topic", str), ("heat", float)]))
prog.type("Control", Record([("set_temp", float)]))

prog.let("advocate", input=ProductType([str, NamedType("Control")]),
    output=str, impl=Agent(model="claude-haiku-4-5", max_messages=8))
prog.let("skeptic", input=ProductType([str, NamedType("Control")]),
    output=str, impl=Agent(model="claude-haiku-4-5", max_messages=8))
prog.let("judge", input=ProductType([str, str]),
    output=NamedType("Verdict"),
    impl=Agent(model="claude-haiku-4-5", max_messages=8))
prog.let("cool", input=NamedType("Verdict"),
    output=NamedType("Control"), impl=Map(set_temp=F.heat))

body = PlumbBody("input", "output")
body.wire("input", tensor(Raw("advocate"), Raw("skeptic")),
          "barrier", "judge")
body.wire("judge", Filter(F.resolved == False).project("topic"),
          tensor(Raw("advocate"), Raw("skeptic")))
body.wire("judge", Filter(F.resolved == True).project("verdict"), "output")
body.wire("judge", "cool",
          tensor(PortRef("advocate", "ctrl_in"),
                 PortRef("skeptic", "ctrl_in")))
prog.let("main", input=str, output=str, impl=body)