OpenCPN Partial API docs
comm_drv_signalk_net.cpp
1 /***************************************************************************
2  *
3  * Project: OpenCPN
4  * Purpose:
5  * Author: David Register, Alec Leamas
6  *
7  ***************************************************************************
8  * Copyright (C) 2022 by David Register, Alec Leamas *
9  * *
10  * This program is free software; you can redistribute it and/or modify *
11  * it under the terms of the GNU General Public License as published by *
12  * the Free Software Foundation; either version 2 of the License, or *
13  * (at your option) any later version. *
14  * *
15  * This program is distributed in the hope that it will be useful, *
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18  * GNU General Public License for more details. *
19  * *
20  * You should have received a copy of the GNU General Public License *
21  * along with this program; if not, write to the *
22  * Free Software Foundation, Inc., *
23  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24  **************************************************************************/
25 
26 #include <vector>
27 #include <mutex> // std::mutex
28 #include <queue> // std::queue
29 #include <chrono>
30 #include <thread>
31 
32 #include "rapidjson/document.h"
33 
34 #include "model/comm_drv_signalk_net.h"
35 #include "model/comm_navmsg_bus.h"
36 #include "model/comm_drv_registry.h"
37 #include "model/geodesic.h"
38 #include "model/sys_events.h"
39 #include "wxServDisc.h"
40 
41 #include "observable.h"
42 
43 #include "ixwebsocket/IXNetSystem.h"
44 #include "ixwebsocket/IXWebSocket.h"
45 #include "ixwebsocket/IXUserAgent.h"
46 #include "ixwebsocket/IXSocketTLSOptions.h"
47 
48 const int kTimerSocket = 9006;
49 
50 class CommDriverSignalKNetEvent; // fwd
51 
52 class CommDriverSignalKNetThread : public wxThread {
53 public:
55  const wxString& PortName,
56  const wxString& strBaudRate);
57 
59  void* Entry();
60  bool SetOutMsg(const wxString& msg);
61  void OnExit(void);
62 
63 private:
64  void ThreadMessage(const wxString& msg);
65  bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
66  void CloseComPortPhysical();
67  size_t WriteComPortPhysical(std::vector<unsigned char> msg);
68  size_t WriteComPortPhysical(unsigned char* msg, size_t length);
69  void SetGatewayOperationMode(void);
70 
71  CommDriverSignalKNet* m_pParentDriver;
72  wxString m_PortName;
73  wxString m_FullPortName;
74 
75  unsigned char* put_ptr;
76  unsigned char* tak_ptr;
77 
78  unsigned char* rx_buffer;
79 
80  int m_baud;
81  int m_n_timeout;
82 
83  // n2k_atomic_queue<char*> out_que;
84 };
85 
87 wxDECLARE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
88 
89 class CommDriverSignalKNetEvent : public wxEvent {
90 public:
91  CommDriverSignalKNetEvent(wxEventType commandType = wxEVT_NULL, int id = 0)
92  : wxEvent(id, commandType){};
94 
95  // accessors
96  void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
97  std::shared_ptr<std::string> GetPayload() { return m_payload; }
98 
99  // required for sending with wxPostEvent()
100  wxEvent* Clone() const {
102  newevent->m_payload = this->m_payload;
103  return newevent;
104  };
105 
106 private:
107  std::shared_ptr<std::string> m_payload;
108 };
109 
110 // WebSocket implementation
111 
112 class WebSocketThread : public wxThread {
113 public:
114  WebSocketThread(CommDriverSignalKNet* parent, wxIPV4address address,
115  wxEvtHandler* consumer, const std::string& token);
116  virtual void* Entry();
117 
118 private:
119  void HandleMessage(const std::string& message);
120  wxEvtHandler* s_wsSKConsumer;
121  wxIPV4address m_address;
122  wxEvtHandler* m_consumer;
123  CommDriverSignalKNet* m_parentStream;
124  std::string m_token;
125  ix::WebSocket ws;
126  ObsListener resume_listener;
127 };
128 
129 WebSocketThread::WebSocketThread(CommDriverSignalKNet* parent,
130  wxIPV4address address,
131  wxEvtHandler* consumer,
132  const std::string& token)
133  : m_address(address),
134  m_consumer(consumer),
135  m_parentStream(parent),
136  m_token(token) {
137  resume_listener.Init(SystemEvents::GetInstance().evt_resume,
138  [&](ObservedEvt& ev) {
139  ws.stop();
140  ws.start();
141  wxLogDebug("WebSocketThread: restarted"); });
142 }
143 
144 void* WebSocketThread::Entry() {
145  using namespace std::chrono_literals;
146  bool not_done = true;
147 
148  m_parentStream->SetThreadRunning(true);
149 
150  s_wsSKConsumer = m_consumer;
151 
152  wxString host = m_address.IPAddress();
153  int port = m_address.Service();
154 
155  // Craft the address string
156  std::stringstream wsAddress;
157  wsAddress << "ws://" << host << ":" << port
158  << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
159  std::stringstream wssAddress;
160  wssAddress << "wss://" << host << ":" << port
161  << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
162 
163  if (!m_token.empty()) {
164  wsAddress << "&token=" << m_token;
165  wssAddress << "&token=" << m_token;
166  }
167 
168  ws.setUrl(wssAddress.str());
169  ix::SocketTLSOptions opt;
170  opt.disable_hostname_validation = true;
171  opt.caFile = "NONE";
172  ws.setTLSOptions(opt);
173  ws.setPingInterval(30);
174 
175  auto message_callback = [&](const ix::WebSocketMessagePtr& msg) {
176  if (msg->type == ix::WebSocketMessageType::Message) {
177  HandleMessage(msg->str);
178  } else if (msg->type == ix::WebSocketMessageType::Open) {
179  wxLogDebug("websocket: Connection established");
180  } else if (msg->type == ix::WebSocketMessageType::Close) {
181  wxLogDebug("websocket: Connection disconnected");
182  } else if (msg->type == ix::WebSocketMessageType::Error) {
183  wxLogDebug("websocket: error: %s", msg->errorInfo.reason.c_str());
184  ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
185  : ws.setUrl(wsAddress.str());
186  }
187  };
188 
189 
190  ws.setOnMessageCallback(message_callback);
191  ws.start();
192 
193  while (m_parentStream->m_Thread_run_flag > 0) {
194  std::this_thread::sleep_for(100ms);
195  }
196 
197  ws.stop();
198  m_parentStream->SetThreadRunning(false);
199  m_parentStream->m_Thread_run_flag = -1;
200 
201  return 0;
202 }
203 
204 void WebSocketThread::HandleMessage(const std::string& message) {
205  if (s_wsSKConsumer) {
206  CommDriverSignalKNetEvent signalKEvent(wxEVT_COMMDRIVER_SIGNALK_NET, 0);
207  auto buffer = std::make_shared<std::string>(message);
208 
209  signalKEvent.SetPayload(buffer);
210  s_wsSKConsumer->AddPendingEvent(signalKEvent);
211  }
212 }
213 
214 //========================================================================
215 /* CommDriverSignalKNet implementation
216  * */
217 
218 wxDEFINE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
219 
220 CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
221  DriverListener& listener)
222  : CommDriverSignalK(((ConnectionParams*)params)->GetStrippedDSPort()),
223  m_Thread_run_flag(-1),
224  m_params(*params),
225  m_listener(listener) {
226 
227  // Prepare the wxEventHandler to accept events from the actual hardware thread
228  Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
229  this);
230 
231  m_addr.Hostname(params->NetworkAddress);
232  m_addr.Service(params->NetworkPort);
233  m_token = params->AuthToken;
234  m_socketread_watchdog_timer.SetOwner(this, kTimerSocket);
235  m_wsThread = NULL;
236  m_threadActive = false;
237 
238  Open();
239 }
240 
241 CommDriverSignalKNet::~CommDriverSignalKNet() {
242  Close();
243 }
244 
246  CommDriverRegistry::GetInstance().Activate(shared_from_this());
247 }
248 
249 void CommDriverSignalKNet::Open(void) {
250  wxString discoveredIP;
251 #if 0
252  int discoveredPort;
253 #endif
254 
255  // if (m_useWebSocket)
256  {
257  std::string serviceIdent =
258  std::string("_signalk-ws._tcp.local."); // Works for node.js server
259 #if 0
260  if (m_params->AutoSKDiscover) {
261  if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
262  1)) { // 1 second scan
263  wxLogDebug(wxString::Format(
264  _T("SK server autodiscovery finds WebSocket service: %s:%d"),
265  discoveredIP.c_str(), discoveredPort));
266  m_addr.Hostname(discoveredIP);
267  m_addr.Service(discoveredPort);
268 
269  // Update the connection params, by pointer to item in global params
270  // array
271  ConnectionParams *params = (ConnectionParams *)m_params; // non-const
272  params->NetworkAddress = discoveredIP;
273  params->NetworkPort = discoveredPort;
274  } else
275  wxLogDebug(_T("SK server autodiscovery finds no WebSocket server."));
276  }
277 #endif
278  OpenWebSocket();
279  }
280 }
281 void CommDriverSignalKNet::Close() { CloseWebSocket(); }
282 
283 bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
284  wxString& ip, int& port, int tSec) {
285  wxServDisc* servscan =
286  new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
287 
288  for (int i = 0; i < 10; i++) {
289  if (servscan->getResultCount()) {
290  auto result = servscan->getResults().at(0);
291  delete servscan;
292 
293  wxServDisc* namescan = new wxServDisc(0, result.name, QTYPE_SRV);
294  for (int j = 0; j < 10; j++) {
295  if (namescan->getResultCount()) {
296  auto namescanResult = namescan->getResults().at(0);
297  port = namescanResult.port;
298  delete namescan;
299 
300  wxServDisc* addrscan =
301  new wxServDisc(0, namescanResult.name, QTYPE_A);
302  for (int k = 0; k < 10; k++) {
303  if (addrscan->getResultCount()) {
304  auto addrscanResult = addrscan->getResults().at(0);
305  ip = addrscanResult.ip;
306  delete addrscan;
307  return true;
308  break;
309  } else {
310  wxYield();
311  wxMilliSleep(1000*tSec/10);
312  }
313  }
314  delete addrscan;
315  return false;
316  } else {
317  wxYield();
318  wxMilliSleep(1000*tSec/10);
319  }
320  }
321  delete namescan;
322  return false;
323  } else {
324  wxYield();
325  wxMilliSleep(1000*tSec/10);
326  }
327  }
328 
329  delete servscan;
330  return false;
331 }
332 
333 void CommDriverSignalKNet::OpenWebSocket() {
334  // printf("OpenWebSocket\n");
335  wxLogMessage(wxString::Format(_T("Opening Signal K WebSocket client: %s"),
336  m_params.GetDSPort().c_str()));
337 
338  // Start a thread to run the client without blocking
339 
340  m_wsThread = new WebSocketThread(this, GetAddr(), this, m_token);
341  if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
342  wxLogError(wxT("Can't create WebSocketThread!"));
343 
344  return;
345  }
346 
347  ResetWatchdog();
348  GetSocketThreadWatchdogTimer()->Start(1000,
349  wxTIMER_ONE_SHOT); // Start the dog
350  SetThreadRunFlag(1);
351 
352  m_wsThread->Run();
353 }
354 
355 void CommDriverSignalKNet::CloseWebSocket() {
356  if (m_wsThread) {
357  if (IsThreadRunning()) {
358  wxLogMessage(_T("Stopping Secondary SignalK Thread"));
359 
360  m_Thread_run_flag = 0;
361  int tsec = 10;
362  while (IsThreadRunning() && tsec) {
363  wxSleep(1);
364  tsec--;
365  }
366 
367  wxString msg;
368  if (m_Thread_run_flag <= 0)
369  msg.Printf(_T("Stopped in %d sec."), 10 - tsec);
370  else
371  msg.Printf(_T("Not Stopped after 10 sec."));
372  wxLogMessage(msg);
373  }
374 
375  wxMilliSleep(100);
376 
377 #if 0
378  m_thread_run_flag = 0;
379  printf("sending delete\n");
380  m_wsThread->Delete();
381  wxMilliSleep(100);
382 
383  int nDeadman = 0;
384  while (IsThreadRunning() && (++nDeadman < 200)) { // spin for max 2 secs.
385  wxMilliSleep(10);
386  }
387  printf("Closed in %d\n", nDeadman);
388  wxMilliSleep(100);
389 #endif
390  }
391 }
392 
393 void CommDriverSignalKNet::handle_SK_sentence(
394  CommDriverSignalKNetEvent& event) {
395  rapidjson::Document root;
396 
397  // LOG_DEBUG("%s\n", msg.c_str());
398 
399  std::string* msg = event.GetPayload().get();
400  std::string msgTerminated = *msg;
401  msgTerminated.append("\r\n");
402 
403  root.Parse(*msg);
404  if (root.HasParseError()) {
405  wxLogMessage(wxString::Format(
406  _T("SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
407  root.GetParseError()));
408  return;
409  }
410 
411  // Decode just enough of string to extract some identifiers
412  // such as the sK version, "self" context, and target context
413  if (root.HasMember("version")) {
414  wxString msg = _T("Connected to Signal K server version: ");
415  msg << (root["version"].GetString());
416  wxLogMessage(msg);
417  }
418 
419  if (root.HasMember("self")) {
420  if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
421  m_self = (root["self"].GetString()); // for java server, and OpenPlotter
422  // node.js server 1.20
423  else
424  m_self = std::string("vessels.")
425  .append(root["self"].GetString()); // for Node.js server
426  }
427 
428  if (root.HasMember("context") && root["context"].IsString()) {
429  m_context = root["context"].GetString();
430  }
431 
432  // Notify all listeners
433  auto pos = iface.find(":");
434  std::string comm_interface = "";
435  if (pos != std::string::npos)
436  comm_interface = iface.substr(pos + 1);
437  auto navmsg =
438  std::make_shared<const SignalkMsg>(m_self, m_context, msgTerminated, comm_interface);
439  m_listener.Notify(std::move(navmsg));
440 }
441 
442  void CommDriverSignalKNet::initIXNetSystem() {
443  ix::initNetSystem();
444  };
445 
446  void CommDriverSignalKNet::uninitIXNetSystem() {
447  ix::uninitNetSystem();
448  };
449 
450 #if 0
451 void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
452  wxString sfixtime = "";
453 
454  if (update.HasMember("timestamp")) {
455  sfixtime = update["timestamp"].AsString();
456  }
457  if (update.HasMember("values") && update["values"].IsArray()) {
458  for (int j = 0; j < update["values"].Size(); ++j) {
459  wxJSONValue &item = update["values"][j];
460  updateItem(item, sfixtime);
461  }
462  }
463 }
464 
465 void CommDriverSignalKNet::updateItem(wxJSONValue &item,
466  wxString &sfixtime) {
467  if (item.HasMember("path") && item.HasMember("value")) {
468  const wxString &update_path = item["path"].AsString();
469  wxJSONValue &value = item["value"];
470 
471  if (update_path == _T("navigation.position") && !value.IsNull()) {
472  updateNavigationPosition(value, sfixtime);
473  } else if (update_path == _T("navigation.speedOverGround") &&
474  m_bGPSValid_SK && !value.IsNull()) {
475  updateNavigationSpeedOverGround(value, sfixtime);
476  } else if (update_path == _T("navigation.courseOverGroundTrue") &&
477  m_bGPSValid_SK && !value.IsNull()) {
478  updateNavigationCourseOverGround(value, sfixtime);
479  } else if (update_path == _T("navigation.courseOverGroundMagnetic")) {
480  }
481  else if (update_path ==
482  _T("navigation.gnss.satellites")) // From GGA sats in use
483  {
484  /*if (g_priSats >= 2)*/ updateGnssSatellites(value, sfixtime);
485  } else if (update_path ==
486  _T("navigation.gnss.satellitesInView")) // From GSV sats in view
487  {
488  /*if (g_priSats >= 3)*/ updateGnssSatellites(value, sfixtime);
489  } else if (update_path == _T("navigation.headingTrue")) {
490  if(!value.IsNull())
491  updateHeadingTrue(value, sfixtime);
492  } else if (update_path == _T("navigation.headingMagnetic")) {
493  if(!value.IsNull())
494  updateHeadingMagnetic(value, sfixtime);
495  } else if (update_path == _T("navigation.magneticVariation")) {
496  if(!value.IsNull())
497  updateMagneticVariance(value, sfixtime);
498  } else {
499  // wxLogMessage(wxString::Format(_T("** Signal K unhandled update: %s"),
500  // update_path));
501 #if 0
502  wxString dbg;
503  wxJSONWriter writer;
504  writer.Write(item, dbg);
505  wxString msg( _T("update: ") );
506  msg.append(dbg);
507  wxLogMessage(msg);
508 #endif
509  }
510  }
511 }
512 
513 void CommDriverSignalKNet::updateNavigationPosition(
514  wxJSONValue &value, const wxString &sfixtime) {
515  if ((value.HasMember("latitude" && value["latitude"].IsDouble())) &&
516  (value.HasMember("longitude") && value["longitude"].IsDouble())) {
517  // wxLogMessage(_T(" ***** Position Update"));
518  m_lat = value["latitude"].AsDouble();
519  m_lon = value["longitude"].AsDouble();
520  m_bGPSValid_SK = true;
521  } else {
522  m_bGPSValid_SK = false;
523  }
524 }
525 
526 
527 void CommDriverSignalKNet::updateNavigationSpeedOverGround(
528  wxJSONValue &value, const wxString &sfixtime){
529  double sog_ms = value.AsDouble();
530  double sog_knot = sog_ms * ms_to_knot_factor;
531  // wxLogMessage(wxString::Format(_T(" ***** SOG: %f, %f"), sog_ms, sog_knot));
532  m_sog = sog_knot;
533 }
534 
535 void CommDriverSignalKNet::updateNavigationCourseOverGround(
536  wxJSONValue &value, const wxString &sfixtime) {
537  double cog_rad = value.AsDouble();
538  double cog_deg = GEODESIC_RAD2DEG(cog_rad);
539  // wxLogMessage(wxString::Format(_T(" ***** COG: %f, %f"), cog_rad, cog_deg));
540  m_cog = cog_deg;
541 }
542 
543 void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
544  const wxString &sfixtime) {
545 #if 0
546  if (value.IsInt()) {
547  if (value.AsInt() > 0) {
548  m_frame->setSatelitesInView(value.AsInt());
549  g_priSats = 2;
550  }
551  } else if ((value.HasMember("count") && value["count"].IsInt())) {
552  m_frame->setSatelitesInView(value["count"].AsInt());
553  g_priSats = 3;
554  }
555 #endif
556 }
557 
558 void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
559  const wxString &sfixtime) {
560  m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
561 }
562 
563 void CommDriverSignalKNet::updateHeadingMagnetic(
564  wxJSONValue &value, const wxString &sfixtime) {
565  m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
566 }
567 
568 void CommDriverSignalKNet::updateMagneticVariance(
569  wxJSONValue &value, const wxString &sfixtime) {
570  m_var = GEODESIC_RAD2DEG(value.AsDouble());
571 }
572 
573 #endif
575 
576 // std::vector<unsigned char>* payload = p.get();
577 //
578 // // Extract the NMEA0183 sentence
579 // std::string full_sentence = std::string(payload->begin(), payload->end());
const std::string iface
Physical device for 0183, else a unique string.
Definition: comm_driver.h:89
void Activate(DriverPtr driver)
Add driver to list of active drivers.
void Activate() override
Register driver and possibly do other post-ctor steps.
Interface implemented by the transport layer and possibly other parties, like test code, which should handle...
Definition: comm_driver.h:47
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Definition: observable.h:210
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initialize an object which is not yet listening.
Definition: observable.h:227
Adds a std::shared_ptr<void> element to wxCommandEvent.
Definition: ocpn_plugin.h:1652
wxDEFINE_EVENT(REST_IO_EVT, ObservedEvt)
Event from IO thread to main.