From c6c416a5d52e07d1a51b99167ed58356459e7e20 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Mon, 22 Jun 2026 14:25:19 -0500 Subject: [PATCH 1/4] Avoid race condition when conditionally starting Lumi begin stream Problem was the atomic could be incremented after it was already decremented to 0 once. This lead to the endLumiAsync being called twice for the same Lumi. --- FWCore/Framework/src/EventProcessor.cc | 9 ++++++--- FWCore/Framework/src/LuminosityBlockProcessingStatus.cc | 2 -- FWCore/Framework/src/LuminosityBlockProcessingStatus.h | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index c9886db2c8eb1..47640c55d0296 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1605,6 +1605,7 @@ namespace edm { LuminosityBlockAuxiliary const& iLumiAux, WaitingTaskHolder iNextTask) { auto lbp = principalCache_.getAvailableLumiPrincipalPtr(); + assert(lbp); iLumiStatus->setLumiPrincipal(lbp); //A new file may have been opened since the last use of the LuminosityBlock lbp->possiblyUpdateAfterAddition(preg()); @@ -1700,12 +1701,15 @@ namespace edm { EventSetupImpl const& es = status->eventSetupImpl(); using Traits = OccurrenceTraits; - + status->setNStreamsThatMayProcessLumi(preallocations_.numberOfStreams()); streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable { for (auto i : std::views::iota(0U, preallocations_.numberOfStreams())) { streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable { - //the status increments the number of streams here if successful if (!status->shouldStreamStartLumi()) { + if (status->streamFinishedLumi()) { + //we didn't start, but we are the last to decrement so have to start job + globalEndLumiAsync(holder, std::move(status)); + } return; } streamQueues_[i].pause(); @@ -1820,7 +1824,6 @@ namespace edm { ServiceRegistry::Operate operate(serviceToken_); std::exception_ptr ptr; - // Try hard to clean up resources so the // process can terminate in a controlled // fashion even after exceptions have occurred. diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc index 6a6bc5641ed30..ce92f08c72989 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc @@ -27,8 +27,6 @@ namespace edm { bool LuminosityBlockProcessingStatus::shouldStreamStartLumi() { if (haveStartedNextLumiOrEndedRun()) return false; - - ++nStreamsProcessingLumi_; return true; } diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h index fb4f360057076..033796077af56 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h @@ -67,6 +67,7 @@ namespace edm { void globalEndRunHolderDoneWaiting() { globalEndRunHolder_.doneWaiting(std::exception_ptr{}); } bool shouldStreamStartLumi(); + void setNStreamsThatMayProcessLumi(unsigned int iNLumis) { nStreamsProcessingLumi_ = iNLumis; } bool streamFinishedLumi() { return 0 == (--nStreamsProcessingLumi_); } //These should only be called while in the InputSource's task queue From 84da54781e7b9842c24eb5335e8cad5fe651d6ca Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 23 Jun 2026 11:19:22 -0500 Subject: [PATCH 2/4] Added edm::SpinLock --- FWCore/Concurrency/interface/SpinLock.h | 61 +++++++++++++++++++ .../Concurrency/test/test_catch2_spinlock.cc | 56 +++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 FWCore/Concurrency/interface/SpinLock.h create mode 100644 FWCore/Concurrency/test/test_catch2_spinlock.cc diff --git a/FWCore/Concurrency/interface/SpinLock.h b/FWCore/Concurrency/interface/SpinLock.h new file mode 100644 index 0000000000000..89b662f5a2ad9 --- /dev/null +++ b/FWCore/Concurrency/interface/SpinLock.h @@ -0,0 +1,61 @@ +#ifndef FWCore_Concurrency_SpinLock_h +#define FWCore_Concurrency_SpinLock_h +// -*- C++ -*- +// +// Package: FWCore/Concurrency +// Class : SpinLock +// +/**\class SpinLock SpinLock.h "FWCore/Concurrency/interface/SpinLock.h" + + Description: A trivial lock which continues looping until the lock is acquired + + Usage: + + +*/ +// +// Original Author: Christopher Jones +// Created: Tue, 23 Jun 2026 14:12:13 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/Concurrency/interface/hardware_pause.h" + +// forward declarations + +namespace edm { + class SpinLock { + public: + SpinLock() noexcept = default; + ~SpinLock() noexcept {} + + SpinLock(const SpinLock&) = delete; + SpinLock& operator=(const SpinLock&) = delete; + SpinLock(SpinLock&&) = delete; + SpinLock& operator=(SpinLock&&) = delete; + + // ---------- const member functions --------------------- + + // ---------- static member functions -------------------- + + // ---------- member functions --------------------------- + void lock() noexcept { + //we acquire if the previous value is false + while (locked_.exchange(true, std::memory_order_acq_rel)) { + hardware_pause(); + } + } + void unlock() noexcept { + locked_.store(false, std::memory_order_release); + ; + } + + private: + // ---------- member data -------------------------------- + std::atomic locked_{false}; + }; +} // namespace edm +#endif diff --git a/FWCore/Concurrency/test/test_catch2_spinlock.cc b/FWCore/Concurrency/test/test_catch2_spinlock.cc new file mode 100644 index 0000000000000..864263b12d6fe --- /dev/null +++ b/FWCore/Concurrency/test/test_catch2_spinlock.cc @@ -0,0 +1,56 @@ +#include "FWCore/Concurrency/interface/SpinLock.h" +#include +#include +#include +#include +#include + +TEST_CASE("SpinLock", "[SpinLock]") { + SECTION("construct/destruct") { edm::SpinLock lock; } + SECTION("lock guard") { + edm::SpinLock lock; + std::lock_guard guard{lock}; + } + SECTION("thread") { + edm::SpinLock lock; + + std::atomic wait_count{2}; + std::atomic concurrent_count{0}; + + std::exception_ptr excpt; + std::thread t{[&lock, &wait_count, &concurrent_count, &excpt]() { + --wait_count; + while (0 != wait_count) { + } + try { + std::lock_guard g{lock}; + auto v = ++concurrent_count; + REQUIRE(v == 1); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + REQUIRE(concurrent_count.load() == 1); + auto v2 = --concurrent_count; + REQUIRE(v2 == 0); + } catch (...) { + excpt = std::current_exception(); + } + }}; + { + --wait_count; + while (0 != wait_count) { + } + { + std::lock_guard g{lock}; + auto v = ++concurrent_count; + REQUIRE(v == 1); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + REQUIRE(concurrent_count.load() == 1); + auto v2 = --concurrent_count; + REQUIRE(v2 == 0); + } + } + t.join(); + if (excpt) { + std::rethrow_exception(excpt); + } + } +} From 5bf2873fd39125612c0d42e0b2a41fee3a76a5bd Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Tue, 23 Jun 2026 13:01:50 -0500 Subject: [PATCH 3/4] Allow end lumi to proceed even if not all streams started the Lumi - guarantee that at least 1 stream processes a Lumi even if the Lumi has no Events --- FWCore/Framework/src/EventProcessor.cc | 5 --- .../src/LuminosityBlockProcessingStatus.cc | 44 ++++++++++++++++++- .../src/LuminosityBlockProcessingStatus.h | 8 ++-- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 47640c55d0296..1bd05a463b3f9 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1701,15 +1701,10 @@ namespace edm { EventSetupImpl const& es = status->eventSetupImpl(); using Traits = OccurrenceTraits; - status->setNStreamsThatMayProcessLumi(preallocations_.numberOfStreams()); streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable { for (auto i : std::views::iota(0U, preallocations_.numberOfStreams())) { streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable { if (!status->shouldStreamStartLumi()) { - if (status->streamFinishedLumi()) { - //we didn't start, but we are the last to decrement so have to start job - globalEndLumiAsync(holder, std::move(status)); - } return; } streamQueues_[i].pause(); diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc index ce92f08c72989..91aaed748db32 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc @@ -12,6 +12,7 @@ #include "LuminosityBlockProcessingStatus.h" #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" +#include namespace edm { void LuminosityBlockProcessingStatus::resetResources() { @@ -25,11 +26,52 @@ namespace edm { } bool LuminosityBlockProcessingStatus::shouldStreamStartLumi() { - if (haveStartedNextLumiOrEndedRun()) + //there is no more work to do for this lumi and some other stream + // is already processing this lumi + if (haveStartedNextLumiOrEndedRun() and aStreamStartedLumi_) { return false; + } + { + std::lock_guard g{checkShouldStartStream_}; + auto streamStartedLumi = aStreamStartedLumi_.load(); + if (not streamStartedLumi) { + //at least one stream must process the lumi, and we are 'it' + assert(0 == nStreamsProcessingLumi_); + ++nStreamsProcessingLumi_; + aStreamStartedLumi_.store(true); + return true; + } + //check condition again as it could have changed since acquiring the lock + if (haveStartedNextLumiOrEndedRun()) { + return false; + } + //At this point, nStreamsProcessingLumi_ can't be zero since + // - if this were the first lumi then aStreamStartedLumi_ would have been false + // and we wouldn't have gotten here + // - if we made it to `streamFinishedLumi` and it decremented the value to 0 + // that would happen while that routine held the spin lock and that function + // will not be called until `haveStartedNextLumiOrEndedRun()` is true + // therefore when this routine obtained the spin lock it would have passed + // the test above and not gotten here + assert(0 != nStreamsProcessingLumi_); + ++nStreamsProcessingLumi_; + } return true; } + bool LuminosityBlockProcessingStatus::streamFinishedLumi() { + assert(haveStartedNextLumiOrEndedRun()); + std::lock_guard g{checkShouldStartStream_}; + //this must be done in the spin lock so that the changes to + // aStreamStartedLumi_ and nStreamsProcessingLumi_ happen + // atomically within shouldStreamStartLumi + + if (0 == --nStreamsProcessingLumi_) { + return true; + } + return false; + } + void LuminosityBlockProcessingStatus::setEndTime() { constexpr char kUnset = 0; constexpr char kSetting = 1; diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h index 033796077af56..efaafc034c3ef 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h @@ -26,6 +26,7 @@ #include "DataFormats/Provenance/interface/Timestamp.h" #include "FWCore/Concurrency/interface/LimitedTaskQueue.h" #include "FWCore/Concurrency/interface/WaitingTaskList.h" +#include "FWCore/Concurrency/interface/SpinLock.h" #include "FWCore/Framework/interface/IOVSyncValue.h" // forward declarations @@ -67,8 +68,7 @@ namespace edm { void globalEndRunHolderDoneWaiting() { globalEndRunHolder_.doneWaiting(std::exception_ptr{}); } bool shouldStreamStartLumi(); - void setNStreamsThatMayProcessLumi(unsigned int iNLumis) { nStreamsProcessingLumi_ = iNLumis; } - bool streamFinishedLumi() { return 0 == (--nStreamsProcessingLumi_); } + bool streamFinishedLumi(); //These should only be called while in the InputSource's task queue void updateLastTimestamp(edm::Timestamp const& iTime) { @@ -108,10 +108,12 @@ namespace edm { WaitingTaskList endIOVWaitingTasks_; edm::WaitingTaskHolder globalEndRunHolder_; edm::Timestamp endTime_{}; - std::atomic nStreamsProcessingLumi_{0}; + unsigned int nStreamsProcessingLumi_{0}; //guarded by spinlock std::atomic eventProcessingState_{EventProcessingState::kProcessing}; std::atomic endTimeSetStatus_{0}; + edm::SpinLock checkShouldStartStream_; std::atomic startedNextLumiOrEndedRun_{false}; + std::atomic aStreamStartedLumi_{false}; bool globalBeginSucceeded_{false}; bool cleaningUpAfterException_{false}; }; From 20bc03f7de943ff8ab50f9475f2c221ceceef9f8 Mon Sep 17 00:00:00 2001 From: Christopher Jones Date: Thu, 25 Jun 2026 11:20:55 -0500 Subject: [PATCH 4/4] Add test for being able to continue when a Lumi is blocked The test holds an Event from the first Lumi until the Event from the 3rd Lumi is started. This forces the system to be able to skip the streamBeginLumi for the 2nd Lumi. --- FWCore/Framework/test/BuildFile.xml | 8 +++ .../test/stubs/ConcurrentLumiAnalyzer.cc | 56 +++++++++++++++++++ ...concurrent_lumis_with_blocked_event_cfg.py | 18 ++++++ 3 files changed, 82 insertions(+) create mode 100644 FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc create mode 100644 FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py diff --git a/FWCore/Framework/test/BuildFile.xml b/FWCore/Framework/test/BuildFile.xml index 4a218deeb17be..e96cf0239d941 100644 --- a/FWCore/Framework/test/BuildFile.xml +++ b/FWCore/Framework/test/BuildFile.xml @@ -195,6 +195,13 @@ + + + + + + + @@ -398,3 +405,4 @@ + diff --git a/FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc b/FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc new file mode 100644 index 0000000000000..bb3194b9a3ffd --- /dev/null +++ b/FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc @@ -0,0 +1,56 @@ +// -*- C++ -*- +// +// Package: Subsystem/Package +// Class : ConcurrentLumiAnalyzer +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Christopher Jones +// Created: Thu, 25 Jun 2026 14:53:01 GMT +// + +// system include files +#include +#include + +// user include files +#include "FWCore/Framework/interface/global/EDAnalyzer.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/Utilities/interface/Exception.h" + +namespace edmtest { + class ConcurrentLumiAnalyzer : public edm::global::EDAnalyzer<> { + public: + explicit ConcurrentLumiAnalyzer(edm::ParameterSet const&) {} + + void analyze(edm::StreamID, edm::Event const& iEvent, edm::EventSetup const&) const { + auto lumi = iEvent.luminosityBlock(); + int count = 0; + if (lumi == 1) { + while (keepHolding_) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1000ms); + ++count; + if (count > 10) { + throw cms::Exception("WaitedTooLong"); + } + } + } + if (lumi == 3) { + keepHolding_ = false; + } + } + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + descriptions.addDefault(desc); + } + + private: + mutable std::atomic keepHolding_{true}; + }; +} // namespace edmtest + +DEFINE_FWK_MODULE(edmtest::ConcurrentLumiAnalyzer); diff --git a/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py b/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py new file mode 100644 index 0000000000000..e6fc5487c082f --- /dev/null +++ b/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py @@ -0,0 +1,18 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +from FWCore.Modules.modules import EmptySource +process.source = EmptySource(numberEventsInLuminosityBlock=1, numberEventsInRun=3) + +process.maxEvents.input = 4 + +from FWCore.Framework.modules import edmtest_ConcurrentLumiAnalyzer +process.tester = edmtest_ConcurrentLumiAnalyzer() + +process.p = cms.Path(process.tester) + +process.options.numberOfThreads = 2 +process.options.numberOfConcurrentLuminosityBlocks = 2 + +#process.add_(cms.Service("Tracer"))