CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaSamplerTof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2018-2019 PI-UHd, GSI
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Norbert Herrmann [committer], Florian Uhlig */
4
11
12
13#include "CbmMQTsaSamplerTof.h"
14
15#include "CbmMQDefs.h"
16
17#include "TimesliceInputArchive.hpp"
18#include "TimesliceSubscriber.hpp"
19
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h" // device->fConfig
22
23#include <boost/algorithm/string.hpp>
24#include <boost/archive/binary_oarchive.hpp>
25#include <boost/filesystem.hpp>
26#include <boost/regex.hpp>
27
28namespace filesys = boost::filesystem;
29
30#include <algorithm>
31#include <chrono>
32#include <cstdio>
33#include <ctime>
34#include <string>
35#include <thread> // this_thread::sleep_for
36
37using namespace std;
38
39#include <stdexcept>
40
/// Exception type used to abort device initialisation.
/// All constructors of std::runtime_error are inherited unchanged, so it can be
/// built from either a C string or a std::string message.
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
44
45
// Member-initializer list and empty body of what appears to be the class constructor.
// NOTE(review): the signature line and some initializer entries are not present in this
// listing (embedded listing numbers 46, 48, 51 and 57 are skipped) - consult the real file.
// The entries visible here start with empty strings, zero counters and a null source pointer.
47 : FairMQDevice()
49 , fFileName("")
50 , fDirName("")
52 , fFileCounter(0)
53 , fHost("")
54 , fPort(0)
55 , fTSNumber(0)
56 , fTSCounter(0)
58 , fSource(nullptr)
59 , fTime()
60{
61}
62
// Initialisation body written as a function-try-block.
// NOTE(review): the signature line is not present in this listing (presumably InitTask() - confirm).
// Steps visible below:
//  1. read "filename", "dirname", "flib-host", "flib-port" and "max-timeslices" from fConfig;
//  2. accept exactly one input mode (single file, file/wildcard + directory, or host + port)
//     and fill fInputFileList for the file based modes;
//  3. verify every entry of fChannels with IsChannelNameAllowed();
//  4. reject configurations in which any fComponentsToSend counter exceeds 1;
//  5. create the timeslice source (subscriber or first input file) and store the start time in fTime.
// An InitTaskError thrown on the way is logged by the handler at the end of the block.
64try {
65 // Get the values from the command line options (via fConfig)
66 fFileName = fConfig->GetValue<string>("filename");
67 fDirName = fConfig->GetValue<string>("dirname");
68 fHost = fConfig->GetValue<string>("flib-host");
69 fPort = fConfig->GetValue<uint64_t>("flib-port");
70 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
71
72 // Check which input is defined
73 // Possibilities
74 // filename && ! dirname : single file
75 // filename with wildcards && dirname : all files with filename regex in the directory
76 // host && port : connect to the flim server
77
78 bool isGoodInputCombi {false};
79 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
80 isGoodInputCombi = true;
81 // Create a Path object from given path string
82 filesys::path pathObj(fFileName);
83 if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
84 fInputFileList.push_back(fFileName);
85 LOG(info) << "Filename: " << fFileName;
86 }
87 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
88 isGoodInputCombi = true;
89 filesys::path pathObj = fDirName;
90 if (!filesys::is_directory(pathObj)) { throw InitTaskError("Passed directory name is no valid directory"); }
91 if (fFileName.find("*") == std::string::npos) {
92 // Normal file without wildcards
// NOTE(review): path::operator+= concatenates without inserting a directory separator, so
// fDirName presumably has to end in a separator for this to work - confirm (operator/= would
// insert one).
93 pathObj += fFileName;
94 if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
95 fInputFileList.push_back(pathObj.string());
96 LOG(info) << "Filename: " << fInputFileList[0];
97 }
98 else {
99 std::vector<filesys::path> v;
100
101 // escape "." which have a special meaning in regex
102 // change "*" to ".*" to find any number
103 // e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
104 boost::replace_all(fFileName, ".", "\\.");
105 boost::replace_all(fFileName, "*", ".*");
106
107 // create regex
108 const boost::regex my_filter(fFileName);
109
110 // loop over all files in input directory
111 for (auto&& x : filesys::directory_iterator(pathObj)) {
112 // Skip if not a file
113 if (!boost::filesystem::is_regular_file(x)) continue;
114
115 // Skip if no match
116 // x.path().leaf().string() means get from directory iterator the
117 // current entry as filesys::path, from this extract the leaf
118 // filename or directory name and convert it to a string to be
119 // used in the regex:match
// NOTE(review): the call below matches against the full path string, not the leaf name
// described in the comment above - confirm which of the two is intended.
120 boost::smatch what;
121 if (!boost::regex_match(x.path().string(), what, my_filter)) continue;
122
123 v.push_back(x.path());
124 }
125
126 // sort the files which match the regex in increasing order
127 // (hopefully)
128 std::sort(v.begin(), v.end());
129
130 for (auto&& x : v)
131 fInputFileList.push_back(x.string());
132
133 LOG(info) << "The following files will be used in this order.";
134 for (auto&& x : v)
135 LOG(info) << " " << x;
136 }
137 // throw InitTaskError("Input is a directory");
138 }
139 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
140 isGoodInputCombi = true;
141 LOG(info) << "Host: " << fHost;
142 LOG(info) << "Port: " << fPort;
143 }
144 else {
145 isGoodInputCombi = false;
146 }
147
148 if (!isGoodInputCombi) {
149 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
150 "or host + port are allowed combination.");
151 }
152
153
154 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
155
156 // Get the information about created channels from the device
157 // Check if the defined channels from the topology (by name)
158 // are in the list of channels which are possible/allowed
159 // for the device
160 // The idea is to check at initialization if the devices are
161 // properly connected. For the time being this is done with a
162 // naming convention. It is not avoided that someone sends other
163 // data on this channel.
164 int noChannel = fChannels.size();
165 LOG(info) << "Number of defined output channels: " << noChannel;
166 for (auto const& entry : fChannels) {
167 LOG(info) << "Channel name: " << entry.first;
168 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
169 }
170
171 for (auto const& value : fComponentsToSend) {
172 LOG(info) << "Value : " << value;
173 if (value > 1) {
174 throw InitTaskError("Sending same data to more than one output channel "
175 "not implemented yet.");
176 }
177 }
178
179 if (0 == fFileName.size() && 0 != fHost.size()) {
180 std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
181 LOG(info) << "Open TSPublisher at " << connector;
182 fSource = new fles::TimesliceSubscriber(connector, 1);
// NOTE(review): a plain new-expression throws on failure instead of returning null, so the
// check below looks unreachable.
183 if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
184 }
185 else {
186 if (false == OpenNextFile()) {
187 throw InitTaskError("Could not open the first input file in the list, Doing nothing!");
188 }
189 }
190
191 fTime = std::chrono::steady_clock::now();
192}
193catch (InitTaskError& e) {
194 LOG(error) << e.what();
195 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
// NOTE(review): listing line 196 is missing here; the comment above suggests it is a call to
// a state-change wrapper - confirm against the real file.
197}
198
// Body of the routine that switches fSource to the next entry of fInputFileList.
// NOTE(review): the signature line is not present in this listing (presumably OpenNextFile()).
// Returns true when a new fles::TimesliceInputArchive was created; returns false when the
// list is empty or the selected file is not a regular file.
200{
201 // First Close and delete existing source
// NOTE(review): fSource is not reset to nullptr after the delete; on the early-return paths
// below it keeps pointing at the deleted object.
202 if (nullptr != fSource) delete fSource;
203
204 if (fInputFileList.size() > 0) {
// NOTE(review): listing line 205 is missing here; presumably it copies the head of the list
// into fFileName before that entry is erased - confirm against the real file.
206 fInputFileList.erase(fInputFileList.begin());
207 LOG(info) << "Open the Flib input file " << fFileName;
208 filesys::path pathObj(fFileName);
209 if (!filesys::is_regular_file(pathObj)) {
210 LOG(error) << "Input file " << fFileName << " doesn't exist.";
211 return false;
212 }
213 fSource = new fles::TimesliceInputArchive(fFileName);
// NOTE(review): new throws on failure rather than returning null, so this branch looks unreachable.
214 if (!fSource) {
215 LOG(error) << "Could not open input file.";
216 return false;
217 }
218 }
219 else {
220 LOG(info) << "End of files list reached.";
221 return false;
222 }
223 return true;
224}
225
226bool CbmMQTsaSamplerTof::IsChannelNameAllowed(std::string channelName)
227{
228 LOG(info) << "Number of allowed channels: " << fAllowedChannels.size();
229 for (auto const& entry : fAllowedChannels) {
230 LOG(info) << "Inspect " << entry;
231 std::size_t pos1 = channelName.find(entry);
232 if (pos1 != std::string::npos) {
233 const vector<std::string>::const_iterator pos =
234 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
235 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
236 LOG(info) << "Found " << entry << " in " << channelName;
237 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
238 if (idx < 3) { //FIXME, hardwired constant!!!
239 fComponentsToSend[idx]++;
240 fChannelsToSend[idx].push_back(channelName);
241 }
242 return true;
243 }
244 }
245 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
246 LOG(error) << "Stop device.";
247 return false;
248}
249
250bool CbmMQTsaSamplerTof::IsChannelUp(std::string channelName)
251{
252 for (auto const& entry : fChannels) {
253 LOG(info) << "Inspect " << entry.first;
254 std::size_t pos1 = channelName.find(entry.first);
255 if (pos1 != std::string::npos) {
256 LOG(info) << "Channel name " << channelName << " found in list of defined channel names ";
257 return true;
258 }
259 }
260 LOG(info) << "Channel name " << channelName << " not found in list of defined channel names.";
261 LOG(error) << "Stop device.";
262 return false;
263}
264
// Body of the per-iteration processing routine.
// NOTE(review): the signature line is not present in this listing (presumably ConditionalRun()).
// Fetches the next timeslice from fSource. For every component whose sys_id is listed in
// fSysId and whose fComponentsToSend counter is positive, a single-component
// fles::StorableTimeslice is built, serialised with boost::archive::binary_oarchive and added
// as one part to the multipart message of that index. Afterwards every multipart message that
// received at least one part is sent on the first channel stored in fChannelsToSend for that
// index. A false return value is produced on a send error, in the "requested time slices
// reached" branch, and when no further input file can be opened.
266{
267
268 auto timeslice = fSource->get();
269 if (timeslice) {
// NOTE(review): listing line 270 is missing here; the unmatched `else` further down suggests
// that a condition on the number of processed timeslices is opened at this point - confirm.
271 fTSCounter++;
272
273 const fles::Timeslice& ts = *timeslice;
274 auto tsIndex = ts.index();
275
276 if (fTSCounter % 10000 == 0) LOG(info) << "Sample TimeSlice " << fTSCounter << ", Index " << tsIndex;
277
278 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << fTSCounter << ", index "
279 << tsIndex;
280
281
// The return value of CheckTimeslice() is not evaluated here.
282 CheckTimeslice(ts);
283 /*
284 for (int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
285 CreateAndSendComponent(ts, nrComp);
286 }
287 */
288 // keep components together
289 std::vector<FairMQParts> parts;
290 std::vector<bool> bparts;
291 parts.resize(fComponentsToSend.size());
292 bparts.resize(parts.size());
293 for (uint i = 0; i < bparts.size(); i++)
294 bparts[i] = false;
295 LOG(debug) << "parts with size " << parts.size();
296
297 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
298 // CreateAndCombineComponents(ts, nrComp);
299 LOG(debug) << "nrComp " << nrComp << ", SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
300 const vector<int>::const_iterator pos =
301 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
302 if (pos != fSysId.end()) {
303 const vector<std::string>::size_type idx = pos - fSysId.begin();
304 if (fComponentsToSend[idx] > 0) {
305 LOG(debug) << "Append timeslice component of link " << nrComp << " to idx " << idx;
306
307 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
// NOTE(review): the microslice count of component 0 is used here while the loop below runs
// over the microslices of nrComp - confirm that all components carry the same count.
308 component.append_component(ts.num_microslices(0));
309
310 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
311 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
312 }
313
314 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
315 //if(idx > parts.size()-1) parts.resize(idx+1);
316
317 //if ( !AppendData(component, idx) ) return false;
318 // serialize the timeslice and create the message
319 std::stringstream oss;
320 boost::archive::binary_oarchive oa(oss);
321 oa << component;
// The serialised buffer is heap allocated; the callback handed to NewMessage below deletes it.
322 std::string* strMsg = new std::string(oss.str());
323
324 LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
325
326 parts[idx].AddPart(NewMessage(
327 const_cast<char*>(strMsg->c_str()), // data
328 strMsg->length(), // size
329 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
330 strMsg)); // object that manages the data
331
332 bparts[idx] = true;
333 }
334 }
335 }
336
337 for (uint idx = 0; idx < parts.size(); idx++)
338 if (bparts[idx]) {
339 LOG(debug) << "Send parts with size " << parts[idx].Size() << " to channel " << fChannelsToSend[idx][0];
340 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
341 LOG(error) << "Problem sending data";
342 return false;
343 }
344 LOG(debug) << "Sent message " << fMessageCounter << " with a size of " << parts[idx].Size();
// NOTE(review): listing line 345 is missing here.
346 }
347
348 //if(!SendTs()) return false;
349 return true;
350 }
351 else {
352 LOG(info) << " Number of requested time slices reached, exiting ";
// NOTE(review): both branches below report the runtime and return false; listing lines 355
// and 360 are missing from them.
353 if (false == OpenNextFile()) {
354 CalcRuntime();
356 return false;
357 }
358 else {
359 CalcRuntime();
361 return false;
362 }
363 }
364 }
365 else {
// No timeslice was delivered: try the next input file and stop when there is none
// (listing line 368 is missing from the failure branch).
366 if (false == OpenNextFile()) {
367 CalcRuntime();
369 return false;
370 }
371 else {
372 return true;
373 }
374 }
375}
376
377bool CbmMQTsaSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*/, int /*nrComp*/)
378{
379
380 // Check if component has to be send. If the corresponding channel
381 // is connected append it to parts
382 /*
383 LOG(debug) <<"nrComp "<< nrComp<< ", SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
384 const vector<int>::const_iterator pos =
385 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
386 if (pos != fSysId.end() ) {
387 const vector<std::string>::size_type idx = pos-fSysId.begin();
388 if (fComponentsToSend[idx]>0) {
389 LOG(debug) << "Append timeslice component of link " << nrComp<< " to idx "<<idx;
390
391 fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
392 component.append_component(ts.num_microslices(0));
393
394 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
395 component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
396 }
397
398 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
399 if(idx > parts.size()-1) parts.resize(idx+1);
400
401 if ( !AppendData(component, idx) ) return false;
402 bparts[idx]=true;
403 return true;
404 }
405 }
406 */
407 return true;
408}
409
410bool CbmMQTsaSamplerTof::AppendData(const fles::StorableTimeslice& /*component*/, int /*idx*/)
411{
412 // serialize the timeslice and create the message
413 /*
414 std::stringstream oss;
415 boost::archive::binary_oarchive oa(oss);
416 oa << component;
417 std::string* strMsg = new std::string(oss.str());
418
419 LOG(debug)<<"AddParts to "<<idx<<": current size "<<parts[idx].Size();
420
421 parts[idx].AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
422 strMsg->length(), // size
423 [](void*, void* object){ delete static_cast<std::string*>(object); },
424 strMsg)); // object that manages the data
425 */
426 return true;
427}
428
// Body of a routine whose signature line is not present in this listing
// (NOTE(review): presumably SendTs() - confirm against the real file).
// The whole sending logic is commented out, so the body unconditionally returns true.
430{
431 /*
432 for (int idx=0; idx<parts.size(); idx++)
433 if(bparts[idx]){
434 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
435 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
436 LOG(error) << "Problem sending data";
437 return false;
438 }
439
440 fMessageCounter++;
441 LOG(debug) << "Send message " << fMessageCounter << " with a size of "
442 << parts[idx].Size();
443 }
444 */
445 return true;
446}
447
448bool CbmMQTsaSamplerTof::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
449{
450
451 // Check if component has to be send. If the corresponding channel
452 // is connected create the new timeslice and send it to the
453 // correct channel
454
455 LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
456 const vector<int>::const_iterator pos =
457 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
458 if (pos != fSysId.end()) {
459 const vector<std::string>::size_type idx = pos - fSysId.begin();
460 if (fComponentsToSend[idx] > 0) {
461 LOG(debug) << "Create timeslice component for link " << nrComp;
462
463 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
464 component.append_component(ts.num_microslices(0));
465
466 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
467 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
468 }
469 if (!SendData(component, idx)) return false;
470 return true;
471 }
472 }
473 return true;
474}
475
// Serialises one fles::StorableTimeslice with boost::archive::binary_oarchive and sends it
// as a single message on the first channel stored in fChannelsToSend[idx].
//   component : timeslice to serialise and send
//   idx       : index into fComponentsToSend / fChannelsToSend
//   returns   : false if Send() returns a negative value, true otherwise
// NOTE(review): listing line 505 (between the send check and the final debug output) is not
// present in this listing; presumably it updates fMessageCounter - confirm.
476bool CbmMQTsaSamplerTof::SendData(const fles::StorableTimeslice& component, int idx)
477{
478 // serialize the timeslice and create the message
479 std::stringstream oss;
480 boost::archive::binary_oarchive oa(oss);
481 oa << component;
// The serialised buffer is heap allocated; the callback handed to NewMessage below deletes it.
482 std::string* strMsg = new std::string(oss.str());
483
484 FairMQMessagePtr msg(NewMessage(
485 const_cast<char*>(strMsg->c_str()), // data
486 strMsg->length(), // size
487 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
488 strMsg)); // object that manages the data
489
490 // TODO: Implement sending same data to more than one channel
491 // Need to create new message (copy message??)
492 if (fComponentsToSend[idx] > 1) { LOG(debug) << "Need to copy FairMessage"; }
493
494 // in case of error or transfer interruption,
495 // return false to go to IDLE state
496 // successful transfer will return number of bytes
497 // transferred (can be 0 if sending an empty message).
498
499 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
500 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
501 LOG(error) << "Problem sending data";
502 return false;
503 }
504
506 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
507
508 return true;
509}
510
511
513
// Body of the routine that reports the elapsed run time.
// NOTE(review): the signature line is not present in this listing (presumably CalcRuntime()).
// Logs the difference between the current steady_clock time and fTime as a
// std::chrono::duration<double> count, followed by a "No more input data" message.
515{
516 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
517
518 LOG(info) << "Runtime: " << run_time.count();
519 LOG(info) << "No more input data";
520}
521
522
523void CbmMQTsaSamplerTof::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
524{
525 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
526 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
527 LOG(info) << "Equipement ID: " << mdsc.eq_id;
528 LOG(info) << "Flags: " << mdsc.flags;
529 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
530 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
531 LOG(info) << "Microslice Idx: " << mdsc.idx;
532 LOG(info) << "Checksum: " << mdsc.crc;
533 LOG(info) << "Size: " << mdsc.size;
534 LOG(info) << "Offset: " << mdsc.offset;
535}
536
537bool CbmMQTsaSamplerTof::CheckTimeslice(const fles::Timeslice& ts)
538{
539 if (0 == ts.num_components()) {
540 LOG(error) << "No Component in TS " << ts.index();
541 return 1;
542 }
543 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
544
545 for (size_t c = 0; c < ts.num_components(); ++c) {
546 LOG(debug) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
547 LOG(debug) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
548 LOG(debug) << "Component " << c << " has the system id 0x" << std::hex
549 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
550 /*
551 LOG(debug) << "Component " << c << " has the system id 0x"
552 << static_cast<int>(ts.descriptor(c,0).sys_id);
553 */
554 /*
555 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
556 PrintMicroSliceDescriptor(ts.descriptor(c,m));
557 }
558*/
559 }
560
561 return true;
562}
563
// Body of the routine that notifies subscribers through the "syscmd" channel.
// NOTE(review): the signature line is not present in this listing (presumably
// SendSysCmdStop() - confirm against the real file).
// Only if IsChannelUp("syscmd") succeeds: wait 100 s, send "STOP", wait 1 s, send
// "TASK_RESET". A failed Send() is logged and otherwise ignored.
565{
566 if (IsChannelUp("syscmd")) {
567 LOG(info) << "stop subscribers in 100 sec";
568 std::this_thread::sleep_for(std::chrono::milliseconds(100000));
569
570 FairMQMessagePtr pub(NewSimpleMessage("STOP"));
571 if (Send(pub, "syscmd") < 0) { LOG(error) << "Sending STOP message failed"; }
572
573 LOG(info) << "task reset subscribers in 1 sec";
574 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
575 FairMQMessagePtr task_reset(NewSimpleMessage("TASK_RESET"));
576
577 if (Send(task_reset, "syscmd") < 0) { LOG(error) << "Sending Task_Reset message failed"; }
578 }
579 // FairMQStateMachine::ChangeState(STOP);
580}
fscal v[fmask::Size]
Definition KfSimdPseudo.h:4
bool IsChannelUp(std::string)
bool AppendData(const fles::StorableTimeslice &, int)
std::vector< int > fComponentsToSend
std::vector< int > fSysId
virtual bool ConditionalRun()
fles::TimesliceSource * fSource
bool CreateAndCombineComponents(const fles::Timeslice &, int)
bool IsChannelNameAllowed(std::string)
std::vector< std::string > fAllowedChannels
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
std::vector< std::string > fInputFileList
List of input files.
std::chrono::steady_clock::time_point fTime
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
bool CheckTimeslice(const fles::Timeslice &ts)
std::vector< std::vector< std::string > > fChannelsToSend
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.