CbmRoot
Loading...
Searching...
No Matches
CbmDevEventSink.cxx
Go to the documentation of this file.
1/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Dominik Smith [committer], Pierre-Alain Loizeau, Volker Friese */
4
5#include "CbmDevEventSink.h"
6
7// CBM headers
8#include "CbmMQDefs.h"
9#include "TimesliceMetaData.h"
10
11// FAIRROOT headers
12#include "BoostSerializer.h"
13#include "FairMQProgOptions.h" // device->fConfig
14#include "FairRootFileSink.h"
15#include "FairRootManager.h"
16#include "FairRunOnline.h"
17#include "RootSerializer.h"
18
19// External packages
20#include <boost/archive/binary_iarchive.hpp>
21#include <boost/serialization/utility.hpp>
22
24#include <stdexcept>
25#include <string>
26#include <thread> // this_thread::sleep_for
27
28
29using std::istringstream;
30using std::string;
31using std::vector;
32
33
/** @brief Exception type signalling a failure during device initialisation.
 **
 ** Behaves exactly like std::runtime_error; all of its constructors are inherited.
 **/
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
37
38
39// ----- Destructor -------------------------------------------------------
41{
42
43 // Close things properly if not already done
44 if (!fFinishDone) Finish();
45
46 // Clear and delete members
47 if (fTsMetaData) delete fTsMetaData;
48 if (fEventVec != nullptr) {
49 fEventVec->clear();
50 delete fEventVec;
51 }
52 if (fFairRun) delete fFairRun;
53}
54// ----------------------------------------------------------------------------
55
56
57// ----- Initialize -------------------------------------------------------
59try {
60
61 // Read options from executable
62 LOG(info) << "Init options for CbmDevEventSink";
63 string outputFileName = fConfig->GetValue<std::string>("OutFileName");
64 string channelNameDataInput = fConfig->GetValue<std::string>("ChannelNameDataInput");
65 string channelNameCommands = fConfig->GetValue<std::string>("ChannelNameCommands");
66
67 // --- Hook action on input channels
68 OnData(channelNameDataInput, &CbmDevEventSink::HandleData);
69 OnData(channelNameCommands, &CbmDevEventSink::HandleCommand);
70
71 // --- Prepare ROOT output
72 // TODO: WE use FairRunOnline and FairRootManager to manage the output. There might be a more
73 // elegant way.
75 fEventVec = new vector<CbmDigiEvent>();
76 if ("" != outputFileName) {
77 fFairRun = new FairRunOnline();
78 fFairRootMgr = FairRootManager::Instance();
79 fFairRootMgr->SetSink(new FairRootFileSink(outputFileName));
80 if (nullptr == fFairRootMgr->GetOutFile()) throw InitTaskError("Could not open ROOT file");
81 }
82 else {
83 throw InitTaskError("Empty output filename!");
84 }
85 fFairRootMgr->InitSink();
86 fFairRootMgr->RegisterAny("TimesliceMetaData.", fTsMetaData, kTRUE);
87 fFairRootMgr->RegisterAny("DigiEvent", fEventVec, kTRUE);
88 fFairRootMgr->WriteFolder();
89 LOG(info) << "Init ROOT Output to " << outputFileName;
90}
91catch (InitTaskError& e) {
92 LOG(error) << e.what();
94}
95// ----------------------------------------------------------------------------
96
97
98// ----- Finish execution -------------------------------------------------
100{
101 fFairRootMgr->Write();
102 fFairRootMgr->CloseSink();
103 LOG(info) << "File closed after " << fNumMessages << " and saving " << fNumTs << " TS";
104 LOG(info) << "Index of last processed timeslice: " << fPrevTsIndex, ChangeState(fair::mq::Transition::Stop);
105 std::this_thread::sleep_for(std::chrono::milliseconds(3000));
106 ChangeState(fair::mq::Transition::End);
107 fFinishDone = true;
108}
109// ----------------------------------------------------------------------------
110
111
112// ----- Handle command message -------------------------------------------
113bool CbmDevEventSink::HandleCommand(FairMQMessagePtr& msg, int)
114{
115 // Deserialize command string
116 string command;
117 string msgStrCmd(static_cast<char*>(msg->GetData()), msg->GetSize());
118 istringstream issCmd(msgStrCmd);
119 boost::archive::binary_iarchive inputArchiveCmd(issCmd);
120 inputArchiveCmd >> command;
121
122 // Command tag is up to the first blank
123 size_t charPosDel = command.find(' ');
124 string type = command.substr(0, charPosDel);
125
126 // EOF command
127 if (type == "EOF") {
128
129 // The second substring should be the last timeslice index
130 if (charPosDel == string::npos) {
131 LOG(error) << "HandleCommand: Incomplete EOF command " << command;
132 return false;
133 }
134 charPosDel++;
135 string rest = command.substr(charPosDel);
136 charPosDel = rest.find(' ');
137 if (charPosDel == string::npos) {
138 LOG(error) << "HandleCommand: Incomplete EOF command " << command;
139 return false;
140 }
141 uint64_t lastTsIndex = std::stoul(rest.substr(0, charPosDel));
142
143 // The third substring should be the timeslice count
144 charPosDel++;
145 uint64_t numTs = std::stoul(rest.substr(charPosDel));
146
147 // Log
148 LOG(info) << "HandleCommand: Received EOF command with final TS index " << lastTsIndex << " and total number of TS "
149 << numTs;
150 Finish();
151 } //? EOF
152
153 // STOP command
154 else if (type == "STOP") {
155 LOG(info) << "HandleCommand: Received STOP command";
156 Finish();
157 }
158
159 // Unknown command
160 else {
161 LOG(warning) << "HandleCommand: Unknown command " << type << " => will be ignored!";
162 }
163
164 return true;
165}
166// ----------------------------------------------------------------------------
167
168
169// ----- Handle data in input channel -------------------------------------
170bool CbmDevEventSink::HandleData(FairMQParts& parts, int)
171{
172 fNumMessages++;
173 LOG(debug) << "Received message number " << fNumMessages << " with " << parts.Size() << " parts"
174 << ", size0: " << parts.At(0)->GetSize();
175 if (0 == fNumMessages % 10000) LOG(info) << "Received " << fNumMessages << " messages";
176
177 // --- Extract TimesliceMetaData (part 0) TObject* tempObjectPointer = nullptr;
178 TObject* tempObjectPointer = nullptr;
179 RootSerializer().Deserialize(*parts.At(0), tempObjectPointer);
180 if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo("TimesliceMetaData")) {
181 (*fTsMetaData) = *(static_cast<TimesliceMetaData*>(tempObjectPointer));
182 }
183 else {
184 LOG(fatal) << "Failed to deserialize the TS metadata";
185 }
186
187 // --- Extract event vector (part 1)
188 std::string msgStrEvt(static_cast<char*>(parts.At(1)->GetData()), (parts.At(1))->GetSize());
189 std::istringstream issEvt(msgStrEvt);
190 boost::archive::binary_iarchive inputArchiveEvt(issEvt);
191 inputArchiveEvt >> (*fEventVec);
192
193 // --- Dump tree entry for this timeslice
194 fFairRootMgr->StoreWriteoutBufferData(fFairRootMgr->GetEventTime());
195 fFairRootMgr->Fill();
196 fFairRootMgr->DeleteOldWriteoutBufferData();
197 fEventVec->clear();
198
199 // --- Timeslice log
200 LOG(info) << "Processed TS " << fTsMetaData->GetIndex() << " with " << fEventVec->size() << " events";
201
202 return true;
203}
204// ----------------------------------------------------------------------------
virtual ~CbmDevEventSink()
Destructor.
virtual void InitTask()
Initialization.
FairRootManager * fFairRootMgr
FairRootManager used for ROOT file I/O.
FairRunOnline * fFairRun
FairRunOnline to instantiate FairRootManager.
uint64_t fPrevTsIndex
Index of last processed timeslice.
size_t fNumMessages
Number of received data messages.
std::vector< CbmDigiEvent > * fEventVec
Data output: events.
TimesliceMetaData * fTsMetaData
Data output: TS meta data.
bool HandleCommand(FairMQMessagePtr &, int flag)
Action on command messages.
size_t fNumTs
Number of processed timeslices.
void Finish()
Finishing run.
bool fFinishDone
Keep track of whether the Finish method was already called.
bool HandleData(FairMQParts &parts, int flag)
Action on data messages.
uint64_t GetIndex() const
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26