71 fFileName = fConfig->GetValue<
string>(
"filename");
72 fDirName = fConfig->GetValue<
string>(
"dirname");
73 fHost = fConfig->GetValue<
string>(
"flib-host");
74 fPort = fConfig->GetValue<uint64_t>(
"flib-port");
88 bool isGoodInputCombi {
false};
90 isGoodInputCombi =
true;
94 isGoodInputCombi =
true;
98 isGoodInputCombi =
true;
99 LOG(info) <<
"Host: " <<
fHost;
100 LOG(info) <<
"Port: " <<
fPort;
103 isGoodInputCombi =
true;
104 LOG(info) <<
"Host string: " <<
fHost;
107 isGoodInputCombi =
true;
108 LOG(info) <<
"Host string: " <<
fHost;
111 isGoodInputCombi =
false;
115 if (!isGoodInputCombi) {
116 throw InitTaskError(
"Wrong combination of inputs. Either file or wildcard file + directory "
117 "or host + port are allowed combination.");
131 int noChannel = fChannels.size();
132 LOG(info) <<
"Number of defined output channels: " << noChannel;
133 for (
auto const& entry : fChannels) {
134 LOG(info) <<
"Channel name: " << entry.first;
139 LOG(info) <<
"Value : " << value;
141 throw InitTaskError(
"Sending same data to more than one output channel "
142 "not implemented yet.");
150 std::string connector =
fHost +
":" + std::to_string(
fPort);
151 LOG(info) <<
"Open TSPublisher at " << connector;
152 fSource =
new fles::TimesliceMultiSubscriber(connector);
155 std::string connector =
fHost;
156 LOG(info) <<
"Open TSPublisher with host string: " << connector;
161 std::string fileList {
""};
163 std::string fileName = obj;
164 fileList += fileName;
168 LOG(info) <<
"Input File String: " << fileList;
173 fTime = std::chrono::steady_clock::now();
176 LOG(error) << e.what();
185 LOG(info) <<
"Looking for name " << channelName <<
" in " << entry;
186 std::size_t pos1 = channelName.find(entry);
187 if (pos1 != std::string::npos) {
188 const vector<std::string>::const_iterator
pos =
191 LOG(info) <<
"Found " << entry <<
" in " << channelName;
192 LOG(info) <<
"Channel name " << channelName <<
" found in list of allowed channel names at position " << idx;
200 LOG(info) <<
"Channel name " << channelName <<
" not found in list of allowed channel names.";
201 LOG(error) <<
"Stop device.";
222 auto timeslice =
fSource->get();
227 const fles::Timeslice& ts = *timeslice;
228 auto tsIndex = ts.index();
230 if (
fTSCounter % 10000 == 0) LOG(info) <<
"Sample TimeSlice " <<
fTSCounter <<
", Index " << tsIndex;
232 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice " <<
fTSCounter <<
", index "
243 std::vector<FairMQParts> parts;
244 std::vector<bool> bparts;
246 bparts.resize(parts.size());
247 for (uint i = 0; i < bparts.size(); i++)
253 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
255 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
257 fles::StorableTimeslice tss = fles::StorableTimeslice(ts);
260 std::stringstream oss;
261 boost::archive::binary_oarchive oa(oss);
263 std::string* strMsg =
new std::string(oss.str());
264 LOG(debug) <<
"AddPart " << idx <<
" with length " << strMsg->length();
266 parts[idx].AddPart(NewMessage(
267 const_cast<char*
>(strMsg->c_str()),
269 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
271 LOG(debug) <<
"AddParts to " << idx <<
": current size " << parts[idx].Size();
278 LOG(debug) <<
"parts with size " << parts.size() <<
", #components: " << ts.num_components();
280 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
282 LOG(debug) <<
"nrComp " << nrComp <<
", SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
283 int iSysId =
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
284 if (iSysId == 0x90 || iSysId == 0x91) iSysId = 0x60;
285 const vector<int>::const_iterator
pos = std::find(
fSysId.begin(),
fSysId.end(), iSysId);
287 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
289 LOG(debug) <<
"Append timeslice component of link " << nrComp <<
" to idx " << idx;
291 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
292 component.append_component(ts.num_microslices(0));
294 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
295 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
303 std::stringstream oss;
304 boost::archive::binary_oarchive oa(oss);
306 std::string* strMsg =
new std::string(oss.str());
308 LOG(debug) <<
"AddParts to " << idx <<
": current size " << parts[idx].Size();
310 parts[idx].AddPart(NewMessage(
311 const_cast<char*
>(strMsg->c_str()),
313 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
325 for (uint idx = 0; idx < parts.size(); idx++)
327 LOG(debug) <<
"Send parts with size " << parts[idx].Size() <<
" to channel " <<
fChannelsToSend[idx][0];
329 LOG(error) <<
"Problem sending data";
332 LOG(debug) <<
"Sent message " <<
fMessageCounter <<
" with a size of " << parts[idx].Size();
340 LOG(info) <<
" Number of requested time slices reached, exiting ";
346 LOG(info) <<
" No more data, exiting ";
430 LOG(debug) <<
"SysID: " <<
static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
431 const vector<int>::const_iterator
pos =
432 std::find(
fSysId.begin(),
fSysId.end(),
static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
434 const vector<std::string>::size_type idx =
pos -
fSysId.begin();
436 LOG(debug) <<
"Create timeslice component for link " << nrComp;
438 fles::StorableTimeslice component {
static_cast<uint32_t
>(ts.num_core_microslices()), ts.index()};
439 component.append_component(ts.num_microslices(nrComp));
441 for (
size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
442 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
444 if (!
SendData(component, idx))
return false;
454 std::stringstream oss;
455 boost::archive::binary_oarchive oa(oss);
457 std::string* strMsg =
new std::string(oss.str());
459 FairMQMessagePtr msg(NewMessage(
460 const_cast<char*
>(strMsg->c_str()),
462 [](
void* ,
void*
object) { delete static_cast<std::string*>(object); },
476 LOG(error) <<
"Problem sending data";
481 LOG(debug) <<
"Send message " <<
fMessageCounter <<
" with a size of " << msg->GetSize();
500 LOG(info) <<
"Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
501 LOG(info) <<
"Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
502 LOG(info) <<
"Equipement ID: " << mdsc.eq_id;
503 LOG(info) <<
"Flags: " << mdsc.flags;
504 LOG(info) <<
"Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
505 LOG(info) <<
"Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
506 LOG(info) <<
"Microslice Idx: " << mdsc.idx;
507 LOG(info) <<
"Checksum: " << mdsc.crc;
508 LOG(info) <<
"Size: " << mdsc.size;
509 LOG(info) <<
"Offset: " << mdsc.offset;
514 if (0 == ts.num_components()) {
515 LOG(error) <<
"No Component in TS " << ts.index();
518 LOG(debug) <<
"Found " << ts.num_components() <<
" different components in timeslice";
520 for (
size_t c = 0; c < ts.num_components(); ++c) {
521 LOG(debug) <<
"Found " << ts.num_microslices(c) <<
" microslices in component " << c;
522 LOG(debug) <<
"Component " << c <<
" has a size of " << ts.size_component(c) <<
" bytes";
523 LOG(debug) <<
"Component " << c <<
" has the system id 0x" << std::hex
524 <<
static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;