47 LOG(info) <<
"Init options for CbmMqStarHistoServer.";
53 fsTsBlockName = fConfig->GetValue<std::string>(
"TsBlockName");
56 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
57 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
58 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
61 LOG(error) << e.what();
62 ChangeState(fair::mq::Transition::ErrorFound);
67 LOG(info) <<
"Init parameter containers for CbmTsConsumerReqDevExample.";
72 for (
int iparC = 0; iparC <
fParCList->GetEntries(); iparC++) {
73 FairParGenericSet* tempObj = (FairParGenericSet*) (
fParCList->At(iparC));
75 std::string paramName {tempObj->GetName()};
80 std::string message = paramName +
",111";
81 LOG(info) <<
"Requesting parameter container " << paramName <<
", sending message: " << message;
83 FairMQMessagePtr req(NewSimpleMessage(message));
84 FairMQMessagePtr rep(NewMessage());
86 FairParGenericSet* newObj =
nullptr;
88 if (Send(req,
"parameters") > 0) {
89 if (Receive(rep,
"parameters") >= 0) {
90 if (rep->GetSize() != 0) {
92 newObj =
static_cast<FairParGenericSet*
>(tmsg.ReadObject(tmsg.GetClass()));
93 LOG(info) <<
"Received unpack parameter from the server:";
97 LOG(error) <<
"Received empty reply. Parameter not available";
125 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
128 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
134 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
139 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
142 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
148 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
151 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
154 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
158 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
169 if (
"" == message) message = std::to_string(
kusSysId);
170 LOG(debug) <<
"Requesting new TS by sending message: " << message;
171 FairMQMessagePtr req(NewSimpleMessage(message));
172 FairMQMessagePtr rep(NewMessage());
175 LOG(error) <<
"Failed to send the request! message was " << message;
179 LOG(error) <<
"Failed to receive a reply to the request! message was " << message;
182 else if (rep->GetSize() == 0) {
183 LOG(error) <<
"Received empty reply. Something went wrong with the timeslice generation! message was " << message;
193 LOG(error) << e.what();
194 ChangeState(fair::mq::Transition::ErrorFound);
201 LOG(debug) <<
"Received message number " <<
fulNumMessages <<
" with size " << rep->GetSize();
205 std::string msgStr(
static_cast<char*
>(rep->GetData()), rep->GetSize());
206 std::istringstream iss(msgStr);
207 boost::archive::binary_iarchive inputArchive(iss);
210 fles::StorableTimeslice component {0};
211 inputArchive >> component;
219 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
220 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
240 FairMQMessagePtr messageHeader(NewMessage());
242 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
244 FairMQParts partsOut;
245 partsOut.AddPart(std::move(messageHeader));
249 FairMQMessagePtr messageHist(NewMessage());
251 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
253 partsOut.AddPart(std::move(messageHist));
258 FairMQMessagePtr messageCan(NewMessage());
260 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
262 partsOut.AddPart(std::move(messageCan));
266 FairMQMessagePtr msgHistos(NewMessage());
268 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
270 partsOut.AddPart(std::move(msgHistos));
274 LOG(error) <<
"CbmTsConsumerReqDevExample::SendHistoConfAndData => Problem sending data";