CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMcbmEventSink.cxx
Go to the documentation of this file.
1/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
13
14
16#include "CbmEvent.h"
17#include "CbmFlesCanvasTools.h"
18#include "CbmMQDefs.h"
19
20#include "TimesliceMetaData.h"
21
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h" // device->fConfig
25#include "FairParGenericSet.h"
26#include "FairRootFileSink.h"
27#include "FairRootManager.h"
28#include "FairRunOnline.h"
29
30#include "BoostSerializer.h"
31
32#include "RootSerializer.h"
33
35#include "TCanvas.h"
36#include "TFile.h"
37#include "TH1.h"
38#include "TList.h"
39#include "TNamed.h"
40
41#include <boost/archive/binary_iarchive.hpp>
42#include <boost/serialization/utility.hpp>
43
45#include <thread> // this_thread::sleep_for
46
47#include <array>
48#include <iomanip>
49#include <stdexcept>
50#include <string>
51struct InitTaskError : std::runtime_error {
52 using std::runtime_error::runtime_error;
53};
54
55using namespace std;
56
57//Bool_t bMcbm2018MonitorTaskBmonResetHistos = kFALSE;
58
60
62try {
64 LOG(info) << "Init options for CbmDeviceMcbmEventSink.";
65
66 fsOutputFileName = fConfig->GetValue<std::string>("OutFileName");
67
68 fsChannelNameDataInput = fConfig->GetValue<std::string>("EvtNameIn");
70
71 fbFillHistos = fConfig->GetValue<bool>("FillHistos");
72 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
73 fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
74 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
75 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
76 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
77 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
78
81
84
86 // Get the information about created channels from the device
87 // Check if the defined channels from the topology (by name)
88 // are in the list of channels which are possible/allowed
89 // for the device
90 // The idea is to check at initilization if the devices are
91 // properly connected. For the time beeing this is done with a
92 // nameing convention. It is not avoided that someone sends other
93 // data on this channel.
94 //logger::SetLogLevel("INFO");
95 int noChannel = fChannels.size();
96 LOG(info) << "Number of defined channels: " << noChannel;
97 for (auto const& entry : fChannels) {
98 LOG(info) << "Channel name: " << entry.first;
99 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
100 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
101 OnData(entry.first, &CbmDeviceMcbmEventSink::HandleData);
102 } // if( entry.first.find( "ts" )
103 } // for( auto const &entry : fChannels )
104
105 // InitContainers();
106
108 fvDigiBmon = new std::vector<CbmTofDigi>();
109 fvDigiSts = new std::vector<CbmStsDigi>();
110 fvDigiMuch = new std::vector<CbmMuchBeamTimeDigi>();
111 fvDigiTrd = new std::vector<CbmTrdDigi>();
112 fvDigiTof = new std::vector<CbmTofDigi>();
113 fvDigiRich = new std::vector<CbmRichDigi>();
114 fvDigiPsd = new std::vector<CbmPsdDigi>();
115
118 fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 1);
119 if (NULL == fTimeSliceMetaDataArray) {
120 throw InitTaskError("Failed creating the TS meta data TClonesarray ");
121 } // if( NULL == fTimeSliceMetaDataArray )
124 fEventsArray = new TClonesArray("CbmEvent", 500);
125 if (NULL == fEventsArray) {
126 throw InitTaskError("Failed creating the Events TClonesarray ");
127 } // if( NULL == fEventsArray )
128
130 if ("" != fsOutputFileName) {
131 fpRun = new FairRunOnline();
132 fpFairRootMgr = FairRootManager::Instance();
133 fpFairRootMgr->SetSink(new FairRootFileSink(fsOutputFileName));
134 if (nullptr == fpFairRootMgr->GetOutFile()) {
135 throw InitTaskError("Could not open root file");
136 } // if( nullptr == fpFairRootMgr->GetOutFile() )
137 } // if( "" != fsOutputFileName )
138 else {
139 throw InitTaskError("Empty output filename!");
140 } // else of if( "" != fsOutputFileName )
141
142 LOG(info) << "Init Root Output to " << fsOutputFileName;
143
144 fpFairRootMgr->InitSink();
145 // fEvtHeader = new FairEventHeader();
146 // fEvtHeader->SetRunId(iRunId);
147 // rootMgr->Register("EventHeader.", "Event", fEvtHeader, kTRUE);
148 // rootMgr->FillEventHeader(fEvtHeader);
149
152 fpFairRootMgr->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, kTRUE);
154 fpFairRootMgr->RegisterAny("BmonDigi", fvDigiBmon, kTRUE);
155 fpFairRootMgr->RegisterAny("StsDigi", fvDigiSts, kTRUE);
156 fpFairRootMgr->RegisterAny("MuchBeamTimeDigi", fvDigiMuch, kTRUE);
157 fpFairRootMgr->RegisterAny("TrdDigi", fvDigiTrd, kTRUE);
158 fpFairRootMgr->RegisterAny("TofDigi", fvDigiTof, kTRUE);
159 fpFairRootMgr->RegisterAny("RichDigi", fvDigiRich, kTRUE);
160 fpFairRootMgr->RegisterAny("PsdDigi", fvDigiPsd, kTRUE);
162 fpFairRootMgr->Register("CbmEvent", "Cbm Event", fEventsArray, kTRUE);
163 /*
164 TTree* outTree =new TTree(FairRootManager::GetTreeName(), "/cbmout", 99);
165 LOG(info) << "define Tree " << outTree->GetName();
166
167 fpFairRootMgr->GetSink()->SetOutTree(outTree);
168*/
169 fpFairRootMgr->WriteFolder();
170
171 LOG(info) << "Initialized outTree with rootMgr at " << fpFairRootMgr;
172
174 if (kTRUE == fbFillHistos) {
175 /*
177 std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector();
179 std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector();
180
185 for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
186 {
187// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
188// << " in " << vHistos[ uHisto ].second.data()
189// ;
190 fArrayHisto.Add( vHistos[ uHisto ].first );
191 std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
192 vHistos[ uHisto ].second );
193 fvpsHistosFolder.push_back( psHistoConfig );
194
196 FairMQMessagePtr messageHist( NewMessage() );
197// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
198 BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageHist, psHistoConfig );
199
201 if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
202 {
203 throw InitTaskError( "Problem sending histo config" );
204 } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
205
206 LOG(info) << "Config of hist " << psHistoConfig.first.data()
207 << " in folder " << psHistoConfig.second.data() ;
208 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
209
213 for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
214 {
215// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
216// << " in " << vCanvases[ uCanv ].second.data();
217 std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
218 std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
219
220 std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
221
222 fvpsCanvasConfig.push_back( psCanvConfig );
223
225 FairMQMessagePtr messageCan( NewMessage() );
226// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
227 BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageCan, psCanvConfig );
228
230 if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
231 {
232 throw InitTaskError( "Problem sending canvas config" );
233 } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
234
235 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
236 << " is " << psCanvConfig.second.data() ;
237 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
238*/
239 } // if( kTRUE == fbFillHistos )
240}
241catch (InitTaskError& e) {
242 LOG(error) << e.what();
243 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
245}
246
248{
249 for (auto const& entry : fsAllowedChannels) {
250 std::size_t pos1 = channelName.find(entry);
251 if (pos1 != std::string::npos) {
252 const vector<std::string>::const_iterator pos =
253 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
254 const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
255 LOG(info) << "Found " << entry << " in " << channelName;
256 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
257 return true;
258 } // if (pos1!=std::string::npos)
259 } // for(auto const &entry : fsAllowedChannels)
260 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
261 LOG(error) << "Stop device.";
262 return false;
263}
264/*
265Bool_t CbmDeviceMcbmEventSink::InitContainers()
266{
267 LOG(info) << "Init parameter containers for CbmDeviceMcbmEventSink.";
268
269 if( kFALSE == InitParameters( fpAlgo ->GetParList() ) )
270 return kFALSE;
271
273 fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
274
275 Bool_t initOK = fpAlgo->InitContainers();
276
277// Bool_t initOK = fMonitorAlgo->ReInitContainers();
278
279 return initOK;
280}
281
282Bool_t CbmDeviceMcbmEventSink::InitParameters( TList* fParCList )
283{
284 for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
285 {
286 FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
287 fParCList->Remove( tempObj );
288 std::string paramName{ tempObj->GetName() };
289 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
290 // Should only be used for small data because of the cost of an additional copy
291
292 // Her must come the proper Runid
293 std::string message = paramName + ",111";
294 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
295
296 FairMQMessagePtr req( NewSimpleMessage(message) );
297 FairMQMessagePtr rep( NewMessage() );
298
299 FairParGenericSet* newObj = nullptr;
300
301 if( Send(req, "parameters") > 0 )
302 {
303 if( Receive( rep, "parameters" ) >= 0)
304 {
305 if( 0 != rep->GetSize() )
306 {
307 CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );
308 newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
309 LOG( info ) << "Received unpack parameter from the server:";
310 newObj->print();
311 } // if( 0 != rep->GetSize() )
312 else
313 {
314 LOG( error ) << "Received empty reply. Parameter not available";
315 return kFALSE;
316 } // else of if( 0 != rep->GetSize() )
317 } // if( Receive( rep, "parameters" ) >= 0)
318 } // if( Send(req, "parameters") > 0 )
319 fParCList->AddAt( newObj, iparC );
320 delete tempObj;
321 } // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
322
323 return kTRUE;
324}
325*/
326//--------------------------------------------------------------------//
327// handler is called whenever a message arrives on fsChannelNameMissedTs, with a reference to the message and a sub-channel index (here 0)
328bool CbmDeviceMcbmEventSink::HandleMissTsData(FairMQMessagePtr& msg, int /*index*/)
329{
330 std::vector<uint64_t> vIndices;
331 std::string msgStrMissTs(static_cast<char*>(msg->GetData()), msg->GetSize());
332 std::istringstream issMissTs(msgStrMissTs);
333 boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
334 inputArchiveMissTs >> vIndices;
335
336 fvulMissedTsIndices.insert(fvulMissedTsIndices.end(), vIndices.begin(), vIndices.end());
337
340
341 return true;
342}
343//--------------------------------------------------------------------//
344// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
345bool CbmDeviceMcbmEventSink::HandleData(FairMQParts& parts, int /*index*/)
346{
348 LOG(debug) << "Received message number " << fulNumMessages << " with " << parts.Size() << " parts"
349 << ", size0: " << parts.At(0)->GetSize();
350
351 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
352
354 uint32_t uPartIdx = 0;
357 /*
358 std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
359 ( parts.At( uPartIdx ) )->GetSize() );
360 std::istringstream issTsMeta(msgStrTsMeta);
361 boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
362 inputArchiveTsMeta >> (*fTsMetaData);
363 ++uPartIdx;
364*/
365 // Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
366 RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);
367 LOG(debug) << "TS metadata extracted";
368
371 || (0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex())) {
372 LOG(debug) << "TS direct to dump";
374 PrepareTreeEntry(parts);
379 fulTsCounter++;
380 } // if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter ) )
381 else {
382 LOG(debug) << "TS direct to storage";
384 fmFullTsStorage.emplace_hint(fmFullTsStorage.end(), std::pair<uint64_t, CbmUnpackedTimeslice>(
386 } // else of if( fuPrevTsIndex + 1 == fTsMetaData->GetIndex() || ( 0 == fuPrevTsIndex && 0 == fulTsCounter && 0 == fTsMetaData->GetIndex() )
387 LOG(debug) << "TS metadata checked";
388
390 // delete fTsMetaData;
391
394 LOG(debug) << "TS queues checked";
395
397 if (kTRUE == fbFillHistos) {
401 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
402 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
403 if ((fdMaxPublishTime < elapsedSeconds.count())
404 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
406 fLastPublishTime = std::chrono::system_clock::now();
407 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
408 } // if( kTRUE == fbFillHistos )
409
410 return true;
411}
412//--------------------------------------------------------------------//
413bool CbmDeviceMcbmEventSink::HandleCommand(FairMQMessagePtr& msg, int /*index*/)
414{
415 /*
416 std::string sCommand( static_cast< char * >( msg->GetData() ),
417 msg->GetSize() );
418*/
419 std::string sCommand;
420 std::string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
421 std::istringstream issCmd(msgStrCmd);
422 boost::archive::binary_iarchive inputArchiveCmd(issCmd);
423 inputArchiveCmd >> sCommand;
424
425 std::string sCmdTag = sCommand;
426 size_t charPosDel = sCommand.find(' ');
427 if (std::string::npos != charPosDel) {
428 sCmdTag = sCommand.substr(0, charPosDel);
429 } // if( std::string::npos != charPosDel )
430
431 if ("EOF" == sCmdTag) {
432 fbReceivedEof = true;
433
435 if (std::string::npos == charPosDel) {
436 LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
437 << "Incomplete EOF command received: " << sCommand;
438 return false;
439 } // if( std::string::npos == charPosDel )
441 charPosDel++;
442 std::string sNext = sCommand.substr(charPosDel);
443 charPosDel = sNext.find(' ');
444
445 if (std::string::npos == charPosDel) {
446 LOG(fatal) << "CbmDeviceMcbmEventSink::HandleCommand => "
447 << "Incomplete EOF command received: " << sCommand;
448 return false;
449 } // if( std::string::npos == charPosDel )
450 fuLastTsIndex = std::stoul(sNext.substr(0, charPosDel));
452 charPosDel++;
453 fuTotalTsCount = std::stoul(sNext.substr(charPosDel));
454
455 LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
456 << "Received EOF command with final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
459 LOG(info) << "CbmDeviceMcbmEventSink::HandleCommand => "
460 << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
461 Finish();
462 } // if( fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
463 } // if( "EOF" == sCmdTag )
464 else if ("STOP" == sCmdTag) {
467 Finish();
468 } // else if( "STOP" == sCmdTag )
469 else {
470 LOG(warning) << "Unknown command received: " << sCmdTag << " => will be ignored!";
471 } // else if command not recognized
472
473 return true;
474}
475//--------------------------------------------------------------------//
477{
478 bool bHoleFoundInBothQueues = false;
479
480 std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs = fmFullTsStorage.begin();
481 std::vector<uint64_t>::iterator itMissTs = fvulMissedTsIndices.begin();
482
483 while (!bHoleFoundInBothQueues) {
485 if (fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first) {
487 PrepareTreeEntry((*itFullTs).second);
490
492 fuPrevTsIndex = (*itFullTs).first;
493 fulTsCounter++;
494
496 ++itFullTs;
497 continue;
498 } // if( fmFullTsStorage.end() != itFullTs && fuPrevTsIndex + 1 == (*itFullTs).first() )
500 if (fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs)) {
502 new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
503 TimesliceMetaData(0, 0, 0, (*itMissTs));
504
507
509 fuPrevTsIndex = (*itMissTs);
511
513 ++itMissTs;
514 continue;
515 } // if( fvulMissedTsIndices.end() != itMissTs && fuPrevTsIndex + 1 == (*itMissTs ) )
516
518 bHoleFoundInBothQueues = true;
519 } // while( !bHoleFoundInBothQueues )
520
522 fmFullTsStorage.erase(fmFullTsStorage.begin(), itFullTs);
523 fvulMissedTsIndices.erase(fvulMissedTsIndices.begin(), itMissTs);
524
527 LOG(info) << "CbmDeviceMcbmEventSink::CheckTsQueues => "
528 << "Found final TS index " << fuLastTsIndex << " and total nb TS " << fuTotalTsCount;
529 Finish();
530 } // if( fbReceivedEof && fuPrevTsIndex == fuLastTsIndex && fulTsCounter == fuTotalTsCount )
531}
532//--------------------------------------------------------------------//
534{
537
539 new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
540 TimesliceMetaData(std::move(unpTs.fTsMetaData));
541
542 /*
545 fvDigiBmon->insert( fvDigiBmon->end(), unpTs.fvDigiBmon.begin(), unpTs.fvDigiBmon.end() );
547 fvDigiSts->insert( fvDigiSts->end(), unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() );
549 fvDigiMuch->insert( fvDigiMuch->end(), unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() );
551 fvDigiTrd->insert( fvDigiTrd->end(), unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() );
553 fvDigiTof->insert( fvDigiTof->end(), unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() );
555 fvDigiRich->insert( fvDigiRich->end(), unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() );
557 fvDigiPsd->insert( fvDigiPsd->end(), unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() );
558*/
561 (*fvDigiBmon) = std::move(unpTs.fvDigiBmon);
563 (*fvDigiSts) = std::move(unpTs.fvDigiSts);
565 (*fvDigiMuch) = std::move(unpTs.fvDigiMuch);
567 (*fvDigiTrd) = std::move(unpTs.fvDigiTrd);
569 (*fvDigiTof) = std::move(unpTs.fvDigiTof);
571 (*fvDigiRich) = std::move(unpTs.fvDigiRich);
573 (*fvDigiPsd) = std::move(unpTs.fvDigiPsd);
574
576 fEventsArray->AbsorbObjects(&(unpTs.fEventsArray));
577}
579{
580 // Unpacked digis + CbmEvent output to root file
581 /*
582 * NH style
583// fpFairRootMgr->FillEventHeader(fEvtHeader);
584// LOG(info) << "Fill WriteOutBuffer with FairRootManager at " << fpFairRootMgr;
585// fpOutRootFile->cd();
586 fpFairRootMgr->Fill();
587 fpFairRootMgr->StoreWriteoutBufferData( fpFairRootMgr->GetEventTime() );
588 //fpFairRootMgr->StoreAllWriteoutBufferData();
589 fpFairRootMgr->DeleteOldWriteoutBufferData();
590*/
592 fpFairRootMgr->StoreWriteoutBufferData(fpFairRootMgr->GetEventTime());
593 fpFairRootMgr->Fill();
594 fpFairRootMgr->DeleteOldWriteoutBufferData();
595
598
600 fvDigiBmon->clear();
601 fvDigiSts->clear();
602 fvDigiMuch->clear();
603 fvDigiTrd->clear();
604 fvDigiTof->clear();
605 fvDigiRich->clear();
606 fvDigiPsd->clear();
607
609 // fEventsArray->Delete();
610 fEventsArray->Clear("C");
611 // fEventsArray->Clear();
612}
613
614//--------------------------------------------------------------------//
616{
618 FairMQMessagePtr message(NewMessage());
619 // Serialize<RootSerializer>(*message, &fArrayHisto);
620 RootSerializer().Serialize(*message, &fArrayHisto);
621
623 if (Send(message, fsChannelNameHistosInput) < 0) {
624 LOG(error) << "Problem sending data";
625 return false;
626 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
627
629 // fpAlgo->ResetHistograms( kFALSE );
630
631 return true;
632}
633
634//--------------------------------------------------------------------//
636{
638
640 if (!fbFinishDone) Finish();
641
645 delete fTsMetaData;
646
648 fvDigiBmon->clear();
649 fvDigiSts->clear();
650 fvDigiMuch->clear();
651 fvDigiTrd->clear();
652 fvDigiTof->clear();
653 fvDigiRich->clear();
654 fvDigiPsd->clear();
655
657 fEventsArray->Clear();
658 delete fEventsArray;
659
660 delete fpRun;
661}
662
664{
665 // Clean closure of output to root file
666 fpFairRootMgr->Write();
667 // fpFairRootMgr->GetSource()->Close();
668 fpFairRootMgr->CloseSink();
669 LOG(info) << "File closed after saving " << (fulTsCounter + fulMissedTsCounter) << " TS (" << fulTsCounter
670 << " full ones and " << fulMissedTsCounter << " missed/empty ones)";
671
672 if (kTRUE == fbFillHistos) {
674 fLastPublishTime = std::chrono::system_clock::now();
675 } // if( kTRUE == fbFillHistos )
676
677 ChangeState(fair::mq::Transition::Stop);
678 std::this_thread::sleep_for(std::chrono::milliseconds(3000));
679 ChangeState(fair::mq::Transition::End);
680
681 fbFinishDone = kTRUE;
682}
683
684CbmUnpackedTimeslice::CbmUnpackedTimeslice(FairMQParts& parts) : fEventsArray("CbmEvent", 500)
685{
687 uint32_t uPartIdx = 0;
690 /*
691 std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
692 ( parts.At( uPartIdx ) )->GetSize() );
693 std::istringstream issTsMeta(msgStrTsMeta);
694 boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
695 inputArchiveTsMeta >> (*fTsMetaData);
696 ++uPartIdx;
697*/
698 TObject* tempObjectMeta = nullptr;
699 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
700 ++uPartIdx;
701
702 if (TString(tempObjectMeta->ClassName()).EqualTo("TimesliceMetaData")) {
703 fTsMetaData = *(static_cast<TimesliceMetaData*>(tempObjectMeta));
704 } // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
705
707 std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
708 std::istringstream issBmon(msgStrBmon);
709 boost::archive::binary_iarchive inputArchiveBmon(issBmon);
710 inputArchiveBmon >> fvDigiBmon;
711 ++uPartIdx;
712
714 std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
715 std::istringstream issSts(msgStrSts);
716 boost::archive::binary_iarchive inputArchiveSts(issSts);
717 inputArchiveSts >> fvDigiSts;
718 ++uPartIdx;
719
721 std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
722 std::istringstream issMuch(msgStrMuch);
723 boost::archive::binary_iarchive inputArchiveMuch(issMuch);
724 inputArchiveMuch >> fvDigiMuch;
725 ++uPartIdx;
726
728 std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
729 std::istringstream issTrd(msgStrTrd);
730 boost::archive::binary_iarchive inputArchiveTrd(issTrd);
731 inputArchiveTrd >> fvDigiTrd;
732 ++uPartIdx;
733
735 std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
736 std::istringstream issTof(msgStrTof);
737 boost::archive::binary_iarchive inputArchiveTof(issTof);
738 inputArchiveTof >> fvDigiTof;
739 ++uPartIdx;
740
742 std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
743 std::istringstream issRich(msgStrRich);
744 boost::archive::binary_iarchive inputArchiveRich(issRich);
745 inputArchiveRich >> fvDigiRich;
746 ++uPartIdx;
747
749 std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
750 std::istringstream issPsd(msgStrPsd);
751 boost::archive::binary_iarchive inputArchivePsd(issPsd);
752 inputArchivePsd >> fvDigiPsd;
753 ++uPartIdx;
754
756 TObject* tempObject = nullptr;
757 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
758 ++uPartIdx;
759
760 if (TString(tempObject->ClassName()).EqualTo("TClonesArray")) {
761 TClonesArray* arrayEventsIn = static_cast<TClonesArray*>(tempObject);
762
764 fEventsArray.AbsorbObjects(arrayEventsIn);
765 } // if( TString( tempObject->ClassName() ).EqualTo( "TClonesArray") )
766}
767
769{
770 fvDigiBmon.clear();
771 fvDigiSts.clear();
772 fvDigiMuch.clear();
773 fvDigiTrd.clear();
774 fvDigiTof.clear();
775 fvDigiRich.clear();
776 fvDigiPsd.clear();
777 // fEventsArray.Clear("C");
778 fEventsArray.Delete();
779}
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::vector< CbmRichDigi > * fvDigiRich
TimesliceMetaData * fTsMetaData
std::vector< CbmPsdDigi > * fvDigiPsd
std::vector< CbmTrdDigi > * fvDigiTrd
void PrepareTreeEntry(CbmUnpackedTimeslice unpTs)
uint64_t fuPrevTsIndex
Parameters management.
std::string fsOutputFileName
Keep track of whether the Finish was already called.
std::vector< CbmTofDigi > * fvDigiTof
std::chrono::system_clock::time_point fLastPublishTime
uint32_t fuPublishFreqTs
Histograms management.
bool fbReceivedEof
Control Commands reception.
std::vector< CbmMuchBeamTimeDigi > * fvDigiMuch
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
std::vector< CbmTofDigi > * fvDigiBmon
Digis storage.
bool IsChannelNameAllowed(std::string channelName)
Internal methods.
bool HandleData(FairMQParts &, int)
TClonesArray * fEventsArray
CbmEvents.
bool HandleMissTsData(FairMQMessagePtr &, int)
Bool_t fbFinishDone
Switch ON/OFF filling of histograms.
FairRunOnline * fpRun
Data storage.
std::map< uint64_t, CbmUnpackedTimeslice > fmFullTsStorage
Buffered TS.
std::vector< uint64_t > fvulMissedTsIndices
output container of CbmEvents
bool HandleCommand(FairMQMessagePtr &, int)
std::vector< CbmStsDigi > * fvDigiSts
std::string fsChannelNameMissedTs
message queues
TClonesArray * fTimeSliceMetaDataArray
TS MetaData storage.
std::vector< CbmRichDigi > fvDigiRich
std::vector< CbmPsdDigi > fvDigiPsd
std::vector< CbmTrdDigi > fvDigiTrd
std::vector< CbmMuchBeamTimeDigi > fvDigiMuch
CbmUnpackedTimeslice(FairMQParts &parts)
TODO: rename to CbmTsWithEvents.
std::vector< CbmTofDigi > fvDigiTof
TimesliceMetaData fTsMetaData
std::vector< CbmStsDigi > fvDigiSts
std::vector< CbmTofDigi > fvDigiBmon
uint64_t GetIndex() const
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.