CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaInfo.cxx
Go to the documentation of this file.
1/* Copyright (C) 2017-2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Florian Uhlig [committer] */
4
11
12
13#include "CbmMQTsaInfo.h"
14
15#include "CbmMQDefs.h"
16#include "FairMQLogger.h"
17#include "FairMQProgOptions.h" // device->fConfig
18#include "TimesliceInputArchive.hpp"
19#include "TimesliceSubscriber.hpp"
20
21#include <boost/archive/binary_oarchive.hpp>
22
23#include <chrono>
24#include <cstdio>
25#include <ctime>
26#include <thread> // this_thread::sleep_for
27
28using namespace std;
29
30#include <stdexcept>
31
/// Exception type thrown while initialising the device task.
///
/// Behaves exactly like std::runtime_error; the distinct type only allows
/// initialisation failures to be caught separately from other errors.
struct InitTaskError : std::runtime_error {
  /// Construct from a std::string message (forwarded to std::runtime_error).
  explicit InitTaskError(const std::string& message) : std::runtime_error(message) {}

  /// Construct from a C-string message (forwarded to std::runtime_error).
  explicit InitTaskError(const char* message) : std::runtime_error(message) {}
};
35
36
38 : FairMQDevice()
40 , fFileName("")
42 , fFileCounter(0)
43 , fHost("")
44 , fPort(0)
45 , fTSNumber(0)
46 , fTSCounter(0)
48 , fTime()
49{
50}
51
53try {
54 // Get the values from the command line options (via fConfig)
55 fFileName = fConfig->GetValue<string>("filename");
56 fHost = fConfig->GetValue<string>("flib-host");
57 fPort = fConfig->GetValue<uint64_t>("flib-port");
58 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
59
60
61 LOG(info) << "Filename: " << fFileName;
62 LOG(info) << "Host: " << fHost;
63 LOG(info) << "Port: " << fPort;
64
65 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
66
67 // Get the information about created channels from the device
68 // Check if the defined channels from the topology (by name)
69 // are in the list of channels which are possible/allowed
70 // for the device
71 // The idea is to check at initialization if the devices are
72 // properly connected. For the time being this is done with a
73 // naming convention. This does not prevent someone from sending other
74 // data on this channel.
75 int noChannel = fChannels.size();
76 LOG(info) << "Number of defined output channels: " << noChannel;
77 for (auto const& entry : fChannels) {
78 LOG(info) << "Channel name: " << entry.first;
79 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
80 }
81
82 if (0 == fFileName.size() && 0 != fHost.size()) {
83 std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
84 LOG(info) << "Open TSPublisher at " << connector;
85 fSource = new fles::TimesliceSubscriber(connector, 1);
86 if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
87 }
88 else {
89 LOG(info) << "Open the Flib input file " << fFileName;
90 // Check if the input file exist
91 FILE* inputFile = fopen(fFileName.c_str(), "r");
92 if (!inputFile) { throw InitTaskError("Input file doesn't exist."); }
93 fclose(inputFile);
94 fSource = new fles::TimesliceInputArchive(fFileName);
95 if (!fSource) { throw InitTaskError("Could not open input file."); }
96 }
97 fTime = std::chrono::steady_clock::now();
98}
99catch (InitTaskError& e) {
100 LOG(error) << e.what();
101 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
103}
104
105bool CbmMQTsaInfo::IsChannelNameAllowed(std::string channelName)
106{
107 if (std::find(fAllowedChannels.begin(), fAllowedChannels.end(), channelName) != fAllowedChannels.end()) {
108 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names.";
109 return true;
110 }
111 else {
112 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
113 LOG(error) << "Stop device.";
114 return false;
115 }
116}
117
119{
120
121
122 auto timeslice = fSource->get();
123 if (timeslice) {
124 fTSCounter++;
125 if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;
126
127
128 const fles::Timeslice& ts = *timeslice;
129 // auto tsIndex = ts.index();
130
131
132 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
133
134 CheckTimeslice(ts);
135
136 if (fTSCounter < fMaxTimeslices) { return true; }
137 else {
138 CalcRuntime();
139 return false;
140 }
141 }
142 else {
143 CalcRuntime();
144 return false;
145 }
146}
147
148
150
152{
153 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
154
155 LOG(info) << "Runtime: " << run_time.count();
156 LOG(info) << "No more input data";
157}
158
159
160void CbmMQTsaInfo::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
161{
162 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
163 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
164 LOG(info) << "Equipement ID: " << mdsc.eq_id;
165 LOG(info) << "Flags: " << mdsc.flags;
166 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
167 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
168 LOG(info) << "Microslice Idx: " << mdsc.idx;
169 LOG(info) << "Checksum: " << mdsc.crc;
170 LOG(info) << "Size: " << mdsc.size;
171 LOG(info) << "Offset: " << mdsc.offset;
172}
173
174bool CbmMQTsaInfo::CheckTimeslice(const fles::Timeslice& ts)
175{
176 if (0 == ts.num_components()) {
177 LOG(error) << "No Component in TS " << ts.index();
178 return 1;
179 }
180 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
181
182 for (size_t c = 0; c < ts.num_components(); ++c) {
183 LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
184 LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
185 LOG(info) << "Component " << c << " has the system id 0x" << std::hex
186 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
187
188 /*
189 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
190 PrintMicroSliceDescriptor(ts.descriptor(c,m));
191 }
192*/
193 }
194
195 return true;
196}
std::vector< std::string > fAllowedChannels
virtual bool ConditionalRun()
std::chrono::steady_clock::time_point fTime
uint64_t fTSCounter
bool IsChannelNameAllowed(std::string)
uint64_t fMaxTimeslices
uint64_t fPort
std::vector< std::string > fInputFileList
List of input files.
virtual void InitTask()
std::string fFileName
bool CheckTimeslice(const fles::Timeslice &ts)
fles::TimesliceSource * fSource
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
uint64_t fTSNumber
uint64_t fMessageCounter
uint64_t fFileCounter
std::string fHost
virtual ~CbmMQTsaInfo()
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.