diff --git a/INSTRUCTION.md b/INSTRUCTION.md index 6da887e..6084f7a 100644 --- a/INSTRUCTION.md +++ b/INSTRUCTION.md @@ -13,6 +13,7 @@ go-wf — a Go library providing a generic workflow orchestration core with Dock **Repository Type:** Library (Go module) **Module:** `github.com/jasoet/go-wf` **Key Dependencies:** Temporal SDK, testcontainers-go, pkg/v2, validator/v10, aws-sdk-go-v2 +**Convergence Type:** All builders return `*job.Definition` from `github.com/jasoet/pkg/v2/temporal/job`. This is the single type for registration, execution, scheduling, and lifecycle management across all workflow kinds. ## ABSOLUTE RULE — Git Authorship @@ -59,7 +60,9 @@ attribute commits to AI. This applies to ALL commits, including those made by to | `function/workflow/` | Workflow implementations (function, pipeline, parallel, loop, DAG) | | `datasync/` | Generic data sync core (Source, Sink, Mapper, Job, Runner) | | `datasync/activity/` | SyncData activity with OTel instrumentation | -| `datasync/builder/` | Fluent builder for Job construction | +| `datasync/builder/` | Fluent builder for Job construction — returns `*job.Definition` | +| `datasync/chunk/` | Partitioned/chunked sync builder (ChunkedSync, DateChunkedSync) — returns `*job.Definition` | +| `datasync/internal/heartbeat/` | Shared heartbeat helpers used by `datasync/activity` and `datasync/chunk` | | `datasync/payload/` | Temporal payload types (SyncExecutionInput/Output) | | `datasync/workflow/` | Sync workflow function, registration, scheduling helpers | | `workflow/otel.go` | Instrumented workflow orchestration wrappers | @@ -87,6 +90,7 @@ attribute commits to AI. This applies to ALL commits, including those made by to | `docs/contributing.md` | Contributing guide | | `INSTRUCTION.md` | AI context (this file) | | `README.md` | Human documentation | +| `github.com/jasoet/pkg/v2/temporal/job` | External package — `*job.Definition`, `job.Registry`, `job.ScheduleSpec`. 
All go-wf builders return `*job.Definition` from this package. | ## Taskfile Commands @@ -144,8 +148,8 @@ Multi-layer architecture organized as package-per-feature: **Container Module (`container/`)** — concrete implementation - **Activities** wrap `github.com/jasoet/pkg/v2/docker` for container execution - **Payloads** implement `TaskInput`/`TaskOutput` interfaces with validated structs (`go-playground/validator`) -- **Workflows** register with Temporal workers via `container.RegisterAll(w)`, using generic core for orchestration -- **Builder** provides a fluent API to compose container → pipeline → parallel → DAG; generic builder alternative available +- **Workflows** register with Temporal workers via `container.RegisterAll(w)` (idempotent), using generic core for orchestration +- **Builder** (`container/builder.WorkflowBuilder`) provides a fluent API: `.Name()`, `.Pipeline()/.Parallel()/.Single()`, `.Add()`, `.Build() (*job.Definition, error)`. Default `TaskQueue`: `"container-"`. - **Templates** (container, script, HTTP) generate payload structs from higher-level config - **Patterns** are pre-built workflow compositions (CI/CD pipelines, fan-out/fan-in, etc.) 
@@ -153,8 +157,8 @@ Multi-layer architecture organized as package-per-feature: - **Registry** maps named Go handler functions (`func(ctx, FunctionInput) (*FunctionOutput, error)`) for dispatch - **Activity** dispatches to registered handlers via closure over the registry - **Payloads** implement `TaskInput`/`TaskOutput` interfaces with validated structs -- **Workflows** register with Temporal workers, using generic core for orchestration (pipeline, parallel, loop, DAG) -- **Builder** provides a fluent API to compose function → pipeline → parallel → loop → DAG; generic builder alternative available +- **Workflows** register with Temporal workers via `function.RegisterAll(w, activityFn)` (idempotent), using generic core for orchestration (pipeline, parallel, loop, DAG) +- **Builder** (`function/builder.WorkflowBuilder`) provides a fluent API: `.Name()`, `.TaskQueue()`, `.Activity(fn)`, `.Pipeline()/.Parallel()/.Single()`, `.Build() (*job.Definition, error)`. Default `TaskQueue`: `"function-"`. Loop/parameterized-loop variants available via `LoopBuilder`. - **Patterns** are pre-built workflow compositions (ETL, fan-out/fan-in, batch processing, CI/CD DAG, etc.) 
**DataSync Module (`datasync/`)** — concrete implementation @@ -164,8 +168,10 @@ Multi-layer architecture organized as package-per-feature: - **Runner** provides in-process test execution (no Temporal needed) - **Activity** runs the Source→Mapper→Sink pipeline with full OTel instrumentation (`go_wf.datasync.*` metrics) - **Workflow** wraps the activity in a Temporal workflow with scheduling support -- **Builder** provides a fluent API for Job construction +- **Builder** (`datasync/builder.SyncJobBuilder`) provides a fluent API; `Build()` returns `(*job.Definition, error)` +- **Chunk** (`datasync/chunk.ChunkedSync`) partitioned-sync builder for large datasets; walks partitions with cursor-based resume and `ContinueAsNew`; schedule via `.ScheduleEvery()`, `.ScheduleCron()`, or `.ScheduleRaw()`; `Build()` returns `(*job.Definition, error)` - **Payloads** (`SyncExecutionInput`/`SyncExecutionOutput`) implement `TaskInput`/`TaskOutput` for composition with Pipeline, Parallel, and DAG +- **Internal heartbeat** (`datasync/internal/heartbeat`) shared helpers ensuring consistent heartbeat behavior across `datasync/activity` and `datasync/chunk` **Observability (`jasoet/pkg/v2/otel`)** - Activities get full OTel spans + metrics via `Layers.StartService` (container: `go_wf.container.task.*`, function: `go_wf.function.task.*`, datasync: `go_wf.datasync.*`) diff --git a/README.md b/README.md index c037a0f..6483752 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,51 @@ Temporal workflow library providing reusable, production-ready workflows for com - **Container Workflows** - Execute containers (Podman/Docker) with Temporal orchestration - **Function Workflows** - Execute registered Go functions with Temporal orchestration -- **DataSync Workflows** - Generic Source/Mapper/Sink data synchronization pipelines +- **DataSync Workflows** - Generic Source/Mapper/Sink data synchronization pipelines, including partitioned/chunked sync via `datasync/chunk` - **Type-Safe Payloads** - 
Validated input/output structures +- **Unified Builder API** - All builders produce `*job.Definition` (from `github.com/jasoet/pkg/v2/temporal/job`), providing a single type for registration, execution, scheduling, and lifecycle management - **Production-Ready** - Built-in retries, timeouts, error handling - **Observable** - Built-in OpenTelemetry instrumentation (traces, logs, metrics) with zero overhead when disabled - **Comprehensive Testing** - 85%+ coverage with integration tests - **Full CI/CD** - Automated releases and quality checks +## job.Definition — Unified Workflow Handle + +Every builder in go-wf (`container/builder`, `function/builder`, `datasync/builder`, `datasync/chunk`) returns a `*job.Definition` from its `Build()` method. A `*job.Definition` is the single type you need for all per-job operations: + +```go +import "github.com/jasoet/pkg/v2/temporal/job" + +def, err := container.NewWorkflowBuilder().Name("deploy").Single().Add(myContainer).Build() +if err != nil { log.Fatal(err) } + +// Register workflows + activities on a worker (idempotent) +def.Register(w) + +// Start a workflow run +run, err := def.Execute(ctx, c, def.NewInput()) + +// Manage lifecycle +def.Cancel(ctx, c, wfID, runID, "reason") +def.Describe(ctx, c, wfID, runID) +def.ListRuns(ctx, c, opts) + +// Schedule management +def.ApplySchedule(ctx, c, opts) +def.PauseSchedule(ctx, c, scheduleID, "reason") +``` + +A `job.Registry` aggregates multiple Definitions, allowing you to register all jobs at once and dispatch by name: + +```go +registry := job.NewRegistry() +registry.Add(ordersJobDef) +registry.Add(usersJobDef) +registry.RegisterAll(w) // registers every Definition on the worker + +registry.MustGet("orders").Execute(ctx, c, input) +``` + ## Documentation | Guide | Description | @@ -49,6 +87,10 @@ Temporal workflows for executing registered Go functions: function registry, pip Generic data synchronization workflows using a `Source[T] -> Mapper[T,U] -> Sink[U]` pipeline with 
helpers for record mapping, deduplication, and Temporal scheduling. See [DataSync Workflows](./docs/datasync-workflows.md) for details. +### [datasync/chunk](./datasync/chunk/) + +Partitioned sync builder for large datasets. `ChunkedSync` walks a sequence of partitions (e.g., date ranges), runs fetch/map/write per partition with cursor-based resume and `ContinueAsNew` for long partition lists. See [DataSync Workflows](./docs/datasync-workflows.md) for details. + ## Observability Built-in OpenTelemetry instrumentation (traces, logs, metrics) with zero overhead when disabled. See [Observability](./docs/observability.md) for details. @@ -71,56 +113,57 @@ import ( "log" "github.com/jasoet/go-wf/container" + "github.com/jasoet/go-wf/container/builder" "github.com/jasoet/go-wf/container/payload" - "github.com/jasoet/go-wf/container/workflow" "github.com/jasoet/pkg/v2/temporal" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" ) func main() { // Create Temporal client - c, closer, err := temporal.NewClient(temporal.DefaultConfig()) + c, err := temporal.NewClient(temporal.DefaultConfig()) if err != nil { log.Fatal(err) } defer c.Close() - if closer != nil { - defer closer.Close() - } - // Create and start worker - w := worker.New(c, "container-tasks", worker.Options{}) - container.RegisterAll(w) + // Build a job Definition (recommended pattern) + def, err := builder.NewWorkflowBuilder(). + Name("postgres-setup"). + Single(). + AddInput(payload.ContainerExecutionInput{ + Image: "postgres:16-alpine", + Env: map[string]string{ + "POSTGRES_PASSWORD": "test", + }, + Ports: []string{"5432:5432"}, + AutoRemove: true, + }). 
+ Build() + if err != nil { + log.Fatal(err) + } - go w.Run(nil) + // Register and start worker + w := worker.New(c, def.TaskQueue, worker.Options{}) + def.Register(w) + go func() { _ = w.Run(worker.InterruptCh()) }() defer w.Stop() // Execute workflow - input := payload.ContainerExecutionInput{ - Image: "postgres:16-alpine", - Env: map[string]string{ - "POSTGRES_PASSWORD": "test", - }, - Ports: []string{"5432:5432"}, - AutoRemove: true, + run, err := def.Execute(context.Background(), c, def.NewInput()) + if err != nil { + log.Fatal(err) } - we, _ := c.ExecuteWorkflow(context.Background(), - client.StartWorkflowOptions{ - ID: "postgres-setup", - TaskQueue: "container-tasks", - }, - workflow.ExecuteContainerWorkflow, - input, - ) - var result payload.ContainerExecutionOutput - we.Get(context.Background(), &result) + _ = run.Get(context.Background(), &result) log.Printf("Container executed: %s", result.ContainerID) } ``` +For callers that register workflows manually, `container.RegisterAll(w)` is also available (idempotent — safe to call multiple times). 
+ ### Function Workflow ```go @@ -132,57 +175,57 @@ import ( fn "github.com/jasoet/go-wf/function" fnactivity "github.com/jasoet/go-wf/function/activity" + fnbuilder "github.com/jasoet/go-wf/function/builder" "github.com/jasoet/go-wf/function/payload" - "github.com/jasoet/go-wf/function/workflow" "github.com/jasoet/pkg/v2/temporal" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" ) func main() { // Create Temporal client - c, closer, err := temporal.NewClient(temporal.DefaultConfig()) + c, err := temporal.NewClient(temporal.DefaultConfig()) if err != nil { log.Fatal(err) } defer c.Close() - if closer != nil { - defer closer.Close() - } - // Create function registry and register handlers + // Register a handler registry := fn.NewRegistry() - registry.Register("greet", func(ctx context.Context, input fn.FunctionInput) (*fn.FunctionOutput, error) { + _ = registry.Register("greet", func(ctx context.Context, input fn.FunctionInput) (*fn.FunctionOutput, error) { return &fn.FunctionOutput{ Result: map[string]string{"greeting": "Hello, " + input.Args["name"] + "!"}, }, nil }) - // Create and start worker - w := worker.New(c, "function-tasks", worker.Options{}) - fn.RegisterWorkflows(w) - fn.RegisterActivity(w, fnactivity.NewExecuteFunctionActivity(registry)) + // Build a job Definition + def, err := fnbuilder.NewWorkflowBuilder[*payload.FunctionExecutionInput, payload.FunctionExecutionOutput](). + Name("greet-job"). + Single(). + Activity(fnactivity.NewExecuteFunctionActivity(registry)). 
+ Build() + if err != nil { + log.Fatal(err) + } - go w.Run(nil) + // Register and start worker + w := worker.New(c, def.TaskQueue, worker.Options{}) + def.Register(w) + go func() { _ = w.Run(worker.InterruptCh()) }() defer w.Stop() // Execute workflow - input := payload.FunctionExecutionInput{ - Name: "greet", - Args: map[string]string{"name": "Temporal"}, - } - - we, _ := c.ExecuteWorkflow(context.Background(), - client.StartWorkflowOptions{ - ID: "greet-example", - TaskQueue: "function-tasks", + run, err := def.Execute(context.Background(), c, + &payload.FunctionExecutionInput{ + Name: "greet", + Args: map[string]string{"name": "Temporal"}, }, - workflow.ExecuteFunctionWorkflow, - input, ) + if err != nil { + log.Fatal(err) + } var result payload.FunctionExecutionOutput - we.Get(context.Background(), &result) + _ = run.Get(context.Background(), &result) log.Printf("Result: %v", result.Result) } ``` @@ -209,6 +252,9 @@ go-wf/ ├── datasync/ # Generic data sync (Source → Mapper → Sink) │ ├── activity/ # SyncData activity with OTel │ ├── builder/ # Fluent builder for Job construction +│ ├── chunk/ # Partitioned/chunked sync builder (ChunkedSync, DateChunkedSync) +│ ├── internal/ +│ │ └── heartbeat/ # Shared heartbeat helpers (used by activity and chunk) │ ├── payload/ # Temporal payload types │ └── workflow/ # Workflow function and registration ├── examples/container/ # Container examples (see [README](./examples/container/README.md)) diff --git a/container/README.md b/container/README.md index e6eb05f..a1a4a1e 100644 --- a/container/README.md +++ b/container/README.md @@ -20,12 +20,40 @@ Temporal workflows for executing Docker containers with advanced orchestration p ## Quick Example +### Builder path (preferred) + +```go +def, err := builder.NewWorkflowBuilder(). + Name("pg-start"). + Single(). 
+ AddInput(payload.ContainerExecutionInput{ + Image: "postgres:16-alpine", + Env: map[string]string{"POSTGRES_PASSWORD": "test"}, + Ports: []string{"5432:5432"}, + WaitStrategy: payload.WaitStrategyConfig{ + Type: "log", + LogMessage: "ready to accept connections", + }, + AutoRemove: true, + }). + Build() + +// def is a *job.Definition — register it with a worker and execute it. +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) +run, err := def.Execute(ctx, c, def.NewInput()) +``` + +See [docs/job-definition.md](../docs/job-definition.md) for the full `*job.Definition` API (lifecycle, scheduling, registry). + +### Low-level path + ```go -input := container.ContainerExecutionInput{ +input := payload.ContainerExecutionInput{ Image: "postgres:16-alpine", Env: map[string]string{"POSTGRES_PASSWORD": "test"}, Ports: []string{"5432:5432"}, - WaitStrategy: container.WaitStrategyConfig{ + WaitStrategy: payload.WaitStrategyConfig{ Type: "log", LogMessage: "ready to accept connections", }, @@ -33,7 +61,7 @@ input := container.ContainerExecutionInput{ } we, _ := c.ExecuteWorkflow(ctx, - client.StartWorkflowOptions{ID: "pg", TaskQueue: "container-tasks"}, + client.StartWorkflowOptions{ID: "pg", TaskQueue: "container-pg-start"}, container.ExecuteContainerWorkflow, input) ``` diff --git a/datasync/README.md b/datasync/README.md index 234ed19..6fea9fc 100644 --- a/datasync/README.md +++ b/datasync/README.md @@ -5,10 +5,11 @@ Generic data synchronization workflows for Temporal. 
Provides a type-safe `Sourc ## Key Features - **Type-safe pipeline** — `Source[T]`, `Mapper[T,U]`, `Sink[U]` interfaces -- **Fluent builder** — construct sync jobs with `SyncJobBuilder` +- **Fluent builder** — construct sync jobs with `SyncJobBuilder`; `Build()` returns `*job.Definition` - **Built-in helpers** — `RecordMapper`, `InsertIfAbsentSink`, `IdentityMapper` - **Composable** — implements `workflow.TaskInput`/`TaskOutput` for use with Pipeline, Parallel, and DAG orchestration - **Scheduled execution** — run sync jobs on a recurring interval +- **Partitioned sync** — `datasync/chunk/` for cursor-based, date-chunked sync workflows with progress tracking - **OTel instrumented** — full observability out of the box ## Documentation @@ -17,6 +18,7 @@ Generic data synchronization workflows for Temporal. Provides a type-safe `Sourc - [Architecture](../docs/architecture.md) — how this package fits in the overall system - [Workflow Patterns](../docs/workflow-patterns.md) — orchestration patterns - [Getting Started](../docs/getting-started.md) — quick start guide +- [Job Definition API](../docs/job-definition.md) — `*job.Definition` lifecycle, scheduling, and registry ## Quick Example @@ -25,10 +27,35 @@ source := mySource{} mapper := datasync.NewRecordMapper[Raw, Entity]("convert", convertFn) sink := datasync.NewInsertIfAbsentSink[Entity, string]("db", getID, find, create) -job, _ := builder.NewSyncJobBuilder[Raw, Entity]("my-sync"). +def, err := builder.NewSyncJobBuilder[Raw, Entity]("my-sync"). WithSource(source). WithMapper(mapper). WithSink(sink). WithSchedule(5 * time.Minute). Build() + +// def is a *job.Definition +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) +run, err := def.Execute(ctx, c, def.NewInput()) ``` + +See [docs/job-definition.md](../docs/job-definition.md) for the full `*job.Definition` API. 
+ +## Partitioned Sync (`datasync/chunk/`) + +For large datasets that must be processed in date or key-range partitions, use `datasync/chunk/`. It provides `ChunkedSync[In, Out, K]` and `NewDateChunkedSync` — a builder that walks partitions with cursor-based resume, optional rate-limit retry, and ContinueAsNew for large partition lists. + +```go +def, err := chunk.NewDateChunkedSync[Order, Order]("order-sync"). + LookBack(72 * time.Hour). + ChunkSize(24 * time.Hour). + Timezone(time.UTC). + Fetcher(fetcher). + Mapper(datasync.IdentityMapper[Order]()). + Sink(&OrderSink{}). + ScheduleCron("0 2 * * *"). + Build() +``` + +Schedule methods: `.ScheduleEvery(d)`, `.ScheduleCron(expr)`, `.ScheduleRaw(spec)`. See [examples/datasync/chunk_basic.go](../examples/datasync/chunk_basic.go) for a runnable example. diff --git a/docs/architecture.md b/docs/architecture.md index e2010aa..5207490 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -9,26 +9,38 @@ library and for AI agents navigating the codebase. go-wf is organized in four layers. Each layer depends only on layers below it. ``` -┌─────────────────────────────────────────────────────────┐ -│ Layer 4 — Workers & Operations │ -│ Worker registration (RegisterAll), workflow submit, │ -│ cancel, watch, schedule │ -├─────────────────────────────────────────────────────────┤ -│ Layer 3 — Builder / Pattern APIs │ -│ Fluent builders, pre-built patterns (CI/CD, ETL, etc.) 
│ -│ container/builder function/builder datasync/builder │ -│ container/patterns function/patterns │ -├─────────────────────────────────────────────────────────┤ -│ Layer 2 — Concrete Implementations │ -│ container/ function/ datasync/ │ -│ Payloads, activities, workflow wrappers │ -├─────────────────────────────────────────────────────────┤ -│ Layer 1 — Generic Workflow Core (workflow/) │ -│ TaskInput/TaskOutput constraints, orchestration logic, │ -│ Store, Artifacts, Errors, OTel wrappers │ -└─────────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────────┐ +│ Layer 4 — Workers & Operations │ +│ def.Register(w), def.Execute(ctx, c, input), def.Cancel(...) │ +│ job.Registry — aggregate Definitions, RegisterAll, MustGet │ +│ All via *job.Definition (github.com/jasoet/pkg/v2/temporal/job)│ +├─────────────────────────────────────────────────────────────────┤ +│ Layer 3 — Builder / Pattern APIs │ +│ Fluent builders — all Build() → (*job.Definition, error) │ +│ container/builder function/builder │ +│ datasync/builder datasync/chunk │ +│ container/patterns function/patterns │ +├─────────────────────────────────────────────────────────────────┤ +│ Layer 2 — Concrete Implementations │ +│ container/ function/ datasync/ │ +│ Payloads, activities, workflow wrappers │ +├─────────────────────────────────────────────────────────────────┤ +│ Layer 1 — Generic Workflow Core (workflow/) │ +│ TaskInput/TaskOutput constraints, orchestration logic, │ +│ Store, Artifacts, Errors, OTel wrappers │ +└─────────────────────────────────────────────────────────────────┘ ``` +### pkg/v2/temporal/job — Shared Abstraction + +All go-wf builders converge on `*job.Definition` from `github.com/jasoet/pkg/v2/temporal/job`. This external package (in the `pkg/v2` base library) provides: + +- **`*job.Definition`** — the single handle for a named workflow job. 
Key methods: `Register(w)`, `Execute(ctx, c, input, opts...)`, `NewInput()`, `Describe`, `History`, `Cancel`, `Terminate`, `Signal`, `Query`, `ListRuns`, `Stats`, `ApplySchedule`, `PauseSchedule`, `ResumeSchedule`, `TriggerSchedule`, `DeleteSchedule`, `DescribeSchedule`. +- **`job.Registry`** — aggregates Definitions by name. `Add`, `Get`, `MustGet`, `List`, `Names`, `RegisterAll`, `ApplySchedules`. +- **`job.ScheduleSpec`** — portable schedule descriptor (interval, cron, calendar, jitter, overlap policy). + +See [docs/job-definition.md](job-definition.md) for the full API reference. + ## Package Relationship Map ``` @@ -55,9 +67,12 @@ function/ ← Concrete: Go function dispatch datasync/ ← Concrete: Source→Mapper→Sink ├── activity/ ← SyncData activity + OTel +├── builder/ ← Fluent Job builder → *job.Definition +├── chunk/ ← Partitioned/chunked sync → *job.Definition +├── internal/ +│ └── heartbeat/ ← Shared heartbeat helpers (activity + chunk) ├── payload/ ← SyncExecutionInput/Output -├── workflow/ ← Sync workflow + scheduling -└── builder/ ← Fluent Job builder +└── workflow/ ← Sync workflow + scheduling ``` **Dependency flow (imports go downward only):** @@ -67,8 +82,11 @@ container/builder ──→ container/workflow ──→ workflow/ container/workflow ──→ container/activity ──→ container/payload function/builder ──→ function/workflow ──→ workflow/ function/workflow ──→ function/activity ──→ function/payload -datasync/workflow ──→ datasync/activity ──→ datasync/ (core interfaces) +datasync/builder ──→ datasync/workflow ──→ datasync/activity ──→ datasync/ +datasync/chunk ──→ datasync/internal/heartbeat +datasync/chunk ──→ datasync/ (core interfaces) All payloads ──→ workflow/ (satisfy TaskInput/TaskOutput) +All builders ──→ github.com/jasoet/pkg/v2/temporal/job (*job.Definition) ``` ## Data Flow: Workflow Execution @@ -335,12 +353,23 @@ Each module registers these wrapper functions as Temporal workflows. 
### Workers -Worker registration follows a consistent pattern across modules: +The recommended pattern is to use a `*job.Definition` returned by a builder. `def.Register(w)` +is idempotent — calling it multiple times on the same worker is safe: + +```go +def, err := builder.NewWorkflowBuilder().Name("deploy").Single().AddInput(input).Build() +// ... +w := worker.New(client, def.TaskQueue, worker.Options{}) +def.Register(w) // registers workflows + activities for this Definition +``` + +For callers that register manually without a builder, the package-level helpers are still +available (also idempotent): **Container:** ```go w := worker.New(client, "container-tasks", worker.Options{}) -container.RegisterAll(w) // registers all workflows + activities +container.RegisterAll(w) // registers all container workflows + activities ``` **Function:** @@ -402,10 +431,28 @@ A `Job[T, U]` bundles Source + Mapper + Sink with scheduling and retry configura `Runner[T, U]` executes the pipeline in-process (useful for testing), while the Temporal activity wraps the same pipeline with full observability. +Both `datasync/builder.SyncJobBuilder` and `datasync/chunk.ChunkedSync` return `(*job.Definition, error)` +from their `Build()` methods. There is no separate `FullJobRegistration` type — `*job.Definition` +handles registration, execution, and scheduling uniformly. + DataSync payloads (`SyncExecutionInput`/`SyncExecutionOutput`) also implement `TaskInput`/`TaskOutput`, so sync jobs can be composed into Pipeline, Parallel, and DAG orchestrations alongside container and function tasks. +### datasync/chunk — Partitioned Sync + +`ChunkedSync[In, Out, K]` extends the basic datasync pattern for large datasets by walking an +ordered sequence of `Partition[K]` values. Each partition is processed as a separate activity +invocation. Key features: + +- **Cursor-based resume** — with `WithTracker(t)`, a `ProgressTracker` persists the last + completed partition key. 
On the next execution, already-processed partitions are skipped. +- **ContinueAsNew** — `MaxPartitionsPerExecution(n)` caps history growth; the workflow + continues from the cursor position in a fresh execution. +- **Schedule API** — `.ScheduleEvery(d)`, `.ScheduleCron(expr)`, `.ScheduleRaw(spec)`. +- **Heartbeat** — shared helpers in `datasync/internal/heartbeat` keep the activity heartbeat + alive consistently across both plain datasync and chunked workflows. + ## Observability All three modules support OpenTelemetry instrumentation: diff --git a/docs/chunk-workflows.md b/docs/chunk-workflows.md new file mode 100644 index 0000000..44771ee --- /dev/null +++ b/docs/chunk-workflows.md @@ -0,0 +1,323 @@ +# Chunked Sync Workflows + +The `datasync/chunk` package provides Temporal-backed partitioned-sync workflows on top of the datasync primitives (`Mapper`, `Sink`, and `job.Definition`). A `ChunkedSync` walks a list of `Partition[K]` in order, running fetch → map → write per partition. Optionally, a `ProgressTracker[K]` persists progress so long-running syncs resume after failure rather than restart from the beginning of the range. + +For time-based partitioning, use `DateChunkedSync` — a thin wrapper that converts `time.Time` keys to `int64` Unix-nanosecond keys internally, because `cmp.Ordered` does not include `time.Time`. + +## When to use + +- Daily or hourly batches over a date range (e.g., 7 days of orders, one workflow execution per day partition) +- Resumable historical backfills where progress must survive worker restarts +- Long-running syncs that need to bound Temporal history length via `MaxPartitionsPerExecution` + `ContinueAsNew` + +Use the simpler `SyncJobBuilder` (see [DataSync Workflows](datasync-workflows.md)) when a single fetch-map-write cycle is sufficient and partitioning is not required. 
+ ## Quick Example + +The following example is adapted from `examples/datasync/chunk_basic.go`: + +```go +import ( + "context" + "log" + "time" + + "github.com/jasoet/pkg/v2/temporal" + "github.com/jasoet/go-wf/datasync" + "github.com/jasoet/go-wf/datasync/chunk" + "github.com/jasoet/go-wf/datasync/payload" + "go.temporal.io/sdk/worker" +) + +// TimeFetcher is a function with the signature func(ctx, start, end time.Time) ([]T, error). +var fetcher chunk.TimeFetcher[Order] = func(ctx context.Context, start, end time.Time) ([]Order, error) { + // query your data store for records in [start, end) + return fetchOrdersFromDB(ctx, start, end) +} + +def, err := chunk.NewDateChunkedSync[Order, Order]("order-sync"). + LookBack(7 * 24 * time.Hour). // rolling 7-day window + ChunkSize(24 * time.Hour). // one partition per calendar day + Timezone(time.UTC). + Fetcher(fetcher). + Mapper(datasync.IdentityMapper[Order]()). + Sink(&OrderSink{}). + ScheduleEvery(15 * time.Minute). + MaxPartitionsPerExecution(50). 
+ Build() +if err != nil { + log.Fatal(err) +} + +c, err := temporal.NewClient(temporal.DefaultConfig()) +if err != nil { + log.Fatal(err) +} +defer c.Close() + +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) +go w.Run(worker.InterruptCh()) + +// Execute manually (the schedule handles recurring runs automatically): +input := def.NewInput().(*payload.SyncExecutionInput) +input.JobName = def.Name +run, err := def.Execute(context.Background(), c, input) +``` + +Inspect the result: + +```go +var result chunk.SyncResult[int64] +if err := run.Get(ctx, &result); err != nil { + log.Fatal(err) +} +fmt.Printf("Partitions: %d, Fetched: %d, Inserted: %d\n", + result.TotalPartitions, result.TotalFetched, result.TotalInserted) +for _, p := range result.Partitions { + start := chunk.KeyToTime(p.Start).Format("2006-01-02 15:04") + end := chunk.KeyToTime(p.End).Format("2006-01-02 15:04") + fmt.Printf(" %s..%s fetched=%d inserted=%d\n", start, end, p.Fetched, p.Inserted) +} +``` + +## Core API + +### ChunkedSync and DateChunkedSync + +`chunk.NewChunkedSync[In, Out, K]` is the generic builder. `K` must satisfy `cmp.Ordered` +(e.g., `int64`, `string`). + +`chunk.NewDateChunkedSync[In, Out]` is a `time.Time`-keyed wrapper that projects onto +`int64` Unix-nanosecond keys internally. Prefer this for calendar-based workloads. + +Both builders produce a `*job.Definition` via `.Build()`. 
+ +### DateChunkedSync builder methods + +| Method | Description | +|---|---| +| `LookBack(time.Duration)` | Total window size from now | +| `ChunkSize(time.Duration)` | Size of each partition; window start is aligned to midnight in the configured timezone | +| `Timezone(*time.Location)` | Midnight-alignment timezone (default: UTC) | +| `Fetcher(TimeFetcher[In])` | Data-fetching function for date-range sync | +| `Mapper(datasync.Mapper[In, Out])` | Standard datasync `Mapper` | +| `Sink(datasync.Sink[Out])` | Standard datasync `Sink` | +| `WithTracker(TimeProgressTracker)` | Storage-backed cursor for resumable sync | +| `ScheduleEvery(time.Duration)` | Fixed-interval Temporal schedule | +| `ScheduleCron(string)` | Cron-expression Temporal schedule | +| `ScheduleRaw(*job.ScheduleSpec)` | Full schedule spec (overlap, jitter, calendar) | +| `MaxPartitionsPerExecution(int)` | Cap partitions per run; issues ContinueAsNew for the rest | +| `PartitionSleep(time.Duration)` | Sleep between partitions (emits heartbeats) | +| `ActivityRetry(temporal.RetryPolicy)` | Override default retry policy | +| `ActivityTimeouts(startToClose, heartbeat time.Duration)` | Override default timeouts | +| `RateLimitRetry(RateLimitOpts)` | Decorator for API rate-limit backoff | +| `Disabled(bool)` | Create schedule in paused state | + +The generic `ChunkedSync` builder has the same set of methods, but `Fetcher` accepts +`PartitionFetcher[In, K]` and `WithTracker` accepts `ProgressTracker[K]` instead of +the `time.Time` adapter types. + +### Partition and result types + +```go +// Partition is a half-open range [Start, End) for one unit of work. +type Partition[K cmp.Ordered] struct { + Start K `json:"start"` + End K `json:"end"` +} + +// SyncResult aggregates all partitions processed in one workflow execution. 
+type SyncResult[K cmp.Ordered] struct { + JobName string + TotalPartitions int + TotalFetched int + TotalInserted int + TotalUpdated int + TotalSkipped int + Partitions []PartitionResult[K] +} +``` + +For `DateChunkedSync`, `K` is `int64`. Convert keys back to `time.Time` with +`chunk.KeyToTime(k)` and `time.Time` to keys with `chunk.TimeToKey(t)`. + +### Partitioner[K] + +The `Partitioner[K]` interface generates the ordered list of partitions for a sync run. +Implementations must be deterministic — the output is captured in Temporal history. + +`DatePartitioner` (used internally by `DateChunkedSync`) implements `Partitioner[int64]` +and emits calendar-day-aligned partitions. Library consumers do not normally use it directly. + +### PartitionFetcher[T, K] + +```go +type PartitionFetcher[T any, K cmp.Ordered] func(ctx context.Context, start, end K) ([]T, error) +``` + +A function that fetches records of type `T` for a single partition `[start, end)`. +Respect `ctx` cancellation. + +For `DateChunkedSync`, use the `TimeFetcher[In]` alias: + +```go +type TimeFetcher[In any] func(ctx context.Context, start, end time.Time) ([]In, error) +``` + +### ProgressTracker[K] + +```go +type ProgressTracker[K cmp.Ordered] interface { + Cursor(ctx context.Context, jobName string) (K, bool, error) + Advance(ctx context.Context, jobName string, completed K) error +} +``` + +Persists the last-completed partition end key so the workflow can skip +already-processed partitions on the next execution. The `bool` from `Cursor` +indicates whether a cursor has been recorded — the zero value of `K` may be +a meaningful key. + +For `DateChunkedSync`, use `TimeProgressTracker`: + +```go +type TimeProgressTracker interface { + Cursor(ctx context.Context, jobName string) (time.Time, bool, error) + Advance(ctx context.Context, jobName string, completed time.Time) error +} +``` + +Implementations are not provided by go-wf. 
Provide your own (e.g., Postgres-backed) +and pass it via `.WithTracker(...)`. + +## Resumable Backfills + +When `.WithTracker(t)` is set, the workflow: + +1. Calls `t.Cursor(ctx, jobName)` to read the last-completed partition end. +2. Filters out partitions whose `Start` is before the cursor value. +3. Processes remaining partitions in order. +4. After each successful partition, calls `t.Advance(ctx, jobName, partition.End)`. + +This ensures that, on the next execution, already-processed partitions are skipped. +The `Advance` call is idempotent — the workflow may retry it on failure. + +## ContinueAsNew for Large Partition Lists + +Each partition processed adds events to the Temporal workflow history. For very long +backfills, this can exhaust the history budget. Use `MaxPartitionsPerExecution(n)` to +cap the number of partitions per execution: + +```go +def, err := chunk.NewDateChunkedSync[Order, Order]("orders-backfill"). + LookBack(365 * 24 * time.Hour). // 1 year of history + ChunkSize(24 * time.Hour). // 365 partitions total + Fetcher(fetcher). + Mapper(mapper). + Sink(sink). + WithTracker(pgTracker). // required with MaxPartitionsPerExecution + MaxPartitionsPerExecution(30). // process 30 days per execution + Build() +``` + +After processing 30 partitions, the workflow issues `ContinueAsNew` with the same +input. The tracker ensures the next execution resumes at partition 31 rather than +restarting from day 1. + +`MaxPartitionsPerExecution` requires `WithTracker` — without a tracker, every +execution would re-process the same prefix indefinitely. The builder panics at +startup if this combination is invalid. 
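The interaction between the cursor and `MaxPartitionsPerExecution` can be illustrated with a small, Temporal-free simulation. This is only a sketch under stated assumptions: `partition`, `memTracker`, and `runOnce` are hypothetical names, the tracker lives in memory, and the point where `runOnce` returns `more == true` stands in for the place where the real workflow would issue `ContinueAsNew`. It is not go-wf's implementation.

```go
package main

import (
	"cmp"
	"fmt"
)

// partition is a half-open range [Start, End), mirroring the documented Partition[K] shape.
type partition[K cmp.Ordered] struct {
	Start, End K
}

// memTracker is a hypothetical in-memory stand-in for a ProgressTracker[K].
// A real tracker would persist the cursor (for example in Postgres).
type memTracker[K cmp.Ordered] struct {
	cursor K
	set    bool
}

// Cursor returns the last-completed partition end and whether one was recorded.
func (t *memTracker[K]) Cursor() (K, bool) { return t.cursor, t.set }

// Advance is idempotent: replaying an older or equal key leaves the cursor unchanged.
func (t *memTracker[K]) Advance(completed K) {
	if !t.set || completed > t.cursor {
		t.cursor, t.set = completed, true
	}
}

// runOnce simulates one execution: it skips partitions that start before the
// cursor, processes at most limit partitions, and advances the cursor after
// each one. more reports whether another execution is still needed.
func runOnce[K cmp.Ordered](all []partition[K], t *memTracker[K], limit int) (done []partition[K], more bool) {
	cursor, ok := t.Cursor()
	for _, p := range all {
		if ok && p.Start < cursor {
			continue // already processed in an earlier execution
		}
		if limit > 0 && len(done) == limit {
			return done, true // the real workflow would ContinueAsNew here
		}
		// Fetch, map, and write for partition p would happen here.
		t.Advance(p.End)
		done = append(done, p)
	}
	return done, false
}

func main() {
	var all []partition[int]
	for d := 0; d < 7; d++ {
		all = append(all, partition[int]{Start: d, End: d + 1})
	}
	t := &memTracker[int]{}
	for run := 1; ; run++ {
		done, more := runOnce(all, t, 3)
		fmt.Printf("run %d processed %d partitions\n", run, len(done))
		if !more {
			break
		}
	}
}
```

Because every execution receives the same partition list, the persisted cursor is the only thing that moves the starting point forward. If the tracker were reset between runs in this sketch, each call would process the first three partitions again, which is the failure mode the `WithTracker` requirement guards against.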
+ +## Rate-Limit Handling + +Decorate a fetcher with exponential-backoff retry on API rate-limit errors: + +```go +rateLimited := chunk.WithRateLimitRetry(fetcher, chunk.RateLimitOpts{ + Detector: isHTTP429, // func(error) bool + MaxAttempts: 5, + Sleep: 30 * time.Second, + Sleeper: chunk.HeartbeatSleeper, // emits Temporal heartbeats during sleep +}) + +def, err := chunk.NewDateChunkedSync[Order, Order]("order-sync"). + Fetcher(chunk.TimeFetcher[Order](func(ctx context.Context, start, end time.Time) ([]Order, error) { + return rateLimited(ctx, chunk.TimeToKey(start), chunk.TimeToKey(end)) + })). + // ... + Build() +``` + +Or use `.RateLimitRetry(opts)` on the builder to apply the decorator automatically: + +```go +def, err := chunk.NewDateChunkedSync[Order, Order]("order-sync"). + Fetcher(myFetcher). + RateLimitRetry(chunk.RateLimitOpts{ + Detector: isHTTP429, + MaxAttempts: 5, + Sleep: 30 * time.Second, + }). + Build() +``` + +### HeartbeatSleeper + +`chunk.HeartbeatSleeper` sleeps for a duration while emitting Temporal activity +heartbeats every 10 seconds, so the activity does not time out during long sleeps: + +```go +func HeartbeatSleeper(ctx context.Context, d time.Duration) +``` + +It is the default `Sleeper` in `RateLimitOpts`. Panics if called outside an +activity context (where `activity.RecordHeartbeat` is not valid). + +## Scheduling + +Use one of the three schedule setters on the builder before calling `.Build()`: + +| Method | Underlying `job.ScheduleSpec` field | +|---|---| +| `.ScheduleEvery(d time.Duration)` | `Interval: d` | +| `.ScheduleCron(expr string)` | `Cron: expr` | +| `.ScheduleRaw(spec *job.ScheduleSpec)` | Full spec — use for jitter, overlap, calendar specs, or paused-on-create | + +After building, apply the schedule to Temporal: + +```go +if err := def.ApplySchedule(ctx, c); err != nil { + log.Fatalf("schedule: %v", err) +} +``` + +`ApplySchedule` creates the schedule if it does not exist, or updates it if it does. 
+The schedule ID equals `def.Name`. + +See [Job Definition](job-definition.md) for the full `*job.ScheduleSpec` shape, +overlap policies, and schedule lifecycle methods (`PauseSchedule`, `ResumeSchedule`, +`TriggerSchedule`, `DeleteSchedule`, `DescribeSchedule`). + +## Activity Defaults + +When no explicit timeouts or retry policy are configured: + +| Setting | Default | +|---|---| +| `StartToCloseTimeout` (partition activity) | 20 minutes | +| `HeartbeatTimeout` (partition activity) | 15 minutes | +| `MaximumAttempts` | 3 | +| `InitialInterval` | 60 seconds | +| `BackoffCoefficient` | 2.0 | +| `MaximumInterval` | 5 minutes | + +Cursor read/advance activities use shorter timeouts (10 seconds start-to-close, +3 retries with 2-second initial interval) because they are lightweight metadata +operations against a local store. + +## See Also + +- [DataSync Workflows](datasync-workflows.md) — the simpler single-activity sync pattern +- [Job Definition](job-definition.md) — the `*job.Definition` shape every builder produces +- `examples/datasync/chunk_basic.go` — complete runnable example +- `datasync/chunk/doc.go` — package-level godoc diff --git a/docs/container-workflows.md b/docs/container-workflows.md index 1e538b7..ec36472 100644 --- a/docs/container-workflows.md +++ b/docs/container-workflows.md @@ -149,13 +149,23 @@ Options include `WithHTTPMethod`, `WithHTTPHeader`, `WithHTTPBody`, ## Builder API +The `container/builder` package provides a fluent API that produces a `*job.Definition` +ready for registration and execution. Using the builder is preferred over constructing +workflow inputs manually. + ### WorkflowBuilder -Fluent builder for composing `ContainerExecutionInput` lists from -`WorkflowSource` objects: +`NewWorkflowBuilder()` takes no arguments. Set the name with `.Name(string)`. ```go -pipeline, err := builder.NewWorkflowBuilder("ci-pipeline"). 
+import ( + "github.com/jasoet/go-wf/container/builder" + "github.com/jasoet/pkg/v2/temporal/job" +) + +def, err := builder.NewWorkflowBuilder(). + Name("ci-pipeline"). + Pipeline(). // select execution mode Add(buildStep). // WorkflowSource Add(testStep). Add(deployStep). @@ -163,54 +173,100 @@ pipeline, err := builder.NewWorkflowBuilder("ci-pipeline"). StopOnError(true). WithTimeout(5 * time.Minute). WithAutoRemove(true). - BuildPipeline() // returns *payload.PipelineInput + Build() // returns (*job.Definition, error) +if err != nil { + log.Fatal(err) +} ``` -Build methods: +**Identity setters (required):** -| Method | Returns | Description | -|---|---|---| -| `BuildPipeline()` | `*payload.PipelineInput` | Sequential execution | -| `BuildParallel()` | `*payload.ParallelInput` | Concurrent execution | -| `BuildSingle()` | `*payload.ContainerExecutionInput` | First container only | -| `Build()` | `interface{}` | Auto-selects based on `Parallel()` flag | -| `BuildGenericPipeline()` | `*workflow.PipelineInput[...]` | Generic pipeline type | -| `BuildGenericParallel()` | `*workflow.ParallelInput[...]` | Generic parallel type | +| Method | Description | +|---|---| +| `.Name(string)` | Job name — also used as workflow ID prefix | +| `.TaskQueue(string)` | Override task queue (default: `"container-"`) | + +**Mode setters (pick exactly one before calling Build):** + +| Method | Registered Temporal workflow | +|---|---| +| `.Pipeline()` | `ContainerPipelineWorkflow` — sequential, stop-on-error | +| `.Parallel()` | `ParallelContainersWorkflow` — concurrent | +| `.Single()` | `ExecuteContainerWorkflow` — single container | + +**Configuration:** + +`StopOnError(bool)`, `Cleanup(bool)`, `FailFast(bool)`, `MaxConcurrency(int)`, +`WithTimeout(time.Duration)`, `WithAutoRemove(bool)`. -Configuration: `StopOnError(bool)`, `Cleanup(bool)`, `Parallel(bool)`, -`FailFast(bool)`, `MaxConcurrency(int)`, `WithTimeout(Duration)`, -`WithAutoRemove(bool)`. 
+**`.Build()` returns `(*job.Definition, error)`.** The Definition handles +worker registration and workflow execution. Consume it like this: + +```go +// Register with a Temporal worker +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) + +// Execute a workflow run +run, err := def.Execute(ctx, c, def.NewInput()) +``` + +`def.Register(w)` is idempotent — `container.RegisterAll(w)` is called +internally via `job.RegisterWorkflowOnce` / `job.RegisterActivityOnce`, so +multiple Definitions sharing the same worker will not double-register workflows +or activities. + +**Lower-level raw-input helpers** (`BuildPipeline()`, `BuildParallel()`, `BuildSingle()`, +`BuildGenericPipeline()`, `BuildGenericParallel()`) remain available on +`WorkflowBuilder` for callers that only need the payload struct without a full +`job.Definition`. Prefer `Build()` for new code. ### LoopBuilder -Builds loop workflow inputs for iterating over items or parameter matrices: +Builds loop workflow inputs for iterating over items or parameter matrices. `Build()` returns +`(*job.Definition, error)`. + +```go +def, err := builder.NewLoopBuilder([]string{"file1.csv", "file2.csv"}). + Name("process-files"). + WithTemplate(containerTemplate). + Parallel(true). + MaxConcurrency(3). 
+ Build() +``` + +Or use the convenience constructor: ```go -loop, err := builder.ForEach( +def, err := builder.ForEach( []string{"file1.csv", "file2.csv"}, containerTemplate, -).Parallel(true).MaxConcurrency(3).BuildLoop() +).Name("process-files").Parallel(true).MaxConcurrency(3).Build() ``` For parameterized loops (cartesian product of parameters): ```go -loop, err := builder.ForEachParam( +def, err := builder.ForEachParam( map[string][]string{ "env": {"staging", "prod"}, "region": {"us-west", "eu-central"}, }, deployTemplate, -).Parallel(true).FailFast(true).BuildParameterizedLoop() +).Name("multi-region-deploy").Parallel(true).FailFast(true).Build() ``` Template substitution placeholders: `{{item}}`, `{{index}}`, and `{{.paramName}}` for parameterized loops. +`BuildLoop()` and `BuildParameterizedLoop()` return raw input structs without +a Definition, for callers that manage Temporal options manually. + ### GenericBuilder For non-container use cases, `GenericBuilder[I, O]` provides the same fluent API -over arbitrary `TaskInput`/`TaskOutput` types: +over arbitrary `TaskInput`/`TaskOutput` types and returns raw input structs +(not a `*job.Definition`): ```go gb := builder.NewGenericBuilder[*MyInput, MyOutput]() @@ -469,7 +525,45 @@ container.QueryWorkflow(ctx, client, workflowID, runID, "status", &result) ## Worker Setup -Register all container workflows and activities with a single call: +### Builder-based (preferred) + +When you use `container.WorkflowBuilder` or `container.LoopBuilder`, the resulting +`*job.Definition` handles registration automatically: + +```go +import ( + "github.com/jasoet/go-wf/container/builder" + "github.com/jasoet/pkg/v2/temporal" + "go.temporal.io/sdk/worker" +) + +def, err := builder.NewWorkflowBuilder(). + Name("nightly-deploy"). + Pipeline(). + Add(deployStep). 
+ Build() +if err != nil { + log.Fatal(err) +} + +c, err := temporal.NewClient(temporal.DefaultConfig()) +if err != nil { + log.Fatal(err) +} +defer c.Close() + +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) // calls container.RegisterAll internally; idempotent +w.Run(worker.InterruptCh()) +``` + +`def.Register(w)` calls `container.RegisterAll(w)` via idempotent helpers, so +multiple Definitions on the same worker register the shared workflow/activity +types only once. + +### Manual registration + +If you are dispatching workflows via the lower-level `c.ExecuteWorkflow` path: ```go w := worker.New(temporalClient, "container-queue", worker.Options{}) @@ -480,7 +574,10 @@ w.Run(worker.InterruptCh()) `RegisterAll` calls `RegisterWorkflows` (which registers `ExecuteContainerWorkflow`, `ContainerPipelineWorkflow`, `ParallelContainersWorkflow`, `LoopWorkflow`, `ParameterizedLoopWorkflow`, `DAGWorkflow`, and `WorkflowWithParameters`) and -`RegisterActivities` (which registers the OTel-instrumented -`StartContainerActivity`). +`RegisterActivities` (which registers the OTel-instrumented `StartContainerActivity`). +`container.RegisterAll` is now safe to call multiple times — repeated calls for an +already-registered (worker, type) pair are silently ignored. For finer control, call `RegisterWorkflows` and `RegisterActivities` separately. + +See [Job Definition](job-definition.md) for the full `*job.Definition` API. diff --git a/docs/datasync-workflows.md b/docs/datasync-workflows.md index 379392d..eb44012 100644 --- a/docs/datasync-workflows.md +++ b/docs/datasync-workflows.md @@ -165,9 +165,12 @@ type Job[T any, U any] struct { Use `SyncJobBuilder` for fluent job construction with validation: ```go -import "github.com/jasoet/go-wf/datasync/builder" +import ( + "github.com/jasoet/go-wf/datasync/builder" + "github.com/jasoet/pkg/v2/temporal/job" +) -job, err := builder.NewSyncJobBuilder[APIUser, DBUser]("user-sync"). 
+def, err := builder.NewSyncJobBuilder[APIUser, DBUser]("user-sync"). WithSource(apiSource). WithMapper(mapper). WithSink(dbSink). @@ -178,13 +181,40 @@ job, err := builder.NewSyncJobBuilder[APIUser, DBUser]("user-sync"). WithRetryBackoffCoefficient(2.0). WithMetadata(map[string]string{"team": "platform"}). Build() +if err != nil { + log.Fatal(err) +} +``` + +`Build()` returns `(*job.Definition, error)` and validates that name, source, mapper, sink, and a positive schedule are all set. A non-nil `*job.Definition` is ready to register with a worker and execute via the Temporal client. + +The caller registers and starts the workflow using the Definition methods directly: + +```go +// Register on a worker +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) + +// Execute a run +input := def.NewInput().(*payload.SyncExecutionInput) +input.JobName = def.Name +run, err := def.Execute(ctx, c, input) +``` + +For recurring execution, attach a schedule at build time and apply it: + +```go +// Schedule is already embedded when WithSchedule is set in the builder. +// Apply it to Temporal after the client is available: +err = def.ApplySchedule(ctx, c) ``` -`Build()` validates that name, source, mapper, sink, and a positive schedule are all set, returning an error if any are missing. +See [Job Definition](job-definition.md) for the full `*job.Definition` API surface. ## Runner -`Runner` executes a single fetch-map-write cycle in-process, useful for testing and simple sync without Temporal: +`Runner` executes a single fetch-map-write cycle in-process, useful for testing and simple sync without Temporal. +Unlike the builder, `Runner` does not involve Temporal at all — it is a plain Go struct. 
```go runner := datasync.NewRunner(source, mapper, sink) @@ -209,50 +239,21 @@ type Result struct { } ``` -## Registration and Temporal Workflows - -### JobRegistration - -`BuildRegistration` extracts type-erased metadata from a typed `Job`, useful for listing registered jobs without generic type information: - -```go -reg := datasync.BuildRegistration(job, false) -// reg.Name, reg.Schedule, reg.Disabled, reg.SourceName, reg.SinkName -``` - -### Workflow Registration +## Worker Setup -The `datasync/workflow` package provides `RegisterJob` and `BuildJobRegistration` to wire jobs into a Temporal worker: +A complete worker setup using the builder: ```go import ( - syncwf "github.com/jasoet/go-wf/datasync/workflow" + "github.com/jasoet/go-wf/datasync" + "github.com/jasoet/go-wf/datasync/builder" + "github.com/jasoet/pkg/v2/temporal" "go.temporal.io/sdk/worker" ) -// Register a job with a Temporal worker -w := worker.New(client, syncwf.TaskQueue("user-sync"), worker.Options{}) -syncwf.RegisterJob(w, job) -``` - -`BuildJobRegistration` creates a `FullJobRegistration` that bundles everything needed to register and schedule a job: - -```go -reg := syncwf.BuildJobRegistration(job, false) -// reg.Name, reg.TaskQueue, reg.Schedule, reg.WorkflowInput -// reg.Register(w) -- registers workflow + activities with a worker -``` - -Each job gets its own task queue named `sync-`, and the workflow and activity are registered with names derived from the job name. - -## Worker Setup - -A complete worker setup typically looks like this: - -```go func main() { - // Build the job - job, err := builder.NewSyncJobBuilder[APIUser, DBUser]("user-sync"). + // Build the Definition + def, err := builder.NewSyncJobBuilder[APIUser, DBUser]("user-sync"). WithSource(newAPISource()). WithMapper(datasync.NewRecordMapper[APIUser, DBUser]("user-mapper", convertUser)). WithSink(datasync.NewInsertIfAbsentSink[DBUser, string]("user-sink", getID, findUser, createUser)). 
@@ -263,12 +264,22 @@ func main() { log.Fatal(err) } - // Create Temporal client and worker - c, _ := client.Dial(client.Options{}) - w := worker.New(c, syncwf.TaskQueue(job.Name), worker.Options{}) + // Create Temporal client + c, err := temporal.NewClient(temporal.DefaultConfig()) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + // Create worker and register + w := worker.New(c, def.TaskQueue, worker.Options{}) + def.Register(w) + + // Optionally create or update the Temporal schedule + if err := def.ApplySchedule(context.Background(), c); err != nil { + log.Printf("schedule apply: %v", err) + } - // Register and start - syncwf.RegisterJob(w, job) w.Run(worker.InterruptCh()) } ``` @@ -287,3 +298,9 @@ The activity layer automatically records OpenTelemetry metrics and spans: - **Metrics**: `syncOpsTotal` (counter with success/error status), `syncOpsDuration` (histogram), `syncRecordsFetched`, `syncRecordsWritten`. - **Traces**: Nested spans for Fetch, Map, and Write operations using the `pkgotel.Layers` API. - **Heartbeats**: Temporal activity heartbeats are sent after fetch and write phases. + +## Partitioned and Date-Range Sync + +For workloads that need to process data in date/time partitions — for example, syncing orders day-by-day over a rolling 7-day window — see [Chunked Sync Workflows](chunk-workflows.md). The `datasync/chunk` package provides cursor-based resume, `ContinueAsNew` for unbounded history, and a `DateChunkedSync` helper that handles time-to-key projection automatically. + +See also [Job Definition](job-definition.md) for the `*job.Definition` type that every builder produces. diff --git a/docs/function-workflows.md b/docs/function-workflows.md index 009a30e..7dc46ee 100644 --- a/docs/function-workflows.md +++ b/docs/function-workflows.md @@ -109,55 +109,85 @@ placeholders like `{{item}}` are allowed and validated at execution time). 
## Builder API -The `function/builder` package provides a fluent API to construct workflow -inputs without manually assembling structs. +The `function/builder` package provides a fluent API that produces a `*job.Definition` +ready for registration and execution. Using the builder is preferred over constructing +workflow inputs manually. -### WorkflowBuilder / NewFunctionBuilder +### WorkflowBuilder + +`NewWorkflowBuilder[I, O]()` is generic. `NewFunctionBuilder()` is a convenience +alias pre-specialized for `*payload.FunctionExecutionInput` / `payload.FunctionExecutionOutput`. ```go -import "github.com/jasoet/go-wf/function/builder" +import ( + "github.com/jasoet/go-wf/function/builder" + "github.com/jasoet/go-wf/function/activity" + "github.com/jasoet/pkg/v2/temporal/job" +) + +activityFn := activity.NewExecuteFunctionActivity(registry) -pipelineInput, err := builder.NewFunctionBuilder("my-pipeline"). +def, err := builder.NewFunctionBuilder(). + Name("my-pipeline"). + Activity(activityFn). + Pipeline(). // select execution mode Add(&payload.FunctionExecutionInput{Name: "step-a", Args: map[string]string{"key": "val"}}). Add(&payload.FunctionExecutionInput{Name: "step-b"}). StopOnError(true). - BuildPipeline() + Build() // returns (*job.Definition, error) +if err != nil { + log.Fatal(err) +} ``` -`NewFunctionBuilder` is a convenience wrapper around the generic -`NewWorkflowBuilder` pre-specialized for function execution types. 
- -Key methods: +**Identity setters (required before `Build`):** | Method | Description | |---|---| -| `Add(input)` | Append a task input | -| `StopOnError(bool)` | Stop pipeline on first error | -| `Parallel(bool)` | Switch to parallel mode | -| `FailFast(bool)` | Stop parallel execution on first failure | -| `MaxConcurrency(int)` | Limit concurrent parallel tasks | -| `BuildPipeline()` | Build a `workflow.PipelineInput` | -| `BuildParallel()` | Build a `workflow.ParallelInput` | -| `BuildSingle()` | Build a single-task input | -| `Build()` | Build pipeline or parallel based on mode | +| `.Name(string)` | Job name — also used as workflow ID prefix | +| `.Activity(fn)` | The activity function returned by `activity.NewExecuteFunctionActivity` | +| `.TaskQueue(string)` | Override task queue (default: `"function-"`) | + +**Mode setters (pick exactly one before calling `Build`):** + +| Method | Registered Temporal workflow | +|---|---| +| `.Pipeline()` | `FunctionPipelineWorkflow` — sequential, stop-on-error | +| `.Parallel()` | `ParallelFunctionsWorkflow` — concurrent | +| `.Single()` | `ExecuteFunctionWorkflow` — single function | + +**Configuration:** `StopOnError(bool)`, `FailFast(bool)`, `MaxConcurrency(int)`. + +**`.Build()` returns `(*job.Definition, error)`.** Consume the Definition like this: + +```go +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) // calls fn.RegisterAll internally; idempotent + +run, err := def.Execute(ctx, c, def.NewInput()) +``` + +`def.Register(w)` calls `fn.RegisterAll(w, activityFn)` via idempotent helpers. +`fn.RegisterAll` is safe to call multiple times — repeated registrations for an +already-registered (worker, type) pair are silently ignored. ### LoopBuilder -Loop builders construct loop workflow inputs from a template and a set of items -or parameter combinations. +`LoopBuilder` produces a `*job.Definition` for item-based or parameterized loops. +`Build()` returns `(*job.Definition, error)`. 
```go // Simple item loop — {{item}} in template args is replaced per iteration -loopInput, err := builder.ForEach( +def, err := builder.ForEach( []string{"file1.csv", "file2.csv"}, payload.FunctionExecutionInput{ Name: "process-file", Args: map[string]string{"file": "{{item}}"}, }, -).Parallel(true).BuildLoop() +).Name("process-files").Activity(activityFn).Parallel(true).Build() // Parameterized loop — {{.key}} placeholders are replaced with cross-product values -paramInput, err := builder.ForEachParam( +def, err := builder.ForEachParam( map[string][]string{ "environment": {"dev", "staging"}, "region": {"us-west", "eu-central"}, @@ -169,12 +199,15 @@ paramInput, err := builder.ForEachParam( "region": "{{.region}}", }, }, -).Parallel(true).FailFast(true).BuildParameterizedLoop() +).Name("multi-region-deploy").Activity(activityFn).Parallel(true).FailFast(true).Build() ``` Convenience constructors: `NewFunctionLoopBuilder(items)` and `NewFunctionParameterizedLoopBuilder(params)`. +`BuildLoop()` and `BuildParameterizedLoop()` return raw input structs (without a +Definition), for callers that manage Temporal options manually. + ### DAG Builder The DAG builder constructs a `DAGWorkflowInput` with dependency edges and data @@ -284,7 +317,52 @@ input, err := patterns.CIPipeline() ## Worker Setup -Register all function workflows and the activity on your Temporal worker: +### Builder-based (preferred) + +When you use `function.WorkflowBuilder` or `function.LoopBuilder`, the resulting +`*job.Definition` handles registration automatically: + +```go +import ( + fn "github.com/jasoet/go-wf/function" + "github.com/jasoet/go-wf/function/activity" + "github.com/jasoet/go-wf/function/builder" + "github.com/jasoet/pkg/v2/temporal" + "go.temporal.io/sdk/worker" +) + +registry := fn.NewRegistry() +// ... register handlers ... +activityFn := activity.NewExecuteFunctionActivity(registry) + +def, err := builder.NewFunctionBuilder(). + Name("greet-pipeline"). + Activity(activityFn). 
+ Pipeline(). + Add(&fn.FunctionExecutionInput{Name: "greet"}). + Build() +if err != nil { + log.Fatal(err) +} + +c, err := temporal.NewClient(temporal.DefaultConfig()) +if err != nil { + log.Fatal(err) +} +defer c.Close() + +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) // calls fn.RegisterAll internally; idempotent +w.Run(worker.InterruptCh()) +``` + +`def.Register(w)` calls `fn.RegisterAll(w, activityFn)` via idempotent helpers. +`fn.RegisterAll` is safe to call multiple times — repeated (worker, type) pairs +are silently deduped. + +### Manual registration + +For lower-level use or when registering DAG workflows separately: ```go import ( @@ -329,3 +407,5 @@ Call `fn.SetActivityInstrumenter(wrapper)` during initialization to wrap the activity with OpenTelemetry spans. This must be called once before `RegisterActivity`; subsequent calls are ignored. See [Observability](observability.md) for details. + +See [Job Definition](job-definition.md) for the full `*job.Definition` API. diff --git a/docs/getting-started.md b/docs/getting-started.md index 90c12db..d94f957 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -29,19 +29,20 @@ import ( "time" "github.com/jasoet/pkg/v2/temporal" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" fn "github.com/jasoet/go-wf/function" fnactivity "github.com/jasoet/go-wf/function/activity" + fnbuilder "github.com/jasoet/go-wf/function/builder" "github.com/jasoet/go-wf/function/payload" - "github.com/jasoet/go-wf/function/workflow" ) func main() { - c, closer, _ := temporal.NewClient(temporal.DefaultConfig()) + c, err := temporal.NewClient(temporal.DefaultConfig()) + if err != nil { + log.Fatal(err) + } defer c.Close() - if closer != nil { defer closer.Close() } // 1. Register a handler registry := fn.NewRegistry() @@ -52,27 +53,39 @@ func main() { }, nil }) - // 2. 
Create and start a worker - w := worker.New(c, "function-tasks", worker.Options{}) - fn.RegisterWorkflows(w) - fn.RegisterActivity(w, fnactivity.NewExecuteFunctionActivity(registry)) + // 2. Build a job Definition + def, err := fnbuilder.NewWorkflowBuilder[*payload.FunctionExecutionInput, payload.FunctionExecutionOutput](). + Name("greet-job"). + Single(). + Activity(fnactivity.NewExecuteFunctionActivity(registry)). + Build() + if err != nil { + log.Fatal(err) + } + + // 3. Register on a worker and start + w := worker.New(c, def.TaskQueue, worker.Options{}) + def.Register(w) go func() { _ = w.Run(worker.InterruptCh()) }() defer w.Stop() time.Sleep(time.Second) - // 3. Execute the workflow - we, _ := c.ExecuteWorkflow(context.Background(), - client.StartWorkflowOptions{ID: "greet-example", TaskQueue: "function-tasks"}, - workflow.ExecuteFunctionWorkflow, - payload.FunctionExecutionInput{Name: "greet", Args: map[string]string{"name": "Temporal"}}, + // 4. Execute the workflow + run, err := def.Execute(context.Background(), c, + &payload.FunctionExecutionInput{Name: "greet", Args: map[string]string{"name": "Temporal"}}, ) + if err != nil { + log.Fatal(err) + } var result payload.FunctionExecutionOutput - _ = we.Get(context.Background(), &result) + _ = run.Get(context.Background(), &result) log.Printf("Result: %v", result.Result) } ``` +`def.TaskQueue` defaults to `"function-"` when not overridden with `.TaskQueue(...)`. For more details see [Function Workflows](function-workflows.md) and [docs/job-definition.md](job-definition.md). + ## Quick Start: Container Workflow A container workflow runs a Podman/Docker container as a Temporal activity. 
@@ -86,49 +99,82 @@ import ( "time" "github.com/jasoet/pkg/v2/temporal" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" - "github.com/jasoet/go-wf/container" + cbuilder "github.com/jasoet/go-wf/container/builder" "github.com/jasoet/go-wf/container/payload" - "github.com/jasoet/go-wf/container/workflow" ) func main() { - c, closer, _ := temporal.NewClient(temporal.DefaultConfig()) + c, err := temporal.NewClient(temporal.DefaultConfig()) + if err != nil { + log.Fatal(err) + } defer c.Close() - if closer != nil { defer closer.Close() } - // 1. Create and start a worker - w := worker.New(c, "container-tasks", worker.Options{}) - container.RegisterAll(w) + // 1. Build a job Definition + def, err := cbuilder.NewWorkflowBuilder(). + Name("postgres-example"). + Single(). + AddInput(payload.ContainerExecutionInput{ + Image: "postgres:16-alpine", + Env: map[string]string{"POSTGRES_PASSWORD": "test", "POSTGRES_USER": "test"}, + AutoRemove: true, + Name: "example-postgres", + WaitStrategy: payload.WaitStrategyConfig{ + Type: "log", LogMessage: "ready to accept connections", + StartupTimeout: 30 * time.Second, + }, + }). + Build() + if err != nil { + log.Fatal(err) + } + + // 2. Register on a worker and start + w := worker.New(c, def.TaskQueue, worker.Options{}) + def.Register(w) go func() { _ = w.Run(worker.InterruptCh()) }() defer w.Stop() time.Sleep(time.Second) - // 2. Define and execute the container task - input := payload.ContainerExecutionInput{ - Image: "postgres:16-alpine", - Env: map[string]string{"POSTGRES_PASSWORD": "test", "POSTGRES_USER": "test"}, - AutoRemove: true, - Name: "example-postgres", - WaitStrategy: payload.WaitStrategyConfig{ - Type: "log", LogMessage: "ready to accept connections", - StartupTimeout: 30 * time.Second, - }, + // 3. 
Execute the workflow + run, err := def.Execute(context.Background(), c, def.NewInput()) + if err != nil { + log.Fatal(err) } - we, _ := c.ExecuteWorkflow(context.Background(), - client.StartWorkflowOptions{ID: "postgres-example", TaskQueue: "container-tasks"}, - workflow.ExecuteContainerWorkflow, input, - ) - var result payload.ContainerExecutionOutput - _ = we.Get(context.Background(), &result) + _ = run.Get(context.Background(), &result) log.Printf("Container ID: %s, Exit Code: %d", result.ContainerID, result.ExitCode) } ``` +`def.TaskQueue` defaults to `"container-"`. For more details see [Container Workflows](container-workflows.md) and [docs/job-definition.md](job-definition.md). + +## Quick Start: Chunked DataSync + +For large datasets that must be processed in partitions (e.g., by date range), use `datasync/chunk.ChunkedSync` instead of the plain datasync builder. It walks partitions sequentially, supports cursor-based resume across executions, and issues `ContinueAsNew` when the partition list is too large for one workflow history. + +```go +import ( + "github.com/jasoet/go-wf/datasync/chunk" + "github.com/jasoet/pkg/v2/temporal/job" +) + +// The key type must satisfy cmp.Ordered, so date partitions use int64 keys rather than time.Time. +def, err := chunk.NewChunkedSync[OrderRow, OrderRecord, int64]("orders-sync"). + Partitioner(datePartitioner). + Fetcher(orderFetcher). + Mapper(orderMapper). + Sink(orderSink). + WithTracker(cursorStore). + MaxPartitionsPerExecution(50). + ScheduleEvery(6 * time.Hour). + Build() +``` + +`def` is a standard `*job.Definition`: use `def.Register(w)` and `def.Execute(...)` as with any other builder. For the full API see [Chunked Sync Workflows](chunk-workflows.md).
+ ## Running Examples List all available examples: diff --git a/docs/job-definition.md b/docs/job-definition.md new file mode 100644 index 0000000..380a5fd --- /dev/null +++ b/docs/job-definition.md @@ -0,0 +1,316 @@ +# Job Definitions + +The `pkg/v2/temporal/job` package provides `*job.Definition` — a type-focused, +transport-agnostic abstraction for one registered Temporal workflow. Every go-wf +builder produces a `*job.Definition`, and the same type is the basis for a +registry that lets consumer applications expose workflow management (list runs, +trigger by name, attach schedules, cancel, terminate) without hard-coded type +switches. + +## Why This Exists + +Multiple consumer projects wrap the Temporal SDK to expose workflow management. +Each re-implements the same pattern: a `map[string]workflowFn` plus a `switch` +on type name in the trigger handler. The `Definition` and `Registry` abstraction +replaces that pattern with a typed object that owns all per-job operations. + +## The Type + +A `*job.Definition` holds: + +| Field | Purpose | +|---|---| +| `Name` | Logical job identifier; used as workflow ID prefix and for `ListRuns` scoping | +| `TaskQueue` | Temporal task queue name | +| `Description` | Optional human-readable description | +| `Tags` | Optional user-defined tags | +| `Schedule` | Optional `*job.ScheduleSpec` for recurring triggers | + +These fields are read-only after construction. Internal wiring (register, execute, +newInput closures) is set via `Option` functions and is not exported. + +## Temporal Client + +```go +c, err := temporal.NewClient(temporal.DefaultConfig()) +if err != nil { + log.Fatal(err) +} +defer c.Close() +``` + +`temporal.NewClient` returns two values: `(client.Client, error)`. There is no +separate closer — call `c.Close()` directly. + +## Registration and Execution + +```go +// Register workflows and activities on a worker (idempotent). 
+w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) + +// Get a typed zero-value of the workflow input. +input := def.NewInput() + +// Start a workflow run. Workflow ID defaults to "-". +run, err := def.Execute(ctx, c, input) +// run.WorkflowID, run.RunID + +// Attach to an existing run. +handle := def.GetRun(c, workflowID, runID) +``` + +`def.Register(w)` is safe to call multiple times and concurrently. The +builder-supplied registration closure uses `job.RegisterWorkflowOnce` and +`job.RegisterActivityOnce` for deduplication, so multiple Definitions that share +the same underlying workflow type (e.g., multiple container Definitions on one +worker) register that type only once. + +### Execute Options + +```go +run, err := def.Execute(ctx, c, input, + job.WithWorkflowID("my-custom-id"), // override "-" + job.WithTimeout(10*time.Minute), // WorkflowExecutionTimeout + job.WithTaskTimeout(30*time.Second), // WorkflowTaskTimeout + job.WithRetryPolicy(&temporal.RetryPolicy{MaximumAttempts: 1}), + job.WithMemo(map[string]any{"env": "prod"}), +) +``` + +## Per-Run Lifecycle + +All methods accept `wfID` (workflow ID) and `runID` (run ID). Pass `runID = ""` +to target the latest run. + +### Describe + +```go +detail, err := def.Describe(ctx, c, workflowID, runID) +// detail.Status, detail.StartTime, detail.CloseTime, detail.HistoryLength +// detail.Memo, detail.SearchAttributes +``` + +### History + +Returns a bounded activity-event extraction of the workflow history: + +```go +hist, err := def.History(ctx, c, workflowID, runID, job.HistoryOpts{MaxEvents: 200}) +for _, act := range hist.Activities { + fmt.Printf("%s %s attempt=%d duration=%s\n", + act.Name, act.Status, act.Attempt, act.Duration) +} +if hist.Truncated { + fmt.Println("history truncated — increase MaxEvents or reduce range") +} +``` + +### Cancel and Terminate + +```go +// Request graceful cancellation (workflow receives CancellationError). 
+err = def.Cancel(ctx, c, workflowID, runID) + +// Hard-stop immediately. +err = def.Terminate(ctx, c, workflowID, runID, "reason string") +``` + +### Signal and Query + +```go +err = def.Signal(ctx, c, workflowID, runID, "pause", nil) + +result, err := def.Query(ctx, c, workflowID, runID, "status") +``` + +## Per-Job Aggregates + +These methods are scoped to runs whose workflow ID starts with `"-"`. + +### ListRuns + +```go +page, err := def.ListRuns(ctx, c, job.ListOpts{ + Status: []job.Status{job.StatusRunning}, + PageSize: 50, +}) +for _, run := range page.Runs { + fmt.Printf("%s %s %s\n", run.WorkflowID, run.Status, run.StartTime) +} +// Paginate: +if page.NextPageToken != nil { + nextPage, err := def.ListRuns(ctx, c, job.ListOpts{PageToken: page.NextPageToken}) +} +``` + +### Stats + +```go +stats, err := def.Stats(ctx, c, job.StatsOpts{Location: time.Local}) +fmt.Printf("Running: %d CompletedToday: %d FailedToday: %d\n", + stats.Running, stats.CompletedToday, stats.FailedToday) +``` + +## Schedules + +A `*job.ScheduleSpec` describes when a workflow fires automatically. Exactly one +of `Interval`, `Cron`, or `Calendar` must be set. 
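The "exactly one of" rule can be illustrated with a small standalone check. This is only a sketch under stated assumptions: `scheduleSpec` is a local stand-in with simplified field types (calendar rules are reduced to strings), and `validateTrigger` is a hypothetical function, not the validation that `job.New` actually performs.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// scheduleSpec is a local stand-in that keeps only the three trigger fields.
// It is not the real job.ScheduleSpec type.
type scheduleSpec struct {
	Interval time.Duration
	Cron     string
	Calendar []string // placeholder for calendar rules
}

var errTriggerCount = errors.New("exactly one of Interval, Cron, or Calendar must be set")

// validateTrigger counts how many trigger fields are populated and rejects
// any spec where that count is not exactly one.
func validateTrigger(s scheduleSpec) error {
	set := 0
	if s.Interval > 0 {
		set++
	}
	if s.Cron != "" {
		set++
	}
	if len(s.Calendar) > 0 {
		set++
	}
	if set != 1 {
		return fmt.Errorf("%w (got %d)", errTriggerCount, set)
	}
	return nil
}

func main() {
	fmt.Println(validateTrigger(scheduleSpec{Interval: 15 * time.Minute}))
	fmt.Println(validateTrigger(scheduleSpec{}))
	fmt.Println(validateTrigger(scheduleSpec{Interval: time.Hour, Cron: "0 * * * *"}))
}
```

Wrapping a sentinel error with `%w` lets callers test the failure with `errors.Is` while still seeing how many triggers were set.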
+ +```go +type ScheduleSpec struct { + Interval time.Duration // e.g., 15 * time.Minute + Cron string // standard cron expression + Calendar []CalendarSpec // fine-grained calendar rules + + Overlap OverlapPolicy // default: OverlapSkip + Jitter time.Duration + Paused bool + Note string + CatchupWindow time.Duration +} +``` + +`OverlapPolicy` controls what happens when a trigger fires while a previous run is +still in flight: + +| Constant | Behavior | +|---|---| +| `OverlapSkip` | Drop the new trigger (default) | +| `OverlapBufferOne` | Queue one trigger; drop further | +| `OverlapBufferAll` | Queue all triggers | +| `OverlapCancelOther` | Cancel the running execution and start new | +| `OverlapTerminateOther` | Terminate the running execution and start new | +| `OverlapAllowAll` | Allow unlimited parallel runs | + +### Schedule lifecycle methods + +```go +// Create or update the schedule (idempotent). Schedule ID equals def.Name. +err = def.ApplySchedule(ctx, c) + +// Pause and resume. +err = def.PauseSchedule(ctx, c, "maintenance window") +err = def.ResumeSchedule(ctx, c, "maintenance complete") + +// Fire an immediate run outside the normal cadence. +err = def.TriggerSchedule(ctx, c) + +// Remove the schedule permanently. +err = def.DeleteSchedule(ctx, c) + +// Inspect current schedule state. +detail, err := def.DescribeSchedule(ctx, c) +// detail.Paused, detail.NextRunTime, detail.LastRunTime, detail.Spec +``` + +## Registry + +`job.Registry` maps logical job names to Definitions. Use it when a consumer +application needs to operate on jobs by name (e.g., a REST API that triggers +or cancels jobs by name). + +```go +reg := job.NewRegistry(defA, defB, defC) + +// Register all Definitions on a worker (idempotent). +reg.RegisterAll(w) + +// Create or update all schedules. +err = reg.ApplySchedules(ctx, c) + +// Look up by name. +def, ok := reg.Get("orders-sync") + +// Panics if not found — for use in initialization code where absence is a bug. 
+def = reg.MustGet("orders-sync") + +run, err := def.Execute(ctx, c, input) +``` + +`Add` inserts a Definition at runtime: + +```go +err = reg.Add(newDef) // returns ErrDuplicateName on conflict +``` + +`List()` returns all Definitions sorted alphabetically by Name. +`Names()` returns just the names. + +## Building Definitions in go-wf + +Every go-wf builder's `.Build()` returns `(*job.Definition, error)`: + +```go +// DataSync +def, err := builder.NewSyncJobBuilder[Order, OrderRow]("orders"). + WithSource(src).WithMapper(mapper).WithSink(sink). + WithSchedule(15 * time.Minute). + Build() + +// Chunked DataSync (date-range partitions) +def, err := chunk.NewDateChunkedSync[Order, OrderRow]("orders-chunked"). + LookBack(7*24*time.Hour).ChunkSize(24*time.Hour). + Fetcher(fetcher).Mapper(mapper).Sink(sink). + ScheduleEvery(15 * time.Minute). + MaxPartitionsPerExecution(50). + Build() + +// Function workflow +def, err := function.NewFunctionBuilder(). + Name("greet-pipeline"). + Activity(activityFn). + Pipeline(). + Add(&fn.FunctionExecutionInput{Name: "greet"}). + Build() + +// Container workflow +def, err := container.NewWorkflowBuilder(). + Name("nightly-deploy"). + Single(). + Add(deployTemplate). 
+ Build() +``` + +## Constructing a Definition Directly + +Consumers without go-wf builders can construct a Definition manually using +`job.New`: + +```go +def, err := job.New("my-workflow", "my-task-queue", + job.WithRegister(func(w worker.Worker) { + w.RegisterWorkflow(MyWorkflow) + w.RegisterActivity(MyActivity) + }), + job.WithExecute(func(ctx context.Context, c client.Client, opts client.StartWorkflowOptions, in any) (client.WorkflowRun, error) { + return c.ExecuteWorkflow(ctx, opts, MyWorkflow, in) + }), + job.WithNewInput(func() any { return &MyInput{} }), + job.WithSchedule(&job.ScheduleSpec{Interval: time.Hour}), + job.WithDescription("Processes daily orders"), + job.WithTags("orders", "nightly"), +) +``` + +`job.New` validates that `name`, `taskQueue`, and all three required closures +(`WithRegister`, `WithExecute`, `WithNewInput`) are present. It also validates the +`ScheduleSpec` if one is provided. + +`job.RegisterWorkflowOnce` and `job.RegisterActivityOnce` are available for +builder authors who want to write idempotent registration closures: + +```go +job.WithRegister(func(w worker.Worker) { + job.RegisterWorkflowOnce(w, "MyWorkflow", MyWorkflow, workflow.RegisterOptions{Name: "MyWorkflow"}) + job.RegisterActivityOnce(w, "MyActivity", MyActivity, activity.RegisterOptions{Name: "MyActivity"}) +}) +``` + +## See Also + +- [DataSync Workflows](datasync-workflows.md) — `SyncJobBuilder` +- [Chunked Sync Workflows](chunk-workflows.md) — `DateChunkedSync` / `ChunkedSync` +- [Container Workflows](container-workflows.md) — `container.WorkflowBuilder` +- [Function Workflows](function-workflows.md) — `function.WorkflowBuilder` +- `pkg/temporal/job/doc.go` in `jasoet/pkg` — package-level godoc diff --git a/examples/container/README.md b/examples/container/README.md index 82a801e..8ea686a 100644 --- a/examples/container/README.md +++ b/examples/container/README.md @@ -407,21 +407,21 @@ go run -tags example patterns-demo.go **Purpose**: Demonstrates advanced 
builder, template, and source APIs. **Features**: -- `BuildSingle()` for single container execution -- `Build()` auto-select (pipeline or parallel) +- `Build()` returning `*job.Definition` for all execution modes +- `Single()` / `Pipeline()` / `Parallel()` mode selection on `NewWorkflowBuilder()` - `Cleanup()` for cleanup between steps - Constructor options: `WithStopOnError`, `WithParallelMode`, `WithMaxConcurrency`, `WithGlobalAutoRemove` - `ContainerSource` / `NewContainerSource` — wrap payload as WorkflowSource - `AddInput()` for raw ContainerExecutionInput -- `ForEachParam` + `NewParameterizedLoopBuilder` with `BuildParameterizedLoop` +- `ForEachParam` + `NewParameterizedLoopBuilder` with `.Build()` returning `*job.Definition` - `NewGoScript` — Go script template - `NewHTTPWebhook` — webhook notification template - Container options: `WithVolume`, `WithPorts`, `WithLabel`, `WithWaitForLog`, `WithWaitForPort` - Script options: `WithScriptVolume`, `WithScriptPorts` **Examples Included**: -1. **BuildSingle**: Single container via builder with constructor options -2. **Auto-Select Builder**: Build() with ContainerSource and AddInput +1. **Single mode**: Single container via builder with constructor options, `Build()` → `*job.Definition` +2. **Build() with ContainerSource**: `Build()` with ContainerSource and AddInput 3. **Cleanup Pipeline**: Pipeline with Cleanup + rich container options 4. **Parameterized Loop Builder**: ForEachParam and NewParameterizedLoopBuilder 5. 
**Additional Templates**: NewGoScript, NewHTTPWebhook, advanced container/script options diff --git a/examples/datasync/README.md b/examples/datasync/README.md index 1e09571..21c0c38 100644 --- a/examples/datasync/README.md +++ b/examples/datasync/README.md @@ -31,6 +31,7 @@ task example:datasync -- basic.go task example:datasync -- pipeline.go task example:datasync -- parallel.go task example:datasync -- builder.go +task example:datasync -- chunk_basic.go ``` Each example is self-contained: it creates a datasync job with source/mapper/sink, registers it with a Temporal worker, executes the workflow, and prints results. @@ -72,7 +73,15 @@ Concurrent execution of multiple sync jobs. Three independent data sources are s Fluent builder API for constructing sync jobs with custom record mapping. Transforms User records into UserDTO using a RecordMapper. -**Demonstrates:** Builder pattern, custom RecordMapper, typed transformations, SyncJobBuilder API. +**Demonstrates:** Builder pattern, custom RecordMapper, typed transformations, `SyncJobBuilder.Build()` → `*job.Definition`. + +--- + +### 5. Date-Chunked Sync (`chunk_basic.go`) + +Date-partitioned sync using `datasync/chunk/`. Fetches orders per daily partition using `NewDateChunkedSync`, with LookBack and ChunkSize configuration. + +**Demonstrates:** `chunk.NewDateChunkedSync`, `chunk.TimeFetcher`, cursor-based partition walk, `ChunkedSync.Build()` → `*job.Definition`, `def.Register` / `def.Execute` pattern. ## Worker Setup diff --git a/examples/function/README.md b/examples/function/README.md index 1352286..e2f554c 100644 --- a/examples/function/README.md +++ b/examples/function/README.md @@ -180,28 +180,28 @@ go run -tags example loop.go ### 5. Builder API (`builder.go`) -Fluent builder API for composing workflows programmatically. +Fluent builder API for composing workflows programmatically. All builder paths end with `Build()` returning a `*job.Definition`. 
**Demonstrates 4 patterns:** -| # | Pattern | Builder Method | Build Method | -|---|---------|---------------|--------------| -| 1 | ETL pipeline | `AddInput()` | `BuildPipeline()` | -| 2 | Parallel pre-flight | `Parallel(true)`, `MaxConcurrency()`, `FailFast()` | `BuildParallel()` | -| 3 | Reusable components | `Add(FunctionSource)` | `BuildPipeline()` | -| 4 | Dynamic inputs | `Add(WorkflowSourceFunc)` | `BuildParallel()` | +| # | Pattern | Builder Method | Mode | +|---|---------|---------------|------| +| 1 | ETL pipeline | `Add()`, `StopOnError()` | `.Pipeline()` | +| 2 | Parallel pre-flight | `MaxConcurrency()`, `FailFast()` | `.Parallel()` | +| 3 | Single function | `Add()` | `.Single()` | +| 4 | Loop over items | `ForEach(items, template)` | `.Loop()` | **Key builder APIs:** -- `NewWorkflowBuilder(name)` — create builder -- `AddInput(FunctionExecutionInput)` — add input directly -- `Add(WorkflowSource)` — add via source interface +- `NewFunctionBuilder()` — create a builder typed for function execution +- `Name(string)` — set the job name (required; default task queue is `"function-"`) +- `Activity(fn)` — set the activity function to register (required) +- `Pipeline()` / `Parallel()` / `Single()` — select execution mode (required) +- `Add(input)` — append an input to the builder - `StopOnError(bool)` — pipeline error handling -- `Parallel(bool)` / `MaxConcurrency(int)` / `FailFast(bool)` — parallel config -- `BuildPipeline()` / `BuildParallel()` / `BuildSingle()` / `Build()` — build output -- `NewFunctionSource(input)` — wrap input as reusable source -- `WorkflowSourceFunc(fn)` — dynamic input generation at build time -- `ForEach(items, template)` — loop builder shortcut -- `ForEachParam(params, template)` — parameterized loop builder shortcut +- `MaxConcurrency(int)` / `FailFast(bool)` — parallel config +- `Build()` — validate and return `*job.Definition` +- `ForEach(items, template)` — shortcut for loop builder +- `ForEachParam(params, template)` — 
shortcut for parameterized loop builder **Use case:** Programmatic workflow construction, reusable components, dynamic input generation. diff --git a/function/README.md b/function/README.md index 9cc8491..ae7f600 100644 --- a/function/README.md +++ b/function/README.md @@ -20,7 +20,7 @@ Temporal workflow activities for dispatching arbitrary Go functions. Uses a regi ## Quick Example ```go -// Register a handler +// 1. Build a registry and register handlers registry := function.NewRegistry() registry.Register("greet", func(ctx context.Context, input function.FunctionInput) (*function.FunctionOutput, error) { name := input.Args["name"] @@ -29,9 +29,23 @@ registry.Register("greet", func(ctx context.Context, input function.FunctionInpu }, nil }) -// Execute via Temporal -input := function.FunctionExecutionInput{ - Name: "greet", - Args: map[string]string{"name": "World"}, -} +// 2. Build a *job.Definition via the fluent builder +activityFn := activity.NewExecuteFunctionActivity(registry) + +def, err := builder.NewFunctionBuilder(). + Name("greet-job"). + Activity(activityFn). + Single(). + Add(&payload.FunctionExecutionInput{ + Name: "greet", + Args: map[string]string{"name": "World"}, + }). + Build() + +// 3. Register and execute +w := worker.New(c, def.TaskQueue, worker.Options{}) +def.Register(w) +run, err := def.Execute(ctx, c, def.NewInput()) ``` + +See [docs/job-definition.md](../docs/job-definition.md) for the full `*job.Definition` API (lifecycle, scheduling, registry).