17#include "TimesliceInputArchive.hpp"
18#include "TimesliceSubscriber.hpp"
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h"
23#include <boost/algorithm/string.hpp>
24#include <boost/archive/binary_oarchive.hpp>
25#include <boost/filesystem.hpp>
26#include <boost/regex.hpp>
28namespace filesys = boost::filesystem;
42 using std::runtime_error::runtime_error;
65 fFileName = fConfig->GetValue<
string>(
"filename");
66 fDirName = fConfig->GetValue<
string>(
"dirname");
67 fHost = fConfig->GetValue<
string>(
"flib-host");
68 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
77 bool isGoodInputCombi {
false};
79 isGoodInputCombi =
true;
82 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
87 isGoodInputCombi =
true;
89 if (!filesys::is_directory(pathObj)) {
throw InitTaskError(
"Passed directory name is no valid directory"); }
90 if (
fFileName.find(
"*") == std::string::npos) {
93 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
98 std::vector<filesys::path>
v;
103 boost::replace_all(
fFileName,
".",
"\\.");
104 boost::replace_all(
fFileName,
"*",
".*");
110 for (
auto&&
x : filesys::directory_iterator(pathObj)) {
112 if (!boost::filesystem::is_regular_file(
x))
continue;
120 if (!boost::regex_match(
x.path().string(), what, my_filter))
continue;
122 v.push_back(
x.path());
127 std::sort(
v.begin(),
v.end());
132 LOG(info) <<
"The following files will be used in this order.";
134 LOG(info) <<
" " <<
x;
138 isGoodInputCombi =
true;
139 LOG(info) <<
"Host: " <<
fHost;
140 LOG(info) <<
"Port: " <<
fPort;
143 isGoodInputCombi =
false;
146 if (!isGoodInputCombi) {
147 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
148 "or host + port are allowed combination.");
162 int noChannel = fChannels.size();
163 LOG(info) <<
"Number of defined output channels: " << noChannel;
164 for (
auto const& entry : fChannels) {
165 LOG(info) <<
"Channel name: " << entry.first;
170 LOG(info) <<
"Value : " << value;
172 throw InitTaskError(
"Sending same data to more than one output channel "
173 "not implemented yet.");
178 std::string connector =
"tcp://" +
fHost +
":" + std::to_string(
fPort);
179 LOG(info) <<
"Open TSPublisher at " << connector;
180 fSource =
new fles::TimesliceSubscriber(connector, 1);
185 throw InitTaskError(
"Could not open the first input file in the list, Doing nothing!");
189 fTime = std::chrono::steady_clock::now();
192 LOG(error) << e.what();
205 LOG(info) <<
"Open the Flib input file " <<
fFileName;
207 if (!filesys::is_regular_file(pathObj)) {
208 LOG(error) <<
"Input file " <<
fFileName <<
" doesn't exist.";
213 LOG(error) <<
"Could not open input file.";
218 LOG(info) <<
"End of files list reached.";
228 std::size_t pos1 = channelName.find(entry);
229 if (pos1 != std::string::npos) {
230 const vector<std::string>::const_iterator
pos =
233 LOG(info) <<
"Found " << entry <<
" in " << channelName;
234 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
240 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
241 LOG(error) <<
"Stop device.";
249 auto timeslice =
fSource->get();
256 const fles::Timeslice& ts = *timeslice;
260 LOG(info) <<
"Found " << ts.num_components() <<
" different components in timeslice";
265 for (
unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
298 LOG(info) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
299 const vector<int>::const_iterator
pos =
300 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
302 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
304 LOG(info) <<
"Create timeslice component for link " << nrComp;
306 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
307 component.append_component(ts.num_microslices(0));
309 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
310 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
312 if (!
SendData(component, idx))
return false;
322 std::stringstream oss;
323 boost::archive::binary_oarchive oa(oss);
325 std::string* strMsg =
new std::string(oss.str());
327 FairMQMessagePtr msg(NewMessage(
328 const_cast<char*
>(strMsg->c_str()),
330 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
344 LOG(error) <<
"Problem sending data";
349 LOG(info) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
359 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() -
fTime;
361 LOG(info) <<
"Runtime: " << run_time.count();
362 LOG(info) <<
"No more input data";
368 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
369 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
370 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
371 LOG(info) <<
"Flags: " << mdsc.flags;
372 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
373 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
374 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
375 LOG(info) <<
"Checksum: " << mdsc.crc;
376 LOG(info) <<
"Size: " << mdsc.size;
377 LOG(info) <<
"Offset: " << mdsc.offset;
382 if (0 == ts.num_components()) {
383 LOG(error) <<
"No Component in TS " << ts.index();
386 LOG(info) <<
"Found " << ts.num_components() <<
" different components in timeslice";
388 for (
size_t c = 0; c < ts.num_components(); ++c) {
389 LOG(info) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
390 LOG(info) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
391 LOG(info) <<
"Component " << c <<
" has the system id 0x" << std::hex
392 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
393 LOG(info) <<
"Component " << c <<
" has the system id 0x" <<
static_cast<int>(ts.descriptor(c, 0).sys_id);
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
fles::TimesliceSource * fSource
virtual ~CbmMQTsaSampler()
std::chrono::steady_clock::time_point fTime
std::vector< std::vector< std::string > > fChannelsToSend
bool CheckTimeslice(const fles::Timeslice &ts)
std::vector< std::string > fAllowedChannels
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
std::vector< std::string > fInputFileList
List of input files.
bool IsChannelNameAllowed(std::string)
std::vector< int > fComponentsToSend
virtual bool ConditionalRun()
std::vector< int > fSysId
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)