23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h"
25#include "FairParGenericSet.h"
26#include "FairRootFileSink.h"
27#include "FairRootManager.h"
28#include "FairRunOnline.h"
29#include "FairSource.h"
31#include "BoostSerializer.h"
33#include "RootSerializer.h"
43#include <boost/archive/binary_iarchive.hpp>
44#include <boost/serialization/utility.hpp>
53 using std::runtime_error::runtime_error;
65 LOG(info) <<
"Init options for CbmDeviceDigiEventSink.";
103 int noChannel = fChannels.size();
104 LOG(info) <<
"Number of defined channels: " << noChannel;
105 for (
auto const& entry : fChannels) {
106 LOG(info) <<
"Channel name: " << entry.first;
119 throw InitTaskError(
"Failed creating the TS meta data TClonesarray ");
127 fpRun =
new FairRunOnline();
136 pSink->GetRootFile()->SetCompressionLevel(0);
160 fvDigiSts =
new std::vector<CbmStsDigi>();
162 fvDigiTrd =
new std::vector<CbmTrdDigi>();
163 fvDigiTof =
new std::vector<CbmTofDigi>();
165 fvDigiPsd =
new std::vector<CbmPsdDigi>();
178 LOG(info) <<
"Initialized outTree with rootMgr at " <<
fpFairRootMgr;
188 LOG(error) << e.what();
196 std::size_t pos1 = channelName.find(entry);
197 if (pos1 != std::string::npos) {
198 const vector<std::string>::const_iterator
pos =
201 LOG(info) <<
"Found " << entry <<
" in " << channelName;
202 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
206 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
207 LOG(error) <<
"Stop device.";
220 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
224 "Evo. of the full TS buffer size; Time in run [s]; Size []",
227 "Evo. of the missed TS buffer size; Time in run [s]; Size []",
230 "Processed full TS; Time in run [s]; # []",
233 "Processed missing TS; Time in run [s]; # []",
236 "Total processed TS; Time in run [s]; # []",
239 "Processed events; Time in run [s]; # []",
246 vHistos.push_back(std::pair<TNamed*, std::string>(
fhFullTsProcEvo, sFolder));
247 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissTsProcEvo, sFolder));
248 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTotalTsProcEvo, sFolder));
249 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTotalEventsEvo, sFolder));
253 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
295 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
300 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
303 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
309 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
312 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
315 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
319 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
332 if (bResetStartTime) {
334 fStartTime = std::chrono::system_clock::now();
343 std::vector<uint64_t> vIndices;
344 std::string msgStrMissTs(
static_cast<char*
>(msg->GetData()), msg->GetSize());
345 std::istringstream issMissTs(msgStrMissTs);
346 boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
347 inputArchiveMissTs >> vIndices;
364 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with " << parts.Size() <<
" parts"
365 <<
", size0: " << parts.At(0)->GetSize();
377 LOG(debug) <<
"TS direct to dump";
387 LOG(debug) <<
"TS direct to storage";
392 LOG(debug) <<
"TS metadata checked";
404 LOG(info) <<
"CbmDeviceDigiEventSink::HandleData => "
406 <<
" after accounting for the ones reported as missing by the source (" <<
fvulMissedTsIndices.size()
415 LOG(debug) <<
"TS queues checked";
420 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
424 std::chrono::duration<double_t> elapsedSecondsFill = currentTime -
fLastFillTime;
425 if (1.0 < elapsedSecondsFill.count()) {
426 std::chrono::duration<double_t> secInRun = currentTime -
fStartTime;
455 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
472 <<
" missed/empty ones)";
483 std::string sCommand;
484 std::string msgStrCmd(
static_cast<char*
>(msg->GetData()), msg->GetSize());
485 std::istringstream issCmd(msgStrCmd);
486 boost::archive::binary_iarchive inputArchiveCmd(issCmd);
487 inputArchiveCmd >> sCommand;
489 std::string sCmdTag = sCommand;
490 size_t charPosDel = sCommand.find(
' ');
491 if (std::string::npos != charPosDel) {
492 sCmdTag = sCommand.substr(0, charPosDel);
495 if (
"EOF" == sCmdTag) {
499 if (std::string::npos == charPosDel) {
500 LOG(fatal) <<
"CbmDeviceDigiEventSink::HandleCommand => "
501 <<
"Incomplete EOF command received: " << sCommand;
506 std::string sNext = sCommand.substr(charPosDel);
507 charPosDel = sNext.find(
' ');
509 if (std::string::npos == charPosDel) {
510 LOG(fatal) <<
"CbmDeviceDigiEventSink::HandleCommand => "
511 <<
"Incomplete EOF command received: " << sCommand;
519 LOG(info) <<
"CbmDeviceDigiEventSink::HandleCommand => "
523 LOG(info) <<
"CbmDeviceDigiEventSink::HandleCommand => "
528 else if (
"STOP" == sCmdTag) {
534 LOG(warning) <<
"Unknown command received: " << sCmdTag <<
" => will be ignored!";
542 bool bHoleFoundInBothQueues =
false;
544 std::map<uint64_t, CbmEventTimeslice>::iterator itFullTs =
fmFullTsStorage.begin();
547 while (!bHoleFoundInBothQueues) {
564 LOG(debug) <<
"CbmDeviceDigiEventSink::CheckTsQueues => Full TS " << (*itFullTs).first <<
" VS "
589 LOG(debug) <<
"CbmDeviceDigiEventSink::CheckTsQueues => Empty TS " << (*itMissTs) <<
" VS "
593 bHoleFoundInBothQueues =
true;
596 LOG(debug) <<
"CbmDeviceDigiEventSink::CheckTsQueues => buffered TS " <<
fmFullTsStorage.size()
599 LOG(debug) <<
"CbmDeviceDigiEventSink::CheckTsQueues => buffered TS index " << (*it).first;
608 LOG(info) <<
"CbmDeviceDigiEventSink::CheckTsQueues => "
660 if (source) { source->FillEventHeader(
fEvtHeader); }
688 FairMQMessagePtr messageHeader(NewMessage());
690 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
692 FairMQParts partsOut;
693 partsOut.AddPart(std::move(messageHeader));
697 FairMQMessagePtr messageHist(NewMessage());
699 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
701 partsOut.AddPart(std::move(messageHist));
707 FairMQMessagePtr messageHist(NewMessage());
708 partsOut.AddPart(std::move(messageHist));
713 FairMQMessagePtr messageCan(NewMessage());
715 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
717 partsOut.AddPart(std::move(messageCan));
723 FairMQMessagePtr messageHist(NewMessage());
724 partsOut.AddPart(std::move(messageHist));
728 FairMQMessagePtr msgHistos(NewMessage());
729 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
731 partsOut.AddPart(std::move(msgHistos));
735 LOG(error) <<
"CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
748 FairMQMessagePtr message(NewMessage());
749 RootSerializer().Serialize(*message, &
fArrayHisto);
753 LOG(error) <<
"Problem sending data";
792 LOG(info) <<
"Performing clean close of the file";
798 LOG(info) <<
"Still buffered TS " <<
fmFullTsStorage.size() <<
" and still buffered empties "
801 if (fair::mq::State::Running == GetCurrentState()) {
803 ChangeState(fair::mq::Transition::Stop);
804 std::this_thread::sleep_for(std::chrono::milliseconds(3000));
805 ChangeState(fair::mq::Transition::End);
816 uint32_t uPartIdx = 0;
820 if (3 != parts.Size()) {
821 LOG(error) <<
"CbmEventTimeslice::CbmEventTimeslice => Wrong number of parts to deserialize DigiEvents: "
822 << parts.Size() <<
" VS 3!";
823 LOG(fatal) <<
"Probably the wrong value was used for the option DigiEventInput of the Sink or DigiEventOutput of "
824 <<
"the event builder";
828 TObject* tempObjectPointer =
nullptr;
829 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer);
830 if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo(
"CbmTsEventHeader")) {
834 LOG(fatal) <<
"Failed to deserialize the TS header";
839 tempObjectPointer =
nullptr;
840 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer);
842 if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo(
"TimesliceMetaData")) {
846 LOG(fatal) <<
"Failed to deserialize the TS metadata";
851 std::string msgStrEvt(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
852 std::istringstream issEvt(msgStrEvt);
853 boost::archive::binary_iarchive inputArchiveEvt(issEvt);
857 LOG(debug) <<
"Input event array " <<
fvDigiEvents.size();
861 if (10 != parts.Size()) {
862 LOG(error) <<
"CbmEventTimeslice::CbmEventTimeslice => Wrong number of parts to deserialize raw data + events: "
863 << parts.Size() <<
" VS 10!";
864 LOG(fatal) <<
"Probably the wrong value was used for the option DigiEventInput of the Sink or DigiEventOutput of "
865 <<
"the event builder";
869 TObject* tempObjectPointer =
nullptr;
870 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer);
871 if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo(
"CbmTsEventHeader")) {
875 LOG(fatal) <<
"Failed to deserialize the TS header";
880 std::string msgStrBmon(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
881 std::istringstream issBmon(msgStrBmon);
882 boost::archive::binary_iarchive inputArchiveBmon(issBmon);
887 std::string msgStrSts(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
888 std::istringstream issSts(msgStrSts);
889 boost::archive::binary_iarchive inputArchiveSts(issSts);
894 std::string msgStrMuch(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
895 std::istringstream issMuch(msgStrMuch);
896 boost::archive::binary_iarchive inputArchiveMuch(issMuch);
901 std::string msgStrTrd(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
902 std::istringstream issTrd(msgStrTrd);
903 boost::archive::binary_iarchive inputArchiveTrd(issTrd);
908 std::string msgStrTof(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
909 std::istringstream issTof(msgStrTof);
910 boost::archive::binary_iarchive inputArchiveTof(issTof);
915 std::string msgStrRich(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
916 std::istringstream issRich(msgStrRich);
917 boost::archive::binary_iarchive inputArchiveRich(issRich);
922 std::string msgStrPsd(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
923 std::istringstream issPsd(msgStrPsd);
924 boost::archive::binary_iarchive inputArchivePsd(issPsd);
929 tempObjectPointer =
nullptr;
930 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectPointer);
932 if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo(
"TimesliceMetaData")) {
936 LOG(fatal) <<
"Failed to deserialize the TS metadata";
950 std::vector<CbmEvent>* pvOutEvents =
nullptr;
951 RootSerializer().Deserialize(*parts.At(uPartIdx), pvOutEvents);
953 LOG(debug) <<
"Input event array " <<
fvEvents.size();
977 selEvent.
fTime =
event.GetStartTime();
978 selEvent.
fNumber =
event.GetNumber();
1022 if (bExclusiveTrdExtract) {
1023 for (uint32_t uDigiInEvt = 0; uDigiInEvt < uNbDigis; ++uDigiInEvt) {
std::vector< CbmBmonDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this objects in the cbm output tree (static)
virtual ~CbmDeviceDigiEventSink()
std::vector< CbmStsDigi > * fvDigiSts
bool fbBypassConsecutiveTs
If true, store digis vectors with full TS in addition to selected events.
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
std::string fsHistosSuffix
std::vector< CbmRichDigi > * fvDigiRich
uint64_t fulLastFullTsCounter
uint64_t fulLastMissTsCounter
bool fbFillHistos
Switch ON/OFF loop based extraction of TRD digis due to 1D/2D.
std::string fsChannelNameCommands
FairRunOnline * fpRun
Data storage.
bool HandleCommand(FairMQMessagePtr &, int)
std::chrono::system_clock::time_point fLastPublishTime
bool HandleMissTsData(FairMQMessagePtr &, int)
void PrepareTreeEntry(CbmEventTimeslice unpTs)
uint32_t fuPublishFreqTs
Histograms management.
uint64_t fuPrevTsIndex
Parameters management.
FairRootManager * fpFairRootMgr
bool fbReceivedEof
Control Commands reception.
bool fbWriteMissingTs
Switch ON/OFF the bypass of the consecutive TS buffer before writing to file.
std::vector< CbmMuchDigi > * fvDigiMuch
std::chrono::system_clock::time_point fLastFillTime
bool fbInitDone
Switch ON/OFF filling of histograms.
uint64_t fulMissedTsCounter
CbmTsEventHeader * fEvtHeader
bool fbExclusiveTrdExtract
Switch ON/OFF the input of CbmDigiEvents instead of raw data + CbmEvents.
TCanvas * fcEventSinkAllHist
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
double_t fdMinPublishTime
std::map< uint64_t, CbmEventTimeslice > fmFullTsStorage
Buffered TS.
std::string fsChannelNameDataInput
int64_t fiTreeFileMaxSize
Output file/tree management.
TClonesArray * fTimeSliceMetaDataArray
TS MetaData storage.
std::string fsOutputFileName
Keep track of whether the Finish was already called.
uint64_t fulProcessedEvents
bool fbStoreFullTs
Constants.
bool HandleData(FairMQParts &, int)
TProfile * fhMissTsBuffSizeEvo
double_t fdMaxPublishTime
bool IsChannelNameAllowed(std::string channelName)
Internal methods.
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
std::vector< CbmPsdDigi > * fvDigiPsd
TProfile * fhFullTsBuffSizeEvo
bool ResetHistograms(bool bResetStartTime=false)
std::vector< CbmDigiEvent > * fEventsSel
CbmEvents.
std::vector< uint64_t > fvulMissedTsIndices
bool SendHistoConfAndData()
std::vector< CbmBmonDigi > * fvDigiBmon
Full TS Digis storage (optional usage, controlled by fbStoreFullTs!)
std::string fsChannelNameMissedTs
message queues
uint64_t fulLastProcessedEvents
std::string fsChannelNameHistosInput
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
bool fbFinishDone
Keep track of whether the Init was already fully completed.
std::vector< CbmTofDigi > * fvDigiTof
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< CbmTrdDigi > * fvDigiTrd
bool fbDigiEventInput
Switch ON/OFF the ROOT file compression.
std::chrono::system_clock::time_point fStartTime
bool fbDisableCompression
Switch ON/OFF writing of empty TS to file for the missing ones (if no bypass)
CbmPsdDigiData fPsd
PSD data.
CbmTrdDigiData fTrd
TRD data.
CbmTofDigiData fTof
TOF data.
CbmStsDigiData fSts
STS data.
CbmRichDigiData fRich
RICH data.
CbmMuchDigiData fMuch
MUCH data.
CbmBmonDigiData fBmon
Beam monitor data.
Collection of digis from all detector systems within one event.
double fTime
Event trigger time [ns].
CbmDigiData fData
Event data.
uint64_t fNumber
Event identifier.
CbmTsEventHeader fCbmTsEventHeader
TS information in header.
std::vector< CbmRichDigi > fvDigiRich
bool fbDigiEvtInput
Input Type.
std::vector< CbmEvent > fvEvents
Raw events.
CbmEventTimeslice(FairMQParts &parts, bool bDigiEvtInput=false)
TODO: rename to CbmTsWithEvents.
std::vector< CbmDigiEvent > & GetSelectedData(bool bExclusiveTrdExtract=true)
std::vector< CbmTofDigi > fvDigiTof
std::vector< CbmDigiEvent > fvDigiEvents
Digi events.
std::vector< CbmStsDigi > fvDigiSts
std::vector< CbmPsdDigi > fvDigiPsd
void ExtractSelectedData(bool bExclusiveTrdExtract=true)
TimesliceMetaData fTsMetaData
extra Metadata
std::vector< CbmMuchDigi > fvDigiMuch
std::vector< CbmBmonDigi > fvDigiBmon
Raw data.
std::vector< CbmTrdDigi > fvDigiTrd
Class characterising one event by a collection of links (indices) to data objects,...
std::vector< CbmMuchDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this obj in the cbm output tree (static)
std::vector< CbmPsdDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this obj in the cbm output tree (static)
std::vector< CbmRichDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this obj in the cbm output tree (static)
std::vector< CbmStsDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this obj in the cbm output tree (static)
std::vector< CbmTofDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this obj in the cbm output tree (static)
std::vector< CbmTrdDigi > fDigis
Data vector.
static const char * GetBranchName()
Get the desired name of the branch for this obj in the cbm output tree (static)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)