12#include "BoostSerializer.h"
13#include "FairMQProgOptions.h"
14#include "FairRootFileSink.h"
15#include "FairRootManager.h"
16#include "FairRunOnline.h"
17#include "RootSerializer.h"
20#include <boost/archive/binary_iarchive.hpp>
21#include <boost/serialization/utility.hpp>
29using std::istringstream;
35 using std::runtime_error::runtime_error;
62 LOG(info) <<
"Init options for CbmDevEventSink";
63 string outputFileName = fConfig->GetValue<std::string>(
"OutFileName");
64 string channelNameDataInput = fConfig->GetValue<std::string>(
"ChannelNameDataInput");
65 string channelNameCommands = fConfig->GetValue<std::string>(
"ChannelNameCommands");
76 if (
"" != outputFileName) {
79 fFairRootMgr->SetSink(
new FairRootFileSink(outputFileName));
89 LOG(info) <<
"Init ROOT Output to " << outputFileName;
92 LOG(error) << e.what();
103 LOG(info) <<
"File closed after " <<
fNumMessages <<
" and saving " <<
fNumTs <<
" TS";
104 LOG(info) <<
"Index of last processed timeslice: " <<
fPrevTsIndex, ChangeState(fair::mq::Transition::Stop);
105 std::this_thread::sleep_for(std::chrono::milliseconds(3000));
106 ChangeState(fair::mq::Transition::End);
117 string msgStrCmd(
static_cast<char*
>(msg->GetData()), msg->GetSize());
118 istringstream issCmd(msgStrCmd);
119 boost::archive::binary_iarchive inputArchiveCmd(issCmd);
120 inputArchiveCmd >> command;
123 size_t charPosDel = command.find(
' ');
124 string type = command.substr(0, charPosDel);
130 if (charPosDel == string::npos) {
131 LOG(error) <<
"HandleCommand: Incomplete EOF command " << command;
135 string rest = command.substr(charPosDel);
136 charPosDel = rest.find(
' ');
137 if (charPosDel == string::npos) {
138 LOG(error) <<
"HandleCommand: Incomplete EOF command " << command;
141 uint64_t lastTsIndex = std::stoul(rest.substr(0, charPosDel));
145 uint64_t numTs = std::stoul(rest.substr(charPosDel));
148 LOG(info) <<
"HandleCommand: Received EOF command with final TS index " << lastTsIndex <<
" and total number of TS "
154 else if (type ==
"STOP") {
155 LOG(info) <<
"HandleCommand: Received STOP command";
161 LOG(warning) <<
"HandleCommand: Unknown command " << type <<
" => will be ignored!";
173 LOG(debug) <<
"Received message number " <<
fNumMessages <<
" with " << parts.Size() <<
" parts"
174 <<
", size0: " << parts.At(0)->GetSize();
178 TObject* tempObjectPointer =
nullptr;
179 RootSerializer().Deserialize(*parts.At(0), tempObjectPointer);
180 if (tempObjectPointer && TString(tempObjectPointer->ClassName()).EqualTo(
"TimesliceMetaData")) {
184 LOG(fatal) <<
"Failed to deserialize the TS metadata";
188 std::string msgStrEvt(
static_cast<char*
>(parts.At(1)->GetData()), (parts.At(1))->GetSize());
189 std::istringstream issEvt(msgStrEvt);
190 boost::archive::binary_iarchive inputArchiveEvt(issEvt);
191 inputArchiveEvt >> (*fEventVec);
virtual ~CbmDevEventSink()
Destructor.
virtual void InitTask()
Initialization.
FairRootManager * fFairRootMgr
FairRootManager used for ROOT file I/O.
FairRunOnline * fFairRun
FairRunOnline to instantiate FairRootManager.
uint64_t fPrevTsIndex
Index of last processed timeslice.
size_t fNumMessages
Number of received data messages.
std::vector< CbmDigiEvent > * fEventVec
Data output: events.
TimesliceMetaData * fTsMetaData
Data output: TS meta data.
bool HandleCommand(FairMQMessagePtr &, int flag)
Action on command messages.
size_t fNumTs
Number of processed timeslices.
void Finish()
Finishing run.
bool fFinishDone
Keep track of whether the Finish method was already called.
bool HandleData(FairMQParts &parts, int flag)
Action on data messages.
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)