Skip to content

Commit ee8171d

Browse files
committed
MaybeWakeWaiters
1 parent d38beee commit ee8171d

3 files changed

Lines changed: 64 additions & 46 deletions

File tree

src/libraries/System.Private.CoreLib/src/System/Threading/LowLevelLifoSemaphore.cs

Lines changed: 60 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal sealed partial class LowLevelLifoSemaphore
1717
{
1818
// Spinning in the threadpool semaphore is not always useful and benefits vary greatly by scenario.
1919
//
20-
// Example1: An app periodically with rough time span T runs a task and waits for task`s completion.
20+
// Example1: An app periodically with rough time span T runs a task and waits for task's completion.
2121
// The app would benefit if a threadpool worker spins for longer than T as worker would not need to be woken up.
2222
//
2323
// Example2: The new workitems may be produced by non-pool threads and could only arrive if pool threads start blocking.
@@ -146,11 +146,6 @@ private bool WaitSlow(int timeoutMs, short tpThreadCount)
146146
}
147147
}
148148

149-
return WaitNoSpin(timeoutMs, allowFastWake: true);
150-
}
151-
152-
public bool WaitNoSpin(int timeoutMs, bool allowFastWake = false)
153-
{
154149
// Now we will try registering as a waiter and wait.
155150
// If signaled before that, we have to acquire as this can be the last thread that could take that signal.
156151
// The difference with spinning above is that we are not waiting for a signal. We should typically
@@ -172,24 +167,67 @@ public bool WaitNoSpin(int timeoutMs, bool allowFastWake = false)
172167
Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts);
173168
if (countsBeforeUpdate == counts)
174169
{
175-
return counts.SignalCount != 0 || WaitAsWaiter(timeoutMs, allowFastWake);
170+
return counts.SignalCount != 0 || WaitAsWaiter(timeoutMs, allowFastWake: true);
176171
}
177172

178173
Backoff.Exponential(collisionCount++);
179174
}
180175
}
181176

