27#include "StorableTimeslice.hpp"
30#include "FairMQLogger.h"
31#include "FairMQProgOptions.h"
32#include "FairParGenericSet.h"
40#include "BoostSerializer.h"
41#include <boost/archive/binary_iarchive.hpp>
42#include <boost/serialization/utility.hpp>
50#include "RootSerializer.h"
52 using std::runtime_error::runtime_error;
64 LOG(info) <<
"Init options for CbmDeviceBmonMonitor.";
65 fsSetupName = fConfig->GetValue<std::string>(
"Setup");
66 fuRunId = fConfig->GetValue<uint32_t>(
"RunId");
67 fbUnpBmon = fConfig->GetValue<
bool>(
"UnpBmon");
70 fvsSetTimeOffs = fConfig->GetValue<std::vector<std::string>>(
"SetTimeOffs");
79 LOG(error) << e.what();
86 LOG(info) <<
"Init parameter containers for CbmDeviceBmonMonitor.";
89 TString srcDir = std::getenv(
"VMCWORKDIR");
95 FairMQMessagePtr req(NewSimpleMessage(
"setup"));
96 FairMQMessagePtr rep(NewMessage());
98 if (Send(req,
"parameters") > 0) {
99 if (Receive(rep,
"parameters") >= 0) {
100 if (0 != rep->GetSize()) {
104 exchangableSetup =
dynamic_cast<CbmSetupStorable*
>(tmsg.ReadObject(tmsg.GetClass()));
106 if (
nullptr != exchangableSetup) {
111 LOG(error) <<
"Received corrupt reply. Setup not available";
116 LOG(error) <<
"Received empty reply. Setup not available";
125 std::shared_ptr<CbmBmonUnpackConfig> bmonconfig =
nullptr;
127 bmonconfig = std::make_shared<CbmBmonUnpackConfig>(
"",
fuRunId);
130 bmonconfig->SetDoWriteOutput();
132 std::string parfilesbasepathBmon = Form(
"%s/macro/beamtime/mcbm2022/", srcDir.Data());
133 bmonconfig->SetParFilesBasePath(parfilesbasepathBmon);
134 bmonconfig->SetParFileName(
"mBmonCriPar.par");
135 bmonconfig->SetSystemTimeOffset(-1220);
138 auto monitor = std::make_shared<CbmTofUnpackMonitor>();
139 monitor->SetBmonMode(
true);
140 monitor->SetInternalHttpMode(
false);
142 monitor->SetSpillThreshold(250);
143 monitor->SetSpillThresholdNonPulser(100);
145 bmonconfig->SetMonitor(monitor);
156 size_t charPosDel = (*itStrOffs).find(
',');
157 if (std::string::npos == charPosDel) {
158 LOG(info) <<
"CbmDeviceBmonMonitor::InitContainers => "
159 <<
"Trying to set trigger window with invalid option pattern, ignored! "
160 <<
" (Should be ECbmModuleId,dWinBeg,dWinEnd but instead found " << (*itStrOffs) <<
" )";
164 std::string sSelDet = (*itStrOffs).substr(0, charPosDel);
167 int32_t iOffset = std::stoi((*itStrOffs).substr(charPosDel));
173 LOG(info) <<
"CbmDeviceBmonMonitor::InitContainers => Trying to set time "
174 "offset for unsupported detector, ignored! "
180 Bool_t initOK = kTRUE;
201 LOG(info) <<
"CbmDeviceBmonMonitor::InitParameters";
203 LOG(info) <<
"CbmDeviceBmonMonitor::InitParameters - empty requirements vector no parameters initialized.";
208 for (
auto& pair : *reqparvec) {
217 std::string paramName {pair.second->GetName()};
222 std::string message = paramName +
",111";
223 LOG(info) <<
"Requesting parameter container " << paramName <<
", sending message: " << message;
225 FairMQMessagePtr req(NewSimpleMessage(message));
226 FairMQMessagePtr rep(NewMessage());
228 FairParGenericSet* newObj =
nullptr;
230 if (Send(req,
"parameters") > 0) {
231 if (Receive(rep,
"parameters") >= 0) {
232 if (0 != rep->GetSize()) {
234 newObj =
static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
235 LOG(info) <<
"Received unpack parameter from the server: " << newObj->GetName();
239 LOG(error) <<
"Received empty reply. Parameter not available";
244 pair.second.reset(newObj);
259 std::vector<std::pair<TNamed*, std::string>> vHistos =
fBmonConfig->GetMonitor()->GetHistoVector();
262 std::vector<std::pair<TCanvas*, std::string>> vCanvases =
fBmonConfig->GetMonitor()->GetCanvasVector();
268 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
273 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
276 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
282 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
285 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
288 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
292 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
307 LOG(error) << e.what();
308 ChangeState(fair::mq::Transition::ErrorFound);
317 std::string message =
"SendFirstTimesliceIndex";
318 LOG(debug) <<
"Requesting start time by sending message: SendFirstTimesliceIndex" << message;
319 FairMQMessagePtr req(NewSimpleMessage(message));
320 FairMQMessagePtr rep(NewMessage());
323 LOG(error) <<
"Failed to send the request! message was " << message;
327 LOG(error) <<
"Failed to receive a reply to the request! message was " << message;
330 else if (rep->GetSize() == 0) {
331 LOG(error) <<
"Received empty reply. Something went wrong with the timeslice generation! message was " << message;
335 std::string msgStrRep(
static_cast<char*
>(rep->GetData()), rep->GetSize());
336 std::istringstream issRep(msgStrRep);
337 boost::archive::binary_iarchive inputArchiveRep(issRep);
338 inputArchiveRep >> sReply;
340 fBmonConfig->GetMonitor()->SetHistosStartTime((1e-9) *
static_cast<double>(std::stoul(sReply)));
345 std::string message =
"full";
346 LOG(debug) <<
"Requesting new TS by sending message: full" << message;
347 FairMQMessagePtr req(NewSimpleMessage(message));
348 FairMQMessagePtr rep(NewMessage());
351 LOG(error) <<
"Failed to send the request! message was " << message;
355 LOG(error) <<
"Failed to receive a reply to the request! message was " << message;
358 else if (rep->GetSize() == 0) {
359 LOG(error) <<
"Received empty reply. Something went wrong with the timeslice generation! message was " << message;
364 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size " << rep->GetSize();
368 std::string msgStr(
static_cast<char*
>(rep->GetData()), rep->GetSize());
369 std::istringstream iss(msgStr);
370 boost::archive::binary_iarchive inputArchive(iss);
373 fles::StorableTimeslice ts {0};
408 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
409 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
430 FairMQMessagePtr messTsHeader(NewMessage());
434 parts.AddPart(std::move(messTsHeader));
437 std::stringstream ossBmon;
438 boost::archive::binary_oarchive oaBmon(ossBmon);
443 oaBmon << (std::vector<CbmTofDigi>());
445 std::string* strMsgBmon =
new std::string(ossBmon.str());
447 parts.AddPart(NewMessage(
448 const_cast<char*
>(strMsgBmon->c_str()),
449 strMsgBmon->length(),
450 [](
void*,
void*
object) { delete static_cast<std::string*>(object); },
456 FairMQMessagePtr messTsMeta(NewMessage());
458 RootSerializer().Serialize(*messTsMeta,
fTsMetaData);
459 parts.AddPart(std::move(messTsMeta));
474 FairMQMessagePtr messageHeader(NewMessage());
476 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
477 FairMQParts partsOut;
478 partsOut.AddPart(std::move(messageHeader));
482 FairMQMessagePtr messageHist(NewMessage());
484 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
486 partsOut.AddPart(std::move(messageHist));
492 FairMQMessagePtr messageHist(NewMessage());
493 partsOut.AddPart(std::move(messageHist));
498 FairMQMessagePtr messageCan(NewMessage());
500 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
502 partsOut.AddPart(std::move(messageCan));
508 FairMQMessagePtr messageHist(NewMessage());
509 partsOut.AddPart(std::move(messageHist));
513 FairMQMessagePtr msgHistos(NewMessage());
515 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
516 partsOut.AddPart(std::move(msgHistos));
520 LOG(error) <<
"CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";
527 fBmonConfig->GetMonitor()->ResetBmonHistograms(kFALSE);
535 FairMQMessagePtr message(NewMessage());
537 RootSerializer().Serialize(*message, &
fArrayHisto);
541 LOG(error) <<
"Problem sending data";
566 uint64_t nComponents = ts.num_components();
568 LOG(debug) <<
"Unpack: TS index " << ts.index() <<
" components " << nComponents;
570 for (uint64_t component = 0; component < nComponents; component++) {
571 auto systemId =
static_cast<std::uint16_t
>(ts.descriptor(component, 0).sys_id);
582 if (
fDoDebugPrints) LOG(error) <<
"Unpack: Unknown system ID " << systemId <<
" for component " << component;
598 auto spadic = std::make_shared<CbmTrdSpadic>();
599 spadic->SetUseBaselineAverage(useAvgBaseline);
600 spadic->SetMaxAdcToEnergyCal(1.0);
Configuration class for an unpacker algorithm.
Configuration class for an unpacker algorithm.
Configuration class for an unpacker algorithm.
Configuration class for an unpacker algorithm.
Configuration class for an unpacker algorithm.
Configuration class for an unpacker algorithm.
Configuration class for an unpacker algorithm.
std::chrono::system_clock::time_point fLastPublishTime
bool fDoDebugPrints
Flag if extended debug output is to be printed or not.
virtual ~CbmDeviceBmonMonitor()
Double_t fdTsFullSizeInNs
Total size of all MS in a TS, [nanoseconds].
Bool_t DoUnpack(const fles::Timeslice &ts, size_t component)
Bool_t InitParameters(std::vector< std::pair< std::string, std::shared_ptr< FairParGenericSet > > > *reqparvec)
Parameters management.
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
uint32_t fuPublishFreqTs
Histograms management.
double_t fdMinPublishTime
std::string fsChannelNameHistosInput
std::shared_ptr< CbmTrdSpadic > GetTrdSpadic(bool useAvgBaseline)
Get the Trd Spadic.
CbmTsEventHeader * fCbmTsEventHeader
Pointer to the Timeslice header containing start time and index.
bool fbOutputFullTimeSorting
Flag to Enable/disable a full time sorting. If off, time sorting happens per link/FLIM source.
Double_t fdTsCoreSizeInNs
Total size of the core MS in a TS, [nanoseconds].
std::shared_ptr< CbmBmonUnpackConfig > fBmonConfig
Configuration of the unpackers. Provides the configured algorithm.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
Double_t fdTsOverSizeInNs
Total size of the overlap MS in a TS, [nanoseconds].
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
void SetUnpackConfig(std::shared_ptr< CbmBmonUnpackConfig > config)
Set the Bmon Unpack Config.
bool SendHistoConfAndData()
std::vector< std::string > fvsSetTimeOffs
Time offsets.
size_t fuNbCoreMsPerTs
TS MetaData storage: stable so should be moved somehow to parameters handling (not transmitted with e...
std::string fsSetupName
User settings parameters.
uint64_t fulNumMessages
Statistics & first TS rejection.
static constexpr std::uint16_t fkFlesBmon
Constants.
std::string fsChannelNameDataOutput
double_t fdMaxPublishTime
std::string fsChannelNameDataInput
message queues
size_t unpack(const std::uint16_t subsysid, const fles::Timeslice *ts, std::uint16_t icomp, TConfig config, std::vector< TOptOutA > *optouttargetvecA=nullptr, std::vector< TOptOutB > *optouttargetvecB=nullptr)
Template for the unpacking call of a given algorithm.
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
TimesliceMetaData * fTsMetaData
Total size of all MS in a TS, [nanoseconds].
bool fbUnpBmon
---> for selective unpacking
Bool_t fbIgnoreOverlapMs
Control flags.
void LoadStoredSetup(CbmSetupStorable *setupIn)
static CbmSetup * Instance()
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)