CbmRoot
Loading...
Searching...
No Matches
services/histserv/app/Application.cxx
Go to the documentation of this file.
1/* Copyright (C) 2023-2024 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer], Sergei Zharko */
4
5#include "Application.h"
6
10#include "TCanvas.h"
11#include "TEnv.h"
12#include "TFile.h"
13#include "TH1.h"
14#include "TH2.h"
15#include "THttpServer.h"
16#include "TMessage.h"
17#include "TObjArray.h"
18#include "TProfile.h"
19#include "TProfile2D.h"
20#include "TRootSniffer.h"
21#include "TSystem.h"
22
23#include <Logger.h>
24
25#include <boost/archive/binary_iarchive.hpp>
26#include <boost/iostreams/device/array.hpp>
27#ifdef BOOST_IOS_HAS_ZSTD
28#include <boost/iostreams/filter/zstd.hpp>
29#include <boost/iostreams/filtering_stream.hpp>
30#endif
31#include <boost/iostreams/stream.hpp>
32#include <boost/serialization/utility.hpp>
33#include <boost/serialization/vector.hpp>
34
35#include <mutex>
36#include <zmq_addon.hpp>
37
38#include <fmt/format.h>
39
// File-scope mutex. It is locked in the ZMQ receive loop before a received message is processed and in
// the HTTP-server update loop around THttpServer::ProcessRequests(), so these two do not run concurrently.
40std::mutex mtx;
41
// Short aliases for the Boost namespaces used by the message deserialization code in this file.
42namespace b_io = boost::iostreams;
43namespace b_ar = boost::archive;
44
46
52
53// ---------------------------------------------------------------------------------------------------------------------
54//
55Application::Application(ProgramOptions const& opt, volatile sig_atomic_t* signalStatus)
56 : fOpt(opt)
57 , fSignalStatus(signalStatus)
58{
60 LOG(info) << "Options for Application:";
61 LOG(info) << " Input ZMQ channel: " << fOpt.ComChan();
62 LOG(info) << " HTTP server port: " << fOpt.HttpPort();
63 if ("" != fOpt.HistoFile()) { //
64 LOG(info) << " Output filename: " << fOpt.HistoFile() << (fOpt.Overwrite() ? " (in overwrite mode)" : "");
65 }
66
70 fZmqSocket.set(zmq::sockopt::rcvhwm, fOpt.ComChanZmqRcvHwm()); // High-Water Mark, nb updates kept in buffer
71 fZmqSocket.set(zmq::sockopt::rcvtimeo, fOpt.ComChanZmqRcvTo()); // Timeout in ms to avoid stuck in loop!
72 fZmqSocket.bind(fOpt.ComChan().c_str()); // This side "binds" the socket => Other side should connect!!!!
73
74 fServer = new THttpServer(Form("http:%u", fOpt.HttpPort()));
76 fServer->GetSniffer()->SetScanGlobalDir(kFALSE);
77 const char* jsrootsys = gSystem->Getenv("JSROOTSYS");
78 if (!jsrootsys) jsrootsys = gEnv->GetValue("HttpServ.JSRootPath", jsrootsys);
79
80 fUiCmdActor = std::make_unique<UiCmdActor>();
81 fServer->Register("/", fUiCmdActor.get());
82 fServer->Hide("/UiCmdActor");
83
84 if (!fOpt.HideGuiCommands()) {
85 fServer->RegisterCommand("/Reset_Hist", "/UiCmdActor/->SetResetHistos()");
86 fServer->RegisterCommand("/Save_Hist", "/UiCmdActor/->SetSaveHistos()");
87 fServer->RegisterCommand("/Stop_Server", "/UiCmdActor/->SetServerStop()");
88
89 /*
90 fServer->RegisterCommand("/Reset_Hist", "this->ResetHistograms()");
91 fServer->RegisterCommand("/Save_Hist", "this->SaveHistograms()");
92 */
93
94 fServer->Restrict("/Reset_Hist", "allow=admin");
95 fServer->Restrict("/Save_Hist", "allow=admin");
96 fServer->Restrict("/Stop_Server", "allow=admin");
97 }
98
99 LOG(info) << "JSROOT location: " << jsrootsys;
100}
101
102// ---------------------------------------------------------------------------------------------------------------------
103//
105{
 // NOTE(review): the line carrying this function's signature is not present in this listing. The body
 // looks like the main loop of the application (presumably Application::Exec) — confirm against the repository.
 //
 // Starts the HTTP-server update thread, then receives multipart ZMQ messages until either the UI command
 // actor reports a server stop or the signal status pointed to by fSignalStatus becomes non-zero.
106 fStopThread = false;
107 fThread = std::thread(&Application::UpdateHttpServer, this);
108 LOG(info) << "Listening to ZMQ messages ...";
109 while (!(fUiCmdActor->GetServerStop()) && *fSignalStatus == 0) {
110 try {
114 /* Jan suggestion with zmq_addon CPP interface */
115 std::vector<zmq::message_t> vMsg;
116 const auto ret = zmq::recv_multipart(fZmqSocket, std::back_inserter(vMsg));
 // No result (presumably the receive timeout set on the socket in the constructor expired): poll again,
 // which also re-evaluates the stop conditions of the loop.
117 if (!ret) continue;
118
 // Message processing is serialised against the HTTP-server update loop, which locks the same mutex.
119 std::lock_guard<std::mutex> lk(mtx);
120 if (*ret > 3) {
 // NOTE(review): this branch is empty in this listing (listing line 121 is absent); presumably the
 // composed "config + data" message is handled here — verify.
122 }
123 else if (*ret == 1) {
 // Single-part message: histogram data only
124 ReceiveData(vMsg[0]);
125 }
126 else {
127 LOG(error) << "Invalid number of message parts received: should be either 1 or more than 3 vs " << *ret;
128 }
129 }
130 catch (const zmq::error_t& err) {
 // An interrupted call (EINTR) is swallowed so that the loop condition is checked again;
 // every other ZMQ error is re-thrown to the caller.
131 if (err.num() != EINTR) {
132 throw;
133 }
134 }
135 }
136 // FIXME: SZh: Are the socket and the context finished properly?
137}
138
139// ---------------------------------------------------------------------------------------------------------------------
140//
142{
144 fStopThread = true;
145 fThread.join();
147}
148
149// ---------------------------------------------------------------------------------------------------------------------
150//
152{
 // NOTE(review): the line carrying this function's signature is not present in this listing. The thread
 // started in the receive loop runs Application::UpdateHttpServer, and this body polls fStopThread, so it
 // is presumably that function — confirm against the repository.
 //
 // Every 10 ms: takes the file-scope mutex, lets the HTTP server process its pending requests and then
 // handles the "reset" and "save" requests raised through the UI command actor.
154 while (!fStopThread) {
155 std::this_thread::sleep_for(std::chrono::milliseconds(10));
 // Held until the end of the iteration, i.e. also while the UI requests below are handled
156 std::lock_guard<std::mutex> lk(mtx);
157
158 fServer->ProcessRequests();
159
162 if (fUiCmdActor->GetResetHistos()) {
163 LOG(info) << "Reset Monitor histos ";
 // NOTE(review): listing line 164 is absent here; presumably the actual reset call — verify.
 // The request flag is cleared once handled.
165 fUiCmdActor->SetResetHistos(false);
166 } // if( fUiCmdActor->GetResetHistos() )
167
168 if (fUiCmdActor->GetSaveHistos()) {
169 LOG(info) << "Save All histos & canvases";
 // NOTE(review): listing line 170 is absent here; presumably the actual save call — verify.
 // The request flag is cleared once handled.
171 fUiCmdActor->SetSaveHistos(false);
172 } // if( fUiCmdActor->GetSaveHistos() )
173 }
174}
175
176// ---------------------------------------------------------------------------------------------------------------------
177//
178//template<class HistoSrc>
179//bool Application::CollectHistograms(const std::forward_list<HistoSrc>& container)
180//{
181// for (const auto& hist : container) {
182// if (!hist.GetFlag(cbm::algo::qa::EHistFlag::OmitIntegrated)) {
183// if (!ReadHistogram<TH1>(hist)) {
184// return false;
185// }
186// }
187// if (hist.GetFlag(cbm::algo::qa::EHistFlag::StoreVsTsId)) {
188// if constexpr (std::is_same_v<HistSrc, H1D> || std::is_same_v<HistSrc, Prof1D>) {
189// if (!ReadHistogramExtendedTsId(hist, timesliceId)) {
190// return false;
191// }
192// }
193// else {
194// LOG(warn) << "Histogram " << rHist.GetName() << " cannot be plotted vs. TS index. Ignoring";
195// }
196// }
197// }
198// return true;
199//}
200
201// ---------------------------------------------------------------------------------------------------------------------
202//
/// @brief Processes one histogram-data message: deserializes the histogram container, merges every
///        histogram into the server-side array and tries to prepare the canvases that are not ready yet.
/// @param msg  ZMQ message part holding the boost-serialized histogram container
///             (read through a ZSTD decompression filter when fOpt.CompressedInput() is set)
/// @return false as soon as one histogram cannot be read, true otherwise
/// @throws std::runtime_error if compressed input is requested but ZSTD support is not compiled in
203bool Application::ReceiveData(zmq::message_t& msg)
204{
205 LOG(debug) << "Application::ReceiveData => Processing histograms update";
206
 // Wrap the raw message payload into an input stream
210 b_io::basic_array_source<char> device(static_cast<char*>(msg.data()), msg.size());
211 b_io::stream<b_io::basic_array_source<char>> s(device);
212
 // NOTE(review): the declaration of `vHist` is not present in this listing (some lines are absent);
 // from its use below it holds fTimesliceId and the lists fvH1, fvH2, fvP1, fvP2.
214 if (fOpt.CompressedInput()) {
215#ifdef BOOST_IOS_HAS_ZSTD
 // Compressed input: the archive is read with the no_header flag through a ZSTD decompressor
216 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
217 in_->push(b_io::zstd_decompressor());
218 in_->push(s);
219 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
220 *iarchive_ >> vHist;
221#else
222 throw std::runtime_error("Unsupported ZSTD decompression (boost) for histograms input channel");
223#endif
224 }
225 else {
226 b_ar::binary_iarchive iarch(s);
227 iarch >> vHist;
228 }
229
230
235 auto timesliceId = vHist.fTimesliceId;
236
237 // Collects 1D-histograms (with possible extension vs. TS):
 // - the integrated histogram is read unless the OmitIntegrated flag is set,
 // - the "vs. TS index" histogram is read in addition if the StoreVsTsId flag is set.
 // Stops and returns false at the first histogram which cannot be read.
238 auto CollectHistogram1D = [&](const auto& container) -> bool {
239 for (const auto& hist : container) {
240 if (!hist.GetFlag(cbm::algo::qa::EHistFlag::OmitIntegrated)) {
241 if (!ReadHistogram<TH1>(hist)) {
242 return false;
243 }
244 }
245 if (hist.GetFlag(cbm::algo::qa::EHistFlag::StoreVsTsId)) {
246 if (!ReadHistogramExtendedTsId(hist, timesliceId)) {
247 return false;
248 }
249 }
250 }
251 return true;
252 };
253
254 // Collects 2D-histograms (integrated only: the StoreVsTsId flag is not evaluated here)
255 // TODO: Add a possibility to store multiple histograms vs each time-slice depending on the StoreVsTsId flag
256 // NOTE: TProfile2D can be extended to TProfile3D, and TH2D can be extended to TH3D. Should we provide such a possibility,
257 // or does it not make sense? (SZh)
258 auto CollectHistogram2D = [&](const auto& container) -> bool {
259 for (const auto& hist : container) {
260 if (!hist.GetFlag(cbm::algo::qa::EHistFlag::OmitIntegrated)) {
261 if (!ReadHistogram<TH1>(hist)) {
262 return false;
263 }
264 }
265 }
266 return true;
267 };
268
 // Processing order: 1D histograms, 2D histograms, 1D profiles, 2D profiles
269 if (!CollectHistogram1D(vHist.fvH1)) return false;
270 if (!CollectHistogram2D(vHist.fvH2)) return false;
271 if (!CollectHistogram1D(vHist.fvP1)) return false;
272 if (!CollectHistogram2D(vHist.fvP2)) return false;
273
274
 // Retry the preparation of every canvas that is not ready yet (PrepareCanvas() returns false while
 // one of the histograms requested by the canvas is still unknown).
277 if (!fbAllCanvasReady) {
278 LOG(debug) << "Application::ReceiveData => Checking for canvases updates";
279 for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
281 if (fvbCanvasReady[uCanv]) { //
282 continue;
283 }
284
286 fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
287 } // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
288 } // if( !fbAllCanvasReady )
289
 // Disabled code below: former handling of a TObjArray-based input, kept for reference only
