6ea9affce0
* WIP with new transcoder progress monitor * A whole different WIP in progress monitoring via local PUTs * Use an actual hls playlist parser to rewrite master playlist * Cleanup * Private vs public path for thumbnail generation * Allow each storage provider to make decisions of how to store different types of files * Simplify inbound file writes * Revert * Split out set stream as connected/disconnected state methods * Update videojs * Add comment about the hls handler * Rework of the offline stream state. For #85 * Delete old unreferenced video segment files from disk * Cleanup all segments and revert to a completely offline state after 5min * Stop thumbnail generation on stream stop. Copy logo to thumbnail on cleanup. * Update transcoder test * Add comment * Return http 200 on success to transcoder. Tweak how files are written to disk * Force pixel color format in transcoder * Add debugging info for S3 transfers. Add default ACL. * Fix cleanup timer * Reset session stats when we cleanup the session. * Put log file back * Update test * File should not be a part of this commit * Add centralized shared performance timer for use anywhere * Post-rebase cleanup * Support returning nil from storage provider save * Updates to reflect package changes + other updates in master * Fix storage providers being overwritten * Do not return pointer in save. Support cache headers with S3 providers * Split out videojs + vhs and point to specific working versions of them * Bump vjs and vhs versions * Fix test * Remove unused * Update upload warning message * No longer valid comment * Pin videojs and vhs versions
176 lines
3.7 KiB
Go
176 lines
3.7 KiB
Go
package core
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"math"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/owncast/owncast/config"
|
|
"github.com/owncast/owncast/core/chat"
|
|
"github.com/owncast/owncast/geoip"
|
|
"github.com/owncast/owncast/models"
|
|
"github.com/owncast/owncast/utils"
|
|
)
|
|
|
|
// statsFilePath is the relative path of the JSON file the stats are saved to
// and loaded from.
const statsFilePath = "stats.json"
|
|
|
|
// l serializes writes to the _stats.Clients map (see SetClientActive and
// RemoveClient).
var l sync.Mutex
|
|
|
|
func setupStats() error {
|
|
s, err := getSavedStats()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_stats = &s
|
|
|
|
statsSaveTimer := time.NewTicker(1 * time.Minute)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-statsSaveTimer.C:
|
|
if err := saveStatsToFile(); err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
staleViewerPurgeTimer := time.NewTicker(3 * time.Second)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-staleViewerPurgeTimer.C:
|
|
purgeStaleViewers()
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func purgeStaleViewers() {
|
|
for clientID, client := range _stats.Clients {
|
|
if client.LastSeen.IsZero() {
|
|
continue
|
|
}
|
|
|
|
timeSinceLastActive := time.Since(client.LastSeen).Minutes()
|
|
if timeSinceLastActive > 1 {
|
|
RemoveClient(clientID)
|
|
}
|
|
}
|
|
}
|
|
|
|
//IsStreamConnected checks if the stream is connected or not
|
|
func IsStreamConnected() bool {
|
|
if !_stats.StreamConnected {
|
|
return false
|
|
}
|
|
|
|
// Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available.
|
|
// So account for that with an artificial buffer of four segments.
|
|
timeSinceLastConnected := time.Since(_stats.LastConnectTime.Time).Seconds()
|
|
if timeSinceLastConnected < float64(config.Config.GetVideoSegmentSecondsLength())*3.0 {
|
|
return false
|
|
}
|
|
|
|
return _stats.StreamConnected
|
|
}
|
|
|
|
//SetClientActive sets a client as active and connected
|
|
func SetClientActive(client models.Client) {
|
|
l.Lock()
|
|
// If this clientID already exists then update it.
|
|
// Otherwise set a new one.
|
|
if existingClient, ok := _stats.Clients[client.ClientID]; ok {
|
|
existingClient.LastSeen = time.Now()
|
|
existingClient.Username = client.Username
|
|
existingClient.MessageCount = client.MessageCount
|
|
existingClient.Geo = geoip.GetGeoFromIP(existingClient.IPAddress)
|
|
_stats.Clients[client.ClientID] = existingClient
|
|
} else {
|
|
if client.Geo == nil {
|
|
geoip.FetchGeoForIP(client.IPAddress)
|
|
}
|
|
_stats.Clients[client.ClientID] = client
|
|
}
|
|
l.Unlock()
|
|
|
|
// Don't update viewer counts if a live stream session is not active.
|
|
if _stats.StreamConnected {
|
|
_stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Clients)), float64(_stats.SessionMaxViewerCount)))
|
|
_stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount)))
|
|
}
|
|
}
|
|
|
|
//RemoveClient removes a client from the active clients record
|
|
func RemoveClient(clientID string) {
|
|
log.Trace("Removing the client:", clientID)
|
|
|
|
l.Lock()
|
|
delete(_stats.Clients, clientID)
|
|
l.Unlock()
|
|
}
|
|
|
|
func GetClients() []models.Client {
|
|
clients := make([]models.Client, 0)
|
|
for _, client := range _stats.Clients {
|
|
chatClient := chat.GetClient(client.ClientID)
|
|
if chatClient != nil {
|
|
clients = append(clients, chatClient.GetViewerClientFromChatClient())
|
|
} else {
|
|
clients = append(clients, client)
|
|
}
|
|
}
|
|
return clients
|
|
}
|
|
|
|
func saveStatsToFile() error {
|
|
jsonData, err := json.Marshal(_stats)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
f, err := os.Create(statsFilePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
if _, err := f.Write(jsonData); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func getSavedStats() (models.Stats, error) {
|
|
result := models.Stats{
|
|
Clients: make(map[string]models.Client),
|
|
}
|
|
|
|
if !utils.DoesFileExists(statsFilePath) {
|
|
return result, nil
|
|
}
|
|
|
|
jsonFile, err := ioutil.ReadFile(statsFilePath)
|
|
if err != nil {
|
|
return result, nil
|
|
}
|
|
|
|
if err := json.Unmarshal(jsonFile, &result); err != nil {
|
|
return result, nil
|
|
}
|
|
|
|
return result, nil
|
|
}
|