Optimize/fix concurrency for chat
This commit is contained in:
@@ -73,6 +73,9 @@ func GetModerationChatMessages() []models.ChatEvent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func GetClient(clientID string) *Client {
|
func GetClient(clientID string) *Client {
|
||||||
|
l.RLock()
|
||||||
|
defer l.RUnlock()
|
||||||
|
|
||||||
for _, client := range _server.Clients {
|
for _, client := range _server.Clients {
|
||||||
if client.ClientID == clientID {
|
if client.ClientID == clientID {
|
||||||
return client
|
return client
|
||||||
|
|||||||
@@ -153,11 +153,10 @@ func (c *Client) listenRead() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
c.doneCh <- true
|
c.doneCh <- true
|
||||||
} else {
|
|
||||||
c.handleClientSocketError(err)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
c.handleClientSocketError(err)
|
||||||
|
}
|
||||||
|
|
||||||
var messageTypeCheck map[string]interface{}
|
var messageTypeCheck map[string]interface{}
|
||||||
err = json.Unmarshal(data, &messageTypeCheck)
|
err = json.Unmarshal(data, &messageTypeCheck)
|
||||||
@@ -165,12 +164,12 @@ func (c *Client) listenRead() {
|
|||||||
log.Errorln(err)
|
log.Errorln(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
messageType := messageTypeCheck["type"].(string)
|
|
||||||
|
|
||||||
if !c.passesRateLimit() {
|
if !c.passesRateLimit() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
messageType := messageTypeCheck["type"].(string)
|
||||||
|
|
||||||
if messageType == models.MessageSent {
|
if messageType == models.MessageSent {
|
||||||
c.chatMessageReceived(data)
|
c.chatMessageReceived(data)
|
||||||
} else if messageType == models.UserNameChanged {
|
} else if messageType == models.UserNameChanged {
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ var (
|
|||||||
_server *server
|
_server *server
|
||||||
)
|
)
|
||||||
|
|
||||||
var l = sync.Mutex{}
|
var l = &sync.RWMutex{}
|
||||||
|
|
||||||
// Server represents the server which handles the chat.
|
// Server represents the server which handles the chat.
|
||||||
type server struct {
|
type server struct {
|
||||||
@@ -56,32 +56,42 @@ func (s *server) err(err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) sendAll(msg models.ChatEvent) {
|
func (s *server) sendAll(msg models.ChatEvent) {
|
||||||
|
l.RLock()
|
||||||
for _, c := range s.Clients {
|
for _, c := range s.Clients {
|
||||||
c.write(msg)
|
c.write(msg)
|
||||||
}
|
}
|
||||||
|
l.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) ping() {
|
func (s *server) ping() {
|
||||||
ping := models.PingMessage{MessageType: models.PING}
|
ping := models.PingMessage{MessageType: models.PING}
|
||||||
|
|
||||||
|
l.RLock()
|
||||||
for _, c := range s.Clients {
|
for _, c := range s.Clients {
|
||||||
c.pingch <- ping
|
c.pingch <- ping
|
||||||
}
|
}
|
||||||
|
l.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) usernameChanged(msg models.NameChangeEvent) {
|
func (s *server) usernameChanged(msg models.NameChangeEvent) {
|
||||||
|
l.RLock()
|
||||||
for _, c := range s.Clients {
|
for _, c := range s.Clients {
|
||||||
c.usernameChangeChannel <- msg
|
c.usernameChangeChannel <- msg
|
||||||
}
|
}
|
||||||
|
l.RUnlock()
|
||||||
|
|
||||||
go webhooks.SendChatEventUsernameChanged(msg)
|
go webhooks.SendChatEventUsernameChanged(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) userJoined(msg models.UserJoinedEvent) {
|
func (s *server) userJoined(msg models.UserJoinedEvent) {
|
||||||
|
l.RLock()
|
||||||
if s.listener.IsStreamConnected() {
|
if s.listener.IsStreamConnected() {
|
||||||
for _, c := range s.Clients {
|
for _, c := range s.Clients {
|
||||||
c.userJoinedChannel <- msg
|
c.userJoinedChannel <- msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
l.RUnlock()
|
||||||
|
|
||||||
go webhooks.SendChatEventUserJoined(msg)
|
go webhooks.SendChatEventUserJoined(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,7 +102,8 @@ func (s *server) onConnection(ws *websocket.Conn) {
|
|||||||
s.removeClient(client)
|
s.removeClient(client)
|
||||||
|
|
||||||
if err := ws.Close(); err != nil {
|
if err := ws.Close(); err != nil {
|
||||||
s.errCh <- err
|
log.Debugln(err)
|
||||||
|
//s.errCh <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -113,12 +124,12 @@ func (s *server) Listen() {
|
|||||||
case c := <-s.addCh:
|
case c := <-s.addCh:
|
||||||
l.Lock()
|
l.Lock()
|
||||||
s.Clients[c.socketID] = c
|
s.Clients[c.socketID] = c
|
||||||
l.Unlock()
|
|
||||||
|
|
||||||
if !c.Ignore {
|
if !c.Ignore {
|
||||||
s.listener.ClientAdded(c.GetViewerClientFromChatClient())
|
s.listener.ClientAdded(c.GetViewerClientFromChatClient())
|
||||||
s.sendWelcomeMessageToClient(c)
|
s.sendWelcomeMessageToClient(c)
|
||||||
}
|
}
|
||||||
|
l.Unlock()
|
||||||
|
|
||||||
// remove a client
|
// remove a client
|
||||||
case c := <-s.delCh:
|
case c := <-s.delCh:
|
||||||
@@ -158,12 +169,10 @@ func (s *server) Listen() {
|
|||||||
|
|
||||||
func (s *server) removeClient(c *Client) {
|
func (s *server) removeClient(c *Client) {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
|
|
||||||
if _, ok := s.Clients[c.socketID]; ok {
|
if _, ok := s.Clients[c.socketID]; ok {
|
||||||
delete(s.Clients, c.socketID)
|
delete(s.Clients, c.socketID)
|
||||||
|
|
||||||
s.listener.ClientRemoved(c.socketID)
|
s.listener.ClientRemoved(c.socketID)
|
||||||
|
|
||||||
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(c.ConnectedAt), c.MessageCount, c.ClientID)
|
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(c.ConnectedAt), c.MessageCount, c.ClientID)
|
||||||
}
|
}
|
||||||
l.Unlock()
|
l.Unlock()
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/owncast/owncast/utils"
|
"github.com/owncast/owncast/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
var l = sync.Mutex{}
|
var l = &sync.RWMutex{}
|
||||||
|
|
||||||
func setupStats() error {
|
func setupStats() error {
|
||||||
s := getSavedStats()
|
s := getSavedStats()
|
||||||
@@ -52,6 +52,8 @@ func IsStreamConnected() bool {
|
|||||||
// SetClientActive sets a client as active and connected.
|
// SetClientActive sets a client as active and connected.
|
||||||
func SetClientActive(client models.Client) {
|
func SetClientActive(client models.Client) {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
|
|
||||||
// If this clientID already exists then update it.
|
// If this clientID already exists then update it.
|
||||||
// Otherwise set a new one.
|
// Otherwise set a new one.
|
||||||
if existingClient, ok := _stats.Clients[client.ClientID]; ok {
|
if existingClient, ok := _stats.Clients[client.ClientID]; ok {
|
||||||
@@ -66,7 +68,6 @@ func SetClientActive(client models.Client) {
|
|||||||
}
|
}
|
||||||
_stats.Clients[client.ClientID] = client
|
_stats.Clients[client.ClientID] = client
|
||||||
}
|
}
|
||||||
l.Unlock()
|
|
||||||
|
|
||||||
// Don't update viewer counts if a live stream session is not active.
|
// Don't update viewer counts if a live stream session is not active.
|
||||||
if _stats.StreamConnected {
|
if _stats.StreamConnected {
|
||||||
@@ -85,6 +86,7 @@ func RemoveClient(clientID string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func GetClients() []models.Client {
|
func GetClients() []models.Client {
|
||||||
|
l.RLock()
|
||||||
clients := make([]models.Client, 0)
|
clients := make([]models.Client, 0)
|
||||||
for _, client := range _stats.Clients {
|
for _, client := range _stats.Clients {
|
||||||
chatClient := chat.GetClient(client.ClientID)
|
chatClient := chat.GetClient(client.ClientID)
|
||||||
@@ -94,6 +96,8 @@ func GetClients() []models.Client {
|
|||||||
clients = append(clients, client)
|
clients = append(clients, client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
l.RUnlock()
|
||||||
|
|
||||||
return clients
|
return clients
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,15 @@ config:
|
|||||||
maxErrorRate: 1
|
maxErrorRate: 1
|
||||||
|
|
||||||
phases:
|
phases:
|
||||||
- duration: 100
|
- duration: 30
|
||||||
arrivalRate: 15
|
arrivalRate: 5
|
||||||
|
rampTo: 5
|
||||||
|
name: "Warming up"
|
||||||
|
- duration: 240
|
||||||
|
arrivalRate: 5
|
||||||
|
rampTo: 40
|
||||||
|
name: "Max load"
|
||||||
|
|
||||||
ws:
|
ws:
|
||||||
subprotocols:
|
subprotocols:
|
||||||
- json
|
- json
|
||||||
@@ -16,9 +23,10 @@ config:
|
|||||||
Connection: Upgrade
|
Connection: Upgrade
|
||||||
Origin: http://localhost:8080
|
Origin: http://localhost:8080
|
||||||
Sec-WebSocket-Version: 13
|
Sec-WebSocket-Version: 13
|
||||||
|
|
||||||
scenarios:
|
scenarios:
|
||||||
- engine: "ws"
|
- engine: "ws"
|
||||||
flow:
|
flow:
|
||||||
- function: "createTestMessageObject"
|
- function: "createTestMessageObject"
|
||||||
- send: "{{ data }}"
|
- send: "{{ data }}"
|
||||||
- think: 10
|
- think: 30 # Each client should stay connected for 30 seconds
|
||||||
|
|||||||
Reference in New Issue
Block a user