Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions FWCore/Concurrency/interface/SpinLock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#ifndef FWCore_Concurrency_SpinLock_h
#define FWCore_Concurrency_SpinLock_h
// -*- C++ -*-
//
// Package: FWCore/Concurrency
// Class : SpinLock
//
/**\class SpinLock SpinLock.h "FWCore/Concurrency/interface/SpinLock.h"

Description: A trivial lock which continues looping until the lock is acquired

Usage:
<usage>

*/
//
// Original Author: Christopher Jones
// Created: Tue, 23 Jun 2026 14:12:13 GMT
//

// system include files
#include <atomic>

// user include files
#include "FWCore/Concurrency/interface/hardware_pause.h"

// forward declarations

namespace edm {
class SpinLock {
public:
SpinLock() noexcept = default;
~SpinLock() noexcept {}

SpinLock(const SpinLock&) = delete;
SpinLock& operator=(const SpinLock&) = delete;
SpinLock(SpinLock&&) = delete;
SpinLock& operator=(SpinLock&&) = delete;

// ---------- const member functions ---------------------

// ---------- static member functions --------------------

// ---------- member functions ---------------------------
void lock() noexcept {
//we acquire if the previous value is false
while (locked_.exchange(true, std::memory_order_acq_rel)) {
hardware_pause();
}
}
void unlock() noexcept {
locked_.store(false, std::memory_order_release);
;
}

private:
// ---------- member data --------------------------------
std::atomic<bool> locked_{false};
};
} // namespace edm
#endif
56 changes: 56 additions & 0 deletions FWCore/Concurrency/test/test_catch2_spinlock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "FWCore/Concurrency/interface/SpinLock.h"
#include <catch2/catch_all.hpp>
#include <mutex>
#include <atomic>
#include <thread>
#include <chrono>

