Step<I, O>

The atomic unit of a workflow.
public interface Step<I, O> {
    String name();
    O execute(AgentContext ctx, I input);

    default AgentContext updateContext(AgentContext ctx, O output) { return ctx; }

    default Class<?> inputType()  { return Object.class; }
    default Class<?> outputType() { return Object.class; }

    // static factories (signatures only; bodies omitted)
    static <I,O> Step<I,O> named(String name, BiFunction<AgentContext, I, O> fn);
    static <T> Step<T,T> noop();           // anonymous pass-through
    static <T> Step<T,T> noop(String name); // named pass-through, referenceable in IR
}
updateContext(): override it to publish side-channel metadata alongside the primary output. The executor calls it after execute(), and its output parameter is the value execute() returned, so derive metadata from that value directly rather than capturing state in fields:
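class ClassifierStep implements Step<String, ClassificationResult> {
    // illustrative key definitions; names and value types are assumed
    static final ContextKey<Double> CONFIDENCE = ContextKey.of("confidence", Double.class);
    static final ContextKey<String> DETECTED_LABEL = ContextKey.of("detected-label", String.class);

    @Override
    public String name() {
        return "classifier";  // name() is abstract on Step, so every step must supply one
    }

    @Override
    public ClassificationResult execute(AgentContext ctx, String input) {
        return runModel(input);  // rich result carries all data
    }

    @Override
    public AgentContext updateContext(AgentContext ctx, ClassificationResult output) {
        // metadata is derived from the returned value; no mutable fields needed
        return ctx.mutate()
            .with(CONFIDENCE, output.confidence())
            .with(DETECTED_LABEL, output.label())
            .build();
    }
}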
The primary output (ClassificationResult) flows forward as the next step's input; metadata (confidence, label) flows through context keys. The default implementation returns ctx unchanged.

AgentStep marker: implement it on steps that make LLM calls so they are correctly tracked as NodeType.AGENT for cost accounting:
public interface AgentStep { /* marker — no methods */ }
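A marker interface carries meaning through a runtime type check rather than through methods. The sketch below shows how such a check could drive AGENT versus DETERMINISTIC classification; SimpleStep, AgentMarker, NodeKind, and NodeClassifier are hypothetical stand-ins, not the library's AgentStep or NodeType, and the executor's real logic may differ.

```java
// Hypothetical, simplified step shape (not the library's Step).
interface SimpleStep<I, O> {
    String name();
    O execute(I input);
}

// Marker: no methods; implementing it is the whole signal.
interface AgentMarker {}

enum NodeKind { AGENT, DETERMINISTIC }

final class NodeClassifier {
    private NodeClassifier() {}

    // Steps carrying the marker are treated as LLM-calling nodes for cost tracking.
    static NodeKind classify(SimpleStep<?, ?> step) {
        return (step instanceof AgentMarker) ? NodeKind.AGENT : NodeKind.DETERMINISTIC;
    }
}

// Would call a model in real code; the call is faked here.
final class SummarizeStep implements SimpleStep<String, String>, AgentMarker {
    public String name() { return "summarize"; }
    public String execute(String input) { return "summary of " + input; }
}

// Pure function: no marker, so it classifies as deterministic.
final class UppercaseStep implements SimpleStep<String, String> {
    public String name() { return "uppercase"; }
    public String execute(String input) { return input.toUpperCase(); }
}
```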

Built-in Step Types

| Type | Factory | What it wraps |
|---|---|---|
| Lambda | Step.named("n", (ctx, in) -> ...) | Any function |
| Chat | ChatClientStep.of(chat, "template {input}") | Single Spring AI call |
| Claude | ClaudeStep.of("template").workingDirectory(path) | Full Claude CLI session |
| A2A | A2AStep.of(url) | Remote agent via Agent-to-Agent protocol |
| Function | Steps.of(fn) | Pure deterministic function |
| Retry | Steps.retrying(3, step) | Retry wrapper |
| Output ref | Steps.outputOf("step-name") | Read prior step result from context |
| Terminate | Steps.terminate(status, msg) | Early workflow exit |
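Of the wrappers in this table, Retry is the easiest to picture in isolation. A minimal sketch of a retry decorator, assuming it simply re-invokes the wrapped call on any RuntimeException up to a fixed attempt count; RetryingStep is hypothetical, models a step as a plain java.util.function.Function, and says nothing about whether Steps.retrying counts the first call, backs off between attempts, or filters exception types.

```java
import java.util.function.Function;

// Hypothetical retry decorator: calls the delegate up to maxAttempts times in total.
final class RetryingStep<I, O> implements Function<I, O> {
    private final int maxAttempts;
    private final Function<I, O> delegate;

    RetryingStep(int maxAttempts, Function<I, O> delegate) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        this.maxAttempts = maxAttempts;
        this.delegate = delegate;
    }

    @Override
    public O apply(I input) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.apply(input);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // every attempt failed: surface the most recent exception
        throw last;
    }
}
```

Because the wrapper has the same shape as what it wraps, it composes: a retrying step can be chained or wrapped again like any other.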

ClaudeStep options

ClaudeStep.of("Fix the tests in {input}")
    .workingDirectory(projectPath)
    .permissionMode(PermissionMode.ACCEPT_EDITS)
    .withMcp(mcpConfig)
    .withA2a(a2aEndpoint);

Workflow Builder

Workflow.<I, O>define("name")
    .step(step)                                    // first step
    .then(step)                                    // chain
    .branch(predicate).then(a).otherwise(b)        // conditional
    .gather(step1, step2)                          // homogeneous fan-out → List<Object>
    .parallel(step1, step2)                        // enrichment fan-out → fork input unchanged, branches write to context
    .parallel(itemsSupplier, stepFactory)           // dynamic fan-out
    .repeatUntil(predicate).step(a).end()          // while-do
    .repeatUntilOutput(predicate).step(a).end()    // do-while
    .decision(chat).option("a", step).end()        // LLM routing
    .gate(gate).onPass(a).onFail(b).maxRetries(3).end()  // quality gate
    .onError(ExType.class, recovery)               // error recovery
    // terminal operations: call exactly one
    .run(input)                                    // execute
    .run(input, RunOptions.maxCost(5.0))           // with constraints
    .compile()                                     // → WorkflowGraph (IR only)
    .build()                                       // → Workflow (implements Step)
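The two loop forms in the builder listing differ only in when the predicate is tested. A plain-Java sketch of the two orders, assuming repeatUntil tests its predicate before each pass (while-do) and repeatUntilOutput tests it against each pass's output (do-while); LoopSemantics is hypothetical and ignores context, iteration caps, and RunOptions budgets.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class LoopSemantics {
    private LoopSemantics() {}

    // while-do: test first, so the body may run zero times
    static <T> T whileDo(T value, Predicate<T> done, UnaryOperator<T> body) {
        while (!done.test(value)) {
            value = body.apply(value);
        }
        return value;
    }

    // do-while: run the body, then test its output, so the body runs at least once
    static <T> T doWhile(T value, Predicate<T> done, UnaryOperator<T> body) {
        do {
            value = body.apply(value);
        } while (!done.test(value));
        return value;
    }
}
```

With a starting value that already satisfies the predicate, whileDo returns it untouched while doWhile still runs one pass.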
Named shortcuts:
Workflow.sequential("name").step(a).then(b).run(input);
Workflow.loop("name").step(a).times(5).run(input);
Workflow.loop("name").step(a).until(predicate).run(input);
Workflow.parallel("name").step(a, b, c).run(input);
Workflow.supervisor("name", chat).agents(a, b).until(pred).run(input);
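The gather and parallel calls in the builder listing describe two fan-out shapes: gather collects every branch's output into a list, while parallel passes the fork's input through unchanged and keeps only what branches write to context. A sketch of both shapes with CompletableFuture, assuming a plain Map as the context; FanOut and its method names are hypothetical. It merges branch writes in declaration order for determinism, whereas the Sub-workflow Composition section states that, for parallel sub-workflow branches, the last branch to complete wins.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class FanOut {
    private FanOut() {}

    // gather: every branch sees the same input; outputs are collected in branch order
    static <I> List<Object> gather(I input, List<Function<I, ?>> branches) {
        List<CompletableFuture<Object>> futures = branches.stream()
                .map(b -> CompletableFuture.supplyAsync(() -> (Object) b.apply(input)))
                .toList();
        return futures.stream().map(CompletableFuture::join).toList();
    }

    // parallel (enrichment): branches return context entries; the input passes through
    static <I> I parallel(I input, Map<String, Object> ctx,
                          List<Function<I, Map<String, Object>>> branches) {
        List<CompletableFuture<Map<String, Object>>> futures = branches.stream()
                .map(b -> CompletableFuture.supplyAsync(() -> b.apply(input)))
                .toList();
        for (CompletableFuture<Map<String, Object>> f : futures) {
            ctx.putAll(f.join()); // declaration order: a later branch overwrites an earlier one
        }
        return input; // the next step receives the fork's input unchanged
    }
}
```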

Gate

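public interface Gate<O> {
    GateDecision evaluate(AgentContext ctx, O output);

    // override to publish gate metadata, as with Step.updateContext()
    default AgentContext updateContext(AgentContext ctx, O output) { return ctx; }
}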
GateDecision: PASS, FAIL, ESCALATE, TIMEOUT

Implementations

| Gate | Description |
|---|---|
| JudgeGate(jury, threshold) | Automated: passes when agent-judge score >= threshold |
| TieredGate(jury, high, low) | >= high → PASS, >= low → ESCALATE, < low → FAIL |
| HumanGate | HITL: waits for an external signal with a durable timeout |
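The TieredGate row is a three-way split on a single score. A sketch of that mapping, assuming a double score and reading the >= comparisons literally, so a score exactly on a boundary lands in the higher tier; TieredDecision and the local Decision enum are hypothetical stand-ins for TieredGate and GateDecision, with no jury involved.

```java
// Local stand-in for GateDecision, limited to the values this sketch needs.
enum Decision { PASS, ESCALATE, FAIL }

final class TieredDecision {
    private final double high;
    private final double low;

    TieredDecision(double high, double low) {
        if (low > high) throw new IllegalArgumentException("low must not exceed high");
        this.high = high;
        this.low = low;
    }

    // >= high → PASS, >= low → ESCALATE, otherwise FAIL
    Decision decide(double score) {
        if (score >= high) return Decision.PASS;
        if (score >= low) return Decision.ESCALATE;
        return Decision.FAIL;
    }
}
```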

Gate builder

.gate(new JudgeGate(jury, 0.8))
    .onPass(approveStep)          // required
    .onFail(reviseStep)           // optional
    .onTimeout(fallbackStep)      // optional
    .withReflector(reflectorStep) // Verdict → feedback text for retry
    .maxRetries(3)                // cap
    .end()
On failure, the Verdict is written to AgentContext.JUDGE_VERDICT. The reflector transforms that verdict into constructive feedback (the JUDGE_REFLECTION text) for the retry step.

Gate.updateContext()

Gates implement updateContext() just as steps do. Override it to write the gate's assessment to AgentContext so that both the onPass and onFail branches can read it:
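class AssessCodeQualityGate implements Gate<PrContext> {
    static final ContextKey<Assessment> QUALITY_ASSESSMENT = ContextKey.of("quality-assessment", Assessment.class);

    private final double threshold;
    // Cached by evaluate(): updateContext() receives the evaluated PrContext, not the Assessment.
    // Because of this mutable field, one instance should not be shared across concurrent runs.
    private Assessment assessment;

    AssessCodeQualityGate(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public GateDecision evaluate(AgentContext ctx, PrContext pr) {
        assessment = runJudge(pr);  // judge call, defined elsewhere
        return assessment.score() >= threshold ? GateDecision.PASS : GateDecision.FAIL;
    }

    @Override
    public AgentContext updateContext(AgentContext ctx, PrContext pr) {
        return ctx.mutate().with(QUALITY_ASSESSMENT, assessment).build();
    }
}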

AgentContext

Immutable, threaded through every step. Copy-on-write via mutate().

Well-known Keys

| Key | Type | Description |
|---|---|---|
| WORKFLOW_RUN_ID | String | Unique execution ID |
| WORKFLOW_NAME | String | Workflow name |
| CURRENT_STEP | String | Active step name |
| ITERATION_COUNT | Integer | Loop counter |
| ACCUMULATED_COST | Double | USD spend so far |
| ACCUMULATED_TOKENS | Long | Total tokens |
| JUDGE_VERDICT | Verdict | Gate failure verdict |
| JUDGE_REFLECTION | String | Reflector feedback text |
// Read
ctx.get(AgentContext.ITERATION_COUNT)      // Optional<Integer>
ctx.require(AgentContext.ITERATION_COUNT)   // throws if missing

// Write (returns new context)
ctx.mutate().with(key, value).build()
ContextKey<T> implements Joshua Bloch's typesafe heterogeneous container pattern (Effective Java, 3rd ed., Item 33).
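Bloch's pattern stores values of different types in one map and uses a Class<T> token to hand each value back with its static type restored. A minimal sketch of that pattern combined with copy-on-write writes; TypedKey and TypedContext are hypothetical names, and this is not the ContextKey or AgentContext implementation (for instance, it has no mutate() builder).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// A key carries a name and the runtime type of its value.
record TypedKey<T>(String name, Class<T> type) {
    static <T> TypedKey<T> of(String name, Class<T> type) {
        return new TypedKey<>(Objects.requireNonNull(name), Objects.requireNonNull(type));
    }
}

// Immutable container: with() returns a new instance and leaves this one unchanged.
final class TypedContext {
    private final Map<TypedKey<?>, Object> values;

    private TypedContext(Map<TypedKey<?>, Object> values) {
        this.values = Map.copyOf(values); // null values are rejected with NullPointerException
    }

    static TypedContext empty() {
        return new TypedContext(Map.of());
    }

    <T> TypedContext with(TypedKey<T> key, T value) {
        Map<TypedKey<?>, Object> copy = new HashMap<>(values);
        copy.put(key, key.type().cast(value)); // guards against mistyped values passed via raw types
        return new TypedContext(copy);
    }

    <T> Optional<T> get(TypedKey<T> key) {
        // Class.cast restores the static type without an unchecked cast
        return Optional.ofNullable(values.get(key)).map(v -> key.type().cast(v));
    }
}
```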

mergeFrom

AgentContext merged = base.mergeFrom(donor);
Overlays all entries from donor onto base. Donor wins on key conflict. Returns a new immutable instance. Used internally by WorkflowExecutor to propagate sub-workflow context writes back to the parent — you rarely call this directly, but it is part of the public API if you need manual context composition.
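The merge rule is a plain overlay. A sketch using ordinary maps, assuming mergeFrom behaves like copying base and then applying Map.putAll with the donor's entries; ContextMerge is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;

final class ContextMerge {
    private ContextMerge() {}

    // Returns a new immutable map; neither input is modified.
    static Map<String, Object> mergeFrom(Map<String, Object> base, Map<String, Object> donor) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(donor); // donor wins on key conflict
        return Map.copyOf(merged);
    }
}
```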

Sub-workflow Composition

A Workflow implements Step, so it can be used anywhere a step is expected — as a sequential step, inside a branch, inside a gate path, or inside a parallel branch. The executor handles the nesting transparently:
Workflow<PrContext, AssessmentResult> aiAssessment = Workflow.<PrContext, AssessmentResult>define("ai-assessment")
    .step(assessCodeQuality)    // updateContext() writes QUALITY_ASSESSMENT
    .then(assessBackport)       // updateContext() writes BACKPORT_ASSESSMENT
    .build();

Workflow.<Integer, Path>define("pr-review")
    .step(fetchPrContext)       // writes PR_CONTEXT to ctx
    .branch(skipAi)
        .then(skipStep)
        .otherwise(aiAssessment) // sub-workflow — context writes propagate back
    .then(qualityJudge)         // reads QUALITY_ASSESSMENT and BACKPORT_ASSESSMENT ✓
    .run(prNumber);
How it works: the executor detects step instanceof Workflow and runs it inline rather than dispatching it through the StepRunner. The sub-workflow receives the parent's current context, executes normally, and its final context (including every updateContext() write from every nested step) is merged back into the parent via mergeFrom(). Leaf steps still go through the StepRunner, so TemporalStepRunner dispatch works correctly for individual steps.

Nesting is unlimited: sub-workflows can contain sub-workflows, and each level merges its context writes back up the chain.

Parallel branches: context writes from parallel sub-workflow branches are merged at the join node. If two branches write to the same key, the last branch to complete wins.
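The dispatch rule described above fits in a short toy model. In this sketch, which is not WorkflowExecutor, context is a mutable Map, a MiniWorkflow is itself a MiniStep, nested workflows run inline on a copy of the parent's context and are merged back with child entries winning, and only leaf steps pass through a runner callback. MiniStep, LeafRunner, MiniWorkflow, and MiniExecutor are all hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy step: reads/writes a mutable context map and transforms its input.
@FunctionalInterface
interface MiniStep {
    Object execute(Map<String, Object> ctx, Object input);
}

// Stand-in for a StepRunner: decides how a leaf step is dispatched.
@FunctionalInterface
interface LeafRunner {
    Object run(MiniStep step, Map<String, Object> ctx, Object input);
}

// A workflow is an ordered list of steps and is itself a step.
record MiniWorkflow(String name, List<MiniStep> steps) implements MiniStep {
    @Override
    public Object execute(Map<String, Object> ctx, Object input) {
        // used standalone: run leaves directly
        return new MiniExecutor((step, c, in) -> step.execute(c, in)).run(this, ctx, input);
    }
}

final class MiniExecutor {
    private final LeafRunner runner;

    MiniExecutor(LeafRunner runner) {
        this.runner = runner;
    }

    // Threads each step's output into the next step's input and returns the last output.
    Object run(MiniWorkflow workflow, Map<String, Object> ctx, Object input) {
        Object value = input;
        for (MiniStep step : workflow.steps()) {
            if (step instanceof MiniWorkflow child) {
                // Nested workflow: run inline on a copy of the parent context...
                Map<String, Object> childCtx = new HashMap<>(ctx);
                value = run(child, childCtx, value);
                // ...then merge its final context back; child entries win on conflicts.
                ctx.putAll(childCtx);
            } else {
                value = runner.run(step, ctx, value); // leaf steps go through the runner
            }
        }
        return value;
    }
}
```

A LeafRunner that records each call sees only leaf steps, never the nested workflow itself, which mirrors the rule that sub-workflows bypass the StepRunner.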

RunOptions

RunOptions.maxCost(5.0)
RunOptions.maxIterations(50)
RunOptions.maxDuration(Duration.ofMinutes(10))
RunOptions.unlimited()

// Chain
RunOptions.maxCost(5.0).withMaxIterations(50).withMaxDuration(Duration.ofMinutes(10))

WorkflowGraph (IR)

Pure data structure — no execution logic, no Spring AI imports.
WorkflowGraph<I, O> graph = workflow.compile();
graph.name()         // String
graph.nodes()        // List<WorkflowNode>
graph.edges()        // List<WorkflowEdge>
graph.startNode()    // String
graph.finishNode()   // String

Node Types (sealed)

| Node | Description |
|---|---|
| StepNode | Regular step execution (AGENT or DETERMINISTIC) |
| GatewayNode | Predicate branch |
| DecisionNode | LLM routing |
| GateNode | Quality gate with reflector and retry |
| LoopEntryNode | While-do entry |
| LoopCheckNode | Do-while condition |
| LoopExitNode | Loop resume point |
| ForkNode | Parallel split |
| JoinNode | Convergence |
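Because the node hierarchy is sealed, code that handles nodes can be checked for exhaustiveness by the compiler. A sketch of that idea with records and a pattern-matching switch (Java 21+); NodeSketch and every record component are hypothetical simplifications that reuse the table's names, not the library's WorkflowNode types.

```java
final class NodeSketch {
    private NodeSketch() {}

    enum Kind { AGENT, DETERMINISTIC }

    // Permits clause omitted: all implementations live in this compilation unit.
    sealed interface WorkflowNode {}
    record StepNode(String id, Kind kind) implements WorkflowNode {}
    record GatewayNode(String id) implements WorkflowNode {}
    record DecisionNode(String id) implements WorkflowNode {}
    record GateNode(String id, int maxRetries) implements WorkflowNode {}
    record LoopEntryNode(String id) implements WorkflowNode {}
    record LoopCheckNode(String id) implements WorkflowNode {}
    record LoopExitNode(String id) implements WorkflowNode {}
    record ForkNode(String id, int branches) implements WorkflowNode {}
    record JoinNode(String id) implements WorkflowNode {}

    // No default branch: adding a node type without handling it here fails to compile.
    static String describe(WorkflowNode node) {
        return switch (node) {
            case StepNode s -> "step " + s.id() + " (" + s.kind() + ")";
            case GatewayNode g -> "predicate branch " + g.id();
            case DecisionNode d -> "LLM routing " + d.id();
            case GateNode g -> "quality gate " + g.id() + ", up to " + g.maxRetries() + " retries";
            case LoopEntryNode l -> "while-do entry " + l.id();
            case LoopCheckNode l -> "do-while check " + l.id();
            case LoopExitNode l -> "loop exit " + l.id();
            case ForkNode f -> "fork " + f.id() + " into " + f.branches() + " branches";
            case JoinNode j -> "join " + j.id();
        };
    }
}
```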

Edge Conditions (sealed)

| Condition | From |
|---|---|
| Unconditional | Sequential |
| BooleanGuard(true/false) | GatewayNode |
| OptionMatch("name") | DecisionNode |
| GateMatch(PASS/FAIL/ESCALATE) | GateNode |
| BranchIndex(i) | ForkNode |
| ErrorMatch(ExType) | Error edges |
| LoopContinue / LoopExit | Loop nodes |
| BackEdge(condition) | backTo() cyclic edges |
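Each condition answers one question: does this edge fire for the outcome its source node just produced? A sketch of edge selection for four of the conditions above (Unconditional, BooleanGuard, OptionMatch, GateMatch), assuming the first matching outgoing edge wins; EdgeSelection, Outcome, and Edge are hypothetical simplifications (Java 21+ for the switch).

```java
import java.util.List;
import java.util.Optional;

final class EdgeSelection {
    private EdgeSelection() {}

    enum GateResult { PASS, FAIL, ESCALATE }

    // A subset of the edge-condition family, modeled as a sealed hierarchy.
    sealed interface Condition {}
    record Unconditional() implements Condition {}
    record BooleanGuard(boolean value) implements Condition {}
    record OptionMatch(String option) implements Condition {}
    record GateMatch(GateResult result) implements Condition {}

    // What the source node produced.
    sealed interface Outcome {}
    record Completed() implements Outcome {}
    record Guard(boolean value) implements Outcome {}
    record Chosen(String option) implements Outcome {}
    record Gated(GateResult result) implements Outcome {}

    record Edge(String from, String to, Condition condition) {}

    static boolean matches(Condition condition, Outcome outcome) {
        return switch (condition) {
            case Unconditional u -> true;
            case BooleanGuard g -> outcome instanceof Guard o && o.value() == g.value();
            case OptionMatch m -> outcome instanceof Chosen o && o.option().equals(m.option());
            case GateMatch gm -> outcome instanceof Gated o && o.result() == gm.result();
        };
    }

    // First outgoing edge of `from` whose condition matches; empty if none does.
    static Optional<String> next(List<Edge> edges, String from, Outcome outcome) {
        return edges.stream()
                .filter(e -> e.from().equals(from))
                .filter(e -> matches(e.condition(), outcome))
                .map(Edge::to)
                .findFirst();
    }
}
```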

Type Checking

WorkflowGraphAssert.assertTypeCompatible(graph) walks a compiled graph and checks that each step's declared output type is assignable to the next step's declared input type, catching ClassCastException-style bugs at test time.

The check is opt-in: only steps that override inputType() / outputType() participate. Lambda steps and Step.named() return Object.class by default and are silently skipped, so they never produce false positives.
// Typed steps declare their types
static class ClassifyStep implements Step<String, ClassificationResult> {
    @Override public Class<?> inputType()  { return String.class; }
    @Override public Class<?> outputType() { return ClassificationResult.class; }
    // ...
}

static class FormatResultStep implements Step<ClassificationResult, String> {
    @Override public Class<?> inputType()  { return ClassificationResult.class; }
    @Override public Class<?> outputType() { return String.class; }
    // ...
}

// Compile and check — no LLM calls needed
WorkflowGraph<String, Object> graph = Workflow.<String, Object>define("pipeline")
        .step(new ClassifyStep(chat))
        .then(new FormatResultStep())
        .compile();

WorkflowGraphAssert.assertTypeCompatible(graph);  // passes
If types don’t match:
.step(new ClassifyStep(chat))       // outputs ClassificationResult
.then(new IntegerConsumer())         // expects Integer — MISMATCH

// Throws:
// TypeIncompatibleException: step 'classify' outputs ClassificationResult
//   but step 'int-consumer' expects Integer
Untyped steps (lambdas) break the typed chain — no check across the gap:
.step(new ClassifyStep(chat))                           // typed
.then(Step.named("transform", (ctx, in) -> in.toString()))  // untyped — skipped
.then(new IntegerConsumer())                             // no check — prior step untyped
Use in CI to validate workflow structure without making LLM calls.
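The check reduces to Class.isAssignableFrom over consecutive pairs, skipping any pair where either side is undeclared. A sketch over a linear chain, assuming steps are already in execution order; TypedStage and TypeChain are hypothetical, the real WorkflowGraphAssert walks graph edges rather than a list, and this version throws IllegalStateException instead of TypeIncompatibleException.

```java
import java.util.List;

// Hypothetical (name, inputType, outputType) triple standing in for a compiled step.
record TypedStage(String name, Class<?> inputType, Class<?> outputType) {}

final class TypeChain {
    private TypeChain() {}

    // Throws if a typed step's output cannot be assigned to the next typed step's input.
    static void assertCompatible(List<TypedStage> stages) {
        for (int i = 0; i + 1 < stages.size(); i++) {
            TypedStage from = stages.get(i);
            TypedStage to = stages.get(i + 1);
            // Object.class means "undeclared": no check across that gap
            if (from.outputType() == Object.class || to.inputType() == Object.class) {
                continue;
            }
            if (!to.inputType().isAssignableFrom(from.outputType())) {
                throw new IllegalStateException("step '" + from.name() + "' outputs "
                        + from.outputType().getSimpleName() + " but step '" + to.name()
                        + "' expects " + to.inputType().getSimpleName());
            }
        }
    }
}
```

Subtypes pass (a String output feeds a CharSequence input), which matches the "assignable to" wording rather than requiring identical types.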

WorkflowAbortException

Thrown by a step to abort the workflow and return a typed result. Unlike an unhandled exception (which propagates as an error), WorkflowAbortException is caught by the executor and its carried result becomes the workflow’s output:
class QualityGateStep implements Step<ReviewResult, Path> {
    @Override
    public String name() { return "quality-gate"; }  // required by Step

    @Override
    public Path execute(AgentContext ctx, ReviewResult result) {
        if (result.isTooRisky()) {
            // abort: the caller receives the error report as output, not an exception
            throw new WorkflowAbortException(writeErrorReport(result));
        }
        return writeApprovalReport(result);
    }
}
Use WorkflowAbortException when a step detects an unrecoverable condition but can still produce a meaningful output — for example, a typed error response, a failure report, or a structured rejection. Prefer it over returning null or throwing a raw exception when the caller needs to distinguish “completed with degraded output” from “crashed.”
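On the executor side, the behaviour described above amounts to a catch clause that turns the exception's payload into the return value while letting every other exception propagate. A sketch under that assumption; AbortWithResult and AbortingRunner are hypothetical. The payload is held as Object because Java forbids generic subclasses of Throwable, so the caller casts it back with a Class token.

```java
import java.util.function.Function;

// Carries the result the workflow should return instead of failing.
final class AbortWithResult extends RuntimeException {
    private final Object result;

    AbortWithResult(Object result) {
        super("workflow aborted with a result");
        this.result = result;
    }

    Object result() { return result; }
}

final class AbortingRunner {
    private AbortingRunner() {}

    // An abort becomes the output; any other exception still propagates as an error.
    static <I, O> O run(Function<I, O> body, I input, Class<O> outputType) {
        try {
            return body.apply(input);
        } catch (AbortWithResult abort) {
            return outputType.cast(abort.result());
        }
    }
}
```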

StepRunner

The substrate swap seam. Same workflow code, different durability:
@Bean StepRunner stepRunner() {
    return new LocalStepRunner();  // in-process, zero overhead
}
Three runners are available:

| Runner | What it adds | Status |
|---|---|---|
| LocalStepRunner | Direct in-process execution, zero overhead | Available |
| CheckpointingStepRunner | JDBC crash recovery via workflow-batch | Available |
| TemporalStepRunner | Distributed durable execution via workflow-temporal | Available |
Same workflow code: swap the @Bean, not the workflow. See Durability for setup and crash-recovery examples.

Operator retry (CheckpointManager): FAILED steps are not retried automatically; the system holds them until an operator decides the failure was transient. CheckpointManager.resetFailedSteps(runId) deletes the FAILED records, and re-running with the same runId then skips COMPLETED steps and retries the reset ones:
CheckpointManager manager = new CheckpointManager(readRepo, writeRepo);
manager.getRunState("run-1");        // inspect: what completed, what failed
manager.resetFailedSteps("run-1");   // delete FAILED records (only if transient)
executor.execute(graph, ctx, input); // retry — COMPLETED skipped, reset steps re-run
Sub-workflows are always inline: a Workflow used as a step bypasses the StepRunner entirely and runs in-process. Only leaf steps go through the runner. This is intentional — TemporalStepRunner dispatches to a separate activity worker thread and cannot carry full parent context; sub-workflows must stay in-process to propagate context correctly.
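The operator-retry cycle above can be modeled with a per-run map from step name to status. A toy sketch, assuming one record per step and that a run stops at a step still marked FAILED; StepStatus and CheckpointStore are hypothetical and stand in for the JDBC-backed CheckpointManager rather than reproduce it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

enum StepStatus { COMPLETED, FAILED }

final class CheckpointStore {
    private final Map<String, StepStatus> records = new LinkedHashMap<>();

    // Deletes FAILED records so those steps run again; COMPLETED records are kept.
    void resetFailedSteps() {
        records.values().removeIf(s -> s == StepStatus.FAILED);
    }

    Map<String, StepStatus> state() { return Map.copyOf(records); }

    // Runs steps in order, skipping COMPLETED ones; `succeeds` simulates transient faults.
    // Returns the names of the steps actually executed on this pass.
    List<String> execute(List<String> steps, Predicate<String> succeeds) {
        List<String> ran = new ArrayList<>();
        for (String step : steps) {
            StepStatus prior = records.get(step);
            if (prior == StepStatus.COMPLETED) continue; // resume: skip finished work
            if (prior == StepStatus.FAILED) break;       // held for an operator, not auto-retried
            ran.add(step);
            if (succeeds.test(step)) {
                records.put(step, StepStatus.COMPLETED);
            } else {
                records.put(step, StepStatus.FAILED);
                break;
            }
        }
        return ran;
    }
}
```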

TraceRecorder

Records every step transition:
record StepTransition(
    String fromStep, String toStep,
    Duration duration, int tokensUsed,
    double costUsd, Instant timestamp
) {}
TraceRecorder.noop() is the default — zero overhead unless opted in. Trace data feeds Markov analysis, run diagnosis, and replay.
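As one example of Markov analysis over trace data, transitions can be counted into an empirical transition matrix, P(next | current) = count(current → next) / count(current → any). A sketch using only the fromStep and toStep fields; Transition and MarkovEstimate are hypothetical, and the analysis the library actually performs is not described here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Only the two fields the estimate needs, mirroring StepTransition.fromStep/toStep.
record Transition(String fromStep, String toStep) {}

final class MarkovEstimate {
    private MarkovEstimate() {}

    // Maximum-likelihood estimate: each row of transition counts normalized to sum to 1.
    static Map<String, Map<String, Double>> probabilities(List<Transition> trace) {
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (Transition t : trace) {
            counts.computeIfAbsent(t.fromStep(), k -> new HashMap<>())
                  .merge(t.toStep(), 1, Integer::sum);
        }
        Map<String, Map<String, Double>> probs = new HashMap<>();
        counts.forEach((from, row) -> {
            int total = row.values().stream().mapToInt(Integer::intValue).sum();
            Map<String, Double> p = new HashMap<>();
            row.forEach((to, n) -> p.put(to, (double) n / total));
            probs.put(from, p);
        });
        return probs;
    }
}
```

A step whose outgoing probability mass concentrates on a retry or revise edge is a natural first place to look when diagnosing slow or expensive runs.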

Module Structure

agent-workflow/
├── workflow-api/        # Step, AgentContext, ContextKey, Gate, RunOptions
├── workflow-core/       # WorkflowGraph, WorkflowExecutor, StepRunner, TraceRecorder
├── workflow-flows/      # Workflow DSL, ClaudeStep, ChatClientStep, A2AStep, JudgeGate
├── workflow-batch/      # CheckpointingStepRunner, JdbcTraceRecorder, JPA entities
├── workflow-temporal/   # TemporalStepRunner, StepActivityImpl
├── workflow-journal/    # JournalContextPolicy (agent-journal backed)
├── workflow-mcp/        # McpAgentToolRegistrar
├── workflow-tools/      # BashTool, ReadTool, WriteTool, EditTool, GlobTool, GrepTool
├── workflow-agents/     # AgentLoop
└── workflow-examples/   # Example pipelines