Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions INSTRUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go-wf — a Go library providing a generic workflow orchestration core with Dock
**Repository Type:** Library (Go module)
**Module:** `github.com/jasoet/go-wf`
**Key Dependencies:** Temporal SDK, testcontainers-go, pkg/v2, validator/v10, aws-sdk-go-v2
**Convergence Type:** All builders return `*job.Definition` from `github.com/jasoet/pkg/v2/temporal/job`. This is the single type for registration, execution, scheduling, and lifecycle management across all workflow kinds.

## ABSOLUTE RULE — Git Authorship

Expand Down Expand Up @@ -59,7 +60,9 @@ attribute commits to AI. This applies to ALL commits, including those made by to
| `function/workflow/` | Workflow implementations (function, pipeline, parallel, loop, DAG) |
| `datasync/` | Generic data sync core (Source, Sink, Mapper, Job, Runner) |
| `datasync/activity/` | SyncData activity with OTel instrumentation |
| `datasync/builder/` | Fluent builder for Job construction |
| `datasync/builder/` | Fluent builder for Job construction — returns `*job.Definition` |
| `datasync/chunk/` | Partitioned/chunked sync builder (ChunkedSync, DateChunkedSync) — returns `*job.Definition` |
| `datasync/internal/heartbeat/` | Shared heartbeat helpers used by `datasync/activity` and `datasync/chunk` |
| `datasync/payload/` | Temporal payload types (SyncExecutionInput/Output) |
| `datasync/workflow/` | Sync workflow function, registration, scheduling helpers |
| `workflow/otel.go` | Instrumented workflow orchestration wrappers |
Expand Down Expand Up @@ -87,6 +90,7 @@ attribute commits to AI. This applies to ALL commits, including those made by to
| `docs/contributing.md` | Contributing guide |
| `INSTRUCTION.md` | AI context (this file) |
| `README.md` | Human documentation |
| `github.com/jasoet/pkg/v2/temporal/job` | External package — `*job.Definition`, `job.Registry`, `job.ScheduleSpec`. All go-wf builders return `*job.Definition` from this package. |

## Taskfile Commands

Expand Down Expand Up @@ -144,17 +148,17 @@ Multi-layer architecture organized as package-per-feature:
**Container Module (`container/`)** — concrete implementation
- **Activities** wrap `github.com/jasoet/pkg/v2/docker` for container execution
- **Payloads** implement `TaskInput`/`TaskOutput` interfaces with validated structs (`go-playground/validator`)
- **Workflows** register with Temporal workers via `container.RegisterAll(w)`, using generic core for orchestration
- **Builder** provides a fluent API to compose container → pipeline → parallel → DAG; generic builder alternative available
- **Workflows** register with Temporal workers via `container.RegisterAll(w)` (idempotent), using generic core for orchestration
- **Builder** (`container/builder.WorkflowBuilder`) provides a fluent API: `.Name()`, `.Pipeline()/.Parallel()/.Single()`, `.Add()`, `.Build() (*job.Definition, error)`. Default `TaskQueue`: `"container-<name>"`.
- **Templates** (container, script, HTTP) generate payload structs from higher-level config
- **Patterns** are pre-built workflow compositions (CI/CD pipelines, fan-out/fan-in, etc.)

**Function Module (`function/`)** — concrete implementation
- **Registry** maps named Go handler functions (`func(ctx, FunctionInput) (*FunctionOutput, error)`) for dispatch
- **Activity** dispatches to registered handlers via closure over the registry
- **Payloads** implement `TaskInput`/`TaskOutput` interfaces with validated structs
- **Workflows** register with Temporal workers, using generic core for orchestration (pipeline, parallel, loop, DAG)
- **Builder** provides a fluent API to compose function → pipeline → parallel → loop → DAG; generic builder alternative available
- **Workflows** register with Temporal workers via `function.RegisterAll(w, activityFn)` (idempotent), using generic core for orchestration (pipeline, parallel, loop, DAG)
- **Builder** (`function/builder.WorkflowBuilder`) provides a fluent API: `.Name()`, `.TaskQueue()`, `.Activity(fn)`, `.Pipeline()/.Parallel()/.Single()`, `.Build() (*job.Definition, error)`. Default `TaskQueue`: `"function-<name>"`. Loop/parameterized-loop variants available via `LoopBuilder`.
- **Patterns** are pre-built workflow compositions (ETL, fan-out/fan-in, batch processing, CI/CD DAG, etc.)

