The Graduation Path

Agent Workflow separates workflow definition from execution. The StepRunner interface is the seam — swap the bean, not the workflow:
| Level | Runner | What it adds |
|-------|--------|--------------|
| 0 | LocalStepRunner | In-process, zero overhead. Default. |
| 1 | CheckpointingStepRunner | JDBC crash recovery: resume from last completed step |
| 2 | TemporalStepRunner | Distributed durable execution via Temporal activities |
Same workflow code at every level:
// Level 0 — default, no persistence
@Bean StepRunner stepRunner() {
    return new LocalStepRunner();
}

// Level 1 — JDBC crash recovery
@Bean StepRunner stepRunner(AgentStepExecutionReadRepository readRepo,
                            AgentStepExecutionWriteRepository writeRepo) {
    return new CheckpointingStepRunner(readRepo, writeRepo);
}

// Level 2 — Temporal durable execution
@Bean StepRunner stepRunner() {
    return new TemporalStepRunner("agent-tasks");
}

CheckpointingStepRunner

Persists step outputs to a JDBC database. On restart with the same runId, completed steps are skipped — their cached output is returned directly.

How it works

  1. Before executing a step, queries by (runId, stepName) — the checkpoint key
  2. If a COMPLETED record exists, returns the cached outputPayload (skip)
  3. Otherwise, creates a STARTED record, executes the step, upgrades to COMPLETED with the serialized output
  4. On exception, records FAILED with the error message
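
The four steps above can be sketched with an in-memory map standing in for the JDBC table. This is a minimal illustration, not the library's implementation: `CheckpointSketch`, `Entry`, and `statusOf` are hypothetical names, and refusing to re-run a step that already has a FAILED record is an assumption drawn from the restart semantics described on this page.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the JDBC-backed checkpoint store.
// None of these names come from the library; they only illustrate the flow.
class CheckpointSketch {

    enum Status { STARTED, COMPLETED, FAILED }

    // One checkpoint row: status plus either the cached output or the error message.
    record Entry(Status status, String output, String error) {}

    private final Map<String, Entry> store = new HashMap<>();

    private static String key(String runId, String stepName) {
        return runId + "/" + stepName;
    }

    String run(String runId, String stepName, String input, Function<String, String> step) {
        String checkpointKey = key(runId, stepName);

        // Steps 1-2: look up the checkpoint; a COMPLETED record short-circuits execution.
        Entry existing = store.get(checkpointKey);
        if (existing != null && existing.status() == Status.COMPLETED) {
            return existing.output();
        }
        // Assumption: a FAILED record blocks re-execution until it is reset.
        if (existing != null && existing.status() == Status.FAILED) {
            throw new IllegalStateException(stepName + " is FAILED for " + runId + "; reset it first");
        }

        // Step 3: mark STARTED, execute, then upgrade to COMPLETED with the output.
        store.put(checkpointKey, new Entry(Status.STARTED, null, null));
        try {
            String output = step.apply(input);
            store.put(checkpointKey, new Entry(Status.COMPLETED, output, null));
            return output;
        } catch (RuntimeException e) {
            // Step 4: record FAILED with the error message, then rethrow.
            store.put(checkpointKey, new Entry(Status.FAILED, null, e.getMessage()));
            throw e;
        }
    }

    // Returns null when no checkpoint exists for the key.
    Status statusOf(String runId, String stepName) {
        Entry e = store.get(key(runId, stepName));
        return e == null ? null : e.status();
    }

    public static void main(String[] args) {
        CheckpointSketch runner = new CheckpointSketch();
        // First call executes the step; second call returns the cached output.
        System.out.println(runner.run("run-1", "step-a", "start", in -> in + "->a"));
        System.out.println(runner.run("run-1", "step-a", "start", in -> in + "->changed"));
    }
}
```

Both calls in `main` print the same value, because the second call finds the COMPLETED record and never invokes its lambda.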

Maven coordinates

<dependency>
    <groupId>io.github.markpollack</groupId>
    <artifactId>workflow-batch</artifactId>
    <version>0.6.0</version>
</dependency>
Requires Spring Data JPA and a JDBC DataSource on the classpath. H2 works for development; Postgres or MySQL for production.

Restart semantics

runId is the stable identity for a workflow instance. COMPLETED steps are skipped permanently for that runId. FAILED steps are not automatically retried — the system leaves them in place until an operator explicitly decides to retry. This is intentional. Not all failures are transient: a bad prompt, a schema mismatch, or a programming error will fail again without a fix. Automatic retry would mask the real problem.

Crash-and-resume with CheckpointManager

When a step fails, call CheckpointManager.getRunState() to inspect what happened, then resetFailedSteps() only after confirming the failure was transient:
var manager = new CheckpointManager(readRepo, writeRepo);
var ctx = AgentContext.withRunId("run-1");
var executor = new WorkflowExecutor(checkpointRunner, TraceRecorder.noop());

// First attempt — crashes at step-c
try {
    executor.execute(workflow.graph(), ctx, "start");
} catch (RuntimeException e) {
    // step-a and step-b: COMPLETED; step-c: FAILED
}

// Operator inspects state
List<AgentStepExecution> state = manager.getRunState("run-1");
// diagnose: was this a transient network blip? a permanent config error?

// Only reset if the failure was transient
int reset = manager.resetFailedSteps("run-1");  // deletes the FAILED record(s)
// reset == 1 (step-c)

// Retry with same runId — step-a and step-b skipped, step-c retried
String result = executor.execute(workflow.graph(), ctx, "start");
resetFailedSteps deletes FAILED records; COMPLETED records are untouched. The next execution creates a fresh STARTED record for each reset step and re-runs it.
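
The reset rule (delete FAILED, keep COMPLETED) can be illustrated without a database. The sketch below is an assumption-laden model: `ResetSketch` and its `record` method are hypothetical, and only the method names `resetFailedSteps` and `getRunState` mirror the calls shown above; the real signatures and return types may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of per-run step records.
class ResetSketch {

    enum Status { STARTED, COMPLETED, FAILED }

    // runId -> (stepName -> status), insertion-ordered for readable output.
    private final Map<String, Map<String, Status>> runs = new LinkedHashMap<>();

    void record(String runId, String stepName, Status status) {
        runs.computeIfAbsent(runId, id -> new LinkedHashMap<>()).put(stepName, status);
    }

    // Deletes FAILED records for the run and returns how many were removed.
    // COMPLETED records are left untouched, so those steps stay skipped.
    int resetFailedSteps(String runId) {
        Map<String, Status> steps = runs.get(runId);
        if (steps == null) {
            return 0;
        }
        int before = steps.size();
        steps.values().removeIf(status -> status == Status.FAILED);
        return before - steps.size();
    }

    // Returns a copy so callers cannot mutate the stored state.
    Map<String, Status> getRunState(String runId) {
        return new LinkedHashMap<>(runs.getOrDefault(runId, Map.of()));
    }

    public static void main(String[] args) {
        ResetSketch store = new ResetSketch();
        store.record("run-1", "step-a", Status.COMPLETED);
        store.record("run-1", "step-b", Status.COMPLETED);
        store.record("run-1", "step-c", Status.FAILED);
        System.out.println(store.resetFailedSteps("run-1")); // one FAILED record removed
        System.out.println(store.getRunState("run-1"));      // only the COMPLETED steps remain
    }
}
```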

Basic crash-and-resume example

A 4-step workflow crashes at step 3. After the operator resets the failure, steps 1-2 are skipped (cached), step 3 is retried, and step 4 runs for the first time:
Step<String, String> step1 = Step.named("step-1", (ctx, in) -> in + "→1");
Step<String, String> step2 = Step.named("step-2", (ctx, in) -> in + "→2");
Step<String, String> step3 = Step.named("step-3", (ctx, in) -> {
    if (shouldCrash()) throw new RuntimeException("crash!");
    return in + "→3";
});
Step<String, String> step4 = Step.named("step-4", (ctx, in) -> in + "→4");

var workflow = Workflow.<String, String>define("crash-resume")
        .step(step1).step(step2).step(step3).step(step4)
        .build();

// First attempt: step-3 fails, so execute() throws
var ctx = AgentContext.withRunId("run-1");
var executor = new WorkflowExecutor(checkpointRunner, TraceRecorder.noop());
try {
    executor.execute(workflow.graph(), ctx, "start");
} catch (RuntimeException e) {
    // step-1 and step-2: COMPLETED; step-3: FAILED
}

// Operator diagnoses, decides it was transient, resets
new CheckpointManager(readRepo, writeRepo).resetFailedSteps("run-1");

// Retry — steps 1-2 SKIPPED (cached), steps 3-4 execute
String result = executor.execute(workflow.graph(), ctx, "start");
// result: "start→1→2→3→4"
A complete runnable example is in workflow-dsl-examples/CrashRecoveryIT (@DataJpaTest + H2, no LLM needed).

JPA entities

Two JPA entities back the checkpoint system:
| Entity | Table | Purpose |
|--------|-------|---------|
| AgentStepExecution | agent_step_executions | Per-step checkpoint. Key: (runId, stepName) unique constraint. Tracks status, output, tokens, cost. |
| AgentFlowExecution | agent_flow_executions | Per-run envelope. Tracks workflow name, steps total/completed, total cost. |
Both use BatchStatus (severity-ordered enum) and ExitStatus (embeddable record with severity-based composition via and()).
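
Severity-based composition means that combining two statuses yields the more severe one. The sketch below shows the idea under stated assumptions: `SketchBatchStatus` and `SketchExitStatus` are hypothetical types, and the constants, their ordering, and the numeric severities are illustrative rather than taken from the library.

```java
// Hypothetical shapes; the library's real BatchStatus and ExitStatus may differ.
enum SketchBatchStatus {
    // Declared from least to most severe, so ordinal() doubles as the severity.
    COMPLETED, STARTED, FAILED;

    static SketchBatchStatus max(SketchBatchStatus a, SketchBatchStatus b) {
        return a.ordinal() >= b.ordinal() ? a : b;
    }
}

record SketchExitStatus(String exitCode, int severity) {

    static final SketchExitStatus COMPLETED = new SketchExitStatus("COMPLETED", 0);
    static final SketchExitStatus FAILED = new SketchExitStatus("FAILED", 10);

    // Composition keeps whichever status is more severe; ties keep the receiver.
    SketchExitStatus and(SketchExitStatus other) {
        if (other == null) {
            return this;
        }
        return other.severity() > this.severity() ? other : this;
    }

    public static void main(String[] args) {
        // One failed step is enough to make the combined status FAILED.
        System.out.println(COMPLETED.and(FAILED).exitCode());
        System.out.println(SketchBatchStatus.max(SketchBatchStatus.STARTED, SketchBatchStatus.COMPLETED));
    }
}
```

With this rule, a per-run status can be folded from per-step statuses in any order, since the most severe one always wins.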

Typed output deserialization

Each checkpoint stores the step’s output type alongside its serialized payload. On restore, CheckpointingStepRunner uses Class.forName(outputType) to deserialize back to the original type rather than raw Object. This means that when a step is skipped and its cached output is returned to the next step, the type is preserved:
// Step produces a typed result
Step<String, SummaryReport> summarize = new SummarizeStep(chat);

// First run — step executes, checkpoint stores SummaryReport.class + JSON payload
// Second run (restart) — checkpoint restores SummaryReport, not Map or String
Steps that declare outputType() participate fully. Steps created with Step.named() lambdas report Object.class as their output type by default, so deserialization falls back to Jackson’s type inference for those.
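
The role of the stored type name can be shown with the standard library alone. In this sketch, `TypedRestoreSketch` and `Checkpoint` are hypothetical, and a reflective call to a static `valueOf(String)` method stands in for JSON deserialization so the example has no dependencies; the real runner's serialization path is not shown in this page and may work differently.

```java
import java.lang.reflect.Method;

// Hypothetical sketch: store the output's class name next to its payload,
// then use Class.forName on restore to get back a typed value.
class TypedRestoreSketch {

    record Checkpoint(String outputType, String payload) {}

    static Checkpoint save(Object output) {
        // String.valueOf stands in for JSON serialization here.
        return new Checkpoint(output.getClass().getName(), String.valueOf(output));
    }

    static Object restore(Checkpoint checkpoint) {
        try {
            Class<?> type = Class.forName(checkpoint.outputType());
            if (type == String.class) {
                return checkpoint.payload();
            }
            // Stand-in for a JSON mapper: works for types exposing a static
            // valueOf(String), such as Integer, Long, and Boolean.
            Method valueOf = type.getMethod("valueOf", String.class);
            return type.cast(valueOf.invoke(null, checkpoint.payload()));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot restore " + checkpoint.outputType(), e);
        }
    }

    public static void main(String[] args) {
        Object restored = restore(save(42));
        // The restored value is an Integer again, not a String.
        System.out.println(restored.getClass().getName());
    }
}
```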

JdbcTraceRecorder

Records every step transition to a step_transitions table. Auto-creates the table on first use.
// From DataSource
TraceRecorder recorder = new JdbcTraceRecorder(dataSource);

// Or from JdbcTemplate
TraceRecorder recorder = new JdbcTraceRecorder(jdbcTemplate);

// Wire into executor
var executor = new WorkflowExecutor(stepRunner, recorder);
Each StepTransition record includes: run_id, workflow_name, from_step, to_step, timestamp, duration_ms, tokens_used, cost_usd, node_type, label.

Query traces for a run:
JdbcTraceRecorder recorder = new JdbcTraceRecorder(dataSource);
List<StepTransition> trace = recorder.getTrace("run-1");

TemporalStepRunner

Dispatches each step as a Temporal Activity. Steps must be registered with StepActivityImpl on the worker side.

Maven coordinates

<dependency>
    <groupId>io.github.markpollack</groupId>
    <artifactId>workflow-temporal</artifactId>
    <version>0.6.0</version>
</dependency>

Activity dispatch

// Workflow side — configure the runner
StepRunner runner = new TemporalStepRunner("agent-tasks");
// Default timeouts: 10min start-to-close, 30s heartbeat

// Or with custom timeouts
StepRunner customRunner = new TemporalStepRunner("agent-tasks",
        Duration.ofMinutes(30),   // start-to-close
        Duration.ofMinutes(1));   // heartbeat

Worker-side step registration

// Register steps with the activity implementation
StepActivityImpl activity = new StepActivityImpl();
activity.registerStep(step1);
activity.registerStep(step2);

// Wire into Temporal worker
Worker worker = factory.newWorker("agent-tasks");
worker.registerActivitiesImplementations(activity);
Steps are resolved by name from a ConcurrentHashMap registry. The activity creates a fresh AgentContext with the runId for each execution.
Steps dispatched via Temporal must be idempotent — Temporal may retry activities on timeout or failure.
Sub-workflows run inline, not as activities. A Workflow used as a step inside another Workflow bypasses the TemporalStepRunner and executes in-process. Only leaf steps are dispatched as Temporal activities. This is required for correct context propagation — the activity worker receives only the runId, not the full parent context.
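
Name-based step resolution from a ConcurrentHashMap, as described above, might look roughly like the following. This is a sketch only: `StepRegistrySketch` and `NamedStep` are hypothetical, and rejecting duplicate names and throwing on an unknown name are assumptions rather than documented behavior of StepActivityImpl.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical worker-side registry that resolves steps by name.
class StepRegistrySketch {

    // A named step taking (runId, input) and returning an output.
    record NamedStep(String name, BiFunction<String, String, String> body) {}

    private final Map<String, NamedStep> steps = new ConcurrentHashMap<>();

    void registerStep(NamedStep step) {
        // Assumption: duplicate names are rejected rather than overwritten.
        NamedStep previous = steps.putIfAbsent(step.name(), step);
        if (previous != null) {
            throw new IllegalArgumentException("Duplicate step name: " + step.name());
        }
    }

    // Only the step name, runId, and input cross the activity boundary,
    // so the worker looks the step up by name before running it.
    String execute(String stepName, String runId, String input) {
        NamedStep step = steps.get(stepName);
        if (step == null) {
            throw new IllegalStateException("No step registered as '" + stepName + "'");
        }
        return step.body().apply(runId, input);
    }

    public static void main(String[] args) {
        StepRegistrySketch registry = new StepRegistrySketch();
        registry.registerStep(new NamedStep("step-1", (runId, in) -> in + "->1"));
        System.out.println(registry.execute("step-1", "run-1", "start"));
    }
}
```

Because a step is looked up by name on every dispatch, a step that was never registered on the worker fails at execution time rather than at workflow definition time, which is one reason to register all steps before starting the worker.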
