CbmRoot
Loading...
Searching...
No Matches
VtFeeder.cxx
Go to the documentation of this file.
1/* Copyright (C) 2025 Jagiellonian University, Krakow
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Bartosz Sobol [committer], Jan de Cuveland */
4
5#include "VtFeeder.h"
6
7#include <TimesliceAutoSource.hpp>
8#include <TimesliceOutputArchive.hpp>
9#include <TimeslicePublisher.hpp>
10
11#include <chrono>
12#include <thread>
13
14#include <fmt/format.h>
15
16VtFeeder::VtFeeder(const FeederOptions& options, volatile sig_atomic_t* signalStatus)
17 : fOptions{options}
18 , fSignalStatus{signalStatus}
19 , fSource{}
20 , fPublishers{}
21 , fCount{0}
22{
23 fSource = std::make_unique<VtSource>(fOptions.fInput);
24
25 for (const auto& output : fOptions.fOutputs) {
26 UriComponents uri{output};
27
28 if (uri.scheme == "tcp") {
29 uint32_t hwm = 1;
30 for (auto& [key, value] : uri.query_components) {
31 if (key == "hwm") {
32 hwm = stou(value);
33 }
34 else {
35 throw std::runtime_error(
36 fmt::format("[VtFeeder] Query parameter not implemented for scheme {}: {}", uri.scheme, key));
37 }
38 }
39 const auto address = uri.scheme + "://" + uri.authority;
40 fPublishers.push_back(std::unique_ptr<fles::TimesliceSink>(new fles::TimeslicePublisher(address, hwm)));
41 }
42 else if (uri.scheme == "file" || uri.scheme.empty()) {
43 fles::ArchiveCompression compression = fles::ArchiveCompression::None;
44 for (const auto& [key, value] : uri.query_components) {
45 if (key == "c") {
46 if (value == "zstd") {
47 compression = fles::ArchiveCompression::Zstd;
48 }
49 else if (value != "none") {
50 throw std::runtime_error(fmt::format("[VtFeeder] invalid compression type for scheme file: {}", value));
51 }
52 }
53 else {
54 throw std::runtime_error(
55 fmt::format("[VtFeeder] Query parameter not implemented for scheme {}: {}", uri.scheme, key));
56 }
57 }
58 const auto filePath = uri.authority + uri.path;
59 fPublishers.push_back(
60 std::unique_ptr<fles::TimesliceSink>(new fles::TimesliceOutputArchive(filePath, compression)));
61 }
62 else {
63 throw std::runtime_error(fmt::format("[VtFeeder] Unsupported output scheme: {}", uri.scheme));
64 }
65 }
66}
67
69{
70 uint64_t i = 0;
71 std::unique_ptr<fles::StorableTimeslice> timeslice{};
72 while (*fSignalStatus == 0 and (timeslice = fSource->get())) {
73 if (i >= fOptions.fOffset && (i - fOptions.fOffset) % fOptions.fStride == 0) {
74 ++i;
75 }
76 else {
77 ++i;
78 continue;
79 }
80
81 std::shared_ptr<const fles::StorableTimeslice> ts{};
82 if (fOptions.fReleaseMode) {
83 ts = std::make_shared<const fles::StorableTimeslice>(*timeslice);
84 timeslice.reset();
85 }
86 else {
87 ts = std::shared_ptr<const fles::StorableTimeslice>(std::move(timeslice));
88 }
89 for (auto& publisher : fPublishers) {
90 publisher->put(ts);
91 }
92 if (++fCount == fOptions.fMaximumNumber) {
93 break;
94 }
95 timeslice.reset();
96 }
97
98 L_(info) << fmt::format("[VtFeeder::Run] Total timeslices processed: {}", fCount);
99}
#define L_(level)
VtFeeder(const FeederOptions &options, volatile sig_atomic_t *signalStatus)
Definition VtFeeder.cxx:16
const FeederOptions & fOptions
Definition VtFeeder.h:25
volatile sig_atomic_t * fSignalStatus
Definition VtFeeder.h:26
std::vector< std::unique_ptr< fles::TimesliceSink > > fPublishers
Definition VtFeeder.h:29
std::unique_ptr< VtSource > fSource
Definition VtFeeder.h:28
uint64_t fCount
Definition VtFeeder.h:31
void Run()
Definition VtFeeder.cxx:68