**DataSync Module (`datasync/`)** — concrete implementation
Expand All @@ -164,8 +168,10 @@ Multi-layer architecture organized as package-per-feature:
- **Runner** provides in-process test execution (no Temporal needed)
- **Activity** runs the Source→Mapper→Sink pipeline with full OTel instrumentation (`go_wf.datasync.*` metrics)
- **Workflow** wraps the activity in a Temporal workflow with scheduling support
- **Builder** provides a fluent API for Job construction
- **Builder** (`datasync/builder.SyncJobBuilder`) provides a fluent API; `Build()` returns `(*job.Definition, error)`
- **Chunk** (`datasync/chunk.ChunkedSync`) partitioned-sync builder for large datasets; walks partitions with cursor-based resume and `ContinueAsNew`; schedule via `.ScheduleEvery()`, `.ScheduleCron()`, or `.ScheduleRaw()`; `Build()` returns `(*job.Definition, error)`
- **Payloads** (`SyncExecutionInput`/`SyncExecutionOutput`) implement `TaskInput`/`TaskOutput` for composition with Pipeline, Parallel, and DAG
- **Internal heartbeat** (`datasync/internal/heartbeat`) shared helpers ensuring consistent heartbeat behavior across `datasync/activity` and `datasync/chunk`

**Observability (`jasoet/pkg/v2/otel`)**
- Activities get full OTel spans + metrics via `Layers.StartService` (container: `go_wf.container.task.*`, function: `go_wf.function.task.*`, datasync: `go_wf.datasync.*`)
Expand Down
152 changes: 99 additions & 53 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,51 @@ Temporal workflow library providing reusable, production-ready workflows for com

- **Container Workflows** - Execute containers (Podman/Docker) with Temporal orchestration
- **Function Workflows** - Execute registered Go functions with Temporal orchestration
- **DataSync Workflows** - Generic Source/Mapper/Sink data synchronization pipelines
- **DataSync Workflows** - Generic Source/Mapper/Sink data synchronization pipelines, including partitioned/chunked sync via `datasync/chunk`
- **Type-Safe Payloads** - Validated input/output structures
- **Unified Builder API** - All builders produce `*job.Definition` (from `github.com/jasoet/pkg/v2/temporal/job`), providing a single type for registration, execution, scheduling, and lifecycle management
- **Production-Ready** - Built-in retries, timeouts, error handling
- **Observable** - Built-in OpenTelemetry instrumentation (traces, logs, metrics) with zero overhead when disabled
- **Comprehensive Testing** - 85%+ coverage with integration tests
- **Full CI/CD** - Automated releases and quality checks

## job.Definition — Unified Workflow Handle

Every builder in go-wf (`container/builder`, `function/builder`, `datasync/builder`, `datasync/chunk`) returns a `*job.Definition` from its `Build()` method. A `*job.Definition` is the single type you need for all per-job operations:

```go
import "github.com/jasoet/pkg/v2/temporal/job"

def, err := container.NewWorkflowBuilder().Name("deploy").Single().Add(myContainer).Build()
if err != nil { log.Fatal(err) }

// Register workflows + activities on a worker (idempotent)
def.Register(w)

// Start a workflow run
run, err := def.Execute(ctx, c, def.NewInput())

// Manage lifecycle
def.Cancel(ctx, c, wfID, runID, "reason")
def.Describe(ctx, c, wfID, runID)
def.ListRuns(ctx, c, opts)

// Schedule management
def.ApplySchedule(ctx, c, opts)
def.PauseSchedule(ctx, c, scheduleID, "reason")
```

A `job.Registry` aggregates multiple Definitions, allowing you to register all jobs at once and dispatch by name:

```go
registry := job.NewRegistry()
registry.Add(ordersJobDef)
registry.Add(usersJobDef)
registry.RegisterAll(w) // registers every Definition on the worker

registry.MustGet("orders").Execute(ctx, c, input)
```

