CbmRoot
Loading...
Searching...
No Matches
CbmDeviceStsHitProducerIdeal.cxx
Go to the documentation of this file.
1/* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Florian Uhlig [committer] */
4
13
14#include "CbmMQDefs.h"
15#include "CbmStsHit.h"
16#include "CbmStsPoint.h"
17#include "CbmTrdParSetGas.h"
18
19#include "FairMQLogger.h"
20#include "FairMQProgOptions.h" // device->fConfig
21#include "FairParGenericSet.h"
22#include "FairRunAna.h"
23
24#include "TList.h"
25
26#include <boost/archive/binary_iarchive.hpp>
27#include <boost/serialization/vector.hpp>
28
29#include <stdexcept>
30#include <string>
31#include <vector>
/// Exception type thrown when the initialisation of the device task fails.
/// It adds no state of its own and simply inherits all constructors of
/// std::runtime_error, so it is created from a message and queried via what().
struct InitTaskError : public std::runtime_error {
  using std::runtime_error::runtime_error;
};
35
36//using namespace std;
37using std::string;
38
40 : fMaxEvents {0}
41 , fNumMessages {0}
42 , fRunId {"0"}
43 , fvmcworkdir {""}
44 , fTrdGasPar {nullptr}
45{
46}
47
49
50
52try {
53
54 fMaxEvents = fConfig->GetValue<uint64_t>("max-events");
55
56 LOG(info) << "MaxEvents: " << fMaxEvents;
57
58 // Check if the defined channels from the topology (by name)
59 // are in the list of channels which are allowed
60 // Connnect channels which delivers StsPoints with the proper
61 // function handling the data
62 fChan.CheckChannels(this);
65
66 for (auto const& entry : fChannels) {
67 LOG(info) << "Channel name: " << entry.first;
68 if (entry.first.compare("StsPoint")) LOG(info) << "Connect Channel " << entry.first << "with data type StsPoint";
69 OnData(entry.first, &CbmDeviceStsHitProducerIdeal::HandleData);
70 }
71 // Initialize the algorithm and get the proper parameter containers
72 fAlgo->Init();
74}
75catch (InitTaskError& e) {
76 LOG(error) << e.what();
77 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
79}
80
82{
83 Bool_t initOK {kTRUE};
84
85 fRunId = fConfig->GetValue<string>("run-id");
86 fvmcworkdir = fConfig->GetValue<string>("vmcworkdir");
87 fMaxEvents = fConfig->GetValue<uint64_t>("max-events");
88
89 LOG(info) << "Init parameter containers for CbmDeviceStsHitProducerIdeal.";
90
91
92 TList* fParCList = fAlgo->GetParList();
93
94 for (int iparC = 0; iparC < fParCList->GetEntries(); iparC++) {
95 FairParGenericSet* tempObj = (FairParGenericSet*) (fParCList->At(iparC));
96 fParCList->Remove(tempObj);
97 std::string paramName {tempObj->GetName()};
98
99 // NewSimpleMessage create s a copy of the data and takes care of its destruction (after the transfer takes place).
100 // Should only be used for small data because of the cost of an additional copy
101
102 // Her must come the proper Runid
103 std::string message = paramName + ",111";
104 LOG(info) << "Requesting parameter container " << paramName << ", sending message: " << message;
105
106 FairMQMessagePtr req(NewSimpleMessage(message));
107 FairMQMessagePtr rep(NewMessage());
108
109 FairParGenericSet* newObj = nullptr;
110
111 if (Send(req, "parameters") > 0) {
112 if (Receive(rep, "parameters") >= 0) {
113 if (rep->GetSize() != 0) {
114 CbmMQTMessage tmsg(rep->GetData(), rep->GetSize());
115 newObj = static_cast<FairParGenericSet*>(tmsg.ReadObject(tmsg.GetClass()));
116 LOG(info) << "Received unpack parameter from the server:";
117 newObj->print();
118 }
119 else {
120 LOG(error) << "Received empty reply. Parameter not available";
121 } // if (rep->GetSize() != 0)
122 } // if (Receive(rep, "parameters") >= 0)
123 } // if (Send(req, "parameters") > 0)
124 fParCList->AddAt(newObj, iparC);
125 delete tempObj;
126 } // for ( int iparC = 0; iparC < fParCList->GetEntries(); iparC++ )
127
128 // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
129 // Should only be used for small data because of the cost of an additional copy
130
131 initOK = fAlgo->InitContainers();
132
133 return initOK;
134 return true;
135}
136
137
138// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
139bool CbmDeviceStsHitProducerIdeal::HandleData(FairMQMessagePtr& msg, int /*index*/)
140{
141
142 fNumMessages++;
143 LOG(debug) << "Received message number " << fNumMessages << " with size " << msg->GetSize();
144
145
146 // Unpack the message into a vector of CbmStsPoints
147 std::string msgStr(static_cast<char*>(msg->GetData()), msg->GetSize());
148 std::istringstream iss(msgStr);
149 boost::archive::binary_iarchive inputArchive(iss);
150
151 std::vector<CbmStsPoint> points;
152 inputArchive >> points;
153
154 // Pass the vector to the algorithm
155 // Get the vector with the newly created data objects from the algorithm
156 std::vector<CbmStsHit> hits = fAlgo->ProcessInputData(points);
157
158 // Event summary
159 LOG(info) << "Out of " << points.size() << " StsPoints, " << hits.size() << " Hits created.";
160
161
162 if (fNumMessages % 10000 == 0) LOG(info) << "Processed " << fNumMessages << " time slices";
163
164 // Send the data to a consumer
165 SendData();
166
167 return true;
168}
169
170
172
174
TClonesArray * points
Data class for a reconstructed hit in the STS.
static vector< vector< QAHit > > hits
bool HandleData(FairMQMessagePtr &, int)
std::vector< std::vector< std::string > > fChannelsToSend
bool CheckChannels(FairMQDevice *device)
std::vector< int > GetComponentsToSend()
std::vector< std::vector< std::string > > GetChannelsToSend()
virtual std::vector< CbmStsHit > ProcessInputData(const std::vector< CbmStsPoint > &)
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26