From ef32b3ceac8b05e2b721f5e647f821b4163dfb75 Mon Sep 17 00:00:00 2001 From: Andrea Bocci Date: Thu, 25 Jun 2026 16:11:13 +0200 Subject: [PATCH] Extend convertToRaw to support streamer files Extend convertToRaw to support streamer (.dat) files. Update the documentation and unit tests accordingly. --- HLTrigger/Tools/README.md | 34 ++-- HLTrigger/Tools/python/streamerToRaw.py | 134 ++++++++++++ HLTrigger/Tools/scripts/convertToRaw | 246 +++++++++++++++++------ HLTrigger/Tools/test/testConvertToRaw.sh | 75 ++++++- 4 files changed, 406 insertions(+), 83 deletions(-) create mode 100644 HLTrigger/Tools/python/streamerToRaw.py diff --git a/HLTrigger/Tools/README.md b/HLTrigger/Tools/README.md index b240b7b8432c5..d41b5dc395d21 100644 --- a/HLTrigger/Tools/README.md +++ b/HLTrigger/Tools/README.md @@ -1,26 +1,32 @@ # convertToRaw -Convert RAW data stored in one or more EDM .root files into the .raw file used as input by the HLT. -``` -usage: convertToRaw [-h] [-o PATH] [-f EVENTS] [-l EVENTS] [--one-file-per-lumi] FILES [FILES ...] +Convert raw data stored into one or more EDM files format (.root files) or Streamer format (.dat files) into the DAQ +format (.raw files) used as input by the HLT. + +The default behaviour is to process a single luminosity section at a time, in order to support luminosity sections split +across multiple files and a limit on the number of events in each lumisection. +If neither of these features is needed (i.e. if lumisections are not split, and all events should be converted) the `-1` +or `--one-file-per-lumi` can be used to process all data with a single job, speeding up the conversion considerably. -Convert RAW data from .root format to .raw format. +``` +usage: convertToRaw [-h] [-s TAG] [-o PATH] [-f EVENTS] [-l EVENTS] [-r [RUN:LUMI-RUN:LUMI]] [-v] [-1] FILES [FILES ...] positional arguments: - FILES input files in .root format + FILES input files in .root or .dat format -optional arguments: +options: -h, --help show this help message and exit + -s TAG, --source TAG name of the FEDRawDataCollection to be repacked into raw format (default: rawDataCollector) -o PATH, --output PATH - base path to store the output files; subdirectories based on the run number are automatically created (default: ) + base path to store the output files; subdirectories based on the run number are automatically created (default: /home/fwyzard/src/cmssw/HLTrigger/Tools) -f EVENTS, --events_per_file EVENTS - split the output into files with at most EVENTS events (default: 50) + split the output into files with at most EVENTS events (default: 100) -l EVENTS, --events_per_lumi EVENTS - process at most EVENTS events in each lumisection (default: 11650) - --one-file-per-lumi assume that lumisections are not split across files (and disable --events_per_lumi) (default: False) + process at most EVENTS events in each lumisection (default: 11655) + -r [RUN:LUMI-RUN:LUMI], --range [RUN:LUMI-RUN:LUMI] + process only the runs and lumisections in the given range (default: all) + -v, --verbose print additional information while processing the input files (default: False) + -1, --one-file-per-lumi + assume that lumisections are not split across files (and disable --events_per_lumi) (default: False) ``` - -The default behaviour is to process a single luminosity section at a time, in order to support luminosity sections split across multiple files and a limit on the number of events in each lumisection. - -If neither of these features is needed (_i.e._ if lumisections are not split, and all events should be converted) the `--one-file-per-lumi` can be used to process all data with a single job, speeding up the conversion considerably. diff --git a/HLTrigger/Tools/python/streamerToRaw.py b/HLTrigger/Tools/python/streamerToRaw.py new file mode 100644 index 0000000000000..bc68fcf681396 --- /dev/null +++ b/HLTrigger/Tools/python/streamerToRaw.py @@ -0,0 +1,134 @@ +# Convert the RAW data from streamer .dat files into DAQ .raw format +# +# usage: cmsRun $CMSSW_RELEASE_BASE/HLTrigger/Tools/python/streamerToRaw.py \ +# inputFiles=/store/path/file.dat[,/store/path/file.dat,...] \ +# runNumber=NNNNNN \ +# [lumiNumber=NNNN] \ +# [eventsPerFile=50] \ +# [eventsPerLumi=11650] \ +# [rawDataCollection=rawDataCollector] \ +# [outputPath=output_directory] +# +# The output files will appear as output_directory/runNNNNNN/runNNNNNN_lumiNNNN_indexNNNNNN.raw . + +import sys +import os +import FWCore.ParameterSet.Config as cms +import FWCore.ParameterSet.VarParsing as VarParsing + +process = cms.Process("FAKE") + +process.maxEvents = cms.untracked.PSet( + input = cms.untracked.int32(-1) # to be overwritten after parsing the command line options +) + +process.source = cms.Source("NewEventStreamFileReader", + fileNames = cms.untracked.vstring() # to be overwritten after parsing the command line options +) + +from EventFilter.Utilities.EvFDaqDirector_cfi import EvFDaqDirector as _EvFDaqDirector +process.EvFDaqDirector = _EvFDaqDirector.clone( + baseDir = "", # to be overwritten after parsing the command line options + buBaseDir = "", # to be overwritten after parsing the command line options + runNumber = 0 # to be overwritten after parsing the command line options +) + +process.writer = cms.OutputModule("RawStreamFileWriterForBU", + source = cms.InputTag('rawDataCollector'), # to be overwritten after parsing the command line options + numEventsPerFile = cms.uint32(0) # to be overwritten after parsing the command line options +) + +process.endpath = cms.EndPath(process.writer) + +process.load('FWCore.MessageService.MessageLogger_cfi') +process.MessageLogger.cerr.FwkReport.reportEvery = 0 # to be overwritten after parsing the command line options + +# parse command line options +options = VarParsing.VarParsing ('python') +for name in 'filePrepend', 'maxEvents', 'outputFile', 'secondaryOutputFile', 'section', 'tag', 'storePrepend', 'totalSections': + del options._register[name] + del options._beenSet[name] + del options._info[name] + del options._types[name] + if name in options._singletons: + del options._singletons[name] + if name in options._lists: + del options._lists[name] + if name in options._noCommaSplit: + del options._noCommaSplit[name] + if name in options._noDefaultClear: + del options._noDefaultClear[name] + + +options.register('runNumber', + 0, + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.int, + "Run number to use") + +options.register('lumiNumber', + None, + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.int, + "Luminosity section number to use") + +options.register('eventsPerLumi', + 11650, + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.int, + "Number of events in the given luminosity section to process") + +options.register('eventsPerFile', + 50, + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.int, + "Split the output into files with at most this number of events") + +options.register('rawDataCollection', + 'rawDataCollector', + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "FEDRawDataCollection to be repacked into RAW format") + +options.register('outputPath', + os.getcwd(), + VarParsing.VarParsing.multiplicity.singleton, + VarParsing.VarParsing.varType.string, + "Output directory for the FED RAW data files") + +options.parseArguments() + +# check that the option values are valide +if options.runNumber <= 0: + sys.stderr.write('Invalid run number\n') + sys.exit(1) + +if options.lumiNumber is not None and options.lumiNumber <= 0: + sys.stderr.write('Invalid luminosity section number\n') + sys.exit(1) + +if options.eventsPerLumi == 0 or options.eventsPerLumi < -1: + sys.stderr.write('Invalid number of events per luminosity section\n') + sys.exit(1) + +if options.eventsPerFile <= 0: + sys.stderr.write('Invalid number of events per output file\n') + sys.exit(1) + +# configure the job based on the command line options +process.source.fileNames = options.inputFiles +if options.lumiNumber is not None: + # process only one lumisection + process.source.lumisToProcess = cms.untracked.VLuminosityBlockRange('%d:%d' % (options.runNumber, options.lumiNumber)) + process.maxEvents.input = options.eventsPerLumi +process.EvFDaqDirector.runNumber = options.runNumber +process.EvFDaqDirector.baseDir = options.outputPath +process.EvFDaqDirector.buBaseDir = options.outputPath +process.writer.source = options.rawDataCollection +process.writer.numEventsPerFile = options.eventsPerFile +process.MessageLogger.cerr.FwkReport.reportEvery = options.eventsPerFile + +# create the output directory, if it does not exist +outputRunPath = f'{options.outputPath}/run{options.runNumber:06d}' +os.makedirs(outputRunPath, exist_ok=True) +open(f'{outputRunPath}/fu.lock', 'w').close() diff --git a/HLTrigger/Tools/scripts/convertToRaw b/HLTrigger/Tools/scripts/convertToRaw index 32eb9fc4a18a1..7423c724b2167 100755 --- a/HLTrigger/Tools/scripts/convertToRaw +++ b/HLTrigger/Tools/scripts/convertToRaw @@ -76,9 +76,13 @@ events_per_file = 100 events_per_lumi = 11655 output_directory = os.getcwd() -parser = argparse.ArgumentParser(description='Convert RAW data from .root format to .raw format.', formatter_class = argparse.ArgumentDefaultsHelpFormatter) -parser.add_argument('files', type=str, metavar='FILES', nargs='+', help='input files in .root format') -parser.add_argument('-s', '--source', type=str, dest='raw_data_collection', metavar='TAG', default='rawDataCollector', help='name of the FEDRawDataCollection to be repacked into RAW format') +parser = argparse.ArgumentParser( + description='Convert raw data stored into one or more EDM files format (.root files) or Streamer format (.dat files) into the DAQ format (.raw files) used as input by the HLT.', + epilog='''The default behaviour is to process a single luminosity section at a time, in order to support luminosity sections split across multiple files and a limit on the number of events in each lumisection. + If neither of these features is needed (i.e. if lumisections are not split, and all events should be converted) the `-1` or `--one-file-per-lumi` can be used to process all data with a single job, speeding up the conversion considerably.''', + formatter_class = argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument('files', type=str, metavar='FILES', nargs='+', help='input files in .root or .dat format') +parser.add_argument('-s', '--source', type=str, dest='raw_data_collection', metavar='TAG', default='rawDataCollector', help='name of the FEDRawDataCollection to be repacked into raw format') parser.add_argument('-o', '--output', type=str, dest='output_directory', metavar='PATH', default=os.getcwd(), help='base path to store the output files; subdirectories based on the run number are automatically created') parser.add_argument('-f', '--events_per_file', type=int, dest='events_per_file', metavar='EVENTS', default=events_per_file, help='split the output into files with at most EVENTS events') parser.add_argument('-l', '--events_per_lumi', type=int, dest='events_per_lumi', metavar='EVENTS', default=events_per_lumi, help='process at most EVENTS events in each lumisection') @@ -92,7 +96,34 @@ if args.output_directory and args.output_directory.endswith('/'): args.output_directory = args.output_directory[:-1] # read the list of input files from the command line arguments -files = [ 'file:' + f if (not ':' in f and not f.startswith('/store/') and os.path.exists(f)) else f for f in args.files ] +format = '' +for f in args.files: + latest = '' + if f.endswith('.root'): + latest = 'edm' + elif f.endswith('.dat'): + latest = 'streamer' + else: + sys.stderr.write(f'error: \'{f}\' is not a unsupported file type. Only .root files or .dat files can be converted.') + sys.exit(1) + + if format == '': + format = latest + elif format != latest: + sys.stderr.write('error: cannot mix files of different types. Input files must all be of type \'.root\' or \'.dat.') + sys.exit(1) + +files = [] +for f in args.files: + if ':' in f or f.startswith('/store/'): + # PFN or LFN + files.append(f) + else: + # local file + if os.path.exists(f): + files.append('file:' + f) + else: + sys.stderr.write(f'warning: skipping non-existent input file {f}') # extract the list of runs and lumiections in the input files class FileInfo(object): @@ -100,61 +131,148 @@ class FileInfo(object): self.events = 0 self.files = set() -header = re.compile(r'^ +Run +Lumi +# Events$') -empty = re.compile(r'^ *$') content = {} -for f in files: +# TODO can we run these in parallel over the various files ? +if format == 'edm': + header = re.compile(r'^ +Run +Lumi +# Events$') + empty = re.compile(r'^ *$') + + for f in files: + + # run edmFileUtil --eventsInLumis ... + print(f'preprocessing input file {f}') + output = subprocess.run(['edmFileUtil', '--eventsInLumis', f], capture_output=True, text=True) + if args.verbose: + print(output.stdout) + + # handle error conditions + if output.returncode < 0: + sys.stderr.write('error: edmFileUtil was killed by signal %d\n' % -output.returncode) + if not args.verbose: + sys.stderr.write('\n') + sys.stderr.write(output.stderr) + sys.exit(output.returncode) + elif output.returncode > 0: + sys.stderr.write('error: edmFileUtil exited with error code %d\n' % output.returncode) + if not args.verbose: + sys.stderr.write('\n') + sys.stderr.write(output.stderr) + sys.exit(output.returncode) + + # parse the output of edmFileUtil + parsing = False + for line in output.stdout.splitlines(): + if not parsing and header.match(line): + # start parsing + parsing = True + continue - # run edmFileUtil --eventsInLumis ... - print(f'preprocessing input file {f}') - output = subprocess.run(['edmFileUtil', '--eventsInLumis', f], capture_output=True, text=True) - if args.verbose: - print(output.stdout) + if parsing and empty.match(line): + # stop parsing + parsing = False + continue - # handle error conditions - if output.returncode < 0: - sys.stderr.write('error: edmFileUtil was killed by signal %d\n' % -output.returncode) - if not args.verbose: - sys.stderr.write('\n') - sys.stderr.write(output.stderr) - sys.exit(output.returncode) - elif output.returncode > 0: - sys.stderr.write('error: edmFileUtil exited with error code %d\n' % output.returncode) - if not args.verbose: - sys.stderr.write('\n') - sys.stderr.write(output.stderr) - sys.exit(output.returncode) - - # parse the output of edmFileUtil - parsing = False - for line in output.stdout.splitlines(): - if not parsing and header.match(line): - # start parsing - parsing = True - continue - - if parsing and empty.match(line): - # stop parsing - parsing = False - continue - - if parsing: - run, lumi, events = tuple(map(int, line.split())) - if not args.range.is_in_range(run, lumi): - print(f' run {run}, lumisection {lumi} is outside of the given range and will be skipped') + if parsing: + run, lumi, events = tuple(map(int, line.split())) + if not args.range.is_in_range(run, lumi): + print(f' run {run}, lumisection {lumi} is outside of the given range and will be skipped') + continue + if events == 0: + print(f' run {run}, lumisection {lumi} is empty and will be skipped') + continue + print(f' run {run}, lumisection {lumi} with {events} events will be processed') + if not run in content: + content[run] = {} + if not lumi in content[run]: + content[run][lumi] = FileInfo() + content[run][lumi].events += events + content[run][lumi].files.add(f) + print() + +elif format == 'streamer': + header = re.compile(r'^----------dumping first EVENT-----------$') + footer = re.compile(r'^------------END--------------$') + empty = re.compile(r'^ *$') + run_p = re.compile(r'^run=(\d+)') + lumi_p = re.compile(r'^lumi=(\d+)') + event_p = re.compile(r'^and (\d+) events') + + for f in files: + + # run DiagStreamerFile ... + print(f'preprocessing input file {f}') + output = subprocess.run(['DiagStreamerFile', f], capture_output=True, text=True) + if args.verbose: + print(output.stdout) + + # handle error conditions + if output.returncode < 0: + sys.stderr.write('error: DiagStreamerFile was killed by signal %d\n' % -output.returncode) + if not args.verbose: + sys.stderr.write('\n') + sys.stderr.write(output.stderr) + sys.exit(output.returncode) + elif output.returncode > 0: + sys.stderr.write('error: DiagStreamerFile exited with error code %d\n' % output.returncode) + if not args.verbose: + sys.stderr.write('\n') + sys.stderr.write(output.stderr) + sys.exit(output.returncode) + + # parse the output of DiagStreamerFile + parsing = None + for line in output.stdout.splitlines(): + if not parsing and header.match(line): + # start parsing the header + parsing = 'header' + continue + + if not parsing and footer.match(line): + # start parsing the footer + parsing = 'footer' continue - if events == 0: - print(f' run {run}, lumisection {lumi} is empty and will be skipped') + + if parsing and empty.match(line): + # stop parsing after an empty line + parsing = None continue - print(f' run {run}, lumisection {lumi} with {events} events will be processed') - if not run in content: - content[run] = {} - if not lumi in content[run]: - content[run][lumi] = FileInfo() - content[run][lumi].events += events - content[run][lumi].files.add(f) - print() + + if parsing == 'header': + is_run = run_p.match(line) + if is_run: + run = int(is_run[1]) + continue + is_lumi = lumi_p.match(line) + if is_lumi: + lumi = int(is_lumi[1]) + continue + + # TODO can we improve the logic for parsing the footer and make it more robust ? + if parsing == 'footer': + is_event = event_p.match(line) + if is_event: + if not args.range.is_in_range(run, lumi): + print(f' run {run}, lumisection {lumi} is outside of the given range and will be skipped') + continue + events = int(is_event[1]) + if events == 0: + print(f' run {run}, lumisection {lumi} is empty and will be skipped') + continue + print(f' run {run}, lumisection {lumi} with {events} events will be processed') + if not run in content: + content[run] = {} + if not lumi in content[run]: + content[run][lumi] = FileInfo() + content[run][lumi].events += events + content[run][lumi].files.add(f) + break + print() + +else: + sys.stderr.write(f'logic error') + sys.exit(1) + # drop empty lumisections # note: this may no longer be needed, but is left as a cross check @@ -168,19 +286,31 @@ empty_runs = [ run for run in content if not content[run] ] for run in empty_runs: del content[run] + # locate the CMSSW configuration file -config_name = 'HLTrigger/Tools/python/convertToRaw.py' +if format == 'edm': + config_name = 'HLTrigger/Tools/python/convertToRaw.py' +elif format == 'streamer': + config_name = 'HLTrigger/Tools/python/streamerToRaw.py' +else: + sys.stderr.write(f'logic error') + sys.exit(1) + current_area = os.environ['CMSSW_BASE'] release_area = os.environ['CMSSW_RELEASE_BASE'] +full_release = os.environ['CMSSW_FULL_RELEASE_BASE'] -config_py = current_area + '/src/' + config_name -if not os.path.exists(config_py): +if current_area and os.path.exists(current_area + '/src/' + config_name): + config_py = current_area + '/src/' + config_name +elif release_area and os.path.exists(release_area + '/src/' + config_name): config_py = release_area + '/src/' + config_name -if not os.path.exists(config_py): +elif full_release and os.path.exists(full_release + '/src/' + config_name): + config_py = full_release + '/src/' + config_name +else: sys.stderr.write('error: cannot find the configuration file %s\n' % config_name) sys.exit(1) -# convert the input data to FED RAW data format +# convert the input data to FED raw data format converted_files = [] # process each run diff --git a/HLTrigger/Tools/test/testConvertToRaw.sh b/HLTrigger/Tools/test/testConvertToRaw.sh index c8b4884dc9af0..5f272d9a81a68 100755 --- a/HLTrigger/Tools/test/testConvertToRaw.sh +++ b/HLTrigger/Tools/test/testConvertToRaw.sh @@ -14,30 +14,83 @@ check_for_failure() { "${@}" && exit 1 || echo -e "\n ---> Passed test of '${@}'\n\n" } -inputfile=/store/data/Run2024C/EphemeralHLTPhysics0/RAW/v1/000/379/416/00000/e8dd5e3c-216f-4545-acb6-ab86c9161085.root +inputfile="/store/data/Run2024C/EphemeralHLTPhysics0/RAW/v1/000/379/416/00000/e8dd5e3c-216f-4545-acb6-ab86c9161085.root" -echo "========================================" +echo "============================================================" echo "Testing convertToRaw in ${SCRAM_TEST_PATH}." -echo "----------------------------------------" +echo "------------------------------------------------------------" echo -echo "========================================" +echo "============================================================" echo "testing help function " -echo "----------------------------------------" +echo "------------------------------------------------------------" echo convertToRaw --help || die "Failure running convertToRaw --help" $? -echo "========================================" -echo "testing successful conversion" -echo "----------------------------------------" +echo "============================================================" +echo "testing successful conversion of edm files" +echo "------------------------------------------------------------" echo check_for_success convertToRaw -f 1 -l=1 -v $inputfile -echo "========================================" -echo "testing failing conversion" -echo "----------------------------------------" +echo "============================================================" +echo "testing failing conversion of edm files" +echo "------------------------------------------------------------" echo check_for_failure convertToRaw -f 1 -l=-1 -s rawDataRepacker $inputfile + +echo "============================================================" +echo "generating a streamer file" +echo "------------------------------------------------------------" +echo +cat > stream.py << @EOF +import FWCore.ParameterSet.Config as cms + +process = cms.Process( "STREAMER" ) + +process.source = cms.Source("PoolSource", + fileNames = cms.untracked.vstring('$inputfile') +) + +process.maxEvents.input = 100 + +from EventFilter.Utilities.EvFDaqDirector_cfi import EvFDaqDirector as _EvFDaqDirector +process.EvFDaqDirector = _EvFDaqDirector.clone( + baseDir = "$PWD", + buBaseDir = "$PWD", + runNumber = 379416 +) + +process.FastMonitoringService = cms.Service("FastMonitoringService") + +process.hltOutputTest = cms.OutputModule( "GlobalEvFOutputModule", + use_compression = cms.untracked.bool( True ), + compression_algorithm = cms.untracked.string( "ZSTD" ), + compression_level = cms.untracked.int32( 3 ), + outputCommands = cms.untracked.vstring( 'keep *') +) + +process.out = cms.EndPath(process.hltOutputTest) +@EOF + +mkdir run379416 +cmsRun stream.py +cat run379416/run379416_ls0000_streamTest_pid*.ini run379416/run379416_ls0097_streamTest_pid*.dat > run379416_ls0097_streamTest.dat +rm -rf run379416 + +echo "============================================================" +echo "testing successful conversion of streamer files" +echo "------------------------------------------------------------" +echo + +check_for_success convertToRaw -f 1 -l=1 -v run379416_ls0097_streamTest.dat + +echo "============================================================" +echo "testing failing conversion of streamer files" +echo "------------------------------------------------------------" +echo + +check_for_failure convertToRaw -f 1 -l=-1 -s rawDataRepacker run379416_ls0097_streamTest.dat