CbmRoot
Loading...
Searching...
No Matches
CbmMqHistoServer.cxx
Go to the documentation of this file.
1/* Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer] */
4
5#include "CbmMqHistoServer.h"
6
8
9#include "FairMQProgOptions.h" // device->fConfig
10#include <Logger.h>
11
12#include "TCanvas.h"
13#include "TEnv.h"
14#include "TFile.h"
15#include "TH1.h"
16#include "TH2.h"
17#include "THttpServer.h"
18#include "TMessage.h"
19#include "TObjArray.h"
20#include "TProfile.h"
21#include "TRootSniffer.h"
22#include "TSystem.h"
23
24#include "BoostSerializer.h"
25#include <boost/serialization/utility.hpp>
26
27#include <mutex>
28
29#include "RootSerializer.h"
30
// File-scope mutex: locked in ReceiveData() while the received histograms are merged and the
// canvases prepared, and by the HTTP polling loop around fServer->ProcessRequests().
31std::mutex mtx;
32/*
33Bool_t bMqHistoServerResetHistos = kFALSE;
34Bool_t bMqHistoServerSaveHistos = kFALSE;
35*/
37 : FairMQDevice()
38 , fArrayHisto()
39{
 // Nothing to do in the body: only the base class and the histogram array are initialised above.
40}
41
43
45{
 // Read the channel names, the output file name and the HTTP port from the device configuration.
47 LOG(info) << "Init options for CbmMqHistoServer.";
48 fsChannelNameHistosInput = fConfig->GetValue<std::string>("ChNameIn");
49 fsChannelNameHistosConfig = fConfig->GetValue<std::string>("ChNameHistCfg");
50 fsChannelNameCanvasConfig = fConfig->GetValue<std::string>("ChNameCanvCfg");
51 fsHistoFileName = fConfig->GetValue<std::string>("HistoFileName");
52 fuHttpServerPort = fConfig->GetValue<uint32_t>("histport");
53
60
 // Create the ROOT HTTP server on the configured port.
61 fServer = new THttpServer(Form("http:%u", fuHttpServerPort));
 // Switch off the sniffer's scan of the global directory.
63 fServer->GetSniffer()->SetScanGlobalDir(kFALSE);
 // Look up the JSROOT location: environment variable first, then the ROOT resource "HttpServ.JSRootPath".
64 const char* jsrootsys = gSystem->Getenv("JSROOTSYS");
 // NOTE(review): the default handed to GetValue() is the null pointer itself, so jsrootsys may still be
 // null afterwards; streaming a null const char* below is undefined behaviour - consider guarding it.
65 if (!jsrootsys) jsrootsys = gEnv->GetValue("HttpServ.JSRootPath", jsrootsys);
66
67 LOG(info) << "JSROOT location: " << jsrootsys;
68
 // Disabled: HTTP commands to reset/save the histograms (they rely on the commented-out global flags).
69 //fServer->RegisterCommand("/Reset_Hist", "bMqHistoServerResetHistos=kTRUE");
70 //fServer->RegisterCommand("/Save_Hist", "bMqHistoServerSaveHistos=kTRUE");
71
72 //fServer->Restrict("/Reset_Hist", "allow=admin");
73 //fServer->Restrict("/Save_Hist", "allow=admin");
74}
75
76bool CbmMqHistoServer::ReceiveData(FairMQMessagePtr& msg, int /*index*/)
77{
78 LOG(debug) << "CbmMqHistoServer::ReceiveData => Processing histograms update";
79 TObject* tempObject = nullptr;
80
81 // Deserialize<RootSerializer>(*msg, tempObject);
82 RootSerializer().Deserialize(*msg, tempObject);
83
84 if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
85 std::lock_guard<std::mutex> lk(mtx);
86 TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
87 for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
88 TObject* pObj = arrayHisto->At(i);
89
90 if (nullptr != dynamic_cast<TProfile*>(pObj)) {
91 if (!ReadHistogram<TProfile>(dynamic_cast<TProfile*>(pObj))) return false;
92 } // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
93 else if (nullptr != dynamic_cast<TH2*>(pObj)) {
94 if (!ReadHistogram<TH2>(dynamic_cast<TH2*>(pObj))) return false;
95 } // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
96 else if (nullptr != dynamic_cast<TH1*>(pObj)) {
97 if (!ReadHistogram<TH1>(dynamic_cast<TH1*>(pObj))) return false;
98 } // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
99 else
100 LOG(warning) << "Unsupported object type for " << pObj->GetName();
101 } // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)
102
103 LOG(debug) << "CbmMqHistoServer::ReceiveData => Deleting array";
105 arrayHisto->Delete();
106
109 if (!fbAllCanvasReady) {
110 LOG(debug) << "CbmMqHistoServer::ReceiveData => Checking for canvases updates";
111 for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
113 if (fvbCanvasReady[uCanv]) continue;
114
116 fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
117 } // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
118 } // if( !fbAllCanvasReady )
119 } // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
120 else
121 LOG(fatal) << "CbmMqHistoServer::ReceiveData => Wrong object type at input: " << tempObject->ClassName();
122
123 fNMessages += 1;
124
125 if (nullptr != tempObject) delete tempObject;
126 /*
129 if (bMqHistoServerResetHistos) {
130 std::lock_guard<std::mutex> lk(mtx);
131 // LOG(info) << "Reset Monitor histos ";
132 ResetHistograms();
133 bMqHistoServerResetHistos = kFALSE;
134 } // if( bMqHistoServerResetHistos )
135
136 if (bMqHistoServerSaveHistos) {
137 std::lock_guard<std::mutex> lk(mtx);
138 // LOG(info) << "Save All histos & canvases";
139 SaveHistograms();
140 bMqHistoServerSaveHistos = kFALSE;
141 } // if( bMqHistoServerSaveHistos )
142*/
143 LOG(debug) << "CbmMqHistoServer::ReceiveData => Finished processing histograms update";
144
145 return true;
146}
147
148bool CbmMqHistoServer::ReceiveHistoConfig(FairMQMessagePtr& msg, int /*index*/)
149{
150 std::pair<std::string, std::string> tempObject;
151
152 // Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
153 BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);
154
155
156 LOG(info) << " Received configuration for histo " << tempObject.first << " : " << tempObject.second;
157
160 UInt_t uPrevHist = 0;
161 for (uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist) {
162 if (fvpsHistosFolder[uPrevHist].first == tempObject.first) break;
163 } // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )
164
165 if (uPrevHist < fvpsHistosFolder.size()) {
166 LOG(info) << " Ignored new configuration for histo " << tempObject.first
167 << " due to previously received one: " << tempObject.second;
169 } // if( uPrevHist < fvpsHistosFolder.size() )
170 else {
171 fvpsHistosFolder.push_back(tempObject);
172 fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
173 fvbHistoRegistered.push_back(false);
174 fbAllHistosRegistered = false;
175 } // else of if( uPrevHist < fvpsHistosFolder.size() )
176
177 return true;
178}
179
180bool CbmMqHistoServer::ReceiveCanvasConfig(FairMQMessagePtr& msg, int /*index*/)
181{
182 std::pair<std::string, std::string> tempObject;
183
184 // Deserialize<BoostSerializer<std::pair<std::string, std::string>>>(*msg, tempObject);
185 BoostSerializer<std::pair<std::string, std::string>>().Deserialize(*msg, tempObject);
186
187 LOG(info) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;
188
191 uint32_t uPrevCanv = 0;
192 for (uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv) {
193 if (fvpsCanvasConfig[uPrevCanv].first == tempObject.first) break;
194 } // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )
195
196 if (uPrevCanv < fvpsCanvasConfig.size()) {
197 LOG(warning) << " Ignored new configuration for Canvas " << tempObject.first
198 << " due to previously received one: " << tempObject.second;
200 } // if( uPrevCanv < fvpsCanvasConfig.size() )
201 else {
202 fvpsCanvasConfig.push_back(tempObject);
203 fvbCanvasReady.push_back(false);
204 fbAllCanvasReady = false;
205
206 fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
207 fvbCanvasRegistered.push_back(false);
208 fbAllCanvasRegistered = false;
209 } // else of if( uPrevCanv < fvpsCanvasConfig.size() )
210 return true;
211}
212
213bool CbmMqHistoServer::ReceiveConfigAndData(FairMQParts& parts, int /*index*/)
214{
216 if (parts.Size() < 4) {
217 if (1 == parts.Size()) {
221 LOG(debug) << "CbmMqHistoServer::ReceiveConfigAndData => only 1 parts found in input, "
222 << "assuming data only message routed to wrong method!";
223 return ReceiveData(parts.At(0), 0);
224 } // if( 1 == parts.Size() )
225 LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Wrong number of parts: " << parts.Size()
226 << " instead of at least 4 (Header + Histo Config + Canvas config + Data)!";
227 } // if( parts.Size() < 4 )
228
229 LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received composed message with " << parts.Size() << " parts";
230
232 std::pair<uint32_t, uint32_t> pairHeader;
233 // Deserialize<BoostSerializer<std::pair<uint32_t, uint32_t>>>(*parts.At(0), pairHeader);
234 BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(*parts.At(0), pairHeader);
235
236 LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Received configuration for " << pairHeader.first
237 << " histos and " << pairHeader.second << " canvases";
238
239 uint32_t uOffsetHistoConfig = pairHeader.first;
240 if (0 == pairHeader.first) {
241 uOffsetHistoConfig = 1;
242 if (0 < (parts.At(uOffsetHistoConfig))->GetSize()) {
243 LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No histo config expected but corresponding message is"
244 << " not empty: " << (parts.At(uOffsetHistoConfig))->GetSize();
245 }
246 }
247
248 uint32_t uOffsetCanvasConfig = pairHeader.second;
249 if (0 == pairHeader.second) {
250 uOffsetCanvasConfig = 1;
251 if (0 < (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize()) {
252 LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => No Canvas config expected but corresponding message is"
253 << " not empty: " << (parts.At(uOffsetHistoConfig + uOffsetCanvasConfig))->GetSize();
254 }
255 }
256
257 if (static_cast<size_t>(parts.Size()) != 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1) {
258 LOG(fatal) << "CbmMqHistoServer::ReceiveConfigAndData => Number of parts not matching header: " << parts.Size()
259 << " instead of " << 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1;
260 } // if( parts.Size() != 1 + pairHeader.first + pairHeader.second )
261
263 for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
264 ReceiveHistoConfig(parts.At(1 + uHisto), 0);
265 } // for (UInt_t uHisto = 0; uHisto < pairHeader.first; ++uHisto)
266
268 for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
269 ReceiveCanvasConfig(parts.At(1 + uOffsetHistoConfig + uCanv), 0);
270 } // for (UInt_t uCanv = 0; uCanv < pairHeader.second; ++uCanv)
271
273 ReceiveData(parts.At(1 + uOffsetHistoConfig + uOffsetCanvasConfig), 0);
274
275 LOG(info) << "CbmMqHistoServer::ReceiveConfigAndData => Finished processing composed message with " << parts.Size()
276 << " parts";
277
278 return true;
279}
280
282{
 // Clear the stop flag, then start the background thread running UpdateHttpServer().
283 fStopThread = false;
284 fThread = std::thread(&CbmMqHistoServer::UpdateHttpServer, this);
285}
286
288{
 // Polling loop: every 10 ms take the file-scope mutex and let the HTTP server process its pending
 // requests, until fStopThread is set.
 // NOTE(review): fStopThread is read here without holding the mutex while it is set to true elsewhere
 // right before the thread is joined - confirm it is declared as std::atomic<bool>.
289 while (!fStopThread) {
290 std::this_thread::sleep_for(std::chrono::milliseconds(10));
291 std::lock_guard<std::mutex> lk(mtx);
292
293 fServer->ProcessRequests();
294 }
295}
296
298{
 // Ask the polling loop to stop, then wait for its thread to finish.
 // NOTE(review): join() is called unconditionally - confirm the thread is always started beforehand
 // (a non-joinable std::thread makes join() throw).
300 fStopThread = true;
301 fThread.join();
303}
304
// Merge one received histogram into the server.
//  - Name not yet present in fArrayHisto: a clone is stored and, if a folder configuration with the
//    same name was received, the clone is registered with the HTTP server under "/<folder>".
//  - Name already present: the received content is added to the stored histogram.
// Returns false only when the stored object with that name cannot be cast to HistoT, true otherwise.
305template<class HistoT>
307{
308 int index1 = FindHistogram(pHist->GetName());
309 if (-1 == index1) {
 // First time this name is seen: keep a private copy, the caller stays owner of pHist
310 HistoT* histogram_new = static_cast<HistoT*>(pHist->Clone());
311 fArrayHisto.Add(histogram_new);
312
313 LOG(info) << "Received new histo " << pHist->GetName();
314
 // Look for a not yet used folder configuration matching the name of the new histogram
317 for (uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist) {
319 if (fvbHistoRegistered[uHist]) continue;
320
322 if (fvpsHistosFolder[uHist].first == histogram_new->GetName()) {
323 fvHistos[uHist] = std::pair<TNamed*, std::string>(histogram_new, fvpsHistosFolder[uHist].second);
324 fServer->Register(Form("/%s", fvHistos[uHist].second.data()), fvHistos[uHist].first);
325 fvbHistoRegistered[uHist] = true;
326
327 LOG(info) << "registered histo " << fvHistos[uHist].first->GetName() << " in folder "
328 << fvHistos[uHist].second;
329
330
 // NOTE(review): only an assignment to false is visible in this scan - confirm that
 // fbAllHistosRegistered is set to true beforehand, otherwise the flag can never become true.
333 for (uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx) {
334 if (!fvbHistoRegistered[uIdx]) {
335 fbAllHistosRegistered = false;
336 break;
337 } // if( !fvbHistoRegistered[ uIdx ] )
338 } // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
339
 // A name is configured at most once, no need to look further
340 break;
341 } // if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
342 } // for( uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist )
343 } // if( !fbAllCanvasReady )
344 } // if (-1 == index1)
345 else {
 // Known name: the stored object must be of the same histogram type to be updated
346 HistoT* histogram_existing = dynamic_cast<HistoT*>(fArrayHisto.At(index1));
347 if (nullptr == histogram_existing) {
348 LOG(error) << "CbmMqHistoServer::ReadHistogram => "
349 << "Incompatible type found during update for histo " << pHist->GetName();
350 return false;
351 } // if( nullptr == histogram_existing )
352
353 histogram_existing->Add(pHist);
354 } // else of if (-1 == index1)
355 return true;
356}
357
358int CbmMqHistoServer::FindHistogram(const std::string& name)
359{
360 for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
361 TObject* obj = fArrayHisto.At(iHist);
362 if (TString(obj->GetName()).EqualTo(name)) { return iHist; } // if( TString( obj->GetName() ).EqualTo( name ) )
363 } // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
364 return -1;
365}
366
368{
 // Reset the content of every object stored in fArrayHisto; always returns true.
 // NOTE(review): the result of the dynamic_cast is dereferenced without a check - this relies on
 // fArrayHisto holding only TH1-derived objects and no empty slot.
369 for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
370 dynamic_cast<TH1*>(fArrayHisto.At(iHist))->Reset();
371 } // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
372 return true;
373}
// Try to build and register the canvas described by the configuration at index uCanvIdx.
// Returns false, without creating anything, as long as one of the histograms named in the
// configuration is not yet in fArrayHisto. Otherwise creates the canvas, draws the histograms in
// their pads, registers the canvas with the HTTP server under "/canvases" and returns true.
// NOTE(review): `conf` is used throughout but its declaration is not visible in this listing.
374bool CbmMqHistoServer::PrepareCanvas(uint32_t uCanvIdx)
375{
376 LOG(debug) << " Extracting configuration for canvas index " << uCanvIdx;
378
 // First pass: give up early if any histogram of any pad is still missing.
 // The object name "nullptr" marks an entry without histogram and is skipped.
380 uint32_t uNbPads(conf.GetNbPads());
381 for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
382 uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
383 for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
384 std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
386 if ("nullptr" != sName) {
387 if (FindHistogram(sName) < 0) {
388 return false;
389 } // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
390 } // if( "nullptr" != sName )
391 } // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
392 } // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
393
394 LOG(info) << " All histos found for canvas " << conf.GetName().data() << ", now preparing it";
395
 // Create the canvas and split it into the configured grid of pads
