CbmRoot
Loading...
Searching...
No Matches
CbmDeviceEventBuilderEtofStar2019.cxx
Go to the documentation of this file.
1/* Copyright (C) 2019-2021 PI-UHd, GSI
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Norbert Herrmann [committer] */
4
10
11#include "CbmMQDefs.h"
13#include "CbmStar2019TofPar.h"
14
15#include "StorableTimeslice.hpp"
16
17#include "FairMQLogger.h"
18#include "FairMQProgOptions.h" // device->fConfig
19#include "FairParGenericSet.h"
20#include "FairRuntimeDb.h"
21
22#include "TFile.h"
23#include "TH1.h"
24#include "TH2.h"
25#include "THttpServer.h"
26#include "TROOT.h"
27#include "TString.h"
28
29#include <boost/archive/binary_iarchive.hpp>
30#include <boost/archive/binary_oarchive.hpp>
31
32// include this header to serialize vectors
33#include <boost/serialization/vector.hpp>
34
35#include <array>
36#include <iomanip>
37#include <stdexcept>
38#include <string>
/// Exception type used to abort task initialisation; it is thrown when a
/// configured channel name is rejected and caught by the handler that follows
/// the initialisation try-block, which logs what().
/// Inherits all std::runtime_error constructors unchanged.
39struct InitTaskError : std::runtime_error {
40 using std::runtime_error::runtime_error;
41};
42
43using namespace std;
44
45//static Int_t iMess=0;
46const Int_t DetMask = 0x003FFFFF;
47static uint fiSelectComponents {0};
48
50 : //CbmDeviceUnpackTofMcbm2018(),
51 fNumMessages(0)
52 , fbMonitorMode(kFALSE)
53 , fbDebugMonitorMode(kFALSE)
54 , fbSandboxMode(kFALSE)
55 , fbEventDumpEna(kFALSE)
56 , fParCList(nullptr)
57 , fulTsCounter(0)
58 , fNumEvt(0)
59 , fEventBuilderAlgo(nullptr)
60 , fTimer()
61 , fUnpackPar(nullptr)
62 , fpBinDumpFile(nullptr)
63{
65}
66
68
70try {
71 // Get the information about created channels from the device
72 // Check if the defined channels from the topology (by name)
73 // are in the list of channels which are possible/allowed
74 // for the device
75 // The idea is to check at initialization if the devices are
76 // properly connected. For the time being this is done with a
77 // naming convention. This does not prevent someone from sending other
78 // data on this channel.
79 //logger::SetLogLevel("INFO");
80
81 int noChannel = fChannels.size();
82 LOG(info) << "Number of defined channels: " << noChannel;
83 for (auto const& entry : fChannels) {
84 LOG(info) << "Channel name: " << entry.first;
85 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
86 if (entry.first == "syscmd") {
88 continue;
89 }
90 //if(entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleData);
91 if (entry.first != "etofevts") OnData(entry.first, &CbmDeviceEventBuilderEtofStar2019::HandleParts);
92 else {
93 fChannelsToSend[0].push_back(entry.first);
94 LOG(info) << "Init to send data to channel " << fChannelsToSend[0][0];
95 }
96 }
98}
99catch (InitTaskError& e) {
100 LOG(error) << e.what();
101 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
103}
104
106{
107 for (auto const& entry : fAllowedChannels) {
108 LOG(info) << "Inspect " << entry;
109 std::size_t pos1 = channelName.find(entry);
110 if (pos1 != std::string::npos) {
111 const vector<std::string>::const_iterator pos =
112 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
113 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
114 LOG(info) << "Found " << entry << " in " << channelName;
115 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
116 return true;
117 }
118 }
119 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
120 LOG(error) << "Stop device.";
121 return false;
122}
123
125{
126 LOG(info) << "Init parameter containers for CbmDeviceEventBuilderEtofStar2019.";
127 // FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
128
129 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
130 // Should only be used for small data because of the cost of an additional copy
131 std::string message {"CbmStar2019TofPar,111"};
132 LOG(info) << "Requesting parameter container CbmStar2019TofPar, sending message: " << message;
133
134 FairMQMessagePtr req(NewSimpleMessage("CbmStar2019TofPar,111"));
135 FairMQMessagePtr rep(NewMessage());
136
137 if (Send(req, "parameters") > 0) {
138 if (Receive(rep, "parameters") >= 0) {
139 if (rep->GetSize() != 0) {
140 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
141 fUnpackPar = dynamic_cast<CbmStar2019TofPar*>(tmsg.ReadObject(tmsg.GetClass()));
142 LOG(info) << "Received unpack parameter from parmq server: " << fUnpackPar;
143 fUnpackPar->Print();
144 }
145 else {
146 LOG(error) << "Received empty reply. Parameter not available";
147 }
148 }
149 }
150
151
153
154 Bool_t initOK = kTRUE;
156 initOK &= ReInitContainers(); // needed for TInt parameters
158
159 if (kTRUE == fbMonitorMode) { // CreateHistograms();
161
163 std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
164 /* FIXME
166 THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
167 for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
168 {
169 server->Register( Form( "/%s", vHistos[ uHisto ].second.data() ), vHistos[ uHisto ].first );
170 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
171
172 server->RegisterCommand("/Reset_EvtBuild_Hist", "bStarEtof2019EventBuilderResetHistos=kTRUE");
173 server->Restrict("/Reset_EvtBuild_Hist", "allow=admin");
174 */
175 } // if( kTRUE == fbMonitorMode )
176
177 return initOK;
178}
179
181{
182 FairRuntimeDb* fRtdb = FairRuntimeDb::instance();
183
185
186 LOG(info) << "Setting parameter containers for " << fParCList->GetEntries() << " entries ";
187
188 for (Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC) {
189 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
190 fParCList->Remove(tempObj);
191
192 std::string sParamName {tempObj->GetName()};
193
194 FairParGenericSet* newObj = dynamic_cast<FairParGenericSet*>(fRtdb->getContainer(sParamName.data()));
195 LOG(info) << " - Get " << sParamName.data() << " at " << newObj;
196 if (nullptr == newObj) {
197
198 LOG(error) << "Failed to obtain parameter container " << sParamName << ", for parameter index " << iparC;
199 return;
200 } // if( nullptr == newObj )
201 if (iparC == 0) {
202 newObj = (FairParGenericSet*) fUnpackPar;
203 LOG(info) << " - Mod " << sParamName.data() << " to " << newObj;
204 }
205 fParCList->AddAt(newObj, iparC);
206 // delete tempObj;
207 } // for( Int_t iparC = 0; iparC < fParCList->GetEntries(); ++iparC )
208}
209
210void CbmDeviceEventBuilderEtofStar2019::AddMsComponentToList(size_t component, UShort_t usDetectorId)
211{
212 fEventBuilderAlgo->AddMsComponentToList(component, usDetectorId);
213}
214
215Bool_t CbmDeviceEventBuilderEtofStar2019::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
216{
217 if (0 == fulTsCounter) {
218 LOG(info) << "FIXME ===> Jumping 1st TS as corrupted with current FW + "
219 "FLESNET combination";
220 fulTsCounter++;
221 return kTRUE;
222 } // if( 0 == fulTsCounter )
223 if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
224 LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
225 return kTRUE;
226 } // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
227
228 std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
229
230 for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
232 Int_t iBuffSzByte = 0;
233 void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
234 if (NULL != pDataBuff) {
236 // Bool_t fbSendEventToStar = kFALSE;
237 if (kFALSE == fbSandboxMode) {
238 /*
239 ** Function to send sub-event block to the STAR DAQ system
240 * trg_word received is packed as:
241 *
242 * trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
243 */
244 /*
245 star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
246 pDataBuff, iBuffSzByte );
247 */
248
249 SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);
250
251 } // if( kFALSE == fbSandboxMode )
252
253 LOG(debug) << "Sent STAR event with size " << iBuffSzByte << " Bytes"
254 << " and token " << eventBuffer[uEvent].GetTrigger().GetStarToken();
255 } // if( NULL != pDataBuff )
256 else
257 LOG(error) << "Invalid STAR SubEvent Output, can only happen if trigger "
258 << " object was not set => Do Nothing more with it!!! ";
259 } // for( UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent )
260
261 return kTRUE;
262}
263
264
266{
267 LOG(info) << "ReInit parameter containers for CbmDeviceEventBuilderEtofStar2019";
268 Bool_t initOK = fEventBuilderAlgo->ReInitContainers();
269 return initOK;
270}
271
272// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
273bool CbmDeviceEventBuilderEtofStar2019::HandleData(FairMQMessagePtr& msg, int /*index*/)
274{
275 // Don't do anything with the data
276 // Maybe add an message counter which counts the incomming messages and add
277 // an output
278 fNumMessages++;
279 LOG(debug) << "Received message number " << fNumMessages << " with size " << msg->GetSize();
280
281 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
282 std::istringstream iss(msgStr);
283 boost::archive::binary_iarchive inputArchive(iss);
284
285 fles::StorableTimeslice component {0};
286 inputArchive >> component;
287
288 CheckTimeslice(component);
289
290 DoUnpack(component, 0);
291
292 // if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
293
294 return true;
295}
296
297static Double_t dctime = 0.;
298
299bool CbmDeviceEventBuilderEtofStar2019::HandleParts(FairMQParts& parts, int /*index*/)
300{
301 // Don't do anything with the data
302 // Maybe add an message counter which counts the incomming messages and add
303 // an output
304 fNumMessages++;
305 LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts";
306
307 fles::StorableTimeslice ts {0}; // rename ??? FIXME
308
309 switch (fiSelectComponents) {
310 case 0: {
311 std::string msgStr(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
312 std::istringstream iss(msgStr);
313 boost::archive::binary_iarchive inputArchive(iss);
314 inputArchive >> ts;
315 CheckTimeslice(ts);
316 if (1 == fNumMessages) {
317 LOG(info) << "Initialize TS components list to " << ts.num_components();
318 for (size_t c {0}; c < ts.num_components(); c++) {
319 auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
320 LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
321 fEventBuilderAlgo->AddMsComponentToList(c, systemID); // TOF data
322 }
323 }
324 } break;
325 case 1: {
326 fles::StorableTimeslice component {0};
327
328 uint ncomp = parts.Size();
329 for (uint i = 0; i < ncomp; i++) {
330 std::string msgStr(static_cast<char*>(parts.At(i)->GetData()), (parts.At(i))->GetSize());
331 std::istringstream iss(msgStr);
332 boost::archive::binary_iarchive inputArchive(iss);
333 //fles::StorableTimeslice component{i};
334 inputArchive >> component;
335
336 CheckTimeslice(component);
337 fEventBuilderAlgo->AddMsComponentToList(i, 0x60); // TOF data
338 LOG(debug) << "HandleParts message " << fNumMessages << " with indx " << component.index();
339 }
340 } break;
341 default:;
342 }
343
344 if (kFALSE == fEventBuilderAlgo->ProcessTs(ts)) {
345 LOG(error) << "Failed processing TS " << ts.index() << " in event builder algorithm class";
346 return kTRUE;
347 } // if( kFALSE == fEventBuilderAlgo->ProcessTs( ts ) )
348
349 std::vector<CbmTofStarSubevent2019>& eventBuffer = fEventBuilderAlgo->GetEventBuffer();
350 LOG(debug) << "Process time slice " << fNumMessages << " with " << eventBuffer.size() << " events";
351
352 //if(fNumMessages%10000 == 0) LOG(info)<<"Processed "<<fNumMessages<<" time slices";
353
354 for (UInt_t uEvent = 0; uEvent < eventBuffer.size(); ++uEvent) {
356 Int_t iBuffSzByte = 0;
357 void* pDataBuff = eventBuffer[uEvent].BuildOutput(iBuffSzByte);
358 if (NULL != pDataBuff) {
360 // Send to Star TriggerHandler, TBD
361 if (kFALSE == fbSandboxMode) {
362 /*
363 ** Function to send sub-event block to the STAR DAQ system
364 * trg_word received is packed as:
365 *
366 * trg_cmd|daq_cmd|tkn_hi|tkn_mid|tkn_lo
367 */
368 /*
369 star_rhicf_write( eventBuffer[ uEvent ].GetTrigger().GetStarTrigerWord(),
370 pDataBuff, iBuffSzByte );
371 */
372 } // if( kFALSE == fbSandboxMode )
373 SendSubevent(eventBuffer[uEvent].GetTrigger().GetStarTrigerWord(), (char*) pDataBuff, iBuffSzByte, 0);
374
375 LOG(debug) << "Sent STAR event " << uEvent << " with size " << iBuffSzByte << " Bytes"
376 << ", token " << eventBuffer[uEvent].GetTrigger().GetStarToken() << ", TrigWord "
377 << eventBuffer[uEvent].GetTrigger().GetStarTrigerWord();
378 }
379 }
380
381 if (0 == fulTsCounter % 10000) {
382 LOG(info) << "Processed " << fulTsCounter << " TS, CPUtime: " << dctime / 10. << " ms/TS";
383 dctime = 0.;
384 }
385 fulTsCounter++;
386 return true;
387}
388
// Handler for command messages: logs the received payload and the device state,
// then evaluates the command. Always returns true.
//
// NOTE(review): the payload is streamed as a C string below, which assumes it is
// NUL-terminated - confirm against the sender.
// NOTE(review): cmda is initialised from the first payload byte only (remaining
// elements are zero), so it can never compare equal to "STOP"; in addition
// strcmp() returns non-zero when the strings differ, so the branch below is
// entered for every message rather than only for "STOP". Presumably the intent
// was a bounded comparison of the payload with "STOP" yielding 0 - verify.
389bool CbmDeviceEventBuilderEtofStar2019::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
390{
391 const char* cmd = (char*) (msg->GetData());
392 const char cmda[4] = {*cmd};
393 LOG(info) << "Handle message " << cmd << ", " << cmd[0];
394 cbm::mq::LogState(this);
395
396 // only one implemented so far "Stop"
397
398 if (strcmp(cmda, "STOP")) {
399 LOG(info) << "STOP";
401 cbm::mq::LogState(this);
403 cbm::mq::LogState(this);
405 cbm::mq::LogState(this);
407 cbm::mq::LogState(this);
408 }
409 return true;
410}
411
412
414{
415 if (0 == ts.num_components()) {
416 LOG(error) << "No Component in TS " << ts.index();
417 return 1;
418 }
419 auto tsIndex = ts.index();
420
421 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << tsIndex;
422
423 /*
424 for (size_t c = 0; c < ts.num_components(); ++c) {
425 LOG(debug) << "Found " << ts.num_microslices(c)
426 << " microslices in component " << c;
427 LOG(debug) << "Component " << c << " has a size of "
428 << ts.size_component(c) << " bytes";
429 LOG(debug) << "Sys ID: Ox" << std::hex << static_cast<int>(ts.descriptor(0,0).sys_id)
430 << std::dec;
431
432 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
433 PrintMicroSliceDescriptor(ts.descriptor(c,m));
434 }
435 }
436*/
437 return true;
438}
439
440bool CbmDeviceEventBuilderEtofStar2019::SendEvent(std::vector<Int_t> vdigi, int idx)
441{
442 LOG(debug) << "Send Data for event " << fNumEvt << " with size " << vdigi.size() << Form(" at %p ", &vdigi);
443 // LOG(debug) << "EventHeader: "<< fEventHeader[0] << " " << fEventHeader[1] << " " << fEventHeader[2] << " " << fEventHeader[3];
444
445 std::stringstream oss;
446 boost::archive::binary_oarchive oa(oss);
447 oa << vdigi;
448 std::string* strMsg = new std::string(oss.str());
449
450 FairMQParts parts;
451 parts.AddPart(NewMessage(
452 const_cast<char*>(strMsg->c_str()), // data
453 strMsg->length(), // size
454 [](void*, void* object) { delete static_cast<std::string*>(object); },
455 strMsg)); // object that manages the data
456
457 LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];
458
459
460 // if (Send(msg, fChannelsToSend[idx][0]) < 0) {
461 if (Send(parts, fChannelsToSend[idx][0]) < 0) {
462 LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
463 return false;
464 }
465 fNumEvt++;
466 //if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
467 return true;
468}
469
470bool CbmDeviceEventBuilderEtofStar2019::SendSubevent(uint trig, char* pData, int nData, int idx)
471{
472
473 LOG(debug) << "SendSubevent " << fNumEvt << ", TrigWord " << trig << " with size " << nData << Form(" at %p ", pData);
474
475 std::stringstream ossE;
476 boost::archive::binary_oarchive oaE(ossE);
477 oaE << trig;
478 std::string* strMsgE = new std::string(ossE.str());
479
480 /*
481 std::stringstream oss;
482 boost::archive::binary_oarchive oa(oss);
483 oa << cData;
484 std::string* strMsg = new std::string(oss.str());
485 */
486
487 std::string* strMsg = new std::string(pData, nData);
488
489 FairMQParts parts;
490 parts.AddPart(NewMessage(
491 const_cast<char*>(strMsgE->c_str()), // data
492 strMsgE->length(), // size
493 [](void*, void* object) { delete static_cast<std::string*>(object); },
494 strMsgE)); // object that manages the data
495
496 parts.AddPart(NewMessage(
497 const_cast<char*>(strMsg->c_str()), // data
498 strMsg->length(), // size
499 [](void*, void* object) { delete static_cast<std::string*>(object); },
500 strMsg)); // object that manages the data
501
502 LOG(debug) << "Send data to channel " << idx << " " << fChannelsToSend[idx][0];
503
504
505 // if (Send(msg, fChannelsToSend[idx][0]) < 0) {
506 if (Send(parts, fChannelsToSend[idx][0]) < 0) {
507 LOG(error) << "Problem sending data " << fChannelsToSend[idx][0];
508 return false;
509 }
510 fNumEvt++;
511 //if(fNumEvt==100) FairMQStateMachine::ChangeState(PAUSE); //sleep(10000); // Stop for debugging ...
512 return true;
513}
514
516
518{
519 if (NULL != fpBinDumpFile) {
520 LOG(info) << "Closing binary file used for event dump.";
521 fpBinDumpFile->close();
522 } // if( NULL != fpBinDumpFile )
523
525 if (kTRUE == fbMonitorMode) {
527 std::vector<std::pair<TNamed*, std::string>> vHistos = fEventBuilderAlgo->GetHistoVector();
528
530 TFile* oldFile = gFile;
531 TDirectory* oldDir = gDirectory;
532
534 TFile* histoFile = nullptr;
535
536 // open separate histo file in recreate mode
537 histoFile = new TFile("data/eventBuilderMonHist.root", "RECREATE");
538 histoFile->cd();
539
541 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
543 gDirectory->mkdir(vHistos[uHisto].second.data());
544 gDirectory->cd(vHistos[uHisto].second.data());
545
547 vHistos[uHisto].first->Write();
548
549 histoFile->cd();
550 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
551
553 gFile = oldFile;
554 gDirectory = oldDir;
555
556 histoFile->Close();
557 } // if( kTRUE == fbMonitorMode )
558}
static Double_t dctime
static uint fiSelectComponents
uint64_t fulTsCounter
Statistics & first TS rejection.
virtual bool SendEvent(std::vector< Int_t >, int)
std::vector< std::vector< std::string > > fChannelsToSend
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Temp until we change from CbmMcbmUnpack to something else.
std::fstream * fpBinDumpFile
Event dump to binary file.
Bool_t fbSandboxMode
Switch ON the filling of a additional set of histograms.
CbmStar2019EventBuilderEtofAlgo * fEventBuilderAlgo
Processing algo.
virtual bool SendSubevent(uint, char *, int, int)
virtual Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
TList * fParCList
Switch ON the dumping of the events to a binary file.
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
std::vector< CbmTofStarSubevent2019 > & GetEventBuffer()
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ProcessTs(const fles::Timeslice &ts)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
void LogState(FairMQDevice *device)
Definition CbmMQDefs.h:47
Hash for CbmL1LinkKey.