CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMonitorReqTof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
6
8#include "CbmMQDefs.h"
10
11#include "StorableTimeslice.hpp"
12
13#include "FairMQLogger.h"
14#include "FairMQProgOptions.h" // device->fConfig
15#include "FairParGenericSet.h"
16
17#include "TCanvas.h"
18#include "TFile.h"
19#include "TH1.h"
20#include "TList.h"
21#include "TNamed.h"
22
23#include "BoostSerializer.h"
24#include <boost/archive/binary_iarchive.hpp>
25#include <boost/serialization/utility.hpp>
26
27#include <array>
28#include <iomanip>
29#include <stdexcept>
30#include <string>
31
32#include "RootSerializer.h"
33struct InitTaskError : std::runtime_error {
34 using std::runtime_error::runtime_error;
35};
36
37using namespace std;
38
40
42try {
// Reads the device options from fConfig into the corresponding members and
// logs the histogram publication settings. The function signature (listing
// line 41) and listing lines 65-66 and 71 are not visible in this listing.
// NOTE(review): the message below names "CbmMqStarHistoServer", while the other
// log messages in this file name CbmDeviceMonitorReqTof - looks like a
// copy-paste leftover; confirm before changing the string.
44 LOG(info) << "Init options for CbmMqStarHistoServer.";
45
// Control flags and monitoring settings.
46 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
47 fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni");
48 fbIgnoreCriticalErrors = fConfig->GetValue<bool>("IgnCritErr");
49 fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
50 fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
51 fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
52 fiGdpbIndex = fConfig->GetValue<int32_t>("GdpbIdx");
53
// Publication settings and channel names.
54 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
55 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
56 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
57 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
58 fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName");
59 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
60
61 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
62 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
63 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
64
67}
// Only InitTaskError is handled here; its message is logged.
// NOTE(review): the handler does not modify e, so catching by const reference
// would be the usual form.
68catch (InitTaskError& e) {
69 LOG(error) << e.what();
70 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
72}
73
75{
// For every entry of fParCList: request the container with the same name over
// the "parameters" channel, replace the list entry with the object read from
// the reply, then delete the old entry. Finally calls
// fMonitorAlgo->InitContainers() and returns its result.
// The function signature and listing lines 78 and 113-119 are not visible here
// (presumably where fParCList is obtained - TODO confirm).
76 LOG(info) << "Init parameter containers for CbmDeviceMonitorReqTof.";
77
79
80 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
81 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
// The old entry is removed here and the replacement is inserted at the same
// index (AddAt below), so the loop index stays aligned with the list.
82 fParCList->Remove(tempObj);
83 std::string paramName {tempObj->GetName()};
84 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
85 // Should only be used for small data because of the cost of an additional copy
86
87 // Here must come the proper Runid
// The request is "<container name>,111"; the 111 is a hard-coded placeholder.
88 std::string message = paramName + ",111";
89 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
90
91 FairMQMessagePtr req(NewSimpleMessage(message));
92 FairMQMessagePtr rep(NewMessage());
93
94 FairParGenericSet* newObj = nullptr;
95
96 if (Send(req, "parameters") > 0) {
97 if (Receive(rep, "parameters") >= 0) {
98 if (rep->GetSize() != 0) {
// Rebuild the container object from the raw reply buffer.
99 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
100 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
101 LOG(info) << "Received unpack parameter from the server:";
102 newObj->print();
103 }
104 else {
105 LOG(error) << "Received empty reply. Parameter not available";
106 } // if (rep->GetSize() != 0)
107 } // if (Receive(rep, "parameters") >= 0)
108 } // if (Send(req, "parameters") > 0)
// NOTE(review): when Send/Receive fails or the reply is empty, newObj is still
// nullptr here and is passed to AddAt anyway; a failed Send/Receive is not
// logged. Confirm whether a null entry is acceptable to the list and to
// fMonitorAlgo->InitContainers().
109 fParCList->AddAt(newObj, iparC);
110 delete tempObj;
111 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
112
120
121 Bool_t initOK = fMonitorAlgo->InitContainers();
122
123 return initOK;
124}
125
127{
// Calls fMonitorAlgo->CreateHistograms(), then records what the algo exposes:
//  - each histogram pointer goes into fArrayHisto and its (name, folder) pair
//    into fvpsHistosFolder;
//  - each canvas gets a (name, config string) pair in fvpsCanvasConfig.
// Returns the result of CreateHistograms().
// The function signature is not visible in this listing.
130 bool initOK = fMonitorAlgo->CreateHistograms();
131
133 std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
135 std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
136
141 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
142 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
143 // << " in " << vHistos[ uHisto ].second.data()
144 // ;
145 fArrayHisto.Add(vHistos[uHisto].first);
146 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
147 fvpsHistosFolder.push_back(psHistoConfig);
148
149 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
150 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
151
155 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
156 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
157 // << " in " << vCanvases[ uCanv ].second.data();
158 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
// The config string is produced by GenerateCanvasConfigString; the folder
// string stored in vCanvases[uCanv].second is not used in this loop.
159 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
160
161 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
162
163 fvpsCanvasConfig.push_back(psCanvConfig);
164
165 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
166 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
167
168 return initOK;
169}
170
171
173{
// One request/reply cycle: sends a request on fsChannelNameDataInput, receives
// the reply, deserializes it into a fles::StorableTimeslice, passes it to
// DoUnpack, then decides whether to publish. Returns false when the request
// cannot be sent, the reply cannot be received, or the reply is empty;
// returns true otherwise.
// The function signature and several statements (listing lines 197, 207-208,
// 233, 236) are not visible in this listing.
//
// The request text is fsTsBlockName, or kusSysIdTof as a decimal string when
// fsTsBlockName is empty.
175 std::string message = fsTsBlockName;
176 if ("" == message) message = std::to_string(kusSysIdTof);
177 LOG(debug) << "Requesting new TS by sending message: " << message;
178 FairMQMessagePtr req(NewSimpleMessage(message));
179 FairMQMessagePtr rep(NewMessage());
180
181 if (Send(req, fsChannelNameDataInput) <= 0) {
182 LOG(error) << "Failed to send the request! message was " << message;
183 return false;
184 } // if (Send(req, fsChannelNameDataInput) <= 0)
185 else if (Receive(rep, fsChannelNameDataInput) < 0) {
186 LOG(error) << "Failed to receive a reply to the request! message was " << message;
187 return false;
188 } // else if (Receive(rep, fsChannelNameDataInput) < 0)
189 else if (rep->GetSize() == 0) {
190 LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
191 return false;
192 } // else if (rep->GetSize() == 0)
193
// First-message setup. The body of the try block (listing line 197) is not
// visible here.
195 if (0 == fulNumMessages) {
196 try {
198 }
199 catch (InitTaskError& e) {
200 LOG(error) << e.what();
201 ChangeState(fair::mq::Transition::ErrorFound);
202 }
203 } // if( 0 == fulNumMessages)
204
// NOTE(review): the Bool_t/bool result of InitHistograms() is ignored here.
205 if (0 == fulNumMessages) InitHistograms();
206
209 LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize();
210
211 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
212
// Copy the reply payload into a string and read it back through a boost
// binary archive.
213 std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
214 std::istringstream iss(msgStr);
215 boost::archive::binary_iarchive inputArchive(iss);
216
218 fles::StorableTimeslice component {0};
219 inputArchive >> component;
220
222 DoUnpack(component, 0);
223
// Publish when more than fdMaxPublishTime seconds have elapsed since
// fLastPublishTime, or when the message count is a multiple of fuPublishFreqTs
// and more than fdMinPublishTime seconds have elapsed.
// NOTE(review): fuPublishFreqTs comes from the "PubFreqTs" option and no guard
// against 0 is visible; a value of 0 would make the modulo below a division by
// zero whenever the first condition is false.
227 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
228 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
229 if ((fdMaxPublishTime < elapsedSeconds.count())
230 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
// The statements of both branches (listing lines 233 and 236) are not visible
// in this listing.
231 if (!fbConfigSent) {
232 // Send the configuration only once per run!
234 } // if( !fbConfigSent )
235 else
237
238 fLastPublishTime = std::chrono::system_clock::now();
239 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
240
241 return true;
242}
243
245{
// Builds one multi-part message and sends it on fsChannelNameHistosInput.
// Parts, in order:
//   1. header pair (number of histogram configs, number of canvas configs);
//   2. one (HistoName, FolderPath) pair per entry of fvpsHistosFolder;
//   3. one (name, config string) pair per entry of fvpsCanvasConfig;
//   4. fArrayHisto serialized with RootSerializer.
// Returns false if Send reports a negative value, true otherwise.
// The function signature and listing lines 286-287 are not visible here.
//
// The container sizes are stored as uint32_t in the header.
247 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
248 FairMQMessagePtr messageHeader(NewMessage());
249 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
250 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
251
252 FairMQParts partsOut;
253 partsOut.AddPart(std::move(messageHeader));
254
255 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
257 FairMQMessagePtr messageHist(NewMessage());
258 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
259 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
260
261 partsOut.AddPart(std::move(messageHist));
262 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
263
264 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
266 FairMQMessagePtr messageCan(NewMessage());
267 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
268 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
269
270 partsOut.AddPart(std::move(messageCan));
271 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
272
274 FairMQMessagePtr msgHistos(NewMessage());
275 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
276 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
277
278 partsOut.AddPart(std::move(msgHistos));
279
281 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
282 LOG(error) << "CbmDeviceMonitorReqTof::SendHistoConfAndData => Problem sending data";
283 return false;
284 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
285
288
289 return true;
290}
291
293{
// Serializes fArrayHisto with RootSerializer into a single message and sends
// it on fsChannelNameHistosInput. Returns false if Send reports a negative
// value, true otherwise.
// The function signature and listing lines 305-306 are not visible here.
295 FairMQMessagePtr message(NewMessage());
296 // Serialize<RootSerializer>(*message, &fArrayHisto);
297 RootSerializer().Serialize(*message, &fArrayHisto);
298
300 if (Send(message, fsChannelNameHistosInput) < 0) {
301 LOG(error) << "Problem sending data";
302 return false;
303 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
304
307
308 return true;
309}
310
311
313
314
// Processes one timeslice: increments fulTsCounter, inspects the components'
// sys_id while fbComponentsAddedToList is still false, then hands the
// timeslice to fMonitorAlgo->ProcessTs().
// The second parameter is unnamed and unused.
// Returns kTRUE on every path visible in this listing.
315Bool_t CbmDeviceMonitorReqTof::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
316{
317 fulTsCounter++;
318
// The bodies of both sys_id branches (listing lines 322, 325) and line 328 are
// not visible in this listing; presumably they register the component with the
// algo and set fbComponentsAddedToList - TODO confirm against the real file.
319 if (kFALSE == fbComponentsAddedToList) {
320 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
321 if (kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
323 } // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
324 else if (kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
326 } // if( kusSysIdBmon == ts.descriptor( uCompIdx, 0 ).sys_id )
327 } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
329 } // if( kFALSE == fbComponentsAddedToList )
330
// NOTE(review): a processing failure is logged but still returns kTRUE, so the
// caller cannot tell failure from success - confirm this is intentional.
331 if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
332 LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
333 return kTRUE;
334 } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
335
338
339 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
340
341 return kTRUE;
342}
343
std::string GenerateCanvasConfigString(TCanvas *pCanv)
bool first
CbmMcbm2018MonitorAlgoTof * fMonitorAlgo
Processing algo.
Bool_t fbIgnoreCriticalErrors
Switch ON the filling of an additional set of histograms.
std::string fsChannelNameDataInput
User settings parameters.
Bool_t fbComponentsAddedToList
If ON, no printout at all for critical errors.
Bool_t fbIgnoreOverlapMs
Control flags.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
TList * fParCList
Parameters management.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
Bool_t fbDebugMonitorMode
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
static const uint16_t kusSysIdBmon
static const uint16_t kusSysIdTof
Constants.
uint64_t fulNumMessages
Statistics & first TS rejection.
std::chrono::system_clock::time_point fLastPublishTime
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void UseAbsoluteTime(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetIgnoreCriticalErrors(Bool_t bFlagIn=kTRUE)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetDebugMonitorMode(Bool_t bFlagIn=kTRUE)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.