18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMultiInputArchive.hpp"
20#include "TimesliceMultiSubscriber.hpp"
21#include "TimesliceSubscriber.hpp"
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h"
31#include "BoostSerializer.h"
32#include <boost/algorithm/string.hpp>
33#include <boost/archive/binary_oarchive.hpp>
34#include <boost/filesystem.hpp>
35#include <boost/regex.hpp>
36#include <boost/serialization/utility.hpp>
38#include "RootSerializer.h"
40namespace filesys = boost::filesystem;
58 using std::runtime_error::runtime_error;
75 , fLastPublishTime {
std::chrono::system_clock::now()}
82 fFileName = fConfig->GetValue<
string>(
"filename");
83 fDirName = fConfig->GetValue<
string>(
"dirname");
84 fHost = fConfig->GetValue<
string>(
"flib-host");
85 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
88 fbNoSplitTs = fConfig->GetValue<
bool>(
"no-split-ts");
103 LOG(warning) <<
"Both no-split-ts, send-ts-per-sysid and "
104 "send-ts-per-channel options used => "
105 <<
" second and third one will be ignored!!!!";
108 LOG(warning) <<
"Both no-split-ts and send-ts-per-sysid options used => "
109 <<
" second one will be ignored!!!!";
112 LOG(warning) <<
"Both no-split-ts and send-ts-per-channel options used => "
113 <<
" second one will be ignored!!!!";
117 LOG(warning) <<
"Both send-ts-per-sysid and send-ts-per-channel options used => "
118 <<
" second one will be ignored!!!!";
122 std::vector<std::string> vSysIdChanPairs = fConfig->GetValue<std::vector<std::string>>(
"sysid-chan");
123 for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
124 const size_t sep = vSysIdChanPairs[uPair].find(
':');
125 if (string::npos == sep || 0 == sep || vSysIdChanPairs[uPair].
size() == sep) {
126 LOG(info) << vSysIdChanPairs[uPair];
127 throw InitTaskError(
"Provided pair of SysId + Channel name is missing a : or an argument.");
131 std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
132 const size_t hexPos = sSysId.find(
"0x");
134 if (string::npos == hexPos) iSysId = std::stoi(sSysId);
136 iSysId = std::stoi(sSysId.substr(hexPos + 2),
nullptr, 16);
139 std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
142 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
145 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
154 LOG(info) << vSysIdChanPairs[uPair] <<
" " << iSysId <<
" " << sChannelName;
165 bool isGoodInputCombi {
false};
167 isGoodInputCombi =
true;
171 isGoodInputCombi =
true;
175 isGoodInputCombi =
true;
176 LOG(info) <<
"Host: " <<
fHost;
177 LOG(info) <<
"Port: " <<
fPort;
180 isGoodInputCombi =
true;
181 LOG(info) <<
"Host string: " <<
fHost;
184 isGoodInputCombi =
false;
188 if (!isGoodInputCombi) {
189 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
190 "or host + port are allowed combination.");
204 int noChannel = fChannels.size();
205 LOG(info) <<
"Number of defined output channels: " << noChannel;
206 for (
auto const& entry : fChannels) {
216 LOG(info) <<
"Channel name: " << entry.first;
221 LOG(info) <<
"Value : " << value;
223 throw InitTaskError(
"Sending same data to more than one output channel "
224 "not implemented yet.");
232 std::string connector =
fHost +
":" + std::to_string(
fPort);
233 LOG(info) <<
"Open TSPublisher at " << connector;
234 fSource =
new fles::TimesliceMultiSubscriber(connector);
237 std::string connector =
fHost;
238 LOG(info) <<
"Open TSPublisher with host string: " << connector;
243 std::string fileList {
""};
245 std::string fileName = obj;
246 fileList += fileName;
250 LOG(info) <<
"Input File String: " << fileList;
257 if (
fbNoSplitTs) { LOG(info) <<
"Sending TS copies in no-split mode"; }
259 LOG(info) <<
"Sending components in separate TS per SysId";
262 LOG(info) <<
"Sending components in separate TS per channel";
265 fTime = std::chrono::steady_clock::now();
268 LOG(error) << e.what();
269 ChangeState(fair::mq::Transition::ErrorFound);
281 bool bFoundMatch =
false;
285 LOG(info) <<
"Looking for name " << channelName <<
" in " << entry;
286 std::size_t pos1 = channelName.find(entry);
287 if (pos1 != std::string::npos) {
293 LOG(info) <<
"Found " << entry <<
" in " << channelName;
294 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx
295 <<
" (SysId 0x" << std::hex <<
fSysId[idx] << std::dec <<
")";
308 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
309 LOG(error) <<
"Stop device.";
315 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
316 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
317 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
320 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
322 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
325 fhTsRate =
new TH1I(
"TsRate",
"TS rate; t [s]", 1800, 0., 1800.);
326 fhTsSize =
new TH1I(
"TsSize",
"Size of TS; Size [MB]", 15000, 0., 15000.);
327 fhTsSizeEvo =
new TProfile(
"TsSizeEvo",
"Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.);
328 fhTsMaxSizeEvo =
new TH1F(
"TsMaxSizeEvo",
"Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.);
329 fhMissedTS =
new TH1I(
"Missed_TS",
"Missed TS", 2, -0.5, 1.5);
330 fhMissedTSEvo =
new TProfile(
"Missed_TS_Evo",
"Missed TS evolution; t [s]", 1800, 0., 1800.);
333 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsRate,
"Sampler"));
334 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSize,
"Sampler"));
335 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSizeEvo,
"Sampler"));
336 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsMaxSizeEvo,
"Sampler"));
337 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTS,
"Sampler"));
338 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTSEvo,
"Sampler"));
343 fcSummary =
new TCanvas(
"cSampSummary",
"Sampler monitoring plots", w,
h);
379 vCanvases.push_back(std::pair<TCanvas*, std::string>(
fcSummary,
"canvases"));
385 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
390 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
394 FairMQMessagePtr messageHist(NewMessage());
396 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);
400 LOG(fatal) <<
"Problem sending histo config";
403 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
409 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
412 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
415 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
420 FairMQMessagePtr messageCan(NewMessage());
422 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);
426 LOG(fatal) <<
"Problem sending canvas config";
429 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
440 if (0 ==
fTSCounter &&
nullptr !=
dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)) {
441 dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)->InitTimesliceSubscriber();
444 auto timeslice =
fSource->get();
449 const fles::Timeslice& ts = *timeslice;
450 uint64_t uTsIndex = ts.index();
453 uint64_t uTsTime = ts.descriptor(0, 0).idx;
456 uint64_t uSizeMb = 0;
458 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
459 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
484 LOG(info) <<
"Missed Timeslices. Old TS Index was " <<
fuPrevTsIndex <<
" New TS Index is " << uTsIndex
489 std::vector<uint64_t> vulMissedIndices;
490 for (uint64_t ulMiss =
fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
491 vulMissedIndices.emplace_back(ulMiss);
497 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
519 if (
fTSCounter % 10000 == 0) { LOG(info) <<
"Received TS " <<
fTSCounter <<
" with index " << uTsIndex; }
521 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
533 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
547 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
561 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
569 for (
unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
574 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
587 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
588 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
604 std::this_thread::sleep_for(std::chrono::seconds(10));
605 std::string sCmd =
"EOF ";
621 std::this_thread::sleep_for(std::chrono::seconds(10));
622 std::string sCmd =
"EOF ";
640 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
641 const vector<int>::const_iterator
pos =
642 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
644 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
646 LOG(debug) <<
"Create timeslice component for link " << nrComp;
648 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
649 component.append_component(ts.num_microslices(0));
651 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
652 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
659 if (!
SendData(component, idx))
return false;
670 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
671 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
673 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), usSysId);
675 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
682 std::stringstream ss;
683 ss <<
"Found " << std::setw(2) <<
fvvCompPerSysId[uSysIdx].size() <<
" components for SysId 0x" << std::hex
684 << std::setw(2) <<
fSysId[uSysIdx] << std::dec <<
" :";
686 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uSysIdx].size(); ++uComp) {
690 LOG(info) << ss.str();
699 LOG(debug) <<
"Create timeslice with components for SysId " << std::hex <<
fSysId[uSysIdx] << std::dec;
702 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
704 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uSysIdx].size(); ++uComp) {
705 uint32_t uNumMsInComp = ts.num_microslices(
fvvCompPerSysId[uSysIdx][uComp]);
706 component.append_component(uNumMsInComp);
708 LOG(debug) <<
"Add components to TS for SysId " << std::hex <<
fSysId[uSysIdx] << std::dec <<
" TS "
711 for (
size_t m = 0; m < uNumMsInComp; ++m) {
712 component.append_microslice(uComp, m, ts.descriptor(
fvvCompPerSysId[uSysIdx][uComp], m),
717 LOG(debug) <<
"Prepared timeslice for SysId " << std::hex <<
fSysId[uSysIdx] << std::dec <<
" with "
718 << component.num_components() <<
" components";
720 if (!
SendData(component, uSysIdx))
return false;
736 for (uint32_t uChan = 0; uChan <
fChannelsToSend[uSysIdx].size(); ++uChan) {
737 const vector<std::string>::const_iterator
pos =
752 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
753 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
755 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), usSysId);
757 const vector<std::string>::size_type idxSys =
pos -
fSysId.begin();
760 for (uint32_t uChan = 0; uChan <
fChannelsToSend[idxSys].size(); ++uChan) {
761 const vector<std::string>::const_iterator posCh =
764 const vector<std::string>::size_type idxChan = posCh -
fvChannelsToSend.begin();
772 for (uint32_t uChanIdx = 0; uChanIdx <
fvChannelsToSend.size(); ++uChanIdx) {
773 std::stringstream ss;
774 ss <<
"Found " << std::setw(2) <<
fvvCompPerChannel[uChanIdx].size() <<
" components for channel "
781 LOG(info) << ss.str();
791 for (uint32_t uChanIdx = 0; uChanIdx <
fvChannelsToSend.size(); ++uChanIdx) {
792 LOG(debug) <<
"Create timeslice with components for channel " <<
fvChannelsToSend[uChanIdx];
795 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
799 component.append_component(uNumMsInComp);
801 LOG(debug) <<
"Add components to TS for SysId " << std::hex
802 <<
static_cast<uint16_t
>(ts.descriptor(
fvvCompPerChannel[uChanIdx][uComp], 0).sys_id) << std::dec
805 for (
size_t m = 0; m < uNumMsInComp; ++m) {
806 component.append_microslice(uComp, m, ts.descriptor(
fvvCompPerChannel[uChanIdx][uComp], m),
811 LOG(debug) <<
"Prepared timeslice for channel " <<
fvChannelsToSend[uChanIdx] <<
" with "
812 << component.num_components() <<
" components";
824 for (uint32_t uChanIdx = 0; uChanIdx <
fChannelsToSend.size(); ++uChanIdx) {
826 LOG(debug) <<
"Copy timeslice component for channel " <<
fChannelsToSend[uChanIdx][0];
828 fles::StorableTimeslice fullTs {ts};
829 if (!
SendData(fullTs, uChanIdx))
return false;
838 std::stringstream oss;
839 boost::archive::binary_oarchive oa(oss);
841 std::string* strMsg =
new std::string(oss.str());
843 FairMQMessagePtr msg(NewMessage(
844 const_cast<char*
>(strMsg->c_str()),
846 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
860 LOG(error) <<
"Problem sending data";
865 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
873 std::stringstream oss;
874 boost::archive::binary_oarchive oa(oss);
876 std::string* strMsg =
new std::string(oss.str());
878 FairMQMessagePtr msg(NewMessage(
879 const_cast<char*
>(strMsg->c_str()),
881 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
888 LOG(debug) <<
"Send data to channel " << sChannel;
889 if (Send(msg, sChannel) < 0) {
890 LOG(error) <<
"Problem sending data";
895 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
902 std::stringstream oss;
903 boost::archive::binary_oarchive oa(oss);
905 std::string* strMsg =
new std::string(oss.str());
907 FairMQMessagePtr msg(NewMessage(
908 const_cast<char*
>(strMsg->c_str()),
910 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
928 std::stringstream oss;
929 boost::archive::binary_oarchive oa(oss);
931 std::string* strMsg =
new std::string(oss.str());
933 FairMQMessagePtr msg(NewMessage(
934 const_cast<char*
>(strMsg->c_str()),
936 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
960 FairMQMessagePtr message(NewMessage());
962 RootSerializer().Serialize(*message, &
fArrayHisto);
966 LOG(error) <<
"Problem sending data";
993 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() -
fTime;
995 LOG(info) <<
"Runtime: " << run_time.count();
996 LOG(info) <<
"No more input data";
1002 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
1003 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
1004 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
1005 LOG(info) <<
"Flags: " << mdsc.flags;
1006 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
1007 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
1008 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
1009 LOG(info) <<
"Checksum: " << mdsc.crc;
1010 LOG(info) <<
"Size: " << mdsc.size;
1011 LOG(info) <<
"Offset: " << mdsc.offset;
1016 if (0 == ts.num_components()) {
1017 LOG(error) <<
"No Component in TS " << ts.index();
1020 LOG(info) <<
"Found " << ts.num_components() <<
" different components in timeslice";
1022 for (
size_t c = 0; c < ts.num_components(); ++c) {
1023 LOG(info) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
1024 LOG(info) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
1025 LOG(info) <<
"Component " << c <<
" has the system id 0x" << std::hex
1026 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
1027 LOG(info) <<
"Component " << c <<
" has the system id 0x" <<
static_cast<int>(ts.descriptor(c, 0).sys_id);
static constexpr size_t size()
Generates beam ions for transport simulation.
std::string fsChannelNameCanvasConfig
bool fbListCompPerChannelReady
std::vector< std::string > fAllowedChannels
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::chrono::steady_clock::time_point fTime
virtual ~CbmMQTsaMultiSampler()
bool SendData(const fles::StorableTimeslice &, int)
std::vector< std::vector< uint32_t > > fvvCompPerChannel
std::string fsChannelNameHistosConfig
std::vector< std::vector< uint32_t > > fvvCompPerSysId
std::vector< std::string > fInputFileList
List of input files.
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
bool CreateAndCombineComponentsPerChannel(const fles::Timeslice &)
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
bool IsChannelNameAllowed(std::string)
virtual bool ConditionalRun()
bool CheckTimeslice(const fles::Timeslice &ts)
TH1I * fhTsRate
Histograms.
bool fbListCompPerSysIdReady
std::vector< std::vector< std::string > > fChannelsToSend
std::vector< int > fComponentsToSend
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool CreateAndSendFullTs(const fles::Timeslice &)
double_t fdMinPublishTime
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::chrono::system_clock::time_point fLastPublishTime
fles::TimesliceSource * fSource
std::vector< std::string > fvChannelsToSend
bool CreateAndCombineComponentsPerSysId(const fles::Timeslice &)
std::string fsChannelNameCommands
std::string fsChannelNameMissedTs
std::vector< int > fSysId
bool CreateAndSendComponent(const fles::Timeslice &, int)
std::string fsChannelNameHistosInput
bool SendCommand(std::string sCommand)
double_t fdMaxPublishTime