CbmRoot
Loading...
Searching...
No Matches
CbmDevUnpack.cxx
Go to the documentation of this file.
1/* Copyright (C) 2022 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau, Dominik Smith [committer] */
4
12#include "CbmDevUnpack.h"
13
14#include "BoostSerializer.h"
15#include "CbmDigiTimeslice.h"
16#include "CbmMQDefs.h"
17#include "FairMQLogger.h"
18#include "FairMQProgOptions.h" // device->fConfig
19#include "FairParGenericSet.h"
20#include "RootSerializer.h"
21#include "StorableTimeslice.hpp"
22#include "TStopwatch.h"
23#include "TimesliceMetaData.h"
24
25#include <boost/archive/binary_iarchive.hpp>
26#include <boost/serialization/utility.hpp>
27
28#include <array>
29#include <iomanip>
30#include <stdexcept>
31#include <string>
32#include <utility>
/** @brief Exception type derived from std::runtime_error.
 **
 ** Adds no state of its own; every constructor of std::runtime_error is inherited, so it is
 ** built from a message and exposes it through what(). It is the type caught by the handler
 ** of the task initialisation in this file.
 **/
class InitTaskError : public std::runtime_error {
public:
  using std::runtime_error::runtime_error;
};
36
37using namespace std;
38using cbm::algo::UnpackMuchElinkPar;
39using cbm::algo::UnpackMuchPar;
40using cbm::algo::UnpackStsElinkPar;
41using cbm::algo::UnpackStsPar;
42
44
46try {
48 LOG(info) << "Init options for CbmDevUnpack.";
49 fsChannelNameDataInput = fConfig->GetValue<std::string>("TsNameIn");
50 fsChannelNameDataOutput = fConfig->GetValue<std::string>("TsNameOut");
51}
52catch (InitTaskError& e) {
53 LOG(error) << e.what();
54 // Wrapper defined in CbmMQDefs.h to support different FairMQ versions
56}
57
59{
60 // --- Common parameters for all components for STS
61 uint32_t numChansPerAsicSts = 128; // R/O channels per ASIC for STS
62 uint32_t numAsicsPerModuleSts = 16; // Number of ASICs per module for STS
63
64 // Create one algorithm per component for STS and configure it with parameters
65 auto equipIdsSts = fStsConfig.GetEquipmentIds();
66 for (auto& equip : equipIdsSts) {
67 std::unique_ptr<UnpackStsPar> par(new UnpackStsPar());
68 par->fNumChansPerAsic = numChansPerAsicSts;
69 par->fNumAsicsPerModule = numAsicsPerModuleSts;
70 const size_t numElinks = fStsConfig.GetNumElinks(equip);
71 for (size_t elink = 0; elink < numElinks; elink++) {
72 UnpackStsElinkPar elinkPar;
73 auto mapEntry = fStsConfig.Map(equip, elink);
74 elinkPar.fAddress = mapEntry.first; // Module address for this elink
75 elinkPar.fAsicNr = mapEntry.second; // ASIC number within module
76 elinkPar.fTimeOffset = 0.;
77 elinkPar.fAdcOffset = 0.;
78 elinkPar.fAdcGain = 0.;
79 // TODO: Add parameters for time and ADC calibration
80 par->fElinkParams.push_back(elinkPar);
81 }
82 fAlgoSts[equip].SetParams(std::move(par));
83 LOG(info) << "--- Configured equipment " << equip << " with " << numElinks << " elinks";
84 } //# equipments
85
86 LOG(info) << "--- Configured " << fAlgoSts.size() << " unpacker algorithms for STS.";
87 LOG(debug) << "Readout map:" << fStsConfig.PrintReadoutMap();
88 LOG(info) << "==================================================";
89 std::cout << std::endl;
90
91 // Create one algorithm per component for MUCH and configure it with parameters
92 auto equipIdsMuch = fMuchConfig.GetEquipmentIds();
93 for (auto& equip : equipIdsMuch) {
94 std::unique_ptr<UnpackMuchPar> par(new UnpackMuchPar());
95 const size_t numElinks = fMuchConfig.GetNumElinks(equip);
96 for (size_t elink = 0; elink < numElinks; elink++) {
97 UnpackMuchElinkPar elinkPar;
98 elinkPar.fAddress = fMuchConfig.Map(equip, elink); // Vector of MUCH addresses for this elink
99 elinkPar.fTimeOffset = 0.;
100 par->fElinkParams.push_back(elinkPar);
101 }
102 fAlgoMuch[equip].SetParams(std::move(par));
103 LOG(info) << "--- Configured equipment " << equip << " with " << numElinks << " elinks";
104 }
105
106 LOG(info) << "--- Configured " << fAlgoMuch.size() << " unpacker algorithms for MUCH.";
107 LOG(info) << "==================================================";
108 std::cout << std::endl;
109
110 return true;
111}
112
113
114// Method called by run loop and requesting new data from the TS source whenever
116{
118 std::string message = "full";
119 LOG(debug) << "Requesting new TS by sending message: full" << message;
120 FairMQMessagePtr req(NewSimpleMessage(message));
121 FairMQMessagePtr rep(NewMessage());
122
123 if (Send(req, fsChannelNameDataInput) <= 0) {
124 LOG(error) << "Failed to send the request! message was " << message;
125 return false;
126 }
127 else if (Receive(rep, fsChannelNameDataInput) < 0) {
128 LOG(error) << "Failed to receive a reply to the request! message was " << message;
129 return false;
130 }
131 else if (rep->GetSize() == 0) {
132 LOG(error) << "Received empty reply. Something went wrong with the timeslice generation! message was " << message;
133 return false;
134 }
135
137 if (0 == fNumMessages) {
138 InitAlgos();
139 }
140
141 fNumMessages++;
142 LOG(debug) << "Received message number " << fNumMessages << " with size " << rep->GetSize();
143
144 if (0 == fNumMessages % 10000) LOG(info) << "Received " << fNumMessages << " messages";
145
146 std::string msgStr(static_cast<char*>(rep->GetData()), rep->GetSize());
147 std::istringstream iss(msgStr);
148 boost::archive::binary_iarchive inputArchive(iss);
149
151 fles::StorableTimeslice ts{0};
152 inputArchive >> ts;
153
155 const size_t NbCoreMsPerTs = ts.num_core_microslices();
156 const size_t NbOverMsPerTs = ts.num_microslices(0) - ts.num_core_microslices();
157 const double MsSizeInNs = (ts.descriptor(0, NbCoreMsPerTs).idx - ts.descriptor(0, 0).idx) / NbCoreMsPerTs;
158 const double TsCoreSizeInNs = MsSizeInNs * (NbCoreMsPerTs);
159 const double TsOverSizeInNs = MsSizeInNs * (NbOverMsPerTs);
160 const double TsFullSizeInNs = TsCoreSizeInNs + TsOverSizeInNs;
161 const TimesliceMetaData TsMetaData(ts.start_time(), TsCoreSizeInNs, TsOverSizeInNs, ts.index());
162
163 if (0 == fNumTs) {
164 LOG(info) << "Timeslice parameters: each TS has " << NbCoreMsPerTs << " Core MS and " << NbOverMsPerTs
165 << " Overlap MS, for a MS duration of " << MsSizeInNs << " ns, a core duration of " << TsCoreSizeInNs
166 << " ns and a full duration of " << TsFullSizeInNs << " ns";
167 }
168
170 CbmDigiTimeslice digiTs = DoUnpack(ts);
171
172 LOG(debug) << "Unpack: Sending TS index " << ts.index();
174 if (!SendData(digiTs, TsMetaData)) return false;
175 LOG(debug) << "Unpack: Sent TS index " << ts.index();
176
177 return true;
178}
179
180bool CbmDevUnpack::SendData(const CbmDigiTimeslice& timeslice, const TimesliceMetaData& TsMetaData)
181{
182 FairMQParts parts;
183
185 std::stringstream ossTS;
186 boost::archive::binary_oarchive oaTS(ossTS);
187 oaTS << timeslice;
188
189 std::string* strMsgTS = new std::string(ossTS.str());
190
191 parts.AddPart(NewMessage(
192 const_cast<char*>(strMsgTS->c_str()), // data
193 strMsgTS->length(), // size
194 [](void*, void* object) { delete static_cast<std::string*>(object); },
195 strMsgTS)); // object that manages the data
196
200 FairMQMessagePtr messTsMeta(NewMessage());
201 RootSerializer().Serialize(*messTsMeta, &TsMetaData);
202 parts.AddPart(std::move(messTsMeta));
203
204 if (Send(parts, fsChannelNameDataOutput) < 0) {
205 LOG(error) << "Problem sending data to " << fsChannelNameDataOutput;
206 return false;
207 }
208
209 return true;
210}
211
212CbmDigiTimeslice CbmDevUnpack::DoUnpack(const fles::Timeslice& timeslice)
213{
214 fNumTs++;
215
216 // Output digi timeslice
217 CbmDigiTimeslice digiTs;
218
219 // --- Timeslice properties
220 const uint64_t tsIndex = timeslice.index();
221 const uint64_t tsTime = timeslice.start_time();
222 const uint64_t numComp = timeslice.num_components();
223
224 // --- Timer
225 TStopwatch timer;
226 timer.Start();
227
228 // --- Counters
229 size_t numMs = 0;
230 size_t numBytes = 0;
231 size_t numDigis = 0;
232 uint64_t numCompUsed = 0;
233
234 // --- Component loop
235 for (uint64_t comp = 0; comp < numComp; comp++) {
236
237 // --- Component log
238 size_t numBytesInComp = 0;
239 size_t numDigisInComp = 0;
240 uint64_t numMsInComp = 0;
241
242 TStopwatch compTimer;
243 compTimer.Start();
244
245 auto systemId = static_cast<fles::Subsystem>(timeslice.descriptor(comp, 0).sys_id);
246
247 if (systemId == fles::Subsystem::STS) {
248 const uint16_t equipmentId = timeslice.descriptor(comp, 0).eq_id;
249 const auto algoIt = fAlgoSts.find(equipmentId);
250 assert(algoIt != fAlgoSts.end());
251
252 // The current algorithm works for the STS data format version 0x20 used in 2021.
253 // Other versions are not yet supported.
254 // In the future, different data formats will be supported by instantiating different
255 // algorithms depending on the version.
256 assert(timeslice.descriptor(comp, 0).sys_ver == 0x20);
257
258 // --- Microslice loop
259 numMsInComp = timeslice.num_microslices(comp);
260 for (uint64_t mslice = 0; mslice < numMsInComp; mslice++) {
261 const auto msDescriptor = timeslice.descriptor(comp, mslice);
262 const auto msContent = timeslice.content(comp, mslice);
263 numBytesInComp += msDescriptor.size;
264 auto result = (algoIt->second)(msContent, msDescriptor, tsTime);
265 LOG(debug1) << "CbmDevUnpack::DoUnpack(): Component " << comp << ", microslice " << mslice << ", digis "
266 << result.first.size() << ", errors " << result.second.fNumNonHitOrTsbMessage << " | "
267 << result.second.fNumErrElinkOutOfRange << " | " << result.second.fNumErrInvalidFirstMessage
268 << " | " << result.second.fNumErrInvalidMsSize << " | " << result.second.fNumErrTimestampOverflow
269 << " | ";
270 const auto it = digiTs.fData.fSts.fDigis.end();
271 digiTs.fData.fSts.fDigis.insert(it, result.first.begin(), result.first.end());
272 numDigisInComp += result.first.size();
273 }
274 numCompUsed++;
275 } // system STS
276
277 if (systemId == fles::Subsystem::MUCH) {
278 const uint16_t equipmentId = timeslice.descriptor(comp, 0).eq_id;
279 const auto algoIt = fAlgoMuch.find(equipmentId);
280 assert(algoIt != fAlgoMuch.end());
281
282 // The current algorithm works for the MUCH data format version 0x20 used in 2021.
283 // Other versions are not yet supported.
284 // In the future, different data formats will be supported by instantiating different
285 // algorithms depending on the version.
286 assert(timeslice.descriptor(comp, 0).sys_ver == 0x20);
287
288 // --- Microslice loop
289 numMsInComp = timeslice.num_microslices(comp);
290 for (uint64_t mslice = 0; mslice < numMsInComp; mslice++) {
291 const auto msDescriptor = timeslice.descriptor(comp, mslice);
292 const auto msContent = timeslice.content(comp, mslice);
293 numBytesInComp += msDescriptor.size;
294 auto result = (algoIt->second)(msContent, msDescriptor, tsTime);
295 LOG(debug1) << "CbmDevUnpack::DoUnpack(): Component " << comp << ", microslice " << mslice << ", digis "
296 << result.first.size() << ", errors " << result.second.fNumNonHitOrTsbMessage << " | "
297 << result.second.fNumErrElinkOutOfRange << " | " << result.second.fNumErrInvalidFirstMessage
298 << " | " << result.second.fNumErrInvalidMsSize << " | " << result.second.fNumErrTimestampOverflow
299 << " | ";
300 const auto it = digiTs.fData.fMuch.fDigis.end();
301 digiTs.fData.fMuch.fDigis.insert(it, result.first.begin(), result.first.end());
302 numDigisInComp += result.first.size();
303 }
304 numCompUsed++;
305 } // system MUCH
306
307
308 compTimer.Stop();
309 LOG(debug) << "CbmDevUnpack::DoUnpack(): Component " << comp << ", microslices " << numMsInComp << " input size "
310 << numBytesInComp << " bytes, "
311 << ", digis " << numDigisInComp << ", CPU time " << compTimer.CpuTime() * 1000. << " ms";
312 numBytes += numBytesInComp;
313 numDigis += numDigisInComp;
314 numMs += numMsInComp;
315 } //# component
316
317 // --- Sorting of output digis. Is required by both digi trigger and event builder.
318 std::sort(digiTs.fData.fSts.fDigis.begin(), digiTs.fData.fSts.fDigis.end(),
319 [](CbmStsDigi digi1, CbmStsDigi digi2) { return digi1.GetTime() < digi2.GetTime(); });
320 std::sort(digiTs.fData.fMuch.fDigis.begin(), digiTs.fData.fMuch.fDigis.end(),
321 [](CbmMuchDigi digi1, CbmMuchDigi digi2) { return digi1.GetTime() < digi2.GetTime(); });
322
323 // --- Timeslice log
324 timer.Stop();
325 stringstream logOut;
326 logOut << setw(15) << left << "CbmDevUnpack::DoUnpackGetName(): [";
327 logOut << fixed << setw(8) << setprecision(1) << right << timer.RealTime() * 1000. << " ms] ";
328 logOut << "TS " << fNumTs << " (index " << tsIndex << ")";
329 logOut << ", components " << numCompUsed << " / " << numComp << ", microslices " << numMs;
330 logOut << ", input rate " << double(numBytes) / timer.RealTime() / 1.e6 << " MB/s";
331 logOut << ", digis " << numDigis;
332 LOG(debug) << logOut.str();
333
334 if (0 == fNumTs % 10000) LOG(info) << "Processed " << fNumTs << " time slices";
335
336 return digiTs;
337}
virtual void InitTask()
Read command line parameters for MQ device.
CbmDigiTimeslice DoUnpack(const fles::Timeslice &ts)
Unpack a single timeslice.
size_t fNumMessages
Statistics & first TS rejection.
cbm::algo::StsReadoutConfigLegacy fStsConfig
std::map< uint16_t, cbm::algo::UnpackMuch > fAlgoMuch
std::string fsChannelNameDataInput
message queues
cbm::algo::MuchReadoutConfig fMuchConfig
std::map< uint16_t, cbm::algo::UnpackSts > fAlgoSts
bool ConditionalRun()
Called by run loop, does init steps on first TS.
std::string fsChannelNameDataOutput
bool SendData(const CbmDigiTimeslice &timeslice, const TimesliceMetaData &TsMetaData)
Serialize unpacked digi timeslice and send to output channel.
bool InitAlgos()
Initialize runtime parameters for UnpackSts algos.
CbmStsDigiData fSts
STS data.
Definition CbmDigiData.h:36
CbmMuchDigiData fMuch
MUCH data.
Definition CbmDigiData.h:37
Collection of digis from all detector systems within one timeslice.
CbmDigiData fData
Timeslice data.
std::vector< CbmMuchDigi > fDigis
Data vector.
std::vector< CbmStsDigi > fDigis
Data vector.
Data class for a single-channel message in the STS.
Definition CbmStsDigi.h:40
void ChangeState(FairMQDevice *device, cbm::mq::Transition transition)
Definition CbmMQDefs.h:26
Hash for CbmL1LinkKey.