CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMcbmUnpack.cxx
Go to the documentation of this file.
1/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
12#include "CbmDeviceMcbmUnpack.h"
13
14#include "CbmFlesCanvasTools.h"
15#include "CbmMQDefs.h"
22
23#include "StorableTimeslice.hpp"
24#include "TimesliceMetaData.h"
25
26#include "FairMQLogger.h"
27#include "FairMQProgOptions.h" // device->fConfig
28#include "FairParGenericSet.h"
29
30#include "TCanvas.h"
31#include "TFile.h"
32#include "TH1.h"
33#include "TList.h"
34#include "TNamed.h"
35
36#include "BoostSerializer.h"
37#include <boost/archive/binary_iarchive.hpp>
38#include <boost/serialization/utility.hpp>
39
40#include <array>
41#include <iomanip>
42#include <stdexcept>
43#include <string>
44
45#include "RootSerializer.h"
// Exception type thrown when a channel name fails validation during task
// initialisation. It adds nothing to std::runtime_error beyond a distinct type
// and inherits all of its constructors.
46struct InitTaskError : std::runtime_error {
47 using std::runtime_error::runtime_error;
48};
49
50using namespace std;
51
53
63
// Function-try-block forming the body of the task initialisation (presumably
// CbmDeviceMcbmUnpack::InitTask; the signature line is absent from this listing).
// It reads the device options, then registers HandleData as data callback on
// every configured channel whose name contains the input channel name.
// An InitTaskError thrown by the channel-name check is caught by the handler at
// the end of the block and its message is logged as an error.
65try {
67 LOG(info) << "Init options for CbmDeviceMcbmUnpack.";
// User options taken from the device configuration.
68 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
69 fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>("SetTrigWin");
70 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
71 fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut");
74
75 // Get the information about the created channels from the device.
76 // Check that the channels defined in the topology (by name)
77 // are in the list of channels which are possible/allowed
78 // for the device.
79 // The idea is to check at initialization that the devices are
80 // properly connected. For the time being this is done with a
81 // naming convention. Nothing prevents someone from sending other
82 // data on this channel.
83 //logger::SetLogLevel("INFO");
84
85 int noChannel = fChannels.size();
86 LOG(info) << "Number of defined channels: " << noChannel;
87 for (auto const& entry : fChannels) {
88 LOG(info) << "Channel name: " << entry.first;
// Only channels whose name contains the configured input name get a handler.
89 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
// A matching channel must also pass the allowed-names check, otherwise init is aborted.
90 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
91 OnData(entry.first, &CbmDeviceMcbmUnpack::HandleData);
92 } // if( entry.first.find( "ts" )
93 } // for( auto const &entry : fChannels )
95}
96catch (InitTaskError& e) {
97 LOG(error) << e.what();
98 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
// NOTE(review): the statement the comment above refers to is not part of this listing.
100}
101
102bool CbmDeviceMcbmUnpack::IsChannelNameAllowed(std::string channelName)
103{
104 for (auto const& entry : fsAllowedChannels) {
105 std::size_t pos1 = channelName.find(entry);
106 if (pos1 != std::string::npos) {
107 const vector<std::string>::const_iterator pos =
108 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
109 const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
110 LOG(info) << "Found " << entry << " in " << channelName;
111 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
112 return true;
113 } // if (pos1!=std::string::npos)
114 } // for(auto const &entry : fsAllowedChannels)
115 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
116 LOG(error) << "Stop device.";
117 return false;
118}
119
// Body of the container initialisation (the log messages below name it
// CbmDeviceMcbmUnpack::InitContainers; the signature line is absent from this listing).
// Steps: fetch the parameter containers of each unpacker algorithm, apply the
// user-supplied "<detector>,<offset>" options, then initialise every algorithm.
// Returns kFALSE as soon as one InitParameters() call fails, otherwise the
// combined result of all InitContainers() calls of the algorithms.
121{
122 LOG(info) << "Init parameter containers for CbmDeviceMcbmUnpack.";
123
// Request the parameter list of every algorithm; abort on the first failure.
124 if (kFALSE == InitParameters(fUnpAlgoSts->GetParList())) return kFALSE;
125 if (kFALSE == InitParameters(fUnpAlgoMuch->GetParList())) return kFALSE;
126 if (kFALSE == InitParameters(fUnpAlgoTrd->GetParList())) return kFALSE;
127 if (kFALSE == InitParameters(fUnpAlgoTof->GetParList())) return kFALSE;
128 if (kFALSE == InitParameters(fUnpAlgoRich->GetParList())) return kFALSE;
129 if (kFALSE == InitParameters(fUnpAlgoPsd->GetParList())) return kFALSE;
130
138
// Each option string is split at its first ',' into a detector tag and a numeric offset.
140 for (std::vector<std::string>::iterator itStrOffs = fvsSetTimeOffs.begin(); itStrOffs != fvsSetTimeOffs.end();
141 ++itStrOffs) {
142 size_t charPosDel = (*itStrOffs).find(',');
143 if (std::string::npos == charPosDel) {
144 LOG(info) << "CbmDeviceMcbmUnpack::InitContainers => "
145 << "Trying to set trigger window with invalid option pattern, ignored! "
146 << " (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << (*itStrOffs) << " )";
// NOTE(review): there is no `continue` here although the message says "ignored".
// Execution falls through with charPosDel == npos: substr(0, npos) yields the whole
// option, the increment below wraps charPosDel to 0, and std::stod is then applied
// to the whole option string, which throws for a non-numeric string. Confirm and skip the entry.
147 } // if( std::string::npos == charPosDel )
148
// Text before the delimiter selects the detector.
150 std::string sSelDet = (*itStrOffs).substr(0, charPosDel);
// Step over the ',' so that the remainder is parsed as the offset value.
152 charPosDel++;
153 Double_t dOffset = std::stod((*itStrOffs).substr(charPosDel));
154
// NOTE(review): only the kSTS branch has a visible statement; the bodies of the other
// branches are empty in this listing (lines elided) — check against the real file.
155 if ("kSTS" == sSelDet) { fUnpAlgoSts->SetTimeOffsetNs(dOffset); } // if( "kSTS" == sSelDet )
156 else if ("kMUCH" == sSelDet) {
158 } // else if( "kMUCH" == sSelDet )
159 else if ("kTRD" == sSelDet) {
161 } // else if( "kTRD" == sSelDet )
162 else if ("kTOF" == sSelDet) {
164 } // else if( "kTOF" == sSelDet )
165 else if ("kRICH" == sSelDet) {
167 } // else if( "kRICH" == sSelDet )
168 else if ("kPSD" == sSelDet) {
170 } // else if( "kPSD" == sSelDet )
171 else {
172 LOG(info) << "CbmDeviceMcbmUnpack::InitContainers => Trying to set time "
173 "offset for unsupported detector, ignored! "
174 << (sSelDet);
175 continue;
176 } // else of detector enum detection
177 } // for( itStrOffs = fvsSetTimeOffs.begin(); itStrOffs != fvsSetTimeOffs.end(); ++itStrOffs )
178
179
184
// Initialise all algorithms; every call is made even if an earlier one failed,
// and the results are AND-ed together.
185 Bool_t initOK = fUnpAlgoSts->InitContainers();
186 initOK &= fUnpAlgoMuch->InitContainers();
187 initOK &= fUnpAlgoTrd->InitContainers();
188 initOK &= fUnpAlgoTof->InitContainers();
189 initOK &= fUnpAlgoRich->InitContainers();
190 initOK &= fUnpAlgoPsd->InitContainers();
191
195
196 // Bool_t initOK = fMonitorAlgo->ReInitContainers();
197
198 return initOK;
199}
200
202{
203 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
204 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
205 fParCList->Remove(tempObj);
206 std::string paramName {tempObj->GetName()};
207 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
208 // Should only be used for small data because of the cost of an additional copy
209
210 // Her must come the proper Runid
211 std::string message = paramName + ",111";
212 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
213
214 FairMQMessagePtr req(NewSimpleMessage(message));
215 FairMQMessagePtr rep(NewMessage());
216
217 FairParGenericSet* newObj = nullptr;
218
219 if (Send(req, "parameters") > 0) {
220 if (Receive(rep, "parameters") >= 0) {
221 if (0 != rep->GetSize()) {
222 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
223 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
224 LOG(info) << "Received unpack parameter from the server:";
225 newObj->print();
226 } // if( 0 != rep->GetSize() )
227 else {
228 LOG(error) << "Received empty reply. Parameter not available";
229 return kFALSE;
230 } // else of if( 0 != rep->GetSize() )
231 } // if( Receive( rep, "parameters" ) >= 0)
232 } // if( Send(req, "parameters") > 0 )
233 fParCList->AddAt(newObj, iparC);
234 delete tempObj;
235 } // for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
236
237 return kTRUE;
238}
239
240// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
// Data callback: deserialises one timeslice from the received message, unpacks it
// and sends the unpacked data out.
// @param msg  received message; its payload is read as a Boost binary archive
//             holding a fles::StorableTimeslice
// (the second parameter, the sub-channel index, is unused)
// @return false if SendUnpData() fails, true otherwise
241bool CbmDeviceMcbmUnpack::HandleData(FairMQMessagePtr& msg, int /*index*/)
242{
244 LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
245
246 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
247
// Copy the payload into a string so that it can be read through a stream by the archive.
248 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
249 std::istringstream iss(msgStr);
250 boost::archive::binary_iarchive inputArchive(iss);
251
253 fles::StorableTimeslice ts {0};
254 inputArchive >> ts;
255
// -1.0 is used as the "not yet set" marker for the TS size parameters.
// NOTE(review): no assignment to fdTsCoreSizeInNs / fdTsFullSizeInNs is visible in this
// branch (lines elided in this listing) — check against the real file.
257 if (-1.0 == fdTsCoreSizeInNs) {
258 fuNbCoreMsPerTs = ts.num_core_microslices();
// The overlap count is taken from component 0.
259 fuNbOverMsPerTs = ts.num_microslices(0) - ts.num_core_microslices();
263 LOG(info) << "Timeslice parameters: each TS has " << fuNbCoreMsPerTs << " Core MS and " << fuNbOverMsPerTs
264 << " Overlap MS, for a core duration of " << fdTsCoreSizeInNs << " ns and a full duration of "
265 << fdTsFullSizeInNs << " ns";
266 } // if( -1.0 == fdTsCoreSizeInNs )
267
// Metadata object for this timeslice; it is allocated here and deleted further down.
268 fTsMetaData = new TimesliceMetaData(ts.descriptor(0, 0).idx, fdTsCoreSizeInNs, fdTsOverSizeInNs, ts.index());
269
// NOTE(review): the Bool_t result of DoUnpack is not checked, so the data is sent
// even if unpacking reported a failure — confirm this is intended.
271 DoUnpack(ts, 0);
272
// NOTE(review): on this early return fTsMetaData is not deleted; the next call
// overwrites the pointer, so the object allocated above appears to leak.
274 if (!SendUnpData()) return false;
275
276 delete fTsMetaData;
277
285
293
294 return true;
295}
296
// Body of the output step (presumably CbmDeviceMcbmUnpack::SendUnpData, which
// HandleData calls and tests as a bool; the signature line is absent from this listing).
// It builds one multi-part message and sends it on fsChannelNameDataOutput.
// Part order: TS metadata, Bmon, STS, MUCH, TRD, TOF, RICH, PSD.
// The metadata is serialised with RootSerializer; every digi vector is serialised
// into a Boost binary archive.
// Returns false (after logging an error) if Send() reports a negative value, true otherwise.
298{
299
301 /*
302 std::stringstream ossTsMeta;
303 boost::archive::binary_oarchive oaTsMeta(ossTsMeta);
304 oaTsMeta << *(fTsMetaData);
305 std::string* strMsgTsMetaE = new std::string(ossTsMeta.str());
306*/
307 FairMQMessagePtr messTsMeta(NewMessage());
308 // Serialize<RootSerializer>(*messTsMeta, fTsMetaData);
309 RootSerializer().Serialize(*messTsMeta, fTsMetaData);
310
// Each serialised buffer is copied into a heap-allocated std::string; that string is
// released by the callback handed to NewMessage() further down.
311 std::stringstream ossSts;
312 boost::archive::binary_oarchive oaSts(ossSts);
313 oaSts << fUnpAlgoSts->GetVector();
314 std::string* strMsgSts = new std::string(ossSts.str());
315
316 std::stringstream ossMuch;
317 boost::archive::binary_oarchive oaMuch(ossMuch);
318 oaMuch << fUnpAlgoMuch->GetVector();
319 std::string* strMsgMuch = new std::string(ossMuch.str());
320
321 std::stringstream ossTrd;
322 boost::archive::binary_oarchive oaTrd(ossTrd);
323 oaTrd << fUnpAlgoTrd->GetVector();
324 std::string* strMsgTrd = new std::string(ossTrd.str());
325
// The TOF algorithm's vector holds both TOF and Bmon digis; they are split into two
// vectors according to the masked address of each digi.
327 std::vector<CbmTofDigi>& vDigiTofBmon = fUnpAlgoTof->GetVector();
328 std::vector<CbmTofDigi> vDigiTof = {};
329 std::vector<CbmTofDigi> vDigiBmon = {};
330
// NOTE(review): `auto digi` copies every digi before it is copied again into the
// target vector; `const auto&` would avoid the first copy.
331 for (auto digi : vDigiTofBmon) {
332 if (fuDigiMaskedIdBmon == (digi.GetAddress() & fuDigiMaskId)) {
334 vDigiBmon.emplace_back(digi);
335 } // if( fuDigiMaskedIdBmon == ( digi.GetAddress() & fuDigiMaskId ) )
336 else {
338 vDigiTof.emplace_back(digi);
339 } // else of if( fuDigiMaskedIdBmon == ( digi.GetAddress() & fuDigiMaskId ) )
340 } // for( auto digi: vDigi )
341
342 std::stringstream ossTof;
343 boost::archive::binary_oarchive oaTof(ossTof);
344 oaTof << vDigiTof;
345 std::string* strMsgTof = new std::string(ossTof.str());
346
347 std::stringstream ossBmon;
348 boost::archive::binary_oarchive oaBmon(ossBmon);
349 oaBmon << vDigiBmon;
350 std::string* strMsgBmon = new std::string(ossBmon.str());
351
352 std::stringstream ossRich;
353 boost::archive::binary_oarchive oaRich(ossRich);
354 oaRich << fUnpAlgoRich->GetVector();
355 std::string* strMsgRich = new std::string(ossRich.str());
356
357 std::stringstream ossPsd;
358 boost::archive::binary_oarchive oaPsd(ossPsd);
359 oaPsd << fUnpAlgoPsd->GetVector();
360 std::string* strMsgPsd = new std::string(ossPsd.str());
361
362 FairMQParts parts;
363
// The metadata message always goes first.
364 parts.AddPart(std::move(messTsMeta));
365 /*
366 parts.AddPart( NewMessage( const_cast< char * >( strMsgTsMetaE->c_str() ), // data
367 strMsgTsMetaE->length(), // size
368 []( void* , void* object ){ delete static_cast< std::string * >( object ); },
369 strMsgTsMetaE
370 )
371 ); // object that manages the data
372*/
373
// Each part below wraps the buffer of one heap string; the lambda receives that
// string as `object` and deletes it (presumably once the transport has finished
// with the buffer — confirm against the FairMQ documentation).
374 parts.AddPart(NewMessage(
375 const_cast<char*>(strMsgBmon->c_str()), // data
376 strMsgBmon->length(), // size
377 [](void*, void* object) { delete static_cast<std::string*>(object); },
378 strMsgBmon)); // object that manages the data
379
380 parts.AddPart(NewMessage(
381 const_cast<char*>(strMsgSts->c_str()), // data
382 strMsgSts->length(), // size
383 [](void*, void* object) { delete static_cast<std::string*>(object); },
384 strMsgSts)); // object that manages the data
385
386 parts.AddPart(NewMessage(
387 const_cast<char*>(strMsgMuch->c_str()), // data
388 strMsgMuch->length(), // size
389 [](void*, void* object) { delete static_cast<std::string*>(object); },
390 strMsgMuch)); // object that manages the data
391
392 parts.AddPart(NewMessage(
393 const_cast<char*>(strMsgTrd->c_str()), // data
394 strMsgTrd->length(), // size
395 [](void*, void* object) { delete static_cast<std::string*>(object); },
396 strMsgTrd)); // object that manages the data
397
398 parts.AddPart(NewMessage(
399 const_cast<char*>(strMsgTof->c_str()), // data
400 strMsgTof->length(), // size
401 [](void*, void* object) { delete static_cast<std::string*>(object); },
402 strMsgTof)); // object that manages the data
403
404 parts.AddPart(NewMessage(
405 const_cast<char*>(strMsgRich->c_str()), // data
406 strMsgRich->length(), // size
407 [](void*, void* object) { delete static_cast<std::string*>(object); },
408 strMsgRich)); // object that manages the data
409
410 parts.AddPart(NewMessage(
411 const_cast<char*>(strMsgPsd->c_str()), // data
412 strMsgPsd->length(), // size
413 [](void*, void* object) { delete static_cast<std::string*>(object); },
414 strMsgPsd)); // object that manages the data
415
// A negative return value from Send() is treated as failure.
416 if (Send(parts, fsChannelNameDataOutput) < 0) {
417 LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
418 return false;
419 }
420
421 return true;
422}
423
424
426{
427 if (nullptr != fUnpAlgoSts) delete fUnpAlgoSts;
428 if (nullptr != fUnpAlgoMuch) delete fUnpAlgoMuch;
429 if (nullptr != fUnpAlgoTrd) delete fUnpAlgoTrd;
430 if (nullptr != fUnpAlgoTof) delete fUnpAlgoTof;
431 if (nullptr != fUnpAlgoRich) delete fUnpAlgoRich;
432 if (nullptr != fUnpAlgoPsd) delete fUnpAlgoPsd;
433}
434
435
// Run every detector unpacker algorithm on one timeslice.
// @param ts  timeslice passed to the ProcessTs() method of each algorithm
// (the second parameter, the component index, is unused)
// @return kFALSE as soon as one algorithm fails (an error naming the detector and
//         the TS index is logged first), kTRUE when all algorithms succeeded
436Bool_t CbmDeviceMcbmUnpack::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
437{
438 fulTsCounter++;
439
// One pass over the components of the timeslice, dispatching on the system id found
// in the descriptor of the first microslice of each component.
// NOTE(review): all case bodies are empty in this listing and no statement visible here
// sets fbComponentsAddedToList (lines elided) — check against the real file.
440 if (kFALSE == fbComponentsAddedToList) {
441 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
442 switch (ts.descriptor(uCompIdx, 0).sys_id) {
443 case kusSysIdSts: {
445 break;
446 } // case kusSysIdSts
447 case kusSysIdMuch: {
449 break;
450 } // case kusSysIdMuch
451 case kusSysIdTrd: {
453 break;
454 } // case kusSysIdTrd
455 case kusSysIdTof: {
457 break;
458 } // case kusSysIdTof
459 case kusSysIdBmon: {
461 break;
462 } // case kusSysIdBmon
463 case kusSysIdRich: {
465 break;
466 } // case kusSysIdRich
467 case kusSysIdPsd: {
469 break;
470 } // case kusSysIdPsd
// Components with any other system id are skipped.
471 default: break;
472 } // switch( ts.descriptor( uCompIdx, 0 ).sys_id )
473 } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
475 } // if( kFALSE == fbComponentsAddedToList )
476
// The algorithms run in a fixed order (STS, MUCH, TRD, TOF, RICH, PSD); the first
// failure stops the chain, so later algorithms do not see this timeslice.
477 if (kFALSE == fUnpAlgoSts->ProcessTs(ts)) {
478 LOG(error) << "Failed processing TS " << ts.index() << " in STS unpacker algorithm class";
479 return kFALSE;
480 } // if( kFALSE == fUnpAlgoSts->ProcessTs( ts ) )
481
482 if (kFALSE == fUnpAlgoMuch->ProcessTs(ts)) {
483 LOG(error) << "Failed processing TS " << ts.index() << " in MUCH unpacker algorithm class";
484 return kFALSE;
485 } // if( kFALSE == fUnpAlgoMuch->ProcessTs( ts ) )
486
487 if (kFALSE == fUnpAlgoTrd->ProcessTs(ts)) {
488 LOG(error) << "Failed processing TS " << ts.index() << " in TRD unpacker algorithm class";
489 return kFALSE;
490 } // if( kFALSE == fUnpAlgoTrd->ProcessTs( ts ) )
491
492 if (kFALSE == fUnpAlgoTof->ProcessTs(ts)) {
493 LOG(error) << "Failed processing TS " << ts.index() << " in TOF unpacker algorithm class";
494 return kFALSE;
495 } // if( kFALSE == fUnpAlgoTof->ProcessTs( ts ) )
496
497 if (kFALSE == fUnpAlgoRich->ProcessTs(ts)) {
498 LOG(error) << "Failed processing TS " << ts.index() << " in RICH unpacker algorithm class";
499 return kFALSE;
500 } // if( kFALSE == fUnpAlgoRich->ProcessTs( ts ) )
501
502 if (kFALSE == fUnpAlgoPsd->ProcessTs(ts)) {
503 LOG(error) << "Failed processing TS " << ts.index() << " in PSD unpacker algorithm class";
504 return kFALSE;
505 } // if( kFALSE == fUnpAlgoPsd->ProcessTs( ts ) )
506
507
// Progress report every 10000 timeslices.
508 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
509
510 return kTRUE;
511}
512
Bool_t bMcbm2018MonitorTaskBmonResetHistos
Double_t fdTsOverSizeInNs
Total size of the core MS in a TS, [nanoseconds].
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
TimesliceMetaData * fTsMetaData
Total size of all MS in a TS, [nanoseconds].
CbmMcbm2018UnpackerAlgoSts * fUnpAlgoSts
Processing algos.
Bool_t fbIgnoreOverlapMs
Control flags.
Double_t fdTsFullSizeInNs
Total size of the overlap MS in a TS, [nanoseconds].
std::string fsChannelNameDataInput
User settings parameters.
static const uint16_t kusSysIdSts
Constants.
size_t fuNbCoreMsPerTs
TS MetaData storage.
CbmMcbm2018UnpackerAlgoTof * fUnpAlgoTof
uint64_t fulNumMessages
Statistics & first TS rejection.
CbmMcbm2018UnpackerAlgoTrdR * fUnpAlgoTrd
static const uint16_t kusSysIdPsd
CbmMcbm2018UnpackerAlgoMuch * fUnpAlgoMuch
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
static const uint16_t kusSysIdTrd
bool HandleData(FairMQMessagePtr &, int)
std::vector< std::string > fvsSetTimeOffs
Time offsets.
CbmMcbm2018UnpackerAlgoPsd * fUnpAlgoPsd
std::string fsChannelNameDataOutput
bool IsChannelNameAllowed(std::string channelName)
static const uint16_t kusSysIdBmon
CbmMcbm2018UnpackerAlgoRich * fUnpAlgoRich
static const uint16_t kusSysIdMuch
Double_t fdTsCoreSizeInNs
Size of a single MS, [nanoseconds].
Bool_t fbComponentsAddedToList
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
static const uint16_t kusSysIdTof
Bool_t InitParameters(TList *fParCList)
Parameters management.
static const uint16_t kusSysIdRich
void SetBinningFwFlag(Bool_t bEnable=kTRUE)
=> Quick and dirty hack for binning FW!!!
void SetTimeOffsetNs(Double_t dOffsetIn=0.0)
Bool_t ProcessTs(const fles::Timeslice &ts)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ProcessTs(const fles::Timeslice &ts)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
void SetTimeOffsetNs(Double_t dOffsetIn=0.0)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
virtual Bool_t ProcessTs(const fles::Timeslice &ts)
void SetTimeOffsetNs(Double_t dOffsetIn=0.0)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
void SetTimeOffsetNs(Double_t dOffsetIn=0.0)
void SetBinningFwFlag(Bool_t bEnable=kTRUE)
=> Quick and dirty hack for binning FW!!!
Bool_t ProcessTs(const fles::Timeslice &ts)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetTimeOffsetNs(Double_t dOffsetIn=0.0)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Timeslice unpacker algorithm for Spadic v.2.2 .
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetTimeOffsetNs(Double_t dOffsetIn=0.0)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t SetDigiOutputPointer(std::vector< CbmTrdDigi > *const pVector)
Set fTrdDigiVector to the address of pVector.
std::vector< T > & GetVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
void ClearVector()
For unpacker algos.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.