Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.4.0] - 2026-03-24

### Fixed

- `IRecordProcessor.ProcessRecords` now returns `error` — previously the return value was void, causing go-kcl to silently advance the shard iterator even when the processor failed. Returning an error from `ProcessRecords` now causes the shard consumer to stop advancing the iterator and retry the same batch of records from the last checkpoint.

### Changed

- `commonShardConsumer.processRecords` returns `error` and propagates it to `getRecords`
- `PollingShardConsumer.getRecords` returns error from `processRecords`, triggering shard lease release and retry from checkpoint

## [0.3.0] - 2026-02-11

### Added
Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,33 @@ Working examples are available in the [`examples/`](examples/) directory:
| [`redis-multitenant`](examples/redis-multitenant/) | Redis | Two applications sharing one Redis instance |
| [`prometheus-metrics`](examples/prometheus-metrics/) | Prometheus | Consumer with external Prometheus registry |

## Error Handling in ProcessRecords

`IRecordProcessor.ProcessRecords` returns `error`. This is the primary mechanism for controlling shard iterator advancement:

- **Return `nil`** — batch processed successfully. The consumer checkpoints and advances to the next batch.
- **Return `error`** — batch failed. The consumer **does not advance the shard iterator** and exits `getRecords`, which releases the shard lease. On the next lease acquisition the consumer restarts from the last checkpoint and retries the same batch.

```go
func (p *myProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error {
for _, record := range input.Records {
if err := process(record); err != nil {
// Returning the error causes the entire batch to be retried.
return fmt.Errorf("processing record %s: %w", aws.ToString(record.SequenceNumber), err)
}
}

// Checkpoint after successful processing
lastSeq := input.Records[len(input.Records)-1].SequenceNumber
if err := input.Checkpointer.Checkpoint(lastSeq); err != nil {
return fmt.Errorf("checkpoint failed: %w", err)
}
return nil
}
```

> **Note:** Records before the failed one that were already processed will be retried on the next attempt. Make your processor idempotent (e.g. use conditional writes).

## Documentation

Go-KCL matches exactly the same interface and programming model from original Amazon KCL, the best place for getting reference, tutorial is from Amazon itself:
Expand Down
4 changes: 3 additions & 1 deletion clientlibrary/interfaces/record-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ type (
// ProcessRecords processes a batch of data records from the shard.
// Upon failover, the new instance will receive records with sequence numbers
// greater than the last checkpointed position.
ProcessRecords(processRecordsInput *ProcessRecordsInput)
// If an error is returned, the shard consumer will stop advancing the iterator
// and retry the same batch of records from the last checkpoint.
ProcessRecords(processRecordsInput *ProcessRecordsInput) error

// Shutdown is called when the record processor is no longer needed.
// When ShutdownInput.ShutdownReason is TERMINATE, you MUST checkpoint before returning.
Expand Down
11 changes: 8 additions & 3 deletions clientlibrary/worker/common-shard-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error {
}
}

func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) {
func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error {
log := sc.kclConfig.Logger

getRecordsTime := time.Since(getRecordsStartTime).Milliseconds()
Expand Down Expand Up @@ -170,15 +170,20 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec
if recordLength > 0 || sc.kclConfig.CallProcessRecordsEvenForEmptyRecordList {
processRecordsStartTime := time.Now()

// Delivery the events to the record processor
// Deliver the events to the record processor.
// If the processor returns an error, propagate it so the shard consumer
// does not advance the iterator — the same batch will be retried from checkpoint.
input.CacheEntryTime = &getRecordsStartTime
input.CacheExitTime = &processRecordsStartTime
sc.recordProcessor.ProcessRecords(input)
if err := sc.recordProcessor.ProcessRecords(input); err != nil {
return err
}
processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds()
sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming))
}

sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength)
sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes)
sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest))
return nil
}
5 changes: 4 additions & 1 deletion clientlibrary/worker/fan-out-shard-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func (sc *FanOutShardConsumer) getRecords() error {
continue
}
continuationSequenceNumber = subEvent.Value.ContinuationSequenceNumber
sc.processRecords(getRecordsStartTime, subEvent.Value.Records, subEvent.Value.MillisBehindLatest, recordCheckpointer)
if err := sc.processRecords(getRecordsStartTime, subEvent.Value.Records, subEvent.Value.MillisBehindLatest, recordCheckpointer); err != nil {
log.Errorf("Error processing records from shard %s: %+v", sc.shard.ID, err)
return err
}

// The shard has been closed, so no new records can be read from it
if continuationSequenceNumber == nil {
Expand Down
5 changes: 4 additions & 1 deletion clientlibrary/worker/polling-shard-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ func (sc *PollingShardConsumer) getRecords() error {
// reset the retry count after success
retriedErrors = 0

sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer)
if err := sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer); err != nil {
log.Errorf("Error processing records from shard %s: %+v", sc.shard.ID, err)
return err
}

// The shard has been closed, so no new records can be read from it
if getResp.NextShardIterator == nil {
Expand Down
5 changes: 3 additions & 2 deletions examples/dynamodb-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (p *recordProcessor) Initialize(input *kc.InitializationInput) {
aws.ToString(input.ExtendedSequenceNumber.SequenceNumber))
}

func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error {
if len(input.Records) == 0 {
return
return nil
}

for _, r := range input.Records {
Expand All @@ -79,6 +79,7 @@ func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
if err := input.Checkpointer.Checkpoint(lastSeq); err != nil {
fmt.Printf("[error] checkpoint failed: %v\n", err)
}
return nil
}

func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) {
Expand Down
5 changes: 3 additions & 2 deletions examples/prometheus-metrics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func (p *recordProcessor) Initialize(input *kc.InitializationInput) {
fmt.Printf("[init] ShardId: %s\n", input.ShardId)
}

func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error {
if len(input.Records) == 0 {
return
return nil
}

for _, r := range input.Records {
Expand All @@ -112,6 +112,7 @@ func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
if err := input.Checkpointer.Checkpoint(lastSeq); err != nil {
fmt.Printf("[error] checkpoint failed: %v\n", err)
}
return nil
}

func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) {
Expand Down
5 changes: 3 additions & 2 deletions examples/redis-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (p *recordProcessor) Initialize(input *kc.InitializationInput) {
aws.ToString(input.ExtendedSequenceNumber.SequenceNumber))
}

func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error {
if len(input.Records) == 0 {
return
return nil
}

for _, r := range input.Records {
Expand All @@ -92,6 +92,7 @@ func (p *recordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
if err := input.Checkpointer.Checkpoint(lastSeq); err != nil {
fmt.Printf("[error] checkpoint failed: %v\n", err)
}
return nil
}

func (p *recordProcessor) Shutdown(input *kc.ShutdownInput) {
Expand Down
3 changes: 2 additions & 1 deletion examples/redis-multitenant/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (p *logProcessor) Initialize(input *kc.InitializationInput) {
fmt.Printf("[%s][init] ShardId: %s\n", p.prefix, input.ShardId)
}

func (p *logProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
func (p *logProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error {
for _, r := range input.Records {
fmt.Printf("[%s][record] Key=%s Data=%s\n",
p.prefix,
Expand All @@ -129,6 +129,7 @@ func (p *logProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
lastSeq := input.Records[len(input.Records)-1].SequenceNumber
_ = input.Checkpointer.Checkpoint(lastSeq)
}
return nil
}

func (p *logProcessor) Shutdown(input *kc.ShutdownInput) {
Expand Down
5 changes: 3 additions & 2 deletions test/record_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
dd.count = 0
}

func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) error {
dd.t.Log("Processing Records...")

// don't process empty record
if len(input.Records) == 0 {
return
return nil
}

for _, v := range input.Records {
Expand All @@ -79,6 +79,7 @@ func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
diff := input.CacheExitTime.Sub(*input.CacheEntryTime)
dd.t.Logf("Checkpoint progress at: %v, MillisBehindLatest = %v, KCLProcessTime = %v", lastRecordSequenceNumber, input.MillisBehindLatest, diff)
_ = input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
return nil
}

func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
Expand Down