CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMonitorPsd.cxx
Go to the documentation of this file.
1/* Copyright (C) 2021 Institute for Nuclear Research, Moscow
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Nikolay Karpushkin [committer] */
4
13#include "CbmDeviceMonitorPsd.h"
14
15#include "CbmFlesCanvasTools.h"
16#include "CbmMQDefs.h"
18
19#include "StorableTimeslice.hpp"
20
21#include "FairMQLogger.h"
22#include "FairMQProgOptions.h" // device->fConfig
23#include "FairParGenericSet.h"
24
25#include "TCanvas.h"
26#include "TFile.h"
27#include "TH1.h"
28#include "TList.h"
29#include "TNamed.h"
30
31#include "BoostSerializer.h"
32#include <boost/archive/binary_iarchive.hpp>
33#include <boost/serialization/utility.hpp>
34
35#include <array>
36#include <iomanip>
37#include <stdexcept>
38#include <string>
39
40#include "RootSerializer.h"
41struct InitTaskError : std::runtime_error {
42 using std::runtime_error::runtime_error;
43};
44
45using namespace std;
46
48 : fbIgnoreOverlapMs {false}
49 , fsChannelNameDataInput {"psdcomponent"}
50 , fsChannelNameHistosInput {"histogram-in"}
51 , fsChannelNameHistosConfig {"histo-conf"}
52 , fsChannelNameCanvasConfig {"canvas-conf"}
53 , fuPublishFreqTs {100}
54 , fdMinPublishTime {0.5}
55 , fdMaxPublishTime {5.0}
56 , fuHistoryHistoSize {3600}
57 , fviHistoChargeArgs(3, 0)
58 , fviHistoAmplArgs(3, 0)
59 , fviHistoZLArgs(3, 0)
60 , fsAllowedChannels {fsChannelNameDataInput}
61 , fParCList {nullptr}
62 , fulNumMessages {0}
63 , fulTsCounter {0}
64 , fLastPublishTime {std::chrono::system_clock::now()}
65 , fMonitorAlgo {new CbmMcbm2018MonitorAlgoPsd()}
66 , fArrayHisto {}
67 , fvpsHistosFolder {}
68 , fvpsCanvasConfig {}
69{
70}
71
73try {
75 LOG(info) << "Init options for CbmMqStarHistoServer.";
76 fbIgnoreOverlapMs = fConfig->GetValue<bool>("IgnOverMs");
77 fbMonitorMode = fConfig->GetValue<bool>("MonitorMode");
78 fbMonitorChanMode = fConfig->GetValue<bool>("MonitorChanMode");
79 fbMonitorWfmMode = fConfig->GetValue<bool>("MonitorWfmMode");
80 fbMonitorFitMode = fConfig->GetValue<bool>("MonitorFitMode");
81 fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
82 fviHistoChargeArgs = fConfig->GetValue<std::vector<int>>("HistChrgArgs");
83 fviHistoAmplArgs = fConfig->GetValue<std::vector<int>>("HistAmplArgs");
84 fviHistoZLArgs = fConfig->GetValue<std::vector<int>>("HistZlArgs");
85 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
86 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
87 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
88 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
89 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
90 fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
91 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
93
94 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
95 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
96 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
97
98 // Get the information about created channels from the device
99 // Check if the defined channels from the topology (by name)
100 // are in the list of channels which are possible/allowed
101 // for the device
102 // The idea is to check at initilization if the devices are
103 // properly connected. For the time beeing this is done with a
104 // nameing convention. It is not avoided that someone sends other
105 // data on this channel.
106 //logger::SetLogLevel("INFO");
107
108 int noChannel = fChannels.size();
109 LOG(info) << "Number of defined channels: " << noChannel;
110 for (auto const& entry : fChannels) {
111 LOG(info) << "Channel name: " << entry.first;
112 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
113 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
114 OnData(entry.first, &CbmDeviceMonitorPsd::HandleData);
115 } // if( entry.first.find( "ts" )
116 } // for( auto const &entry : fChannels )
118}
119catch (InitTaskError& e) {
120 LOG(error) << e.what();
121 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
123}
124
125bool CbmDeviceMonitorPsd::IsChannelNameAllowed(std::string channelName)
126{
127 for (auto const& entry : fsAllowedChannels) {
128 std::size_t pos1 = channelName.find(entry);
129 if (pos1 != std::string::npos) {
130 const vector<std::string>::const_iterator pos =
131 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
132 const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
133 LOG(info) << "Found " << entry << " in " << channelName;
134 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
135 return true;
136 } // if (pos1!=std::string::npos)
137 } // for(auto const &entry : fsAllowedChannels)
138 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
139 LOG(error) << "Stop device.";
140 return false;
141}
142
144{
145 LOG(info) << "Init parameter containers for CbmDeviceMonitorPsd.";
146
148
149 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
150 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
151 fParCList->Remove(tempObj);
152 std::string paramName {tempObj->GetName()};
153 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
154 // Should only be used for small data because of the cost of an additional copy
155
156 // Her must come the proper Runid
157 std::string message = paramName + ",111";
158 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
159
160 FairMQMessagePtr req(NewSimpleMessage(message));
161 FairMQMessagePtr rep(NewMessage());
162
163 FairParGenericSet* newObj = nullptr;
164
165 if (Send(req, "parameters") > 0) {
166 if (Receive(rep, "parameters") >= 0) {
167 if (rep->GetSize() != 0) {
168 CbmMqTMessage tmsg(rep->GetData(), rep->GetSize());
169 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
170 LOG(info) << "Received unpack parameter from the server:";
171 newObj->print();
172 }
173 else {
174 LOG(error) << "Received empty reply. Parameter not available";
175 } // if (rep->GetSize() != 0)
176 } // if (Receive(rep, "parameters") >= 0)
177 } // if (Send(req, "parameters") > 0)
178 fParCList->AddAt(newObj, iparC);
179 delete tempObj;
180 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
181
192 //fMonitorAlgo->AddMsComponentToList(0, 0x80);
193
194 Bool_t initOK = fMonitorAlgo->InitContainers();
195
196 // Bool_t initOK = fMonitorAlgo->ReInitContainers();
197
200 initOK &= fMonitorAlgo->CreateHistograms();
201
203 std::vector<std::pair<TNamed*, std::string>> vHistos = fMonitorAlgo->GetHistoVector();
205 std::vector<std::pair<TCanvas*, std::string>> vCanvases = fMonitorAlgo->GetCanvasVector();
206
211 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
212 // LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
213 // << " in " << vHistos[ uHisto ].second.data()
214 // ;
215 fArrayHisto.Add(vHistos[uHisto].first);
216 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].first->GetName(), vHistos[uHisto].second);
217 fvpsHistosFolder.push_back(psHistoConfig);
218
220 FairMQMessagePtr messageHist(NewMessage());
221 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageHist, psHistoConfig);
222 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);
223
225 if (Send(messageHist, fsChannelNameHistosConfig) < 0) {
226 LOG(error) << "Problem sending histo config";
227 return false;
228 } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
229
230 LOG(info) << "Config of hist " << psHistoConfig.first.data() << " in folder " << psHistoConfig.second.data();
231 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
232
236 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
237 // LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
238 // << " in " << vCanvases[ uCanv ].second.data();
239 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
240 std::string sCanvConf = GenerateCanvasConfigString(vCanvases[uCanv].first);
241
242 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
243
244 fvpsCanvasConfig.push_back(psCanvConfig);
245
247 FairMQMessagePtr messageCan(NewMessage());
248 // Serialize<BoostSerializer<std::pair<std::string, std::string>>>(*messageCan, psCanvConfig);
249 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);
250
252 if (Send(messageCan, fsChannelNameCanvasConfig) < 0) {
253 LOG(error) << "Problem sending canvas config";
254 return false;
255 } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
256
257 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data() << " is " << psCanvConfig.second.data();
258 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
259
260 return initOK;
261}
262
263
264// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
265bool CbmDeviceMonitorPsd::HandleData(FairMQMessagePtr& msg, int /*index*/)
266{
268 LOG(debug) << "Received message number " << fulNumMessages << " with size " << msg->GetSize();
269
270 if (0 == fulNumMessages % 10000) LOG(info) << "Received " << fulNumMessages << " messages";
271
272 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
273 std::istringstream iss(msgStr);
274 boost::archive::binary_iarchive inputArchive(iss);
275
277 fles::StorableTimeslice component {0};
278 inputArchive >> component;
279
281 DoUnpack(component, 0);
282
286 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
287 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
288 if ((fdMaxPublishTime < elapsedSeconds.count())
289 || (0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count())) {
291 fLastPublishTime = std::chrono::system_clock::now();
292 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
293
294 return true;
295}
296
298{
300 FairMQMessagePtr message(NewMessage());
301 // Serialize<RootSerializer>(*message, &fArrayHisto);
302 RootSerializer().Serialize(*message, &fArrayHisto);
303
305 if (Send(message, fsChannelNameHistosInput) < 0) {
306 LOG(error) << "Problem sending data";
307 return false;
308 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
309
312
313 return true;
314}
315
316
318
319Bool_t CbmDeviceMonitorPsd::DoUnpack(const fles::Timeslice& ts, size_t /*component*/)
320{
321
322 if (kFALSE == fbComponentsAddedToList) {
323 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
324 if (kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
326 } // if( kusSysId == ts.descriptor( uCompIdx, 0 ).sys_id )
327 } // for( uint32_t uComp = 0; uComp < ts.num_components(); ++uComp )
329 } // if( kFALSE == fbComponentsAddedToList )
330
331 if (kFALSE == fMonitorAlgo->ProcessTs(ts)) {
332 LOG(error) << "Failed processing TS " << ts.index() << " in unpacker algorithm class";
333 return kTRUE;
334 } // if( kFALSE == fMonitorAlgo->ProcessTs( ts ) )
335
338
339 if (0 == fulTsCounter % 10000) LOG(info) << "Processed " << fulTsCounter << "TS";
340 fulTsCounter++;
341
342 return kTRUE;
343}
344
345
std::string GenerateCanvasConfigString(TCanvas *pCanv)
bool first
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::string fsChannelNameHistosInput
bool HandleData(FairMQMessagePtr &, int)
Bool_t fbMonitorFitMode
Switch ON the filling waveforms histograms.
std::string fsChannelNameDataInput
Switch ON the filling waveform fitting histograms.
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool IsChannelNameAllowed(std::string channelName)
static const uint16_t kusSysId
Constants.
std::chrono::system_clock::time_point fLastPublishTime
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
std::string fsChannelNameCanvasConfig
uint64_t fulNumMessages
Statistics & first TS rejection.
Bool_t fbMonitorMode
Control flags.
Bool_t fbMonitorChanMode
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
Bool_t fbMonitorWfmMode
Switch ON the filling channelwise histograms.
std::vector< Int_t > fviHistoChargeArgs
std::vector< Int_t > fviHistoZLArgs
std::string fsChannelNameHistosConfig
TList * fParCList
Parameters management.
Bool_t fbIgnoreOverlapMs
Switch ON the filling of a minimal set of histograms.
CbmMcbm2018MonitorAlgoPsd * fMonitorAlgo
Processing algo.
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
std::vector< Int_t > fviHistoAmplArgs
void SetZLHistoArgs(std::vector< Int_t > inVec)
void SetMonitorFitMode(Bool_t bFlagIn=kTRUE)
void SetMonitorMode(Bool_t bFlagIn=kTRUE)
void SetAmplHistoArgs(std::vector< Int_t > inVec)
void SetMonitorWfmMode(Bool_t bFlagIn=kTRUE)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetMonitorChanMode(Bool_t bFlagIn=kTRUE)
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetChargeHistoArgs(std::vector< Int_t > inVec)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.