|
|
@@ -0,0 +1,218 @@
|
|
|
+// package web_socket -- реализация высокоуровнего веб-сокета для работы десктопа
|
|
|
+package web_socket
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ "net/url"
|
|
|
+ "os"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/gorilla/websocket"
|
|
|
+
|
|
|
+ "wartank/pkg/components/safebool"
|
|
|
+ "wartank/pkg/types"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ strWebSocket = "web_socket"
|
|
|
+ TypeMsgBin = 2
|
|
|
+)
|
|
|
+
|
|
|
+// WebSocket -- реализация высокоуровнего веб-сокета для работы десктопа
|
|
|
+type WebSocket struct {
|
|
|
+ kern types.IKernel
|
|
|
+ slog types.ISlog
|
|
|
+ url string
|
|
|
+ isConnect *safebool.SafeBool
|
|
|
+ ws *websocket.Conn
|
|
|
+ block sync.RWMutex
|
|
|
+}
|
|
|
+
|
|
|
+// NewWebSocket -- возвращает новый веб-сокет
|
|
|
+func NewWebSocket(kern types.IKernel) (*WebSocket, error) {
|
|
|
+ log.Println("NewWebSocket()")
|
|
|
+ if kern == nil {
|
|
|
+ return nil, fmt.Errorf("NewWebSocket(): IKernel == nil")
|
|
|
+ }
|
|
|
+ url := os.Getenv("SERVER_URL")
|
|
|
+ if url == "" {
|
|
|
+ return nil, fmt.Errorf("NewWebSocket(): env SERVER_URL not set")
|
|
|
+ }
|
|
|
+ sf := &WebSocket{
|
|
|
+ kern: kern,
|
|
|
+ slog: kern.Slog(),
|
|
|
+ url: url,
|
|
|
+ isConnect: safebool.NewSafeBool(),
|
|
|
+ }
|
|
|
+
|
|
|
+ sf.connect()
|
|
|
+ go sf.close()
|
|
|
+ return sf, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Подключает веб-сокет к серверу
|
|
|
+func (sf *WebSocket) connect() {
|
|
|
+ log.Println("WebSocket.connect()")
|
|
|
+ fnConnect := func() {
|
|
|
+ u := url.URL{Scheme: "ws", Host: sf.url, Path: "/api/ws"}
|
|
|
+ strUrl := u.String()
|
|
|
+ log.Printf("WebSocket.connect(): wait connect to %q\n", strUrl)
|
|
|
+ var err error
|
|
|
+ sf.ws, _, err = websocket.DefaultDialer.Dial(strUrl, nil)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("WebSocket.connect(): in dial, err=\n\t%v\n", err)
|
|
|
+ time.Sleep(time.Second * 2)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ sf.isConnect.Set()
|
|
|
+ log.Println("WebSocket.connect(): ok")
|
|
|
+ }
|
|
|
+ for !sf.isConnect.Get() {
|
|
|
+ select {
|
|
|
+ case <-sf.kern.CtxApp().Done():
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ fnConnect()
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Read -- потокобезопасное чтение топика сервера
|
|
|
+func (sf *WebSocket) Read(topic string) (map[string]string, error) {
|
|
|
+ sf.block.Lock()
|
|
|
+ defer sf.block.Unlock()
|
|
|
+ dictResp, err := sf.read(topic)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("WebSocket.Read(): in read, err=\n\t%w", err)
|
|
|
+ }
|
|
|
+ return dictResp, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Скрытая потоко-небезопасна функция
|
|
|
+func (sf *WebSocket) read(topic string) (dictResp map[string]string, err error) {
|
|
|
+ var binResp []byte
|
|
|
+ for {
|
|
|
+ dictReq := make(map[string]string)
|
|
|
+ dictReq["topic"] = topic
|
|
|
+ binReq, err := json.Marshal(dictReq)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("WebSocket.read(): in marshall topic(%q), err=\n\t%w", topic, err)
|
|
|
+ }
|
|
|
+ err = sf.ws.WriteMessage(TypeMsgBin, binReq)
|
|
|
+ if err != nil {
|
|
|
+ sf.slog.Errorf("WebSocket.read(): in write msg, err=\n\t%v\n", err)
|
|
|
+ sf.ws.Close()
|
|
|
+ sf.isConnect.Reset()
|
|
|
+ sf.connect()
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ _, binResp, err = sf.ws.ReadMessage()
|
|
|
+ if err != nil {
|
|
|
+ sf.slog.Errorf("WebSocket.read(): in read msg, err=\n\t%v\n", err)
|
|
|
+ sf.ws.Close()
|
|
|
+ sf.isConnect.Reset()
|
|
|
+ sf.connect()
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ dictResp = make(map[string]string)
|
|
|
+ err = json.Unmarshal(binResp, &dictResp)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("WebSocket.read(): in unmarshal binResp, err=\n\t%w", err)
|
|
|
+ }
|
|
|
+ return dictResp, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Write -- потокобезопасная запись топика
|
|
|
+func (sf *WebSocket) Write(topic string, dictReq map[string]string) error {
|
|
|
+ sf.block.Lock()
|
|
|
+ defer sf.block.Unlock()
|
|
|
+ err := sf.write(topic, dictReq)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("WebSocket.Write(): in write, err=\n\t%w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// Скрытая потоко-небезопасна функция
|
|
|
+func (sf *WebSocket) write(topic string, dictReq map[string]string) error {
|
|
|
+ dictReq["topic"] = topic
|
|
|
+ binData, err := json.Marshal(dictReq)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("WebSocket.write(): in marshal msg, err=\n\t%w", err)
|
|
|
+ }
|
|
|
+ for {
|
|
|
+ err = sf.ws.WriteMessage(TypeMsgBin, binData)
|
|
|
+ if err != nil {
|
|
|
+ sf.slog.Errorf("WebSocket.write(): in write msg, err=\n\t%v\n", err)
|
|
|
+ sf.isConnect.Reset()
|
|
|
+ sf.ws.Close()
|
|
|
+ sf.connect()
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// IsConnect -- потокобезопасный признак подключенности сервера
|
|
|
+func (sf *WebSocket) IsConnect() bool {
|
|
|
+ sf.block.RLock()
|
|
|
+ defer sf.block.RUnlock()
|
|
|
+ return sf.isConnect.Get()
|
|
|
+}
|
|
|
+
|
|
|
+// Call -- потокобезопасный вызов удалённого топика
|
|
|
+func (sf *WebSocket) Call(topic string, dictReq map[string]string) (dictResp map[string]string, err error) {
|
|
|
+ sf.block.Lock()
|
|
|
+ defer sf.block.Unlock()
|
|
|
+ var binResp []byte
|
|
|
+ for {
|
|
|
+ dictReq["topic"] = topic
|
|
|
+ binReq, err := json.Marshal(dictReq)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("WebSocket.Call(): in marshall topic(%q), err=\n\t%w", topic, err)
|
|
|
+ }
|
|
|
+ err = sf.ws.WriteMessage(TypeMsgBin, binReq)
|
|
|
+ if err != nil {
|
|
|
+ sf.slog.Errorf("WebSocket.Call(): in write msg, err=\n\t%v\n", err)
|
|
|
+ sf.ws.Close()
|
|
|
+ sf.isConnect.Reset()
|
|
|
+ sf.connect()
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ _, binResp, err = sf.ws.ReadMessage()
|
|
|
+ if err != nil {
|
|
|
+ sf.slog.Errorf("WebSocket.Call(): in read msg, err=\n\t%v\n", err)
|
|
|
+ sf.ws.Close()
|
|
|
+ sf.isConnect.Reset()
|
|
|
+ sf.connect()
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ dictResp = make(map[string]string)
|
|
|
+ err = json.Unmarshal(binResp, &dictResp)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("WebSocket.Call(): in unmarshal binResp, err=\n\t%w", err)
|
|
|
+ }
|
|
|
+ return dictResp, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Потокобезопасное ожидание закрытия в отдельном потоке
|
|
|
+func (sf *WebSocket) close() {
|
|
|
+ <-sf.kern.Done()
|
|
|
+ sf.block.Lock()
|
|
|
+ defer sf.block.Unlock()
|
|
|
+ if !sf.isConnect.Get() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ sf.isConnect.Reset()
|
|
|
+ sf.ws.Close()
|
|
|
+}
|