From 1a2acc90b45af89001e51b88813906e3371e1bd2 Mon Sep 17 00:00:00 2001 From: Aleksander Date: Wed, 11 Feb 2026 16:07:17 +0100 Subject: [PATCH 1/3] Add Prometheus metrics service with external registry support - Implement functional options for Prometheus MonitoringService - Support custom registry/registerer to avoid starting a second HTTP server - Document Prometheus integration and usage in README - Add example: consumer with external Prometheus registry - Add comprehensive unit tests for Prometheus metrics service - Update go.mod to require github.com/prometheus/client_model v0.2.0 --- README.md | 48 +++ clientlibrary/metrics/prometheus/options.go | 77 ++++ .../metrics/prometheus/prometheus.go | 97 ++++- .../metrics/prometheus/prometheus_test.go | 336 ++++++++++++++++++ examples/prometheus-metrics/main.go | 123 +++++++ go.mod | 2 +- 6 files changed, 666 insertions(+), 17 deletions(-) create mode 100644 clientlibrary/metrics/prometheus/options.go create mode 100644 clientlibrary/metrics/prometheus/prometheus_test.go create mode 100644 examples/prometheus-metrics/main.go diff --git a/README.md b/README.md index 2767757..b4fc507 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,53 @@ kcl:events:shard:shardId-000000000001 - Sub-millisecond latency for all operations - All `Checkpointer` interface methods supported +## Prometheus Metrics + +Go-KCL ships with a Prometheus `MonitoringService` that exposes consumer metrics (records processed, bytes processed, millis behind latest, leases held, lease renewals, get/process records duration). 
+ +### Standalone mode (default) + +KCL registers metrics on the global Prometheus registry and starts its own HTTP server: + +```go +import prommetrics "github.com/ODudek/go-kcl/clientlibrary/metrics/prometheus" + +metricsService := prommetrics.NewMonitoringService(":2112", "us-east-1", log) +kclConfig.WithMonitoringService(metricsService) +``` + +### External registry + +When your application already exposes a Prometheus `/metrics` endpoint, pass your own registry. KCL will register its collectors there and will **not** start a second HTTP server: + +```go +import ( + prom "github.com/prometheus/client_golang/prometheus" + prommetrics "github.com/ODudek/go-kcl/clientlibrary/metrics/prometheus" +) + +registry := prom.NewRegistry() + +metricsService := prommetrics.NewMonitoringServiceWithOptions( + prommetrics.WithRegistry(registry), + prommetrics.WithRegion("us-east-1"), + prommetrics.WithLogger(log), +) +kclConfig.WithMonitoringService(metricsService) + +// Expose `registry` through your own HTTP handler. 
+``` + +### Available options + +| Option | Description | +|---|---| +| `WithListenAddress(addr)` | Address for the standalone metrics server (default `:8080`) | +| `WithRegion(region)` | AWS region label | +| `WithLogger(l)` | Custom logger (defaults to Logrus standard logger) | +| `WithRegistry(reg)` | Use a custom `*prometheus.Registry`; disables the built-in server | +| `WithRegisterer(r)` | Use a custom `prometheus.Registerer`; disables the built-in server | + ## Examples Working examples are available in the [`examples/`](examples/) directory: @@ -133,6 +180,7 @@ Working examples are available in the [`examples/`](examples/) directory: | [`dynamodb-consumer`](examples/dynamodb-consumer/) | DynamoDB | Basic Kinesis consumer with default DynamoDB checkpointer | | [`redis-consumer`](examples/redis-consumer/) | Redis | Basic Kinesis consumer with Redis checkpointer | | [`redis-multitenant`](examples/redis-multitenant/) | Redis | Two applications sharing one Redis instance | +| [`prometheus-metrics`](examples/prometheus-metrics/) | Prometheus | Consumer with external Prometheus registry | ## Documentation diff --git a/clientlibrary/metrics/prometheus/options.go b/clientlibrary/metrics/prometheus/options.go new file mode 100644 index 0000000..f763601 --- /dev/null +++ b/clientlibrary/metrics/prometheus/options.go @@ -0,0 +1,77 @@ +package prometheus + +import ( + prom "github.com/prometheus/client_golang/prometheus" + + "github.com/ODudek/go-kcl/logger" +) + +// Option configures MonitoringService via the functional options pattern. 
+type Option func(*config) + +type config struct { + listenAddress string + region string + logger logger.Logger + registerer prom.Registerer + gatherer prom.Gatherer + startServer bool +} + +func defaultConfig() config { + return config{ + listenAddress: ":8080", + logger: logger.GetDefaultLogger(), + registerer: prom.DefaultRegisterer, + gatherer: prom.DefaultGatherer, + startServer: true, + } +} + +// WithListenAddress sets the address for the standalone metrics HTTP server. +func WithListenAddress(addr string) Option { + return func(c *config) { + c.listenAddress = addr + } +} + +// WithRegion sets the AWS region label. +func WithRegion(region string) Option { + return func(c *config) { + c.region = region + } +} + +// WithLogger sets a custom logger. +func WithLogger(l logger.Logger) Option { + return func(c *config) { + c.logger = l + } +} + +// WithRegistry configures the service to use the given registry instead of +// the global default. When set, no standalone HTTP server is started — +// the caller is responsible for exposing the registry. +func WithRegistry(reg *prom.Registry) Option { + return func(c *config) { + if reg == nil { + return + } + c.registerer = reg + c.gatherer = reg + c.startServer = false + } +} + +// WithRegisterer allows passing a lower-level prom.Registerer (e.g. a +// wrapped or prefixed registerer). When used alone (without WithRegistry) +// the gatherer stays at the default and no server is started. 
+func WithRegisterer(r prom.Registerer) Option { + return func(c *config) { + if r == nil { + return + } + c.registerer = r + c.startServer = false + } +} diff --git a/clientlibrary/metrics/prometheus/prometheus.go b/clientlibrary/metrics/prometheus/prometheus.go index b66f378..00418f5 100644 --- a/clientlibrary/metrics/prometheus/prometheus.go +++ b/clientlibrary/metrics/prometheus/prometheus.go @@ -30,7 +30,11 @@ package prometheus import ( + "context" + "errors" + "fmt" "net/http" + "time" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -38,8 +42,14 @@ import ( "github.com/ODudek/go-kcl/logger" ) -// MonitoringService publishes kcl metrics to Prometheus. -// It might be trick if the service onboarding with KCL already uses Prometheus. +// MonitoringService publishes KCL metrics to Prometheus. +// +// Two modes of operation: +// - Standalone (default / backward-compatible): registers metrics on the +// global registry and starts its own HTTP server. +// - External registry: the caller provides a *prom.Registry (or +// prom.Registerer); no HTTP server is started and the caller is +// responsible for exposing metrics. type MonitoringService struct { listenAddress string namespace string @@ -48,6 +58,11 @@ type MonitoringService struct { region string logger logger.Logger + registerer prom.Registerer + gatherer prom.Gatherer + startServer bool + server *http.Server + processedRecords *prom.CounterVec processedBytes *prom.CounterVec behindLatestMillis *prom.GaugeVec @@ -57,12 +72,35 @@ type MonitoringService struct { processRecordsTime *prom.HistogramVec } -// NewMonitoringService returns a Monitoring service publishing metrics to Prometheus. 
-func NewMonitoringService(listenAddress, region string, logger logger.Logger) *MonitoringService { +// NewMonitoringService returns a MonitoringService that registers metrics on +// the global Prometheus registry and starts its own HTTP server on +// listenAddress. This preserves the original constructor signature for +// backward compatibility. +func NewMonitoringService(listenAddress, region string, log logger.Logger) *MonitoringService { + return NewMonitoringServiceWithOptions( + WithListenAddress(listenAddress), + WithRegion(region), + WithLogger(log), + ) +} + +// NewMonitoringServiceWithOptions creates a MonitoringService configured via +// functional options. When no WithRegistry / WithRegisterer option is +// supplied the service behaves identically to NewMonitoringService (global +// registry, own HTTP server). +func NewMonitoringServiceWithOptions(opts ...Option) *MonitoringService { + cfg := defaultConfig() + for _, o := range opts { + o(&cfg) + } + return &MonitoringService{ - listenAddress: listenAddress, - region: region, - logger: logger, + listenAddress: cfg.listenAddress, + region: cfg.region, + logger: cfg.logger, + registerer: cfg.registerer, + gatherer: cfg.gatherer, + startServer: cfg.startServer, } } @@ -100,7 +138,7 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error { Help: "The time taken to process records", }, []string{"kinesisStream", "shard"}) - metrics := []prom.Collector{ + collectors := []prom.Collector{ p.processedBytes, p.processedRecords, p.behindLatestMillis, @@ -109,10 +147,13 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error { p.getRecordsTime, p.processRecordsTime, } - for _, metric := range metrics { - err := prom.Register(metric) - if err != nil { - return err + for _, c := range collectors { + if err := p.registerer.Register(c); err != nil { + var are prom.AlreadyRegisteredError + if errors.As(err, &are) { + continue + } + return fmt.Errorf("registering collector: 
%w", err) } } @@ -120,11 +161,24 @@ func (p *MonitoringService) Init(appName, streamName, workerID string) error { } func (p *MonitoringService) Start() error { - http.Handle("/metrics", promhttp.Handler()) + if !p.startServer { + return nil + } + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(p.gatherer, promhttp.HandlerOpts{})) + + p.server = &http.Server{ + Addr: p.listenAddress, + Handler: mux, + ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + IdleTimeout: 120 * time.Second, + } + go func() { p.logger.Infof("Starting Prometheus listener on %s", p.listenAddress) - err := http.ListenAndServe(p.listenAddress, nil) - if err != nil { + if err := p.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { p.logger.Errorf("Error starting Prometheus metrics endpoint. %+v", err) } p.logger.Infof("Stopped metrics server") @@ -133,7 +187,18 @@ func (p *MonitoringService) Start() error { return nil } -func (p *MonitoringService) Shutdown() {} +func (p *MonitoringService) Shutdown() { + if p.server == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := p.server.Shutdown(ctx); err != nil { + p.logger.Errorf("Error shutting down Prometheus metrics server: %+v", err) + } +} func (p *MonitoringService) IncrRecordsProcessed(shard string, count int) { p.processedRecords.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Add(float64(count)) diff --git a/clientlibrary/metrics/prometheus/prometheus_test.go b/clientlibrary/metrics/prometheus/prometheus_test.go new file mode 100644 index 0000000..9c39884 --- /dev/null +++ b/clientlibrary/metrics/prometheus/prometheus_test.go @@ -0,0 +1,336 @@ +package prometheus + +import ( + "fmt" + "net" + "net/http" + "testing" + "time" + + prom "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + 
"github.com/stretchr/testify/require" + + "github.com/ODudek/go-kcl/logger" +) + +func newTestLogger() logger.Logger { + return logger.GetDefaultLogger() +} + +func freePort(t *testing.T) string { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + addr := l.Addr().String() + require.NoError(t, l.Close()) + return addr +} + +func TestNewMonitoringService_BackwardCompat(t *testing.T) { + svc := NewMonitoringService(":0", "us-east-1", newTestLogger()) + + assert.NotNil(t, svc) + assert.Equal(t, ":0", svc.listenAddress) + assert.Equal(t, "us-east-1", svc.region) + assert.True(t, svc.startServer) + assert.Equal(t, prom.DefaultRegisterer, svc.registerer) + assert.Equal(t, prom.DefaultGatherer, svc.gatherer) +} + +func TestNewMonitoringServiceWithOptions_Standalone(t *testing.T) { + log := newTestLogger() + svc := NewMonitoringServiceWithOptions( + WithListenAddress(":9090"), + WithRegion("eu-west-1"), + WithLogger(log), + ) + + assert.NotNil(t, svc) + assert.Equal(t, ":9090", svc.listenAddress) + assert.Equal(t, "eu-west-1", svc.region) + assert.True(t, svc.startServer) +} + +func TestNewMonitoringServiceWithOptions_ExternalRegistry(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-west-2"), + WithLogger(newTestLogger()), + ) + + assert.NotNil(t, svc) + assert.False(t, svc.startServer) + assert.Equal(t, prom.Registerer(reg), svc.registerer) + assert.Equal(t, prom.Gatherer(reg), svc.gatherer) +} + +func TestNewMonitoringServiceWithOptions_WithRegisterer(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegisterer(reg), + WithRegion("ap-southeast-1"), + WithLogger(newTestLogger()), + ) + + assert.NotNil(t, svc) + assert.False(t, svc.startServer) + assert.Equal(t, prom.Registerer(reg), svc.registerer) +} + +func TestWithRegistry_NilIgnored(t *testing.T) { + svc := NewMonitoringServiceWithOptions( + WithRegistry(nil), + 
WithLogger(newTestLogger()), + ) + + assert.Equal(t, prom.DefaultRegisterer, svc.registerer, "nil registry should be ignored") + assert.True(t, svc.startServer, "startServer should remain true when nil registry ignored") +} + +func TestDefaultLogger_WhenOmitted(t *testing.T) { + svc := NewMonitoringServiceWithOptions( + WithListenAddress(":9090"), + ) + + assert.NotNil(t, svc.logger, "default logger should be set when WithLogger is not used") +} + +func TestInit_ExternalRegistry(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + + err := svc.Init("testapp", "my-stream", "worker-1") + require.NoError(t, err) + + // Touch every metric so that Gather() returns them (Vec collectors + // only appear after at least one label set is observed). + svc.IncrRecordsProcessed("shard-0", 1) + svc.IncrBytesProcessed("shard-0", 1) + svc.MillisBehindLatest("shard-0", 0) + svc.LeaseGained("shard-0") + svc.LeaseRenewed("shard-0") + svc.RecordGetRecordsTime("shard-0", 0) + svc.RecordProcessRecordsTime("shard-0", 0) + + families, err := reg.Gather() + require.NoError(t, err) + + names := metricFamilyNames(families) + assert.Contains(t, names, "testapp_processed_bytes") + assert.Contains(t, names, "testapp_processed_records") + assert.Contains(t, names, "testapp_behind_latest_millis") + assert.Contains(t, names, "testapp_leases_held") + assert.Contains(t, names, "testapp_lease_renewals") + assert.Contains(t, names, "testapp_get_records_duration_milliseconds") + assert.Contains(t, names, "testapp_process_records_duration_milliseconds") +} + +func TestInit_DuplicateRegistration_Tolerated(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + + require.NoError(t, svc.Init("dupapp", "stream", "w1")) + + svc2 := NewMonitoringServiceWithOptions( + 
WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + err := svc2.Init("dupapp", "stream", "w2") + assert.NoError(t, err, "re-registration with identical collectors should be tolerated") +} + +func TestMetricRecording_ExternalRegistry(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + require.NoError(t, svc.Init("rectest", "stream-1", "worker-1")) + + svc.IncrRecordsProcessed("shard-0", 5) + svc.IncrRecordsProcessed("shard-0", 3) + svc.IncrBytesProcessed("shard-0", 1024) + svc.MillisBehindLatest("shard-0", 42.5) + svc.LeaseGained("shard-0") + svc.LeaseRenewed("shard-0") + svc.RecordGetRecordsTime("shard-0", 150.0) + svc.RecordProcessRecordsTime("shard-0", 75.0) + + families, err := reg.Gather() + require.NoError(t, err) + byName := indexFamilies(families) + + assertCounterValue(t, byName, "rectest_processed_records", 8) + assertCounterValue(t, byName, "rectest_processed_bytes", 1024) + assertGaugeValue(t, byName, "rectest_behind_latest_millis", 42.5) + assertGaugeValue(t, byName, "rectest_leases_held", 1) + assertCounterValue(t, byName, "rectest_lease_renewals", 1) +} + +func TestMetricRecording_LeaseLost(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + require.NoError(t, svc.Init("leasetest", "stream-1", "worker-1")) + + svc.LeaseGained("shard-0") + svc.LeaseGained("shard-0") + svc.LeaseLost("shard-0") + + families, err := reg.Gather() + require.NoError(t, err) + byName := indexFamilies(families) + + assertGaugeValue(t, byName, "leasetest_leases_held", 1) +} + +func TestDeleteMetricMillisBehindLatest(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + require.NoError(t, 
svc.Init("deltest", "stream-1", "worker-1")) + + svc.MillisBehindLatest("shard-0", 100) + svc.DeleteMetricMillisBehindLatest("shard-0") + + families, err := reg.Gather() + require.NoError(t, err) + byName := indexFamilies(families) + + _, exists := byName["deltest_behind_latest_millis"] + assert.False(t, exists, "metric family should be absent after deletion") +} + +func TestStart_ExternalRegistry_NoServer(t *testing.T) { + reg := prom.NewRegistry() + svc := NewMonitoringServiceWithOptions( + WithRegistry(reg), + WithRegion("us-east-1"), + WithLogger(newTestLogger()), + ) + + require.NoError(t, svc.Start()) + assert.Nil(t, svc.server, "no server should be created for external registry") +} + +func TestStart_Standalone_ServesMetrics(t *testing.T) { + addr := freePort(t) + reg := prom.NewRegistry() + + svc := &MonitoringService{ + listenAddress: addr, + region: "us-east-1", + logger: newTestLogger(), + registerer: reg, + gatherer: reg, + startServer: true, + } + require.NoError(t, svc.Init("srvtest", "stream-1", "worker-1")) + require.NoError(t, svc.Start()) + defer svc.Shutdown() + + svc.IncrRecordsProcessed("shard-0", 1) + + require.Eventually(t, func() bool { + resp, err := http.Get(fmt.Sprintf("http://%s/metrics", addr)) + if err != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 2*time.Second, 50*time.Millisecond) +} + +func TestShutdown_Graceful(t *testing.T) { + addr := freePort(t) + reg := prom.NewRegistry() + + svc := &MonitoringService{ + listenAddress: addr, + region: "us-east-1", + logger: newTestLogger(), + registerer: reg, + gatherer: reg, + startServer: true, + } + require.NoError(t, svc.Init("sdtest", "stream-1", "worker-1")) + require.NoError(t, svc.Start()) + + require.Eventually(t, func() bool { + resp, err := http.Get(fmt.Sprintf("http://%s/metrics", addr)) + if err != nil { + return false + } + defer resp.Body.Close() + return resp.StatusCode == http.StatusOK + }, 2*time.Second, 
50*time.Millisecond) + + svc.Shutdown() + + _, err := http.Get(fmt.Sprintf("http://%s/metrics", addr)) + assert.Error(t, err, "server should be stopped after shutdown") +} + +func TestShutdown_NilServer(t *testing.T) { + svc := NewMonitoringServiceWithOptions( + WithRegistry(prom.NewRegistry()), + WithLogger(newTestLogger()), + ) + // Should not panic. + svc.Shutdown() +} + +// --- helpers --- + +func metricFamilyNames(families []*dto.MetricFamily) []string { + names := make([]string, 0, len(families)) + for _, f := range families { + names = append(names, f.GetName()) + } + return names +} + +func indexFamilies(families []*dto.MetricFamily) map[string]*dto.MetricFamily { + m := make(map[string]*dto.MetricFamily, len(families)) + for _, f := range families { + m[f.GetName()] = f + } + return m +} + +func assertCounterValue(t *testing.T, families map[string]*dto.MetricFamily, name string, expected float64) { + t.Helper() + fam, ok := families[name] + require.True(t, ok, "metric family %q not found", name) + require.NotEmpty(t, fam.GetMetric()) + actual := fam.GetMetric()[0].GetCounter().GetValue() + assert.InDelta(t, expected, actual, 0.001, "counter %s", name) +} + +func assertGaugeValue(t *testing.T, families map[string]*dto.MetricFamily, name string, expected float64) { + t.Helper() + fam, ok := families[name] + require.True(t, ok, "metric family %q not found", name) + require.NotEmpty(t, fam.GetMetric()) + actual := fam.GetMetric()[0].GetGauge().GetValue() + assert.InDelta(t, expected, actual, 0.001, "gauge %s", name) +} diff --git a/examples/prometheus-metrics/main.go b/examples/prometheus-metrics/main.go new file mode 100644 index 0000000..c52cb3f --- /dev/null +++ b/examples/prometheus-metrics/main.go @@ -0,0 +1,123 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/aws/aws-sdk-go-v2/aws" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + + cfg 
"github.com/ODudek/go-kcl/clientlibrary/config" + kc "github.com/ODudek/go-kcl/clientlibrary/interfaces" + prommetrics "github.com/ODudek/go-kcl/clientlibrary/metrics/prometheus" + wk "github.com/ODudek/go-kcl/clientlibrary/worker" + "github.com/ODudek/go-kcl/logger" +) + +func main() { + log := logger.GetDefaultLogger() + + // 1. Configure KCL + kclConfig := cfg.NewKinesisClientLibConfig( + "my-app", + "my-stream", + "us-east-1", + "worker-1", + ). + WithInitialPositionInStream(cfg.LATEST). + WithMaxRecords(100). + WithLogger(log) + + // ----------------------------------------------------------------- + // Option A: Standalone mode (backward-compatible) + // + // KCL starts its own HTTP server on :2112 and registers metrics on + // the global Prometheus registry. + // ----------------------------------------------------------------- + _ = prommetrics.NewMonitoringService(":2112", "us-east-1", log) + + // ----------------------------------------------------------------- + // Option B: External registry + // + // When your application already exposes a Prometheus /metrics + // endpoint, pass your own registry so KCL does not start a second + // HTTP server. 
+ // ----------------------------------------------------------------- + registry := prom.NewRegistry() + registry.MustRegister(prom.NewGoCollector()) + + metricsService := prommetrics.NewMonitoringServiceWithOptions( + prommetrics.WithRegistry(registry), + prommetrics.WithRegion("us-east-1"), + prommetrics.WithLogger(log), + ) + + // Attach the monitoring service to the worker config + kclConfig.WithMonitoringService(metricsService) + + // Build and start the worker + worker := wk.NewWorker(&processorFactory{}, kclConfig) + if err := worker.Start(); err != nil { + log.Fatalf("Failed to start worker: %v", err) + } + + // Expose the external registry through the application's own HTTP server + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{})) + go func() { + log.Infof("Serving Prometheus metrics on :2112") + if err := http.ListenAndServe(":2112", mux); err != nil { + log.Errorf("Metrics server error: %v", err) + } + }() + + // Graceful shutdown on SIGINT/SIGTERM + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + sig := <-sigs + log.Infof("Received %s, shutting down...", sig) + worker.Shutdown() +} + +// --- Record Processor --- + +type processorFactory struct{} + +func (f *processorFactory) CreateProcessor() kc.IRecordProcessor { + return &recordProcessor{} +} + +type recordProcessor struct{} + +func (p *recordProcessor) Initialize(input *kc.InitializationInput) { + fmt.Printf("[init] ShardId: %s\n", input.ShardId) +} + +func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { + if len(input.Records) == 0 { + return + } + + for _, r := range input.Records { + fmt.Printf("[record] PartitionKey=%s Data=%s\n", + aws.ToString(r.PartitionKey), string(r.Data)) + } + + lastSeq := input.Records[len(input.Records)-1].SequenceNumber + if err := input.Checkpointer.Checkpoint(lastSeq); err != nil { + fmt.Printf("[error] checkpoint failed: %v\n", err) + } +} + +func 
(p *recordProcessor) Shutdown(input *kc.ShutdownInput) { + fmt.Printf("[shutdown] Reason: %s\n", + aws.ToString(kc.ShutdownReasonMessage(input.ShutdownReason))) + + if input.ShutdownReason == kc.TERMINATE { + _ = input.Checkpointer.Checkpoint(nil) + } +} diff --git a/go.mod b/go.mod index 1a0e527..c06d858 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/prometheus/client_golang v1.11.1 + github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.32.1 github.com/redis/go-redis/v9 v9.7.3 github.com/rs/zerolog v1.26.1 @@ -43,7 +44,6 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect From ba6a7fa61ac7ad2533d7b1bbc0138d91b9bd7dba Mon Sep 17 00:00:00 2001 From: Aleksander Date: Wed, 11 Feb 2026 16:09:19 +0100 Subject: [PATCH 2/3] Update CHANGELOG for 0.3.0 and fix GoCollector usage in example --- CHANGELOG.md | 13 +++++++++++++ examples/prometheus-metrics/main.go | 3 ++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4d2c99..4d1d66a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.3.0] - 2026-02-11 + +### Added + +- Functional options pattern for Prometheus `MonitoringService` (`NewMonitoringServiceWithOptions`) +- `WithRegistry` option to use an external `*prometheus.Registry` instead of the global default +- `WithRegisterer` option for lower-level registerer injection +- `WithListenAddress`, `WithRegion`, `WithLogger` options +- Graceful HTTP server shutdown in `MonitoringService.Shutdown()` +- HTTP server timeouts (read-header, read, idle) for Slowloris protection +- Tolerance for duplicate metric registration (`AlreadyRegisteredError`) +- Prometheus metrics example (`examples/prometheus-metrics/`) + ## [0.2.1] - 2026-02-11 ### Fixed diff --git a/examples/prometheus-metrics/main.go b/examples/prometheus-metrics/main.go index c52cb3f..a026aea 100644 --- a/examples/prometheus-metrics/main.go +++ b/examples/prometheus-metrics/main.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" cfg "github.com/ODudek/go-kcl/clientlibrary/config" @@ -48,7 +49,7 @@ func main() { // HTTP server. 
// ----------------------------------------------------------------- registry := prom.NewRegistry() - registry.MustRegister(prom.NewGoCollector()) + registry.MustRegister(collectors.NewGoCollector()) metricsService := prommetrics.NewMonitoringServiceWithOptions( prommetrics.WithRegistry(registry), From 76e71fc9eca802109a6930a4d1ae41ce93f4f549 Mon Sep 17 00:00:00 2001 From: Aleksander Date: Wed, 11 Feb 2026 20:13:53 +0100 Subject: [PATCH 3/3] Replace upstream license headers with GoDoc package comments --- clientlibrary/checkpoint/checkpointer.go | 61 ++++++----- .../checkpoint/dynamodb-checkpointer.go | 22 ++-- clientlibrary/config/config.go | 24 ++--- clientlibrary/config/kcl-config.go | 60 +++++------ clientlibrary/interfaces/inputs.go | 90 +++++++--------- .../record-processor-checkpointer.go | 102 ++++-------------- clientlibrary/interfaces/record-processor.go | 77 ++++--------- clientlibrary/metrics/interfaces.go | 43 ++++++-- clientlibrary/worker/common-shard-consumer.go | 3 +- .../worker/polling-shard-consumer.go | 24 ++--- .../worker/record-processor-checkpointer.go | 22 ++-- clientlibrary/worker/worker.go | 23 ++-- 12 files changed, 214 insertions(+), 337 deletions(-) diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index 44c8148..6011318 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -17,16 +17,15 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package checkpoint -// The implementation is derived from https://github.com/patrobinson/gokini -// -// Copyright 2018 Patrick robinson. +// Package checkpoint provides interfaces and implementations for managing Kinesis +// shard leases and checkpoint tracking across distributed workers. 
// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The Checkpointer interface abstracts the underlying storage backend (DynamoDB, Redis) +// for persisting shard lease ownership, sequence number checkpoints, and lease +// claim requests. Implementations provide atomic conditional updates for consistency +// in a multi-worker environment. // -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// The implementation is derived from https://github.com/patrobinson/gokini package checkpoint import ( @@ -37,20 +36,30 @@ import ( ) const ( - LeaseKeyKey = "ShardID" - LeaseOwnerKey = "AssignedTo" - LeaseTimeoutKey = "LeaseTimeout" + // LeaseKeyKey is the field name for the shard ID (primary key). + LeaseKeyKey = "ShardID" + // LeaseOwnerKey is the field name for the worker that owns the lease. + LeaseOwnerKey = "AssignedTo" + // LeaseTimeoutKey is the field name for the lease expiration time. + LeaseTimeoutKey = "LeaseTimeout" + // SequenceNumberKey is the field name for the last checkpointed sequence number. 
SequenceNumberKey = "Checkpoint" - ParentShardIdKey = "ParentShardId" - ClaimRequestKey = "ClaimRequest" + // ParentShardIdKey is the field name for the parent shard ID (resharding). + ParentShardIdKey = "ParentShardId" + // ClaimRequestKey is the field name for the worker claiming the shard (lease stealing). + ClaimRequestKey = "ClaimRequest" - // ShardEnd We've completely processed all records in this shard. + // ShardEnd is the sentinel checkpoint value indicating a shard has been completely + // processed and all records delivered. ShardEnd = "SHARD_END" - // ErrShardClaimed is returned when shard is claimed + // ErrShardClaimed is the error message returned when a lease acquisition fails + // because another worker has an active claim on the shard. ErrShardClaimed = "shard is already claimed by another node" ) +// ErrLeaseNotAcquired is returned when a worker cannot acquire a lease on a shard, +// typically because another worker holds an active lease or a claim request is in progress. type ErrLeaseNotAcquired struct { Cause string } @@ -59,33 +68,35 @@ func (e ErrLeaseNotAcquired) Error() string { return fmt.Sprintf("lease not acquired: %s", e.Cause) } -// Checkpointer handles checkpointing when a record has been processed +// Checkpointer manages shard lease acquisition, renewal, checkpointing, and lease stealing. +// Implementations must provide atomic conditional updates to ensure consistency +// across multiple concurrent workers. type Checkpointer interface { - // Init initialises the Checkpoint + // Init establishes a connection to the backend store and creates the lease table if needed. Init() error - // GetLease attempts to gain a lock on the given shard + // GetLease attempts to acquire or renew a lease on the given shard for the specified worker. GetLease(*par.ShardStatus, string) error - // CheckpointSequence writes a checkpoint at the designated sequence ID + // CheckpointSequence persists the current checkpoint sequence number for the shard. 
CheckpointSequence(*par.ShardStatus) error - // FetchCheckpoint retrieves the checkpoint for the given shard + // FetchCheckpoint retrieves the stored checkpoint, lease owner, and lease timeout for the shard. FetchCheckpoint(*par.ShardStatus) error - // RemoveLeaseInfo to remove lease info for shard entry because the shard no longer exists + // RemoveLeaseInfo removes all lease data for a shard that no longer exists in Kinesis. RemoveLeaseInfo(string) error - // RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment + // RemoveLeaseOwner clears the lease owner for a shard, making it available for reassignment. RemoveLeaseOwner(string) error - // GetLeaseOwner to get current owner of lease for shard + // GetLeaseOwner returns the current lease owner for the specified shard. GetLeaseOwner(string) (string, error) - // ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods) + // ListActiveWorkers returns a map of worker IDs to their assigned shards (used for rebalancing). ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) - // ClaimShard claims a shard for stealing + // ClaimShard places a claim request on a shard to signal a steal attempt. ClaimShard(*par.ShardStatus, string) error } diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 5a5bcf0..09bc969 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -17,16 +17,6 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package checkpoint -// The implementation is derived from https://github.com/patrobinson/gokini -// -// Copyright 2018 Patrick robinson. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. package checkpoint import ( @@ -51,7 +41,9 @@ const ( NumMaxRetries = 10 ) -// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend +// DynamoCheckpoint implements the Checkpointer interface using AWS DynamoDB as the backend. +// It manages lease acquisition via conditional PutItem operations, checkpoint persistence, +// and optional lease stealing via claim requests. type DynamoCheckpoint struct { log logger.Logger TableName string @@ -65,6 +57,8 @@ type DynamoCheckpoint struct { lastLeaseSync time.Time } +// NewDynamoCheckpoint creates a DynamoDB-backed Checkpointer from the given configuration. +// Call Init to establish the connection and create the lease table if needed. 
func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *DynamoCheckpoint { checkpointer := &DynamoCheckpoint{ log: kclConfig.Logger, @@ -79,13 +73,15 @@ func NewDynamoCheckpoint(kclConfig *config.KinesisClientLibConfiguration) *Dynam return checkpointer } -// WithDynamoDB is used to provide DynamoDB service +// WithDynamoDB sets a custom DynamoDB client (useful for testing with mocks). +// Returns the receiver for method chaining. func (checkpointer *DynamoCheckpoint) WithDynamoDB(svc DynamoDBAPI) *DynamoCheckpoint { checkpointer.svc = svc return checkpointer } -// Init initialises the DynamoDB Checkpoint +// Init establishes a connection to DynamoDB and ensures the lease table exists. +// If a custom DynamoDB service was set via WithDynamoDB, that service is used. func (checkpointer *DynamoCheckpoint) Init() error { checkpointer.log.Infof("Creating DynamoDB session") diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 7cade9b..2218e9f 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -17,23 +17,11 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package config -// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client -/* - * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - +// Package config provides configuration types and builder methods for the Kinesis Client Library. 
+// +// The main type is KinesisClientLibConfiguration, which holds all settings for a KCL Worker. +// Use one of the NewKinesisClientLibConfig* constructors to create a configuration with defaults, +// then customize with the fluent WithXxx methods. package config import ( @@ -307,6 +295,8 @@ var positionMap = map[InitialPositionInStream]*string{ AT_TIMESTAMP: aws.String("AT_TIMESTAMP"), } +// InitalPositionInStreamToShardIteratorType converts an InitialPositionInStream value +// to the corresponding Kinesis ShardIteratorType string ("LATEST", "TRIM_HORIZON", or "AT_TIMESTAMP"). func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string { return positionMap[pos] } diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 6215b9e..bebb08c 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -17,23 +17,6 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package config -// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client -/* - * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - package config import ( @@ -120,48 +103,60 @@ func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint st return c } -// WithTableName to provide alternative lease table in DynamoDB +// WithTableName sets an alternative DynamoDB/Redis table name for lease management. +// Defaults to ApplicationName. 
func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration { c.TableName = tableName return c } +// WithInitialPositionInStream sets where to start reading when no checkpoint exists. +// Valid values: LATEST, TRIM_HORIZON, or AT_TIMESTAMP. func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration { c.InitialPositionInStream = initialPositionInStream c.InitialPositionInStreamExtended = *newInitialPosition(initialPositionInStream) return c } +// WithTimestampAtInitialPositionInStream sets the initial position to AT_TIMESTAMP with the given time. func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration { c.InitialPositionInStream = AT_TIMESTAMP c.InitialPositionInStreamExtended = *newInitialPositionAtTimestamp(timestamp) return c } +// WithFailoverTimeMillis sets the lease duration in milliseconds. Workers that do not renew +// their lease within this period will have their shards reassigned. Default: 10000. func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis) c.FailoverTimeMillis = failoverTimeMillis return c } +// WithLeaseRefreshPeriodMillis sets the period before lease expiry during which the owner +// will attempt to renew the lease. Default: 5000. func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("LeaseRefreshPeriodMillis", leaseRefreshPeriodMillis) c.LeaseRefreshPeriodMillis = leaseRefreshPeriodMillis return c } +// WithLeaseRefreshWaitTime sets the wait period in milliseconds before an async lease renewal. Default: 2500. 
func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration { checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime) c.LeaseRefreshWaitTime = leaseRefreshWaitTime return c } +// WithShardSyncIntervalMillis sets the interval in milliseconds between shard sync tasks. Default: 60000. func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) c.ShardSyncIntervalMillis = shardSyncIntervalMillis return c } +// WithMaxRecords sets the maximum number of records per GetRecords call. +// The Kinesis API enforces an upper limit of 10000. Default: 10000. func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration { checkIsValuePositive("MaxRecords", maxRecords) if maxRecords > MaxMaxRecords { @@ -179,41 +174,31 @@ func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisCl return c } -// WithIdleTimeBetweenReadsInMillis -// Controls how long the KCL will sleep if no records are returned from Kinesis -// -//

-// This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will -// immediately retrieve the next set of records after the call to -// {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)} -// has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this -// value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and -// monitor how far behind the records retrieved are by inspecting -// {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the -// CloudWatch -// Metric: GetRecords.MillisBehindLatest -//

-// -// @param IdleTimeBetweenReadsInMillis: how long to sleep between GetRecords calls when no records are returned. -// @return KinesisClientLibConfiguration +// WithIdleTimeBetweenReadsInMillis sets how long the consumer sleeps when no records are returned. +// This value is only used when no records are returned; when records are present, the next +// batch is fetched immediately. Setting this too high may cause the consumer to fall behind. +// Default: 1000. func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis) c.IdleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis return c } +// WithCallProcessRecordsEvenForEmptyRecordList controls whether ProcessRecords is called +// even when GetRecords returns no records. Default: false. func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration { c.CallProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList return c } +// WithTaskBackoffTimeMillis sets the backoff time in milliseconds when tasks encounter errors. Default: 500. func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis) c.TaskBackoffTimeMillis = taskBackoffTimeMillis return c } +// WithLogger sets a custom logger. Panics if nil. func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration { if logger == nil { log.Panic("Logger cannot be null") @@ -265,16 +250,19 @@ func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN(consumerAR return c } +// WithLeaseStealing enables or disables lease stealing for load balancing across workers. Default: false. 
func (c *KinesisClientLibConfiguration) WithLeaseStealing(enableLeaseStealing bool) *KinesisClientLibConfiguration { c.EnableLeaseStealing = enableLeaseStealing return c } +// WithLeaseStealingIntervalMillis sets the interval between lease stealing (rebalancing) attempts. Default: 5000. func (c *KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis(leaseStealingIntervalMillis int) *KinesisClientLibConfiguration { c.LeaseStealingIntervalMillis = leaseStealingIntervalMillis return c } +// WithLeaseSyncingIntervalMillis sets the interval before syncing with the lease table. Default: 60000. func (c *KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis(leaseSyncingIntervalMillis int) *KinesisClientLibConfiguration { c.LeaseSyncingTimeIntervalMillis = leaseSyncingIntervalMillis return c diff --git a/clientlibrary/interfaces/inputs.go b/clientlibrary/interfaces/inputs.go index 6275152..24d09f1 100644 --- a/clientlibrary/interfaces/inputs.go +++ b/clientlibrary/interfaces/inputs.go @@ -17,23 +17,11 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package interfaces -// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client -/* - * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - +// Package interfaces defines the user-facing interfaces for the Kinesis Client Library. 
+// +// Applications must implement IRecordProcessor and IRecordProcessorFactory to +// consume records from a Kinesis stream. The library manages the lifecycle: +// Initialize -> ProcessRecords (repeated) -> Shutdown. package interfaces import ( @@ -44,72 +32,68 @@ import ( ) const ( - /* - * REQUESTED Indicates that the entire application is being shutdown, and if desired the record processor will be given a - * final chance to checkpoint. This state will not trigger a direct call to - * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but - * instead depend on a different interface for backward compatibility. - */ + // REQUESTED indicates the entire application is shutting down. + // The record processor will be given a final chance to checkpoint. REQUESTED ShutdownReason = iota + 1 - /* - * Terminate processing for this RecordProcessor (resharding use case). - * Indicates that the shard is closed and all records from the shard have been delivered to the application. - * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records - * from this shard and processing of child shards can be started. - */ + // TERMINATE indicates the shard is closed and all records have been delivered. + // Applications SHOULD checkpoint their progress so that processing of child + // shards can begin. TERMINATE - /* - * Processing will be moved to a different record processor (fail over, load balancing use cases). - * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started - * processing data). - */ + // ZOMBIE indicates processing is moving to a different record processor + // (failover or load balancing). Applications SHOULD NOT checkpoint as another + // processor may have already started processing data. 
ZOMBIE ) -// Containers for the parameters to the IRecordProcessor type ( - /* - * Reason the RecordProcessor is being shutdown. - * Used to distinguish between a fail-over vs. a termination (shard is closed and all records have been delivered). - * In case of a fail-over, applications should NOT checkpoint as part of shutdown, - * since another record processor may have already started processing records for that shard. - * In case of termination (resharding use case), applications SHOULD keep checkpointing their progress to indicate - * that they have successfully processed all the records (processing of child shards can then begin). - */ + // ShutdownReason indicates why a record processor is being shut down. + // Used to distinguish between failover vs. termination (shard closed). + // In case of failover, applications should NOT checkpoint. + // In case of termination, applications SHOULD checkpoint their progress. ShutdownReason int + // InitializationInput provides information to the record processor when it + // starts processing a new shard. Passed to IRecordProcessor.Initialize. InitializationInput struct { - // The shardId that the record processor is being initialized for. + // ShardId is the unique identifier of the shard this processor is responsible for. ShardId string - // The last extended sequence number that was successfully checkpointed by the previous record processor. + // ExtendedSequenceNumber is the last successfully checkpointed position. + // Nil indicates no prior checkpoint exists. ExtendedSequenceNumber *ExtendedSequenceNumber } + // ProcessRecordsInput contains a batch of records from a Kinesis shard along + // with metadata and a checkpointer for tracking progress. Passed to + // IRecordProcessor.ProcessRecords. ProcessRecordsInput struct { - // The time that this batch of records was received by the KCL. + // CacheEntryTime is when the batch was received from Kinesis. 
CacheEntryTime *time.Time - // The time that this batch of records was prepared to be provided to the RecordProcessor. + // CacheExitTime is when the batch was prepared for delivery to the processor. CacheExitTime *time.Time - // The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL. + // Records are the data records from Kinesis, potentially de-aggregated + // if published by the KPL. Records []types.Record - // A checkpointer that the RecordProcessor can use to checkpoint its progress. + // Checkpointer provides methods to checkpoint progress within this batch. Checkpointer IRecordProcessorCheckpointer - // How far behind this batch of records was when received from Kinesis. + // MillisBehindLatest is how far behind the stream tip this batch was when + // received, in milliseconds. Use this to monitor processing lag. MillisBehindLatest int64 } + // ShutdownInput provides information and capabilities when a record processor + // is being shut down. Passed to IRecordProcessor.Shutdown. ShutdownInput struct { - // ShutdownReason shows why RecordProcessor is going to be shutdown. + // ShutdownReason indicates why this processor is shutting down. ShutdownReason ShutdownReason - // Checkpointer is used to record the current progress. + // Checkpointer provides methods to record final progress before shutdown. Checkpointer IRecordProcessorCheckpointer } ) @@ -120,6 +104,8 @@ var shutdownReasonMap = map[ShutdownReason]*string{ ZOMBIE: aws.String("ZOMBIE"), } +// ShutdownReasonMessage returns a human-readable string for the given ShutdownReason +// (e.g. "TERMINATE", "ZOMBIE", "REQUESTED"). 
func ShutdownReasonMessage(reason ShutdownReason) *string { return shutdownReasonMap[reason] } diff --git a/clientlibrary/interfaces/record-processor-checkpointer.go b/clientlibrary/interfaces/record-processor-checkpointer.go index b8422ad..e5d6afd 100644 --- a/clientlibrary/interfaces/record-processor-checkpointer.go +++ b/clientlibrary/interfaces/record-processor-checkpointer.go @@ -17,102 +17,36 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package interfaces -// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client -/* - * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - package interfaces type ( + // IPreparedCheckpointer represents a checkpoint that has been prepared but not + // yet committed. This allows decoupling checkpoint preparation from persistence. IPreparedCheckpointer interface { + // GetPendingCheckpoint returns the sequence number for the prepared checkpoint, + // or nil if no checkpoint is pending. GetPendingCheckpoint() *ExtendedSequenceNumber - // Checkpoint - /* - * This method will record a pending checkpoint. - * - * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. - * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. - * @error ShutdownError The record processor instance has been shutdown. Another instance may have - * started processing some of these records already. 
- * The application should abort processing via this RecordProcessor instance. - * @error InvalidStateError Can't store checkpoint. - * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). - * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can - * backoff and retry. - * @error IllegalArgumentError The sequence number being checkpointed is invalid because it is out of range, - * i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest - * sequence number seen by the associated record processor. - */ + // Checkpoint persists the prepared checkpoint, making it durable. + // Returns an error if the checkpoint store is unavailable, the processor + // has been shut down, or the sequence number is out of range. Checkpoint() error } - // IRecordProcessorCheckpointer - /* - * Used by RecordProcessors when they want to checkpoint their progress. - * The Kinesis Client Library will pass an object implementing this interface to RecordProcessors, so they can - * checkpoint their progress. - */ + // IRecordProcessorCheckpointer allows record processors to persist their progress. + // The library provides an instance to processors via ProcessRecordsInput and ShutdownInput. IRecordProcessorCheckpointer interface { - // Checkpoint - /* - * This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to - * {@link #checkpoint()} but provides the ability to specify the sequence number at which to - * checkpoint. - * - * @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover, - * the Kinesis Client Library will start fetching records after this sequence number. - * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently. - * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. 
- * @error ShutdownError The record processor instance has been shutdown. Another instance may have - * started processing some of these records already. - * The application should abort processing via this RecordProcessor instance. - * @error InvalidStateError Can't store checkpoint. - * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). - * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can - * backoff and retry. - * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: - * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the - * greatest sequence number seen by the associated record processor. - * 2.) It is not a valid sequence number for a record in this shard. - */ + // Checkpoint records progress at the provided sequence number. Upon failover, + // the library will start fetching records after this sequence number. + // Pass nil to checkpoint at SHARD_END (shard closed, all records delivered). + // + // Returns an error if the checkpoint store is unavailable, the processor + // has been shut down, or the sequence number is out of range. Checkpoint(sequenceNumber *string) error - // PrepareCheckpoint - /** - * This method will record a pending checkpoint at the provided sequenceNumber. - * - * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard. - - * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint. - * - * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently. - * Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency. - * @error ShutdownError The record processor instance has been shutdown. Another instance may have - * started processing some of these records already. 
- * The application should abort processing via this RecordProcessor instance. - * @error InvalidStateError Can't store pending checkpoint. - * Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist). - * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The - * application can backoff and retry. - * @error IllegalArgumentError The sequence number is invalid for one of the following reasons: - * 1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the - * greatest sequence number seen by the associated record processor. - * 2.) It is not a valid sequence number for a record in this shard. - */ + // PrepareCheckpoint creates a pending checkpoint at the provided sequence number + // without committing it immediately. Returns an IPreparedCheckpointer that + // can be committed later via its Checkpoint method. PrepareCheckpoint(sequenceNumber *string) (IPreparedCheckpointer, error) } ) diff --git a/clientlibrary/interfaces/record-processor.go b/clientlibrary/interfaces/record-processor.go index 1c41d56..723868b 100644 --- a/clientlibrary/interfaces/record-processor.go +++ b/clientlibrary/interfaces/record-processor.go @@ -17,77 +17,36 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package interfaces -// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client -/* - * Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. 
See the License for the specific language governing - * permissions and limitations under the License. - */ package interfaces type ( - // IRecordProcessor is the interface for some callback functions invoked by KCL will - // The main task of using KCL is to provide implementation on IRecordProcessor interface. - // Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2 + // IRecordProcessor defines the interface for processing records from a Kinesis shard. + // + // Applications using the Kinesis Client Library must implement this interface. + // The lifecycle is: Initialize -> (ProcessRecords)* -> Shutdown. + // This mirrors the Amazon KCL v2 Java IRecordProcessor interface. IRecordProcessor interface { - // Initialize - /* - * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance - * (via processRecords). - * - * @param initializationInput Provides information related to initialization - */ + // Initialize is called once when the record processor is first assigned to a shard. + // Use this to initialize any resources needed for processing. Initialize(initializationInput *InitializationInput) - // ProcessRecords - /* - * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the - * application. - * Upon fail over, the new instance will get records with sequence number > checkpoint position - * for each partition key. - * - * @param processRecordsInput Provides the records to be processed as well as information and capabilities related - * to them (eg checkpointing). - */ + // ProcessRecords processes a batch of data records from the shard. + // Upon failover, the new instance will receive records with sequence numbers + // greater than the last checkpointed position. 
ProcessRecords(processRecordsInput *ProcessRecordsInput) - // Shutdown - /* - * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this - * RecordProcessor instance. - * - *

<h2><b>Warning</b></h2>

- * - * When the value of {@link ShutdownInput#getShutdownReason()} is - * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you - * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress. - * - * @param shutdownInput - * Provides information and capabilities (eg checkpointing) related to shutdown of this record processor. - */ + // Shutdown is called when the record processor is no longer needed. + // When ShutdownInput.ShutdownReason is TERMINATE, you MUST checkpoint before returning. + // When it is ZOMBIE or REQUESTED, do NOT checkpoint as another processor may have + // already started processing records for this shard. Shutdown(shutdownInput *ShutdownInput) } - // IRecordProcessorFactory is interface for creating IRecordProcessor. Each Worker can have multiple threads - // for processing shard. Client can choose either creating one processor per shard or sharing them. + // IRecordProcessorFactory creates IRecordProcessor instances. + // The library calls CreateProcessor for each shard assignment. Implementations + // can create a new processor per shard or reuse instances (if thread-safe). IRecordProcessorFactory interface { - - // CreateProcessor - /* - * Returns a record processor to be used for processing data records for a (assigned) shard. - * - * @return Returns a processor object. - */ + // CreateProcessor returns a new IRecordProcessor for a shard. CreateProcessor() IRecordProcessor } ) diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index 47ec490..ca29b16 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -17,34 +17,57 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package metrics -// The implementation is derived from https://github.com/patrobinson/gokini -// -// Copyright 2018 Patrick robinson. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// Package metrics provides the MonitoringService interface for recording +// KCL worker metrics. Implementations are available for CloudWatch and +// Prometheus backends. // -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// The implementation is derived from https://github.com/patrobinson/gokini package metrics +// MonitoringService records KCL worker metrics including record processing +// rate, bytes processed, stream lag, and lease acquisition/renewal events. +// Metrics are tracked per shard. The service lifecycle is: +// Init -> Start -> [metric recording] -> Shutdown. type MonitoringService interface { + // Init initializes the service with application context (used as metric labels/dimensions). Init(appName, streamName, workerID string) error + + // Start begins metric collection and reporting. Start() error + + // IncrRecordsProcessed records the count of successfully processed records for the shard. 
IncrRecordsProcessed(shard string, count int) + + // IncrBytesProcessed records the number of data bytes processed for the shard. IncrBytesProcessed(shard string, count int64) + + // MillisBehindLatest records how many milliseconds the consumer lags behind the stream tip. MillisBehindLatest(shard string, milliSeconds float64) + + // DeleteMetricMillisBehindLatest clears the lag metric for the shard (e.g. on lease loss). DeleteMetricMillisBehindLatest(shard string) + + // LeaseGained records that the worker acquired the lease for the shard. LeaseGained(shard string) + + // LeaseLost records that the worker lost the lease for the shard. LeaseLost(shard string) + + // LeaseRenewed records a successful lease renewal for the shard. LeaseRenewed(shard string) + + // RecordGetRecordsTime records the elapsed time (ms) for fetching a batch from Kinesis. RecordGetRecordsTime(shard string, time float64) + + // RecordProcessRecordsTime records the elapsed time (ms) for processing a batch of records. RecordProcessRecordsTime(shard string, time float64) + + // Shutdown stops metric collection and flushes any pending metrics. Shutdown() } -// NoopMonitoringService implements MonitoringService by does nothing. +// NoopMonitoringService implements MonitoringService as a no-op, +// useful for testing or when metrics collection should be disabled. type NoopMonitoringService struct{} func (NoopMonitoringService) Init(_, _, _ string) error { return nil } diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index b890153..973c436 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -17,7 +17,6 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package worker package worker import ( @@ -41,6 +40,8 @@ type shardConsumer interface { getRecords() error } +// KinesisSubscriberGetter abstracts the Kinesis API operations required by shard consumers. 
+// It enables dependency injection for testing and custom AWS configurations. type KinesisSubscriberGetter interface { SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index ea34c9b..f7d65db 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -17,16 +17,6 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package worker -// The implementation is derived from https://github.com/patrobinson/gokini -// -// Copyright 2018 Patrick robinson. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
package worker import ( @@ -46,16 +36,22 @@ import ( ) const ( + // kinesisReadTPSLimit is the maximum GetRecords API calls allowed per second per shard. kinesisReadTPSLimit = 5 - MaxBytes = 10000000 - MaxBytesPerSecond = 2000000 + // MaxBytes is the maximum bytes readable in a rate-limit window before enforcing a cool-down. + MaxBytes = 10000000 + // MaxBytesPerSecond is the maximum bytes per second readable from a shard via GetRecords. + MaxBytesPerSecond = 2000000 + // BytesToMbConversion is the conversion factor from bytes to megabytes. BytesToMbConversion = 1000000 ) var ( - rateLimitTimeNow = time.Now - rateLimitTimeSince = time.Since + rateLimitTimeNow = time.Now + rateLimitTimeSince = time.Since + // errLocalTPSExceeded is returned when the local GetRecords TPS limit is exceeded. errLocalTPSExceeded = errors.New("error GetRecords TPS exceeded") + // errMaxBytesExceeded is returned when the max bytes per second limit is exceeded. errMaxBytesExceeded = errors.New("error GetRecords max bytes for call period exceeded") ) diff --git a/clientlibrary/worker/record-processor-checkpointer.go b/clientlibrary/worker/record-processor-checkpointer.go index 849300f..b0f88ab 100644 --- a/clientlibrary/worker/record-processor-checkpointer.go +++ b/clientlibrary/worker/record-processor-checkpointer.go @@ -17,7 +17,6 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package worker package worker import ( @@ -28,30 +27,25 @@ import ( ) type ( - - // PreparedCheckpointer - /* - * Objects of this class are prepared to checkpoint at a specific sequence number. They use an - * IRecordProcessorCheckpointer to do the actual checkpointing, so their checkpoint is subject to the same 'didn't go - * backwards' validation as a normal checkpoint. - */ + // PreparedCheckpointer holds a prepared checkpoint at a specific sequence number. + // It delegates to an IRecordProcessorCheckpointer for persistence, so checkpoints + // are subject to the same forward-only validation. 
PreparedCheckpointer struct { pendingCheckpointSequenceNumber *kcl.ExtendedSequenceNumber checkpointer kcl.IRecordProcessorCheckpointer } - //RecordProcessorCheckpointer - /* - * This class is used to enable RecordProcessors to checkpoint their progress. - * The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application - * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment. - */ + // RecordProcessorCheckpointer enables record processors to checkpoint their progress. + // The library creates one instance per shard assignment and passes it to the + // IRecordProcessor lifecycle methods. RecordProcessorCheckpointer struct { shard *par.ShardStatus checkpoint chk.Checkpointer } ) +// NewRecordProcessorCheckpoint creates a new checkpointer for the given shard +// that persists progress via the provided Checkpointer backend. func NewRecordProcessorCheckpoint(shard *par.ShardStatus, checkpoint chk.Checkpointer) kcl.IRecordProcessorCheckpointer { return &RecordProcessorCheckpointer{ shard: shard, diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index c28b542..15b077d 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -17,16 +17,13 @@ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -// Package worker -// The implementation is derived from https://github.com/patrobinson/gokini -// -// Copyright 2018 Patrick robinson. +// Package worker implements the Worker orchestrator for consuming Amazon Kinesis data streams. 
// -// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +// The Worker coordinates shard discovery, lease management, and record processing. +// It spins up goroutine-based consumers for each assigned shard using either polling +// (GetRecords API) or enhanced fan-out (SubscribeToShard API) strategies. // -// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +// The implementation is derived from https://github.com/patrobinson/gokini package worker import ( @@ -94,20 +91,22 @@ func NewWorker(factory kcl.IRecordProcessorFactory, kclConfig *config.KinesisCli } } -// WithKinesis is used to provide Kinesis service for either custom implementation or unit testing. +// WithKinesis provides a custom Kinesis service client for testing or alternative configurations. +// Returns the Worker for method chaining. func (w *Worker) WithKinesis(svc *kinesis.Client) *Worker { w.kc = svc return w } -// WithCheckpointer is used to provide a custom checkpointer service for non-dynamodb implementation -// or unit testing. 
+// WithCheckpointer provides a custom checkpointer implementation (e.g. Redis backend or test mock). +// If not set, a DynamoDB-backed checkpointer is created during Start. +// Returns the Worker for method chaining. func (w *Worker) WithCheckpointer(checker chk.Checkpointer) *Worker { w.checkpointer = checker return w } -// Start Run starts consuming data from the stream, and pass it to the application record processors. +// Start initializes the worker and begins consuming data from the stream. func (w *Worker) Start() error { log := w.kclConfig.Logger if err := w.initialize(); err != nil {