From 91f6dcd0f6db887bab6fa9278bffc09ade95ac7f Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Thu, 20 May 2021 20:29:01 -0700 Subject: [PATCH] Active viewer PING controller. Closes #790 (#990) --- controllers/connectedClients.go | 2 +- controllers/ping.go | 13 +++++++ core/chatListener.go | 4 +-- core/data/migrator.go | 4 +-- core/stats.go | 64 ++++++++++++++++++++++++--------- core/status.go | 2 +- models/stats.go | 9 +++-- router/router.go | 3 ++ webroot/js/app-video-only.js | 7 ++++ webroot/js/app.js | 7 ++++ webroot/js/utils/constants.js | 1 + 11 files changed, 89 insertions(+), 27 deletions(-) create mode 100644 controllers/ping.go diff --git a/controllers/connectedClients.go b/controllers/connectedClients.go index b30bf0957..384abd2d6 100644 --- a/controllers/connectedClients.go +++ b/controllers/connectedClients.go @@ -9,7 +9,7 @@ import ( // GetConnectedClients returns currently connected clients. func GetConnectedClients(w http.ResponseWriter, r *http.Request) { - clients := core.GetClients() + clients := core.GetChatClients() w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(clients); err != nil { diff --git a/controllers/ping.go b/controllers/ping.go new file mode 100644 index 000000000..81b0ab6e1 --- /dev/null +++ b/controllers/ping.go @@ -0,0 +1,13 @@ +package controllers + +import ( + "net/http" + + "github.com/owncast/owncast/core" + "github.com/owncast/owncast/utils" +) + +func Ping(w http.ResponseWriter, r *http.Request) { + id := utils.GenerateClientIDFromRequest(r) + core.SetViewerIdActive(id) +} diff --git a/core/chatListener.go b/core/chatListener.go index 00d1a5e58..b868b97dc 100644 --- a/core/chatListener.go +++ b/core/chatListener.go @@ -10,12 +10,12 @@ type ChatListenerImpl struct{} // ClientAdded is for when a client is added the system. func (cl ChatListenerImpl) ClientAdded(client models.Client) { - SetClientActive(client) + SetChatClientActive(client) } // ClientRemoved is for when a client disconnects/is removed. func (cl ChatListenerImpl) ClientRemoved(clientID string) { - RemoveClient(clientID) + RemoveChatClient(clientID) } // MessageSent is for when a message is sent. diff --git a/core/data/migrator.go b/core/data/migrator.go index c08f1ab16..47009c72a 100644 --- a/core/data/migrator.go +++ b/core/data/migrator.go @@ -28,9 +28,7 @@ func RunMigrations() { } func migrateStatsFile() { - oldStats := models.Stats{ - Clients: make(map[string]models.Client), - } + oldStats := models.Stats{} if !utils.DoesFileExists(config.StatsFile) { return diff --git a/core/stats.go b/core/stats.go index 1ece58a20..ca6ebdec0 100644 --- a/core/stats.go +++ b/core/stats.go @@ -15,6 +15,7 @@ import ( ) var l = &sync.RWMutex{} +var _activeViewerPurgeTimeout = time.Second * 10 func setupStats() error { s := getSavedStats() @@ -29,6 +30,13 @@ func setupStats() error { } }() + viewerCountPruneTimer := time.NewTicker(5 * time.Second) + go func() { + for range viewerCountPruneTimer.C { + pruneViewerCount() + } + }() + return nil } @@ -49,46 +57,40 @@ func IsStreamConnected() bool { return _stats.StreamConnected } -// SetClientActive sets a client as active and connected. -func SetClientActive(client models.Client) { +// SetChatClientActive sets a client as active and connected. +func SetChatClientActive(client models.Client) { l.Lock() defer l.Unlock() // If this clientID already exists then update it. // Otherwise set a new one. - if existingClient, ok := _stats.Clients[client.ClientID]; ok { + if existingClient, ok := _stats.ChatClients[client.ClientID]; ok { existingClient.LastSeen = time.Now() existingClient.Username = client.Username existingClient.MessageCount = client.MessageCount existingClient.Geo = geoip.GetGeoFromIP(existingClient.IPAddress) - _stats.Clients[client.ClientID] = existingClient + _stats.ChatClients[client.ClientID] = existingClient } else { if client.Geo == nil { geoip.FetchGeoForIP(client.IPAddress) } - _stats.Clients[client.ClientID] = client - } - - // Don't update viewer counts if a live stream session is not active. - if _stats.StreamConnected { - _stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Clients)), float64(_stats.SessionMaxViewerCount))) - _stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount))) + _stats.ChatClients[client.ClientID] = client } } -// RemoveClient removes a client from the active clients record. -func RemoveClient(clientID string) { +// RemoveChatClient removes a client from the active clients record. +func RemoveChatClient(clientID string) { log.Trace("Removing the client:", clientID) l.Lock() - delete(_stats.Clients, clientID) + delete(_stats.ChatClients, clientID) l.Unlock() } -func GetClients() []models.Client { +func GetChatClients() []models.Client { l.RLock() clients := make([]models.Client, 0) - for _, client := range _stats.Clients { + for _, client := range _stats.ChatClients { chatClient := chat.GetClient(client.ClientID) if chatClient != nil { clients = append(clients, chatClient.GetViewerClientFromChatClient()) @@ -101,6 +103,33 @@ func GetClients() []models.Client { return clients } +// SetViewerIdActive sets a client as active and connected. +func SetViewerIdActive(id string) { + l.Lock() + defer l.Unlock() + + _stats.Viewers[id] = time.Now() + + // Don't update viewer counts if a live stream session is not active. + if _stats.StreamConnected { + _stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Viewers)), float64(_stats.SessionMaxViewerCount))) + _stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount))) + } +} + +func pruneViewerCount() { + viewers := make(map[string]time.Time) + + for viewerId := range _stats.Viewers { + viewerLastSeenTime := _stats.Viewers[viewerId] + if time.Since(viewerLastSeenTime) < _activeViewerPurgeTimeout { + viewers[viewerId] = viewerLastSeenTime + } + } + + _stats.Viewers = viewers +} + func saveStats() error { if err := data.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil { log.Errorln("error saving viewer count", err) @@ -118,7 +147,8 @@ func saveStats() error { func getSavedStats() models.Stats { savedLastDisconnectTime, savedLastDisconnectTimeErr := data.GetLastDisconnectTime() result := models.Stats{ - Clients: make(map[string]models.Client), + ChatClients: make(map[string]models.Client), + Viewers: make(map[string]time.Time), SessionMaxViewerCount: data.GetPeakSessionViewerCount(), OverallMaxViewerCount: data.GetPeakOverallViewerCount(), LastDisconnectTime: utils.NullTime{Time: savedLastDisconnectTime, Valid: savedLastDisconnectTimeErr == nil}, diff --git a/core/status.go b/core/status.go index b6e81cec2..5981a62ba 100644 --- a/core/status.go +++ b/core/status.go @@ -14,7 +14,7 @@ func GetStatus() models.Status { viewerCount := 0 if IsStreamConnected() { - viewerCount = len(_stats.Clients) + viewerCount = len(_stats.Viewers) } return models.Status{ diff --git a/models/stats.go b/models/stats.go index a94458dec..9ef3f74e2 100644 --- a/models/stats.go +++ b/models/stats.go @@ -1,6 +1,8 @@ package models import ( + "time" + "github.com/owncast/owncast/utils" ) @@ -10,7 +12,8 @@ type Stats struct { OverallMaxViewerCount int `json:"overallMaxViewerCount"` LastDisconnectTime utils.NullTime `json:"lastDisconnectTime"` - StreamConnected bool `json:"-"` - LastConnectTime utils.NullTime `json:"-"` - Clients map[string]Client `json:"-"` + StreamConnected bool `json:"-"` + LastConnectTime utils.NullTime `json:"-"` + ChatClients map[string]Client `json:"-"` + Viewers map[string]time.Time `json:"-"` } diff --git a/router/router.go b/router/router.go index a8cac39ea..4745d2b49 100644 --- a/router/router.go +++ b/router/router.go @@ -61,6 +61,9 @@ func Start() error { // return the list of video variants available http.HandleFunc("/api/video/variants", controllers.GetVideoStreamOutputVariants) + // tell the backend you're an active viewer + http.HandleFunc("/api/ping", controllers.Ping) + // Authenticated admin requests // Current inbound broadcaster diff --git a/webroot/js/app-video-only.js b/webroot/js/app-video-only.js index bba941590..97545d3fe 100644 --- a/webroot/js/app-video-only.js +++ b/webroot/js/app-video-only.js @@ -19,6 +19,7 @@ import { import { URL_CONFIG, URL_STATUS, + URL_VIEWER_PING, TIMER_STATUS_UPDATE, TIMER_STREAM_DURATION_COUNTER, TEMP_IMAGE, @@ -117,6 +118,12 @@ export default class VideoOnly extends Component { this.handleOfflineMode(); this.handleNetworkingError(`Stream status: ${error}`); }); + + // Ping the API to let them know we're an active viewer + fetch(URL_VIEWER_PING).catch((error) => { + this.handleOfflineMode(); + this.handleNetworkingError(`Viewer PING error: ${error}`); + }); } setConfigData(data = {}) { diff --git a/webroot/js/app.js b/webroot/js/app.js index 79acb1bd1..d05906f77 100644 --- a/webroot/js/app.js +++ b/webroot/js/app.js @@ -42,6 +42,7 @@ import { URL_CONFIG, URL_OWNCAST, URL_STATUS, + URL_VIEWER_PING, WIDTH_SINGLE_COL, } from './utils/constants.js'; @@ -186,6 +187,12 @@ export default class App extends Component { this.handleOfflineMode(); this.handleNetworkingError(`Stream status: ${error}`); }); + + // Ping the API to let them know we're an active viewer + fetch(URL_VIEWER_PING).catch((error) => { + this.handleOfflineMode(); + this.handleNetworkingError(`Viewer PING error: ${error}`); + }); } setConfigData(data = {}) { diff --git a/webroot/js/utils/constants.js b/webroot/js/utils/constants.js index 8a9a36e8d..03bb2ea66 100644 --- a/webroot/js/utils/constants.js +++ b/webroot/js/utils/constants.js @@ -4,6 +4,7 @@ export const URL_STATUS = `/api/status`; export const URL_CHAT_HISTORY = `/api/chat`; export const URL_CUSTOM_EMOJIS = `/api/emoji`; export const URL_CONFIG = `/api/config`; +export const URL_VIEWER_PING = `/api/ping`; // TODO: This directory is customizable in the config. So we should expose this via the config API. export const URL_STREAM = `/hls/stream.m3u8`;