You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
With Pause() from #1504, there's a small gap that can't be avoided: between Subscribe()
returning and the new Pause() call, the consumer has already asked the broker for messages and may have pulled some. So there's no way to start a consumer that is paused from the very first moment. StartPaused closes that gap: the consumer comes up paused and doesn't ask the broker for anything until a user calls Resume().
The consumer starts paused, and the setting is passed down to every internal consumer (including topics discovered later), so it's race-free.
Proposed Go API
One new option, no interface change:
// ConsumerOptions// StartPaused, if true, creates the consumer in the paused state: it sends no flow// permits until Resume() is called. Default false.StartPausedbool
After Subscribe(), Paused() returns true until a user calls Resume().
Pass the option down to the partition consumer and set its paused flag before it starts, so the very first connect sends no permits. [PIP] Add Pause() / Resume() to Consumer #1504 already handles "paused at connect" (it holds the initial grant and flushes it on the first Resume()), so no new mechanism is needed.
Set the paused flag on the top-level consumer at creation, so Paused() is correct and
new children inherit it.
With this in place, regex and partitioned consumers can create new children already paused instead of pausing them right after creation ([PIP] Add Pause() / Resume() to Consumer #1504's current approach, which has a tiny race window). This closes that race and matches Java.
Edge cases to test
StartPaused: true, produce messages, Receive with a timeout → nothing arrives; after Resume() → all messages arrive.
Paused() is true right after Subscribe.
Partitioned, multi-topic, and regex consumers all come up paused, including topics discovered later.
Depends on #1504 (adds
Pause()/Resume()/Paused()). Related: #429.With
Pause()from #1504, there's a small gap that can't be avoided: betweenSubscribe()returning and the new
Pause()call, the consumer has already asked the broker for messages and may have pulled some. So there's no way to start a consumer that is paused from the very first moment.StartPausedcloses that gap: the consumer comes up paused and doesn't ask the broker for anything until a user callsResume().Java has this on the builder:
The consumer starts paused, and the setting is passed down to every internal consumer (including topics discovered later), so it's race-free.
Proposed Go API
One new option, no interface change:
After
Subscribe(),Paused()returns true until a user callsResume().Pause()/Resume()toConsumer#1504 already handles "paused at connect" (it holds the initial grant and flushes it on the first Resume()), so no new mechanism is needed.Paused()is correct andnew children inherit it.
Pause()/Resume()toConsumer#1504's current approach, which has a tiny race window). This closes that race and matches Java.Edge cases to test
StartPaused: true, produce messages,Receivewith a timeout → nothing arrives; afterResume()→ all messages arrive.Paused()is true right afterSubscribe.Are you willing to submit a PR?