OpenCPN Partial API docs
comm_out_queue.cpp
1 #include <algorithm>
2 #include <cassert>
3 #include <stdexcept>
4 
5 #include <sys/types.h>
6 
7 #include "model/comm_drv_registry.h"
8 #include "model/comm_out_queue.h"
9 #include "model/logger.h"
10 
// The message type key is assembled from the sentence bytes in little-endian
// order regardless of the host byte order, so the mask and the constants
// below are valid on every platform.
static const uint64_t kFirstFiveBytes = 0x000000ffffffffff;

#define PUBX 190459303248   // "PUBX,"
#define STALK 323401897043  // "STALK"

/**
 * Compute a numeric key identifying the type of a NMEA0183 sentence.
 *
 * The key is built from the characters following the start delimiter: byte
 * i after the delimiter occupies bits [8*i, 8*i+8) of the key. Normally only
 * the first five characters (talker + sentence id) are used. For sentences
 * starting with "PUBX," or "STALK" up to eight characters are used so that the
 * two-digit message id in the first field becomes part of the key.
 *
 * If the line starts with a backslash (tag block) the scan starts at the
 * first ',' found after position 0; if there is no ',' it falls back to
 * position 1.
 *
 * Never reads beyond the end of @p line; missing characters count as zero
 * bytes, so any string (including an empty one) is a valid argument.
 *
 * @param line Sentence to classify.
 * @return Type key, 0 if there are no characters after the skipped prefix.
 */