177+
public bool WaitNoSpin(int timeoutMs)
178+
{
179+
Counts counts = _separated._counts.InterlockedIncrementWaiterCount();
180+
181+
// If there were pending signals, we may end in a condition that requires
182+
// waking a waiter.
183+
// Perhaps the current thread will be such waiter, but we should still
184+
// go through wait/wake routine (vs. just claiming the signal) as the caller
185+
// wants to park the thread.
186+
MaybeWakeWaiters(counts);
187+
188+
return WaitAsWaiter(timeoutMs, allowFastWake: false);
189+
}
190+
191+
private void MaybeWakeWaiters(Counts counts)
192+
{
193+
// Check if waiters need to be woken
194+
uint collisionCount = 0;
195+
while (true)
196+
{
197+
// Determine how many waiters to wake.
198+
// The number of wakes should not be more than the signal count, not more than waiter count and discount any pending wakes.
199+
int countOfWaitersToWake = (int)Math.Min(counts.SignalCount, counts.WaiterCount) - counts.CountOfWaitersSignaledToWake;
200+
if (countOfWaitersToWake <= 0)
201+
{
202+
// No waiters to wake. This is the most common case.
203+
break;
204+
}
205+
206+
Counts newCounts = counts;
207+
newCounts.AddCountOfWaitersSignaledToWake((uint)countOfWaitersToWake);
208+
Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts);
209+
if (countsBeforeUpdate == counts)
210+
{
211+
Debug.Assert(_maximumSignalCount - counts.SignalCount >= 1);
212+
if (countOfWaitersToWake > 0)
213+
{
214+
ReleaseCore(countOfWaitersToWake);
215+
}
216+
217+
break;
218+
}
219+
220+
// collision, try again.
221+
Backoff.Exponential(collisionCount++);
222+
223+
counts = _separated._counts;
224+
}
225+
}
226+
182227
private bool WaitAsWaiter(int timeoutMs, bool allowFastWake)
183228
{
184229
Debug.Assert(timeoutMs >= -1);
185230

186-
// If the caller wants that the threads spend time waiting require a 4 usec cooldown
187-
// before reintroducing the thread. The sleep/wake transition typically takes care of that,
188-
// but the blocker has fast wake paths and the underlying OS API may have those as well,
189-
// thus fast wake ups are hard to avoid completely.
190-
// If fast wake happened when parking was desired, we hold up the thread a bit
191-
// before reintroducing.
192-
long cooldown = Stopwatch.Frequency * 4 / 1000000;
193231
while (true)
194232
{
195233
long blockingStart = allowFastWake ? 0 : Stopwatch.GetTimestamp();
@@ -260,36 +298,7 @@ public void Signal()
260298
{
261299
// Increment signal count. This enables one-shot acquire.
262300
Counts counts = _separated._counts.InterlockedIncrementSignalCount();
263-
264-
// Now check if waiters need to be woken
265-
uint collisionCount = 0;
266-
while (true)
267-
{
268-
// Determine how many waiters to wake.
269-
// The number of wakes should not be more than the signal count, not more than waiter count and discount any pending wakes.
270-
int countOfWaitersToWake = (int)Math.Min(counts.SignalCount, counts.WaiterCount) - counts.CountOfWaitersSignaledToWake;
271-
if (countOfWaitersToWake <= 0)
272-
{
273-
// No waiters to wake. This is the most common case.
274-
return;
275-
}
276-
277-
Counts newCounts = counts;
278-
newCounts.AddCountOfWaitersSignaledToWake((uint)countOfWaitersToWake);
279-
Counts countsBeforeUpdate = _separated._counts.InterlockedCompareExchange(newCounts, counts);
280-
if (countsBeforeUpdate == counts)
281-
{
282-
Debug.Assert(_maximumSignalCount - counts.SignalCount >= 1);
283-
if (countOfWaitersToWake > 0)
284-
ReleaseCore(countOfWaitersToWake);
285-
return;
286-
}
287-
288-
// collision, try again.
289-
Backoff.Exponential(collisionCount++);
290-
291-
counts = _separated._counts;
292-
}
301+
MaybeWakeWaiters(counts);
293302
}
294303

295304
private bool Block(int timeoutMs)
@@ -458,6 +467,13 @@ public void InterlockedDecrementWaiterCount()
458467
Debug.Assert(countsAfterUpdate.WaiterCount != ushort.MaxValue); // underflow check
459468
}
460469

470+
public Counts InterlockedIncrementWaiterCount()
471+
{
472+
var countsAfterUpdate = new Counts(Interlocked.Add(ref _data, unchecked((ulong)1) << WaiterCountShift));
473+
Debug.Assert(countsAfterUpdate.WaiterCount != ushort.MaxValue); // overflow check
474+
return countsAfterUpdate;
475+
}
476+
461477
public ushort CountOfWaitersSignaledToWake
462478
{
463479
get => GetUInt16Value(CountOfWaitersSignaledToWakeShift);

src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ private static void WorkerThreadStart()
108108
bool isIdle = false;
109109
while (isIdle ?
110110
semaphore.WaitNoSpin(timeoutMs) :
111-
//semaphore.Wait(timeoutMs, threadPoolInstance._separated.counts.NumExistingThreads) :
112111
semaphore.Wait(timeoutMs, threadPoolInstance._separated.counts.NumExistingThreads))
113112
{
114113
WorkerDoWork(threadPoolInstance, out isIdle);

src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,10 @@ public bool LocalFindAndPop(object obj)
366366
else
367367
{
368368
// Failed, restore head.
369-
m_headIndex = head;
369+
// This write must complete before we return with a missed steal and check if
370+
// there is a pending thread request because the thread that responds
371+
// to the request must see the write to not conclude that the queue is empty.
372+
Interlocked.Exchange(ref m_headIndex, head);
370373
}
371374
}
372375
}

0 commit comments

Comments
 (0)