11#include "StorableTimeslice.hpp"
13#include "FairMQLogger.h"
14#include "FairMQProgOptions.h"
15#include "FairParGenericSet.h"
23#include "BoostSerializer.h"
24#include <boost/archive/binary_iarchive.hpp>
25#include <boost/serialization/utility.hpp>
32#include "RootSerializer.h"
34 using std::runtime_error::runtime_error;
44 LOG(info) <<
"Init options for CbmMqStarHistoServer.";
52 fiGdpbIndex = fConfig->GetValue<int32_t>(
"GdpbIdx");
58 fsTsBlockName = fConfig->GetValue<std::string>(
"TsBlockName");
61 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
62 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
63 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
69 LOG(error) << e.what();
76 LOG(info) <<
"Init parameter containers for CbmDeviceMonitorReqTof.";
80 for (
int iparC = 0; iparC <
fParCList->GetEntries(); iparC++) {
81 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
83 std::string paramName {tempObj->GetName()};
88 std::string message = paramName +
",111";
89 LOG(info) <<
"Requesting parameter container " << paramName <<
", sending message: " << message;
91 FairMQMessagePtr req(NewSimpleMessage(message));
92 FairMQMessagePtr rep(NewMessage());
94 FairParGenericSet* newObj =
nullptr;
96 if (Send(req,
"parameters") > 0) {
97 if (Receive(rep,
"parameters") >= 0) {
98 if (rep->GetSize() != 0) {
100 newObj =
static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
101 LOG(info) <<
"Received unpack parameter from the server:";
105 LOG(error) <<
"Received empty reply. Parameter not available";
141 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
146 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
149 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
155 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
158 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
161 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
165 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
176 if (
"" == message) message = std::to_string(
kusSysIdTof);
177 LOG(debug) <<
"Requesting new TS by sending message: " << message;
178 FairMQMessagePtr req(NewSimpleMessage(message));
179 FairMQMessagePtr rep(NewMessage());
182 LOG(error) <<
"Failed to send the request! message was " << message;
186 LOG(error) <<
"Failed to receive a reply to the request! message was " << message;
189 else if (rep->GetSize() == 0) {
190 LOG(error) <<
"Received empty reply. Something went wrong with the timeslice generation! message was " << message;
200 LOG(error) << e.what();
201 ChangeState(fair::mq::Transition::ErrorFound);
209 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size " << rep->GetSize();
213 std::string msgStr(
static_cast<char*
>(rep->GetData()), rep->GetSize());
214 std::istringstream iss(msgStr);
215 boost::archive::binary_iarchive inputArchive(iss);
218 fles::StorableTimeslice component {0};
219 inputArchive >> component;
227 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
228 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
248 FairMQMessagePtr messageHeader(NewMessage());
250 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
252 FairMQParts partsOut;
253 partsOut.AddPart(std::move(messageHeader));
257 FairMQMessagePtr messageHist(NewMessage());
259 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
261 partsOut.AddPart(std::move(messageHist));
266 FairMQMessagePtr messageCan(NewMessage());
268 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
270 partsOut.AddPart(std::move(messageCan));
274 FairMQMessagePtr msgHistos(NewMessage());
276 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
278 partsOut.AddPart(std::move(msgHistos));
282 LOG(error) <<
"CbmDeviceMonitorReqTof::SendHistoConfAndData => Problem sending data";
295 FairMQMessagePtr message(NewMessage());
297 RootSerializer().Serialize(*message, &
fArrayHisto);
301 LOG(error) <<
"Problem sending data";
320 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
321 if (
kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
324 else if (
kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
332 LOG(error) <<
"Failed processing TS " << ts.index() <<
" in unpacker algorithm class";
virtual bool ConditionalRun()
CbmMcbm2018MonitorAlgoTof * fMonitorAlgo
Processing algo.
double_t fdMinPublishTime
double_t fdMaxPublishTime
Bool_t fbIgnoreCriticalErrors
Switch ON the filling of a additional set of histograms.
std::string fsChannelNameDataInput
User settings parameters.
std::string fsChannelNameHistosInput
Bool_t fbComponentsAddedToList
If ON not printout at all for critical errors.
Bool_t fbIgnoreOverlapMs
Control flags.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::string fsTsBlockName
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
uint32_t fuHistoryHistoSize
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
TList * fParCList
Parameters management.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
Bool_t fbDebugMonitorMode
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
virtual ~CbmDeviceMonitorReqTof()
static const uint16_t kusSysIdBmon
static const uint16_t kusSysIdTof
Constants.
bool SendHistoConfAndData()
uint64_t fulNumMessages
Statistics & first TS rejection.
std::chrono::system_clock::time_point fLastPublishTime
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void UseAbsoluteTime(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t CreateHistograms()
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetGdpbIndex(Int_t iGdpb=-1)
void SetIgnoreCriticalErrors(Bool_t bFlagIn=kTRUE)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetDebugMonitorMode(Bool_t bFlagIn=kTRUE)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)