// Read the input settings from the configuration object fConfig: the
// "filename", "dirname" and "flib-host" options are fetched as strings,
// the "flib-port" option as an unsigned 64 bit integer.
68 fFileName = fConfig->GetValue<
string>(
"filename");
69 fDirName = fConfig->GetValue<
string>(
"dirname");
70 fHost = fConfig->GetValue<
string>(
"flib-host");
71 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
// Records whether the configured inputs form a supported combination.
// It starts out false and is evaluated at the end of this section.
// NOTE(review): the conditions guarding the assignments and checks below sit
// on lines that are not visible in this excerpt.
80 bool isGoodInputCombi {
false};
82 isGoodInputCombi =
true;
// pathObj has to refer to an existing regular file, otherwise initialisation
// is aborted with an InitTaskError.
85 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
90 isGoodInputCombi =
true;
// pathObj has to refer to an existing directory.
92 if (!filesys::is_directory(pathObj)) {
throw InitTaskError(
"Passed directory name is no valid directory"); }
// Branch taken when the file name contains no '*' wildcard.
93 if (
fFileName.find(
"*") == std::string::npos) {
96 if (!filesys::is_regular_file(pathObj)) {
throw InitTaskError(
"Passed file name is no valid file"); }
// Collects the paths of the directory entries accepted by the loop below.
101 std::vector<filesys::path>
v;
// Rewrite fFileName in place: every '.' becomes "\." and afterwards every '*'
// becomes ".*". The order matters, because escaping the dots first keeps the
// '.' introduced by the second replacement unescaped.
// NOTE(review): presumably my_filter is built from this string on a line not
// shown here - confirm. Other regex metacharacters in the name are left
// untouched, and the member keeps the rewritten value.
106 boost::replace_all(
fFileName,
".",
"\\.");
107 boost::replace_all(
fFileName,
"*",
".*");
// Walk over the entries of pathObj and keep the regular files whose leaf name
// matches my_filter.
113 for (
auto&&
x : filesys::directory_iterator(pathObj)) {
115 if (!boost::filesystem::is_regular_file(
x))
continue;
123 if (!boost::regex_match(
x.path().leaf().string(), what, my_filter))
continue;
125 v.push_back(
x.path());
// Sort the collected paths with the default ordering of the path type.
130 std::sort(
v.begin(),
v.end());
135 LOG(info) <<
"The following files will be used in this order.";
137 LOG(info) <<
" " <<
x;
// Accepted combination for which the host and port values are logged.
142 isGoodInputCombi =
true;
143 LOG(info) <<
"Host: " <<
fHost;
144 LOG(info) <<
"Port: " <<
fPort;
147 isGoodInputCombi =
false;
// Abort the initialisation when none of the supported combinations was found.
150 if (!isGoodInputCombi) {
151 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
152 "or host + port are allowed combination.");
// Number of entries in fChannels.
// NOTE(review): size() returns an unsigned size type which is narrowed to int.
166 int noChannel = fChannels.size();
167 LOG(info) <<
"Number of defined output channels: " << noChannel;
// Log the name (the key of each fChannels entry) of every defined channel.
168 for (
auto const& entry : fChannels) {
169 LOG(info) <<
"Channel name: " << entry.first;
174 LOG(info) <<
"Value : " << value;
// NOTE(review): the condition leading to this exception is not visible in
// this excerpt.
176 throw InitTaskError(
"Sending same data to more than one output channel "
177 "not implemented yet.");
// Build the connection string "tcp://<fHost>:<fPort>" and create a
// fles::TimesliceSubscriber for it.
// NOTE(review): the meaning of the second constructor argument (1) and the
// place where the raw-new'ed fSource is released are not visible here.
182 std::string connector =
"tcp://" +
fHost +
":" + std::to_string(
fPort);
183 LOG(info) <<
"Open TSPublisher at " << connector;
184 fSource =
new fles::TimesliceSubscriber(connector, 1);
189 throw InitTaskError(
"Could not open the first input file in the list, Doing nothing!");
// Store the current steady-clock time in fTime.
193 fTime = std::chrono::steady_clock::now();
// Log the message of the exception object e at error level.
196 LOG(error) << e.what();
// Log the candidate that is compared against the channel name.
232 LOG(info) <<
"Inspect " << entry;
// Substring search: the candidate counts as a hit when it occurs anywhere
// inside channelName.
233 std::size_t pos1 = channelName.find(entry);
234 if (pos1 != std::string::npos) {
// NOTE(review): the initialiser of pos and the computation of idx are on
// lines not visible in this excerpt.
235 const vector<std::string>::const_iterator
pos =
238 LOG(info) <<
"Found " << entry <<
" in " << channelName;
239 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
// Messages logged when the channel name was not found in the list.
247 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
248 LOG(error) <<
"Stop device.";
// Fetch the next timeslice from the source.
// NOTE(review): a check that the returned handle is valid before it is
// dereferenced below is not visible in this excerpt.
270 auto timeslice =
fSource->get();
275 const fles::Timeslice& ts = *timeslice;
276 auto tsIndex = ts.index();
// Progress message for every 10000th value of fTSCounter.
278 if (
fTSCounter % 10000 == 0) LOG(info) <<
"Sample TimeSlice " <<
fTSCounter <<
", Index " << tsIndex;
280 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice " <<
fTSCounter <<
", index "
// parts holds the multipart messages that are filled below and indexed by
// idx; bparts gets one flag per entry of parts.
// NOTE(review): the sizing of parts and the body of the bparts loop are on
// lines not shown. `uint` is not a standard C++ type name.
291 std::vector<FairMQParts> parts;
292 std::vector<bool> bparts;
294 bparts.resize(parts.size());
295 for (uint i = 0; i < bparts.size(); i++)
297 LOG(debug) <<
"parts with size " << parts.size();
// Handle every component of the timeslice.
299 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
301 LOG(debug) <<
"nrComp " << nrComp <<
", SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
// Look up the system id of the component's first microslice descriptor in
// fSysId; idx is the position of the match within fSysId.
// NOTE(review): when the id is not found pos equals fSysId.end() and idx
// equals fSysId.size(); a guard for that case is not visible in this excerpt.
302 const vector<int>::const_iterator
pos =
303 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
305 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
307 LOG(debug) <<
"Append timeslice component of link " << nrComp <<
" to idx " << idx;
// Build a new timeslice with the core microslice count and index of ts and
// give it one component.
// NOTE(review): the component is sized with ts.num_microslices(0) while the
// copy loop below runs up to ts.num_microslices(nrComp) - confirm that all
// components carry the same number of microslices.
309 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
310 component.append_component(ts.num_microslices(0));
// Copy descriptor and content of every microslice of component nrComp into
// component 0 of the new timeslice.
312 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
313 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
// Binary output archive writing into the string stream oss.
// NOTE(review): the statement that streams data into oa is not visible here.
321 std::stringstream oss;
322 boost::archive::binary_oarchive oa(oss);
// Heap copy of the stream content. Its character buffer is handed to
// NewMessage and the callback shown below deletes the std::string it
// receives as second argument.
// NOTE(review): the remaining NewMessage arguments are on lines not shown.
324 std::string* strMsg =
new std::string(oss.str());
326 LOG(debug) <<
"AddParts to " << idx <<
": current size " << parts[idx].Size();
328 parts[idx].AddPart(NewMessage(
329 const_cast<char*
>(strMsg->c_str()),
331 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
// Loop over all entries of parts; the send call itself is on lines not shown.
339 for (uint idx = 0; idx < parts.size(); idx++)
341 LOG(debug) <<
"Send parts with size " << parts[idx].Size() <<
" to channel " <<
fChannelsToSend[idx][0];
343 LOG(error) <<
"Problem sending data";
346 LOG(debug) <<
"Sent message " <<
fMessageCounter <<
" with a size of " << parts[idx].Size();
354 LOG(info) <<
" Number of requested time slices reached, exiting ";
457 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
// Look up the system id of the component's first microslice descriptor in
// fSysId; idx is the position of the match within fSysId.
// NOTE(review): when the id is not found pos equals fSysId.end() and idx
// equals fSysId.size(); a guard for that case is not visible in this excerpt.
458 const vector<int>::const_iterator
pos =
459 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
461 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
463 LOG(debug) <<
"Create timeslice component for link " << nrComp;
// Build a new timeslice with the core microslice count and index of ts and
// give it one component.
// NOTE(review): the component is sized with ts.num_microslices(0) while the
// copy loop below runs up to ts.num_microslices(nrComp) - confirm that all
// components carry the same number of microslices.
465 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
466 component.append_component(ts.num_microslices(0));
// Copy descriptor and content of every microslice of component nrComp into
// component 0 of the new timeslice.
468 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
469 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
// Pass the new timeslice together with idx to SendData and return false as
// soon as SendData reports a failure.
471 if (!
SendData(component, idx))
return false;
// Binary output archive writing into the string stream oss.
// NOTE(review): the statement that streams data into oa is not visible here.
481 std::stringstream oss;
482 boost::archive::binary_oarchive oa(oss);
// Heap copy of the stream content. Its character buffer is handed to
// NewMessage and the callback shown below deletes the std::string it
// receives as second argument.
// NOTE(review): the remaining NewMessage arguments are on lines not shown.
484 std::string* strMsg =
new std::string(oss.str());
486 FairMQMessagePtr msg(NewMessage(
487 const_cast<char*
>(strMsg->c_str()),
489 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
// NOTE(review): the send call and the condition that selects between the two
// log messages below are on lines not shown.
503 LOG(error) <<
"Problem sending data";
508 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
// Dump the fields of the microslice descriptor mdsc at info level. The header
// id, header version, system id and system version are cast to int and
// printed in hexadecimal; every line switches the stream back to decimal.
// The hexadecimal prefix is written as "0x" (digit zero) so that it matches
// the other hexadecimal log output of this file.
527 LOG(info) <<
"Header ID: 0x" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
528 LOG(info) <<
"Header version: 0x" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
529 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
530 LOG(info) <<
"Flags: " << mdsc.flags;
531 LOG(info) <<
"Sys ID: 0x" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
532 LOG(info) <<
"Sys version: 0x" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
533 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
534 LOG(info) <<
"Checksum: " << mdsc.crc;
535 LOG(info) <<
"Size: " << mdsc.size;
536 LOG(info) <<
"Offset: " << mdsc.offset;
// A timeslice without any component is reported at error level together with
// its index.
// NOTE(review): what happens after the message is on lines not shown.
541 if (0 == ts.num_components()) {
542 LOG(error) <<
"No Component in TS " << ts.index();
545 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
// For every component log its number of microslices, its size in bytes and
// the system id of its first microslice descriptor in hexadecimal.
547 for (
size_t c = 0; c < ts.num_components(); ++c) {
548 LOG(debug) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
549 LOG(debug) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
550 LOG(debug) <<
"Component " << c <<
" has the system id 0x" << std::hex
551 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;