13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.0] - 2026-02-11

### Added

- Functional options pattern for Prometheus `MonitoringService` (`NewMonitoringServiceWithOptions`)
- `WithRegistry` option to use an external `*prometheus.Registry` instead of the global default
- `WithRegisterer` option for lower-level registerer injection
- `WithListenAddress`, `WithRegion`, `WithLogger` options
- Graceful HTTP server shutdown in `MonitoringService.Shutdown()`
- HTTP server timeouts (read, idle) for Slowloris protection
- Tolerance for duplicate metric registration (`AlreadyRegisteredError`)
- Prometheus metrics example (`examples/prometheus-metrics/`)

## [0.2.1] - 2026-02-11

### Fixed
48 changes: 48 additions & 0 deletions README.md
@@ -124,6 +124,53 @@ kcl:events:shard:shardId-000000000001
- Sub-millisecond latency for all operations
- All `Checkpointer` interface methods supported

## Prometheus Metrics

Go-KCL ships with a Prometheus `MonitoringService` that exposes consumer metrics: records processed, bytes processed, milliseconds behind latest, leases held, lease renewals, and the durations of get-records and process-records calls.

### Standalone mode (default)

KCL registers metrics on the global Prometheus registry and starts its own HTTP server:

```go
import prommetrics "github.com/ODudek/go-kcl/clientlibrary/metrics/prometheus"

metricsService := prommetrics.NewMonitoringService(":2112", "us-east-1", log)
kclConfig.WithMonitoringService(metricsService)
```

### External registry

When your application already exposes a Prometheus `/metrics` endpoint, pass your own registry. KCL will register its collectors there and will **not** start a second HTTP server:

```go
import (
prom "github.com/prometheus/client_golang/prometheus"
prommetrics "github.com/ODudek/go-kcl/clientlibrary/metrics/prometheus"
)

registry := prom.NewRegistry()

metricsService := prommetrics.NewMonitoringServiceWithOptions(
prommetrics.WithRegistry(registry),
prommetrics.WithRegion("us-east-1"),
prommetrics.WithLogger(log),
)
kclConfig.WithMonitoringService(metricsService)

// Expose `registry` through your own HTTP handler.
```

### Available options

| Option | Description |
|---|---|
| `WithListenAddress(addr)` | Address for the standalone metrics server (default `:8080`) |
| `WithRegion(region)` | AWS region label |
| `WithLogger(l)` | Custom logger (defaults to Logrus standard logger) |
| `WithRegistry(reg)` | Use a custom `*prometheus.Registry`; disables the built-in server |
| `WithRegisterer(r)` | Use a custom `prometheus.Registerer`; disables the built-in server |
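
For readers unfamiliar with the functional options pattern behind the table above, here is a self-contained sketch of how such options can compose. It is an illustration only and does not reproduce Go-KCL's internals: the `settings` struct, `newSettings`, and `WithExternalRegistry` (a stand-in for `WithRegistry`/`WithRegisterer`) are hypothetical names. The `:8080` default is taken from the table.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the service's internal configuration.
type settings struct {
	listenAddress string
	region        string
	startServer   bool
}

// Option mutates settings; each WithXxx function returns one.
type Option func(*settings)

// WithListenAddress overrides the address of the standalone metrics server.
func WithListenAddress(addr string) Option {
	return func(s *settings) { s.listenAddress = addr }
}

// WithRegion sets the AWS region label.
func WithRegion(region string) Option {
	return func(s *settings) { s.region = region }
}

// WithExternalRegistry is a hypothetical stand-in for WithRegistry and
// WithRegisterer: supplying an external registry disables the built-in server.
func WithExternalRegistry() Option {
	return func(s *settings) { s.startServer = false }
}

// newSettings applies defaults first, then each option in order, so later
// options win over earlier ones.
func newSettings(opts ...Option) *settings {
	s := &settings{listenAddress: ":8080", startServer: true}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(WithRegion("us-east-1"), WithExternalRegistry())
	fmt.Printf("%+v\n", *s)
}
```

The appeal of this design is that new options can be added without changing the constructor's signature, and callers only mention the settings they want to override.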

## Examples

Working examples are available in the [`examples/`](examples/) directory:
@@ -133,6 +180,7 @@ Working examples are available in the [`examples/`](examples/) directory:
| [`dynamodb-consumer`](examples/dynamodb-consumer/) | DynamoDB | Basic Kinesis consumer with default DynamoDB checkpointer |
| [`redis-consumer`](examples/redis-consumer/) | Redis | Basic Kinesis consumer with Redis checkpointer |
| [`redis-multitenant`](examples/redis-multitenant/) | Redis | Two applications sharing one Redis instance |
| [`prometheus-metrics`](examples/prometheus-metrics/) | Prometheus | Consumer with external Prometheus registry |

## Documentation

61 changes: 36 additions & 25 deletions clientlibrary/checkpoint/checkpointer.go
@@ -17,16 +17,15 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

// Package checkpoint
// The implementation is derived from https://github.com/patrobinson/gokini
//
// Copyright 2018 Patrick robinson.
// Package checkpoint provides interfaces and implementations for managing Kinesis
// shard leases and checkpoint tracking across distributed workers.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
// The Checkpointer interface abstracts the underlying storage backend (DynamoDB, Redis)
// for persisting shard lease ownership, sequence number checkpoints, and lease
// claim requests. Implementations provide atomic conditional updates for consistency
// in a multi-worker environment.
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// The implementation is derived from https://github.com/patrobinson/gokini
package checkpoint

import (
@@ -37,20 +36,30 @@ import (
)

const (
	LeaseKeyKey = "ShardID"
	LeaseOwnerKey = "AssignedTo"
	LeaseTimeoutKey = "LeaseTimeout"
	// LeaseKeyKey is the field name for the shard ID (primary key).
	LeaseKeyKey = "ShardID"
	// LeaseOwnerKey is the field name for the worker that owns the lease.
	LeaseOwnerKey = "AssignedTo"
	// LeaseTimeoutKey is the field name for the lease expiration time.
	LeaseTimeoutKey = "LeaseTimeout"
	// SequenceNumberKey is the field name for the last checkpointed sequence number.
	SequenceNumberKey = "Checkpoint"
	ParentShardIdKey = "ParentShardId"
	ClaimRequestKey = "ClaimRequest"
	// ParentShardIdKey is the field name for the parent shard ID (resharding).
	ParentShardIdKey = "ParentShardId"
	// ClaimRequestKey is the field name for the worker claiming the shard (lease stealing).
	ClaimRequestKey = "ClaimRequest"

	// ShardEnd We've completely processed all records in this shard.
	// ShardEnd is the sentinel checkpoint value indicating a shard has been completely
	// processed and all records delivered.
	ShardEnd = "SHARD_END"

	// ErrShardClaimed is returned when shard is claimed
	// ErrShardClaimed is the error message returned when a lease acquisition fails
	// because another worker has an active claim on the shard.
	ErrShardClaimed = "shard is already claimed by another node"
)

// ErrLeaseNotAcquired is returned when a worker cannot acquire a lease on a shard,
// typically because another worker holds an active lease or a claim request is in progress.
type ErrLeaseNotAcquired struct {
	Cause string
}
@@ -59,33 +68,35 @@ func (e ErrLeaseNotAcquired) Error() string {
	return fmt.Sprintf("lease not acquired: %s", e.Cause)
}

// Checkpointer handles checkpointing when a record has been processed
// Checkpointer manages shard lease acquisition, renewal, checkpointing, and lease stealing.
// Implementations must provide atomic conditional updates to ensure consistency
// across multiple concurrent workers.
type Checkpointer interface {
	// Init initialises the Checkpoint
	// Init establishes a connection to the backend store and creates the lease table if needed.
	Init() error

	// GetLease attempts to gain a lock on the given shard
	// GetLease attempts to acquire or renew a lease on the given shard for the specified worker.
	GetLease(*par.ShardStatus, string) error

	// CheckpointSequence writes a checkpoint at the designated sequence ID
	// CheckpointSequence persists the current checkpoint sequence number for the shard.
	CheckpointSequence(*par.ShardStatus) error

	// FetchCheckpoint retrieves the checkpoint for the given shard
	// FetchCheckpoint retrieves the stored checkpoint, lease owner, and lease timeout for the shard.
	FetchCheckpoint(*par.ShardStatus) error

	// RemoveLeaseInfo to remove lease info for shard entry because the shard no longer exists
	// RemoveLeaseInfo removes all lease data for a shard that no longer exists in Kinesis.
	RemoveLeaseInfo(string) error

	// RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment
	// RemoveLeaseOwner clears the lease owner for a shard, making it available for reassignment.
	RemoveLeaseOwner(string) error

	// GetLeaseOwner to get current owner of lease for shard
	// GetLeaseOwner returns the current lease owner for the specified shard.
	GetLeaseOwner(string) (string, error)

	// ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods)
	// ListActiveWorkers returns a map of worker IDs to their assigned shards (used for rebalancing).
	ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)

	// ClaimShard claims a shard for stealing
	// ClaimShard places a claim request on a shard to signal a steal attempt.
	ClaimShard(*par.ShardStatus, string) error
}

22 changes: 9 additions & 13 deletions clientlibrary/checkpoint/dynamodb-checkpointer.go
@@ -17,16 +17,6 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

// Package checkpoint
// The implementation is derived from https://github.com/patrobinson/gokini
//
// Copyright 2018 Patrick robinson.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
package checkpoint

import (
@@ -51,7 +41,9 @@ const (
NumMaxRetries = 10
)

// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
// DynamoCheckpoint implements the Checkpointer interface using AWS DynamoDB as the backend.
// It manages lease acquisition via conditional PutItem operations, checkpoint persistence,
// and optional lease stealing via claim requests.
type DynamoCheckpoint struct {
	log       logger.Logger
	TableName string
@@ -65,6 +57,8 @@ type DynamoCheckpoint struct {
	lastLeaseSync time.Time
}

// NewDynamoCheckpoint creates a DynamoDB-backed Checkpointer from the given configuration.
// Call Init to establish the connection and create the lease table if needed.
func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *DynamoCheckpoint {
	checkpointer := &DynamoCheckpoint{
		log: kclConfig.Logger,
@@ -79,13 +73,15 @@ func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *Dynam
	return checkpointer
}

// WithDynamoDB is used to provide DynamoDB service
// WithDynamoDB sets a custom DynamoDB client (useful for testing with mocks).
// Returns the receiver for method chaining.
func (checkpointer *DynamoCheckpoint) WithDynamoDB(svc DynamoDBAPI) *DynamoCheckpoint {
	checkpointer.svc = svc
	return checkpointer
}

// Init initialises the DynamoDB Checkpoint
// Init establishes a connection to DynamoDB and ensures the lease table exists.
// If a custom DynamoDB service was set via WithDynamoDB, that service is used.
func (checkpointer *DynamoCheckpoint) Init() error {
	checkpointer.log.Infof("Creating DynamoDB session")

24 changes: 7 additions & 17 deletions clientlibrary/config/config.go
@@ -17,23 +17,11 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

// Package config
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
/*
* Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

// Package config provides configuration types and builder methods for the Kinesis Client Library.
//
// The main type is KinesisClientLibConfiguration, which holds all settings for a KCL Worker.
// Use one of the NewKinesisClientLibConfig* constructors to create a configuration with defaults,
// then customize with the fluent WithXxx methods.
package config

import (
@@ -307,6 +295,8 @@ var positionMap = map[InitialPositionInStream]*string{
	AT_TIMESTAMP: aws.String("AT_TIMESTAMP"),
}

// InitalPositionInStreamToShardIteratorType converts an InitialPositionInStream value
// to the corresponding Kinesis ShardIteratorType string ("LATEST", "TRIM_HORIZON", or "AT_TIMESTAMP").
func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string {
	return positionMap[pos]
}