CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaMultiSamplerTof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2017-2021 PI-UHd, GSI
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Norbert Herrmann [committer], Florian Uhlig */
4
11
12
14
15#include "CbmMQDefs.h"
16
17#include "StorableTimeslice.hpp"
18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMultiInputArchive.hpp"
20#include "TimesliceMultiSubscriber.hpp"
21#include "TimesliceSubscriber.hpp"
22
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h" // device->fConfig
25
26#include <boost/algorithm/string.hpp>
27#include <boost/archive/binary_oarchive.hpp>
28#include <boost/filesystem.hpp>
29#include <boost/regex.hpp>
30
31namespace filesys = boost::filesystem;
32
33#include <algorithm>
34#include <chrono>
35#include <cstdio>
36#include <ctime>
37#include <string>
38#include <thread> // this_thread::sleep_for
39
40using namespace std;
41
42#include <stdexcept>
43
44static uint fiSelectComponents(0);
45
46struct InitTaskError : std::runtime_error {
47 using std::runtime_error::runtime_error;
48};
49
51 : FairMQDevice()
53 , fFileName("")
54 , fDirName("")
56 , fFileCounter(0)
57 , fHost("")
58 , fPort(0)
60 , fTSNumber(0)
61 , fTSCounter(0)
63 , fSource(nullptr)
64 , fTime()
65{
66}
67
69try {
70 // Get the values from the command line options (via fConfig)
71 fFileName = fConfig->GetValue<string>("filename");
72 fDirName = fConfig->GetValue<string>("dirname");
73 fHost = fConfig->GetValue<string>("flib-host");
74 fPort = fConfig->GetValue<uint64_t>("flib-port");
75 fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
76 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
77 if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
78 fiSelectComponents = fConfig->GetValue<uint64_t>("SelectComponents");
79
80 if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
81
82 // Check which input is defined
83 // Posibilities
84 // filename && ! dirname : single file
85 // filename with wildcards && diranme : all files with filename regex in the directory
86 // host && port : connect to the flim server
87
88 bool isGoodInputCombi {false};
89 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
90 isGoodInputCombi = true;
91 fInputFileList.push_back(fFileName);
92 }
93 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
94 isGoodInputCombi = true;
95 fInputFileList.push_back(fFileName);
96 }
97 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
98 isGoodInputCombi = true;
99 LOG(info) << "Host: " << fHost;
100 LOG(info) << "Port: " << fPort;
101 }
102 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
103 isGoodInputCombi = true;
104 LOG(info) << "Host string: " << fHost;
105 }
106 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
107 isGoodInputCombi = true;
108 LOG(info) << "Host string: " << fHost;
109 }
110 else {
111 isGoodInputCombi = false;
112 }
113
114
115 if (!isGoodInputCombi) {
116 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
117 "or host + port are allowed combination.");
118 }
119
120
121 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
122
123 // Get the information about created channels from the device
124 // Check if the defined channels from the topology (by name)
125 // are in the list of channels which are possible/allowed
126 // for the device
127 // The idea is to check at initilization if the devices are
128 // properly connected. For the time beeing this is done with a
129 // nameing convention. It is not avoided that someone sends other
130 // data on this channel.
131 int noChannel = fChannels.size();
132 LOG(info) << "Number of defined output channels: " << noChannel;
133 for (auto const& entry : fChannels) {
134 LOG(info) << "Channel name: " << entry.first;
135 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
136 }
137
138 for (auto const& value : fComponentsToSend) {
139 LOG(info) << "Value : " << value;
140 if (value > 1) {
141 throw InitTaskError("Sending same data to more than one output channel "
142 "not implemented yet.");
143 }
144 }
145
146
147 if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) {
148 // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
149 //std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
150 std::string connector = fHost + ":" + std::to_string(fPort);
151 LOG(info) << "Open TSPublisher at " << connector;
152 fSource = new fles::TimesliceMultiSubscriber(connector);
153 }
154 else if (0 == fFileName.size() && 0 != fHost.size()) {
155 std::string connector = fHost;
156 LOG(info) << "Open TSPublisher with host string: " << connector;
157 fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark);
158 }
159 else {
160 // Create a ";" separated string with all file names
161 std::string fileList {""};
162 for (const auto& obj : fInputFileList) {
163 std::string fileName = obj;
164 fileList += fileName;
165 fileList += ";";
166 }
167 fileList.pop_back(); // Remove the last ;
168 LOG(info) << "Input File String: " << fileList;
169 fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName);
170 if (!fSource) { throw InitTaskError("Could open files from file list."); }
171 }
172
173 fTime = std::chrono::steady_clock::now();
174}
175catch (InitTaskError& e) {
176 LOG(error) << e.what();
177 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
179}
180
182{
183
184 for (auto const& entry : fAllowedChannels) {
185 LOG(info) << "Looking for name " << channelName << " in " << entry;
186 std::size_t pos1 = channelName.find(entry);
187 if (pos1 != std::string::npos) {
188 const vector<std::string>::const_iterator pos =
189 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
190 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
191 LOG(info) << "Found " << entry << " in " << channelName;
192 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
193 if (idx < 3) { //FIXME, hardwired constant!!!
194 fComponentsToSend[idx]++;
195 fChannelsToSend[idx].push_back(channelName);
196 }
197 return true;
198 }
199 }
200 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
201 LOG(error) << "Stop device.";
202 return false;
203}
204
205bool CbmMQTsaMultiSamplerTof::IsChannelUp(std::string channelName)
206{
207 for (auto const& entry : fChannels) {
208 LOG(info) << "Inspect " << entry.first;
209 std::size_t pos1 = channelName.find(entry.first);
210 if (pos1 != std::string::npos) {
211 LOG(info) << "Channel name " << channelName << " found in list of defined channel names ";
212 return true;
213 }
214 }
215 LOG(info) << "Channel name " << channelName << " not found in list of defined channel names.";
216 LOG(error) << "Stop device.";
217 return false;
218}
219
221{
222 auto timeslice = fSource->get();
223 if (timeslice) {
225 fTSCounter++;
226
227 const fles::Timeslice& ts = *timeslice;
228 auto tsIndex = ts.index();
229
230 if (fTSCounter % 10000 == 0) LOG(info) << "Sample TimeSlice " << fTSCounter << ", Index " << tsIndex;
231
232 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << fTSCounter << ", index "
233 << tsIndex;
234
235
236 CheckTimeslice(ts);
237 /*
238 for (int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
239 CreateAndSendComponent(ts, nrComp);
240 }
241 */
242 // keep components together
243 std::vector<FairMQParts> parts;
244 std::vector<bool> bparts;
245 parts.resize(fComponentsToSend.size());
246 bparts.resize(parts.size());
247 for (uint i = 0; i < bparts.size(); i++)
248 bparts[i] = false;
249
250 switch (fiSelectComponents) {
251 case 0: { // send complete timeslice
252 int iSysId = 0x60;
253 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
254 if (pos != fSysId.end()) {
255 const vector<std::string>::size_type idx = pos - fSysId.begin();
256 if (fComponentsToSend[idx] > 0) {
257 fles::StorableTimeslice tss = fles::StorableTimeslice(ts);
258
259
260 std::stringstream oss;
261 boost::archive::binary_oarchive oa(oss);
262 oa << tss;
263 std::string* strMsg = new std::string(oss.str());
264 LOG(debug) << "AddPart " << idx << " with length " << strMsg->length();
265
266 parts[idx].AddPart(NewMessage(
267 const_cast<char*>(strMsg->c_str()), // data
268 strMsg->length(), // size
269 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
270 strMsg)); // object that manages the data
271 LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
272 bparts[idx] = true;
273 }
274 }
275 } break;
276
277 case 1: {
278 LOG(debug) << "parts with size " << parts.size() << ", #components: " << ts.num_components();
279
280 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
281 // CreateAndCombineComponents(ts, nrComp);
282 LOG(debug) << "nrComp " << nrComp << ", SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
283 int iSysId = static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
284 if (iSysId == 0x90 || iSysId == 0x91) iSysId = 0x60; // treat t0 like tof
285 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
286 if (pos != fSysId.end()) {
287 const vector<std::string>::size_type idx = pos - fSysId.begin();
288 if (fComponentsToSend[idx] > 0) {
289 LOG(debug) << "Append timeslice component of link " << nrComp << " to idx " << idx;
290
291 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
292 component.append_component(ts.num_microslices(0));
293
294 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
295 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
296 }
297
298 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
299 //if(idx > parts.size()-1) parts.resize(idx+1);
300
301 //if ( !AppendData(component, idx) ) return false;
302 // serialize the timeslice and create the message
303 std::stringstream oss;
304 boost::archive::binary_oarchive oa(oss);
305 oa << component;
306 std::string* strMsg = new std::string(oss.str());
307
308 LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
309
310 parts[idx].AddPart(NewMessage(
311 const_cast<char*>(strMsg->c_str()), // data
312 strMsg->length(), // size
313 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
314 strMsg)); // object that manages the data
315
316 bparts[idx] = true;
317 }
318 }
319 }
320 } break;
321
322 default:;
323 }
324
325 for (uint idx = 0; idx < parts.size(); idx++)
326 if (bparts[idx]) {
327 LOG(debug) << "Send parts with size " << parts[idx].Size() << " to channel " << fChannelsToSend[idx][0];
328 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
329 LOG(error) << "Problem sending data";
330 return false;
331 }
332 LOG(debug) << "Sent message " << fMessageCounter << " with a size of " << parts[idx].Size();
334 }
335
336 //if(!SendTs()) return false;
337 return true;
338 }
339 else {
340 LOG(info) << " Number of requested time slices reached, exiting ";
342 return false;
343 }
344 }
345 else {
346 LOG(info) << " No more data, exiting ";
348 return false;
349 }
350}
351
352bool CbmMQTsaMultiSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*/, int /*nrComp*/)
353{
354
355 // Check if component has to be send. If the corresponding channel
356 // is connected append it to parts
357 /*
358 LOG(debug) <<"nrComp "<< nrComp<< ", SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
359 const vector<int>::const_iterator pos =
360 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
361 if (pos != fSysId.end() ) {
362 const vector<std::string>::size_type idx = pos-fSysId.begin();
363 if (fComponentsToSend[idx]>0) {
364 LOG(debug) << "Append timeslice component of link " << nrComp<< " to idx "<<idx;
365
366 fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
367 component.append_component(ts.num_microslices(0));
368
369 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
370 component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
371 }
372
373 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
374 if(idx > parts.size()-1) parts.resize(idx+1);
375
376 if ( !AppendData(component, idx) ) return false;
377 bparts[idx]=true;
378 return true;
379 }
380 }
381 */
382 return true;
383}
384
385bool CbmMQTsaMultiSamplerTof::AppendData(const fles::StorableTimeslice& /*component*/, int /*idx*/)
386{
387 // serialize the timeslice and create the message
388 /*
389 std::stringstream oss;
390 boost::archive::binary_oarchive oa(oss);
391 oa << component;
392 std::string* strMsg = new std::string(oss.str());
393
394 LOG(debug)<<"AddParts to "<<idx<<": current size "<<parts[idx].Size();
395
396 parts[idx].AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
397 strMsg->length(), // size
398 [](void*, void* object){ delete static_cast<std::string*>(object); },
399 strMsg)); // object that manages the data
400 */
401 return true;
402}
403
405{
406 /*
407 for (int idx=0; idx<parts.size(); idx++)
408 if(bparts[idx]){
409 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
410 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
411 LOG(error) << "Problem sending data";
412 return false;
413 }
414
415 fMessageCounter++;
416 LOG(debug) << "Send message " << fMessageCounter << " with a size of "
417 << parts[idx].Size();
418 }
419 */
420 return true;
421}
422
423bool CbmMQTsaMultiSamplerTof::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
424{
425
426 // Check if component has to be send. If the corresponding channel
427 // is connected create the new timeslice and send it to the
428 // correct channel
429
430 LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
431 const vector<int>::const_iterator pos =
432 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
433 if (pos != fSysId.end()) {
434 const vector<std::string>::size_type idx = pos - fSysId.begin();
435 if (fComponentsToSend[idx] > 0) {
436 LOG(debug) << "Create timeslice component for link " << nrComp;
437
438 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
439 component.append_component(ts.num_microslices(nrComp));
440
441 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
442 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
443 }
444 if (!SendData(component, idx)) return false;
445 return true;
446 }
447 }
448 return true;
449}
450
451bool CbmMQTsaMultiSamplerTof::SendData(const fles::StorableTimeslice& component, int idx)
452{
453 // serialize the timeslice and create the message
454 std::stringstream oss;
455 boost::archive::binary_oarchive oa(oss);
456 oa << component;
457 std::string* strMsg = new std::string(oss.str());
458
459 FairMQMessagePtr msg(NewMessage(
460 const_cast<char*>(strMsg->c_str()), // data
461 strMsg->length(), // size
462 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
463 strMsg)); // object that manages the data
464
465 // TODO: Implement sending same data to more than one channel
466 // Need to create new message (copy message??)
467 if (fComponentsToSend[idx] > 1) { LOG(debug) << "Need to copy FairMessage"; }
468
469 // in case of error or transfer interruption,
470 // return false to go to IDLE state
471 // successfull transfer will return number of bytes
472 // transfered (can be 0 if sending an empty message).
473
474 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
475 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
476 LOG(error) << "Problem sending data";
477 return false;
478 }
479
481 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
482
483 return true;
484}
485
486
488
490{
491 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
492
493 LOG(info) << "Runtime: " << run_time.count();
494 LOG(info) << "No more input data";
495}
496
497
498void CbmMQTsaMultiSamplerTof::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
499{
500 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
501 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
502 LOG(info) << "Equipement ID: " << mdsc.eq_id;
503 LOG(info) << "Flags: " << mdsc.flags;
504 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
505 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
506 LOG(info) << "Microslice Idx: " << mdsc.idx;
507 LOG(info) << "Checksum: " << mdsc.crc;
508 LOG(info) << "Size: " << mdsc.size;
509 LOG(info) << "Offset: " << mdsc.offset;
510}
511
512bool CbmMQTsaMultiSamplerTof::CheckTimeslice(const fles::Timeslice& ts)
513{
514 if (0 == ts.num_components()) {
515 LOG(error) << "No Component in TS " << ts.index();
516 return 1;
517 }
518 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
519
520 for (size_t c = 0; c < ts.num_components(); ++c) {
521 LOG(debug) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
522 LOG(debug) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
523 LOG(debug) << "Component " << c << " has the system id 0x" << std::hex
524 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
525 /*
526 if(ts.descriptor(c,0).sys_id == 0x90 ) { // found a t0 - timeslice
527 ts.descriptor(c,0).sys_id = 0x60; // rename t0 to tof , not allowed
528 }
529 */
530 /*
531 LOG(debug) << "Component " << c << " has the system id 0x"
532 << static_cast<int>(ts.descriptor(c,0).sys_id);
533 */
534 /*
535 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
536 PrintMicroSliceDescriptor(ts.descriptor(c,m));
537 }
538*/
539 }
540
541 return true;
542}
543
545{
546 if (IsChannelUp("syscmd")) {
547 LOG(info) << "stop subscribers in 10 sec";
548 std::this_thread::sleep_for(std::chrono::milliseconds(10000));
549
550 FairMQMessagePtr pub(NewSimpleMessage("STOP"));
551 if (Send(pub, "syscmd") < 0) { LOG(error) << "Sending STOP message failed"; }
552
553 LOG(info) << "task reset subscribers in 1 sec";
554 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
555 FairMQMessagePtr task_reset(NewSimpleMessage("TASK_RESET"));
556
557 if (Send(task_reset, "syscmd") < 0) { LOG(error) << "Sending Task_Reset message failed"; }
558 }
559 // FairMQStateMachine::ChangeState(STOP);
560}
static uint fiSelectComponents
fles::TimesliceSource * fSource
std::chrono::steady_clock::time_point fTime
std::vector< std::string > fAllowedChannels
std::vector< std::string > fInputFileList
List of input files.
bool CreateAndCombineComponents(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
bool AppendData(const fles::StorableTimeslice &, int)
std::vector< std::vector< std::string > > fChannelsToSend
bool CheckTimeslice(const fles::Timeslice &ts)
bool CreateAndSendComponent(const fles::Timeslice &, int)
std::vector< int > fComponentsToSend
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.