CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaMultiSamplerTof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2017-2021 PI-UHd, GSI
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Norbert Herrmann [committer], Florian Uhlig */
4
14
15#include "CbmMQDefs.h"
16
17#include "StorableTimeslice.hpp"
18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMultiInputArchive.hpp"
20#include "TimesliceMultiSubscriber.hpp"
21#include "TimesliceSubscriber.hpp"
22
23#include "FairMQLogger.h"
24#include "FairMQProgOptions.h" // device->fConfig
25
26#include <boost/algorithm/string.hpp>
27#include <boost/archive/binary_oarchive.hpp>
28#include <boost/filesystem.hpp>
29#include <boost/regex.hpp>
30
31namespace filesys = boost::filesystem;
32
33#include <thread> // this_thread::sleep_for
34
35#include <algorithm>
36#include <chrono>
37#include <ctime>
38#include <string>
39
40#include <stdio.h>
41
42using namespace std;
43
44#include <stdexcept>
45
46static uint fiSelectComponents(0);
47
48struct InitTaskError : std::runtime_error {
49 using std::runtime_error::runtime_error;
50};
51
53 : FairMQDevice()
54 , fMaxTimeslices(0)
55 , fFileName("")
56 , fDirName("")
57 , fInputFileList()
58 , fFileCounter(0)
59 , fHost("")
60 , fPort(0)
61 , fHighWaterMark(1)
62 , fTSNumber(0)
63 , fTSCounter(0)
64 , fMessageCounter(0)
65 , fSource(nullptr)
66 , fTime()
67{
68}
69
71try {
72 // Get the values from the command line options (via fConfig)
73 fFileName = fConfig->GetValue<string>("filename");
74 fDirName = fConfig->GetValue<string>("dirname");
75 fHost = fConfig->GetValue<string>("flib-host");
76 fPort = fConfig->GetValue<uint64_t>("flib-port");
77 fHighWaterMark = fConfig->GetValue<uint64_t>("high-water-mark");
78 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
79 if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
80 fiSelectComponents = fConfig->GetValue<uint64_t>("SelectComponents");
81
82 if (0 == fMaxTimeslices) fMaxTimeslices = UINT_MAX;
83
84 // Check which input is defined
85 // Posibilities
86 // filename && ! dirname : single file
87 // filename with wildcards && diranme : all files with filename regex in the directory
88 // host && port : connect to the flim server
89
90 bool isGoodInputCombi {false};
91 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
92 isGoodInputCombi = true;
93 fInputFileList.push_back(fFileName);
94 }
95 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
96 isGoodInputCombi = true;
97 fInputFileList.push_back(fFileName);
98 }
99 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
100 isGoodInputCombi = true;
101 LOG(info) << "Host: " << fHost;
102 LOG(info) << "Port: " << fPort;
103 }
104 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
105 isGoodInputCombi = true;
106 LOG(info) << "Host string: " << fHost;
107 }
108 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 == fPort) {
109 isGoodInputCombi = true;
110 LOG(info) << "Host string: " << fHost;
111 }
112 else {
113 isGoodInputCombi = false;
114 }
115
116
117 if (!isGoodInputCombi) {
118 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
119 "or host + port are allowed combination.");
120 }
121
122
123 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
124
125 // Get the information about created channels from the device
126 // Check if the defined channels from the topology (by name)
127 // are in the list of channels which are possible/allowed
128 // for the device
129 // The idea is to check at initilization if the devices are
130 // properly connected. For the time beeing this is done with a
131 // nameing convention. It is not avoided that someone sends other
132 // data on this channel.
133 int noChannel = fChannels.size();
134 LOG(info) << "Number of defined output channels: " << noChannel;
135 for (auto const& entry : fChannels) {
136 LOG(info) << "Channel name: " << entry.first;
137 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
138 }
139
140 for (auto const& value : fComponentsToSend) {
141 LOG(info) << "Value : " << value;
142 if (value > 1) {
143 throw InitTaskError("Sending same data to more than one output channel "
144 "not implemented yet.");
145 }
146 }
147
148
149 if (0 == fFileName.size() && 0 != fHost.size() && 0 != fPort) {
150 // Don't add the protocol since this is done now in the TimesliceMultiSubscriber
151 //std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
152 std::string connector = fHost + ":" + std::to_string(fPort);
153 LOG(info) << "Open TSPublisher at " << connector;
154 fSource = new fles::TimesliceMultiSubscriber(connector);
155 }
156 else if (0 == fFileName.size() && 0 != fHost.size()) {
157 std::string connector = fHost;
158 LOG(info) << "Open TSPublisher with host string: " << connector;
159 fSource = new fles::TimesliceMultiSubscriber(connector, fHighWaterMark);
160 }
161 else {
162 // Create a ";" separated string with all file names
163 std::string fileList {""};
164 for (const auto& obj : fInputFileList) {
165 std::string fileName = obj;
166 fileList += fileName;
167 fileList += ";";
168 }
169 fileList.pop_back(); // Remove the last ;
170 LOG(info) << "Input File String: " << fileList;
171 fSource = new fles::TimesliceMultiInputArchive(fileList, fDirName);
172 if (!fSource) { throw InitTaskError("Could open files from file list."); }
173 }
174
175 fTime = std::chrono::steady_clock::now();
176}
177catch (InitTaskError& e) {
178 LOG(error) << e.what();
179 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
181}
182
184{
185
186 for (auto const& entry : fAllowedChannels) {
187 LOG(info) << "Looking for name " << channelName << " in " << entry;
188 std::size_t pos1 = channelName.find(entry);
189 if (pos1 != std::string::npos) {
190 const vector<std::string>::const_iterator pos =
191 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
192 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
193 LOG(info) << "Found " << entry << " in " << channelName;
194 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
195 if (idx < 3) { //FIXME, hardwired constant!!!
196 fComponentsToSend[idx]++;
197 fChannelsToSend[idx].push_back(channelName);
198 }
199 return true;
200 }
201 }
202 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
203 LOG(error) << "Stop device.";
204 return false;
205}
206
207bool CbmMQTsaMultiSamplerTof::IsChannelUp(std::string channelName)
208{
209 for (auto const& entry : fChannels) {
210 LOG(info) << "Inspect " << entry.first;
211 std::size_t pos1 = channelName.find(entry.first);
212 if (pos1 != std::string::npos) {
213 LOG(info) << "Channel name " << channelName << " found in list of defined channel names ";
214 return true;
215 }
216 }
217 LOG(info) << "Channel name " << channelName << " not found in list of defined channel names.";
218 LOG(error) << "Stop device.";
219 return false;
220}
221
223{
224 auto timeslice = fSource->get();
225 if (timeslice) {
227 fTSCounter++;
228
229 const fles::Timeslice& ts = *timeslice;
230 auto tsIndex = ts.index();
231
232 if (fTSCounter % 10000 == 0) LOG(info) << "Sample TimeSlice " << fTSCounter << ", Index " << tsIndex;
233
234 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice " << fTSCounter << ", index "
235 << tsIndex;
236
237
238 CheckTimeslice(ts);
239 /*
240 for (int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
241 CreateAndSendComponent(ts, nrComp);
242 }
243 */
244 // keep components together
245 std::vector<FairMQParts> parts;
246 std::vector<bool> bparts;
247 parts.resize(fComponentsToSend.size());
248 bparts.resize(parts.size());
249 for (uint i = 0; i < bparts.size(); i++)
250 bparts[i] = false;
251
252 switch (fiSelectComponents) {
253 case 0: { // send complete timeslice
254 int iSysId = 0x60;
255 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
256 if (pos != fSysId.end()) {
257 const vector<std::string>::size_type idx = pos - fSysId.begin();
258 if (fComponentsToSend[idx] > 0) {
259 fles::StorableTimeslice tss = fles::StorableTimeslice(ts);
260
261
262 std::stringstream oss;
263 boost::archive::binary_oarchive oa(oss);
264 oa << tss;
265 std::string* strMsg = new std::string(oss.str());
266 LOG(debug) << "AddPart " << idx << " with length " << strMsg->length();
267
268 parts[idx].AddPart(NewMessage(
269 const_cast<char*>(strMsg->c_str()), // data
270 strMsg->length(), // size
271 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
272 strMsg)); // object that manages the data
273 LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
274 bparts[idx] = true;
275 }
276 }
277 } break;
278
279 case 1: {
280 LOG(debug) << "parts with size " << parts.size() << ", #components: " << ts.num_components();
281
282 for (uint nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
283 // CreateAndCombineComponents(ts, nrComp);
284 LOG(debug) << "nrComp " << nrComp << ", SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
285 int iSysId = static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
286 if (iSysId == 0x90 || iSysId == 0x91) iSysId = 0x60; // treat t0 like tof
287 const vector<int>::const_iterator pos = std::find(fSysId.begin(), fSysId.end(), iSysId);
288 if (pos != fSysId.end()) {
289 const vector<std::string>::size_type idx = pos - fSysId.begin();
290 if (fComponentsToSend[idx] > 0) {
291 LOG(debug) << "Append timeslice component of link " << nrComp << " to idx " << idx;
292
293 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
294 component.append_component(ts.num_microslices(0));
295
296 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
297 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
298 }
299
300 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
301 //if(idx > parts.size()-1) parts.resize(idx+1);
302
303 //if ( !AppendData(component, idx) ) return false;
304 // serialize the timeslice and create the message
305 std::stringstream oss;
306 boost::archive::binary_oarchive oa(oss);
307 oa << component;
308 std::string* strMsg = new std::string(oss.str());
309
310 LOG(debug) << "AddParts to " << idx << ": current size " << parts[idx].Size();
311
312 parts[idx].AddPart(NewMessage(
313 const_cast<char*>(strMsg->c_str()), // data
314 strMsg->length(), // size
315 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
316 strMsg)); // object that manages the data
317
318 bparts[idx] = true;
319 }
320 }
321 }
322 } break;
323
324 default:;
325 }
326
327 for (uint idx = 0; idx < parts.size(); idx++)
328 if (bparts[idx]) {
329 LOG(debug) << "Send parts with size " << parts[idx].Size() << " to channel " << fChannelsToSend[idx][0];
330 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
331 LOG(error) << "Problem sending data";
332 return false;
333 }
334 LOG(debug) << "Sent message " << fMessageCounter << " with a size of " << parts[idx].Size();
336 }
337
338 //if(!SendTs()) return false;
339 return true;
340 }
341 else {
342 LOG(info) << " Number of requested time slices reached, exiting ";
344 return false;
345 }
346 }
347 else {
348 LOG(info) << " No more data, exiting ";
350 return false;
351 }
352}
353
354bool CbmMQTsaMultiSamplerTof::CreateAndCombineComponents(const fles::Timeslice& /*ts*/, int /*nrComp*/)
355{
356
357 // Check if component has to be send. If the corresponding channel
358 // is connected append it to parts
359 /*
360 LOG(debug) <<"nrComp "<< nrComp<< ", SysID: " << static_cast<int>(ts.descriptor(nrComp,0).sys_id);
361 const vector<int>::const_iterator pos =
362 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp,0).sys_id));
363 if (pos != fSysId.end() ) {
364 const vector<std::string>::size_type idx = pos-fSysId.begin();
365 if (fComponentsToSend[idx]>0) {
366 LOG(debug) << "Append timeslice component of link " << nrComp<< " to idx "<<idx;
367
368 fles::StorableTimeslice component{static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
369 component.append_component(ts.num_microslices(0));
370
371 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
372 component.append_microslice( 0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m) );
373 }
374
375 //LOG(debug)<<"Parts size available for "<<idx<<": "<<parts.size();
376 if(idx > parts.size()-1) parts.resize(idx+1);
377
378 if ( !AppendData(component, idx) ) return false;
379 bparts[idx]=true;
380 return true;
381 }
382 }
383 */
384 return true;
385}
386
387bool CbmMQTsaMultiSamplerTof::AppendData(const fles::StorableTimeslice& /*component*/, int /*idx*/)
388{
389 // serialize the timeslice and create the message
390 /*
391 std::stringstream oss;
392 boost::archive::binary_oarchive oa(oss);
393 oa << component;
394 std::string* strMsg = new std::string(oss.str());
395
396 LOG(debug)<<"AddParts to "<<idx<<": current size "<<parts[idx].Size();
397
398 parts[idx].AddPart(NewMessage(const_cast<char*>(strMsg->c_str()), // data
399 strMsg->length(), // size
400 [](void*, void* object){ delete static_cast<std::string*>(object); },
401 strMsg)); // object that manages the data
402 */
403 return true;
404}
405
407{
408 /*
409 for (int idx=0; idx<parts.size(); idx++)
410 if(bparts[idx]){
411 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
412 if (Send(parts[idx], fChannelsToSend[idx][0]) < 0) {
413 LOG(error) << "Problem sending data";
414 return false;
415 }
416
417 fMessageCounter++;
418 LOG(debug) << "Send message " << fMessageCounter << " with a size of "
419 << parts[idx].Size();
420 }
421 */
422 return true;
423}
424
425bool CbmMQTsaMultiSamplerTof::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
426{
427
428 // Check if component has to be send. If the corresponding channel
429 // is connected create the new timeslice and send it to the
430 // correct channel
431
432 LOG(debug) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
433 const vector<int>::const_iterator pos =
434 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
435 if (pos != fSysId.end()) {
436 const vector<std::string>::size_type idx = pos - fSysId.begin();
437 if (fComponentsToSend[idx] > 0) {
438 LOG(debug) << "Create timeslice component for link " << nrComp;
439
440 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
441 component.append_component(ts.num_microslices(nrComp));
442
443 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
444 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
445 }
446 if (!SendData(component, idx)) return false;
447 return true;
448 }
449 }
450 return true;
451}
452
453bool CbmMQTsaMultiSamplerTof::SendData(const fles::StorableTimeslice& component, int idx)
454{
455 // serialize the timeslice and create the message
456 std::stringstream oss;
457 boost::archive::binary_oarchive oa(oss);
458 oa << component;
459 std::string* strMsg = new std::string(oss.str());
460
461 FairMQMessagePtr msg(NewMessage(
462 const_cast<char*>(strMsg->c_str()), // data
463 strMsg->length(), // size
464 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
465 strMsg)); // object that manages the data
466
467 // TODO: Implement sending same data to more than one channel
468 // Need to create new message (copy message??)
469 if (fComponentsToSend[idx] > 1) { LOG(debug) << "Need to copy FairMessage"; }
470
471 // in case of error or transfer interruption,
472 // return false to go to IDLE state
473 // successfull transfer will return number of bytes
474 // transfered (can be 0 if sending an empty message).
475
476 LOG(debug) << "Send data to channel " << fChannelsToSend[idx][0];
477 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
478 LOG(error) << "Problem sending data";
479 return false;
480 }
481
483 LOG(debug) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
484
485 return true;
486}
487
488
490
492{
493 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
494
495 LOG(info) << "Runtime: " << run_time.count();
496 LOG(info) << "No more input data";
497}
498
499
500void CbmMQTsaMultiSamplerTof::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
501{
502 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
503 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
504 LOG(info) << "Equipement ID: " << mdsc.eq_id;
505 LOG(info) << "Flags: " << mdsc.flags;
506 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
507 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
508 LOG(info) << "Microslice Idx: " << mdsc.idx;
509 LOG(info) << "Checksum: " << mdsc.crc;
510 LOG(info) << "Size: " << mdsc.size;
511 LOG(info) << "Offset: " << mdsc.offset;
512}
513
514bool CbmMQTsaMultiSamplerTof::CheckTimeslice(const fles::Timeslice& ts)
515{
516 if (0 == ts.num_components()) {
517 LOG(error) << "No Component in TS " << ts.index();
518 return 1;
519 }
520 LOG(debug) << "Found " << ts.num_components() << " different components in timeslice";
521
522 for (size_t c = 0; c < ts.num_components(); ++c) {
523 LOG(debug) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
524 LOG(debug) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
525 LOG(debug) << "Component " << c << " has the system id 0x" << std::hex
526 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
527 /*
528 if(ts.descriptor(c,0).sys_id == 0x90 ) { // found a t0 - timeslice
529 ts.descriptor(c,0).sys_id = 0x60; // rename t0 to tof , not allowed
530 }
531 */
532 /*
533 LOG(debug) << "Component " << c << " has the system id 0x"
534 << static_cast<int>(ts.descriptor(c,0).sys_id);
535 */
536 /*
537 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
538 PrintMicroSliceDescriptor(ts.descriptor(c,m));
539 }
540*/
541 }
542
543 return true;
544}
545
547{
548 if (IsChannelUp("syscmd")) {
549 LOG(info) << "stop subscribers in 10 sec";
550 std::this_thread::sleep_for(std::chrono::milliseconds(10000));
551
552 FairMQMessagePtr pub(NewSimpleMessage("STOP"));
553 if (Send(pub, "syscmd") < 0) { LOG(error) << "Sending STOP message failed"; }
554
555 LOG(info) << "task reset subscribers in 1 sec";
556 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
557 FairMQMessagePtr task_reset(NewSimpleMessage("TASK_RESET"));
558
559 if (Send(task_reset, "syscmd") < 0) { LOG(error) << "Sending Task_Reset message failed"; }
560 }
561 // FairMQStateMachine::ChangeState(STOP);
562}
static uint fiSelectComponents
fles::TimesliceSource * fSource
std::chrono::steady_clock::time_point fTime
std::vector< std::string > fAllowedChannels
std::vector< std::string > fInputFileList
List of input files.
bool CreateAndCombineComponents(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
bool AppendData(const fles::StorableTimeslice &, int)
std::vector< std::vector< std::string > > fChannelsToSend
bool CheckTimeslice(const fles::Timeslice &ts)
bool CreateAndSendComponent(const fles::Timeslice &, int)
std::vector< int > fComponentsToSend
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.