CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMonitorTof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2020-2021 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
12#include "CbmDeviceMonitorTof.h"
13
14#include "CbmFlesCanvasTools.h"
15#include "CbmMQDefs.h"
17
18#include "StorableTimeslice.hpp"
19
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h" // device->fConfig
22#include "FairParGenericSet.h"
23
24#include "TCanvas.h"
25#include "TFile.h"
26#include "TH1.h"
27#include "TList.h"
28#include "TNamed.h"
29
30#include "BoostSerializer.h"
31#include <boost/archive/binary_iarchive.hpp>
32#include <boost/serialization/utility.hpp>
33
34#include <array>
35#include <iomanip>
36#include <stdexcept>
37#include <string>
38
39#include "RootSerializer.h"
/// Exception thrown when the initialisation of the device fails.
/// It adds nothing to std::runtime_error beyond a distinct type to catch,
/// and re-uses all constructors of the base class.
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
43
44using namespace std;
45
47
// NOTE(review): the signature line of this function is missing from the listing; the
// function-try-block with an InitTaskError handler suggests the device's InitTask() - confirm.
//
// Reads the user options from fConfig, logs the publication settings and registers
// HandleData as callback on every channel whose name contains fsChannelNameDataInput.
// Throws InitTaskError (caught by the handler below) if such a channel is not in the
// list of allowed channel names.
49try {
// NOTE(review): the message names CbmMqStarHistoServer while the other log lines of this
// file refer to CbmDeviceMonitorTof - possibly a copy/paste leftover.
51 LOG(info) << "Init options for CbmMqStarHistoServer.";
52
// Monitoring options, each read from the option key given as string literal.
53 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
54 fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni");
55 fbIgnoreCriticalErrors = fConfig->GetValue<bool>("IgnCritErr");
56 fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
57 fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
58 fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
59 fiGdpbIndex = fConfig->GetValue<int32_t>("GdpbIdx");
60
// Publication settings and names of the input (timeslice) and output (histogram) channels.
61 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
62 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
63 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
64 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
65 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
67
68 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
69 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
70 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
71
74
75 // Get the information about created channels from the device
76 // Check if the defined channels from the topology (by name)
77 // are in the list of channels which are possible/allowed
78 // for the device
79 // The idea is to check at initialization if the devices are
80 // properly connected. For the time being this is done with a
81 // naming convention. It is not avoided that someone sends other
82 // data on this channel.
83 //logger::SetLogLevel("INFO");
84
85 int noChannel = fChannels.size();
86 LOG(info) << "Number of defined channels: " << noChannel;
87 for (auto const& entry : fChannels) {
88 LOG(info) << "Channel name: " << entry.first;
// Only channels whose name contains the configured data input name get a handler.
89 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
90 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
91 OnData(entry.first, &CbmDeviceMonitorTof::HandleData);
92 } // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
93 } // for( auto const &entry : fChannels )
94}
95catch (InitTaskError& e) {
96 LOG(error) << e.what();
97 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
// NOTE(review): listing line 98 (the statement the comment above refers to) is missing;
// as shown here the handler only logs the error.
99}
100
101bool CbmDeviceMonitorTof::IsChannelNameAllowed(std::string channelName)
102{
103 for (auto const& entry : fsAllowedChannels) {
104 std::size_t pos1 = channelName.find(entry);
105 if (pos1 != std::string::npos) {
106 const vector<std::string>::const_iterator pos =
107 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
108 const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
109 LOG(info) << "Found " << entry << " in " << channelName;
110 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
111 return true;
112 } // if (pos1!=std::string::npos)
113 } // for(auto const &entry : fsAllowedChannels)
114 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
115 LOG(error) << "Stop device.";
116 return false;
117}
118
// NOTE(review): the signature line is missing from the listing; the log message below
// suggests this is CbmDeviceMonitorTof::InitContainers() - confirm.
//
// For every entry of fParCList a request "<name>,111" is sent on the "parameters" channel
// and the entry is replaced by the object read from the reply. Afterwards the result of
// fMonitorAlgo->InitContainers() is returned.
120{
121 LOG(info) << "Init parameter containers for CbmDeviceMonitorTof.";
122
124
125 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
// Take the placeholder out of the list; its replacement is inserted at the same index below.
126 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
127 fParCList->Remove(tempObj);
128 std::string paramName {tempObj->GetName()};
129 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
130 // Should only be used for small data because of the cost of an additional copy
131
132 // Here must come the proper Runid
133 std::string message = paramName + ",111";
134 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
135
136 FairMQMessagePtr req(NewSimpleMessage(message));
137 FairMQMessagePtr rep(NewMessage());
138
139 FairParGenericSet* newObj = nullptr;
140
141 if (Send(req, "parameters") > 0) {
142 if (Receive(rep, "parameters") >= 0) {
143 if (rep->GetSize() != 0) {
// Rebuild the parameter object from the raw reply buffer.
144 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
145 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
146 LOG(info) << "Received unpack parameter from the server:";
147 newObj->print();
148 }
149 else {
150 LOG(error) << "Received empty reply. Parameter not available";
151 } // if (rep->GetSize() != 0)
152 } // if (Receive(rep, "parameters") >= 0)
153 } // if (Send(req, "parameters") > 0)
// NOTE(review): newObj is still nullptr here when the send/receive failed or the reply was
// empty, and it is inserted into the list regardless - confirm this is intended.
154 fParCList->AddAt(newObj, iparC);
155 delete tempObj;
156 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
157
165
166 Bool_t initOK = fMonitorAlgo->InitContainers();
167
168 return initOK;
169}
170
// NOTE(review): the signature line is missing from the listing; HandleData calls
// InitHistograms() on the first message, presumably this function - confirm.
//
// Lets the monitor algorithm create its histograms, then records for each histogram a
// (name, folder) pair and for each canvas a (name, config string) pair, and adds the
// histograms to fArrayHisto. Returns the result of fMonitorAlgo->CreateHistograms().
172{
175 bool initOK = fMonitorAlgo->CreateHistograms();
176
178 std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
180 std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
181
// Register every histogram for publication and remember the folder it belongs to.
186 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
187 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
188 // << " in " << vHistos[ uHisto ].second.data()
189 // ;
190 fArrayHisto.Add(vHistos[uHisto].first);
191 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
192 fvpsHistosFolder.push_back(psHistoConfig);
193
194 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
195 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
196
// Store for every canvas its name and the config string built by GenerateCanvasConfigString.
200 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
201 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
202 // << " in " << vCanvases[ uCanv ].second.data();
203 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
204 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
205
206 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
207
208 fvpsCanvasConfig.push_back(psCanvConfig);
209
210 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
211 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
212
213 return initOK;
214}
215
216
217// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
// Callback for incoming timeslice messages: deserializes the payload into a
// fles::StorableTimeslice, passes it to DoUnpack and checks whether the publication
// conditions (time since last publication / number of messages) are fulfilled.
// Always returns true.
// NOTE(review): several statements of this function are missing from the listing
// (listing lines 222, 232, 257 and 260); the notes below mark the affected places.
218bool CbmDeviceMonitorTof::HandleData(FairMQMessagePtr& msg, int /*index*/)
219{
220 if (0 == fulNumMessages) {
// NOTE(review): the body of this try block (listing line 222) is missing here.
221 try {
223 }
224 catch (InitTaskError& e) {
225 LOG(error) << e.what();
226 ChangeState(fair::mq::Transition::ErrorFound);
227 }
228 } // if( 0 == fulNumMessages)
229
// Histograms are created on the first message only.
230 if (0 == fulNumMessages) InitHistograms();
231
// NOTE(review): no update of fulNumMessages is visible in this listing (line 232 is
// missing); presumably the counter is incremented there - confirm against the real file.
233 LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
234
235 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
236
// Copy the message payload into a string stream so that boost can read the archive from it.
237 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
238 std::istringstream iss(msgStr);
239 boost::archive::binary_iarchive inputArchive(iss);
240
242 fles::StorableTimeslice component {0};
243 inputArchive >> component;
244
// The return value of DoUnpack is not checked.
246 DoUnpack(component, 0);
247
// Publish if the max. interval has elapsed, or if the message count is a multiple of
// fuPublishFreqTs and the min. interval has elapsed.
// NOTE(review): the modulo uses the configured fuPublishFreqTs; a value of 0 would be a
// division by zero - confirm it is validated elsewhere.
251 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
252 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
253 if ((fdMaxPublishTime < elapsedSeconds.count())
254 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
// NOTE(review): the statements of both branches (listing lines 257 and 260) are missing;
// as printed here the else would bind to the fLastPublishTime assignment below.
255 if (!fbConfigSent) {
256 // Send the configuration only once per run!
258 } // if( !fbConfigSent )
259 else
261
262 fLastPublishTime = std::chrono::system_clock::now();
263 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
264
265 return true;
266}
267
// NOTE(review): the signature line is missing from the listing; the error message below
// names CbmDeviceMonitorTof::SendHistoConfAndData.
//
// Builds one multi-part message and sends it on fsChannelNameHistosInput. The parts are,
// in order: a header pair (number of histogram configs, number of canvas configs), one
// part per (histogram name, folder) pair, one part per (canvas name, config) pair, and
// finally the serialized fArrayHisto.
// Returns false if Send reports an error (< 0), true otherwise.
269{
271 std::pair<uint32_t, uint32_t> pairHeader(fvpsHistosFolder.size(), fvpsCanvasConfig.size());
272 FairMQMessagePtr messageHeader(NewMessage());
273 // Serialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*messageHeader, pairHeader);
274 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
275
276 FairMQParts partsOut;
277 partsOut.AddPart(std::move(messageHeader));
278
// One part per histogram configuration.
279 for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto) {
281 FairMQMessagePtr messageHist(NewMessage());
282 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, fvpsHistosFolder[uHisto]);
283 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, fvpsHistosFolder[uHisto]);
284
285 partsOut.AddPart(std::move(messageHist));
286 } // for (UInt_t uHisto = 0; uHisto < fvpsHistosFolder.size(); ++uHisto)
287
// One part per canvas configuration.
288 for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
290 FairMQMessagePtr messageCan(NewMessage());
291 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, fvpsCanvasConfig[uCanv]);
292 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, fvpsCanvasConfig[uCanv]);
293
294 partsOut.AddPart(std::move(messageCan));
295 } // for (UInt_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv)
296
// Last part: the histograms themselves.
298 FairMQMessagePtr msgHistos(NewMessage());
299 // Serialize<RootSerializer>(*msgHistos, &fArrayHisto);
300 RootSerializer().Serialize(*msgHistos, &fArrayHisto);
301
302 partsOut.AddPart(std::move(msgHistos));
303
305 if (Send(partsOut, fsChannelNameHistosInput) < 0) {
306 LOG(error) << "CbmDeviceMonitorTof::SendHistoConfAndData => Problem sending data";
307 return false;
308 } // if( Send( partsOut, fsChannelNameHistosInput ) < 0 )
309
// NOTE(review): listing lines 310-311 are missing here; any post-send action is not visible.
312
313 return true;
314}
315
// NOTE(review): the signature line is missing from the listing; presumably this is
// CbmDeviceMonitorTof::SendHistograms() - confirm.
//
// Serializes fArrayHisto into a single message and sends it on fsChannelNameHistosInput.
// Returns false if Send reports an error (< 0), true otherwise.
317{
319 FairMQMessagePtr message(NewMessage());
320 // Serialize<RootSerializer>(*message, &fArrayHisto);
321 RootSerializer().Serialize(*message, &fArrayHisto);
322
324 if (Send(message, fsChannelNameHistosInput) < 0) {
325 LOG(error) << "Problem sending data";
326 return false;
327 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
328
// NOTE(review): listing lines 329-330 are missing here; any post-send action is not visible.
331
332 return true;
333}
334
335
337
338
// Processes one timeslice with the monitor algorithm and counts it in fulTsCounter.
// The second parameter is unused.
// Returns kTRUE in all visible code paths, including the one where ProcessTs failed.
// NOTE(review): statements are missing from this listing (lines 346, 349 and 352), so the
// bodies of the sys_id branches and any update of fbComponentsAddedToList are not visible.
339Bool_t CbmDeviceMonitorTof::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
340{
341 fulTsCounter++;
342
// Executed as long as the components have not yet been added to the list: the system id of
// the first microslice of each component is compared to the TOF and BMON ids.
343 if (kFALSE == fbComponentsAddedToList) {
344 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
345 if (kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
347 } // if( kusSysIdTof == ts.descriptor( uCompIdx, 0 ).sys_id )
348 else if (kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
350 } // if( kusSysIdBmon == ts.descriptor( uCompIdx, 0 ).sys_id )
351 } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
353 } // if( kFALSE == fbComponentsAddedToList )
354
355 if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
356 LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
// NOTE(review): a processing failure is logged but still reported as kTRUE - confirm that
// this is deliberate.
357 return kTRUE;
358 } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
359
362
363 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << " time slices";
364
365 return kTRUE;
366}
367
std::string GenerateCanvasConfigString(TCanvas *pCanv)
bool first
uint64_t fulNumMessages
Statistics & first TS rejection.
bool IsChannelNameAllowed(std::string channelName)
Bool_t fbComponentsAddedToList
If ON, no printout at all for critical errors.
std::chrono::system_clock::time_point fLastPublishTime
std::string fsChannelNameHistosInput
Bool_t fbDebugMonitorMode
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
CbmMcbm2018MonitorAlgoTof * fMonitorAlgo
Processing algo.
std::string fsChannelNameDataInput
User settings parameters.
TList * fParCList
Parameters management.
static const uint16_t kusSysIdTof
Constants.
Bool_t fbIgnoreCriticalErrors
Switch ON the filling of an additional set of histograms.
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
Bool_t fbIgnoreOverlapMs
Control flags.
bool HandleData(FairMQMessagePtr &, int)
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
static const uint16_t kusSysIdBmon
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void UseAbsoluteTime(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetIgnoreCriticalErrors(Bool_t bFlagIn=kTRUE)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetDebugMonitorMode(Bool_t bFlagIn=kTRUE)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.