From a33b95296841009e3f9b7b0c831b3c0e5dadc600 Mon Sep 17 00:00:00 2001 From: Aleksander Date: Tue, 24 Mar 2026 12:21:15 +0100 Subject: [PATCH 1/2] Make ProcessRecords return error to control iterator advancement - IRecordProcessor.ProcessRecords now returns error; returning an error prevents the shard consumer from advancing the iterator and triggers a retry from the last checkpoint - Updated all example and test processors to return error - Updated core worker logic to propagate errors from ProcessRecords --- CHANGELOG.md | 11 ++++++++ README.md | 27 +++++++++++++++++++ clientlibrary/interfaces/record-processor.go | 4 ++- clientlibrary/worker/common-shard-consumer.go | 11 +++++--- .../worker/polling-shard-consumer.go | 5 +++- examples/dynamodb-consumer/main.go | 5 ++-- examples/prometheus-metrics/main.go | 5 ++-- examples/redis-consumer/main.go | 5 ++-- examples/redis-multitenant/main.go | 3 ++- test/record_processor_test.go | 5 ++-- 10 files changed, 67 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d1d66a..609cb73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.4.0] - 2026-03-24 + +### Fixed + +- `IRecordProcessor.ProcessRecords` now returns `error` — previously the return value was void, causing go-kcl to silently advance the shard iterator even when the processor failed. Returning an error from `ProcessRecords` now causes the shard consumer to stop advancing the iterator and retry the same batch of records from the last checkpoint. + +### Changed + +- `commonShardConsumer.processRecords` returns `error` and propagates it to `getRecords` +- `PollingShardConsumer.getRecords` returns error from `processRecords`, triggering shard lease release and retry from checkpoint + ## [0.3.0] - 2026-02-11 ### Added diff --git a/README.md b/README.md index b4fc507..38d40eb 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,33 @@ Working examples are available in the [`examples/`](examples/) directory: | [`redis-multitenant`](examples/redis-multitenant/) | Redis | Two applications sharing one Redis instance | | [`prometheus-metrics`](examples/prometheus-metrics/) | Prometheus | Consumer with external Prometheus registry | +## Error Handling in ProcessRecords + +`IRecordProcessor.ProcessRecords` returns `error`. This is the primary mechanism for controlling shard iterator advancement: + +- **Return `nil`** — batch processed successfully. The consumer checkpoints and advances to the next batch. +- **Return `error`** — batch failed. The consumer **does not advance the shard iterator** and exits `getRecords`, which releases the shard lease. On the next lease acquisition the consumer restarts from the last checkpoint and retries the same batch. + +```go +func (p *myProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error { + for _, record := range input.Records { + if err := process(record); err != nil { + // Returning the error causes the entire batch to be retried. + return fmt.Errorf("processing record %s: %w", aws.ToString(record.SequenceNumber), err) + } + } + + // Checkpoint after successful processing + lastSeq := input.Records[len(input.Records)-1].SequenceNumber + if err := input.Checkpointer.Checkpoint(lastSeq); err != nil { + return fmt.Errorf("checkpoint failed: %w", err) + } + return nil +} +``` + +> **Note:** Records before the failed one that were already processed will be retried on the next attempt. Make your processor idempotent (e.g. use conditional writes). + ## Documentation Go-KCL matches exactly the same interface and programming model from original Amazon KCL, the best place for getting reference, tutorial is from Amazon itself: diff --git a/clientlibrary/interfaces/record-processor.go b/clientlibrary/interfaces/record-processor.go index 723868b..dd2cd1f 100644 --- a/clientlibrary/interfaces/record-processor.go +++ b/clientlibrary/interfaces/record-processor.go @@ -33,7 +33,9 @@ type ( // ProcessRecords processes a batch of data records from the shard. // Upon failover, the new instance will receive records with sequence numbers // greater than the last checkpointed position. - ProcessRecords(processRecordsInput *ProcessRecordsInput) + // If an error is returned, the shard consumer will stop advancing the iterator + // and retry the same batch of records from the last checkpoint. + ProcessRecords(processRecordsInput *ProcessRecordsInput) error // Shutdown is called when the record processor is no longer needed. // When ShutdownInput.ShutdownReason is TERMINATE, you MUST checkpoint before returning. diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 973c436..2136b2b 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -137,7 +137,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error { } } -func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) { +func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error { log := sc.kclConfig.Logger getRecordsTime := time.Since(getRecordsStartTime).Milliseconds() @@ -170,10 +170,14 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec if recordLength > 0 || sc.kclConfig.CallProcessRecordsEvenForEmptyRecordList { processRecordsStartTime := time.Now() - // Delivery the events to the record processor + // Deliver the events to the record processor. + // If the processor returns an error, propagate it so the shard consumer + // does not advance the iterator — the same batch will be retried from checkpoint. input.CacheEntryTime = &getRecordsStartTime input.CacheExitTime = &processRecordsStartTime - sc.recordProcessor.ProcessRecords(input) + if err := sc.recordProcessor.ProcessRecords(input); err != nil { + return err + } processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds() sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming)) } @@ -181,4 +185,5 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength) sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes) sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest)) + return nil } diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index f7d65db..587bfd9 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -204,7 +204,10 @@ func (sc *PollingShardConsumer) getRecords() error { // reset the retry count after success retriedErrors = 0 - sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) + if err := sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer); err != nil { + log.Errorf("Error processing records from shard %s: %+v", sc.shard.ID, err) + return err + } // The shard has been closed, so no new records can be read from it if getResp.NextShardIterator == nil { diff --git a/examples/dynamodb-consumer/main.go b/examples/dynamodb-consumer/main.go index 44f1169..f9bceef 100644 --- a/examples/dynamodb-consumer/main.go +++ b/examples/dynamodb-consumer/main.go @@ -64,9 +64,9 @@ func (p *recordProcessor) Initialize(input *kc.InitializationInput) { aws.ToString(input.ExtendedSequenceNumber.SequenceNumber)) } -func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { +func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error { if len(input.Records) == 0 { - return + return nil } for _, r := range input.Records { @@ -79,6 +79,7 @@ func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { if err := input.Checkpointer.Checkpoint(lastSeq); err != nil { fmt.Printf("[error] checkpoint failed: %v\n", err) } + return nil } func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) { diff --git a/examples/prometheus-metrics/main.go b/examples/prometheus-metrics/main.go index a026aea..31027d2 100644 --- a/examples/prometheus-metrics/main.go +++ b/examples/prometheus-metrics/main.go @@ -98,9 +98,9 @@ func (p *recordProcessor) Initialize(input *kc.InitializationInput) { fmt.Printf("[init] ShardId: %s\n", input.ShardId) } -func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { +func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error { if len(input.Records) == 0 { - return + return nil } for _, r := range input.Records { @@ -112,6 +112,7 @@ func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { if err := input.Checkpointer.Checkpoint(lastSeq); err != nil { fmt.Printf("[error] checkpoint failed: %v\n", err) } + return nil } func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) { diff --git a/examples/redis-consumer/main.go b/examples/redis-consumer/main.go index 85dfe1b..34faec4 100644 --- a/examples/redis-consumer/main.go +++ b/examples/redis-consumer/main.go @@ -77,9 +77,9 @@ func (p *recordProcessor) Initialize(input *kc.InitializationInput) { aws.ToString(input.ExtendedSequenceNumber.SequenceNumber)) } -func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { +func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error { if len(input.Records) == 0 { - return + return nil } for _, r := range input.Records { @@ -92,6 +92,7 @@ func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { if err := input.Checkpointer.Checkpoint(lastSeq); err != nil { fmt.Printf("[error] checkpoint failed: %v\n", err) } + return nil } func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) { diff --git a/examples/redis-multitenant/main.go b/examples/redis-multitenant/main.go index 277ec2c..11c3a2b 100644 --- a/examples/redis-multitenant/main.go +++ b/examples/redis-multitenant/main.go @@ -117,7 +117,7 @@ func (p *logProcessor) Initialize(input *kc.InitializationInput) { fmt.Printf("[%s][init] ShardId: %s\n", p.prefix, input.ShardId) } -func (p *logProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { +func (p *logProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error { for _, r := range input.Records { fmt.Printf("[%s][record] Key=%s Data=%s\n", p.prefix, @@ -129,6 +129,7 @@ func (p *logProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { lastSeq := input.Records[len(input.Records)-1].SequenceNumber _ = input.Checkpointer.Checkpoint(lastSeq) } + return nil } func (p *logProcessor) Shutdown(input *kc.ShutdownInput) { diff --git a/test/record_processor_test.go b/test/record_processor_test.go index 154be89..e6fe73b 100644 --- a/test/record_processor_test.go +++ b/test/record_processor_test.go @@ -57,12 +57,12 @@ func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) { dd.count = 0 } -func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { +func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error { dd.t.Log("Processing Records...") // don't process empty record if len(input.Records) == 0 { - return + return nil } for _, v := range input.Records { @@ -79,6 +79,7 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { diff := input.CacheExitTime.Sub(*input.CacheEntryTime) dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v, KCLProcessTime = %v", lastRecordSequenceNumber, input.MillisBehindLatest, diff) _ = input.Checkpointer.Checkpoint(lastRecordSequenceNumber) + return nil } func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) { From 73b4b1a29b7970f540bd039661d1d855baa5e2e9 Mon Sep 17 00:00:00 2001 From: Aleksander Date: Tue, 24 Mar 2026 12:26:52 +0100 Subject: [PATCH 2/2] Update fan-out-shard-consumer.go --- clientlibrary/worker/fan-out-shard-consumer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clientlibrary/worker/fan-out-shard-consumer.go b/clientlibrary/worker/fan-out-shard-consumer.go index a297c77..d224b76 100644 --- a/clientlibrary/worker/fan-out-shard-consumer.go +++ b/clientlibrary/worker/fan-out-shard-consumer.go @@ -125,7 +125,10 @@ func (sc *FanOutShardConsumer) getRecords() error { continue } continuationSequenceNumber = subEvent.Value.ContinuationSequenceNumber - sc.processRecords(getRecordsStartTime, subEvent.Value.Records, subEvent.Value.MillisBehindLatest, recordCheckpointer) + if err := sc.processRecords(getRecordsStartTime, subEvent.Value.Records, subEvent.Value.MillisBehindLatest, recordCheckpointer); err != nil { + log.Errorf("Error processing records from shard %s: %+v", sc.shard.ID, err) + return err + } // The shard has been closed, so no new records can be read from it if continuationSequenceNumber == nil {