290 /*
291 TObject* tempObject = nullptr;
292 if (TString(tempObject->ClassName()).EqualTo("TObjArray")) {
293 std::lock_guard<std::mutex> lk(mtx);
294 TObjArray* arrayHisto = static_cast<TObjArray*>(tempObject);
295 for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++) {
296 TObject* pObj = arrayHisto->At(i);
297
298 if (nullptr != dynamic_cast<TProfile*>(pObj)) {
299 if (!ReadHistogram<TProfile>(dynamic_cast<TProfile*>(pObj))) { //
300 return false;
301 }
302 } // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
303 else if (nullptr != dynamic_cast<TH2*>(pObj)) {
304 if (!ReadHistogram<TH2>(dynamic_cast<TH2*>(pObj))) { //
305 return false;
306 }
307 } // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
308 else if (nullptr != dynamic_cast<TH1*>(pObj)) {
309 if (!ReadHistogram<TH1>(dynamic_cast<TH1*>(pObj))) { //
310 return false;
311 }
312 } // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
313 else
314 LOG(warning) << "Unsupported object type for " << pObj->GetName();
315 } // for (Int_t i = 0; i < arrayHisto->GetEntriesFast(); i++)
316
317 LOG(debug) << "Application::ReceiveData => Deleting array";
319 arrayHisto->Delete();
320
323 if (!fbAllCanvasReady) {
324 LOG(debug) << "Application::ReceiveData => Checking for canvases updates";
325 for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
327 if (fvbCanvasReady[uCanv]) { //
328 continue;
329 }
330
332 fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
333 } // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
334 } // if( !fbAllCanvasReady )
335 } // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
336 else {
337 fStopThread = true;
338 std::string err_msg = "Application::ReceiveData => Wrong object type at input: ";
339 err_msg += tempObject->ClassName();
340 throw std::runtime_error(err_msg);
341 }
342
343 if (nullptr != tempObject) delete tempObject;
344 */
345
 // Count the successfully processed messages
