HitfinderChain.cxx
/* Copyright (C) 2022 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
   SPDX-License-Identifier: GPL-3.0-only
   Authors: Felix Weiglhofer [committer], Kilian Hunold */

#include "HitfinderChain.h"

#include "Exceptions.h"
#include "PODVector.h"
#include "compat/OpenMP.h"

#include <numeric>

using namespace cbm::algo;

void sts::HitfinderChain::SetParameters(const sts::HitfinderChainPars& parameters)
{
  fPars.emplace(parameters);
  AllocateStatic();
  auto& memoryPars = fPars->memory;

  if (memoryPars.IsAuto()) {
    if (xpu::device::active().backend() == xpu::cpu) {
      memoryPars.allocationMode = RecoParams::AllocationMode::Dynamic;
    }
    else {
      memoryPars.allocationMode = RecoParams::AllocationMode::Static;
    }
  }

  if (!memoryPars.IsDynamic() && !memoryPars.IsStatic()) {
    throw std::runtime_error(
      fmt::format("STS Hitfinder Chain: Unknown allocation mode: {}", ToString(memoryPars.allocationMode)));
  }

  if (memoryPars.IsStatic()) {
    AllocateDynamic(memoryPars.maxNDigisPerModule, memoryPars.maxNDigisPerTS);
  }
}

void sts::HitfinderChain::Finalize()
{
  // Explicitly free buffers in constant memory.
  // This avoids an issue in xpu with the teardown order of static variables when using CPU.
  fHitfinder = {};
  xpu::set<TheHitfinder>(fHitfinder);
}

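// Typical call sequence (a sketch; the actual driver code lives elsewhere in
// the reconstruction chain):
//   sts::HitfinderChain chain;
//   chain.SetParameters(pars);   // once; also sets up static buffers
//   auto result = chain(digis);  // per timeslice
//   chain.Finalize();            // at shutdown; frees constant-memory buffers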
sts::HitfinderChain::Result sts::HitfinderChain::operator()(gsl::span<const CbmStsDigi> digis, bool storeClusters)
{
  EnsureParameters();

  Result result;

  size_t nModules = fPars->setup.modules.size();
  size_t nModuleSides = nModules * 2;
  size_t nDigisTotal = digis.size();

  // Getting the digis onto the GPU requires 3 steps:
  // 1. Sort digis into buckets by module
  DigiMap digiMap = CountDigisPerModules(digis);
  // 2. Once the number of digis per module is known, allocate the
  //    dynamic buffers on the GPU, as the buffer sizes depend on that value
  if (fPars->memory.IsDynamic())
    AllocateDynamic(digiMap.maxNDigisPerModule, nDigisTotal);
  else {
    if (digiMap.maxNDigisPerModule > fPars->memory.maxNDigisPerModule) {
      throw ProcessingError("STS Hitfinder Chain: Too many digis per module for static allocation: {} > {}",
                            digiMap.maxNDigisPerModule, fPars->memory.maxNDigisPerModule);
    }
    if (nDigisTotal > fPars->memory.maxNDigisPerTS) {
      throw ProcessingError("STS Hitfinder Chain: Too many digis per timeslice for static allocation: {} > {}",
                            nDigisTotal, fPars->memory.maxNDigisPerTS);
    }
  }
  // 3. Copy digis into a flat array with offsets per module
  FlattenDigis(digis, digiMap);
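  // After flattening, digisPerModule holds all digis grouped by module side:
  // front sides at indices [0, nModules), back sides at [nModules, 2 * nModules),
  // with digiOffsetPerModule[i] marking where side i starts (see FlattenDigis below).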

#ifdef CBM_ONLINE_USE_FAIRLOGGER
  if (fair::Logger::GetConsoleSeverity() == fair::Severity::trace) {
#else
  if (Opts().LogLevel() == trace) {
#endif // CBM_ONLINE_USE_FAIRLOGGER
    EnsureDigiOffsets(digiMap);
  }

  xpu::queue queue;

  // Set constants
  auto& hfc = fHitfinder;
  hfc.maxHitsPerModule = fPars->memory.NHitsUpperBound(digiMap.maxNDigisPerModule);
  if (hfc.maxHitsPerModule > hfc.hitsAllocatedPerModule) {
    L_(error) << fmt::format("STS Hitfinder Chain: Too many hits per module for static allocation: {} > {}",
                             hfc.maxHitsPerModule, hfc.hitsAllocatedPerModule);
    hfc.maxHitsPerModule = hfc.hitsAllocatedPerModule;
  }

  // Clear buffers
  // Not all buffers have to be initialized, but doing so is useful for debugging
  L_(debug) << "STS Hitfinder Chain: Clearing buffers...";
  // xpu::memset(hfc.digisPerModule, 0);
  // xpu::memset(hfc.digisPerModuleTmp, 0);
  // xpu::memset(hfc.digisSortedPerModule, 0);
  // xpu::memset(hfc.digiOffsetPerModule, 0);
  queue.memset(hfc.monitor, 0);
  queue.memset(hfc.digiConnectorsPerModule, 0);
  queue.memset(hfc.channelOffsetPerModule, 0);
  // queue.memset(hfc.clusterIdxPerModule, 0);
  // xpu::memset(hfc.clusterIdxPerModuleTmp, 0);
  // xpu::memset(hfc.clusterIdxSortedPerModule, 0);
  // xpu::memset(hfc.clusterDataPerModule, 0);
  queue.memset(hfc.nClustersPerModule, 0);
  // xpu::memset(hfc.hitsPerModule, 0);
  queue.memset(hfc.nHitsPerModule, 0);
  queue.memset(hfc.maxClusterTimeErrorByModuleSide, 0);

  L_(debug) << "STS Hitfinder Chain: Copy digi buffers...";
  xpu::set<TheHitfinder>(fHitfinder);

  // Only copy the actual number of digis, not the whole allocated buffer.
  // TODO: add support in xpu for buffer copies with offset + size
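  // Note: on the CPU backend the host view and the device pointer presumably
  // alias the same memory, which is why the copy below is skipped when both
  // pointers match.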
  const CbmStsDigi* digisH = xpu::h_view(hfc.digisPerModule).data();
  CbmStsDigi* digisD = hfc.digisPerModule.get();
  if (digisH != digisD) queue.copy(digisH, digisD, nDigisTotal);
  queue.copy(hfc.digiOffsetPerModule, xpu::h2d);

  L_(debug) << "STS Hitfinder Chain: Sort Digis...";
  // TODO: skip temporary buffers and sort directly into digisSortedPerModule

  queue.launch<SortDigis>(xpu::n_blocks(nModuleSides));
  xpu::k_add_bytes<SortDigis>(nDigisTotal * sizeof(CbmStsDigi));
#ifdef CBM_ONLINE_USE_FAIRLOGGER
  if (fair::Logger::GetConsoleSeverity() == fair::Severity::trace) {
#else
  if (Opts().LogLevel() == trace) {
#endif // CBM_ONLINE_USE_FAIRLOGGER
    L_(trace) << "Ensuring STS digis are sorted...";
    queue.copy(hfc.digisPerModule, xpu::d2h);
    queue.copy(hfc.digiOffsetPerModule, xpu::d2h);
    queue.wait();
    EnsureDigisSorted();
  }

  L_(debug) << "STS Hitfinder Chain: Find Clusters...";
  if (!Params().sts.findClustersMultiKernels) {
    queue.launch<FindClusters>(xpu::n_blocks(nModuleSides));
  }
  else {
    queue.launch<ChannelOffsets>(xpu::n_blocks(nModuleSides));
    xpu::k_add_bytes<ChannelOffsets>(nDigisTotal * sizeof(CbmStsDigi));
    queue.launch<CreateDigiConnections>(xpu::n_threads(nDigisTotal));
    xpu::k_add_bytes<CreateDigiConnections>(nDigisTotal * sizeof(CbmStsDigi));
    queue.launch<CreateClusters>(xpu::n_threads(nDigisTotal));
    xpu::k_add_bytes<CreateClusters>(nDigisTotal * sizeof(CbmStsDigi));
  }
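  // Reading of the multi-kernel path (an interpretation based on the kernel
  // names; the kernel code itself is not part of this file): ChannelOffsets
  // builds per-channel digi ranges, CreateDigiConnections links time-adjacent
  // digis within a channel, and CreateClusters walks those links to emit clusters.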
#ifdef CBM_ONLINE_USE_FAIRLOGGER
  if (fair::Logger::GetConsoleSeverity() == fair::Severity::trace) {
#else
  if (Opts().LogLevel() == trace) {
#endif // CBM_ONLINE_USE_FAIRLOGGER
    L_(trace) << "Ensuring STS channel offsets are correct...";
    xpu::buffer_prop propsOffset{hfc.channelOffsetPerModule};
    std::vector<u32> channelOffsetPerModule;
    channelOffsetPerModule.resize(propsOffset.size());
    queue.copy(hfc.channelOffsetPerModule.get(), channelOffsetPerModule.data(), propsOffset.size());
    queue.wait();
    EnsureChannelOffsets(channelOffsetPerModule);

    L_(trace) << "Ensuring STS clusters are ok...";
    xpu::buffer_prop props{hfc.clusterIdxPerModule};

    std::vector<ClusterIdx> clusterIdxPerModule;
    clusterIdxPerModule.resize(props.size());
    std::vector<PaddedToCacheLine<int>> nClustersPerModule;
    nClustersPerModule.resize(fPars->setup.modules.size() * 2, 0);

    queue.copy(hfc.clusterIdxPerModule.get(), clusterIdxPerModule.data(), props.size());
    queue.copy(hfc.nClustersPerModule.get(), nClustersPerModule.data(), nClustersPerModule.size());
    queue.wait();
    EnsureClustersSane(clusterIdxPerModule, nClustersPerModule);
  }

  L_(debug) << "STS Hitfinder Chain: Sort Clusters...";
  queue.launch<SortClusters>(xpu::n_blocks(nModuleSides));

  L_(debug) << "STS Hitfinder Chain: Find Hits...";
  queue.copy(hfc.nClustersPerModule, xpu::d2h);
  queue.wait();
  xpu::h_view nClusters(hfc.nClustersPerModule);
  size_t nClustersTotal = std::accumulate(nClusters.begin(), nClusters.end(), 0);
  xpu::k_add_bytes<SortClusters>(nClustersTotal * sizeof(ClusterIdx));

  size_t nClustersFront = std::accumulate(nClusters.begin(), nClusters.begin() + nModules, 0);

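  // Launch geometry for FindHits: on GPU one thread per front-side cluster;
  // on CPU a fixed number of chunks per module, so all OpenMP threads stay
  // busy even when a run has only a few modules.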
  bool isCpu = xpu::device::active().backend() == xpu::cpu;
  // bool isCpu = false;
  int threadsCPU = kFindHitsChunksPerModule * nModules;
  xpu::grid findHitsG = xpu::n_threads(isCpu ? threadsCPU : nClustersFront);
  queue.launch<FindHits>(findHitsG);
  xpu::k_add_bytes<FindHits>(nClustersTotal * sizeof(sts::Cluster));

  queue.copy(hfc.nHitsPerModule, xpu::d2h);
  queue.wait();

  auto hits = FlattenHits(queue);
  queue.copy(hfc.monitor, xpu::d2h);

  queue.wait();

  // TODO: make this step optional / hide it behind a flag
  {
    xpu::scoped_timer hitStreams_{"Sort Hits"};

    // TODO: make the number of streams configurable
    hits = SplitHitsIntoStreams(hits, 100);
    SortHitsWithinPartition(hits);
#ifdef CBM_ONLINE_USE_FAIRLOGGER
    if (fair::Logger::GetConsoleSeverity() == fair::Severity::trace) {
#else
    if (Opts().LogLevel() == trace) {
#endif // CBM_ONLINE_USE_FAIRLOGGER
      EnsureHitsSorted(hits);
    }
  }

  xpu::h_view monitor(hfc.monitor);

  // Note: checking for cluster bucket overflow is probably paranoid,
  // as we allocate enough memory for one cluster per digi.
  if (monitor[0].nClusterBucketOverflow > 0) {
    L_(error) << "STS Hitfinder Chain: Cluster bucket overflow! " << monitor[0].nClusterBucketOverflow
              << " clusters were discarded!";

    for (size_t m = 0; m < nModules * 2; m++) {
      if (static_cast<size_t>(nClusters[m]) > hfc.maxClustersPerModule) {
        L_(error) << "STS Hitfinder Chain: Cluster bucket overflow in module " << m << " with " << *nClusters[m]
                  << " (of " << hfc.maxClustersPerModule << " max)"
                  << " clusters!";
        nClusters[m] = hfc.maxClustersPerModule;
      }
    }
  }

  xpu::h_view nHits(hfc.nHitsPerModule);
  if (monitor[0].nHitBucketOverflow > 0) {
    L_(error) << "STS Hitfinder Chain: Hit bucket overflow! " << monitor[0].nHitBucketOverflow
              << " hits were discarded!";

    for (size_t m = 0; m < nModules; m++) {
      const size_t nHitsModule = nHits[m];
      const size_t nClustersModule = nClusters[m];
      const float hitsPerCluster = nClustersModule > 0 ? nHitsModule / float(nClustersModule) : 0;
      if (nHitsModule > hfc.maxHitsPerModule) {
        L_(error) << "STS Hitfinder Chain: Hit bucket overflow in module " << m << " with " << nHitsModule << " (of "
                  << hfc.maxHitsPerModule << " max) hits! "
                  << "Module has " << nClustersModule << " clusters. " << hitsPerCluster << " hits per cluster.";
        nHits[m] = hfc.maxHitsPerModule;
      }
    }
  }

  PartitionedVector<sts::Cluster> clusters;
  if (storeClusters) clusters = FlattenClusters(queue);

  size_t nHitsTotal = std::accumulate(nHits.begin(), nHits.end(), 0);
  L_(debug) << "Timeslice contains " << nHitsTotal << " STS hits and " << nClustersTotal << " STS clusters";

  result.hits = std::move(hits);
  result.clusters = std::move(clusters);
  result.monitor.nClusterTotal = nClustersTotal;
  result.monitor.nHitsTotal = result.hits.NElements();
  result.monitor.SetDeviceMon(monitor[0]);
  return result;
}

void sts::HitfinderChain::EnsureParameters()
{
  if (fPars == std::nullopt) throw std::runtime_error("sts::HitfinderChain: Parameters not set. Can't continue!");
}

void sts::HitfinderChain::AllocateStatic()
{
  EnsureParameters();

  // Shorthands for common constants
  const int nChannels = fPars->setup.nChannels / 2;  // Only count channels on one side of the module
  const int nModules = fPars->setup.modules.size();
  const int nModuleSides = 2 * nModules;  // Number of module front and back sides

  xpu::queue q;

  // Set GPU constants
  fHitfinder.nModules = nModules;
  fHitfinder.nChannels = nChannels;
  fHitfinder.asic = fPars->setup.asic;

  // Copy Landau table
  size_t nLandauTableEntries = fPars->setup.landauTable.values.size();
  fHitfinder.landauTableSize = nLandauTableEntries;
  fHitfinder.landauStepSize = fPars->setup.landauTable.stepSize;
  fHitfinder.landauTable.reset(nLandauTableEntries, xpu::buf_io);
  q.copy(fPars->setup.landauTable.values.data(), fHitfinder.landauTable.get(), nLandauTableEntries);

  // Copy transformation matrices
  fHitfinder.localToGlobalRotationByModule.reset(9 * nModules, xpu::buf_io);
  fHitfinder.localToGlobalTranslationByModule.reset(3 * nModules, xpu::buf_io);

  // - Copy matrices into flat arrays
  xpu::h_view hRot(fHitfinder.localToGlobalRotationByModule);
  xpu::h_view hTrans(fHitfinder.localToGlobalTranslationByModule);
  for (int m = 0; m < nModules; m++) {
    const auto& module = fPars->setup.modules.at(m);
    std::copy_n(module.localToGlobal.rotation.data(), 9, &hRot[m * 9]);
    std::copy_n(module.localToGlobal.translation.data(), 3, &hTrans[m * 3]);
  }
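  // Flat layout: module m occupies hRot[9 * m .. 9 * m + 8] (3x3 rotation,
  // row-major assumed here) and hTrans[3 * m .. 3 * m + 2] (translation vector).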

  // - Then copy to GPU
  q.copy(fHitfinder.localToGlobalRotationByModule, xpu::h2d);
  q.copy(fHitfinder.localToGlobalTranslationByModule, xpu::h2d);

  // Copy sensor parameters
  fHitfinder.sensorPars.reset(nModules, xpu::buf_io);
  xpu::h_view hSensorPars(fHitfinder.sensorPars);
  for (int m = 0; m < nModules; m++) {
    const auto& module = fPars->setup.modules.at(m);
    auto& gpuPars = hSensorPars[m];
    gpuPars.dY = module.dY;
    gpuPars.pitch = module.pitch;
    gpuPars.stereoF = module.stereoF;
    gpuPars.stereoB = module.stereoB;
    gpuPars.lorentzF = module.lorentzF;
    gpuPars.lorentzB = module.lorentzB;
  }
  q.copy(fHitfinder.sensorPars, xpu::h2d);

  // Time errors
  fHitfinder.maxClusterTimeErrorByModuleSide.reset(nModuleSides, xpu::buf_device);

  fHitfinder.monitor.reset(1, xpu::buf_io);

  q.wait();
}

void sts::HitfinderChain::AllocateDynamic(size_t maxNDigisPerModule, size_t nDigisTotal)
{
  L_(debug) << "STS Hitfinder Chain: Allocating dynamic memory for " << maxNDigisPerModule << " digis per module and "
            << nDigisTotal << " digis in total";
  EnsureParameters();

  xpu::scoped_timer t_("Allocate");

  // TODO: some of these buffers have a constant size and could be allocated statically;
  // only the data they contain changes per timeslice.

  fPars->memory.maxNDigisPerModule = maxNDigisPerModule;
  fPars->memory.maxNDigisPerTS = nDigisTotal;

  // Shorthands for common constants
  const int nChannels = fPars->setup.nChannels / 2;  // Only count channels on one side of the module
  const int nModules = fPars->setup.modules.size();
  const int nModuleSides = 2 * nModules;  // Number of module front and back sides

  const size_t maxNClustersPerModule = fPars->memory.MaxNClustersPerModule();
  const size_t maxNHitsPerModule = fPars->memory.MaxNHitsPerModule();

  // TODO: this number should be much lower; estimate from max digis per TS
  const size_t maxHitsTotal = maxNHitsPerModule * nModules;

  // Allocate Digi Buffers
  fHitfinder.digiOffsetPerModule.reset(nModuleSides + 1, xpu::buf_io);
  fHitfinder.digisPerModule.reset(nDigisTotal, xpu::buf_io);

  fHitfinder.digisPerModuleTmp.reset(nDigisTotal, xpu::buf_device);
  fHitfinder.digiConnectorsPerModule.reset(nDigisTotal, xpu::buf_device);

  fHitfinder.channelOffsetPerModule.reset(nModuleSides * nChannels, xpu::buf_device);

  // Allocate Cluster Buffers
  fHitfinder.maxClustersPerModule = maxNClustersPerModule;

  fHitfinder.clusterIdxPerModule.reset(maxNClustersPerModule * nModuleSides, xpu::buf_device);
  fHitfinder.clusterIdxPerModuleTmp.reset(maxNClustersPerModule * nModuleSides, xpu::buf_device);
  fHitfinder.clusterIdxSortedPerModule.reset(nModuleSides, xpu::buf_device);

  fHitfinder.clusterDataPerModule.reset(maxNClustersPerModule * nModuleSides, xpu::buf_io);
  fHitfinder.nClustersPerModule.reset(nModuleSides, xpu::buf_io);

  // Allocate Hit Buffers
  fHitfinder.hitsAllocatedPerModule = maxNHitsPerModule;
  fHitfinder.hitsPerModule.reset(maxNHitsPerModule * nModules, xpu::buf_device);
  fHitfinder.nHitsPerModule.reset(nModules, xpu::buf_io);

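  // hitsFlat is pinned (page-locked) host memory; this should let the
  // per-module device-to-host copies in FlattenHits run as DMA transfers.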
  fHitfinder.hitsFlatCapacity = maxHitsTotal;
  fHitfinder.hitsFlat.reset(maxHitsTotal, xpu::buf_pinned);
}

sts::HitfinderChain::DigiMap sts::HitfinderChain::CountDigisPerModules(gsl::span<const CbmStsDigi> digis)
{
  L_(debug) << "STS Hitfinder Chain: Sorting " << digis.size() << " digis into modules";
  xpu::scoped_timer t_("Count Digis By Module");
  xpu::t_add_bytes(digis.size_bytes());

  size_t nModules = fPars->setup.modules.size();
  size_t nModuleSides = nModules * 2;
  DigiMap digiMap;
  digiMap.nDigisPerModule.resize(nModuleSides);

  // Create a map from module address to index.
  // This could be done just once in the beginning,
  // but the overhead is negligible compared to the digi copying.
  digiMap.addrToIndex.resize(1 << 17, InvalidModule);
  for (size_t m = 0; m < nModules; m++) {
    const auto& module = fPars->setup.modules[m];
    i32 paddr = CbmStsAddress::PackDigiAddress(module.address);
    digiMap.addrToIndex[paddr] = u16(m);
  }
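  // PackDigiAddress strips the full address down to unit, (half)ladder and
  // module, so the packed value fits the 2^17-entry table allocated above.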

  int nChannelsPerSide = fPars->setup.nChannels / 2;

  // TODO: try to parallelise further:
  // count digis per module in parallel,
  // then calculate the offset per module,
  // and write digis directly to the flat array in parallel,
  // using an atomic add to increment the offset per module.
  digiMap.nDigisPerThread.resize(openmp::GetMaxThreads());
  for (auto& v : digiMap.nDigisPerThread)
    v.resize(nModuleSides, 0);

  CBM_PARALLEL()
  {
    int threadId = openmp::GetThreadNum();  // Fetch the thread id outside of the loop; repeated calls add a small overhead
    auto& nDigis = digiMap.nDigisPerThread.at(threadId);

    CBM_OMP(for schedule(static))
    for (size_t i = 0; i < digis.size(); i++) {
      const auto& digi = digis[i];
      u16 moduleIndex = digiMap.ModuleIndex(digi);

      if (moduleIndex == InvalidModule) continue;
      // if (moduleIndex == InvalidModule || digi.GetAddress() == 0x10000002) continue;

      bool isFront = digi.GetChannel() < nChannelsPerSide;
      nDigis[moduleIndex + (isFront ? 0 : nModules)]++;
    }
  }

  // Sum up digis per module
  for (auto& v : digiMap.nDigisPerThread) {
    for (size_t m = 0; m < nModuleSides; m++) {
      digiMap.nDigisPerModule[m] += v[m];
    }
  }

  // if (nPulsers > 0) L_(warning) << "STS Hitfinder: Discarded " << nPulsers << " pulser digis";

  // Print digi counts per module
  for (const auto& mod : fPars->setup.modules) {
    // FIXME: should use the module index here
    i32 moduleAddr = CbmStsAddress::PackDigiAddress(mod.address);
    u16 moduleIndex = digiMap.addrToIndex[moduleAddr];
    L_(debug) << "Module " << moduleAddr << " has " << digiMap.nDigisPerModule[moduleIndex] << " front digis and "
              << digiMap.nDigisPerModule[moduleIndex + nModules] << " back digis";
  }

  digiMap.maxNDigisPerModule = 0;
  for (const auto& nDigis : digiMap.nDigisPerModule) {
    digiMap.maxNDigisPerModule = std::max(digiMap.maxNDigisPerModule, nDigis);
  }

  return digiMap;
}

void sts::HitfinderChain::FlattenDigis(gsl::span<const CbmStsDigi> digis, DigiMap& digiMap)
{
  L_(debug) << "STS Hitfinder Chain: Flattening digis";
  xpu::scoped_timer t_("Flatten Digis");
  xpu::t_add_bytes(digis.size_bytes());

  const auto& modules = fPars->setup.modules;

  size_t nModules = modules.size();
  size_t nModuleSides = nModules * 2;
  int nChannelsPerSide = fPars->setup.nChannels / 2;
  xpu::h_view pDigisFlat(fHitfinder.digisPerModule);  // Final input, copied to the GPU

  xpu::h_view pMdigiOffset(fHitfinder.digiOffsetPerModule);
  pMdigiOffset[0] = 0;
  for (size_t m = 1; m < nModuleSides + 1; m++) {
    pMdigiOffset[m] = pMdigiOffset[m - 1] + digiMap.nDigisPerModule.at(m - 1);
  }
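  // pMdigiOffset is an exclusive prefix sum over the per-side digi counts:
  // e.g. counts {4, 2, 3} yield offsets {0, 4, 6, 9}, the last entry being the total.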

  std::vector<std::vector<size_t>> digiOffsetsPerThread(openmp::GetMaxThreads());
  for (auto& v : digiOffsetsPerThread)
    v.resize(nModuleSides);
  for (size_t i = 1; i < digiOffsetsPerThread.size(); i++) {
    for (size_t m = 0; m < nModuleSides; m++) {
      for (size_t j = 0; j < i; j++) {
        digiOffsetsPerThread[i][m] += digiMap.nDigisPerThread[j][m];
      }
    }
  }
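  // Per-thread write cursors: thread i starts after the digis that threads
  // 0..i-1 counted for each module side. Both passes use the same static
  // schedule over the same range, so each thread revisits exactly the digis
  // it counted in CountDigisPerModules.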

  CBM_PARALLEL()
  {
    int threadId = openmp::GetThreadNum();  // Fetch the thread id outside of the loop; repeated calls add a small overhead
    auto& digiOffsets = digiOffsetsPerThread.at(threadId);

    CBM_OMP(for schedule(static))
    for (size_t i = 0; i < digis.size(); i++) {
      const auto& digi = digis[i];

      u16 moduleIndex = digiMap.ModuleIndex(digi);

      if (moduleIndex == InvalidModule) continue;

      bool isFront = digi.GetChannel() < nChannelsPerSide;
      moduleIndex += isFront ? 0 : nModules;

      size_t moduleOffset = pMdigiOffset[moduleIndex];
      size_t digiOffset = digiOffsets[moduleIndex]++;

      pDigisFlat[moduleOffset + digiOffset] = digi;
    }
  }

  // Channels in digis are counted 0 to 2047 by default:
  // channels 0 -> 1023 are the front side of the module,
  // channels 1024 -> 2047 are the back side.
  // The cluster finder only operates on one side, so the back-side channel numbers have to be shifted.
  CBM_PARALLEL_FOR(schedule(static))
  for (size_t i = pMdigiOffset[nModules]; i < pMdigiOffset[nModuleSides]; i++) {
    auto& digi = pDigisFlat[i];
    digi.SetChannel(digi.GetChannel() - nChannelsPerSide);
  }
}

PartitionedSpan<sts::Hit> sts::HitfinderChain::FlattenHits(xpu::queue queue)
{
  auto& hfc = fHitfinder;
  xpu::h_view nHits(hfc.nHitsPerModule);

  size_t nHitsTotal = 0;
  for (int m = 0; m < hfc.nModules; m++)
    nHitsTotal += GetNHits(nHits, m);
  L_(debug) << "STS Hitfinder Chain: Flattening " << nHitsTotal << " hits";

  if (nHitsTotal > hfc.hitsFlatCapacity)
    throw ProcessingError("STS Hitfinder Chain: Not enough memory for hits: {} > {}", nHitsTotal, hfc.hitsFlatCapacity);

  // hitBuffer is potentially bigger than nHitsTotal,
  // so construct a span over the memory region we actually use
  xpu::h_view hitBuffer(hfc.hitsFlat);
  gsl::span hits(hitBuffer.data(), nHitsTotal);

  // Transfer hits / clusters back to the host.
  // Hits and clusters are stored in buckets, and buckets aren't necessarily full,
  // so we have to copy each bucket individually.
  // Alternative: flatten both arrays on the GPU and transfer the flat arrays back.
  // The latter would require a lot of memory but give us parallel copying when running on the host.
  //
  // Also, at this point values in nHits could be wrong if a bucket overflowed,
  // so take extra care to not access nHits directly.
  if (xpu::device::active().backend() == xpu::cpu) {
    // Hits are already in host memory.
    // Flatten in parallel.
    // Note: this is still pretty slow for mCBM because there are only 9 modules...
    xpu::scoped_timer t_("Flatten Hits");
    xpu::t_add_bytes(nHitsTotal * sizeof(sts::Hit));
    CBM_PARALLEL_FOR(schedule(dynamic))
    for (int m = 0; m < hfc.nModules; m++) {
      size_t offset = 0;
      for (int i = 0; i < m; i++) {
        offset += GetNHits(nHits, i);
      }
      std::copy_n(hfc.hitsPerModule.get() + hfc.hitsAllocatedPerModule * m, GetNHits(nHits, m), hits.begin() + offset);
    }
  }
  else {  // GPU
    // Initialize host memory:
    // memset(hits.data(), 0, nHitsTotal * sizeof(sts::Hit));

    // Hits are not in host memory.
    // FIXME: this is very slow (1.7 GB/s vs the 16 GB/s we should get from PCIe 3.0 x16).
    // I assume this is due to page faults, as the allocated memory is not touched before.
    // But even with a memset on the host memory beforehand, throughput only goes up to 3.7 GB/s.
    size_t nHitsCopied = 0;
    for (int m = 0; m < hfc.nModules; m++) {
      size_t numHitsInModule = GetNHits(nHits, m);
      queue.copy(hfc.hitsPerModule.get() + hfc.hitsAllocatedPerModule * m, hits.data() + nHitsCopied, numHitsInModule);
      nHitsCopied += numHitsInModule;
    }
  }

  // Could do this more efficiently and cache the addresses once in the beginning;
  // doesn't really matter overhead-wise.
  fHitOffsets.clear();
  // nModules + 1 entries; the first entry is always 0, the last is the total number of hits (required by PartitionedSpan)
  fHitOffsets.push_back(0);
  for (int m = 0; m < hfc.nModules; m++)
    fHitOffsets.push_back(fHitOffsets.back() + GetNHits(nHits, m));

  fAddresses = {};
  for (auto& m : fPars->setup.modules)
    fAddresses.push_back(m.address);

  PartitionedSpan partitionedHits(hits, fHitOffsets, fAddresses);
  return partitionedHits;
}

PartitionedVector<sts::Cluster> sts::HitfinderChain::FlattenClusters(xpu::queue queue)
{
  auto& hfc = fHitfinder;

  xpu::h_view nClusters(hfc.nClustersPerModule);
  int maxClustersPerModule = hfc.maxClustersPerModule;
  sts::Cluster* clusters = hfc.clusterDataPerModule.get();

  int nModuleSides = nClusters.size();

  L_(debug) << "Copying clusters from " << nModuleSides << " module sides";

  std::vector<u32> addresses;
  for (size_t i = 0; i < 2; i++) {  // Repeat twice, for front and back sides
    for (auto& m : fPars->setup.modules)
      addresses.push_back(m.address);
  }

  size_t nClustersTotal = std::accumulate(nClusters.begin(), nClusters.end(), 0);

  // TODO: the copy could be made much more efficient by using pinned memory.
  // But clusters are stored for debugging only, so no effort is made to optimize this.
  std::vector<sts::Cluster> clustersFlat;
  clustersFlat.resize(nClustersTotal);
  size_t offset = 0;
  for (int m = 0; m < nModuleSides; m++) {
    size_t nClustersInModule = nClusters[m];
    queue.copy(clusters + maxClustersPerModule * m, clustersFlat.data() + offset, nClustersInModule);
    offset += nClustersInModule;
  }
  queue.wait();

  std::vector<size_t> sizes(nClusters.begin(), nClusters.end());
  PartitionedVector<sts::Cluster> out(std::move(clustersFlat), gsl::make_span(sizes), gsl::make_span(addresses));
  return out;
}

size_t sts::HitfinderChain::GetNHits(xpu::h_view<PaddedToCacheLine<int>> nHitsPerModule, int module)
{
  return std::min<size_t>(nHitsPerModule[module], fHitfinder.maxHitsPerModule);
}

PartitionedSpan<sts::Hit> sts::HitfinderChain::SplitHitsIntoStreams(PartitionedSpan<sts::Hit> hits, int nstreamsMax)
{
  constexpr size_t MinHitsPerStream = 10;

  int nSensors = hits.NPartitions();
  int streamsPerSensor = std::max(1, nstreamsMax / nSensors);

  // TODO: move into a helper function in base/ ?
  auto nextMultipleOf = [](int x, int mult) {
    if (mult == 0) {
      return x;
    }

    int rem = x % mult;
    if (rem == 0) {
      return x;
    }

    return x + mult - rem;
  };
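  // e.g. nextMultipleOf(10, 4) == 12, so 10 hits split into 4 streams become
  // chunks of 3, 3, 3 and 1 (the last stream takes whatever remains).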

  fHitStreamOffsets.clear();
  fStreamAddresses.clear();

  fHitStreamOffsets.push_back(0);
  for (size_t sensor = 0; sensor < hits.NPartitions(); sensor++) {
    auto [hitsSensor, addr] = hits.Partition(sensor);
    const size_t nHits = hitsSensor.size();

    // Sensor has too few hits -> create only one stream for the entire sensor
    if (nHits < streamsPerSensor * MinHitsPerStream) {
      fHitStreamOffsets.push_back(fHitStreamOffsets.back() + nHits);
      fStreamAddresses.push_back(addr);
      continue;
    }

    size_t rem = nHits % streamsPerSensor;
    if (rem == 0) {
      size_t streamSize = nHits / streamsPerSensor;
      for (int s = 0; s < streamsPerSensor; s++) {
        fHitStreamOffsets.push_back(fHitStreamOffsets.back() + streamSize);
        fStreamAddresses.push_back(addr);
      }
    }
    else {
      size_t streamSize = nextMultipleOf(nHits, streamsPerSensor) / streamsPerSensor;
      size_t nHitsLeft = nHits;
      for (int s = 0; s < streamsPerSensor; s++) {
        // Take at most the remaining hits; subtracting streamSize directly
        // could underflow the unsigned counter on the last, short stream
        size_t take = std::min(nHitsLeft, streamSize);
        fHitStreamOffsets.push_back(fHitStreamOffsets.back() + take);
        fStreamAddresses.push_back(addr);
        nHitsLeft -= take;
      }
    }
  }

  PartitionedSpan<sts::Hit> hitStreams(hits.Data(), fHitStreamOffsets, fStreamAddresses);
  return hitStreams;
}

void sts::HitfinderChain::SortHitsWithinPartition(PartitionedSpan<sts::Hit> hits)
{
  CBM_PARALLEL_FOR(schedule(dynamic))
  for (size_t i = 0; i < hits.NPartitions(); i++) {
    auto stream = hits[i];
    std::sort(stream.begin(), stream.end(), [](const sts::Hit& a, const sts::Hit& b) { return a.fTime < b.fTime; });
  }
}

void sts::HitfinderChain::EnsureDigiOffsets(DigiMap& digi)
{
  xpu::h_view digiOffset(fHitfinder.digiOffsetPerModule);

  size_t nModules = fPars->setup.modules.size();
  size_t offset = 0;
  // Front and back sides
  for (size_t m = 0; m < nModules * 2; m++) {
    if (digiOffset[m] != offset) {
      throw FatalError("Module {}: Digi offset mismatch: {} != {}", m, digiOffset[m], offset);
    }
    size_t nDigis = digi.nDigisPerModule[m];
    offset += nDigis;
  }

  if (offset != digiOffset[2 * nModules]) {
    throw FatalError("Digi offset mismatch: {} != {}", digiOffset[2 * nModules], offset);
  }
}

void sts::HitfinderChain::EnsureDigisSorted()
{
  xpu::h_view digiOffset(fHitfinder.digiOffsetPerModule);
  xpu::h_view digis(fHitfinder.digisPerModule);

  bool isSorted = true;

  for (size_t m = 0; m < fPars->setup.modules.size(); m++) {
    int nDigis = digiOffset[m + 1] - digiOffset[m];
    if (nDigis == 0) continue;

    auto* digisModule = &digis[digiOffset[m]];

    for (int i = 0; i < nDigis - 1; i++) {
      const auto &digi1 = digisModule[i], &digi2 = digisModule[i + 1];

      if ((digi1.GetChannel() < digi2.GetChannel())
          || (digi1.GetChannel() == digi2.GetChannel()
              && digi1.GetTime() <= digi2.GetTime())  // The unpacker sometimes produces multiple
                                                      // digis with the same time and channel, FIXME!
      ) {
        continue;
      }

      isSorted = false;
      L_(error) << "Module " << m << " not sorted: " << digi1.ToString() << " " << digi2.ToString();
      break;
    }
  }

  if (!isSorted) {
    throw FatalError("Digis are not sorted");
  }
}

void sts::HitfinderChain::EnsureChannelOffsets(gsl::span<u32> channelOffsetsByModule)
{
  xpu::h_view digisPerModule(fHitfinder.digisPerModule);
  xpu::h_view digiOffsetPerModule(fHitfinder.digiOffsetPerModule);
  for (size_t m = 0; m < fPars->setup.modules.size() * 2; m++) {
    int nChannels = fPars->setup.nChannels / 2;  // Consider module sides

    std::vector<u32> expectedChannelOffsets(nChannels, 0);

    int offset = digiOffsetPerModule[m];
    int nDigis = digiOffsetPerModule[m + 1] - offset;
    gsl::span<CbmStsDigi> digis(&digisPerModule[offset], nDigis);
    gsl::span<u32> channelOffsets = channelOffsetsByModule.subspan(m * nChannels, nChannels);

    if (nDigis == 0) continue;

    if (channelOffsets[0] != 0) {
      throw FatalError("Module {}: First channel offset is not 0", m);
    }

    int chan = digis[0].GetChannel();
    for (int i = 0; i < nDigis; i++) {
      if (digis[i].GetChannel() != chan) {
        while (chan < digis[i].GetChannel()) {
          chan++;
          expectedChannelOffsets[chan] = i;
        }
      }
    }
    // Fill the offsets of the remaining empty channels;
    // stop at nChannels - 1 to avoid writing one past the end of the vector
    while (chan < nChannels - 1) {
      chan++;
      expectedChannelOffsets[chan] = nDigis;
    }

    for (int i = 0; i < nChannels; i++) {
      if (channelOffsets[i] != expectedChannelOffsets[i]) {
        throw FatalError("Module {}: Channel offset for channel {} is {} but should be {}", m, i, channelOffsets[i],
                         expectedChannelOffsets[i]);
      }
    }
  }
}

void sts::HitfinderChain::EnsureClustersSane(gsl::span<ClusterIdx> hClusterIdx,
                                             gsl::span<PaddedToCacheLine<int>> hNClusters)
{
  for (size_t m = 0; m < 2 * fPars->setup.modules.size(); m++) {
    int nClusters = hNClusters[m];

    L_(trace) << "Module " << m << " has " << nClusters << " clusters of " << fHitfinder.maxClustersPerModule;

    if (nClusters == 0) continue;

    if (nClusters < 0) {
      throw FatalError("Module {} has a negative number of clusters: {}", m, nClusters);
    }
    if (size_t(nClusters) > fHitfinder.maxClustersPerModule) {
      throw FatalError("Module {} has {} clusters, but only {} are allowed", m, nClusters,
                       fHitfinder.maxClustersPerModule);
    }

    auto* clusterIdx = &hClusterIdx[m * fHitfinder.maxClustersPerModule];

    for (int i = 0; i < nClusters; i++) {
      auto& cidx = clusterIdx[i];

      if (int(cidx.fIdx) < 0 || size_t(cidx.fIdx) >= fHitfinder.maxClustersPerModule) {
        throw FatalError("Cluster {} of module {} has invalid index {}", i, m, cidx.fIdx);
      }

      if (cidx.fTime == 0xFFFFFFFF) {
        throw FatalError("Cluster {} of module {} has invalid time {}", i, m, cidx.fTime);
      }
    }
  }

  L_(trace) << "Clusters ok";
}

void sts::HitfinderChain::EnsureHitsSorted(PartitionedSpan<sts::Hit> hits)
{
  L_(trace) << "Ensure STS hits are sorted";

  for (size_t i = 0; i < hits.NPartitions(); i++) {
    auto [part, addr] = hits.Partition(i);
    bool ok = std::is_sorted(part.begin(), part.end(), [](const Hit& a, const Hit& b) { return a.fTime < b.fTime; });
    L_(trace) << "Stream " << i << ": addr=" << addr << ", nhits=" << part.size();
    if (!ok) {
      throw FatalError("Hits not sorted in stream {} (addr {})", i, addr);
    }
  }

  L_(trace) << "STS hits sorted!";
}