15#include "StorableTimeslice.hpp"
17#include "FairMQLogger.h"
18#include "FairMQProgOptions.h"
19#include "FairParGenericSet.h"
20#include "FairRuntimeDb.h"
25#include "THttpServer.h"
29#include <boost/archive/binary_iarchive.hpp>
30#include <boost/archive/binary_oarchive.hpp>
33#include <boost/serialization/vector.hpp>
40 using std::runtime_error::runtime_error;
52 , fbMonitorMode(kFALSE)
53 , fbDebugMonitorMode(kFALSE)
54 , fbSandboxMode(kFALSE)
55 , fbEventDumpEna(kFALSE)
59 , fEventBuilderAlgo(nullptr)
62 , fpBinDumpFile(nullptr)
81 int noChannel = fChannels.size();
82 LOG(info) <<
"Number of defined channels: " << noChannel;
83 for (
auto const& entry : fChannels) {
84 LOG(info) <<
"Channel name: " << entry.first;
86 if (entry.first ==
"syscmd") {
100 LOG(error) << e.what();
108 LOG(info) <<
"Inspect " << entry;
109 std::size_t pos1 = channelName.find(entry);
110 if (pos1 != std::string::npos) {
111 const vector<std::string>::const_iterator
pos =
114 LOG(info) <<
"Found " << entry <<
" in " << channelName;
115 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
119 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
120 LOG(error) <<
"Stop device.";
126 LOG(info) <<
"Init parameter containers for CbmDeviceEventBuilderEtofStar2019.";
131 std::string message {
"CbmStar2019TofPar,111"};
132 LOG(info) <<
"Requesting parameter container CbmStar2019TofPar, sending message: " << message;
134 FairMQMessagePtr req(NewSimpleMessage(
"CbmStar2019TofPar,111"));
135 FairMQMessagePtr rep(NewMessage());
137 if (Send(req,
"parameters") > 0) {
138 if (Receive(rep,
"parameters") >= 0) {
139 if (rep->GetSize() != 0) {
142 LOG(info) <<
"Received unpack parameter from parmq server: " <<
fUnpackPar;
146 LOG(error) <<
"Received empty reply. Parameter not available";
154 Bool_t initOK = kTRUE;
166 THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
167 for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
169 server->Register( Form( "/%s", vHistos[ uHisto ].second.data() ), vHistos[ uHisto ].first );
170 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
172 server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE");
173 server->Restrict("/Reset_EvtBuild_Hist", "allow=admin");
182 FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
186 LOG(info) <<
"Setting parameter containers for " <<
fParCList->GetEntries() <<
" entries ";
188 for (Int_t iparC = 0; iparC <
fParCList->GetEntries(); ++iparC) {
189 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
192 std::string sParamName {tempObj->GetName()};
194 FairParGenericSet* newObj =
dynamic_cast<FairParGenericSet*
>(fRtdb->getContainer(sParamName.data()));
195 LOG(info) <<
" - Get " << sParamName.data() <<
" at " << newObj;
196 if (
nullptr == newObj) {
198 LOG(error) <<
"Failed to obtain parameter container " << sParamName <<
", for parameter index " << iparC;
203 LOG(info) <<
" - Mod " << sParamName.data() <<
" to " << newObj;
218 LOG(info) <<
"FIXME ===> Jumping 1st TS as corrupted with current FW + "
219 "FLESNET combination";
224 LOG(error) <<
"Failed processing TS " << ts.index() <<
" in event builder algorithm class";
230 for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
232 Int_t iBuffSzByte = 0;
233 void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
234 if (NULL != pDataBuff) {
249 SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (
char*) pDataBuff, iBuffSzByte, 0);
253 LOG(debug) <<
"Sent STAR event with size " << iBuffSzByte <<
" Bytes"
254 <<
" and token " << eventBuffer[uEvent].GetTrigger().GetStarToken();
257 LOG(error) <<
"Invalid STAR SubEvent Output, can only happen if trigger "
258 <<
" object was not set => Do Nothing more with it!!! ";
267 LOG(info) <<
"ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019";
279 LOG(debug) <<
"Received message number " <<
fNumMessages <<
" with size " << msg->GetSize();
281 std::string msgStr(
static_cast<char*
>(msg->GetData()), msg->GetSize());
282 std::istringstream iss(msgStr);
283 boost::archive::binary_iarchive inputArchive(iss);
285 fles::StorableTimeslice component {0};
286 inputArchive >> component;
305 LOG(debug) <<
"Received message number " <<
fNumMessages <<
" with " << parts.Size() <<
" parts";
307 fles::StorableTimeslice ts {0};
311 std::string msgStr(
static_cast<char*
>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
312 std::istringstream iss(msgStr);
313 boost::archive::binary_iarchive inputArchive(iss);
317 LOG(info) <<
"Initialize TS components list to " << ts.num_components();
318 for (
size_t c {0}; c < ts.num_components(); c++) {
319 auto systemID =
static_cast<int>(ts.descriptor(c, 0).sys_id);
320 LOG(info) <<
"Found systemID: " << std::hex << systemID << std::dec;
326 fles::StorableTimeslice component {0};
328 uint ncomp = parts.Size();
329 for (uint i = 0; i < ncomp; i++) {
330 std::string msgStr(
static_cast<char*
>(parts.At(i)->GetData()), (parts.At(i))->GetSize());
331 std::istringstream iss(msgStr);
332 boost::archive::binary_iarchive inputArchive(iss);
334 inputArchive >> component;
338 LOG(debug) <<
"HandleParts message " <<
fNumMessages <<
" with indx " << component.index();
345 LOG(error) <<
"Failed processing TS " << ts.index() <<
" in event builder algorithm class";
350 LOG(debug) <<
"Process time slice " <<
fNumMessages <<
" with " << eventBuffer.size() <<
" events";
354 for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
356 Int_t iBuffSzByte = 0;
357 void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
358 if (NULL != pDataBuff) {
373 SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (
char*) pDataBuff, iBuffSzByte, 0);
375 LOG(debug) <<
"Sent STAR event " << uEvent <<
" with size " << iBuffSzByte <<
" Bytes"
376 <<
", token " << eventBuffer[uEvent].GetTrigger().GetStarToken() <<
", TrigWord "
377 << eventBuffer[uEvent].GetTrigger().GetStarTrigerWord();
382 LOG(info) <<
"Processed " <<
fulTsCounter <<
" TS, CPUtime: " <<
dctime / 10. <<
" ms/TS";
391 const char* cmd = (
char*) (msg->GetData());
392 const char cmda[4] = {*cmd};
393 LOG(info) <<
"Handle message " << cmd <<
", " << cmd[0];
398 if (strcmp(cmda,
"STOP")) {
415 if (0 == ts.num_components()) {
416 LOG(error) <<
"No Component in TS " << ts.index();
419 auto tsIndex = ts.index();
421 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice " << tsIndex;
442 LOG(debug) <<
"Send Data for event " <<
fNumEvt <<
" with size " << vdigi.size() << Form(
" at %p ", &vdigi);
445 std::stringstream oss;
446 boost::archive::binary_oarchive oa(oss);
448 std::string* strMsg =
new std::string(oss.str());
451 parts.AddPart(NewMessage(
452 const_cast<char*
>(strMsg->c_str()),
454 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
457 LOG(debug) <<
"Send data to channel " << idx <<
" " <<
fChannelsToSend[idx][0];
473 LOG(debug) <<
"SendSubevent " <<
fNumEvt <<
", TrigWord " << trig <<
" with size " << nData << Form(
" at %p ", pData);
475 std::stringstream ossE;
476 boost::archive::binary_oarchive oaE(ossE);
478 std::string* strMsgE =
new std::string(ossE.str());
487 std::string* strMsg =
new std::string(pData, nData);
490 parts.AddPart(NewMessage(
491 const_cast<char*
>(strMsgE->c_str()),
493 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
496 parts.AddPart(NewMessage(
497 const_cast<char*
>(strMsg->c_str()),
499 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
502 LOG(debug) <<
"Send data to channel " << idx <<
" " <<
fChannelsToSend[idx][0];
520 LOG(info) <<
"Closing binary file used for event dump.";
530 TFile* oldFile = gFile;
531 TDirectory* oldDir = gDirectory;
534 TFile* histoFile =
nullptr;
537 histoFile =
new TFile(
"data/eventBuilderMonHist.root",
"RECREATE");
541 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
543 gDirectory->mkdir(vHistos[uHisto].second.data());
544 gDirectory->cd(vHistos[uHisto].second.data());
547 vHistos[uHisto].first->Write();
static uint fiSelectComponents
bool HandleData(FairMQMessagePtr &, int)
uint64_t fulTsCounter
Statistics & first TS rejection.
virtual bool SendEvent(std::vector< Int_t >, int)
bool IsChannelNameAllowed(std::string channelName)
CbmStar2019TofPar * fUnpackPar
bool HandleMessage(FairMQMessagePtr &, int)
std::vector< std::string > fAllowedChannels
std::vector< std::vector< std::string > > fChannelsToSend
virtual ~CbmDeviceEventBuilderEtofStar2019()
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Temp until we change from CbmMcbmUnpack to something else.
CbmDeviceEventBuilderEtofStar2019()
std::fstream * fpBinDumpFile
Event dump to binary file.
Bool_t ReInitContainers()
Bool_t fbSandboxMode
Switch ON the filling of a additional set of histograms.
CbmStar2019EventBuilderEtofAlgo * fEventBuilderAlgo
Processing algo.
Bool_t fbMonitorMode
Control flags.
virtual bool SendSubevent(uint, char *, int, int)
bool HandleParts(FairMQParts &, int)
virtual Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
bool CheckTimeslice(const fles::Timeslice &ts)
TList * fParCList
Switch ON the dumping of the events to a binary file.
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
std::vector< CbmTofStarSubevent2019 > & GetEventBuffer()
Bool_t ReInitContainers()
void SetAddStatusToEvent(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ProcessTs(const fles::Timeslice &ts)
Bool_t CreateHistograms()
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
void LogState(FairMQDevice *device)