Workflows

durable execution via event sourcing — explicit continuations, crash recovery, dual runtime

Overview
Fling workflows are powered by flingflow, a vendored durable execution engine. It provides event sourcing (all state changes are appended as immutable events), explicit continuations (each step returns the name of the next step or signals completion), and automatic crash recovery (stuck workflows are detected and resumed by a cron job). The engine was vendored into packages/flingflow/ to keep it self-contained and avoid external dependency churn.
01 Register: workflow(name, steps)
02 Start: workflow.start(name, data)
03 Execute: step → step → done
04 Result: run.result()
Event Sourced
Every state change is an immutable event appended to the _workflow_events table. State is always derived from replaying the event log.
Explicit Continuations
Steps return { step: "next" } to continue or { done: true, result } to complete. No implicit state machine magic.
Crash Recovery
A __workflow-recovery cron runs every minute, detects stuck or timed-out workflows, and drives them to completion.
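As a rough illustration of the explicit continuation contract, the sketch below defines two step handlers and a toy in-process driver that follows their return values. The Continuation and StepHandler types and the runToCompletion helper are hypothetical stand-ins, not the Fling API; the real engine persists an event per step and runs through a queue rather than a simple loop.

```typescript
// Hypothetical stand-ins for the user-facing types (the real ones are in src/types/workflow.ts).
type Continuation<R> = { step: string } | { done: true; result: R };
type StepHandler<R> = (data: Record<string, unknown>) => Continuation<R>;

// A two-step workflow: "start" hands off to "charge", which completes the run.
const steps: Record<string, StepHandler<string>> = {
  start: () => ({ step: "charge" }),
  charge: (data) => ({ done: true, result: `charged ${String(data.orderId)}` }),
};

// Toy driver: follow continuations from "start" until a step signals completion.
function runToCompletion<R>(
  handlers: Record<string, StepHandler<R>>,
  data: Record<string, unknown>,
): { result: R; visited: string[] } {
  const visited: string[] = [];
  let current = "start";
  for (;;) {
    const handler = handlers[current];
    if (!handler) throw new Error(`unknown step: ${current}`);
    visited.push(current);
    const next = handler(data);
    if ("done" in next) return { result: next.result, visited };
    current = next.step;
  }
}

const outcome = runToCompletion(steps, { orderId: "A-17" });
```

Each handler names its successor explicitly, so the control flow can be read directly from the return values.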
Architecture
The workflow system is organized into distinct layers. User code interacts only with the top-level workflow API. Everything below is internal machinery that handles registration, execution, persistence, and recovery.
User API: workflow(name, steps, options)
  ↓ wraps internal types
Runtime Module: src/workflow/runtime.ts
  ↓ enqueues via QueueSender
Platform API: POST /internal/workflow/execute → Queue
  ↓ queue consumer dispatches
Engine: flingflow/engine · Registry: WorkflowRegistry
  ↓ persists events via
Interface: EventStore
  ↓ implemented by
Local: SqliteEventStore · Workers: D1EventStore
  ↓ stores rows in
Database: _workflow_events table
Event Sourcing Model
Workflow state is never stored directly. Instead, every action appends an immutable event to the _workflow_events table. The current state is derived by reading the last event for a given run. Six event types form a state machine with well-defined transitions.
Event Types
workflow_created
A new run begins. Payload: workflowName, writes.
step_started
A step is claimed for execution. Payload: step, attempt.
step_completed
A step finished successfully. Payload: step, writes, next_step?.
step_failed
A step threw an error. Payload: step, attempt, reason, retryable.
workflow_completed
The workflow succeeded. Payload: result.
workflow_failed
The workflow failed permanently. Payload: reason, cause.
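The six event types can be modeled as a discriminated union. The sketch below takes the payload field names from the list above, but the field types and the "type" discriminant are assumptions; the actual WorkflowEvent definition in flingflow may differ.

```typescript
// Assumed shapes: payload field names follow the list above; the field types
// and the "type" discriminant are guesses for illustration.
type WorkflowEvent =
  | { type: "workflow_created"; workflowName: string; writes: Record<string, unknown> }
  | { type: "step_started"; step: string; attempt: number }
  | { type: "step_completed"; step: string; writes: Record<string, unknown>; next_step?: string }
  | { type: "step_failed"; step: string; attempt: number; reason: string; retryable: boolean }
  | { type: "workflow_completed"; result: unknown }
  | { type: "workflow_failed"; reason: string; cause?: unknown };

// A run is finished once its last event is one of the two terminal types.
function isTerminal(event: WorkflowEvent): boolean {
  return event.type === "workflow_completed" || event.type === "workflow_failed";
}

// Exhaustive switch: the compiler flags any event type that is not handled.
function summarize(event: WorkflowEvent): string {
  switch (event.type) {
    case "workflow_created":
      return `run of ${event.workflowName} created`;
    case "step_started":
      return `${event.step} started (attempt ${event.attempt})`;
    case "step_completed":
      return `${event.step} completed`;
    case "step_failed":
      return `${event.step} failed: ${event.reason}`;
    case "workflow_completed":
      return "workflow completed";
    case "workflow_failed":
      return `workflow failed: ${event.reason}`;
  }
}
```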
State Machine Transitions
The resolveTransition() function in transitions.ts determines the next action based on the last event. This is the sole decision point for what happens next.
Created (workflow_created)
  ↓ start_step("start", 1)
Running (step_started)
  ↓ execute step function
Success (step_completed) · Failure (step_failed)
  ↓ terminal events
Done (workflow_completed) · Failed (workflow_failed)
Atomic terminal events. When a step completes with { done: true, result }, the engine writes step_completed and workflow_completed together via appendBatch. Similarly, when a failure is terminal (non-retryable or max attempts exceeded), step_failed and workflow_failed are batched atomically. This ensures no window exists where a run is in a partially-terminal state.
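One plausible shape for the transition resolver is sketched below. The four action names (start_step, complete_workflow, fail_workflow, none) appear in this document; the event fields, the maxAttempts parameter, and the retry rule are assumptions, so treat this as an illustration rather than the contents of transitions.ts.

```typescript
// Simplified event and action shapes; the real definitions in flingflow may differ.
type LastEvent =
  | { type: "workflow_created" }
  | { type: "step_started"; step: string; attempt: number }
  | { type: "step_completed"; step: string; next_step?: string }
  | { type: "step_failed"; step: string; attempt: number; retryable: boolean }
  | { type: "workflow_completed" }
  | { type: "workflow_failed" };

type Action =
  | { kind: "start_step"; step: string; attempt: number }
  | { kind: "complete_workflow" }
  | { kind: "fail_workflow" }
  | { kind: "none" };

// maxAttempts is an assumed parameter standing in for the registry's configuration.
function resolveTransitionSketch(last: LastEvent, maxAttempts: number): Action {
  switch (last.type) {
    case "workflow_created":
      return { kind: "start_step", step: "start", attempt: 1 };
    case "step_started":
      return { kind: "none" }; // a step is in flight; nothing to schedule
    case "step_completed":
      return last.next_step !== undefined
        ? { kind: "start_step", step: last.next_step, attempt: 1 }
        : { kind: "complete_workflow" };
    case "step_failed":
      return last.retryable && last.attempt < maxAttempts
        ? { kind: "start_step", step: last.step, attempt: last.attempt + 1 }
        : { kind: "fail_workflow" };
    case "workflow_completed":
    case "workflow_failed":
      return { kind: "none" }; // terminal: no further action
  }
}
```

Because the function depends only on the last event, every caller that reads the same event reaches the same decision.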
Execution Model
Workflows use a queue-based execution model. When workflow.start() is called, it creates the initial event in the event store (D1 in production) and enqueues execution via the platform API. The returned WorkflowRun handle allows the caller to poll for the final result.
Lifecycle of a workflow.start() call
Step 1: createWorkflow(workflowId, name, writes)
  ↓ enqueue via platform API
Step 2: Platform API → WORKFLOW_QUEUE.send({ runId, scriptName })
  ↓ queue consumer dispatches
Step 3: Consumer → DISPATCHER → /__workflow/:runId/execute
  ↓ execute steps (50s budget)
Step 4: Not done? Re-enqueue for continuation
  ↓ caller polls result
Step 5: result() → polls getLastEvent() every 1s
Deduplication
appendIfNoActiveWorkflow ensures at most one active run per workflowId. If a caller passes { id: "my-wf" } and a run is already active, start() returns the existing run handle with created: false instead of creating a new one.
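The deduplication behavior can be illustrated with an in-memory stand-in. The names appendIfNoActiveWorkflow, WorkflowAlreadyActiveError, activeRunId, and created come from this document; ToyStore, startWorkflow, and the Map-based bookkeeping are hypothetical and only mimic the observable result.

```typescript
class WorkflowAlreadyActiveError extends Error {
  readonly activeRunId: string;
  constructor(activeRunId: string) {
    super(`workflow already active: ${activeRunId}`);
    this.activeRunId = activeRunId;
  }
}

// Hypothetical in-memory store: tracks the single active run per workflowId.
class ToyStore {
  private activeRunByWorkflowId = new Map<string, string>();

  // Registers runId as the active run, or reports the run that already is.
  appendIfNoActiveWorkflow(workflowId: string, runId: string): WorkflowAlreadyActiveError | null {
    const active = this.activeRunByWorkflowId.get(workflowId);
    if (active !== undefined) return new WorkflowAlreadyActiveError(active);
    this.activeRunByWorkflowId.set(workflowId, runId);
    return null;
  }
}

let nextRun = 0;

// Returns the existing handle with created: false when a run is already active.
function startWorkflow(store: ToyStore, workflowId: string): { runId: string; created: boolean } {
  nextRun += 1;
  const runId = `run-${nextRun}`;
  const err = store.appendIfNoActiveWorkflow(workflowId, runId);
  return err ? { runId: err.activeRunId, created: false } : { runId, created: true };
}
```

The important property is that the check and the insert happen as one operation; in this toy version that is trivially true because the code is single-threaded.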
Local Dev Simulation
In local dev, there is no queue. The QueueSender is implemented with a setTimeout(0) loop that calls __executeWorkflowSteps directly, re-scheduling itself if not done. This simulates the queue re-enqueue behavior without any platform infrastructure.
The 50-second time budget. __executeWorkflowSteps runs engine.executeStep in a for (;;) loop. Each call: (1) claims the next step via claimStep (writes step_started), (2) runs the workflow function, (3) commits the result via commitStep (writes step_completed or step_failed). The loop exits when executeStep returns false (terminal) or when the 50-second time budget is exceeded. If the budget is exceeded, the consumer re-enqueues the message for a fresh execution context.
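The loop described above can be sketched as follows. This is a simplified synchronous version with the step executor and the clock passed in as parameters (both assumptions made so the loop can be exercised in isolation); the real __executeWorkflowSteps is presumably asynchronous and talks to the engine directly.

```typescript
const TIME_BUDGET_MS = 50_000;

// Runs steps until the run is terminal or the time budget is used up.
function executeWithinBudget(
  executeStep: () => boolean, // returns false once the run is terminal
  now: () => number, // injected clock, in milliseconds
): { done: boolean; steps: number } {
  const startedAt = now();
  let steps = 0;
  for (;;) {
    const more = executeStep();
    steps += 1;
    if (!more) return { done: true, steps };
    // Budget exhausted: report not-done so the caller can re-enqueue.
    if (now() - startedAt >= TIME_BUDGET_MS) return { done: false, steps };
  }
}
```

A caller that receives done: false would re-enqueue the run, and the next invocation starts with a fresh budget.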
Dual Runtime
A single module — src/workflow/runtime.ts — serves both the local Node.js runtime and the Cloudflare Worker runtime. It achieves this through dependency injection: the entry point for each runtime injects the appropriate EventStore, CronRegistrar, and QueueSender before user code runs.
Injection Points
__setCronRegistrar(fn)
Injects the cron registration function. Called by each runtime's entry point before user code is imported. This ensures workflow() calls during module evaluation can register the __workflow-recovery cron.
  • Local: fling dev shim calls __setCronRegistrar(cron) then dynamically imports user app
  • Workers: entry.ts calls __setCronRegistrar(cron) during first-request initialization
__initWorkflowEngine(store)
Injects the EventStore implementation and initializes the engine. Creates a WorkflowRegistry from the module-scope definitions, constructs the Engine, and calls store.initialize() to ensure the _workflow_events table exists. Idempotent.
  • Local: entry.ts passes new SqliteEventStore(dbPath)
  • Workers: entry.ts passes new D1EventStore(env.DB)
__setQueueSender(sender)
Injects the QueueSender that dispatches execution requests. Called after engine initialization. The sender's send() method is how start() and recovery trigger workflow execution.
  • Local: entry.ts creates a sender that uses setTimeout(0) + __executeWorkflowSteps loop
  • Workers: entry.ts creates a sender that POSTs to FLING_API /internal/workflow/execute with FLING_WORKER_SECRET
Phase 1: User calls workflow(name, steps) at top level
  ↓ stored in definitions Map
Phase 2: Entry point calls __initWorkflowEngine(eventStore)
  ↓ inject queue sender
Phase 3: Entry point calls __setQueueSender(sender)
  ↓ engine + queue ready
Phase 4: workflow.start() / .get() / .list() available
Zero code duplication. There is exactly one workflow runtime module. The local runtime and the Worker runtime import the same src/workflow/runtime.ts. The differences are which EventStore is injected and which QueueSender is provided. The worker entry also exposes a /__workflow/:runId/execute internal endpoint that the queue consumer dispatches to — this handler calls __executeWorkflowSteps(runId) and returns { done: boolean }.
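A toy version of the injection pattern is sketched below. The three setter names come from this document; the interface shapes, the function bodies, the startWorkflow helper, and the "* * * * *" schedule string are assumptions made for illustration.

```typescript
// Assumed minimal interfaces; the real ones carry more methods.
interface EventStore { initialize(): void }
interface QueueSender { send(runId: string): void }
type CronRegistrar = (cronName: string, schedule: string) => void;

let cronRegistrar: CronRegistrar | null = null;
let eventStore: EventStore | null = null;
let queueSender: QueueSender | null = null;
const definitions = new Map<string, unknown>();

function __setCronRegistrar(fn: CronRegistrar): void {
  cronRegistrar = fn;
}

function __initWorkflowEngine(store: EventStore): void {
  if (eventStore) return; // idempotent: later calls are ignored
  store.initialize();
  eventStore = store;
}

function __setQueueSender(sender: QueueSender): void {
  queueSender = sender;
}

// User-facing registration: the first call also registers the recovery cron.
function workflow(workflowName: string, steps: unknown): void {
  if (definitions.size === 0) cronRegistrar?.("__workflow-recovery", "* * * * *");
  definitions.set(workflowName, steps);
}

// Hypothetical start helper: fails fast if the runtime was never wired up.
function startWorkflow(runId: string): void {
  if (!eventStore || !queueSender) throw new Error("workflow engine not initialized");
  queueSender.send(runId);
}
```

Swapping runtimes then amounts to passing different objects to the same three setters, which is why the module itself needs no runtime-specific branches.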
D1EventStore
The D1EventStore implements the EventStore interface using Cloudflare D1's async API. It lives in src/worker-runtime/d1-event-store.ts and handles all SQL interactions for workflow events in production.
Implementation Details
initialize()
CREATE TABLE IF NOT EXISTS with primary key (run_id, seq). Creates indexes for (run_id, seq DESC) and (workflow_id). Uses db.exec() since these are DDL statements.
appendIfNoActiveWorkflow()
Atomic INSERT...SELECT...WHERE NOT EXISTS. Checks if any active run exists for the workflowId in a single SQL statement. If result.meta.changes === 0, queries for the active run_id and returns WorkflowAlreadyActiveError.
appendBatch()
Uses db.batch() for atomic multi-event writes. D1 batches are all-or-nothing, ensuring terminal events (step_completed + workflow_completed) are never partially written.
Workflow ID Validation
Every append() and appendBatch() call validates that the event's workflow_id matches existing events for the same run_id. Returns WorkflowIdMismatchError on conflict.
Error Types
UniqueConstraintError
Duplicate (run_id, seq). Normal contention — another worker already claimed the step. The engine treats this as a non-fatal race and retries.
WorkflowAlreadyActiveError
Returned by appendIfNoActiveWorkflow when a run is already active for the given workflowId. Carries the activeRunId.
WorkflowIdMismatchError
An event was appended with a different workflow_id than what already exists for that run_id. Indicates a logic error.
Recovery
If execution fails mid-flight, a workflow can get stuck. The __workflow-recovery cron job runs every minute, scans for stuck workflows, and re-enqueues them via queueSender.send() (which calls the platform API). It is registered automatically on the first workflow() call.
Recovery Reasons
timeout
A step has been running longer than stepTimeoutMS. Recovery writes step_failed with reason "timeout". If max attempts exceeded, also writes workflow_failed.
stuck_created
A workflow was created but its first step was never started. The enqueue after createWorkflow failed, or the queue message was lost before execution began.
stuck_completed
A step completed with a next_step continuation, but the next step was never claimed. The process crashed between steps.
stuck_failed
A step failed with a retryable error, but the retry was never started. The process crashed after writing the failure event.
stuck_terminal
A workflow should have been finalized (completed or failed) but the terminal event was never written. The appendBatch may have failed.
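The five reasons can be read as a classification over the last event and its age. The sketch below is one way to express that; stepTimeoutMS appears in this document, while the "at" timestamp field and the stuckAfterMs threshold are hypothetical, and the max-attempts check is omitted for brevity.

```typescript
// Simplified last-event shapes with an assumed "at" timestamp in milliseconds.
type LastEvent =
  | { type: "workflow_created"; at: number }
  | { type: "step_started"; at: number }
  | { type: "step_completed"; at: number; next_step?: string }
  | { type: "step_failed"; at: number; retryable: boolean };

type RecoveryReason =
  | "timeout"
  | "stuck_created"
  | "stuck_completed"
  | "stuck_failed"
  | "stuck_terminal";

// stuckAfterMs (how long an idle run waits before counting as stuck) is hypothetical.
function classifyStuckRun(
  last: LastEvent,
  now: number,
  stepTimeoutMS: number,
  stuckAfterMs: number,
): RecoveryReason | null {
  const age = now - last.at;
  switch (last.type) {
    case "step_started":
      return age > stepTimeoutMS ? "timeout" : null;
    case "workflow_created":
      return age > stuckAfterMs ? "stuck_created" : null;
    case "step_completed":
      if (age <= stuckAfterMs) return null;
      return last.next_step !== undefined ? "stuck_completed" : "stuck_terminal";
    case "step_failed":
      if (age <= stuckAfterMs) return null;
      return last.retryable ? "stuck_failed" : "stuck_terminal";
  }
}
```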
Recovery Flow
Trigger: __workflow-recovery cron (every minute)
  ↓ scan active workflows
Analyze: Check each active run for stuck/timeout conditions
  ↓ fix + re-enqueue via queueSender
Result: Re-enqueued N workflow(s)
Recovery is safe to run concurrently. The engine uses UniqueConstraintError detection for optimistic concurrency. If a normal execution and the recovery cron both try to claim the same step, the one that loses the race gets a UniqueConstraintError and backs off. No locks or distributed coordination are needed.
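The optimistic-concurrency idea can be shown with an in-memory log keyed by (run_id, seq). UniqueConstraintError is named in this document; ToyEventLog and tryClaim are hypothetical helpers, and a real store would rely on the database's primary-key constraint instead of a Map.

```typescript
class UniqueConstraintError extends Error {
  constructor(message: string) {
    super(message);
    // Keeps instanceof working when compiled to older JavaScript targets.
    Object.setPrototypeOf(this, UniqueConstraintError.prototype);
  }
}

// In-memory stand-in for a table whose primary key is (run_id, seq).
class ToyEventLog {
  private rows = new Map<string, string>();

  append(runId: string, seq: number, type: string): void {
    const key = `${runId}:${seq}`;
    if (this.rows.has(key)) throw new UniqueConstraintError(`duplicate ${key}`);
    this.rows.set(key, type);
  }
}

// Both the normal executor and the recovery cron try to write step_started at
// the same seq; whoever loses the insert backs off instead of running the step.
function tryClaim(eventLog: ToyEventLog, runId: string, seq: number): boolean {
  try {
    eventLog.append(runId, seq, "step_started");
    return true;
  } catch (err) {
    if (err instanceof UniqueConstraintError) return false;
    throw err; // anything else is a real failure
  }
}
```

The constraint violation itself acts as the coordination signal, so no separate lock table or lease is involved.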
Bundler Safety
The flingflow package depends on better-sqlite3 (a native Node.js addon) for its SqliteEventStore. This must never leak into Cloudflare Worker builds, which cannot run native addons. Two mechanisms prevent this.
Subpath Imports
Fine-grained package exports
The flingflow package.json exports individual modules via subpath exports: flingflow/engine, flingflow/store, flingflow/types, flingflow/recovery, flingflow/clock, flingflow/registry, and flingflow/context.
The workflow runtime (src/workflow/runtime.ts) imports only from these subpaths, never from the barrel flingflow export. Because SqliteEventStore (which depends on better-sqlite3) is reachable only through the barrel (flingflow/index), the Worker bundle's import graph never reaches it.
esbuild Rejection Plugin
Build-time safety net
The Worker bundler (src/cli/deploy/bundler.ts) includes a reject-native-addons esbuild plugin. It registers an onResolve handler for the better-sqlite3 import specifier.
If better-sqlite3 appears anywhere in the Worker build's import graph, the build fails immediately with a clear error: "better-sqlite3 must not be imported in Worker builds. Use flingflow subpath imports instead of the barrel export."
Defense in depth. Subpath imports prevent the problem by construction — the Worker code simply never imports the module that pulls in better-sqlite3. The esbuild plugin is the safety net: even if someone accidentally imports the barrel export, the build fails loudly rather than producing a broken Worker bundle.
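A rejection plugin of this kind might look like the sketch below. The plugin name and error message are quoted from this document, and returning errors from an onResolve callback is part of esbuild's plugin API; the local interfaces are minimal stand-ins for esbuild's own types (so the sketch type-checks without esbuild installed), and the exact filter pattern is an assumption.

```typescript
// Minimal stand-ins for the parts of esbuild's plugin types used here.
interface ResolveResultLike { errors?: { text: string }[] }
interface BuildLike {
  onResolve(
    options: { filter: RegExp },
    callback: (args: { path: string; importer: string }) => ResolveResultLike,
  ): void;
}
interface PluginLike { name: string; setup(build: BuildLike): void }

const REJECT_MESSAGE =
  "better-sqlite3 must not be imported in Worker builds. " +
  "Use flingflow subpath imports instead of the barrel export.";

const rejectNativeAddons: PluginLike = {
  name: "reject-native-addons",
  setup(build) {
    // Matches the bare specifier and deep imports such as better-sqlite3/lib/x.
    build.onResolve({ filter: /^better-sqlite3(\/|$)/ }, () => ({
      errors: [{ text: REJECT_MESSAGE }],
    }));
  },
};
```

With esbuild available, an object of this shape would be passed in the plugins array of the build options.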
Key Files
A quick reference mapping each file to its role in the workflow system.
Fling Source Tree
File | Purpose
src/workflow/runtime.ts | The single workflow runtime module. Manages registration, engine initialization, QueueSender injection, start/get/list operations, __executeWorkflowSteps (the step execution loop with 50s time budget), and recovery cron. Imported by both runtimes.
src/types/workflow.ts | User-facing types: WorkflowApi, WorkflowCtx, WorkflowRun, WorkflowContinuation, WorkflowStepHandler, WorkflowOptions.
src/worker-runtime/d1-event-store.ts | D1 implementation of EventStore for Cloudflare Workers. Handles INSERT...WHERE NOT EXISTS, batch atomicity, workflow ID validation.
src/worker-runtime/entry.ts | Worker entry point. Calls __setCronRegistrar, __initWorkflowEngine(new D1EventStore(db)), and __setQueueSender (with a sender that POSTs to FLING_API). Exposes the /__workflow/:runId/execute internal endpoint.
src/runtime/entry.ts | Local entry point. Calls __initWorkflowEngine(new SqliteEventStore(dbPath)) and __setQueueSender (with a setTimeout(0) loop that calls __executeWorkflowSteps directly).
platform/api/src/routes/internal.ts | POST /internal/workflow/execute route. Receives { runId } from user workers, derives scriptName from the authenticated project, enqueues { runId, scriptName } onto WORKFLOW_QUEUE.
platform/api/src/workflow-queue.ts | Queue message processor. Dequeues { runId, scriptName }, dispatches to the user worker via DISPATCHER.get(scriptName).fetch("/__workflow/{runId}/execute"), re-enqueues if not done.
src/cli/deploy/bundler.ts | Worker bundler with the reject-native-addons esbuild plugin that prevents better-sqlite3 from leaking into Worker builds.
flingflow Package (packages/flingflow/)
File | Purpose
src/engine.ts | Core execution engine. createWorkflow, claimStep, executeStep, commitStep. Called by __executeWorkflowSteps in the runtime module.
src/store.ts | EventStore interface definition. Error types: UniqueConstraintError, WorkflowAlreadyActiveError, WorkflowIdMismatchError.
src/types.ts | Core types: WorkflowEvent discriminated union (6 event types), Continuation, Ctx, WorkflowFn, ClaimResult.
src/transitions.ts | State machine transition resolver. Maps each event type to the next action: start_step, complete_workflow, fail_workflow, or none.
src/recovery.ts | Recovery logic. Scans active workflows for stuck and timed-out states. Returns RecoveryResult[] with reason codes.
src/context.ts | Builds the Ctx object for workflow functions. Manages scratchpad reads (from completed events) and buffered writes.
src/registry.ts | WorkflowRegistry class. Maps workflow names to definitions (fn, maxAttempts, backoff config, stepTimeoutMS).
src/clock.ts | Clock interface and SystemClock implementation. Injected into Engine for testable time handling.
src/store-sqlite.ts | SqliteEventStore using better-sqlite3. Used only in local development. Never imported by Worker code.
src/store-memory.ts | MemoryEventStore for tests. In-memory event storage with full EventStore interface compliance.