66 fFileName = fConfig->GetValue<
string>(
"filename");
67 fDirName = fConfig->GetValue<
string>(
"dirname");
68 fHost = fConfig->GetValue<
string>(
"flib-host");
69 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
78 bool isGoodInputCombi {
false};
80 isGoodInputCombi =
true;
83 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
88 isGoodInputCombi =
true;
90 if (!filesys::is_directory(pathObj)) {
throw InitTaskError(
"Passed directory name is no valid directory"); }
91 if (
fFileName.find(
"*") == std::string::npos) {
94 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
99 std::vector<filesys::path>
v;
104 boost::replace_all(
fFileName,
".",
"\\.");
105 boost::replace_all(
fFileName,
"*",
".*");
111 for (
auto&&
x : filesys::directory_iterator(pathObj)) {
113 if (!boost::filesystem::is_regular_file(
x))
continue;
121 if (!boost::regex_match(
x.path().string(), what, my_filter))
continue;
123 v.push_back(
x.path());
128 std::sort(
v.begin(),
v.end());
133 LOG(info) <<
"The following files will be used in this order.";
135 LOG(info) <<
" " <<
x;
140 isGoodInputCombi =
true;
141 LOG(info) <<
"Host: " <<
fHost;
142 LOG(info) <<
"Port: " <<
fPort;
145 isGoodInputCombi =
false;
148 if (!isGoodInputCombi) {
149 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
150 "or host + port are allowed combination.");
164 int noChannel = fChannels.size();
165 LOG(info) <<
"Number of defined output channels: " << noChannel;
166 for (
auto const& entry : fChannels) {
167 LOG(info) <<
"Channel name: " << entry.first;
172 LOG(info) <<
"Value : " << value;
174 throw InitTaskError(
"Sending same data to more than one output channel "
175 "not implemented yet.");
180 std::string connector =
"tcp://" +
fHost +
":" + std::to_string(
fPort);
181 LOG(info) <<
"Open TSPublisher at " << connector;
182 fSource =
new fles::TimesliceSubscriber(connector, 1);
187 throw InitTaskError(
"Could not open the first input file in the list, Doing nothing!");
191 fTime = std::chrono::steady_clock::now();
194 LOG(error) << e.what();
230 LOG(info) <<
"Inspect " << entry;
231 std::size_t pos1 = channelName.find(entry);
232 if (pos1 != std::string::npos) {
233 const vector<std::string>::const_iterator
pos =
236 LOG(info) <<
"Found " << entry <<
" in " << channelName;
237 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
245 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
246 LOG(error) <<
"Stop device.";
268 auto timeslice =
fSource->get();
273 const fles::Timeslice& ts = *timeslice;
274 auto tsIndex = ts.index();
276 if (
fTSCounter % 10000 == 0) LOG(info) <<
"Sample TimeSlice " <<
fTSCounter <<
", Index " << tsIndex;
278 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice " <<
fTSCounter <<
", index "
289 std::vector<FairMQParts> parts;
290 std::vector<bool> bparts;
292 bparts.resize(parts.size());
293 for (uint i = 0; i < bparts.size(); i++)
295 LOG(debug) <<
"parts with size " << parts.size();
297 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
299 LOG(debug) <<
"nrComp " << nrComp <<
", SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
300 const vector<int>::const_iterator
pos =
301 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
303 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
305 LOG(debug) <<
"Append timeslice component of link " << nrComp <<
" to idx " << idx;
307 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
308 component.append_component(ts.num_microslices(0));
310 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
311 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
319 std::stringstream oss;
320 boost::archive::binary_oarchive oa(oss);
322 std::string* strMsg =
new std::string(oss.str());
324 LOG(debug) <<
"AddParts to " << idx <<
": current size " << parts[idx].Size();
326 parts[idx].AddPart(NewMessage(
327 const_cast<char*
>(strMsg->c_str()),
329 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
337 for (uint idx = 0; idx < parts.size(); idx++)
339 LOG(debug) <<
"Send parts with size " << parts[idx].Size() <<
" to channel " <<
fChannelsToSend[idx][0];
341 LOG(error) <<
"Problem sending data";
344 LOG(debug) <<
"Sent message " <<
fMessageCounter <<
" with a size of " << parts[idx].Size();
352 LOG(info) <<
" Number of requested time slices reached, exiting ";
455 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
456 const vector<int>::const_iterator
pos =
457 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
459 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
461 LOG(debug) <<
"Create timeslice component for link " << nrComp;
463 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
464 component.append_component(ts.num_microslices(0));
466 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
467 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
469 if (!
SendData(component, idx))
return false;
479 std::stringstream oss;
480 boost::archive::binary_oarchive oa(oss);
482 std::string* strMsg =
new std::string(oss.str());
484 FairMQMessagePtr msg(NewMessage(
485 const_cast<char*
>(strMsg->c_str()),
487 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
501 LOG(error) <<
"Problem sending data";
506 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
525 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
526 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
527 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
528 LOG(info) <<
"Flags: " << mdsc.flags;
529 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
530 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
531 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
532 LOG(info) <<
"Checksum: " << mdsc.crc;
533 LOG(info) <<
"Size: " << mdsc.size;
534 LOG(info) <<
"Offset: " << mdsc.offset;
539 if (0 == ts.num_components()) {
540 LOG(error) <<
"No Component in TS " << ts.index();
543 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
545 for (
size_t c = 0; c < ts.num_components(); ++c) {
546 LOG(debug) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
547 LOG(debug) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
548 LOG(debug) <<
"Component " << c <<
" has the system id 0x" << std::hex
549 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;