feat(PubSub): Streaming pull keepalive #15649

Open
robertvoinescu-work wants to merge 1 commit into googleapis:main from robertvoinescu-work:feat/keepAlive
@robertvoinescu-work robertvoinescu-work commented Jun 4, 2026

Contributor

b/427319804

@product-auto-label product-auto-label Bot added the "api: pubsub" label (Issues related to the Pub/Sub API.) Jun 4, 2026
@robertvoinescu-work robertvoinescu-work changed the title from "feat(PubSub): Add handling for streams that silently die." to "feat(PubSub): Streaming pull keepalive" Jun 4, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a protocol-compliant keep-alive ping/pong mechanism for the streaming pull client, including a timeout-based stream restart when the server becomes unresponsive. It also refactors stream disposal and updates the test suite to validate these behaviors. The review feedback highlights critical issues with the implementation: first, using a 15-second timeout for MoveNext will cause constant stream restarts during idle periods, so the timeout should be increased to 45 seconds (s_streamPingPeriod + s_streamPongPeriod); second, disposing the CancellationTokenSource while MoveNext is still active can lead to an ObjectDisposedException or race conditions; and third, the StreamTimeoutRestarts test must be updated to wait longer than the new 45-second timeout to pass successfully.

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs Outdated

@amanda-tarafa amanda-tarafa left a comment

Contributor

Yep, generally good. Thanks.

@robertvoinescu-work robertvoinescu-work marked this pull request as ready for review June 5, 2026 16:53
@robertvoinescu-work robertvoinescu-work requested a review from a team as a code owner June 5, 2026 16:53
@amanda-tarafa amanda-tarafa added the "allow breaking changes" label (Allows breaking changes to be merged. Use with care!) Jun 5, 2026
@amanda-tarafa

Contributor

The breaking change check is a false positive, already being tracked in b/520002639. I've added the allow breaking changes label to bypass.

@amanda-tarafa amanda-tarafa left a comment

Contributor

Looking good, but I have a few comments.

Also, I think it'd be good to encapsulate all the ping/pong checks and times etc. on the _retryState object, so that we can use it the same or similar to how we use it for exceptions.

@robertvoinescu-work

Contributor Author

I've refactored things a bit differently rather than moving everything onto RetryState, mostly because it didn't feel cohesive to put ping/pong tracking and handling on RetryState, which only handles exceptions. Instead, I've created a KeepAliveMonitor to encapsulate the new logic, and it throws a DeadlineExceeded exception if we time out. The exception is then handled in the normal logical flow.

@amanda-tarafa amanda-tarafa left a comment

Contributor

Happy to chat about these.

@amanda-tarafa amanda-tarafa left a comment

Contributor

A little bit of pseudocode, but see if that seems like a good balance between encapsulation and knowing what's happening just by looking at the handlers.

@robertvoinescu-work robertvoinescu-work force-pushed the feat/keepAlive branch 2 times, most recently from 77d6c51 to abf1ebc June 9, 2026 17:30

@amanda-tarafa amanda-tarafa left a comment

Contributor

Looking great. Still a few changes but nothing major now.

}
}

// Message-stream has messages (or not, depending on moveNextResult)
private void HandlePullMessageData(Task<bool> moveNextTask)
{
if (!moveNextTask.IsCompleted)

Contributor

We need to record the failure in _retryState, as that's what increments the backoff for restarting the pull. That's why I had the idea of including what you've put in KeepAliveMonitor in RetryState instead: the lack of a pong is effectively a retriable failure.

I think we can still merge those two, and maybe call it RestartMonitor or something. But we can do that in a follow up PR, where we first clean up RetryState and then merge it with KeepAliveMonitor into RestartMonitor.

Contributor Author

Sounds like a plan; I'll resolve this in a follow-up.

Contributor

We need to record the failure in _retryState as that's what increments the backoff for restarting the pull.

This part we need to solve on this PR, otherwise we'll be retrying sooner than with the backoff and that might make the problem worse.

What we can do in a follow up is all the refactoring.
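
To illustrate why the missed pong has to be recorded as a failure, here is a minimal sketch of exponential backoff state. It is not the library's RetryState: the `BackoffState` type, its members, and its parameters are all hypothetical, chosen only to show that each recorded failure lengthens the delay before the next restart, so a failure that is never recorded leaves the delay unchanged.

```csharp
using System;

// Hypothetical sketch; the real RetryState in the library may look quite different.
internal sealed class BackoffState
{
    private readonly TimeSpan _initial;
    private readonly TimeSpan _max;
    private readonly double _multiplier;

    // Delay to wait before the next restart attempt; zero means "no failure recorded yet".
    internal TimeSpan CurrentBackoff { get; private set; } = TimeSpan.Zero;

    internal BackoffState(TimeSpan initial, TimeSpan max, double multiplier)
    {
        _initial = initial;
        _max = max;
        _multiplier = multiplier;
    }

    // Any retriable failure, including a missed pong, grows the delay up to the cap.
    internal void RecordFailure()
    {
        CurrentBackoff = CurrentBackoff == TimeSpan.Zero
            ? _initial
            : TimeSpan.FromTicks(Math.Min(_max.Ticks, (long)(CurrentBackoff.Ticks * _multiplier)));
    }

    // A successful receive resets the delay.
    internal void RecordSuccess() => CurrentBackoff = TimeSpan.Zero;
}
```

In this sketch, a keep-alive timeout that restarts the stream without calling `RecordFailure` would always restart after the same delay, which is the behaviour the review comment warns about.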

_pongReceived = false;
}

internal void RecordPong() => _pongReceived = true;

Contributor

You still want to use Interlocked here and on RecordPing. The pong is recorded when the MoveNext completes, and that happens in the background.

@robertvoinescu-work robertvoinescu-work Jun 9, 2026

Contributor Author

Good point. I'd confused myself after all the refactoring. Done for both.
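
A minimal sketch of the thread-safety point, assuming the pong flag is written from a background continuation and read from the main processing loop. The `PongFlag` type and its method names are hypothetical and are not the PR's KeepAliveMonitor. An `int` is used because `Interlocked.Exchange` has no `bool` overload in most target frameworks.

```csharp
using System.Threading;

// Hypothetical sketch; not the PR's KeepAliveMonitor.
internal sealed class PongFlag
{
    // 0 = no pong since the last ping, 1 = pong received.
    private int _pongReceived;

    // Called from the background continuation that observes MoveNext completing.
    internal void RecordPong() => Interlocked.Exchange(ref _pongReceived, 1);

    // Called when the next ping is sent: atomically clears the flag and
    // reports whether a pong had arrived for the previous ping.
    internal bool ResetForNextPing() => Interlocked.Exchange(ref _pongReceived, 0) == 1;
}
```

Reading and clearing the flag in a single `Interlocked.Exchange` avoids losing a pong that arrives between a separate read and write.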

Comment on lines +72 to +73
public bool DisableKeepAlive { get; private set; }
public ServerAction WithDisableKeepAlive() { DisableKeepAlive = true; return this; }

Contributor

It makes no sense for the With method to return the same instance. Either you make the type mutable, i.e. keep the DisableKeepAlive setter public, or you don't, in which case this method returns a new instance. As written, this is just a detour from using properties idiomatically.

Contributor Author

Thanks for the pointer. I've reverted this change entirely based on the refactor.
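
For reference, the two idiomatic alternatives described in the review comment can be sketched as follows. `MutableAction` and `ImmutableAction` are hypothetical stand-ins for the test helper type, not code from the PR.

```csharp
// Hypothetical stand-ins for the test helper type discussed above.

// Option 1: a mutable type; callers set the property directly.
public sealed class MutableAction
{
    public bool DisableKeepAlive { get; set; }
}

// Option 2: an immutable type; the With method returns a modified copy
// and leaves the original instance untouched.
public sealed class ImmutableAction
{
    public bool DisableKeepAlive { get; }

    public ImmutableAction(bool disableKeepAlive = false) =>
        DisableKeepAlive = disableKeepAlive;

    public ImmutableAction WithDisableKeepAlive() => new ImmutableAction(true);
}
```

With option 1 a caller writes `new MutableAction { DisableKeepAlive = true }`; with option 2 the original instance can be safely shared because `WithDisableKeepAlive()` never changes it.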

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs Outdated
@robertvoinescu-work robertvoinescu-work force-pushed the feat/keepAlive branch 3 times, most recently from 7739ec0 to d3110ee June 10, 2026 19:17

@amanda-tarafa amanda-tarafa left a comment

Contributor

These are good tests, thanks.

A couple of things, but looks great.

Comment on lines 463 to 464

Contributor

Is it OK to change these properties based on the values in a pong? I'm guessing yes, because the next move next with real messages will have the right values, but just double checking, what do you think?

Contributor Author

Thanks for pointing this out. There is a case where we call HandleAckResponse, and for that the _exactlyonceDelieveryEnabled field would be set incorrectly. Fixed.

/// <summary>
/// For testing only. Disables stream timeout enforcement when no messages are being received.
/// </summary>
internal bool DisableKeepAliveTimeout { get; set; }

Contributor

Let's make this one read-only and pass the value in the constructor. And if my recommendation of configuring the pong period seems good, you'll have to rename it and change the type, of course.

Contributor Author

Really good point, thanks Amanda. I've updated the code.

internal void RecordPing()
{
long nowTicks = _clock.GetCurrentDateTimeUtc().Ticks;
Interlocked.Exchange(ref _lastPingTicks, nowTicks);

Contributor

I think you only need interlock for the pong.

Contributor Author

Yes, you are right: the ping is grabbed synchronously on the main processing thread. Removed the Interlocked call.

/// <summary>
/// For testing only. Disables stream timeout enforcement when no messages are being received.
/// </summary>
internal bool DisableKeepAliveTimeout { get; set; }

Contributor

Let's keep this one read-only as well, and receive it in the internal constructor; we can use that one from the tests if we have to.

And can we just configure the pong timeout instead? For tests we set it to infinity (or whatever that translates to in TimeSpan), and then we don't have to add ifs in production code. If that works, rename this to PongPeriod or something like that.

Contributor Author

That's a nice way to avoid ifs in production code 👍. I've updated accordingly.
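
A minimal sketch of the idea, assuming the timeout check is a plain elapsed-time comparison. The `PongDeadline` type and its members are hypothetical and not the PR's code. One detail worth noting: `Timeout.InfiniteTimeSpan` is -1 millisecond, so it would make a `>` comparison always true; `TimeSpan.MaxValue` is the value that disables the check without any extra branch.

```csharp
using System;

// Hypothetical sketch; names and shape are assumptions, not the PR's code.
internal sealed class PongDeadline
{
    // Read-only and supplied through the constructor, as suggested in the review.
    private readonly TimeSpan _pongTimeout;

    internal PongDeadline(TimeSpan pongTimeout) => _pongTimeout = pongTimeout;

    // True when a ping has gone unanswered for longer than the configured timeout.
    // With TimeSpan.MaxValue this can never be true, so no test-only branch is needed.
    internal bool IsExpired(DateTime pingSentUtc, DateTime nowUtc) =>
        nowUtc - pingSentUtc > _pongTimeout;
}
```

A test would construct `new PongDeadline(TimeSpan.MaxValue)`, while production code would pass a finite value.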

return Task.FromResult(SubscriberClient.Reply.Ack);
});
// Wait for more than the timeout.
// With dynamic timeout, the first stream starts at T=0.

Contributor

nit: What does the word dynamic mean here?

Contributor Author

I meant to convey that the timeout on the first stream was 45 seconds instead of 30 seconds. I just removed it; I think it's probably clearer without.

@amanda-tarafa amanda-tarafa left a comment

Contributor

LGTM!

Make sure to run all integration tests locally before merging.
