CbmRoot
Loading...
Searching...
No Matches
CbmTsConsumerReqDevExample.cxx
Go to the documentation of this file.
1/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
6
8
9#include "StorableTimeslice.hpp"
10
11#include "FairMQLogger.h"
12#include "FairMQProgOptions.h" // device->fConfig
13#include "FairParGenericSet.h"
14
15#include "TCanvas.h"
16#include "TFile.h"
17#include "TH1.h"
18#include "TList.h"
19#include "TNamed.h"
20#include <thread>
21
22#include "BoostSerializer.h"
23#include <boost/archive/binary_iarchive.hpp>
24#include <boost/serialization/utility.hpp>
25
26#include <array>
27#include <iomanip>
28#include <stdexcept>
29#include <string>
30
31#include "RootSerializer.h"
32struct InitTaskError : std::runtime_error {
33 using std::runtime_error::runtime_error;
34};
35
36using namespace std;
37
38
40// ALGO: : fMonitorAlgo {new CbmMcbm2018MonitorAlgoBmon()}
41{
42}
43
45try {
47 LOG(info) << "Init options for CbmMqStarHistoServer.";
48 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
49 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
50 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
51 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
52 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
53 fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName");
54 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
55
56 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
57 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
58 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
59}
60catch (InitTaskError& e) {
61 LOG(error) << e.what();
62 ChangeState(fair::mq::Transition::ErrorFound);
63}
64
66{
67 LOG(info) << "Init parameter containers for CbmTsConsumerReqDevExample.";
68
69 // ALGO: fParCList = fMonitorAlgo->GetParList();
70 fParCList = new TList();
71
72 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
73 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
74 fParCList->Remove(tempObj);
75 std::string paramName {tempObj->GetName()};
76 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
77 // Should only be used for small data because of the cost of an additional copy
78
79 // Here must come the proper run ID
80 std::string message = paramName + ",111";
81 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
82
83 FairMQMessagePtr req(NewSimpleMessage(message));
84 FairMQMessagePtr rep(NewMessage());
85
86 FairParGenericSet* newObj = nullptr;
87
88 if (Send(req, "parameters") > 0) {
89 if (Receive(rep, "parameters") >= 0) {
90 if (rep->GetSize() != 0) {
91 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
92 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
93 LOG(info) << "Received unpack parameter from the server:";
94 newObj->print();
95 }
96 else {
97 LOG(error) << "Received empty reply. Parameter not available";
98 } // if (rep->GetSize() != 0)
99 } // if (Receive(rep, "parameters") >= 0)
100 } // if (Send(req, "parameters") > 0)
101 fParCList->AddAt(newObj, iparC);
102 delete tempObj;
103 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
104
106 // ALGO: fMonitorAlgo->SetIgnoreOverlapMs(fbIgnoreOverlapMs);
107
108 // fMonitorAlgo->AddMsComponentToList(0, 0x90);
109
110 // ALGO: Bool_t initOK = fMonitorAlgo->InitContainers();
111 bool initOK = true;
112
113 return initOK;
114}
115
117{
120 // ALGO: bool initOK = fMonitorAlgo->CreateHistograms();
121 bool initOK = true;
122
124 // ALGO: std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
125 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
127 // ALGO: std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
128 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
129
134 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
135 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
136 // << " in " << vHistos[ uHisto ].second.data()
137 // ;
138 fArrayHisto.Add(vHistos[uHisto].first);
139 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
140 fvpsHistosFolder.push_back(psHistoConfig);
141
142 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
143 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
144
148 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
149 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
150 // << " in " << vCanvases[ uCanv ].second.data();
151 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
152 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
153
154 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
155
156 fvpsCanvasConfig.push_back(psCanvConfig);
157
158 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
159 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
160
161 return initOK;
162}
163
164
166{
168 std::string message = fsTsBlockName;
169 if ("" == message) message = std::to_string(kusSysId);
170 LOG(debug) << "Requesting new TS by sending message: " << message;
171 FairMQMessagePtr req(NewSimpleMessage(message));
172 FairMQMessagePtr rep(NewMessage());
173
174 if (Send(req, fsChannelNameDataInput) <= 0) {
175 LOG(error) << "Failed to send the request! message was " << message;
176 return false;
177 } // if (Send(req, fsChannelNameDataInput) <= 0)
178 else if (Receive(rep, fsChannelNameDataInput) < 0) {
179 LOG(error) << "Failed to receive a reply to the request! message was " << message;
180 return false;
181 } // else if (Receive(rep, fsChannelNameDataInput) < 0)
182 else if (rep->GetSize() == 0) {
183 LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
184 return false;
185 } // else if (rep->GetSize() == 0)
186
188 if (0 == fulNumMessages) {
189 try {
191 }
192 catch (InitTaskError& e) {
193 LOG(error) << e.what();
194 ChangeState(fair::mq::Transition::ErrorFound);
195 }
196 } // if( 0 == fulNumMessages)
197
198 if (0 == fulNumMessages) InitHistograms();
199
201 LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize();
202
203 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
204
205 std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
206 std::istringstream iss(msgStr);
207 boost::archive::binary_iarchive inputArchive(iss);
208
210 fles::StorableTimeslice component {0};
211 inputArchive >> component;
212
214 DoUnpack(component, 0);
215
219 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
220 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
221 if ((fdMaxPublishTime < elapsedSeconds.count())
222 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
223 if (!fbConfigSent) {
224 // Send the configuration only once per run!
226 } // if( !fbConfigSent )
227 else
229
230 fLastPublishTime = std::chrono::system_clock::now();
231 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
232
233 return true;
234}
235
237{
239 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
240 FairMQMessagePtr messageHeader(NewMessage());
241 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
242 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
243
244 FairMQParts partsOut;
245 partsOut.AddPart(std::move(messageHeader));
246
247 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
249 FairMQMessagePtr messageHist(NewMessage());
250 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
251 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
252
253 partsOut.AddPart(std::move(messageHist));
254 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
255
256 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
258 FairMQMessagePtr messageCan(NewMessage());
259 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
260 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
261
262 partsOut.AddPart(std::move(messageCan));
263 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
264
266 FairMQMessagePtr msgHistos(NewMessage());
267 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
268 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
269
270 partsOut.AddPart(std::move(msgHistos));
271
273 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
274 LOG(error) << "CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
275 return false;
276 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
277
279 // ALGO: fMonitorAlgo->ResetHistograms(kFALSE);
280
281 return true;
282}
283
285{
287 FairMQMessagePtr message(NewMessage());
288 // Serialize<RootSerializer>(*message, &fArrayHisto);
289 RootSerializer().Serialize(*message, &fArrayHisto);
290
292 if (Send(message, fsChannelNameHistosInput) < 0) {
293 LOG(error) << "Problem sending data";
294 return false;
295 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
296
298 // ALGO: fMonitorAlgo->ResetHistograms(kFALSE);
299
300 return true;
301}
302
303
305
306
// Example per-timeslice processing hook: counts the timeslice and reports progress.
// The calls into the monitoring algorithm are commented out (lines marked "ALGO:"),
// so in this example no data is actually unpacked.
//
// ts        : timeslice to process; only num_components() and descriptor() are used here.
// component : unused (parameter name is commented out in the signature).
// returns   : always kTRUE in the visible code.
//
// NOTE(review): the embedded line numbering of this listing skips 314, 319 and 330, so
// some original lines may be missing here. In particular nothing visible ever changes
// fbComponentsAddedToList - confirm against the real source file.
307Bool_t CbmTsConsumerReqDevExample::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
308{
// Count every call, whether or not any component matches below.
309 fulTsCounter++;
310
// Component scan, performed as long as fbComponentsAddedToList is still kFALSE.
311 if (kFALSE == fbComponentsAddedToList) {
312 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
// Compare the sys_id of descriptor (uCompIdx, 0) with kusSysId
// (presumably the first microslice of that component - TODO confirm).
313 if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
315 // ALGO:
// Stand-in for the commented-out algorithm call: blocks for 500 ms per matching component.
316 std::this_thread::sleep_for(std::chrono::milliseconds(500));
317 } // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
318 } // for( uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx )
320 } // if( kFALSE == fbComponentsAddedToList )
321
322 // ALGO:
323 /*
324 if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
325 LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
326 return kTRUE;
327 } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
328 */
329
331 // ALGO: fMonitorAlgo->ClearVector();
332
// Progress report once every 10000 processed timeslices.
333 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
334
335 return kTRUE;
336}
337
std::string GenerateCanvasConfigString(TCanvas *pCanv)
bool first
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
TObjArray fArrayHisto
Processing algo.
Bool_t fbComponentsAddedToList
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
std::string fsChannelNameDataInput
User settings parameters.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
TList * fParCList
Parameters management.
bool DoUnpack(const fles::Timeslice &ts, size_t component)
std::chrono::system_clock::time_point fLastPublishTime
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
uint64_t fulNumMessages
Statistics & first TS rejection.
static const uint16_t kusSysId
Constants.
Hash for CbmL1LinkKey.