CbmRoot
Loading...
Searching...
No Matches
HistogramSender.h
Go to the documentation of this file.
1/* Copyright (C) 2023 FIAS Frankfurt Institute for Advanced Studies, Frankfurt / Main
2 SPDX-License-Identifier: GPL-3.0-only
3 Authors: Felix Weiglhofer [committer] */
4#ifndef CBM_ALGO_BASE_HISTOGRAM_SENDER_H
5#define CBM_ALGO_BASE_HISTOGRAM_SENDER_H
6
7#include <boost/archive/binary_oarchive.hpp>
8#include <boost/iostreams/device/array.hpp>
9#include <boost/iostreams/device/back_inserter.hpp>
10#ifdef BOOST_IOS_HAS_ZSTD
11#include <boost/iostreams/filter/zstd.hpp>
12#include <boost/iostreams/filtering_stream.hpp>
13#endif
14#include <boost/iostreams/stream.hpp>
15#include <boost/serialization/utility.hpp>
16#include <boost/serialization/vector.hpp>
17
18#include <string>
19#include <string_view>
20#include <zmq.hpp>
21
22namespace cbm::algo
23{
24
26 public:
27 HistogramSender(std::string_view address, int32_t hwm = 1, bool compression = false)
28 : fHistComChan(address)
30 , fbCompression(compression)
31 , fZmqContext(1)
32 , fZmqSocket(fZmqContext, zmq::socket_type::push)
33 {
34 fZmqSocket.set(zmq::sockopt::sndhwm, fHistHighWaterMark); // High-Water Mark, max nb updates kept in out buffer
35 fZmqSocket.connect(fHistComChan); // This side "connects" to socket => Other side should have "bind"!!!!
36 }
37
42 template<typename Object>
43 void PrepareAndSendMsg(const Object& obj, zmq::send_flags flags)
44 {
46 namespace b_io = boost::iostreams;
47 namespace b_ar = boost::archive;
48
49 std::string serial_str;
50 b_io::back_insert_device<std::string> inserter(serial_str);
51 b_io::stream<b_io::back_insert_device<std::string>> bstream(inserter);
52
53 serial_str.clear();
54
55 if (fbCompression) {
56#ifdef BOOST_IOS_HAS_ZSTD
57 std::unique_ptr<b_io::filtering_ostream> out_ = std::make_unique<b_io::filtering_ostream>();
58 out_->push(b_io::zstd_compressor(b_io::zstd::best_speed));
59 out_->push(bstream);
60 std::unique_ptr<b_ar::binary_oarchive> oarchive_ =
61 std::make_unique<b_ar::binary_oarchive>(*out_, b_ar::no_header);
62 *oarchive_ << obj;
63#else
64 throw std::runtime_error("Unsupported ZSTD compression (boost) for histograms emissions channel");
65#endif
66 }
67 else {
68 b_ar::binary_oarchive oa(bstream);
69 oa << obj;
70 }
71 bstream.flush();
72
73 zmq::message_t msg(serial_str.size());
74 std::copy_n(static_cast<const char*>(serial_str.data()), msg.size(), static_cast<char*>(msg.data()));
76 fZmqSocket.send(msg, flags | zmq::send_flags::dontwait);
77 }
78
79 private:
80 std::string fHistComChan = "tcp://127.0.0.1:56800";  ///< Endpoint the socket connects to; set from the constructor's address argument
81 int32_t fHistHighWaterMark = 1;  ///< Value applied to the socket's sndhwm option in the constructor
82 bool fbCompression = false;  ///< When true, PrepareAndSendMsg pipes the archive through a ZSTD compressor
83 zmq::context_t fZmqContext;  ///< ZMQ context. FIXME: should be only one context per binary!
84 zmq::socket_t fZmqSocket;  ///< ZMQ push socket to the histogram server
85 };
86
87} // namespace cbm::algo
88
89#endif
HistogramSender(std::string_view address, int32_t hwm=1, bool compression=false)
void PrepareAndSendMsg(const Object &obj, zmq::send_flags flags)
Serialize object and send it to the histogram server.
zmq::socket_t fZmqSocket
ZMQ socket to histogram server.
zmq::context_t fZmqContext
ZMQ context. FIXME: there should be only one context per binary!