CbmRoot
Loading...
Searching...
No Matches
reco/app/cbmreco/main.cxx
Go to the documentation of this file.
1/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Felix Weiglhofer [committer] */
4#include "CbmDigiEvent.h"
7#include "algo/base/Options.h"
8#include "algo/base/System.h"
13#include "algo/global/Reco.h"
16
17#include <StorableTimeslice.hpp>
18#include <TimesliceAutoSource.hpp>
19
20#include <log.hpp>
21#include <sstream>
22
23#include <xpu/host.h>
24
25using namespace cbm::algo;
26
27namespace chron = std::chrono;
28
29std::shared_ptr<StorableRecoResults> makeStorableRecoResults(const fles::Timeslice& ts, const RecoResults& results)
30{
31 auto storable = std::make_shared<StorableRecoResults>(ts.index(), ts.start_time());
32
33 storable->DigiEvents().reserve(results.events.size());
34 for (const auto& digiEvent : results.events) {
35 storable->DigiEvents().emplace_back(digiEvent.ToStorable());
36 }
37
38 // TODO: some of these copies can be avoided / made into moves
39 storable->BmonDigis() = ToStdVector(results.bmonDigis);
40 storable->StsDigis() = ToStdVector(results.stsDigis);
41 storable->MuchDigis() = ToStdVector(results.muchDigis);
42 storable->MvdDigis() = ToStdVector(results.mvdDigis);
43 storable->Trd2dDigis() = ToStdVector(results.trd2dDigis);
44 storable->TrdDigis() = ToStdVector(results.trdDigis);
45 storable->TofDigis() = ToStdVector(results.tofDigis);
46 storable->RichDigis() = ToStdVector(results.richDigis);
47 storable->FsdDigis() = ToStdVector(results.fsdDigis);
48
49 storable->StsClusters() = results.stsClusters;
50 storable->BmonHits() = results.bmonHits;
51 storable->StsHits() = results.stsHits;
52 storable->TofHits() = results.tofHits;
53 storable->TrdHits() = results.trdHits;
54
55 storable->Tracks() = results.tracks;
56 storable->TrackStsHitIndices() = results.trackStsHitIndices;
57 storable->TrackTofHitIndices() = results.trackTofHitIndices;
58 storable->TrackTrdHitIndices() = results.trackTrdHitIndices;
59
60 return storable;
61}
62
63bool dumpArchive(const Options& opts)
64{
65 // Limit the number of events per timeslice to dump to avoid spamming the log
66 constexpr size_t DumpEventsPerTS = 10;
67 constexpr size_t DumpHitsPerSensor = 2;
68 constexpr size_t DumpTracksPerTS = 10;
69
70 if (!opts.DumpArchive()) return false;
71
72 L_(info) << "Dumping archive: " << opts.InputLocator();
73
75
76 auto desc = archive.descriptor();
77 L_(info) << "Archive descriptor: ";
78 L_(info) << " - time_created: " << desc.time_created();
79 L_(info) << " - hostname: " << desc.hostname();
80 L_(info) << " - username: " << desc.username();
81
82 for (auto recoResults = archive.get(); !archive.eos(); recoResults = archive.get()) {
83 if (recoResults == nullptr) {
84 L_(error) << "Failed to read RecoResults from archive";
85 break;
86 }
87
88 size_t nEvents = recoResults->DigiEvents().size();
89 L_(info) << "TS " << recoResults->TsIndex() << " start: " << recoResults->TsStartTime() << " events: " << nEvents
90 << ", stsHits: " << recoResults->StsHits().NElements()
91 << ", tofHits: " << recoResults->TofHits().NElements() << ", tracks: " << recoResults->Tracks().size();
92
93 for (size_t i = 0; i < std::min(DumpEventsPerTS, nEvents); i++) {
94 const auto& digiEvent = recoResults->DigiEvents().at(i);
95 L_(info) << " - Event " << i << " number: " << digiEvent.fNumber << "; time: " << digiEvent.fTime
96 << "; nStsDigis: " << digiEvent.fData.Size(ECbmModuleId::kSts)
97 << "; nTofDigis: " << digiEvent.fData.Size(ECbmModuleId::kTof);
98 }
99
100 if (nEvents > DumpEventsPerTS) L_(info) << "...";
101
102 auto& bmonHits = recoResults->BmonHits();
103 for (size_t m = 0; m < bmonHits.NPartitions(); m++) {
104 auto [hits, address] = bmonHits.Partition(m);
105 for (size_t i = 0; i < std::min(DumpHitsPerSensor, hits.size()); i++) {
106 const auto& hit = hits[i];
107 L_(info) << " - BMON Hit " << i << " sensor: " << address << "; time: " << hit.GetTime();
108 }
109 }
110
111 L_(info) << "...";
112
113 auto& stsHits = recoResults->StsHits();
114 for (size_t m = 0; m < stsHits.NPartitions(); m++) {
115 auto [hits, address] = stsHits.Partition(m);
116 for (size_t i = 0; i < std::min(DumpHitsPerSensor, hits.size()); i++) {
117 const auto& hit = hits[i];
118 L_(info) << " - STS Hit " << i << " sensor: " << address << "; time: " << hit.fTime << ", X: " << hit.fX
119 << ", Y: " << hit.fY << ", Z: " << hit.fZ;
120 }
121 }
122
123 L_(info) << "...";
124
125 auto tofHits = recoResults->TofHits();
126 for (size_t m = 0; m < tofHits.NPartitions(); m++) {
127 auto [hits, address] = tofHits.Partition(m);
128 for (size_t i = 0; i < std::min(DumpHitsPerSensor, hits.size()); i++) {
129 const auto& hit = hits[i];
130 L_(info) << " - TOF Hit " << i << " sensor: " << address << "; time: " << hit.Time() << ", X: " << hit.X()
131 << ", Y: " << hit.Y() << ", Z: " << hit.Z();
132 }
133 }
134
135 L_(info) << "...";
136
137 auto trdHits = recoResults->TrdHits();
138 for (size_t m = 0; m < trdHits.NPartitions(); m++) {
139 auto [hits, address] = trdHits.Partition(m);
140 for (size_t i = 0; i < std::min(DumpHitsPerSensor, hits.size()); i++) {
141 const auto& hit = hits[i];
142 L_(info) << " - TRD Hit " << i << " sensor: " << address << "; time: " << hit.Time() << ", X: " << hit.X()
143 << ", Y: " << hit.Y() << ", Z: " << hit.Z();
144 }
145 }
146
147 L_(info) << "...";
148
149
150 auto& tracks = recoResults->Tracks();
151 for (size_t t = 0; t < std::min(tracks.size(), DumpTracksPerTS); t++) {
152 const auto& track = tracks[t];
153 L_(info) << " - Track " << t << " nHits: " << track.fNofHits << ", chi2: " << track.fParPV.ChiSq()
154 << ", X: " << track.fParPV.X() << ", Y: " << track.fParPV.Y() << ", Z: " << track.fParPV.Z();
155 }
156 }
157
158 return true;
159}
160
161int main(int argc, char** argv)
162{
163 Options opts(argc, argv);
164
165 logging::add_console(opts.LogLevel());
166
167 if (!opts.LogFile().empty()) logging::add_file(opts.LogFile().string(), opts.LogLevel());
168
169 // XPU
170 xpu::settings settings;
171 settings.profile = opts.CollectKernelTimes();
172 settings.device = opts.Device();
173 if (opts.LogLevel() == trace) {
174 settings.verbose = true;
175 settings.logging_sink = [](std::string_view msg) { L_(trace) << msg; };
176 }
177 xpu::initialize(settings);
178 xpu::preload<GPUReco>();
179
180 auto ompThreads = opts.NumOMPThreads();
181 if (ompThreads) {
182 L_(debug) << *ompThreads << " OpenMP threads requested";
183 openmp::SetNumThreads(*ompThreads);
184 }
186
187 L_(info) << "CBMRECO buildType=" << BuildInfo::BUILD_TYPE << " gpuDebug=" << BuildInfo::GPU_DEBUG
188 << " parallelSTL=" << BuildInfo::WITH_PARALLEL_STL << " OMP=" << BuildInfo::WITH_OMP
189 << " ZSTD=" << BuildInfo::WITH_ZSTD << " commit=" << BuildInfo::GIT_HASH;
190 std::stringstream ss;
191 for (int i = 0; i < argc; i++) {
192 ss << argv[i] << " ";
193 }
194 L_(info) << ss.str();
195
196 if (dumpArchive(opts)) return 0;
197
198 Reco reco;
199 MemoryLogger memoryLogger;
200
201 auto startProcessing = chron::high_resolution_clock::now();
202 reco.Init(opts);
203
204 fles::TimesliceAutoSource source(opts.InputLocator());
205
206 ProcessingExtraMonitor extraMonitor;
207
208 std::optional<RecoResultsOutputArchiveSequence> archive;
209 if (!opts.OutputFile().empty()) {
210 L_(info) << "Writing results to file: " << opts.OutputFile();
211 fles::ArchiveCompression compression = fles::ArchiveCompression::None;
212 if (opts.CompressArchive()) {
213 compression = fles::ArchiveCompression::Zstd;
214 }
215 archive.emplace(opts.OutputFile().string(), opts.OutputMaxItems(), opts.OutputMaxSize(), compression);
216 }
217
218 int tsIdx = 0;
219 int num_ts = opts.NumTimeslices();
220 if (num_ts > 0) num_ts += opts.SkipTimeslices();
221 L_(debug) << "Starting to fetch timeslices from source...";
222
223 auto startFetchTS = chron::high_resolution_clock::now();
224 while (auto timeslice = source.get()) {
225 if (tsIdx < opts.SkipTimeslices()) {
226 tsIdx++;
227 continue;
228 }
229
230 std::unique_ptr<fles::Timeslice> ts;
231 if (opts.ReleaseMode()) {
232 ts = std::make_unique<fles::StorableTimeslice>(*timeslice);
233 timeslice.reset();
234 }
235 else {
236 ts = std::move(timeslice);
237 }
238
239 auto endFetchTS = chron::high_resolution_clock::now();
240 auto durationFetchTS = endFetchTS - startFetchTS;
241 extraMonitor.timeIdle = chron::duration_cast<chron::duration<double, std::milli>>(durationFetchTS).count();
242
243 try {
244 RecoResults result = reco.Run(*ts);
245 if (archive) {
246 xpu::scoped_timer t_{"Write Archive", &extraMonitor.timeWriteArchive};
247 auto storable = makeStorableRecoResults(*ts, result);
248 extraMonitor.bytesWritten = storable->SizeBytes();
249 xpu::t_add_bytes(extraMonitor.bytesWritten);
250 archive->put(storable);
251 }
252 }
253 catch (const ProcessingError& e) {
254 // TODO: Add flag if we want to abort on exception or continue with next timeslice
255 L_(error) << "Caught ProcessingError while processing timeslice " << tsIdx << ": " << e.what();
256 }
257 reco.QueueProcessingExtraMetrics(extraMonitor);
258
259 // Release memory after each timeslice and log memory usage
260 // This is useful to detect memory leaks as the memory usage should be constant between timeslices
261 ts.reset();
262 memoryLogger.Log();
263
264 tsIdx++;
265
266 if (num_ts > 0 && tsIdx >= num_ts) break;
267
268 startFetchTS = chron::high_resolution_clock::now();
269 }
270
271 if (archive) archive->end_stream();
272
273 reco.Finalize();
274 auto endProcessing = chron::high_resolution_clock::now();
275 auto duration = chron::duration_cast<chron::milliseconds>(endProcessing - startProcessing);
276 L_(info) << "Total Processing time (Wall): " << duration.count() << " ms";
277
278 return 0;
279}
#define L_(level)
TClonesArray * tracks
@ kTof
Time-of-flight Detector.
Definition CbmDefs.h:52
@ kSts
Silicon Tracking System.
Definition CbmDefs.h:48
static vector< vector< QAHit > > hits
Memory logging.
System functions.
Track the memory usage of the process and write it to the log.
void Log()
Log the current memory usage.
const std::string & Device() const
bool CompressArchive() const
std::optional< int > NumOMPThreads() const
bool CollectKernelTimes() const
fs::path LogFile() const
fs::path OutputFile() const
const std::string & InputLocator() const
std::size_t OutputMaxItems() const
std::size_t OutputMaxSize() const
severity_level LogLevel() const
const std::string GIT_HASH
constexpr bool WITH_PARALLEL_STL
Definition BuildInfo.h:48
constexpr bool WITH_OMP
Definition BuildInfo.h:55
constexpr bool WITH_ZSTD
Definition BuildInfo.h:62
const std::string BUILD_TYPE
int GetMaxThreads()
Definition OpenMP.h:46
void SetNumThreads(int)
Definition OpenMP.h:49
fles::InputArchive< StorableRecoResults, StorableRecoResults, fles::ArchiveType::RecoResultsArchive > RecoResultsInputArchive
task_thread_pool::task_thread_pool & GetGlobalSTLThreadPool()
Get the global thread pool for parallel stl algorithms.
Definition Algorithm.cxx:8
std::vector< T > ToStdVector(const PODVector< T > &vec)
Definition PODVector.h:20
bool dumpArchive(const Options &opts)
int main(int argc, char **argv)
std::shared_ptr< StorableRecoResults > makeStorableRecoResults(const fles::Timeslice &ts, const RecoResults &results)
Monitor for additional processing steps.
Definition Reco.h:132
ca::Vector< std::vector< HitId_t > > trackTofHitIndices
Definition RecoResults.h:54
PartitionedVector< trd::Hit > trdHits
Definition RecoResults.h:49
PODVector< CbmMuchDigi > muchDigis
Definition RecoResults.h:35
PartitionedVector< sts::Cluster > stsClusters
Definition RecoResults.h:45
ca::Vector< ca::Track > tracks
Definition RecoResults.h:52
PartitionedVector< tof::Hit > tofHits
Definition RecoResults.h:48
ca::Vector< std::vector< HitId_t > > trackTrdHitIndices
Definition RecoResults.h:55
std::vector< DigiEvent > events
Definition RecoResults.h:43
PODVector< CbmTrdDigi > trd2dDigis
Definition RecoResults.h:37
PartitionedVector< bmon::Hit > bmonHits
Definition RecoResults.h:50
PODVector< CbmFsdDigi > fsdDigis
Definition RecoResults.h:41
PODVector< CbmMvdRawDigi > mvdDigis
Definition RecoResults.h:36
PartitionedSpan< sts::Hit > stsHits
Definition RecoResults.h:47
PODVector< CbmTofDigi > tofDigis
Definition RecoResults.h:39
ca::Vector< std::vector< HitId_t > > trackStsHitIndices
Definition RecoResults.h:53
PODVector< CbmRichDigi > richDigis
Definition RecoResults.h:40
PODVector< CbmStsDigi > stsDigis
Definition RecoResults.h:34
PODVector< CbmTrdDigi > trdDigis
Definition RecoResults.h:38
PODVector< CbmBmonDigi > bmonDigis
Definition RecoResults.h:33