Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,109 @@ KDS Java API libraries.
3. Test
> `make test`

## Checkpointer Backends

Go-KCL uses a pluggable `Checkpointer` interface for lease management and progress tracking. You can swap backends via `worker.WithCheckpointer()`.

### DynamoDB (default)

The default backend. No extra setup needed — the worker creates its own DynamoDB client and lease table automatically. The table name defaults to `ApplicationName`.

```go
import (
cfg "github.com/ODudek/go-kcl/clientlibrary/config"
wk "github.com/ODudek/go-kcl/clientlibrary/worker"
)

kclConfig := cfg.NewKinesisClientLibConfig("my-app", "my-stream", "us-east-1", "worker-1")
worker := wk.NewWorker(factory, kclConfig)
```

You can point to a custom DynamoDB endpoint (e.g. LocalStack):

```go
kclConfig.WithDynamoDBEndpoint("http://localhost:4566")
```

Or inject a pre-configured DynamoDB checkpointer:

```go
import chk "github.com/ODudek/go-kcl/clientlibrary/checkpoint"

checkpointer := chk.NewDynamoCheckpoint(kclConfig).WithDynamoDB(dynamoClient)
worker := wk.NewWorker(factory, kclConfig).
WithCheckpointer(checkpointer)
```

### Redis

An alternative backend using Redis for lease management. Useful when you want lower latency, reduced AWS costs, or already run Redis in your infrastructure. Atomic lease operations are implemented via Lua scripts (equivalent to DynamoDB conditional writes).

```go
import (
cfg "github.com/ODudek/go-kcl/clientlibrary/config"
redischk "github.com/ODudek/go-kcl/clientlibrary/checkpoint/redis"
wk "github.com/ODudek/go-kcl/clientlibrary/worker"
)

kclConfig := cfg.NewKinesisClientLibConfig("my-app", "my-stream", "us-east-1", "worker-1")

checkpointer := redischk.NewRedisCheckpoint(kclConfig, redischk.RedisConfig{
Address: "localhost:6379", // host:port (required)
Password: os.Getenv("REDIS_PWD"), // optional
DB: 0, // database number 0-15
TLS: false, // enable TLS
KeyPrefix: "kcl", // key prefix (default: "kcl")
})

worker := wk.NewWorker(factory, kclConfig).
WithCheckpointer(checkpointer)
```

#### Configuration

| Field | Default | Description |
|---|---|---|
| `Address` | *(required)* | `host:port` or URL (`redis://`, `rediss://`) |
| `Password` | `""` | AUTH password (overrides URL password if set) |
| `DB` | `0` | Database number 0-15 (overrides URL db if set) |
| `KeyPrefix` | `"kcl"` | Prefix for all Redis keys |
| `TLS` | `false` | Enable TLS (min TLS 1.2). Auto-enabled by `rediss://` scheme |

#### Multi-tenancy

Multiple go-kcl applications can safely share a single Redis instance. All keys are namespaced using the application's `TableName` (which defaults to `ApplicationName`):

```
kcl:{tableName}:shard:{shardID} — per-shard lease hash
kcl:{tableName}:shards — shard registry set
```

For example, two apps `orders` and `events` produce completely isolated keys:

```
kcl:orders:shard:shardId-000000000001
kcl:events:shard:shardId-000000000001
```

#### Features

- Atomic lease acquisition and renewal via Lua scripts
- Conditional lease owner removal (prevents accidental overwrite)
- Lease stealing support (same as DynamoDB backend)
- Sub-millisecond latency for all operations
- All `Checkpointer` interface methods supported

## Examples

Working examples are available in the [`examples/`](examples/) directory:

| Example | Backend | Description |
|---|---|---|
| [`dynamodb-consumer`](examples/dynamodb-consumer/) | DynamoDB | Basic Kinesis consumer with default DynamoDB checkpointer |
| [`redis-consumer`](examples/redis-consumer/) | Redis | Basic Kinesis consumer with Redis checkpointer |
| [`redis-multitenant`](examples/redis-multitenant/) | Redis | Two applications sharing one Redis instance |

## Documentation

Go-KCL matches exactly the same interface and programming model from original Amazon KCL, the best place for getting reference, tutorial is from Amazon itself:
Expand Down
7 changes: 5 additions & 2 deletions clientlibrary/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ const (
)

type ErrLeaseNotAcquired struct {
cause string
Cause string
}

func (e ErrLeaseNotAcquired) Error() string {
return fmt.Sprintf("lease not acquired: %s", e.cause)
return fmt.Sprintf("lease not acquired: %s", e.Cause)
}

// Checkpointer handles checkpointing when a record has been processed
Expand Down Expand Up @@ -94,3 +94,6 @@ var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard")

// ErrShardNotAssigned is returned by ListActiveWorkers when no AssignedTo is found
var ErrShardNotAssigned = errors.New("AssignedToNotFoundForShard")

// ErrNoLeaseOwner is returned by GetLeaseOwner when no lease owner exists for the shard
var ErrNoLeaseOwner = errors.New("no LeaseOwner in checkpoints table")
4 changes: 0 additions & 4 deletions clientlibrary/checkpoint/dynamodb-checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ const (
NumMaxRetries = 10
)

var (
ErrNoLeaseOwner = errors.New("no LeaseOwner in checkpoints table")
)

// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
type DynamoCheckpoint struct {
log logger.Logger
Expand Down
Loading