CbmRoot
Loading...
Searching...
No Matches
CbmDeviceMcbmMonitorPulser.cxx
Go to the documentation of this file.
1/* Copyright (C) 2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
13
14#include "CbmMQDefs.h"
15
16#include "TimesliceMetaData.h"
17
18//#include "CbmMcbm2018MonitorAlgoTof.h"
19#include "CbmFlesCanvasTools.h"
20
21#include "StorableTimeslice.hpp"
22
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h" // device->fConfig
25#include "FairParGenericSet.h"
26
27#include "TCanvas.h"
28#include "TFile.h"
29#include "TH1.h"
30#include "TList.h"
31#include "TNamed.h"
32
33#include "BoostSerializer.h"
34#include <boost/archive/binary_iarchive.hpp>
35#include <boost/serialization/utility.hpp>
36
37#include <array>
38#include <iomanip>
39#include <stdexcept>
40#include <string>
41
42#include "RootSerializer.h"
// Exception type used by the initialisation code of this file: it is thrown when a channel
// name fails the allow-list check and is caught by the handler that follows the init body.
// All std::runtime_error constructors are inherited, so it is built from a message string.
43struct InitTaskError : std::runtime_error {
44 using std::runtime_error::runtime_error;
45};
46
47using namespace std;
48
49//Bool_t bMcbm2018MonitorTaskTofResetHistos = kFALSE;
50
// NOTE(review): the signature line that precedes this body is missing from this listing; the
// disabled member initialiser below suggests this is the class constructor — confirm.
52// : fMonitorAlgo{ new CbmMcbm2018MonitorAlgoTof() }
// Empty body: nothing is done here.
53{
54}
55
// Initialisation step of the device: reads the user options from fConfig, logs the histogram
// publication settings and registers HandleData as callback on every channel whose name
// contains fsChannelNameDataInput. An InitTaskError is thrown (and caught below) when such a
// channel is rejected by IsChannelNameAllowed.
// NOTE(review): the function header line is missing from this listing; the body appears to be
// a function-try-block — confirm against the original file.
57try {
// NOTE(review): this message names "CbmMqStarHistoServer" while the other log messages in this
// file name CbmDeviceMcbmMonitorPulser — looks like a copy-paste leftover, confirm before changing.
59 LOG(info) << "Init options for CbmMqStarHistoServer.";
60
// Monitoring options, read by key from the device configuration.
61 fbDebugMonitorMode = fConfig->GetValue<bool>("DebugMoni");
62 fuHistoryHistoSize = fConfig->GetValue<uint32_t>("HistEvoSz");
63 fuMinTotPulser = fConfig->GetValue<uint32_t>("PulsTotMin");
64 fuMaxTotPulser = fConfig->GetValue<uint32_t>("PulsTotMax");
65
// Publication settings and channel names, read by key from the device configuration.
66 fuPublishFreqTs = fConfig->GetValue<uint32_t>("PubFreqTs");
67 fdMinPublishTime = fConfig->GetValue<double_t>("PubTimeMin");
68 fdMaxPublishTime = fConfig->GetValue<double_t>("PubTimeMax");
69 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
70 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
71 fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
72 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
74
75 LOG(info) << "Histograms publication frequency in TS: " << fuPublishFreqTs;
76 LOG(info) << "Histograms publication min. interval in s: " << fdMinPublishTime;
77 LOG(info) << "Histograms publication max. interval in s: " << fdMaxPublishTime;
78
80 // fMonitorAlgo->UseAbsoluteTime();
81
82 // Get the information about created channels from the device
83 // Check if the defined channels from the topology (by name)
84 // are in the list of channels which are possible/allowed
85 // for the device
86 // The idea is to check at initialization if the devices are
87 // properly connected. For the time being this is done with a
88 // naming convention. It is not prevented that someone sends other
89 // data on this channel.
90 //logger::SetLogLevel("INFO");
91
// NOTE(review): size() is narrowed to int here; the value is only used for the log line below.
92 int noChannel = fChannels.size();
93 LOG(info) << "Number of defined channels: " << noChannel;
94 for (auto const& entry : fChannels) {
95 LOG(info) << "Channel name: " << entry.first;
// Only channels whose name contains the configured data-input name get the data handler.
96 if (std::string::npos != entry.first.find(fsChannelNameDataInput)) {
97 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
98 OnData(entry.first, &CbmDeviceMcbmMonitorPulser::HandleData);
99 } // if( std::string::npos != entry.first.find( fsChannelNameDataInput ) )
100 } // for( auto const &entry : fChannels )
102}
// Handler for a rejected channel name: the error text is logged.
// NOTE(review): the statement announced by the comment inside the handler is not visible in
// this listing — check the original file for what follows the log call.
103catch (InitTaskError& e) {
104 LOG(error) << e.what();
105 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
107}
108
// Body of the channel-name check (presumably IsChannelNameAllowed; its header line is missing
// from this listing). Returns true as soon as one entry of fsAllowedChannels occurs as a
// substring of channelName; otherwise logs the failure and returns false.
110{
111 for (auto const& entry : fsAllowedChannels) {
112 std::size_t pos1 = channelName.find(entry);
113 if (pos1 != std::string::npos) {
// The position of the matching entry in the list is looked up again only for the log output.
// NOTE(review): std::find is used but <algorithm> is not among the visible includes — confirm
// it is pulled in by another header.
114 const vector<std::string>::const_iterator pos =
115 std::find(fsAllowedChannels.begin(), fsAllowedChannels.end(), entry);
116 const vector<std::string>::size_type idx = pos - fsAllowedChannels.begin();
117 LOG(info) << "Found " << entry << " in " << channelName;
118 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
119 return true;
120 } // if (pos1!=std::string::npos)
121 } // for(auto const &entry : fsAllowedChannels)
// No allowed name matched: report it; the caller decides what to do with the false result.
122 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
123 LOG(error) << "Stop device.";
124 return false;
125}
126
// Parameter-container initialisation (header line missing from this listing).
// In its current state the body only logs a message and returns kTRUE: the parameter request
// loop and the histogram/canvas registration below are both commented out.
128{
129 LOG(info) << "Init parameter containers for CbmDeviceMcbmMonitorPulser.";
130 Bool_t initOK = kTRUE;
// Disabled: request of each parameter container on the "parameters" channel and
// configuration of fMonitorAlgo.
131 /*
132 fParCList = fMonitorAlgo->GetParList();
133
134 for( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ ) {
135 FairParGenericSet* tempObj = (FairParGenericSet*)( fParCList->At( iparC ) );
136 fParCList->Remove( tempObj );
137 std::string paramName{ tempObj->GetName() };
138 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
139 // Should only be used for small data because of the cost of an additional copy
140
141 // Here must come the proper Runid
142 std::string message = paramName + ",111";
143 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
144
145 FairMQMessagePtr req( NewSimpleMessage(message) );
146 FairMQMessagePtr rep( NewMessage() );
147
148 FairParGenericSet* newObj = nullptr;
149
150 if ( Send(req, "parameters") > 0 ) {
151 if ( Receive( rep, "parameters" ) >= 0) {
152 if ( rep->GetSize() != 0 ) {
153 CbmMqTMessage tmsg( rep->GetData(), rep->GetSize() );
154 newObj = static_cast< FairParGenericSet* >( tmsg.ReadObject( tmsg.GetClass() ) );
155 LOG( info ) << "Received unpack parameter from the server:";
156 newObj->print();
157 } else {
158 LOG( error ) << "Received empty reply. Parameter not available";
159 } // if (rep->GetSize() != 0)
160 } // if (Receive(rep, "parameters") >= 0)
161 } // if (Send(req, "parameters") > 0)
162 fParCList->AddAt( newObj, iparC );
163 delete tempObj;
164 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
165
167 fMonitorAlgo->SetIgnoreOverlapMs( fbIgnoreOverlapMs );
168 fMonitorAlgo->SetDebugMonitorMode( fbDebugMonitorMode );
169 fMonitorAlgo->SetIgnoreCriticalErrors( fbIgnoreCriticalErrors );
170 fMonitorAlgo->SetHistoryHistoSize( fuHistoryHistoSize );
171 fMonitorAlgo->SetPulserTotLimits( fuMinTotPulser, fuMaxTotPulser );
172
173 Bool_t initOK = fMonitorAlgo->InitContainers();
174*/
175 // Bool_t initOK = fMonitorAlgo->ReInitContainers();
176
177 // CreateHistos();
// Disabled: histogram creation and sending of the histogram/canvas configuration pairs
// on fsChannelNameHistosConfig and fsChannelNameCanvasConfig.
178 /*
181 initOK &= fMonitorAlgo->CreateHistograms();
182
184 std::vector< std::pair< TNamed *, std::string > > vHistos = fMonitorAlgo->GetHistoVector();
186 std::vector< std::pair< TCanvas *, std::string > > vCanvases = fMonitorAlgo->GetCanvasVector();
187
192 for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
193 {
194// LOG(info) << "Registering " << vHistos[ uHisto ].first->GetName()
195// << " in " << vHistos[ uHisto ].second.data()
196// ;
197 fArrayHisto.Add( vHistos[ uHisto ].first );
198 std::pair< std::string, std::string > psHistoConfig( vHistos[ uHisto ].first->GetName(),
199 vHistos[ uHisto ].second );
200 fvpsHistosFolder.push_back( psHistoConfig );
201
203 FairMQMessagePtr messageHist( NewMessage() );
204// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageHist, psHistoConfig );
205 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,psHistoConfig);
206
208 if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
209 {
210 LOG(error) << "Problem sending histo config";
211 return false;
212 } // if( Send( messageHist, fsChannelNameHistosConfig ) < 0 )
213
214 LOG(info) << "Config of hist " << psHistoConfig.first.data()
215 << " in folder " << psHistoConfig.second.data() ;
216 } // for( UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto )
217
221 for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
222 {
223// LOG(info) << "Registering " << vCanvases[ uCanv ].first->GetName()
224// << " in " << vCanvases[ uCanv ].second.data();
225 std::string sCanvName = (vCanvases[ uCanv ].first)->GetName();
226 std::string sCanvConf = GenerateCanvasConfigString( vCanvases[ uCanv ].first );
227
228 std::pair< std::string, std::string > psCanvConfig( sCanvName, sCanvConf );
229
230 fvpsCanvasConfig.push_back( psCanvConfig );
231
233 FairMQMessagePtr messageCan( NewMessage() );
234// Serialize< BoostSerializer < std::pair< std::string, std::string > > >( *messageCan, psCanvConfig );
235 BoostSerializer < std::pair< std::string, std::string > >().Serialize( *messageCan, psCanvConfig );
236
238 if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
239 {
240 LOG(error) << "Problem sending canvas config";
241 return false;
242 } // if( Send( messageCan, fsChannelNameCanvasConfig ) < 0 )
243
244 LOG(info) << "Config string of Canvas " << psCanvConfig.first.data()
245 << " is " << psCanvConfig.second.data() ;
246 } // for( UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv )
247*/
// With everything above disabled, initOK still holds its initial value kTRUE.
248 return initOK;
249}
250
251
252// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
// Reads a multi-part message in a fixed order: part 0 is the timeslice metadata (ROOT
// serialization), parts 1-7 are Boost binary archives holding the Bmon, Sts, Much, Trd, Tof,
// Rich and Psd digi vectors. The metadata object is then deleted and all vectors are cleared.
// Always returns true.
// NOTE(review): nothing checks that the message really has the 8 parts read below — confirm
// how parts.At() behaves for an out-of-range index.
// NOTE(review): some lines of this function are missing from the listing (the embedded
// numbering has gaps), so a statement or comment may not be shown here.
253bool CbmDeviceMcbmMonitorPulser::HandleData(FairMQParts& parts, int /*index*/)
254{
256
257 LOG(debug) << "Received message " << fulNumMessages << " with " << parts.Size() << " parts"
258 << ", size0: " << parts.At(0)->GetSize();
259
// Index of the next message part to decode; advanced after each part.
260 uint32_t uPartIdx = 0;
261
// Disabled: earlier Boost-archive based decoding of the timeslice metadata.
263 /*
264 std::string msgStrTsMeta( static_cast< char * >( parts.At( uPartIdx )->GetData() ),
265 ( parts.At( uPartIdx ) )->GetSize() );
266 std::istringstream issTsMeta(msgStrTsMeta);
267 boost::archive::binary_iarchive inputArchiveTsMeta(issTsMeta);
268 inputArchiveTsMeta >> (*fTsMetaData);
269 ++uPartIdx;
270*/
271 // Deserialize<RootSerializer>(*parts.At(uPartIdx), fTsMetaData);
// Part 0: timeslice metadata, decoded into the object pointed to by fTsMetaData.
272 RootSerializer().Deserialize(*parts.At(uPartIdx), fTsMetaData);
273 ++uPartIdx;
274
// Parts 1-7 all follow the same pattern: the payload is copied into a std::string, wrapped in
// an istringstream and read into the matching digi vector through a Boost binary archive.
275 std::string msgStrBmon(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
276 std::istringstream issBmon(msgStrBmon);
277 boost::archive::binary_iarchive inputArchiveBmon(issBmon);
278 inputArchiveBmon >> fvDigiBmon;
279 ++uPartIdx;
280
281 std::string msgStrSts(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
282 std::istringstream issSts(msgStrSts);
283 boost::archive::binary_iarchive inputArchiveSts(issSts);
284 inputArchiveSts >> fvDigiSts;
285 ++uPartIdx;
286
287 std::string msgStrMuch(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
288 std::istringstream issMuch(msgStrMuch);
289 boost::archive::binary_iarchive inputArchiveMuch(issMuch);
290 inputArchiveMuch >> fvDigiMuch;
291 ++uPartIdx;
292
293 std::string msgStrTrd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
294 std::istringstream issTrd(msgStrTrd);
295 boost::archive::binary_iarchive inputArchiveTrd(issTrd);
296 inputArchiveTrd >> fvDigiTrd;
297 ++uPartIdx;
298
299 std::string msgStrTof(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
300 std::istringstream issTof(msgStrTof);
301 boost::archive::binary_iarchive inputArchiveTof(issTof);
302 inputArchiveTof >> fvDigiTof;
303 ++uPartIdx;
304
305 std::string msgStrRich(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
306 std::istringstream issRich(msgStrRich);
307 boost::archive::binary_iarchive inputArchiveRich(issRich);
308 inputArchiveRich >> fvDigiRich;
309 ++uPartIdx;
310
311 std::string msgStrPsd(static_cast<char*>(parts.At(uPartIdx)->GetData()), (parts.At(uPartIdx))->GetSize());
312 std::istringstream issPsd(msgStrPsd);
313 boost::archive::binary_iarchive inputArchivePsd(issPsd);
314 inputArchivePsd >> fvDigiPsd;
315 ++uPartIdx;
316
318
// Release the per-message data. No processing of the digis is visible between decoding and
// this cleanup.
// NOTE(review): fTsMetaData is not reset after the delete, so it dangles until the next
// Deserialize call — confirm it is not deleted or dereferenced anywhere else.
320 delete fTsMetaData;
321 fvDigiBmon.clear();
322 fvDigiSts.clear();
323 fvDigiMuch.clear();
324 fvDigiTrd.clear();
325 fvDigiTof.clear();
326 fvDigiRich.clear();
327 fvDigiPsd.clear();
328
// Disabled: older single-message handling (timeslice unpacking and periodic histogram publication).
329 /*
330 LOG(debug) << "Received message number "<< fulNumMessages
331 << " with size " << msg->GetSize();
332
333 if( 0 == fulNumMessages % 10000 )
334 LOG(info) << "Received " << fulNumMessages << " messages";
335
336 std::string msgStr( static_cast<char*>( msg->GetData() ), msg->GetSize() );
337 std::istringstream iss( msgStr );
338 boost::archive::binary_iarchive inputArchive( iss );
339
341 fles::StorableTimeslice component{ 0 };
342 inputArchive >> component;
343
345 DoUnpack(component, 0);
346
350 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
351 std::chrono::duration<double_t> elapsedSeconds = currentTime - fLastPublishTime;
352 if( ( fdMaxPublishTime < elapsedSeconds.count() ) ||
353 ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
354 {
355 SendHistograms();
356 fLastPublishTime = std::chrono::system_clock::now();
357 } // if( ( fdMaxPublishTime < elapsedSeconds.count() ) || ( 0 == fulNumMessages % fuPublishFreqTs && fdMinPublishTime < elapsedSeconds.count() ) )
358*/
359 return true;
360}
361
// Histogram publication (presumably SendHistograms; the header line is missing from this
// listing). Serializes fArrayHisto into a new message with RootSerializer and sends it on the
// channel named by fsChannelNameHistosInput.
// Returns false when Send reports a negative result, true otherwise.
363{
365 FairMQMessagePtr message(NewMessage());
366 // Serialize<RootSerializer>(*message, &fArrayHisto);
367 RootSerializer().Serialize(*message, &fArrayHisto);
368
369 // test code to check if deserialization works
370 /*
371 TObject* tempObject = nullptr;
372// Deserialize<RootDeserializer>(*message, tempObject);
373 RootDeserializer().Deserialize(*message, tempObject);
374
375 if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
376 TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
377 LOG(info) << "Array contains " << arrayHisto->GetEntriesFast()
378 << " entries";
379 for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
380 TObject* obj = arrayHisto->At(i);
381 LOG(info) << obj->GetName();
382 TH1* histogram = static_cast<TH1*>(obj);
383 LOG(info) << histogram->GetNbinsX();
384 }
385 }
386*/
387
// A negative Send result is treated as failure and reported to the caller.
389 if (Send(message, fsChannelNameHistosInput) < 0) {
390 LOG(error) << "Problem sending data";
391 return false;
392 } // if( Send( message, fsChannelNameHistosInput ) < 0 )
393
// Disabled: histogram reset after publication.
395 // fMonitorAlgo->ResetHistograms( kFALSE );
396
397 return true;
398}
399
400
402
std::vector< CbmStsDigi > fvDigiSts
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
std::vector< CbmRichDigi > fvDigiRich
uint64_t fulNumMessages
Parameters management.
std::vector< CbmTrdDigi > fvDigiTrd
std::vector< CbmTofDigi > fvDigiTof
std::vector< CbmMuchBeamTimeDigi > fvDigiMuch
bool IsChannelNameAllowed(std::string channelName)
std::string fsChannelNameDataInput
User settings parameters.
std::vector< CbmTofDigi > fvDigiBmon
Digis storage.
std::vector< CbmPsdDigi > fvDigiPsd
TObjArray fArrayHisto
Processing algo.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.