CbmRoot
Loading...
Searching...
No Matches
reco/app/cbmreco/main.cxx
Go to the documentation of this file.
1/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Felix Weiglhofer [committer] */
4#include "BuildInfo.h"
5#include "CbmDigiEvent.h"
6#include "Exceptions.h"
7#include "Options.h"
8#include "Reco.h"
11#include "System.h"
12#include "compat/Algorithm.h"
13#include "compat/OpenMP.h"
14#include "gpu/DeviceImage.h"
15#include "util/MemoryLogger.h"
16
17#include <StorableTimeslice.hpp>
18#include <TimesliceAutoSource.hpp>
19
20#include <log.hpp>
21#include <sstream>
22
23#include <xpu/host.h>
24
25using namespace cbm::algo;
26
27namespace chron = std::chrono;
28
29std::shared_ptr<StorableRecoResults> makeStorableRecoResults(const fles::Timeslice& ts, const RecoResults& results)
30{
31 auto storable = std::make_shared<StorableRecoResults>(ts.index(), ts.start_time());
32
33 storable->DigiEvents().reserve(results.events.size());
34 for (const auto& digiEvent : results.events) {
35 storable->DigiEvents().emplace_back(digiEvent.ToStorable());
36 }
37
38 // TODO: some of these copies can be avoided / made into moves
39 storable->BmonDigis() = ToStdVector(results.bmonDigis);
40 storable->StsDigis() = ToStdVector(results.stsDigis);
41 storable->MuchDigis() = ToStdVector(results.muchDigis);
42 storable->Trd2dDigis() = ToStdVector(results.trd2dDigis);
43 storable->TrdDigis() = ToStdVector(results.trdDigis);
44 storable->TofDigis() = ToStdVector(results.tofDigis);
45 storable->RichDigis() = ToStdVector(results.richDigis);
46
47 storable->StsClusters() = results.stsClusters;
48 storable->StsHits() = results.stsHits;
49 storable->TofHits() = results.tofHits;
50 storable->TrdHits() = results.trdHits;
51
52 storable->Tracks() = results.tracks;
53 storable->TrackStsHitIndices() = results.trackStsHitIndices;
54 storable->TrackTofHitIndices() = results.trackTofHitIndices;
55
56 return storable;
57}
58
59bool dumpArchive(const Options& opts)
60{
61 // Limit the number of events per timeslice to dump to avoid spamming the log
62 constexpr size_t DumpEventsPerTS = 10;
63 constexpr size_t DumpHitsPerSensor = 2;
64 constexpr size_t DumpTracksPerTS = 10;
65
66 if (!opts.DumpArchive()) return false;
67
68 L_(info) << "Dumping archive: " << opts.InputLocator();
69
71
72 auto desc = archive.descriptor();
73 L_(info) << "Archive descriptor: ";
74 L_(info) << " - time_created: " << desc.time_created();
75 L_(info) << " - hostname: " << desc.hostname();
76 L_(info) << " - username: " << desc.username();
77
78 for (auto recoResults = archive.get(); !archive.eos(); recoResults = archive.get()) {
79 if (recoResults == nullptr) {
80 L_(error) << "Failed to read RecoResults from archive";
81 break;
82 }
83
84 size_t nEvents = recoResults->DigiEvents().size();
85 L_(info) << "TS " << recoResults->TsIndex() << " start: " << recoResults->TsStartTime() << " events: " << nEvents
86 << ", stsHits: " << recoResults->StsHits().NElements()
87 << ", tofHits: " << recoResults->TofHits().NElements() << ", tracks: " << recoResults->Tracks().size();
88
89 for (size_t i = 0; i < std::min(DumpEventsPerTS, nEvents); i++) {
90 const auto& digiEvent = recoResults->DigiEvents().at(i);
91 L_(info) << " - Event " << i << " number: " << digiEvent.fNumber << "; time: " << digiEvent.fTime
92 << "; nStsDigis: " << digiEvent.fData.Size(ECbmModuleId::kSts)
93 << "; nTofDigis: " << digiEvent.fData.Size(ECbmModuleId::kTof);
94 }
95
96 if (nEvents > DumpEventsPerTS) L_(info) << "...";
97
98 auto& stsHits = recoResults->StsHits();
99 for (size_t m = 0; m < stsHits.NPartitions(); m++) {
100 auto [hits, address] = stsHits.Partition(m);
101 for (size_t i = 0; i < std::min(DumpHitsPerSensor, hits.size()); i++) {
102 const auto& hit = hits[i];
103 L_(info) << " - STS Hit " << i << " sensor: " << address << "; time: " << hit.fTime << ", X: " << hit.fX
104 << ", Y: " << hit.fY << ", Z: " << hit.fZ;
105 }
106 }
107
108 L_(info) << "...";
109
110 auto tofHits = recoResults->TofHits();
111 for (size_t m = 0; m < tofHits.NPartitions(); m++) {
112 auto [hits, address] = tofHits.Partition(m);
113 for (size_t i = 0; i < std::min(DumpHitsPerSensor, hits.size()); i++) {
114 const auto& hit = hits[i];
115 L_(info) << " - TOF Hit " << i << " sensor: " << address << "; time: " << hit.Time() << ", X: " << hit.X()
116 << ", Y: " << hit.Y() << ", Z: " << hit.Z();
117 }
118 }
119
120 L_(info) << "...";
121
122 auto& tracks = recoResults->Tracks();
123 for (size_t t = 0; t < std::min(tracks.size(), DumpTracksPerTS); t++) {
124 const auto& track = tracks[t];
125 L_(info) << " - Track " << t << " nHits: " << track.fNofHits << ", chi2: " << track.fParPV.ChiSq()
126 << ", X: " << track.fParPV.X() << ", Y: " << track.fParPV.Y() << ", Z: " << track.fParPV.Z();
127 }
128 }
129
130 return true;
131}
132
133int main(int argc, char** argv)
134{
135 Options opts(argc, argv);
136
137 logging::add_console(opts.LogLevel());
138
139 if (!opts.LogFile().empty()) logging::add_file(opts.LogFile().string(), opts.LogLevel());
140
141 // XPU
142 xpu::settings settings;
143 settings.profile = opts.CollectKernelTimes();
144 settings.device = opts.Device();
145 if (opts.LogLevel() == trace) {
146 settings.verbose = true;
147 settings.logging_sink = [](std::string_view msg) { L_(trace) << msg; };
148 }
149 xpu::initialize(settings);
150 xpu::preload<GPUReco>();
151
152 auto ompThreads = opts.NumOMPThreads();
153 if (ompThreads) {
154 L_(debug) << *ompThreads << " OpenMP threads requested";
155 openmp::SetNumThreads(*ompThreads);
156 }
158
159 L_(info) << "CBMRECO buildType=" << BuildInfo::BUILD_TYPE << " gpuDebug=" << BuildInfo::GPU_DEBUG
160 << " parallelSTL=" << BuildInfo::WITH_PARALLEL_STL << " OMP=" << BuildInfo::WITH_OMP
161 << " ZSTD=" << BuildInfo::WITH_ZSTD << " commit=" << BuildInfo::GIT_HASH;
162 std::stringstream ss;
163 for (int i = 0; i < argc; i++) {
164 ss << argv[i] << " ";
165 }
166 L_(info) << ss.str();
167
168 if (dumpArchive(opts)) return 0;
169
170 Reco reco;
171 MemoryLogger memoryLogger;
172
173 auto startProcessing = chron::high_resolution_clock::now();
174 reco.Init(opts);
175
176 fles::TimesliceAutoSource source(opts.InputLocator());
177
178 ProcessingExtraMonitor extraMonitor;
179
180 std::optional<RecoResultsOutputArchive> archive;
181 if (!opts.OutputFile().empty()) {
182 L_(info) << "Writing results to file: " << opts.OutputFile();
183 fles::ArchiveCompression compression = fles::ArchiveCompression::None;
184 if (opts.CompressArchive()) {
185 compression = fles::ArchiveCompression::Zstd;
186 }
187 archive.emplace(opts.OutputFile().string(), compression);
188 }
189
190 int tsIdx = 0;
191 int num_ts = opts.NumTimeslices();
192 if (num_ts > 0) num_ts += opts.SkipTimeslices();
193 L_(debug) << "Starting to fetch timeslices from source...";
194
195 auto startFetchTS = chron::high_resolution_clock::now();
196 while (auto timeslice = source.get()) {
197 if (tsIdx < opts.SkipTimeslices()) {
198 tsIdx++;
199 continue;
200 }
201
202 std::unique_ptr<fles::Timeslice> ts;
203 if (opts.ReleaseMode()) {
204 ts = std::make_unique<fles::StorableTimeslice>(*timeslice);
205 timeslice.reset();
206 }
207 else {
208 ts = std::move(timeslice);
209 }
210
211 auto endFetchTS = chron::high_resolution_clock::now();
212 auto durationFetchTS = endFetchTS - startFetchTS;
213 extraMonitor.timeIdle = chron::duration_cast<chron::duration<double, std::milli>>(durationFetchTS).count();
214
215 try {
216 RecoResults result = reco.Run(*ts);
217 if (archive) {
218 xpu::scoped_timer t_{"Write Archive", &extraMonitor.timeWriteArchive};
219 auto storable = makeStorableRecoResults(*ts, result);
220 extraMonitor.bytesWritten = storable->SizeBytes();
221 xpu::t_add_bytes(extraMonitor.bytesWritten);
222 archive->put(storable);
223 }
224 }
225 catch (const ProcessingError& e) {
226 // TODO: Add flag if we want to abort on exception or continue with next timeslice
227 L_(error) << "Caught ProcessingError while processing timeslice " << tsIdx << ": " << e.what();
228 }
229 reco.QueueProcessingExtraMetrics(extraMonitor);
230
231 // Release memory after each timeslice and log memory usage
232 // This is useful to detect memory leaks as the memory usage should be constant between timeslices
233 ts.reset();
234 memoryLogger.Log();
235
236 tsIdx++;
237
238 if (num_ts > 0 && tsIdx >= num_ts) break;
239
240 startFetchTS = chron::high_resolution_clock::now();
241 }
242
243 if (archive) archive->end_stream();
244
245 reco.Finalize();
246 auto endProcessing = chron::high_resolution_clock::now();
247 auto duration = chron::duration_cast<chron::milliseconds>(endProcessing - startProcessing);
248 L_(info) << "Total Processing time (Wall): " << duration.count() << " ms";
249
250 return 0;
251}
#define L_(level)
TClonesArray * tracks
@ kTof
Time-of-flight Detector.
@ kSts
Silicon Tracking System.
static vector< vector< QAHit > > hits
Memory logging.
System functions.
Track the memory usage of the process and write it to the log.
void Log()
Log the current memory usage.
const std::string & Device() const
bool CompressArchive() const
std::optional< int > NumOMPThreads() const
bool CollectKernelTimes() const
fs::path LogFile() const
fs::path OutputFile() const
const std::string & InputLocator() const
severity_level LogLevel() const
void Init(const Options &)
Definition Reco.cxx:79
void Finalize()
Definition Reco.cxx:395
RecoResults Run(const fles::Timeslice &)
Definition Reco.cxx:229
void QueueProcessingExtraMetrics(const ProcessingExtraMonitor &)
Definition Reco.cxx:599
const std::string GIT_HASH
constexpr bool WITH_PARALLEL_STL
Definition BuildInfo.h:48
constexpr bool WITH_OMP
Definition BuildInfo.h:55
constexpr bool WITH_ZSTD
Definition BuildInfo.h:62
const std::string BUILD_TYPE
int GetMaxThreads()
Definition OpenMP.h:46
void SetNumThreads(int)
Definition OpenMP.h:49
fles::InputArchive< StorableRecoResults, StorableRecoResults, fles::ArchiveType::RecoResultsArchive > RecoResultsInputArchive
task_thread_pool::task_thread_pool & GetGlobalSTLThreadPool()
Get the global thread pool for parallel stl algorithms.
Definition Algorithm.cxx:8
std::vector< T > ToStdVector(const PODVector< T > &vec)
Definition PODVector.h:20
bool dumpArchive(const Options &opts)
int main(int argc, char **argv)
std::shared_ptr< StorableRecoResults > makeStorableRecoResults(const fles::Timeslice &ts, const RecoResults &results)
Monitor for additional processing steps.
Definition Reco.h:111
PartitionedVector< trd::Hit > trdHits
Definition RecoResults.h:43
PODVector< CbmMuchDigi > muchDigis
Definition RecoResults.h:31
PartitionedVector< sts::Cluster > stsClusters
Definition RecoResults.h:39
ca::Vector< ca::Track > tracks
Definition RecoResults.h:45
PartitionedVector< tof::Hit > tofHits
Definition RecoResults.h:42
ca::Vector< std::vector< std::pair< uint32_t, uint32_t > > > trackStsHitIndices
Definition RecoResults.h:46
std::vector< DigiEvent > events
Definition RecoResults.h:37
PODVector< CbmTrdDigi > trd2dDigis
Definition RecoResults.h:32
ca::Vector< std::vector< std::pair< uint32_t, uint32_t > > > trackTofHitIndices
Definition RecoResults.h:47
PartitionedSpan< sts::Hit > stsHits
Definition RecoResults.h:41
PODVector< CbmTofDigi > tofDigis
Definition RecoResults.h:34
PODVector< CbmRichDigi > richDigis
Definition RecoResults.h:35
PODVector< CbmStsDigi > stsDigis
Definition RecoResults.h:30
PODVector< CbmTrdDigi > trdDigis
Definition RecoResults.h:33
PODVector< CbmBmonDigi > bmonDigis
Definition RecoResults.h:29