From eb990940843473080986a353278ff7191ffb04d2 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Wed, 15 Feb 2023 14:30:08 -0800 Subject: [PATCH 1/2] Add requeue At and requeue after to new controller framework --- pkg/controller/v2/controller_test.go | 29 ++++++++++++++++++ pkg/controller/v2/reconciler.go | 44 ++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/pkg/controller/v2/controller_test.go b/pkg/controller/v2/controller_test.go index add865f..69c8b27 100644 --- a/pkg/controller/v2/controller_test.go +++ b/pkg/controller/v2/controller_test.go @@ -55,6 +55,35 @@ func TestRequeue(t *testing.T) { assert.NoError(t, controller.Reconcile("bar")) <-done } +func TestRqueueAfter(t *testing.T) { + var requeued atomic.Bool + done := make(chan struct{}) + controller := NewController[testID](func(ctx context.Context, request Request[testID]) Directive[testID] { + if requeued.CompareAndSwap(false, true) { + return request.Requeue().After(time.Second) + } + close(done) + return request.Ack() + }, WithParallelism(10)) + defer controller.Stop() + assert.NoError(t, controller.Reconcile("bar")) + <-done +} + +func TestRqueueAt(t *testing.T) { + var requeued atomic.Bool + done := make(chan struct{}) + controller := NewController[testID](func(ctx context.Context, request Request[testID]) Directive[testID] { + if requeued.CompareAndSwap(false, true) { + return request.Requeue().At(time.Now().Add(time.Second)) + } + close(done) + return request.Ack() + }, WithParallelism(10)) + defer controller.Stop() + assert.NoError(t, controller.Reconcile("bar")) + <-done +} func TestRetry(t *testing.T) { var retried atomic.Bool diff --git a/pkg/controller/v2/reconciler.go b/pkg/controller/v2/reconciler.go index 787f37b..829d22c 100644 --- a/pkg/controller/v2/reconciler.go +++ b/pkg/controller/v2/reconciler.go @@ -101,6 +101,50 @@ func (r *Requeue[I]) Do(controller *Controller[I]) { go controller.enqueue(r.request) } +// After retries the request after the given delay +func (r *Requeue[I]) After(delay time.Duration) *RequeueAfter[I] { + return &RequeueAfter[I]{ + Requeue: r, + delay: delay, + } +} + +// At retries the request at the given time +func (r *Requeue[I]) At(t time.Time) *RequeueAt[I] { + return &RequeueAt[I]{ + Requeue: r, + t: t, + } +} + +// RequeueAfter requeues a reconciliation request after the given delay +type RequeueAfter[I ID] struct { + *Requeue[I] + delay time.Duration +} + +// Do executes the controller directive +func (r *RequeueAfter[I]) Do(controller *Controller[I]) { + controller.Log.Debugw("Requeueing request", "Request.ID", r.request.ID, "delay", r.delay) + time.AfterFunc(r.delay, func() { + controller.enqueue(r.request) + }) +} + +// RequeueAt requeue a reconciliation request at a specific time +type RequeueAt[I ID] struct { + *Requeue[I] + t time.Time +} + +// Do executes the controller directive +func (r *RequeueAt[I]) Do(controller *Controller[I]) { + controller.Log.Debugw("Requeueing request", "Request.ID", r.request.ID, "Time", r.t) + time.AfterFunc(time.Until(r.t), func() { + controller.enqueue(r.request) + }) +} + // Fail fails a reconciliation request type Fail[I ID] struct { request Request[I] From a7e0615edaf2536cdbd51384f31514e4031829ac Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Wed, 15 Feb 2023 14:35:58 -0800 Subject: [PATCH 2/2] release --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index e56d903..f314d02 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.10.9-dev +0.10.9