compact

Conversation with automatic context compaction

compact/compact.plumb
(* An agent with automatic context compaction.
   When the conversation context grows past a token threshold,
   the compaction observer pauses the agent, retrieves its memory,
   compresses it with a summariser, and resumes.

   Run:
     plumb examples/compact/compact.plumb

   Or interactively:
     chat examples/compact/compact.plumb

   Then have a long conversation — after ~150k input tokens,
   the compaction protocol fires automatically. *)

use mem

(* The main agent — a researcher with control and telemetry ports.
   Input side: user text (!string) plus control commands (!Mem.CtrlCmd);
   output side: response text (!string) plus telemetry events (!json).
   CtrlCmd/CtrlResp types match the Compaction protocol boundary
   declared in the stdlib `mem` module pulled in by `use mem` above. *)
let researcher : (!string, !Mem.CtrlCmd) -> (!string, !json) = agent {
  provider: "anthropic",
  model: "claude-haiku-4-5",
  prompts: ["./researcher.md"],
  (* NOTE(review): presumably exposes runtime/control context to the
     agent — confirm against the agent configuration docs. *)
  runtime_context: true
}

(* Main pipeline: wire the agent's control/telemetry to the compaction
   observer so context compaction happens automatically.
   `input`/`output` are the pipeline's external ports; the three
   channels below are internal plumbing between the two spawned
   processes. *)
let main : !string -> !string = plumb(input, output) {
  (* observer -> agent: pause / get_memory / set_memory / resume *)
  let ctrl : !Mem.CtrlCmd = channel
  (* agent -> observer: acknowledgements and memory dumps *)
  let ctrl_out : !Mem.CtrlResp = channel
  (* agent -> observer: usage/telemetry events that trigger compaction *)
  let telem : !json = channel

  spawn researcher(input=input, ctrl_in=ctrl, output=output, ctrl_out=ctrl_out, telemetry=telem)
  spawn Mem.compact(ctrl_out=ctrl_out, telemetry=telem, ctrl_in=ctrl)
}
compact/compact_eliza.plumb
(* ELIZA compaction test — no API key required.

   Uses ELIZA as the researcher agent and a trivial pass-through as
   the compressor (ELIZA cannot produce valid SetMemory, so the
   compressor here is also ELIZA — it will produce a string response
   that fails SetMemory validation, exposing protocol error paths).

   Run:
     printf '%s\n' '"I feel sad today"' '"My mother never understood me"' '"Perhaps you are right"' | \
       PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
       _build/default/bin/plumb/main.exe \
       examples/compact/compact_eliza.plumb

   Or interactively:
     PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
       _build/default/bin/chat/main.exe \
       examples/compact/compact_eliza.plumb *)

(* Command types — sent to the agent's control channel.
   Each is a single-field record whose field name doubles as the
   command tag on the wire. *)
type Pause = { pause: bool }
type GetMemory = { get_memory: bool }
type Resume = { resume: bool }

(* Typed message — content is json because user messages are strings
   while assistant messages are arrays of content blocks. *)
type Message = { role: string, content: json }
(* Replacement memory: the full message list to install in the agent. *)
type SetMemory = { set_memory: [Message] }

(* Response types — sent back from the agent on ctrl_out.
   `kind` discriminates the response variant. *)
type PauseAck = { kind: string }
(* A pinned tool invocation retained verbatim across compaction. *)
type PinnedEntry = { name: string, arguments: json, content: string }
type MemoryDump = { kind: string, messages: [Message], pinned: [PinnedEntry] }
(* Reports message counts before/after a SetMemory was applied. *)
type SetMemoryAck = { kind: string, old_messages: int, new_messages: int }
type ResumeAck = { kind: string, resumed: bool }

(* Sum types for the protocol boundary *)
type CtrlCmd = Pause | GetMemory | SetMemory | Resume
type CtrlResp = PauseAck | MemoryDump | SetMemoryAck | ResumeAck

(* Four-step compaction session: pause the agent, dump its memory,
   install the compressed memory, then resume.  Each send must be
   acknowledged before the next step proceeds. *)
protocol Compaction =
  send Pause . recv PauseAck .
  send GetMemory . recv MemoryDump .
  send SetMemory . recv SetMemoryAck .
  send Resume . recv ResumeAck . end

