compact
Conversation with automatic context compaction
compact/compact.plumb
(* An agent with automatic context compaction.
When the conversation context grows past a token threshold,
the compaction observer pauses the agent, retrieves its memory,
compresses it with a summariser, and resumes.
Run:
plumb examples/compact/compact.plumb
Or interactively:
chat examples/compact/compact.plumb
Then have a long conversation — after ~150k input tokens,
the compaction protocol fires automatically. *)
use mem
(* The main agent — a researcher with control and telemetry ports.
   Type: consumes (!string, !Mem.CtrlCmd) — user input plus control
   commands — and produces (!string, !json) — responses plus telemetry.
   The CtrlCmd/CtrlResp types come from the stdlib mem module (see
   `use mem` above) and match the Compaction protocol boundary.
   runtime_context: true — presumably exposes runtime state to the
   agent's prompt; confirm against the plumb agent documentation. *)
let researcher : (!string, !Mem.CtrlCmd) -> (!string, !json) = agent {
provider: "anthropic",
model: "claude-haiku-4-5",
prompts: ["./researcher.md"],
runtime_context: true
}
(* Main pipeline: create the control and telemetry channels, spawn the
   researcher, and wire its control/telemetry ports to the stdlib
   compaction observer (Mem.compact) so context compaction happens
   automatically. Commands flow observer -> ctrl -> researcher;
   responses flow researcher -> ctrl_out -> observer; usage telemetry
   flows researcher -> telem -> observer. *)
let main : !string -> !string = plumb(input, output) {
let ctrl : !Mem.CtrlCmd = channel
let ctrl_out : !Mem.CtrlResp = channel
let telem : !json = channel
spawn researcher(input=input, ctrl_in=ctrl, output=output, ctrl_out=ctrl_out, telemetry=telem)
spawn Mem.compact(ctrl_out=ctrl_out, telemetry=telem, ctrl_in=ctrl)
}
compact/compact_eliza.plumb
(* ELIZA compaction test — no API key required.
Uses ELIZA for both the researcher agent and the compressor.
ELIZA cannot produce a valid SetMemory value — its reply is a plain
string that fails SetMemory validation — so this example deliberately
exercises the compaction protocol's error paths.
Run:
echo '"I feel sad today"' '"My mother never understood me"' '"Perhaps you are right"' | \
PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
_build/default/bin/plumb/main.exe \
examples/compact/compact_eliza.plumb
Or interactively:
PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
_build/default/bin/chat/main.exe \
examples/compact/compact_eliza.plumb *)
(* Command types — values sent TO the agent on its control channel.
   Each is a single-field record so the CtrlCmd sum type below can
   discriminate commands structurally. *)
type Pause = { pause: bool }
type GetMemory = { get_memory: bool }
type Resume = { resume: bool }
(* Typed message — content is json because user messages are strings
   while assistant messages are arrays of content blocks. *)
type Message = { role: string, content: json }
(* SetMemory doubles as the compressor's output type: the replacement
   conversation history to install in the agent. *)
type SetMemory = { set_memory: [Message] }
(* Response types — sent back FROM the agent on ctrl_out. Each carries
   a `kind` string, apparently used as a response discriminator. *)
type PauseAck = { kind: string }
type PinnedEntry = { name: string, arguments: json, content: string }
type MemoryDump = { kind: string, messages: [Message], pinned: [PinnedEntry] }
type SetMemoryAck = { kind: string, old_messages: int, new_messages: int }
type ResumeAck = { kind: string, resumed: bool }
(* Sum types naming the two directions of the protocol boundary. *)
type CtrlCmd = Pause | GetMemory | SetMemory | Resume
type CtrlResp = PauseAck | MemoryDump | SetMemoryAck | ResumeAck
(* Compaction session protocol: a fixed four-step exchange — pause the
   agent, dump its memory, install the compressed memory, resume.
   Each `send` must be acknowledged by the matching `recv` before the
   next step; `end` closes the session. *)
protocol Compaction =
send Pause . recv PauseAck .
send GetMemory . recv MemoryDump .
send SetMemory . recv SetMemoryAck .
send Resume . recv ResumeAck . end
(* Compressor stand-in — ELIZA produces a generic string response,
   which fails validation against the expected SetMemory type. This
   deliberately exercises the error path of the compaction protocol
   (step 3: session@send(SetMemory) in the observer below). *)
let compressor : !MemoryDump -> !SetMemory = agent {
provider: "eliza",
model: "doctor"
}
(* Researcher agent — ELIZA "doctor" script, so no API key is needed.
   Same port shape as the real researcher: conversation on
   input/output, compaction control on ctrl_in/ctrl_out + telemetry. *)
let researcher : (!string, !CtrlCmd) -> (!string, !json) = agent {
provider: "eliza",
model: "doctor"
}
(* Compaction observer — binds a Compaction session over the agent's
   control channels and triggers it from telemetry.
   Trigger: any telemetry event with kind = "usage" and
   prompt_tokens > 0 — i.e. a zero-token threshold, so compaction
   fires on the very first usage report.
   Sequence: Pause -> GetMemory -> compressor -> SetMemory -> Resume,
   each step gated on the previous step's acknowledgement. *)
let compact : (!CtrlResp, !json) -> !CtrlCmd =
plumb(ctrl_out, telemetry, ctrl_in) {
(ctrl_out, ctrl_in) <-> Compaction as session
telemetry ; filter(kind = "usage" && prompt_tokens > 0) ;
map(null) ; session@trigger
session@trigger ; map({pause: true}) ; session@send(Pause)
session@done(PauseAck) ; map({get_memory: true}) ; session@send(GetMemory)
session@recv(MemoryDump) ; compressor ; session@send(SetMemory)
session@done(SetMemoryAck) ; map({resume: true}) ; session@send(Resume)
}
(* Main pipeline — create the control and telemetry channels, spawn
   the researcher and the local compaction observer, and cross-wire
   them: observer commands flow through ctrl into the researcher;
   responses flow back on ctrl_out; usage telemetry on telem. *)
let main : !string -> !string = plumb(input, output) {
let ctrl : !CtrlCmd = channel
let ctrl_out : !CtrlResp = channel
let telem : !json = channel
spawn researcher(input=input, ctrl_in=ctrl, output=output, ctrl_out=ctrl_out, telemetry=telem)
spawn compact(ctrl_out=ctrl_out, telemetry=telem, ctrl_in=ctrl)
}
compact/compact_test.plumb
(* Standalone compaction test with a very low token threshold.
Inlines all machinery from stdlib/mem.plumb so we can
modify the threshold without touching the shared library.
Run (piped — should fire compaction then exit):
echo '"tell me about the history of computing"' | \
PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
~/bin/antrun _build/default/bin/plumb/main.exe \
examples/compact/compact_test.plumb
Or interactively:
PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
~/bin/antrun _build/default/bin/chat/main.exe \
examples/compact/compact_test.plumb *)
(* Command types — values sent TO the agent on its control channel.
   Each is a single-field record so the CtrlCmd sum type below can
   discriminate commands structurally. *)
type Pause = { pause: bool }
type GetMemory = { get_memory: bool }
type Resume = { resume: bool }
(* Typed message — content is json because user messages are strings
   while assistant messages are arrays of content blocks. *)
type Message = { role: string, content: json }
(* SetMemory doubles as the compressor's output type: the replacement
   conversation history to install in the agent. *)
type SetMemory = { set_memory: [Message] }
(* Response types — sent back FROM the agent on ctrl_out. Each carries
   a `kind` string, apparently used as a response discriminator. *)
type PauseAck = { kind: string }
type PinnedEntry = { name: string, arguments: json, content: string }
type MemoryDump = { kind: string, messages: [Message], pinned: [PinnedEntry] }
type SetMemoryAck = { kind: string, old_messages: int, new_messages: int }
type ResumeAck = { kind: string, resumed: bool }
(* Sum types naming the two directions of the protocol boundary. *)
type CtrlCmd = Pause | GetMemory | SetMemory | Resume
type CtrlResp = PauseAck | MemoryDump | SetMemoryAck | ResumeAck
(* Compaction session protocol: a fixed four-step exchange — pause the
   agent, dump its memory, install the compressed memory, resume.
   Each `send` must be acknowledged by the matching `recv` before the
   next step; `end` closes the session. *)
protocol Compaction =
send Pause . recv PauseAck .
send GetMemory . recv MemoryDump .
send SetMemory . recv SetMemoryAck .
send Resume . recv ResumeAck . end
(* Compressor agent — summarises conversation history into a
   SetMemory value. The compactor.md prompt instructs wrapping
   summaries in <context-summary> tags — currently prompt-enforced
   only; future work: structural validation of the tag.
   amnesiac: true — presumably the compressor keeps no memory across
   invocations; confirm against the plumb agent documentation. *)
let compressor : !MemoryDump -> !SetMemory = agent {
provider: "anthropic",
model: "claude-haiku-4-5",
prompts: ["compactor.md"],
amnesiac: true
}
(* Researcher agent — the conversational endpoint under test.
   input/output carry the user conversation; ctrl_in/ctrl_out and
   telemetry form the compaction protocol's control surface.
   runtime_context: true — presumably exposes runtime state to the
   agent's prompt; confirm against the plumb agent documentation. *)
let researcher : (!string, !CtrlCmd) -> (!string, !json) = agent {
provider: "anthropic",
model: "claude-haiku-4-5",
prompts: ["./researcher.md"],
runtime_context: true
}
(* Compaction observer — test variant intended to fire early so the
   protocol can be verified end-to-end (the stdlib uses 150000).
   NOTE(review): the original comment claimed a LOW THRESHOLD of 100
   tokens, but the filter below fires on ANY usage event with
   prompt_tokens > 0 — effectively a zero-token threshold. Confirm
   which threshold is actually intended here.
   Sequence: Pause -> GetMemory -> compressor -> SetMemory -> Resume,
   each step gated on the previous step's acknowledgement. *)
let compact : (!CtrlResp, !json) -> !CtrlCmd =
plumb(ctrl_out, telemetry, ctrl_in) {
(ctrl_out, ctrl_in) <-> Compaction as session
telemetry ; filter(kind = "usage" && prompt_tokens > 0) ;
map(null) ; session@trigger
session@trigger ; map({pause: true}) ; session@send(Pause)
session@done(PauseAck) ; map({get_memory: true}) ; session@send(GetMemory)
session@recv(MemoryDump) ; compressor ; session@send(SetMemory)
session@done(SetMemoryAck) ; map({resume: true}) ; session@send(Resume)
}
(* Main pipeline — create the control and telemetry channels, spawn
   the researcher and the local compaction observer, and cross-wire
   them: observer commands flow through ctrl into the researcher;
   responses flow back on ctrl_out; usage telemetry on telem. *)
let main : !string -> !string = plumb(input, output) {
let ctrl : !CtrlCmd = channel
let ctrl_out : !CtrlResp = channel
let telem : !json = channel
spawn researcher(input=input, ctrl_in=ctrl, output=output, ctrl_out=ctrl_out, telemetry=telem)
spawn compact(ctrl_out=ctrl_out, telemetry=telem, ctrl_in=ctrl)
}
compact/compact.py
"""Multi-turn conversation with automatic context compaction.
Demonstrates long-running pipelines where the runtime compacts
the agent's context when it grows past a token threshold.
Requires: ANTHROPIC_API_KEY
Usage:
ANTHROPIC_API_KEY=sk-... python compact.py
"""
import asyncio
from pathlib import Path
import plumbing as pb
async def main() -> None:
    """Run a short multi-turn conversation through the compact pipeline.

    Loads compact.plumb from this script's directory, then sends each
    prompt in turn and prints the pipeline's reply. Stops early if the
    pipeline closes (recv returns None).
    """
    spec_path = Path(__file__).parent / "compact.plumb"
    prompts = (
        "What is the capital of France?",
        "Tell me about its history.",
        "What about its architecture?",
    )
    async with await pb.run(spec_path, verbose=True) as pipeline:
        for prompt in prompts:
            print(f"> {prompt}")
            await pipeline.send(prompt)
            reply = await pipeline.recv()
            if reply is None:
                # Pipeline closed before answering; stop the conversation.
                break
            print(f"\n{reply}\n")


if __name__ == "__main__":
    asyncio.run(main())
compact/researcher.md
You are a research assistant. You help users explore topics in depth by answering questions, explaining concepts, and building on the conversation.
Be thorough and detailed in your responses. Draw connections between ideas when relevant. If you are unsure about something, say so rather than speculating.
Your conversation history may be periodically compacted to manage context length. This happens automatically and transparently — you will not notice it directly, but your memory of earlier messages may be summarised rather than verbatim. This is normal and expected.