CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMonitorBmon.cxx
Go to the documentation of this file.
1/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau, Florian Uhlig [committer] */
4
13
14#include "CbmFlesCanvasTools.h"
16
17#include "StorableTimeslice.hpp"
18
19#include "FairMQLogger.h"
20#include "FairMQProgOptions.h" // device->fConfig
21#include "FairParGenericSet.h"
22
23#include "TCanvas.h"
24#include "TFile.h"
25#include "TH1.h"
26#include "TList.h"
27#include "TNamed.h"
28
29#include "BoostSerializer.h"
30#include <boost/archive/binary_iarchive.hpp>
31#include <boost/serialization/utility.hpp>
32
33#include <array>
34#include <iomanip>
35#include <stdexcept>
36#include <string>
37
38#include "RootSerializer.h"
39struct InitTaskError : std::runtime_error {
40 using std::runtime_error::runtime_error;
41};
42
43using namespace std;
44
45
47 : fbIgnoreOverlapMs {false}
48 , fsChannelNameDataInput {"t0component"}
49 , fsChannelNameHistosInput {"histogram-in"}
50 , fuHistoryHistoSize {3600}
51 , fuMinTotPulser {185}
52 , fuMaxTotPulser {195}
53 , fuOffSpillCountLimit {25}
54 , fuOffSpillCountLimitNonPulser {10}
55 , fdSpillCheckInterval {0.0128}
56 , fvuChanMap {0, 1, 2, 3, 4, 5, 6, 7}
57 , fuPublishFreqTs {100}
58 , fdMinPublishTime {0.5}
59 , fdMaxPublishTime {5.0}
60 , fsAllowedChannels {fsChannelNameDataInput}
61 , fParCList {nullptr}
62 , fulNumMessages {0}
63 , fulTsCounter {0}
64 , fLastPublishTime {std::chrono::system_clock::now()}
65 , fMonitorAlgo {new CbmMcbm2018MonitorAlgoBmon()}
66 , fArrayHisto {}
67 , fvpsHistosFolder {}
68 , fvpsCanvasConfig {}
69{
70}
71
73try {
75 LOG(info) << "Init options for CbmMqStarHistoServer.";
76 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
77 fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
78 fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
79 fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
80 fuOffSpillCountLimit = fConfig->GetValue<uint32_t>("SpillThr");
81 fuOffSpillCountLimitNonPulser = fConfig->GetValue<uint32_t>("SpillThrNonPuls");
82 fdSpillCheckInterval = fConfig->GetValue<double>("SpillCheckInt");
83 std::string sChanMap = fConfig->GetValue<std::string>("ChanMap");
84 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
85 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
86 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
87 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
88 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
90
91 UInt_t uChanIdx = 0;
92 size_t charPosDel = sChanMap.find(',');
93 while (uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel) {
94 LOG(info) << sChanMap.substr(0, charPosDel);
95 fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
96 LOG(info) << fvuChanMap[uChanIdx];
97 sChanMap = sChanMap.substr(charPosDel + 1);
98 LOG(info) << sChanMap;
99 uChanIdx++;
100 charPosDel = sChanMap.find(',');
101 } // while( uChanIdx < fvuChanMap.size() && std::string::npos != charPosDel )
102 if (uChanIdx < fvuChanMap.size()) {
103 LOG(info) << sChanMap;
104 fvuChanMap[uChanIdx] = std::stoul(sChanMap);
105 LOG(info) << fvuChanMap[uChanIdx];
106 } // if( uChanIdx < fvuChanMap.size() )
107
108 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
109 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
110 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
111
112 // Get the information about created channels from the device
113 // Check if the defined channels from the topology (by name)
114 // are in the list of channels which are possible/allowed
115 // for the device
116 // The idea is to check at initilization if the devices are
117 // properly connected. For the time beeing this is done with a
118 // nameing convention. It is not avoided that someone sends other
119 // data on this channel.
120 //logger::SetLogLevel("INFO");
121
122 int noChannel = fChannels.size();
123 LOG(info) << "Number of defined channels: " << noChannel;
124 for (auto const& entry : fChannels) {
125 LOG(info) << "Channel name: " << entry.first;
126 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
127 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
128 OnData(entry.first, &CbmDeviceMonitorBmon::HandleData);
129 } // if( entry.first.find( "ts" )
130 } // for( auto const &entry : fChannels )
131}
132catch (InitTaskError& e) {
133 LOG(error) << e.what();
134 ChangeState(fair::mq::Transition::ErrorFound);
135}
136
138{
139 for (auto const& entry : fsAllowedChannels) {
140 std::size_t pos1 = channelName.find(entry);
141 if (pos1 != std::string::npos) {
142 const vector<std::string>::const_iterator pos =
143 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
144 const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
145 LOG(info) << "Found " << entry << " in " << channelName;
146 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
147 return true;
148 } // if (pos1!=std::string::npos)
149 } // for(auto const &entry : fsAllowedChannels)
150 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
151 LOG(error) << "Stop device.";
152 return false;
153}
154
156{
157 LOG(info) << "Init parameter containers for CbmDeviceMonitorBmon.";
158
160
161 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
162 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
163 fParCList->Remove(tempObj);
164 std::string paramName {tempObj->GetName()};
165 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
166 // Should only be used for small data because of the cost of an additional copy
167
168 // Her must come the proper Runid
169 std::string message = paramName + ",111";
170 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
171
172 FairMQMessagePtr req(NewSimpleMessage(message));
173 FairMQMessagePtr rep(NewMessage());
174
175 FairParGenericSet* newObj = nullptr;
176
177 if (Send(req, "parameters") > 0) {
178 if (Receive(rep, "parameters") >= 0) {
179 if (rep->GetSize() != 0) {
180 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
181 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
182 LOG(info) << "Received unpack parameter from the server:";
183 newObj->print();
184 }
185 else {
186 LOG(error) << "Received empty reply. Parameter not available";
187 } // if (rep->GetSize() != 0)
188 } // if (Receive(rep, "parameters") >= 0)
189 } // if (Send(req, "parameters") > 0)
190 fParCList->AddAt(newObj, iparC);
191 delete tempObj;
192 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
193
203 fvuChanMap[6], fvuChanMap[7]);
204
205 // fMonitorAlgo->AddMsComponentToList(0, 0x90);
206
207 Bool_t initOK = fMonitorAlgo->InitContainers();
208
209 return initOK;
210}
211
213{
216 bool initOK = fMonitorAlgo->CreateHistograms();
217
219 std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
221 std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
222
227 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
228 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
229 // << " in " << vHistos[ uHisto ].second.data()
230 // ;
231 fArrayHisto.Add(vHistos[uHisto].first);
232 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
233 fvpsHistosFolder.push_back(psHistoConfig);
234
235 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
236 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
237
241 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
242 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
243 // << " in " << vCanvases[ uCanv ].second.data();
244 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
245 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
246
247 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
248
249 fvpsCanvasConfig.push_back(psCanvConfig);
250
251 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
252 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
253
254 return initOK;
255}
256
257
258// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
259bool CbmDeviceMonitorBmon::HandleData(FairMQMessagePtr& msg, int /*index*/)
260{
261
262 if (0 == fulNumMessages) {
263 try {
265 }
266 catch (InitTaskError& e) {
267 LOG(error) << e.what();
268 ChangeState(fair::mq::Transition::ErrorFound);
269 }
270 } // if( 0 == fulNumMessages)
271
272 if (0 == fulNumMessages) InitHistograms();
273
275 LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
276
277 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
278
279 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
280 std::istringstream iss(msgStr);
281 boost::archive::binary_iarchive inputArchive(iss);
282
284 fles::StorableTimeslice component {0};
285 inputArchive >> component;
286
288 DoUnpack(component, 0);
289
293 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
294 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
295 if ((fdMaxPublishTime < elapsedSeconds.count())
296 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
297 if (!fbConfigSent) {
298 // Send the configuration only once per run!
300 } // if( !fbConfigSent )
301 else
303
304 fLastPublishTime = std::chrono::system_clock::now();
305 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
306
307 return true;
308}
309
311{
313 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
314 FairMQMessagePtr messageHeader(NewMessage());
315 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
316 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
317
318 FairMQParts partsOut;
319 partsOut.AddPart(std::move(messageHeader));
320
321 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
323 FairMQMessagePtr messageHist(NewMessage());
324 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
325 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
326
327 partsOut.AddPart(std::move(messageHist));
328 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
329
330 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
332 FairMQMessagePtr messageCan(NewMessage());
333 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
334 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
335
336 partsOut.AddPart(std::move(messageCan));
337 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
338
340 FairMQMessagePtr msgHistos(NewMessage());
341 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
342 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
343
344 partsOut.AddPart(std::move(msgHistos));
345
347 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
348 LOG(error) << "CbmDeviceMonitorBmon::SendHistoConfAndData => Problem sending data";
349 return false;
350 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
351
354
355 return true;
356}
357
359{
361 FairMQMessagePtr message(NewMessage());
362 // Serialize<RootSerializer>(*message, &fArrayHisto);
363 RootSerializer().Serialize(*message, &fArrayHisto);
364
366 if (Send(message, fsChannelNameHistosInput) < 0) {
367 LOG(error) << "Problem sending data";
368 return false;
369 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
370
373
374 return true;
375}
376
377
379
380
381Bool_t CbmDeviceMonitorBmon::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
382{
383 fulTsCounter++;
384
385 if (kFALSE == fbComponentsAddedToList) {
386 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
387 if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
389 } // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
390 } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
392 } // if( kFALSE == fbComponentsAddedToList )
393
394 if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
395 LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
396 return kTRUE;
397 } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
398
401
402 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
403
404 return kTRUE;
405}
406
std::string GenerateCanvasConfigString(TCanvas *pCanv)
bool first
TList * fParCList
Parameters management.
std::string fsChannelNameDataInput
User settings parameters.
bool HandleData(FairMQMessagePtr &, int)
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::chrono::system_clock::time_point fLastPublishTime
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool DoUnpack(const fles::Timeslice &ts, size_t component)
static const uint16_t kusSysId
Constants.
Bool_t fbComponentsAddedToList
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
std::vector< uint32_t > fvuChanMap
bool IsChannelNameAllowed(std::string channelName)
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
CbmMcbm2018MonitorAlgoBmon * fMonitorAlgo
Processing algo.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
uint64_t fulNumMessages
Statistics & first TS rejection.
Bool_t fbIgnoreOverlapMs
Control flags.
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetSpillThreshold(UInt_t uCntLimit)
void SetMonitorMode(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetChannelMap(UInt_t uChan0, UInt_t uChan1, UInt_t uChan2, UInt_t uChan3, UInt_t uChan4, UInt_t uChan5, UInt_t uChan6, UInt_t uChan7)
void SetSpillCheckInterval(Double_t dIntervalSec)
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void SetSpillThresholdNonPulser(UInt_t uCntLimit)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
Hash for CbmL1LinkKey.