CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaMultiSampler.cxx
Go to the documentation of this file.
1/* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
4
11
12
14
15#include "CbmFlesCanvasTools.h"
17
18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMultiInputArchive.hpp"
20#include "TimesliceMultiSubscriber.hpp"
21#include "TimesliceSubscriber.hpp"
22
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h" // device->fConfig
25
26#include <TCanvas.h>
27#include <TH1F.h>
28#include <TH1I.h>
29#include <TProfile.h>
30
31#include "BoostSerializer.h"
32#include <boost/algorithm/string.hpp>
33#include <boost/archive/binary_oarchive.hpp>
34#include <boost/filesystem.hpp>
35#include <boost/regex.hpp>
36#include <boost/serialization/utility.hpp>
37
38#include "RootSerializer.h"
39
40namespace filesys = boost::filesystem;
41
42#include <algorithm>
43#include <chrono>
44#include <cstdio>
45#include <ctime>
46#include <iomanip>
47#include <sstream>
48#include <string>
49#include <thread> // this_thread::sleep_for
50
51using namespace std;
52
53#include <stdexcept>
54
55struct InitTaskError : std::runtime_error {
56 using std::runtime_error::runtime_error;
57};
58
60 : FairMQDevice()
62 , fFileName("")
63 , fDirName("")
65 , fFileCounter(0)
66 , fHost("")
67 , fPort(0)
69 , fTSCounter(0)
71 , fSource(nullptr)
72 , fTime()
73 , fLastPublishTime {std::chrono::system_clock::now()}
74{
75}
76
78try {
79 // Get the values from the command line options (via fConfig)
80 fFileName = fConfig->GetValue<string>("filename");
81 fDirName = fConfig->GetValue<string>("dirname");
82 fHost = fConfig->GetValue<string>("flib-host");
83 fPort = fConfig->GetValue<uint64_t>("flib-port");
84 fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
85 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
86 fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts");
87 fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid");
88 fbSendTsPerChannel = fConfig->GetValue<bool>("send-ts-per-channel");
89 fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs");
90 fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds");
91 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
92 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
93 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
94 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
95 fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
96 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
97
98 if (fbNoSplitTs) {
99 if (fbSendTsPerSysId) {
100 if (fbSendTsPerChannel) {
101 LOG(warning) << "Both no-split-ts, send-ts-per-sysid and "
102 "send-ts-per-channel options used => "
103 << " second and third one will be ignored!!!!";
104 } // if( fbSendTsPerSysId )
105 else
106 LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => "
107 << " second one will be ignored!!!!";
108 } // if( fbSendTsPerSysId )
109 else if (fbSendTsPerChannel) {
110 LOG(warning) << "Both no-split-ts and send-ts-per-channel options used => "
111 << " second one will be ignored!!!!";
112 } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId )
113 } // if( fbNoSplitTs )
115 LOG(warning) << "Both send-ts-per-sysid and send-ts-per-channel options used => "
116 << " second one will be ignored!!!!";
117 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
118
120 std::vector<std::string> vSysIdChanPairs = fConfig->GetValue<std::vector<std::string>>("sysid-chan");
121 for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
122 const size_t sep = vSysIdChanPairs[uPair].find(':');
123 if (string::npos == sep || 0 == sep || vSysIdChanPairs[uPair].size() == sep) {
124 LOG(info) << vSysIdChanPairs[uPair];
125 throw InitTaskError("Provided pair of SysId + Channel name is missing a : or an argument.");
126 } // if( string::npos == sep || 0 == sep || vSysIdChanPairs[ uPair ].size() == sep )
127
129 std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
130 const size_t hexPos = sSysId.find("0x");
131 int iSysId;
132 if (string::npos == hexPos) iSysId = std::stoi(sSysId);
133 else
134 iSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16);
135
137 std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
138
140 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
141 if (fSysId.end() != pos) {
143 const vector<std::string>::size_type idx = pos - fSysId.begin();
144 fAllowedChannels[idx] = sChannelName;
145 } // if( fSysId.end() != pos )
146 else {
148 fSysId.push_back(iSysId);
149 fAllowedChannels.push_back(sChannelName);
150 } // else of if( fSysId.end() != pos )
151
152 LOG(info) << vSysIdChanPairs[uPair] << " " << iSysId << " " << sChannelName;
153 } // for( uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair )
154
155 if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
156
157 // Check which input is defined
158 // Posibilities
159 // filename && ! dirname : single file
160 // filename with wildcards && diranme : all files with filename regex in the directory
161 // host && port : connect to the flim server
162
163 bool isGoodInputCombi {false};
164 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
165 isGoodInputCombi = true;
166 fInputFileList.push_back(fFileName);
167 }
168 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
169 isGoodInputCombi = true;
170 fInputFileList.push_back(fFileName);
171 }
172 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
173 isGoodInputCombi = true;
174 LOG(info) << "Host: " << fHost;
175 LOG(info) << "Port: " << fPort;
176 }
177 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
178 isGoodInputCombi = true;
179 LOG(info) << "Host string: " << fHost;
180 }
181 else {
182 isGoodInputCombi = false;
183 }
184
185
186 if (!isGoodInputCombi) {
187 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
188 "or host + port are allowed combination.");
189 }
190
191
192 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
193
194 // Get the information about created channels from the device
195 // Check if the defined channels from the topology (by name)
196 // are in the list of channels which are possible/allowed
197 // for the device
198 // The idea is to check at initilization if the devices are
199 // properly connected. For the time beeing this is done with a
200 // nameing convention. It is not avoided that someone sends other
201 // data on this channel.
202 int noChannel = fChannels.size();
203 LOG(info) << "Number of defined output channels: " << noChannel;
204 for (auto const& entry : fChannels) {
207 if (entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands
208 || (0 < fuPublishFreqTs
209 && (entry.first == fsChannelNameHistosInput || entry.first == fsChannelNameHistosConfig
210 || entry.first == fsChannelNameCanvasConfig))) {
211 continue;
212 } // if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands || histo channels name)
213
214 LOG(info) << "Channel name: " << entry.first;
215 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
216 }
217
218 for (auto const& value : fComponentsToSend) {
219 LOG(info) << "Value : " << value;
220 if (value > 1) {
221 throw InitTaskError("Sending same data to more than one output channel "
222 "not implemented yet.");
223 }
224 }
225
226
227 if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) {
228 // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
229 //std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
230 std::string connector = fHost + ":" + std::to_string(fPort);
231 LOG(info) << "Open TSPublisher at " << connector;
232 fSource = new fles::TimesliceMultiSubscriber(connector);
233 }
234 else if (0 == fFileName.size() && 0 != fHost.size()) {
235 std::string connector = fHost;
236 LOG(info) << "Open TSPublisher with host string: " << connector;
237 fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark);
238 }
239 else {
240 // Create a ";" separated string with all file names
241 std::string fileList {""};
242 for (const auto& obj : fInputFileList) {
243 std::string fileName = obj;
244 fileList += fileName;
245 fileList += ";";
246 }
247 fileList.pop_back(); // Remove the last ;
248 LOG(info) << "Input File String: " << fileList;
249 fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName);
250 if (!fSource) { throw InitTaskError("Could open files from file list."); }
251 }
252
253 LOG(info) << "High-Water Mark: " << fHighWaterMark;
254 LOG(info) << "Max. Timeslices: " << fMaxTimeslices;
255 if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs )
256 else if (fbSendTsPerSysId) {
257 LOG(info) << "Sending components in separate TS per SysId";
258 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
259 else if (fbSendTsPerChannel) {
260 LOG(info) << "Sending components in separate TS per channel";
261 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
262
263 fTime = std::chrono::steady_clock::now();
264}
265catch (InitTaskError& e) {
266 LOG(error) << e.what();
267 ChangeState(fair::mq::Transition::ErrorFound);
268}
269
271{
273 if (fbNoSplitTs) {
275 fChannelsToSend[0].push_back(channelName);
276 return true;
277 } // if( fbNoSplitTs )
278
279 bool bFoundMatch = false;
280 // for(auto const &entry : fAllowedChannels) {
281 for (uint32_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
282 auto const& entry = fAllowedChannels[idx];
283 LOG(info) << "Looking for name " << channelName << " in " << entry;
284 std::size_t pos1 = channelName.find(entry);
285 if (pos1 != std::string::npos) {
286 /*
287 const vector<std::string>::const_iterator pos =
288 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
289 const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
290*/
291 LOG(info) << "Found " << entry << " in " << channelName;
292 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx
293 << " (SysId 0x" << std::hex << fSysId[idx] << std::dec << ")";
294 fComponentsToSend[idx]++;
295 fChannelsToSend[idx].push_back(channelName);
296
298 if (fbSendTsPerChannel) bFoundMatch = true;
299 else
300 return true;
301 } // if (pos1!=std::string::npos)
302 }
304 if (fbSendTsPerChannel && bFoundMatch) return true;
305
306 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
307 LOG(error) << "Stop device.";
308 return false;
309}
310
312{
313 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
314 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
315 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
316
318 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
320 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
321
323 fhTsRate = new TH1I("TsRate", "TS rate; t [s]", 1800, 0., 1800.);
324 fhTsSize = new TH1I("TsSize", "Size of TS; Size [MB]", 15000, 0., 15000.);
325 fhTsSizeEvo = new TProfile("TsSizeEvo", "Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.);
326 fhTsMaxSizeEvo = new TH1F("TsMaxSizeEvo", "Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.);
327 fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, -0.5, 1.5);
328 fhMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; t [s]", 1800, 0., 1800.);
329
331 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, "Sampler"));
332 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, "Sampler"));
333 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, "Sampler"));
334 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, "Sampler"));
335 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, "Sampler"));
336 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, "Sampler"));
337
339 Double_t w = 10;
340 Double_t h = 10;
341 fcSummary = new TCanvas("cSampSummary", "Sampler monitoring plots", w, h);
342 fcSummary->Divide(2, 3);
343
344 fcSummary->cd(1);
345 gPad->SetGridx();
346 gPad->SetGridy();
347 fhTsRate->Draw("hist");
348
349 fcSummary->cd(2);
350 gPad->SetGridx();
351 gPad->SetGridy();
352 gPad->SetLogx();
353 gPad->SetLogy();
354 fhTsSize->Draw("hist");
355
356 fcSummary->cd(3);
357 gPad->SetGridx();
358 gPad->SetGridy();
359 fhTsSizeEvo->Draw("hist");
360
361 fcSummary->cd(4);
362 gPad->SetGridx();
363 gPad->SetGridy();
364 fhTsMaxSizeEvo->Draw("hist");
365
366 fcSummary->cd(5);
367 gPad->SetGridx();
368 gPad->SetGridy();
369 fhMissedTS->Draw("hist");
370
371 fcSummary->cd(6);
372 gPad->SetGridx();
373 gPad->SetGridy();
374 fhMissedTSEvo->Draw("el");
375
377 vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, "canvases"));
378
383 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
384 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
385 // << " in " << vHistos[ uHisto ].second.data()
386 // ;
387 fArrayHisto.Add(vHistos[uHisto].first);
388 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
389 fvpsHistosFolder.push_back(psHistoConfig);
390
392 FairMQMessagePtr messageHist(NewMessage());
393 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig);
394 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);
395
397 if (Send(messageHist, fsChannelNameHistosConfig) < 0) {
398 LOG(fatal) << "Problem sending histo config";
399 } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
400
401 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
402 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
403
407 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
408 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
409 // << " in " << vCanvases[ uCanv ].second.data();
410 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
411 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
412
413 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
414
415 fvpsCanvasConfig.push_back(psCanvConfig);
416
418 FairMQMessagePtr messageCan(NewMessage());
419 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig);
420 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);
421
423 if (Send(messageCan, fsChannelNameCanvasConfig) < 0) {
424 LOG(fatal) << "Problem sending canvas config";
425 } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
426
427 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
428 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
429
430 return true;
431}
432
434{
435 if (0 < fuPublishFreqTs && 0 == fTSCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs )
436
438 if (0 == fTSCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) {
439 dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber();
440 } // if( 0 == fTSCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) )
441
442 auto timeslice = fSource->get();
443 if (timeslice) {
445 fTSCounter++;
446
447 const fles::Timeslice& ts = *timeslice;
448 uint64_t uTsIndex = ts.index();
449
450 if (0 < fuPublishFreqTs) {
451 uint64_t uTsTime = ts.descriptor(0, 0).idx;
452 if (0 == fuStartTime) { fuStartTime = uTsTime; } // if( 0 == fuStartTime )
453 fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9;
454 uint64_t uSizeMb = 0;
455
456 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
457 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
458 } // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp )
459
460
461 fhTsRate->Fill(fdTimeToStart);
462 fhTsSize->Fill(uSizeMb);
463 fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb);
464
466 if (0. == fdLastMaxTime) {
468 fdTsMaxSize = uSizeMb;
469 } // if( 0. == fdLastMaxTime )
470 else if (1. <= fdTimeToStart - fdLastMaxTime) {
473 fdTsMaxSize = uSizeMb;
474 } // else if if( 1 <= fdTimeToStart - fdLastMaxTime )
475 else if (fdTsMaxSize < uSizeMb) {
476 fdTsMaxSize = uSizeMb;
477 } // else if( fdTsMaxSize < uSizeMb )
478 } // if( 0 < fuPublishFreqTs )
479
481 if ((uTsIndex != (fuPrevTsIndex + 1)) && !(0 == fuPrevTsIndex && 0 == uTsIndex)) {
482 LOG(info) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex
483 << " diff is " << uTsIndex - fuPrevTsIndex << " Missing are " << uTsIndex - fuPrevTsIndex - 1;
484
485 if ("" != fsChannelNameMissedTs) {
487 std::vector<uint64_t> vulMissedIndices;
488 for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
489 vulMissedIndices.emplace_back(ulMiss);
490 } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss )
491 if (!SendMissedTsIdx(vulMissedIndices)) {
493 if ("" != fsChannelNameCommands) {
495 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
496 SendCommand("STOP");
497 } // if( "" != fsChannelNameCommands )
498
499 return false;
500 } // if( !SendMissedTsIdx( vulMissedIndices ) )
501 } // if( "" != fsChannelNameMissedTs )
502
503 if (0 < fuPublishFreqTs) {
504 fhMissedTS->Fill(1, uTsIndex - fuPrevTsIndex - 1);
505 fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex - 1);
506 } // if( 0 < fuPublishFreqTs )
507
508 } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && !( 0 == fuPrevTsIndex && 0 == uTsIndex ) )
509
510 if (0 < fuPublishFreqTs) {
511 fhMissedTS->Fill(0);
512 fhMissedTSEvo->Fill(fdTimeToStart, 0, 1);
513 } // else if( 0 < fuPublishFreqTs )
514
515 fuPrevTsIndex = uTsIndex;
516
517 if (fTSCounter % 10000 == 0) { LOG(info) << "Received TS " << fTSCounter << " with index " << uTsIndex; }
518
519 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
520
521
522 // CheckTimeslice(ts);
523
524 if (fbNoSplitTs) {
527 if (!CreateAndSendFullTs(ts)) {
529 if ("" != fsChannelNameCommands) {
531 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
532 SendCommand("STOP");
533 } // if( "" != fsChannelNameCommands )
534
535 return false;
536 } // if( !CreateAndSendFullTs( ts ) )
537 } // if( fbNoSplitTs )
538 else if (fbSendTsPerSysId) {
543 if ("" != fsChannelNameCommands) {
545 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
546 SendCommand("STOP");
547 } // if( "" != fsChannelNameCommands )
548
549 return false;
550 } // if( !CreateAndCombineComponentsPerSysId( ts ) )
551 } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs )
552 else if (fbSendTsPerChannel) {
557 if ("" != fsChannelNameCommands) {
559 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
560 SendCommand("STOP");
561 } // if( "" != fsChannelNameCommands )
562
563 return false;
564 } // if( !CreateAndCombineComponentsPerChannel( ts ) )
565 } // else if( fbSendTsPerChannel ) of if( fbSendTsPerSysId )
566 else {
567 for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
568 if (!CreateAndSendComponent(ts, nrComp)) {
570 if ("" != fsChannelNameCommands) {
572 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
573 SendCommand("STOP");
574 } // if( "" != fsChannelNameCommands )
575
576 return false;
577 } // if( !CreateAndSendComponent(ts, nrComp) )
578 } // for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp)
579 } // else of if( fbSendTsPerSysId )
580
581 if (0 < fuPublishFreqTs) {
585 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
586 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
587 if ((fdMaxPublishTime < elapsedSeconds.count())
588 || (0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
590 fLastPublishTime = std::chrono::system_clock::now();
591 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
592 } // if( 0 < fuPublishFreqTs )
593
594 return true;
595 } // if (fTSCounter < fMaxTimeslices)
596 else {
597 CalcRuntime();
598
600 if ("" != fsChannelNameCommands) {
602 std::this_thread::sleep_for(std::chrono::seconds(10));
603 std::string sCmd = "EOF ";
605 sCmd += " ";
607 SendCommand(sCmd);
608 } // if( "" != fsChannelNameCommands )
609
610 return false;
611 } // else of if (fTSCounter < fMaxTimeslices)
612 } // if (timeslice)
613 else {
614 CalcRuntime();
615
617 if ("" != fsChannelNameCommands) {
619 std::this_thread::sleep_for(std::chrono::seconds(10));
620 std::string sCmd = "EOF ";
622 sCmd += " ";
624 SendCommand(sCmd);
625 } // if( "" != fsChannelNameCommands )
626
627 return false;
628 } // else of if (timeslice)
629}
630
631bool CbmMQTsaMultiSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
632{
633
634 // Check if component has to be send. If the corresponding channel
635 // is connected create the new timeslice and send it to the
636 // correct channel
637
638 LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
639 const vector<int>::const_iterator pos =
640 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
641 if (pos != fSysId.end()) {
642 const vector<std::string>::size_type idx = pos - fSysId.begin();
643 if (fComponentsToSend[idx] > 0) {
644 LOG(debug) << "Create timeslice component for link " << nrComp;
645
646 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
647 component.append_component(ts.num_microslices(0));
648
649 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
650 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
651 }
652 /*
653 LOG(info) << "Number of core microslices before: " << ts.num_core_microslices();
654 LOG(info) << "Number of core microslices after : " << component.num_core_microslices();
655 LOG(info) << "Number of microslices: " << component.num_microslices(0);
656*/
657 if (!SendData(component, idx)) return false;
658 return true;
659 }
660 }
661 return true;
662}
663
665{
667 if (false == fbListCompPerSysIdReady) {
668 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
669 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
670
671 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usSysId);
672 if (fSysId.end() != pos) {
673 const vector<std::string>::size_type idx = pos - fSysId.begin();
674
675 fvvCompPerSysId[idx].push_back(uCompIdx);
676 } // if( fSysId.end() != pos )
677 } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp )
678
679 for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
680 std::stringstream ss;
681 ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex
682 << std::setw(2) << fSysId[uSysIdx] << std::dec << " :";
683
684 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
685 ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp];
686 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
687
688 LOG(info) << ss.str();
689 } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId )
690
692 } // if( false == fbListCompPerSysIdReady )
693
695 for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
696 if (0 < fComponentsToSend[uSysIdx]) {
697 LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uSysIdx] << std::dec;
698
699 if (0 < fvvCompPerSysId[uSysIdx].size()) {
700 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
701
702 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
703 uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerSysId[uSysIdx][uComp]);
704 component.append_component(uNumMsInComp);
705
706 LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " TS "
707 << ts.index() << " Comp " << fvvCompPerSysId[uSysIdx][uComp];
708
709 for (size_t m = 0; m < uNumMsInComp; ++m) {
710 component.append_microslice(uComp, m, ts.descriptor(fvvCompPerSysId[uSysIdx][uComp], m),
711 ts.content(fvvCompPerSysId[uSysIdx][uComp], m));
712 } // for( size_t m = 0; m < uNumMsInComp; ++m )
713 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
714
715 LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " with "
716 << component.num_components() << " components";
717
718 if (!SendData(component, uSysIdx)) return false;
719 } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
720 } // if( 0 < fComponentsToSend[ uSysIdx ] )
721 } // for( uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )
722
723 return true;
724}
725
727{
728
730 if (false == fbListCompPerChannelReady) {
732 for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
733 if (0 < fComponentsToSend[uSysIdx]) {
734 for (uint32_t uChan = 0; uChan < fChannelsToSend[uSysIdx].size(); ++uChan) {
735 const vector<std::string>::const_iterator pos =
736 std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[uSysIdx][uChan]);
737 if (fvChannelsToSend.end() == pos) {
738 fvChannelsToSend.push_back(fChannelsToSend[uSysIdx][uChan]);
739 fvvCompPerChannel.push_back(std::vector<uint32_t>());
740 }
741 } // for( uChan = 0; uChan < fChannelsToSend[ uSysIdx ].size(); ++ uChan )
742 } // if( 0 < fComponentsToSend[ uSysIdx ] )
743 } // for( uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )
744
746 fvvCompPerChannel.resize(fvChannelsToSend.size());
747
750 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
751 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
752
753 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usSysId);
754 if (fSysId.end() != pos) {
755 const vector<std::string>::size_type idxSys = pos - fSysId.begin();
756
757 if (0 < fComponentsToSend[idxSys]) {
758 for (uint32_t uChan = 0; uChan < fChannelsToSend[idxSys].size(); ++uChan) {
759 const vector<std::string>::const_iterator posCh =
760 std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[idxSys][uChan]);
761 if (fvChannelsToSend.end() != posCh) {
762 const vector<std::string>::size_type idxChan = posCh - fvChannelsToSend.begin();
763 fvvCompPerChannel[idxChan].push_back(uCompIdx);
764 } // if( fvChannelsToSend.end() != posCh )
765 } // for( uChan = 0; uChan < fChannelsToSend[ idxSys ].size(); ++ uChan )
766 } // if( 0 < fComponentsToSend[ uSysIdx ] )
767 } // if( fSysId.end() != pos )
768 } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp )
769
770 for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
771 std::stringstream ss;
772 ss << "Found " << std::setw(2) << fvvCompPerChannel[uChanIdx].size() << " components for channel "
773 << fvChannelsToSend[uChanIdx] << " :";
774
775 for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) {
776 ss << " " << std::setw(3) << fvvCompPerChannel[uChanIdx][uComp];
777 } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
778
779 LOG(info) << ss.str();
780 } // for( uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )
781
783 } // if( false == fbListCompPerSysIdReady )
784
787
789 for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
790 LOG(debug) << "Create timeslice with components for channel " << fvChannelsToSend[uChanIdx];
791
792 if (0 < fvvCompPerChannel[uChanIdx].size()) {
793 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
794
795 for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) {
796 uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerChannel[uChanIdx][uComp]);
797 component.append_component(uNumMsInComp);
798
799 LOG(debug) << "Add components to TS for SysId " << std::hex
800 << static_cast<uint16_t>(ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], 0).sys_id) << std::dec
801 << " TS " << ts.index() << " Comp " << fvvCompPerChannel[uChanIdx][uComp];
802
803 for (size_t m = 0; m < uNumMsInComp; ++m) {
804 component.append_microslice(uComp, m, ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], m),
805 ts.content(fvvCompPerChannel[uChanIdx][uComp], m));
806 } // for( size_t m = 0; m < uNumMsInComp; ++m )
807 } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
808
809 LOG(debug) << "Prepared timeslice for channel " << fvChannelsToSend[uChanIdx] << " with "
810 << component.num_components() << " components";
811
812 if (!SendData(component, fvChannelsToSend[uChanIdx])) return false;
813 } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
814 } // for( uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )
815
816 return true;
817}
818
819bool CbmMQTsaMultiSampler::CreateAndSendFullTs(const fles::Timeslice& ts)
820{
822 for (uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx) {
823 if (0 < fComponentsToSend[uChanIdx]) {
824 LOG(debug) << "Copy timeslice component for channel " << fChannelsToSend[uChanIdx][0];
825
826 fles::StorableTimeslice fullTs {ts};
827 if (!SendData(fullTs, uChanIdx)) return false;
828 } // if( 0 < fComponentsToSend[ uChanIdx ] )
829 } // for( uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx )
830 return true;
831}
832
833bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, int idx)
834{
835 // serialize the timeslice and create the message
836 std::stringstream oss;
837 boost::archive::binary_oarchive oa(oss);
838 oa << component;
839 std::string* strMsg = new std::string(oss.str());
840
841 FairMQMessagePtr msg(NewMessage(
842 const_cast<char*>(strMsg->c_str()), // data
843 strMsg->length(), // size
844 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
845 strMsg)); // object that manages the data
846
847 // TODO: Implement sending same data to more than one channel
848 // Need to create new message (copy message??)
849 if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }
850
851 // in case of error or transfer interruption,
852 // return false to go to IDLE state
853 // successfull transfer will return number of bytes
854 // transfered (can be 0 if sending an empty message).
855
856 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
857 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
858 LOG(error) << "Problem sending data";
859 return false;
860 }
861
863 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
864
865 return true;
866}
867
868bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, std::string sChannel)
869{
870 // serialize the timeslice and create the message
871 std::stringstream oss;
872 boost::archive::binary_oarchive oa(oss);
873 oa << component;
874 std::string* strMsg = new std::string(oss.str());
875
876 FairMQMessagePtr msg(NewMessage(
877 const_cast<char*>(strMsg->c_str()), // data
878 strMsg->length(), // size
879 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
880 strMsg)); // object that manages the data
881
882 // in case of error or transfer interruption,
883 // return false to go to IDLE state
884 // successfull transfer will return number of bytes
885 // transfered (can be 0 if sending an empty message).
886 LOG(debug) << "Send data to channel " << sChannel;
887 if (Send(msg, sChannel) < 0) {
888 LOG(error) << "Problem sending data";
889 return false;
890 }
891
893 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
894
895 return true;
896}
897bool CbmMQTsaMultiSampler::SendMissedTsIdx(std::vector<uint64_t> vIndices)
898{
899 // serialize the vector and create the message
900 std::stringstream oss;
901 boost::archive::binary_oarchive oa(oss);
902 oa << vIndices;
903 std::string* strMsg = new std::string(oss.str());
904
905 FairMQMessagePtr msg(NewMessage(
906 const_cast<char*>(strMsg->c_str()), // data
907 strMsg->length(), // size
908 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
909 strMsg)); // object that manages the data
910
911 // in case of error or transfer interruption,
912 // return false to go to IDLE state
913 // successfull transfer will return number of bytes
914 // transfered (can be 0 if sending an empty message).
915 LOG(debug) << "Send data to channel " << fsChannelNameMissedTs;
916 if (Send(msg, fsChannelNameMissedTs) < 0) {
917 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs;
918 return false;
919 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
920
921 return true;
922}
923bool CbmMQTsaMultiSampler::SendCommand(std::string sCommand)
924{
925 // serialize the vector and create the message
926 std::stringstream oss;
927 boost::archive::binary_oarchive oa(oss);
928 oa << sCommand;
929 std::string* strMsg = new std::string(oss.str());
930
931 FairMQMessagePtr msg(NewMessage(
932 const_cast<char*>(strMsg->c_str()), // data
933 strMsg->length(), // size
934 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
935 strMsg)); // object that manages the data
936
937 // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data
938 // sCommand.length(), // size
939 // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
940 // &sCommand ) ); // object that manages the data
941
942 // in case of error or transfer interruption,
943 // return false to go to IDLE state
944 // successfull transfer will return number of bytes
945 // transfered (can be 0 if sending an empty message).
946 LOG(debug) << "Send data to channel " << fsChannelNameCommands;
947 if (Send(msg, fsChannelNameCommands) < 0) {
948 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands;
949 return false;
950 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
951
952 return true;
953}
954
956{
958 FairMQMessagePtr message(NewMessage());
959 // Serialize<RootSerializer>(*message, &fArrayHisto);
960 RootSerializer().Serialize(*message, &fArrayHisto);
961
963 if (Send(message, fsChannelNameHistosInput) < 0) {
964 LOG(error) << "Problem sending data";
965 return false;
966 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
967
970
971 return true;
972}
973
974
976{
977 fhTsRate->Reset();
978 fhTsSize->Reset();
979 fhTsSizeEvo->Reset();
980 fhTsMaxSizeEvo->Reset();
981 fhMissedTS->Reset();
982 fhMissedTSEvo->Reset();
983
984 return true;
985}
986
988
990{
991 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
992
993 LOG(info) << "Runtime: " << run_time.count();
994 LOG(info) << "No more input data";
995}
996
997
998void CbmMQTsaMultiSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
999{
1000 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
1001 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
1002 LOG(info) << "Equipement ID: " << mdsc.eq_id;
1003 LOG(info) << "Flags: " << mdsc.flags;
1004 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
1005 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
1006 LOG(info) << "Microslice Idx: " << mdsc.idx;
1007 LOG(info) << "Checksum: " << mdsc.crc;
1008 LOG(info) << "Size: " << mdsc.size;
1009 LOG(info) << "Offset: " << mdsc.offset;
1010}
1011
1012bool CbmMQTsaMultiSampler::CheckTimeslice(const fles::Timeslice& ts)
1013{
1014 if (0 == ts.num_components()) {
1015 LOG(error) << "No Component in TS " << ts.index();
1016 return 1;
1017 }
1018 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
1019
1020 for (size_t c = 0; c < ts.num_components(); ++c) {
1021 LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
1022 LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
1023 LOG(info) << "Component " << c << " has the system id 0x" << std::hex
1024 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
1025 LOG(info) << "Component " << c << " has the system id 0x" << static_cast<int>(ts.descriptor(c, 0).sys_id);
1026
1027 /*
1028 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
1029 PrintMicroSliceDescriptor(ts.descriptor(c,m));
1030 }
1031*/
1032 }
1033
1034 return true;
1035}
std::string GenerateCanvasConfigString(TCanvas *pCanv)
std::string FormatDecPrintout(uint64_t ulVal, char cFill, uint uWidth)
static constexpr size_t size()
Definition KfSimdPseudo.h:2
bool first
Generates beam ions for transport simulation.
std::string fsChannelNameCanvasConfig
std::vector< std::string > fAllowedChannels
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::chrono::steady_clock::time_point fTime
bool SendData(const fles::StorableTimeslice &, int)
std::vector< std::vector< uint32_t > > fvvCompPerChannel
std::string fsChannelNameHistosConfig
std::vector< std::vector< uint32_t > > fvvCompPerSysId
std::vector< std::string > fInputFileList
List of input files.
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
bool CreateAndCombineComponentsPerChannel(const fles::Timeslice &)
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
bool IsChannelNameAllowed(std::string)
bool CheckTimeslice(const fles::Timeslice &ts)
std::vector< std::vector< std::string > > fChannelsToSend
std::vector< int > fComponentsToSend
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool CreateAndSendFullTs(const fles::Timeslice &)
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::chrono::system_clock::time_point fLastPublishTime
fles::TimesliceSource * fSource
std::vector< std::string > fvChannelsToSend
bool CreateAndCombineComponentsPerSysId(const fles::Timeslice &)
std::vector< int > fSysId
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendCommand(std::string sCommand)
Hash for CbmL1LinkKey.