18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMultiInputArchive.hpp"
20#include "TimesliceMultiSubscriber.hpp"
21#include "TimesliceSubscriber.hpp"
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h"
31#include "BoostSerializer.h"
32#include <boost/algorithm/string.hpp>
33#include <boost/archive/binary_oarchive.hpp>
34#include <boost/filesystem.hpp>
35#include <boost/regex.hpp>
36#include <boost/serialization/utility.hpp>
38#include "RootSerializer.h"
40namespace filesys = boost::filesystem;
56 using std::runtime_error::runtime_error;
80 fFileName = fConfig->GetValue<
string>(
"filename");
81 fDirName = fConfig->GetValue<
string>(
"dirname");
82 fHost = fConfig->GetValue<
string>(
"flib-host");
83 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
86 fbNoSplitTs = fConfig->GetValue<
bool>(
"no-split-ts");
101 LOG(warning) <<
"Both no-split-ts, send-ts-per-sysid and "
102 "send-ts-per-channel options used => "
103 <<
" second and third one will be ignored!!!!";
106 LOG(warning) <<
"Both no-split-ts and send-ts-per-sysid options used => "
107 <<
" second one will be ignored!!!!";
110 LOG(warning) <<
"Both no-split-ts and send-ts-per-channel options used => "
111 <<
" second one will be ignored!!!!";
115 LOG(warning) <<
"Both send-ts-per-sysid and send-ts-per-channel options used => "
116 <<
" second one will be ignored!!!!";
120 std::vector<std::string> vSysIdChanPairs = fConfig->GetValue<std::vector<std::string>>(
"sysid-chan");
121 for (uint32_t uPair = 0; uPair < vSysIdChanPairs.size(); ++uPair) {
122 const size_t sep = vSysIdChanPairs[uPair].find(
':');
123 if (string::npos == sep || 0 == sep || vSysIdChanPairs[uPair].
size() == sep) {
124 LOG(info) << vSysIdChanPairs[uPair];
125 throw InitTaskError(
"Provided pair of SysId + Channel name is missing a : or an argument.");
129 std::string sSysId = vSysIdChanPairs[uPair].substr(0, sep);
130 const size_t hexPos = sSysId.find(
"0x");
132 if (string::npos == hexPos) iSysId = std::stoi(sSysId);
134 iSysId = std::stoi(sSysId.substr(hexPos + 2),
nullptr, 16);
137 std::string sChannelName = vSysIdChanPairs[uPair].substr(sep + 1);
140 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
143 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
152 LOG(info) << vSysIdChanPairs[uPair] <<
" " << iSysId <<
" " << sChannelName;
163 bool isGoodInputCombi {
false};
165 isGoodInputCombi =
true;
169 isGoodInputCombi =
true;
173 isGoodInputCombi =
true;
174 LOG(info) <<
"Host: " <<
fHost;
175 LOG(info) <<
"Port: " <<
fPort;
178 isGoodInputCombi =
true;
179 LOG(info) <<
"Host string: " <<
fHost;
182 isGoodInputCombi =
false;
186 if (!isGoodInputCombi) {
187 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
188 "or host + port are allowed combination.");
202 int noChannel = fChannels.size();
203 LOG(info) <<
"Number of defined output channels: " << noChannel;
204 for (
auto const& entry : fChannels) {
214 LOG(info) <<
"Channel name: " << entry.first;
219 LOG(info) <<
"Value : " << value;
221 throw InitTaskError(
"Sending same data to more than one output channel "
222 "not implemented yet.");
230 std::string connector =
fHost +
":" + std::to_string(
fPort);
231 LOG(info) <<
"Open TSPublisher at " << connector;
232 fSource =
new fles::TimesliceMultiSubscriber(connector);
235 std::string connector =
fHost;
236 LOG(info) <<
"Open TSPublisher with host string: " << connector;
241 std::string fileList {
""};
243 std::string fileName = obj;
244 fileList += fileName;
248 LOG(info) <<
"Input File String: " << fileList;
255 if (
fbNoSplitTs) { LOG(info) <<
"Sending TS copies in no-split mode"; }
257 LOG(info) <<
"Sending components in separate TS per SysId";
260 LOG(info) <<
"Sending components in separate TS per channel";
263 fTime = std::chrono::steady_clock::now();
266 LOG(error) << e.what();
267 ChangeState(fair::mq::Transition::ErrorFound);
279 bool bFoundMatch =
false;
283 LOG(info) <<
"Looking for name " << channelName <<
" in " << entry;
284 std::size_t pos1 = channelName.find(entry);
285 if (pos1 != std::string::npos) {
291 LOG(info) <<
"Found " << entry <<
" in " << channelName;
292 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx
293 <<
" (SysId 0x" << std::hex <<
fSysId[idx] << std::dec <<
")";
306 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
307 LOG(error) <<
"Stop device.";
313 LOG(info) <<
"Histograms publication frequency in TS: " <<
fuPublishFreqTs;
314 LOG(info) <<
"Histograms publication min. interval in s: " <<
fdMinPublishTime;
315 LOG(info) <<
"Histograms publication max. interval in s: " <<
fdMaxPublishTime;
318 std::vector<std::pair<TNamed*, std::string>> vHistos = {};
320 std::vector<std::pair<TCanvas*, std::string>> vCanvases = {};
323 fhTsRate =
new TH1I(
"TsRate",
"TS rate; t [s]", 1800, 0., 1800.);
324 fhTsSize =
new TH1I(
"TsSize",
"Size of TS; Size [MB]", 15000, 0., 15000.);
325 fhTsSizeEvo =
new TProfile(
"TsSizeEvo",
"Evolution of the TS Size; t [s]; Mean size [MB]", 1800, 0., 1800.);
326 fhTsMaxSizeEvo =
new TH1F(
"TsMaxSizeEvo",
"Evolution of maximal TS Size; t [s]; Max size [MB]", 1800, 0., 1800.);
327 fhMissedTS =
new TH1I(
"Missed_TS",
"Missed TS", 2, -0.5, 1.5);
328 fhMissedTSEvo =
new TProfile(
"Missed_TS_Evo",
"Missed TS evolution; t [s]", 1800, 0., 1800.);
331 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsRate,
"Sampler"));
332 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSize,
"Sampler"));
333 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsSizeEvo,
"Sampler"));
334 vHistos.push_back(std::pair<TNamed*, std::string>(
fhTsMaxSizeEvo,
"Sampler"));
335 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTS,
"Sampler"));
336 vHistos.push_back(std::pair<TNamed*, std::string>(
fhMissedTSEvo,
"Sampler"));
341 fcSummary =
new TCanvas(
"cSampSummary",
"Sampler monitoring plots", w,
h);
377 vCanvases.push_back(std::pair<TCanvas*, std::string>(
fcSummary,
"canvases"));
383 for (UInt_t uHisto = 0; uHisto < vHistos.size(); ++uHisto) {
388 std::pair<std::string, std::string> psHistoConfig(vHistos[uHisto].
first->GetName(), vHistos[uHisto].second);
392 FairMQMessagePtr messageHist(NewMessage());
394 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageHist, psHistoConfig);
398 LOG(fatal) <<
"Problem sending histo config";
401 LOG(info) <<
"Config of hist " << psHistoConfig.first.data() <<
" in folder " << psHistoConfig.second.data();
407 for (UInt_t uCanv = 0; uCanv < vCanvases.size(); ++uCanv) {
410 std::string sCanvName = (vCanvases[uCanv].first)->GetName();
413 std::pair<std::string, std::string> psCanvConfig(sCanvName, sCanvConf);
418 FairMQMessagePtr messageCan(NewMessage());
420 BoostSerializer<std::pair<std::string, std::string>>().Serialize(*messageCan, psCanvConfig);
424 LOG(fatal) <<
"Problem sending canvas config";
427 LOG(info) <<
"Config string of Canvas " << psCanvConfig.first.data() <<
" is " << psCanvConfig.second.data();
438 if (0 ==
fTSCounter &&
nullptr !=
dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)) {
439 dynamic_cast<fles::TimesliceMultiSubscriber*
>(
fSource)->InitTimesliceSubscriber();
442 auto timeslice =
fSource->get();
447 const fles::Timeslice& ts = *timeslice;
448 uint64_t uTsIndex = ts.index();
451 uint64_t uTsTime = ts.descriptor(0, 0).idx;
454 uint64_t uSizeMb = 0;
456 for (uint64_t uComp = 0; uComp < ts.num_components(); ++uComp) {
457 uSizeMb += ts.size_component(uComp) / (1024 * 1024);
482 LOG(info) <<
"Missed Timeslices. Old TS Index was " <<
fuPrevTsIndex <<
" New TS Index is " << uTsIndex
487 std::vector<uint64_t> vulMissedIndices;
488 for (uint64_t ulMiss =
fuPrevTsIndex + 1; ulMiss < uTsIndex; ++ulMiss) {
489 vulMissedIndices.emplace_back(ulMiss);
495 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
517 if (
fTSCounter % 10000 == 0) { LOG(info) <<
"Received TS " <<
fTSCounter <<
" with index " << uTsIndex; }
519 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
531 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
545 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
559 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
567 for (
unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
572 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
585 std::chrono::system_clock::time_point currentTime = std::chrono::system_clock::now();
586 std::chrono::duration<double_t> elapsedSeconds = currentTime -
fLastPublishTime;
602 std::this_thread::sleep_for(std::chrono::seconds(10));
603 std::string sCmd =
"EOF ";
619 std::this_thread::sleep_for(std::chrono::seconds(10));
620 std::string sCmd =
"EOF ";
638 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
639 const vector<int>::const_iterator
pos =
640 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
642 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
644 LOG(debug) <<
"Create timeslice component for link " << nrComp;
646 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
647 component.append_component(ts.num_microslices(0));
649 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
650 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
657 if (!
SendData(component, idx))
return false;
668 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
669 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
671 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), usSysId);
673 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
680 std::stringstream ss;
681 ss <<
"Found " << std::setw(2) <<
fvvCompPerSysId[uSysIdx].size() <<
" components for SysId 0x" << std::hex
682 << std::setw(2) <<
fSysId[uSysIdx] << std::dec <<
" :";
684 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uSysIdx].size(); ++uComp) {
688 LOG(info) << ss.str();
697 LOG(debug) <<
"Create timeslice with components for SysId " << std::hex <<
fSysId[uSysIdx] << std::dec;
700 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
702 for (uint32_t uComp = 0; uComp <
fvvCompPerSysId[uSysIdx].size(); ++uComp) {
703 uint32_t uNumMsInComp = ts.num_microslices(
fvvCompPerSysId[uSysIdx][uComp]);
704 component.append_component(uNumMsInComp);
706 LOG(debug) <<
"Add components to TS for SysId " << std::hex <<
fSysId[uSysIdx] << std::dec <<
" TS "
709 for (
size_t m = 0; m < uNumMsInComp; ++m) {
710 component.append_microslice(uComp, m, ts.descriptor(
fvvCompPerSysId[uSysIdx][uComp], m),
715 LOG(debug) <<
"Prepared timeslice for SysId " << std::hex <<
fSysId[uSysIdx] << std::dec <<
" with "
716 << component.num_components() <<
" components";
718 if (!
SendData(component, uSysIdx))
return false;
734 for (uint32_t uChan = 0; uChan <
fChannelsToSend[uSysIdx].size(); ++uChan) {
735 const vector<std::string>::const_iterator
pos =
750 for (uint32_t uCompIdx = 0; uCompIdx < ts.num_components(); ++uCompIdx) {
751 uint16_t usSysId = ts.descriptor(uCompIdx, 0).sys_id;
753 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), usSysId);
755 const vector<std::string>::size_type idxSys =
pos -
fSysId.begin();
758 for (uint32_t uChan = 0; uChan <
fChannelsToSend[idxSys].size(); ++uChan) {
759 const vector<std::string>::const_iterator posCh =
762 const vector<std::string>::size_type idxChan = posCh -
fvChannelsToSend.begin();
770 for (uint32_t uChanIdx = 0; uChanIdx <
fvChannelsToSend.size(); ++uChanIdx) {
771 std::stringstream ss;
772 ss <<
"Found " << std::setw(2) <<
fvvCompPerChannel[uChanIdx].size() <<
" components for channel "
779 LOG(info) << ss.str();
789 for (uint32_t uChanIdx = 0; uChanIdx <
fvChannelsToSend.size(); ++uChanIdx) {
790 LOG(debug) <<
"Create timeslice with components for channel " <<
fvChannelsToSend[uChanIdx];
793 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
797 component.append_component(uNumMsInComp);
799 LOG(debug) <<
"Add components to TS for SysId " << std::hex
800 <<
static_cast<uint16_t
>(ts.descriptor(
fvvCompPerChannel[uChanIdx][uComp], 0).sys_id) << std::dec
803 for (
size_t m = 0; m < uNumMsInComp; ++m) {
804 component.append_microslice(uComp, m, ts.descriptor(
fvvCompPerChannel[uChanIdx][uComp], m),
809 LOG(debug) <<
"Prepared timeslice for channel " <<
fvChannelsToSend[uChanIdx] <<
" with "
810 << component.num_components() <<
" components";
822 for (uint32_t uChanIdx = 0; uChanIdx <
fChannelsToSend.size(); ++uChanIdx) {
824 LOG(debug) <<
"Copy timeslice component for channel " <<
fChannelsToSend[uChanIdx][0];
826 fles::StorableTimeslice fullTs {ts};
827 if (!
SendData(fullTs, uChanIdx))
return false;
836 std::stringstream oss;
837 boost::archive::binary_oarchive oa(oss);
839 std::string* strMsg =
new std::string(oss.str());
841 FairMQMessagePtr msg(NewMessage(
842 const_cast<char*
>(strMsg->c_str()),
844 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
858 LOG(error) <<
"Problem sending data";
863 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
871 std::stringstream oss;
872 boost::archive::binary_oarchive oa(oss);
874 std::string* strMsg =
new std::string(oss.str());
876 FairMQMessagePtr msg(NewMessage(
877 const_cast<char*
>(strMsg->c_str()),
879 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
886 LOG(debug) <<
"Send data to channel " << sChannel;
887 if (Send(msg, sChannel) < 0) {
888 LOG(error) <<
"Problem sending data";
893 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
900 std::stringstream oss;
901 boost::archive::binary_oarchive oa(oss);
903 std::string* strMsg =
new std::string(oss.str());
905 FairMQMessagePtr msg(NewMessage(
906 const_cast<char*
>(strMsg->c_str()),
908 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
926 std::stringstream oss;
927 boost::archive::binary_oarchive oa(oss);
929 std::string* strMsg =
new std::string(oss.str());
931 FairMQMessagePtr msg(NewMessage(
932 const_cast<char*
>(strMsg->c_str()),
934 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
958 FairMQMessagePtr message(NewMessage());
960 RootSerializer().Serialize(*message, &
fArrayHisto);
964 LOG(error) <<
"Problem sending data";
991 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() -
fTime;
993 LOG(info) <<
"Runtime: " << run_time.count();
994 LOG(info) <<
"No more input data";
1000 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
1001 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
1002 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
1003 LOG(info) <<
"Flags: " << mdsc.flags;
1004 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
1005 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
1006 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
1007 LOG(info) <<
"Checksum: " << mdsc.crc;
1008 LOG(info) <<
"Size: " << mdsc.size;
1009 LOG(info) <<
"Offset: " << mdsc.offset;
1014 if (0 == ts.num_components()) {
1015 LOG(error) <<
"No Component in TS " << ts.index();
1018 LOG(info) <<
"Found " << ts.num_components() <<
" different components in timeslice";
1020 for (
size_t c = 0; c < ts.num_components(); ++c) {
1021 LOG(info) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
1022 LOG(info) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
1023 LOG(info) <<
"Component " << c <<
" has the system id 0x" << std::hex
1024 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
1025 LOG(info) <<
"Component " << c <<
" has the system id 0x" <<
static_cast<int>(ts.descriptor(c, 0).sys_id);
static constexpr size_t size()
Generates beam ions for transport simulation.
std::string fsChannelNameCanvasConfig
bool fbListCompPerChannelReady
std::vector< std::string > fAllowedChannels
std::vector< std::pair< std::string, std::string > > fvpsHistosFolder
Vector of string pairs with ( HistoName, FolderPath ) to send to the histogram server.
std::chrono::steady_clock::time_point fTime
virtual ~CbmMQTsaMultiSampler()
bool SendData(const fles::StorableTimeslice &, int)
std::vector< std::vector< uint32_t > > fvvCompPerChannel
std::string fsChannelNameHistosConfig
std::vector< std::vector< uint32_t > > fvvCompPerSysId
std::vector< std::string > fInputFileList
List of input files.
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
bool CreateAndCombineComponentsPerChannel(const fles::Timeslice &)
bool SendMissedTsIdx(std::vector< uint64_t > vIndices)
bool IsChannelNameAllowed(std::string)
virtual bool ConditionalRun()
bool CheckTimeslice(const fles::Timeslice &ts)
TH1I * fhTsRate
Histograms.
bool fbListCompPerSysIdReady
std::vector< std::vector< std::string > > fChannelsToSend
std::vector< int > fComponentsToSend
std::vector< std::pair< std::string, std::string > > fvpsCanvasConfig
bool CreateAndSendFullTs(const fles::Timeslice &)
double_t fdMinPublishTime
TObjArray fArrayHisto
Array of histograms to send to the histogram server.
std::chrono::system_clock::time_point fLastPublishTime
fles::TimesliceSource * fSource
std::vector< std::string > fvChannelsToSend
bool CreateAndCombineComponentsPerSysId(const fles::Timeslice &)
std::string fsChannelNameCommands
std::string fsChannelNameMissedTs
std::vector< int > fSysId
bool CreateAndSendComponent(const fles::Timeslice &, int)
std::string fsChannelNameHistosInput
bool SendCommand(std::string sCommand)
double_t fdMaxPublishTime