18#include "StorableTimeslice.hpp"
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h"
22#include "FairParGenericSet.h"
30#include "BoostSerializer.h"
31#include <boost/archive/binary_iarchive.hpp>
32#include <boost/serialization/utility.hpp>
39#include "RootSerializer.h"
41 using std::runtime_error::runtime_error;
51 LOG(info) <<
"Init options for CbmMqStarHistoServer.";
59 fiGdpbIndex = fConfig->GetValue<int32_t>(
"GdpbIdx");
68 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
69 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
70 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
85 int noChannel = fChannels.size();
86 LOG(info) <<
"Number of defined channels: " << noChannel;
87 for (
auto const& entry : fChannels) {
88 LOG(info) <<
"Channel name: " << entry.first;
96 LOG(error) << e.what();
104 std::size_t pos1 = channelName.find(entry);
105 if (pos1 != std::string::npos) {
106 const vector<std::string>::const_iterator
pos =
109 LOG(info) <<
"Found " << entry <<
" in " << channelName;
110 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
114 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
115 LOG(error) <<
"Stop device.";
121 LOG(info) <<
"Init parameter containers for CbmDeviceMonitorTof.";
125 for (
int iparC = 0; iparC <
fParCList->GetEntries(); iparC++) {
126 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
128 std::string paramName {tempObj->GetName()};
133 std::string message = paramName +
",111";
134 LOG(info) <<
"Requesting parameter container " << paramName <<
", sending message: " << message;
136 FairMQMessagePtr req(NewSimpleMessage(message));
137 FairMQMessagePtr rep(NewMessage());
139 FairParGenericSet* newObj =
nullptr;
141 if (Send(req,
"parameters") > 0) {
142 if (Receive(rep,
"parameters") >= 0) {
143 if (rep->GetSize() != 0) {
145 newObj =
static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
146 LOG(info) <<
"Received unpack parameter from the server:";
150 LOG(error) <<
"Received empty reply. Parameter not available";
186 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
191 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
194 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
200 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
203 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
206 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
210 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
225 LOG(error) << e.what();
226 ChangeState(fair::mq::Transition::ErrorFound);
233 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size " << msg->GetSize();
237 std::string msgStr(
static_cast<char*
>(msg->GetData()), msg->GetSize());
238 std::istringstream iss(msgStr);
239 boost::archive::binary_iarchive inputArchive(iss);
242 fles::StorableTimeslice component {0};
243 inputArchive >> component;
251 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
252 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
272 FairMQMessagePtr messageHeader(NewMessage());
274 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
276 FairMQParts partsOut;
277 partsOut.AddPart(std::move(messageHeader));
281 FairMQMessagePtr messageHist(NewMessage());
283 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
285 partsOut.AddPart(std::move(messageHist));
290 FairMQMessagePtr messageCan(NewMessage());
292 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
294 partsOut.AddPart(std::move(messageCan));
298 FairMQMessagePtr msgHistos(NewMessage());
300 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
302 partsOut.AddPart(std::move(msgHistos));
306 LOG(error) <<
"CbmDeviceMonitorTof::SendHistoConfAndData => Problem sending data";
319 FairMQMessagePtr message(NewMessage());
321 RootSerializer().Serialize(*message, &
fArrayHisto);
325 LOG(error) <<
"Problem sending data";
344 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
345 if (
kusSysIdTof == ts.descriptor(uCompIdx, 0).sys_id) {
348 else if (
kusSysIdBmon == ts.descriptor(uCompIdx, 0).sys_id) {
356 LOG(error) <<
"Failed processing TS " << ts.index() <<
" in unpacker algorithm class";
uint64_t fulNumMessages
Statistics & first TS rejection.
bool IsChannelNameAllowed(std::string channelName)
Bool_t fbComponentsAddedToList
If ON not printout at all for critical errors.
std::chrono::system_clock::time_point fLastPublishTime
std::string fsChannelNameHistosInput
uint32_t fuHistoryHistoSize
Bool_t fbDebugMonitorMode
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
CbmMcbm2018MonitorAlgoTof * fMonitorAlgo
Processing algo.
std::string fsChannelNameDataInput
User settings parameters.
TList * fParCList
Parameters management.
static const uint16_t kusSysIdTof
Constants.
Bool_t fbIgnoreCriticalErrors
Switch ON the filling of a additional set of histograms.
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
Bool_t fbIgnoreOverlapMs
Control flags.
bool HandleData(FairMQMessagePtr &, int)
double_t fdMinPublishTime
bool SendHistoConfAndData()
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
double_t fdMaxPublishTime
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
static const uint16_t kusSysIdBmon
virtual ~CbmDeviceMonitorTof()
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void UseAbsoluteTime(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t CreateHistograms()
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetGdpbIndex(Int_t iGdpb=-1)
void SetIgnoreCriticalErrors(Bool_t bFlagIn=kTRUE)
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetDebugMonitorMode(Bool_t bFlagIn=kTRUE)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)