OpenCPN Partial API docs
comm_drv_n0183_serial.cpp
Go to the documentation of this file.
1 /**************************************************************************
2  * Copyright (C) 2022 by David Register, Alec Leamas *
3  * *
4  * This program is free software; you can redistribute it and/or modify *
5  * it under the terms of the GNU General Public License as published by *
6  * the Free Software Foundation; either version 2 of the License, or *
7  * (at your option) any later version. *
8  * *
9  * This program is distributed in the hope that it will be useful, *
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
12  * GNU General Public License for more details. *
13  * *
14  * You should have received a copy of the GNU General Public License *
15  * along with this program; if not, write to the *
16  * Free Software Foundation, Inc., *
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
18  **************************************************************************/
19 
22 // For compilers that support precompilation, includes "wx.h".
23 #include <wx/wxprec.h>
24 
25 #ifndef WX_PRECOMP
26 #include <wx/wx.h>
27 #endif // precompiled headers
28 
29 #include <mutex> // std::mutex
30 #include <queue> // std::queue
31 #include <thread>
32 #include <vector>
33 
34 #include <wx/event.h>
35 #include <wx/log.h>
36 #include <wx/string.h>
37 #include <wx/utils.h>
38 
39 #include "config.h"
41 #include "model/comm_navmsg_bus.h"
42 #include "model/comm_drv_registry.h"
43 #include "model/logger.h"
44 #include "model/sys_events.h"
45 #include "model/wait_continue.h"
46 #include "observable.h"
47 
48 #ifndef __ANDROID__
49 #include "serial/serial.h"
50 #endif
51 
52 #ifdef __ANDROID__
53 #include "androidUTIL.h"
54 #endif
55 
56 typedef enum DS_ENUM_BUFFER_STATE {
57  DS_RX_BUFFER_EMPTY,
58  DS_RX_BUFFER_FULL
59 } _DS_ENUM_BUFFER_STATE;
60 
61 class CommDriverN0183Serial; // fwd
62 
63 #define MAX_OUT_QUEUE_MESSAGE_LENGTH 100
64 
65 template <typename T>
66 class n0183_atomic_queue {
67 public:
68  size_t size() {
69  std::lock_guard<std::mutex> lock(m_mutex);
70  return m_queque.size();
71  }
72 
73  bool empty() {
74  std::lock_guard<std::mutex> lock(m_mutex);
75  return m_queque.empty();
76  }
77 
78  const T& front() {
79  std::lock_guard<std::mutex> lock(m_mutex);
80  return m_queque.front();
81  }
82 
83  void push(const T& value) {
84  std::lock_guard<std::mutex> lock(m_mutex);
85  m_queque.push(value);
86  }
87 
88  void pop() {
89  std::lock_guard<std::mutex> lock(m_mutex);
90  m_queque.pop();
91  }
92 
93 private:
94  std::queue<T> m_queque;
95  mutable std::mutex m_mutex;
96 };
97 
98 #define OUT_QUEUE_LENGTH 20
99 #define MAX_OUT_QUEUE_MESSAGE_LENGTH 100
100 
101 wxDEFINE_EVENT(wxEVT_COMMDRIVER_N0183_SERIAL, CommDriverN0183SerialEvent);
102 
103 CommDriverN0183SerialEvent::CommDriverN0183SerialEvent(wxEventType commandType,
104  int id = 0)
105  : wxEvent(id, commandType){};
106 
107 CommDriverN0183SerialEvent::~CommDriverN0183SerialEvent(){};
108 
109 void CommDriverN0183SerialEvent::SetPayload(
110  std::shared_ptr<std::vector<unsigned char>> data) {
111  m_payload = data;
112 }
113 std::shared_ptr<std::vector<unsigned char>>
114 CommDriverN0183SerialEvent::GetPayload() {
115  return m_payload;
116 }
117 
118 // required for sending with wxPostEvent()
119 wxEvent* CommDriverN0183SerialEvent::Clone() const {
121  newevent->m_payload = this->m_payload;
122  return newevent;
123 };
124 
125 #ifndef __ANDROID__
127 public:
129  const wxString& PortName,
130  const wxString& strBaudRate);
131 
133  void* Entry();
134  bool SetOutMsg(const wxString& msg);
135  void OnExit(void);
136 
137 private:
138  serial::Serial m_serial;
139  void ThreadMessage(const wxString& msg);
140  bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
141  void CloseComPortPhysical();
142  ssize_t WriteComPortPhysical(const std::string& msg);
143 
144  CommDriverN0183Serial* m_parent_driver;
145  wxString m_port_name;
146  wxString m_full_port_name;
147 
148  int m_baud;
149  size_t m_send_retries;
150 
152  WaitContinue device_waiter;
153  ObsListener resume_listener;
154  ObsListener new_device_listener;
155 };
156 #endif
157 
158 template <class T>
159 class circular_buffer {
160 public:
161  explicit circular_buffer(size_t size)
162  : buf_(std::unique_ptr<T[]>(new T[size])), max_size_(size) {}
163 
164  void reset();
165  size_t capacity() const;
166  size_t size() const;
167 
168  bool empty() const {
169  // if head and tail are equal, we are empty
170  return (!full_ && (head_ == tail_));
171  }
172 
173  bool full() const {
174  // If tail is ahead the head by 1, we are full
175  return full_;
176  }
177 
178  void put(T item) {
179  std::lock_guard<std::mutex> lock(mutex_);
180  buf_[head_] = item;
181  if (full_) tail_ = (tail_ + 1) % max_size_;
182 
183  head_ = (head_ + 1) % max_size_;
184 
185  full_ = head_ == tail_;
186  }
187 
188  T get() {
189  std::lock_guard<std::mutex> lock(mutex_);
190 
191  if (empty()) return T();
192 
193  // Read data and advance the tail (we now have a free space)
194  auto val = buf_[tail_];
195  full_ = false;
196  tail_ = (tail_ + 1) % max_size_;
197 
198  return val;
199  }
200 
201 private:
202  std::mutex mutex_;
203  std::unique_ptr<T[]> buf_;
204  size_t head_ = 0;
205  size_t tail_ = 0;
206  const size_t max_size_;
207  bool full_ = 0;
208 };
209 
210 CommDriverN0183Serial::CommDriverN0183Serial(const ConnectionParams* params,
211  DriverListener& listener)
212  : CommDriverN0183(NavAddr::Bus::N0183,
213  ((ConnectionParams*)params)->GetStrippedDSPort()),
214  m_Thread_run_flag(-1),
215  m_ok(false),
216  m_portstring(params->GetDSPort()),
217  m_secondary_thread(NULL),
218  m_params(*params),
219  m_listener(listener),
220  m_out_queue(std::unique_ptr<CommOutQueue>(new MeasuredCommOutQueue(12))) {
221  m_baudrate = wxString::Format("%i", params->Baudrate);
222  SetSecThreadInActive();
223  m_garmin_handler = NULL;
224  this->attributes["commPort"] = params->Port.ToStdString();
225  this->attributes["userComment"] = params->UserComment.ToStdString();
226  dsPortType iosel = params->IOSelect;
227  std::string s_iosel = std::string("IN");
228  if (iosel == DS_TYPE_INPUT_OUTPUT) {s_iosel = "OUT";}
229  else if (iosel == DS_TYPE_INPUT_OUTPUT) {s_iosel = "IN/OUT";}
230  this->attributes["ioDirection"] = s_iosel;
231 
232  // Prepare the wxEventHandler to accept events from the actual hardware thread
233  Bind(wxEVT_COMMDRIVER_N0183_SERIAL, &CommDriverN0183Serial::handle_N0183_MSG,
234  this);
235 
236  Open();
237 }
238 
239 CommDriverN0183Serial::~CommDriverN0183Serial() { Close(); }
240 
241 bool CommDriverN0183Serial::Open() {
242  wxString comx;
243  comx = m_params.GetDSPort().AfterFirst(':'); // strip "Serial:"
244  if (comx.IsEmpty()) return false;
245 
246  wxString port_uc = m_params.GetDSPort().Upper();
247 
248  if ((wxNOT_FOUND != port_uc.Find("USB")) &&
249  (wxNOT_FOUND != port_uc.Find("GARMIN"))) {
250  m_garmin_handler = new GarminProtocolHandler(comx, this, true);
251  } else if (m_params.Garmin) {
252  m_garmin_handler = new GarminProtocolHandler(comx, this, false);
253  } else {
254  // strip off any description provided by Windows
255  comx = comx.BeforeFirst(' ');
256 
257 #ifndef __ANDROID__
258  // Kick off the RX thread
259  SetSecondaryThread(new CommDriverN0183SerialThread(this, comx, m_baudrate));
260  SetThreadRunFlag(1);
261  std::thread t(&CommDriverN0183SerialThread::Entry, GetSecondaryThread());
262  t.detach();
263 #else
264  androidStartUSBSerial(comx, m_baudrate, this);
265 #endif
266  }
267 
268  return true;
269 }
270 
271 void CommDriverN0183Serial::Close() {
272  wxLogMessage(
273  wxString::Format("Closing NMEA Driver %s", m_portstring.c_str()));
274 
275  // FIXME (dave)
276  // If port is opened, and then closed immediately,
277  // the secondary thread may not stop quickly enough.
278  // It can then crash trying to send an event to its "parent".
279 
280  Unbind(wxEVT_COMMDRIVER_N0183_SERIAL,
281  &CommDriverN0183Serial::handle_N0183_MSG, this);
282 
283 #ifndef __ANDROID__
284  // Kill off the Secondary RX Thread if alive
285  if (m_secondary_thread) {
286  if (m_sec_thread_active) // Try to be sure thread object is still alive
287  {
288  wxLogMessage("Stopping Secondary Thread");
289 
290  m_Thread_run_flag = 0;
291 
292  int tsec = 10;
293  while ((m_Thread_run_flag >= 0) && (tsec--)) wxSleep(1);
294 
295  wxString msg;
296  if (m_Thread_run_flag < 0)
297  msg.Printf("Stopped in %d sec.", 10 - tsec);
298  else
299  msg.Printf("Not Stopped after 10 sec.");
300  wxLogMessage(msg);
301  }
302 
303  delete m_secondary_thread;
304  m_secondary_thread = NULL;
305  m_sec_thread_active = false;
306  }
307 
308  // Kill off the Garmin handler, if alive
309  if (m_garmin_handler) {
310  m_garmin_handler->Close();
311  delete m_garmin_handler;
312  m_garmin_handler = NULL;
313  }
314 
315 #else
316  wxString comx;
317  comx = m_params.GetDSPort().AfterFirst(':'); // strip "Serial:"
318  androidStopUSBSerial(comx);
319 #endif
320 }
321 
322 bool CommDriverN0183Serial::IsGarminThreadActive() {
323  if (m_garmin_handler) {
324  // TODO expand for serial
325 #ifdef __WXMSW__
326  if (m_garmin_handler->m_usb_handle != INVALID_HANDLE_VALUE)
327  return true;
328  else
329  return false;
330 #endif
331  }
332 
333  return false;
334 }
335 
336 void CommDriverN0183Serial::StopGarminUSBIOThread(bool b_pause) {
337  if (m_garmin_handler) {
338  m_garmin_handler->StopIOThread(b_pause);
339  }
340 }
341 
343  CommDriverRegistry::GetInstance().Activate(shared_from_this());
344  // TODO: Read input data.
345 }
346 
347 bool CommDriverN0183Serial::SendMessage(std::shared_ptr<const NavMsg> msg,
348  std::shared_ptr<const NavAddr> addr) {
349  auto msg_0183 = std::dynamic_pointer_cast<const Nmea0183Msg>(msg);
350  wxString sentence(msg_0183->payload.c_str());
351 
352 #ifdef __ANDROID__
353  wxString payload = sentence;
354  if (!sentence.EndsWith("\r\n")) payload += "\r\n";
355 
356  wxString port = m_params.GetStrippedDSPort(); // GetPort().AfterFirst(':');
357  androidWriteSerial(port, payload);
358  return true;
359 #else
360  if (GetSecondaryThread()) {
361  if (IsSecThreadActive()) {
362  int retry = 10;
363  while (retry) {
364  if (GetSecondaryThread()->SetOutMsg(sentence))
365  return true;
366  else
367  retry--;
368  }
369  return false; // could not send after several tries....
370  } else
371  return false;
372  }
373  return true;
374 #endif
375 }
376 
377 void CommDriverN0183Serial::handle_N0183_MSG(
379  // Is this an output-only port?
380  // Commonly used for "Send to GPS" function
381  if (m_params.IOSelect == DS_TYPE_OUTPUT) return;
382 
383  auto p = event.GetPayload();
384  std::vector<unsigned char>* payload = p.get();
385 
386  // Extract the NMEA0183 sentence
387  std::string full_sentence = std::string(payload->begin(), payload->end());
388 
389  if ((full_sentence[0] == '$') || (full_sentence[0] == '!')) { // Sanity check
390  std::string identifier;
391  // We notify based on full message, including the Talker ID
392  identifier = full_sentence.substr(1, 5);
393 
394  // notify message listener and also "ALL" N0183 messages, to support plugin
395  // API using original talker id
396  auto msg = std::make_shared<const Nmea0183Msg>(identifier, full_sentence,
397  GetAddress());
398  auto msg_all = std::make_shared<const Nmea0183Msg>(*msg, "ALL");
399 
400  if (m_params.SentencePassesFilter(full_sentence, FILTER_INPUT))
401  m_listener.Notify(std::move(msg));
402 
403  m_listener.Notify(std::move(msg_all));
404  }
405 }
406 
407 #ifndef __ANDROID__
408 #define DS_RX_BUFFER_SIZE 4096
409 
410 CommDriverN0183SerialThread::CommDriverN0183SerialThread(
411  CommDriverN0183Serial* Launcher, const wxString& PortName,
412  const wxString& strBaudRate) {
413  m_parent_driver = Launcher; // This thread's immediate "parent"
414 
415  m_port_name = PortName;
416  m_full_port_name = "Serial:" + PortName;
417 
418  m_baud = 4800; // default
419  long lbaud;
420  if (strBaudRate.ToLong(&lbaud)) m_baud = (int)lbaud;
421  resume_listener.Init(SystemEvents::GetInstance().evt_resume,
422  [&](ObservedEvt&) { device_waiter.Continue(); });
423  new_device_listener.Init(SystemEvents::GetInstance().evt_dev_change,
424  [&](ObservedEvt&) { device_waiter.Continue(); });
425 }
426 
427 CommDriverN0183SerialThread::~CommDriverN0183SerialThread(void) {}
428 
429 void CommDriverN0183SerialThread::OnExit(void) {}
430 
431 bool CommDriverN0183SerialThread::OpenComPortPhysical(const wxString& com_name,
432  int baud_rate) {
433  try {
434  m_serial.setPort(com_name.ToStdString());
435  m_serial.setBaudrate(baud_rate);
436  m_serial.open();
437  m_serial.setTimeout(250, 250, 0, 250, 0);
438  } catch (std::exception&) {
439  // std::cerr << "Unhandled Exception while opening serial port: " <<
440  // e.what() << std::endl;
441  }
442  return m_serial.isOpen();
443 }
444 
445 void CommDriverN0183SerialThread::CloseComPortPhysical() {
446  try {
447  m_serial.close();
448  } catch (std::exception&) {
449  // std::cerr << "Unhandled Exception while closing serial port: " <<
450  // e.what() << std::endl;
451  }
452 }
453 
454 bool CommDriverN0183SerialThread::SetOutMsg(const wxString& msg) {
455  if (out_que.size() < OUT_QUEUE_LENGTH) {
456  wxCharBuffer buf = msg.ToUTF8();
457  if (buf.data()) {
458  char* qmsg = (char*)malloc(strlen(buf.data()) + 1);
459  strcpy(qmsg, buf.data());
460  out_que.push(qmsg);
461  return true;
462  }
463  }
464 
465  return false;
466 }
467 
468 void CommDriverN0183SerialThread::ThreadMessage(const wxString& msg) {
469  // Signal the main program thread
470  // OCPN_ThreadMessageEvent event(wxEVT_OCPN_THREADMSG, 0);
471  // event.SetSString(std::string(msg.mb_str()));
472  // if (gFrame) gFrame->GetEventHandler()->AddPendingEvent(event);
473 }
474 
475 ssize_t CommDriverN0183SerialThread::WriteComPortPhysical(const std::string& msg) {
476  if (!m_serial.isOpen()) return -1;
477 
478  auto ptr = reinterpret_cast<const uint8_t*>(msg.c_str());
479  size_t written = 0;
480  int tries = 0;
481  while (written < msg.size()) {
482  int chunk_size;
483  try {
484  chunk_size = m_serial.write(ptr, msg.size() - written);
485  } catch (std::exception& e) {
486  MESSAGE_LOG << "Exception while writing to serial port: " << e.what();
487  return -1;
488  }
489  if (chunk_size < 0) return chunk_size;
490  written += chunk_size;
491  ptr += chunk_size;
492  if (tries++ > 20) {
493  MESSAGE_LOG << "Error writing data (output stalled?)";
494  return -1;
495  }
496  if (written < msg.size()) std::this_thread::sleep_for(10ms);
497  // FIXME (leamas) really?
498  }
499  return written;
500 }
501 
502 void* CommDriverN0183SerialThread::Entry() {
503  bool not_done = true;
504  m_parent_driver->SetSecThreadActive(); // I am alive
505  wxString msg;
506  circular_buffer<uint8_t> circle(DS_RX_BUFFER_SIZE);
507  std::vector<uint8_t> tmp_vec;
508 
509  // Request the com port from the comm manager
510  if (!OpenComPortPhysical(m_port_name, m_baud)) {
511  wxString msg("NMEA input device open failed: ");
512  msg.Append(m_port_name);
513  ThreadMessage(msg);
514  // goto thread_exit; // This means we will not be trying to connect = The
515  // device must be connected when the thread is created. Does not seem to be
516  // needed/what we want as the reconnection logic is able to pick it up
517  // whenever it actually appears (Of course given it appears with the
518  // expected device name).
519  }
520 
521  // The main loop
522  m_send_retries = 0;
523 
524  while ((not_done) && (m_parent_driver->m_Thread_run_flag > 0)) {
525  if (m_parent_driver->m_Thread_run_flag == 0) goto thread_exit;
526 
527  uint8_t next_byte = 0;
528  unsigned int newdata = 0;
529  uint8_t rdata[2000];
530 
531  if (m_serial.isOpen()) {
532  try {
533  newdata = m_serial.read(rdata, 200);
534  } catch (std::exception&) {
535  // std::cerr << "Serial read exception: " << e.what() <<
536  // std::endl;
537  if (10 < m_send_retries++) {
538  // We timed out waiting for the next character 10 times, let's close
539  // the port so that the reconnection logic kicks in and tries to fix
540  // our connection.
541  CloseComPortPhysical();
542  m_send_retries = 0;
543  }
544  }
545  } else {
546  // Reconnection logic. Let's try to reopen the port while waiting longer
547  // every time (until we simply keep trying every 2.5 seconds)
548  // std::cerr << "Serial port seems closed." << std::endl;
549  device_waiter.Wait(250 * m_send_retries);
550  CloseComPortPhysical();
551  if (OpenComPortPhysical(m_port_name, m_baud))
552  m_send_retries = 0;
553  else if (m_send_retries < 10)
554  m_send_retries++;
555  }
556 
557  if (newdata > 0) {
558  for (unsigned int i = 0; i < newdata; i++) circle.put(rdata[i]);
559  }
560 
561  // Process the queue until empty
562  while (!circle.empty()) {
563  if (m_parent_driver->m_Thread_run_flag == 0) goto thread_exit;
564 
565  uint8_t take_byte = circle.get();
566  while ((take_byte != 0x0a) && !circle.empty()) {
567  tmp_vec.push_back(take_byte);
568  take_byte = circle.get();
569  }
570 
571  if (circle.empty() && take_byte != 0x0a) {
572  tmp_vec.push_back(take_byte);
573  break;
574  }
575 
576  if (take_byte == 0x0a) {
577  tmp_vec.push_back(take_byte);
578 
579  // Copy the message into a vector for transmittal upstream
580  auto buffer = std::make_shared<std::vector<unsigned char>>();
581  std::vector<unsigned char>* vec = buffer.get();
582 
583  for (size_t i = 0; i < tmp_vec.size(); i++)
584  vec->push_back(tmp_vec.at(i));
585 
586  // Message is ready to parse and send out
587  // Messages may be coming in as <blah blah><lf><cr>.
588  // One example device is KVH1000 heading sensor.
589  // If that happens, the first character of a new captured message
590  // will the <cr>, and we need to discard it. This is out of spec,
591  // but we should handle it anyway
592  if (vec->at(0) == '\r') vec->erase(vec->begin());
593 
594  CommDriverN0183SerialEvent Nevent(wxEVT_COMMDRIVER_N0183_SERIAL, 0);
595  Nevent.SetPayload(buffer);
596  m_parent_driver->AddPendingEvent(Nevent);
597  tmp_vec.clear();
598  }
599  } // while
600 
601  // Check for any pending output message
602 
603  bool b_qdata = !out_que.empty();
604 
605  while (b_qdata) {
606  // Take a copy of message
607  char* qmsg = out_que.front();
608  out_que.pop();
609  // m_outCritical.Leave();
610  char msg[MAX_OUT_QUEUE_MESSAGE_LENGTH];
611  strncpy(msg, qmsg, MAX_OUT_QUEUE_MESSAGE_LENGTH - 1);
612  free(qmsg);
613 
614  if (-1 == WriteComPortPhysical(msg) && 10 < m_send_retries++) {
615  // We failed to write the port 10 times, let's close the port so that
616  // the reconnection logic kicks in and tries to fix our connection.
617  m_send_retries = 0;
618  CloseComPortPhysical();
619  }
620 
621  b_qdata = !out_que.empty();
622  } // while b_qdata
623  } // while not done.
624 
625 thread_exit:
626  CloseComPortPhysical();
627  m_parent_driver->SetSecThreadInActive(); // I am dead
628  m_parent_driver->m_Thread_run_flag = -1;
629 
630  return 0;
631 }
632 #endif //__ANDROID__
void Activate() override
Register driver and possibly do other post-ctor steps.
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Queue of NMEA0183 messages which only holds a limited amount of each message type.
Interface implemented by transport layer and possible other parties like test code which should handl...
Definition: comm_driver.h:47
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Add unit test measurements to CommOutQueue.
Where messages are sent to or received from.
Definition: comm_navmsg.h:133
Define an action to be performed when a KeyProvider is notified.
Definition: observable.h:210
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Definition: observable.h:227
Adds a std::shared<void> element to wxCommandEvent.
Definition: ocpn_plugin.h:1652
void Continue()
Release any threads blocked by Wait().
Definition: wait_continue.h:42
bool Wait(std::chrono::milliseconds timeout=0s)
Blocking wait for next Continue() with optional timeout.
Definition: wait_continue.h:51
NMEA0183 serial driver.
wxDEFINE_EVENT(REST_IO_EVT, ObservedEvt)
Event from IO thread to main.
Basic synchronization primitive.