@@ -408,17 +408,23 @@ public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string?
408408 if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . Trace ( this ) ;
409409
410410 WebSocketValidate . ValidateCloseStatus ( closeStatus , statusDescription ) ;
411- return CloseOutputAsyncCore ( closeStatus , statusDescription , cancellationToken ) ;
411+ return CloseOutputAsyncCore ( closeStatus , statusDescription , enterReceiveMutex : true , cancellationToken : cancellationToken ) ;
412412 }
413413
414- private async Task CloseOutputAsyncCore ( WebSocketCloseStatus closeStatus , string ? statusDescription , CancellationToken cancellationToken )
414+ private async Task CloseOutputAsyncCore ( WebSocketCloseStatus closeStatus , string ? statusDescription , bool enterReceiveMutex , CancellationToken cancellationToken )
415415 {
416416 if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . Trace ( this ) ;
417417
418418 ThrowIfInvalidState ( WebSocketStateHelper . ValidCloseOutputStates ) ;
419419
420420 await SendCloseFrameAsync ( closeStatus , statusDescription , cancellationToken ) . ConfigureAwait ( false ) ;
421421
422+ // Polite EOF wait under the receive mutex; avoids racing the receive loop on the stream.
423+ if ( ! _isServer && _receivedCloseFrame )
424+ {
425+ await WaitForServerToCloseConnectionAsync ( enterReceiveMutex , cancellationToken ) . ConfigureAwait ( false ) ;
426+ }
427+
422428 // If we already received a close frame, since we've now also sent one, we're now closed.
423429 lock ( StateUpdateLock )
424430 {
@@ -1119,41 +1125,55 @@ private async ValueTask HandleReceivedCloseAsync(MessageHeader header, Cancellat
11191125
11201126 if ( ! _isServer && _sentCloseFrame )
11211127 {
1122- await WaitForServerToCloseConnectionAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
1128+ await WaitForServerToCloseConnectionAsync ( enterMutex : false , cancellationToken ) . ConfigureAwait ( false ) ;
11231129 }
11241130 }
11251131
1126- /// <summary>Issues a read on the stream to wait for EOF.</summary>
1127- private async ValueTask WaitForServerToCloseConnectionAsync ( CancellationToken cancellationToken )
1132+ /// <summary>Issues a read on the stream to wait for EOF, optionally acquiring <see cref="_receiveMutex"/> first .</summary>
1133+ private async ValueTask WaitForServerToCloseConnectionAsync ( bool enterMutex , CancellationToken cancellationToken )
11281134 {
1129- if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . Trace ( this ) ;
1135+ bool mutexEntered = false ;
1136+ Task ? task = null ;
1137+ try
1138+ {
1139+ if ( enterMutex )
1140+ {
1141+ await _receiveMutex . EnterAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
1142+ mutexEntered = true ;
1143+ if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . MutexEntered ( _receiveMutex ) ;
1144+ }
11301145
1131- // Per RFC 6455 7.1.1, try to let the server close the connection. We give it up to a second.
1132- // We simply issue a read and don't care what we get back; we could validate that we don't get
1133- // additional data, but at this point we're about to close the connection and we're just stalling
1134- // to try to get the server to close first.
1135- ValueTask < int > finalReadTask = _stream . ReadAsync ( _receiveBuffer , cancellationToken ) ;
1146+ Debug . Assert ( _receiveMutex . IsHeld , $ "Expected { nameof ( _receiveMutex ) } to be held") ;
11361147
1137- if ( finalReadTask . IsCompletedSuccessfully )
1138- {
1139- finalReadTask . GetAwaiter ( ) . GetResult ( ) ;
1140- }
1141- else
1142- {
1143- const int WaitForCloseTimeoutMs = 1_000 ; // arbitrary amount of time to give the server (same duration as .NET Framework)
1144- Task task = finalReadTask . AsTask ( ) ;
1148+ if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . Trace ( this ) ;
1149+
1150+ // Per RFC 6455 7.1.1, try to let the server close the connection. We give it up to a second.
1151+ // We simply issue a read and don't care what we get back; we could validate that we don't get
1152+ // additional data, but at this point we're about to close the connection and we're just stalling
1153+ // to try to get the server to close first.
1154+ ValueTask < int > finalReadTask = _stream . ReadAsync ( _receiveBuffer , cancellationToken ) ;
1155+
1156+ const int WaitForCloseTimeoutMs = 1_000 ; // arbitrary amount of time to give the server
1157+ task = finalReadTask . AsTask ( ) ;
11451158
1146- try
1147- {
11481159#pragma warning disable CA2016 // Token was already provided to the ReadAsync
1149- await task . WaitAsync ( TimeSpan . FromMilliseconds ( WaitForCloseTimeoutMs ) ) . ConfigureAwait ( false ) ;
1160+ await task . WaitAsync ( TimeSpan . FromMilliseconds ( WaitForCloseTimeoutMs ) ) . ConfigureAwait ( false ) ;
11501161#pragma warning restore CA2016
1151- }
1152- catch
1162+ }
1163+ catch
1164+ {
1165+ if ( task is not null )
11531166 {
1154- // Eat any resulting exceptions. We were going to close the connection, anyway.
11551167 LogExceptions ( task ) ;
1156- Abort ( ) ;
1168+ }
1169+ Abort ( ) ;
1170+ }
1171+ finally
1172+ {
1173+ if ( mutexEntered )
1174+ {
1175+ _receiveMutex . Exit ( ) ;
1176+ if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . MutexExited ( _receiveMutex ) ;
11571177 }
11581178 }
11591179 }
@@ -1267,12 +1287,14 @@ private static bool IsValidCloseStatus(WebSocketCloseStatus closeStatus)
12671287 private async ValueTask CloseWithReceiveErrorAndThrowAsync (
12681288 WebSocketCloseStatus closeStatus , WebSocketError error , string ? errorMessage = null , Exception ? innerException = null )
12691289 {
1290+ Debug . Assert ( _receiveMutex . IsHeld , $ "Caller should hold the { nameof ( _receiveMutex ) } ") ;
1291+
12701292 if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . Trace ( this , errorMessage ) ;
12711293
1272- // Close the connection if it hasn't already been closed
1294+ // Caller holds _receiveMutex; don't re-enter it for the EOF wait.
12731295 if ( ! _sentCloseFrame )
12741296 {
1275- await CloseOutputAsync ( closeStatus , string . Empty , default ) . ConfigureAwait ( false ) ;
1297+ await CloseOutputAsyncCore ( closeStatus , string . Empty , enterReceiveMutex : false , cancellationToken : default ) . ConfigureAwait ( false ) ;
12761298 }
12771299
12781300 // Dump our receive buffer; we're in a bad state to do any further processing
@@ -1491,6 +1513,12 @@ private async Task CloseAsyncPrivate(WebSocketCloseStatus closeStatus, string? s
14911513 }
14921514 }
14931515
1516+ // Polite EOF wait under the receive mutex; avoids racing the receive loop on the stream.
1517+ if ( ! _isServer && _receivedCloseFrame )
1518+ {
1519+ await WaitForServerToCloseConnectionAsync ( enterMutex : true , cancellationToken ) . ConfigureAwait ( false ) ;
1520+ }
1521+
14941522 // We're closed. Close the connection and update the status.
14951523 lock ( StateUpdateLock )
14961524 {
@@ -1560,11 +1588,6 @@ private async ValueTask SendCloseFrameAsync(WebSocketCloseStatus closeStatus, st
15601588
15611589 if ( NetEventSource . Log . IsEnabled ( ) ) NetEventSource . Trace ( this , $ "State transition from { state } to { _state } ") ;
15621590 }
1563-
1564- if ( ! _isServer && _receivedCloseFrame )
1565- {
1566- await WaitForServerToCloseConnectionAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
1567- }
15681591 }
15691592
15701593 private void ConsumeFromBuffer ( int count )
0 commit comments