TEST_CASE("SpinLock", "[SpinLock]") {
SECTION("construct/destruct") { edm::SpinLock lock; }
SECTION("lock guard") {
edm::SpinLock lock;
std::lock_guard<edm::SpinLock> guard{lock};
}
SECTION("thread") {
edm::SpinLock lock;

std::atomic<int> wait_count{2};
std::atomic<int> concurrent_count{0};

std::exception_ptr excpt;
std::thread t{[&lock, &wait_count, &concurrent_count, &excpt]() {
--wait_count;
while (0 != wait_count) {
}
try {
std::lock_guard<edm::SpinLock> g{lock};
auto v = ++concurrent_count;
REQUIRE(v == 1);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE(concurrent_count.load() == 1);
auto v2 = --concurrent_count;
REQUIRE(v2 == 0);
} catch (...) {
excpt = std::current_exception();
}
}};
{
--wait_count;
while (0 != wait_count) {
}
{
std::lock_guard<edm::SpinLock> g{lock};
auto v = ++concurrent_count;
REQUIRE(v == 1);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE(concurrent_count.load() == 1);
auto v2 = --concurrent_count;
REQUIRE(v2 == 0);
}
}
t.join();
if (excpt) {
std::rethrow_exception(excpt);
}
}
}
4 changes: 1 addition & 3 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,7 @@ namespace edm {
LuminosityBlockAuxiliary const& iLumiAux,
WaitingTaskHolder iNextTask) {
auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
assert(lbp);
iLumiStatus->setLumiPrincipal(lbp);
//A new file may have been opened since the last use of the LuminosityBlock
lbp->possiblyUpdateAfterAddition(preg());
Expand Down Expand Up @@ -1700,11 +1701,9 @@ namespace edm {

EventSetupImpl const& es = status->eventSetupImpl();
using Traits = OccurrenceTraits<LuminosityBlockPrincipal, TransitionActionStreamBegin>;

streamQueuesInserter_.push(*holder.group(), [this, status, holder, &es]() mutable {
for (auto i : std::views::iota(0U, preallocations_.numberOfStreams())) {
streamQueues_[i].push(*holder.group(), [this, i, status, holder, &es]() mutable {
//the status increments the number of streams here if successful
if (!status->shouldStreamStartLumi()) {
return;
}
Expand Down Expand Up @@ -1820,7 +1819,6 @@ namespace edm {
ServiceRegistry::Operate operate(serviceToken_);

std::exception_ptr ptr;

// Try hard to clean up resources so the
// process can terminate in a controlled
// fashion even after exceptions have occurred.
Expand Down
46 changes: 43 additions & 3 deletions FWCore/Framework/src/LuminosityBlockProcessingStatus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "LuminosityBlockProcessingStatus.h"
#include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
#include <mutex>

namespace edm {
void LuminosityBlockProcessingStatus::resetResources() {
Expand All @@ -25,13 +26,52 @@ namespace edm {
}

bool LuminosityBlockProcessingStatus::shouldStreamStartLumi() {
if (haveStartedNextLumiOrEndedRun())
//there is no more work to do for this lumi and some other stream
// is already processing this lumi
if (haveStartedNextLumiOrEndedRun() and aStreamStartedLumi_) {
return false;

++nStreamsProcessingLumi_;
}
{
std::lock_guard g{checkShouldStartStream_};
auto streamStartedLumi = aStreamStartedLumi_.load();
if (not streamStartedLumi) {
//at least one stream must process the lumi, and we are 'it'
assert(0 == nStreamsProcessingLumi_);
++nStreamsProcessingLumi_;
aStreamStartedLumi_.store(true);
return true;
}
//check condition again as it could have changed since acquiring the lock
if (haveStartedNextLumiOrEndedRun()) {
return false;
}
//At this point, nStreamsProcessingLumi_ can't be zero since
// - if this were the first lumi then aStreamStartedLumi_ would have been false
// and we wouldn't have gotten here
// - if we made it to `streamFinishedLumi` and it decremented the value to 0
// that would happen while that routine held the spin lock and that function
// will not be called until `haveStartedNextLumiOrEndedRun()` is true
// therefore when this routine obtained the spin lock it would have passed
// the test above and not gotten here
assert(0 != nStreamsProcessingLumi_);
++nStreamsProcessingLumi_;
}
return true;
}

bool LuminosityBlockProcessingStatus::streamFinishedLumi() {
assert(haveStartedNextLumiOrEndedRun());
std::lock_guard g{checkShouldStartStream_};
//this must be done in the spin lock so that the changes to
// aStreamStartedLumi_ and nStreamsProcessingLumi_ happen
// atomically within shouldStreamStartLumi

if (0 == --nStreamsProcessingLumi_) {
return true;
}
return false;
}

void LuminosityBlockProcessingStatus::setEndTime() {
constexpr char kUnset = 0;
constexpr char kSetting = 1;
Expand Down
7 changes: 5 additions & 2 deletions FWCore/Framework/src/LuminosityBlockProcessingStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
#include "FWCore/Concurrency/interface/SpinLock.h"
#include "FWCore/Framework/interface/IOVSyncValue.h"

// forward declarations
Expand Down Expand Up @@ -67,7 +68,7 @@ namespace edm {
void globalEndRunHolderDoneWaiting() { globalEndRunHolder_.doneWaiting(std::exception_ptr{}); }

bool shouldStreamStartLumi();
bool streamFinishedLumi() { return 0 == (--nStreamsProcessingLumi_); }
bool streamFinishedLumi();

//These should only be called while in the InputSource's task queue
void updateLastTimestamp(edm::Timestamp const& iTime) {
Expand Down Expand Up @@ -107,10 +108,12 @@ namespace edm {
WaitingTaskList endIOVWaitingTasks_;
edm::WaitingTaskHolder globalEndRunHolder_;
edm::Timestamp endTime_{};
std::atomic<unsigned int> nStreamsProcessingLumi_{0};
unsigned int nStreamsProcessingLumi_{0}; //guarded by spinlock
std::atomic<EventProcessingState> eventProcessingState_{EventProcessingState::kProcessing};
std::atomic<char> endTimeSetStatus_{0};
edm::SpinLock checkShouldStartStream_;
std::atomic<bool> startedNextLumiOrEndedRun_{false};
std::atomic<bool> aStreamStartedLumi_{false};
bool globalBeginSucceeded_{false};
bool cleaningUpAfterException_{false};
};
Expand Down
8 changes: 8 additions & 0 deletions FWCore/Framework/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@
<use name="FWCore/Sources"/>
</library>

<library file="stubs/ConcurrentLumiAnalyzer.cc" name="FWCoreFrameworkConcurrentLumiAnalyzer">
<flags EDM_PLUGIN="1"/>
<use name="FWCore/Framework"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/Utilities"/>
</library>


<bin name="TestFWCoreFramework" file="maker2_t.catch2.cc,productregistry.catch2.cc,edproducer_productregistry_callback.cc,event_getrefbeforeput_t.catch2.cc,generichandle_t.catch2.cc,edconsumerbase_t.catch2.cc,checkForModuleDependencyCorrectness_t.catch2.cc">
<use name="DataFormats/Common"/>
Expand Down Expand Up @@ -398,3 +405,4 @@
<test name="testFWCoreFrameworkCmsRunVarparsing" command="run_cmsRun_varparsing.sh"/>
<test name="testFWCoreFrameworkCmsRunArgparse" command="run_cmsRun_argparse.sh"/>
<test name="testFWCoreFrameworkCmsRunArgv" command="run_cmsRun_argv.sh"/>
<test name="testFWCoreFrameworkBlockedLumi" command="cmsRun ${LOCALTOP}/src/FWCore/Framework/test/test_2_concurrent_lumis_with_blocked_event_cfg.py"/>
56 changes: 56 additions & 0 deletions FWCore/Framework/test/stubs/ConcurrentLumiAnalyzer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// -*- C++ -*-
//
// Package: Subsystem/Package
// Class : ConcurrentLumiAnalyzer
//
// Implementation:
// [Notes on implementation]
//
// Original Author: Christopher Jones
// Created: Thu, 25 Jun 2026 14:53:01 GMT
//

// system include files
#include <chrono>
#include <thread>

// user include files
#include "FWCore/Framework/interface/global/EDAnalyzer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/Utilities/interface/Exception.h"

namespace edmtest {
class ConcurrentLumiAnalyzer : public edm::global::EDAnalyzer<> {
public:
explicit ConcurrentLumiAnalyzer(edm::ParameterSet const&) {}

void analyze(edm::StreamID, edm::Event const& iEvent, edm::EventSetup const&) const {
auto lumi = iEvent.luminosityBlock();
int count = 0;
if (lumi == 1) {
while (keepHolding_) {
using namespace std::chrono_literals;
std::this_thread::sleep_for(1000ms);
++count;
if (count > 10) {
throw cms::Exception("WaitedTooLong");
}
}
}
if (lumi == 3) {
keepHolding_ = false;
}
}
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
descriptions.addDefault(desc);
}

private:
mutable std::atomic<bool> keepHolding_{true};
};
} // namespace edmtest

DEFINE_FWK_MODULE(edmtest::ConcurrentLumiAnalyzer);
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("TEST")

from FWCore.Modules.modules import EmptySource
process.source = EmptySource(numberEventsInLuminosityBlock=1, numberEventsInRun=3)

process.maxEvents.input = 4

from FWCore.Framework.modules import edmtest_ConcurrentLumiAnalyzer
process.tester = edmtest_ConcurrentLumiAnalyzer()

process.p = cms.Path(process.tester)

process.options.numberOfThreads = 2
process.options.numberOfConcurrentLuminosityBlocks = 2

#process.add_(cms.Service("Tracer"))