397 TCanvas* pNewCanv = new TCanvas(conf.GetName().data(), conf.GetTitle().data());
398 pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY());
399
 // Second pass: configure each pad and draw its histograms
401 for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
 // Pads are selected with a 1-based index
402 pNewCanv->cd(1 + uPadIdx);
403
 // Grid and logarithmic axis settings of the current pad
405 gPad->SetGrid(conf.GetGridx(uPadIdx), conf.GetGridy(uPadIdx));
406 gPad->SetLogx(conf.GetLogx(uPadIdx));
407 gPad->SetLogy(conf.GetLogy(uPadIdx));
408 gPad->SetLogz(conf.GetLogz(uPadIdx));
409
411 uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
412 for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
413 std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
414 if ("nullptr" != sName) {
 // The first pass guarantees that the name is found, so the index is valid
415 TObject* pObj = fArrayHisto[FindHistogram(sName)];
416
 // Draw with the option configured for this object, through the most specific known type
417 if (nullptr != dynamic_cast<TProfile*>(pObj)) {
418 dynamic_cast<TProfile*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
419 } // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
420 else if (nullptr != dynamic_cast<TH2*>(pObj)) {
421 dynamic_cast<TH2*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
422 } // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
423 else if (nullptr != dynamic_cast<TH1*>(pObj)) {
424 dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
425 } // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
426 else
427 LOG(warning) << " Unsupported object type for " << sName << " when preparing canvas " << conf.GetName();
428
429 LOG(info) << " Configured histo " << sName << " on pad " << (1 + uPadIdx) << " for canvas "
430 << conf.GetName().data();
431 } // if( "nullptr" != sName )
432 } // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
433 } // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
434
 // Store the canvas with its folder name and publish it on the HTTP server
