From e558c549d7435586ccdc18ebee7e4c30c39d6963 Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Mon, 21 Dec 2020 19:42:47 -0800 Subject: [PATCH] Websocket fixes (#461) * Bump api spec version * Do not panic on cpu monitoring error * Centralize the socket disconnect logic and fire it also when socket errors occur. Hopefully closes #421 --- core/chat/client.go | 17 +++++++++++------ core/chat/server.go | 28 +++++++++++++++++++++++----- metrics/hardware.go | 5 ++++- test/websocketTest.yaml | 6 +++--- 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/core/chat/client.go b/core/chat/client.go index 1ce80b04d..654ffd762 100644 --- a/core/chat/client.go +++ b/core/chat/client.go @@ -72,7 +72,7 @@ func (c *Client) Write(msg models.ChatMessage) { select { case c.ch <- msg: default: - _server.remove(c) + _server.removeClient(c) _server.err(fmt.Errorf("client %s is disconnected", c.ClientID)) } } @@ -96,28 +96,33 @@ func (c *Client) listenWrite() { case msg := <-c.pingch: err := websocket.JSON.Send(c.ws, msg) if err != nil { - log.Errorln(err) + c.handleClientSocketError(err) } // send message to the client case msg := <-c.ch: err := websocket.JSON.Send(c.ws, msg) if err != nil { - log.Errorln(err) + c.handleClientSocketError(err) } case msg := <-c.usernameChangeChannel: err := websocket.JSON.Send(c.ws, msg) if err != nil { - log.Errorln(err) + c.handleClientSocketError(err) } // receive done request case <-c.doneCh: - _server.remove(c) + _server.removeClient(c) c.doneCh <- true // for listenRead method return } } } +func (c *Client) handleClientSocketError(err error) { + log.Errorln("Websocket client error: ", err.Error()) + _server.removeClient(c) +} + // Listen read request via channel. func (c *Client) listenRead() { for { @@ -136,7 +141,7 @@ func (c *Client) listenRead() { if err == io.EOF { c.doneCh <- true } else { - log.Errorln(err) + c.handleClientSocketError(err) } return } diff --git a/core/chat/server.go b/core/chat/server.go index 943987a4f..eec9b0000 100644 --- a/core/chat/server.go +++ b/core/chat/server.go @@ -3,6 +3,7 @@ package chat import ( "fmt" "net/http" + "sync" "time" log "github.com/sirupsen/logrus" @@ -16,6 +17,8 @@ var ( _server *server ) +var l = sync.Mutex{} + // Server represents the server which handles the chat. type server struct { Clients map[string]*Client @@ -74,7 +77,7 @@ func (s *server) onConnection(ws *websocket.Conn) { client := NewClient(ws) defer func() { - log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(client.ConnectedAt), client.MessageCount, client.ClientID) + s.removeClient(client) if err := ws.Close(); err != nil { s.errCh <- err @@ -96,18 +99,20 @@ func (s *server) Listen() { select { // add new a client case c := <-s.addCh: + l.Lock() s.Clients[c.socketID] = c + l.Unlock() + s.listener.ClientAdded(c.GetViewerClientFromChatClient()) s.sendWelcomeMessageToClient(c) // remove a client case c := <-s.delCh: - delete(s.Clients, c.socketID) - s.listener.ClientRemoved(c.socketID) - + s.removeClient(c) + case msg := <-s.sendAllCh: // message was received from a client and should be sanitized, validated // and distributed to other clients. - case msg := <-s.sendAllCh: + // // Will turn markdown into html, sanitize user-supplied raw html // and standardize this message into something safe we can send everyone else. msg.RenderAndSanitizeMessageBody() @@ -129,6 +134,19 @@ func (s *server) Listen() { } } +func (s *server) removeClient(c *Client) { + l.Lock() + + if _, ok := s.Clients[c.socketID]; ok { + delete(s.Clients, c.socketID) + + s.listener.ClientRemoved(c.socketID) + + log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(c.ConnectedAt), c.MessageCount, c.ClientID) + } + l.Unlock() +} + func (s *server) sendWelcomeMessageToClient(c *Client) { go func() { // Add an artificial delay so people notice this message come in. diff --git a/metrics/hardware.go b/metrics/hardware.go index 92ee9a175..a51decbc4 100644 --- a/metrics/hardware.go +++ b/metrics/hardware.go @@ -6,6 +6,8 @@ import ( "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/mem" + + log "github.com/sirupsen/logrus" ) // Max number of metrics we want to keep. @@ -18,7 +20,8 @@ func collectCPUUtilization() { v, err := cpu.Percent(0, false) if err != nil { - panic(err) + log.Errorln(err) + return } metricValue := timestampedValue{time.Now(), int(v[0])} diff --git a/test/websocketTest.yaml b/test/websocketTest.yaml index 4c462d6fb..e5e674cea 100644 --- a/test/websocketTest.yaml +++ b/test/websocketTest.yaml @@ -7,8 +7,8 @@ config: maxErrorRate: 1 phases: - - duration: 30 - arrivalRate: 10 + - duration: 100 + arrivalRate: 15 ws: subprotocols: - json @@ -21,4 +21,4 @@ scenarios: flow: - function: "createTestMessageObject" - send: "{{ data }}" - - think: 5 + - think: 10