
Step<I, O>

The atomic unit of a workflow.
public interface Step<I, O> {
    String name();
    O execute(AgentContext ctx, I input);

    default Class<?> inputType()  { return Object.class; }
    default Class<?> outputType() { return Object.class; }

    static <I,O> Step<I,O> named(String name, BiFunction<AgentContext, I, O> fn);
    static <I,O> Step<I,O> noop();
}
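The sketch below shows how a factory like Step.named can wrap a BiFunction while keeping the Object.class type defaults. It is a minimal sketch under stated assumptions: MiniStep and MiniContext are hypothetical stand-ins for Step and AgentContext, not the library's implementation.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for AgentContext; the real type carries typed keys.
final class MiniContext {}

// Hypothetical mirror of the Step<I, O> shape shown above.
interface MiniStep<I, O> {
    String name();
    O execute(MiniContext ctx, I input);

    // Untyped by default, so a type checker can recognize and skip these steps.
    default Class<?> inputType()  { return Object.class; }
    default Class<?> outputType() { return Object.class; }

    // Wraps a function in an anonymous step that inherits the Object.class defaults.
    static <I, O> MiniStep<I, O> named(String name, BiFunction<MiniContext, I, O> fn) {
        return new MiniStep<I, O>() {
            @Override public String name() { return name; }
            @Override public O execute(MiniContext ctx, I input) { return fn.apply(ctx, input); }
        };
    }
}
```

Because the wrapped step never overrides inputType() or outputType(), it reports Object.class for both.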
AgentStep marker: implement it on steps that make LLM calls so their cost is tracked correctly as NodeType.AGENT:
public interface AgentStep { /* marker — no methods */ }

Built-in Step Types

| Type | Factory | What it wraps |
|---|---|---|
| Lambda | Step.named("n", (ctx, in) -> ...) | Any function |
| Chat | ChatClientStep.of(chat, "template {input}") | Single Spring AI call |
| Claude | ClaudeStep.of("template").workingDirectory(path) | Full Claude CLI session |
| A2A | A2AStep.of(url) | Remote agent via Agent-to-Agent protocol |
| Function | Steps.of(fn) | Pure deterministic function |
| Retry | Steps.retrying(3, step) | Retry wrapper |
| Output ref | Steps.outputOf("step-name") | Read prior step result from context |
| Terminate | Steps.terminate(status, msg) | Early workflow exit |
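A retry wrapper in the spirit of Steps.retrying(3, step) can be sketched with a plain Function. This is a hypothetical illustration, not the library's code: the Retry class is invented here, and it assumes the count means total attempts, which may differ from how Steps.retrying interprets its argument.

```java
import java.util.function.Function;

// Hypothetical retry wrapper; assumes `maxAttempts` counts total attempts,
// which may not match the semantics of Steps.retrying.
final class Retry {
    static <I, O> Function<I, O> retrying(int maxAttempts, Function<I, O> step) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        return input -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return step.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // every attempt failed; surface the final error
        };
    }
}
```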

ClaudeStep options

ClaudeStep.of("Fix the tests in {input}")
    .workingDirectory(projectPath)
    .permissionMode(PermissionMode.ACCEPT_EDITS)
    .withMcp(mcpConfig)
    .withA2a(a2aEndpoint);

Workflow Builder

Workflow.<I, O>define("name")
    .step(step)                                    // first step
    .then(step)                                    // chain
    .branch(predicate).then(a).otherwise(b)        // conditional
    .parallel(step1, step2)                        // static fan-out
    .parallel(itemsSupplier, stepFactory)          // dynamic fan-out
    .repeatUntil(predicate).step(a).end()          // while-do
    .repeatUntilOutput(predicate).step(a).end()    // do-while
    .decision(chat).option("a", step).end()        // LLM routing
    .gate(gate).onPass(a).onFail(b).maxRetries(3).end()  // quality gate
    .onError(ExType.class, recovery)               // error recovery
    // Terminal operations (choose one):
    .run(input)                                    // execute
    .run(input, RunOptions.maxCost(5.0))           // with constraints
    .compile()                                     // → WorkflowGraph (IR only)
    .build()                                       // → Workflow (implements Step)
Named shortcuts:
Workflow.sequential("name").step(a).then(b).run(input);
Workflow.loop("name").step(a).times(5).run(input);
Workflow.loop("name").step(a).until(predicate).run(input);
Workflow.parallel("name").step(a, b, c).run(input);
Workflow.supervisor("name", chat).agents(a, b).until(pred).run(input);
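The builder offers two loop shapes: repeatUntil (while-do) and repeatUntilOutput (do-while). The difference is where the predicate is tested. The helpers below are hypothetical and, unlike the real DSL, model the predicate as a test on the current value only:

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical helpers contrasting the two loop shapes; not the library's executor.
final class Loops {
    // While-do: test before each iteration, so the body may run zero times.
    static <T> T repeatUntil(T value, Predicate<T> done, UnaryOperator<T> body) {
        while (!done.test(value)) {
            value = body.apply(value);
        }
        return value;
    }

    // Do-while: run the body first, then test its output, so it runs at least once.
    static <T> T repeatUntilOutput(T value, Predicate<T> done, UnaryOperator<T> body) {
        do {
            value = body.apply(value);
        } while (!done.test(value));
        return value;
    }
}
```

When the condition already holds on entry, the while-do form skips the body entirely, while the do-while form still executes it once.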

Gate

public interface Gate<O> {
    GateDecision evaluate(AgentContext ctx, O output);
}
GateDecision: PASS, FAIL, ESCALATE, TIMEOUT

Implementations

| Gate | Description |
|---|---|
| JudgeGate(jury, threshold) | Automated: passes when the agent-judge score is >= threshold |
| TieredGate(jury, high, low) | >= high → PASS, >= low → ESCALATE, < low → FAIL |
| HumanGate | HITL: waits for an external signal, with a durable timeout |
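The TieredGate rule maps a judge score onto three outcomes. A minimal sketch of just that mapping follows; Decision and Tiered are hypothetical names, and scoring by the jury is left out:

```java
// Hypothetical mirror of GateDecision; TIMEOUT is only produced by timed gates.
enum Decision { PASS, FAIL, ESCALATE, TIMEOUT }

// Sketch of the tiered score-to-decision mapping described in the table.
final class Tiered {
    static Decision decide(double score, double high, double low) {
        if (low > high) throw new IllegalArgumentException("low must not exceed high");
        if (score >= high) return Decision.PASS;
        if (score >= low)  return Decision.ESCALATE;
        return Decision.FAIL;
    }
}
```

With high == low the ESCALATE band disappears and the rule reduces to a single pass/fail threshold, which matches the JudgeGate description.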

Gate builder

.gate(new JudgeGate(jury, 0.8))
    .onPass(approveStep)          // required
    .onFail(reviseStep)           // optional
    .onTimeout(fallbackStep)      // optional
    .withReflector(reflectorStep) // Verdict → feedback text for retry
    .maxRetries(3)                // cap
    .end()
On failure, the Verdict is written to AgentContext.JUDGE_VERDICT. The reflector then transforms that verdict into constructive feedback for the retry step.
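The gate's retry cycle (run, judge, reflect, retry) can be sketched without the library. Everything here is hypothetical: GateLoop and GateResult are invented, a numeric score stands in for Verdict, and maxRetries is assumed to count retries after the first attempt.

```java
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

// Outcome of the hypothetical gate loop: final output, pass flag, attempts used.
record GateResult<O>(O output, boolean passed, int attempts) {}

final class GateLoop {
    // Runs `step` with feedback text, judges the output, and on failure asks the
    // reflector to turn the score into feedback before retrying.
    // Assumes maxRetries counts retries after the first attempt.
    static <O> GateResult<O> run(Function<String, O> step,
                                 ToDoubleFunction<O> judge,
                                 double threshold,
                                 Function<Double, String> reflector,
                                 int maxRetries) {
        String feedback = "";                  // no feedback on the first attempt
        O output = null;
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            output = step.apply(feedback);
            double score = judge.applyAsDouble(output);
            if (score >= threshold) {
                return new GateResult<>(output, true, attempt);
            }
            feedback = reflector.apply(score); // verdict → feedback for the next try
        }
        return new GateResult<>(output, false, maxRetries + 1);
    }
}
```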

AgentContext

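AgentContext is immutable and threaded through every step. Writes are copy-on-write: mutate() returns a builder that produces a new context, leaving the original unchanged.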

Well-known Keys

| Key | Type | Description |
|---|---|---|
| WORKFLOW_RUN_ID | String | Unique execution ID |
| WORKFLOW_NAME | String | Workflow name |
| CURRENT_STEP | String | Active step name |
| ITERATION_COUNT | Integer | Loop counter |
| ACCUMULATED_COST | Double | USD spent so far |
| ACCUMULATED_TOKENS | Long | Total tokens |
| JUDGE_VERDICT | Verdict | Gate failure verdict |
| JUDGE_REFLECTION | String | Reflector feedback text |
// Read
ctx.get(AgentContext.ITERATION_COUNT)      // Optional<Integer>
ctx.require(AgentContext.ITERATION_COUNT)   // throws if missing

// Write (returns new context)
ctx.mutate().with(key, value).build()
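ContextKey<T> follows Bloch’s typesafe heterogeneous container pattern (Effective Java, Item 33): each key carries its value type, so ctx.get(key) returns a correctly typed Optional without a cast at the call site.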
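A compact sketch of the pattern, combined with copy-on-write updates, might look like this. Key and Ctx are hypothetical; a single with() method stands in for the mutate().with(...).build() builder, and null values are rejected by Map.copyOf. Requires Java 16+ for records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical typed key: the Class token lets get() cast safely at runtime.
record Key<T>(String name, Class<T> type) {}

// Immutable context sketch: each with() copies the map (copy-on-write).
final class Ctx {
    private final Map<Key<?>, Object> values;

    private Ctx(Map<Key<?>, Object> values) { this.values = Map.copyOf(values); }

    static Ctx empty() { return new Ctx(Map.of()); }

    <T> Optional<T> get(Key<T> key) {
        return Optional.ofNullable(values.get(key)).map(key.type()::cast);
    }

    <T> T require(Key<T> key) {
        return get(key).orElseThrow(() -> new IllegalStateException("missing key: " + key.name()));
    }

    <T> Ctx with(Key<T> key, T value) {
        Map<Key<?>, Object> copy = new HashMap<>(values);
        copy.put(key, key.type().cast(value)); // runtime type check on write
        return new Ctx(copy);                  // the original context is untouched
    }
}
```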

RunOptions

RunOptions.maxCost(5.0)
RunOptions.maxIterations(50)
RunOptions.maxDuration(Duration.ofMinutes(10))
RunOptions.unlimited()

// Chain
RunOptions.maxCost(5.0).withMaxIterations(50).withMaxDuration(Duration.ofMinutes(10))
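The chained form suggests RunOptions is an immutable value with withX copies. The sketch below assumes that shape and shows how an executor might test the limits between steps. Budget, its component names, violation(), and the "strictly greater than" rule are all hypothetical, not the library's API. Requires Java 16+ for records.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical stand-in for RunOptions: an immutable record with "wither" copies.
record Budget(double costLimit, int iterationLimit, Duration durationLimit) {
    static Budget unlimited() {
        return new Budget(Double.POSITIVE_INFINITY, Integer.MAX_VALUE, Duration.ofSeconds(Long.MAX_VALUE));
    }
    static Budget maxCost(double usd) { return unlimited().withMaxCost(usd); }

    Budget withMaxCost(double usd)          { return new Budget(usd, iterationLimit, durationLimit); }
    Budget withMaxIterations(int n)         { return new Budget(costLimit, n, durationLimit); }
    Budget withMaxDuration(Duration limit)  { return new Budget(costLimit, iterationLimit, limit); }

    // Returns the first limit that has been strictly exceeded, if any.
    Optional<String> violation(double costUsd, int iterations, Duration elapsed) {
        if (costUsd > costLimit) return Optional.of("maxCost");
        if (iterations > iterationLimit) return Optional.of("maxIterations");
        if (elapsed.compareTo(durationLimit) > 0) return Optional.of("maxDuration");
        return Optional.empty();
    }
}
```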

WorkflowGraph (IR)

Pure data structure — no execution logic, no Spring AI imports.
WorkflowGraph<I, O> graph = workflow.compile();
graph.name()         // String
graph.nodes()        // List<WorkflowNode>
graph.edges()        // List<WorkflowEdge>
graph.startNode()    // String
graph.finishNode()   // String

Node Types (sealed)

| Node | Description |
|---|---|
| StepNode | Regular step execution (AGENT or DETERMINISTIC) |
| GatewayNode | Predicate branch |
| DecisionNode | LLM routing |
| GateNode | Quality gate with reflector and retry |
| LoopEntryNode | While-do entry |
| LoopCheckNode | Do-while condition |
| LoopExitNode | Loop resume point |
| ForkNode | Parallel split |
| JoinNode | Convergence |

Edge Conditions (sealed)

| Condition | Used on edges from |
|---|---|
| Unconditional | Sequential steps |
| BooleanGuard(true/false) | GatewayNode |
| OptionMatch("name") | DecisionNode |
| GateMatch(PASS/FAIL/ESCALATE) | GateNode |
| BranchIndex(i) | ForkNode |
| ErrorMatch(ExType) | Error edges |
| LoopContinue / LoopExit | Loop nodes |
| BackEdge(condition) | backTo() cyclic edges |

Type Checking

WorkflowGraphAssert.assertTypeCompatible(graph) walks a compiled graph and checks that each step’s declared output type is assignable to the next step’s declared input type, catching ClassCastException-style bugs at test time. The check is opt-in: only steps that override inputType() / outputType() participate. Lambda steps and Step.named() return Object.class by default and are silently skipped, so they never produce false positives.
// Typed steps declare their types
static class ClassifyStep implements Step<String, ClassificationResult> {
    @Override public Class<?> inputType()  { return String.class; }
    @Override public Class<?> outputType() { return ClassificationResult.class; }
    // ...
}

static class FormatResultStep implements Step<ClassificationResult, String> {
    @Override public Class<?> inputType()  { return ClassificationResult.class; }
    @Override public Class<?> outputType() { return String.class; }
    // ...
}

// Compile and check — no LLM calls needed
WorkflowGraph<String, Object> graph = Workflow.<String, Object>define("pipeline")
        .step(new ClassifyStep(chat))
        .then(new FormatResultStep())
        .compile();

WorkflowGraphAssert.assertTypeCompatible(graph);  // passes
If types don’t match:
.step(new ClassifyStep(chat))       // outputs ClassificationResult
.then(new IntegerConsumer())         // expects Integer — MISMATCH

// Throws:
// TypeIncompatibleException: step 'classify' outputs ClassificationResult
//   but step 'int-consumer' expects Integer
Untyped steps (lambdas) break the typed chain — no check across the gap:
.step(new ClassifyStep(chat))                           // typed
.then(Step.named("transform", (ctx, in) -> in.toString()))  // untyped — skipped
.then(new IntegerConsumer())                             // no check — prior step untyped
Use in CI to validate workflow structure without making LLM calls.
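The core of such a check is Class.isAssignableFrom applied to neighbouring steps. The sketch below covers only a linear chain, whereas the real assertion walks a full graph; TypedStep and TypeCheck are hypothetical names. Requires Java 16+ for records.

```java
import java.util.List;

// Hypothetical minimal descriptor of a step's declared types.
record TypedStep(String name, Class<?> in, Class<?> out) {}

final class TypeCheck {
    // Fails when a declared output cannot be assigned to the next declared input.
    // Object.class means "untyped", so that link is skipped rather than flagged.
    static void check(List<TypedStep> chain) {
        for (int i = 0; i + 1 < chain.size(); i++) {
            TypedStep prev = chain.get(i), next = chain.get(i + 1);
            if (prev.out() == Object.class || next.in() == Object.class) continue;
            if (!next.in().isAssignableFrom(prev.out())) {
                throw new IllegalStateException("step '" + prev.name() + "' outputs "
                    + prev.out().getSimpleName() + " but step '" + next.name()
                    + "' expects " + next.in().getSimpleName());
            }
        }
    }
}
```

Note the direction: a String output feeds a CharSequence input, but not the reverse.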

StepRunner

The substrate swap seam. Same workflow code, different durability:
@Bean StepRunner stepRunner() {
    return new LocalStepRunner();  // in-process, zero overhead
}
Three runners are available:
| Runner | What it adds | Status |
|---|---|---|
| LocalStepRunner | Direct in-process execution, zero overhead | Available |
| CheckpointingStepRunner | JDBC crash recovery via workflow-batch | Available |
| TemporalStepRunner | Distributed durable execution via workflow-temporal | Available |
Same workflow code — swap the @Bean, not the workflow. See Durability for setup and crash-recovery examples.
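The seam idea can be illustrated with a hypothetical Runner interface and two implementations: one that calls the step directly, and one that replays saved results. This does not reproduce the StepRunner interface, whose signature is not shown here. The in-memory map stands in for JDBC, and keying by step name alone is a simplification (a real store would also key by run ID).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical seam: workflow code depends only on this interface.
interface Runner {
    <I, O> O run(String stepName, Function<I, O> step, I input);
}

// In-process: just call the step.
final class LocalRunner implements Runner {
    @Override public <I, O> O run(String stepName, Function<I, O> step, I input) {
        return step.apply(input);
    }
}

// Checkpointing sketch: completed results are saved by step name, so a re-run
// after a crash replays the saved result instead of re-executing the step.
final class InMemoryCheckpointRunner implements Runner {
    private final Map<String, Object> checkpoints = new HashMap<>();

    @SuppressWarnings("unchecked")
    @Override public <I, O> O run(String stepName, Function<I, O> step, I input) {
        if (checkpoints.containsKey(stepName)) {
            return (O) checkpoints.get(stepName);
        }
        O result = step.apply(input);
        checkpoints.put(stepName, result);
        return result;
    }
}
```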

TraceRecorder

Records every step transition:
record StepTransition(
    String fromStep, String toStep,
    Duration duration, int tokensUsed,
    double costUsd, Instant timestamp
) {}
TraceRecorder.noop() is the default — zero overhead unless opted in. Trace data feeds Markov analysis, run diagnosis, and replay.
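As one example of the Markov analysis mentioned above, transition probabilities can be estimated from recorded transitions as count(from → to) / count(from → any). The sketch reuses the StepTransition shape shown above; Markov and transitionProbabilities are hypothetical names, and the library's actual analysis may differ. Requires Java 16+ for records.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same shape as the StepTransition record above.
record StepTransition(String fromStep, String toStep, Duration duration,
                      int tokensUsed, double costUsd, Instant timestamp) {}

final class Markov {
    // Estimates P(toStep | fromStep) from transition counts.
    static Map<String, Map<String, Double>> transitionProbabilities(List<StepTransition> trace) {
        Map<String, Map<String, Integer>> counts = new HashMap<>();
        for (StepTransition t : trace) {
            counts.computeIfAbsent(t.fromStep(), k -> new HashMap<>())
                  .merge(t.toStep(), 1, Integer::sum);
        }
        Map<String, Map<String, Double>> probs = new HashMap<>();
        counts.forEach((from, row) -> {
            int total = row.values().stream().mapToInt(Integer::intValue).sum();
            Map<String, Double> p = new HashMap<>();
            row.forEach((to, n) -> p.put(to, n.doubleValue() / total));
            probs.put(from, p);
        });
        return probs;
    }
}
```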

Module Structure

agent-workflow/
├── workflow-api/        # Step, AgentContext, ContextKey, Gate, RunOptions
├── workflow-core/       # WorkflowGraph, WorkflowExecutor, StepRunner, TraceRecorder
├── workflow-flows/      # Workflow DSL, ClaudeStep, ChatClientStep, A2AStep, JudgeGate
├── workflow-batch/      # CheckpointingStepRunner, JdbcTraceRecorder, JPA entities
├── workflow-temporal/   # TemporalStepRunner, StepActivityImpl
├── workflow-journal/    # JournalContextPolicy (agent-journal backed)
├── workflow-mcp/        # McpAgentToolRegistrar
├── workflow-tools/      # BashTool, ReadTool, WriteTool, EditTool, GlobTool, GrepTool
├── workflow-agents/     # AgentLoop
└── workflow-examples/   # Example pipelines