diff --git a/FWCore/Concurrency/interface/SpinLock.h b/FWCore/Concurrency/interface/SpinLock.h new file mode 100644 index 0000000000000..89b662f5a2ad9 --- /dev/null +++ b/FWCore/Concurrency/interface/SpinLock.h @@ -0,0 +1,61 @@ +#ifndef FWCore_Concurrency_SpinLock_h +#define FWCore_Concurrency_SpinLock_h +// -*- C++ -*- +// +// Package: FWCore/Concurrency +// Class : SpinLock +// +/**\class SpinLock SpinLock.h "FWCore/Concurrency/interface/SpinLock.h" + + Description: A trivial lock which continues looping until the lock is acquired + + Usage: + + +*/ +// +// Original Author: Christopher Jones +// Created: Tue, 23 Jun 2026 14:12:13 GMT +// + +// system include files +#include + +// user include files +#include "FWCore/Concurrency/interface/hardware_pause.h" + +// forward declarations + +namespace edm { + class SpinLock { + public: + SpinLock() noexcept = default; + ~SpinLock() noexcept {} + + SpinLock(const SpinLock&) = delete; + SpinLock& operator=(const SpinLock&) = delete; + SpinLock(SpinLock&&) = delete; + SpinLock& operator=(SpinLock&&) = delete; + + // ---------- const member functions --------------------- + + // ---------- static member functions -------------------- + + // ---------- member functions --------------------------- + void lock() noexcept { + //we acquire if the previous value is false + while (locked_.exchange(true, std::memory_order_acq_rel)) { + hardware_pause(); + } + } + void unlock() noexcept { + locked_.store(false, std::memory_order_release); + ; + } + + private: + // ---------- member data -------------------------------- + std::atomic locked_{false}; + }; +} // namespace edm +#endif diff --git a/FWCore/Concurrency/test/test_catch2_spinlock.cc b/FWCore/Concurrency/test/test_catch2_spinlock.cc new file mode 100644 index 0000000000000..864263b12d6fe --- /dev/null +++ b/FWCore/Concurrency/test/test_catch2_spinlock.cc @@ -0,0 +1,56 @@ +#include "FWCore/Concurrency/interface/SpinLock.h" +#include +#include +#include +#include +#include + +TEST_CASE("SpinLock", "[SpinLock]") { + SECTION("construct/destruct") { edm::SpinLock lock; } + SECTION("lock guard") { + edm::SpinLock lock; + std::lock_guard guard{lock}; + } + SECTION("thread") { + edm::SpinLock lock; + + std::atomic wait_count{2}; + std::atomic concurrent_count{0}; + + std::exception_ptr excpt; + std::thread t{[&lock, &wait_count, &concurrent_count, &excpt]() { + --wait_count; + while (0 != wait_count) { + } + try { + std::lock_guard g{lock}; + auto v = ++concurrent_count; + REQUIRE(v == 1); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + REQUIRE(concurrent_count.load() == 1); + auto v2 = --concurrent_count; + REQUIRE(v2 == 0); + } catch (...) { + excpt = std::current_exception(); + } + }}; + { + --wait_count; + while (0 != wait_count) { + } + { + std::lock_guard g{lock}; + auto v = ++concurrent_count; + REQUIRE(v == 1); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + REQUIRE(concurrent_count.load() == 1); + auto v2 = --concurrent_count; + REQUIRE(v2 == 0); + } + } + t.join(); + if (excpt) { + std::rethrow_exception(excpt); + } + } +} diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index c9886db2c8eb1..1bd05a463b3f9 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1605,6 +1605,7 @@ namespace edm { LuminosityBlockAuxiliary const& iLumiAux, WaitingTaskHolder iNextTask) { auto lbp = principalCache_.getAvailableLumiPrincipalPtr(); + assert(lbp); iLumiStatus->setLumiPrincipal(lbp); //A new file may have been opened since the last use of the LuminosityBlock lbp->possiblyUpdateAfterAddition(preg()); @@ -1700,11 +1701,9 @@ namespace edm { EventSetupImpl const& es = status->eventSetupImpl(); using Traits = OccurrenceTraits; - streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable { for (auto i : std::views::iota(0U, preallocations_.numberOfStreams())) { streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable { - //the status increments the number of streams here if successful if (!status->shouldStreamStartLumi()) { return; } @@ -1820,7 +1819,6 @@ namespace edm { ServiceRegistry::Operate operate(serviceToken_); std::exception_ptr ptr; - // Try hard to clean up resources so the // process can terminate in a controlled // fashion even after exceptions have occurred. diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc index 6a6bc5641ed30..91aaed748db32 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.cc @@ -12,6 +12,7 @@ #include "LuminosityBlockProcessingStatus.h" #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h" +#include namespace edm { void LuminosityBlockProcessingStatus::resetResources() { @@ -25,13 +26,52 @@ namespace edm { } bool LuminosityBlockProcessingStatus::shouldStreamStartLumi() { - if (haveStartedNextLumiOrEndedRun()) + //there is no more work to do for this lumi and some other stream + // is already processing this lumi + if (haveStartedNextLumiOrEndedRun() and aStreamStartedLumi_) { return false; - - ++nStreamsProcessingLumi_; + } + { + std::lock_guard g{checkShouldStartStream_}; + auto streamStartedLumi = aStreamStartedLumi_.load(); + if (not streamStartedLumi) { + //at least one stream must process the lumi, and we are 'it' + assert(0 == nStreamsProcessingLumi_); + ++nStreamsProcessingLumi_; + aStreamStartedLumi_.store(true); + return true; + } + //check condition again as it could have changed since acquiring the lock + if (haveStartedNextLumiOrEndedRun()) { + return false; + } + //At this point, nStreamsProcessingLumi_ can't be zero since + // - if this were the first lumi then aStreamStartedLumi_ would have been false + // and we wouldn't have gotten here + // - if we made it to `streamFinishedLumi` and it decremented the value to 0 + // that would happen while that routine held the spin lock and that function + // will not be called until `haveStartedNextLumiOrEndedRun()` is true + // therefore when this routine obtained the spin lock it would have passed + // the test above and not gotten here + assert(0 != nStreamsProcessingLumi_); + ++nStreamsProcessingLumi_; + } return true; } + bool LuminosityBlockProcessingStatus::streamFinishedLumi() { + assert(haveStartedNextLumiOrEndedRun()); + std::lock_guard g{checkShouldStartStream_}; + //this must be done in the spin lock so that the changes to + // aStreamStartedLumi_ and nStreamsProcessingLumi_ happen + // atomically within shouldStreamStartLumi + + if (0 == --nStreamsProcessingLumi_) { + return true; + } + return false; + } + void LuminosityBlockProcessingStatus::setEndTime() { constexpr char kUnset = 0; constexpr char kSetting = 1; diff --git a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h index fb4f360057076..efaafc034c3ef 100644 --- a/FWCore/Framework/src/LuminosityBlockProcessingStatus.h +++ b/FWCore/Framework/src/LuminosityBlockProcessingStatus.h @@ -26,6 +26,7 @@ #include "DataFormats/Provenance/interface/Timestamp.h" #include "FWCore/Concurrency/interface/LimitedTaskQueue.h" #include "FWCore/Concurrency/interface/WaitingTaskList.h" +#include "FWCore/Concurrency/interface/SpinLock.h" #include "FWCore/Framework/interface/IOVSyncValue.h" // forward declarations @@ -67,7 +68,7 @@ namespace edm { void globalEndRunHolderDoneWaiting() { globalEndRunHolder_.doneWaiting(std::exception_ptr{}); } bool shouldStreamStartLumi(); - bool streamFinishedLumi() { return 0 == (--nStreamsProcessingLumi_); } + bool streamFinishedLumi(); //These should only be called while in the InputSource's task queue void updateLastTimestamp(edm::Timestamp const& iTime) { @@ -107,10 +108,12 @@ namespace edm { WaitingTaskList endIOVWaitingTasks_; edm::WaitingTaskHolder globalEndRunHolder_; edm::Timestamp endTime_{}; - std::atomic nStreamsProcessingLumi_{0}; + unsigned int nStreamsProcessingLumi_{0}; //guarded by spinlock std::atomic eventProcessingState_{EventProcessingState::kProcessing}; std::atomic endTimeSetStatus_{0}; + edm::SpinLock checkShouldStartStream_; std::atomic startedNextLumiOrEndedRun_{false}; + std::atomic aStreamStartedLumi_{false}; bool globalBeginSucceeded_{false}; bool cleaningUpAfterException_{false}; }; diff --git a/FWCore/Framework/test/BuildFile.xml b/FWCore/Framework/test/BuildFile.xml index 4a218deeb17be..e96cf0239d941 100644 --- a/FWCore/Framework/test/BuildFile.xml +++ b/FWCore/Framework/test/BuildFile.xml @@ -195,6 +195,13 @@ + + + + + + + @@ -398,3 +405,4 @@ + diff --git a/FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc b/FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc new file mode 100644 index 0000000000000..bb3194b9a3ffd --- /dev/null +++ b/FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc @@ -0,0 +1,56 @@ +// -*- C++ -*- +// +// Package: Subsystem/Package +// Class : ConcurrentLumiAnalyzer +// +// Implementation: +// [Notes on implementation] +// +// Original Author: Christopher Jones +// Created: Thu, 25 Jun 2026 14:53:01 GMT +// + +// system include files +#include +#include + +// user include files +#include "FWCore/Framework/interface/global/EDAnalyzer.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/MakerMacros.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/Utilities/interface/Exception.h" + +namespace edmtest { + class ConcurrentLumiAnalyzer : public edm::global::EDAnalyzer<> { + public: + explicit ConcurrentLumiAnalyzer(edm::ParameterSet const&) {} + + void analyze(edm::StreamID, edm::Event const& iEvent, edm::EventSetup const&) const { + auto lumi = iEvent.luminosityBlock(); + int count = 0; + if (lumi == 1) { + while (keepHolding_) { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1000ms); + ++count; + if (count > 10) { + throw cms::Exception("WaitedTooLong"); + } + } + } + if (lumi == 3) { + keepHolding_ = false; + } + } + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + edm::ParameterSetDescription desc; + descriptions.addDefault(desc); + } + + private: + mutable std::atomic keepHolding_{true}; + }; +} // namespace edmtest + +DEFINE_FWK_MODULE(edmtest::ConcurrentLumiAnalyzer); diff --git a/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py b/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py new file mode 100644 index 0000000000000..e6fc5487c082f --- /dev/null +++ b/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py @@ -0,0 +1,18 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +from FWCore.Modules.modules import EmptySource +process.source = EmptySource(numberEventsInLuminosityBlock=1, numberEventsInRun=3) + +process.maxEvents.input = 4 + +from FWCore.Framework.modules import edmtest_ConcurrentLumiAnalyzer +process.tester = edmtest_ConcurrentLumiAnalyzer() + +process.p = cms.Path(process.tester) + +process.options.numberOfThreads = 2 +process.options.numberOfConcurrentLuminosityBlocks = 2 + +#process.add_(cms.Service("Tracer"))