346 fNMessages += 1;
347
348 LOG(debug) << "Application::ReceiveData => Finished processing histograms update";
349
 // Sign of life in the log for every 100th processed message
351 if (0 == fNMessages % 100) {
352 LOG(info) << "HistServ::Application::ReceiveData => Finished processing histograms update #" << fNMessages
353 << ", still alive!";
354 }
355
356 return true;
357}
358
359// ---------------------------------------------------------------------------------------------------------------------
360//
361bool Application::ReceiveHistoConfig(zmq::message_t& msg)
362{
366 // BoostSerializer<std::pair<std::string, std::string>>().Deserialize(msg, tempObject);
367 b_io::basic_array_source<char> device(static_cast<char*>(msg.data()), msg.size());
368 b_io::stream<b_io::basic_array_source<char>> s(device);
369
370 std::pair<std::string, std::string> tempObject("", "");
371 if (fOpt.CompressedInput()) {
372#ifdef BOOST_IOS_HAS_ZSTD
373 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
374 in_->push(b_io::zstd_decompressor());
375 in_->push(s);
376 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
377 *iarchive_ >> tempObject;
378#else
379 throw std::runtime_error("Unsupported ZSTD decompression (boost) for histograms config input channel");
380#endif
381 }
382 else {
383 b_ar::binary_iarchive iarch(s);
384 iarch >> tempObject;
385 }
386
387 // Parse metadata
388 std::string& name = tempObject.first;
389 std::string metadataMsg{};
390 std::tie(name, metadataMsg) = HistogramMetadata::SeparateNameAndMetadata(name);
391 auto metadata = HistogramMetadata(metadataMsg);
392
393 if (!metadata.GetFlag(EHistFlag::OmitIntegrated)) {
394 // Main (integrated over time) histogram
395 this->RegisterHistoConfig(tempObject);
396 }
397 if (metadata.GetFlag(EHistFlag::StoreVsTsId)) {
398 // Histogram vs. TS id
399 this->RegisterHistoConfig(std::make_pair(name + std::string(HistogramMetadata::ksTsIdSuffix), tempObject.second));
400 }
401
402 return true;
403}
404
405// ---------------------------------------------------------------------------------------------------------------------
406//
407bool Application::ReceiveCanvasConfig(zmq::message_t& msg)
408{
410 // BoostSerializer<std::pair<std::string, std::string>>().Deserialize(msg, tempObject);
411 b_io::basic_array_source<char> device(static_cast<char*>(msg.data()), msg.size());
412 b_io::stream<b_io::basic_array_source<char>> s(device);
413
414 std::pair<std::string, std::string> tempObject("", "");
415 if (fOpt.CompressedInput()) {
416#ifdef BOOST_IOS_HAS_ZSTD
417 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
418 in_->push(b_io::zstd_decompressor());
419 in_->push(s);
420 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
421 *iarchive_ >> tempObject;
422#else
423 throw std::runtime_error("Unsupported ZSTD decompression (boost) for canvas config input channel");
424#endif
425 }
426 else {
427 b_ar::binary_iarchive iarch(s);
428 iarch >> tempObject;
429 }
430
431 LOG(debug) << " Received configuration for canvas " << tempObject.first << " : " << tempObject.second;
432
435 uint32_t uPrevCanv = 0;
436 for (uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv) {
437 if (fvpsCanvasConfig[uPrevCanv].first == tempObject.first) { //
438 break;
439 }
440 } // for( UInt_t uPrevCanv = 0; uPrevCanv < fvpsCanvasConfig.size(); ++uPrevCanv )
441
442 if (uPrevCanv < fvpsCanvasConfig.size()) {
443 LOG(debug) << " Ignored new configuration for Canvas " << tempObject.first
444 << " due to previously received one: " << tempObject.second;
446 } // if( uPrevCanv < fvpsCanvasConfig.size() )
447 else {
448 fvpsCanvasConfig.push_back(tempObject);
449 fvbCanvasReady.push_back(false);
450 fbAllCanvasReady = false;
451
452 fvCanvas.push_back(std::pair<TCanvas*, std::string>(nullptr, ""));
453 fvbCanvasRegistered.push_back(false);
454 fbAllCanvasRegistered = false;
455 LOG(info) << " Stored configuration for canvas " << tempObject.first << " : " << tempObject.second;
456 } // else of if( uPrevCanv < fvpsCanvasConfig.size() )
457 return true;
458}
459
460// ---------------------------------------------------------------------------------------------------------------------
461//
462bool Application::ReceiveConfigAndData(std::vector<zmq::message_t>& vMsg)
463{
465 LOG(debug) << "Application::ReceiveConfigAndData => Received composed message with " << vMsg.size() << " parts";
466
469 // BoostSerializer<std::pair<uint32_t, uint32_t>>().Deserialize(vMsg.at(0), pairHeader);
470 b_io::basic_array_source<char> device_header(static_cast<char*>(vMsg.at(0).data()), vMsg.at(0).size());
471 b_io::stream<b_io::basic_array_source<char>> s_header(device_header);
472
473 std::pair<uint32_t, uint32_t> pairHeader;
474 if (fOpt.CompressedInput()) {
475#ifdef BOOST_IOS_HAS_ZSTD
476 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
477 in_->push(b_io::zstd_decompressor());
478 in_->push(s_header);
479 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
480 *iarchive_ >> pairHeader;
481#else
482 throw std::runtime_error("Unsupported ZSTD decompression (boost) for Config + Histos input channel");
483#endif
484 }
485 else {
486 b_ar::binary_iarchive iarch_header(s_header);
487 iarch_header >> pairHeader;
488 }
489
490 LOG(debug) << "Application::ReceiveConfigAndData => Received configuration for " << pairHeader.first << " histos and "
491 << pairHeader.second << " canvases";
492
493 uint32_t uOffsetHistoConfig = pairHeader.first;
494 if (0 == pairHeader.first) {
495 uOffsetHistoConfig = 1;
496 if (0 < vMsg[uOffsetHistoConfig].size()) {
497 fStopThread = true;
498 fUiCmdActor->SetServerStop();
499 std::string err_msg = "Application::ReceiveConfigAndData => No histo config expected but corresponding message";
500 err_msg += " is not empty: ";
501 err_msg += vMsg[uOffsetHistoConfig].size();
502 throw std::runtime_error(err_msg);
503 }
504 }
505
506 uint32_t uOffsetCanvasConfig = pairHeader.second;
507 if (0 == pairHeader.second) {
508 uOffsetCanvasConfig = 1;
509 if (0 < vMsg[uOffsetHistoConfig + uOffsetCanvasConfig].size()) {
510 fStopThread = true;
511 fUiCmdActor->SetServerStop();
512 std::string err_msg = "Application::ReceiveConfigAndData => No Canvas config expected but corresponding ";
513 err_msg += " message is not empty: ";
514 err_msg += vMsg[uOffsetHistoConfig + uOffsetCanvasConfig].size();
515 throw std::runtime_error(err_msg);
516 }
517 }
518
519 if ((1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1) != vMsg.size()) {
520 fStopThread = true;
521 fUiCmdActor->SetServerStop();
522 std::string err_msg = "Application::ReceiveConfigAndData => Nb parts in message not matching configs numbers ";
523 err_msg += " declared in header";
524 err_msg += vMsg.size();
525 err_msg += " VS ";
526 err_msg += 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1;
527 throw std::runtime_error(err_msg);
528 }
529
531 for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
532 ReceiveHistoConfig(vMsg[1 + uHisto]);
533 } // for (UInt_t uHisto = 0; uHisto < pairHeader.first; ++uHisto)
534 LOG(debug) << "Application::ReceiveConfigAndData => Processed configuration for " << pairHeader.first << " histos";
535
537 for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
538 ReceiveCanvasConfig(vMsg[1 + uOffsetHistoConfig + uCanv]);
539 } // for (UInt_t uCanv = 0; uCanv < pairHeader.second; ++uCanv)
540 LOG(debug) << "Application::ReceiveConfigAndData => Processed configuration for " << pairHeader.second << " canvases";
541
543 ReceiveData(vMsg[1 + uOffsetHistoConfig + uOffsetCanvasConfig]);
544
545 return true;
546}
547
548// ---------------------------------------------------------------------------------------------------------------------
549//
/// @brief Converts an incoming histogram to a ROOT histogram and either stores it as a new histogram
///        (registering it in the HTTP server if a matching configuration is known) or adds it to the
///        already stored histogram of the same name.
/// @tparam HistoDst  ROOT type used to access the stored histogram
/// @tparam HistoSrc  Type of the incoming histogram
/// @param rHist  Incoming histogram
/// @return false if a stored object of that name exists but cannot be cast to HistoDst, true otherwise
550template<class HistoDst, class HistoSrc>
551bool Application::ReadHistogram(const HistoSrc& rHist)
552{
 // Temporary ROOT copy of the input; it is deleted on every return path of this function
553 HistoDst* pHist = cbm::qa::OnlineInterface::ROOTHistogram(rHist);
554 int index1 = FindHistogram(pHist->GetName());
555 if (-1 == index1) {
556 // ----- Creating new histogram
 // A clone is stored in the array, the temporary is deleted at the end of the function
557 HistoDst* histogram_new = static_cast<HistoDst*>(pHist->Clone());
558 fArrayHisto.Add(histogram_new);
559
560 LOG(info) << "Received new histo " << pHist->GetName();
561
 // NOTE(review): the opener of the block closed further down (marked below) is not present in this
 // listing (listing lines 562-563 are absent), so the braces shown here are unbalanced — check the repository.
 // Look for a configuration entry with the name of the new histogram which is not registered yet
564 for (uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist) {
566 if (fvbHistoRegistered[uHist]) { //
567 continue;
568 }
569
571 if (fvpsHistosFolder[uHist].first == histogram_new->GetName()) {
 // Register the histogram in the HTTP server under "/<folder from the configuration>"
572 fvHistos[uHist] = std::pair<TNamed*, std::string>(histogram_new, fvpsHistosFolder[uHist].second);
573 fServer->Register(Form("/%s", fvHistos[uHist].second.data()), fvHistos[uHist].first);
574 fvbHistoRegistered[uHist] = true;
575
576 LOG(info) << "registered histo " << fvHistos[uHist].first->GetName() << " in folder "
577 << fvHistos[uHist].second;
578
579
 // NOTE(review): listing lines 580-581 are absent; as shown, the loop below can only clear
 // fbAllHistosRegistered, never set it — check the repository.
582 for (uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx) {
583 if (!fvbHistoRegistered[uIdx]) {
584 fbAllHistosRegistered = false;
585 break;
586 } // if( !fvbHistoRegistered[ uIdx ] )
587 } // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
588
 // Only the first matching configuration entry is used
589 break;
590 } // if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
591 } // for( uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist )
592 } // closes a block whose opener is not present in this listing (see NOTE above)
593 } // if (-1 == index1)
594 else {
595 // ----- Update histogram
596 LOG(debug) << "Received update for: " << pHist->GetName();
597 HistoDst* histogram_existing = dynamic_cast<HistoDst*>(fArrayHisto.At(index1));
598 if (nullptr == histogram_existing) {
599 LOG(error) << "CbmMqHistoServer::ReadHistogram => "
600 << "Incompatible type found during update for histo " << pHist->GetName();
601 delete pHist;
602 return false;
603 } // if( nullptr == histogram_existing )
604
 // Accumulate the new content into the stored histogram
605 histogram_existing->Add(pHist);
606 } // else of if (-1 == index1)
607 delete pHist;
608 return true;
609}
610
611// ---------------------------------------------------------------------------------------------------------------------
612//
/// @brief Fills one slice of the "vs. TS index" version of a 1D histogram or 1D profile.
///
/// For H1D sources the extended histogram is a TH2D, for Prof1D sources a TProfile2D; its name is the
/// source name followed by HistogramMetadata::ksTsIdSuffix. The extended histogram is created (and, if a
/// matching configuration is known, registered in the HTTP server) on first use. Any other source type
/// is ignored with a warning.
/// @tparam HistoSrc  Type of the incoming histogram
/// @param rHist    Incoming histogram
/// @param tsIndex  Timeslice index of the slice to fill
/// @return false if a stored object of that name exists but has an incompatible type, true otherwise
613template<class HistoSrc>
614bool Application::ReadHistogramExtendedTsId(const HistoSrc& rHist, uint64_t tsIndex)
615{
616 constexpr bool IsSrcH1D = std::is_same_v<HistoSrc, H1D>;
617 constexpr bool IsSrcProf1D = std::is_same_v<HistoSrc, Prof1D>;
618
619 if constexpr (IsSrcH1D || IsSrcProf1D) {
620 /* clang-format off */
621 using HistoDst_t = typename std::conditional<IsSrcH1D, TH2D, TProfile2D>::type;
622 /* clang-format on */
623
624 std::string sHistoName = rHist.GetName() + std::string(HistogramMetadata::ksTsIdSuffix);
625 int index1 = FindHistogram(sHistoName.c_str());
626 if (-1 == index1) {
627 // ----- Creating new histogram
628 HistoDst_t* histogram_new = nullptr;
629 {
 // The text " vs. TS index;TS index" is inserted in front of the first ';' of the source title.
 // NOTE(review): if the title contains no ';', find_first_of() returns npos and std::string::insert()
 // throws std::out_of_range — confirm that every title carries axis titles.
630 std::string title = rHist.GetTitle();
631 title.insert(title.find_first_of(';'), " vs. TS index;TS index");
 // X axis: fConfig.fNofTsToStore unit-width bins, the first one centred on the current TS index
632 int nBinsX = fConfig.fNofTsToStore;
633 double minX = static_cast<double>(tsIndex) - 0.5;
634 double maxX = static_cast<double>(tsIndex + nBinsX) - 0.5;
 // Y axis: the X axis of the source histogram
635 int nBinsY = rHist.GetNbinsX();
636 double minY = rHist.GetMinX();
637 double maxY = rHist.GetMaxX();
638 if constexpr (IsSrcH1D) {
639 histogram_new = new TH2D(sHistoName.c_str(), title.c_str(), nBinsX, minX, maxX, nBinsY, minY, maxY);
640 }
641 else if constexpr (IsSrcProf1D) {
 // Profile: the Y range of the source is passed as the two trailing range arguments
642 double minZ = rHist.GetMinY();
643 double maxZ = rHist.GetMaxY();
644 histogram_new =
645 new TProfile2D(sHistoName.c_str(), title.c_str(), nBinsX, minX, maxX, nBinsY, minY, maxY, minZ, maxZ);
646 histogram_new->Sumw2();
647 }
648 }
649 cbm::qa::OnlineInterface::AddSlice(rHist, double(tsIndex), histogram_new);
650 fArrayHisto.Add(histogram_new);
651
652 LOG(info) << "Received new histo " << sHistoName;
653
 // NOTE(review): the opener of the block closed further down (marked below) is not present in this
 // listing (listing lines 654-655 are absent), so the braces shown here are unbalanced — check the repository.
 // Look for a configuration entry with the name of the new histogram which is not registered yet
656 for (uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist) {
658 if (fvbHistoRegistered[uHist]) { //
659 continue;
660 }
661
663 if (fvpsHistosFolder[uHist].first == histogram_new->GetName()) {
 // Register the histogram in the HTTP server under "/<folder from the configuration>"
664 fvHistos[uHist] = std::pair<TNamed*, std::string>(histogram_new, fvpsHistosFolder[uHist].second);
665 fServer->Register(Form("/%s", fvHistos[uHist].second.data()), fvHistos[uHist].first);
666 fvbHistoRegistered[uHist] = true;
667
668 LOG(info) << "registered histo " << fvHistos[uHist].first->GetName() << " in folder "
669 << fvHistos[uHist].second;
670
671
 // NOTE(review): listing lines 672-673 are absent; as shown, the loop below can only clear
 // fbAllHistosRegistered, never set it — check the repository.
674 for (uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx) {
675 if (!fvbHistoRegistered[uIdx]) {
676 fbAllHistosRegistered = false;
677 break;
678 } // if( !fvbHistoRegistered[ uIdx ] )
679 } // for( uint32_t uIdx = 0; uIdx < fvbHistoRegistered.size(); ++uIdx )
680
 // Only the first matching configuration entry is used
681 break;
682 } // if( fvpsHistosFolder[ uHist ].first == histogram_new->GetName() )
683 } // for( uint32_t uHist = 0; uHist < fvpsHistosFolder.size(); ++uHist )
684 } // closes a block whose opener is not present in this listing (see NOTE above)
685 } // if (-1 == index1)
686 else {
687 // ----- Update histogram
688 LOG(debug) << "Received update for: " << sHistoName;
689 HistoDst_t* histogram_existing = dynamic_cast<HistoDst_t*>(fArrayHisto.At(index1));
690 if (nullptr == histogram_existing) {
691 LOG(error) << "CbmMqHistoServer::ReadHistogram => "
692 << "Incompatible type found during update for histo " << sHistoName;
693 return false;
694 } // if( nullptr == histogram_existing )
695 cbm::qa::OnlineInterface::AddSlice(rHist, double(tsIndex), histogram_existing);
696 } // else of if (-1 == index1)
697 }
698 else {
 // Source types other than H1D and Prof1D have no extended version
699 LOG(warn) << "Histogram " << rHist.GetName() << " cannot be plotted vs. TS index. Ignoring";
700 }
701
702 return true;
703}
704
705// ---------------------------------------------------------------------------------------------------------------------
706//
707bool Application::RegisterHistoConfig(const std::pair<std::string, std::string>& config)
708{
709 LOG(debug) << " Received configuration for histo " << config.first << " : " << config.second;
710
713 UInt_t uPrevHist = 0;
714 for (uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist) {
715 if (fvpsHistosFolder[uPrevHist].first == config.first) { //
716 break;
717 }
718 } // for( UInt_t uPrevHist = 0; uPrevHist < fvpsHistosFolder.size(); ++uPrevHist )
719
720 if (uPrevHist < fvpsHistosFolder.size()) {
721 LOG(debug) << " Ignored new configuration for histo " << config.first
722 << " due to previously received one: " << config.second;
724 } // if( uPrevHist < fvpsHistosFolder.size() )
725 else {
726 fvpsHistosFolder.push_back(config);
727 fvHistos.push_back(std::pair<TNamed*, std::string>(nullptr, ""));
728 fvbHistoRegistered.push_back(false);
729 fbAllHistosRegistered = false;
730 LOG(info) << " Stored configuration for histo " << config.first << " : " << config.second;
731 } // else of if( uPrevHist < fvpsHistosFolder.size() )
732
733 return true;
734}
735
736// ---------------------------------------------------------------------------------------------------------------------
737//
738int Application::FindHistogram(const std::string& name)
739{
740 for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
741 TObject* obj = fArrayHisto.At(iHist);
742 if (TString(obj->GetName()).EqualTo(name)) { //
743 return iHist;
744 }
745 } // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
746 return -1;
747}
748
749// ---------------------------------------------------------------------------------------------------------------------
750//
/// @brief Builds the canvas with the given index from its configuration and registers it in the HTTP server.
///
/// Nothing is created unless every histogram requested by the canvas is already present in the histogram array.
/// @param uCanvIdx  Index of the canvas (used to index fvCanvas and fvbCanvasRegistered)
/// @return false if a requested histogram is not (yet) available, true once the canvas is registered
751bool Application::PrepareCanvas(uint32_t uCanvIdx)
752{
 // NOTE(review): the declaration of `conf` is not present in this listing (listing line 753 is absent);
 // from its use below it provides the canvas name/title, the pad layout and the per-pad objects and options.
754 LOG(info) << " Extracting configuration for canvas index " << uCanvIdx << "(name: " << conf.GetName().data() << ")";
755
 // First pass: check that all requested histograms exist (the object name "nullptr" marks an empty slot)
757 uint32_t uNbPads(conf.GetNbPads());
758 for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
759 uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
760 for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
761 std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
763 if ("nullptr" != sName) {
764 if (FindHistogram(sName) < 0) {
765 LOG(warn) << "Histogram \"" << sName << "\" requested by canvas \"" << conf.GetName().data()
766 << "\" was not found";
767 return false;
768 } // if( FindHistogram( conf.GetObjName( uPadIdx, uObjIdx ) ) < 0 )
769 } // if( "nullptr" != sName )
770 } // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
771 } // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
772
773 LOG(info) << " All histos found for canvas " << conf.GetName().data() << ", now preparing it";
774
775 // Temporary solution to save canvases into directories
 // The configured name is split at its last '/': the part before it becomes a sub-folder of "canvases",
 // the part after it the canvas name. `lastSlashPos > size()` is only true for npos, i.e. when there is no '/'.
