Skip to content

Anshul439/Orqestra

Repository files navigation

Orqestra

A distributed job orchestrator built in Go. The server coordinates job assignment across distributed worker processes via bidirectional gRPC streaming, with Redis-backed queuing and Postgres persistence.

Features

  • gRPC API for job submission, inspection, listing, and cancellation
  • CLI client (cmd/cli) for submitting, inspecting, listing, and cancelling jobs
  • Distributed workers (cmd/worker) — separate processes communicating with the server via bidirectional gRPC streams
  • Real command execution — workers run shell commands via exec.Command
  • Sequential workflow engine — chain multiple commands into named workflows
  • Exponential backoff retry with configurable max retries
  • Delayed job scheduling via Redis sorted sets
  • Crash recovery — jobs interrupted by worker or server failures are automatically re-queued on restart
  • Job cancellation — cancel pending or running jobs via CLI
  • Redis-backed queue with reliable delivery (BRPOPLPUSH pattern)
  • Transactional outbox pattern — job submissions are crash-safe; a background relay ensures jobs are eventually delivered to Redis
  • Postgres-backed job persistence
  • Docker Compose setup — one command to run the full stack
  • Structured logging (log/slog)
  • Graceful shutdown

Architecture

cmd/cli  ──gRPC──►  cmd/server  ◄──gRPC stream──  cmd/worker
                        ├── internal/server     (gRPC handlers + workflow engine)
                        ├── internal/workflow   (workflow registry)
                        ├── internal/outbox     (relay: Postgres → Redis)
                        ├── internal/queue      (Redis queue)
                        └── internal/db         (Postgres)

The server and workers are separate processes. Workers connect to the server over a persistent bidirectional stream, receive job assignments, execute them, and report results back.

Reliability

Postgres is the source of truth. Jobs are written to Postgres and an outbox table atomically, then asynchronously relayed to Redis by a background goroutine. This prevents jobs from being lost if the server crashes between the database write and the queue enqueue.

Requirements

  • Go 1.26+
  • PostgreSQL
  • Redis
  • Task
  • golang-migrate
  • protoc + Go plugins (only needed to regenerate proto)

Setup

The recommended way to run the full stack is via Docker (see Docker below).

For local development without Docker:

# Create database
createdb -U postgres orchestrator

# Copy env config
cp .env.example .env

# Apply migrations
task migrate:up

Configuration

Variable Default Description
DB_URL Postgres connection string
WORKER_COUNT Number of concurrent workers
REDIS_ADDR localhost:6379 Redis address
REDIS_PASSWORD Redis password (leave blank if none)
REDIS_DB 0 Redis DB index
REDIS_QUEUE_NAME jobs Redis key prefix for queues
GRPC_ADDR :50051 gRPC server listen address

Docker

The easiest way to run the full stack. No local Postgres or Redis needed.

# Build images and start everything (postgres, redis, server, worker)
task docker:up

# Follow logs
task docker:logs

# Stop and remove containers
task docker:down

The server's gRPC port is exposed on localhost:50051, so you can still use the CLI from your host machine:

go run ./cmd/cli submit --type=shell --payload='{"command":"echo hello"}'
go run ./cmd/cli list

Postgres and Redis are only accessible within the Docker network. To inspect them directly:

docker compose exec postgres psql -U postgres orchestrator
docker compose exec redis redis-cli

Migrations

Create a new migration:

task migrate:create NAME=<name>

Apply all pending migrations:

task migrate:up

Roll back the most recent migration:

task migrate:down

Check the current migration version:

task migrate:version

Running

The server and workers are separate processes. Workers execute shell commands on the machine they run on. Start them in separate terminals:

# Terminal 1 — start the gRPC server
task server

# Terminal 2 — start the workers (connects to server via gRPC stream)
task worker

WORKER_COUNT in .env controls how many concurrent worker goroutines the worker process spawns.

For hot reload during development:

task dev:server   # auto-restarts server on file change
task dev:worker   # auto-restarts worker on file change

Testing

The test suite covers three tiers. Integration and E2E tests require a local Postgres and Redis instance.

# Run the full suite (sequential — integration tests share a DB)
task test

# Unit tests only — no Postgres or Redis required
task test:unit

# DB integration tests only
task test:integration

# End-to-end tests only
task test:e2e
Layer Location What's tested
Unit internal/workflow, internal/server YAML parsing, registry, exponential backoff
DB integration internal/db Job CRUD, outbox lifecycle, crash recovery (ResetRunningJobs), workflow step dispatch
Relay internal/outbox Happy path, Redis failure (entry stays unprocessed), skip already-processed
E2E e2e/ Job lifecycle (submit → relay → worker → completed), workflow abort on step failure, retry exhaustion

Unit tests always run offline. Integration and E2E tests require local Postgres and Redis and are skipped automatically if the dependencies are unavailable.

CLI Usage

# Start the server
task server

# Submit a shell command job
go run ./cmd/cli submit --type=shell --payload='{"command":"echo hello world"}'

# Submit with retries
go run ./cmd/cli submit --type=shell --payload='{"command":"go test ./..."}' --retries=3

# Check job status
go run ./cmd/cli status <job-id>

# List all jobs
go run ./cmd/cli list

# List jobs filtered by status
go run ./cmd/cli list --status=pending
go run ./cmd/cli list --status=running
go run ./cmd/cli list --status=completed
go run ./cmd/cli list --status=failed

# Cancel a job
go run ./cmd/cli cancel <job-id>

Workflows

Workflows are named sequences of shell commands executed in order. If any step fails, the workflow stops and is marked failed.

Drop .yaml files into the workflows/ directory and restart the server — they are loaded automatically.

# List loaded workflows
go run ./cmd/cli workflow list

# Trigger a workflow by name
go run ./cmd/cli workflow trigger <name>

# Check run status
go run ./cmd/cli workflow status <run-id>

Workflow file format:

name: ci
steps:
  - command: go test ./...
  - command: go build ./...

Example use cases: CI pipelines, database backups, Docker deployments, scheduled maintenance tasks, sequential data processing.

Example workflows are included in workflows/. Edit or delete them and add your own.

Using Docker? The workflows/ directory is volume-mounted into the server container. Add a workflow file and run docker compose restart server — no rebuild needed.

The CLI reads GRPC_ADDR too. If it is set to a listen-style value like :50051, the CLI treats it as localhost:50051.

Regenerating Proto

If you modify proto/orchestrator.proto, regenerate Go code with:

task proto

Requires protoc-gen-go and protoc-gen-go-grpc in your $PATH:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

About

Distributed job orchestrator with retries, persistence, and crash recovery

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors