10#include "TimesliceInputArchive.hpp"
11#include "TimesliceMultiInputArchive.hpp"
12#include "TimesliceMultiSubscriber.hpp"
13#include "TimesliceSubscriber.hpp"
15#include "FairMQLogger.h"
16#include "FairMQProgOptions.h"
23#include "BoostSerializer.h"
24#include <boost/algorithm/string.hpp>
25#include <boost/archive/binary_oarchive.hpp>
27#include <boost/regex.hpp>
28#include <boost/serialization/utility.hpp>
30#include "RootSerializer.h"
50 using std::runtime_error::runtime_error;
56 , fLastPublishTime {
std::chrono::system_clock::now()}
63 fsFileName = fConfig->GetValue<
string>(
"filename");
64 fsDirName = fConfig->GetValue<
string>(
"dirname");
65 fsHost = fConfig->GetValue<
string>(
"fles-host");
66 fusPort = fConfig->GetValue<uint16_t>(
"fles-port");
70 fbNoSplitTs = fConfig->GetValue<
bool>(
"no-split-ts");
84 LOG(warning) <<
"Both no-split-ts, send-ts-per-sysid and send-ts-per-block options used => "
85 <<
" second and third one will be ignored!!!!";
88 LOG(warning) <<
"Both no-split-ts and send-ts-per-sysid options used => "
89 <<
" second one will be ignored!!!!";
92 LOG(warning) <<
"Both no-split-ts and send-ts-per-block options used => "
93 <<
" second one will be ignored!!!!";
96 LOG(debug) <<
"Running in no-split-ts mode!";
100 LOG(warning) <<
"Both send-ts-per-sysid and send-ts-per-block options used => "
101 <<
" second one will be ignored!!!!";
104 LOG(debug) <<
"Running in send-ts-per-block mode!";
107 LOG(debug) <<
"Running in send-ts-per-sysid mode!";
110 LOG(debug) <<
"Running in no-split-ts mode by default!";
115 std::vector<std::string> vSysIdBlockPairs = fConfig->GetValue<std::vector<std::string>>(
"block-sysid");
116 for (uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair) {
117 const size_t sep = vSysIdBlockPairs[uPair].find(
':');
118 if (string::npos == sep || 0 == sep || vSysIdBlockPairs[uPair].
size() == sep) {
119 LOG(info) << vSysIdBlockPairs[uPair];
120 throw InitTaskError(
"Provided pair of Block name + SysId is missing a : or an argument.");
124 std::string sBlockName = vSysIdBlockPairs[uPair].substr(0, sep);
128 std::string sSysId = vSysIdBlockPairs[uPair].substr(sep + 1);
129 const size_t hexPos = sSysId.find(
"0x");
131 if (string::npos == hexPos) usSysId = std::stoi(sSysId);
133 usSysId = std::stoi(sSysId.substr(hexPos + 2),
nullptr, 16);
135 LOG(debug) <<
"Extracted block info from pair \"" << vSysIdBlockPairs[uPair] <<
"\": name is " << sBlockName
136 <<
" and SysId is " << usSysId <<
" extracted from " << sSysId;
139 uint32_t uSysIdIdx = 0;
140 for (; uSysIdIdx <
fSysId.size() &&
fSysId[uSysIdIdx] != usSysId; ++uSysIdIdx) {}
141 if (uSysIdIdx ==
fSysId.size()) {
throw InitTaskError(
"Unknown System ID for " + vSysIdBlockPairs[uPair]); }
143 throw InitTaskError(
"System ID already in use by another block for " + vSysIdBlockPairs[uPair]);
150 if ((*itBlock).first == sBlockName)
break;
154 (*itBlock).second.insert(usSysId);
158 fvBlocksToSend.push_back(std::pair<std::string, std::set<uint16_t>>(sBlockName, {usSysId}));
162 LOG(info) << vSysIdBlockPairs[uPair] <<
" Added SysId 0x" << std::hex << usSysId << std::dec <<
" to "
173 bool isGoodInputCombi {
false};
175 isGoodInputCombi =
true;
179 isGoodInputCombi =
true;
183 isGoodInputCombi =
true;
184 LOG(info) <<
"Host: " <<
fsHost;
185 LOG(info) <<
"Port: " <<
fusPort;
188 isGoodInputCombi =
true;
189 LOG(info) <<
"Host string: " <<
fsHost;
192 isGoodInputCombi =
false;
195 if (!isGoodInputCombi) {
196 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
197 "or host + port are allowed combination.");
205 std::string connector =
fsHost +
":" + std::to_string(
fusPort);
206 LOG(info) <<
"Open TSPublisher at " << connector;
210 std::string connector =
fsHost;
211 LOG(info) <<
"Open TSPublisher with host string: " << connector;
216 std::string fileList {
""};
218 std::string fileName = obj;
219 fileList += fileName;
223 LOG(info) <<
"Input File String: " << fileList;
230 if (
fbNoSplitTs) { LOG(info) <<
"Sending TS copies in no-split mode"; }
232 LOG(info) <<
"Sending components in separate TS per SysId";
235 LOG(info) <<
"Sending components in separate TS per block (multiple SysId)";
240 fTime = std::chrono::steady_clock::now();
243 LOG(error) << e.what();
244 ChangeState(fair::mq::Transition::ErrorFound);
246catch (boost::bad_any_cast& e) {
247 LOG(error) <<
"Error during InitTask: " << e.what();
253 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
254 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
255 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
257 LOG(info) <<
"Suffix added to folders, histograms and canvas names: " <<
fsHistosSuffix;
261 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
263 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
271 "Size of TS; Size [MB]",
274 "Evolution of the TS Size; t [s]; Mean size [MB]",
277 "Evolution of maximal TS Size; t [s]; Max size [MB]",
283 "Missed TS evolution; t [s]",
289 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsRate, sFolder));
290 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSize, sFolder));
291 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSizeEvo, sFolder));
292 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsMaxSizeEvo, sFolder));
293 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTS, sFolder));
294 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTSEvo, sFolder));
341 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
346 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
349 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
355 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
358 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
361 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
365 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
382 std::string reqStr(
static_cast<char*
>(msgReq->GetData()), msgReq->GetSize());
383 if (
"SendFirstTimesliceIndex" == reqStr) {
399 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
408 int iSysId = std::stoi(reqStr);
409 LOG(debug) <<
"Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec;
417 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
425 LOG(debug) <<
"Received TS components block request from client: " << reqStr;
433 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
444 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
445 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
464 if (0 ==
fulTsCounter &&
nullptr !=
dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)) {
465 dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)->InitTimesliceSubscriber();
468 std::unique_ptr<fles::Timeslice> timeslice =
fSource->get();
472 const fles::Timeslice& ts = *timeslice;
473 uint64_t uTsIndex = ts.index();
480 uint64_t uTsTime = ts.descriptor(0, 0).idx;
485 uint64_t uSizeMb = 0;
487 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
488 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
513 LOG(debug) <<
"Missed Timeslices. Old TS Index was " <<
fulPrevTsIndex <<
" New TS Index is " << uTsIndex
518 std::vector<uint64_t> vulMissedIndices;
521 vulMissedIndices.emplace_back(0);
524 for (uint64_t ulMiss =
fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
525 vulMissedIndices.emplace_back(ulMiss);
531 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
556 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
565 std::this_thread::sleep_for(std::chrono::seconds(10));
566 std::string sCmd =
"EOF ";
584 std::this_thread::sleep_for(std::chrono::seconds(10));
585 std::string sCmd =
"EOF ";
624 std::unique_ptr<fles::Timeslice> timeslice =
GetNewTs();
627 const fles::Timeslice& ts = *timeslice;
628 fles::StorableTimeslice fullTs {ts};
633 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
657 for (uint32_t uCompIdx = 0; uCompIdx <
fdpTimesliceBuffer.front()->num_components(); ++uCompIdx) {
660 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), usMsSysId);
662 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
668 for (uint32_t uSysIdx = 0; uSysIdx <
fComponents.size(); ++uSysIdx) {
669 std::stringstream ss;
670 ss <<
"Found " << std::setw(2) <<
fvvCompPerSysId[uSysIdx].size() <<
" components for SysId 0x" << std::hex
671 << std::setw(2) <<
fSysId[uSysIdx] << std::dec <<
" :";
673 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uSysIdx].size(); ++uComp) {
677 LOG(info) << ss.str();
694 const vector<std::string>::size_type idx =
pos -
fComponents.begin();
698 LOG(error) <<
"Did not find " << sSystemName <<
" in the list of known systems";
709 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
711 const vector<int>::size_type idx =
pos -
fSysId.begin();
715 LOG(error) <<
"Did not find 0x" << std::hex << iSysId << std::dec <<
" in the list of known systems";
722 LOG(debug) <<
"Create timeslice with components for SysId " << std::hex <<
fSysId[uCompIndex] << std::dec;
726 uint32_t uTsIndex = 0;
738 fles::StorableTimeslice component {
static_cast<uint32_t
>(
fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
741 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uCompIndex].size(); ++uComp) {
743 component.append_component(uNumMsInComp);
745 LOG(debug) <<
"Add components to TS for SysId " << std::hex <<
fSysId[uCompIndex] << std::dec <<
" TS "
748 for (
size_t m = 0; m < uNumMsInComp; ++m) {
749 component.append_microslice(uComp, m,
755 LOG(debug) <<
"Prepared timeslice for SysId " << std::hex <<
fSysId[uCompIndex] << std::dec <<
" with "
756 << component.num_components() <<
" components";
758 if (!
SendData(component))
return false;
776 for (
auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys) {
778 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), *itSys);
780 const vector<int>::size_type idxSys =
pos -
fSysId.begin();
783 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[idxSys].size(); ++uComp) {
788 LOG(error) <<
"Error when building the components list for block " << itBlock->first;
789 LOG(error) <<
"Did not find 0x" << std::hex << *itSys << std::dec <<
" in the list of known systems";
809 if ((*itKnownBlock).first == sBlockName) {
813 uint32_t uTsIndex = 0;
825 fles::StorableTimeslice component {
static_cast<uint32_t
>(
fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
828 for (uint32_t uComp = 0; uComp <
fvvCompPerBlock[uBlockIdx].size(); ++uComp) {
830 component.append_component(uNumMsInComp);
832 LOG(debug) <<
"Add components to TS for Block " << sBlockName <<
" TS " <<
fdpTimesliceBuffer[uTsIndex]->index()
835 for (
size_t m = 0; m < uNumMsInComp; ++m) {
836 component.append_microslice(uComp, m,
842 LOG(debug) <<
"Prepared timeslice for Block " << sBlockName <<
" with " << component.num_components()
845 if (!
SendData(component))
return false;
853 LOG(error) <<
"Requested block " << sBlockName <<
" not found in the list of known blocks";
862 std::stringstream oss;
863 boost::archive::binary_oarchive oa(oss);
865 std::string* strMsg =
new std::string(oss.str());
867 FairMQMessagePtr msg(NewMessage(
868 const_cast<char*
>(strMsg->c_str()),
870 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
878 LOG(error) <<
"Problem sending reply with first TS index";
883 LOG(debug) <<
"Send message " <<
fulMessageCounter <<
" with a size of " << msg->GetSize();
890 std::stringstream oss;
891 boost::archive::binary_oarchive oa(oss);
893 std::string* strMsg =
new std::string(oss.str());
895 FairMQMessagePtr msg(NewMessage(
896 const_cast<char*
>(strMsg->c_str()),
898 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
906 LOG(error) <<
"Problem sending data";
911 LOG(debug) <<
"Send message " <<
fulMessageCounter <<
" with a size of " << msg->GetSize();
918 std::stringstream oss;
919 boost::archive::binary_oarchive oa(oss);
921 std::string* strMsg =
new std::string(oss.str());
923 FairMQMessagePtr msg(NewMessage(
924 const_cast<char*
>(strMsg->c_str()),
926 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
944 std::stringstream oss;
945 boost::archive::binary_oarchive oa(oss);
947 std::string* strMsg =
new std::string(oss.str());
949 FairMQMessagePtr msg(NewMessage(
950 const_cast<char*
>(strMsg->c_str()),
952 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
976 FairMQMessagePtr messageHeader(NewMessage());
978 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
980 FairMQParts partsOut;
981 partsOut.AddPart(std::move(messageHeader));
985 FairMQMessagePtr messageHist(NewMessage());
987 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
989 partsOut.AddPart(std::move(messageHist));
995 FairMQMessagePtr messageHist(NewMessage());
996 partsOut.AddPart(std::move(messageHist));
1001 FairMQMessagePtr messageCan(NewMessage());
1003 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
1005 partsOut.AddPart(std::move(messageCan));
1011 FairMQMessagePtr messageHist(NewMessage());
1012 partsOut.AddPart(std::move(messageHist));
1016 FairMQMessagePtr msgHistos(NewMessage());
1018 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
1020 partsOut.AddPart(std::move(msgHistos));
1024 LOG(error) <<
"CbmMQTsSamplerRepReq::SendHistoConfAndData => Problem sending data";
1037 FairMQMessagePtr message(NewMessage());
1039 RootSerializer().Serialize(*message, &
fArrayHisto);
1043 LOG(error) <<
"Problem sending data";
1069 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() -
fTime;
1071 LOG(info) <<
"Runtime: " << run_time.count();
1072 LOG(info) <<
"No more input data";
static constexpr size_t size()
Generates beam ions for transport simulation.
std::chrono::system_clock::time_point fLastPublishTime
std::deque< std::vector< bool > > fdbCompSentFlags
uint64_t fulMaxTimeslices
bool CreateCombinedComponentsPerBlock(std::string sBlockName)
std::string fsChannelNameHistosInput
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::deque< std::unique_ptr< fles::Timeslice > > fdpTimesliceBuffer
Buffering of partially sent timeslices, limited by fulHighWaterMark.
std::string fsChannelNameMissedTs
std::vector< std::vector< uint32_t > > fvvCompPerSysId
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
bool PrepareCompListPerBlock()
std::vector< int > fSysId
bool HandleRequest(FairMQMessagePtr &, int)
std::vector< std::string > fComponents
std::vector< bool > fComponentActive
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::string fsHistosSuffix
virtual ~CbmMQTsSamplerRepReq()
uint64_t fulHighWaterMark
bool PrepareCompListPerSysId()
std::vector< std::vector< uint32_t > > fvvCompPerBlock
uint64_t fulMessageCounter
bool SendHistoConfAndData()
bool fbEofFound
Flag indicating the EOF was reached to avoid sending an emergency STOP.
std::string fsChannelNameCommands
std::unique_ptr< fles::Timeslice > GetNewTs()
double_t fdMinPublishTime
std::vector< std::pair< std::string, std::set< uint16_t > > > fvBlocksToSend
TH1I * fhTsRate
Histograms.
std::string fsChannelNameTsRequest
bool SendData(const fles::StorableTimeslice &component)
double_t fdMaxPublishTime
std::vector< std::string > fvsInputFileList
List of input files.
bool CreateCombinedComponentsPerSysId(std::string sSystemName)
std::chrono::steady_clock::time_point fTime
bool fbListCompPerBlockReady
bool fbListCompPerSysIdReady
fles::TimesliceSource * fSource
bool CreateAndSendFullTs()
bool SendCommand(std::string sCommand)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)