## Documentation

| Guide | Description |
Expand Down Expand Up @@ -49,6 +87,10 @@ Temporal workflows for executing registered Go functions: function registry, pip

Generic data synchronization workflows using a `Source[T] -> Mapper[T,U] -> Sink[U]` pipeline with helpers for record mapping, deduplication, and Temporal scheduling. See [DataSync Workflows](./docs/datasync-workflows.md) for details.

### [datasync/chunk](./datasync/chunk/)

Partitioned sync builder for large datasets. `ChunkedSync` walks a sequence of partitions (e.g., date ranges), runs fetch/map/write per partition with cursor-based resume and `ContinueAsNew` for long partition lists. See [DataSync Workflows](./docs/datasync-workflows.md) for details.

## Observability

Built-in OpenTelemetry instrumentation (traces, logs, metrics) with zero overhead when disabled. See [Observability](./docs/observability.md) for details.
Expand All @@ -71,56 +113,57 @@ import (
"log"

"github.com/jasoet/go-wf/container"
"github.com/jasoet/go-wf/container/builder"
"github.com/jasoet/go-wf/container/payload"
"github.com/jasoet/go-wf/container/workflow"
"github.com/jasoet/pkg/v2/temporal"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// Create Temporal client
c, closer, err := temporal.NewClient(temporal.DefaultConfig())
c, err := temporal.NewClient(temporal.DefaultConfig())
if err != nil {
log.Fatal(err)
}
defer c.Close()
if closer != nil {
defer closer.Close()
}

// Create and start worker
w := worker.New(c, "container-tasks", worker.Options{})
container.RegisterAll(w)
// Build a job Definition (recommended pattern)
def, err := builder.NewWorkflowBuilder().
Name("postgres-setup").
Single().
AddInput(payload.ContainerExecutionInput{
Image: "postgres:16-alpine",
Env: map[string]string{
"POSTGRES_PASSWORD": "test",
},
Ports: []string{"5432:5432"},
AutoRemove: true,
}).
Build()
if err != nil {
log.Fatal(err)
}

go w.Run(nil)
// Register and start worker
w := worker.New(c, def.TaskQueue, worker.Options{})
def.Register(w)
go func() { _ = w.Run(worker.InterruptCh()) }()
defer w.Stop()

// Execute workflow
input := payload.ContainerExecutionInput{
Image: "postgres:16-alpine",
Env: map[string]string{
"POSTGRES_PASSWORD": "test",
},
Ports: []string{"5432:5432"},
AutoRemove: true,
run, err := def.Execute(context.Background(), c, def.NewInput())
if err != nil {
log.Fatal(err)
}

we, _ := c.ExecuteWorkflow(context.Background(),
client.StartWorkflowOptions{
ID: "postgres-setup",
TaskQueue: "container-tasks",
},
workflow.ExecuteContainerWorkflow,
input,
)

var result payload.ContainerExecutionOutput
we.Get(context.Background(), &result)
_ = run.Get(context.Background(), &result)
log.Printf("Container executed: %s", result.ContainerID)
}
```

For callers that register workflows manually, `container.RegisterAll(w)` is also available (idempotent — safe to call multiple times).

### Function Workflow

```go
Expand All @@ -132,57 +175,57 @@ import (

fn "github.com/jasoet/go-wf/function"
fnactivity "github.com/jasoet/go-wf/function/activity"
fnbuilder "github.com/jasoet/go-wf/function/builder"
"github.com/jasoet/go-wf/function/payload"
"github.com/jasoet/go-wf/function/workflow"
"github.com/jasoet/pkg/v2/temporal"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// Create Temporal client
c, closer, err := temporal.NewClient(temporal.DefaultConfig())
c, err := temporal.NewClient(temporal.DefaultConfig())
if err != nil {
log.Fatal(err)
}
defer c.Close()
if closer != nil {
defer closer.Close()
}

// Create function registry and register handlers
// Register a handler
registry := fn.NewRegistry()
registry.Register("greet", func(ctx context.Context, input fn.FunctionInput) (*fn.FunctionOutput, error) {
_ = registry.Register("greet", func(ctx context.Context, input fn.FunctionInput) (*fn.FunctionOutput, error) {
return &fn.FunctionOutput{
Result: map[string]string{"greeting": "Hello, " + input.Args["name"] + "!"},
}, nil
})

// Create and start worker
w := worker.New(c, "function-tasks", worker.Options{})
fn.RegisterWorkflows(w)
fn.RegisterActivity(w, fnactivity.NewExecuteFunctionActivity(registry))
// Build a job Definition
def, err := fnbuilder.NewWorkflowBuilder[*payload.FunctionExecutionInput, payload.FunctionExecutionOutput]().
Name("greet-job").
Single().
Activity(fnactivity.NewExecuteFunctionActivity(registry)).
Build()
if err != nil {
log.Fatal(err)
}

go w.Run(nil)
// Register and start worker
w := worker.New(c, def.TaskQueue, worker.Options{})
def.Register(w)
go func() { _ = w.Run(worker.InterruptCh()) }()
defer w.Stop()

// Execute workflow
input := payload.FunctionExecutionInput{
Name: "greet",
Args: map[string]string{"name": "Temporal"},
}

we, _ := c.ExecuteWorkflow(context.Background(),
client.StartWorkflowOptions{
ID: "greet-example",
TaskQueue: "function-tasks",
run, err := def.Execute(context.Background(), c,
&payload.FunctionExecutionInput{
Name: "greet",
Args: map[string]string{"name": "Temporal"},
},
workflow.ExecuteFunctionWorkflow,
input,
)
if err != nil {
log.Fatal(err)
}

var result payload.FunctionExecutionOutput
we.Get(context.Background(), &result)
_ = run.Get(context.Background(), &result)
log.Printf("Result: %v", result.Result)
}
```
Expand All @@ -209,6 +252,9 @@ go-wf/
├── datasync/ # Generic data sync (Source → Mapper → Sink)
│ ├── activity/ # SyncData activity with OTel
│ ├── builder/ # Fluent builder for Job construction
│ ├── chunk/ # Partitioned/chunked sync builder (ChunkedSync, DateChunkedSync)
│ ├── internal/
│ │ └── heartbeat/ # Shared heartbeat helpers (used by activity and chunk)
│ ├── payload/ # Temporal payload types
│ └── workflow/ # Workflow function and registration
├── examples/container/ # Container examples (see [README](./examples/container/README.md))
Expand Down
34 changes: 31 additions & 3 deletions container/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,48 @@ Temporal workflows for executing Docker containers with advanced orchestration p

## Quick Example

### Builder path (preferred)

```go
def, err := builder.NewWorkflowBuilder().
Name("pg-start").
Single().
AddInput(payload.ContainerExecutionInput{
Image: "postgres:16-alpine",
Env: map[string]string{"POSTGRES_PASSWORD": "test"},
Ports: []string{"5432:5432"},
WaitStrategy: payload.WaitStrategyConfig{
Type: "log",
LogMessage: "ready to accept connections",
},
AutoRemove: true,
}).
Build()

// def is a *job.Definition — register it with a worker and execute it.
w := worker.New(c, def.TaskQueue, worker.Options{})
def.Register(w)
run, err := def.Execute(ctx, c, def.NewInput())
```

See [docs/job-definition.md](../docs/job-definition.md) for the full `*job.Definition` API (lifecycle, scheduling, registry).

### Low-level path

```go
input := container.ContainerExecutionInput{
input := payload.ContainerExecutionInput{
Image: "postgres:16-alpine",
Env: map[string]string{"POSTGRES_PASSWORD": "test"},
Ports: []string{"5432:5432"},
WaitStrategy: container.WaitStrategyConfig{
WaitStrategy: payload.WaitStrategyConfig{
Type: "log",
LogMessage: "ready to accept connections",
},
AutoRemove: true,
}

we, _ := c.ExecuteWorkflow(ctx,
client.StartWorkflowOptions{ID: "pg", TaskQueue: "container-tasks"},
client.StartWorkflowOptions{ID: "pg", TaskQueue: "container-pg-start"},
container.ExecuteContainerWorkflow, input)
```

Expand Down
Loading