13#include "CbmRecoUnpackConfig.tmpl"
21#include <FairRootManager.h>
24#include <RtypesCore.h>
26#include <TStopwatch.h>
48 LOG(debug) <<
"CbmRecoUnpack::~CbmRecoUnpack!";
56 LOG(info) <<
"CbmRecoUnpack::Finish() I do let the unpackers talk first :\n";
70 double dTotalDataSizeIn = 0.0;
71 double dTotalDataSizeOut = 0.0;
73 dTotalDataSizeIn += datait.second.first;
74 dTotalDataSizeOut += datait.second.second;
77 double dUnpRatio = 0 < datait.second.first ? datait.second.second / datait.second.first : 0.0;
78 double dShareIn = 0 < dTotalDataSizeIn ? datait.second.first / dTotalDataSizeIn : 0.0;
79 double dShareOut = 0 < dTotalDataSizeOut ? datait.second.second / dTotalDataSizeOut : 0.0;
83 {{
"dataIn", datait.second.first},
84 {
"dataOut", datait.second.second},
85 {
"unpRatio", dUnpRatio},
86 {
"shareIn", dShareIn},
87 {
"shareOut", dShareOut}},
100 FairRootManager* ioman = FairRootManager::Instance();
103 auto eh = FairRun::Instance()->GetEventHeader();
104 if (eh->IsA() == CbmTsEventHeader::Class())
108 <<
"CbmRecoUnpack::Init() no CbmTsEventHeader was added to the run. Without it, we can not store the UTC of the "
109 "Timeslices correctly. Hence, this causes a fatal. Please add it in the steering macro to the Run.";
112 ioman->Register(
"TimeSlice.",
"DAQ",
fTimeSlice, kTRUE);
194 LOG(info) <<
fTrd2DConfig->GetName() <<
"::Init() ---------------------------------";
238 fhCpuTimePerTs =
new TH1D(
"hCpuTimePerTs",
"CPU Processing time of TS vs TS; Ts; CPU time [ms]", 6000, 0, 6000);
239 fhRealTimePerTs =
new TH1D(
"hRealTimePerTs",
"Real Processing time of TS vs TS; Ts; Real time [ms]", 6000, 0, 6000);
242 new TH1D(
"hCpuTimePerTsHist",
"CPU Histo filling time of TS vs TS; Ts; CPU time [ms]", 6000, 0, 6000);
244 new TH1D(
"hRealTimePerTsHist",
"Real Histo filling time of TS vs TS; Ts; Real time [ms]", 6000, 0, 6000);
247 new TH1D(
"hUnpackingRatioPerTs",
"ratio of tot. unp. digi size to tot. input raw size vs TS; TS; Size Ratio []",
254 LOG(info) <<
"Unpack: Publishing monitoring metrics to time-series DB at " <<
fUriPublishProfMoni <<
" as "
268 if (ifsRaw.is_open()) {
271 while (std::getline(ifsRaw, sLine)) {
285 LOG(error) <<
"No spill start events found in file " <<
sAccEvtTimingFile <<
" => Disabling filtering!";
290 LOG(error) <<
"No spill stop events found in file " <<
sAccEvtTimingFile <<
" => Disabling filtering!";
298 LOG(error) <<
"Accelerator events file " <<
sAccEvtTimingFile <<
" could not be open! => Disabling filtering!";
305 fhDigiSpillStartDist =
new TH1D(
"hDigiSpillStartDist",
"Digi time vs spill start time; Tdigi - Tspill_start [ns]",
306 20000, -100000., 100000);
308 new TH1D(
"hDigiSpillStopist",
"Digi time vs spill stop time; Tdigi - Tspill_stop [ns]", 20000, -100000., 100000);
311 fhDigiSpillStartDist =
new TH1D(
"hDigiSpillStartDist",
"Digi time vs spill start time; Tdigi - Tspill_start [ns]",
313 fhDigiSpillStopDist =
new TH1D(
"hDigiSpillStopDist",
"Digi time vs spill stop time; Tdigi - Tspill_stop [ns]",
317 new TH2D(
"hDigiSpillFilterStart",
"Digi time vs spill start time, per spill; Tdigi - Tspill_start [ns]; Spill",
320 new TH2D(
"hDigiSpillFilterStop",
"Digi time vs spill stop time, per spill; Tdigi - Tspill_stop [ns]; Spill",
332 subsystem,
new TH1D(Form(
"hDigiSpillStartDist%s", name.c_str()),
333 Form(
"Digi time vs spill start time, for %s; Tdigi - Tspill_start [ns]", name.c_str()), 20000,
336 subsystem,
new TH1D(Form(
"hDigiSpillStopDist%s", name.c_str()),
337 Form(
"Digi time vs spill stop time, for %s; Tdigi - Tspill_stop [ns]", name.c_str()), 20000,
342 subsystem,
new TH1D(Form(
"hDigiSpillStartDist%s", name.c_str()),
343 Form(
"Digi time vs spill start time, for %s; Tdigi - Tspill_start [ns]", name.c_str()),
346 subsystem,
new TH1D(Form(
"hDigiSpillStopDist%s", name.c_str()),
347 Form(
"Digi time vs spill stop time, for %s; Tdigi - Tspill_stop [ns]", name.c_str()),
352 new TH2D(Form(
"hDigiSpillStart%s", name.c_str()),
353 Form(
"Digi time vs spill start time, per spill for %s; Tdigi - Tspill_start [ns]; Spill", name.c_str()),
357 new TH2D(Form(
"hDigiSpillStop%s", name.c_str()),
358 Form(
"Digi time vs spill stop time, per spill for %s; Tdigi - Tspill_stop [ns]; Spill", name.c_str()),
368 fNameMap.emplace(std::make_pair(subsystem, std::make_pair(name, 0)));
369 fTimeMap.emplace(std::make_pair(subsystem, std::make_pair(0, 0)));
370 fDataSizeMap.emplace(std::make_pair(subsystem, std::make_pair(0, 0)));
373 fNameMapPerTs.emplace(std::make_pair(subsystem, std::make_pair(name, 0)));
374 fTimeMapPerTs.emplace(std::make_pair(subsystem, std::make_pair(0, 0)));
382 new TH1D(Form(
"hInpRatioPerTs%s", name.c_str()),
383 Form(
"ratio of input data size in total input data size vs TS for %s; TS; Size Ratio []", name.c_str()),
387 new TH1D(Form(
"hOutRatioPerTs%s", name.c_str()),
388 Form(
"ratio of unpacked digi size in total output size vs TS for %s; TS; Size Ratio []", name.c_str()),
392 new TH1D(Form(
"hUnpRatioPerTs%s", name.c_str()),
393 Form(
"ratio of unpacked digi size to raw data size vs TS for %s; TS; O/I Size Ratio []", name.c_str()),
408 hSpeedPerf->SetYTitle(
"Unpacking performance [#mus/Digi]");
412 size_t iunpackerbin = 1;
416 auto timeit =
fTimeMap.find(namepair.first);
417 double cpu = 0 < namepair.second.second ? timeit->second.first / namepair.second.second : 0.0;
418 double wall = 0 < namepair.second.second ? timeit->second.second / namepair.second.second : 0.0;
422 double indata = datait->second.first;
423 double outdata = datait->second.second;
427 std::string label = namepair.second.first;
428 hProducedDigis->GetXaxis()->SetBinLabel(iunpackerbin, label.data());
429 hProducedDigis->SetBinContent(iunpackerbin, namepair.second.second);
432 label = namepair.second.first;
434 hSpeedPerf->GetXaxis()->SetBinLabel(iunpackerbin * 2 - 1, label.data());
435 hSpeedPerf->SetBinContent(iunpackerbin * 2 - 1, cpu);
437 label = namepair.second.first;
439 hSpeedPerf->GetXaxis()->SetBinLabel(iunpackerbin * 2, label.data());
440 hSpeedPerf->SetBinContent(iunpackerbin * 2, wall);
443 label = namepair.second.first;
445 hDataPerf->GetXaxis()->SetBinLabel(iunpackerbin * 2 - 1, label.data());
446 hDataPerf->SetBinContent(iunpackerbin * 2 - 1, indata);
449 label = namepair.second.first;
451 hDataPerf->GetXaxis()->SetBinLabel(iunpackerbin * 2, label.data());
452 hDataPerf->SetBinContent(iunpackerbin * 2, outdata);
460 double dTotalCpuTime =
fTimerTs->CpuTime() * 1000.;
461 double dTotalRealTime =
fTimerTs->RealTime() * 1000.;
472 double dTotalDataSizeIn = 0.0;
473 double dTotalDataSizeOut = 0.0;
475 using sctp = std::chrono::system_clock::time_point;
482 dTotalDataSizeIn += datait.second.first;
483 dTotalDataSizeOut += datait.second.second;
486 double dUnpRatio = 0 < datait.second.first ? datait.second.second / datait.second.first : 0.0;
487 double dShareIn = 0 < dTotalDataSizeIn ? datait.second.first / dTotalDataSizeIn : 0.0;
488 double dShareOut = 0 < dTotalDataSizeOut ? datait.second.second / dTotalDataSizeOut : 0.0;
492 {{
"dataIn", datait.second.first},
493 {
"dataOut", datait.second.second},
494 {
"unpRatio", dUnpRatio},
495 {
"shareIn", dShareIn},
496 {
"shareOut", dShareOut}},
501 datait->second.first = 0.0;
502 datait->second.second = 0.0;
504 dTotalDataSizeIn = 0.0;
505 dTotalDataSizeOut = 0.0;
510 dTotalDataSizeIn += datait.second.first;
511 dTotalDataSizeOut += datait.second.second;
521 if (datait.second.first) {
531 timeit->second.first = 0.0;
532 timeit->second.second = 0.0;
535 datait->second.first = 0.0;
536 datait->second.second = 0.0;
548 TFile* oldFile = gFile;
549 TDirectory* oldDir = gDirectory;
632 const fles::Timeslice& timeslice = *ts;
639 uint64_t nComponents = ts->num_components();
641 LOG(info) <<
"Unpack: TS index " << ts->index() <<
" components " << nComponents;
644 for (uint64_t component = 0; component < nComponents; component++) {
646 auto subsystem =
static_cast<Subsystem>(ts->descriptor(component, 0).sys_id);
649 case Subsystem::MUCH: {
657 case Subsystem::PSD: {
664 case Subsystem::RICH: {
671 case Subsystem::STS: {
678 case Subsystem::TOF: {
685 case Subsystem::TRD: {
692 case Subsystem::TRD2D: {
699 case Subsystem::BMON: {
708 LOG(error) <<
"Unpack: Unknown subsystem " << fles::to_string(subsystem) <<
" for component " << component;
769 fBmonConfig->GetMonitor()->FinalizeTsBmonMicroSpillHistos();
774 fTofConfig->GetMonitor()->FinalizeTsHistos(ts->start_time());
781 uint64_t uTimeDistNs = std::numeric_limits<uint64_t>::max();
783 uint64_t uNewTimeDistNs = 0;
785 uNewTimeDistNs = ts->start_time() -
itSpillStart->GetTime();
788 uNewTimeDistNs =
itSpillStart->GetTime() - ts->start_time();
790 if (uTimeDistNs < uNewTimeDistNs) {
794 uTimeDistNs = uNewTimeDistNs;
799 LOG(info) <<
"Found spill start for ts " << ts->start_time() <<
": " <<
itSpillStart->GetTime() <<
" ("
800 << uTimeDistNs <<
")";
803 uTimeDistNs = std::numeric_limits<uint64_t>::max();
805 uint64_t uNewTimeDistNs = 0;
807 uNewTimeDistNs = ts->start_time() -
itSpillStop->GetTime();
810 uNewTimeDistNs =
itSpillStop->GetTime() - ts->start_time();
812 if (uTimeDistNs < uNewTimeDistNs) {
816 uTimeDistNs = uNewTimeDistNs;
821 LOG(info) <<
"Found spill stop for ts " << ts->start_time() <<
": " <<
itSpillStop->GetTime() <<
" ("
822 << uTimeDistNs <<
")";
827 uint64_t uTimeDistNs = 0;
828 std::string sSignCurr;
830 uTimeDistNs = ts->start_time() -
itSpillStart->GetTime();
834 uTimeDistNs =
itSpillStart->GetTime() - ts->start_time();
838 uint64_t uNextTimeDistNs = 0;
839 std::string sSignCurrNext;
840 if (itSpillStartNext->GetTime() < ts->start_time()) {
841 uNextTimeDistNs = ts->start_time() - itSpillStartNext->GetTime();
845 uNextTimeDistNs = itSpillStartNext->GetTime() - ts->start_time();
848 if (uNextTimeDistNs < uTimeDistNs) {
849 LOG(debug) <<
"New spill start for ts " << ts->start_time() <<
": " <<
itSpillStart->GetTime() <<
" ("
850 << sSignCurr << uTimeDistNs <<
") " << itSpillStartNext->GetTime() <<
" (" << sSignCurrNext
851 << uNextTimeDistNs <<
") ";
856 LOG(debug) <<
"Curr spill start for ts " << ts->start_time() <<
": " <<
itSpillStart->GetTime() <<
" ("
857 << sSignCurr << uTimeDistNs <<
") " << itSpillStartNext->GetTime() <<
" (" << sSignCurrNext
858 << uNextTimeDistNs <<
") ";
863 uTimeDistNs = ts->start_time() -
itSpillStop->GetTime();
867 uTimeDistNs =
itSpillStop->GetTime() - ts->start_time();
872 if (itSpillStopNext->GetTime() < ts->start_time()) {
873 uNextTimeDistNs = ts->start_time() - itSpillStopNext->GetTime();
877 uNextTimeDistNs = itSpillStopNext->GetTime() - ts->start_time();
880 if (uNextTimeDistNs < uTimeDistNs) {
881 LOG(debug) <<
"New spill stop for ts " << ts->start_time() <<
": " <<
itSpillStop->GetTime() <<
" ("
882 << sSignCurr << uTimeDistNs <<
") " << itSpillStopNext->GetTime() <<
" (" << sSignCurrNext
883 << uNextTimeDistNs <<
") ";
887 LOG(debug) <<
"Curr spill stop for ts " << ts->start_time() <<
": " <<
itSpillStop->GetTime() <<
" ("
888 << sSignCurr << uTimeDistNs <<
") " << itSpillStopNext->GetTime() <<
" (" << sSignCurrNext
889 << uNextTimeDistNs <<
") ";
ClassImp(CbmConverterManager)
bool IsExtractionEnd() const
bool IsKickerStart() const
bool IsExtractionStart() const
bool IsExtractionStopSlow() const
Main steering class for unpacking in cbmroot.
virtual Bool_t initParContainers(std::vector< std::pair< std::string, std::shared_ptr< FairParGenericSet > > > *reqparvec)
Initialise the parameter containers requested by the algorithm.
void Finish()
Actions at the end of the run.
std::unique_ptr< cbm::Monitor > fMonitor
ratio of total unpacked size to input size vs TS in run
CbmTimeSlice * fTimeSlice
CbmTimeSlice object, mostly redundant with the TsEventHeader, needed by L1 to switch timeslice mode.
double_t dSpillFilterWinStart
Start of time window for digi filtering relative to spill start event.
TH1 * fhDigiSpillStartDist
Monitoring histograms for the spill digi filter.
std::string fUriPublishProfMoni
URI (type:hostname:port:db_name) for optional connection to monitoring DB.
TH2 * fhDigiSpillFilterStop
Digi time relative to spill start.
std::map< Subsystem, TH2 * > fhDigiSpillFilterStartPerDet
Digi time relative to spill stop, per detector.
CbmRecoUnpack()
Constructor.
std::map< Subsystem, TH1 * > fvhUnpRatioPerTs
ratio of system digi size in total output size vs TS in run
bool fPublishProfMoni
Flag if performance profiling data should be published to monitoring DB.
size_t unpack(const Subsystem subsystem, const fles::Timeslice *ts, uint16_t icomp, TConfig config, std::vector< TOptOutA > *optouttargetvecA=nullptr, std::vector< TOptOutB > *optouttargetvecB=nullptr)
Template for the unpacking call of a given algorithm.
double_t dSpillFilterWinStop
End of time window for digi filtering relative to spill start event.
std::string fMoniCurrrentHostname
The application's monitoring object.
void WriteHistograms()
Open a special output file and save any available performance monitoring histograms into it.
std::map< Subsystem, std::pair< double, double > > fDataSizeMap
Map to store the in and out data amount, key = Subsystem.
std::string sAccEvtTimingFile
Full path to a text file containing the raw Accelerator Event timings.
TH1 * fhRealTimePerTsHist
Plotting time per TS.
TH1 * fhCpuTimePerTsHist
Processing time per TS.
std::map< Subsystem, std::pair< double, double > > fDataSizeMapCurrSec
void Reset()
Clear the output vectors as preparation of the next timeslice. Called via FairSource::Reset()
std::map< Subsystem, TH2 * > fhDigiSpillFilterStopPerDet
Digi time relative to spill start, per detector.
std::string fOutfilename
Name of the performance profiling output file.
std::shared_ptr< CbmStsUnpackConfig > fStsConfig
Configuration of the Sts unpacker. Provides the configured algorithm.
bool bOutputFullTimeSorting
Flag to enable/disable full time sorting. If off, time sorting happens per link/FLIM source.
std::map< Subsystem, std::pair< std::string, size_t > > fNameMapPerTs
Map to store a name for the unpackers and the processed amount of digis for a single TS,...
std::shared_ptr< CbmBmonUnpackConfig > fBmonConfig
Configuration of the Bmon unpacker. Provides the configured algorithm.
CbmTsEventHeader * fCbmTsEventHeader
Pointer to the Timeslice start time used to write it to the output tree.
std::vector< AccTimingEvent >::iterator itSpillStart
Current spill start Event.
std::map< Subsystem, TH1 * > fvhOutRatioPerTs
ratio of system data in total input size vs TS in run
void initSpillFilterMonitoring(Subsystem subsystem, std::string name)
Init the Spill filter monitoring histograms for a given system.
void InitSpillFilter()
Digi time relative to spill stop, per detector.
std::shared_ptr< CbmTrdUnpackFaspConfig > fTrd2DConfig
Configuration of the Trd unpacker. Provides the configured algorithm.
bool fPubMoniProcTime
Flag if perf data should be published to moni DB using data time (false) or processing time (true).
TH1 * fhDigiSpillStopDist
Digi time relative to spill start.
std::map< Subsystem, std::pair< double, double > > fTimeMap
Map to store the cpu and wall time, key = Subsystem.
fles::Subsystem Subsystem
bool bMonitoringOnly
Flag to Enable/disable the output completely.
std::map< Subsystem, TH1 * > fhDigiSpillStartDistPerDet
Digi time relative to spill stop.
std::map< Subsystem, TH1 * > fvhInpRatioPerTs
Plotting time per TS.
void initPerformanceMaps(Subsystem subsystem, std::string name)
Init the performance profiling maps for a given unpacker.
std::shared_ptr< CbmTofUnpackConfig > fTofConfig
Configuration of the Tof unpacker. Provides the configured algorithm.
~CbmRecoUnpack()
Destructor.
std::map< Subsystem, std::pair< double, double > > fDataSizeMapPerTs
Map to store the in and out data amount for a single TS, key = Subsystem.
void performanceProfiling()
Run the performance profiling based on the fTimeMap and fDataSizeMap members.
uint32_t uSpillIndex
Index of the current spill in current run.
bool fDoDebugPrints
Flag if extended debug output is to be printed or not.
std::vector< AccTimingEvent > vAccEvtsSpillStart
Storage of spill start accelerator events (Either KickerStart or ExtractionStartSlow)
std::map< Subsystem, std::pair< double, double > > fTimeMapPerTs
Map to store the cpu and wall time for a single TS, key = Subsystem.
bool bSpillFilterMoniFindPeak
Flag to Enable/disable monitoring histos with long range for digi filtering relative to spill start e...
TH2 * fhDigiSpillFilterStart
Digi time relative to spill stop.
TH1 * fhRealTimePerTs
Processing time per TS.
void Unpack(std::unique_ptr< fles::Timeslice > ts)
Trigger the unpacking procedure.
TH1 * fhUnpackingRatioPerTs
ratio of selected digi vs TS in run
std::shared_ptr< CbmMuchUnpackConfig > fMuchConfig
Current time slice.
std::shared_ptr< CbmTrdUnpackConfig > fTrd1DConfig
Configuration of the Trd unpacker. Provides the configured algorithm.
bool fDoPerfProfPerTs
Flag if performance profiling per TS should be activated or not.
std::map< Subsystem, TH1 * > fhDigiSpillStopDistPerDet
Digi time relative to spill start, per detector.
void performanceProfilingPerTs()
Run the performance profiling for a single TS based on the fTimeMapPerTs and fDataSizeMapPerTs member...
bool bSpillFilter
Flag to Enable/disable digi filtering relative to spill start event.
std::chrono::system_clock::time_point fMonitorSecCurrentTs
std::shared_ptr< CbmRichUnpackConfig > fRichConfig
Configuration of the Rich unpacker. Provides the configured algorithm.
void RegisterOutputs(FairRootManager *ioman, std::shared_ptr< TConfig > config)
Bool_t Init()
Initialisation.
std::vector< AccTimingEvent >::iterator itSpillStop
Current spill stop Event.
std::map< Subsystem, std::pair< std::string, size_t > > fNameMap
Map to store a name for the unpackers and the processed amount of digis, key = Subsystem.
std::shared_ptr< CbmPsdUnpackConfig > fPsdConfig
Configuration of the Psd unpacker. Provides the configured algorithm.
std::enable_if< std::is_same< TVecobj, std::nullptr_t >::value==true, void >::type timesort(std::vector< TVecobj > *)
Sort a vector timewise. Vector type has to provide GetTime()
std::enable_if< std::is_same< TVecobj, std::nullptr_t >::value==true, void >::type SpillFilter(uint64_t ulTsStartNs, const Subsystem subsystem, std::vector< TVecobj > *)
Filter a vector timewise relative to spill start. Vector type has to provide GetTime()
bool fDoPerfProf
Flag if performance profiling should be activated or not.
std::vector< AccTimingEvent > vAccEvtsSpillEnd
Storage of spill end accelerator events (ExtractionEnd or ExtractionSlowStop)
Bookkeeping of time-slice content.
std::chrono::time_point< std::chrono::system_clock > sctp