0

Websocket fixes (#461)

* Bump api spec version

* Do not panic on cpu monitoring error

* Centralize the socket disconnect logic and fire it also when socket errors occur. Hopefully closes #421
This commit is contained in:
Gabe Kangas 2020-12-21 19:42:47 -08:00 committed by GitHub
parent eab45c7e92
commit e558c549d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 15 deletions

View File

@ -72,7 +72,7 @@ func (c *Client) Write(msg models.ChatMessage) {
select { select {
case c.ch <- msg: case c.ch <- msg:
default: default:
_server.remove(c) _server.removeClient(c)
_server.err(fmt.Errorf("client %s is disconnected", c.ClientID)) _server.err(fmt.Errorf("client %s is disconnected", c.ClientID))
} }
} }
@ -96,28 +96,33 @@ func (c *Client) listenWrite() {
case msg := <-c.pingch: case msg := <-c.pingch:
err := websocket.JSON.Send(c.ws, msg) err := websocket.JSON.Send(c.ws, msg)
if err != nil { if err != nil {
log.Errorln(err) c.handleClientSocketError(err)
} }
// send message to the client // send message to the client
case msg := <-c.ch: case msg := <-c.ch:
err := websocket.JSON.Send(c.ws, msg) err := websocket.JSON.Send(c.ws, msg)
if err != nil { if err != nil {
log.Errorln(err) c.handleClientSocketError(err)
} }
case msg := <-c.usernameChangeChannel: case msg := <-c.usernameChangeChannel:
err := websocket.JSON.Send(c.ws, msg) err := websocket.JSON.Send(c.ws, msg)
if err != nil { if err != nil {
log.Errorln(err) c.handleClientSocketError(err)
} }
// receive done request // receive done request
case <-c.doneCh: case <-c.doneCh:
_server.remove(c) _server.removeClient(c)
c.doneCh <- true // for listenRead method c.doneCh <- true // for listenRead method
return return
} }
} }
} }
func (c *Client) handleClientSocketError(err error) {
log.Errorln("Websocket client error: ", err.Error())
_server.removeClient(c)
}
// Listen read request via channel. // Listen read request via channel.
func (c *Client) listenRead() { func (c *Client) listenRead() {
for { for {
@ -136,7 +141,7 @@ func (c *Client) listenRead() {
if err == io.EOF { if err == io.EOF {
c.doneCh <- true c.doneCh <- true
} else { } else {
log.Errorln(err) c.handleClientSocketError(err)
} }
return return
} }

View File

@ -3,6 +3,7 @@ package chat
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -16,6 +17,8 @@ var (
_server *server _server *server
) )
var l = sync.Mutex{}
// Server represents the server which handles the chat. // Server represents the server which handles the chat.
type server struct { type server struct {
Clients map[string]*Client Clients map[string]*Client
@ -74,7 +77,7 @@ func (s *server) onConnection(ws *websocket.Conn) {
client := NewClient(ws) client := NewClient(ws)
defer func() { defer func() {
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(client.ConnectedAt), client.MessageCount, client.ClientID) s.removeClient(client)
if err := ws.Close(); err != nil { if err := ws.Close(); err != nil {
s.errCh <- err s.errCh <- err
@ -96,18 +99,20 @@ func (s *server) Listen() {
select { select {
// add new a client // add new a client
case c := <-s.addCh: case c := <-s.addCh:
l.Lock()
s.Clients[c.socketID] = c s.Clients[c.socketID] = c
l.Unlock()
s.listener.ClientAdded(c.GetViewerClientFromChatClient()) s.listener.ClientAdded(c.GetViewerClientFromChatClient())
s.sendWelcomeMessageToClient(c) s.sendWelcomeMessageToClient(c)
// remove a client // remove a client
case c := <-s.delCh: case c := <-s.delCh:
delete(s.Clients, c.socketID) s.removeClient(c)
s.listener.ClientRemoved(c.socketID) case msg := <-s.sendAllCh:
// message was received from a client and should be sanitized, validated // message was received from a client and should be sanitized, validated
// and distributed to other clients. // and distributed to other clients.
case msg := <-s.sendAllCh: //
// Will turn markdown into html, sanitize user-supplied raw html // Will turn markdown into html, sanitize user-supplied raw html
// and standardize this message into something safe we can send everyone else. // and standardize this message into something safe we can send everyone else.
msg.RenderAndSanitizeMessageBody() msg.RenderAndSanitizeMessageBody()
@ -129,6 +134,19 @@ func (s *server) Listen() {
} }
} }
func (s *server) removeClient(c *Client) {
l.Lock()
if _, ok := s.Clients[c.socketID]; ok {
delete(s.Clients, c.socketID)
s.listener.ClientRemoved(c.socketID)
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(c.ConnectedAt), c.MessageCount, c.ClientID)
}
l.Unlock()
}
func (s *server) sendWelcomeMessageToClient(c *Client) { func (s *server) sendWelcomeMessageToClient(c *Client) {
go func() { go func() {
// Add an artificial delay so people notice this message come in. // Add an artificial delay so people notice this message come in.

View File

@ -6,6 +6,8 @@ import (
"github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
log "github.com/sirupsen/logrus"
) )
// Max number of metrics we want to keep. // Max number of metrics we want to keep.
@ -18,7 +20,8 @@ func collectCPUUtilization() {
v, err := cpu.Percent(0, false) v, err := cpu.Percent(0, false)
if err != nil { if err != nil {
panic(err) log.Errorln(err)
return
} }
metricValue := timestampedValue{time.Now(), int(v[0])} metricValue := timestampedValue{time.Now(), int(v[0])}

View File

@ -7,8 +7,8 @@ config:
maxErrorRate: 1 maxErrorRate: 1
phases: phases:
- duration: 30 - duration: 100
arrivalRate: 10 arrivalRate: 15
ws: ws:
subprotocols: subprotocols:
- json - json
@ -21,4 +21,4 @@ scenarios:
flow: flow:
- function: "createTestMessageObject" - function: "createTestMessageObject"
- send: "{{ data }}" - send: "{{ data }}"
- think: 5 - think: 10