CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaMultiSampler.cxx
Go to the documentation of this file.
1/* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
4
14
15#include "CbmFlesCanvasTools.h"
17
18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMultiInputArchive.hpp"
20#include "TimesliceMultiSubscriber.hpp"
21#include "TimesliceSubscriber.hpp"
22
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h" // device->fConfig
25
26#include <TCanvas.h>
27#include <TH1F.h>
28#include <TH1I.h>
29#include <TProfile.h>
30
31#include "BoostSerializer.h"
32#include <boost/algorithm/string.hpp>
33#include <boost/archive/binary_oarchive.hpp>
34#include <boost/filesystem.hpp>
35#include <boost/regex.hpp>
36#include <boost/serialization/utility.hpp>
37
38#include "RootSerializer.h"
39
40namespace filesys = boost::filesystem;
41
42#include <thread> // this_thread::sleep_for
43
44#include <algorithm>
45#include <chrono>
46#include <ctime>
47#include <iomanip>
48#include <sstream>
49#include <string>
50
51#include <stdio.h>
52
53using namespace std;
54
55#include <stdexcept>
56
/// Error type thrown by the device initialisation code in this file when the
/// configuration is unusable (bad SysId/channel pair, wrong input combination,
/// channel name mismatch, ...). The matching handler in this file logs what()
/// and requests the ErrorFound transition.
struct InitTaskError : public std::runtime_error {
  /// Builds the error from a std::string message.
  explicit InitTaskError(const std::string& sMessage) : std::runtime_error(sMessage) {}

  /// Builds the error from a C-string message (the form used by the throw sites in this file).
  explicit InitTaskError(const char* sMessage) : std::runtime_error(sMessage) {}
};
60
62 : FairMQDevice()
63 , fMaxTimeslices(0)
64 , fFileName("")
65 , fDirName("")
66 , fInputFileList()
67 , fFileCounter(0)
68 , fHost("")
69 , fPort(0)
70 , fHighWaterMark(1)
71 , fTSCounter(0)
72 , fMessageCounter(0)
73 , fSource(nullptr)
74 , fTime()
75 , fLastPublishTime {std::chrono::system_clock::now()}
76{
77}
78
80try {
81 // Get the values from the command line options (via fConfig)
82 fFileName = fConfig->GetValue<string>("filename");
83 fDirName = fConfig->GetValue<string>("dirname");
84 fHost = fConfig->GetValue<string>("flib-host");
85 fPort = fConfig->GetValue<uint64_t>("flib-port");
86 fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
87 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
88 fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts");
89 fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid");
90 fbSendTsPerChannel = fConfig->GetValue<bool>("send-ts-per-channel");
91 fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs");
92 fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds");
93 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
94 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
95 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
96 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
97 fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
98 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
99
100 if (fbNoSplitTs) {
101 if (fbSendTsPerSysId) {
102 if (fbSendTsPerChannel) {
103 LOG(warning) << "Both no-split-ts, send-ts-per-sysid and "
104 "send-ts-per-channel options used => "
105 << " second and third one will be ignored!!!!";
106 } // if( fbSendTsPerSysId )
107 else
108 LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => "
109 << " second one will be ignored!!!!";
110 } // if( fbSendTsPerSysId )
111 else if (fbSendTsPerChannel) {
112 LOG(warning) << "Both no-split-ts and send-ts-per-channel options used => "
113 << " second one will be ignored!!!!";
114 } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId )
115 } // if( fbNoSplitTs )
117 LOG(warning) << "Both send-ts-per-sysid and send-ts-per-channel options used => "
118 << " second one will be ignored!!!!";
119 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
120
122 std::vector<std::string> vSysIdChanPairs = fConfig->GetValue<std::vector<std::string>>("sysid-chan");
123 for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
124 const size_t sep = vSysIdChanPairs[uPair].find(':');
125 if (string::npos == sep || 0 == sep || vSysIdChanPairs[uPair].size() == sep) {
126 LOG(info) << vSysIdChanPairs[uPair];
127 throw InitTaskError("Provided pair of SysId + Channel name is missing a : or an argument.");
128 } // if( string::npos == sep || 0 == sep || vSysIdChanPairs[ uPair ].size() == sep )
129
131 std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
132 const size_t hexPos = sSysId.find("0x");
133 int iSysId;
134 if (string::npos == hexPos) iSysId = std::stoi(sSysId);
135 else
136 iSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16);
137
139 std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
140
142 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
143 if (fSysId.end() != pos) {
145 const vector<std::string>::size_type idx = pos - fSysId.begin();
146 fAllowedChannels[idx] = sChannelName;
147 } // if( fSysId.end() != pos )
148 else {
150 fSysId.push_back(iSysId);
151 fAllowedChannels.push_back(sChannelName);
152 } // else of if( fSysId.end() != pos )
153
154 LOG(info) << vSysIdChanPairs[uPair] << " " << iSysId << " " << sChannelName;
155 } // for( uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair )
156
157 if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
158
159 // Check which input is defined
160 // Posibilities
161 // filename && ! dirname : single file
162 // filename with wildcards && diranme : all files with filename regex in the directory
163 // host && port : connect to the flim server
164
165 bool isGoodInputCombi {false};
166 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
167 isGoodInputCombi = true;
168 fInputFileList.push_back(fFileName);
169 }
170 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
171 isGoodInputCombi = true;
172 fInputFileList.push_back(fFileName);
173 }
174 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
175 isGoodInputCombi = true;
176 LOG(info) << "Host: " << fHost;
177 LOG(info) << "Port: " << fPort;
178 }
179 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
180 isGoodInputCombi = true;
181 LOG(info) << "Host string: " << fHost;
182 }
183 else {
184 isGoodInputCombi = false;
185 }
186
187
188 if (!isGoodInputCombi) {
189 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
190 "or host + port are allowed combination.");
191 }
192
193
194 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
195
196 // Get the information about created channels from the device
197 // Check if the defined channels from the topology (by name)
198 // are in the list of channels which are possible/allowed
199 // for the device
200 // The idea is to check at initilization if the devices are
201 // properly connected. For the time beeing this is done with a
202 // nameing convention. It is not avoided that someone sends other
203 // data on this channel.
204 int noChannel = fChannels.size();
205 LOG(info) << "Number of defined output channels: " << noChannel;
206 for (auto const& entry : fChannels) {
209 if (entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands
210 || (0 < fuPublishFreqTs
211 && (entry.first == fsChannelNameHistosInput || entry.first == fsChannelNameHistosConfig
212 || entry.first == fsChannelNameCanvasConfig))) {
213 continue;
214 } // if( entry.first == fsChannelNameMissedTs || entry.first == fsChannelNameCommands || histo channels name)
215
216 LOG(info) << "Channel name: " << entry.first;
217 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
218 }
219
220 for (auto const& value : fComponentsToSend) {
221 LOG(info) << "Value : " << value;
222 if (value > 1) {
223 throw InitTaskError("Sending same data to more than one output channel "
224 "not implemented yet.");
225 }
226 }
227
228
229 if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) {
230 // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
231 //std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
232 std::string connector = fHost + ":" + std::to_string(fPort);
233 LOG(info) << "Open TSPublisher at " << connector;
234 fSource = new fles::TimesliceMultiSubscriber(connector);
235 }
236 else if (0 == fFileName.size() && 0 != fHost.size()) {
237 std::string connector = fHost;
238 LOG(info) << "Open TSPublisher with host string: " << connector;
239 fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark);
240 }
241 else {
242 // Create a ";" separated string with all file names
243 std::string fileList {""};
244 for (const auto& obj : fInputFileList) {
245 std::string fileName = obj;
246 fileList += fileName;
247 fileList += ";";
248 }
249 fileList.pop_back(); // Remove the last ;
250 LOG(info) << "Input File String: " << fileList;
251 fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName);
252 if (!fSource) { throw InitTaskError("Could open files from file list."); }
253 }
254
255 LOG(info) << "High-Water Mark: " << fHighWaterMark;
256 LOG(info) << "Max. Timeslices: " << fMaxTimeslices;
257 if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs )
258 else if (fbSendTsPerSysId) {
259 LOG(info) << "Sending components in separate TS per SysId";
260 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
261 else if (fbSendTsPerChannel) {
262 LOG(info) << "Sending components in separate TS per channel";
263 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
264
265 fTime = std::chrono::steady_clock::now();
266}
267catch (InitTaskError& e) {
268 LOG(error) << e.what();
269 ChangeState(fair::mq::Transition::ErrorFound);
270}
271
273{
275 if (fbNoSplitTs) {
277 fChannelsToSend[0].push_back(channelName);
278 return true;
279 } // if( fbNoSplitTs )
280
281 bool bFoundMatch = false;
282 // for(auto const &entry : fAllowedChannels) {
283 for (uint32_t idx = 0; idx < fAllowedChannels.size(); ++idx) {
284 auto const& entry = fAllowedChannels[idx];
285 LOG(info) << "Looking for name " << channelName << " in " << entry;
286 std::size_t pos1 = channelName.find(entry);
287 if (pos1 != std::string::npos) {
288 /*
289 const vector<std::string>::const_iterator pos =
290 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
291 const vector<std::string>::size_type idx = pos-fAllowedChannels.begin();
292*/
293 LOG(info) << "Found " << entry << " in " << channelName;
294 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx
295 << " (SysId 0x" << std::hex << fSysId[idx] << std::dec << ")";
296 fComponentsToSend[idx]++;
297 fChannelsToSend[idx].push_back(channelName);
298
300 if (fbSendTsPerChannel) bFoundMatch = true;
301 else
302 return true;
303 } // if (pos1!=std::string::npos)
304 }
306 if (fbSendTsPerChannel && bFoundMatch) return true;
307
308 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
309 LOG(error) << "Stop device.";
310 return false;
311}
312
314{
315 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
316 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
317 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
318
320 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
322 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
323
325 fhTsRate = new TH1I("TsRate", "TS rate; t [s]", 1800, 0., 1800.);
326 fhTsSize = new TH1I("TsSize", "Size of TS; Size [MB]", 15000, 0., 15000.);
327 fhTsSizeEvo = new TProfile("TsSizeEvo", "Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.);
328 fhTsMaxSizeEvo = new TH1F("TsMaxSizeEvo", "Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.);
329 fhMissedTS = new TH1I("Missed_TS", "Missed TS", 2, -0.5, 1.5);
330 fhMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; t [s]", 1800, 0., 1800.);
331
333 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, "Sampler"));
334 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, "Sampler"));
335 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, "Sampler"));
336 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, "Sampler"));
337 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, "Sampler"));
338 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, "Sampler"));
339
341 Double_t w = 10;
342 Double_t h = 10;
343 fcSummary = new TCanvas("cSampSummary", "Sampler monitoring plots", w, h);
344 fcSummary->Divide(2, 3);
345
346 fcSummary->cd(1);
347 gPad->SetGridx();
348 gPad->SetGridy();
349 fhTsRate->Draw("hist");
350
351 fcSummary->cd(2);
352 gPad->SetGridx();
353 gPad->SetGridy();
354 gPad->SetLogx();
355 gPad->SetLogy();
356 fhTsSize->Draw("hist");
357
358 fcSummary->cd(3);
359 gPad->SetGridx();
360 gPad->SetGridy();
361 fhTsSizeEvo->Draw("hist");
362
363 fcSummary->cd(4);
364 gPad->SetGridx();
365 gPad->SetGridy();
366 fhTsMaxSizeEvo->Draw("hist");
367
368 fcSummary->cd(5);
369 gPad->SetGridx();
370 gPad->SetGridy();
371 fhMissedTS->Draw("hist");
372
373 fcSummary->cd(6);
374 gPad->SetGridx();
375 gPad->SetGridy();
376 fhMissedTSEvo->Draw("el");
377
379 vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, "canvases"));
380
385 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
386 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
387 // << " in " << vHistos[ uHisto ].second.data()
388 // ;
389 fArrayHisto.Add(vHistos[uHisto].first);
390 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
391 fvpsHistosFolder.push_back(psHistoConfig);
392
394 FairMQMessagePtr messageHist(NewMessage());
395 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig);
396 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);
397
399 if (Send(messageHist, fsChannelNameHistosConfig) < 0) {
400 LOG(fatal) << "Problem sending histo config";
401 } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
402
403 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
404 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
405
409 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
410 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
411 // << " in " << vCanvases[ uCanv ].second.data();
412 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
413 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
414
415 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
416
417 fvpsCanvasConfig.push_back(psCanvConfig);
418
420 FairMQMessagePtr messageCan(NewMessage());
421 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig);
422 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);
423
425 if (Send(messageCan, fsChannelNameCanvasConfig) < 0) {
426 LOG(fatal) << "Problem sending canvas config";
427 } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
428
429 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
430 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
431
432 return true;
433}
434
436{
437 if (0 < fuPublishFreqTs && 0 == fTSCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs )
438
440 if (0 == fTSCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) {
441 dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber();
442 } // if( 0 == fTSCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) )
443
444 auto timeslice = fSource->get();
445 if (timeslice) {
447 fTSCounter++;
448
449 const fles::Timeslice& ts = *timeslice;
450 uint64_t uTsIndex = ts.index();
451
452 if (0 < fuPublishFreqTs) {
453 uint64_t uTsTime = ts.descriptor(0, 0).idx;
454 if (0 == fuStartTime) { fuStartTime = uTsTime; } // if( 0 == fuStartTime )
455 fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9;
456 uint64_t uSizeMb = 0;
457
458 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
459 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
460 } // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp )
461
462
463 fhTsRate->Fill(fdTimeToStart);
464 fhTsSize->Fill(uSizeMb);
465 fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb);
466
468 if (0. == fdLastMaxTime) {
470 fdTsMaxSize = uSizeMb;
471 } // if( 0. == fdLastMaxTime )
472 else if (1. <= fdTimeToStart - fdLastMaxTime) {
475 fdTsMaxSize = uSizeMb;
476 } // else if if( 1 <= fdTimeToStart - fdLastMaxTime )
477 else if (fdTsMaxSize < uSizeMb) {
478 fdTsMaxSize = uSizeMb;
479 } // else if( fdTsMaxSize < uSizeMb )
480 } // if( 0 < fuPublishFreqTs )
481
483 if ((uTsIndex != (fuPrevTsIndex + 1)) && !(0 == fuPrevTsIndex && 0 == uTsIndex)) {
484 LOG(info) << "Missed Timeslices. Old TS Index was " << fuPrevTsIndex << " New TS Index is " << uTsIndex
485 << " diff is " << uTsIndex - fuPrevTsIndex << " Missing are " << uTsIndex - fuPrevTsIndex - 1;
486
487 if ("" != fsChannelNameMissedTs) {
489 std::vector<uint64_t> vulMissedIndices;
490 for (uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
491 vulMissedIndices.emplace_back(ulMiss);
492 } // for( uint64_t ulMiss = fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss )
493 if (!SendMissedTsIdx(vulMissedIndices)) {
495 if ("" != fsChannelNameCommands) {
497 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
498 SendCommand("STOP");
499 } // if( "" != fsChannelNameCommands )
500
501 return false;
502 } // if( !SendMissedTsIdx( vulMissedIndices ) )
503 } // if( "" != fsChannelNameMissedTs )
504
505 if (0 < fuPublishFreqTs) {
506 fhMissedTS->Fill(1, uTsIndex - fuPrevTsIndex - 1);
507 fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fuPrevTsIndex - 1);
508 } // if( 0 < fuPublishFreqTs )
509
510 } // if( ( uTsIndex != ( fuPrevTsIndex + 1 ) ) && !( 0 == fuPrevTsIndex && 0 == uTsIndex ) )
511
512 if (0 < fuPublishFreqTs) {
513 fhMissedTS->Fill(0);
514 fhMissedTSEvo->Fill(fdTimeToStart, 0, 1);
515 } // else if( 0 < fuPublishFreqTs )
516
517 fuPrevTsIndex = uTsIndex;
518
519 if (fTSCounter % 10000 == 0) { LOG(info) << "Received TS " << fTSCounter << " with index " << uTsIndex; }
520
521 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
522
523
524 // CheckTimeslice(ts);
525
526 if (fbNoSplitTs) {
529 if (!CreateAndSendFullTs(ts)) {
531 if ("" != fsChannelNameCommands) {
533 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
534 SendCommand("STOP");
535 } // if( "" != fsChannelNameCommands )
536
537 return false;
538 } // if( !CreateAndSendFullTs( ts ) )
539 } // if( fbNoSplitTs )
540 else if (fbSendTsPerSysId) {
545 if ("" != fsChannelNameCommands) {
547 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
548 SendCommand("STOP");
549 } // if( "" != fsChannelNameCommands )
550
551 return false;
552 } // if( !CreateAndCombineComponentsPerSysId( ts ) )
553 } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs )
554 else if (fbSendTsPerChannel) {
559 if ("" != fsChannelNameCommands) {
561 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
562 SendCommand("STOP");
563 } // if( "" != fsChannelNameCommands )
564
565 return false;
566 } // if( !CreateAndCombineComponentsPerChannel( ts ) )
567 } // else if( fbSendTsPerChannel ) of if( fbSendTsPerSysId )
568 else {
569 for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
570 if (!CreateAndSendComponent(ts, nrComp)) {
572 if ("" != fsChannelNameCommands) {
574 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
575 SendCommand("STOP");
576 } // if( "" != fsChannelNameCommands )
577
578 return false;
579 } // if( !CreateAndSendComponent(ts, nrComp) )
580 } // for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp)
581 } // else of if( fbSendTsPerSysId )
582
583 if (0 < fuPublishFreqTs) {
587 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
588 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
589 if ((fdMaxPublishTime < elapsedSeconds.count())
590 || (0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
592 fLastPublishTime = std::chrono::system_clock::now();
593 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fTSCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
594 } // if( 0 < fuPublishFreqTs )
595
596 return true;
597 } // if (fTSCounter < fMaxTimeslices)
598 else {
599 CalcRuntime();
600
602 if ("" != fsChannelNameCommands) {
604 std::this_thread::sleep_for(std::chrono::seconds(10));
605 std::string sCmd = "EOF ";
607 sCmd += " ";
609 SendCommand(sCmd);
610 } // if( "" != fsChannelNameCommands )
611
612 return false;
613 } // else of if (fTSCounter < fMaxTimeslices)
614 } // if (timeslice)
615 else {
616 CalcRuntime();
617
619 if ("" != fsChannelNameCommands) {
621 std::this_thread::sleep_for(std::chrono::seconds(10));
622 std::string sCmd = "EOF ";
624 sCmd += " ";
626 SendCommand(sCmd);
627 } // if( "" != fsChannelNameCommands )
628
629 return false;
630 } // else of if (timeslice)
631}
632
633bool CbmMQTsaMultiSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
634{
635
636 // Check if component has to be send. If the corresponding channel
637 // is connected create the new timeslice and send it to the
638 // correct channel
639
640 LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
641 const vector<int>::const_iterator pos =
642 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
643 if (pos != fSysId.end()) {
644 const vector<std::string>::size_type idx = pos - fSysId.begin();
645 if (fComponentsToSend[idx] > 0) {
646 LOG(debug) << "Create timeslice component for link " << nrComp;
647
648 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
649 component.append_component(ts.num_microslices(0));
650
651 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
652 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
653 }
654 /*
655 LOG(info) << "Number of core microslices before: " << ts.num_core_microslices();
656 LOG(info) << "Number of core microslices after : " << component.num_core_microslices();
657 LOG(info) << "Number of microslices: " << component.num_microslices(0);
658*/
659 if (!SendData(component, idx)) return false;
660 return true;
661 }
662 }
663 return true;
664}
665
667{
669 if (false == fbListCompPerSysIdReady) {
670 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
671 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
672
673 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usSysId);
674 if (fSysId.end() != pos) {
675 const vector<std::string>::size_type idx = pos - fSysId.begin();
676
677 fvvCompPerSysId[idx].push_back(uCompIdx);
678 } // if( fSysId.end() != pos )
679 } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp )
680
681 for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
682 std::stringstream ss;
683 ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex
684 << std::setw(2) << fSysId[uSysIdx] << std::dec << " :";
685
686 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
687 ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp];
688 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
689
690 LOG(info) << ss.str();
691 } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId )
692
694 } // if( false == fbListCompPerSysIdReady )
695
697 for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
698 if (0 < fComponentsToSend[uSysIdx]) {
699 LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uSysIdx] << std::dec;
700
701 if (0 < fvvCompPerSysId[uSysIdx].size()) {
702 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
703
704 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
705 uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerSysId[uSysIdx][uComp]);
706 component.append_component(uNumMsInComp);
707
708 LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " TS "
709 << ts.index() << " Comp " << fvvCompPerSysId[uSysIdx][uComp];
710
711 for (size_t m = 0; m < uNumMsInComp; ++m) {
712 component.append_microslice(uComp, m, ts.descriptor(fvvCompPerSysId[uSysIdx][uComp], m),
713 ts.content(fvvCompPerSysId[uSysIdx][uComp], m));
714 } // for( size_t m = 0; m < uNumMsInComp; ++m )
715 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
716
717 LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uSysIdx] << std::dec << " with "
718 << component.num_components() << " components";
719
720 if (!SendData(component, uSysIdx)) return false;
721 } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
722 } // if( 0 < fComponentsToSend[ uSysIdx ] )
723 } // for( uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )
724
725 return true;
726}
727
729{
730
732 if (false == fbListCompPerChannelReady) {
734 for (uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx) {
735 if (0 < fComponentsToSend[uSysIdx]) {
736 for (uint32_t uChan = 0; uChan < fChannelsToSend[uSysIdx].size(); ++uChan) {
737 const vector<std::string>::const_iterator pos =
738 std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[uSysIdx][uChan]);
739 if (fvChannelsToSend.end() == pos) {
740 fvChannelsToSend.push_back(fChannelsToSend[uSysIdx][uChan]);
741 fvvCompPerChannel.push_back(std::vector<uint32_t>());
742 }
743 } // for( uChan = 0; uChan < fChannelsToSend[ uSysIdx ].size(); ++ uChan )
744 } // if( 0 < fComponentsToSend[ uSysIdx ] )
745 } // for( uint32_t uSysIdx = 0; uSysIdx < fComponentsToSend.size(); ++uSysIdx )
746
748 fvvCompPerChannel.resize(fvChannelsToSend.size());
749
752 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
753 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
754
755 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usSysId);
756 if (fSysId.end() != pos) {
757 const vector<std::string>::size_type idxSys = pos - fSysId.begin();
758
759 if (0 < fComponentsToSend[idxSys]) {
760 for (uint32_t uChan = 0; uChan < fChannelsToSend[idxSys].size(); ++uChan) {
761 const vector<std::string>::const_iterator posCh =
762 std::find(fvChannelsToSend.begin(), fvChannelsToSend.end(), fChannelsToSend[idxSys][uChan]);
763 if (fvChannelsToSend.end() != posCh) {
764 const vector<std::string>::size_type idxChan = posCh - fvChannelsToSend.begin();
765 fvvCompPerChannel[idxChan].push_back(uCompIdx);
766 } // if( fvChannelsToSend.end() != posCh )
767 } // for( uChan = 0; uChan < fChannelsToSend[ idxSys ].size(); ++ uChan )
768 } // if( 0 < fComponentsToSend[ uSysIdx ] )
769 } // if( fSysId.end() != pos )
770 } // for( uint32_t uNrComp = 0; uNrComp < ts.num_components(); ++uNrComp )
771
772 for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
773 std::stringstream ss;
774 ss << "Found " << std::setw(2) << fvvCompPerChannel[uChanIdx].size() << " components for channel "
775 << fvChannelsToSend[uChanIdx] << " :";
776
777 for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) {
778 ss << " " << std::setw(3) << fvvCompPerChannel[uChanIdx][uComp];
779 } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
780
781 LOG(info) << ss.str();
782 } // for( uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )
783
785 } // if( false == fbListCompPerSysIdReady )
786
789
791 for (uint32_t uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx) {
792 LOG(debug) << "Create timeslice with components for channel " << fvChannelsToSend[uChanIdx];
793
794 if (0 < fvvCompPerChannel[uChanIdx].size()) {
795 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
796
797 for (uint32_t uComp = 0; uComp < fvvCompPerChannel[uChanIdx].size(); ++uComp) {
798 uint32_t uNumMsInComp = ts.num_microslices(fvvCompPerChannel[uChanIdx][uComp]);
799 component.append_component(uNumMsInComp);
800
801 LOG(debug) << "Add components to TS for SysId " << std::hex
802 << static_cast<uint16_t>(ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], 0).sys_id) << std::dec
803 << " TS " << ts.index() << " Comp " << fvvCompPerChannel[uChanIdx][uComp];
804
805 for (size_t m = 0; m < uNumMsInComp; ++m) {
806 component.append_microslice(uComp, m, ts.descriptor(fvvCompPerChannel[uChanIdx][uComp], m),
807 ts.content(fvvCompPerChannel[uChanIdx][uComp], m));
808 } // for( size_t m = 0; m < uNumMsInComp; ++m )
809 } // for( uint32_t uComp = 0; uComp < fvvCompPerChannel[ uChanIdx ].size(); ++uComp )
810
811 LOG(debug) << "Prepared timeslice for channel " << fvChannelsToSend[uChanIdx] << " with "
812 << component.num_components() << " components";
813
814 if (!SendData(component, fvChannelsToSend[uChanIdx])) return false;
815 } // if( 0 < fvvCompPerSysId[ uSysIdx ].size() )
816 } // for( uChanIdx = 0; uChanIdx < fvChannelsToSend.size(); ++uChanIdx )
817
818 return true;
819}
820
821bool CbmMQTsaMultiSampler::CreateAndSendFullTs(const fles::Timeslice& ts)
822{
824 for (uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx) {
825 if (0 < fComponentsToSend[uChanIdx]) {
826 LOG(debug) << "Copy timeslice component for channel " << fChannelsToSend[uChanIdx][0];
827
828 fles::StorableTimeslice fullTs {ts};
829 if (!SendData(fullTs, uChanIdx)) return false;
830 } // if( 0 < fComponentsToSend[ uChanIdx ] )
831 } // for( uint32_t uChanIdx = 0; uChanIdx < fChannelsToSend.size(); ++uChanIdx )
832 return true;
833}
834
835bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, int idx)
836{
837 // serialize the timeslice and create the message
838 std::stringstream oss;
839 boost::archive::binary_oarchive oa(oss);
840 oa << component;
841 std::string* strMsg = new std::string(oss.str());
842
843 FairMQMessagePtr msg(NewMessage(
844 const_cast<char*>(strMsg->c_str()), // data
845 strMsg->length(), // size
846 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
847 strMsg)); // object that manages the data
848
849 // TODO: Implement sending same data to more than one channel
850 // Need to create new message (copy message??)
851 if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }
852
853 // in case of error or transfer interruption,
854 // return false to go to IDLE state
855 // successfull transfer will return number of bytes
856 // transfered (can be 0 if sending an empty message).
857
858 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
859 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
860 LOG(error) << "Problem sending data";
861 return false;
862 }
863
865 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
866
867 return true;
868}
869
870bool CbmMQTsaMultiSampler::SendData(const fles::StorableTimeslice& component, std::string sChannel)
871{
872 // serialize the timeslice and create the message
873 std::stringstream oss;
874 boost::archive::binary_oarchive oa(oss);
875 oa << component;
876 std::string* strMsg = new std::string(oss.str());
877
878 FairMQMessagePtr msg(NewMessage(
879 const_cast<char*>(strMsg->c_str()), // data
880 strMsg->length(), // size
881 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
882 strMsg)); // object that manages the data
883
884 // in case of error or transfer interruption,
885 // return false to go to IDLE state
886 // successfull transfer will return number of bytes
887 // transfered (can be 0 if sending an empty message).
888 LOG(debug) << "Send data to channel " << sChannel;
889 if (Send(msg, sChannel) < 0) {
890 LOG(error) << "Problem sending data";
891 return false;
892 }
893
895 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
896
897 return true;
898}
899bool CbmMQTsaMultiSampler::SendMissedTsIdx(std::vector<uint64_t> vIndices)
900{
901 // serialize the vector and create the message
902 std::stringstream oss;
903 boost::archive::binary_oarchive oa(oss);
904 oa << vIndices;
905 std::string* strMsg = new std::string(oss.str());
906
907 FairMQMessagePtr msg(NewMessage(
908 const_cast<char*>(strMsg->c_str()), // data
909 strMsg->length(), // size
910 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
911 strMsg)); // object that manages the data
912
913 // in case of error or transfer interruption,
914 // return false to go to IDLE state
915 // successfull transfer will return number of bytes
916 // transfered (can be 0 if sending an empty message).
917 LOG(debug) << "Send data to channel " << fsChannelNameMissedTs;
918 if (Send(msg, fsChannelNameMissedTs) < 0) {
919 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs;
920 return false;
921 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
922
923 return true;
924}
925bool CbmMQTsaMultiSampler::SendCommand(std::string sCommand)
926{
927 // serialize the vector and create the message
928 std::stringstream oss;
929 boost::archive::binary_oarchive oa(oss);
930 oa << sCommand;
931 std::string* strMsg = new std::string(oss.str());
932
933 FairMQMessagePtr msg(NewMessage(
934 const_cast<char*>(strMsg->c_str()), // data
935 strMsg->length(), // size
936 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
937 strMsg)); // object that manages the data
938
939 // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data
940 // sCommand.length(), // size
941 // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
942 // &sCommand ) ); // object that manages the data
943
944 // in case of error or transfer interruption,
945 // return false to go to IDLE state
946 // successfull transfer will return number of bytes
947 // transfered (can be 0 if sending an empty message).
948 LOG(debug) << "Send data to channel " << fsChannelNameCommands;
949 if (Send(msg, fsChannelNameCommands) < 0) {
950 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands;
951 return false;
952 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
953
954 return true;
955}
956
958{
960 FairMQMessagePtr message(NewMessage());
961 // Serialize<RootSerializer>(*message, &fArrayHisto);
962 RootSerializer().Serialize(*message, &fArrayHisto);
963
965 if (Send(message, fsChannelNameHistosInput) < 0) {
966 LOG(error) << "Problem sending data";
967 return false;
968 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
969
972
973 return true;
974}
975
976
978{
979 fhTsRate->Reset();
980 fhTsSize->Reset();
981 fhTsSizeEvo->Reset();
982 fhTsMaxSizeEvo->Reset();
983 fhMissedTS->Reset();
984 fhMissedTSEvo->Reset();
985
986 return true;
987}
988
990
992{
993 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
994
995 LOG(info) << "Runtime: " << run_time.count();
996 LOG(info) << "No more input data";
997}
998
999
1000void CbmMQTsaMultiSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
1001{
1002 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
1003 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
1004 LOG(info) << "Equipement ID: " << mdsc.eq_id;
1005 LOG(info) << "Flags: " << mdsc.flags;
1006 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
1007 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
1008 LOG(info) << "Microslice Idx: " << mdsc.idx;
1009 LOG(info) << "Checksum: " << mdsc.crc;
1010 LOG(info) << "Size: " << mdsc.size;
1011 LOG(info) << "Offset: " << mdsc.offset;
1012}
1013
1014bool CbmMQTsaMultiSampler::CheckTimeslice(const fles::Timeslice& ts)
1015{
1016 if (0 == ts.num_components()) {
1017 LOG(error) << "No Component in TS " << ts.index();
1018 return 1;
1019 }
1020 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
1021
1022 for (size_t c = 0; c < ts.num_components(); ++c) {
1023 LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
1024 LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
1025 LOG(info) << "Component " << c << " has the system id 0x" << std::hex
1026 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
1027 LOG(info) << "Component " << c << " has the system id 0x" << static_cast<int>(ts.descriptor(c, 0).sys_id);
1028
1029 /*
1030 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
1031 PrintMicroSliceDescriptor(ts.descriptor(c,m));
1032 }
1033*/
1034 }
1035
1036 return true;
1037}
std::string GenerateCanvasConfigString(TCanvas *pCanv)
std::string FormatDecPrintout(uint64_t ulVal, char cFill, uint uWidth)
static constexpr size_t size()
Definition KfSimdPseudo.h:2
bool first
Generates beam ions for transport simulation.
std::string fsChannelNameCanvasConfig
std::vector< std::string > fAllowedChannels
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::chrono::steady_clock::time_point fTime
bool SendData(const fles::StorableTimeslice &, int)
std::vector< std::vector< uint32_t > > fvvCompPerChannel
std::string fsChannelNameHistosConfig
std::vector< std::vector< uint32_t > > fvvCompPerSysId
std::vector< std::string > fInputFileList
List of input files.
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
bool CreateAndCombineComponentsPerChannel(const fles::Timeslice &)
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
bool IsChannelNameAllowed(std::string)
bool CheckTimeslice(const fles::Timeslice &ts)
std::vector< std::vector< std::string > > fChannelsToSend
std::vector< int > fComponentsToSend
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool CreateAndSendFullTs(const fles::Timeslice &)
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::chrono::system_clock::time_point fLastPublishTime
fles::TimesliceSource * fSource
std::vector< std::string > fvChannelsToSend
bool CreateAndCombineComponentsPerSysId(const fles::Timeslice &)
std::vector< int > fSysId
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendCommand(std::string sCommand)
Hash for CbmL1LinkKey.