10#include "TimesliceInputArchive.hpp"
11#include "TimesliceMultiInputArchive.hpp"
12#include "TimesliceMultiSubscriber.hpp"
13#include "TimesliceSubscriber.hpp"
15#include "FairMQLogger.h"
16#include "FairMQProgOptions.h"
23#include "BoostSerializer.h"
24#include <boost/algorithm/string.hpp>
25#include <boost/archive/binary_oarchive.hpp>
27#include <boost/regex.hpp>
28#include <boost/serialization/utility.hpp>
30#include "RootSerializer.h"
48 using std::runtime_error::runtime_error;
61 fsFileName = fConfig->GetValue<
string>(
"filename");
62 fsDirName = fConfig->GetValue<
string>(
"dirname");
63 fsHost = fConfig->GetValue<
string>(
"fles-host");
64 fusPort = fConfig->GetValue<uint16_t>(
"fles-port");
68 fbNoSplitTs = fConfig->GetValue<
bool>(
"no-split-ts");
82 LOG(warning) <<
"Both no-split-ts, send-ts-per-sysid and send-ts-per-block options used => "
83 <<
" second and third one will be ignored!!!!";
86 LOG(warning) <<
"Both no-split-ts and send-ts-per-sysid options used => "
87 <<
" second one will be ignored!!!!";
90 LOG(warning) <<
"Both no-split-ts and send-ts-per-block options used => "
91 <<
" second one will be ignored!!!!";
94 LOG(debug) <<
"Running in no-split-ts mode!";
98 LOG(warning) <<
"Both send-ts-per-sysid and send-ts-per-block options used => "
99 <<
" second one will be ignored!!!!";
102 LOG(debug) <<
"Running in send-ts-per-block mode!";
105 LOG(debug) <<
"Running in send-ts-per-sysid mode!";
108 LOG(debug) <<
"Running in no-split-ts mode by default!";
113 std::vector<std::string> vSysIdBlockPairs = fConfig->GetValue<std::vector<std::string>>(
"block-sysid");
114 for (uint32_t uPair = 0; uPair < vSysIdBlockPairs.size(); ++uPair) {
115 const size_t sep = vSysIdBlockPairs[uPair].find(
':');
116 if (string::npos == sep || 0 == sep || vSysIdBlockPairs[uPair].
size() == sep) {
117 LOG(info) << vSysIdBlockPairs[uPair];
118 throw InitTaskError(
"Provided pair of Block name + SysId is missing a : or an argument.");
122 std::string sBlockName = vSysIdBlockPairs[uPair].substr(0, sep);
126 std::string sSysId = vSysIdBlockPairs[uPair].substr(sep + 1);
127 const size_t hexPos = sSysId.find(
"0x");
129 if (string::npos == hexPos) usSysId = std::stoi(sSysId);
131 usSysId = std::stoi(sSysId.substr(hexPos + 2),
nullptr, 16);
133 LOG(debug) <<
"Extracted block info from pair \"" << vSysIdBlockPairs[uPair] <<
"\": name is " << sBlockName
134 <<
" and SysId is " << usSysId <<
" extracted from " << sSysId;
137 uint32_t uSysIdIdx = 0;
138 for (; uSysIdIdx <
fSysId.size() &&
fSysId[uSysIdIdx] != usSysId; ++uSysIdIdx) {}
139 if (uSysIdIdx ==
fSysId.size()) {
throw InitTaskError(
"Unknown System ID for " + vSysIdBlockPairs[uPair]); }
141 throw InitTaskError(
"System ID already in use by another block for " + vSysIdBlockPairs[uPair]);
148 if ((*itBlock).first == sBlockName)
break;
152 (*itBlock).second.insert(usSysId);
156 fvBlocksToSend.push_back(std::pair<std::string, std::set<uint16_t>>(sBlockName, {usSysId}));
160 LOG(info) << vSysIdBlockPairs[uPair] <<
" Added SysId 0x" << std::hex << usSysId << std::dec <<
" to "
171 bool isGoodInputCombi {
false};
173 isGoodInputCombi =
true;
177 isGoodInputCombi =
true;
181 isGoodInputCombi =
true;
182 LOG(info) <<
"Host: " <<
fsHost;
183 LOG(info) <<
"Port: " <<
fusPort;
186 isGoodInputCombi =
true;
187 LOG(info) <<
"Host string: " <<
fsHost;
190 isGoodInputCombi =
false;
193 if (!isGoodInputCombi) {
194 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
195 "or host + port are allowed combination.");
203 std::string connector =
fsHost +
":" + std::to_string(
fusPort);
204 LOG(info) <<
"Open TSPublisher at " << connector;
208 std::string connector =
fsHost;
209 LOG(info) <<
"Open TSPublisher with host string: " << connector;
214 std::string fileList {
""};
216 std::string fileName = obj;
217 fileList += fileName;
221 LOG(info) <<
"Input File String: " << fileList;
228 if (
fbNoSplitTs) { LOG(info) <<
"Sending TS copies in no-split mode"; }
230 LOG(info) <<
"Sending components in separate TS per SysId";
233 LOG(info) <<
"Sending components in separate TS per block (multiple SysId)";
238 fTime = std::chrono::steady_clock::now();
241 LOG(error) << e.what();
242 ChangeState(fair::mq::Transition::ErrorFound);
244catch (boost::bad_any_cast& e) {
245 LOG(error) <<
"Error during InitTask: " << e.what();
251 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
252 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
253 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
255 LOG(info) <<
"Suffix added to folders, histograms and canvas names: " <<
fsHistosSuffix;
259 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
261 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
269 "Size of TS; Size [MB]",
272 "Evolution of the TS Size; t [s]; Mean size [MB]",
275 "Evolution of maximal TS Size; t [s]; Max size [MB]",
281 "Missed TS evolution; t [s]",
287 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsRate, sFolder));
288 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSize, sFolder));
289 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSizeEvo, sFolder));
290 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsMaxSizeEvo, sFolder));
291 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTS, sFolder));
292 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTSEvo, sFolder));
339 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
344 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
347 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
353 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
356 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
359 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
363 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
380 std::string reqStr(
static_cast<char*
>(msgReq->GetData()), msgReq->GetSize());
381 if (
"SendFirstTimesliceIndex" == reqStr) {
397 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
406 int iSysId = std::stoi(reqStr);
407 LOG(debug) <<
"Received TS SysId component request from client: 0x" << std::hex << iSysId << std::dec;
415 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
423 LOG(debug) <<
"Received TS components block request from client: " << reqStr;
431 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
442 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
443 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
462 if (0 ==
fulTsCounter &&
nullptr !=
dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)) {
463 dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)->InitTimesliceSubscriber();
466 std::unique_ptr<fles::Timeslice> timeslice =
fSource->get();
470 const fles::Timeslice& ts = *timeslice;
471 uint64_t uTsIndex = ts.index();
478 uint64_t uTsTime = ts.descriptor(0, 0).idx;
483 uint64_t uSizeMb = 0;
485 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
486 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
511 LOG(debug) <<
"Missed Timeslices. Old TS Index was " <<
fulPrevTsIndex <<
" New TS Index is " << uTsIndex
516 std::vector<uint64_t> vulMissedIndices;
519 vulMissedIndices.emplace_back(0);
522 for (uint64_t ulMiss =
fulPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
523 vulMissedIndices.emplace_back(ulMiss);
529 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
554 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
563 std::this_thread::sleep_for(std::chrono::seconds(10));
564 std::string sCmd =
"EOF ";
582 std::this_thread::sleep_for(std::chrono::seconds(10));
583 std::string sCmd =
"EOF ";
622 std::unique_ptr<fles::Timeslice> timeslice =
GetNewTs();
625 const fles::Timeslice& ts = *timeslice;
626 fles::StorableTimeslice fullTs {ts};
631 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
655 for (uint32_t uCompIdx = 0; uCompIdx <
fdpTimesliceBuffer.front()->num_components(); ++uCompIdx) {
658 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), usMsSysId);
660 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
666 for (uint32_t uSysIdx = 0; uSysIdx <
fComponents.size(); ++uSysIdx) {
667 std::stringstream ss;
668 ss <<
"Found " << std::setw(2) <<
fvvCompPerSysId[uSysIdx].size() <<
" components for SysId 0x" << std::hex
669 << std::setw(2) <<
fSysId[uSysIdx] << std::dec <<
" :";
671 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uSysIdx].size(); ++uComp) {
675 LOG(info) << ss.str();
692 const vector<std::string>::size_type idx =
pos -
fComponents.begin();
696 LOG(error) <<
"Did not find " << sSystemName <<
" in the list of known systems";
707 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
709 const vector<int>::size_type idx =
pos -
fSysId.begin();
713 LOG(error) <<
"Did not find 0x" << std::hex << iSysId << std::dec <<
" in the list of known systems";
720 LOG(debug) <<
"Create timeslice with components for SysId " << std::hex <<
fSysId[uCompIndex] << std::dec;
724 uint32_t uTsIndex = 0;
736 fles::StorableTimeslice component {
static_cast<uint32_t
>(
fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
739 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uCompIndex].size(); ++uComp) {
741 component.append_component(uNumMsInComp);
743 LOG(debug) <<
"Add components to TS for SysId " << std::hex <<
fSysId[uCompIndex] << std::dec <<
" TS "
746 for (
size_t m = 0; m < uNumMsInComp; ++m) {
747 component.append_microslice(uComp, m,
753 LOG(debug) <<
"Prepared timeslice for SysId " << std::hex <<
fSysId[uCompIndex] << std::dec <<
" with "
754 << component.num_components() <<
" components";
756 if (!
SendData(component))
return false;
774 for (
auto itSys = (itBlock->second).begin(); itSys != (itBlock->second).end(); ++itSys) {
776 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), *itSys);
778 const vector<int>::size_type idxSys =
pos -
fSysId.begin();
781 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[idxSys].size(); ++uComp) {
786 LOG(error) <<
"Error when building the components list for block " << itBlock->first;
787 LOG(error) <<
"Did not find 0x" << std::hex << *itSys << std::dec <<
" in the list of known systems";
807 if ((*itKnownBlock).first == sBlockName) {
811 uint32_t uTsIndex = 0;
823 fles::StorableTimeslice component {
static_cast<uint32_t
>(
fdpTimesliceBuffer[uTsIndex]->num_core_microslices()),
826 for (uint32_t uComp = 0; uComp <
fvvCompPerBlock[uBlockIdx].size(); ++uComp) {
828 component.append_component(uNumMsInComp);
830 LOG(debug) <<
"Add components to TS for Block " << sBlockName <<
" TS " <<
fdpTimesliceBuffer[uTsIndex]->index()
833 for (
size_t m = 0; m < uNumMsInComp; ++m) {
834 component.append_microslice(uComp, m,
840 LOG(debug) <<
"Prepared timeslice for Block " << sBlockName <<
" with " << component.num_components()
843 if (!
SendData(component))
return false;
851 LOG(error) <<
"Requested block " << sBlockName <<
" not found in the list of known blocks";
860 std::stringstream oss;
861 boost::archive::binary_oarchive oa(oss);
863 std::string* strMsg =
new std::string(oss.str());
865 FairMQMessagePtr msg(NewMessage(
866 const_cast<char*
>(strMsg->c_str()),
868 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
876 LOG(error) <<
"Problem sending reply with first TS index";
881 LOG(debug) <<
"Send message " <<
fulMessageCounter <<
" with a size of " << msg->GetSize();
888 std::stringstream oss;
889 boost::archive::binary_oarchive oa(oss);
891 std::string* strMsg =
new std::string(oss.str());
893 FairMQMessagePtr msg(NewMessage(
894 const_cast<char*
>(strMsg->c_str()),
896 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
904 LOG(error) <<
"Problem sending data";
909 LOG(debug) <<
"Send message " <<
fulMessageCounter <<
" with a size of " << msg->GetSize();
916 std::stringstream oss;
917 boost::archive::binary_oarchive oa(oss);
919 std::string* strMsg =
new std::string(oss.str());
921 FairMQMessagePtr msg(NewMessage(
922 const_cast<char*
>(strMsg->c_str()),
924 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
942 std::stringstream oss;
943 boost::archive::binary_oarchive oa(oss);
945 std::string* strMsg =
new std::string(oss.str());
947 FairMQMessagePtr msg(NewMessage(
948 const_cast<char*
>(strMsg->c_str()),
950 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
974 FairMQMessagePtr messageHeader(NewMessage());
976 BoostSerializer<std::pair<uint32_t, uint32_t>>().Serialize(*messageHeader, pairHeader);
978 FairMQParts partsOut;
979 partsOut.AddPart(std::move(messageHeader));
983 FairMQMessagePtr messageHist(NewMessage());
985 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist,
fvpsHistosFolder[uHisto]);
987 partsOut.AddPart(std::move(messageHist));
993 FairMQMessagePtr messageHist(NewMessage());
994 partsOut.AddPart(std::move(messageHist));
999 FairMQMessagePtr messageCan(NewMessage());
1001 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan,
fvpsCanvasConfig[uCanv]);
1003 partsOut.AddPart(std::move(messageCan));
1009 FairMQMessagePtr messageHist(NewMessage());
1010 partsOut.AddPart(std::move(messageHist));
1014 FairMQMessagePtr msgHistos(NewMessage());
1016 RootSerializer().Serialize(*msgHistos, &
fArrayHisto);
1018 partsOut.AddPart(std::move(msgHistos));
1022 LOG(error) <<
"CbmMQTsSamplerRepReq::SendHistoConfAndData => Problem sending data";
1035 FairMQMessagePtr message(NewMessage());
1037 RootSerializer().Serialize(*message, &
fArrayHisto);
1041 LOG(error) <<
"Problem sending data";
1067 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() -
fTime;
1069 LOG(info) <<
"Runtime: " << run_time.count();
1070 LOG(info) <<
"No more input data";
static constexpr size_t size()
Generates beam ions for transport simulation.
std::chrono::system_clock::time_point fLastPublishTime
std::deque< std::vector< bool > > fdbCompSentFlags
uint64_t fulMaxTimeslices
bool CreateCombinedComponentsPerBlock(std::string sBlockName)
std::string fsChannelNameHistosInput
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::deque< std::unique_ptr< fles::Timeslice > > fdpTimesliceBuffer
Buffering of partially sent timeslices, limited by fulHighWaterMark.
std::string fsChannelNameMissedTs
std::vector< std::vector< uint32_t > > fvvCompPerSysId
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
bool PrepareCompListPerBlock()
std::vector< int > fSysId
bool HandleRequest(FairMQMessagePtr &, int)
std::vector< std::string > fComponents
std::vector< bool > fComponentActive
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool fbConfigSent
Flag indicating whether the histograms and canvases configurations were already published.
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::string fsHistosSuffix
virtual ~CbmMQTsSamplerRepReq()
uint64_t fulHighWaterMark
bool PrepareCompListPerSysId()
std::vector< std::vector< uint32_t > > fvvCompPerBlock
uint64_t fulMessageCounter
bool SendHistoConfAndData()
bool fbEofFound
Flag indicating the EOF was reached to avoid sending an emergency STOP.
std::string fsChannelNameCommands
std::unique_ptr< fles::Timeslice > GetNewTs()
double_t fdMinPublishTime
std::vector< std::pair< std::string, std::set< uint16_t > > > fvBlocksToSend
TH1I * fhTsRate
Histograms.
std::string fsChannelNameTsRequest
bool SendData(const fles::StorableTimeslice &component)
double_t fdMaxPublishTime
std::vector< std::string > fvsInputFileList
List of input files.
bool CreateCombinedComponentsPerSysId(std::string sSystemName)
std::chrono::steady_clock::time_point fTime
bool fbListCompPerBlockReady
bool fbListCompPerSysIdReady
fles::TimesliceSource * fSource
bool CreateAndSendFullTs()
bool SendCommand(std::string sCommand)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)