Workflows
durable execution via event sourcing — explicit continuations, crash recovery, dual runtime
Overview
Fling workflows are powered by flingflow, a vendored durable execution engine.
It provides event sourcing (all state changes are appended as immutable events),
explicit continuations (each step returns the name of the next step or signals completion),
and automatic crash recovery (stuck workflows are detected and resumed by a cron job).
The engine was vendored into
packages/flingflow/ to keep it self-contained and
avoid external dependency churn.
1. Register: workflow(name, steps)
2. Start: workflow.start(name, data)
3. Execute: step → step → done
4. Result: run.result()
Event Sourced. Every state change is an immutable event appended to the _workflow_events table. State is always derived from replaying the event log.

Explicit Continuations. Steps return { step: "next" } to continue or { done: true, result } to complete. No implicit state machine magic.

Crash Recovery. A __workflow-recovery cron runs every minute, detects stuck or timed-out workflows, and drives them to completion.
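As a hedged sketch of the explicit-continuation contract (the Continuation and StepFn shapes below are illustrative stand-ins; the real user-facing types live in src/types/workflow.ts):

```typescript
// Illustrative continuation shapes; the real types live in src/types/workflow.ts.
type Continuation =
  | { step: string }                  // continue with the named step
  | { done: true; result?: unknown }; // terminal: the workflow succeeded

type StepFn = (ctx: { data: Record<string, unknown> }) => Continuation;

// A two-step workflow: "start" hands off to "finish", which completes the run.
const steps: Record<string, StepFn> = {
  start: (_ctx) => {
    // ...do some work, buffer writes...
    return { step: "finish" }; // explicit continuation: run "finish" next
  },
  finish: (_ctx) => {
    return { done: true, result: { ok: true } }; // terminal continuation
  },
};
```

Because each step names its successor explicitly, the engine never has to infer control flow: the event log plus the returned continuation fully determine what happens next.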
Architecture
The workflow system is organized into distinct layers. User code interacts only with the
top-level
workflow API. Everything below is internal machinery that handles
registration, execution, persistence, and recovery.
User API: workflow(name, steps, options)
  ↓ wraps internal types
Runtime Module: src/workflow/runtime.ts
  ↓ enqueues via QueueSender
Platform API: POST /internal/workflow/execute → Queue
  ↓ queue consumer dispatches
Engine: flingflow/engine, with Registry: WorkflowRegistry
  ↓ persists events via
Interface: EventStore
  ↓ implemented by
Local: SqliteEventStore / Workers: D1EventStore
  ↓ stores rows in
Database: _workflow_events table
Event Sourcing Model
Workflow state is never stored directly. Instead, every action appends an immutable event
to the
_workflow_events table. The current state is derived by reading the
last event for a given run. Six event types form a state machine with well-defined transitions.
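A toy sketch of the idea, assuming nothing beyond what this section describes (the append and status helpers here are hypothetical illustrations, not flingflow APIs):

```typescript
// Toy event-sourcing sketch: an append-only log for one run; status is
// derived from the last event instead of being stored anywhere.
type Ev = { seq: number; type: string };

const log: Ev[] = [];

function append(type: string): void {
  log.push({ seq: log.length + 1, type }); // events are only ever appended
}

function status(): "active" | "done" | "failed" | "missing" {
  const last = log[log.length - 1];
  if (!last) return "missing";
  if (last.type === "workflow_completed") return "done";
  if (last.type === "workflow_failed") return "failed";
  return "active"; // any non-terminal last event means the run is in flight
}

append("workflow_created");
append("step_started");
append("step_completed");
append("workflow_completed");
```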
Event Types

| Event | Meaning | Payload |
|---|---|---|
| workflow_created | A new run begins. | workflowName, writes |
| step_started | A step is claimed for execution. | step, attempt |
| step_completed | A step finished successfully. | step, writes, next_step? |
| step_failed | A step threw an error. | step, attempt, reason, retryable |
| workflow_completed | The workflow succeeded. | result |
| workflow_failed | The workflow failed permanently. | reason, cause |

State Machine Transitions
The
resolveTransition() function in transitions.ts determines
the next action based on the last event. This is the sole decision point for what happens next.
Created: workflow_created → start_step("start", 1)
Running: step_started → execute step function
Success: step_completed
Failure: step_failed
Terminal events: Done (workflow_completed), Failed (workflow_failed)
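A sketch in the spirit of resolveTransition() (the LastEvent/Action shapes and the resolveNext name are illustrative; the real resolver in transitions.ts also handles retry backoff and attempt bookkeeping):

```typescript
// Simplified transition resolver: maps the last event to the next action.
type LastEvent =
  | { type: "workflow_created" }
  | { type: "step_started"; step: string }
  | { type: "step_completed"; next_step?: string }
  | { type: "step_failed"; step: string; attempt: number; retryable: boolean }
  | { type: "workflow_completed" }
  | { type: "workflow_failed" };

type Action =
  | { kind: "start_step"; step: string; attempt: number }
  | { kind: "execute_step" }
  | { kind: "complete_workflow" }
  | { kind: "fail_workflow" }
  | { kind: "none" }; // terminal: nothing left to do

function resolveNext(last: LastEvent, maxAttempts = 3): Action {
  switch (last.type) {
    case "workflow_created":
      return { kind: "start_step", step: "start", attempt: 1 }; // every run begins here
    case "step_started":
      return { kind: "execute_step" }; // run the workflow function
    case "step_completed":
      return last.next_step !== undefined
        ? { kind: "start_step", step: last.next_step, attempt: 1 } // continuation
        : { kind: "complete_workflow" };                           // no next step
    case "step_failed":
      return last.retryable && last.attempt < maxAttempts
        ? { kind: "start_step", step: last.step, attempt: last.attempt + 1 } // retry
        : { kind: "fail_workflow" };                                         // give up
    default:
      return { kind: "none" };
  }
}
```

Keeping this as a pure function of the last event is what makes it the sole decision point: every executor (normal or recovery) resolves the same action from the same log.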
Atomic terminal events. When a step completes with
{ done: true, result },
the engine writes step_completed and workflow_completed together via
appendBatch. Similarly, when a failure is terminal (non-retryable or max attempts exceeded),
step_failed and workflow_failed are batched atomically. This ensures
no window exists where a run is in a partially-terminal state.
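To make the all-or-nothing property concrete, here is a minimal in-memory model of the invariant (in production, D1's db.batch() provides the real atomicity; the Ev shape and this appendBatch body are illustrative):

```typescript
// In-memory model of appendBatch's all-or-nothing guarantee: validate the
// whole batch first, then commit, so a run is never left half-terminal.
type Ev = { runId: string; seq: number; type: string };

const events: Ev[] = [];

function appendBatch(batch: Ev[]): void {
  for (const e of batch) {
    if (events.some((x) => x.runId === e.runId && x.seq === e.seq)) {
      throw new Error(`unique constraint: (${e.runId}, ${e.seq})`); // reject the whole batch
    }
  }
  events.push(...batch); // commit all events together
}

// Terminal pair written together, exactly as the engine batches them:
appendBatch([
  { runId: "r1", seq: 3, type: "step_completed" },
  { runId: "r1", seq: 4, type: "workflow_completed" },
]);
```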
Execution Model
Workflows use a queue-based execution model. When
workflow.start() is called,
it creates the initial event in D1 and enqueues execution via the platform API. The returned
WorkflowRun handle allows the caller to poll for the final result.
Lifecycle of a workflow.start() call

1. createWorkflow(workflowId, name, writes), then enqueue via the platform API
2. Platform API → WORKFLOW_QUEUE.send({ runId, scriptName }); queue consumer dispatches
3. Consumer → DISPATCHER → /__workflow/:runId/execute; execute steps (50s budget)
4. Not done? Re-enqueue for continuation; caller polls for the result
5. result() polls getLastEvent() every 1s
Deduplication
appendIfNoActiveWorkflow ensures at most one active run per
workflowId. If a caller passes { id: "my-wf" }
and a run is already active, start() returns the existing
run handle with created: false instead of creating a new one.
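The semantics can be modeled in a few lines (in-memory stand-in; the real check is a single atomic SQL statement inside appendIfNoActiveWorkflow):

```typescript
// In-memory model of the dedup rule: at most one active run per workflowId.
type Run = { runId: string; workflowId: string; active: boolean };

const runs: Run[] = [];
let next = 1;

function start(workflowId: string): { runId: string; created: boolean } {
  const existing = runs.find((r) => r.workflowId === workflowId && r.active);
  if (existing) {
    return { runId: existing.runId, created: false }; // reuse the active run
  }
  const runId = `run-${next++}`;
  runs.push({ runId, workflowId, active: true });
  return { runId, created: true };
}

const a = start("my-wf"); // no active run → creates one
const b = start("my-wf"); // same id while active → existing handle, created: false
```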
Local Dev Simulation
In local dev, there is no queue. The
QueueSender is implemented
with a setTimeout(0) loop that calls __executeWorkflowSteps
directly, re-scheduling itself if not done. This simulates the queue
re-enqueue behavior without any platform infrastructure.
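The shape of that simulation, as a hedged sketch (makeLocalSender and executeOnce are hypothetical stand-ins; the real sender lives in the local entry point and calls __executeWorkflowSteps):

```typescript
// Sketch of the local-dev QueueSender: no queue, just a setTimeout(0) loop
// that keeps invoking the executor until it reports done.
function makeLocalSender(executeOnce: (runId: string) => Promise<boolean>) {
  const sender = {
    send(runId: string): void {
      // Defer to the next tick, like a queue delivery would.
      setTimeout(async () => {
        const done = await executeOnce(runId);
        if (!done) sender.send(runId); // simulate the queue's re-enqueue
      }, 0);
    },
  };
  return sender;
}
```

Note that send() returns immediately; execution always happens on a later tick, which keeps the local behavior shaped like the asynchronous queue path in production.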
The 50-second time budget.
__executeWorkflowSteps runs
engine.executeStep in a for (;;) loop. Each call:
(1) claims the next step via claimStep (writes step_started),
(2) runs the workflow function,
(3) commits the result via commitStep (writes step_completed
or step_failed). The loop exits when executeStep returns
false (terminal) or when the 50-second time budget is exceeded. If the
budget is exceeded, the consumer re-enqueues the message for a fresh execution context.
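The loop's shape, shown synchronously for brevity (the real loop awaits an async engine.executeStep; runSteps and BUDGET_MS are illustrative names):

```typescript
// Budget-bounded step loop: keep executing until terminal or out of time.
const BUDGET_MS = 50_000;

function runSteps(
  executeStep: () => boolean, // false → a terminal event was written
  now: () => number = Date.now,
): { done: boolean } {
  const deadline = now() + BUDGET_MS;
  for (;;) {
    if (!executeStep()) return { done: true };     // workflow reached a terminal state
    if (now() >= deadline) return { done: false }; // out of budget: caller re-enqueues
  }
}
```

Returning { done: false } rather than blocking past the budget is what lets a long workflow span many short execution contexts.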
Dual Runtime
A single module —
src/workflow/runtime.ts — serves both the local
Node.js runtime and the Cloudflare Worker runtime. It achieves this through dependency injection:
the entry point for each runtime injects the appropriate EventStore,
CronRegistrar, and QueueSender before user code runs.
Injection Points
__setCronRegistrar(fn)
Injects the cron registration function. Called by each runtime's entry point
before user code is imported. This ensures
workflow() calls
during module evaluation can register the __workflow-recovery cron.
- Local: the fling devshim calls __setCronRegistrar(cron), then dynamically imports the user app
- Workers: entry.ts calls __setCronRegistrar(cron) during first-request initialization
__initWorkflowEngine(store)
Injects the
EventStore implementation and initializes the engine.
Creates a WorkflowRegistry from the module-scope definitions,
constructs the Engine, and calls store.initialize()
to ensure the _workflow_events table exists. Idempotent.
- Local: entry.ts passes new SqliteEventStore(dbPath)
- Workers: entry.ts passes new D1EventStore(env.DB)
__setQueueSender(sender)
Injects the
QueueSender that dispatches execution requests.
Called after engine initialization. The sender's send() method
is how start() and recovery trigger workflow execution.
- Local: entry.ts creates a sender that uses a setTimeout(0) + __executeWorkflowSteps loop
- Workers: entry.ts creates a sender that POSTs to FLING_API /internal/workflow/execute with FLING_WORKER_SECRET
Phase 1: User calls workflow(name, steps) at top level → stored in definitions Map
Phase 2: Entry point calls __initWorkflowEngine(eventStore) → inject queue sender
Phase 3: Entry point calls __setQueueSender(sender) → engine + queue ready
Phase 4: workflow.start() / .get() / .list() available
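The injection hooks are the real names from src/workflow/runtime.ts; the bodies below are stand-ins that just record the ordering an entry point must follow:

```typescript
// Sketch of the injection sequence both entry points perform. Only the hook
// names are real; the implementations here are illustrative.
const order: string[] = [];

const runtime = {
  __setCronRegistrar: (_fn: unknown) => order.push("cron"),
  __initWorkflowEngine: (_store: unknown) => order.push("engine"),
  __setQueueSender: (_sender: unknown) => order.push("queue"),
};

// Each runtime's entry point performs the same dance with different arguments:
runtime.__setCronRegistrar(() => {}); // before user code is imported
// ...user app is imported here; top-level workflow() calls register definitions...
runtime.__initWorkflowEngine({ kind: "SqliteEventStore or D1EventStore" });
runtime.__setQueueSender({ send: () => {} }); // engine + queue ready
```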
Zero code duplication. There is exactly one workflow runtime module.
The local runtime and the Worker runtime import the same
src/workflow/runtime.ts.
The differences are which EventStore is injected and which QueueSender
is provided. The worker entry also exposes a /__workflow/:runId/execute internal
endpoint that the queue consumer dispatches to — this handler calls
__executeWorkflowSteps(runId) and returns { done: boolean }.
D1EventStore
The
D1EventStore implements the EventStore interface using
Cloudflare D1's async API. It lives in src/worker-runtime/d1-event-store.ts and
handles all SQL interactions for workflow events in production.
Implementation Details
initialize()
CREATE TABLE IF NOT EXISTS with primary key (run_id, seq).
Creates indexes for (run_id, seq DESC) and (workflow_id).
Uses db.exec() since these are DDL statements.
appendIfNoActiveWorkflow()
Atomic
INSERT...SELECT...WHERE NOT EXISTS. Checks if any
active run exists for the workflowId in a single SQL statement.
If result.meta.changes === 0, queries for the active run_id
and returns WorkflowAlreadyActiveError.
appendBatch()
Uses
db.batch() for atomic multi-event writes.
D1 batches are all-or-nothing, ensuring terminal events
(step_completed + workflow_completed)
are never partially written.
Workflow ID Validation
Every
append() and appendBatch() call validates that
the event's workflow_id matches existing events for the same
run_id. Returns WorkflowIdMismatchError on conflict.
Error Types
UniqueConstraintError
Duplicate
(run_id, seq). Normal contention — another
worker already claimed the step. The engine treats this as a non-fatal
race and retries.
WorkflowAlreadyActiveError
Returned by
appendIfNoActiveWorkflow when a run is already
active for the given workflowId. Carries the activeRunId.
WorkflowIdMismatchError
An event was appended with a different
workflow_id than what
already exists for that run_id. Indicates a logic error.
Recovery
If execution fails mid-flight, a workflow can get stuck. The
__workflow-recovery
cron job runs every minute, scans for stuck workflows, and re-enqueues them via
queueSender.send() (which calls the platform API). It is registered
automatically on the first workflow() call.
Recovery Reasons
timeout
A step has been running longer than
stepTimeoutMS.
Recovery writes step_failed with reason "timeout".
If max attempts exceeded, also writes workflow_failed.
stuck_created
A workflow was created but its first step was never started.
The enqueue after
createWorkflow failed, or the
queue message was lost before execution began.
stuck_completed
A step completed with a
next_step continuation,
but the next step was never claimed. The process crashed
between steps.
stuck_failed
A step failed with a retryable error, but the retry was
never started. The process crashed after writing the failure
event.
stuck_terminal
A workflow should have been finalized (completed or failed)
but the terminal event was never written. The
appendBatch
may have failed.
Recovery Flow

1. Trigger: the __workflow-recovery cron fires (every minute) and scans active workflows
2. Analyze: check each active run for stuck/timeout conditions, then fix and re-enqueue via queueSender
3. Result: "Re-enqueued N workflow(s)"
Recovery is safe to run concurrently. The engine uses
UniqueConstraintError
detection for optimistic concurrency. If a normal execution and the recovery cron both try to
claim the same step, the one that loses the race gets a UniqueConstraintError and
backs off. No locks or distributed coordination are needed.
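The race can be modeled with a unique (run_id, seq) key (in-memory sketch; UniqueConstraintError is the real error type from flingflow/store, while this claimStep body is illustrative):

```typescript
// Optimistic concurrency via a unique (run_id, seq) key: when two executors
// race to claim the same step, the loser gets UniqueConstraintError.
class UniqueConstraintError extends Error {}

const claimed = new Set<string>();

function claimStep(runId: string, seq: number): void {
  const key = `${runId}:${seq}`;
  if (claimed.has(key)) throw new UniqueConstraintError(`duplicate ${key}`);
  claimed.add(key); // this writer won the race
}

claimStep("r1", 2); // normal execution claims step 2
let lost = false;
try {
  claimStep("r1", 2); // recovery cron tries the same claim...
} catch (e) {
  lost = e instanceof UniqueConstraintError; // ...loses, and backs off
}
```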
Bundler Safety
The flingflow package depends on
better-sqlite3 (a native Node.js addon) for its
SqliteEventStore. This must never leak into Cloudflare Worker
builds, which cannot run native addons. Two mechanisms prevent this.
Subpath Imports
Fine-grained package exports
The flingflow
package.json exports individual modules via subpath exports:
flingflow/engine, flingflow/store, flingflow/types,
flingflow/recovery, flingflow/clock, flingflow/registry,
and flingflow/context.
The workflow runtime (
src/workflow/runtime.ts) imports only from these subpaths —
never from the barrel flingflow export. Since SqliteEventStore
(which depends on better-sqlite3) only lives in flingflow/index,
it is never reached by the Worker bundle's import graph.
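The subpath layout might look roughly like this in packages/flingflow/package.json (an illustrative sketch: the dist paths and file layout are assumptions, only the subpath names come from the text above):

```json
{
  "name": "flingflow",
  "exports": {
    ".": "./dist/index.js",
    "./engine": "./dist/engine.js",
    "./store": "./dist/store.js",
    "./types": "./dist/types.js",
    "./recovery": "./dist/recovery.js",
    "./clock": "./dist/clock.js",
    "./registry": "./dist/registry.js",
    "./context": "./dist/context.js"
  }
}
```

Because "." is the only export that resolves to the barrel (and thus to SqliteEventStore), importing only the named subpaths keeps better-sqlite3 out of the Worker's import graph by construction.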
esbuild Rejection Plugin
Build-time safety net
The Worker bundler (
src/cli/deploy/bundler.ts) includes a
reject-native-addons esbuild plugin. It registers an
onResolve handler for the better-sqlite3 import specifier.
If
better-sqlite3 appears anywhere in the Worker build's import graph,
the build fails immediately with a clear error:
"better-sqlite3 must not be imported in Worker builds. Use flingflow subpath imports
instead of the barrel export."
Defense in depth. Subpath imports prevent the problem by construction —
the Worker code simply never imports the module that pulls in
better-sqlite3.
The esbuild plugin is the safety net: even if someone accidentally imports the barrel export,
the build fails loudly rather than producing a broken Worker bundle.
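The plugin's shape, as a hedged sketch (the Build/OnResolveResult types below are simplified stand-ins for esbuild's real plugin interfaces; the plugin name and error text follow the description above):

```typescript
// Sketch of the reject-native-addons plugin: any resolution of
// "better-sqlite3" in the Worker build fails with a clear error.
type OnResolveResult = { errors: { text: string }[] };
type Build = {
  onResolve: (
    opts: { filter: RegExp },
    cb: (args: { path: string }) => OnResolveResult | undefined,
  ) => void;
};

const rejectNativeAddons = {
  name: "reject-native-addons",
  setup(build: Build) {
    // Intercept the bare "better-sqlite3" specifier and fail the build.
    build.onResolve({ filter: /^better-sqlite3$/ }, () => ({
      errors: [
        {
          text:
            "better-sqlite3 must not be imported in Worker builds. " +
            "Use flingflow subpath imports instead of the barrel export.",
        },
      ],
    }));
  },
};
```

Returning errors from an onResolve hook is the idiomatic esbuild way to abort a build: the specifier never resolves, so no broken bundle can be produced.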
Key Files
A quick reference mapping each file to its role in the workflow system.
Fling Source Tree
| File | Purpose |
|---|---|
| src/workflow/runtime.ts | The single workflow runtime module. Manages registration, engine initialization, QueueSender injection, start/get/list operations, __executeWorkflowSteps (the step execution loop with 50s time budget), and recovery cron. Imported by both runtimes. |
| src/types/workflow.ts | User-facing types: WorkflowApi, WorkflowCtx, WorkflowRun, WorkflowContinuation, WorkflowStepHandler, WorkflowOptions. |
| src/worker-runtime/d1-event-store.ts | D1 implementation of EventStore for Cloudflare Workers. Handles INSERT...WHERE NOT EXISTS, batch atomicity, workflow ID validation. |
| src/worker-runtime/entry.ts | Worker entry point. Calls __setCronRegistrar, __initWorkflowEngine(new D1EventStore(db)), and __setQueueSender (with a sender that POSTs to FLING_API). Exposes the /__workflow/:runId/execute internal endpoint. |
| src/runtime/entry.ts | Local entry point. Calls __initWorkflowEngine(new SqliteEventStore(dbPath)) and __setQueueSender (with a setTimeout(0) loop that calls __executeWorkflowSteps directly). |
| platform/api/src/routes/internal.ts | POST /internal/workflow/execute route. Receives { runId } from user workers, derives scriptName from the authenticated project, enqueues { runId, scriptName } onto WORKFLOW_QUEUE. |
| platform/api/src/workflow-queue.ts | Queue message processor. Dequeues { runId, scriptName }, dispatches to the user worker via DISPATCHER.get(scriptName).fetch("/__workflow/{runId}/execute"), re-enqueues if not done. |
| src/cli/deploy/bundler.ts | Worker bundler with the reject-native-addons esbuild plugin that prevents better-sqlite3 from leaking into Worker builds. |
flingflow Package (packages/flingflow/)
| File | Purpose |
|---|---|
| src/engine.ts | Core execution engine. createWorkflow, claimStep, executeStep, commitStep. Called by __executeWorkflowSteps in the runtime module. |
| src/store.ts | EventStore interface definition. Error types: UniqueConstraintError, WorkflowAlreadyActiveError, WorkflowIdMismatchError. |
| src/types.ts | Core types: WorkflowEvent discriminated union (6 event types), Continuation, Ctx, WorkflowFn, ClaimResult. |
| src/transitions.ts | State machine transition resolver. Maps each event type to the next action: start_step, complete_workflow, fail_workflow, or none. |
| src/recovery.ts | Recovery logic. Scans active workflows for stuck and timed-out states. Returns RecoveryResult[] with reason codes. |
| src/context.ts | Builds the Ctx object for workflow functions. Manages scratchpad reads (from completed events) and buffered writes. |
| src/registry.ts | WorkflowRegistry class. Maps workflow names to definitions (fn, maxAttempts, backoff config, stepTimeoutMS). |
| src/clock.ts | Clock interface and SystemClock implementation. Injected into Engine for testable time handling. |
| src/store-sqlite.ts | SqliteEventStore using better-sqlite3. Used only in local development. Never imported by Worker code. |
| src/store-memory.ts | MemoryEventStore for tests. In-memory event storage with full EventStore interface compliance. |