CbmRoot
Loading...
Searching...
No Matches
CbmMQTsSamplerRepReq.cxx
Go to the documentation of this file.
1/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
6
9
10#include "TimesliceInputArchive.hpp"
11#include "TimesliceMultiInputArchive.hpp"
12#include "TimesliceMultiSubscriber.hpp"
13#include "TimesliceSubscriber.hpp"
14
15#include "FairMQLogger.h"
16#include "FairMQProgOptions.h" // device->fConfig
17
18#include <TCanvas.h>
19#include <TH1F.h>
20#include <TH1I.h>
21#include <TProfile.h>
22
23#include "BoostSerializer.h"
24#include <boost/algorithm/string.hpp>
25#include <boost/archive/binary_oarchive.hpp>
26//#include <boost/filesystem.hpp>
27#include <boost/regex.hpp>
28#include <boost/serialization/utility.hpp>
29
30#include "RootSerializer.h"
31
32//namespace filesys = boost::filesystem;
33
34#include <thread> // this_thread::sleep_for
35
36#include <algorithm>
37#include <chrono>
38#include <ctime>
39#include <iomanip>
40#include <sstream>
41#include <string>
42
43#include <stdio.h>
44
45using namespace std;
46
47#include <stdexcept>
48
// Exception type thrown while the device options are being validated.
// It adds nothing to std::runtime_error except a distinct type, so that the
// initialisation code can catch configuration problems separately.
struct InitTaskError : public std::runtime_error {
  // Re-use every std::runtime_error constructor (message as std::string or const char*).
  using std::runtime_error::runtime_error;
};
52
// Member-initializer list and (empty) body of a constructor; the signature line is absent from this
// listing, presumably CbmMQTsSamplerRepReq::CbmMQTsSamplerRepReq() - TODO confirm.
// Default-constructs the FairMQDevice base and fTime, and seeds fLastPublishTime with the current
// system-clock time.
54 : FairMQDevice()
55 , fTime()
56 , fLastPublishTime {std::chrono::system_clock::now()}
57{
58}
59
// Function-try-block of the device initialisation step. The signature line is absent from this listing;
// the "Error during InitTask" message below suggests this is InitTask() - TODO confirm.
//
// Steps visible here:
//  1. read all options from fConfig,
//  2. resolve the TS splitting mode (no-split / per-SysId / per-block), falling back to no-split,
//  3. parse the "block-sysid" pairs ("<block name>:<SysId>") into fvBlocksToSend,
//  4. validate the input combination (file, file + directory, host + port, host string),
//  5. create the timeslice source and record the start time in fTime.
// An InitTaskError or boost::bad_any_cast is logged and triggers the ErrorFound transition.
61try {
62 // Get the values from the command line options (via fConfig)
63 fsFileName = fConfig->GetValue<string>("filename");
64 fsDirName = fConfig->GetValue<string>("dirname");
65 fsHost = fConfig->GetValue<string>("fles-host");
66 fusPort = fConfig->GetValue<uint16_t>("fles-port");
67 fulHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
68 fulMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
69 fsChannelNameTsRequest = fConfig->GetValue<std::string>("ChNameTsReq");
70 fbNoSplitTs = fConfig->GetValue<bool>("no-split-ts");
71 fbSendTsPerSysId = fConfig->GetValue<bool>("send-ts-per-sysid");
72 fbSendTsPerBlock = fConfig->GetValue<bool>("send-ts-per-block");
73 fsChannelNameMissedTs = fConfig->GetValue<std::string>("ChNameMissTs");
74 fsChannelNameCommands = fConfig->GetValue<std::string>("ChNameCmds");
75 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
76 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
77 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
78 fsHistosSuffix = fConfig->GetValue<std::string>("HistosSuffix");
79 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
80
// Resolve the splitting mode. Only warnings are printed for conflicting flags here: the flags themselves
// are left untouched, and later code tests them in the order no-split, per-SysId, per-block.
81 if (fbNoSplitTs) {
82 if (fbSendTsPerSysId) {
83 if (fbSendTsPerBlock) {
84 LOG(warning) << "Both no-split-ts, send-ts-per-sysid and send-ts-per-block options used => "
85 << " second and third one will be ignored!!!!";
86 } // if( fbSendTsPerBlock )
87 else
88 LOG(warning) << "Both no-split-ts and send-ts-per-sysid options used => "
89 << " second one will be ignored!!!!";
90 } // if( fbSendTsPerSysId )
91 else if (fbSendTsPerBlock) {
92 LOG(warning) << "Both no-split-ts and send-ts-per-block options used => "
93 << " second one will be ignored!!!!";
94 } // else if( fbSendTsPerSysId ) of if( fbSendTsPerSysId )
95 else
96 LOG(debug) << "Running in no-split-ts mode!";
97 } // if( fbNoSplitTs )
98 else if (fbSendTsPerBlock) {
99 if (fbSendTsPerSysId) {
100 LOG(warning) << "Both send-ts-per-sysid and send-ts-per-block options used => "
101 << " second one will be ignored!!!!";
102 } // if (fbSendTsPerSysId)
103 else
104 LOG(debug) << "Running in send-ts-per-block mode!";
105 } // else if( fbSendTsPerBlock ) of if( fbNoSplitTs )
106 else if (fbSendTsPerSysId) {
107 LOG(debug) << "Running in send-ts-per-sysid mode!";
108 } // else if (fbSendTsPerSysId) else if( fbSendTsPerBlock ) of if( fbNoSplitTs )
109 else {
// No mode requested at all: fall back to sending full timeslices.
110 LOG(debug) << "Running in no-split-ts mode by default!";
111 fbNoSplitTs = true;
112 } // else of else if (fbSendTsPerSysId) else if( fbSendTsPerBlock ) of if( fbNoSplitTs )
113
// Parse each "<block name>:<SysId>" option; the SysId may be decimal or contain a "0x" hex prefix.
115 std::vector<std::string> vSysIdBlockPairs = fConfig->GetValue<std::vector<std::string>>("block-sysid");
116 for (uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair) {
117 const size_t sep = vSysIdBlockPairs[uPair].find(':');
// NOTE(review): find() returns either npos or an index below size(), so the last test can never be
// true; a trailing ':' (empty SysId) is therefore not rejected here - possibly size() - 1 was intended.
118 if (string::npos == sep || 0 == sep || vSysIdBlockPairs[uPair].size() == sep) {
119 LOG(info) << vSysIdBlockPairs[uPair];
120 throw InitTaskError("Provided pair of Block name + SysId is missing a : or an argument.");
121 } // if( string::npos == sep || 0 == sep || vSysIdBlockPairs[ uPair ].size() == sep )
122
// Text before the separator is the block name.
124 std::string sBlockName = vSysIdBlockPairs[uPair].substr(0, sep);
125
// Text after the separator is the SysId.
// NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range on bad input; neither is
// handled by the two catch clauses visible at the end of this block. The int result is also narrowed
// to uint16_t without a range check.
128 std::string sSysId = vSysIdBlockPairs[uPair].substr(sep + 1);
129 const size_t hexPos = sSysId.find("0x");
130 uint16_t usSysId;
131 if (string::npos == hexPos) usSysId = std::stoi(sSysId);
132 else
133 usSysId = std::stoi(sSysId.substr(hexPos + 2), nullptr, 16);
134
135 LOG(debug) << "Extracted block info from pair \"" << vSysIdBlockPairs[uPair] << "\": name is " << sBlockName
136 << " and SysId is " << usSysId << " extracted from " << sSysId;
137
// The SysId must be listed in fSysId and may be claimed by one block only; fComponentActive records
// which entries of fSysId have already been claimed.
139 uint32_t uSysIdIdx = 0;
140 for (; uSysIdIdx < fSysId.size() && fSysId[uSysIdIdx] != usSysId; ++uSysIdIdx) {}
141 if (uSysIdIdx == fSysId.size()) { throw InitTaskError("Unknown System ID for " + vSysIdBlockPairs[uPair]); }
142 else if (true == fComponentActive[uSysIdIdx]) {
143 throw InitTaskError("System ID already in use by another block for " + vSysIdBlockPairs[uPair]);
144 }
145 fComponentActive[uSysIdIdx] = true;
146
// Append the SysId to an existing block with the same name, or create a new block entry together with
// its (still empty) component list.
148 auto itBlock = fvBlocksToSend.begin();
149 for (; itBlock != fvBlocksToSend.end(); ++itBlock) {
150 if ((*itBlock).first == sBlockName) break;
151 } // for( ; itBlock != fvBlocksToSend.end(); ++itBlock)
152 if (fvBlocksToSend.end() != itBlock) {
154 (*itBlock).second.insert(usSysId);
155 } // if( fvBlocksToSend.end() != itBlock )
156 else {
158 fvBlocksToSend.push_back(std::pair<std::string, std::set<uint16_t>>(sBlockName, {usSysId}));
159 fvvCompPerBlock.push_back(std::vector<uint32_t>({}));
160 } // else of if( fSysId.end() != pos )
161
162 LOG(info) << vSysIdBlockPairs[uPair] << " Added SysId 0x" << std::hex << usSysId << std::dec << " to "
163 << sBlockName;
164 } // for( uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair )
165
// A limit of 0 is replaced by UINT_MAX.
// NOTE(review): UINT_MAX is the maximum of unsigned int, while the option is read as uint64_t above.
166 if (0 == fulMaxTimeslices) fulMaxTimeslices = UINT_MAX;
167
168 // Check which input is defined
169 // Possibilities:
170 // filename && ! dirname : single file
171 // filename with wildcards && dirname : all files with filename regex in the directory
172 // host && port : connect to the flesnet server
// A fourth accepted case (host without port) is handled below as a complete "host string".
// Both file cases queue the file name as given; the directory is handed to the archive further down.
173 bool isGoodInputCombi {false};
174 if (0 != fsFileName.size() && 0 == fsDirName.size() && 0 == fsHost.size() && 0 == fusPort) {
175 isGoodInputCombi = true;
176 fvsInputFileList.push_back(fsFileName);
177 }
178 else if (0 != fsFileName.size() && 0 != fsDirName.size() && 0 == fsHost.size() && 0 == fusPort) {
179 isGoodInputCombi = true;
180 fvsInputFileList.push_back(fsFileName);
181 }
182 else if (0 == fsFileName.size() && 0 == fsDirName.size() && 0 != fsHost.size() && 0 != fusPort) {
183 isGoodInputCombi = true;
184 LOG(info) << "Host: " << fsHost;
185 LOG(info) << "Port: " << fusPort;
186 }
187 else if (0 == fsFileName.size() && 0 == fsDirName.size() && 0 != fsHost.size() && 0 == fusPort) {
188 isGoodInputCombi = true;
189 LOG(info) << "Host string: " << fsHost;
190 }
191 else {
192 isGoodInputCombi = false;
193 }
194
195 if (!isGoodInputCombi) {
196 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
197 "or host + port are allowed combination.");
198 }
199
200 LOG(info) << "MaxTimeslices: " << fulMaxTimeslices;
201
// Create the timeslice source matching the validated input combination.
202 if (0 == fsFileName.size() && 0 != fsHost.size() && 0 != fusPort) {
203 // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
204 //std::string connector = "tcp://" + fsHost + ":" + std::to_string(fusPort);
205 std::string connector = fsHost + ":" + std::to_string(fusPort);
206 LOG(info) << "Open TSPublisher at " << connector;
207 fSource = new fles::TimesliceMultiSubscriber(connector, fulHighWaterMark);
208 }
209 else if (0 == fsFileName.size() && 0 != fsHost.size()) {
210 std::string connector = fsHost;
211 LOG(info) << "Open TSPublisher with host string: " << connector;
212 fSource = new fles::TimesliceMultiSubscriber(connector, fulHighWaterMark);
213 }
214 else {
215 // Create a ";" separated string with all file names
216 std::string fileList {""};
217 for (const auto& obj : fvsInputFileList) {
218 std::string fileName = obj;
219 fileList += fileName;
220 fileList += ";";
221 }
// The validation above guarantees at least one queued file on this path, so fileList is not empty here.
222 fileList.pop_back(); // Remove the last ;
223 LOG(info) << "Input File String: " << fileList;
224 fSource = new fles::TimesliceMultiInputArchive(fileList, fsDirName);
// NOTE(review): a plain new-expression throws on failure instead of returning null, so this check looks
// unreachable; the message probably meant "Could not open ..." - confirm before changing the string.
225 if (!fSource) { throw InitTaskError("Could open files from file list."); }
226 }
227
228 LOG(info) << "High-Water Mark: " << fulHighWaterMark;
229 LOG(info) << "Max. Timeslices: " << fulMaxTimeslices;
230 if (fbNoSplitTs) { LOG(info) << "Sending TS copies in no-split mode"; } // if( fbNoSplitTs )
231 else if (fbSendTsPerSysId) {
232 LOG(info) << "Sending components in separate TS per SysId";
233 } // else if( fbSendTsPerSysId ) of if( fbNoSplitTs )
234 else if (fbSendTsPerBlock) {
235 LOG(info) << "Sending components in separate TS per block (multiple SysId)";
236 } // else if( fbSendTsPerBlock ) of if( fbSendTsPerSysId ) of if( fbNoSplitTs )
237
239
// Reference time point; CalcRuntime() is called at end of input, presumably using it - TODO confirm.
240 fTime = std::chrono::steady_clock::now();
241}
242catch (InitTaskError& e) {
243 LOG(error) << e.what();
244 ChangeState(fair::mq::Transition::ErrorFound);
245}
246catch (boost::bad_any_cast& e) {
247 LOG(error) << "Error during InitTask: " << e.what();
248 ChangeState(fair::mq::Transition::ErrorFound);
249}
250
// Body of the histogram set-up routine. The signature line is absent from this listing; the call in
// HandleRequest suggests this is InitHistograms() - TODO confirm.
// Creates the six sampler monitoring histograms and a 2x3 summary canvas, then stores
//  - the histogram pointers in fArrayHisto,
//  - (histogram name, folder) pairs in fvpsHistosFolder,
//  - (canvas name, canvas config string) pairs in fvpsCanvasConfig.
// Always returns true.
252{
253 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
254 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
255 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
256 if ("" != fsHistosSuffix) { //
257 LOG(info) << "Suffix added to folders, histograms and canvas names: " << fsHistosSuffix;
258 }
259
// Local lists of (object, destination folder) used for the registration loops at the end.
261 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
263 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
264
// Histogram booking; every name gets fsHistosSuffix appended.
266 /* clang-format off */
267 fhTsRate = new TH1I(Form("TsRate%s", fsHistosSuffix.data()),
268 "TS rate; t [s]",
269 1800, 0., 1800.);
270 fhTsSize = new TH1I(Form("TsSize%s", fsHistosSuffix.data()),
271 "Size of TS; Size [MB]",
272 15000, 0., 15000.);
273 fhTsSizeEvo = new TProfile(Form("TsSizeEvo%s", fsHistosSuffix.data()),
274 "Evolution of the TS Size; t [s]; Mean size [MB]",
275 1800, 0., 1800.);
276 fhTsMaxSizeEvo = new TH1F(Form("TsMaxSizeEvo%s", fsHistosSuffix.data()),
277 "Evolution of maximal TS Size; t [s]; Max size [MB]",
278 1800, 0., 1800.);
279 fhMissedTS = new TH1I(Form("MissedTs%s", fsHistosSuffix.data()),
280 "Missed TS",
281 2, -0.5, 1.5);
282 fhMissedTSEvo = new TProfile(Form("MissedTsEvo%s", fsHistosSuffix.data()),
283 "Missed TS evolution; t [s]",
284 1800, 0., 1800.);
285 /* clang-format on */
286
// All histograms go to a single folder named "Sampler" + suffix.
288 std::string sFolder = std::string("Sampler") + fsHistosSuffix;
289 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsRate, sFolder));
290 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSize, sFolder));
291 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsSizeEvo, sFolder));
292 vHistos.push_back(std::pair<TNamed*, std::string>(fhTsMaxSizeEvo, sFolder));
293 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTS, sFolder));
294 vHistos.push_back(std::pair<TNamed*, std::string>(fhMissedTSEvo, sFolder));
295
// Summary canvas: 2 columns x 3 rows, one pad per histogram, grid lines on every pad.
297 Double_t w = 10;
298 Double_t h = 10;
299 fcSummary = new TCanvas(Form("cSampSummary%s", fsHistosSuffix.data()), "Sampler monitoring plots", w, h);
300 fcSummary->Divide(2, 3);
301
302 fcSummary->cd(1);
303 gPad->SetGridx();
304 gPad->SetGridy();
305 fhTsRate->Draw("hist");
306
// The TS size pad is the only one drawn with logarithmic x and y axes.
307 fcSummary->cd(2);
308 gPad->SetGridx();
309 gPad->SetGridy();
310 gPad->SetLogx();
311 gPad->SetLogy();
312 fhTsSize->Draw("hist");
313
314 fcSummary->cd(3);
315 gPad->SetGridx();
316 gPad->SetGridy();
317 fhTsSizeEvo->Draw("hist");
318
319 fcSummary->cd(4);
320 gPad->SetGridx();
321 gPad->SetGridy();
322 fhTsMaxSizeEvo->Draw("hist");
323
324 fcSummary->cd(5);
325 gPad->SetGridx();
326 gPad->SetGridy();
327 fhMissedTS->Draw("hist");
328
329 fcSummary->cd(6);
330 gPad->SetGridx();
331 gPad->SetGridy();
332 fhMissedTSEvo->Draw("el");
333
335 vCanvases.push_back(std::pair<TCanvas*, std::string>(fcSummary, std::string("canvases") + fsHistosSuffix));
336
// Register every histogram and remember its (name, folder) configuration pair.
341 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
342 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
343 // << " in " << vHistos[ uHisto ].second.data()
344 // ;
345 fArrayHisto.Add(vHistos[uHisto].first);
346 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
347 fvpsHistosFolder.push_back(psHistoConfig);
348
349 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
350 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
351
// Store (canvas name, generated config string) for every canvas.
// NOTE(review): the folder string stored in vCanvases is not used by this loop.
355 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
356 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
357 // << " in " << vCanvases[ uCanv ].second.data();
358 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
359 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
360
361 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
362
363 fvpsCanvasConfig.push_back(psCanvConfig);
364
365 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
366 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
367
368 return true;
369}
370
// Handles one request message.
//   msgReq : request whose payload is interpreted as a string (see reqStr below).
//   (int)  : unnamed second parameter, not used in this body.
// Returns false when a reply could not be produced or sent and end-of-input was not reached; true otherwise,
// including when end-of-input was already flagged.
// NOTE(review): several lines of this function are absent from this listing (gaps in the embedded
// numbering), among them the conditions of the per-SysId and per-block branches and the histogram
// publication calls. Read the notes below before relying on the visible control flow.
371bool CbmMQTsSamplerRepReq::HandleRequest(FairMQMessagePtr& msgReq, int)
372{
// Histograms are created on the first request, and only when publication is enabled.
374 if (0 < fuPublishFreqTs && 0 == fulTsCounter) { InitHistograms(); } // if( 0 < fuPublishFreqTs )
375
// After end-of-input nothing more is done with the request.
376 if (fbEofFound) {
378 return true;
379 }
380
// The request payload is copied into a string without assuming NUL termination.
382 std::string reqStr(static_cast<char*>(msgReq->GetData()), msgReq->GetSize());
383 if ("SendFirstTimesliceIndex" == reqStr) {
// A first TS index of 0 is treated as "not yet known": one timeslice is fetched to obtain it.
// NOTE(review): the timeslice returned by GetNewTs() is discarded here.
384 if (0 == fulFirstTsIndex) { //
385 GetNewTs();
386 }
387 if (!SendFirstTsIndex() && !fbEofFound) { //
388 return false;
389 }
390 return true;
391 }
392
393 if (fbNoSplitTs) {
394
395 if (!CreateAndSendFullTs() && !fbEofFound) {
// On failure, wait one second and send "STOP" on the command channel if one is configured.
397 if ("" != fsChannelNameCommands) {
399 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
400 SendCommand("STOP");
401 } // if( "" != fsChannelNameCommands )
402
403 return false;
404 } // if( !CreateAndSendFullTs( ts ) && !fbEofFound)
405 } // if( fbNoSplitTs )
406 else if (fbSendTsPerSysId) {
// In this mode the request string is parsed as a decimal SysId.
// NOTE(review): std::stoi throws on non-numeric input and no handler is visible in this function.
408 int iSysId = std::stoi(reqStr);
409 LOG(debug) << "Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec;
410
// NOTE(review): the line opening the failure branch closed at 422 is missing from this listing; per the
// closing comment it tests a per-SysId "create and combine" call together with !fbEofFound.
415 if ("" != fsChannelNameCommands) {
417 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
418 SendCommand("STOP");
419 } // if( "" != fsChannelNameCommands )
420
421 return false;
422 } // if(!CreateAndCombineComponentsPerSysId(iSysId) && !fbEofFound)
423 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs
424 else if (fbSendTsPerBlock) {
// In this mode the request string is the block name.
425 LOG(debug) << "Received TS components block request from client: " << reqStr;
426
// NOTE(review): as above, the line opening the failure branch closed at 438 is missing from this listing.
431 if ("" != fsChannelNameCommands) {
433 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
434 SendCommand("STOP");
435 } // if( "" != fsChannelNameCommands )
436
437 return false;
438 } // if( !CreateAndCombineComponentsPerChannel(reqStr) && !fbEofFound)
439 } // else if( fbSendTsPerSysId && fbSendTsPerSysId ) of if( fbNoSplitTs )
440
// Publication trigger: either more than fdMaxPublishTime seconds have elapsed since the last publication,
// or the message counter is a multiple of fuPublishFreqTs and more than fdMinPublishTime seconds elapsed.
// NOTE(review): fuPublishFreqTs == 0 is treated as a legal value at the top of this function; unless a
// missing line guards it, the modulo below is then a division by zero whenever the first test is false.
444 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
445 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
446 if ((fdMaxPublishTime < elapsedSeconds.count())
447 || (0 == fulMessageCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
448 if (!fbConfigSent) {
449 // Send the configuration only once per run!
// NOTE(review): the statements of both branches (listing lines 450 and 453) are missing here; as printed,
// the else would bind to the fLastPublishTime assignment, which is presumably not the real code.
451 } // if( !fbConfigSent )
452 else
454
455 fLastPublishTime = std::chrono::system_clock::now();
456 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulMessageCounter % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
457
458 return true;
459}
460
// Fetches the next timeslice from fSource, updates the monitoring histograms and the missed-timeslice
// bookkeeping, and returns the timeslice.
// Returns nullptr when
//  - the source delivers no timeslice (fbEofFound is set),
//  - the else-branch of the fulTsCounter / fulMaxTimeslices check is taken (fbEofFound is set),
//  - sending the missed-TS indices fails (fbEofFound is NOT set on that path).
// NOTE(review): listing line 470, which opens the "if (fulTsCounter < fulMaxTimeslices)" block named in the
// closing comments, and several other lines are missing from this listing.
461std::unique_ptr<fles::Timeslice> CbmMQTsSamplerRepReq::GetNewTs()
462{
// When the source is a TimesliceMultiSubscriber, its InitTimesliceSubscriber() is called before the first timeslice.
464 if (0 == fulTsCounter && nullptr != dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)) {
465 dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource)->InitTimesliceSubscriber();
466 } // if( 0 == fulTsCounter && nullptr != dynamic_cast< fles::TimesliceMultiSubscriber >(fSource) )
467
468 std::unique_ptr<fles::Timeslice> timeslice = fSource->get();
469 if (timeslice) {
471
472 const fles::Timeslice& ts = *timeslice;
473 uint64_t uTsIndex = ts.index();
474
// The first TS "index" is taken from the idx field of the first microslice descriptor of component 0,
// not from ts.index(). A stored value of 0 means "not set yet".
475 if (0 == fulFirstTsIndex) { //
476 fulFirstTsIndex = ts.descriptor(0, 0).idx;
477 }
478
479 if (0 < fuPublishFreqTs) {
// Time since the first seen timeslice; the division by 1e9 presumably converts ns to s - TODO confirm.
480 uint64_t uTsTime = ts.descriptor(0, 0).idx;
481 if (0 == fuStartTime) { //
482 fuStartTime = uTsTime;
483 } // if( 0 == fuStartTime )
484 fdTimeToStart = static_cast<double_t>(uTsTime - fuStartTime) / 1e9;
485 uint64_t uSizeMb = 0;
486
// NOTE(review): the integer division is applied per component, so each component's size is truncated to
// whole MiB before summing; components below 1 MiB contribute 0.
487 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
488 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
489 } // for( uint_t uComp = 0; uComp < ts.num_components(); ++uComp )
490
491
492 fhTsRate->Fill(fdTimeToStart);
493 fhTsSize->Fill(uSizeMb);
494 fhTsSizeEvo->Fill(fdTimeToStart, uSizeMb);
495
// Running maximum of the TS size.
// NOTE(review): lines between the visible statements are missing (e.g. 498, 502-503), so the update of
// fdLastMaxTime and the filling of the max-size histogram are not visible here.
497 if (0. == fdLastMaxTime) {
499 fdTsMaxSize = uSizeMb;
500 } // if( 0. == fdLastMaxTime )
501 else if (1. <= fdTimeToStart - fdLastMaxTime) {
504 fdTsMaxSize = uSizeMb;
505 } // else if if( 1 <= fdTimeToStart - fdLastMaxTime )
506 else if (fdTsMaxSize < uSizeMb) {
507 fdTsMaxSize = uSizeMb;
508 } // else if( fdTsMaxSize < uSizeMb )
509 } // if( 0 < fuPublishFreqTs )
510
// Gap detection: the index is expected to be the previous index + 1, except for a very first timeslice
// with index 0.
// NOTE(review): all differences below are unsigned; an index lower than the previous one would wrap
// around instead of becoming negative.
512 if ((uTsIndex != (fulPrevTsIndex + 1)) && !(0 == fulPrevTsIndex && 0 == uTsIndex && 0 == fulTsCounter)) {
513 LOG(debug) << "Missed Timeslices. Old TS Index was " << fulPrevTsIndex << " New TS Index is " << uTsIndex
514 << " diff is " << uTsIndex - fulPrevTsIndex << " Missing are " << uTsIndex - fulPrevTsIndex - 1;
515
516 if ("" != fsChannelNameMissedTs) {
// Collect the missing indices; on the very first timeslice index 0 is reported as missed as well.
518 std::vector<uint64_t> vulMissedIndices;
519 if (0 == fulPrevTsIndex && 0 == fulTsCounter) {
521 vulMissedIndices.emplace_back(0);
522 }
524 for (uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
525 vulMissedIndices.emplace_back(ulMiss);
526 } // for( uint64_t ulMiss = fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss )
527 if (!SendMissedTsIdx(vulMissedIndices)) {
// Sending failed: wait one second, send "STOP" if a command channel is configured, and give up.
529 if ("" != fsChannelNameCommands) {
531 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
532 SendCommand("STOP");
533 } // if( "" != fsChannelNameCommands )
534
535 return nullptr;
536 } // if( !SendMissedTsIdx( vulMissedIndices ) )
537 } // if( "" != fsChannelNameMissedTs )
538
// Value 1 is filled with the number of missed timeslices as weight.
539 if (0 < fuPublishFreqTs) {
540 fhMissedTS->Fill(1, uTsIndex - fulPrevTsIndex - 1);
541 fhMissedTSEvo->Fill(fdTimeToStart, 1, uTsIndex - fulPrevTsIndex - 1);
542 } // if( 0 < fuPublishFreqTs )
543
544 } // if( ( uTsIndex != ( fulPrevTsIndex + 1 ) ) && !( 0 == fulPrevTsIndex && 0 == uTsIndex ) )
545
// Every received timeslice is filled at value 0.
546 if (0 < fuPublishFreqTs) {
547 fhMissedTS->Fill(0);
548 fhMissedTSEvo->Fill(fdTimeToStart, 0, 1);
549 } // else if( 0 < fuPublishFreqTs )
550
551 fulTsCounter++;
552 fulPrevTsIndex = uTsIndex;
553
554 if (fulTsCounter % 10000 == 0) { LOG(info) << "Received TS " << fulTsCounter << " with index " << uTsIndex; }
555
556 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
557 return timeslice;
558 } // if (fulTsCounter < fulMaxTimeslices)
559 else {
// Timeslice limit reached (per the closing comment above): report runtime, announce EOF, stop.
560 CalcRuntime();
561
// NOTE(review): listing lines 567 and 569 are missing, so the full content of the "EOF" command is not
// visible here. The 10 s sleep blocks the calling thread.
563 if ("" != fsChannelNameCommands) {
565 std::this_thread::sleep_for(std::chrono::seconds(10));
566 std::string sCmd = "EOF ";
568 sCmd += " ";
570 SendCommand(sCmd);
571 } // if( "" != fsChannelNameCommands )
572
573 fbEofFound = true;
574
575 return nullptr;
576 } // else of if (fulTsCounter < fulMaxTimeslices)
577 } // if (timeslice)
578 else {
// The source delivered nothing: same end-of-input handling as above.
579 CalcRuntime();
580
582 if ("" != fsChannelNameCommands) {
584 std::this_thread::sleep_for(std::chrono::seconds(10));
585 std::string sCmd = "EOF ";
587 sCmd += " ";
589 SendCommand(sCmd);
590 } // if( "" != fsChannelNameCommands )
591
592 fbEofFound = true;
593
594 return nullptr;
595 } // else of if (timeslice)
596}
597
// Body of the buffer refill routine. The signature line is absent from this listing; the callers below
// suggest this is AddNewTsInBuffer() - TODO confirm.
// Drops the oldest buffered timeslices (and their sent-flags) until the buffer holds fewer than
// fulHighWaterMark entries, then appends a freshly fetched timeslice with all sent-flags cleared.
// Returns false, leaving no null entry in the buffer, when GetNewTs() returned nullptr; true otherwise.
599{
// NOTE(review): with fulHighWaterMark == 0 this loop would call pop_front() on empty containers - confirm
// that 0 is excluded by configuration.
601 while (fulHighWaterMark <= fdpTimesliceBuffer.size()) {
602 fdpTimesliceBuffer.pop_front();
603 fdbCompSentFlags.pop_front();
604 } // while( fulHighWaterMark <= fdpTimesliceBuffer.size() )
605
607 fdpTimesliceBuffer.push_back(GetNewTs());
608 if (nullptr == fdpTimesliceBuffer.back()) {
609 fdpTimesliceBuffer.pop_back();
610 return false;
611 } // if(nullptr == fdpTimesliceBuffer[fdpTimesliceBuffer.size() - 1])
612
// One flag per block in per-block mode, otherwise one flag per entry of fComponentActive.
615 if (fbSendTsPerBlock) { fdbCompSentFlags.push_back(std::vector<bool>(fvBlocksToSend.size(), false)); }
616 else {
617 fdbCompSentFlags.push_back(std::vector<bool>(fComponentActive.size(), false));
618 }
619 return true;
620}
621
// Body of the no-split sending routine. The signature line is absent from this listing; the call in
// HandleRequest suggests this is CreateAndSendFullTs() - TODO confirm.
// Fetches one timeslice, copies it into a fles::StorableTimeslice and sends it with SendData().
// Returns true on success; false when no timeslice was available or sending failed.
623{
624 std::unique_ptr<fles::Timeslice> timeslice = GetNewTs();
625 if (timeslice) {
627 const fles::Timeslice& ts = *timeslice;
628 fles::StorableTimeslice fullTs {ts};
629 if (!SendData(fullTs)) {
// Sending failed: wait one second and send "STOP" if a command channel is configured.
631 if ("" != fsChannelNameCommands) {
633 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
634 SendCommand("STOP");
635 } // if( "" != fsChannelNameCommands )
636
637 return false;
638 } // if (!SendData(fullTs, uChanIdx))
639 return true;
640 } // if (timeslice)
641 else {
642 return false;
643 } // else of if (timeslice)
644}
645
// Body of the routine building the per-SysId component lists. The signature line is absent from this
// listing; the callers below suggest this is PrepareCompListPerSysId() - TODO confirm.
// While fbListCompPerSysIdReady is false, the first buffered timeslice (fetched if the buffer is empty) is
// scanned: each component index is appended to fvvCompPerSysId at the position of its SysId in fSysId.
// Returns false when no timeslice could be obtained, true otherwise.
// NOTE(review): listing line 680 is missing; presumably it sets fbListCompPerSysIdReady to true. If it
// does not, every call would append the same component indices again - verify against the real file.
647{
648 if (false == fbListCompPerSysIdReady) {
651 if (0 == fdpTimesliceBuffer.size()) {
652 if (!AddNewTsInBuffer()) return false;
653 } // if( 0 == fdpTimesliceBuffer.size() )
654
655 if (nullptr == fdpTimesliceBuffer.front()) return false;
656
// The SysId of a component is read from its first microslice descriptor; components whose SysId is not
// listed in fSysId are silently skipped.
657 for (uint32_t uCompIdx = 0; uCompIdx < fdpTimesliceBuffer.front()->num_components(); ++uCompIdx) {
658 uint16_t usMsSysId = fdpTimesliceBuffer.front()->descriptor(uCompIdx, 0).sys_id;
659
660 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), usMsSysId);
661 if (fSysId.end() != pos) {
662 const vector<std::string>::size_type idx = pos - fSysId.begin();
663
664 fvvCompPerSysId[idx].push_back(uCompIdx);
665 } // if( fSysId.end() != pos )
666 } // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )
667
// Log one summary line per system.
// NOTE(review): the loop bound is fComponents.size() while the body indexes fSysId and fvvCompPerSysId;
// this is only safe if all three containers have the same length - confirm.
668 for (uint32_t uSysIdx = 0; uSysIdx < fComponents.size(); ++uSysIdx) {
669 std::stringstream ss;
670 ss << "Found " << std::setw(2) << fvvCompPerSysId[uSysIdx].size() << " components for SysId 0x" << std::hex
671 << std::setw(2) << fSysId[uSysIdx] << std::dec << " :";
672
673 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uSysIdx].size(); ++uComp) {
674 ss << " " << std::setw(3) << fvvCompPerSysId[uSysIdx][uComp];
675 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uSysIdx ].size(); ++uComp )
676
677 LOG(info) << ss.str();
678 } // for( uint32_t uSysId = 0; uSysId < fSysId.size(); ++uSysId )
679
681 } // if( false == fbListCompPerSysIdReady )
682
683 return true;
684}
// Body of a lookup-by-name wrapper. The signature line is absent from this listing; presumably an overload
// of CreateCombinedComponentsPerSysId taking the system name (sSystemName) - TODO confirm.
// Makes sure the per-SysId component lists exist, finds sSystemName in fComponents and forwards its
// position to the index-based CreateCombinedComponentsPerSysId overload.
// Returns false when the lists could not be prepared or the name is unknown; otherwise the result of the forwarded call.
686{
689 if (!PrepareCompListPerSysId()) return false;
690
692 const vector<std::string>::const_iterator pos = std::find(fComponents.begin(), fComponents.end(), sSystemName);
693 if (fComponents.end() != pos) {
694 const vector<std::string>::size_type idx = pos - fComponents.begin();
695 return CreateCombinedComponentsPerSysId(static_cast<uint32_t>(idx));
696 } // if (fComponents.end() != pos)
697 else {
698 LOG(error) << "Did not find " << sSystemName << " in the list of known systems";
699 return false;
700 } // else of if (fComponents.end() != pos)
701}
// Body of a lookup-by-SysId wrapper. The signature line is absent from this listing; presumably an
// overload of CreateCombinedComponentsPerSysId taking the numeric SysId (iSysId) - TODO confirm.
// Makes sure the per-SysId component lists exist, finds iSysId in fSysId and forwards its position to the
// index-based CreateCombinedComponentsPerSysId overload.
// Returns false when the lists could not be prepared or the SysId is unknown; otherwise the result of the forwarded call.
703{
706 if (!PrepareCompListPerSysId()) return false;
707
709 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
710 if (fSysId.end() != pos) {
711 const vector<int>::size_type idx = pos - fSysId.begin();
712 return CreateCombinedComponentsPerSysId(static_cast<uint32_t>(idx));
713 } // if (fSysId.end() != pos)
714 else {
715 LOG(error) << "Did not find 0x" << std::hex << iSysId << std::dec << " in the list of known systems";
716 return false;
717 } // else of if (fSysId.end() != pos)
718}
// Body of the index-based sending routine. The signature line is absent from this listing; presumably the
// CreateCombinedComponentsPerSysId overload taking uCompIndex, an index into fSysId / fvvCompPerSysId -
// TODO confirm.
// Picks the oldest buffered timeslice not yet sent for this system, copies all of the system's components
// into a new fles::StorableTimeslice, sends it with SendData() and marks it as sent.
// Returns false when a new timeslice was needed but unavailable, or when sending failed. Returns true
// otherwise, including when the system has no components at all (nothing is sent in that case).
720{
722 LOG(debug) << "Create timeslice with components for SysId " << std::hex << fSysId[uCompIndex] << std::dec;
723
724 if (0 < fvvCompPerSysId[uCompIndex].size()) {
// Find the first buffered timeslice whose flag for this system is still false.
726 uint32_t uTsIndex = 0;
727 for (; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex) {
728 if (false == fdbCompSentFlags[uTsIndex][uCompIndex]) break;
729 } // for( ; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex )
730
// Everything buffered was already sent for this system: fetch one more timeslice.
// NOTE(review): after --uTsIndex the index equals the old size - 1. That is the position of the new
// timeslice only if AddNewTsInBuffer() removed exactly one entry from the front (buffer at the high-water
// mark). If nothing was removed it points at an already-sent timeslice, and with an empty buffer the
// decrement wraps around - verify.
732 if (fdpTimesliceBuffer.size() == uTsIndex) {
733 --uTsIndex;
734 if (!AddNewTsInBuffer()) return false;
735 } // if( fdpTimesliceBuffer.size() == uTsIndex )
736
// The output timeslice keeps the core microslice count and the index of the source timeslice.
738 fles::StorableTimeslice component {static_cast<uint32_t>(fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
739 fdpTimesliceBuffer[uTsIndex]->index()};
740
// uComp is the component position in the OUTPUT timeslice; fvvCompPerSysId[uCompIndex][uComp] is the
// component index in the SOURCE timeslice.
741 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[uCompIndex].size(); ++uComp) {
742 uint32_t uNumMsInComp = fdpTimesliceBuffer[uTsIndex]->num_microslices(fvvCompPerSysId[uCompIndex][uComp]);
743 component.append_component(uNumMsInComp);
744
745 LOG(debug) << "Add components to TS for SysId " << std::hex << fSysId[uCompIndex] << std::dec << " TS "
746 << fdpTimesliceBuffer[uTsIndex]->index() << " Comp " << fvvCompPerSysId[uCompIndex][uComp];
747
748 for (size_t m = 0; m < uNumMsInComp; ++m) {
749 component.append_microslice(uComp, m,
750 fdpTimesliceBuffer[uTsIndex]->descriptor(fvvCompPerSysId[uCompIndex][uComp], m),
751 fdpTimesliceBuffer[uTsIndex]->content(fvvCompPerSysId[uCompIndex][uComp], m));
752 } // for( size_t m = 0; m < uNumMsInComp; ++m )
753 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uCompIndex ].size(); ++uComp )
754
755 LOG(debug) << "Prepared timeslice for SysId " << std::hex << fSysId[uCompIndex] << std::dec << " with "
756 << component.num_components() << " components";
757
758 if (!SendData(component)) return false;
759
// Flag the timeslice as sent for this system only after a successful send.
760 fdbCompSentFlags[uTsIndex][uCompIndex] = true;
761 } // if (0 < fvvCompPerSysId[uCompIndex].size())
762
763 return true;
764}
765
// Body of the routine building the per-block component lists. The signature line is absent from this
// listing; the caller below suggests this is PrepareCompListPerBlock() - TODO confirm.
// While fbListCompPerBlockReady is false, the component indices of every SysId assigned to a block are
// appended to that block's entry in fvvCompPerBlock.
// Returns false when the per-SysId lists cannot be prepared or a block refers to a SysId not in fSysId.
// NOTE(review): listing line 795 is missing; presumably it sets fbListCompPerBlockReady to true. If it
// does not, repeated calls would append duplicates - verify against the real file.
767{
768 if (false == fbListCompPerBlockReady) {
770 if (!PrepareCompListPerSysId()) return false;
771
// The block position in fvBlocksToSend doubles as the index into fvvCompPerBlock.
773 for (auto itBlock = fvBlocksToSend.begin(); itBlock != fvBlocksToSend.end(); ++itBlock) {
774 auto uBlockIdx = itBlock - fvBlocksToSend.begin();
775
776 for (auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys) {
778 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), *itSys);
779 if (fSysId.end() != pos) {
780 const vector<int>::size_type idxSys = pos - fSysId.begin();
781
783 for (uint32_t uComp = 0; uComp < fvvCompPerSysId[idxSys].size(); ++uComp) {
784 fvvCompPerBlock[uBlockIdx].push_back(fvvCompPerSysId[idxSys][uComp]);
785 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ idxSys ].size(); ++uComp )
786 } // if (fSysId.end() != pos)
787 else {
788 LOG(error) << "Error when building the components list for block " << itBlock->first;
789 LOG(error) << "Did not find 0x" << std::hex << *itSys << std::dec << " in the list of known systems";
790 return false;
791 } // else of if (fSysId.end() != pos)
792 } // for( auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys )
793 } // for( auto itBlock = fvBlocksToSend.begin(); itBlock != fvBlocksToSend.end(); ++itBlock)
794
796 } // if( false == fbListCompPerBlockReady )
797
798 return true;
799}
800
// Body of the per-block sending routine. The signature line is absent from this listing; it takes the
// block name as sBlockName and is presumably named CreateCombinedComponentsPerBlock - TODO confirm.
// Looks up sBlockName in fvBlocksToSend, picks the oldest buffered timeslice not yet sent for that block,
// copies the block's components into a new fles::StorableTimeslice, sends it with SendData() and marks it
// as sent.
// Returns false when the lists cannot be prepared, the block is unknown, a needed new timeslice is
// unavailable, or sending fails; true after a successful send.
802{
805 if (!PrepareCompListPerBlock()) return false;
806
808 for (auto itKnownBlock = fvBlocksToSend.begin(); itKnownBlock != fvBlocksToSend.end(); ++itKnownBlock) {
809 if ((*itKnownBlock).first == sBlockName) {
810 auto uBlockIdx = itKnownBlock - fvBlocksToSend.begin();
811
// Find the first buffered timeslice whose flag for this block is still false.
813 uint32_t uTsIndex = 0;
814 for (; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex) {
815 if (false == fdbCompSentFlags[uTsIndex][uBlockIdx]) break;
816 } // for( ; uTsIndex < fdpTimesliceBuffer.size(); ++uTsIndex )
817
// Everything buffered was already sent for this block: fetch one more timeslice.
// NOTE(review): after --uTsIndex the index equals the old size - 1. That is the position of the new
// timeslice only if AddNewTsInBuffer() removed exactly one entry from the front; otherwise it points at
// an already-sent timeslice (or wraps around for an empty buffer) - verify.
819 if (fdpTimesliceBuffer.size() == uTsIndex) {
820 --uTsIndex;
821 if (!AddNewTsInBuffer()) return false;
822 } // if( fdpTimesliceBuffer.size() == uTsIndex )
823
// The output timeslice keeps the core microslice count and the index of the source timeslice.
825 fles::StorableTimeslice component {static_cast<uint32_t>(fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
826 fdpTimesliceBuffer[uTsIndex]->index()};
827
// uComp is the component position in the OUTPUT timeslice; fvvCompPerBlock[uBlockIdx][uComp] is the
// component index in the SOURCE timeslice.
828 for (uint32_t uComp = 0; uComp < fvvCompPerBlock[uBlockIdx].size(); ++uComp) {
829 uint32_t uNumMsInComp = fdpTimesliceBuffer[uTsIndex]->num_microslices(fvvCompPerBlock[uBlockIdx][uComp]);
830 component.append_component(uNumMsInComp);
831
832 LOG(debug) << "Add components to TS for Block " << sBlockName << " TS " << fdpTimesliceBuffer[uTsIndex]->index()
833 << " Comp " << fvvCompPerBlock[uBlockIdx][uComp];
834
835 for (size_t m = 0; m < uNumMsInComp; ++m) {
836 component.append_microslice(uComp, m,
837 fdpTimesliceBuffer[uTsIndex]->descriptor(fvvCompPerBlock[uBlockIdx][uComp], m),
838 fdpTimesliceBuffer[uTsIndex]->content(fvvCompPerBlock[uBlockIdx][uComp], m));
839 } // for( size_t m = 0; m < uNumMsInComp; ++m )
840 } // for( uint32_t uComp = 0; uComp < fvvCompPerSysId[ uCompIndex ].size(); ++uComp )
841
842 LOG(debug) << "Prepared timeslice for Block " << sBlockName << " with " << component.num_components()
843 << " components";
844
845 if (!SendData(component)) return false;
846
// Flag the timeslice as sent for this block only after a successful send.
847 fdbCompSentFlags[uTsIndex][uBlockIdx] = true;
848 return true;
849 } // if( (*itKnownBlock).first == sBlockName )
850 } // for( auto itKnownBlock = fvBlocksToSend.begin(); itKnownBlock != fvBlocksToSend.end(); ++itKnownBlock)
851
// No block with the requested name exists.
853 LOG(error) << "Requested block " << sBlockName << " not found in the list of known blocks";
854 return false;
855}
856
// Body of the reply routine for the first timeslice index. The signature line is absent from this listing;
// the call in HandleRequest suggests this is SendFirstTsIndex() - TODO confirm.
// Formats fulFirstTsIndex with FormatDecPrintout(), serializes the resulting string with a Boost binary
// archive and sends it on fsChannelNameTsRequest.
// Returns false when Send() reports an error (negative result), true otherwise.
858{
859 // create the message with the first timeslice index
860 std::string sIndex = FormatDecPrintout(fulFirstTsIndex);
861 // serialize the vector and create the message
862 std::stringstream oss;
863 boost::archive::binary_oarchive oa(oss);
864 oa << sIndex;
// The serialized bytes live in a heap-allocated string that is deleted by the callback handed to
// NewMessage(); presumably the transport calls it once the buffer is no longer needed - TODO confirm.
865 std::string* strMsg = new std::string(oss.str());
866
867 FairMQMessagePtr msg(NewMessage(
868 const_cast<char*>(strMsg->c_str()), // data
869 strMsg->length(), // size
870 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
871 strMsg)); // object that manages the data
872
873 // in case of error or transfer interruption,
874 // return false to go to IDLE state
875 // successfull transfer will return number of bytes
876 // transfered (can be 0 if sending an empty message).
877 if (Send(msg, fsChannelNameTsRequest) < 0) {
878 LOG(error) << "Problem sending reply with first TS index";
879 return false;
880 }
881
// NOTE(review): listing line 882 is missing here; fulMessageCounter is only read in the visible code.
883 LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize();
884
885 return true;
886}
// Serializes a timeslice with a Boost binary archive and sends it on fsChannelNameTsRequest.
//   component : the (full or partial) timeslice to send.
// Returns false when Send() reports an error (negative result), true otherwise.
887bool CbmMQTsSamplerRepReq::SendData(const fles::StorableTimeslice& component)
888{
889 // serialize the timeslice and create the message
890 std::stringstream oss;
891 boost::archive::binary_oarchive oa(oss);
892 oa << component;
// The serialized bytes live in a heap-allocated string that is deleted by the callback handed to
// NewMessage(); presumably the transport calls it once the buffer is no longer needed - TODO confirm.
893 std::string* strMsg = new std::string(oss.str());
894
895 FairMQMessagePtr msg(NewMessage(
896 const_cast<char*>(strMsg->c_str()), // data
897 strMsg->length(), // size
898 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
899 strMsg)); // object that manages the data
900
901 // in case of error or transfer interruption,
902 // return false to go to IDLE state
903 // successfull transfer will return number of bytes
904 // transfered (can be 0 if sending an empty message).
905 if (Send(msg, fsChannelNameTsRequest) < 0) {
906 LOG(error) << "Problem sending data";
907 return false;
908 }
909
// NOTE(review): listing line 910 is missing here; presumably it increments fulMessageCounter, which the
// publication check in HandleRequest relies on - verify against the real file.
911 LOG(debug) << "Send message " << fulMessageCounter << " with a size of " << msg->GetSize();
912
913 return true;
914}
915bool CbmMQTsSamplerRepReq::SendMissedTsIdx(std::vector<uint64_t> vIndices)
916{
917 // serialize the vector and create the message
918 std::stringstream oss;
919 boost::archive::binary_oarchive oa(oss);
920 oa << vIndices;
921 std::string* strMsg = new std::string(oss.str());
922
923 FairMQMessagePtr msg(NewMessage(
924 const_cast<char*>(strMsg->c_str()), // data
925 strMsg->length(), // size
926 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
927 strMsg)); // object that manages the data
928
929 // in case of error or transfer interruption,
930 // return false to go to IDLE state
931 // successfull transfer will return number of bytes
932 // transfered (can be 0 if sending an empty message).
933 LOG(debug) << "Send data to channel " << fsChannelNameMissedTs;
934 if (Send(msg, fsChannelNameMissedTs) < 0) {
935 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameMissedTs;
936 return false;
937 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
938
939 return true;
940}
941bool CbmMQTsSamplerRepReq::SendCommand(std::string sCommand)
942{
943 // serialize the vector and create the message
944 std::stringstream oss;
945 boost::archive::binary_oarchive oa(oss);
946 oa << sCommand;
947 std::string* strMsg = new std::string(oss.str());
948
949 FairMQMessagePtr msg(NewMessage(
950 const_cast<char*>(strMsg->c_str()), // data
951 strMsg->length(), // size
952 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
953 strMsg)); // object that manages the data
954
955 // FairMQMessagePtr msg( NewMessage( const_cast<char*>( sCommand.c_str() ), // data
956 // sCommand.length(), // size
957 // []( void* /*data*/, void* object ){ delete static_cast< std::string * >( object ); },
958 // &sCommand ) ); // object that manages the data
959
960 // in case of error or transfer interruption,
961 // return false to go to IDLE state
962 // successfull transfer will return number of bytes
963 // transfered (can be 0 if sending an empty message).
964 LOG(debug) << "Send data to channel " << fsChannelNameCommands;
965 if (Send(msg, fsChannelNameCommands) < 0) {
966 LOG(error) << "Problem sending missed TS indices to channel " << fsChannelNameCommands;
967 return false;
968 } // if( Send( msg, fsChannelNameMissedTs ) < 0 )
969
970 return true;
971}
973{
 // Builds one multi-part message and sends it on the channel named by
 // fsChannelNameHistosInput. Parts, in order: a header pair with the two
 // configuration counts, the histogram configurations, the canvas
 // configurations, and finally the serialized histogram array.
 // Returns false when Send reports a negative value, true otherwise.
 // NOTE(review): the signature line is not visible in this listing; the error
 // message below names CbmMQTsSamplerRepReq::SendHistoConfAndData.

 // Header part: (number of histogram configs, number of canvas configs)
975 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
976 FairMQMessagePtr messageHeader(NewMessage());
977 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
978 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
979
980 FairMQParts partsOut;
981 partsOut.AddPart(std::move(messageHeader));
982
 // One message part per entry of fvpsHistosFolder, each serialized as a pair of strings
983 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
985 FairMQMessagePtr messageHist(NewMessage());
986 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
987 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
988
989 partsOut.AddPart(std::move(messageHist));
990 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
991
 // No histogram configuration registered: one empty part is added instead
 // (presumably so the receiver always finds a part in this section; TODO confirm)
994 if (0 == fvpsHistosFolder.size()) {
995 FairMQMessagePtr messageHist(NewMessage());
996 partsOut.AddPart(std::move(messageHist));
997 }
998
 // One message part per entry of fvpsCanvasConfig, each serialized as a pair of strings
999 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
1001 FairMQMessagePtr messageCan(NewMessage());
1002 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
1003 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
1004
1005 partsOut.AddPart(std::move(messageCan));
1006 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
1007
 // No canvas configuration registered: one empty part is added instead
 // (same placeholder handling as for the histogram configurations above)
1010 if (0 == fvpsCanvasConfig.size()) {
1011 FairMQMessagePtr messageHist(NewMessage());
1012 partsOut.AddPart(std::move(messageHist));
1013 }
1014
 // Last part: the histogram array fArrayHisto, serialized with RootSerializer
1016 FairMQMessagePtr msgHistos(NewMessage());
1017 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
1018 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
1019
1020 partsOut.AddPart(std::move(msgHistos));
1021
 // A negative return value from Send is treated as a failure
1023 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
1024 LOG(error) << "CbmMQTsSamplerRepReq::SendHistoConfAndData => Problem sending data";
1025 return false;
1026 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
1027
1030
1031 return true;
1032}
1033
1035{
 // Serializes the histogram array fArrayHisto into a single message and sends
 // it on the channel named by fsChannelNameHistosInput.
 // Returns false when Send reports a negative value, true otherwise.
 // NOTE(review): the signature line is not visible in this listing.
1037 FairMQMessagePtr message(NewMessage());
1038 // Serialize<RootSerializer>(*message, &fArrayHisto);
1039 RootSerializer().Serialize(*message, &fArrayHisto);
1040
 // A negative return value from Send is treated as a failure
1042 if (Send(message, fsChannelNameHistosInput) < 0) {
1043 LOG(error) << "Problem sending data";
1044 return false;
1045 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
1046
1049
1050 return true;
1051}
1052
1054{
 // Calls Reset() on each of the six monitoring histograms and always returns true.
 // NOTE(review): the signature line is not visible in this listing.
 // NOTE(review): the pointers are dereferenced without a null check; this
 // assumes the histograms were created before this is called - confirm.
1055 fhTsRate->Reset();
1056 fhTsSize->Reset();
1057 fhTsSizeEvo->Reset();
1058 fhTsMaxSizeEvo->Reset();
1059 fhMissedTS->Reset();
1060 fhMissedTSEvo->Reset();
1061
1062 return true;
1063}
1064
1066
1068{
 // Logs the time elapsed between fTime and now (steady clock), expressed in
 // seconds as a double, followed by an end-of-input notice.
 // NOTE(review): the signature line is not visible in this listing.
1069 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
1070
1071 LOG(info) << "Runtime: " << run_time.count();
1072 LOG(info) << "No more input data";
1073}
std::string GenerateCanvasConfigString(TCanvas *pCanv)
std::string FormatDecPrintout(uint64_t ulVal, char cFill, uint uWidth)
static constexpr size_t size()
Definition KfSimdPseudo.h:2
bool first
Generates beam ions for transport simulation.
std::chrono::system_clock::time_point fLastPublishTime
std::deque< std::vector< bool > > fdbCompSentFlags
bool CreateCombinedComponentsPerBlock(std::string sBlockName)
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::deque< std::unique_ptr< fles::Timeslice > > fdpTimesliceBuffer
Buffering of partially sent timeslices, limited by fulHighWaterMark.
std::vector< std::vector< uint32_t > > fvvCompPerSysId
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
std::vector< int > fSysId
bool HandleRequest(FairMQMessagePtr &, int)
std::vector< std::string > fComponents
std::vector< bool > fComponentActive
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::vector< std::vector< uint32_t > > fvvCompPerBlock
bool fbEofFound
Flag indicating the EOF was reached to avoid sending an emergency STOP.
std::unique_ptr< fles::Timeslice > GetNewTs()
std::vector< std::pair< std::string, std::set< uint16_t > > > fvBlocksToSend
bool SendData(const fles::StorableTimeslice &component)
std::vector< std::string > fvsInputFileList
List of input files.
bool CreateCombinedComponentsPerSysId(std::string sSystemName)
std::chrono::steady_clock::time_point fTime
fles::TimesliceSource * fSource
bool SendCommand(std::string sCommand)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.