776 std::string sNameFull = conf.GetName();
777 size_t lastSlashPos = sNameFull.find_last_of('/');
778 std::string sNamePart = lastSlashPos > sNameFull.size() ? sNameFull : sNameFull.substr(lastSlashPos + 1);
779 std::string sDir = lastSlashPos > sNameFull.size() ? "" : sNameFull.substr(0, lastSlashPos);
780 std::string canvDir = sDir.empty() ? "canvases" : fmt::format("canvases/{}", sDir);
781
 // Create the canvas and divide it into the configured grid of pads
783 TCanvas* pNewCanv = new TCanvas(sNamePart.c_str(), conf.GetTitle().data());
784 pNewCanv->Divide(conf.GetNbPadsX(), conf.GetNbPadsY());
785
 // Second pass: configure each pad and draw its objects
787 for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
 // Pads are addressed starting from 1
788 pNewCanv->cd(1 + uPadIdx);
789
791 gPad->SetGrid(conf.GetGridx(uPadIdx), conf.GetGridy(uPadIdx));
792 gPad->SetLogx(conf.GetLogx(uPadIdx));
793 gPad->SetLogy(conf.GetLogy(uPadIdx));
794 gPad->SetLogz(conf.GetLogz(uPadIdx));
795
797 uint32_t uNbObj(conf.GetNbObjsInPad(uPadIdx));
798 for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
799 std::string sName(conf.GetObjName(uPadIdx, uObjIdx));
800 if ("nullptr" != sName) {
 // The index is valid: the first pass returned early for every name which is not in the array
801 TObject* pObj = fArrayHisto[FindHistogram(sName)];
802
 // The casts are tried in the order TProfile2D, TProfile, TH2, TH1; the first one which succeeds
 // is used to draw the object with the option configured for it
803 if (nullptr != dynamic_cast<TProfile2D*>(pObj)) {
804 dynamic_cast<TProfile2D*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
805 } // if( nullptr != dynamic_cast< TProfile2D *>( pObj ) )
806 else if (nullptr != dynamic_cast<TProfile*>(pObj)) {
807 dynamic_cast<TProfile*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
808 } // if( nullptr != dynamic_cast< TProfile *>( pObj ) )
809 else if (nullptr != dynamic_cast<TH2*>(pObj)) {
810 dynamic_cast<TH2*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
811 } // if( nullptr != dynamic_cast< TH2 *>( pObj ) )
812 else if (nullptr != dynamic_cast<TH1*>(pObj)) {
813 dynamic_cast<TH1*>(pObj)->Draw(conf.GetOption(uPadIdx, uObjIdx).data());
814 } // if( nullptr != dynamic_cast< TH1 *>( pObj ) )
815 else
816 LOG(warning) << " Unsupported object type for " << sName << " when preparing canvas " << conf.GetName();
817
 // Logged in any case, also after the "unsupported type" warning above
818 LOG(info) << " Configured histo " << sName << " on pad " << (1 + uPadIdx) << " for canvas "
819 << conf.GetName().data();
820 } // if( "nullptr" != sName )
821 } // for( uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx )
822 } // for( uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx )
823
 // Store the canvas with its folder and register it in the HTTP server under "/<folder>"
