73 fFileName = fConfig->GetValue<
string>(
"filename");
74 fDirName = fConfig->GetValue<
string>(
"dirname");
75 fHost = fConfig->GetValue<
string>(
"flib-host");
76 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
90 bool isGoodInputCombi {
false};
92 isGoodInputCombi =
true;
96 isGoodInputCombi =
true;
100 isGoodInputCombi =
true;
101 LOG(info) <<
"Host: " <<
fHost;
102 LOG(info) <<
"Port: " <<
fPort;
105 isGoodInputCombi =
true;
106 LOG(info) <<
"Host string: " <<
fHost;
109 isGoodInputCombi =
true;
110 LOG(info) <<
"Host string: " <<
fHost;
113 isGoodInputCombi =
false;
117 if (!isGoodInputCombi) {
118 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
119 "or host + port are allowed combination.");
133 int noChannel = fChannels.size();
134 LOG(info) <<
"Number of defined output channels: " << noChannel;
135 for (
auto const& entry : fChannels) {
136 LOG(info) <<
"Channel name: " << entry.first;
141 LOG(info) <<
"Value : " << value;
143 throw InitTaskError(
"Sending same data to more than one output channel "
144 "not implemented yet.");
152 std::string connector =
fHost +
":" + std::to_string(
fPort);
153 LOG(info) <<
"Open TSPublisher at " << connector;
154 fSource =
new fles::TimesliceMultiSubscriber(connector);
157 std::string connector =
fHost;
158 LOG(info) <<
"Open TSPublisher with host string: " << connector;
163 std::string fileList {
""};
165 std::string fileName = obj;
166 fileList += fileName;
170 LOG(info) <<
"Input File String: " << fileList;
175 fTime = std::chrono::steady_clock::now();
178 LOG(error) << e.what();
187 LOG(info) <<
"Looking for name " << channelName <<
" in " << entry;
188 std::size_t pos1 = channelName.find(entry);
189 if (pos1 != std::string::npos) {
190 const vector<std::string>::const_iterator
pos =
193 LOG(info) <<
"Found " << entry <<
" in " << channelName;
194 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
202 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
203 LOG(error) <<
"Stop device.";
224 auto timeslice =
fSource->get();
229 const fles::Timeslice& ts = *timeslice;
230 auto tsIndex = ts.index();
232 if (
fTSCounter % 10000 == 0) LOG(info) <<
"Sample TimeSlice " <<
fTSCounter <<
", Index " << tsIndex;
234 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice " <<
fTSCounter <<
", index "
245 std::vector<FairMQParts> parts;
246 std::vector<bool> bparts;
248 bparts.resize(parts.size());
249 for (uint i = 0; i < bparts.size(); i++)
255 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
257 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
259 fles::StorableTimeslice tss = fles::StorableTimeslice(ts);
262 std::stringstream oss;
263 boost::archive::binary_oarchive oa(oss);
265 std::string* strMsg =
new std::string(oss.str());
266 LOG(debug) <<
"AddPart " << idx <<
" with length " << strMsg->length();
268 parts[idx].AddPart(NewMessage(
269 const_cast<char*
>(strMsg->c_str()),
271 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
273 LOG(debug) <<
"AddParts to " << idx <<
": current size " << parts[idx].Size();
280 LOG(debug) <<
"parts with size " << parts.size() <<
", #components: " << ts.num_components();
282 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
284 LOG(debug) <<
"nrComp " << nrComp <<
", SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
285 int iSysId =
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
286 if (iSysId == 0x90 || iSysId == 0x91) iSysId = 0x60;
287 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
289 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
291 LOG(debug) <<
"Append timeslice component of link " << nrComp <<
" to idx " << idx;
293 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
294 component.append_component(ts.num_microslices(0));
296 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
297 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
305 std::stringstream oss;
306 boost::archive::binary_oarchive oa(oss);
308 std::string* strMsg =
new std::string(oss.str());
310 LOG(debug) <<
"AddParts to " << idx <<
": current size " << parts[idx].Size();
312 parts[idx].AddPart(NewMessage(
313 const_cast<char*
>(strMsg->c_str()),
315 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
327 for (uint idx = 0; idx < parts.size(); idx++)
329 LOG(debug) <<
"Send parts with size " << parts[idx].Size() <<
" to channel " <<
fChannelsToSend[idx][0];
331 LOG(error) <<
"Problem sending data";
334 LOG(debug) <<
"Sent message " <<
fMessageCounter <<
" with a size of " << parts[idx].Size();
342 LOG(info) <<
" Number of requested time slices reached, exiting ";
348 LOG(info) <<
" No more data, exiting ";
432 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
433 const vector<int>::const_iterator
pos =
434 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
436 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
438 LOG(debug) <<
"Create timeslice component for link " << nrComp;
440 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
441 component.append_component(ts.num_microslices(nrComp));
443 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
444 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
446 if (!
SendData(component, idx))
return false;
456 std::stringstream oss;
457 boost::archive::binary_oarchive oa(oss);
459 std::string* strMsg =
new std::string(oss.str());
461 FairMQMessagePtr msg(NewMessage(
462 const_cast<char*
>(strMsg->c_str()),
464 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
478 LOG(error) <<
"Problem sending data";
483 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
502 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
503 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
504 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
505 LOG(info) <<
"Flags: " << mdsc.flags;
506 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
507 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
508 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
509 LOG(info) <<
"Checksum: " << mdsc.crc;
510 LOG(info) <<
"Size: " << mdsc.size;
511 LOG(info) <<
"Offset: " << mdsc.offset;
516 if (0 == ts.num_components()) {
517 LOG(error) <<
"No Component in TS " << ts.index();
520 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
522 for (
size_t c = 0; c < ts.num_components(); ++c) {
523 LOG(debug) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
524 LOG(debug) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
525 LOG(debug) <<
"Component " << c <<
" has the system id 0x" << std::hex
526 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;