23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h"
25#include "FairParGenericSet.h"
26#include "FairRootFileSink.h"
27#include "FairRootManager.h"
28#include "FairRunOnline.h"
30#include "BoostSerializer.h"
32#include "RootSerializer.h"
41#include <boost/archive/binary_iarchive.hpp>
42#include <boost/serialization/utility.hpp>
52 using std::runtime_error::runtime_error;
64 LOG(info) <<
"Init options for CbmDeviceMcbmEventSink.";
95 int noChannel = fChannels.size();
96 LOG(info) <<
"Number of defined channels: " << noChannel;
97 for (
auto const& entry : fChannels) {
98 LOG(info) <<
"Channel name: " << entry.first;
109 fvDigiSts =
new std::vector<CbmStsDigi>();
110 fvDigiMuch =
new std::vector<CbmMuchBeamTimeDigi>();
111 fvDigiTrd =
new std::vector<CbmTrdDigi>();
112 fvDigiTof =
new std::vector<CbmTofDigi>();
114 fvDigiPsd =
new std::vector<CbmPsdDigi>();
120 throw InitTaskError(
"Failed creating the TS meta data TClonesarray ");
126 throw InitTaskError(
"Failed creating the Events TClonesarray ");
131 fpRun =
new FairRunOnline();
171 LOG(info) <<
"Initialized outTree with rootMgr at " <<
fpFairRootMgr;
177 std::vector< std::pair< TNamed *, std::string > > vHistos = fpAlgo->GetHistoVector();
179 std::vector< std::pair< TCanvas *, std::string > > vCanvases = fpAlgo->GetCanvasVector();
185 for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
187// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
188// << " in " << vHistos[ uHisto ].second.data()
190 fArrayHisto.Add( vHistos[ uHisto ].first );
191 std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
192 vHistos[ uHisto ].second );
193 fvpsHistosFolder.push_back( psHistoConfig );
196 FairMQMessagePtr messageHist( NewMessage() );
197// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
198 BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageHist, psHistoConfig );
201 if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
203 throw InitTaskError( "Problem sending histo config" );
204 } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
206 LOG(info) << "Config of hist " << psHistoConfig.first.data()
207 << " in folder " << psHistoConfig.second.data() ;
208 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
213 for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
215// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
216// << " in " << vCanvases[ uCanv ].second.data();
217 std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
218 std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
220 std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
222 fvpsCanvasConfig.push_back( psCanvConfig );
225 FairMQMessagePtr messageCan( NewMessage() );
226// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
227 BoostSerializer < std::pair< std::string, std::string > >.Serialize( *messageCan, psCanvConfig );
230 if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
232 throw InitTaskError( "Problem sending canvas config" );
233 } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
235 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
236 << " is " << psCanvConfig.second.data() ;
237 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
242 LOG(error) << e.what();
250 std::size_t pos1 = channelName.find(entry);
251 if (pos1 != std::string::npos) {
252 const vector<std::string>::const_iterator
pos =
255 LOG(info) <<
"Found " << entry <<
" in " << channelName;
256 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
260 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
261 LOG(error) <<
"Stop device.";
273 fpAlgo ->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
275 Bool_t initOK = fpAlgo->InitContainers();
277// Bool_t initOK = fMonitorAlgo->ReInitContainers();
282Bool_t CbmDeviceMcbmEventSink::InitParameters( TList* fParCList )
284 for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
286 FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
287 fParCList->Remove( tempObj );
288 std::string paramName{ tempObj->GetName() };
289 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
290 // Should only be used for small data because of the cost of an additional copy
292 // Her must come the proper Runid
293 std::string message = paramName + ",111";
294 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
296 FairMQMessagePtr req( NewSimpleMessage(message) );
297 FairMQMessagePtr rep( NewMessage() );
299 FairParGenericSet* newObj = nullptr;
301 if( Send(req, "parameters") > 0 )
303 if( Receive( rep, "parameters" ) >= 0)
305 if( 0 != rep->GetSize() )
307 CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );
308 newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
309 LOG( info ) << "Received unpack parameter from the server:";
311 } // if( 0 != rep->GetSize() )
314 LOG( error ) << "Received empty reply. Parameter not available";
316 } // else of if( 0 != rep->GetSize() )
317 } // if( Receive( rep, "parameters" ) >= 0)
318 } // if( Send(req, "parameters") > 0 )
319 fParCList->AddAt( newObj, iparC );
321 } // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
330 std::vector<uint64_t> vIndices;
331 std::string msgStrMissTs(
static_cast<char*
>(msg->GetData()), msg->GetSize());
332 std::istringstream issMissTs(msgStrMissTs);
333 boost::archive::binary_iarchive inputArchiveMissTs(issMissTs);
334 inputArchiveMissTs >> vIndices;
348 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with " << parts.Size() <<
" parts"
349 <<
", size0: " << parts.At(0)->GetSize();
354 uint32_t uPartIdx = 0;
366 RootSerializer().Deserialize(*parts.At(uPartIdx),
fTsMetaData);
367 LOG(debug) <<
"TS metadata extracted";
372 LOG(debug) <<
"TS direct to dump";
382 LOG(debug) <<
"TS direct to storage";
387 LOG(debug) <<
"TS metadata checked";
394 LOG(debug) <<
"TS queues checked";
401 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
402 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
419 std::string sCommand;
420 std::string msgStrCmd(
static_cast<char*
>(msg->GetData()), msg->GetSize());
421 std::istringstream issCmd(msgStrCmd);
422 boost::archive::binary_iarchive inputArchiveCmd(issCmd);
423 inputArchiveCmd >> sCommand;
425 std::string sCmdTag = sCommand;
426 size_t charPosDel = sCommand.find(
' ');
427 if (std::string::npos != charPosDel) {
428 sCmdTag = sCommand.substr(0, charPosDel);
431 if (
"EOF" == sCmdTag) {
435 if (std::string::npos == charPosDel) {
436 LOG(fatal) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
437 <<
"Incomplete EOF command received: " << sCommand;
442 std::string sNext = sCommand.substr(charPosDel);
443 charPosDel = sNext.find(
' ');
445 if (std::string::npos == charPosDel) {
446 LOG(fatal) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
447 <<
"Incomplete EOF command received: " << sCommand;
455 LOG(info) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
459 LOG(info) <<
"CbmDeviceMcbmEventSink::HandleCommand => "
464 else if (
"STOP" == sCmdTag) {
470 LOG(warning) <<
"Unknown command received: " << sCmdTag <<
" => will be ignored!";
478 bool bHoleFoundInBothQueues =
false;
480 std::map<uint64_t, CbmUnpackedTimeslice>::iterator itFullTs =
fmFullTsStorage.begin();
483 while (!bHoleFoundInBothQueues) {
518 bHoleFoundInBothQueues =
true;
527 LOG(info) <<
"CbmDeviceMcbmEventSink::CheckTsQueues => "
545 fvDigiBmon->insert( fvDigiBmon->end(), unpTs.fvDigiBmon.begin(), unpTs.fvDigiBmon.end() );
547 fvDigiSts->insert( fvDigiSts->end(), unpTs.fvDigiSts.begin(), unpTs.fvDigiSts.end() );
549 fvDigiMuch->insert( fvDigiMuch->end(), unpTs.fvDigiMuch.begin(), unpTs.fvDigiMuch.end() );
551 fvDigiTrd->insert( fvDigiTrd->end(), unpTs.fvDigiTrd.begin(), unpTs.fvDigiTrd.end() );
553 fvDigiTof->insert( fvDigiTof->end(), unpTs.fvDigiTof.begin(), unpTs.fvDigiTof.end() );
555 fvDigiRich->insert( fvDigiRich->end(), unpTs.fvDigiRich.begin(), unpTs.fvDigiRich.end() );
557 fvDigiPsd->insert( fvDigiPsd->end(), unpTs.fvDigiPsd.begin(), unpTs.fvDigiPsd.end() );
563 (*fvDigiSts) = std::move(unpTs.
fvDigiSts);
567 (*fvDigiTrd) = std::move(unpTs.
fvDigiTrd);
569 (*fvDigiTof) = std::move(unpTs.
fvDigiTof);
573 (*fvDigiPsd) = std::move(unpTs.
fvDigiPsd);
618 FairMQMessagePtr message(NewMessage());
620 RootSerializer().Serialize(*message, &
fArrayHisto);
624 LOG(error) <<
"Problem sending data";
677 ChangeState(fair::mq::Transition::Stop);
678 std::this_thread::sleep_for(std::chrono::milliseconds(3000));
679 ChangeState(fair::mq::Transition::End);
687 uint32_t uPartIdx = 0;
698 TObject* tempObjectMeta =
nullptr;
699 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObjectMeta);
702 if (TString(tempObjectMeta->ClassName()).EqualTo(
"TimesliceMetaData")) {
707 std::string msgStrBmon(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
708 std::istringstream issBmon(msgStrBmon);
709 boost::archive::binary_iarchive inputArchiveBmon(issBmon);
714 std::string msgStrSts(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
715 std::istringstream issSts(msgStrSts);
716 boost::archive::binary_iarchive inputArchiveSts(issSts);
721 std::string msgStrMuch(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
722 std::istringstream issMuch(msgStrMuch);
723 boost::archive::binary_iarchive inputArchiveMuch(issMuch);
728 std::string msgStrTrd(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
729 std::istringstream issTrd(msgStrTrd);
730 boost::archive::binary_iarchive inputArchiveTrd(issTrd);
735 std::string msgStrTof(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
736 std::istringstream issTof(msgStrTof);
737 boost::archive::binary_iarchive inputArchiveTof(issTof);
742 std::string msgStrRich(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
743 std::istringstream issRich(msgStrRich);
744 boost::archive::binary_iarchive inputArchiveRich(issRich);
749 std::string msgStrPsd(
static_cast<char*
>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
750 std::istringstream issPsd(msgStrPsd);
751 boost::archive::binary_iarchive inputArchivePsd(issPsd);
756 TObject* tempObject =
nullptr;
757 RootSerializer().Deserialize(*parts.At(uPartIdx), tempObject);
760 if (TString(tempObject->ClassName()).EqualTo(
"TClonesArray")) {
761 TClonesArray* arrayEventsIn =
static_cast<TClonesArray*
>(tempObject);
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::string fsChannelNameCanvasConfig
std::vector< CbmRichDigi > * fvDigiRich
TimesliceMetaData * fTsMetaData
std::vector< CbmPsdDigi > * fvDigiPsd
std::vector< CbmTrdDigi > * fvDigiTrd
std::string fsChannelNameCommands
void PrepareTreeEntry(CbmUnpackedTimeslice unpTs)
std::string fsChannelNameHistosConfig
Bool_t fbFillHistos
Constants.
double_t fdMaxPublishTime
uint64_t fuPrevTsIndex
Parameters management.
std::string fsOutputFileName
Keep track of whether the Finish was already called.
std::vector< CbmTofDigi > * fvDigiTof
double_t fdMinPublishTime
std::chrono::system_clock::time_point fLastPublishTime
uint64_t fulMissedTsCounter
uint32_t fuPublishFreqTs
Histograms management.
bool fbReceivedEof
Control Commands reception.
std::vector< CbmMuchBeamTimeDigi > * fvDigiMuch
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
std::vector< CbmTofDigi > * fvDigiBmon
Digis storage.
bool IsChannelNameAllowed(std::string channelName)
Internal methods.
std::string fsChannelNameDataInput
bool HandleData(FairMQParts &, int)
TClonesArray * fEventsArray
CbmEvents.
bool HandleMissTsData(FairMQMessagePtr &, int)
Bool_t fbFinishDone
Switch ON/OFF filling of histograms.
std::string fsChannelNameHistosInput
FairRunOnline * fpRun
Data storage.
virtual ~CbmDeviceMcbmEventSink()
std::map< uint64_t, CbmUnpackedTimeslice > fmFullTsStorage
Buffered TS.
std::vector< uint64_t > fvulMissedTsIndices
output container of CbmEvents
bool HandleCommand(FairMQMessagePtr &, int)
std::vector< CbmStsDigi > * fvDigiSts
FairRootManager * fpFairRootMgr
std::string fsChannelNameMissedTs
message queues
TClonesArray * fTimeSliceMetaDataArray
TS MetaData storage.
std::vector< CbmRichDigi > fvDigiRich
std::vector< CbmPsdDigi > fvDigiPsd
std::vector< CbmTrdDigi > fvDigiTrd
std::vector< CbmMuchBeamTimeDigi > fvDigiMuch
CbmUnpackedTimeslice(FairMQParts &parts)
TODO: rename to CbmTsWithEvents.
std::vector< CbmTofDigi > fvDigiTof
TClonesArray fEventsArray
TimesliceMetaData fTsMetaData
std::vector< CbmStsDigi > fvDigiSts
std::vector< CbmTofDigi > fvDigiBmon
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)