OpenCPN Partial API docs
comm_out_queue.h
1 #ifndef COMM__OUT_QUEUE_H__
2 #define COMM__OUT_QUEUE_H__
3 
4 #include <chrono>
5 #include <cstdint>
6 #include <mutex>
7 #include <set>
8 #include <string>
9 #include <unordered_map>
10 #include <vector>
11 
12 using namespace std::literals::chrono_literals;
13 
/**
 * Running statistics for a stream of messages passing through a queue.
 *
 * in() is called when a message enters and out() when it leaves. Besides
 * plain totals the counter keeps exponentially smoothed rates, where each new
 * sample contributes 5% and the previous estimate 95%.
 *
 * The class does no locking of its own.
 */
class PerfCounter {
public:
  PerfCounter()
      : msgs_in(0),
        msgs_out(0),
        bytes_in(0),
        bytes_out(0),
        bps_in(0),
        mps_in(0),
        bps_out(0),
        mps_out(0),
        in_out_delay_us(0),
        overflow_msgs(0),
        in_queue(0) {}

  /**
   * Record a message entering the queue.
   *
   * @param bytes Size of the message.
   * @param ok    When false the message is also counted in overflow_msgs.
   */
  void in(const size_t bytes, bool ok) {
    const auto now = std::chrono::steady_clock::now();
    const double gap_us = MicrosBetween(last_in, now);
    // A non-positive interval would divide by zero or produce a negative
    // rate; keep the previous estimates in that case.
    if (gap_us > 0) {
      bps_in = static_cast<uint32_t>(Smooth(bps_in, bytes * 1e6 / gap_us));
      mps_in = Smooth(mps_in, 1e6 / gap_us);
    }
    msgs_in++;
    bytes_in += bytes;
    last_in = now;
    if (!ok) {
      overflow_msgs++;
    }
    in_queue++;
  }

  /**
   * Record a message leaving the queue.
   *
   * @param bytes Size of the message.
   * @param in_ts Time stamp taken when the message entered the queue, used
   *              to update in_out_delay_us.
   */
  void out(const size_t bytes,
           std::chrono::time_point<std::chrono::steady_clock> in_ts) {
    const auto now = std::chrono::steady_clock::now();
    // Output rates are based on the time since the previous out() call.
    const double gap_us = MicrosBetween(last_out, now);
    if (gap_us > 0) {
      bps_out = static_cast<uint32_t>(Smooth(bps_out, bytes * 1e6 / gap_us));
      mps_out = Smooth(mps_out, 1e6 / gap_us);
    }
    // Clamp so that a time stamp in the future cannot yield a negative
    // value, which must not be converted to an unsigned type.
    const double delay_us = MicrosBetween(in_ts, now);
    in_out_delay_us = static_cast<size_t>(
        Smooth(static_cast<double>(in_out_delay_us),
               delay_us > 0 ? delay_us : 0.0));
    msgs_out++;
    bytes_out += bytes;
    last_out = now;
    // Do not wrap around if out() is called more often than in().
    if (in_queue > 0) {
      in_queue--;
    }
  }

  size_t msgs_in;          ///< Number of in() calls.
  size_t msgs_out;         ///< Number of out() calls.
  size_t bytes_in;         ///< Sum of bytes passed to in().
  size_t bytes_out;        ///< Sum of bytes passed to out().
  uint32_t bps_in;         ///< Smoothed input rate, bytes per second.
  double mps_in;           ///< Smoothed input rate, messages per second.
  uint32_t bps_out;        ///< Smoothed output rate, bytes per second.
  double mps_out;          ///< Smoothed output rate, messages per second.
  size_t in_out_delay_us;  ///< Smoothed in-to-out delay, microseconds.
  size_t overflow_msgs;    ///< Number of in() calls with ok == false.
  size_t in_queue;         ///< in() calls not yet matched by out().
  std::chrono::time_point<std::chrono::steady_clock> last_in;   ///< Last in()
  std::chrono::time_point<std::chrono::steady_clock> last_out;  ///< Last out()

private:
  /** Return the time from @p from to @p to in microseconds. */
  static double MicrosBetween(
      std::chrono::time_point<std::chrono::steady_clock> from,
      std::chrono::time_point<std::chrono::steady_clock> to) {
    const std::chrono::duration<double, std::micro> elapsed = to - from;
    return elapsed.count();
  }

