10#include "StorableTimeslice.hpp"
12#include "FairMQLogger.h"
13#include "FairMQProgOptions.h"
14#include "FairParGenericSet.h"
22#include "BoostSerializer.h"
23#include <boost/archive/binary_iarchive.hpp>
24#include <boost/serialization/utility.hpp>
31#include "RootSerializer.h"
33 using std::runtime_error::runtime_error;
// Fragment of the device's option-initialisation code. The enclosing function header and several
// statements are elided from this excerpt, so only the visible statements are described.
// NOTE(review): the literal below names "CbmMqStarHistoServer", while other log messages in this
// file name CbmDeviceMonitorReqBmon - looks like a copy-paste leftover; confirm and fix the text.
44 LOG(info) <<
"Init options for CbmMqStarHistoServer.";
// Read user settings from the program options held in fConfig.
52 std::string sChanMap = fConfig->GetValue<std::string>(
"ChanMap");
57 fsTsBlockName = fConfig->GetValue<std::string>(
"TsBlockName");
// Parse the comma-separated "ChanMap" string: the text before each comma is converted with
// std::stoul and stored in fvuChanMap[uChanIdx], then that token and its comma are dropped from
// sChanMap. The loop ends when fvuChanMap is full or no comma is left.
// NOTE(review): the text after the last comma is not consumed by the visible loop lines, and the
// uChanIdx update is not visible either - presumably both are in elided code; confirm.
// NOTE(review): std::stoul throws on non-numeric text - presumably handled by the handler that
// logs e.what() further down; confirm.
61 size_t charPosDel = sChanMap.find(
',');
62 while (uChanIdx <
fvuChanMap.size() && std::string::npos != charPosDel) {
63 fvuChanMap[uChanIdx] = std::stoul(sChanMap.substr(0, charPosDel));
64 sChanMap = sChanMap.substr(charPosDel + 1);
66 charPosDel = sChanMap.find(
',');
// Report the histogram publication settings.
72 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
73 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
74 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
// Report how timeslices will be requested. kusSysId is cast to int and printed in hexadecimal;
// std::dec restores decimal output on the stream afterwards.
78 LOG(info) <<
"Requesting TS using the SysId: 0x" << std::hex << static_cast<int>(
kusSysId) << std::dec;
82 LOG(info) <<
"Requesting TS using the following block name: " <<
fsTsBlockName;
// Error path: log the exception text and move the device state machine to ErrorFound.
// (The matching try/catch lines are elided from this excerpt.)
86 LOG(error) << e.what();
87 ChangeState(fair::mq::Transition::ErrorFound);
// Fragment of the parameter-container initialisation code. The enclosing function header and
// several statements are elided from this excerpt.
92 LOG(info) <<
"Init parameter containers for CbmDeviceMonitorReqBmon.";
// Walk every entry of fParCList and request it by name over the "parameters" channel.
96 for (
int iparC = 0; iparC <
fParCList->GetEntries(); iparC++) {
// NOTE(review): C-style cast of the list entry; a named cast would make the intent explicit.
97 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
99 std::string paramName {tempObj->GetName()};
// The request text is "<container name>,111".
// NOTE(review): the meaning of the hard-coded "111" suffix is not visible here - presumably a
// run identifier expected by the parameter server; confirm.
104 std::string message = paramName +
",111";
105 LOG(info) <<
"Requesting parameter container " << paramName <<
", sending message: " << message;
// req carries the request text; rep is an empty message that receives the reply.
107 FairMQMessagePtr req(NewSimpleMessage(message));
108 FairMQMessagePtr rep(NewMessage());
110 FairParGenericSet* newObj =
nullptr;
// Only continue when the request was sent, a reply was received and the reply is non-empty.
// NOTE(review): Send is checked with "> 0" but Receive with ">= 0" - confirm this asymmetry is
// intended.
112 if (Send(req,
"parameters") > 0) {
113 if (Receive(rep,
"parameters") >= 0) {
114 if (rep->GetSize() != 0) {
// Rebuild the parameter object from the reply through tmsg (its declaration is elided) and keep
// it as a FairParGenericSet pointer.
116 newObj =
static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
117 LOG(info) <<
"Received unpack parameter from the server:";
// Presumably the branch taken when the reply size is zero (the else line is elided).
121 LOG(error) <<
"Received empty reply. Parameter not available";
// Fragment of the histogram/canvas configuration code. The enclosing function header, the
// definitions of vHistos, vCanvases and sCanvConf, and the loop closings are elided from this
// excerpt.
// For every entry of vHistos, pair the object's name (first->GetName()) with the string stored
// next to it (second) and log the pair as histogram name and folder.
162 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
167 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
170 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
// Same for every entry of vCanvases: pair the canvas name with sCanvConf (built in elided code)
// and log the pair as canvas name and configuration string.
176 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
179 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
182 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
186 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
// Fragment of the timeslice request/processing code. The enclosing function header, the
// declaration of `message`, the send/receive conditions and the try/catch lines are elided from
// this excerpt.
// When the request text is empty, fall back to the decimal string form of kusSysId.
197 if (
"" == message) message = std::to_string(
kusSysId);
198 LOG(debug) <<
"Requesting new TS by sending message: " << message;
// req carries the request text; rep is an empty message that receives the reply.
199 FairMQMessagePtr req(NewSimpleMessage(message));
200 FairMQMessagePtr rep(NewMessage());
// Failure reports for the send step, the receive step and an empty reply.
203 LOG(error) <<
"Failed to send the request! message was " << message;
207 LOG(error) <<
"Failed to receive a reply to the request! message was " << message;
210 else if (rep->GetSize() == 0) {
211 LOG(error) <<
"Received empty reply. Something went wrong with the timeslice generation! message was " << message;
// Error path: log the exception text and move the device state machine to ErrorFound.
221 LOG(error) << e.what();
222 ChangeState(fair::mq::Transition::ErrorFound);
229 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size " << rep->GetSize();
// Deserialise the reply: copy the payload bytes into a std::string, wrap it in an input string
// stream and read a fles::StorableTimeslice from it with a Boost binary archive.
// NOTE(review): the payload is copied twice (into msgStr, then into the stream's own buffer);
// this may be worth revisiting if replies are large.
233 std::string msgStr(
static_cast<char*
>(rep->GetData()), rep->GetSize());
234 std::istringstream iss(msgStr);
235 boost::archive::binary_iarchive inputArchive(iss);
238 fles::StorableTimeslice component {0};
239 inputArchive >> component;
// Time elapsed since fLastPublishTime, as a floating-point number of seconds.
247 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
248 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
// Fragment of CbmDeviceMonitorReqBmon::SendHistoConfAndData (name taken from the error message
// below). The function header, the definition of pairHeader, the loops over uHisto / uCanv and
// the send call are elided from this excerpt.
// Part 1: header, a pair of two uint32_t values serialised with BoostSerializer.
268 FairMQMessagePtr messageHeader(NewMessage());
270 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
// All parts are collected in one multi-part message, in the order they are added.
272 FairMQParts partsOut;
273 partsOut.AddPart(std::move(messageHeader));
// One part per histogram: the string pair stored in fvpsHistosFolder[uHisto].
277 FairMQMessagePtr messageHist(NewMessage());
279 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
280 partsOut.AddPart(std::move(messageHist));
// One part per canvas: the string pair stored in fvpsCanvasConfig[uCanv].
285 FairMQMessagePtr messageCan(NewMessage());
287 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
288 partsOut.AddPart(std::move(messageCan));
// Last part: the fArrayHisto object array, serialised with RootSerializer.
292 FairMQMessagePtr msgHistos(NewMessage());
294 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
296 partsOut.AddPart(std::move(msgHistos));
// Reported when sending fails (the send call and its check are elided).
300 LOG(error) <<
"CbmDeviceMonitorReqBmon::SendHistoConfAndData => Problem sending data";
// Fragment of the histogram-only publication code. The enclosing function header and the send
// call are elided from this excerpt.
// Serialise the fArrayHisto object array into a single message with RootSerializer.
313 FairMQMessagePtr message(NewMessage());
315 RootSerializer().Serialize(*message, &
fArrayHisto);
// Reported when sending fails (the send call and its check are elided).
319 LOG(error) <<
"Problem sending data";
// Fragment of the timeslice unpacking code. The enclosing function header, the body of the
// matching branch and the processing call are elided from this excerpt.
// Scan all components of the timeslice and select those whose descriptor at index 0 carries the
// system id kusSysId.
// NOTE(review): only descriptor index 0 of each component is inspected - presumably every entry
// of a component shares the same sys_id; confirm.
338 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
339 if (
kusSysId == ts.descriptor(uCompIdx, 0).sys_id) {
// Reported when processing of the timeslice fails (the failing call is elided).
347 LOG(error) <<
"Failed processing TS " << ts.index() <<
" in unpacker algorithm class";
uint32_t fuHistoryHistoSize
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
double_t fdMaxPublishTime
Bool_t fbComponentsAddedToList
Ignore Overlap Ms: all fuOverlapMsNb MS at the end of timeslice.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::string fsChannelNameHistosInput
double fdSpillCheckInterval
double_t fdMinPublishTime
CbmDeviceMonitorReqBmon()
uint32_t fuOffSpillCountLimitNonPulser
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
virtual ~CbmDeviceMonitorReqBmon()
bool SendHistoConfAndData()
TList * fParCList
Parameters management.
std::string fsTsBlockName
std::chrono::system_clock::time_point fLastPublishTime
uint32_t fuOffSpillCountLimit
uint64_t fulNumMessages
Statistics & first TS rejection.
static const uint16_t kusSysId
Constants.
virtual bool ConditionalRun()
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
CbmMcbm2018MonitorAlgoBmon * fMonitorAlgo
Processing algo.
std::string fsChannelNameDataInput
User settings parameters.
Bool_t fbIgnoreOverlapMs
Control flags.
std::vector< uint32_t > fvuChanMap
bool DoUnpack(const fles::Timeslice &ts, size_t component)
Bool_t ResetHistograms(Bool_t bResetTime=kTRUE)
void SetSpillThreshold(UInt_t uCntLimit)
void SetMonitorMode(Bool_t bFlagIn=kTRUE)
void AddMsComponentToList(size_t component, UShort_t usDetectorId)
Bool_t CreateHistograms()
Bool_t ProcessTs(const fles::Timeslice &ts)
void SetChannelMap(UInt_t uChan0, UInt_t uChan1, UInt_t uChan2, UInt_t uChan3, UInt_t uChan4, UInt_t uChan5, UInt_t uChan6, UInt_t uChan7)
void SetSpillCheckInterval(Double_t dIntervalSec)
void SetHistoryHistoSize(UInt_t inHistorySizeSec=1800)
void SetPulserTotLimits(UInt_t uMin, UInt_t uMax)
void SetSpillThresholdNonPulser(UInt_t uCntLimit)
std::vector< std::pair< TNamed *, std::string > > GetHistoVector()
void SetIgnoreOverlapMs(Bool_t bFlagIn=kTRUE)
Control flags.
std::vector< std::pair< TCanvas *, std::string > > GetCanvasVector()
void ClearVector()
For unpacker algos.