17#include "TimesliceInputArchive.hpp"
18#include "TimesliceSubscriber.hpp"
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h"
23#include <boost/algorithm/string.hpp>
24#include <boost/archive/binary_oarchive.hpp>
25#include <boost/filesystem.hpp>
26#include <boost/regex.hpp>
28namespace filesys = boost::filesystem;
44 using std::runtime_error::runtime_error;
67 fFileName = fConfig->GetValue<
string>(
"filename");
68 fDirName = fConfig->GetValue<
string>(
"dirname");
69 fHost = fConfig->GetValue<
string>(
"flib-host");
70 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
79 bool isGoodInputCombi {
false};
81 isGoodInputCombi =
true;
84 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
89 isGoodInputCombi =
true;
91 if (!filesys::is_directory(pathObj)) {
throw InitTaskError(
"Passed directory name is no valid directory"); }
92 if (
fFileName.find(
"*") == std::string::npos) {
95 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
100 std::vector<filesys::path>
v;
105 boost::replace_all(
fFileName,
".",
"\\.");
106 boost::replace_all(
fFileName,
"*",
".*");
112 for (
auto&&
x : filesys::directory_iterator(pathObj)) {
114 if (!boost::filesystem::is_regular_file(
x))
continue;
122 if (!boost::regex_match(
x.path().leaf().string(), what, my_filter))
continue;
124 v.push_back(
x.path());
129 std::sort(
v.begin(),
v.end());
134 LOG(info) <<
"The following files will be used in this order.";
136 LOG(info) <<
" " <<
x;
140 isGoodInputCombi =
true;
141 LOG(info) <<
"Host: " <<
fHost;
142 LOG(info) <<
"Port: " <<
fPort;
145 isGoodInputCombi =
false;
148 if (!isGoodInputCombi) {
149 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
150 "or host + port are allowed combination.");
164 int noChannel = fChannels.size();
165 LOG(info) <<
"Number of defined output channels: " << noChannel;
166 for (
auto const& entry : fChannels) {
167 LOG(info) <<
"Channel name: " << entry.first;
172 LOG(info) <<
"Value : " << value;
174 throw InitTaskError(
"Sending same data to more than one output channel "
175 "not implemented yet.");
180 std::string connector =
"tcp://" +
fHost +
":" + std::to_string(
fPort);
181 LOG(info) <<
"Open TSPublisher at " << connector;
182 fSource =
new fles::TimesliceSubscriber(connector, 1);
187 throw InitTaskError(
"Could not open the first input file in the list, Doing nothing!");
191 fTime = std::chrono::steady_clock::now();
194 LOG(error) << e.what();
207 LOG(info) <<
"Open the Flib input file " <<
fFileName;
209 if (!filesys::is_regular_file(pathObj)) {
210 LOG(error) <<
"Input file " <<
fFileName <<
" doesn't exist.";
215 LOG(error) <<
"Could not open input file.";
220 LOG(info) <<
"End of files list reached.";
230 std::size_t pos1 = channelName.find(entry);
231 if (pos1 != std::string::npos) {
232 const vector<std::string>::const_iterator
pos =
235 LOG(info) <<
"Found " << entry <<
" in " << channelName;
236 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
242 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
243 LOG(error) <<
"Stop device.";
251 auto timeslice =
fSource->get();
258 const fles::Timeslice& ts = *timeslice;
262 LOG(info) <<
"Found " << ts.num_components() <<
" different components in timeslice";
267 for (
unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
300 LOG(info) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
301 const vector<int>::const_iterator
pos =
302 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
304 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
306 LOG(info) <<
"Create timeslice component for link " << nrComp;
308 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
309 component.append_component(ts.num_microslices(0));
311 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
312 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
314 if (!
SendData(component, idx))
return false;
324 std::stringstream oss;
325 boost::archive::binary_oarchive oa(oss);
327 std::string* strMsg =
new std::string(oss.str());
329 FairMQMessagePtr msg(NewMessage(
330 const_cast<char*
>(strMsg->c_str()),
332 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
346 LOG(error) <<
"Problem sending data";
351 LOG(info) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
361 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() -
fTime;
363 LOG(info) <<
"Runtime: " << run_time.count();
364 LOG(info) <<
"No more input data";
370 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
371 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
372 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
373 LOG(info) <<
"Flags: " << mdsc.flags;
374 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
375 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
376 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
377 LOG(info) <<
"Checksum: " << mdsc.crc;
378 LOG(info) <<
"Size: " << mdsc.size;
379 LOG(info) <<
"Offset: " << mdsc.offset;
384 if (0 == ts.num_components()) {
385 LOG(error) <<
"No Component in TS " << ts.index();
388 LOG(info) <<
"Found " << ts.num_components() <<
" different components in timeslice";
390 for (
size_t c = 0; c < ts.num_components(); ++c) {
391 LOG(info) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
392 LOG(info) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
393 LOG(info) <<
"Component " << c <<
" has the system id 0x" << std::hex
394 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
395 LOG(info) <<
"Component " << c <<
" has the system id 0x" <<
static_cast<int>(ts.descriptor(c, 0).sys_id);
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
fles::TimesliceSource * fSource
virtual ~CbmMQTsaSampler()
std::chrono::steady_clock::time_point fTime
std::vector< std::vector< std::string > > fChannelsToSend
bool CheckTimeslice(const fles::Timeslice &ts)
std::vector< std::string > fAllowedChannels
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
std::vector< std::string > fInputFileList
List of input files.
bool IsChannelNameAllowed(std::string)
std::vector< int > fComponentsToSend
virtual bool ConditionalRun()
std::vector< int > fSysId
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)