1 #ifndef COMM__OUT_QUEUE_H__
2 #define COMM__OUT_QUEUE_H__
9 #include <unordered_map>
12 using namespace std::literals::chrono_literals;
/**
 * Register an incoming message and refresh the smoothed input rates.
 *
 * Both rates are running averages which weight the previous value by 0.95
 * and the new sample by 0.05. The sample interval is the time elapsed since
 * last_in, measured in microseconds; the factor 1000000 converts it to a
 * per-second figure.
 *
 * @param bytes Size of the message, feeds the bytes-per-second average.
 * @param ok NOTE(review): not referenced by the statements shown here;
 *           presumably a success flag handled further down -- confirm.
 */
29 void in(
const size_t bytes,
bool ok) {
30 auto t1 = std::chrono::steady_clock::now();
31 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
32 bps_in = 0.95 * bps_in + 0.05 * bytes * 1000000 / us_time.count();
// The message rate must be smoothed against its own history (mps_in);
// using bps_in here mixed the byte rate into the messages/second value.
33 mps_in = 0.95 * mps_in + 0.05 * 1000000 / us_time.count();
/**
 * Register an outgoing message and refresh the smoothed output rates and
 * the smoothed in-to-out delay.
 *
 * All three values are running averages which weight the previous value by
 * 0.95 and the new sample by 0.05.
 *
 * @param bytes Size of the message, feeds the bytes-per-second average.
 * @param in_ts NOTE(review): not referenced by the statements shown here;
 *              presumably the time the message was queued, used for the
 *              delay sample -- confirm.
 */
43 void out(
const size_t bytes,
44 std::chrono::time_point<std::chrono::steady_clock> in_ts) {
45 auto t1 = std::chrono::steady_clock::now();
// NOTE(review): the interval is taken from last_in, not last_out, so the
// output rates below are relative to the latest input -- confirm intent.
46 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
47 bps_out = 0.95 * bps_out + 0.05 * bytes * 1000000 / us_time.count();
// The message rate must be smoothed against its own history (mps_out);
// using bps_out here mixed the byte rate into the messages/second value.
48 mps_out = 0.95 * mps_out + 0.05 * 1000000 / us_time.count();
// in_out_delay_us is a size_t, the fractional part is dropped on assignment.
50 in_out_delay_us = 0.95 * in_out_delay_us + 0.05 * us_time.count();
65 size_t in_out_delay_us;
68 std::chrono::time_point<std::chrono::steady_clock> last_in;
69 std::chrono::time_point<std::chrono::steady_clock> last_out;
72 std::ostream& operator<<(std::ostream& os,
const PerfCounter& pc);
85 virtual bool push_back(
const std::string& line);
91 virtual std::string pop();
94 virtual int size()
const;
106 std::chrono::duration<unsigned, std::milli> min_msg_gap);
131 std::chrono::time_point<std::chrono::steady_clock> stamp;
134 std::vector<BufferItem> m_buffer;
135 mutable std::mutex m_mutex;
137 using duration_ms = std::chrono::duration<unsigned, std::milli>;
138 duration_ms m_min_msg_gap;
139 bool m_overrun_reported;
// Set of 64-bit keys; presumably the message ids for which a rate-limit
// notice has already been logged -- confirm against the implementation.
140 std::set<uint64_t> m_rate_limits_logged;
149 bool push_back(
const std::string& line)
override;
157 std::chrono::duration<unsigned, std::milli> min_msg_gap)
158 :
CommOutQueue(max_buffered, min_msg_gap), push_time(0), pop_time(0) {}
163 bool push_back(
const std::string& line)
override;
165 std::string pop()
override;
167 std::unordered_map<unsigned long, PerfCounter> msg_perf;
180 std::lock_guard<std::mutex> lock(m_mutex);
181 buff.insert(buff.begin(), line);
/**
 * Fetch the next line from the FIFO buffer while holding m_mutex.
 *
 * Reads the element at the back of buff.
 *
 * @throws std::underflow_error if the buffer is empty.
 */
185 std::string
pop()
override {
186 std::lock_guard<std::mutex> lock(m_mutex);
// size() is unsigned, so "<= 0" could only ever mean "empty".
187 if (buff.empty())
188 throw std::underflow_error(
"Attempt to pop() from empty buffer");
189 auto line = buff.back();
195 std::lock_guard<std::mutex> lock(m_mutex);
200 mutable std::mutex m_mutex;
201 std::vector<std::string> buff;
A CommOutQueue limited to one message of each kind.
Queue of NMEA0183 messages which only holds a limited number of messages of each type.
CommOutQueue(unsigned max_buffered)
Create a buffer which stores at most max_buffered items of each message.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
Simple FIFO queue without added logic.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
int size() const override
Return number of lines in queue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.