Step<I, O>
The atomic unit of a workflow.
public interface Step<I, O> {
String name();
O execute(AgentContext ctx, I input);
default AgentContext updateContext(AgentContext ctx, O output) { return ctx; }
default Class<?> inputType() { return Object.class; }
default Class<?> outputType() { return Object.class; }
static <I,O> Step<I,O> named(String name, BiFunction<AgentContext, I, O> fn);
static <T> Step<T,T> noop(); // anonymous pass-through
static <T> Step<T,T> noop(String name); // named pass-through — referenceable in IR
}
updateContext() — override to publish side-channel metadata alongside the primary output. Called by the executor after execute(). The output parameter is the value returned by execute() — derive metadata from it directly rather than capturing state in fields:
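class ClassifierStep implements Step<String, ClassificationResult> {
@Override
public String name() { return "classifier"; } // name() has no default, so every step implements it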
@Override
public ClassificationResult execute(AgentContext ctx, String input) {
return runModel(input); // rich result carries all data
}
@Override
public AgentContext updateContext(AgentContext ctx, ClassificationResult output) {
return ctx.mutate()
.with(CONFIDENCE, output.confidence())
.with(DETECTED_LABEL, output.label())
.build();
}
}
The primary output (ClassificationResult) flows forward as the next step’s input; metadata (confidence, label) flows via context keys. Default implementation returns ctx unchanged.
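The contract can be sketched with plain-Java stand-ins. MiniContext, MiniStep, runStep and KeywordClassifier are hypothetical names invented for this illustration, not the library's AgentContext, Step or executor. The sketch shows only the ordering described above: execute() produces the primary output, then updateContext() derives metadata from that same output into a new, copied context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for AgentContext, Step and the executor, reduced to the two calls that matter.
final class UpdateContextSketch {

    // Immutable context: with() returns a copy instead of mutating in place.
    record MiniContext(Map<String, Object> values) {
        MiniContext with(String key, Object value) {
            Map<String, Object> copy = new HashMap<>(values);
            copy.put(key, value);
            return new MiniContext(Map.copyOf(copy));
        }
    }

    interface MiniStep<I, O> {
        O execute(MiniContext ctx, I input);

        default MiniContext updateContext(MiniContext ctx, O output) {
            return ctx; // default: no side-channel metadata
        }
    }

    record StepResult<O>(O output, MiniContext context) {}

    // What an executor does for one step: run it, then let it publish metadata derived from the output.
    static <I, O> StepResult<O> runStep(MiniStep<I, O> step, MiniContext ctx, I input) {
        O output = step.execute(ctx, input);
        MiniContext next = step.updateContext(ctx, output);
        return new StepResult<>(output, next);
    }

    record Classification(String label, double confidence) {}

    static final class KeywordClassifier implements MiniStep<String, Classification> {
        @Override
        public Classification execute(MiniContext ctx, String input) {
            // Stand-in for a model call: the rich result carries everything downstream needs.
            return input.contains("refund")
                    ? new Classification("billing", 0.9)
                    : new Classification("other", 0.4);
        }

        @Override
        public MiniContext updateContext(MiniContext ctx, Classification output) {
            // Metadata comes from the output argument, not from fields on the step.
            return ctx.with("confidence", output.confidence()).with("label", output.label());
        }
    }
}
```

Because the metadata is derived from the output argument, the step instance holds no per-run state and can be reused across runs.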
AgentStep marker: implement it on steps that make LLM calls so they are tracked as NodeType.AGENT and their cost is accounted for correctly:
public interface AgentStep { /* marker — no methods */ }
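A marker interface adds no behavior; whatever consumes it only tests for its presence. The sketch below is hypothetical (MarkerSketch, classify and both step classes are invented) and shows how such a check could map steps onto the AGENT and DETERMINISTIC node types listed under Node Types; it is not the library's compiler.

```java
// Hypothetical sketch: a marker interface drives classification purely by an instanceof check.
final class MarkerSketch {
    interface AgentStep { /* marker: no methods */ }

    enum NodeType { AGENT, DETERMINISTIC }

    static final class SummarizeStep implements AgentStep {} // would call an LLM, so it opts in

    static final class ParseStep {} // pure function, no marker

    static NodeType classify(Object step) {
        return step instanceof AgentStep ? NodeType.AGENT : NodeType.DETERMINISTIC;
    }
}
```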
Built-in Step Types
| Type | Factory | What it wraps |
|---|---|---|
| Lambda | Step.named("n", (ctx, in) -> ...) | Any function |
| Chat | ChatClientStep.of(chat, "template {input}") | Single Spring AI call |
| Claude | ClaudeStep.of("template").workingDirectory(path) | Full Claude CLI session |
| A2A | A2AStep.of(url) | Remote agent via Agent-to-Agent protocol |
| Function | Steps.of(fn) | Pure deterministic function |
| Retry | Steps.retrying(3, step) | Retry wrapper |
| Output ref | Steps.outputOf("step-name") | Read prior step result from context |
| Terminate | Steps.terminate(status, msg) | Early workflow exit |
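Steps.retrying(3, step) wraps another step and re-invokes it on failure. This page does not specify the exact policy (which exceptions count, whether there is backoff, whether 3 means attempts or retries), so the sketch below assumes the simplest reading: at most maxAttempts total attempts, rethrowing the last failure. RetrySketch is a hypothetical name, and a plain Function stands in for Step.

```java
import java.util.function.Function;

// Hypothetical retry wrapper: assumes maxAttempts counts total attempts and there is no backoff.
final class RetrySketch {
    static <I, O> Function<I, O> retrying(int maxAttempts, Function<I, O> step) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return input -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return step.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // every attempt failed: surface the final error
        };
    }
}
```

Returning a new Function rather than looping at the call site keeps the wrapper composable: the wrapped value has the same shape as the step it wraps, which is what lets a retry wrapper sit anywhere a step is expected.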
ClaudeStep options
ClaudeStep.of("Fix the tests in {input}")
.workingDirectory(projectPath)
.permissionMode(PermissionMode.ACCEPT_EDITS)
.withMcp(mcpConfig)
.withA2a(a2aEndpoint);
Workflow Builder
Workflow.<I, O>define("name")
.step(step) // first step
.then(step) // chain
.branch(predicate).then(a).otherwise(b) // conditional
.gather(step1, step2) // homogeneous fan-out → List<Object>
.parallel(step1, step2) // enrichment fan-out → fork input unchanged, branches write to context
.parallel(itemsSupplier, stepFactory) // dynamic fan-out
.repeatUntil(predicate).step(a).end() // while-do
.repeatUntilOutput(predicate).step(a).end() // do-while
.decision(chat).option("a", step).end() // LLM routing
.gate(gate).onPass(a).onFail(b).maxRetries(3).end() // quality gate
.onError(ExType.class, recovery) // error recovery
.run(input) // terminal: execute
.run(input, RunOptions.maxCost(5.0)) // terminal: execute with constraints
.compile() // terminal: → WorkflowGraph (IR only)
.build() // terminal: → Workflow (implements Step)
Named shortcuts:
Workflow.sequential("name").step(a).then(b).run(input);
Workflow.loop("name").step(a).times(5).run(input);
Workflow.loop("name").step(a).until(predicate).run(input);
Workflow.parallel("name").step(a, b, c).run(input);
Workflow.supervisor("name", chat).agents(a, b).until(pred).run(input);
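The builder's while-do and do-while forms (repeatUntil and repeatUntilOutput) differ only in when the predicate is checked. The plain-Java analogy below is hypothetical and uses an int in place of real step input and output; it does not show what the DSL's predicates actually receive. With while-do the body may run zero times; with do-while it always runs once and the predicate sees the body's output.

```java
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Hypothetical plain-Java analogy of the two loop forms, not the executor's implementation.
final class LoopSemanticsSketch {
    record LoopResult(int value, int iterations) {}

    // while-do (repeatUntil): test first, so the body may run zero times.
    static LoopResult whileDo(int value, IntPredicate done, IntUnaryOperator body) {
        int iterations = 0;
        while (!done.test(value)) {
            value = body.applyAsInt(value);
            iterations++;
        }
        return new LoopResult(value, iterations);
    }

    // do-while (repeatUntilOutput): run first, then test the body's output, so the body runs at least once.
    static LoopResult doWhile(int value, IntPredicate done, IntUnaryOperator body) {
        int iterations = 0;
        do {
            value = body.applyAsInt(value);
            iterations++;
        } while (!done.test(value));
        return new LoopResult(value, iterations);
    }
}
```

When the condition already holds on entry, the two forms diverge: whileDo(10, v -> v >= 10, v -> v + 1) does nothing, while doWhile with the same arguments runs the body once.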
Gate
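public interface Gate<O> {
GateDecision evaluate(AgentContext ctx, O output);
default AgentContext updateContext(AgentContext ctx, O output) { return ctx; } // override to publish the gate's assessment
}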
GateDecision: PASS, FAIL, ESCALATE, TIMEOUT
Implementations
| Gate | Description |
|---|---|
| JudgeGate(jury, threshold) | Automated: passes when the agent-judge score is >= threshold |
| TieredGate(jury, high, low) | >= high → PASS, >= low → ESCALATE, < low → FAIL |
| HumanGate | HITL: waits for an external signal with a durable timeout |
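The TieredGate thresholds in the table reduce to two comparisons, and JudgeGate is the single-threshold case (PASS at or above the threshold, FAIL below it). The sketch below restates the table as a pure function; TieredDecisionSketch is a hypothetical name, the score is a plain double rather than a jury verdict, and rejecting high < low is an added assumption.

```java
// Hypothetical restatement of the TieredGate thresholds as a pure function.
final class TieredDecisionSketch {
    enum GateDecision { PASS, FAIL, ESCALATE, TIMEOUT }

    static GateDecision decide(double score, double high, double low) {
        if (high < low) {
            throw new IllegalArgumentException("high must not be below low"); // added assumption
        }
        if (score >= high) {
            return GateDecision.PASS;
        }
        if (score >= low) {
            return GateDecision.ESCALATE; // the middle band goes to a human or another reviewer
        }
        return GateDecision.FAIL;
    }
}
```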
Gate builder
.gate(new JudgeGate(jury, 0.8))
.onPass(approveStep) // required
.onFail(reviseStep) // optional
.onTimeout(fallbackStep) // optional
.withReflector(reflectorStep) // Verdict → feedback text for retry
.maxRetries(3) // cap
.end()
On failure, the Verdict is written to AgentContext.JUDGE_VERDICT. The reflector then transforms that verdict into constructive feedback (AgentContext.JUDGE_REFLECTION) for the retry step.
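One way to picture the retry cycle is the loop below. It is a hypothetical sketch, not the executor's code: it assumes the gate judges the first output, that each FAIL is turned into feedback by the reflector and then into a revised output by the revise step, and that the loop stops on PASS or after maxRetries revisions, returning the last decision. GateRetrySketch, Verdict and Outcome are invented stand-ins; in the library the verdict and feedback travel through JUDGE_VERDICT and JUDGE_REFLECTION rather than method arguments.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of a judge / reflect / revise loop bounded by maxRetries.
final class GateRetrySketch {
    enum Decision { PASS, FAIL }

    record Verdict(double score, String reason) {}

    record Outcome<O>(O output, Decision decision, int retries) {}

    static <O> Outcome<O> runGate(O initial,
                                  Function<O, Verdict> judge,
                                  double threshold,
                                  Function<Verdict, String> reflector,
                                  BiFunction<O, String, O> revise,
                                  int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        O current = initial;
        for (int retries = 0; ; retries++) {
            Verdict verdict = judge.apply(current);
            if (verdict.score() >= threshold) {
                return new Outcome<>(current, Decision.PASS, retries);
            }
            if (retries == maxRetries) {
                return new Outcome<>(current, Decision.FAIL, retries); // retry budget spent
            }
            String feedback = reflector.apply(verdict); // Verdict -> constructive feedback
            current = revise.apply(current, feedback); // the retry step sees the feedback
        }
    }
}
```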
Gate.updateContext()
Gates implement updateContext() just like steps do. Override it to write the gate’s assessment to AgentContext — both the onPass and onFail branches can then read it:
class AssessCodeQualityGate implements Gate<PrContext> {
static final ContextKey<Assessment> QUALITY_ASSESSMENT = ContextKey.of("quality-assessment", Assessment.class);
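private final double threshold;
// Holds the result of the most recent evaluate() so updateContext() can publish it.
// This is per-instance mutable state, so one instance should not serve concurrent runs.
private Assessment assessment;
AssessCodeQualityGate(double threshold) {
this.threshold = threshold;
}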
@Override
public GateDecision evaluate(AgentContext ctx, PrContext pr) {
assessment = runJudge(pr);
return assessment.score() >= threshold ? GateDecision.PASS : GateDecision.FAIL;
}
@Override
public AgentContext updateContext(AgentContext ctx, PrContext pr) {
return ctx.mutate().with(QUALITY_ASSESSMENT, assessment).build();
}
}
AgentContext
Immutable, threaded through every step. Copy-on-write via mutate().
Well-known Keys
| Key | Type | Description |
|---|---|---|
| WORKFLOW_RUN_ID | String | Unique execution ID |
| WORKFLOW_NAME | String | Workflow name |
| CURRENT_STEP | String | Active step name |
| ITERATION_COUNT | Integer | Loop counter |
| ACCUMULATED_COST | Double | USD spend so far |
| ACCUMULATED_TOKENS | Long | Total tokens |
| JUDGE_VERDICT | Verdict | Gate failure verdict |
| JUDGE_REFLECTION | String | Reflector feedback text |
// Read
ctx.get(AgentContext.ITERATION_COUNT) // Optional<Integer>
ctx.require(AgentContext.ITERATION_COUNT) // throws if missing
// Write (returns new context)
ctx.mutate().with(key, value).build()
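ContextKey<T> and AgentContext together implement Bloch's typesafe heterogeneous container pattern (Effective Java, Item 33): each key carries its value type, so get() and require() return typed values without casts at the call site.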
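The pattern stores every value in one Map<Key<?>, Object> and casts on the way out with the key's Class object, so callers get typed values without unchecked casts. The sketch below is a hypothetical, minimal version combined with copy-on-write; TypedContextSketch and Key are invented names, not AgentContext or ContextKey.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of a typesafe heterogeneous container with copy-on-write updates.
final class TypedContextSketch {

    // The key carries its value's Class, so reads can be checked with Class.cast.
    record Key<T>(String name, Class<T> type) {
        static <V> Key<V> of(String name, Class<V> type) {
            return new Key<>(Objects.requireNonNull(name), Objects.requireNonNull(type));
        }
    }

    private final Map<Key<?>, Object> values;

    private TypedContextSketch(Map<Key<?>, Object> values) {
        this.values = values;
    }

    static TypedContextSketch empty() {
        return new TypedContextSketch(Map.of());
    }

    // Returns a new instance; the receiver is never modified. Null values are rejected by Map.copyOf.
    <T> TypedContextSketch with(Key<T> key, T value) {
        Map<Key<?>, Object> copy = new HashMap<>(values);
        copy.put(key, key.type().cast(value));
        return new TypedContextSketch(Map.copyOf(copy));
    }

    <T> Optional<T> get(Key<T> key) {
        return Optional.ofNullable(values.get(key)).map(key.type()::cast);
    }

    <T> T require(Key<T> key) {
        return get(key).orElseThrow(() -> new IllegalStateException("missing key: " + key.name()));
    }
}
```

A call such as with(iterationKey, "three") against a Key<Integer> fails to compile, which is the point of carrying the type in the key.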
mergeFrom
AgentContext merged = base.mergeFrom(donor);
Overlays all entries from donor onto base. Donor wins on key conflict. Returns a new immutable instance. Used internally by WorkflowExecutor to propagate sub-workflow context writes back to the parent — you rarely call this directly, but it is part of the public API if you need manual context composition.
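The overlay rule amounts to copying the base entries and then putting every donor entry on top. The sketch below is hypothetical and uses plain maps in place of AgentContext.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of mergeFrom's overlay rule, with plain maps standing in for AgentContext.
final class MergeSketch {
    // Copies every base entry, then puts every donor entry on top, so the donor wins on conflicts.
    static Map<String, Object> mergeFrom(Map<String, Object> base, Map<String, Object> donor) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(donor);
        return Map.copyOf(merged); // new immutable instance; neither input is modified
    }
}
```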
Sub-workflow Composition
A Workflow implements Step, so it can be used anywhere a step is expected — as a sequential step, inside a branch, inside a gate path, or inside a parallel branch. The executor handles the nesting transparently:
Workflow<PrContext, AssessmentResult> aiAssessment = Workflow.<PrContext, AssessmentResult>define("ai-assessment")
.step(assessCodeQuality) // updateContext() writes QUALITY_ASSESSMENT
.then(assessBackport) // updateContext() writes BACKPORT_ASSESSMENT
.build();
Workflow.<Integer, Path>define("pr-review")
.step(fetchPrContext) // writes PR_CONTEXT to ctx
.branch(skipAi)
.then(skipStep)
.otherwise(aiAssessment) // sub-workflow — context writes propagate back
.then(qualityJudge) // reads QUALITY_ASSESSMENT and BACKPORT_ASSESSMENT ✓
.run(prNumber);
How it works: The executor detects step instanceof Workflow and runs it inline rather than dispatching through the StepRunner. The sub-workflow receives the parent’s current context, executes normally, and the final context (including all updateContext() writes from every nested step) is merged back into the parent via mergeFrom(). Leaf steps still go through the StepRunner (so TemporalStepRunner dispatch works correctly for individual steps).
Nesting is unlimited: sub-workflows can contain sub-workflows. Each level merges its context writes back up the chain.
Parallel branches: context writes from parallel sub-workflow branches are merged at the join node. If two branches write to the same key, the last branch to complete wins.
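Applied at a join, "last branch to complete wins" means folding each branch's writes over the parent context in completion order. The hypothetical sketch below assumes the branch writes arrive already sorted by completion time. Because completion order can differ from run to run, giving each parallel branch its own keys avoids depending on it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a join: fold branch writes over the parent in completion order.
final class JoinMergeSketch {
    static Map<String, Object> join(Map<String, Object> parent,
                                    List<Map<String, Object>> writesInCompletionOrder) {
        Map<String, Object> merged = new HashMap<>(parent);
        for (Map<String, Object> writes : writesInCompletionOrder) {
            merged.putAll(writes); // a branch that finishes later overwrites earlier ones
        }
        return Map.copyOf(merged);
    }
}
```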
RunOptions
RunOptions.maxCost(5.0)
RunOptions.maxIterations(50)
RunOptions.maxDuration(Duration.ofMinutes(10))
RunOptions.unlimited()
// Chain
RunOptions.maxCost(5.0).withMaxIterations(50).withMaxDuration(Duration.ofMinutes(10))
WorkflowGraph (IR)
Pure data structure — no execution logic, no Spring AI imports.
WorkflowGraph<I, O> graph = workflow.compile();
graph.name() // String
graph.nodes() // List<WorkflowNode>
graph.edges() // List<WorkflowEdge>
graph.startNode() // String
graph.finishNode() // String
Node Types (sealed)
| Node | Description |
|---|---|
| StepNode | Regular step execution (AGENT or DETERMINISTIC) |
| GatewayNode | Predicate branch |
| DecisionNode | LLM routing |
| GateNode | Quality gate with reflector and retry |
| LoopEntryNode | While-do entry |
| LoopCheckNode | Do-while condition |
| LoopExitNode | Loop resume point |
| ForkNode | Parallel split |
| JoinNode | Convergence |
Edge Conditions (sealed)
| Condition | From |
|---|---|
| Unconditional | Sequential |
| BooleanGuard(true/false) | GatewayNode |
| OptionMatch("name") | DecisionNode |
| GateMatch(PASS/FAIL/ESCALATE) | GateNode |
| BranchIndex(i) | ForkNode |
| ErrorMatch(ExType) | Error edges |
| LoopContinue / LoopExit | Loop nodes |
| BackEdge(condition) | backTo() cyclic edges |
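Because both hierarchies are sealed, code that walks the IR deals with a closed set of cases. The sketch below is a hypothetical, reduced model (four of the edge conditions as records, an invented nextAfterGateway helper) showing how a walker could pick a GatewayNode's outgoing edge by its BooleanGuard; it is not WorkflowExecutor's implementation. It needs Java 17 or later for sealed interfaces.

```java
import java.util.List;

// Hypothetical, reduced model of a sealed edge-condition hierarchy (Java 17+).
final class EdgeConditionSketch {
    sealed interface EdgeCondition permits Unconditional, BooleanGuard, OptionMatch, BranchIndex {}

    record Unconditional() implements EdgeCondition {}
    record BooleanGuard(boolean value) implements EdgeCondition {}
    record OptionMatch(String name) implements EdgeCondition {}
    record BranchIndex(int index) implements EdgeCondition {}

    record Edge(String from, String to, EdgeCondition condition) {}

    // Returns the target of the gateway edge whose guard equals the predicate result.
    static String nextAfterGateway(List<Edge> edges, String gateway, boolean predicateResult) {
        for (Edge edge : edges) {
            if (edge.from().equals(gateway)
                    && edge.condition() instanceof BooleanGuard guard
                    && guard.value() == predicateResult) {
                return edge.to();
            }
        }
        throw new IllegalStateException("no matching edge from " + gateway);
    }
}
```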
Type Checking
WorkflowGraphAssert.assertTypeCompatible(graph) walks a compiled graph and checks that each step’s declared output type is assignable to the next step’s declared input type. Catches ClassCastException-style bugs at test time.
Opt-in: Steps that override inputType() / outputType() participate. Lambda steps and Step.named() return Object.class by default and are silently skipped — no false positives.
// Typed steps declare their types
static class ClassifyStep implements Step<String, ClassificationResult> {
@Override public Class<?> inputType() { return String.class; }
@Override public Class<?> outputType() { return ClassificationResult.class; }
// ...
}
static class FormatResultStep implements Step<ClassificationResult, String> {
@Override public Class<?> inputType() { return ClassificationResult.class; }
@Override public Class<?> outputType() { return String.class; }
// ...
}
// Compile and check — no LLM calls needed
WorkflowGraph<String, Object> graph = Workflow.<String, Object>define("pipeline")
.step(new ClassifyStep(chat))
.then(new FormatResultStep())
.compile();
WorkflowGraphAssert.assertTypeCompatible(graph); // passes
If types don’t match:
.step(new ClassifyStep(chat)) // outputs ClassificationResult
.then(new IntegerConsumer()) // expects Integer — MISMATCH
// Throws:
// TypeIncompatibleException: step 'classify' outputs ClassificationResult
// but step 'int-consumer' expects Integer
Untyped steps (lambdas) break the typed chain — no check across the gap:
.step(new ClassifyStep(chat)) // typed
.then(Step.named("transform", (ctx, in) -> in.toString())) // untyped — skipped
.then(new IntegerConsumer()) // no check — prior step untyped
Use in CI to validate workflow structure without making LLM calls.
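The rule comes down to one Class.isAssignableFrom check per adjacent pair, skipping any pair where either side reports Object.class. The sketch below is hypothetical: it models a linear chain with an invented Declared record, throws IllegalStateException rather than TypeIncompatibleException, and ignores branches and fan-out, which a real graph walk has to follow.

```java
import java.util.List;

// Hypothetical sketch of the adjacent-pair type check on a linear chain of steps.
final class TypeCheckSketch {
    record Declared(String name, Class<?> inputType, Class<?> outputType) {}

    static void assertTypeCompatible(List<Declared> chain) {
        for (int i = 0; i + 1 < chain.size(); i++) {
            Declared producer = chain.get(i);
            Declared consumer = chain.get(i + 1);
            // Object.class means "undeclared": skip the pair instead of reporting a false positive.
            if (producer.outputType() == Object.class || consumer.inputType() == Object.class) {
                continue;
            }
            if (!consumer.inputType().isAssignableFrom(producer.outputType())) {
                throw new IllegalStateException("step '" + producer.name() + "' outputs "
                        + producer.outputType().getSimpleName() + " but step '" + consumer.name()
                        + "' expects " + consumer.inputType().getSimpleName());
            }
        }
    }
}
```

Using isAssignableFrom rather than equality means a step declaring CharSequence as input accepts a predecessor that outputs String.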
WorkflowAbortException
Thrown by a step to abort the workflow and return a typed result. Unlike an unhandled exception (which propagates as an error), WorkflowAbortException is caught by the executor and its carried result becomes the workflow’s output:
class QualityGateStep implements Step<ReviewResult, Path> {
@Override
public String name() { return "quality-gate"; }
@Override
public Path execute(AgentContext ctx, ReviewResult result) {
if (result.isTooRisky()) {
// abort — caller receives the error report, not an exception
throw new WorkflowAbortException(writeErrorReport(result));
}
return writeApprovalReport(result);
}
}
Use WorkflowAbortException when a step detects an unrecoverable condition but can still produce a meaningful output — for example, a typed error response, a failure report, or a structured rejection. Prefer it over returning null or throwing a raw exception when the caller needs to distinguish “completed with degraded output” from “crashed.”
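Mechanically, the exception is control flow that carries a value: the executor catches it and returns the payload as the workflow output, while any other exception still propagates. The sketch below is hypothetical (AbortSketch, AbortWithResult and run are invented, and the payload is an untyped Object). Disabling the stack trace is a design choice of the sketch, since the exception signals an expected outcome rather than a bug.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: an exception that carries a result, and a runner that unwraps it.
final class AbortSketch {
    static final class AbortWithResult extends RuntimeException {
        private final Object result;

        AbortWithResult(Object result) {
            super("workflow aborted", null, false, false); // no stack trace: expected outcome, not a bug
            this.result = result;
        }

        Object result() {
            return result;
        }
    }

    // Runs steps in order; an abort short-circuits and its payload becomes the output.
    // Any other exception propagates to the caller unchanged.
    static Object run(Object input, List<Function<Object, Object>> steps) {
        Object current = input;
        try {
            for (Function<Object, Object> step : steps) {
                current = step.apply(current);
            }
            return current;
        } catch (AbortWithResult abort) {
            return abort.result();
        }
    }
}
```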
StepRunner
The substrate swap seam. Same workflow code, different durability:
@Bean StepRunner stepRunner() {
return new LocalStepRunner(); // in-process, zero overhead
}
Three runners are available:
| Runner | What it adds | Status |
|---|---|---|
| LocalStepRunner | Direct in-process execution, zero overhead | Available |
| CheckpointingStepRunner | JDBC crash recovery via workflow-batch | Available |
| TemporalStepRunner | Distributed durable execution via workflow-temporal | Available |
Same workflow code — swap the @Bean, not the workflow. See Durability for setup and crash-recovery examples.
Operator retry (CheckpointManager): FAILED steps are not retried automatically — the system holds them until an operator decides the failure was transient. CheckpointManager.resetFailedSteps(runId) deletes FAILED records; re-running with the same runId then skips COMPLETED steps and retries the reset ones.
CheckpointManager manager = new CheckpointManager(readRepo, writeRepo);
manager.getRunState("run-1"); // inspect: what completed, what failed
manager.resetFailedSteps("run-1"); // delete FAILED records (only if transient)
executor.execute(graph, ctx, input); // retry — COMPLETED skipped, reset steps re-run
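The skip-and-reset rules can be modeled with an in-memory map of step records for a single runId. This is a hypothetical sketch, not CheckpointManager: CheckpointSketch and its methods are invented, and it assumes that a run stops at the first failure and that a step whose FAILED record is still present is held rather than retried on the next run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of checkpoint records for a single runId (not CheckpointManager).
final class CheckpointSketch {
    enum Status { COMPLETED, FAILED }

    private final Map<String, Status> records = new LinkedHashMap<>();

    // Runs steps in order and returns the names of the steps that actually executed.
    List<String> execute(List<String> steps, Predicate<String> succeeds) {
        List<String> ran = new ArrayList<>();
        for (String step : steps) {
            Status prior = records.get(step);
            if (prior == Status.COMPLETED) {
                continue; // already checkpointed: skip
            }
            if (prior == Status.FAILED) {
                break; // assumption: held until an operator resets it
            }
            ran.add(step);
            if (succeeds.test(step)) {
                records.put(step, Status.COMPLETED);
            } else {
                records.put(step, Status.FAILED);
                break; // assumption: the run stops at the first failure
            }
        }
        return ran;
    }

    // Counterpart of resetFailedSteps(runId): delete FAILED records only.
    void resetFailedSteps() {
        records.values().removeIf(status -> status == Status.FAILED);
    }

    // Counterpart of getRunState(runId): an immutable snapshot of the records.
    Map<String, Status> runState() {
        return Map.copyOf(records);
    }
}
```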
Sub-workflows are always inline: a Workflow used as a step bypasses the StepRunner entirely and runs in-process. Only leaf steps go through the runner. This is intentional — TemporalStepRunner dispatches to a separate activity worker thread and cannot carry full parent context; sub-workflows must stay in-process to propagate context correctly.
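The dispatch rule amounts to a recursive walk that expands sub-workflows in-process and sends only leaves to the runner. The sketch below is hypothetical: Node, Leaf, SubWorkflow and RecordingRunner are invented stand-ins, and context propagation is omitted so the example only shows which steps reach the runner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the dispatch rule: sub-workflows expand in-process, leaves go to the runner.
final class DispatchSketch {
    sealed interface Node permits Leaf, SubWorkflow {}

    record Leaf(String name) implements Node {}

    record SubWorkflow(String name, List<Node> children) implements Node {}

    // Stand-in for StepRunner that only records which steps it was handed.
    static final class RecordingRunner {
        final List<String> dispatched = new ArrayList<>();

        void run(Leaf leaf) {
            dispatched.add(leaf.name());
        }
    }

    static void execute(Node node, RecordingRunner runner) {
        if (node instanceof SubWorkflow sub) {
            for (Node child : sub.children()) {
                execute(child, runner); // nested workflow: stays inline, never dispatched
            }
        } else if (node instanceof Leaf leaf) {
            runner.run(leaf); // leaf step: goes through the runner
        }
    }
}
```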
TraceRecorder
Records every step transition:
record StepTransition(
String fromStep, String toStep,
Duration duration, int tokensUsed,
double costUsd, Instant timestamp
) {}
TraceRecorder.noop() is the default — zero overhead unless opted in. Trace data feeds Markov analysis, run diagnosis, and replay.
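One simple form of Markov analysis over these records is a first-order transition matrix, estimating P(to | from) as the number of from → to transitions divided by all transitions leaving from. The sketch below is a hypothetical illustration of that estimate, not the library's analysis code; MarkovSketch is an invented name and the StepTransition record is copied from above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: estimating first-order transition probabilities from recorded transitions.
final class MarkovSketch {
    record StepTransition(String fromStep, String toStep, Duration duration, int tokensUsed,
                          double costUsd, Instant timestamp) {}

    // P(to | from) = count(from -> to) / count(from -> any)
    static Map<String, Map<String, Double>> transitionProbabilities(List<StepTransition> trace) {
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (StepTransition t : trace) {
            counts.computeIfAbsent(t.fromStep(), k -> new HashMap<>()).merge(t.toStep(), 1, Integer::sum);
        }
        Map<String, Map<String, Double>> probs = new HashMap<>();
        counts.forEach((from, outgoing) -> {
            int total = outgoing.values().stream().mapToInt(Integer::intValue).sum();
            Map<String, Double> row = new HashMap<>();
            outgoing.forEach((to, n) -> row.put(to, n.doubleValue() / total));
            probs.put(from, row);
        });
        return probs;
    }
}
```

A high review → revise probability, for example, points at a gate that rarely passes on the first attempt.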
Module Structure
agent-workflow/
├── workflow-api/ # Step, AgentContext, ContextKey, Gate, RunOptions
├── workflow-core/ # WorkflowGraph, WorkflowExecutor, StepRunner, TraceRecorder
├── workflow-flows/ # Workflow DSL, ClaudeStep, ChatClientStep, A2AStep, JudgeGate
├── workflow-batch/ # CheckpointingStepRunner, JdbcTraceRecorder, JPA entities
├── workflow-temporal/ # TemporalStepRunner, StepActivityImpl
├── workflow-journal/ # JournalContextPolicy (agent-journal backed)
├── workflow-mcp/ # McpAgentToolRegistrar
├── workflow-tools/ # BashTool, ReadTool, WriteTool, EditTool, GlobTool, GrepTool
├── workflow-agents/ # AgentLoop
└── workflow-examples/ # Example pipelines