(* Compressor — ELIZA will produce a generic string response, which
   will fail validation against SetMemory.  This deliberately tests
   the error path in the compaction protocol: the MemoryDump goes in,
   but the reply cannot be a valid SetMemory record. *)
let compressor : !MemoryDump -> !SetMemory = agent {
  provider: "eliza",
  model: "doctor"
}

(* Researcher agent — ELIZA "doctor" script, runs locally with no
   API key.  Same port shape as the real researcher: text in/out plus
   control commands and telemetry. *)
let researcher : (!string, !CtrlCmd) -> (!string, !json) = agent {
  provider: "eliza",
  model: "doctor"
}

(* Compaction observer — the usage filter passes any event with a
   nonzero prompt_tokens count (effective threshold 0), so compaction
   fires on the very first usage event. *)
let compact : (!CtrlResp, !json) -> !CtrlCmd =
  plumb(ctrl_out, telemetry, ctrl_in) {

  (* Bind the agent-facing channel pair to the Compaction session. *)
  (ctrl_out, ctrl_in) <-> Compaction as session

  (* Trigger: any usage telemetry with prompt tokens starts a round. *)
  telemetry ; filter(kind = "usage" && prompt_tokens > 0) ;
    map(null) ; session@trigger

  (* Drive the four steps: pause, dump, compress+install, resume. *)
  session@trigger ; map({pause: true}) ; session@send(Pause)
  session@done(PauseAck) ; map({get_memory: true}) ; session@send(GetMemory)
  session@recv(MemoryDump) ; compressor ; session@send(SetMemory)
  session@done(SetMemoryAck) ; map({resume: true}) ; session@send(Resume)
}

(* Main pipeline: spawn the ELIZA researcher and the local compaction
   observer, wired together over three internal channels. *)
let main : !string -> !string = plumb(input, output) {
  (* observer -> agent commands *)
  let ctrl : !CtrlCmd = channel
  (* agent -> observer acknowledgements / memory dumps *)
  let ctrl_out : !CtrlResp = channel
  (* agent -> observer telemetry events *)
  let telem : !json = channel

  spawn researcher(input=input, ctrl_in=ctrl, output=output, ctrl_out=ctrl_out, telemetry=telem)
  spawn compact(ctrl_out=ctrl_out, telemetry=telem, ctrl_in=ctrl)
}
compact/compact_test.plumb
(* Standalone compaction test with a very low token threshold.
   Inlines all machinery from stdlib/mem.plumb so we can
   modify the threshold without touching the shared library.

   Run (piped — should fire compaction then exit):
     echo '"tell me about the history of computing"' | \
       PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
       ~/bin/antrun _build/default/bin/plumb/main.exe \
       examples/compact/compact_test.plumb

   Or interactively:
     PLUMB_PATH=stdlib PLUMB_RESOURCES=prompts \
       ~/bin/antrun _build/default/bin/chat/main.exe \
       examples/compact/compact_test.plumb *)

(* Command types — sent to the agent's control channel; one-field
   records whose field name acts as the wire-level command tag. *)
type Pause = { pause: bool }
type GetMemory = { get_memory: bool }
type Resume = { resume: bool }

(* Typed message — content is json because user messages are strings
   while assistant messages are arrays of content blocks. *)
