CbmRoot
Loading...
Searching...
No Matches
CbmMQTsaSampler.cxx
Go to the documentation of this file.
1/* Copyright (C) 2017-2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Florian Uhlig [committer] */
4
13#include "CbmMQTsaSampler.h"
14
15#include "CbmMQDefs.h"
16
17#include "TimesliceInputArchive.hpp"
18#include "TimesliceSubscriber.hpp"
19
20#include "FairMQLogger.h"
21#include "FairMQProgOptions.h" // device->fConfig
22
23#include <boost/algorithm/string.hpp>
24#include <boost/archive/binary_oarchive.hpp>
25#include <boost/filesystem.hpp>
26#include <boost/regex.hpp>
27
28namespace filesys = boost::filesystem;
29
30#include <thread> // this_thread::sleep_for
31
32#include <algorithm>
33#include <chrono>
34#include <ctime>
35#include <string>
36
37#include <stdio.h>
38
39using namespace std;
40
41#include <stdexcept>
42
/// Exception thrown while initialising the device task to signal that the
/// configuration or the input cannot be used. It adds nothing to
/// std::runtime_error except a distinct type that can be caught separately.
class InitTaskError : public std::runtime_error {
public:
  // Re-use all constructors of std::runtime_error (message as string or C string).
  using std::runtime_error::runtime_error;
};
46
48 : FairMQDevice()
49 , fMaxTimeslices(0)
50 , fFileName("")
51 , fDirName("")
52 , fInputFileList()
53 , fFileCounter(0)
54 , fHost("")
55 , fPort(0)
56 , fTSNumber(0)
57 , fTSCounter(0)
58 , fMessageCounter(0)
59 , fSource(nullptr)
60 , fTime()
61{
62}
63
65try {
66 // Get the values from the command line options (via fConfig)
67 fFileName = fConfig->GetValue<string>("filename");
68 fDirName = fConfig->GetValue<string>("dirname");
69 fHost = fConfig->GetValue<string>("flib-host");
70 fPort = fConfig->GetValue<uint64_t>("flib-port");
71 fMaxTimeslices = fConfig->GetValue<uint64_t>("max-timeslices");
72
73 // Check which input is defined
74 // Possibilities
75 // filename && ! dirname : single file
76 // filename with wildcards && dirname : all files with filename regex in the directory
77 // host && port : connect to the flim server
78
79 bool isGoodInputCombi {false};
80 if (0 != fFileName.size() && 0 == fDirName.size() && 0 == fHost.size() && 0 == fPort) {
81 isGoodInputCombi = true;
82 // Create a Path object from given path string
83 filesys::path pathObj(fFileName);
84 if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
85 fInputFileList.push_back(fFileName);
86 LOG(info) << "Filename: " << fFileName;
87 }
88 else if (0 != fFileName.size() && 0 != fDirName.size() && 0 == fHost.size() && 0 == fPort) {
89 isGoodInputCombi = true;
90 filesys::path pathObj = fDirName;
91 if (!filesys::is_directory(pathObj)) { throw InitTaskError("Passed directory name is no valid directory"); }
92 if (fFileName.find("*") == std::string::npos) {
93 // Normal file without wildcards
94 pathObj += fFileName;
95 if (!filesys::is_regular_file(pathObj)) { throw InitTaskError("Passed file name is no valid file"); }
96 fInputFileList.push_back(pathObj.string());
97 LOG(info) << "Filename: " << fInputFileList[0];
98 }
99 else {
100 std::vector<filesys::path> v;
101
102 // escape "." which have a special meaning in regex
103 // change "*" to ".*" to find any number
104 // e.g. tofget4_hd2018.*.tsa => tofget4_hd2018\..*\.tsa
105 boost::replace_all(fFileName, ".", "\\.");
106 boost::replace_all(fFileName, "*", ".*");
107
108 // create regex
109 const boost::regex my_filter(fFileName);
110
111 // loop over all files in input directory
112 for (auto&& x : filesys::directory_iterator(pathObj)) {
113 // Skip if not a file
114 if (!boost::filesystem::is_regular_file(x)) continue;
115
116 // Skip if no match
117 // x.path().leaf().string() means get from directory iterator the
118 // current entry as filesys::path, from this extract the leaf
119 // filename or directory name and convert it to a string to be
120 // used in the regex:match
121 boost::smatch what;
122 if (!boost::regex_match(x.path().leaf().string(), what, my_filter)) continue;
123
124 v.push_back(x.path());
125 }
126
127 // sort the files which match the regex in increasing order
128 // (hopefully)
129 std::sort(v.begin(), v.end());
130
131 for (auto&& x : v)
132 fInputFileList.push_back(x.string());
133
134 LOG(info) << "The following files will be used in this order.";
135 for (auto&& x : v)
136 LOG(info) << " " << x;
137 }
138 }
139 else if (0 == fFileName.size() && 0 == fDirName.size() && 0 != fHost.size() && 0 != fPort) {
140 isGoodInputCombi = true;
141 LOG(info) << "Host: " << fHost;
142 LOG(info) << "Port: " << fPort;
143 }
144 else {
145 isGoodInputCombi = false;
146 }
147
148 if (!isGoodInputCombi) {
149 throw InitTaskError("Wrong combination of inputs. Either file or wildcard file + directory "
150 "or host + port are allowed combination.");
151 }
152
153
154 LOG(info) << "MaxTimeslices: " << fMaxTimeslices;
155
156 // Get the information about created channels from the device
157 // Check if the defined channels from the topology (by name)
158 // are in the list of channels which are possible/allowed
159 // for the device
160 // The idea is to check at initialization if the devices are
161 // properly connected. For the time being this is done with a
162 // naming convention. This does not prevent someone from sending other
163 // data on this channel.
164 int noChannel = fChannels.size();
165 LOG(info) << "Number of defined output channels: " << noChannel;
166 for (auto const& entry : fChannels) {
167 LOG(info) << "Channel name: " << entry.first;
168 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
169 }
170
171 for (auto const& value : fComponentsToSend) {
172 LOG(info) << "Value : " << value;
173 if (value > 1) {
174 throw InitTaskError("Sending same data to more than one output channel "
175 "not implemented yet.");
176 }
177 }
178
179 if (0 == fFileName.size() && 0 != fHost.size()) {
180 std::string connector = "tcp://" + fHost + ":" + std::to_string(fPort);
181 LOG(info) << "Open TSPublisher at " << connector;
182 fSource = new fles::TimesliceSubscriber(connector, 1);
183 if (!fSource) { throw InitTaskError("Could not connect to publisher."); }
184 }
185 else {
186 if (false == OpenNextFile()) {
187 throw InitTaskError("Could not open the first input file in the list, Doing nothing!");
188 }
189 }
190
191 fTime = std::chrono::steady_clock::now();
192}
193catch (InitTaskError& e) {
194 LOG(error) << e.what();
195 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
197}
198
200{
201 // First Close and delete existing source
202 if (nullptr != fSource) delete fSource;
203
204 if (fInputFileList.size() > 0) {
206 fInputFileList.erase(fInputFileList.begin());
207 LOG(info) << "Open the Flib input file " << fFileName;
208 filesys::path pathObj(fFileName);
209 if (!filesys::is_regular_file(pathObj)) {
210 LOG(error) << "Input file " << fFileName << " doesn't exist.";
211 return false;
212 }
213 fSource = new fles::TimesliceInputArchive(fFileName);
214 if (!fSource) {
215 LOG(error) << "Could not open input file.";
216 return false;
217 }
218 }
219 else {
220 LOG(info) << "End of files list reached.";
221 return false;
222 }
223 return true;
224}
225
226bool CbmMQTsaSampler::IsChannelNameAllowed(std::string channelName)
227{
228
229 for (auto const& entry : fAllowedChannels) {
230 std::size_t pos1 = channelName.find(entry);
231 if (pos1 != std::string::npos) {
232 const vector<std::string>::const_iterator pos =
233 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
234 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
235 LOG(info) << "Found " << entry << " in " << channelName;
236 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
237 fComponentsToSend[idx]++;
238 fChannelsToSend[idx].push_back(channelName);
239 return true;
240 }
241 }
242 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
243 LOG(error) << "Stop device.";
244 return false;
245}
246
248{
249
250
251 auto timeslice = fSource->get();
252 if (timeslice) {
254 fTSCounter++;
255 if (fTSCounter % 10000 == 0) LOG(info) << "Analyse Event " << fTSCounter;
256
257
258 const fles::Timeslice& ts = *timeslice;
259 // auto tsIndex = ts.index();
260
261
262 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
263
264
265 CheckTimeslice(ts);
266
267 for (unsigned int nrComp = 0; nrComp < ts.num_components(); ++nrComp) {
268 CreateAndSendComponent(ts, nrComp);
269 }
270 return true;
271 }
272 else {
273 if (false == OpenNextFile()) {
274 CalcRuntime();
275 return false;
276 }
277 else {
278 return true;
279 }
280 }
281 }
282 else {
283 if (false == OpenNextFile()) {
284 CalcRuntime();
285 return false;
286 }
287 else {
288 return true;
289 }
290 }
291}
292
293bool CbmMQTsaSampler::CreateAndSendComponent(const fles::Timeslice& ts, int nrComp)
294{
295
296 // Check if component has to be send. If the corresponding channel
297 // is connected create the new timeslice and send it to the
298 // correct channel
299
300 LOG(info) << "SysID: " << static_cast<int>(ts.descriptor(nrComp, 0).sys_id);
301 const vector<int>::const_iterator pos =
302 std::find(fSysId.begin(), fSysId.end(), static_cast<int>(ts.descriptor(nrComp, 0).sys_id));
303 if (pos != fSysId.end()) {
304 const vector<std::string>::size_type idx = pos - fSysId.begin();
305 if (fComponentsToSend[idx] > 0) {
306 LOG(info) << "Create timeslice component for link " << nrComp;
307
308 fles::StorableTimeslice component {static_cast<uint32_t>(ts.num_core_microslices()), ts.index()};
309 component.append_component(ts.num_microslices(0));
310
311 for (size_t m = 0; m < ts.num_microslices(nrComp); ++m) {
312 component.append_microslice(0, m, ts.descriptor(nrComp, m), ts.content(nrComp, m));
313 }
314 if (!SendData(component, idx)) return false;
315 return true;
316 }
317 }
318 return true;
319}
320
321bool CbmMQTsaSampler::SendData(const fles::StorableTimeslice& component, int idx)
322{
323 // serialize the timeslice and create the message
324 std::stringstream oss;
325 boost::archive::binary_oarchive oa(oss);
326 oa << component;
327 std::string* strMsg = new std::string(oss.str());
328
329 FairMQMessagePtr msg(NewMessage(
330 const_cast<char*>(strMsg->c_str()), // data
331 strMsg->length(), // size
332 [](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
333 strMsg)); // object that manages the data
334
335 // TODO: Implement sending same data to more than one channel
336 // Need to create new message (copy message??)
337 if (fComponentsToSend[idx] > 1) { LOG(info) << "Need to copy FairMessage"; }
338
339 // in case of error or transfer interruption,
340 // return false to go to IDLE state
341 // successfull transfer will return number of bytes
342 // transfered (can be 0 if sending an empty message).
343
344 LOG(info) << "Send data to channel " << fChannelsToSend[idx][0];
345 if (Send(msg, fChannelsToSend[idx][0]) < 0) {
346 LOG(error) << "Problem sending data";
347 return false;
348 }
349
351 LOG(info) << "Send message " << fMessageCounter << " with a size of " << msg->GetSize();
352
353 return true;
354}
355
356
358
360{
361 std::chrono::duration<double> run_time = std::chrono::steady_clock::now() - fTime;
362
363 LOG(info) << "Runtime: " << run_time.count();
364 LOG(info) << "No more input data";
365}
366
367
368void CbmMQTsaSampler::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
369{
370 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
371 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
372 LOG(info) << "Equipement ID: " << mdsc.eq_id;
373 LOG(info) << "Flags: " << mdsc.flags;
374 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
375 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
376 LOG(info) << "Microslice Idx: " << mdsc.idx;
377 LOG(info) << "Checksum: " << mdsc.crc;
378 LOG(info) << "Size: " << mdsc.size;
379 LOG(info) << "Offset: " << mdsc.offset;
380}
381
382bool CbmMQTsaSampler::CheckTimeslice(const fles::Timeslice& ts)
383{
384 if (0 == ts.num_components()) {
385 LOG(error) << "No Component in TS " << ts.index();
386 return 1;
387 }
388 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
389
390 for (size_t c = 0; c < ts.num_components(); ++c) {
391 LOG(info) << "Found " << ts.num_microslices(c) << " microslices in component " << c;
392 LOG(info) << "Component " << c << " has a size of " << ts.size_component(c) << " bytes";
393 LOG(info) << "Component " << c << " has the system id 0x" << std::hex
394 << static_cast<int>(ts.descriptor(c, 0).sys_id) << std::dec;
395 LOG(info) << "Component " << c << " has the system id 0x" << static_cast<int>(ts.descriptor(c, 0).sys_id);
396
397 /*
398 for (size_t m = 0; m < ts.num_microslices(c); ++m) {
399 PrintMicroSliceDescriptor(ts.descriptor(c,m));
400 }
401*/
402 }
403
404 return true;
405}
fscal v[fmask::Size]
Definition KfSimdPseudo.h:4
uint64_t fMaxTimeslices
bool CreateAndSendComponent(const fles::Timeslice &, int)
bool SendData(const fles::StorableTimeslice &component)
fles::TimesliceSource * fSource
std::chrono::steady_clock::time_point fTime
uint64_t fMessageCounter
virtual void InitTask()
std::vector< std::vector< std::string > > fChannelsToSend
bool CheckTimeslice(const fles::Timeslice &ts)
std::string fHost
std::vector< std::string > fAllowedChannels
std::string fFileName
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
std::string fDirName
std::vector< std::string > fInputFileList
List of input files.
bool IsChannelNameAllowed(std::string)
std::vector< int > fComponentsToSend
virtual bool ConditionalRun()
std::vector< int > fSysId
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.