@@ -167,7 +167,6 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
|
||||
|
||||
SaveUserMessage(event)
|
||||
eventData.client.MessageCount++
|
||||
_lastSeenCache[event.User.ID] = time.Now()
|
||||
}
|
||||
|
||||
func logSanitize(userValue string) string {
|
||||
|
||||
@@ -8,6 +8,8 @@ const (
|
||||
MessageSent EventType = "CHAT"
|
||||
// UserJoined is the event sent when a chat user join action takes place.
|
||||
UserJoined EventType = "USER_JOINED"
|
||||
// UserParted is the event sent when a chat user part action takes place.
|
||||
UserParted EventType = "USER_PARTED"
|
||||
// UserNameChanged is the event sent when a chat username change takes place.
|
||||
UserNameChanged EventType = "NAME_CHANGE"
|
||||
// UserColorChanged is the event sent when a chat user color change takes place.
|
||||
|
||||
17
core/chat/events/userPartEvent.go
Normal file
17
core/chat/events/userPartEvent.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package events
|
||||
|
||||
// UserPartEvent is the event fired when a user leaves chat.
|
||||
type UserPartEvent struct {
|
||||
Event
|
||||
UserEvent
|
||||
}
|
||||
|
||||
// GetBroadcastPayload will return the object to send to all chat users.
|
||||
func (e *UserPartEvent) GetBroadcastPayload() EventPayload {
|
||||
return EventPayload{
|
||||
"type": UserParted,
|
||||
"id": e.ID,
|
||||
"timestamp": e.Timestamp,
|
||||
"user": e.User,
|
||||
}
|
||||
}
|
||||
@@ -22,9 +22,6 @@ import (
|
||||
|
||||
var _server *Server
|
||||
|
||||
// a map of user IDs and when they last were active.
|
||||
var _lastSeenCache = map[string]time.Time{}
|
||||
|
||||
// Server represents an instance of the chat server.
|
||||
type Server struct {
|
||||
clients map[uint]*Client
|
||||
@@ -43,6 +40,9 @@ type Server struct {
|
||||
maxSocketConnectionLimit int64
|
||||
|
||||
mu sync.RWMutex
|
||||
|
||||
// a map of user IDs and timers that fire for chat part messages.
|
||||
userPartedTimers map[string]*time.Ticker
|
||||
}
|
||||
|
||||
// NewChat will return a new instance of the chat server.
|
||||
@@ -57,6 +57,7 @@ func NewChat() *Server {
|
||||
unregister: make(chan uint),
|
||||
maxSocketConnectionLimit: maximumConcurrentConnectionLimit,
|
||||
geoipClient: geoip.NewClient(),
|
||||
userPartedTimers: map[string]*time.Ticker{},
|
||||
}
|
||||
|
||||
return server
|
||||
@@ -67,7 +68,8 @@ func (s *Server) Run() {
|
||||
for {
|
||||
select {
|
||||
case clientID := <-s.unregister:
|
||||
if _, ok := s.clients[clientID]; ok {
|
||||
if client, ok := s.clients[clientID]; ok {
|
||||
s.handleClientDisconnected(client)
|
||||
s.mu.Lock()
|
||||
delete(s.clients, clientID)
|
||||
s.mu.Unlock()
|
||||
@@ -92,18 +94,22 @@ func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken st
|
||||
ConnectedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Do not send user re-joined broadcast message if they've been active within 10 minutes.
|
||||
shouldSendJoinedMessages := data.GetChatJoinMessagesEnabled()
|
||||
if previouslyLastSeen, ok := _lastSeenCache[user.ID]; ok && time.Since(previouslyLastSeen) < time.Minute*10 {
|
||||
shouldSendJoinedMessages = false
|
||||
}
|
||||
shouldSendJoinedMessages := data.GetChatJoinPartMessagesEnabled()
|
||||
|
||||
s.mu.Lock()
|
||||
{
|
||||
// If there is a pending disconnect timer then clear it.
|
||||
// Do not send user joined message if enough time hasn't passed where the
|
||||
// user chat part message hasn't been sent yet.
|
||||
if ticker, ok := s.userPartedTimers[user.ID]; ok {
|
||||
ticker.Stop()
|
||||
delete(s.userPartedTimers, user.ID)
|
||||
shouldSendJoinedMessages = false
|
||||
}
|
||||
|
||||
client.Id = s.seq
|
||||
s.clients[client.Id] = client
|
||||
s.seq++
|
||||
_lastSeenCache[user.ID] = time.Now()
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
@@ -143,16 +149,43 @@ func (s *Server) sendUserJoinedMessage(c *Client) {
|
||||
webhooks.SendChatEventUserJoined(userJoinedEvent)
|
||||
}
|
||||
|
||||
// ClientClosed is fired when a client disconnects or connection is dropped.
|
||||
func (s *Server) ClientClosed(c *Client) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
c.close()
|
||||
|
||||
func (s *Server) handleClientDisconnected(c *Client) {
|
||||
if _, ok := s.clients[c.Id]; ok {
|
||||
log.Debugln("Deleting", c.Id)
|
||||
delete(s.clients, c.Id)
|
||||
}
|
||||
|
||||
additionalClientCheck, _ := GetClientsForUser(c.User.ID)
|
||||
if len(additionalClientCheck) > 0 {
|
||||
// This user is still connected to chat with another client.
|
||||
return
|
||||
}
|
||||
|
||||
s.userPartedTimers[c.User.ID] = time.NewTicker(10 * time.Second)
|
||||
|
||||
go func() {
|
||||
<-s.userPartedTimers[c.User.ID].C
|
||||
s.sendUserPartedMessage(c)
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) sendUserPartedMessage(c *Client) {
|
||||
s.userPartedTimers[c.User.ID].Stop()
|
||||
delete(s.userPartedTimers, c.User.ID)
|
||||
|
||||
userPartEvent := events.UserPartEvent{}
|
||||
userPartEvent.SetDefaults()
|
||||
userPartEvent.User = c.User
|
||||
userPartEvent.ClientID = c.Id
|
||||
|
||||
// If part messages are disabled.
|
||||
if data.GetChatJoinPartMessagesEnabled() {
|
||||
if err := s.Broadcast(userPartEvent.GetBroadcastPayload()); err != nil {
|
||||
log.Errorln("error sending chat part message", err)
|
||||
}
|
||||
}
|
||||
// Send chat user joined webhook
|
||||
webhooks.SendChatEventUserParted(userPartEvent)
|
||||
}
|
||||
|
||||
// HandleClientConnection is fired when a single client connects to the websocket.
|
||||
|
||||
@@ -816,8 +816,8 @@ func SetChatJoinMessagesEnabled(enabled bool) error {
|
||||
return _datastore.SetBool(chatJoinMessagesEnabledKey, enabled)
|
||||
}
|
||||
|
||||
// GetChatJoinMessagesEnabled will return if chat join messages are enabled.
|
||||
func GetChatJoinMessagesEnabled() bool {
|
||||
// GetChatJoinPartMessagesEnabled will return if chat join messages are enabled.
|
||||
func GetChatJoinPartMessagesEnabled() bool {
|
||||
enabled, err := _datastore.GetBool(chatJoinMessagesEnabledKey)
|
||||
if err != nil {
|
||||
return true
|
||||
|
||||
@@ -43,6 +43,16 @@ func SendChatEventUserJoined(event events.UserJoinedEvent) {
|
||||
SendEventToWebhooks(webhookEvent)
|
||||
}
|
||||
|
||||
// SendChatEventUserParted sends a webhook notifying that a user has parted.
|
||||
func SendChatEventUserParted(event events.UserPartEvent) {
|
||||
webhookEvent := WebhookEvent{
|
||||
Type: events.UserParted,
|
||||
EventData: event,
|
||||
}
|
||||
|
||||
SendEventToWebhooks(webhookEvent)
|
||||
}
|
||||
|
||||
// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more
|
||||
// messages has changed.
|
||||
func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/owncast/owncast/core/chat/events"
|
||||
"github.com/owncast/owncast/core/data"
|
||||
"github.com/owncast/owncast/models"
|
||||
jsonpatch "gopkg.in/evanphx/json-patch.v5"
|
||||
@@ -84,7 +85,7 @@ func TestPublicSend(t *testing.T) {
|
||||
|
||||
// Make sure that events are only sent to interested endpoints.
|
||||
func TestRouting(t *testing.T) {
|
||||
eventTypes := []models.EventType{models.ChatActionSent, models.UserJoined}
|
||||
eventTypes := []models.EventType{models.ChatActionSent, models.UserJoined, events.UserParted}
|
||||
|
||||
calls := map[models.EventType]int{}
|
||||
var lock sync.Mutex
|
||||
|
||||
Reference in New Issue
Block a user