type Message = { role: string, content: json }
(* The replacement message list to install as the agent's memory. *)
type SetMemory = { set_memory: [Message] }

(* Response types — sent back from the agent on ctrl_out; `kind`
   discriminates the variant. *)
type PauseAck = { kind: string }
(* Pinned tool invocation kept verbatim across compaction. *)
type PinnedEntry = { name: string, arguments: json, content: string }
type MemoryDump = { kind: string, messages: [Message], pinned: [PinnedEntry] }
(* Message counts before/after the SetMemory was applied. *)
type SetMemoryAck = { kind: string, old_messages: int, new_messages: int }
type ResumeAck = { kind: string, resumed: bool }

(* Sum types for the protocol boundary *)
type CtrlCmd = Pause | GetMemory | SetMemory | Resume
type CtrlResp = PauseAck | MemoryDump | SetMemoryAck | ResumeAck

(* Four-step compaction session — pause, dump memory, install the
   compressed memory, resume; each send is acknowledged in order. *)
protocol Compaction =
  send Pause . recv PauseAck .
  send GetMemory . recv MemoryDump .
  send SetMemory . recv SetMemoryAck .
  send Resume . recv ResumeAck . end

(* Compressor agent — summarises conversation history.
   The compactor prompt instructs wrapping summaries in <context-summary>
   tags — currently prompt-enforced; future work: structural validation.
   `amnesiac: true` — NOTE(review): presumably the compressor keeps no
   history of its own between invocations; confirm against agent docs. *)
let compressor : !MemoryDump -> !SetMemory = agent {
  provider: "anthropic",
  model: "claude-haiku-4-5",
  prompts: ["compactor.md"],
  amnesiac: true
}

(* Researcher agent with control and telemetry ports: user text plus
   control commands in, response text plus telemetry events out. *)
let researcher : (!string, !CtrlCmd) -> (!string, !json) = agent {
  provider: "anthropic",
  model: "claude-haiku-4-5",
  prompts: ["./researcher.md"],
  runtime_context: true
}

(* Compaction observer — LOW THRESHOLD for testing.  The stdlib uses
   150000 tokens; here the usage filter passes any event with a
   nonzero prompt_tokens count (effective threshold 0), so compaction
   fires after the very first response, letting us verify the
   protocol end-to-end in a single turn. *)
let compact : (!CtrlResp, !json) -> !CtrlCmd =
  plumb(ctrl_out, telemetry, ctrl_in) {

  (* Bind the agent-facing channel pair to the Compaction session. *)
  (ctrl_out, ctrl_in) <-> Compaction as session

  (* Trigger: any usage telemetry with prompt tokens starts a round. *)
  telemetry ; filter(kind = "usage" && prompt_tokens > 0) ;
    map(null) ; session@trigger

  (* Drive the four steps: pause, dump, compress+install, resume. *)
  session@trigger ; map({pause: true}) ; session@send(Pause)
  session@done(PauseAck) ; map({get_memory: true}) ; session@send(GetMemory)
  session@recv(MemoryDump) ; compressor ; session@send(SetMemory)
  session@done(SetMemoryAck) ; map({resume: true}) ; session@send(Resume)
}

(* Main pipeline: spawn the researcher and the low-threshold
   compaction observer, wired together over three internal channels. *)
let main : !string -> !string = plumb(input, output) {
  (* observer -> agent commands *)
  let ctrl : !CtrlCmd = channel
  (* agent -> observer acknowledgements / memory dumps *)
  let ctrl_out : !CtrlResp = channel
  (* agent -> observer telemetry events *)
  let telem : !json = channel

  spawn researcher(input=input, ctrl_in=ctrl, output=output, ctrl_out=ctrl_out, telemetry=telem)
  spawn compact(ctrl_out=ctrl_out, telemetry=telem, ctrl_in=ctrl)
}
compact/compact.py
"""Multi-turn conversation with automatic context compaction.

Demonstrates long-running pipelines where the runtime compacts
the agent's context when it grows past a token threshold.

Requires: ANTHROPIC_API_KEY

Usage:
    ANTHROPIC_API_KEY=sk-... python compact.py
"""

import asyncio
from pathlib import Path

import plumbing as pb


async def main() -> None:
    """Drive a short multi-turn conversation through the compact pipeline.

    Loads ``compact.plumb`` from this script's directory, sends three
    related questions in order, and prints each prompt and reply.
    Stops early if the pipeline closes (``recv`` returns ``None``).
    """
    spec_path = Path(__file__).parent / "compact.plumb"
    pipeline_cm = await pb.run(spec_path, verbose=True)
    async with pipeline_cm as pipeline:
        questions = (
            "What is the capital of France?",
            "Tell me about its history.",
            "What about its architecture?",
        )
        for question in questions:
            print(f"> {question}")
            await pipeline.send(question)
            reply = await pipeline.recv()
            if reply is None:
                # Pipeline closed — no further turns possible.
                break
            print(f"\n{reply}\n")


# Script entry point: run the async demo on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
compact/researcher.md

You are a research assistant. You help users explore topics in depth by answering questions, explaining concepts, and building on the conversation.

Be thorough and detailed in your responses. Draw connections between ideas when relevant. If you are unsure about something, say so rather than speculating.

Your conversation history may be periodically compacted to manage context length. This happens automatically and transparently — you will not notice it directly, but your memory of earlier messages may be summarised rather than kept verbatim. This is normal and expected.