CbmRoot
Loading...
Searching...
No Matches
CbmDeviceTriggerHandlerEtof.cxx
Go to the documentation of this file.
1/* Copyright (C) 2019 PI-UHd, GSI
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Norbert Herrmann [committer] */
4
13
14#include "CbmMQDefs.h"
15
16#include "FairEventHeader.h"
17#include "FairFileHeader.h"
18#include "FairGeoParSet.h"
19#include "FairMQLogger.h"
20#include "FairMQProgOptions.h" // device->fConfig
21#include "FairRootFileSink.h"
22#include "FairRootManager.h"
23#include "FairRunOnline.h"
24#include "FairRuntimeDb.h"
25
26#include <thread> // this_thread::sleep_for
27
28#include <boost/archive/binary_iarchive.hpp>
29#include <boost/archive/binary_oarchive.hpp>
30#include <boost/serialization/vector.hpp>
31
32#include <chrono>
33#include <iomanip>
34#include <stdexcept>
35#include <string>
/// Exception type thrown while setting up the device channels during task
/// initialisation; carries a plain error message like std::runtime_error.
struct InitTaskError : std::runtime_error {
  /// Build the error from a std::string message.
  explicit InitTaskError(const std::string& message) : std::runtime_error(message) {}

  /// Build the error from a C-string message.
  explicit InitTaskError(const char* message) : std::runtime_error(message) {}
};
39
40static std::chrono::steady_clock::time_point dctime = std::chrono::steady_clock::now();
41static double dSize = 0.;
42
43using namespace std;
44
46 : fNumMessages(0)
47 , fiMsgCnt(0)
48 , fbMonitorMode(kFALSE)
49 , fbDebugMonitorMode(kFALSE)
50 , fbSandboxMode(kFALSE)
51 , fbEventDumpEna(kFALSE)
52 , fdEvent(0.)
53{
54}
55
57
59try {
60 // Get the information about created channels from the device
61 // Check if the defined channels from the topology (by name)
62 // are in the list of channels which are possible/allowed
63 // for the device
64 // The idea is to check at initialization if the devices are
65 // properly connected. For the time being this is done with a
66 // naming convention. This does not prevent someone from sending other
67 // data on this channel.
68 int noChannel = fChannels.size();
69 LOG(info) << "Number of defined input channels: " << noChannel;
70 for (auto const& entry : fChannels) {
71 LOG(info) << "Channel name: " << entry.first;
72 if (!IsChannelNameAllowed(entry.first)) throw InitTaskError("Channel name does not match.");
73 if (entry.first != "syscmd") OnData(entry.first, &CbmDeviceTriggerHandlerEtof::HandleData);
74 else
76 }
78}
79catch (InitTaskError& e) {
80 LOG(error) << e.what();
82}
83
85{
86 for (auto const& entry : fAllowedChannels) {
87 std::size_t pos1 = channelName.find(entry);
88 if (pos1 != std::string::npos) {
89 const vector<std::string>::const_iterator pos =
90 std::find(fAllowedChannels.begin(), fAllowedChannels.end(), entry);
91 const vector<std::string>::size_type idx = pos - fAllowedChannels.begin();
92 LOG(info) << "Found " << entry << " in " << channelName;
93 LOG(info) << "Channel name " << channelName << " found in list of allowed channel names at position " << idx;
94 return true;
95 }
96 }
97 LOG(info) << "Channel name " << channelName << " not found in list of allowed channel names.";
98 LOG(error) << "Stop device.";
99 return false;
100}
101
103{
104 LOG(info) << "Init work space for CbmDeviceTriggerHandlerEtof.";
105
106 // steering variables
107 fbSandboxMode = fConfig->GetValue<bool>("SandboxMode");
108
109 return kTRUE;
110}
111
112// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
113//bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQMessagePtr& msg, int /*index*/)
114bool CbmDeviceTriggerHandlerEtof::HandleData(FairMQParts& parts, int /*index*/)
115{
116 // Don't do anything with the data
117 // Maybe add an message counter which counts the incomming messages and add
118 // an output
119 fNumMessages++;
120 LOG(debug) << "Received message " << fNumMessages << " with " << parts.Size() << " parts"
121 << ", size0: " << parts.At(0)->GetSize();
122
123 uint TrigWord {0};
124 std::string msgStrE(static_cast<char*>(parts.At(0)->GetData()), (parts.At(0))->GetSize());
125 std::istringstream issE(msgStrE);
126 boost::archive::binary_iarchive inputArchiveE(issE);
127 inputArchiveE >> TrigWord;
128
129 char* pDataBuff = static_cast<char*>(parts.At(1)->GetData());
130 int iBuffSzByte = parts.At(1)->GetSize();
131
132 // Send Subevent to STAR
133 LOG(debug) << "Send Data for event " << fdEvent << ", TrigWord " << TrigWord << " with size " << iBuffSzByte
134 << Form(" at %p ", pDataBuff);
135 if (kFALSE == fbSandboxMode) { star_rhicf_write(TrigWord, pDataBuff, iBuffSzByte); }
136 dSize += iBuffSzByte;
137 if (0 == (int) fdEvent % 10000) {
138 std::chrono::duration<double> deltatime = std::chrono::steady_clock::now() - dctime;
139 LOG(info) << "Processed " << fdEvent << " events, delta-time: " << deltatime.count()
140 << ", rate: " << dSize * 1.E-6 / deltatime.count() << "MB/s";
141 dctime = std::chrono::steady_clock::now();
142 dSize = 0.;
143 }
144 fdEvent++;
145
146 return kTRUE;
147}
148
149/************************************************************************************/
150
151bool CbmDeviceTriggerHandlerEtof::HandleMessage(FairMQMessagePtr& msg, int /*index*/)
152{
153 const char* cmd = (char*) (msg->GetData());
154 const char cmda[4] = {*cmd};
155 LOG(info) << "Handle message " << cmd << ", " << cmd[0];
156
157 // only one implemented so far "Stop"
158 if (strcmp(cmda, "STOP")) {
160 cbm::mq::LogState(this);
162 cbm::mq::LogState(this);
164 cbm::mq::LogState(this);
166 cbm::mq::LogState(this);
167 // ChangeState(fair::mq::Transition(STOP));
168 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
169 }
170
171 return true;
172}
static Double_t dctime
static double dSize
static std::chrono::steady_clock::time_point dctime
int star_rhicf_write(unsigned int trg_word, void *dta, int bytes)
bool HandleMessage(FairMQMessagePtr &, int)
std::vector< std::string > fAllowedChannels
Bool_t fbSandboxMode
Switch ON the filling of an additional set of histograms.
Double_t fdEvent
Switch ON the dumping of the events to a binary file.
Bool_t IsChannelNameAllowed(std::string channelName)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
void LogState(FairMQDevice *device)
Definition CbmMQDefs.h:47
Hash for CbmL1LinkKey.