23 #include <wx/wxprec.h>
36 #include <wx/string.h>
41 #include "model/comm_navmsg_bus.h"
42 #include "model/comm_drv_registry.h"
43 #include "model/logger.h"
44 #include "model/sys_events.h"
46 #include "observable.h"
49 #include "serial/serial.h"
53 #include "androidUTIL.h"
56 typedef enum DS_ENUM_BUFFER_STATE {
59 } _DS_ENUM_BUFFER_STATE;
63 #define MAX_OUT_QUEUE_MESSAGE_LENGTH 100
69 std::lock_guard<std::mutex> lock(m_mutex);
70 return m_queque.size();
74 std::lock_guard<std::mutex> lock(m_mutex);
75 return m_queque.empty();
79 std::lock_guard<std::mutex> lock(m_mutex);
80 return m_queque.front();
83 void push(
const T& value) {
84 std::lock_guard<std::mutex> lock(m_mutex);
89 std::lock_guard<std::mutex> lock(m_mutex);
94 std::queue<T> m_queque;
95 mutable std::mutex m_mutex;
98 #define OUT_QUEUE_LENGTH 20
99 #define MAX_OUT_QUEUE_MESSAGE_LENGTH 100
103 CommDriverN0183SerialEvent::CommDriverN0183SerialEvent(wxEventType commandType,
105 : wxEvent(id, commandType){};
107 CommDriverN0183SerialEvent::~CommDriverN0183SerialEvent(){};
109 void CommDriverN0183SerialEvent::SetPayload(
110 std::shared_ptr<std::vector<unsigned char>> data) {
113 std::shared_ptr<std::vector<unsigned char>>
114 CommDriverN0183SerialEvent::GetPayload() {
119 wxEvent* CommDriverN0183SerialEvent::Clone()
const {
121 newevent->m_payload = this->m_payload;
129 const wxString& PortName,
130 const wxString& strBaudRate);
134 bool SetOutMsg(
const wxString& msg);
138 serial::Serial m_serial;
139 void ThreadMessage(
const wxString& msg);
140 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
141 void CloseComPortPhysical();
142 ssize_t WriteComPortPhysical(
const std::string& msg);
145 wxString m_port_name;
146 wxString m_full_port_name;
149 size_t m_send_retries;
162 : buf_(std::unique_ptr<T[]>(new T[size])), max_size_(size) {}
165 size_t capacity()
const;
170 return (!full_ && (head_ == tail_));
179 std::lock_guard<std::mutex> lock(mutex_);
181 if (full_) tail_ = (tail_ + 1) % max_size_;
183 head_ = (head_ + 1) % max_size_;
185 full_ = head_ == tail_;
189 std::lock_guard<std::mutex> lock(mutex_);
191 if (empty())
return T();
194 auto val = buf_[tail_];
196 tail_ = (tail_ + 1) % max_size_;
203 std::unique_ptr<T[]> buf_;
206 const size_t max_size_;
210 CommDriverN0183Serial::CommDriverN0183Serial(
const ConnectionParams* params,
214 m_Thread_run_flag(-1),
216 m_portstring(params->GetDSPort()),
217 m_secondary_thread(NULL),
219 m_listener(listener),
221 m_baudrate = wxString::Format(
"%i", params->Baudrate);
222 SetSecThreadInActive();
223 m_garmin_handler = NULL;
224 this->attributes[
"commPort"] = params->Port.ToStdString();
225 this->attributes[
"userComment"] = params->UserComment.ToStdString();
226 dsPortType iosel = params->IOSelect;
227 std::string s_iosel = std::string(
"IN");
228 if (iosel == DS_TYPE_INPUT_OUTPUT) {s_iosel =
"OUT";}
229 else if (iosel == DS_TYPE_INPUT_OUTPUT) {s_iosel =
"IN/OUT";}
230 this->attributes[
"ioDirection"] = s_iosel;
233 Bind(wxEVT_COMMDRIVER_N0183_SERIAL, &CommDriverN0183Serial::handle_N0183_MSG,
239 CommDriverN0183Serial::~CommDriverN0183Serial() { Close(); }
241 bool CommDriverN0183Serial::Open() {
243 comx = m_params.GetDSPort().AfterFirst(
':');
244 if (comx.IsEmpty())
return false;
246 wxString port_uc = m_params.GetDSPort().Upper();
248 if ((wxNOT_FOUND != port_uc.Find(
"USB")) &&
249 (wxNOT_FOUND != port_uc.Find(
"GARMIN"))) {
251 }
else if (m_params.Garmin) {
255 comx = comx.BeforeFirst(
' ');
261 std::thread t(&CommDriverN0183SerialThread::Entry, GetSecondaryThread());
264 androidStartUSBSerial(comx, m_baudrate,
this);
271 void CommDriverN0183Serial::Close() {
273 wxString::Format(
"Closing NMEA Driver %s", m_portstring.c_str()));
280 Unbind(wxEVT_COMMDRIVER_N0183_SERIAL,
281 &CommDriverN0183Serial::handle_N0183_MSG,
this);
285 if (m_secondary_thread) {
286 if (m_sec_thread_active)
288 wxLogMessage(
"Stopping Secondary Thread");
290 m_Thread_run_flag = 0;
293 while ((m_Thread_run_flag >= 0) && (tsec--)) wxSleep(1);
296 if (m_Thread_run_flag < 0)
297 msg.Printf(
"Stopped in %d sec.", 10 - tsec);
299 msg.Printf(
"Not Stopped after 10 sec.");
303 delete m_secondary_thread;
304 m_secondary_thread = NULL;
305 m_sec_thread_active =
false;
309 if (m_garmin_handler) {
310 m_garmin_handler->Close();
311 delete m_garmin_handler;
312 m_garmin_handler = NULL;
317 comx = m_params.GetDSPort().AfterFirst(
':');
318 androidStopUSBSerial(comx);
322 bool CommDriverN0183Serial::IsGarminThreadActive() {
323 if (m_garmin_handler) {
326 if (m_garmin_handler->m_usb_handle != INVALID_HANDLE_VALUE)
336 void CommDriverN0183Serial::StopGarminUSBIOThread(
bool b_pause) {
337 if (m_garmin_handler) {
338 m_garmin_handler->StopIOThread(b_pause);
343 CommDriverRegistry::GetInstance().
Activate(shared_from_this());
347 bool CommDriverN0183Serial::SendMessage(std::shared_ptr<const NavMsg> msg,
348 std::shared_ptr<const NavAddr> addr) {
349 auto msg_0183 = std::dynamic_pointer_cast<const Nmea0183Msg>(msg);
350 wxString sentence(msg_0183->payload.c_str());
353 wxString payload = sentence;
354 if (!sentence.EndsWith(
"\r\n")) payload +=
"\r\n";
356 wxString port = m_params.GetStrippedDSPort();
357 androidWriteSerial(port, payload);
360 if (GetSecondaryThread()) {
361 if (IsSecThreadActive()) {
364 if (GetSecondaryThread()->SetOutMsg(sentence))
377 void CommDriverN0183Serial::handle_N0183_MSG(
381 if (m_params.IOSelect == DS_TYPE_OUTPUT)
return;
383 auto p =
event.GetPayload();
384 std::vector<unsigned char>* payload = p.get();
387 std::string full_sentence = std::string(payload->begin(), payload->end());
389 if ((full_sentence[0] ==
'$') || (full_sentence[0] ==
'!')) {
390 std::string identifier;
392 identifier = full_sentence.substr(1, 5);
396 auto msg = std::make_shared<const Nmea0183Msg>(identifier, full_sentence,
398 auto msg_all = std::make_shared<const Nmea0183Msg>(*msg,
"ALL");
400 if (m_params.SentencePassesFilter(full_sentence, FILTER_INPUT))
401 m_listener.
Notify(std::move(msg));
403 m_listener.
Notify(std::move(msg_all));
408 #define DS_RX_BUFFER_SIZE 4096
410 CommDriverN0183SerialThread::CommDriverN0183SerialThread(
412 const wxString& strBaudRate) {
413 m_parent_driver = Launcher;
415 m_port_name = PortName;
416 m_full_port_name =
"Serial:" + PortName;
420 if (strBaudRate.ToLong(&lbaud)) m_baud = (int)lbaud;
421 resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
423 new_device_listener.
Init(SystemEvents::GetInstance().evt_dev_change,
427 CommDriverN0183SerialThread::~CommDriverN0183SerialThread(
void) {}
429 void CommDriverN0183SerialThread::OnExit(
void) {}
431 bool CommDriverN0183SerialThread::OpenComPortPhysical(
const wxString& com_name,
434 m_serial.setPort(com_name.ToStdString());
435 m_serial.setBaudrate(baud_rate);
437 m_serial.setTimeout(250, 250, 0, 250, 0);
438 }
catch (std::exception&) {
442 return m_serial.isOpen();
445 void CommDriverN0183SerialThread::CloseComPortPhysical() {
448 }
catch (std::exception&) {
454 bool CommDriverN0183SerialThread::SetOutMsg(
const wxString& msg) {
455 if (out_que.size() < OUT_QUEUE_LENGTH) {
456 wxCharBuffer buf = msg.ToUTF8();
458 char* qmsg = (
char*)malloc(strlen(buf.data()) + 1);
459 strcpy(qmsg, buf.data());
468 void CommDriverN0183SerialThread::ThreadMessage(
const wxString& msg) {
475 ssize_t CommDriverN0183SerialThread::WriteComPortPhysical(
const std::string& msg) {
476 if (!m_serial.isOpen())
return -1;
478 auto ptr =
reinterpret_cast<const uint8_t*
>(msg.c_str());
481 while (written < msg.size()) {
484 chunk_size = m_serial.write(ptr, msg.size() - written);
485 }
catch (std::exception& e) {
486 MESSAGE_LOG <<
"Exception while writing to serial port: " << e.what();
489 if (chunk_size < 0)
return chunk_size;
490 written += chunk_size;
493 MESSAGE_LOG <<
"Error writing data (output stalled?)";
496 if (written < msg.size()) std::this_thread::sleep_for(10ms);
502 void* CommDriverN0183SerialThread::Entry() {
503 bool not_done =
true;
504 m_parent_driver->SetSecThreadActive();
507 std::vector<uint8_t> tmp_vec;
510 if (!OpenComPortPhysical(m_port_name, m_baud)) {
511 wxString msg(
"NMEA input device open failed: ");
512 msg.Append(m_port_name);
524 while ((not_done) && (m_parent_driver->m_Thread_run_flag > 0)) {
525 if (m_parent_driver->m_Thread_run_flag == 0)
goto thread_exit;
527 uint8_t next_byte = 0;
528 unsigned int newdata = 0;
531 if (m_serial.isOpen()) {
533 newdata = m_serial.read(rdata, 200);
534 }
catch (std::exception&) {
537 if (10 < m_send_retries++) {
541 CloseComPortPhysical();
549 device_waiter.
Wait(250 * m_send_retries);
550 CloseComPortPhysical();
551 if (OpenComPortPhysical(m_port_name, m_baud))
553 else if (m_send_retries < 10)
558 for (
unsigned int i = 0; i < newdata; i++) circle.put(rdata[i]);
562 while (!circle.empty()) {
563 if (m_parent_driver->m_Thread_run_flag == 0)
goto thread_exit;
565 uint8_t take_byte = circle.get();
566 while ((take_byte != 0x0a) && !circle.empty()) {
567 tmp_vec.push_back(take_byte);
568 take_byte = circle.get();
571 if (circle.empty() && take_byte != 0x0a) {
572 tmp_vec.push_back(take_byte);
576 if (take_byte == 0x0a) {
577 tmp_vec.push_back(take_byte);
580 auto buffer = std::make_shared<std::vector<unsigned char>>();
581 std::vector<unsigned char>* vec = buffer.get();
583 for (
size_t i = 0; i < tmp_vec.size(); i++)
584 vec->push_back(tmp_vec.at(i));
592 if (vec->at(0) ==
'\r') vec->erase(vec->begin());
595 Nevent.SetPayload(buffer);
596 m_parent_driver->AddPendingEvent(Nevent);
603 bool b_qdata = !out_que.empty();
607 char* qmsg = out_que.front();
610 char msg[MAX_OUT_QUEUE_MESSAGE_LENGTH];
611 strncpy(msg, qmsg, MAX_OUT_QUEUE_MESSAGE_LENGTH - 1);
614 if (-1 == WriteComPortPhysical(msg) && 10 < m_send_retries++) {
618 CloseComPortPhysical();
621 b_qdata = !out_que.empty();
626 CloseComPortPhysical();
627 m_parent_driver->SetSecThreadInActive();
628 m_parent_driver->m_Thread_run_flag = -1;
void Activate() override
Register driver and possibly do other post-ctor steps.
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Queue of NMEA0183 messages which only holds a limited amount of each message type.
Interface implemented by transport layer and possibly other parties like test code which should handle received messages.
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Add unit test measurements to CommOutQueue.
Where messages are sent to or received from.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Adds a std::shared_ptr<void> element to wxCommandEvent.
void Continue()
Release any threads blocked by Wait().
bool Wait(std::chrono::milliseconds timeout=0s)
Blocking wait for next Continue() with optional timeout.
wxDEFINE_EVENT(REST_IO_EVT, ObservedEvt)
Event from IO thread to main.
Basic synchronization primitive.