Eliminate client close deadlocks. (#1833)
This commit is contained in:
parent
a825a831fe
commit
79ca6e04f3
@ -3,6 +3,7 @@ package chat
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@ -17,6 +18,7 @@ import (
|
|||||||
|
|
||||||
// Client represents a single chat client.
|
// Client represents a single chat client.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
|
mu sync.RWMutex
|
||||||
id uint
|
id uint
|
||||||
accessToken string
|
accessToken string
|
||||||
conn *websocket.Conn
|
conn *websocket.Conn
|
||||||
@ -152,11 +154,13 @@ func (c *Client) writePump() {
|
|||||||
|
|
||||||
// Optimization: Send multiple events in a single websocket message.
|
// Optimization: Send multiple events in a single websocket message.
|
||||||
// Add queued chat messages to the current websocket message.
|
// Add queued chat messages to the current websocket message.
|
||||||
|
c.mu.RLock()
|
||||||
n := len(c.send)
|
n := len(c.send)
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
_, _ = w.Write(newline)
|
_, _ = w.Write(newline)
|
||||||
_, _ = w.Write(<-c.send)
|
_, _ = w.Write(<-c.send)
|
||||||
}
|
}
|
||||||
|
c.mu.RUnlock()
|
||||||
|
|
||||||
if err := w.Close(); err != nil {
|
if err := w.Close(); err != nil {
|
||||||
return
|
return
|
||||||
@ -177,9 +181,12 @@ func (c *Client) handleEvent(data []byte) {
|
|||||||
func (c *Client) close() {
|
func (c *Client) close() {
|
||||||
log.Traceln("client closed:", c.User.DisplayName, c.id, c.IPAddress)
|
log.Traceln("client closed:", c.User.DisplayName, c.id, c.IPAddress)
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if c.send != nil {
|
||||||
_ = c.conn.Close()
|
_ = c.conn.Close()
|
||||||
c.server.unregister <- c.id
|
c.server.unregister <- c.id
|
||||||
if c.send != nil {
|
|
||||||
close(c.send)
|
close(c.send)
|
||||||
c.send = nil
|
c.send = nil
|
||||||
}
|
}
|
||||||
@ -214,6 +221,9 @@ func (c *Client) sendPayload(payload interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
|
||||||
if c.send != nil {
|
if c.send != nil {
|
||||||
c.send <- data
|
c.send <- data
|
||||||
}
|
}
|
||||||
|
@ -231,8 +231,8 @@ func (s *Server) Broadcast(payload events.EventPayload) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.RLock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
for _, client := range s.clients {
|
for _, client := range s.clients {
|
||||||
if client == nil {
|
if client == nil {
|
||||||
@ -242,8 +242,7 @@ func (s *Server) Broadcast(payload events.EventPayload) error {
|
|||||||
select {
|
select {
|
||||||
case client.send <- data:
|
case client.send <- data:
|
||||||
default:
|
default:
|
||||||
client.close()
|
go client.close()
|
||||||
delete(s.clients, client.id)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user