32 #include "rapidjson/document.h"
34 #include "model/comm_drv_signalk_net.h"
35 #include "model/comm_navmsg_bus.h"
36 #include "model/comm_drv_registry.h"
37 #include "model/geodesic.h"
38 #include "model/sys_events.h"
39 #include "wxServDisc.h"
41 #include "observable.h"
43 #include "ixwebsocket/IXNetSystem.h"
44 #include "ixwebsocket/IXWebSocket.h"
45 #include "ixwebsocket/IXUserAgent.h"
46 #include "ixwebsocket/IXSocketTLSOptions.h"
// Identifier handed to m_socketread_watchdog_timer.SetOwner(this, kTimerSocket)
// further down in this file.
48 const int kTimerSocket = 9006;
55 const wxString& PortName,
56 const wxString& strBaudRate);
// Member declarations only; the enclosing class header and all definitions
// are outside the visible part of the file.
// NOTE(review): the names suggest a serial (COM port) worker rather than the
// WebSocket path used below -- confirm whether this declaration is still
// compiled and needed.
60 bool SetOutMsg(
const wxString& msg);
64 void ThreadMessage(
const wxString& msg);
65 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
66 void CloseComPortPhysical();
// Two overloads: one takes the bytes as a vector (by value), the other a
// raw pointer plus length.
67 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
68 size_t WriteComPortPhysical(
unsigned char* msg,
size_t length);
69 void SetGatewayOperationMode(
void);
73 wxString m_FullPortName;
// NOTE(review): presumably put/take cursors into rx_buffer; ownership and
// buffer size are not visible here -- confirm.
75 unsigned char* put_ptr;
76 unsigned char* tak_ptr;
78 unsigned char* rx_buffer;
92 : wxEvent(
id, commandType){};
96 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
97 std::shared_ptr<std::string> GetPayload() {
return m_payload; }
// Returns a copy of this event as a wxEvent*.
100 wxEvent* Clone()
const {
// The copy receives the same shared_ptr, so both events refer to one string
// buffer rather than each holding its own copy of the text.
102 newevent->m_payload = this->m_payload;
// Message text carried by the event; written by SetPayload(), read by
// GetPayload() and shared with copies made by Clone().
107 std::shared_ptr<std::string> m_payload;
115 wxEvtHandler* consumer,
const std::string& token);
// Thread body; defined below as WebSocketThread::Entry().
116 virtual void* Entry();
// Wraps one received message in an event and queues it on s_wsSKConsumer.
119 void HandleMessage(
const std::string& message);
// Handler that HandleMessage() posts to; Entry() assigns it from m_consumer.
120 wxEvtHandler* s_wsSKConsumer;
// Server address; Entry() reads the host and port from it.
121 wxIPV4address m_address;
// Event handler passed to the constructor.
122 wxEvtHandler* m_consumer;
130 wxIPV4address address,
131 wxEvtHandler* consumer,
132 const std::string& token)
133 : m_address(address),
134 m_consumer(consumer),
135 m_parentStream(parent),
// Subscribe to the system "resume" event (SystemEvents::evt_resume).  The
// visible end of the handler writes a debug log entry.
137 resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
141 wxLogDebug(
"WebSocketThread: restarted"); });
// Thread entry point.  From the statements in this function it:
//   - marks the parent driver's thread as running,
//   - builds a ws:// and a wss:// Signal K stream URL for m_address
//     (subscribe=all, sendCachedValues=false), appending "&token=..." to both
//     when m_token is non-empty,
//   - configures the socket to use the wss:// URL first,
//   - installs a callback for incoming socket events,
//   - polls the parent's m_Thread_run_flag every 100 ms until it is <= 0,
//   - finally clears the running state and stores -1 in the flag.
144 void* WebSocketThread::Entry() {
145 using namespace std::chrono_literals;
146 bool not_done =
true;
148 m_parentStream->SetThreadRunning(
true);
// HandleMessage() posts to s_wsSKConsumer, so publish the consumer first.
150 s_wsSKConsumer = m_consumer;
152 wxString host = m_address.IPAddress();
153 int port = m_address.Service();
156 std::stringstream wsAddress;
157 wsAddress <<
"ws://" << host <<
":" << port
158 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
159 std::stringstream wssAddress;
160 wssAddress <<
"wss://" << host <<
":" << port
161 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
// The authentication token, when present, travels as a URL query parameter.
163 if (!m_token.empty()) {
164 wsAddress <<
"&token=" << m_token;
165 wssAddress <<
"&token=" << m_token;
// Start with the TLS variant of the URL.
168 ws.setUrl(wssAddress.str());
169 ix::SocketTLSOptions opt;
// NOTE(review): hostname validation is switched off, presumably so that
// self-signed server certificates are accepted -- confirm this is intended.
170 opt.disable_hostname_validation =
true;
172 ws.setTLSOptions(opt);
// NOTE(review): IXWebSocket documents this interval in seconds -- confirm.
173 ws.setPingInterval(30);
// Callback invoked for socket events.  Captures by reference, so it relies on
// the locals of this function staying alive while the socket can call it.
175 auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
176 if (msg->type == ix::WebSocketMessageType::Message) {
177 HandleMessage(msg->str);
178 }
else if (msg->type == ix::WebSocketMessageType::Open) {
179 wxLogDebug(
"websocket: Connection established");
180 }
else if (msg->type == ix::WebSocketMessageType::Close) {
181 wxLogDebug(
"websocket: Connection disconnected");
182 }
else if (msg->type == ix::WebSocketMessageType::Error) {
183 wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
// After an error, flip the configured URL between the ws:// and wss://
// variants (presumably picked up by the next reconnect attempt).
184 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
185 : ws.setUrl(wsAddress.str());
190 ws.setOnMessageCallback(message_callback);
// Idle until the owner lowers the run flag; CloseWebSocket() sets it to 0.
// NOTE(review): the flag is written from another thread; its declaration is
// not visible here -- confirm it is safe for cross-thread access.
193 while (m_parentStream->m_Thread_run_flag > 0) {
194 std::this_thread::sleep_for(100ms);
198 m_parentStream->SetThreadRunning(
false);
// -1 signals "thread finished"; it is also the constructor's initial value.
199 m_parentStream->m_Thread_run_flag = -1;
// Hand one received message to the consumer as an event.
// Does nothing when s_wsSKConsumer is null.
204 void WebSocketThread::HandleMessage(
const std::string& message) {
205 if (s_wsSKConsumer) {
// Copy the text into a shared buffer that the event can carry.
207 auto buffer = std::make_shared<std::string>(message);
209 signalKEvent.SetPayload(buffer);
// Queue the event on the consumer instead of handling it on this thread.
210 s_wsSKConsumer->AddPendingEvent(signalKEvent);
// -1 is also the value WebSocketThread::Entry() stores once it has finished.
223 m_Thread_run_flag(-1),
225 m_listener(listener) {
// Deliver wxEVT_COMMDRIVER_SIGNALK_NET events to handle_SK_sentence().
228 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
// Server address, port and auth token are taken from the connection params.
231 m_addr.Hostname(params->NetworkAddress);
232 m_addr.Service(params->NetworkPort);
233 m_token = params->AuthToken;
// Timer events from the watchdog are sent to this object with id kTimerSocket.
234 m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
236 m_threadActive =
false;
// Destructor of the Signal K network driver.
241 CommDriverSignalKNet::~CommDriverSignalKNet() {
// Register this driver with the CommDriverRegistry singleton.
// shared_from_this() requires that the object is already owned by a
// std::shared_ptr when this runs.
246 CommDriverRegistry::GetInstance().
Activate(shared_from_this());
// Open the driver.  When m_params->AutoSKDiscover is set, a Signal K
// WebSocket service is looked up via DiscoverSKServer(); a hit replaces the
// address and port in m_addr and is also written to the connection params,
// a miss is only logged.
249 void CommDriverSignalKNet::Open(
void) {
250 wxString discoveredIP;
// DNS-SD service type advertised by Signal K WebSocket servers.
257 std::string serviceIdent =
258 std::string(
"_signalk-ws._tcp.local.");
260 if (m_params->AutoSKDiscover) {
261 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
263 wxLogDebug(wxString::Format(
264 _T(
"SK server autodiscovery finds WebSocket service: %s:%d"),
265 discoveredIP.c_str(), discoveredPort));
266 m_addr.Hostname(discoveredIP);
267 m_addr.Service(discoveredPort);
// NOTE(review): this writes through `params` while the check above reads
// `m_params` -- confirm both refer to the same parameter object.
272 params->NetworkAddress = discoveredIP;
273 params->NetworkPort = discoveredPort;
275 wxLogDebug(_T(
"SK server autodiscovery finds no WebSocket server."));
281 void CommDriverSignalKNet::Close() { CloseWebSocket(); }
// Look up a Signal K server with three chained wxServDisc queries:
//   PTR (serviceIdent) -> SRV (yields the port) -> A (yields the IP address).
// Each stage checks for a result up to 10 times and sleeps 1000*tSec/10 ms
// between checks; only the first result of each stage is used.
//
// serviceIdent  DNS-SD service name to search for.
// ip, port      Output parameters, written once the respective stage succeeds.
// tSec          Scales the sleep between polls of a stage.
//
// NOTE(review): the three scanners are created with raw `new`; the matching
// deletes are not among the statements visible here -- confirm they are
// released on every path, including the timeout paths.
283 bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
284 wxString& ip,
int& port,
int tSec) {
// Stage 1: find a service instance of the requested type.
285 wxServDisc* servscan =
286 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
288 for (
int i = 0; i < 10; i++) {
289 if (servscan->getResultCount()) {
290 auto result = servscan->getResults().at(0);
// Stage 2: resolve the instance name to a host name and port.
293 wxServDisc* namescan =
new wxServDisc(0, result.name, QTYPE_SRV);
294 for (
int j = 0; j < 10; j++) {
295 if (namescan->getResultCount()) {
296 auto namescanResult = namescan->getResults().at(0);
297 port = namescanResult.port;
// Stage 3: resolve the host name to an IP address.
300 wxServDisc* addrscan =
301 new wxServDisc(0, namescanResult.name, QTYPE_A);
302 for (
int k = 0; k < 10; k++) {
303 if (addrscan->getResultCount()) {
304 auto addrscanResult = addrscan->getResults().at(0);
305 ip = addrscanResult.ip;
// One poll interval per stage; integer arithmetic, so tSec is in whole seconds.
311 wxMilliSleep(1000*tSec/10);
318 wxMilliSleep(1000*tSec/10);
325 wxMilliSleep(1000*tSec/10);
// Start the WebSocket client: log the target port, create the worker thread
// (logging an error if creation fails) and start the watchdog timer.
333 void CommDriverSignalKNet::OpenWebSocket() {
335 wxLogMessage(wxString::Format(_T(
"Opening Signal K WebSocket client: %s"),
336 m_params.GetDSPort().c_str()));
341 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
342 wxLogError(wxT(
"Can't create WebSocketThread!"));
// NOTE(review): 1000 is presumably the timer interval in milliseconds
// (wxTimer::Start convention) -- confirm the returned object is a wxTimer.
348 GetSocketThreadWatchdogTimer()->Start(1000,
// Stop the WebSocket worker thread, if it is running, and report how long the
// shutdown took.
355 void CommDriverSignalKNet::CloseWebSocket() {
357 if (IsThreadRunning()) {
358 wxLogMessage(_T(
"Stopping Secondary SignalK Thread"));
// WebSocketThread::Entry() polls this flag and leaves its loop once it is <= 0.
360 m_Thread_run_flag = 0;
// Wait for the thread to report that it stopped, bounded by the tsec counter.
362 while (IsThreadRunning() && tsec) {
368 if (m_Thread_run_flag <= 0)
369 msg.Printf(_T(
"Stopped in %d sec."), 10 - tsec);
371 msg.Printf(_T(
"Not Stopped after 10 sec."));
// NOTE(review): this spells the member `m_thread_run_flag` (lower-case t),
// unlike `m_Thread_run_flag` above -- confirm whether this section is
// compiled at all or refers to a different member.
378 m_thread_run_flag = 0;
379 printf(
"sending delete\n");
380 m_wsThread->Delete();
// Second bounded wait: gives up after 200 iterations.
384 while (IsThreadRunning() && (++nDeadman < 200)) {
387 printf(
"Closed in %d\n", nDeadman);
// Event handler for messages queued by the WebSocket thread.
// Takes the event payload, checks the rapidjson document for parse errors,
// updates m_self / m_context from the "self" / "context" members and forwards
// the text (with "\r\n" appended) to m_listener as a SignalkMsg.
393 void CommDriverSignalKNet::handle_SK_sentence(
395 rapidjson::Document root;
// NOTE(review): the payload pointer is dereferenced without a null check --
// confirm every sender sets a payload.
399 std::string* msg =
event.GetPayload().get();
// Work on a copy so the shared payload itself is left unmodified.
400 std::string msgTerminated = *msg;
401 msgTerminated.append(
"\r\n");
404 if (root.HasParseError()) {
405 wxLogMessage(wxString::Format(
406 _T(
"SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
407 root.GetParseError()));
// A "version" member builds the "Connected to Signal K server version" text.
// NOTE(review): GetString() is called on "version" and "self" without an
// IsString() check (unlike "context" below); rapidjson asserts on a type
// mismatch -- confirm the server always sends strings here.
413 if (root.HasMember(
"version")) {
414 wxString msg = _T(
"Connected to Signal K server version: ");
415 msg << (root[
"version"].GetString());
// Store the own-vessel id so that it always carries the "vessels." prefix.
419 if (root.HasMember(
"self")) {
420 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
421 m_self = (root[
"self"].GetString());
424 m_self = std::string(
"vessels.")
425 .append(root[
"self"].GetString());
428 if (root.HasMember(
"context") && root[
"context"].IsString()) {
429 m_context = root[
"context"].GetString();
// The interface name is the part of iface after its first ':'; it stays
// empty when iface has no ':'.
433 auto pos =
iface.find(
":");
434 std::string comm_interface =
"";
435 if (pos != std::string::npos)
436 comm_interface =
iface.substr(pos + 1);
438 std::make_shared<const SignalkMsg>(m_self, m_context, msgTerminated, comm_interface);
439 m_listener.
Notify(std::move(navmsg));
// Counterpart of uninitIXNetSystem().
// NOTE(review): presumably calls ix::initNetSystem() -- confirm in the body.
442 void CommDriverSignalKNet::initIXNetSystem() {
// Shut down the IXWebSocket network layer via ix::uninitNetSystem().
446 void CommDriverSignalKNet::uninitIXNetSystem() {
447 ix::uninitNetSystem();
// Process one Signal K "update" object: read its optional "timestamp" and
// pass every element of its "values" array to updateItem().
451 void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
// Stays empty when the update carries no timestamp.
452 wxString sfixtime =
"";
454 if (update.HasMember(
"timestamp")) {
455 sfixtime = update[
"timestamp"].AsString();
// Anything other than an array under "values" is skipped.
457 if (update.HasMember(
"values") && update[
"values"].IsArray()) {
458 for (
int j = 0; j < update[
"values"].Size(); ++j) {
459 wxJSONValue &item = update[
"values"][j];
460 updateItem(item, sfixtime);
// Dispatch one {"path", "value"} entry to the update* handler for its path.
// The dispatch only runs when the item has both members.
//
// item      One element of an update's "values" array.
// sfixtime  Timestamp string of the enclosing update, passed on unchanged.
// NOTE(review): sfixtime is a non-const reference, yet the handlers called
// here take it as const wxString& -- it could likely be const here too.
465 void CommDriverSignalKNet::updateItem(wxJSONValue &item,
466 wxString &sfixtime) {
467 if (item.HasMember(
"path") && item.HasMember(
"value")) {
468 const wxString &update_path = item[
"path"].AsString();
469 wxJSONValue &value = item[
"value"];
471 if (update_path == _T(
"navigation.position") && !value.IsNull()) {
472 updateNavigationPosition(value, sfixtime);
473 }
// Speed and course over ground are only taken while the position is flagged
// valid (m_bGPSValid_SK) and the value is not null.
else if (update_path == _T(
"navigation.speedOverGround") &&
474 m_bGPSValid_SK && !value.IsNull()) {
475 updateNavigationSpeedOverGround(value, sfixtime);
476 }
else if (update_path == _T(
"navigation.courseOverGroundTrue") &&
477 m_bGPSValid_SK && !value.IsNull()) {
478 updateNavigationCourseOverGround(value, sfixtime);
479 }
else if (update_path == _T(
"navigation.courseOverGroundMagnetic")) {
// Both satellite paths are routed to the same handler.
481 else if (update_path ==
482 _T(
"navigation.gnss.satellites"))
484 updateGnssSatellites(value, sfixtime);
485 }
else if (update_path ==
486 _T(
"navigation.gnss.satellitesInView"))
488 updateGnssSatellites(value, sfixtime);
489 }
else if (update_path == _T(
"navigation.headingTrue")) {
491 updateHeadingTrue(value, sfixtime);
492 }
else if (update_path == _T(
"navigation.headingMagnetic")) {
494 updateHeadingMagnetic(value, sfixtime);
495 }
else if (update_path == _T(
"navigation.magneticVariation")) {
497 updateMagneticVariance(value, sfixtime);
// Serializes the item into dbg and starts an "update: " message string.
504 writer.Write(item, dbg);
505 wxString msg( _T(
"update: ") );
513 void CommDriverSignalKNet::updateNavigationPosition(
514 wxJSONValue &value,
const wxString &sfixtime) {
515 if ((value.HasMember(
"latitude" && value[
"latitude"].IsDouble())) &&
516 (value.HasMember(
"longitude") && value[
"longitude"].IsDouble())) {
518 m_lat = value[
"latitude"].AsDouble();
519 m_lon = value[
"longitude"].AsDouble();
520 m_bGPSValid_SK =
true;
522 m_bGPSValid_SK =
false;
// Read the speed over ground from `value` and scale it by ms_to_knot_factor
// (metres per second to knots, going by the variable names).
527 void CommDriverSignalKNet::updateNavigationSpeedOverGround(
528 wxJSONValue &value,
const wxString &sfixtime){
529 double sog_ms = value.AsDouble();
530 double sog_knot = sog_ms * ms_to_knot_factor;
// Read the course over ground from `value` and convert it from radians to
// degrees with GEODESIC_RAD2DEG.
535 void CommDriverSignalKNet::updateNavigationCourseOverGround(
536 wxJSONValue &value,
const wxString &sfixtime) {
537 double cog_rad = value.AsDouble();
538 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
// Report the number of satellites to m_frame.  Two value shapes are handled:
// a plain integer, or an object with an integer "count" member.
543 void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
544 const wxString &sfixtime) {
// A plain count is only forwarded when it is positive.
547 if (value.AsInt() > 0) {
548 m_frame->setSatelitesInView(value.AsInt());
551 }
else if ((value.HasMember(
"count") && value[
"count"].IsInt())) {
552 m_frame->setSatelitesInView(value[
"count"].AsInt());
// Store the true heading in m_hdt, converted from radians to degrees.
// The value is converted without a type or null check.
558 void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
559 const wxString &sfixtime) {
560 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
// Store the magnetic heading in m_hdm, converted from radians to degrees.
// The value is converted without a type or null check.
563 void CommDriverSignalKNet::updateHeadingMagnetic(
564 wxJSONValue &value,
const wxString &sfixtime) {
565 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
// Store the magnetic variation in m_var, converted from radians to degrees.
// The value is converted without a type or null check.
568 void CommDriverSignalKNet::updateMagneticVariance(
569 wxJSONValue &value,
const wxString &sfixtime) {
570 m_var = GEODESIC_RAD2DEG(value.AsDouble());
const std::string iface
Physical device for 0183, else a unique string.
void Activate(DriverPtr driver)
Add driver to list of active drivers.
void Activate() override
Register driver and possibly do other post-ctor steps.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initialize an object that is not yet listening.
Adds a std::shared_ptr<void> element to wxCommandEvent.
wxDEFINE_EVENT(REST_IO_EVT, ObservedEvt)
Event from IO thread to main.