7 #include "model/comm_drv_registry.h"
8 #include "model/comm_out_queue.h"
9 #include "model/logger.h"
12 #if __BYTE_ORDER == __LITTLE_ENDIAN
13 static const uint64_t kFirstFiveBytes = 0x000000ffffffffff;
15 static const uint64_t kFirstFiveBytes = 0xffffffffff000000;
18 #define PUBX 190459303248
19 #define STALK 323401897043
31 static inline uint64_t GetNmeaType(
const std::string& line) {
33 if (line[0] == 0x5c) {
35 skipchars = line.find(
',', 1);
36 if (skipchars == std::string::npos) {
41 uint64_t result = *
reinterpret_cast<const uint64_t*
>(&line[skipchars]);
42 uint64_t result5 = result & kFirstFiveBytes;
43 if (result5 == PUBX || result5 == STALK) {
54 static void ReportOverrun(
const std::string& msg,
bool overrun_reported) {
55 auto& registry = CommDriverRegistry::GetInstance();
57 if (msg.length() < 6) s = msg;
else s = msg.substr(0, 5);
58 DEBUG_LOG <<
"CommOutQueue: Overrun on: " << msg;
59 if (!overrun_reported) registry.evt_comm_overrun.Notify(msg);
63 CommOutQueue::BufferItem::BufferItem(
const std::string& _line)
64 : type(GetNmeaType(_line)),
66 stamp(std::chrono::steady_clock::now()) {}
68 CommOutQueue::BufferItem::BufferItem(
const BufferItem& other)
71 stamp(std::chrono::steady_clock::now()) {}
73 using duration_ms = std::chrono::duration<unsigned, std::milli>;
76 : m_size(max_buffered - 1),
77 m_min_msg_gap(min_msg_gap),
78 m_overrun_reported(false) {
79 assert(max_buffered >= 1 &&
"Illegal buffer size");
83 if (line.size() < 7)
return false;
85 auto match = [item](
const BufferItem& it) {
return it.type == item.type; };
87 std::lock_guard<std::mutex> lock(m_mutex);
88 int found = std::count_if(m_buffer.begin(), m_buffer.end(), match);
90 auto it = std::find_if(m_buffer.begin(), m_buffer.end(), match);
91 assert(it != m_buffer.end());
92 auto timespan = item.stamp - it->stamp;
93 if (timespan < m_min_msg_gap) {
95 if (m_rate_limits_logged.find(item.type) != m_rate_limits_logged.end()) {
96 m_rate_limits_logged.insert(item.type);
97 wxLogMessage(
"Limiting output rate for %u, message: %s", item.type,
102 if (found > m_size) {
105 if (!m_overrun_reported) {
106 ReportOverrun(line, m_overrun_reported);
107 m_overrun_reported =
true;
111 return it.type == item.type && matches++ >= m_size;
113 m_buffer.erase(std::remove_if(m_buffer.begin(), m_buffer.end(), match_cnt),
116 m_buffer.insert(m_buffer.begin(), item);
121 std::lock_guard<std::mutex> lock(m_mutex);
123 if (m_buffer.size() <= 0)
124 throw std::underflow_error(
"Attempt to pop() from empty buffer");
125 auto item = m_buffer.back();
131 std::lock_guard<std::mutex> lock(m_mutex);
132 return m_buffer.size();
136 if (line.size() < 7)
return false;
138 auto match = [&item](
const BufferItem& it) {
return it.type == item.type; };
140 std::lock_guard<std::mutex> lock(m_mutex);
141 auto found = std::find_if(m_buffer.begin(), m_buffer.end(), match);
142 if (found != m_buffer.end()) {
144 m_buffer.erase(std::remove_if(found, m_buffer.end(), match),
147 m_buffer.push_back(item);
152 using std::chrono::duration;
153 using std::chrono::steady_clock;
155 auto t1 = steady_clock::now();
157 msg_perf[GetNmeaType(line)].in(line.size(), ok);
158 perf.in(line.size(), ok);
159 auto t2 = steady_clock::now();
160 duration<double, std::micro> us_time = t2 - t1;
162 push_time = 0.95 * push_time + 0.05 * us_time.count();
167 using std::chrono::duration;
168 using std::chrono::steady_clock;
170 auto t1 = steady_clock::now();
173 std::lock_guard<std::mutex> lock(m_mutex);
174 if (m_buffer.size() <= 0)
175 throw std::underflow_error(
"Attempt to pop() from empty buffer");
176 auto item = m_buffer.back();
178 perf.out(item.line.size(), item.stamp);
179 msg_perf[item.type].out(item.line.size(), item.stamp);
180 auto t2 = steady_clock::now();
181 duration<double, std::micro> us_time = t2 - t1;
184 pop_time = 0.95 * pop_time + 0.05 * us_time.count();
190 os <<
"push_time: " << q.push_time <<
", ";
191 os <<
"pop_time: " << q.pop_time <<
", ";
192 os <<
"perf: " << q.perf <<
", ";
194 for (
const auto& kv : q.msg_perf) {
195 os << kv.first <<
": " << kv.second <<
", ";
202 std::ostream& operator<<(std::ostream& os,
const PerfCounter& pc) {
204 os <<
"msgs_in: " << pc.msgs_in <<
", ";
205 os <<
"msgs_out: " << pc.msgs_out <<
", ";
206 os <<
"bytes_in: " << pc.bytes_in <<
", ";
207 os <<
"bytes_out: " << pc.bytes_out <<
", ";
208 os <<
"bps_in: " << pc.bps_in <<
", ";
209 os <<
"mps_in: " << pc.mps_in <<
", ";
210 os <<
"bps_out: " << pc.bps_out <<
", ";
211 os <<
"mps_out: " << pc.mps_out <<
", ";
212 os <<
"in_out_delay_us: " << pc.in_out_delay_us <<
", ";
213 os <<
"overflow_msgs: " << pc.overflow_msgs <<
", ";
214 os <<
"in_queue: " << pc.in_queue;
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.