YAML/JSON-driven workflow orchestration for OpenClaw agents.
Compose multi-step agent pipelines with dependency management, parallel execution, retry logic, output gates, cache adoption with contract freshness signatures, explicit handoff completion, and partial resume, all in a declarative YAML file.
OpenClaw subagents are powerful but fire-and-forget. There's no native way to:
- Run agent B only after agent A succeeds
- Run agents A and B in parallel, then agent C when both finish
- Retry a flaky step before failing the whole pipeline
- Resume a partially-completed pipeline after a crash
- Validate that a step actually produced the expected output files
- Reuse cached outputs safely when they still match the current contract
Developers work around this with shell scripts, manual timing, and fragile cron chains. openclaw-workflow solves this at the platform level.
# Local development
npm install
npm run build
openclaw plugins install --link . --dangerously-force-unsafe-install
# From npm once published
openclaw plugins install openclaw-workflow --dangerously-force-unsafe-install
`--dangerously-force-unsafe-install` is currently required because the plugin intentionally imports `node:child_process` for the CLI session fallback used when OpenClaw does not expose a stable native session API to plugins.
Note: When installing via `--link` (local development), use the long form `--link .` rather than the shorthand `-l .` to avoid a known OpenClaw CLI argument-parsing ambiguity where `-l` may not be recognized.
Sealed `tool_worker` steps intercept every tool result before the model sees it, spool the raw payload to the artifact store, and return only a bounded envelope to the model. This requires the plugin to register `agentToolResultMiddleware`, an API that OpenClaw currently restricts to bundled (first-party) plugins.
| Step | Command | What it does |
|---|---|---|
| 1. Patch | `node scripts/patch-openclaw-trust-workflow-middleware.mjs --plugin openclaw-workflow` | Edits the OpenClaw gateway/runtime so `openclaw-workflow` is trusted for `registerAgentToolResultMiddleware` and Pi middleware receives stable session/run context (`sessionId`, `sessionKey`, `runId`). Without this, OpenClaw either rejects the middleware registration entirely or emits Pi `tool_result` events without enough identity to resolve the active sealed run reliably. |
| 2. Install | `openclaw plugins install --link . --dangerously-force-unsafe-install` | Installs the plugin from your local checkout as a linked (symlinked) plugin. `--link` means OpenClaw loads directly from your working directory, so no copy step is needed. `--dangerously-force-unsafe-install` bypasses the unsafe-import check that would otherwise block the plugin because it uses `node:child_process`. |
# 1. Build the plugin
npm install
npm run build
# 2. Patch OpenClaw middleware trust + Pi runtime context
node scripts/patch-openclaw-trust-workflow-middleware.mjs --plugin openclaw-workflow
# 3. Install as a linked plugin
openclaw plugins install --link . --dangerously-force-unsafe-install
# 4. Restart the OpenClaw gateway so it picks up the patch and the linked install
Re-run the patch after every OpenClaw update. It only modifies the middleware trust guard and, when a source checkout is available, the Pi embedded-runner middleware context wiring. Use `--dry-run` to preview what it would change, or `--restore` to revert:
# Preview changes without writing anything
node scripts/patch-openclaw-trust-workflow-middleware.mjs --plugin openclaw-workflow --dry-run
# Revert the patch
node scripts/patch-openclaw-trust-workflow-middleware.mjs --restore
The script locates the OpenClaw gateway/runtime files that contain the middleware trust guard and patches that guard:
// Before patch:
if (record.origin !== "bundled") { /* reject */ }
// After patch:
if (record.origin !== "bundled" && record.id !== "openclaw-workflow") { /* reject */ }
Nothing else is modified. The patch is idempotent: running it twice is safe.
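The idempotency property can be illustrated with a minimal sketch. This is not the actual patch script: `patchTrustGuard` and `ORIGINAL_GUARD` are hypothetical names, and the sketch assumes the guard appears in the source text exactly as in the before/after snippet.

```typescript
// Hypothetical sketch of an idempotent text patch; not the real script.
// Assumes the guard appears verbatim as in the "Before patch" snippet.
const ORIGINAL_GUARD = 'if (record.origin !== "bundled") {';

function patchTrustGuard(source: string, pluginId: string): string {
  const patchedGuard = `if (record.origin !== "bundled" && record.id !== "${pluginId}") {`;
  // If the patched guard is already present, return the input unchanged
  // so that a second run is a no-op.
  if (source.includes(patchedGuard)) {
    return source;
  }
  // Otherwise replace every occurrence of the original guard.
  return source.split(ORIGINAL_GUARD).join(patchedGuard);
}
```

Because the patched guard no longer contains the original guard text, applying the function to its own output returns the same string.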
When the OpenClaw source checkout contains src/agents/pi-embedded-runner/extensions.ts, the same script also patches the Pi tool_result extension boundary so the middleware runner is built with stable identity from sessionManager:
const middlewareCtx = {
  runtime: "pi",
  agentId: sessionManagerAny.agentId ?? sessionManagerAny.agent?.id,
  sessionId:
    sessionManagerAny.sessionId ??
    sessionManagerAny.session?.id ??
    sessionManagerAny.currentSessionId,
  sessionKey:
    sessionManagerAny.sessionKey ??
    sessionManagerAny.session?.key ??
    sessionManagerAny.key,
  runId: sessionManagerAny.runId ?? sessionManagerAny.currentRunId,
};
With `OPENCLAW_WORKFLOW_TRACE=1`, the patched Pi extension also logs:
[openclaw-trace] pi.tool_result.middleware_context
so you can verify the middleware context before and after the fix.
If the patch was not applied and the middleware registration fails, behavior depends on your config:
- Default (no `requireSealedToolResultMiddleware`): Plugin starts normally; sealed `tool_worker` steps that need `recordObservationBeforeModel` will throw at runtime when they attempt to assert that capability.
- `requireSealedToolResultMiddleware: true`: Plugin refuses to start entirely, which makes the failure visible at gateway startup rather than at step execution time.
Plugin config lives in ~/.openclaw/openclaw.json under plugins.entries."openclaw-workflow".config:
{
  "plugins": {
    "entries": {
      "openclaw-workflow": {
        "enabled": true,
        "config": {
          "workflowsDir": "~/.openclaw/workflows",
          "runsDir": "~/.openclaw/workflow-runs",
          "baseDir": "/home/user/myproject",
          "concurrency": 3,
          "notifyChannel": "telegram",
          "sessionModel": "anthropic/claude-sonnet-4-6",
          "sessionAdapter": "auto",
          "pollIntervalMs": 5000,
          "stateBackend": "auto",
          "redisUrl": "redis://localhost:6379",
          "redisMcpToolPrefix": "MCP_DOCKER",
          "filesystemFallback": true,
          "materializeOutputs": "on_demand",
          "requireSealedToolResultMiddleware": true,
          "sealedMaxPreviewBytes": 2048
        }
      }
    }
  }
}
`requireSealedToolResultMiddleware`: Set to `true` (default) after applying the trust patch so the plugin fails at gateway startup if middleware registration is rejected, rather than silently degrading at step runtime. Set to `false` if you are not using sealed `tool_worker` steps.
`sealedMaxPreviewBytes`: Controls how many bytes of each intercepted tool result are inlined into the model context. Everything beyond this limit is spooled to the artifact store and retrievable via `workflow_observation_read`. Default `2048`.
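To make the preview limit concrete, here is a minimal sketch of how a bounded envelope could be built from a raw tool result. It is an illustration under assumptions, not the plugin's implementation: the `ToolResultEnvelope` shape, `buildEnvelope`, and the `artifactRef` handle are hypothetical, and this sketch truncates on whole characters so the preview never exceeds the byte budget.

```typescript
// Hypothetical envelope shape; field names are assumptions for illustration.
interface ToolResultEnvelope {
  preview: string;      // inlined into the model context
  previewBytes: number; // UTF-8 size of the preview
  totalBytes: number;   // UTF-8 size of the full result
  truncated: boolean;   // true when the full result exceeds the limit
  artifactRef: string;  // hypothetical handle to the spooled full payload
}

// Number of UTF-8 bytes needed to encode one Unicode code point.
function utf8Length(codePoint: number): number {
  if (codePoint < 0x80) return 1;
  if (codePoint < 0x800) return 2;
  if (codePoint < 0x10000) return 3;
  return 4;
}

function buildEnvelope(raw: string, maxPreviewBytes: number, artifactRef: string): ToolResultEnvelope {
  let preview = "";
  let previewBytes = 0;
  let totalBytes = 0;
  // Iterate by code point so a multi-byte character is never split.
  for (const ch of raw) {
    const size = utf8Length(ch.codePointAt(0) ?? 0);
    totalBytes += size;
    // Once the running total passes the limit it never drops back,
    // so the preview is always a prefix of the input.
    if (totalBytes <= maxPreviewBytes) {
      preview += ch;
      previewBytes += size;
    }
  }
  return { preview, previewBytes, totalBytes, truncated: totalBytes > maxPreviewBytes, artifactRef };
}
```

With a limit of `2048`, a short result passes through unchanged with `truncated: false`, while a large result keeps only its first characters and relies on the artifact handle for the rest.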
State/artifact backend config notes:
- `stateBackend`: `filesystem` | `redis` | `auto` | `dual` (default `filesystem`)
- `redisUrl`: enables native Redis backend resolution when configured
- `redisMcpToolPrefix`: MCP Redis adapter prefix (default `MCP_DOCKER`)
- `filesystemFallback`: fallback to file-backed state/artifacts when Redis is unavailable
- `materializeOutputs`: `never` | `on_demand` | `always`
Redis notes:
- Native Redis mode uses `ioredis` at runtime (included in this package's dependencies).
- MCP Redis mode expects commands exposed as `<PREFIX>__get`, `<PREFIX>__set`, `<PREFIX>__hset`, etc. (for example `MCP_DOCKER__get`).
Create your workflows directory and restart/verify the gateway:
mkdir -p ~/.openclaw/workflows
openclaw plugins list --verbose
openclaw plugins inspect openclaw-workflow --json
openclaw plugins doctor
On Windows/WSL-mounted directories, OpenClaw may block linked plugins when the source path is reported as world-writable (`mode=777`). Move or copy the plugin to a non-world-writable path if inspection says the plugin candidate was blocked.
The plugin chooses how to spawn subagent sessions based on the available OpenClaw API surface.
Precedence:
1. `OPENCLAW_WORKFLOW_SESSION_ADAPTER` environment variable (override)
2. `sessionAdapter` plugin configuration
3. Default: `"auto"` (selects the best available)
Available Adapters:
- `runtime-subagent` (Preferred): Uses the modern `api.runtime.subagent` SDK.
- `legacy-api`: Uses the older `api.sessions` surface.
- `cli`: Falls back to `openclaw` CLI cron jobs.
- `auto`: Tries `runtime-subagent` → `legacy-api` → `cli`.
If a specific adapter is forced but the required API is missing, the plugin will fail fast with an error.
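The precedence and fail-fast rules above can be sketched as a small resolver. This is an illustrative sketch, not the plugin's code: `resolveAdapter` and the `available` probe set are hypothetical, and treating an empty environment value as unset is an assumption.

```typescript
type AdapterName = "runtime-subagent" | "legacy-api" | "cli";
type AdapterSetting = AdapterName | "auto";

// Order that "auto" probes, as listed above.
const AUTO_ORDER: AdapterName[] = ["runtime-subagent", "legacy-api", "cli"];

function resolveAdapter(
  envOverride: string | undefined,         // OPENCLAW_WORKFLOW_SESSION_ADAPTER
  configValue: AdapterSetting | undefined, // sessionAdapter from plugin config
  available: ReadonlySet<AdapterName>,     // hypothetical result of probing the API surface
): AdapterName {
  // Environment wins over config, config wins over the "auto" default.
  // Assumption: an empty string counts as "not set".
  const requested = (envOverride || configValue || "auto") as AdapterSetting;
  if (requested === "auto") {
    const found = AUTO_ORDER.find((name) => available.has(name));
    if (found === undefined) {
      throw new Error("no session adapter is available");
    }
    return found;
  }
  // A forced adapter never falls back: fail fast if its API is missing.
  if (!available.has(requested)) {
    throw new Error(`adapter "${requested}" was forced but the required API is missing`);
  }
  return requested;
}
```

With only `legacy-api` and `cli` available, the default resolves to `legacy-api`, while forcing `runtime-subagent` throws instead of degrading.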
When a workflow contains sealed tool_worker steps with context firewall enforcement, adapter selection is capability-aware. The selected adapter must support:
- tool-result interception
- transcript firewall
- artifact sink
If those capabilities are unavailable, execution fails before worker spawn (rather than degrading to prompt-only sealing).
1. Create a workflow file:
cat > ~/.openclaw/workflows/hello.yml << 'EOF'
name: Hello Pipeline
version: "1.0"
steps:
  - id: greet
    name: "Greeter"
    task: "Write a friendly greeting to output/hello-{date}.txt"
    timeout: 60
    outputs:
      - "output/hello-{date}.txt"
  - id: followup
    name: "Follow-up"
    depends_on: [greet]
    task: "Read the greeting from output/hello-{date}.txt and write a response to output/response-{date}.txt"
    timeout: 60
EOF
2. List available workflows:
workflow_list()
3. Dry run (validate without executing):
workflow_run({ name: "hello", dry_run: true })
4. Run the pipeline:
workflow_run({ name: "hello" })
# → { run_id: "hello-pipeline-20260309T082000", status: "running", ... }
5. Check status:
workflow_status({ name: "hello" })
# → { status: "ok", steps_ok: 2, steps_total: 2, ... }
OpenClaw Workflow supports two YAML surfaces:
- Authoring schema (public): Compact, human-friendly workflow format for writing pipelines.
- Execution schema (internal target): Low-level runtime format produced by the compiler.
Authoring workflows are compiled into execution workflows before validation and execution:
authoring YAML -> authoring-loader -> authoring-compiler -> execution WorkflowDefinition -> workflow-loader normalize/validate -> template-schema-validator -> workflow-executor
Raw execution-schema workflow files are disabled in the public load path. For migration-only workflows, use the internal migration loader (loadLegacyExecutionWorkflowForMigrationOnly) rather than normal loadWorkflow / loadWorkflowFromFile.
Security note: trust is based on in-memory compilation, not on YAML metadata. A hand-written __compiled_from field in input YAML is not treated as trusted compiler output.
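One way to picture "trust is based on in-memory compilation" is an identity registry that only the compiler can write to. The sketch below is an assumption about how such a check could work, not the plugin's actual mechanism; `compileAuthoring`, `isTrustedCompilerOutput`, and the simplified `WorkflowDefinition` shape are hypothetical.

```typescript
// Simplified, hypothetical shape of a compiled workflow.
interface WorkflowDefinition {
  name: string;
  __compiled_from?: string;
}

// Objects registered here were produced by the compiler in this process.
// Membership depends on object identity, so it cannot be forged from YAML input.
const compiledInProcess = new WeakSet<object>();

function compileAuthoring(authoring: { name: string }): WorkflowDefinition {
  const compiled: WorkflowDefinition = { name: authoring.name, __compiled_from: "authoring" };
  compiledInProcess.add(compiled);
  return compiled;
}

function isTrustedCompilerOutput(definition: WorkflowDefinition): boolean {
  // The __compiled_from field is deliberately ignored; only identity counts.
  return compiledInProcess.has(definition);
}
```

A definition parsed from a file that merely carries a `__compiled_from` field is never added to the registry, so it is not treated as compiler output.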
Authoring example:
schema: authoring
name: Example
collections:
  jobs:
    key: job_id
    queues: [pending, done]
pipeline:
  - collect_jobs:
      uses: browser
      writes: jobs.pending
  - classify_jobs:
      uses: model
      for_each: jobs_ready
      parser: json
      outputs:
        - id: classification_results
  - publish_state:
      uses: plugin
      operation: workflow.state_publish
      state_publish:
        from_step: collect_jobs
        output: jobs_pending
        collection: jobs
        queue: jobs_pending
  - classify_all_jobs:
      uses: drain
      worker_group: classification
      worker:
        uses: model
        task: Classify claimed jobs.
        outputs:
          - id: classified_jobs
Compiled execution outline (abridged):
name: Example
state:
  collections: ...
  queues: ...
  worker_groups: ...
steps:
  - id: collect_jobs
    kind: sealed
    ...
  - id: __publish_jobs_pending_from_collect_jobs
    kind: plugin
    uses: workflow.state_publish
    ...
  - id: classify_jobs
    kind: loop_subagent
    ...
  - id: classify_all_jobs
    kind: state_drain
    ...
### Authoring-specific fields (public schema)
In addition to `resources`, `collections`, `profiles`, and `pipeline`, the authoring schema supports:
- top-level `state` (merged with compiler-generated collection state)
- top-level `required_skills` (public skills only; engine-native state backends remain internal)
- top-level `concurrency` and `version`
- `defaults.sealed` to set global sealed-worker policy overrides
Authoring step additions:
- `outputs` supports rich arrays:
  - string entries (`- jobs_ready`)
  - objects with `id`, `path`, `validate`, `optional`, and `materialize.{path,mode}`
- plugin op shorthand via top-level `operation` (preferred over `with.operation`, still backward-compatible)
- for state plugin specs (`state_publish`, `state_query`, etc.), authoring prefers top-level fields; compiler normalizes compiled/runtime shape to always include `with.state_*` while preserving top-level `state_*` for audit/debug compatibility
- conflicting top-level vs `with.state_*` values are compile-time errors (runtime does not guess)
- public sealed-loop authoring via `for_each` (+ `parser`, `item_schema`) for browser/model steps
- named drain controller authoring via `uses: drain` + `worker_group`, `claim`, `worker`, `complete`
- step-level `sealed` overrides merged over defaults/profile
- `input.claim` for engine-injected bounded claim context (preferred)
- `input_context` as a backward-compatible alias for injected claim context
Example rich output object:
```yaml
outputs:
  - id: jobs_ready
    path: data/linkedin/job-alerts/jobs-ready-{date}.json
    validate: jobs_array
    materialize:
      path: data/linkedin/job-alerts/jobs-ready-{date}.json
      mode: always
```
## Workflow YAML Schema Reference
### Top-level fields
| Field | Type | Required | Default | Description |
|---------------|----------|----------|---------|-------------|
| `name` | string | Yes | none | Human display name. Used in notifications and slugified for run IDs. |
| `version` | string | No | `"1.0"` | Schema version for future compatibility. |
| `description` | string | No | `""` | Human description shown in `workflow_list`. |
| `steps` | array | Yes | none | Ordered list of step definitions. |
| `concurrency` | number | No | `3` | Max steps that run in parallel. |
| `state` | object | No | none | Storage backend declaration for state and artifacts. See [Artifact-Backed Declared Outputs](#artifact-backed-declared-outputs). |
| `config` | object | No | `{}` | Top-level configuration variables accessible via `{config.X}` substitution. |
| `validators` | object | No | `{}` | Custom validation rules for output checks, supporting schemas and conditional outcomes (`pass_when`, `retry_when`, `block_when`, `fail_when`). |
| `required_skills` | string[] | No | `[]` | Skills required for the entire workflow. Steps without their own `required_skills` inherit these. Injected as instructions into step prompts and verified against agent config. |
| `required_mcp_servers` | string[] | No | `[]` | External capability servers required by worker steps. Do not list engine-native state backends here. |
`state.contracts` lets you define **semantic state contracts** (for example, collection lifecycle semantics) that the runtime projects to Redis/native state views after outputs validate.
`state.collections`, `state.queues`, and `state.worker_groups` let you define the same lifecycle as named semantic resources for explicit plugin-step state operations such as `workflow.state_publish`, `workflow.state_claim`, `workflow.state_complete`, `workflow.state_query`, `workflow.state_patch_outputs`, `workflow.state_partition`, and `workflow.state_report`.
### Step fields
| Field | Type | Required | Default | Description |
|----------------|-----------|----------|---------|-------------|
| `id` | string | Yes | none | Unique step identifier. Must match `[a-zA-Z0-9_-]+`. Used in `depends_on` references and state files. |
| `name` | string | No | Same as `id` | Human display name for notifications. |
| `kind` | string | No | inferred | Step execution kind: `subagent`, `loop_subagent`, `plugin`, `state_drain`, or `sealed`. If omitted, loop steps infer `loop_subagent`, drain-controller steps infer `state_drain`, otherwise `subagent`. |
| `task` | string | Yes\* | none | The agent prompt / task description. Supports [variable substitution](#variable-substitution). (\*Not required for `kind: plugin` steps or loop containers using `for_each`) |
| `uses` | string | Yes\*\* | none | Plugin operation ID for `kind: plugin` steps (for example, `workflow.cache_json_document`). |
| `with` | object | No | `{}` | Parameter map passed to a `kind: plugin` operation. Supports [variable substitution](#variable-substitution). |
| `depends_on` | string[] | No | `[]` | IDs of steps that must complete (`ok`) before this step runs. |
| `outputs` | array | No | `[]` | Output validation rules. Supports simple file existence checks or detailed objects with custom validators and schemas. Supports [variable substitution](#variable-substitution). |
| `for_each` | string\|object | No | none | List source for loop expansion. Supports variable/path strings (e.g., `"{songs}"`) and artifact refs (`{ from_step, output }`). |
| `parser` | string | No | `"auto"` | Parser to use for the loop list (`"json"`, `"csv"`, `"newline"`, `"auto"`). |
| `item_schema` | object | No | none | Optional schema to validate each item in the loop list (type, required fields, patterns). |
| `steps` | array | No | `[]` | Steps to execute for each item in the `for_each` list. |
| `model` | string | No | Plugin default | LLM model override for this step's session (e.g. `"anthropic/claude-opus-4"`). |
| `concurrency` | number | No | Global limit | Max parallel instances of this specific step. Useful for avoiding rate limits on specific tools/APIs. |
| `timeout` | number | No | `300` | Maximum execution time in **seconds**. Step is marked failed on timeout. |
| `retry` | number | No | `0` | Number of retry attempts after first failure. `retry: 2` = up to 3 total attempts. |
| `retry_delay` | number | No | `30` | Seconds to wait between retry attempts. |
| `retry_on` | string[] | No | `[]` | Specific failure kinds to retry on (e.g., `["missing_file", "timeout"]`). If empty, only retries when `retryable` is true. |
| `retry_except` | string[] | No | `[]` | Specific failure kinds that prevent retry, even if `retry > 0` or `retry_on` matches. |
| `optional` | boolean | No | `false` | If `true`, step failure doesn't fail the pipeline or block dependent steps. |
| `always_run` | boolean | No | `false` | If `true`, step runs regardless of dependency failure. |
| `on_block` | string | No | `"block_run"` | Behavior when blocked: `"block_run"` (fails pipeline) or `"continue"`. |
| `required_skills` | string[] | No | `[]` | Skills required for this specific step. Overrides workflow-level `required_skills`. Injected as instructions into the step prompt and verified against agent config. |
| `required_mcp_servers` | string[] | No | `[]` | External capability servers required by this worker step. Use `required_skills` (for example `browser-harness`) for skill-level capabilities. |
| `state_contract` | string\|string[] | No | none | Semantic state contract name(s) to project after step outputs validate. The worker only produces outputs; runtime handles Redis/state materialization. |
| `state_publish` | object\|object[] | No | none | Semantic publish specification for `kind: plugin` steps using `workflow.state_publish`. Reads a prior artifact and publishes items into a configured collection/queue. |
| `state_consume` | object | No | none | Semantic consume/claim specification for `kind: plugin` steps using `workflow.state_claim`. Resolves a queue or worker group and emits a claim manifest artifact. |
| `state_reclaim` | object | No | none | Semantic reclaim specification for `kind: plugin` steps using `workflow.state_reclaim_expired`. Requeues expired or orphaned in-flight claims before another worker pass. |
| `state_complete` | object\|object[] | No | none | Semantic completion specification for `kind: plugin` steps using `workflow.state_complete`. Marks claimed items completed/failed using result artifacts from a prior step. |
| `state_query` | object | No | none | Query specification for `kind: plugin` steps using `workflow.state_query`. Produces bounded artifacts from semantic Redis state. |
| `state_partition` | object | No | none | Partition specification for `kind: plugin` steps using `workflow.state_partition`. Splits semantic items into bounded outputs and optional queues. |
| `state_patch_outputs` | object | No | none | Patch specification for `kind: plugin` steps using `workflow.state_patch_outputs`. Merges result artifacts back into semantic Redis documents. |
| `state_report` | object | No | none | Report specification for `kind: plugin` steps using `workflow.state_report`. Produces bounded JSON/Markdown reports from semantic state. |
| `input` | object | No | none | Engine-injected runtime input configuration. Use `input.claim` to inject bounded claim payloads from a dependency claim artifact into worker context. |
| `input_context` | object | No | none | Backward-compatible alias for claim injection config. Prefer `input.claim`. |
| `drain` | object | No | none | Scheduler/controller spec for `kind: state_drain`. Repeatedly expands nested steps until `workflow.state_claim` returns an empty batch. |
| `sealed` | object | Yes\* | none | Configuration for `kind: sealed` execution boundary. Required for `kind: sealed`. Supports command-mode execution and worker-mode context-firewall policies. |
| `skip_if_empty` | string | No | none | Path to a file that, if missing or containing no valid records (parsed as JSON/CSV/Newline), causes this step to be skipped and marked `ok`. Supports [variable substitution](#variable-substitution). |
| `complete_when` | string | No | `"session"` | Determines completion criteria: `"session"`, `"outputs"`, `"session_then_outputs"`, `"handoff"`, or `"handoff_or_outputs"`. |
| `signaling` | string | No | auto for `handoff`/`handoff_or_outputs`, otherwise `off` | Controls plugin-injected signaling instructions. `"auto"` injects `workflow_step_update` + `workflow_step_complete` protocol into the runtime prompt so authors don't need to repeat this boilerplate in every step task. `"off"` disables injection for that step. |
| `output_contract_version` | number | No | `null` | Optional explicit contract version for cache freshness signatures. Increment to invalidate older cache artifacts even when files are structurally valid. |
| `reuse_outputs` | object | No | none | Structured cache adoption policy. Supports pre-launch reuse checks with validator + signature freshness gates. |
`**` `uses` is required when `kind: plugin`.
`*` `sealed` is required when `kind: sealed`.
Authoring-first note: in normal user workflows (`schema: authoring`), the compiler emits execution kinds such as `sealed`, `plugin`, and internal `state_drain` controllers. Raw `subagent` / `loop_subagent` execution-schema steps are legacy-only and rejected unless legacy execution loading is explicitly enabled.
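The interaction between `retry`, `retry_on`, and `retry_except` in the step table can be sketched as a single decision function. This is a reading of the table, not the executor's code: `shouldRetry` and the `StepFailure` shape are hypothetical, and the sketch assumes that a non-empty `retry_on` restricts retries to the listed failure kinds.

```typescript
interface RetryPolicy {
  retry: number;          // retries after the first failure
  retry_on: string[];     // failure kinds that are retried
  retry_except: string[]; // failure kinds that are never retried
}

// Hypothetical failure record; `retryable` mirrors the flag named in the table.
interface StepFailure {
  kind: string;
  retryable: boolean;
}

function shouldRetry(policy: RetryPolicy, failure: StepFailure, attemptsSoFar: number): boolean {
  // `retry: 2` means up to 3 total attempts, so the budget is retry + 1.
  if (attemptsSoFar >= policy.retry + 1) return false;
  // Exclusions win even when retry > 0 or retry_on matches.
  if (policy.retry_except.includes(failure.kind)) return false;
  // Assumption: a non-empty retry_on limits retries to the listed kinds.
  if (policy.retry_on.length > 0) return policy.retry_on.includes(failure.kind);
  // With an empty retry_on, only failures flagged retryable are retried.
  return failure.retryable;
}
```

For example, with `retry: 2` a retryable failure is retried after the first and second attempts but not after the third.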
### Sealed Steps (`kind: sealed`)
`sealed` is the generic worker primitive for enforcing a data-plane/control-plane split:
- **large/full results** are preserved in artifact storage
- **model context** receives only compact summaries/handles
- declared outputs remain the authoritative contract for step completion
Use `kind: sealed` instead of introducing many specialized worker kinds.
#### Sealed modes
- `tool_worker` (default): isolated worker session with policy hints for result/context handling
- `skill_worker`: same execution surface, authored for skill-centric worker tasks
- `adapter`: reserved adapter-directed worker mode
- `command`: bounded OS command execution with stdout/stderr spool + output-contract checks
`mode` defaults to:
- `command` when `sealed.command` is present
- otherwise `tool_worker`
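The default rule is small enough to state as code. A minimal sketch, assuming an explicit `mode` always takes precedence over the inferred default; `resolveSealedMode` and the trimmed-down `SealedSpec` type are hypothetical names for illustration.

```typescript
type SealedMode = "tool_worker" | "skill_worker" | "adapter" | "command";

// Only the fields relevant to mode inference are modeled here.
interface SealedSpec {
  mode?: SealedMode;
  command?: { argv: string[] };
}

function resolveSealedMode(sealed: SealedSpec): SealedMode {
  // Assumption: an explicit mode is used as written.
  if (sealed.mode !== undefined) return sealed.mode;
  // Otherwise the presence of `command` selects command mode.
  return sealed.command !== undefined ? "command" : "tool_worker";
}
```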
#### Sealed spec fields
```yaml
sealed:
  mode: tool_worker | skill_worker | adapter | command
  no_model: false
  command: # required when mode: command
    argv: ["node", "scripts/transform.mjs"]
    cwd: "."
    env:
      NODE_ENV: production
  tools:
    allow: []
    deny: []
  result_visibility:
    mode: auto
    inline_when_safe: true
    preserve_full_results: true
    spool_when_large: true
    return_refs: true
    lazy_read: true
    expose_preview: true
  context_firewall:
    enabled: true
    strategy: adaptive
    on_context_pressure: spool_and_compact
  tool_result_policy:
    max_context_injection_bytes: auto
    max_single_result_bytes_before_spool: auto
    include_head_bytes: 512
    include_tail_bytes: 512
    preserve_full_result: true
    mode: auto
  stdout_policy:
    max_stdout_bytes: 2048
    max_stderr_bytes: 4096
    max_process_output_bytes: 104857600
    mode: spool_and_summarize
  watchdog:
    mode: progress_based
    require_declared_outputs: true
    detect_repeated_tool_calls: true
    detect_repeated_navigation: true
    repeated_tool_call_threshold: 3
    on_no_progress: fail
  return_contract:
    type: json
    max_context_bytes: auto
    schema: {}
  artifact_spool:
    enabled: true
    path: ".openclaw-workflow/sealed"
```
- id: normalize_manifest
  kind: sealed
  sealed:
    mode: command
    command:
      argv: ["node", "scripts/normalize-manifest.mjs", "--date", "{date}"]
    return_contract:
      type: json
      schema:
        type: object
        required: [status]
        properties:
          status:
            type: string
  outputs:
    - id: normalized_manifest
      validate: manifest_schema
- id: collect_jobs
  kind: sealed
  task: |
    Collect jobs and commit declared outputs.
  sealed:
    mode: tool_worker
    context_firewall:
      enabled: true
  outputs:
    - id: jobs_manifest
Sealed `tool_worker` steps with context firewall require adapter-enforced runtime capabilities. If the chosen adapter cannot enforce tool-result interception, transcript firewall, and artifact sink, the run fails before spawning the worker.
This prevents silent downgrade from sealed runtime enforcement to prompt-only behavior.
Plugin steps execute built-in workflow operations directly in the orchestrator (no spawned subagent session).
Rules:
- must declare `kind: plugin`
- must include `uses: <operation_id>`
- may include `with: { ... }` operation arguments
- cannot use `for_each`
- may still use `depends_on`, `outputs`, retry policy, and variable substitution
For semantic state plugin operations (workflow.state_publish, workflow.state_query, workflow.state_partition, workflow.state_report, etc.):
- authoring can define config at top-level `state_*`
- compiler normalizes compiled config so `with.state_*` is populated
- top-level `state_*` is retained on compiled steps for traceability
- if both top-level and `with.state_*` are present but differ, compilation fails
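The normalization rules for `state_*` config can be sketched for a single field. This is an illustrative sketch of the described behavior, not the compiler's code: `normalizeStatePublish` and the reduced `PluginStep` type are hypothetical, only `state_publish` is handled, and the comparison via `JSON.stringify` is a simplification that is sensitive to key order.

```typescript
type StateSpec = Record<string, unknown>;

// Reduced, hypothetical step shape covering only what this sketch needs.
interface PluginStep {
  id: string;
  state_publish?: StateSpec;
  with?: { state_publish?: StateSpec };
}

function normalizeStatePublish(step: PluginStep): PluginStep {
  const top = step.state_publish;
  const nested = step.with?.state_publish;
  // Conflicting definitions are a compile-time error; the runtime never guesses.
  // Simplification: structural equality is approximated with JSON.stringify.
  if (top !== undefined && nested !== undefined && JSON.stringify(top) !== JSON.stringify(nested)) {
    throw new Error(`step "${step.id}": state_publish and with.state_publish differ`);
  }
  const spec = top ?? nested;
  if (spec === undefined) return step;
  // Always populate with.state_publish; a top-level value, if present,
  // is carried over unchanged by the spread.
  return { ...step, with: { ...step.with, state_publish: spec } };
}
```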
Built-in operation IDs:
- `workflow.cache_json_document`
- `workflow.state_init`
- `workflow.redis_run_initializer`
- `workflow.state_publish`
- `workflow.state_claim`
- `workflow.state_reclaim_expired`
- `workflow.state_complete`
- `workflow.state_query`
- `workflow.state_patch_outputs`
- `workflow.state_partition`
- `workflow.state_report`
`workflow.cache_json_document` reads a JSON file, commits it as a declared artifact output, and (when Redis is available) mirrors it into Redis.
`with` fields:
- required: `source_path`, `json_key`
- conditionally required: `hash_key` (required when Redis is configured)
- optional: `allowed_hash_fields`, `ttl_seconds`, `base_dir`, `output_id`
Example:
- id: cache_profile
  kind: plugin
  uses: workflow.cache_json_document
  with:
    source_path: data/profile-{date}.json
    json_key: cache:profile:{run_id}
    hash_key: cache:profile_hash:{run_id}
    allowed_hash_fields: [profile_id, status]
    output_id: profile_cache
  outputs:
    - id: profile_cache
      validate: profile_schema
`workflow.state_init` initializes run metadata, counters, and the stream group idempotently, and commits an initialization artifact even when Redis is unavailable.
`workflow.redis_run_initializer` remains supported as a backward-compatible alias.
`with` fields:
- required: `run_key`
- optional: `stream_key`, `stream_group` (default `workers`), `counter_keys`, `metadata`, `ttl_seconds`, `output_id`
Example:
- id: init_run_state
  kind: plugin
  uses: workflow.state_init
  with:
    run_key: runs:{run_id}
    stream_key: events:{run_id}
    stream_group: workers
    counter_keys:
      processed:{run_id}: 0
      failed:{run_id}: 0
    metadata:
      workflow: "{workflow_name}"
      run_id: "{run_id}"
    output_id: run_config
  outputs:
    - id: run_config
Use these top-level blocks when you want workflow YAML to describe what state exists while the plugin decides how it maps to Redis-backed views.
state:
  backend: auto
  collections:
    task_alerts:
      entity: alert
      item_key: alert_key
      default_queue: task_alerts_pending
      indexes: [route, status, submitted]
      views:
        document: true
        metadata_hash: true
        seen_index: true
        pending_queue: true
        event_stream: true
      counters:
        published: task_alerts_published
        completed: task_alerts_completed
        failed: task_alerts_failed
  queues:
    task_alerts_pending:
      collection: task_alerts
      batch_size: 25
      visibility_timeout_s: 900
  worker_groups:
    task_alert_classifier:
      queue: task_alerts_pending
      batch_size: 10
      lease_seconds: 900
Semantic resource notes:
- `collections.<name>` defines the entity model and keying rules.
- `queues.<name>` defines batching / pending-work semantics for a collection.
- `worker_groups.<name>` defines how claimers resolve a queue plus lease behavior.
- `collections.<name>.indexes` defines secondary index fields maintained as sets (`{prefix}:set:{collection}:idx:{field}:{value}:{date}`).
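As an illustration of the index key pattern, the sketch below expands `{prefix}:set:{collection}:idx:{field}:{value}:{date}` for one item. The helper names are hypothetical, and the prefix value, the date format, and the choice to skip missing fields are assumptions made for the example.

```typescript
// Builds one secondary-index set key from the documented pattern.
function indexSetKey(prefix: string, collection: string, field: string, value: string, date: string): string {
  return [prefix, "set", collection, "idx", field, value, date].join(":");
}

// Hypothetical helper: one key per configured index field that the item actually has.
function indexKeysForItem(
  prefix: string,
  collection: string,
  indexes: string[],
  item: Record<string, unknown>,
  date: string,
): string[] {
  return indexes
    // Assumption: fields that are absent or null produce no index entry.
    .filter((field) => item[field] !== undefined && item[field] !== null)
    .map((field) => indexSetKey(prefix, collection, field, String(item[field]), date));
}
```

For the `task_alerts` collection above, an item with `route: "email"` and `status: "new"` but no `submitted` field would yield two keys, one for `route` and one for `status`.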
`workflow.state_publish` publishes items from a prior step artifact into a semantic collection and queue, then writes a summary artifact.
Use when:
- a worker step already produced a manifest artifact
- you want the orchestrator/plugin to materialize queue/state views
- you do not want subagents hand-writing Redis commands
`state_publish` fields:
- required: `from_step`, `output`
- recommended: `collection`
- optional: `source` (`auto` | `exact` | `descendants`), `queue`, `select`, `item_key`, `summary_output`
Example:
- id: publish_task_alert_state
  kind: plugin
  uses: workflow.state_publish
  state_publish:
    from_step: collect_task_alerts
    output: alerts_manifest
    collection: task_alerts
    queue: task_alerts_pending
    item_key: alert_key
    summary_output: state_publish_summary
  depends_on: [collect_task_alerts]
  outputs:
    - id: state_publish_summary
Behavior:
- reads the declared artifact from `from_step` / `output`
- in `auto` mode, resolves either the exact artifact (`from_step.output`) or descendant loop/drain artifacts (`from_step:<n>:...`)
- selects items from the artifact payload (default root item list)
- publishes documents / hashes / queue entries / stream events according to the collection config
- first-seen items increment `published_count`; previously-seen items increment `updated_count`
- queue enqueue remains idempotent (no duplicate pending entries for the same item key)
- increments configured counters when present
- commits a summary artifact for downstream auditing
Important:
- artifact-only fallback is only safe for non-queue projections. If the publish writes to a queue, Redis is required because there is no filesystem-backed queue consumer yet.
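The counting and idempotent-enqueue behavior can be modeled in memory. This sketch stands in for the Redis-backed views and is not the plugin's implementation: `InMemoryCollection` and its members are hypothetical, and documents, hashes, streams, and counters are left out.

```typescript
interface PublishSummary {
  published_count: number; // items seen for the first time
  updated_count: number;   // items that were already known
}

// In-memory stand-in for a collection's seen index and pending queue.
class InMemoryCollection {
  private readonly seen = new Set<string>();
  readonly pending: string[] = [];

  publish(items: Record<string, unknown>[], itemKey: string): PublishSummary {
    const summary: PublishSummary = { published_count: 0, updated_count: 0 };
    for (const item of items) {
      const key = String(item[itemKey]);
      if (this.seen.has(key)) {
        summary.updated_count += 1;
      } else {
        this.seen.add(key);
        summary.published_count += 1;
      }
      // Idempotent enqueue: never add a second pending entry for the same key.
      if (!this.pending.includes(key)) {
        this.pending.push(key);
      }
    }
    return summary;
  }
}
```

Publishing `a` and `b`, then `b` and `c`, gives summaries of 2 published and then 1 published plus 1 updated, with each key pending exactly once.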
`workflow.state_claim` claims work from a semantic queue or worker group and writes a claim manifest artifact for downstream worker steps.
`state_consume` fields:
- required: one of `queue` or `worker_group`
- required: `output`
- optional: `batch_size`, `lease_seconds`
Example:
- id: claim_task_alert_batch
  kind: plugin
  uses: workflow.state_claim
  state_consume:
    worker_group: task_alert_classifier
    output: claim_manifest
  depends_on: [publish_task_alert_state]
  outputs:
    - id: claim_manifest
- id: classify_claimed_alerts
  depends_on: [claim_task_alert_batch]
  input:
    claim:
      from: claim_manifest
      inject_as: claim
      max_bytes: 32768
      require_lease: true
      expose_artifact_path: false
  task: |
    Process the engine-injected claim and classify each item.
  outputs:
    - id: classification_results
Behavior:
- resolves the queue from `state_consume.queue` or the declared worker group
- reclaims expired leases back to pending before new claims
- also reclaims orphaned processing entries that have no active lease record (for example, after a crash between queue move and lease bookkeeping)
- claims up to the configured batch size using atomic pending → processing movement
- writes active leases during claim; when Lua/eval is available this happens in the same Redis operation as the queue move
- appends claim events when event streams are enabled
- commits a manifest artifact containing flattened claimed items (plus `item_key` and `lease`) for downstream steps
- includes deterministic summary fields such as `claimed_count`, `reclaimed_expired_count`, and `reclaimed_orphaned_count`
Important:
- `workflow.state_claim` requires Redis. There is currently no filesystem-backed queue implementation, so artifact-only mode is intentionally rejected for queue claims.
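The reclaim-then-claim sequence can be modeled with plain arrays and a map. This is an in-memory sketch of the described behavior only; the real operation runs against Redis, and `InMemoryQueue`, `Lease`, and the millisecond timestamps are hypothetical choices for the example.

```typescript
interface Lease {
  item_key: string;
  expires_at: number; // epoch milliseconds (assumed unit)
}

interface ClaimManifest {
  items: { item_key: string; lease: Lease }[];
  claimed_count: number;
  reclaimed_expired_count: number;
  reclaimed_orphaned_count: number;
}

class InMemoryQueue {
  pending: string[] = [];
  processing: string[] = [];
  leases = new Map<string, Lease>();

  claim(batchSize: number, leaseSeconds: number, now: number): ClaimManifest {
    let reclaimedExpired = 0;
    let reclaimedOrphaned = 0;
    const stillProcessing: string[] = [];
    // Step 1: requeue expired leases and orphaned processing entries.
    for (const key of this.processing) {
      const lease = this.leases.get(key);
      if (lease !== undefined && lease.expires_at > now) {
        stillProcessing.push(key); // lease is still live
        continue;
      }
      if (lease === undefined) reclaimedOrphaned += 1;
      else reclaimedExpired += 1;
      this.leases.delete(key);
      this.pending.push(key);
    }
    this.processing = stillProcessing;
    // Step 2: move up to batchSize keys from pending to processing with a lease.
    const claimedKeys = this.pending.splice(0, batchSize);
    const items = claimedKeys.map((item_key) => {
      const lease: Lease = { item_key, expires_at: now + leaseSeconds * 1000 };
      this.leases.set(item_key, lease);
      this.processing.push(item_key);
      return { item_key, lease };
    });
    return {
      items,
      claimed_count: items.length,
      reclaimed_expired_count: reclaimedExpired,
      reclaimed_orphaned_count: reclaimedOrphaned,
    };
  }
}
```

In Redis the queue move and lease write would need to be atomic; the single-threaded model above sidesteps that concern.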
Workers can consume claim manifests through engine-injected bounded context rather than reading artifact files/paths directly.
Preferred step syntax:
input:
  claim:
    from: claim_manifest
    from_step: claim_task_alert_batch # optional; auto-resolved from immediate depends_on when omitted
    inject_as: claim # default: claim
    max_items: 50 # optional; default: all claimed items
    max_bytes: 32768 # optional; default: 32768
    include_fields: [item_key, lease, route, href] # optional projection
    require_lease: true # default: true
    expose_artifact_path: false # default: false
Backward-compatible alias:
input_context:
  from_claim: claim_manifest
  mode: injected
Runtime behavior:
- executor resolves the producer claim artifact before worker spawn
- injects a bounded JSON object into worker context (default key: claim)
- rejects invalid/unsafe configs (for example max_bytes <= 0, max_items < 0)
- blocks expose_artifact_path: true for agent worker steps
- supports both normal and sealed worker execution paths
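The bounding rules above can be sketched in a few lines of Python. This is an illustration only, not the plugin's implementation: the helper name build_injected_claim, the injected_count field, and the trimming strategy (dropping trailing items until the serialized payload fits max_bytes) are assumptions; only the option names come from the step syntax.

```python
import json


def build_injected_claim(manifest, max_items=None, max_bytes=32768,
                         include_fields=None, require_lease=True):
    """Hypothetical helper: derive a bounded claim object from a claim manifest."""
    # Reject invalid/unsafe configs up front.
    if max_bytes <= 0 or (max_items is not None and max_items < 0):
        raise ValueError("invalid claim input config")
    items = list(manifest.get("items", []))
    if require_lease and any("lease" not in item for item in items):
        raise ValueError("claimed item is missing lease metadata")
    if max_items is not None:
        items = items[:max_items]
    if include_fields is not None:
        # Optional projection: keep only the requested fields.
        items = [{k: item[k] for k in include_fields if k in item} for item in items]
    # Assumption: trim trailing items until the serialized payload fits max_bytes.
    while items and len(json.dumps({"items": items}).encode("utf-8")) > max_bytes:
        items.pop()
    return {"items": items, "injected_count": len(items)}
```

The worker only ever sees the returned object, never the artifact path, which is the point of injected claim input.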
Authoring drain behavior:
- drain-generated worker steps now default to injected claim input when they depend on the nested claim step
- generated worker prompt guidance is: process only engine-injected claim, do not read claim artifact paths/directories
Requeues expired leases, and orphaned processing entries without a matching lease record, without claiming new work.
Use when:
- you want an explicit recovery/repair step in the workflow graph
- you want lease cleanup to run on a schedule or before a specific worker wave
state_reclaim fields:
- required: one of queue or worker_group
- optional: output
Example:
- id: reclaim_task_alert_batch
  kind: plugin
  uses: workflow.state_reclaim_expired
  state_reclaim:
    worker_group: task_alert_classifier
    output: reclaim_summary
  outputs:
    - id: reclaim_summary
Behavior:
- scans the queue's processing list and active lease hash
- requeues expired active leases back to pending
- requeues orphaned processing entries that have no active lease record
- writes a recovery summary artifact with reclaimed_expired_count, reclaimed_orphaned_count, and reclaimed_count
Marks claimed items as completed or failed based on a result artifact written by a downstream worker step.
state_complete fields:
- required: from_step, output
- required: one of collection, queue, or worker_group
- optional: select, item_key, status_field, summary_output, merge_document, merge_fields, indexes, require_rows, fail_on_skipped, fail_on_stale
Example:
- id: complete_task_alert_batch
  kind: plugin
  uses: workflow.state_complete
  state_complete:
    from_step: classify_claimed_alerts
    output: classification_results
    worker_group: task_alert_classifier
    collection: task_alerts
    select: $.items
    item_key: alert_key
    status_field: status
    merge_document: true
    require_rows: true
    fail_on_skipped: true
    fail_on_stale: true
    summary_output: state_complete_summary
  depends_on: [classify_claimed_alerts]
  outputs:
    - id: state_complete_summary
Behavior:
- reads result items from the upstream artifact
- verifies lease identity when lease metadata is present
- skips stale lease completions (records stale_count) rather than incorrectly completing newer claims
- removes matching active leases and processing-queue entries for accepted completions
- places items into semantic completed/failed membership sets
- optionally merges completion/result fields back into Redis document/hash state (merge_document/merge_fields)
- maintains configured secondary indexes during merge (collections.<name>.indexes plus per-step indexes)
- appends lifecycle events to the collection stream when configured
- increments completion/failure counters and writes a summary artifact
- supports strict safety gates:
  - require_rows: true fails completion when selection returns zero rows
  - fail_on_skipped (default true) fails when rows are skipped due to missing item keys
  - fail_on_stale (default true) fails when stale lease completions are detected
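To make the lease check concrete, here is a minimal Python sketch of how stale and skipped rows could be separated from accepted completions. It is not the plugin's code: the function name apply_completions, the in-memory active_leases dict standing in for the Redis lease hash, and the completed_count/skipped_count field names are assumptions (only stale_count is named above).

```python
def apply_completions(rows, active_leases, item_key="item_key"):
    """Hypothetical sketch: accept, skip, or mark stale each completion row."""
    summary = {"completed_count": 0, "stale_count": 0, "skipped_count": 0}
    for row in rows:
        key = row.get(item_key)
        if key is None:
            # Rows without an item key cannot be matched to a claim.
            summary["skipped_count"] += 1
            continue
        lease = row.get("lease")
        # Lease identity is only verified when lease metadata is present.
        if lease is not None and active_leases.get(key) != lease:
            summary["stale_count"] += 1  # a newer claim owns this item
            continue
        active_leases.pop(key, None)  # release the lease for accepted rows
        summary["completed_count"] += 1
    return summary
```

With fail_on_skipped or fail_on_stale enabled, a non-zero skipped or stale count in such a summary would fail the step instead of passing silently.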
After each item is completed, workflow.state_complete can atomically enqueue the item key into one or more downstream semantic queues based on route, status, or field-match conditions.
Add a route_queues list to state_complete:
state_complete:
  from_step: classify
  output: classification_results
  worker_group: jobs_classifier
  collection: jobs
  route_queues:
    - queue: jobs_easy_apply # target semantic queue name from workflow.state.queues
      when:
        route: easy_apply # match on item field value
      lifecycle: queued # optional lifecycle tag written into document
    - queue: jobs_needs_review
      when:
        status: needs_review
StateRouteQueueSpec fields:
- required: queue: semantic queue name (workflow.state.queues.<name>)
- optional: when: field/value predicate map; all conditions must match (AND)
- optional: lifecycle: string written to the document lifecycle field on enqueue
- optional: collection: override the collection (defaults to the enclosing state_complete collection)
The summary artifact output by state_complete includes routing statistics:
{
  "routed_count": 42,
  "routed_by_queue": { "jobs_easy_apply": 30, "jobs_needs_review": 12 },
  "route_duplicates": 3,
  "route_skipped": 0
}
- route_duplicates counts items that were already members of the target queued-set (no double-enqueue).
- route_skipped counts items where no route_queues rule matched.
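The routing statistics can be reproduced with a small Python sketch. It assumes that every matching rule enqueues the item (the text above allows one or more target queues) and uses an in-memory dict of sets in place of the Redis queued-sets; the function name route_items is hypothetical.

```python
from collections import Counter


def route_items(items, route_queues, already_queued=None):
    """Hypothetical sketch of route_queues evaluation for completed items."""
    already_queued = already_queued if already_queued is not None else {}
    routed_by_queue = Counter()
    summary = {"routed_count": 0, "route_duplicates": 0, "route_skipped": 0}
    for item in items:
        matched = False
        for rule in route_queues:
            when = rule.get("when", {})
            # All conditions in `when` must match (AND).
            if all(item.get(field) == value for field, value in when.items()):
                matched = True
                members = already_queued.setdefault(rule["queue"], set())
                if item["item_key"] in members:
                    summary["route_duplicates"] += 1  # no double-enqueue
                else:
                    members.add(item["item_key"])
                    summary["routed_count"] += 1
                    routed_by_queue[rule["queue"]] += 1
        if not matched:
            summary["route_skipped"] += 1
    summary["routed_by_queue"] = dict(routed_by_queue)
    return summary
```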
Queries a semantic collection from Redis and writes bounded query results as an artifact.
state_query fields:
- required: collection, output
- optional: where, projection, limit, offset, summary_output
Example:
- id: query_ready_alerts
  kind: plugin
  uses: workflow.state_query
  state_query:
    collection: task_alerts
    where:
      all:
        status: ready
    projection: [item_key, status, route]
    limit: 500
    output: ready_alerts
    summary_output: state_query_summary
  outputs:
    - id: ready_alerts
    - id: state_query_summary
Behavior:
- uses collection secondary indexes for simple equality, all intersections, and any unions when possible
- falls back to collection seen-set scan + predicate matching when needed
- always re-checks matched documents with the full where predicate for correctness
- commits query output and optional summary artifact
Merges rows from one or more artifacts into semantic Redis documents/hashes without exposing those rows to model context.
state_patch_outputs fields:
- required: collection, output
- required in practice: from_step (or plugin step dependency)
- optional: source (auto|exact|descendants), select, item_key, merge_fields, status_field, indexes, summary_output
Example:
- id: patch_classification_back
  kind: plugin
  uses: workflow.state_patch_outputs
  state_patch_outputs:
    from_step: classify_claimed_alerts
    output: classification_results
    collection: task_alerts
    item_key: alert_key
    merge_fields: [route, status, submitted]
    indexes: [route, status, submitted]
    summary_output: state_patch_outputs_summary
  outputs:
    - id: state_patch_outputs_summary
Behavior:
- reads matching artifacts from from_step using source-mode resolution (including colon-prefixed loop/drain descendants)
- updates secondary indexes for merged fields and removes stale index memberships on value changes
- commits a summary artifact with patched/skipped counts
Partitions collection items into named bounded outputs and can optionally enqueue each partition into a semantic queue.
state_partition fields:
- required: collection, partitions
- optional: projection, limit_per_partition, item_key, summary_output
Example:
- id: partition_alerts
  kind: plugin
  uses: workflow.state_partition
  state_partition:
    collection: task_alerts
    projection: [item_key, status, route]
    partitions:
      ready:
        where: { status: ready }
        output: alerts_ready
        queue: task_alerts_pending
      blocked:
        where: { status: blocked }
        output: alerts_blocked
    summary_output: state_partition_summary
  outputs:
    - id: alerts_ready
    - id: alerts_blocked
    - id: state_partition_summary
Behavior:
- resolves each partition independently via semantic query filters
- commits one output artifact per partition
- validates queue ownership (state.queues.<name>.collection) before enqueueing
- optionally enqueues each partitioned item and writes partition events to stream
- when partition.lifecycle is set, patches document state (lifecycle, queued_for, partition) to keep Redis state aligned with routing
- commits aggregate partition summary
Generates bounded final reporting artifacts directly from semantic Redis state.
state_report fields:
- required: json_output
- optional: collections, counters, include_samples, markdown_output
Example:
- id: final_state_report
  kind: plugin
  uses: workflow.state_report
  state_report:
    collections: [task_alerts]
    counters: [task_alerts_published, task_alerts_completed, task_alerts_failed]
    include_samples: 25
    json_output: final_report_json
    markdown_output: final_report_md
  outputs:
    - id: final_report_json
    - id: final_report_md
Behavior:
- reports seen/completed/failed totals by collection
- reports per-index value breakdowns when requested (or from collection defaults)
- optionally includes bounded sample items per collection
- optionally resolves configured counters
- emits JSON report and optional Markdown report artifacts
There are now two complementary semantic patterns:
- state_contract: best when a normal worker step should produce outputs and the runtime should project state automatically after validation.
- workflow.state_publish / workflow.state_claim / workflow.state_complete: best when you want the workflow graph to model publish/claim/complete as explicit orchestration steps.
Both keep Redis details out of worker prompts and YAML-level imperative scripts.
Current limitation: state_contract and the explicit plugin-step state operations do not currently share the same key naming convention. state_contract projects keys using the contract entity (for example queue:alerts:pending), while explicit plugin steps key by collection/queue names. Until those namespaces are unified, do not mix both approaches for the same records if you need a single canonical queue/state view.
Use state_drain when you want the orchestrator itself to act as a queue-drain scheduler/controller.
What it does:
- starts an iteration
- runs nested steps (which should include a workflow.state_claim plugin step)
- inspects the claim artifact (claimed_count, valid_count, or items.length)
- if claimed count is 0, treats the queue as empty for that iteration
- stops after max_empty_claims consecutive empty claims
- otherwise expands the next iteration and continues
drain fields:
- required: worker_group
- optional: max_empty_claims (default 1)
- optional: max_iterations (null/unset = no explicit cap)
- optional: max_active_iterations (default 1): number of concurrent active drain iterations
Example:
- id: classifier_drain
  kind: state_drain
  name: Drain classifier queue
  depends_on: [publish_task_alert_state]
  drain:
    worker_group: task_alert_classifier
    max_empty_claims: 1
    max_iterations: 100
  steps:
    - id: claim
      kind: plugin
      uses: workflow.state_claim
      state_consume:
        # worker_group can be omitted here; controller injects drain.worker_group if missing
        output: claim_manifest
      outputs:
        - id: claim_manifest
    - id: classify
      depends_on: [claim]
      task: |
        Read claim_manifest and classify claimed items.
      outputs:
        - id: classification_results
    - id: complete
      kind: plugin
      uses: workflow.state_complete
      depends_on: [classify]
      state_complete:
        from_step: classify
        output: classification_results
        worker_group: task_alert_classifier
        collection: task_alerts
        require_rows: true
        fail_on_skipped: true
        fail_on_stale: true
        summary_output: state_complete_summary
      outputs:
        - id: state_complete_summary
Notes:
- state_drain is a scheduler/controller step, not a plugin operation ID.
- nested step instances are expanded dynamically as <drain_step_id>:<iteration>:<child_id>.
- controller keeps up to max_active_iterations iterations active concurrently.
- non-optional child failed/blocked statuses fail the controller.
- on empty claim, pending downstream children in that iteration are marked skipped.
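The batch-mode control flow can be pictured as the loop below. This is a sequential Python sketch under simplifying assumptions: it ignores max_active_iterations (one iteration at a time), and claim_batch and process_batch are hypothetical callables standing in for the nested claim step and the downstream worker/complete steps.

```python
def drain_queue(claim_batch, process_batch, max_empty_claims=1, max_iterations=None):
    """Hypothetical sequential sketch of a batch-mode drain controller loop."""
    iteration = 0
    empty_streak = 0
    processed = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        manifest = claim_batch()
        claimed = manifest.get("claimed_count", len(manifest.get("items", [])))
        if claimed == 0:
            # Empty claim: downstream children of this iteration are skipped.
            empty_streak += 1
            if empty_streak >= max_empty_claims:
                break
            continue
        empty_streak = 0
        process_batch(manifest)
        processed += claimed
    return {"iterations": iteration, "processed": processed}
```

Setting max_iterations gives a hard cap even if the queue never reports empty, which is useful while tuning batch sizes.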
By default, state_drain uses drain_mode: batch, which stops after max_empty_claims consecutive empty iterations.
Set drain_mode: streaming to keep the drain alive while upstream producers are still running. The controller will idle-poll instead of immediately stopping on an empty claim, and will only close when stop conditions are met.
Additional drain fields for streaming mode:
- drain_mode: "batch" (default) | "streaming"
- idle_wait_seconds: seconds to wait between idle polls (default 10)
- max_idle_seconds: hard timeout in seconds; drain closes even if stop conditions are not met (default 600)
- stop_when.producers_done: list of step IDs; all must reach a terminal status (ok, failed, blocked, or skipped)
- stop_when.queues_empty: list of semantic queue names; all must have zero items in their pending list
Important: stop_when.producers_done step IDs are not treated as DAG dependencies of the drain step. They are checked at runtime inside the idle loop. If you also need the drain to wait for a producer to start before it runs, add that producer to depends_on explicitly.
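A rough Python sketch of the idle-loop check follows. It assumes the two stop_when lists are combined with AND and that max_idle_seconds overrides both; the function name should_close_drain and the plain dicts used for step statuses and pending counts are illustrative, not part of the plugin.

```python
TERMINAL_STATUSES = {"ok", "failed", "blocked", "skipped"}


def should_close_drain(step_statuses, pending_counts, stop_when,
                       idle_seconds, max_idle_seconds=600):
    """Hypothetical sketch: decide whether a streaming drain may close."""
    # Hard timeout: close even if stop conditions are not met.
    if idle_seconds >= max_idle_seconds:
        return True
    producers_done = all(
        step_statuses.get(step_id) in TERMINAL_STATUSES
        for step_id in stop_when.get("producers_done", [])
    )
    queues_empty = all(
        pending_counts.get(queue, 0) == 0
        for queue in stop_when.get("queues_empty", [])
    )
    return producers_done and queues_empty
```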
Example: drain waits for a publisher step to finish before stopping:
- id: publisher
  name: Publish jobs to queue
  uses: plugin
  # ...
- id: jobs_drain
  kind: state_drain
  name: Drain jobs queue (streaming)
  depends_on: [] # drain can start immediately
  drain:
    worker_group: jobs_processor
    drain_mode: streaming
    idle_wait_seconds: 15
    max_idle_seconds: 1800
    stop_when:
      producers_done: [publisher] # wait until publisher finishes
      queues_empty: [jobs_pending] # and the queue is empty
  steps:
    - id: claim
      kind: plugin
      uses: workflow.state_claim
      state_consume:
        worker_group: jobs_processor
        output: claim_manifest
      outputs:
        - id: claim_manifest
    # ... worker and complete steps
Safely probes external job application landing pages before a browser agent attempts to apply. The plugin reads a claim artifact, evaluates each item's URL, classifies page safety, and enqueues safe items into a verified downstream queue.
Plugin operation ID: workflow.external_landing_preflight
external_landing_preflight spec fields:
- required: claim_output: output ID from the upstream claim step
- required: output: artifact output ID for the preflight result
- required: collection: semantic collection name
- required: verified_queue: semantic queue name to enqueue safe items into
- optional: from_step: step to read the claim artifact from (defaults to current step)
- optional: max_items: cap on items to preflight per call (default: all claimed items)
- optional: max_redirects: redirect follow cap (default: 5)
- optional: timeout_ms: per-item probe timeout (default: 8000)
- optional: verified_lifecycle: lifecycle tag written to queued items
- optional: include_fields: item fields to carry forward into the output artifact
- optional: allowed_actions: probe actions permitted: navigate, read_dom_metadata, read_links, read_forms, read_buttons
- optional: blocked_actions: actions always blocked: submit_form, upload_file, download_file, execute_file, enter_credentials
- optional: safety: detection flags: require_domain_consistency, detect_downloads, detect_obfuscated_redirects, detect_login_or_verification, detect_assessment_or_screening, detect_unrelated_page
- optional: matching: URL/title match scoring weights and thresholds: title_weight, company_weight, url_weight, strong_match_threshold, ambiguous_threshold
- optional: decisions: classify decision codes into eligible, blocked, skipped, retryable lists
Output artifact shape:
{
"status": "ok",
"generated_at": "2025-01-01T00:00:00.000Z",
"items": [
{
"job_id": "job_123",
"item_key": "job_123",
"lease": "...",
"route": "easy_apply",
"status": "eligible",
"submitted": false,
"retryable": false,
"reason": "eligible",
"landing_decision": "eligible",
"safe_to_attempt": true,
"entrypoint_url": "https://example.com/apply/123",
"matched_job_url": "https://example.com/apply/123",
"match_confidence": 1.0,
"match_confidence_bucket": "strong",
"external_preflight": {
"final_url": "https://example.com/apply/123",
"page_title": "Software Engineer",
"risk_flags": []
}
}
],
"rejected_items": [],
"workflow_result": { "ok": true, "retryable": false, "blocked": false, "failed": false }
}
landing_decision values:
- eligible: page appears safe to attempt
- blocked: blocked by safety policy
- login_required: page redirected to login or auth
- download_risk: URL points to a downloadable file
- redirect_limit: followed too many redirects
- timeout: probe timed out
- unrelated_page: page content does not match job description
- skipped: item had no apply_url/job_url
- unknown: undetermined
Example usage in an authoring workflow:
- id: preflight_easy_apply
  kind: plugin
  uses: workflow.external_landing_preflight
  depends_on: [claim_easy_apply]
  external_landing_preflight:
    from_step: claim_easy_apply
    claim_output: claim_manifest
    output: preflight_results
    collection: jobs
    verified_queue: jobs_verified_easy_apply
    max_items: 5
    safety:
      require_domain_consistency: true
      detect_downloads: true
      detect_login_or_verification: true
    matching:
      strong_match_threshold: 0.8
      ambiguous_threshold: 0.5
  outputs:
    - id: preflight_results
When signaling is enabled for a step, the plugin injects a runtime protocol into the spawned step prompt so workflow authors do not need to embed signaling boilerplate in every task.
Injected guidance includes:
- periodic workflow_step_update progress updates
- final workflow_step_complete handoff request
- current run_id and step_id
- current attempt and handoff_token when available
- declared-output contracts derived from outputs[].validate and workflow.validators
- write_output commit instructions for declared outputs
- repair-and-retry behavior when completion is rejected due to invalid outputs
Default behavior:
- complete_when: handoff or complete_when: handoff_or_outputs → signaling: auto
- all other completion modes → signaling: off
You can override per-step:
- id: produce_manifest
  complete_when: handoff_or_outputs
  signaling: auto # optional, auto by default for this complete_when
  task: |
    Build and validate manifest JSON outputs.
- id: custom_controller
  complete_when: handoff
  signaling: off # disable auto-injection if you need fully custom behavior
  task: |
    Run custom orchestration logic.
Migration note:
- Existing workflows that already include manual "Workflow signaling protocol" text in task will continue to work.
- You can safely remove most of that repeated text and rely on signaling: auto for cleaner workflow files.
Use semantic contracts when a step output should be interpreted as operational state (for example, a pending queue of alerts) without exposing Redis commands in YAML or prompts.
How it works:
- Worker writes declared outputs (write_output / normal output contract path).
- Orchestrator validates outputs.
- If state_contract is declared, runtime projects the validated artifact to state views.
- If Redis is unavailable and policy is artifact_only, outputs still pass and remain usable.
Example:
state:
backend: auto
fallback: filesystem
materialize_outputs: on_demand
redis:
provider: auto
tool_prefix: MCP_DOCKER
contracts:
task_alert_collection:
kind: collection
entity: alert
item_key: alert_key
source_output: alerts_manifest
raw_output: alerts_raw
metadata_output: alerts_metadata
summary_output: alerts_summary
lifecycle: pending
dedupe:
by: [saved_search_id, href, query]
state_views:
document: true
metadata_hash: true
seen_index: true
pending_queue: true
event_stream: true
counters:
collected: alerts_collected
rejected: alerts_rejected
on_no_redis: artifact_only
steps:
- id: collect_task_alerts
task: Collect task alert notifications.
state_contract: task_alert_collection
outputs:
- id: alerts_raw
- id: alerts_metadata
- id: alerts_manifest
- id: alerts_summary
Notes:
- state_contract is semantic metadata, not an imperative Redis script.
- Worker prompts are isolated to declared inputs/outputs and avoid backend implementation details.
- Current runtime projector supports kind: collection and safely scalarizes hash fields.
If a step has declared outputs, the plugin now injects a second runtime contract automatically.
You do not need any new YAML fields for this behavior.
What the injected contract tells the worker:
- declared outputs are owned by the workflow plugin
- declared output files must be committed with
write_output - the prompt includes the resolved validator contract, not just a validator name
- when a validator has a useful schema shape, the prompt includes the expected object/array structure and examples
That means a step like this:
- id: build_manifest
  complete_when: outputs
  task: Build today's alert manifest.
  outputs:
    - path: data/alerts-execution-manifest-{date}.json
      validate: alert_manifest_array
causes the runtime prompt to include guidance such as:
- exact declared output path
- validator ID (alert_manifest_array)
- resolved schema shape (for example, JSON array with item fields)
- semantic rules (pass_when, retry_when, block_when, fail_when, unknown_policy)
- an example item or blocked artifact when the validator shape makes that useful
The new write_output tool is the authoritative path for declared outputs.
For declared outputs it performs all of the following:
- resolves the declared output path and confirms it belongs to the current step
- resolves the validator from the existing workflow.validators map
- validates the candidate value with the same validator engine used by output gating
- writes through a same-directory temp file
- re-validates the staged file from disk
- atomically renames the staged file into place
- records provenance in run state, including run_id, step_id, attempt, path, bytes, decision, and SHA-256
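The stage, re-validate, rename sequence is a common atomic-write pattern. The Python sketch below illustrates it under stated assumptions: commit_output and the validate callable (returning a decision string such as "pass" or "fail") are hypothetical stand-ins, and the returned record only mirrors part of the provenance fields listed above.

```python
import hashlib
import json
import os
import tempfile


def commit_output(path, value, validate):
    """Hypothetical sketch: validate, stage next to the target, re-validate, rename."""
    decision = validate(value)
    if decision == "fail":
        raise ValueError("validator rejected candidate value")
    payload = json.dumps(value, indent=2).encode("utf-8")
    directory = os.path.dirname(os.path.abspath(path))
    # Stage in the same directory so the final rename stays on one filesystem.
    fd, staged = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(payload)
        with open(staged, "rb") as handle:
            if validate(json.loads(handle.read())) != decision:
                raise ValueError("staged file failed re-validation")
        os.replace(staged, path)  # atomic swap into place
    except BaseException:
        if os.path.exists(staged):
            os.remove(staged)
        raise
    return {"path": path, "bytes": len(payload), "decision": decision,
            "sha256": hashlib.sha256(payload).hexdigest()}
```

Because readers only ever see the old file or the fully written new one, a half-written artifact from a failed attempt cannot be picked up by a validator.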
Important consequences:
- malformed JSON from an older attempt no longer poisons a still-running retry forever; the worker can repair it and commit a fresh artifact atomically
- manual/direct writes to declared output files are still visible to final validation, but they do not cause early completion while the session is still running
- early pass, blocked, or retry output decisions now require matching current-attempt provenance
- once the session itself ends, the orchestrator still performs the normal final output validation before accepting completion
This keeps YAML stable while making runtime behavior much stricter and safer.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| enabled | boolean | no | false | Enables pre-launch cache adoption checks for this step. |
| when | string | no | none | CEL expression to decide if cache reuse is allowed (permission gate). |
| require | string | no | "declared_outputs" | Currently must be "declared_outputs". |
| accept_decisions | string[] | no | ["pass"] | Which merged validator decisions are adoptable (e.g., include "blocked" if desired). |
| require_signature | boolean | no | true | Requires cache manifest/signature match before adoption. |
| legacy_unsigned_cache | string | no | "stale" | Policy for old artifacts without signatures: "stale" or "allow_if_valid". |
| freshness.include | string[] | no | all supported | Signature components to include: output_contract_version, step_task, validators, schemas, selected_config, input_signature. |
| on_hit.reason | string | no | "cache_hit" | Audit reason persisted when cache is adopted. |
| on_invalid | string | no | "run_step" | Behavior when cache is invalid/stale: "run_step" or "fail_step". |
Retry Policy Notes:
retry_on and retry_except use Failure Kinds. Available kinds: timeout, timeout_stop_confirmed, timeout_stop_unconfirmed, missing_file, schema, fail_when, parse, other.
Example Pattern: Conditional Execution
Use skip_if_empty to avoid launching expensive agents when there is no data to process:
- id: generate_report
  name: "Generate Daily Report"
  depends_on: [collect_data]
  skip_if_empty: "data/daily_metrics.json" # Skip if no metrics were collected
  task: "Analyze metrics in data/daily_metrics.json and write a report..."
The following {variable} tokens are substituted in task and outputs fields at run time:
| Variable | Example value | Description |
|---|---|---|
| {date} | 2026-03-09 | Current date as YYYY-MM-DD (workflow timezone) |
| {datetime} | 2026-03-09T08:20:00 | Current datetime as ISO-ish string (workflow timezone) |
| {utc_date} | 2026-03-09 | Current date as YYYY-MM-DD (UTC) |
| {utc_datetime} | 2026-03-09T08:20:00.000Z | Current datetime as ISO 8601 (UTC) |
| {run_id} | seo-pipeline-20260309T082000 | The unique run identifier |
| {workflow_name} | SEO Daily Pipeline | The name of the workflow |
| {workflow_run_id} | seo-pipeline-20260309T082000 | The unique run identifier |
| {run_state_path} | /home/user/.openclaw/workflow-runs/seo-pipeline-20260309T082000.json | Path to the run state JSON file |
| {item} | Song-1.mp3 | Current loop iteration value (only available inside for_each steps) |
| {config.X} | my-custom-value | Value of variable X from the top-level config block |
| {env.X} | /home/user | Value of environment variable X (for example {env.HOME}) |
| \{variable} | {date} | Literal text (escaped). Prevents substitution. |
Unknown {variables} are typically left as-is, except in for_each path templates where they cause an immediate error.
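The substitution rules in the table (replace known tokens, honor the backslash escape, leave unknown tokens untouched) can be illustrated with a short Python sketch. The regular expression and the substitute function are assumptions made for illustration; the plugin's own resolver may differ, and the stricter for_each path behavior is not modeled here.

```python
import re

# Optional leading backslash, then {name} where name may contain dots (config.X, env.X).
TOKEN = re.compile(r"(\\?)\{([A-Za-z_][A-Za-z0-9_.]*)\}")


def substitute(template, variables):
    """Hypothetical sketch of {variable} substitution with escape support."""
    def replace(match):
        escaped, name = match.group(1), match.group(2)
        if escaped:
            return "{" + name + "}"  # \{date} becomes the literal text {date}
        if name in variables:
            return str(variables[name])
        return match.group(0)  # unknown tokens are left as-is
    return TOKEN.sub(replace, template)
```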
Example:
task: "Write audit to data/seo/{date}/report.json for run {run_id}"
outputs:
  - "data/seo/{date}/report.json"
Start a workflow execution.
This is registered as an optional tool because it starts background agent work and writes run state. Enable optional tools in the OpenClaw tool UI/config when you want agents to launch workflows.
Input:
{
  "name": "seo-pipeline",
  "dry_run": false,
  "resume": false
}

| Parameter | Type | Required | Description |
|---|---|---|---|
| name | string | yes | Workflow file stem (e.g. "seo-pipeline" for seo-pipeline.yml) |
| dry_run | boolean | no | Validate and show execution plan without running. Default: false |
| resume | boolean | no | Skip steps that already completed in the last run. Default: false |
Response (normal run):
{
"run_id": "seo-pipeline-20260309T082000",
"workflow": "SEO Daily Pipeline",
"status": "running",
"total_steps": 3,
"steps": {
"tech-auditor": { "status": "pending", "depends_on": [] },
"content-creator": { "status": "pending", "depends_on": ["tech-auditor"] },
"standup": { "status": "pending", "depends_on": ["tech-auditor", "content-creator"] }
},
"message": "Workflow \"SEO Daily Pipeline\" started. Use workflow_status to track progress."
}
Response (dry run):
{
"dry_run": true,
"run_id": "seo-pipeline-20260309T082000",
"total_steps": 3,
"execution_waves": [
[{ "id": "tech-auditor", "timeout_s": 420, "retry": 0, "optional": false }],
[{ "id": "content-creator", "timeout_s": 600, "retry": 1, "optional": false }],
[{ "id": "standup", "timeout_s": 300, "retry": 0, "optional": true }]
],
"estimated_min_duration_s": 1320
}
Check the status of a run.
Input (by run_id):
{ "run_id": "seo-pipeline-20260309T082000" }
Input (by name; returns most recent):
{ "name": "seo-pipeline" }
Response:
{
"run_id": "seo-pipeline-20260309T082000",
"workflow": "SEO Daily Pipeline",
"status": "running",
"started_at": "2026-03-09T08:20:00.000Z",
"completed_at": null,
"elapsed_s": 210,
"steps_ok": 1,
"steps_failed": 0,
"steps_total": 3,
"steps": {
"tech-auditor": {
"status": "ok",
"attempts": 1,
"duration_s": 195,
"error": null,
"started_at": "2026-03-09T08:20:00.000Z",
"completed_at": "2026-03-09T08:23:15.000Z"
},
"content-creator": {
"status": "running",
"attempts": 1,
"duration_s": null,
"error": null
},
"standup": {
"status": "pending",
"attempts": 0,
"duration_s": null,
"error": null
}
}
}
List all available workflows and their last run status.
Input: (none required)
Response:
{
"workflows_dir": "/home/user/.openclaw/workflows",
"count": 3,
"workflows": [
{
"name": "data-pipeline",
"display_name": "Data ETL Pipeline",
"description": "Extract, Transform, Load pipeline...",
"file": "/home/user/.openclaw/workflows/data-pipeline.yml",
"last_run": {
"run_id": "data-etl-pipeline-20260308T090000",
"status": "ok",
"started_at": "2026-03-08T09:00:00.000Z",
"completed_at": "2026-03-08T09:14:22.000Z"
}
},
{
"name": "seo-pipeline",
"display_name": "SEO Daily Pipeline",
"description": "Daily SEO audit, content creation...",
"file": "/home/user/.openclaw/workflows/seo-pipeline.yml",
"last_run": null
}
]
}
Cancel a running workflow. Marks the run cancelled, prevents new steps from launching, and attempts to abort active worker sessions using the adapter cancellation path. Abort confirmation depends on the active OpenClaw runtime surface.
This is registered as an optional tool because it modifies persisted run state.
Input:
{ "run_id": "seo-pipeline-20260309T082000" }
Response:
{
"run_id": "seo-pipeline-20260309T082000",
"status": "cancelled",
"running_steps": 1,
"abort_requested": 1,
"abort_failed": 0,
"results": [
{
"step_id": "content-creator",
"requested": true,
"confirmed": false,
"method": "gateway.chat.abort"
}
]
}
Non-authoritative observability update from a running worker/step. This updates progress metadata only; it does not satisfy dependencies or complete the step.
Input:
{
"run_id": "seo-pipeline-20260309T082000",
"step_id": "content-creator",
"status": "progress",
"message": "Collected 12/30 items",
"counters": { "processed": 12 }
}
Authoritative handoff request from a running worker/step. The orchestrator still validates the declared output contract (and cache freshness signature when relevant) before accepting completion.
Input:
{
"run_id": "seo-pipeline-20260309T082000",
"step_id": "content-creator",
"reason": "generated",
"message": "Outputs written and ready",
"attempt": 1,
"handoff_token": "..."
}
If validation fails, the response returns structured details (missing/invalid outputs) so the worker can repair and continue.
Authoritative declared-output writer for running workers. Use this for any file listed under the step's outputs contract.
Input:
{
"run_id": "seo-pipeline-20260309T082000",
"step_id": "content-creator",
"path": "data/seo-state/cc-manifest-2026-03-09.json",
"data": [{ "slug": "post-1", "status": "ready" }]
}
You may provide either:
- data: structured JSON value to serialize
- text: raw text content
Migration note:
- write_output accepts either path (legacy) or output_id (preferred).
- For new workflows, prefer output_id to keep contracts path-independent.
Outputs can be declared with a logical id instead of (or in addition to) a filesystem path. Artifact-backed outputs are stored in the run artifact store and can be read, listed, and materialized via the read_output, list_outputs, and materialize_output tools.
name: My Pipeline
state:
  backend: filesystem # filesystem | redis | auto | dual
  materialize_outputs: on_demand # never | on_demand | always
  redis:
    provider: auto # auto | mcp | native
    tool_prefix: MCP_DOCKER
steps:
  - id: build_report
    task: "Build and commit the daily report."
    outputs:
      # Path-only (legacy, unchanged):
      - path: data/report-{date}.json
        validate: report_schema
      # ID-only (artifact-backed, no filesystem path):
      - id: daily-summary
        validate: summary_schema
      # Both (artifact-backed + auto-materialized to path):
      - id: alert-manifest
        path: data/alerts-{date}.json
        materialize:
          mode: always
When a step output has only an id, workers use write_output with output_id instead of path:
{
"run_id": "my-pipeline-20260503T090000",
"step_id": "build_report",
"output_id": "daily-summary",
"data": { "items": 42, "status": "ok" }
}
The artifact is stored in {runsDir}/.artifacts/{runId}/{stepId}/{outputId}.json and is accessible via read_output without materializing to disk.
Behavior:
- only allows writes to outputs declared for the current step
- reuses the existing validator layer; there are no extra YAML schema keys for writers
- rejects non-committable results such as validator fail
- allows committable non-pass results such as blocked or retry when that is what the validator contract declares
- persists provenance used by running-step early completion checks
If a worker manually writes a declared output instead of using write_output, the orchestrator may still validate it at final completion, but it will not trust that file for early completion while the worker session is still active.
Read one declared artifact by run_id, step_id, and output_id.
| Parameter | Type | Required | Description |
|---|---|---|---|
| run_id | string | yes | Workflow run ID |
| step_id | string | yes | Producing step ID |
| output_id | string | yes | Declared output id |
| fields | string[] | no | Restrict which keys are returned from the artifact (projection) |
| limit | number | no | For array artifacts, return only the first N items |
List committed declared artifacts for a run.
| Parameter | Type | Required | Description |
|---|---|---|---|
| run_id | string | yes | Workflow run ID |
| step_id | string | no | Filter to a specific step |
Materialize a stored artifact to the filesystem on demand.
| Parameter | Type | Required | Description |
|---|---|---|---|
| run_id | string | yes | Workflow run ID |
| step_id | string | yes | Producing step ID |
| output_id | string | yes | Declared output id |
| path | string | no | Target filesystem path (overrides the output's declared materialize.path) |
Debug/admin tool that returns raw run state including backend resolution metadata.
| Parameter | Type | Required | Description |
|---|---|---|---|
| `run_id` | `string` | Yes | Workflow run ID |
| `include_steps` | `boolean` | No | Include full per-step state (default `true`). Set `false` for a compact summary. |
Cache adoption is a first-class orchestration feature and follows this order:
- Evaluate `reuse_outputs.when`.
- Validate declared outputs using the same validator engine as normal completion.
- Compare cached artifact signature with the current contract signature.
- Adopt only if decision is accepted and signature is fresh.
This distinguishes:
- invalid cache: fails current validators.
- stale cache: passes structure, but signature differs (task/validator/schema/config/input/contract version changed).
Cache manifests are stored under baseDir/.openclaw-workflow-cache/ and include producer run ID and signature metadata for auditability.
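The adoption order above can be sketched as a short decision function. The type names, the verdict labels (`skipped`, `invalid`, `stale`, `adopted`), and the boolean inputs are hypothetical; in the plugin the first two inputs come from evaluating `reuse_outputs.when` and running the validators.

```typescript
// Hypothetical inputs; the real engine derives them from the workflow definition.
interface CacheCandidate {
  whenMatches: boolean;       // result of evaluating reuse_outputs.when
  validatorAccepted: boolean; // declared outputs passed the validator engine
  cachedSignature: string;    // signature recorded in the cache manifest
  currentSignature: string;   // signature of the current step contract
}

type CacheVerdict = "skipped" | "invalid" | "stale" | "adopted";

function evaluateCache(c: CacheCandidate): CacheVerdict {
  if (!c.whenMatches) return "skipped";                          // 1. reuse not requested
  if (!c.validatorAccepted) return "invalid";                    // 2. fails current validators
  if (c.cachedSignature !== c.currentSignature) return "stale";  // 3. contract changed
  return "adopted";                                              // 4. accepted and fresh
}

const staleVerdict = evaluateCache({
  whenMatches: true,
  validatorAccepted: true,
  cachedSignature: "sha256:aaa",
  currentSignature: "sha256:bbb",
});
```

Checking validity before freshness is what lets the two failure modes be reported separately: a structurally broken cache is `invalid` even if its signature happens to match.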
Each workflow run writes state to {runsDir}/{run_id}.json:
{
  "run_id": "seo-pipeline-20260309T082000",
  "workflow": "SEO Daily Pipeline",
  "status": "ok",
  "started_at": "2026-03-09T08:20:00.000Z",
  "completed_at": "2026-03-09T08:47:12.000Z",
  "steps": {
    "tech-auditor": {
      "status": "ok",
      "started_at": "2026-03-09T08:20:00.000Z",
      "completed_at": "2026-03-09T08:23:15.000Z",
      "duration_ms": 195000,
      "session_key": "agent:main:subagent:abc123",
      "handoff_token": "seo-pipeline-20260309T082000:tech-auditor:attempt:1",
      "output_check": {
        "passed": true,
        "missing_files": [],
        "checked_files": ["/home/user/project/data/seo-state/ta-handoff-2026-03-09.json"]
      },
      "cache": {
        "hit": true,
        "adopted": true,
        "reason": "cache_hit",
        "current_contract_signature": "sha256:..."
      },
      "handoff": {
        "requested_at": "2026-03-09T08:23:12.000Z",
        "completed_at": "2026-03-09T08:23:15.000Z",
        "reason": "generated"
      },
      "output_writes": {
        "data/seo-state/ta-handoff-2026-03-09.json": {
          "path": "data/seo-state/ta-handoff-2026-03-09.json",
          "abs_path": "/home/user/project/data/seo-state/ta-handoff-2026-03-09.json",
          "decision": "pass",
          "run_id": "seo-pipeline-20260309T082000",
          "step_id": "tech-auditor",
          "attempt": 1,
          "bytes": 824,
          "sha256": "sha256:...",
          "committed_at": "2026-03-09T08:23:11.000Z"
        }
      },
      "error": null,
      "attempts": 1
    }
  }
}

Run status values: `pending` | `running` | `ok` | `failed` | `blocked` | `cancelled`
Step status values: `pending` | `running` | `ok` | `failed` | `blocked` | `skipped`
- `skipped`: Step was never run because a non-optional dependency failed
- `failed`: Step ran but failed (either session error or output gate failed)
- `ok`: Step ran successfully and output gate passed (or no outputs defined)
output_writes is internal provenance recorded by write_output. It is used to verify that early output-based completion came from the current attempt rather than a stale artifact or a direct manual file write.
When notifyChannel is configured, the plugin sends messages to that channel after each step:
✅ Technical Auditor complete (195s)
✅ Content Creator complete (462s)
✅ Standup Synthesis complete (88s)
🎉 Pipeline "SEO Daily Pipeline" complete - 3/3 steps passed
On failure:
❌ Content Creator failed - retrying (attempt 2/2)
❌ Content Creator failed after 2 attempt(s): Output gate failed - missing: data/seo-state/cc-memo-2026-03-09.md
⚠️ Standup Synthesis failed (optional - continuing pipeline)
💥 Pipeline "SEO Daily Pipeline" failed - 1 step(s) failed, 2/3 passed
Steps execute based on their depends_on graph. Steps with no dependencies (or all dependencies satisfied) are ready and launch immediately, up to the concurrency limit.
For this workflow:
A ──┐
    ├──→ C ──→ D
B ──┘
- Wave 1: A and B run in parallel
- Wave 2: C runs after both A and B finish
- Wave 3: D runs after C finishes
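The wave breakdown above can be reproduced with a small level-by-level topological sort. This is a simplified sketch: it groups steps into strict waves and ignores the concurrency limit, whereas the description above says steps launch as soon as their own dependencies are satisfied. `StepDef` and `computeWaves` are illustrative names.

```typescript
interface StepDef {
  id: string;
  depends_on?: string[];
}

// Group steps into waves: a step is ready once all of its dependencies
// completed in an earlier wave.
function computeWaves(steps: StepDef[]): string[][] {
  const done = new Set<string>();
  let pending = steps.slice();
  const waves: string[][] = [];
  while (pending.length > 0) {
    const ready = pending.filter((s) => (s.depends_on ?? []).every((d) => done.has(d)));
    if (ready.length === 0) {
      throw new Error("dependency cycle or unknown dependency");
    }
    ready.forEach((s) => done.add(s.id));
    pending = pending.filter((s) => !done.has(s.id));
    waves.push(ready.map((s) => s.id));
  }
  return waves;
}

const waves = computeWaves([
  { id: "A" },
  { id: "B" },
  { id: "C", depends_on: ["A", "B"] },
  { id: "D", depends_on: ["C"] },
]);
// waves is [["A", "B"], ["C"], ["D"]]
```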
When a non-optional step fails, all steps that depend on it (directly or transitively) are marked skipped. This prevents false failures and makes the status clear: the step didn't fail, it was never attempted.
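Skip propagation amounts to a transitive closure over the dependency graph. The sketch below assumes a boolean `optional` flag on each step (the field name is an assumption) and returns the IDs that would be marked `skipped` after a given step fails.

```typescript
interface StepNode {
  id: string;
  depends_on?: string[];
  optional?: boolean; // assumed field name for optional steps
}

// Returns every step that depends, directly or transitively, on the failed step.
// A failed optional step does not cause any skips.
function stepsToSkip(steps: StepNode[], failedId: string): string[] {
  const failed = steps.find((s) => s.id === failedId);
  if (!failed || failed.optional) return [];
  const skipped = new Set<string>();
  let changed = true;
  while (changed) {
    changed = false;
    for (const s of steps) {
      if (s.id === failedId || skipped.has(s.id)) continue;
      const blocked = (s.depends_on ?? []).some((d) => d === failedId || skipped.has(d));
      if (blocked) {
        skipped.add(s.id);
        changed = true;
      }
    }
  }
  return steps.filter((s) => skipped.has(s.id)).map((s) => s.id);
}

const graph: StepNode[] = [
  { id: "A" },
  { id: "B" },
  { id: "C", depends_on: ["A", "B"] },
  { id: "D", depends_on: ["C"] },
];
const skippedAfterA = stepsToSkip(graph, "A");
// skippedAfterA is ["C", "D"]; B is unaffected and still runs.
```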
Workflows can iterate over a list of items using the for_each field.
How it works:
The engine resolves the list based on the format of the for_each value:
- Whole-Token References (e.g., `{songs}`):
  - First, it looks for the key in the current context.
  - If not found or not an array, it falls back to looking for a file named `songs.json` in the workspace.
- Path Templates (e.g., `manifest-{date}.json`):
  - Variables are substituted strictly. If a variable (like `{date}`) is missing or invalid, the workflow fails immediately.
  - The resulting path is then resolved as a file.
- Explicit Paths (e.g., `songs.txt`):
  - Resolved directly as a file.
- Artifact References (e.g., `{ from_step: build_alert_execution_manifest, output: alerts_execution_manifest }`):
  - Reads the declared artifact directly from the run artifact store.
  - Supports list payloads (`data` array) and single payloads (wrapped as one-item list).
  - If the artifact is missing, loop expansion fails the loop step (`failed`), including optional loops.

Once the list is resolved:
- Expansion: A loop step is expanded into multiple unique step instances, one for each item in the list. An instance ID follows the pattern `loop_id:index:inner_step_id`.
- Dynamic Tasks: Use the `{item}` variable in `task` or `outputs` fields to refer to the current iteration's value.
- Dependency Resolution: If a step depends on a loop step, it will wait for all instances of the last step in that loop to complete before starting.
- List Parsing: The loop resolves the `for_each` value using a `parser`:
  - `json` (default/auto for `.json`): Parses a JSON array.
  - `csv` (auto for `.csv`): Splits content by commas.
  - `newline` (auto for `.txt`): Treats each line as a separate item.

Example:

steps:
  - id: find_songs
    name: "Find Songs"
    task: "Find today's liked songs and write them to songs.txt (one per line)"
    outputs: ["songs.txt"]
  - id: process_songs
    name: "Process Songs"
    depends_on: [find_songs]
    for_each: "songs.txt"
    parser: "newline"
    steps:
      - id: transcribe
        task: "Transcribe {item}..."

Artifact-backed example:

- id: process_alerts
  depends_on: [build_alert_execution_manifest]
  for_each:
    from_step: build_alert_execution_manifest
    output: alerts_execution_manifest
  parser: json
  steps:
    - id: notify
      task: "Send notification for {item.alert_key}"

Example with Validation: Use `item_schema` to ensure the resolved list contains the expected data before expanding the loop:

- id: process_alerts
  for_each: "data/alerts/alerts-execution-manifest-{date}.json"
  parser: "json"
  item_schema:
    type: object
    required: [alert_key]
    properties:
      alert_key:
        type: string
        pattern: "^alert_[a-z0-9_:-]+$"
  steps:
    - id: notify
      task: "Send notification for {item.alert_key}"
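The parser selection and instance-ID pattern described for `for_each` can be sketched as follows. The function names are illustrative, and three details are assumptions the text does not confirm: entries are trimmed, empty entries are dropped, and instance indexes start at zero.

```typescript
type ParserName = "json" | "csv" | "newline";

// Infer the parser from the file extension; json is the documented default.
function inferParser(path: string): ParserName {
  if (/\.csv$/i.test(path)) return "csv";
  if (/\.txt$/i.test(path)) return "newline";
  return "json";
}

function parseList(content: string, parser: ParserName): unknown[] {
  switch (parser) {
    case "json": {
      const parsed: unknown = JSON.parse(content);
      if (!Array.isArray(parsed)) throw new Error("for_each source must be a JSON array");
      return parsed;
    }
    case "csv":
      // Assumption: trim entries and drop empty ones.
      return content.split(",").map((s) => s.trim()).filter((s) => s.length > 0);
    case "newline":
      return content.split(/\r?\n/).map((s) => s.trim()).filter((s) => s.length > 0);
  }
}

// Build instance IDs following loop_id:index:inner_step_id.
// Assumption: indexes are zero-based.
function instanceIds(loopId: string, itemCount: number, innerStepIds: string[]): string[] {
  const ids: string[] = [];
  for (let index = 0; index < itemCount; index++) {
    for (const inner of innerStepIds) ids.push(`${loopId}:${index}:${inner}`);
  }
  return ids;
}

const items = parseList("a.mp3\nb.mp3\n", inferParser("songs.txt"));
const ids = instanceIds("process_songs", items.length, ["transcribe"]);
```

With the two-line `songs.txt` above, `ids` contains one `transcribe` instance per song, each carrying its own `{item}` value.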
workflow_run({ name: "...", resume: true }) loads the most recent run, finds all steps with status: "ok", marks them as already-completed in the new run, and only executes the rest. Use this to recover from partial failures without re-doing expensive work.
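As a rough illustration of the resume rule, the sketch below splits a workflow's steps into those carried over from the previous run (status `ok`) and those that still need to execute. `PriorRun` reflects only the subset of the state file needed here, and `planResume` is a hypothetical name.

```typescript
type StepStatus = "pending" | "running" | "ok" | "failed" | "blocked" | "skipped";

// Minimal view of a previous run's state file.
interface PriorRun {
  run_id: string;
  steps: Record<string, { status: StepStatus }>;
}

interface ResumePlan {
  carried: string[]; // already ok in the prior run, not re-executed
  toRun: string[];   // everything else
}

function planResume(stepIds: string[], prior: PriorRun | undefined): ResumePlan {
  const carried: string[] = [];
  const toRun: string[] = [];
  for (const id of stepIds) {
    if (prior?.steps[id]?.status === "ok") carried.push(id);
    else toRun.push(id);
  }
  return { carried, toRun };
}

const plan = planResume(["tech-auditor", "content-creator", "standup"], {
  run_id: "seo-pipeline-20260309T082000",
  steps: {
    "tech-auditor": { status: "ok" },
    "content-creator": { status: "failed" },
    "standup": { status: "skipped" },
  },
});
// plan.carried is ["tech-auditor"]; plan.toRun is ["content-creator", "standup"]
```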
Three sequential agents: Technical Auditor → Content Creator → Standup Synthesis. The Content Creator only runs if the Auditor wrote its handoff file. Standup is optional.
Four-stage gate: test → build → deploy → smoke-test. Uses `concurrency: 1` to enforce strict sequencing. Deploy has `retry: 1` for flaky network situations.
Parallel fetch (primary + reference), then validate, transform, load, report. Demonstrates parallel steps fanning in to a single gate step, with the optional reporting stage at the end.
Iterates over a list of audio files. For each file, it transcribes the audio and then generates a summary. Demonstrates loop expansion and the use of {item}.
Demonstrates how to use per-step concurrency limits to avoid rate-limiting on specific tasks while maintaining high global parallelism.
npm install
npm run typecheck
npm run build
npm test
npm run check

Tests use Node.js built-in `node:test` and mock step runners. They do not require a real OpenClaw CLI install unless you are doing the optional local plugin install smoke test.
- Workflows cause agents to perform real work. Review workflow YAML/JSON before enabling `workflow_run`.
- Relative output paths are resolved under `baseDir`; set it to the workspace you expect agents to write into.
- Each step can set its own `model` and `timeout`; otherwise the plugin-level `sessionModel` and default timeout are used.
- If no stable native session runtime exists in the plugin API, `CliAdapter` falls back to the `openclaw` CLI and requires it in `PATH`.
- The CLI fallback uses argument arrays and preserves the Windows `openclaw.ps1` wrapper case via PowerShell `-File`; workflow prompts are not shell-interpolated.
- Plugin API shape: The entrypoint uses `definePluginEntry` from `openclaw/plugin-sdk/plugin-entry` and registers tools from `register(api)`.
- Native sessions: `src/step-runner.ts` prefers the modern `api.runtime.subagent` API (via `RuntimeSubagentAdapter`), with fallbacks to a legacy `api.sessions` surface and finally the OpenClaw CLI.
- Notifications: The plugin uses `api.notifications.send` if available and otherwise writes progress through `api.logger`.
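The session fallback order can be pictured as a simple feature-detection chain. This is only a sketch: `ApiLike`, `AdapterKind`, and `selectAdapter` are hypothetical names, and the real selection in `src/step-runner.ts` may check more than mere presence of each surface.

```typescript
// Minimal shape used only for feature detection in this sketch.
interface ApiLike {
  runtime?: { subagent?: unknown };
  sessions?: unknown;
}

type AdapterKind = "runtime-subagent" | "legacy-sessions" | "cli";

// Prefer api.runtime.subagent, then the legacy api.sessions surface, then the CLI.
function selectAdapter(api: ApiLike): AdapterKind {
  if (api.runtime?.subagent) return "runtime-subagent";
  if (api.sessions) return "legacy-sessions";
  return "cli";
}

const chosen = selectAdapter({ sessions: {} });
// chosen is "legacy-sessions" because no runtime.subagent surface is present.
```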
The plugin is designed to use the api.runtime.subagent surface for isolated background runs:
interface PluginApi {
  // Already present:
  registerTool(tool: ToolDefinition): void;
  pluginConfig: Record<string, unknown>;
  // Preferred surface:
  runtime: {
    subagent: {
      run(args: {
        sessionKey: string;
        message: string;
        provider?: string;
        model?: string;
        deliver: boolean;
      }): Promise<{ runId: string }>;
      waitForRun(args: {
        runId: string;
        timeoutMs: number;
      }): Promise<{ status: string; logs?: string; error?: string }>;
    };
  };
}

| File | Purpose |
|---|---|
| `src/index.ts` | Plugin entry: registers workflow tools (`workflow_run`, `workflow_status`, `workflow_list`, `workflow_cancel`, `workflow_step_update`, `workflow_step_complete`) |
| `src/config.ts` | Plugin configuration normalization |
| `src/workflow-loader.ts` | YAML/JSON parsing, validation, cycle detection |
| `src/workflow-executor.ts` | Core execution engine: scheduling, deps, retry, resume, dry run |
| `src/workflow-state.ts` | Atomic state file R/W, run listing |
| `src/state-artifact-stores.ts` | Filesystem-backed `WorkflowStateStore` / `WorkflowArtifactStore` implementations + `resolveStateBackend()` resolver |
| `src/state-contract-projector.ts` | Semantic state contract projector that maps validated artifacts to runtime Redis/state views |
| `src/step-runner.ts` | Session lifecycle: spawn, poll, output check. Includes `MockAdapter` |
| `src/output-checker.ts` | File existence validation for output gates |
| `src/output-validator.ts` | Advanced output content validation |
| `src/sealed-policy.ts` | Sealed-mode policy normalization and defaults |
| `src/sealed-spool.ts` | Sealed data-plane spooling and compact envelope helpers |
| `src/sealed-command-runner.ts` | `kind: sealed` command-mode bounded execution |
| `src/sealed-step-runner.ts` | Sealed step dispatcher (command vs worker paths) |
| `src/return-contract.ts` | AJV-based validation for sealed control-plane returns |
| `src/step-contract.ts` | Shared contract validation, cache signature freshness, and cache manifest I/O |
| `src/variable-substitution.ts` | `{date}`, `{datetime}`, `{utc_date}`, `{utc_datetime}`, `{run_id}` substitution |
| `src/list-resolver.ts` | Resolves `for_each` sources (JSON, CSV, Newline) |
| `src/template-schema-validator.ts` | Validates variable templates in workflow definitions |
| `src/tool-schemas.ts` | Tool parameter definitions for OpenClaw SDK |
| `src/types.ts` | Shared TypeScript interfaces and types |
| `openclaw.plugin.json` | Plugin manifest + config schema |
| `package.json` | Package metadata |
| `tests/*.test.js` | Full test suite (Node built-in test runner) |
| `tests/fixtures/*.yml` | Fixture workflows for tests |
| `examples/*.yml` | SEO, deploy, and ETL example pipelines |
- Fork the openclaw/openclaw repository
- Copy this plugin to `plugins/openclaw-workflow/`
- Run `npm install && npm run check` to verify tests pass
- Submit a PR with the title: `feat: add openclaw-workflow orchestration plugin`
Please include test coverage for any new features or bug fixes.
