15#include "THttpServer.h"
19#include "TProfile2D.h"
20#include "TRootSniffer.h"
25#include <boost/archive/binary_iarchive.hpp>
26#include <boost/iostreams/device/array.hpp>
27#ifdef BOOST_IOS_HAS_ZSTD
28#include <boost/iostreams/filter/zstd.hpp>
29#include <boost/iostreams/filtering_stream.hpp>
31#include <boost/iostreams/stream.hpp>
32#include <boost/serialization/utility.hpp>
33#include <boost/serialization/vector.hpp>
36#include <zmq_addon.hpp>
38#include <fmt/format.h>
42namespace b_io = boost::iostreams;
43namespace b_ar = boost::archive;
57 , fSignalStatus(signalStatus)
60 LOG(info) <<
"Options for Application:";
61 LOG(info) <<
" Input ZMQ channel: " <<
fOpt.
ComChan();
76 fServer->GetSniffer()->SetScanGlobalDir(kFALSE);
77 const char* jsrootsys = gSystem->Getenv(
"JSROOTSYS");
78 if (!jsrootsys) jsrootsys = gEnv->GetValue(
"HttpServ.JSRootPath", jsrootsys);
85 fServer->RegisterCommand(
"/Reset_Hist",
"/UiCmdActor/->SetResetHistos()");
86 fServer->RegisterCommand(
"/Save_Hist",
"/UiCmdActor/->SetSaveHistos()");
87 fServer->RegisterCommand(
"/Stop_Server",
"/UiCmdActor/->SetServerStop()");
94 fServer->Restrict(
"/Reset_Hist",
"allow=admin");
95 fServer->Restrict(
"/Save_Hist",
"allow=admin");
96 fServer->Restrict(
"/Stop_Server",
"allow=admin");
99 LOG(info) <<
"JSROOT location: " << jsrootsys;
108 LOG(info) <<
"Listening to ZMQ messages ...";
115 std::vector<zmq::message_t> vMsg;
116 const auto ret = zmq::recv_multipart(
fZmqSocket, std::back_inserter(vMsg));
119 std::lock_guard<std::mutex> lk(
mtx);
123 else if (*ret == 1) {
127 LOG(error) <<
"Invalid number of message parts received: should be either 1 or more than 3 vs " << *ret;
130 catch (
const zmq::error_t& err) {
131 if (err.num() != EINTR) {
155 std::this_thread::sleep_for(std::chrono::milliseconds(10));
156 std::lock_guard<std::mutex> lk(
mtx);
163 LOG(info) <<
"Reset Monitor histos ";
169 LOG(info) <<
"Save All histos & canvases";
205 LOG(debug) <<
"Application::ReceiveData => Processing histograms update";
210 b_io::basic_array_source<char> device(
static_cast<char*
>(msg.data()), msg.size());
211 b_io::stream<b_io::basic_array_source<char>> s(device);
215#ifdef BOOST_IOS_HAS_ZSTD
216 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
217 in_->push(b_io::zstd_decompressor());
219 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
222 throw std::runtime_error(
"Unsupported ZSTD decompression (boost) for histograms input channel");
226 b_ar::binary_iarchive iarch(s);
238 auto CollectHistogram1D = [&](
const auto& container) ->
bool {
239 for (
const auto& hist : container) {
258 auto CollectHistogram2D = [&](
const auto& container) ->
bool {
259 for (
const auto& hist : container) {
269 if (!CollectHistogram1D(vHist.
fvH1))
return false;
270 if (!CollectHistogram2D(vHist.
fvH2))
return false;
271 if (!CollectHistogram1D(vHist.
fvP1))
return false;
272 if (!CollectHistogram2D(vHist.
fvP2))
return false;
278 LOG(debug) <<
"Application::ReceiveData => Checking for canvases updates";
319 arrayHisto->Delete();
323 if (!fbAllCanvasReady) {
324 LOG(debug) << "Application::ReceiveData => Checking for canvases updates";
325 for (uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv) {
327 if (fvbCanvasReady[uCanv]) { //
332 fvbCanvasReady[uCanv] = PrepareCanvas(uCanv);
333 } // for( uint32_t uCanv = 0; uCanv < fvpsCanvasConfig.size(); ++uCanv )
334 } // if( !fbAllCanvasReady )
335 } // if (TString(tempObject->ClassName()).EqualTo("TObjArray"))
338 std::string err_msg = "Application::ReceiveData => Wrong object type at input: ";
339 err_msg += tempObject->ClassName();
340 throw std::runtime_error(err_msg);
343 if (nullptr != tempObject) delete tempObject;
348 LOG(debug) <<
"Application::ReceiveData => Finished processing histograms update";
352 LOG(info) <<
"HistServ::Application::ReceiveData => Finished processing histograms update #" <<
fNMessages
367 b_io::basic_array_source<char> device(
static_cast<char*
>(msg.data()), msg.size());
368 b_io::stream<b_io::basic_array_source<char>> s(device);
370 std::pair<std::string, std::string> tempObject(
"",
"");
372#ifdef BOOST_IOS_HAS_ZSTD
373 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
374 in_->push(b_io::zstd_decompressor());
376 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
377 *iarchive_ >> tempObject;
379 throw std::runtime_error(
"Unsupported ZSTD decompression (boost) for histograms config input channel");
383 b_ar::binary_iarchive iarch(s);
388 std::string& name = tempObject.first;
389 std::string metadataMsg{};
390 std::tie(name, metadataMsg) = HistogramMetadata::SeparateNameAndMetadata(name);
393 if (!metadata.GetFlag(EHistFlag::OmitIntegrated)) {
397 if (metadata.GetFlag(EHistFlag::StoreVsTsId)) {
399 this->
RegisterHistoConfig(std::make_pair(name + std::string(HistogramMetadata::ksTsIdSuffix), tempObject.second));
411 b_io::basic_array_source<char> device(
static_cast<char*
>(msg.data()), msg.size());
412 b_io::stream<b_io::basic_array_source<char>> s(device);
414 std::pair<std::string, std::string> tempObject(
"",
"");
416#ifdef BOOST_IOS_HAS_ZSTD
417 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
418 in_->push(b_io::zstd_decompressor());
420 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
421 *iarchive_ >> tempObject;
423 throw std::runtime_error(
"Unsupported ZSTD decompression (boost) for canvas config input channel");
427 b_ar::binary_iarchive iarch(s);
431 LOG(debug) <<
" Received configuration for canvas " << tempObject.first <<
" : " << tempObject.second;
435 uint32_t uPrevCanv = 0;
443 LOG(debug) <<
" Ignored new configuration for Canvas " << tempObject.first
444 <<
" due to previously received one: " << tempObject.second;
452 fvCanvas.push_back(std::pair<TCanvas*, std::string>(
nullptr,
""));
455 LOG(info) <<
" Stored configuration for canvas " << tempObject.first <<
" : " << tempObject.second;
465 LOG(debug) <<
"Application::ReceiveConfigAndData => Received composed message with " << vMsg.size() <<
" parts";
470 b_io::basic_array_source<char> device_header(
static_cast<char*
>(vMsg.at(0).data()), vMsg.at(0).size());
471 b_io::stream<b_io::basic_array_source<char>> s_header(device_header);
473 std::pair<uint32_t, uint32_t> pairHeader;
475#ifdef BOOST_IOS_HAS_ZSTD
476 std::unique_ptr<b_io::filtering_istream> in_ = std::make_unique<b_io::filtering_istream>();
477 in_->push(b_io::zstd_decompressor());
479 std::unique_ptr<b_ar::binary_iarchive> iarchive_ = std::make_unique<b_ar::binary_iarchive>(*in_, b_ar::no_header);
480 *iarchive_ >> pairHeader;
482 throw std::runtime_error(
"Unsupported ZSTD decompression (boost) for Config + Histos input channel");
486 b_ar::binary_iarchive iarch_header(s_header);
487 iarch_header >> pairHeader;
490 LOG(debug) <<
"Application::ReceiveConfigAndData => Received configuration for " << pairHeader.first <<
" histos and "
491 << pairHeader.second <<
" canvases";
493 uint32_t uOffsetHistoConfig = pairHeader.first;
494 if (0 == pairHeader.first) {
495 uOffsetHistoConfig = 1;
496 if (0 < vMsg[uOffsetHistoConfig].
size()) {
499 std::string err_msg =
"Application::ReceiveConfigAndData => No histo config expected but corresponding message";
500 err_msg +=
" is not empty: ";
501 err_msg += vMsg[uOffsetHistoConfig].size();
502 throw std::runtime_error(err_msg);
506 uint32_t uOffsetCanvasConfig = pairHeader.second;
507 if (0 == pairHeader.second) {
508 uOffsetCanvasConfig = 1;
509 if (0 < vMsg[uOffsetHistoConfig + uOffsetCanvasConfig].
size()) {
512 std::string err_msg =
"Application::ReceiveConfigAndData => No Canvas config expected but corresponding ";
513 err_msg +=
" message is not empty: ";
514 err_msg += vMsg[uOffsetHistoConfig + uOffsetCanvasConfig].size();
515 throw std::runtime_error(err_msg);
519 if ((1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1) != vMsg.size()) {
522 std::string err_msg =
"Application::ReceiveConfigAndData => Nb parts in message not matching configs numbers ";
523 err_msg +=
" declared in header";
524 err_msg += vMsg.size();
526 err_msg += 1 + uOffsetHistoConfig + uOffsetCanvasConfig + 1;
527 throw std::runtime_error(err_msg);
531 for (uint32_t uHisto = 0; uHisto < pairHeader.first; ++uHisto) {
534 LOG(debug) <<
"Application::ReceiveConfigAndData => Processed configuration for " << pairHeader.first <<
" histos";
537 for (uint32_t uCanv = 0; uCanv < pairHeader.second; ++uCanv) {
540 LOG(debug) <<
"Application::ReceiveConfigAndData => Processed configuration for " << pairHeader.second <<
" canvases";
543 ReceiveData(vMsg[1 + uOffsetHistoConfig + uOffsetCanvasConfig]);
550template<
class HistoDst,
class HistoSrc>
557 HistoDst* histogram_new =
static_cast<HistoDst*
>(pHist->Clone());
560 LOG(info) <<
"Received new histo " << pHist->GetName();
576 LOG(info) <<
"registered histo " <<
fvHistos[uHist].first->GetName() <<
" in folder "
596 LOG(debug) <<
"Received update for: " << pHist->GetName();
597 HistoDst* histogram_existing =
dynamic_cast<HistoDst*
>(
fArrayHisto.At(index1));
598 if (
nullptr == histogram_existing) {
599 LOG(error) <<
"CbmMqHistoServer::ReadHistogram => "
600 <<
"Incompatible type found during update for histo " << pHist->GetName();
605 histogram_existing->Add(pHist);
613template<
class HistoSrc>
616 constexpr bool IsSrcH1D = std::is_same_v<HistoSrc, H1D>;
617 constexpr bool IsSrcProf1D = std::is_same_v<HistoSrc, Prof1D>;
619 if constexpr (IsSrcH1D || IsSrcProf1D) {
621 using HistoDst_t =
typename std::conditional<IsSrcH1D, TH2D, TProfile2D>::type;
624 std::string sHistoName = rHist.GetName() + std::string(HistogramMetadata::ksTsIdSuffix);
628 HistoDst_t* histogram_new =
nullptr;
630 std::string title = rHist.GetTitle();
631 title.insert(title.find_first_of(
';'),
" vs. TS index;TS index");
633 double minX =
static_cast<double>(tsIndex) - 0.5;
634 double maxX =
static_cast<double>(tsIndex + nBinsX) - 0.5;
635 int nBinsY = rHist.GetNbinsX();
636 double minY = rHist.GetMinX();
637 double maxY = rHist.GetMaxX();
638 if constexpr (IsSrcH1D) {
639 histogram_new =
new TH2D(sHistoName.c_str(), title.c_str(), nBinsX, minX, maxX, nBinsY, minY, maxY);
641 else if constexpr (IsSrcProf1D) {
642 double minZ = rHist.GetMinY();
643 double maxZ = rHist.GetMaxY();
645 new TProfile2D(sHistoName.c_str(), title.c_str(), nBinsX, minX, maxX, nBinsY, minY, maxY, minZ, maxZ);
646 histogram_new->Sumw2();
652 LOG(info) <<
"Received new histo " << sHistoName;
668 LOG(info) <<
"registered histo " <<
fvHistos[uHist].first->GetName() <<
" in folder "
688 LOG(debug) <<
"Received update for: " << sHistoName;
689 HistoDst_t* histogram_existing =
dynamic_cast<HistoDst_t*
>(
fArrayHisto.At(index1));
690 if (
nullptr == histogram_existing) {
691 LOG(error) <<
"CbmMqHistoServer::ReadHistogram => "
692 <<
"Incompatible type found during update for histo " << sHistoName;
699 LOG(warn) <<
"Histogram " << rHist.GetName() <<
" cannot be plotted vs. TS index. Ignoring";
709 LOG(debug) <<
" Received configuration for histo " << config.first <<
" : " << config.second;
713 UInt_t uPrevHist = 0;
721 LOG(debug) <<
" Ignored new configuration for histo " << config.first
722 <<
" due to previously received one: " << config.second;
727 fvHistos.push_back(std::pair<TNamed*, std::string>(
nullptr,
""));
730 LOG(info) <<
" Stored configuration for histo " << config.first <<
" : " << config.second;
740 for (
int iHist = 0; iHist <
fArrayHisto.GetEntriesFast(); ++iHist) {
742 if (TString(obj->GetName()).EqualTo(name)) {
754 LOG(info) <<
" Extracting configuration for canvas index " << uCanvIdx <<
"(name: " << conf.
GetName().data() <<
")";
758 for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
760 for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
761 std::string sName(conf.
GetObjName(uPadIdx, uObjIdx));
763 if (
"nullptr" != sName) {
765 LOG(warn) <<
"Histogram \"" << sName <<
"\" requested by canvas \"" << conf.
GetName().data()
766 <<
"\" was not found";
773 LOG(info) <<
" All histos found for canvas " << conf.
GetName().data() <<
", now preparing it";
776 std::string sNameFull = conf.
GetName();
777 size_t lastSlashPos = sNameFull.find_last_of(
'/');
778 std::string sNamePart = lastSlashPos > sNameFull.size() ? sNameFull : sNameFull.substr(lastSlashPos + 1);
779 std::string sDir = lastSlashPos > sNameFull.size() ?
"" : sNameFull.substr(0, lastSlashPos);
780 std::string canvDir = sDir.empty() ?
"canvases" : fmt::format(
"canvases/{}", sDir);
783 TCanvas* pNewCanv =
new TCanvas(sNamePart.c_str(), conf.
GetTitle().data());
787 for (uint32_t uPadIdx = 0; uPadIdx < uNbPads; ++uPadIdx) {
788 pNewCanv->cd(1 + uPadIdx);
792 gPad->SetLogx(conf.
GetLogx(uPadIdx));
793 gPad->SetLogy(conf.
GetLogy(uPadIdx));
794 gPad->SetLogz(conf.
GetLogz(uPadIdx));
798 for (uint32_t uObjIdx = 0; uObjIdx < uNbObj; ++uObjIdx) {
799 std::string sName(conf.
GetObjName(uPadIdx, uObjIdx));
800 if (
"nullptr" != sName) {
803 if (
nullptr !=
dynamic_cast<TProfile2D*
>(pObj)) {
804 dynamic_cast<TProfile2D*
>(pObj)->Draw(conf.
GetOption(uPadIdx, uObjIdx).data());
806 else if (
nullptr !=
dynamic_cast<TProfile*
>(pObj)) {
807 dynamic_cast<TProfile*
>(pObj)->Draw(conf.
GetOption(uPadIdx, uObjIdx).data());
809 else if (
nullptr !=
dynamic_cast<TH2*
>(pObj)) {
810 dynamic_cast<TH2*
>(pObj)->Draw(conf.
GetOption(uPadIdx, uObjIdx).data());
812 else if (
nullptr !=
dynamic_cast<TH1*
>(pObj)) {
813 dynamic_cast<TH1*
>(pObj)->Draw(conf.
GetOption(uPadIdx, uObjIdx).data());
816 LOG(warning) <<
" Unsupported object type for " << sName <<
" when preparing canvas " << conf.
GetName();
818 LOG(info) <<
" Configured histo " << sName <<
" on pad " << (1 + uPadIdx) <<
" for canvas "
824 fvCanvas[uCanvIdx] = std::pair<TCanvas*, std::string>(pNewCanv, canvDir);
828 LOG(info) <<
" Registered canvas " <<
fvCanvas[uCanvIdx].first->GetName() <<
" in folder "
847 for (
int iHist = 0; iHist <
fArrayHisto.GetEntriesFast(); ++iHist) {
848 dynamic_cast<TH1*
>(
fArrayHisto.At(iHist))->Reset();
858 LOG(error) <<
"Filename for saving histograms and canvases not defined. Ignoring request.";
863 TFile* oldFile = gFile;
864 TDirectory* oldDir = gDirectory;
867 TFile* histoFile =
nullptr;
872 if (
nullptr == histoFile) {
875 LOG(error) <<
"Ignoring request to save histograms and canvases: could not open output file " <<
fOpt.
HistoFile();
879 LOG(info) <<
"Saving Histograms and canvases in file: " <<
fOpt.
HistoFile();
882 for (UInt_t uHisto = 0; uHisto <
fvHistos.size(); ++uHisto) {
886 TString sFolder =
fvHistos[uHisto].second.data();
887 if (
nullptr == gDirectory->Get(sFolder)) {
888 gDirectory->mkdir(sFolder);
890 gDirectory->cd(sFolder);
899 for (UInt_t uCanvas = 0; uCanvas <
fvCanvas.size(); ++uCanvas) {
903 TString sFolder =
fvCanvas[uCanvas].second.data();
904 if (
nullptr == gDirectory->Get(sFolder)) {
905 gDirectory->mkdir(sFolder);
907 gDirectory->cd(sFolder);
Set of tools for online->ROOT QA-objects conversions (header)
A histogram container for the histogram server (header)
static constexpr size_t size()
Application(ProgramOptions const &opt)
Standard constructor, initialize the application.
uint32_t GetNbPadsY() const
bool GetLogz(uint32_t uPadIdx) const
bool GetGridy(uint32_t uPadIdx) const
bool GetLogy(uint32_t uPadIdx) const
std::string GetOption(uint32_t uPadIdx, uint32_t uObjIdx) const
std::string GetTitle() const
std::string GetName() const
accessors
bool GetLogx(uint32_t uPadIdx) const
uint32_t GetNbObjsInPad(uint32_t uPadIdx) const
bool GetGridx(uint32_t uPadIdx) const
accessors
uint32_t GetNbPadsX() const
std::string GetObjName(uint32_t uPadIdx, uint32_t uObjIdx) const
uint32_t GetNbPads() const
static void AddSlice(const H1D &src, double value, TH2D *dst)
Fills a slice of a histogram of a higher dimension for a given value (....)
static TH1D * ROOTHistogram(const H1D &hist)
Converts histogram H1D to ROOT histogram TH1D.
bool fbAllHistosRegistered
uint32_t fNMessages
Internal status.
bool ReadHistogram(const HistoSrc &rHist)
Read a histogram.
bool SaveHistograms()
Saves handled histograms.
bool ReceiveCanvasConfig(zmq::message_t &msg)
Receives canvas configuration.
int FindHistogram(const std::string &name)
Collects histograms of the same type from the histogram list.
ProgramOptions const & fOpt
A handler for system signals.
std::vector< std::pair< TCanvas *, std::string > > fvCanvas
Vector of Canvas pointers and folder path.
std::vector< bool > fvbCanvasReady
TObjArray fArrayHisto
Array of histograms with unique names.
std::vector< bool > fvbCanvasRegistered
volatile sig_atomic_t * fSignalStatus
Global signal status.
bool RegisterHistoConfig(const std::pair< std::string, std::string > &config)
Register a histogram config in the histogram server.
bool ReadHistogramExtendedTsId(const HistoSrc &pHistSrc, uint64_t tsIndex)
Reads a histogram slice for an extended histogram with the TS ID.
std::vector< std::pair< TNamed *, std::string > > fvHistos
Vector of Histos pointers and folder path.
bool ReceiveData(zmq::message_t &msg)
Find histogram index in the histogram array.
bool PrepareCanvas(uint32_t uCanvIdx)
Prepares canvases using received canvas configuration.
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool fbAllCanvasRegistered
std::vector< bool > fvbHistoRegistered
bool ResetHistograms()
Resets handled histograms.
void Exec()
Run the application.
THttpServer * fServer
ROOT Histogram server (JSroot)
bool ReceiveConfigAndData(std::vector< zmq::message_t > &vMsg)
Receives a list of canvases and histograms.
~Application()
Destructor.
bool ReceiveHistoConfig(zmq::message_t &msg)
Receives histogram configuration.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string with ( HistoName, FolderPath ) to configure the histogram.
std::unique_ptr< UiCmdActor > fUiCmdActor
bool HideGuiCommands() const
Get overwrite option.
const std::string & HistoFile() const
Get output file name (.root format)
bool CompressedInput() const
Get compressed input option.
const int32_t & ComChanZmqRcvHwm() const
Get receive High-Water Mark for interface channel.
const std::string & ComChan() const
Get interface channel name or hostname + port or whatever or ????? (FIXME: replacement of FairMQ)
const uint32_t & HttpPort() const
Get histo server http port.
const int32_t & ComChanZmqRcvTo() const
Get receive timeout for interface channel.
bool Overwrite() const
Get overwrite option.
EHistFlag
Histogram control flags (bit masks)
@ StoreVsTsId
Store the histogram vs timeslice index.
@ OmitIntegrated
Omits storing integrated histogram.
Structure to keep the histograms for sending them on the histogram server.
std::forward_list< qa::Prof1D > fvP1
List of 1D-profiles.
std::forward_list< qa::Prof2D > fvP2
List of 2D-profiles.
std::forward_list< qa::H1D > fvH1
List of 1D-histograms.
uint64_t fTimesliceId
Index of the timeslice.
std::forward_list< qa::H2D > fvH2
List of 2D-histograms.
int fNofTsToStore
Number of consequent timeslices to store.