824 fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, canvDir);
825 fServer->Register(Form("/%s", fvCanvas[uCanvIdx].second.data()), fvCanvas[uCanvIdx].first);
826 fvbCanvasRegistered[uCanvIdx] = true;
827
828 LOG(info) << " Registered canvas " << fvCanvas[uCanvIdx].first->GetName() << " in folder "
829 << fvCanvas[uCanvIdx].second;
830
 // NOTE(review): listing lines 831-832 are absent; as shown, the loop below can only clear
 // fbAllCanvasRegistered, never set it — check the repository.
833 for (uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx) {
834 if (!fvbCanvasRegistered[uIdx]) {
835 fbAllCanvasRegistered = false;
836 break;
837 } // if( !fvbCanvasRegistered[ uIdx ] )
838 } // for( uint32_t uIdx = 0; uIdx < fvbCanvasRegistered.size(); ++uIdx )
839
840 return true;
841}
842
843// ---------------------------------------------------------------------------------------------------------------------
844//
846{
847 for (int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist) {
848 dynamic_cast<TH1*>(fArrayHisto.At(iHist))->Reset();
849 } // for( int iHist = 0; iHist < fArrayHisto.GetEntriesFast(); ++iHist )
850 return true;
851}
852
853// ---------------------------------------------------------------------------------------------------------------------
854//
856{
857 if ("" == fOpt.HistoFile()) { //
858 LOG(error) << "Filename for saving histograms and canvases not defined. Ignoring request.";
859 return false;
860 }
861
863 TFile* oldFile = gFile;
864 TDirectory* oldDir = gDirectory;
865
867 TFile* histoFile = nullptr;
868
869 // open separate histo file in recreate mode
870 histoFile = new TFile(fOpt.HistoFile().data(), fOpt.Overwrite() ? "RECREATE" : "CREATE");
871
872 if (nullptr == histoFile) { //
873 gFile = oldFile;
874 gDirectory = oldDir;
875 LOG(error) << "Ignoring request to save histograms and canvases: could not open output file " << fOpt.HistoFile();
876 return false;
877 }
878
879 LOG(info) << "Saving Histograms and canvases in file: " << fOpt.HistoFile();
880
882 for (UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto) {
884 if (nullptr != fvHistos[uHisto].first) {
886 TString sFolder = fvHistos[uHisto].second.data();
887 if (nullptr == gDirectory->Get(sFolder)) { //
888 gDirectory->mkdir(sFolder);
889 }
890 gDirectory->cd(sFolder);
891
893 fvHistos[uHisto].first->Write();
894 }
895
896 histoFile->cd();
897 } // for( UInt_t uHisto = 0; uHisto < fvHistos.size(); ++uHisto )
898
899 for (UInt_t uCanvas = 0; uCanvas < fvCanvas.size(); ++uCanvas) {
901 if (nullptr != fvCanvas[uCanvas].first) {
903 TString sFolder = fvCanvas[uCanvas].second.data();
904 if (nullptr == gDirectory->Get(sFolder)) { //
905 gDirectory->mkdir(sFolder);
906 }
907 gDirectory->cd(sFolder);
908
910 fvCanvas[uCanvas].first->Write();
911 }
912
913 histoFile->cd();
914 } // for( UInt_t uHisto = 0; uHisto < fvCanvas.size(); ++uHisto )
915
917 gFile = oldFile;
918 gDirectory = oldDir;
919
920 histoFile->Close();
921
922 return true;
923}
CanvasConfig ExtractCanvasConfigFromString(std::string sFullConfig)
Extraction.
std::mutex mtx
Set of tools for online->ROOT QA-objects conversions (header)
A histogram container for the histogram server (header)
static constexpr size_t size()
Definition KfSimdPseudo.h:2
bool first
Application(ProgramOptions const &opt)
Standard constructor, initialize the application.
uint32_t GetNbPadsY() const
bool GetLogz(uint32_t uPadIdx) const
bool GetGridy(uint32_t uPadIdx) const
bool GetLogy(uint32_t uPadIdx) const
std::string GetOption(uint32_t uPadIdx, uint32_t uObjIdx) const
std::string GetTitle() const
std::string GetName() const
accessors
bool GetLogx(uint32_t uPadIdx) const
uint32_t GetNbObjsInPad(uint32_t uPadIdx) const
bool GetGridx(uint32_t uPadIdx) const
accessors
uint32_t GetNbPadsX() const
std::string GetObjName(uint32_t uPadIdx, uint32_t uObjIdx) const
uint32_t GetNbPads() const
1D-histogram
2D-histogram
Metadata of the histogram.
Definition Histogram.h:64
static void AddSlice(const H1D &src, double value, TH2D *dst)
Fills a slice of a histogram of a higher dimension for a given value (....)
static TH1D * ROOTHistogram(const H1D &hist)
Converts histogram H1D to ROOT histogram TH1D.
bool ReadHistogram(const HistoSrc &rHist)
Read a histogram.
bool ReceiveCanvasConfig(zmq::message_t &msg)
Receives canvas configuration.
int FindHistogram(const std::string &name)
Collects histograms of the same type from the histogram list.
ProgramOptions const & fOpt
A handler for system signals.
std::vector< std::pair< TCanvas *, std::string > > fvCanvas
Vector of Canvas pointers and folder path.
TObjArray fArrayHisto
Array of histograms with unique names.
volatile sig_atomic_t * fSignalStatus
Global signal status.
bool RegisterHistoConfig(const std::pair< std::string, std::string > &config)
Register a histogram config in the histogram server.
bool ReadHistogramExtendedTsId(const HistoSrc &pHistSrc, uint64_t tsIndex)
Reads a histogram slice for an extended histogram with the TS ID.
std::vector< std::pair< TNamed *, std::string > > fvHistos
Vector of Histos pointers and folder path.
bool ReceiveData(zmq::message_t &msg)
Find histogram index in the histogram array.
bool PrepareCanvas(uint32_t uCanvIdx)
Prepares canvases using received canvas configuration.
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
void Exec()
Run the application.
THttpServer * fServer
ROOT Histogram server (JSroot)
bool ReceiveConfigAndData(std::vector< zmq::message_t > &vMsg)
Receives a list of canvases and histograms.
bool ReceiveHistoConfig(zmq::message_t &msg)
Receives histogram configuration.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string with ( HistoName, FolderPath ) to configure the histogram.
const std::string & HistoFile() const
Get output file name (.root format)
bool CompressedInput() const
Get compressed input option.
const int32_t & ComChanZmqRcvHwm() const
Get receive High-Water Mark for interface channel.
const std::string & ComChan() const
Get interface channel name or hostname + port or whatever or ????? (FIXME: replacement of FairMQ)
const uint32_t & HttpPort() const
Get histo server http port.
const int32_t & ComChanZmqRcvTo() const
Get receive timeout for interface channel.
EHistFlag
Histogram control flags (bit masks)
Definition Histogram.h:56
@ StoreVsTsId
Store the histogram vs timeslice index.
@ OmitIntegrated
Omits storing integrated histogram.
Structure to keep the histograms for sending them on the histogram server.
std::forward_list< qa::Prof1D > fvP1
List of 1D-profiles.
std::forward_list< qa::Prof2D > fvP2
List of 2D-profiles.
std::forward_list< qa::H1D > fvH1
List of 1D-histograms.
uint64_t fTimesliceId
Index of the timeslice.
std::forward_list< qa::H2D > fvH2
List of 2D-histograms.
int fNofTsToStore
Number of consequent timeslices to store.