CbmRoot
Loading...
Searching...
No Matches
CbmMQTsSamplerRepReq.cxx
Go to the documentation of this file.
1/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
6
9
10#include "TimesliceInputArchive.hpp"
11#include "TimesliceMultiInputArchive.hpp"
12#include "TimesliceMultiSubscriber.hpp"
13#include "TimesliceSubscriber.hpp"
14
15#include "FairMQLogger.h"
16#include "FairMQProgOptions.h" // device->fConfig
17
18#include <TCanvas.h>
19#include <TH1F.h>
20#include <TH1I.h>
21#include <TProfile.h>
22
23#include "BoostSerializer.h"
24#include <boost/algorithm/string.hpp>
25#include <boost/archive/binary_oarchive.hpp>
26//#include <boost/filesystem.hpp>
27#include <boost/regex.hpp>
28#include <boost/serialization/utility.hpp>
29
30#include "RootSerializer.h"
31
32//namespace filesys = boost::filesystem;
33
34#include <algorithm>
35#include <chrono>
36#include <cstdio>
37#include <ctime>
38#include <iomanip>
39#include <sstream>
40#include <string>
41#include <thread> // this_thread::sleep_for
42
43using namespace std;
44
45#include <stdexcept>
46
47struct InitTaskError : std::runtime_error {
48 using std::runtime_error::runtime_error;
49};
50
52 : FairMQDevice()
53 , fTime()
54 , fLastPublishTime {std::chrono::system_clock::now()}
55{
56}
57
59try {
60 // Get the values from the command line options (via fConfig)
61 fsFileName = fConfig->GetValue<string>("filename");
62 fsDirName = fConfig->GetValue<string>("dirname");
63 fsHost = fConfig->GetValue<string>("fles-host");
64 fusPort = fConfig->GetValue<uint16_t>("fles-port");
65 fulHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
66 fulMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
67 fsChannelNameTsRequest = fConfig->GetValue<std::string>("ChNameTsReq");
68 fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts");
69 fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid");
70 fbSendTsPerBlock = fConfig->GetValue<bool>("send-ts-per-block");
71 fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs");
72 fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds");
73 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
74 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
75 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
76 fsHistosSuffix = fConfig->GetValue<std::string>("HistosSuffix");
77 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
78
79 if (fbNoSplitTs) {
80 if (fbSendTsPerSysId) {
81 if (fbSendTsPerBlock) {
82 LOG(warning) << "Both no-split-ts, send-ts-per-sysid and send-ts-per-block options used => "
83 << " second and third one will be ignored!!!!";
84 } // if( fbSendTsPerBlock )
85 else
86 LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => "
87 << " second one will be ignored!!!!";
88 } // if( fbSendTsPerSysId )
89 else if (fbSendTsPerBlock) {
90 LOG(warning) << "Both no-split-ts and send-ts-per-block options used => "
91 << " second one will be ignored!!!!";
92 } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId )
93 else
94 LOG(debug) << "Running in no-split-ts mode!";
95 } // if( fbNoSplitTs )
96 else if (fbSendTsPerBlock) {
97 if (fbSendTsPerSysId) {
98 LOG(warning) << "Both send-ts-per-sysid and send-ts-per-block options used => "
99 << " second one will be ignored!!!!";
100 } // if (fbSendTsPerSysId)
101 else
102 LOG(debug) << "Running in send-ts-per-block mode!";
103 } // else if( fbSendTsPerBlock ) of if( fbNoSplitTs )
104 else if (fbSendTsPerSysId) {
105 LOG(debug) << "Running in send-ts-per-sysid mode!";
106 } // else if (fbSendTsPerSysId) else if( fbSendTsPerBlock ) of if( fbNoSplitTs )
107 else {
108 LOG(debug) << "Running in no-split-ts mode by default!";
109 fbNoSplitTs = true;
110 } // else of else if (fbSendTsPerSysId) else if( fbSendTsPerBlock ) of if( fbNoSplitTs )
111
113 std::vector<std::string> vSysIdBlockPairs = fConfig->GetValue<std::vector<std::string>>("block-sysid");
114 for (uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair) {
115 const size_t sep = vSysIdBlockPairs[uPair].find(':');
116 if (string::npos == sep || 0 == sep || vSysIdBlockPairs[uPair].size() == sep) {
117 LOG(info) << vSysIdBlockPairs[uPair];
118 throw InitTaskError("Provided pair of Block name + SysId is missing a : or an argument.");
119 } // if( string::npos == sep || 0 == sep || vSysIdBlockPairs[ uPair ].size() == sep )
120
122 std::string sBlockName = vSysIdBlockPairs[uPair].substr(0, sep);
123
126 std::string sSysId = vSysIdBlockPairs[uPair].substr(sep + 1);
127 const size_t hexPos = sSysId.find("0x");
128 uint16_t usSysId;
129 if (string::npos == hexPos) usSysId = std::stoi(sSysId);
130 else
131 usSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16);
132
133 LOG(debug) << "Extracted block info from pair \"" << vSysIdBlockPairs[uPair] << "\": name is " << sBlockName
134 << " and SysId is " << usSysId << " extracted from " << sSysId;
135
137 uint32_t uSysIdIdx = 0;
138 for (; uSysIdIdx < fSysId.size() && fSysId[uSysIdIdx] != usSysId; ++uSysIdIdx) {}
139 if (uSysIdIdx == fSysId.size()) { throw InitTaskError("Unknown System ID for " + vSysIdBlockPairs[uPair]); }
140 else if (true == fComponentActive[uSysIdIdx]) {
141 throw InitTaskError("System ID already in use by another block for " + vSysIdBlockPairs[uPair]);
142 }
143 fComponentActive[uSysIdIdx] = true;
144
146 auto itBlock = fvBlocksToSend.begin();
147 for (; itBlock != fvBlocksToSend.end(); ++itBlock) {
148 if ((*itBlock).first == sBlockName) break;
149 } // for( ; itBlock != fvBlocksToSend.end(); ++itBlock)
150 if (fvBlocksToSend.end() != itBlock) {
152 (*itBlock).second.insert(usSysId);
153 } // if( fvBlocksToSend.end() != itBlock )
154 else {
156 fvBlocksToSend.push_back(std::pair<std::string, std::set<uint16_t>>(sBlockName, {usSysId}));
157 fvvCompPerBlock.push_back(std::vector<uint32_t>({}));
158 } // else of if( fSysId.end() != pos )
159
160 LOG(info) << vSysIdBlockPairs[uPair] << " Added SysId 0x" << std::hex << usSysId << std::dec << " to "
161 << sBlockName;
162 } // for( uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair )
163
164 if (0 == fulMaxTimeslices) fulMaxTimeslices = UINT_MAX;
165
166 // Check which input is defined
167 // Possibilities:
168 // filename && ! dirname : single file
169 // filename with wildcards && dirname : all files with filename regex in the directory
170 // host && port : connect to the flesnet server
171 bool isGoodInputCombi {false};
172 if (0 != fsFileName.size() && 0 == fsDirName.size() && 0 == fsHost.size() && 0 == fusPort) {
173 isGoodInputCombi = true;
174 fvsInputFileList.push_back(fsFileName);
175 }
176 else if (0 != fsFileName.size() && 0 != fsDirName.size() && 0 == fsHost.size() && 0 == fusPort) {
177 isGoodInputCombi = true;
178 fvsInputFileList.push_back(fsFileName);
179 }
180 else if (0 == fsFileName.size() && 0 == fsDirName.size() && 0 != fsHost.size() && 0 != fusPort) {
181 isGoodInputCombi = true;
182 LOG(info) << "Host: " << fsHost;
183 LOG(info) << "Port: " << fusPort;
184 }
185 else if (0 == fsFileName.size() && 0 == fsDirName.size() && 0 != fsHost.size() && 0 == fusPort) {
186 isGoodInputCombi = true;
187 LOG(info) << "Host string: " << fsHost;
188 }
189 else {
190 isGoodInputCombi = false;
191 }
192
193 if (!isGoodInputCombi) {
194 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
195 "or host + port are allowed combination.");
196 }
197
198 LOG(info) << "MaxTimeslices: " << fulMaxTimeslices;
199
200 if (0 == fsFileName.size() && 0 != fsHost.size() && 0 != fusPort) {
201 // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
202 //std::string connector = "tcp://" + fsHost + ":" + std::to_string(fusPort);
203 std::string connector = fsHost + ":" + std::to_string(fusPort);
204 LOG(info) << "Open TSPublisher at " << connector;
205 fSource = new fles::TimesliceMultiSubscriber(connector, fulHighWaterMark);
206 }
207 else if (0 == fsFileName.size() && 0 != fsHost.size()) {
208 std::string connector = fsHost;
209 LOG(info) << "Open TSPublisher with host string: " << connector;
210 fSource = new fles::TimesliceMultiSubscriber(connector, fulHighWaterMark);
211 }
212 else {
213 // Create a ";" separated string with all file names
214 std::string fileList {""};
215 for (const auto& obj : fvsInputFileList) {
216 std::string fileName = obj;
217 fileList += fileName;
218 fileList += ";";
219 }
220 fileList.pop_back(); // Remove the last ;
221 LOG(info) << "Input File String: " << fileList;
222 fSource = new fles::TimesliceMultiInputArchive(fileList, fsDirName);
223 if (!fSource) { throw InitTaskError("Could open files from file list."); }
224 }
225
226 LOG(info) << "High-Water Mark: " << fulHighWaterMark;
227 LOG(info) << "Max. Timeslices: " << fulMaxTimeslices;
228 if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs )
229 else if (fbSendTsPerSysId) {
230 LOG(info) << "Sending components in separate TS per SysId";
231 } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs )
232 else if (fbSendTsPerBlock) {
233 LOG(info) << "Sending components in separate TS per block (multiple SysId)";
234 } // else if( fbSendTsPerBlock ) of if( fbSendTsPerSysId ) of if( fbNoSplitTs )
235
237
238 fTime = std::chrono::steady_clock::now();
239}
240catch (InitTaskError& e) {
241 LOG(error) << e.what();
242 ChangeState(fair::mq::Transition::ErrorFound);
243}
244catch (boost::bad_any_cast& e) {
245 LOG(error) << "Error during InitTask: " << e.what();
246 ChangeState(fair::mq::Transition::ErrorFound);
247}
248
250{
251 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
252 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
253 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
254 if ("" != fsHistosSuffix) { //
255 LOG(info) << "Suffix added to folders, histograms and canvas names: " << fsHistosSuffix;
256 }
257
259 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
261 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
262
264 /* clang-format off */
265 fhTsRate = new TH1I(Form("TsRate%s", fsHistosSuffix.data()),
266 "TS rate; t [s]",
267 1800, 0., 1800.);
268 fhTsSize = new TH1I(Form("TsSize%s", fsHistosSuffix.data()),
269 "Size of TS; Size [MB]",
270 15000, 0., 15000.);
271 fhTsSizeEvo = new TProfile(Form("TsSizeEvo%s", fsHistosSuffix.data()),
272 "Evolution of the TS Size; t [s]; Mean size [MB]",
273 1800, 0., 1800.);
274 fhTsMaxSizeEvo = new TH1F(Form("TsMaxSizeEvo%s", fsHistosSuffix.data()),
275 "Evolution of maximal TS Size; t [s]; Max size [MB]",
276 1800, 0., 1800.);
277 fhMissedTS = new TH1I(Form("MissedTs%s", fsHistosSuffix.data()),
278 "Missed TS",
279 2, -0.5, 1.5);
280 fhMissedTSEvo = new TProfile(Form("MissedTsEvo%s", fsHistosSuffix.data()),
281 "Missed TS evolution; t [s]",
282 1800, 0., 1800.);
283 /* clang-format on */
284
286 std::string sFolder = std::string("Sampler") + fsHistosSuffix;
287 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, sFolder));
288 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, sFolder));
289 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, sFolder));
290 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, sFolder));
291 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, sFolder));
292 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, sFolder));
293
295 Double_t w = 10;
296 Double_t h = 10;
297 fcSummary = new TCanvas(Form("cSampSummary%s", fsHistosSuffix.data()), "Sampler monitoring plots", w, h);
298 fcSummary->Divide(2, 3);
299
300 fcSummary->cd(1);
301 gPad->SetGridx();
302 gPad->SetGridy();
303 fhTsRate->Draw("hist");
304
305 fcSummary->cd(2);
306 gPad->SetGridx();
307 gPad->SetGridy();
308 gPad->SetLogx();
309 gPad->SetLogy();
310 fhTsSize->Draw("hist");
311
312 fcSummary->cd(3);
313 gPad->SetGridx();
314 gPad->SetGridy();
315 fhTsSizeEvo->Draw("hist");
316
317 fcSummary->cd(4);
318 gPad->SetGridx();
319 gPad->SetGridy();
320 fhTsMaxSizeEvo->Draw("hist");
321
322 fcSummary->cd(5);
323 gPad->SetGridx();
324 gPad->SetGridy();
325 fhMissedTS->Draw("hist");
326
327 fcSummary->cd(6);
328 gPad->SetGridx();
329 gPad->SetGridy();
330 fhMissedTSEvo->Draw("el");
331
333 vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, std::string("canvases") + fsHistosSuffix));
334
339 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
340 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
341 // << " in " << vHistos[ uHisto ].second.data()
342 // ;
343 fArrayHisto.Add(vHistos[uHisto].first);
344 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
345 fvpsHistosFolder.push_back(psHistoConfig);
346
347 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
348 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
349
353 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
354 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
355 // << " in " << vCanvases[ uCanv ].second.data();
356 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
357 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
358
359 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
360
361 fvpsCanvasConfig.push_back(psCanvConfig);
362
363 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
364 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
365
366 return true;
367}
368
369bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int)
370{
372 if (0 < fuPublishFreqTs && 0 == fulTsCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs )
373
374 if (fbEofFound) {
376 return true;
377 }
378
380 std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize());
381 if ("SendFirstTimesliceIndex" == reqStr) {
382 if (0 == fulFirstTsIndex) { //
383 GetNewTs();
384 }
385 if (!SendFirstTsIndex() && !fbEofFound) { //
386 return false;
387 }
388 return true;
389 }
390
391 if (fbNoSplitTs) {
392
393 if (!CreateAndSendFullTs() && !fbEofFound) {
395 if ("" != fsChannelNameCommands) {
397 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
398 SendCommand("STOP");
399 } // if( "" != fsChannelNameCommands )
400
401 return false;
402 } // if( !CreateAndSendFullTs( ts ) && !fbEofFound)
403 } // if( fbNoSplitTs )
404 else if (fbSendTsPerSysId) {
406 int iSysId = std::stoi(reqStr);
407 LOG(debug) << "Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec;
408
413 if ("" != fsChannelNameCommands) {
415 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
416 SendCommand("STOP");
417 } // if( "" != fsChannelNameCommands )
418
419 return false;
420 } // if(!CreateAndCombineComponentsPerSysId(iSysId) && !fbEofFound)
421 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
422 else if (fbSendTsPerBlock) {
423 LOG(debug) << "Received TS components block request from client: " << reqStr;
424
429 if ("" != fsChannelNameCommands) {
431 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
432 SendCommand("STOP");
433 } // if( "" != fsChannelNameCommands )
434
435 return false;
436 } // if( !CreateAndCombineComponentsPerChannel(reqStr) && !fbEofFound)
437 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
438
442 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
443 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
444 if ((fdMaxPublishTime < elapsedSeconds.count())
445 || (0 == fulMessageCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
446 if (!fbConfigSent) {
447 // Send the configuration only once per run!
449 } // if( !fbConfigSent )
450 else
452
453 fLastPublishTime = std::chrono::system_clock::now();
454 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulMessageCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
455
456 return true;
457}
458
459std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs()
460{
462 if (0 == fulTsCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) {
463 dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber();
464 } // if( 0 == fulTsCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) )
465
466 std::unique_ptr<fles::Timeslice> timeslice = fSource->get();
467 if (timeslice) {
469
470 const fles::Timeslice& ts = *timeslice;
471 uint64_t uTsIndex = ts.index();
472
473 if (0 == fulFirstTsIndex) { //
474 fulFirstTsIndex = ts.descriptor(0, 0).idx;
475 }
476
477 if (0 < fuPublishFreqTs) {
478 uint64_t uTsTime = ts.descriptor(0, 0).idx;
479 if (0 == fuStartTime) { //
480 fuStartTime = uTsTime;
481 } // if( 0 == fuStartTime )
482 fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9;
483 uint64_t uSizeMb = 0;
484
485 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
486 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
487 } // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp )
488
489
490 fhTsRate->Fill(fdTimeToStart);
491 fhTsSize->Fill(uSizeMb);
492 fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb);
493
495 if (0. == fdLastMaxTime) {
497 fdTsMaxSize = uSizeMb;
498 } // if( 0. == fdLastMaxTime )
499 else if (1. <= fdTimeToStart - fdLastMaxTime) {
502 fdTsMaxSize = uSizeMb;
503 } // else if if( 1 <= fdTimeToStart - fdLastMaxTime )
504 else if (fdTsMaxSize < uSizeMb) {
505 fdTsMaxSize = uSizeMb;
506 } // else if( fdTsMaxSize < uSizeMb )
507 } // if( 0 < fuPublishFreqTs )
508
510 if ((uTsIndex != (fulPrevTsIndex + 1)) && !(0 == fulPrevTsIndex && 0 == uTsIndex && 0 == fulTsCounter)) {
511 LOG(debug) << "Missed Timeslices. Old TS Index was " << fulPrevTsIndex << " New TS Index is " << uTsIndex
512 << " diff is " << uTsIndex - fulPrevTsIndex << " Missing are " << uTsIndex - fulPrevTsIndex - 1;
513
514 if ("" != fsChannelNameMissedTs) {
516 std::vector<uint64_t> vulMissedIndices;
517 if (0 == fulPrevTsIndex && 0 == fulTsCounter) {
519 vulMissedIndices.emplace_back(0);
520 }
522 for (uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
523 vulMissedIndices.emplace_back(ulMiss);
524 } // for( uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss )
525 if (!SendMissedTsIdx(vulMissedIndices)) {
527 if ("" != fsChannelNameCommands) {
529 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
530 SendCommand("STOP");
531 } // if( "" != fsChannelNameCommands )
532
533 return nullptr;
534 } // if( !SendMissedTsIdx( vulMissedIndices ) )
535 } // if( "" != fsChannelNameMissedTs )
536
537 if (0 < fuPublishFreqTs) {
538 fhMissedTS->Fill(1, uTsIndex - fulPrevTsIndex - 1);
539 fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fulPrevTsIndex - 1);
540 } // if( 0 < fuPublishFreqTs )
541
542 } // if( ( uTsIndex != ( fulPrevTsIndex + 1 ) ) && !( 0 == fulPrevTsIndex && 0 == uTsIndex ) )
543
544 if (0 < fuPublishFreqTs) {
545 fhMissedTS->Fill(0);
546 fhMissedTSEvo->Fill(fdTimeToStart, 0, 1);
547 } // else if( 0 < fuPublishFreqTs )
548
549 fulTsCounter++;
550 fulPrevTsIndex = uTsIndex;
551
552 if (fulTsCounter % 10000 == 0) { LOG(info) << "Received TS " << fulTsCounter << " with index " << uTsIndex; }
553
554 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
555 return timeslice;
556 } // if (fulTsCounter < fulMaxTimeslices)
557 else {
558 CalcRuntime();
559
561 if ("" != fsChannelNameCommands) {
563 std::this_thread::sleep_for(std::chrono::seconds(10));
564 std::string sCmd = "EOF ";
566 sCmd += " ";
568 SendCommand(sCmd);
569 } // if( "" != fsChannelNameCommands )
570
571 fbEofFound = true;
572
573 return nullptr;
574 } // else of if (fulTsCounter < fulMaxTimeslices)
575 } // if (timeslice)
576 else {
577 CalcRuntime();
578
580 if ("" != fsChannelNameCommands) {
582 std::this_thread::sleep_for(std::chrono::seconds(10));
583 std::string sCmd = "EOF ";
585 sCmd += " ";
587 SendCommand(sCmd);
588 } // if( "" != fsChannelNameCommands )
589
590 fbEofFound = true;
591
592 return nullptr;
593 } // else of if (timeslice)
594}
595
597{
599 while (fulHighWaterMark <= fdpTimesliceBuffer.size()) {
600 fdpTimesliceBuffer.pop_front();
601 fdbCompSentFlags.pop_front();
602 } // while( fulHighWaterMark <= fdpTimesliceBuffer.size() )
603
605 fdpTimesliceBuffer.push_back(GetNewTs());
606 if (nullptr == fdpTimesliceBuffer.back()) {
607 fdpTimesliceBuffer.pop_back();
608 return false;
609 } // if(nullptr == fdpTimesliceBuffer[fdpTimesliceBuffer.size() - 1])
610
613 if (fbSendTsPerBlock) { fdbCompSentFlags.push_back(std::vector<bool>(fvBlocksToSend.size(), false)); }
614 else {
615 fdbCompSentFlags.push_back(std::vector<bool>(fComponentActive.size(), false));
616 }
617 return true;
618}
619
621{
622 std::unique_ptr<fles::Timeslice> timeslice = GetNewTs();
623 if (timeslice) {
625 const fles::Timeslice& ts = *timeslice;
626 fles::StorableTimeslice fullTs {ts};
627 if (!SendData(fullTs)) {
629 if ("" != fsChannelNameCommands) {
631 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
632 SendCommand("STOP");
633 } // if( "" != fsChannelNameCommands )
634
635 return false;
636 } // if (!SendData(fullTs, uChanIdx))
637 return true;
638 } // if (timeslice)
639 else {
640 return false;
641 } // else of if (timeslice)
642}
643
645{
646 if (false == fbListCompPerSysIdReady) {
649 if (0 == fdpTimesliceBuffer.size()) {
650 if (!AddNewTsInBuffer()) return false;
651 } // if( 0 == fdpTimesliceBuffer.size() )
652
653 if (nullptr == fdpTimesliceBuffer.front()) return false;
654
655 for (uint32_t uCompIdx = 0; uCompIdx < fdpTimesliceBuffer.front()->num_components(); ++uCompIdx) {
656 uint16_t usMsSysId = fdpTimesliceBuffer.front()->descriptor(uCompIdx, 0).sys_id;
657
658 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usMsSysId);
659 if (fSysId.end() != pos) {
660 const vector<std::string>::size_type idx = pos - fSysId.begin();
661
662 fvvCompPerSysId[idx].push_back(uCompIdx);
663 } // if( fSysId.end() != pos )
664 } // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )
665
666 for (uint32_t uSysIdx = 0; uSysIdx < fComponents.size(); ++uSysIdx) {
667 std::stringstream ss;
668 ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex
669 << std::setw(2) << fSysId[uSysIdx] << std::dec << " :";
670
671 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
672 ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp];
673 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
674
675 LOG(info) << ss.str();
676 } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId )
677
679 } // if( false == fbListCompPerSysIdReady )
680
681 return true;
682}
684{
687 if (!PrepareCompListPerSysId()) return false;
688
690 const vector<std::string>::const_iterator pos = std::find(fComponents.begin(), fComponents.end(), sSystemName);
691 if (fComponents.end() != pos) {
692 const vector<std::string>::size_type idx = pos - fComponents.begin();
693 return CreateCombinedComponentsPerSysId(static_cast<uint32_t>(idx));
694 } // if (fComponents.end() != pos)
695 else {
696 LOG(error) << "Did not find " << sSystemName << " in the list of known systems";
697 return false;
698 } // else of if (fComponents.end() != pos)
699}
701{
704 if (!PrepareCompListPerSysId()) return false;
705
707 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
708 if (fSysId.end() != pos) {
709 const vector<int>::size_type idx = pos - fSysId.begin();
710 return CreateCombinedComponentsPerSysId(static_cast<uint32_t>(idx));
711 } // if (fSysId.end() != pos)
712 else {
713 LOG(error) << "Did not find 0x" << std::hex << iSysId << std::dec << " in the list of known systems";
714 return false;
715 } // else of if (fSysId.end() != pos)
716}
718{
720 LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uCompIndex] << std::dec;
721
722 if (0 < fvvCompPerSysId[uCompIndex].size()) {
724 uint32_t uTsIndex = 0;
725 for (; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex) {
726 if (false == fdbCompSentFlags[uTsIndex][uCompIndex]) break;
727 } // for( ; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex )
728
730 if (fdpTimesliceBuffer.size() == uTsIndex) {
731 --uTsIndex;
732 if (!AddNewTsInBuffer()) return false;
733 } // if( fdpTimesliceBuffer.size() == uTsIndex )
734
736 fles::StorableTimeslice component {static_cast<uint32_t>(fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
737 fdpTimesliceBuffer[uTsIndex]->index()};
738
739 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uCompIndex].size(); ++uComp) {
740 uint32_t uNumMsInComp = fdpTimesliceBuffer[uTsIndex]->num_microslices(fvvCompPerSysId[uCompIndex][uComp]);
741 component.append_component(uNumMsInComp);
742
743 LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uCompIndex] << std::dec << " TS "
744 << fdpTimesliceBuffer[uTsIndex]->index() << " Comp " << fvvCompPerSysId[uCompIndex][uComp];
745
746 for (size_t m = 0; m < uNumMsInComp; ++m) {
747 component.append_microslice(uComp, m,
748 fdpTimesliceBuffer[uTsIndex]->descriptor(fvvCompPerSysId[uCompIndex][uComp], m),
749 fdpTimesliceBuffer[uTsIndex]->content(fvvCompPerSysId[uCompIndex][uComp], m));
750 } // for( size_t m = 0; m < uNumMsInComp; ++m )
751 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uCompIndex ].size(); ++uComp )
752
753 LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uCompIndex] << std::dec << " with "
754 << component.num_components() << " components";
755
756 if (!SendData(component)) return false;
757
758 fdbCompSentFlags[uTsIndex][uCompIndex] = true;
759 } // if (0 < fvvCompPerSysId[uCompIndex].size())
760
761 return true;
762}
763
765{
766 if (false == fbListCompPerBlockReady) {
768 if (!PrepareCompListPerSysId()) return false;
769
771 for (auto itBlock = fvBlocksToSend.begin(); itBlock != fvBlocksToSend.end(); ++itBlock) {
772 auto uBlockIdx = itBlock - fvBlocksToSend.begin();
773
774 for (auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys) {
776 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), *itSys);
777 if (fSysId.end() != pos) {
778 const vector<int>::size_type idxSys = pos - fSysId.begin();
779
781 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[idxSys].size(); ++uComp) {
782 fvvCompPerBlock[uBlockIdx].push_back(fvvCompPerSysId[idxSys][uComp]);
783 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ idxSys ].size(); ++uComp )
784 } // if (fSysId.end() != pos)
785 else {
786 LOG(error) << "Error when building the components list for block " << itBlock->first;
787 LOG(error) << "Did not find 0x" << std::hex << *itSys << std::dec << " in the list of known systems";
788 return false;
789 } // else of if (fSysId.end() != pos)
790 } // for( auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys )
791 } // for( auto itBlock = fvBlocksToSend.begin(); itBlock != fvBlocksToSend.end(); ++itBlock)
792
794 } // if( false == fbListCompPerBlockReady )
795
796 return true;
797}
798
800{
803 if (!PrepareCompListPerBlock()) return false;
804
806 for (auto itKnownBlock = fvBlocksToSend.begin(); itKnownBlock != fvBlocksToSend.end(); ++itKnownBlock) {
807 if ((*itKnownBlock).first == sBlockName) {
808 auto uBlockIdx = itKnownBlock - fvBlocksToSend.begin();
809
811 uint32_t uTsIndex = 0;
812 for (; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex) {
813 if (false == fdbCompSentFlags[uTsIndex][uBlockIdx]) break;
814 } // for( ; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex )
815
817 if (fdpTimesliceBuffer.size() == uTsIndex) {
818 --uTsIndex;
819 if (!AddNewTsInBuffer()) return false;
820 } // if( fdpTimesliceBuffer.size() == uTsIndex )
821
823 fles::StorableTimeslice component {static_cast<uint32_t>(fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
824 fdpTimesliceBuffer[uTsIndex]->index()};
825
826 for (uint32_t uComp = 0; uComp < fvvCompPerBlock[uBlockIdx].size(); ++uComp) {
827 uint32_t uNumMsInComp = fdpTimesliceBuffer[uTsIndex]->num_microslices(fvvCompPerBlock[uBlockIdx][uComp]);
828 component.append_component(uNumMsInComp);
829
830 LOG(debug) << "Add components to TS for Block " << sBlockName << " TS " << fdpTimesliceBuffer[uTsIndex]->index()
831 << " Comp " << fvvCompPerBlock[uBlockIdx][uComp];
832
833 for (size_t m = 0; m < uNumMsInComp; ++m) {
834 component.append_microslice(uComp, m,
835 fdpTimesliceBuffer[uTsIndex]->descriptor(fvvCompPerBlock[uBlockIdx][uComp], m),
836 fdpTimesliceBuffer[uTsIndex]->content(fvvCompPerBlock[uBlockIdx][uComp], m));
837 } // for( size_t m = 0; m < uNumMsInComp; ++m )
838 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uCompIndex ].size(); ++uComp )
839
840 LOG(debug) << "Prepared timeslice for Block " << sBlockName << " with " << component.num_components()
841 << " components";
842
843 if (!SendData(component)) return false;
844
845 fdbCompSentFlags[uTsIndex][uBlockIdx] = true;
846 return true;
847 } // if( (*itKnownBlock).first == sBlockName )
848 } // for( auto itKnownBlock = fvBlocksToSend.begin(); itKnownBlock != fvBlocksToSend.end(); ++itKnownBlock)
849
851 LOG(error) << "Requested block " << sBlockName << " not found in the list of known blocks";
852 return false;
853}
854
856{
857 // create the message with the first timeslice index
858 std::string sIndex = FormatDecPrintout(fulFirstTsIndex);
859 // serialize the vector and create the message
860 std::stringstream oss;
861 boost::archive::binary_oarchive oa(oss);
862 oa << sIndex;
863 std::string* strMsg = new std::string(oss.str());
864
865 FairMQMessagePtr msg(NewMessage(
866 const_cast<char*>(strMsg->c_str()), // data
867 strMsg->length(), // size
868 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
869 strMsg)); // object that manages the data
870
871 // in case of error or transfer interruption,
872 // return false to go to IDLE state
873 // successfull transfer will return number of bytes
874 // transfered (can be 0 if sending an empty message).
875 if (Send(msg, fsChannelNameTsRequest) < 0) {
876 LOG(error) << "Problem sending reply with first TS index";
877 return false;
878 }
879
881 LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize();
882
883 return true;
884}
885bool CbmMQTsSamplerRepReq::SendData(const fles::StorableTimeslice& component)
886{
887 // serialize the timeslice and create the message
888 std::stringstream oss;
889 boost::archive::binary_oarchive oa(oss);
890 oa << component;
891 std::string* strMsg = new std::string(oss.str());
892
893 FairMQMessagePtr msg(NewMessage(
894 const_cast<char*>(strMsg->c_str()), // data
895 strMsg->length(), // size
896 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
897 strMsg)); // object that manages the data
898
899 // in case of error or transfer interruption,
900 // return false to go to IDLE state
901 // successfull transfer will return number of bytes
902 // transfered (can be 0 if sending an empty message).
903 if (Send(msg, fsChannelNameTsRequest) < 0) {
904 LOG(error) << "Problem sending data";
905 return false;
906 }
907
909 LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize();
910
911 return true;
912}
913bool CbmMQTsSamplerRepReq::SendMissedTsIdx(std::vector<uint64_t> vIndices)
914{
915 // serialize the vector and create the message
916 std::stringstream oss;
917 boost::archive::binary_oarchive oa(oss);
918 oa << vIndices;
919 std::string* strMsg = new std::string(oss.str());
920
921 FairMQMessagePtr msg(NewMessage(
922 const_cast<char*>(strMsg->c_str()), // data
923 strMsg->length(), // size
924 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
925 strMsg)); // object that manages the data
926
927 // in case of error or transfer interruption,
928 // return false to go to IDLE state
929 // successfull transfer will return number of bytes
930 // transfered (can be 0 if sending an empty message).
931 LOG(debug) << "Send data to channel " << fsChannelNameMissedTs;
932 if (Send(msg, fsChannelNameMissedTs) < 0) {
933 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs;
934 return false;
935 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
936
937 return true;
938}
939bool CbmMQTsSamplerRepReq::SendCommand(std::string sCommand)
940{
941 // serialize the vector and create the message
942 std::stringstream oss;
943 boost::archive::binary_oarchive oa(oss);
944 oa << sCommand;
945 std::string* strMsg = new std::string(oss.str());
946
947 FairMQMessagePtr msg(NewMessage(
948 const_cast<char*>(strMsg->c_str()), // data
949 strMsg->length(), // size
950 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
951 strMsg)); // object that manages the data
952
953 // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data
954 // sCommand.length(), // size
955 // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
956 // &sCommand ) ); // object that manages the data
957
958 // in case of error or transfer interruption,
959 // return false to go to IDLE state
960 // successfull transfer will return number of bytes
961 // transfered (can be 0 if sending an empty message).
962 LOG(debug) << "Send data to channel " << fsChannelNameCommands;
963 if (Send(msg, fsChannelNameCommands) < 0) {
964 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands;
965 return false;
966 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
967
968 return true;
969}
971{
973 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
974 FairMQMessagePtr messageHeader(NewMessage());
975 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
976 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
977
978 FairMQParts partsOut;
979 partsOut.AddPart(std::move(messageHeader));
980
981 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
983 FairMQMessagePtr messageHist(NewMessage());
984 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
985 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
986
987 partsOut.AddPart(std::move(messageHist));
988 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
989
992 if (0 == fvpsHistosFolder.size()) {
993 FairMQMessagePtr messageHist(NewMessage());
994 partsOut.AddPart(std::move(messageHist));
995 }
996
997 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
999 FairMQMessagePtr messageCan(NewMessage());
1000 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
1001 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
1002
1003 partsOut.AddPart(std::move(messageCan));
1004 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
1005
1008 if (0 == fvpsCanvasConfig.size()) {
1009 FairMQMessagePtr messageHist(NewMessage());
1010 partsOut.AddPart(std::move(messageHist));
1011 }
1012
1014 FairMQMessagePtr msgHistos(NewMessage());
1015 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
1016 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
1017
1018 partsOut.AddPart(std::move(msgHistos));
1019
1021 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
1022 LOG(error) << "CbmMQTsSamplerRepReq::SendHistoConfAndData => Problem sending data";
1023 return false;
1024 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
1025
1028
1029 return true;
1030}
1031
1033{
1035 FairMQMessagePtr message(NewMessage());
1036 // Serialize<RootSerializer>(*message, &fArrayHisto);
1037 RootSerializer().Serialize(*message, &fArrayHisto);
1038
1040 if (Send(message, fsChannelNameHistosInput) < 0) {
1041 LOG(error) << "Problem sending data";
1042 return false;
1043 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
1044
1047
1048 return true;
1049}
1050
1052{
1053 fhTsRate->Reset();
1054 fhTsSize->Reset();
1055 fhTsSizeEvo->Reset();
1056 fhTsMaxSizeEvo->Reset();
1057 fhMissedTS->Reset();
1058 fhMissedTSEvo->Reset();
1059
1060 return true;
1061}
1062
1064
1066{
1067 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
1068
1069 LOG(info) << "Runtime: " << run_time.count();
1070 LOG(info) << "No more input data";
1071}
std::string GenerateCanvasConfigString(TCanvas *pCanv)
std::string FormatDecPrintout(uint64_t ulVal, char cFill, uint uWidth)
static constexpr size_t size()
Definition KfSimdPseudo.h:2
bool first
Generates beam ions for transport simulation.
std::chrono::system_clock::time_point fLastPublishTime
std::deque< std::vector< bool > > fdbCompSentFlags
bool CreateCombinedComponentsPerBlock(std::string sBlockName)
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::deque< std::unique_ptr< fles::Timeslice > > fdpTimesliceBuffer
Buffering of partially sent timeslices, limited by fulHighWaterMark.
std::vector< std::vector< uint32_t > > fvvCompPerSysId
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
std::vector< int > fSysId
bool HandleRequest(FairMQMessagePtr &, int)
std::vector< std::string > fComponents
std::vector< bool > fComponentActive
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::vector< std::vector< uint32_t > > fvvCompPerBlock
bool fbEofFound
Flag indicating the EOF was reached to avoid sending an emergency STOP.
std::unique_ptr< fles::Timeslice > GetNewTs()
std::vector< std::pair< std::string, std::set< uint16_t > > > fvBlocksToSend
bool SendData(const fles::StorableTimeslice &component)
std::vector< std::string > fvsInputFileList
List of input files.
bool CreateCombinedComponentsPerSysId(std::string sSystemName)
std::chrono::steady_clock::time_point fTime
fles::TimesliceSource * fSource
bool SendCommand(std::string sCommand)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.