0

Active viewer PING controller. Closes #790 (#990)

This commit is contained in:
Gabe Kangas 2021-05-20 20:29:01 -07:00 committed by GitHub
parent be71685937
commit 91f6dcd0f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 89 additions and 27 deletions

View File

@ -9,7 +9,7 @@ import (
// GetConnectedClients returns currently connected clients. // GetConnectedClients returns currently connected clients.
func GetConnectedClients(w http.ResponseWriter, r *http.Request) { func GetConnectedClients(w http.ResponseWriter, r *http.Request) {
clients := core.GetClients() clients := core.GetChatClients()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(clients); err != nil { if err := json.NewEncoder(w).Encode(clients); err != nil {

13
controllers/ping.go Normal file
View File

@ -0,0 +1,13 @@
package controllers
import (
"net/http"
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/utils"
)
func Ping(w http.ResponseWriter, r *http.Request) {
id := utils.GenerateClientIDFromRequest(r)
core.SetViewerIdActive(id)
}

View File

@ -10,12 +10,12 @@ type ChatListenerImpl struct{}
// ClientAdded is for when a client is added the system. // ClientAdded is for when a client is added the system.
func (cl ChatListenerImpl) ClientAdded(client models.Client) { func (cl ChatListenerImpl) ClientAdded(client models.Client) {
SetClientActive(client) SetChatClientActive(client)
} }
// ClientRemoved is for when a client disconnects/is removed. // ClientRemoved is for when a client disconnects/is removed.
func (cl ChatListenerImpl) ClientRemoved(clientID string) { func (cl ChatListenerImpl) ClientRemoved(clientID string) {
RemoveClient(clientID) RemoveChatClient(clientID)
} }
// MessageSent is for when a message is sent. // MessageSent is for when a message is sent.

View File

@ -28,9 +28,7 @@ func RunMigrations() {
} }
func migrateStatsFile() { func migrateStatsFile() {
oldStats := models.Stats{ oldStats := models.Stats{}
Clients: make(map[string]models.Client),
}
if !utils.DoesFileExists(config.StatsFile) { if !utils.DoesFileExists(config.StatsFile) {
return return

View File

@ -15,6 +15,7 @@ import (
) )
var l = &sync.RWMutex{} var l = &sync.RWMutex{}
var _activeViewerPurgeTimeout = time.Second * 10
func setupStats() error { func setupStats() error {
s := getSavedStats() s := getSavedStats()
@ -29,6 +30,13 @@ func setupStats() error {
} }
}() }()
viewerCountPruneTimer := time.NewTicker(5 * time.Second)
go func() {
for range viewerCountPruneTimer.C {
pruneViewerCount()
}
}()
return nil return nil
} }
@ -49,46 +57,40 @@ func IsStreamConnected() bool {
return _stats.StreamConnected return _stats.StreamConnected
} }
// SetClientActive sets a client as active and connected. // SetChatClientActive sets a client as active and connected.
func SetClientActive(client models.Client) { func SetChatClientActive(client models.Client) {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
// If this clientID already exists then update it. // If this clientID already exists then update it.
// Otherwise set a new one. // Otherwise set a new one.
if existingClient, ok := _stats.Clients[client.ClientID]; ok { if existingClient, ok := _stats.ChatClients[client.ClientID]; ok {
existingClient.LastSeen = time.Now() existingClient.LastSeen = time.Now()
existingClient.Username = client.Username existingClient.Username = client.Username
existingClient.MessageCount = client.MessageCount existingClient.MessageCount = client.MessageCount
existingClient.Geo = geoip.GetGeoFromIP(existingClient.IPAddress) existingClient.Geo = geoip.GetGeoFromIP(existingClient.IPAddress)
_stats.Clients[client.ClientID] = existingClient _stats.ChatClients[client.ClientID] = existingClient
} else { } else {
if client.Geo == nil { if client.Geo == nil {
geoip.FetchGeoForIP(client.IPAddress) geoip.FetchGeoForIP(client.IPAddress)
} }
_stats.Clients[client.ClientID] = client _stats.ChatClients[client.ClientID] = client
}
// Don't update viewer counts if a live stream session is not active.
if _stats.StreamConnected {
_stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Clients)), float64(_stats.SessionMaxViewerCount)))
_stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount)))
} }
} }
// RemoveClient removes a client from the active clients record. // RemoveChatClient removes a client from the active clients record.
func RemoveClient(clientID string) { func RemoveChatClient(clientID string) {
log.Trace("Removing the client:", clientID) log.Trace("Removing the client:", clientID)
l.Lock() l.Lock()
delete(_stats.Clients, clientID) delete(_stats.ChatClients, clientID)
l.Unlock() l.Unlock()
} }
func GetClients() []models.Client { func GetChatClients() []models.Client {
l.RLock() l.RLock()
clients := make([]models.Client, 0) clients := make([]models.Client, 0)
for _, client := range _stats.Clients { for _, client := range _stats.ChatClients {
chatClient := chat.GetClient(client.ClientID) chatClient := chat.GetClient(client.ClientID)
if chatClient != nil { if chatClient != nil {
clients = append(clients, chatClient.GetViewerClientFromChatClient()) clients = append(clients, chatClient.GetViewerClientFromChatClient())
@ -101,6 +103,33 @@ func GetClients() []models.Client {
return clients return clients
} }
// SetViewerIdActive sets a client as active and connected.
func SetViewerIdActive(id string) {
l.Lock()
defer l.Unlock()
_stats.Viewers[id] = time.Now()
// Don't update viewer counts if a live stream session is not active.
if _stats.StreamConnected {
_stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Viewers)), float64(_stats.SessionMaxViewerCount)))
_stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount)))
}
}
func pruneViewerCount() {
viewers := make(map[string]time.Time)
for viewerId := range _stats.Viewers {
viewerLastSeenTime := _stats.Viewers[viewerId]
if time.Since(viewerLastSeenTime) < _activeViewerPurgeTimeout {
viewers[viewerId] = viewerLastSeenTime
}
}
_stats.Viewers = viewers
}
func saveStats() error { func saveStats() error {
if err := data.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil { if err := data.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil {
log.Errorln("error saving viewer count", err) log.Errorln("error saving viewer count", err)
@ -118,7 +147,8 @@ func saveStats() error {
func getSavedStats() models.Stats { func getSavedStats() models.Stats {
savedLastDisconnectTime, savedLastDisconnectTimeErr := data.GetLastDisconnectTime() savedLastDisconnectTime, savedLastDisconnectTimeErr := data.GetLastDisconnectTime()
result := models.Stats{ result := models.Stats{
Clients: make(map[string]models.Client), ChatClients: make(map[string]models.Client),
Viewers: make(map[string]time.Time),
SessionMaxViewerCount: data.GetPeakSessionViewerCount(), SessionMaxViewerCount: data.GetPeakSessionViewerCount(),
OverallMaxViewerCount: data.GetPeakOverallViewerCount(), OverallMaxViewerCount: data.GetPeakOverallViewerCount(),
LastDisconnectTime: utils.NullTime{Time: savedLastDisconnectTime, Valid: savedLastDisconnectTimeErr == nil}, LastDisconnectTime: utils.NullTime{Time: savedLastDisconnectTime, Valid: savedLastDisconnectTimeErr == nil},

View File

@ -14,7 +14,7 @@ func GetStatus() models.Status {
viewerCount := 0 viewerCount := 0
if IsStreamConnected() { if IsStreamConnected() {
viewerCount = len(_stats.Clients) viewerCount = len(_stats.Viewers)
} }
return models.Status{ return models.Status{

View File

@ -1,6 +1,8 @@
package models package models
import ( import (
"time"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
) )
@ -10,7 +12,8 @@ type Stats struct {
OverallMaxViewerCount int `json:"overallMaxViewerCount"` OverallMaxViewerCount int `json:"overallMaxViewerCount"`
LastDisconnectTime utils.NullTime `json:"lastDisconnectTime"` LastDisconnectTime utils.NullTime `json:"lastDisconnectTime"`
StreamConnected bool `json:"-"` StreamConnected bool `json:"-"`
LastConnectTime utils.NullTime `json:"-"` LastConnectTime utils.NullTime `json:"-"`
Clients map[string]Client `json:"-"` ChatClients map[string]Client `json:"-"`
Viewers map[string]time.Time `json:"-"`
} }

View File

@ -61,6 +61,9 @@ func Start() error {
// return the list of video variants available // return the list of video variants available
http.HandleFunc("/api/video/variants", controllers.GetVideoStreamOutputVariants) http.HandleFunc("/api/video/variants", controllers.GetVideoStreamOutputVariants)
// tell the backend you're an active viewer
http.HandleFunc("/api/ping", controllers.Ping)
// Authenticated admin requests // Authenticated admin requests
// Current inbound broadcaster // Current inbound broadcaster

View File

@ -19,6 +19,7 @@ import {
import { import {
URL_CONFIG, URL_CONFIG,
URL_STATUS, URL_STATUS,
URL_VIEWER_PING,
TIMER_STATUS_UPDATE, TIMER_STATUS_UPDATE,
TIMER_STREAM_DURATION_COUNTER, TIMER_STREAM_DURATION_COUNTER,
TEMP_IMAGE, TEMP_IMAGE,
@ -117,6 +118,12 @@ export default class VideoOnly extends Component {
this.handleOfflineMode(); this.handleOfflineMode();
this.handleNetworkingError(`Stream status: ${error}`); this.handleNetworkingError(`Stream status: ${error}`);
}); });
// Ping the API to let them know we're an active viewer
fetch(URL_VIEWER_PING).catch((error) => {
this.handleOfflineMode();
this.handleNetworkingError(`Viewer PING error: ${error}`);
});
} }
setConfigData(data = {}) { setConfigData(data = {}) {

View File

@ -42,6 +42,7 @@ import {
URL_CONFIG, URL_CONFIG,
URL_OWNCAST, URL_OWNCAST,
URL_STATUS, URL_STATUS,
URL_VIEWER_PING,
WIDTH_SINGLE_COL, WIDTH_SINGLE_COL,
} from './utils/constants.js'; } from './utils/constants.js';
@ -186,6 +187,12 @@ export default class App extends Component {
this.handleOfflineMode(); this.handleOfflineMode();
this.handleNetworkingError(`Stream status: ${error}`); this.handleNetworkingError(`Stream status: ${error}`);
}); });
// Ping the API to let them know we're an active viewer
fetch(URL_VIEWER_PING).catch((error) => {
this.handleOfflineMode();
this.handleNetworkingError(`Viewer PING error: ${error}`);
});
} }
setConfigData(data = {}) { setConfigData(data = {}) {

View File

@ -4,6 +4,7 @@ export const URL_STATUS = `/api/status`;
export const URL_CHAT_HISTORY = `/api/chat`; export const URL_CHAT_HISTORY = `/api/chat`;
export const URL_CUSTOM_EMOJIS = `/api/emoji`; export const URL_CUSTOM_EMOJIS = `/api/emoji`;
export const URL_CONFIG = `/api/config`; export const URL_CONFIG = `/api/config`;
export const URL_VIEWER_PING = `/api/ping`;
// TODO: This directory is customizable in the config. So we should expose this via the config API. // TODO: This directory is customizable in the config. So we should expose this via the config API.
export const URL_STREAM = `/hls/stream.m3u8`; export const URL_STREAM = `/hls/stream.m3u8`;