17#include "StorableTimeslice.hpp"
19#include "FairMQLogger.h"
20#include "FairMQProgOptions.h"
21#include "FairParGenericSet.h"
29#include "BoostSerializer.h"
30#include <boost/archive/binary_iarchive.hpp>
31#include <boost/serialization/utility.hpp>
38#include "RootSerializer.h"
40 using std::runtime_error::runtime_error;
47 : fbIgnoreOverlapMs {false}
48 , fsChannelNameDataInput {
"t0component"}
49 , fsChannelNameHistosInput {
"histogram-in"}
50 , fuHistoryHistoSize {3600}
51 , fuMinTotPulser {185}
52 , fuMaxTotPulser {195}
53 , fuOffSpillCountLimit {25}
54 , fuOffSpillCountLimitNonPulser {10}
55 , fdSpillCheckInterval {0.0128}
56 , fvuChanMap {0, 1, 2, 3, 4, 5, 6, 7}
57 , fuPublishFreqTs {100}
58 , fdMinPublishTime {0.5}
59 , fdMaxPublishTime {5.0}
60 , fsAllowedChannels {fsChannelNameDataInput}
64 , fLastPublishTime {
std::chrono::system_clock::now()}
75 LOG(info) <<
"Init options for CbmMqStarHistoServer.";
83 std::string sChanMap = fConfig->GetValue<std::string>(
"ChanMap");
92 size_t charPosDel = sChanMap.find(
',');
93 while (uChanIdx <
fvuChanMap.size() && std::string::npos != charPosDel) {
94 LOG(info) << sChanMap.substr(0, charPosDel);
95 fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
97 sChanMap = sChanMap.substr(charPosDel + 1);
98 LOG(info) << sChanMap;
100 charPosDel = sChanMap.find(
',');
103 LOG(info) << sChanMap;
108 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
109 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
110 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
122 int noChannel = fChannels.size();
123 LOG(info) <<
"Number of defined channels: " << noChannel;
124 for (
auto const& entry : fChannels) {
125 LOG(info) <<
"Channel name: " << entry.first;
133 LOG(error) << e.what();
134 ChangeState(fair::mq::Transition::ErrorFound);
140 std::size_t pos1 = channelName.find(entry);
141 if (pos1 != std::string::npos) {
142 const vector<std::string>::const_iterator
pos =
145 LOG(info) <<
"Found " << entry <<
" in " << channelName;
146 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
150 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
151 LOG(error) <<
"Stop device.";
157 LOG(info) <<
"Init parameter containers for CbmDeviceMonitorBmon.";
161 for (
int iparC = 0; iparC <
fParCList->GetEntries(); iparC++) {
162 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
164 std::string paramName {tempObj->GetName()};
169 std::string message = paramName +
",111";
170 LOG(info) <<
"Requesting parameter container " << paramName <<
", sending message: " << message;
172 FairMQMessagePtr req(NewSimpleMessage(message));
173 FairMQMessagePtr rep(NewMessage());
175 FairParGenericSet* newObj =
nullptr;
177 if (Send(req,
"parameters") > 0) {
178 if (Receive(rep,
"parameters") >= 0) {
179 if (rep->GetSize() != 0) {
181 newObj =
static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
182 LOG(info) <<
"Received unpack parameter from the server:";
186 LOG(error) <<
"Received empty reply. Parameter not available";
227 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
232 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
235 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
241 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
244 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
247 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
251 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
267 LOG(error) << e.what();
268 ChangeState(fair::mq::Transition::ErrorFound);
275 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size " << msg->GetSize();
279 std::string msgStr(
static_cast<char*
>(msg->GetData()), msg->GetSize());
280 std::istringstream iss(msgStr);
281 boost::archive::binary_iarchive inputArchive(iss);
284 fles::StorableTimeslice component {0};
285 inputArchive >> component;
293 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
294 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
314 FairMQMessagePtr messageHeader(NewMessage());
316 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
318 FairMQParts partsOut;
319 partsOut.AddPart(std::move(messageHeader));
323 FairMQMessagePtr messageHist(NewMessage());
325 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
327 partsOut.AddPart(std::move(messageHist));
332 FairMQMessagePtr messageCan(NewMessage());
334 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
336 partsOut.AddPart(std::move(messageCan));
340 FairMQMessagePtr msgHistos(NewMessage());
342 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
344 partsOut.AddPart(std::move(msgHistos));
348 LOG(error) <<
"CbmDeviceMonitorBmon::SendHistoConfAndData => Problem sending data";
361 FairMQMessagePtr message(NewMessage());
363 RootSerializer().Serialize(*message, &
fArrayHisto);
367 LOG(error) <<
"Problem sending data";
386 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
387 if (
kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
395 LOG(error) <<
"Failed processing TS " << ts.index() <<
" in unpacker algorithm class";
virtual ~CbmDeviceMonitorBmon()
TList * fParCList
Parameters management.
std::string fsChannelNameDataInput
User settings parameters.
bool HandleData(FairMQMessagePtr &, int)
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
double_t fdMaxPublishTime
std::chrono::system_clock::time_point fLastPublishTime
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
uint32_t fuOffSpillCountLimitNonPulser
bool DoUnpack(const fles::Timeslice &ts, size_t component)
static const uint16_t kusSysId
Constants.
Bool_t fbComponentsAddedToList
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
double fdSpillCheckInterval
std::vector< uint32_t > fvuChanMap
bool IsChannelNameAllowed(std::string channelName)
uint32_t fuHistoryHistoSize
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
CbmMcbm2018MonitorAlgoBmon * fMonitorAlgo
Processing algo.
double_t fdMinPublishTime
uint32_t fuOffSpillCountLimit
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::vector< std::string > fsAllowedChannels
List of MQ channels names.
uint64_t fulNumMessages
Statistics & first TS rejection.
Bool_t fbIgnoreOverlapMs
Control flags.
std::string fsChannelNameHistosInput
bool SendHistoConfAndData()
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetSpillThreshold(UInt_t uCntLimit)
void SetMonitorMode(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t CreateHistograms()
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetChannelMap(UInt_t uChan0, UInt_t uChan1, UInt_t uChan2, UInt_t uChan3, UInt_t uChan4, UInt_t uChan5, UInt_t uChan6, UInt_t uChan7)
void SetSpillCheckInterval(Double_t dIntervalSec)
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void SetSpillThresholdNonPulser(UInt_t uCntLimit)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.