  /** Blend a new sample into the previous estimate (95% old, 5% new). */
  static double Smooth(double previous, double sample) {
    return 0.95 * previous + 0.05 * sample;
  }
};
71 
72 std::ostream& operator<<(std::ostream& os, const PerfCounter& pc);
73 
/**
 * Queue of NMEA0183 messages which only holds a limited amount of each
 * message type.
 *
 * The member functions without a body here are defined out of line.
 *
 * NOTE(review): the documentation index of this listing also mentions a
 * default constructor CommOutQueue(), which is not visible in the listing
 * itself -- confirm against the original header before relying on it.
 */
class CommOutQueue {
public:
  /** Insert a line of NMEA0183 data in the buffer. */
  virtual bool push_back(const std::string& line);

  /** Return the next line to send and remove it from the buffer. */
  virtual std::string pop();

  /** Return the number of lines in the queue. */
  virtual int size() const;

  /**
   * Create a buffer which stores at most max_buffered items of each message.
   *
   * @param max_buffered Maximum number of buffered items per message.
   * @param min_msg_gap  NOTE(review): presumably the minimum interval between
   *                     messages of the same type before rate limiting is
   *                     applied -- confirm against the implementation.
   */
  CommOutQueue(unsigned max_buffered,
               std::chrono::duration<unsigned, std::milli> min_msg_gap);

  /**
   * Create a buffer which stores at most max_buffered items of each message,
   * using a zero min_msg_gap.
   */
  CommOutQueue(unsigned max_buffered)
      : CommOutQueue(max_buffered, duration_ms{0}) {}

  // Disable copying and assignment
  CommOutQueue(const CommOutQueue& other) = delete;
  CommOutQueue& operator=(const CommOutQueue&) = delete;

  virtual ~CommOutQueue() = default;

protected:
  /** A buffered line together with a type key and a time stamp. */
  struct BufferItem {
    uint64_t type;
    std::string line;
    BufferItem(const std::string& line);
    BufferItem(const BufferItem& other);
    std::chrono::time_point<std::chrono::steady_clock> stamp;
  };

  std::vector<BufferItem> m_buffer;
  mutable std::mutex m_mutex;  // mutable so that const members can lock it
  int m_size;
  using duration_ms = std::chrono::duration<unsigned, std::milli>;
  duration_ms m_min_msg_gap;
  bool m_overrun_reported;
  std::set<uint64_t> m_rate_limits_logged;
};
142 
145 public:
146  CommOutQueueSingle() : CommOutQueue(1, 0ms) {}
147 
149  bool push_back(const std::string& line) override;
150 };
151 
155 public:
156  MeasuredCommOutQueue(unsigned max_buffered,
157  std::chrono::duration<unsigned, std::milli> min_msg_gap)
158  : CommOutQueue(max_buffered, min_msg_gap), push_time(0), pop_time(0) {}
159 
160  MeasuredCommOutQueue(unsigned max_buffered)
161  : MeasuredCommOutQueue(max_buffered, 0ms) {}
162 
163  bool push_back(const std::string& line) override;
164 
165  std::string pop() override;
166 
167  std::unordered_map<unsigned long, PerfCounter> msg_perf;
168 
169  PerfCounter perf;
170  double push_time;
171  double pop_time;
172 };
173 
176 public:
177  DummyCommOutQueue() {};
178 
179  bool push_back(const std::string& line) override {
180  std::lock_guard<std::mutex> lock(m_mutex);
181  buff.insert(buff.begin(), line);
182  return true;
183  }
184 
185  std::string pop() override {
186  std::lock_guard<std::mutex> lock(m_mutex);
187  if (buff.size() <= 0)
188  throw std::underflow_error("Attempt to pop() from empty buffer");
189  auto line = buff.back();
190  buff.pop_back();
191  return line;
192  }
193 
194  int size() const override {
195  std::lock_guard<std::mutex> lock(m_mutex);
196  return buff.size();
197  }
198 
199 private:
200  mutable std::mutex m_mutex;
201  std::vector<std::string> buff;
202 };
203 
204 std::ostream& operator<<(std::ostream& os, const MeasuredCommOutQueue& q);
205 
206 #endif // COMM__OUT_QUEUE_H__
A CommOutQueue limited to one message of each kind.
Queue of NMEA0183 messages which only holds a limited amount of each message type.
CommOutQueue(unsigned max_buffered)
Create a buffer which stores at most max_buffered items of each message.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
Simple FIFO queue without added logic.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
int size() const override
Return number of lines in queue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.