CbmRoot
Loading...
Searching...
No Matches
CbmMcbm2018Source.cxx
Go to the documentation of this file.
1/* Copyright (C) 2018-2020 Facility for Antiproton and Ion Research in Europe, Darmstadt
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Pierre-Alain Loizeau [committer], Florian Uhlig */
4
5// -----------------------------------------------------------------------------
6// ----- -----
7// ----- CbmMcbm2018Source -----
8// ----- Created 19.01.2018 by P.-A. Loizeau -----
9// ----- -----
10// -----------------------------------------------------------------------------
11
12#include "CbmMcbm2018Source.h"
13
14#include "CbmMcbmUnpack.h"
15
16#include "MicrosliceContents.hpp"
17#include "Timeslice.hpp"
18#include "TimesliceInputArchive.hpp"
19#include "TimesliceMetaData.h"
20#include "TimesliceMultiInputArchive.hpp"
21#include "TimesliceMultiSubscriber.hpp"
22#include "TimesliceSubscriber.hpp"
23
24#include "FairRunOnline.h"
25#include <Logger.h>
26
27#include "TClonesArray.h"
28#include "TH1.h"
29#include "THttpServer.h"
30#include "TProfile.h"
31
32#include <fstream>
33#include <iomanip>
34#include <iostream>
35
37 : FairSource()
38 , fFileName("")
39 , fDirName("")
40 , fInputFileList(new TObjString())
41 , fHost("localhost")
42 , fUnpackers()
43 , fUnpackersToRun()
44 , fTSNumber(0)
45 , fTSCounter(0)
46 , fTimer()
47 , fHistoMissedTS(nullptr)
48 , fHistoMissedTSEvo(nullptr)
49 , fNofTSSinceLastTS(0)
50 , fuTsReduction(1)
51 , fSource(nullptr)
52 , fuSubscriberHwm(1)
53 , fbWriteOutput(kFALSE)
54 , fTimeSliceMetaDataArray(nullptr)
55{
56}
57
59
61{
62 if (0 == fFileName.Length() && 0 == fInputFileList.GetSize()) {
63 // Create a ";" separated string with all host/port combinations
64 fInputFileList.Add(new TObjString(fHost));
65 std::string fileList {""};
66 for (const auto&& obj : fInputFileList) {
67 std::string fileName = dynamic_cast<TObjString*>(obj)->GetString().Data();
68 fileList += fileName;
69 fileList += ";";
70 }
71 fileList.pop_back(); // Remove the last ;
72 fSource.reset(new fles::TimesliceMultiSubscriber(fileList, fuSubscriberHwm));
73
76 dynamic_cast<fles::TimesliceMultiSubscriber*>(fSource.get())->InitTimesliceSubscriber();
77
78 if (!fSource) { LOG(fatal) << "Could not connect to publisher."; }
79 }
80 else {
81 // Create a ";" separated string with all file names
82 std::string fileList {""};
83 for (const auto&& obj : fInputFileList) {
84 std::string fileName = dynamic_cast<TObjString*>(obj)->GetString().Data();
85 fileList += fileName;
86 fileList += ";";
87 }
88 fileList.pop_back(); // Remove the last ;
89 LOG(info) << "Input File String: " << fileList;
90 if (fDirName.Length() > 0) { fSource.reset(new fles::TimesliceMultiInputArchive(fileList, fDirName.Data())); }
91 else {
92 fSource.reset(new fles::TimesliceMultiInputArchive(fileList));
93 }
94 }
95
97 for (auto it = fUnpackers.begin(); it != fUnpackers.end(); ++it)
98 fUnpackersToRun.insert(it->second);
99
100 for (auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++itUnp) {
101 LOG(info) << "Initialize " << (*itUnp)->GetName();
102 (*itUnp)->Init();
103 }
104
105 THttpServer* server = FairRunOnline::Instance()->GetHttpServer();
106
107 fHistoMissedTS = new TH1I("Missed_TS", "Missed TS", 2, 0., 2.);
108 fHistoMissedTSEvo = new TProfile("Missed_TS_Evo", "Missed TS evolution; TS Idx []", 100000, 0., 10000000.);
109
110 if (server) {
111 server->Register("/Fles", fHistoMissedTS);
112 server->Register("/Fles", fHistoMissedTSEvo);
113 } // if (server)
114
116 FairRootManager* ioman = FairRootManager::Instance();
117 if (NULL == ioman) { LOG(fatal) << "No FairRootManager instance"; }
118 fTimeSliceMetaDataArray = new TClonesArray("TimesliceMetaData", 10);
119 if (NULL == fTimeSliceMetaDataArray) { LOG(fatal) << "Failed creating the TS meta data TClonesarray "; }
120 ioman->Register("TimesliceMetaData", "TS Meta Data", fTimeSliceMetaDataArray, fbWriteOutput);
121
129 if (0 <= fiUnpSpillIdxStart) {
130 switch (fuFlagSpillStart) {
131 case 0: {
133 if (fvuSpillBreakBegTs.size() - 1 <= static_cast<UInt_t>(fiUnpSpillIdxStop)) {
134 LOG(warning) << "Chosen last spill index larger than spills contained in chosen spill start vector: "
135 << fiUnpSpillIdxStop << " VS " << fvuSpillBreakBegTs.size() - 1;
136 if (static_cast<UInt_t>(fiUnpSpillIdxStart) < fvuSpillBreakBegTs.size() - 1) {
138 LOG(warning) << "Using last possible spill instead as final one";
139 } // if( static_cast< UInt_t >(fiUnpSpillIdxStart) < fvuSpillBreakBegTs.size() - 1 )
140 else
141 LOG(fatal) << "Start index also too large, exiting";
142 } // if( fvuSpillBreakBegTs.size() - 1 <= static_cast< UInt_t >(fiUnpSpillIdxStop) )
143
146 break;
147 }
148 case 1: {
150 if (fvuSpillBreakMidTs.size() - 1 <= static_cast<UInt_t>(fiUnpSpillIdxStop)) {
151 LOG(warning) << "Chosen last spill index larger than spills contained in chosen spill start vector: "
152 << fiUnpSpillIdxStop << " VS " << fvuSpillBreakMidTs.size() - 1;
153 if (static_cast<UInt_t>(fiUnpSpillIdxStart) < fvuSpillBreakMidTs.size() - 1) {
155 LOG(warning) << "Using last possible spill instead as final one";
156 } // if( static_cast< UInt_t >(fiUnpSpillIdxStart) < fvuSpillBreakMidTs.size() - 1 )
157 else
158 LOG(fatal) << "Start index also too large, exiting";
159 } // if( fvuSpillBreakMidTs.size() - 1 <= static_cast< UInt_t >(fiUnpSpillIdxStop) )
160
163 break;
164 }
165 case 2: {
167 if (fvuSpillBreakEndTs.size() - 1 <= static_cast<UInt_t>(fiUnpSpillIdxStop)) {
168 LOG(warning) << "Chosen last spill index larger than spills contained in chosen spill start vector: "
169 << fiUnpSpillIdxStop << " VS " << fvuSpillBreakEndTs.size() - 1;
170 if (static_cast<UInt_t>(fiUnpSpillIdxStart) < fvuSpillBreakEndTs.size() - 1) {
172 LOG(warning) << "Using last possible spill instead as final one";
173 } // if( static_cast< UInt_t >(fiUnpSpillIdxStart) < fvuSpillBreakEndTs.size() - 1 )
174 else
175 LOG(fatal) << "Start index also too large, exiting";
176 } // if( fvuSpillBreakEndTs.size() - 1 <= static_cast< UInt_t >(fiUnpSpillIdxStop) )
177
180 break;
181 }
182 default: {
183 LOG(fatal) << "Unknown spill start point option: " << fuFlagSpillStart;
184 break;
185 }
186 } // switch( fuFlagSpillStart )
187 } // if (0 <= fiUnpSpillIdxStart)
188
189 return kTRUE;
190}
191
193{
194 for (auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++itUnp) {
195 LOG(info) << "Set parameter container " << (*itUnp)->GetName();
196 (*itUnp)->SetParContainers();
197 }
198}
199
201{
202 Bool_t result = kTRUE;
203 for (auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++itUnp) {
204 LOG(info) << "Initialize parameter container " << (*itUnp)->GetName();
205 result = result && (*itUnp)->InitContainers();
206 }
207 return result;
208}
209
211{
212 Bool_t result = kTRUE;
213 for (auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++itUnp) {
214 LOG(info) << "Initialize parameter container " << (*itUnp)->GetName();
215 result = result && (*itUnp)->ReInitContainers();
216 }
217 return result;
218}
219
221{
222 Int_t retVal = FillBuffer();
223
224 if (1 == retVal) { LOG(info) << "No more input"; }
225
226 return retVal; // no more data; trigger end of run
227}
228
229void CbmMcbm2018Source::PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor& mdsc)
230{
231 LOG(info) << "Header ID: Ox" << std::hex << static_cast<int>(mdsc.hdr_id) << std::dec;
232 LOG(info) << "Header version: Ox" << std::hex << static_cast<int>(mdsc.hdr_ver) << std::dec;
233 LOG(info) << "Equipement ID: " << mdsc.eq_id;
234 LOG(info) << "Flags: " << mdsc.flags;
235 LOG(info) << "Sys ID: Ox" << std::hex << static_cast<int>(mdsc.sys_id) << std::dec;
236 LOG(info) << "Sys version: Ox" << std::hex << static_cast<int>(mdsc.sys_ver) << std::dec;
237 LOG(info) << "Microslice Idx: " << mdsc.idx;
238 LOG(info) << "Checksum: " << mdsc.crc;
239 LOG(info) << "Size: " << mdsc.size;
240 LOG(info) << "Offset: " << mdsc.offset;
241}
242
244{
245 if (0 == ts.num_components()) {
246 LOG(error) << "No Component in TS " << ts.index();
247 return 1;
248 }
249 LOG(info) << "Found " << ts.num_components() << " different components in timeslice";
250 return kTRUE;
251}
252
254{
255 for (auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++itUnp) {
256 LOG(info) << "Finish " << (*itUnp)->GetName();
257 (*itUnp)->Finish();
258 }
259 /*
260 fHistoMissedTS->Write();
261 fHistoMissedTSEvo->Write();
262*/
263}
264
266{
267 for (auto it = fUnpackers.begin(); it != fUnpackers.end(); ++it) {
268 it->second->Reset();
269 }
271}
272
274{
275 while (auto timeslice = fSource->get()) {
276 fTSCounter++;
277 if (0 == fTSCounter % 10000) { LOG(info) << "Analyse Event " << fTSCounter; }
278
279 const fles::Timeslice& ts = *timeslice;
280 auto tsIndex = ts.index();
281 if ((tsIndex != (fTSNumber + 1)) && (fTSNumber != 0)) {
282 LOG(debug) << "Missed Timeslices. Old TS Number was " << fTSNumber << " New TS Number is " << tsIndex;
283 fHistoMissedTS->Fill(1, tsIndex - fTSNumber - 1);
284 fHistoMissedTSEvo->Fill(tsIndex, 1, tsIndex - fTSNumber - 1);
285 fNofTSSinceLastTS = tsIndex - fTSNumber;
286 }
287 else {
289 }
290 fHistoMissedTS->Fill(0);
291 fHistoMissedTSEvo->Fill(tsIndex, 0, 1);
292 fTSNumber = tsIndex;
293
294 if (0 == fTSNumber % 1000) { LOG(info) << "Reading Timeslice " << fTSNumber; }
295
296 if (1 == fTSCounter) {
297 for (size_t c {0}; c < ts.num_components(); c++) {
298 auto systemID = static_cast<int>(ts.descriptor(c, 0).sys_id);
299 LOG(info) << "Found systemID: " << std::hex << systemID << std::dec;
300
302 auto it_list = fUnpackers.equal_range(systemID);
303 if (it_list.first == it_list.second) {
304 LOG(info) << "Could not find unpacker for system id 0x" << std::hex << systemID << std::dec;
305 }
306 else { // if( it == fUnpackers.end() )
307 for (auto it = it_list.first; it != it_list.second; ++it) {
308 it->second->AddMsComponentToList(c, systemID);
309 it->second->SetNbMsInTs(ts.num_core_microslices(), ts.num_microslices(c) - ts.num_core_microslices());
310 } // for( auto it = it_list.first; it != it_list.second; ++it )
311 } // else of if( it == fUnpackers.end() )
312 } // for (size_t c {0}; c < ts.num_components(); c++)
313
315 auto nMsInTs = ts.num_core_microslices();
316 if (nMsInTs > 2) {
317 // This assumes that we have a component 0 and component independent ms/ts settings!
318 auto msDescA = ts.descriptor(0, 1);
319 auto msDescB = ts.descriptor(0, 2);
320 auto msLength = msDescB.idx - msDescA.idx;
321 fTSLength = msLength * nMsInTs;
322 fTSOverlappLength = msLength * (ts.num_microslices(0) - nMsInTs);
323 LOG(info) << "CbmMcbm2018Source::FillBuffer() - TS 1 - Calculated "
324 << "TimesliceMetaData information from microslices Metadata -> "
325 << "MS length found to be " << msLength << " ns, TS length " << fTSLength
326 << " ns, and TS overlap length " << fTSOverlappLength << " ns";
327 }
328 else {
329 LOG(warning) << "CbmMcbm2018Source::FillBuffer() - TS 1 - Calculate "
330 "TimesliceMetaData information - single microslice timeslices -> "
331 "TS duration can not be calculated with the given method. Hence, "
332 "TimesliceMetaData duration values are filled with 0";
333 }
334 } // if( 1 == fTSCounter )
335
337 if (0 <= fiUnpSpillIdxStart) {
338 if (tsIndex < fuSpillBegTs) {
340 continue;
341 } // if (tsIndex < fuSpillBegTs)
342 else if (fuSpillEndTs <= tsIndex) {
344 return 1;
345 } // else if
346 } // if (0 <= fiUnpSpillIdxStart)
347
348
350 if (0 == tsIndex % fuTsReduction) {
351 for (auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++itUnp) {
352 (*itUnp)->DoUnpack(ts, 0);
353 } // for( auto itUnp = fUnpackersToRun.begin(); itUnp != fUnpackersToRun.end(); ++ itUnp )
354 } // if( 0 == tsIndex % fuTsReduction )
355
357 new ((*fTimeSliceMetaDataArray)[fTimeSliceMetaDataArray->GetEntriesFast()])
358 TimesliceMetaData(ts.descriptor(0, 0).idx, fTSLength, fTSOverlappLength, tsIndex);
359
360 return 0;
361 }
362 return 1;
363}
364
ClassImp(CbmConverterManager)
int Int_t
bool Bool_t
TClonesArray * fTimeSliceMetaDataArray
If ON the output TClonesArray of meta-data is written to disk.
void PrintMicroSliceDescriptor(const fles::MicrosliceDescriptor &mdsc)
std::vector< ULong64_t > fvuSpillBreakMidTs
virtual void SetParUnpackers()
virtual Bool_t InitUnpackers()
std::unique_ptr< fles::TimesliceSource > fSource
std::vector< ULong64_t > fvuSpillBreakEndTs
virtual Bool_t ReInitUnpackers()
UInt_t fTSNumber
List of all unpackers for which at least one matching container was found.
Int_t fiUnpSpillIdxStop
>= 0 means unpack only from this spill
UInt_t fuSpillBegTs
0 = Break begin, 1 = Break middle, 2 = Break end
TList fInputFileList
List of input files.
std::multimap< Int_t, CbmMcbmUnpack * > fUnpackers
std::unordered_set< CbmMcbmUnpack * > fUnpackersToRun
List pairs of system ID and unpacker pointer (unpacker can appear multiple times)
Bool_t CheckTimeslice(const fles::Timeslice &ts)
Bool_t fbWriteOutput
Output ClonesArray.
UInt_t fuFlagSpillStart
>= 0 means unpack only up to this spill (included)
std::vector< ULong64_t > fvuSpillBreakBegTs