435 fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, "canvases");
436 fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first);
437 fvbCanvasRegistered[uCanvIdx] = true;
438
439 LOG(info) << " Registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder "
440 << fvCanvas[uCanvIdx].second;
441
 // NOTE(review): only an assignment to false is visible in this scan - confirm that
 // fbAllCanvasRegistered is set to true beforehand, otherwise the flag can never become true.
444 for (uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx) {
445 if (!fvbCanvasRegistered[uIdx]) {
446 fbAllCanvasRegistered = false;
447 break;
448 } // if( !fvbCanvasRegistered[ uIdx ] )
449 } // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
450
451 return true;
452}
453
455{
 // Write every entry of fvHistos and fvCanvas into the file fsHistoFileName (recreated on each
 // call), each one inside a directory named after its folder string, then restore the previous
 // gFile/gDirectory and close the file.
 // Remember the current ROOT file and directory so that they can be restored afterwards
457 TFile* oldFile = gFile;
458 TDirectory* oldDir = gDirectory;
459
461 TFile* histoFile = nullptr;
462
463 // open separate histo file in recreate mode
464 histoFile = new TFile(fsHistoFileName.data(), "RECREATE");
465
466 LOG(info) << "Save Histos in file " << fsHistoFileName.data();
467
 // NOTE(review): a plain `new` does not return nullptr, so this check cannot detect a file that
 // failed to open - consider testing the state of the TFile instead.
468 if (nullptr == histoFile) return false;
469
471 for (UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto) {
 // Create the target directory on first use, then step into it
473 TString sFolder = fvHistos[uHisto].second.data();
474 if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
475 gDirectory->cd(sFolder);
476
 // NOTE(review): ReceiveHistoConfig() stores a nullptr placeholder until the histogram is
 // registered, so `first` may be null here - confirm or guard before Write().
478 fvHistos[uHisto].first->Write();
479
 // Back to the top of the output file for the next entry
480 histoFile->cd();
481 } // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
482
483 for (UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas) {
 // Create the target directory on first use, then step into it
485 TString sFolder = fvCanvas[uCanvas].second.data();
486 if (nullptr == gDirectory->Get(sFolder)) gDirectory->mkdir(sFolder);
487 gDirectory->cd(sFolder);
488
 // NOTE(review): ReceiveCanvasConfig() stores a nullptr placeholder until the canvas is
 // prepared, so `first` may be null here - confirm or guard before Write().
490 fvCanvas[uCanvas].first->Write();
491
 // Back to the top of the output file for the next entry
492 histoFile->cd();
493 } // for( UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas )
494
 // Restore the ROOT file and directory that were current on entry
496 gFile = oldFile;
497 gDirectory = oldDir;
498
 // NOTE(review): the TFile object is closed but never deleted in the visible code.
499 histoFile->Close();
500
501 return true;
502}
CanvasConfig ExtractCanvasConfigFromString(std::string sFullConfig)
Extraction.
std::mutex mtx
std::mutex mtx
bool first
uint32_t GetNbPadsY() const
bool GetLogz(uint32_t uPadIdx) const
bool GetGridy(uint32_t uPadIdx) const
bool GetLogy(uint32_t uPadIdx) const
std::string GetOption(uint32_t uPadIdx, uint32_t uObjIdx) const
std::string GetTitle() const
std::string GetName() const
accessors
bool GetLogx(uint32_t uPadIdx) const
uint32_t GetNbObjsInPad(uint32_t uPadIdx) const
bool GetGridx(uint32_t uPadIdx) const
accessors
uint32_t GetNbPadsX() const
std::string GetObjName(uint32_t uPadIdx, uint32_t uObjIdx) const
uint32_t GetNbPads() const
virtual void PostRun()
std::string fsChannelNameHistosInput
Parameters.
int fNMessages
Internal status.
bool PrepareCanvas(uint32_t uCanvIdx)
virtual void InitTask()
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string with ( HistoName, FolderPath ) to send to the histogram server.
std::string fsChannelNameHistosConfig
THttpServer * fServer
bool ReceiveData(FairMQMessagePtr &msg, int index)
std::vector< bool > fvbCanvasRegistered
Vector of Canvas pointers and folder path.
TObjArray fArrayHisto
Array of histograms with unique names.
int FindHistogram(const std::string &name)
std::vector< bool > fvbCanvasReady
virtual void PreRun()
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool ReceiveConfigAndData(FairMQParts &msg, int index)
bool ReceiveCanvasConfig(FairMQMessagePtr &msg, int index)
bool ReceiveHistoConfig(FairMQMessagePtr &msg, int index)
std::vector< bool > fvbHistoRegistered
Vector of Histos pointers and folder path.
std::string fsChannelNameCanvasConfig
bool ReadHistogram(HistoT *pHist)
std::vector< std::pair< TNamed *, std::string > > fvHistos
std::vector< std::pair< TCanvas *, std::string > > fvCanvas
std::string fsHistoFileName