static inline uint64_t GetNmeaType(const std::string& line) {
  size_t skipchars = 1;
  if (!line.empty() && line[0] == 0x5c) {
    // Starts with the tag block '\', we need to skip it and then also the
    // start delimiter
    skipchars = line.find(',', 1);
    if (skipchars == std::string::npos) {
      // This should never happen, there is no end of the tag block, but
      // just in case...
      skipchars = 1;
    }
  }

  // Byte-wise assembly instead of casting the char buffer to uint64_t*:
  // no unaligned/aliasing access and no read past the end of the string.
  uint64_t result = 0;
  for (size_t i = 0; i < sizeof(result) && skipchars + i < line.size(); ++i) {
    const auto byte = static_cast<unsigned char>(line[skipchars + i]);
    result |= static_cast<uint64_t>(byte) << (8 * i);
  }

  const uint64_t result5 = result & kFirstFiveBytes;
  if (result5 == PUBX || result5 == STALK) {
    /* PUBX from possibly high-speed u-blox GNSS receivers that are sure to
       overload slow connections has a 2 digit zero-padded numerical message
       ID in the first field. Similar with STALK, the two digit Seatalk
       message ID is in the first field. Both fit nicely into 8 bytes. */
    return result;
  }
  return result5;
}
53 
54 static void ReportOverrun(const std::string& msg, bool overrun_reported) {
55  auto& registry = CommDriverRegistry::GetInstance();
56  std::string s;
57  if (msg.length() < 6) s = msg; else s = msg.substr(0, 5);
58  DEBUG_LOG << "CommOutQueue: Overrun on: " << msg;
59  if (!overrun_reported) registry.evt_comm_overrun.Notify(msg);
60 }
61 
62 
63 CommOutQueue::BufferItem::BufferItem(const std::string& _line)
64  : type(GetNmeaType(_line)),
65  line(_line),
66  stamp(std::chrono::steady_clock::now()) {}
67 
68 CommOutQueue::BufferItem::BufferItem(const BufferItem& other)
69  : type(other.type),
70  line(other.line),
71  stamp(std::chrono::steady_clock::now()) {}
72 
73 using duration_ms = std::chrono::duration<unsigned, std::milli>;
74 
75 CommOutQueue::CommOutQueue(unsigned max_buffered, duration_ms min_msg_gap)
76  : m_size(max_buffered - 1),
77  m_min_msg_gap(min_msg_gap),
78  m_overrun_reported(false) {
79  assert(max_buffered >= 1 && "Illegal buffer size");
80 }
81 
82 bool CommOutQueue::push_back(const std::string& line) {
83  if (line.size() < 7) return false;
84  BufferItem item(line);
85  auto match = [item](const BufferItem& it) { return it.type == item.type; };
86 
87  std::lock_guard<std::mutex> lock(m_mutex);
88  int found = std::count_if(m_buffer.begin(), m_buffer.end(), match);
89  if (found > 0) {
90  auto it = std::find_if(m_buffer.begin(), m_buffer.end(), match);
91  assert(it != m_buffer.end());
92  auto timespan = item.stamp - it->stamp;
93  if (timespan < m_min_msg_gap) {
94  m_buffer.erase(it);
95  if (m_rate_limits_logged.find(item.type) != m_rate_limits_logged.end()) {
96  m_rate_limits_logged.insert(item.type);
97  wxLogMessage("Limiting output rate for %u, message: %s", item.type,
98  line.c_str());
99  }
100  }
101  }
102  if (found > m_size) {
103  // overflow: too many of these kind of messages
104  // are still not processed. Drop so we keep m_size of them.
105  if (!m_overrun_reported) {
106  ReportOverrun(line, m_overrun_reported);
107  m_overrun_reported = true;
108  }
109  int matches = 0;
110  auto match_cnt = [&](const BufferItem& it) {
111  return it.type == item.type && matches++ >= m_size;
112  };
113  m_buffer.erase(std::remove_if(m_buffer.begin(), m_buffer.end(), match_cnt),
114  m_buffer.end());
115  }
116  m_buffer.insert(m_buffer.begin(), item);
117  return true;
118 }
119 
120 std::string CommOutQueue::pop() {
121  std::lock_guard<std::mutex> lock(m_mutex);
122 
123  if (m_buffer.size() <= 0)
124  throw std::underflow_error("Attempt to pop() from empty buffer");
125  auto item = m_buffer.back();
126  m_buffer.pop_back();
127  return item.line;
128 }
129 
130 int CommOutQueue::size() const {
131  std::lock_guard<std::mutex> lock(m_mutex);
132  return m_buffer.size();
133 }
134 
135 bool CommOutQueueSingle::push_back(const std::string& line) {
136  if (line.size() < 7) return false;
137  BufferItem item(line);
138  auto match = [&item](const BufferItem& it) { return it.type == item.type; };
139 
140  std::lock_guard<std::mutex> lock(m_mutex);
141  auto found = std::find_if(m_buffer.begin(), m_buffer.end(), match);
142  if (found != m_buffer.end()) {
143  // overflow: this kind of message is still not processed. Drop it
144  m_buffer.erase(std::remove_if(found, m_buffer.end(), match),
145  m_buffer.end());
146  }
147  m_buffer.push_back(item);
148  return true;
149 }
150 
151 bool MeasuredCommOutQueue::push_back(const std::string& line) {
152  using std::chrono::duration;
153  using std::chrono::steady_clock;
154 
155  auto t1 = steady_clock::now();
156  bool ok = CommOutQueue::push_back(line);
157  msg_perf[GetNmeaType(line)].in(line.size(), ok);
158  perf.in(line.size(), ok);
159  auto t2 = steady_clock::now();
160  duration<double, std::micro> us_time = t2 - t1;
161 
162  push_time = 0.95 * push_time + 0.05 * us_time.count(); // LP filter.
163  return ok;
164 }
165 
167  using std::chrono::duration;
168  using std::chrono::steady_clock;
169 
170  auto t1 = steady_clock::now();
171  // auto msg = CommOutQueue::pop(); // We need to update the perf counters,
172  // can't just pop() here
173  std::lock_guard<std::mutex> lock(m_mutex);
174  if (m_buffer.size() <= 0)
175  throw std::underflow_error("Attempt to pop() from empty buffer");
176  auto item = m_buffer.back();
177  m_buffer.pop_back();
178  perf.out(item.line.size(), item.stamp);
179  msg_perf[item.type].out(item.line.size(), item.stamp);
180  auto t2 = steady_clock::now();
181  duration<double, std::micro> us_time = t2 - t1;
182  us_time = t2 - t1;
183 
184  pop_time = 0.95 * pop_time + 0.05 * us_time.count(); // LP filter.
185  return item.line;
186 }
187 
188 std::ostream& operator<<(std::ostream& os, const MeasuredCommOutQueue& q) {
189  os << "{";
190  os << "push_time: " << q.push_time << ", ";
191  os << "pop_time: " << q.pop_time << ", ";
192  os << "perf: " << q.perf << ", ";
193  os << "msg_perf: [";
194  for (const auto& kv : q.msg_perf) {
195  os << kv.first << ": " << kv.second << ", ";
196  }
197  os << "]";
198  os << "}";
199  return os;
200 };
201 
202 std::ostream& operator<<(std::ostream& os, const PerfCounter& pc) {
203  os << "{";
204  os << "msgs_in: " << pc.msgs_in << ", ";
205  os << "msgs_out: " << pc.msgs_out << ", ";
206  os << "bytes_in: " << pc.bytes_in << ", ";
207  os << "bytes_out: " << pc.bytes_out << ", ";
208  os << "bps_in: " << pc.bps_in << ", ";
209  os << "mps_in: " << pc.mps_in << ", ";
210  os << "bps_out: " << pc.bps_out << ", ";
211  os << "mps_out: " << pc.mps_out << ", ";
212  os << "in_out_delay_us: " << pc.in_out_delay_us << ", ";
213  os << "overflow_msgs: " << pc.overflow_msgs << ", ";
214  os << "in_queue: " << pc.in_queue;
215  os << "}";
216  return os;
217 };
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.