CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaSamplerTof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2018-2019 PI-UHd, GSI
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Norbert Herrmann [committer], Florian Uhlig */
4
13#include "CbmMQTsaSamplerTof.h"
14
15#include "CbmMQDefs.h"
16
17#include "TimesliceInputArchive.hpp"
18#include "TimesliceSubscriber.hpp"
19
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h" // device->fConfig
22
23#include <boost/algorithm/string.hpp>
24#include <boost/archive/binary_oarchive.hpp>
25#include <boost/filesystem.hpp>
26#include <boost/regex.hpp>
27
28namespace filesys = boost::filesystem;
29
30#include <thread> // this_thread::sleep_for
31
32#include <algorithm>
33#include <chrono>
34#include <ctime>
35#include <string>
36
37#include <stdio.h>
38
39using namespace std;
40
41#include <stdexcept>
42
// Exception used to abort task initialisation with a human-readable reason.
// It is thrown from within the initialisation try-block and handled by the
// catch clause at the end of that same function, which logs e.what().
class InitTaskError : public std::runtime_error {
public:
  // Inherit all std::runtime_error constructors (message as string or C string).
  using std::runtime_error::runtime_error;
};
46
47
49 : FairMQDevice()
50 , fMaxTimeslices(0)
51 , fFileName("")
52 , fDirName("")
53 , fInputFileList()
54 , fFileCounter(0)
55 , fHost("")
56 , fPort(0)
57 , fTSNumber(0)
58 , fTSCounter(0)
59 , fMessageCounter(0)
60 , fSource(nullptr)
61 , fTime()
62{
63}
64
66try {
67 // Get the values from the command line options (via fConfig)
68 fFileName = fConfig->GetValue<string>("filename");
69 fDirName = fConfig->GetValue<string>("dirname");
70 fHost = fConfig->GetValue<string>("flib-host");
71 fPort = fConfig->GetValue<uint64_t>("flib-port");
72 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
73
74 // Check which input is defined
75 // Possibilities
76 // filename && ! dirname : single file
77 // filename with wildcards && dirname : all files with filename regex in the directory
78 // host && port : connect to the flim server
79
80 bool isGoodInputCombi {false};
81 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
82 isGoodInputCombi = true;
83 // Create a Path object from given path string
84 filesys::path pathObj(fFileName);
85 if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
86 fInputFileList.push_back(fFileName);
87 LOG(info) << "Filename: " << fFileName;
88 }
89 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
90 isGoodInputCombi = true;
91 filesys::path pathObj = fDirName;
92 if (!filesys::is_directory(pathObj)) { throw InitTaskError("Passed directory name is no valid directory"); }
93 if (fFileName.find("*") == std::string::npos) {
94 // Normal file without wildcards
95 pathObj += fFileName;
96 if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
97 fInputFileList.push_back(pathObj.string());
98 LOG(info) << "Filename: " << fInputFileList[0];
99 }
100 else {
101 std::vector<filesys::path> v;
102
103 // escape "." which have a special meaning in regex
104 // change "*" to ".*" to find any number
105 // e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
106 boost::replace_all(fFileName, ".", "\\.");
107 boost::replace_all(fFileName, "*", ".*");
108
109 // create regex
110 const boost::regex my_filter(fFileName);
111
112 // loop over all files in input directory
113 for (auto&& x : filesys::directory_iterator(pathObj)) {
114 // Skip if not a file
115 if (!boost::filesystem::is_regular_file(x)) continue;
116
117 // Skip if no match
118 // x.path().leaf().string() means get from directory iterator the
119 // current entry as filesys::path, from this extract the leaf
120 // filename or directory name and convert it to a string to be
121 // used in the regex:match
122 boost::smatch what;
123 if (!boost::regex_match(x.path().leaf().string(), what, my_filter)) continue;
124
125 v.push_back(x.path());
126 }
127
128 // sort the files which match the regex in increasing order
129 // (hopefully)
130 std::sort(v.begin(), v.end());
131
132 for (auto&& x : v)
133 fInputFileList.push_back(x.string());
134
135 LOG(info) << "The following files will be used in this order.";
136 for (auto&& x : v)
137 LOG(info) << " " << x;
138 }
139 // throw InitTaskError("Input is a directory");
140 }
141 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
142 isGoodInputCombi = true;
143 LOG(info) << "Host: " << fHost;
144 LOG(info) << "Port: " << fPort;
145 }
146 else {
147 isGoodInputCombi = false;
148 }
149
150 if (!isGoodInputCombi) {
151 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
152 "or host + port are allowed combination.");
153 }
154
155
156 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
157
158 // Get the information about created channels from the device
159 // Check if the defined channels from the topology (by name)
160 // are in the list of channels which are possible/allowed
161 // for the device
162 // The idea is to check at initialization if the devices are
163 // properly connected. For the time being this is done with a
164 // naming convention. This does not prevent someone from sending
165 // other data on this channel.
166 int noChannel = fChannels.size();
167 LOG(info) << "Number of defined output channels: " << noChannel;
168 for (auto const& entry : fChannels) {
169 LOG(info) << "Channel name: " << entry.first;
170 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
171 }
172
173 for (auto const& value : fComponentsToSend) {
174 LOG(info) << "Value : " << value;
175 if (value > 1) {
176 throw InitTaskError("Sending same data to more than one output channel "
177 "not implemented yet.");
178 }
179 }
180
181 if (0 == fFileName.size() && 0 != fHost.size()) {
182 std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
183 LOG(info) << "Open TSPublisher at " << connector;
184 fSource = new fles::TimesliceSubscriber(connector, 1);
185 if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
186 }
187 else {
188 if (false == OpenNextFile()) {
189 throw InitTaskError("Could not open the first input file in the list, Doing nothing!");
190 }
191 }
192
193 fTime = std::chrono::steady_clock::now();
194}
195catch (InitTaskError& e) {
196 LOG(error) << e.what();
197 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
199}
200
202{
203 // First Close and delete existing source
204 if (nullptr != fSource) delete fSource;
205
206 if (fInputFileList.size() > 0) {
208 fInputFileList.erase(fInputFileList.begin());
209 LOG(info) << "Open the Flib input file " << fFileName;
210 filesys::path pathObj(fFileName);
211 if (!filesys::is_regular_file(pathObj)) {
212 LOG(error) << "Input file " << fFileName << " doesn't exist.";
213 return false;
214 }
215 fSource = new fles::TimesliceInputArchive(fFileName);
216 if (!fSource) {
217 LOG(error) << "Could not open input file.";
218 return false;
219 }
220 }
221 else {
222 LOG(info) << "End of files list reached.";
223 return false;
224 }
225 return true;
226}
227
228bool CbmMQTsaSamplerTof::IsChannelNameAllowed(std::string channelName)
229{
230 LOG(info) << "Number of allowed channels: " << fAllowedChannels.size();
231 for (auto const& entry : fAllowedChannels) {
232 LOG(info) << "Inspect " << entry;
233 std::size_t pos1 = channelName.find(entry);
234 if (pos1 != std::string::npos) {
235 const vector<std::string>::const_iterator pos =
236 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
237 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
238 LOG(info) << "Found " << entry << " in " << channelName;
239 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
240 if (idx < 3) { //FIXME, hardwired constant!!!
241 fComponentsToSend[idx]++;
242 fChannelsToSend[idx].push_back(channelName);
243 }
244 return true;
245 }
246 }
247 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
248 LOG(error) << "Stop device.";
249 return false;
250}
251
252bool CbmMQTsaSamplerTof::IsChannelUp(std::string channelName)
253{
254 for (auto const& entry : fChannels) {
255 LOG(info) << "Inspect " << entry.first;
256 std::size_t pos1 = channelName.find(entry.first);
257 if (pos1 != std::string::npos) {
258 LOG(info) << "Channel name " << channelName << " found in list of defined channel names ";
259 return true;
260 }
261 }
262 LOG(info) << "Channel name " << channelName << " not found in list of defined channel names.";
263 LOG(error) << "Stop device.";
264 return false;
265}
266
268{
269
270 auto timeslice = fSource->get();
271 if (timeslice) {
273 fTSCounter++;
274
275 const fles::Timeslice& ts = *timeslice;
276 auto tsIndex = ts.index();
277
278 if (fTSCounter % 10000 == 0) LOG(info) << "Sample TimeSlice " << fTSCounter << ", Index " << tsIndex;
279
280 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << fTSCounter << ", index "
281 << tsIndex;
282
283
284 CheckTimeslice(ts);
285 /*
286 for (int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
287 CreateAndSendComponent(ts, nrComp);
288 }
289 */
290 // keep components together
291 std::vector<FairMQParts> parts;
292 std::vector<bool> bparts;
293 parts.resize(fComponentsToSend.size());
294 bparts.resize(parts.size());
295 for (uint i = 0; i < bparts.size(); i++)
296 bparts[i] = false;
297 LOG(debug) << "parts with size " << parts.size();
298
299 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
300 // CreateAndCombineComponents(ts, nrComp);
301 LOG(debug) << "nrComp " << nrComp << ", SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
302 const vector<int>::const_iterator pos =
303 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
304 if (pos != fSysId.end()) {
305 const vector<std::string>::size_type idx = pos - fSysId.begin();
306 if (fComponentsToSend[idx] > 0) {
307 LOG(debug) << "Append timeslice component of link " << nrComp << " to idx " << idx;
308
309 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
310 component.append_component(ts.num_microslices(0));
311
312 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
313 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
314 }
315
316 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
317 //if(idx > parts.size()-1) parts.resize(idx+1);
318
319 //if ( !AppendData(component, idx) ) return false;
320 // serialize the timeslice and create the message
321 std::stringstream oss;
322 boost::archive::binary_oarchive oa(oss);
323 oa << component;
324 std::string* strMsg = new std::string(oss.str());
325
326 LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
327
328 parts[idx].AddPart(NewMessage(
329 const_cast<char*>(strMsg->c_str()), // data
330 strMsg->length(), // size
331 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
332 strMsg)); // object that manages the data
333
334 bparts[idx] = true;
335 }
336 }
337 }
338
339 for (uint idx = 0; idx < parts.size(); idx++)
340 if (bparts[idx]) {
341 LOG(debug) << "Send parts with size " << parts[idx].Size() << " to channel " << fChannelsToSend[idx][0];
342 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
343 LOG(error) << "Problem sending data";
344 return false;
345 }
346 LOG(debug) << "Sent message " << fMessageCounter << " with a size of " << parts[idx].Size();
348 }
349
350 //if(!SendTs()) return false;
351 return true;
352 }
353 else {
354 LOG(info) << " Number of requested time slices reached, exiting ";
355 if (false == OpenNextFile()) {
356 CalcRuntime();
358 return false;
359 }
360 else {
361 CalcRuntime();
363 return false;
364 }
365 }
366 }
367 else {
368 if (false == OpenNextFile()) {
369 CalcRuntime();
371 return false;
372 }
373 else {
374 return true;
375 }
376 }
377}
378
379bool CbmMQTsaSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*/, int /*nrComp*/)
380{
381
382 // Check if component has to be send. If the corresponding channel
383 // is connected append it to parts
384 /*
385 LOG(debug) <<"nrComp "<< nrComp<< ", SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
386 const vector<int>::const_iterator pos =
387 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
388 if (pos != fSysId.end() ) {
389 const vector<std::string>::size_type idx = pos-fSysId.begin();
390 if (fComponentsToSend[idx]>0) {
391 LOG(debug) << "Append timeslice component of link " << nrComp<< " to idx "<<idx;
392
393 fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
394 component.append_component(ts.num_microslices(0));
395
396 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
397 component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
398 }
399
400 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
401 if(idx > parts.size()-1) parts.resize(idx+1);
402
403 if ( !AppendData(component, idx) ) return false;
404 bparts[idx]=true;
405 return true;
406 }
407 }
408 */
409 return true;
410}
411
412bool CbmMQTsaSamplerTof::AppendData(const fles::StorableTimeslice& /*component*/, int /*idx*/)
413{
414 // serialize the timeslice and create the message
415 /*
416 std::stringstream oss;
417 boost::archive::binary_oarchive oa(oss);
418 oa << component;
419 std::string* strMsg = new std::string(oss.str());
420
421 LOG(debug)<<"AddParts to "<<idx<<": current size "<<parts[idx].Size();
422
423 parts[idx].AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
424 strMsg->length(), // size
425 [](void*, void* object){ delete static_cast<std::string*>(object); },
426 strMsg)); // object that manages the data
427 */
428 return true;
429}
430
432{
433 /*
434 for (int idx=0; idx<parts.size(); idx++)
435 if(bparts[idx]){
436 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
437 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
438 LOG(error) << "Problem sending data";
439 return false;
440 }
441
442 fMessageCounter++;
443 LOG(debug) << "Send message " << fMessageCounter << " with a size of "
444 << parts[idx].Size();
445 }
446 */
447 return true;
448}
449
450bool CbmMQTsaSamplerTof::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
451{
452
453 // Check if component has to be send. If the corresponding channel
454 // is connected create the new timeslice and send it to the
455 // correct channel
456
457 LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
458 const vector<int>::const_iterator pos =
459 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
460 if (pos != fSysId.end()) {
461 const vector<std::string>::size_type idx = pos - fSysId.begin();
462 if (fComponentsToSend[idx] > 0) {
463 LOG(debug) << "Create timeslice component for link " << nrComp;
464
465 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
466 component.append_component(ts.num_microslices(0));
467
468 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
469 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
470 }
471 if (!SendData(component, idx)) return false;
472 return true;
473 }
474 }
475 return true;
476}
477
478bool CbmMQTsaSamplerTof::SendData(const fles::StorableTimeslice& component, int idx)
479{
480 // serialize the timeslice and create the message
481 std::stringstream oss;
482 boost::archive::binary_oarchive oa(oss);
483 oa << component;
484 std::string* strMsg = new std::string(oss.str());
485
486 FairMQMessagePtr msg(NewMessage(
487 const_cast<char*>(strMsg->c_str()), // data
488 strMsg->length(), // size
489 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
490 strMsg)); // object that manages the data
491
492 // TODO: Implement sending same data to more than one channel
493 // Need to create new message (copy message??)
494 if (fComponentsToSend[idx] > 1) { LOG(debug) << "Need to copy FairMessage"; }
495
496 // in case of error or transfer interruption,
497 // return false to go to IDLE state
498 // successful transfer will return number of bytes
499 // transferred (can be 0 if sending an empty message).
500
501 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
502 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
503 LOG(error) << "Problem sending data";
504 return false;
505 }
506
508 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
509
510 return true;
511}
512
513
515
517{
518 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
519
520 LOG(info) << "Runtime: " << run_time.count();
521 LOG(info) << "No more input data";
522}
523
524
525void CbmMQTsaSamplerTof::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
526{
527 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
528 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
529 LOG(info) << "Equipement ID: " << mdsc.eq_id;
530 LOG(info) << "Flags: " << mdsc.flags;
531 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
532 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
533 LOG(info) << "Microslice Idx: " << mdsc.idx;
534 LOG(info) << "Checksum: " << mdsc.crc;
535 LOG(info) << "Size: " << mdsc.size;
536 LOG(info) << "Offset: " << mdsc.offset;
537}
538
539bool CbmMQTsaSamplerTof::CheckTimeslice(const fles::Timeslice& ts)
540{
541 if (0 == ts.num_components()) {
542 LOG(error) << "No Component in TS " << ts.index();
543 return 1;
544 }
545 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
546
547 for (size_t c = 0; c < ts.num_components(); ++c) {
548 LOG(debug) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
549 LOG(debug) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
550 LOG(debug) << "Component " << c << " has the system id 0x" << std::hex
551 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
552 /*
553 LOG(debug) << "Component " << c << " has the system id 0x"
554 << static_cast<int>(ts.descriptor(c,0).sys_id);
555 */
556 /*
557 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
558 PrintMicroSliceDescriptor(ts.descriptor(c,m));
559 }
560*/
561 }
562
563 return true;
564}
565
567{
568 if (IsChannelUp("syscmd")) {
569 LOG(info) << "stop subscribers in 100 sec";
570 std::this_thread::sleep_for(std::chrono::milliseconds(100000));
571
572 FairMQMessagePtr pub(NewSimpleMessage("STOP"));
573 if (Send(pub, "syscmd") < 0) { LOG(error) << "Sending STOP message failed"; }
574
575 LOG(info) << "task reset subscribers in 1 sec";
576 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
577 FairMQMessagePtr task_reset(NewSimpleMessage("TASK_RESET"));
578
579 if (Send(task_reset, "syscmd") < 0) { LOG(error) << "Sending Task_Reset message failed"; }
580 }
581 // FairMQStateMachine::ChangeState(STOP);
582}
fscal v[fmask::Size]
Definition KfSimdPseudo.h:4
bool IsChannelUp(std::string)
bool AppendData(const fles::StorableTimeslice &, int)
std::vector< int > fComponentsToSend
std::vector< int > fSysId
virtual bool ConditionalRun()
fles::TimesliceSource * fSource
bool CreateAndCombineComponents(const fles::Timeslice &, int)
bool IsChannelNameAllowed(std::string)
std::vector< std::string > fAllowedChannels
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
std::vector< std::string > fInputFileList
List of input files.
std::chrono::steady_clock::time_point fTime
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
bool CheckTimeslice(const fles::Timeslice &ts)
std::vector< std::vector< std::string > > fChannelsToSend
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.