CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMonitorReqBmon.cxx
Go to the documentation of this file.
1/* Copyright (C) 2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
6
9
10#include "StorableTimeslice.hpp"
11
12#include "FairMQLogger.h"
13#include "FairMQProgOptions.h" // device->fConfig
14#include "FairParGenericSet.h"
15
16#include "TCanvas.h"
17#include "TFile.h"
18#include "TH1.h"
19#include "TList.h"
20#include "TNamed.h"
21
22#include "BoostSerializer.h"
23#include <boost/archive/binary_iarchive.hpp>
24#include <boost/serialization/utility.hpp>
25
26#include <array>
27#include <iomanip>
28#include <stdexcept>
29#include <string>
30
31#include "RootSerializer.h"
32struct InitTaskError : std::runtime_error {
33 using std::runtime_error::runtime_error;
34};
35
36using namespace std;
37
38
40
42try {
44 LOG(info) << "Init options for CbmMqStarHistoServer.";
45 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
46 fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
47 fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
48 fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
49 fuOffSpillCountLimit = fConfig->GetValue<uint32_t>("SpillThr");
50 fuOffSpillCountLimitNonPulser = fConfig->GetValue<uint32_t>("SpillThrNonPuls");
51 fdSpillCheckInterval = fConfig->GetValue<double>("SpillCheckInt");
52 std::string sChanMap = fConfig->GetValue<std::string>("ChanMap");
53 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
54 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
55 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
56 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
57 fsTsBlockName = fConfig->GetValue<std::string>("TsBlockName");
58 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
59
60 UInt_t uChanIdx = 0;
61 size_t charPosDel = sChanMap.find(',');
62 while (uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel) {
63 fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
64 sChanMap = sChanMap.substr(charPosDel + 1);
65 uChanIdx++;
66 charPosDel = sChanMap.find(',');
67 } // while( uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel )
68 if (uChanIdx < fvuChanMap.size()) {
69 fvuChanMap[uChanIdx] = std::stoul(sChanMap);
70 } // if( uChanIdx < fvuChanMap.size() )
71
72 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
73 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
74 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
75
76 if ("" == fsTsBlockName) {
77 //
78 LOG(info) << "Requesting TS using the SysId: 0x" << std::hex << static_cast<int>(kusSysId) << std::dec;
79 }
80 else {
81 //
82 LOG(info) << "Requesting TS using the following block name: " << fsTsBlockName;
83 }
84}
85catch (InitTaskError& e) {
86 LOG(error) << e.what();
87 ChangeState(fair::mq::Transition::ErrorFound);
88}
89
91{
92 LOG(info) << "Init parameter containers for CbmDeviceMonitorReqBmon.";
93
95
96 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
97 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
98 fParCList->Remove(tempObj);
99 std::string paramName {tempObj->GetName()};
100 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
101 // Should only be used for small data because of the cost of an additional copy
102
103 // Her must come the proper Runid
104 std::string message = paramName + ",111";
105 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
106
107 FairMQMessagePtr req(NewSimpleMessage(message));
108 FairMQMessagePtr rep(NewMessage());
109
110 FairParGenericSet* newObj = nullptr;
111
112 if (Send(req, "parameters") > 0) {
113 if (Receive(rep, "parameters") >= 0) {
114 if (rep->GetSize() != 0) {
115 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
116 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
117 LOG(info) << "Received unpack parameter from the server:";
118 newObj->print();
119 }
120 else {
121 LOG(error) << "Received empty reply. Parameter not available";
122 } // if (rep->GetSize() != 0)
123 } // if (Receive(rep, "parameters") >= 0)
124 } // if (Send(req, "parameters") > 0)
125 fParCList->AddAt(newObj, iparC);
126 delete tempObj;
127 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
128
138 fvuChanMap[6], fvuChanMap[7]);
139
140 // fMonitorAlgo->AddMsComponentToList(0, 0x90);
141
142 Bool_t initOK = fMonitorAlgo->InitContainers();
143
144 return initOK;
145}
146
148{
151 bool initOK = fMonitorAlgo->CreateHistograms();
152
154 std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
156 std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
157
162 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
163 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
164 // << " in " << vHistos[ uHisto ].second.data()
165 // ;
166 fArrayHisto.Add(vHistos[uHisto].first);
167 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
168 fvpsHistosFolder.push_back(psHistoConfig);
169
170 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
171 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
172
176 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
177 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
178 // << " in " << vCanvases[ uCanv ].second.data();
179 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
180 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
181
182 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
183
184 fvpsCanvasConfig.push_back(psCanvConfig);
185
186 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
187 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
188
189 return initOK;
190}
191
192
194{
196 std::string message = fsTsBlockName;
197 if ("" == message) message = std::to_string(kusSysId);
198 LOG(debug) << "Requesting new TS by sending message: " << message;
199 FairMQMessagePtr req(NewSimpleMessage(message));
200 FairMQMessagePtr rep(NewMessage());
201
202 if (Send(req, fsChannelNameDataInput) <= 0) {
203 LOG(error) << "Failed to send the request! message was " << message;
204 return false;
205 } // if (Send(req, fsChannelNameDataInput) <= 0)
206 else if (Receive(rep, fsChannelNameDataInput) < 0) {
207 LOG(error) << "Failed to receive a reply to the request! message was " << message;
208 return false;
209 } // else if (Receive(rep, fsChannelNameDataInput) < 0)
210 else if (rep->GetSize() == 0) {
211 LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
212 return false;
213 } // else if (rep->GetSize() == 0)
214
216 if (0 == fulNumMessages) {
217 try {
219 }
220 catch (InitTaskError& e) {
221 LOG(error) << e.what();
222 ChangeState(fair::mq::Transition::ErrorFound);
223 }
224 } // if( 0 == fulNumMessages)
225
226 if (0 == fulNumMessages) InitHistograms();
227
229 LOG(debug) << "Received message number " << fulNumMessages << " with size " << rep->GetSize();
230
231 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
232
233 std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
234 std::istringstream iss(msgStr);
235 boost::archive::binary_iarchive inputArchive(iss);
236
238 fles::StorableTimeslice component {0};
239 inputArchive >> component;
240
242 DoUnpack(component, 0);
243
247 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
248 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
249 if ((fdMaxPublishTime < elapsedSeconds.count())
250 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
251 if (!fbConfigSent) {
252 // Send the configuration only once per run!
254 } // if( !fbConfigSent )
255 else
257
258 fLastPublishTime = std::chrono::system_clock::now();
259 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
260
261 return true;
262}
263
265{
267 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
268 FairMQMessagePtr messageHeader(NewMessage());
269 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
270 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
271
272 FairMQParts partsOut;
273 partsOut.AddPart(std::move(messageHeader));
274
275 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
277 FairMQMessagePtr messageHist(NewMessage());
278 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
279 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
280 partsOut.AddPart(std::move(messageHist));
281 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
282
283 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
285 FairMQMessagePtr messageCan(NewMessage());
286 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
287 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
288 partsOut.AddPart(std::move(messageCan));
289 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
290
292 FairMQMessagePtr msgHistos(NewMessage());
293 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
294 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
295
296 partsOut.AddPart(std::move(msgHistos));
297
299 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
300 LOG(error) << "CbmDeviceMonitorReqBmon::SendHistoConfAndData => Problem sending data";
301 return false;
302 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
303
306
307 return true;
308}
309
311{
313 FairMQMessagePtr message(NewMessage());
314 // Serialize<RootSerializer>(*message, &fArrayHisto);
315 RootSerializer().Serialize(*message, &fArrayHisto);
316
318 if (Send(message, fsChannelNameHistosInput) < 0) {
319 LOG(error) << "Problem sending data";
320 return false;
321 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
322
325
326 return true;
327}
328
329
331
332
333Bool_t CbmDeviceMonitorReqBmon::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
334{
335 fulTsCounter++;
336
337 if (kFALSE == fbComponentsAddedToList) {
338 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
339 if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
341 } // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
342 } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
344 } // if( kFALSE == fbComponentsAddedToList )
345
346 if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
347 LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
348 return kTRUE;
349 } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
350
353
354 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
355
356 return kTRUE;
357}
358
std::string GenerateCanvasConfigString(TCanvas *pCanv)
bool first
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
Bool_t fbComponentsAddedToList
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
TList * fParCList
Parameters management.
std::chrono::system_clock::time_point fLastPublishTime
uint64_t fulNumMessages
Statistics & first TS rejection.
static const uint16_t kusSysId
Constants.
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
CbmMcbm2018MonitorAlgoBmon * fMonitorAlgo
Processing algo.
std::string fsChannelNameDataInput
User settings parameters.
Bool_t fbIgnoreOverlapMs
Control flags.
std::vector< uint32_t > fvuChanMap
bool DoUnpack(const fles::Timeslice &ts, size_t component)
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetSpillThreshold(UInt_t uCntLimit)
void SetMonitorMode(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetChannelMap(UInt_t uChan0, UInt_t uChan1, UInt_t uChan2, UInt_t uChan3, UInt_t uChan4, UInt_t uChan5, UInt_t uChan6, UInt_t uChan7)
void SetSpillCheckInterval(Double_t dIntervalSec)
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void SetSpillThresholdNonPulser(UInt_t uCntLimit)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
Hash for CbmL1LinkKey.