0

Add stream health overview collection + apis

This commit is contained in:
Gabe Kangas 2022-03-24 23:06:47 -07:00
parent 729de44fce
commit 410b413b84
No known key found for this signature in database
GPG Key ID: 9A56337728BC81EA
8 changed files with 210 additions and 56 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/owncast/owncast/core" "github.com/owncast/owncast/core"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/metrics"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/router/middleware" "github.com/owncast/owncast/router/middleware"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -16,11 +17,12 @@ func Status(w http.ResponseWriter, r *http.Request) {
broadcaster := core.GetBroadcaster() broadcaster := core.GetBroadcaster()
status := core.GetStatus() status := core.GetStatus()
currentBroadcast := core.GetCurrentBroadcast() currentBroadcast := core.GetCurrentBroadcast()
health := metrics.GetStreamHealthOverview()
response := adminStatusResponse{ response := adminStatusResponse{
Broadcaster: broadcaster, Broadcaster: broadcaster,
CurrentBroadcast: currentBroadcast, CurrentBroadcast: currentBroadcast,
Online: status.Online, Online: status.Online,
Health: health,
ViewerCount: status.ViewerCount, ViewerCount: status.ViewerCount,
OverallPeakViewerCount: status.OverallMaxViewerCount, OverallPeakViewerCount: status.OverallMaxViewerCount,
SessionPeakViewerCount: status.SessionMaxViewerCount, SessionPeakViewerCount: status.SessionMaxViewerCount,
@ -38,12 +40,13 @@ func Status(w http.ResponseWriter, r *http.Request) {
} }
type adminStatusResponse struct { type adminStatusResponse struct {
Broadcaster *models.Broadcaster `json:"broadcaster"` Broadcaster *models.Broadcaster `json:"broadcaster"`
CurrentBroadcast *models.CurrentBroadcast `json:"currentBroadcast"` CurrentBroadcast *models.CurrentBroadcast `json:"currentBroadcast"`
Online bool `json:"online"` Online bool `json:"online"`
ViewerCount int `json:"viewerCount"` ViewerCount int `json:"viewerCount"`
OverallPeakViewerCount int `json:"overallPeakViewerCount"` OverallPeakViewerCount int `json:"overallPeakViewerCount"`
SessionPeakViewerCount int `json:"sessionPeakViewerCount"` SessionPeakViewerCount int `json:"sessionPeakViewerCount"`
StreamTitle string `json:"streamTitle"` StreamTitle string `json:"streamTitle"`
VersionNumber string `json:"versionNumber"` Health *models.StreamHealthOverview `json:"health"`
VersionNumber string `json:"versionNumber"`
} }

View File

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"github.com/owncast/owncast/metrics" "github.com/owncast/owncast/metrics"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -32,9 +33,11 @@ func ReportPlaybackMetrics(w http.ResponseWriter, r *http.Request) {
return return
} }
metrics.RegisterPlaybackErrorCount(request.Errors) clientID := utils.GenerateClientIDFromRequest(r)
metrics.RegisterPlayerBandwidth(request.Bandwidth)
metrics.RegisterPlayerLatency(request.Latency) metrics.RegisterPlaybackErrorCount(clientID, request.Errors)
metrics.RegisterPlayerSegmentDownloadDuration(request.DownloadDuration) metrics.RegisterPlayerBandwidth(clientID, request.Bandwidth)
metrics.RegisterQualityVariantChangesCount(request.QualityVariantChanges) metrics.RegisterPlayerLatency(clientID, request.Latency)
metrics.RegisterPlayerSegmentDownloadDuration(clientID, request.DownloadDuration)
metrics.RegisterQualityVariantChangesCount(clientID, request.QualityVariantChanges)
} }

View File

@ -11,7 +11,7 @@ import (
) )
// Max number of metrics we want to keep. // Max number of metrics we want to keep.
const maxCollectionValues = 500 const maxCollectionValues = 300
func collectCPUUtilization() { func collectCPUUtilization() {
if len(metrics.CPUUtilizations) > maxCollectionValues { if len(metrics.CPUUtilizations) > maxCollectionValues {

95
metrics/healthOverview.go Normal file
View File

@ -0,0 +1,95 @@
package metrics
import (
"fmt"
"sort"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
)
// errorMessages maps a stream health problem identifier to the fmt format
// string used to build the human readable message for that problem.
//
// Arguments expected by each format, in order:
//   - LOWSPEED: affected client count, total client count, affected
//     percentage, bitrate in kbps.
//   - PLAYBACK_ERRORS: affected client count, total client count, affected
//     percentage.
var errorMessages = map[string]string{
	"LOWSPEED":        "%d of %d clients (%d%%) are consuming video slower than, or too close to your bitrate of %d kbps.",
	"PLAYBACK_ERRORS": "%d of %d clients (%d%%) are experiencing different, unspecified, playback issues.",
}
// GetStreamHealthOverview will return the stream health overview.
//
// The result is nil when metrics collection has not been started yet (the
// shared metrics instance is only allocated in Start), or when no overview has
// been generated so far. Callers must be prepared to handle a nil overview.
func GetStreamHealthOverview() *models.StreamHealthOverview {
	// Guard against being called before Start has allocated the shared
	// instance instead of panicking on a nil pointer dereference.
	if metrics == nil {
		return nil
	}

	return metrics.streamHealthOverview
}
// generateStreamHealthOverview rebuilds metrics.streamHealthOverview from the
// playback values clients reported during the current collection window.
//
// The stream starts out as fully healthy. Clients are then flagged as
// unhealthy in one of two ways, checked in this order:
//  1. Their reported bandwidth is below 110% of the lowest quality output
//     variant's video bitrate.
//  2. Only if no client is bandwidth constrained: they reported at least one
//     playback error.
//
// The stream is considered healthy while more than 95% of clients are healthy.
// The caller is expected to hold metrics.m, as handlePlaybackPolling does.
func generateStreamHealthOverview() {
	overview := models.StreamHealthOverview{
		Healthy:           true,
		HealthyPercentage: 100,
	}

	// Publish the result regardless of which return path is taken below.
	defer func() {
		metrics.streamHealthOverview = &overview
	}()

	totalNumberOfClients := len(windowedBandwidths)
	if totalNumberOfClients == 0 {
		return
	}

	type singleVariant struct {
		isVideoPassthrough bool
		bitrate            int
	}

	outputVariants := data.GetStreamOutputVariants()
	streamSortVariants := make([]singleVariant, len(outputVariants))
	for i, variant := range outputVariants {
		streamSortVariants[i] = singleVariant{
			bitrate:            variant.VideoBitrate,
			isVideoPassthrough: variant.IsVideoPassthrough,
		}
	}

	// Highest quality first: passthrough variants lead, followed by the
	// remaining variants in descending bitrate order.
	sort.Slice(streamSortVariants, func(i, j int) bool {
		a, b := streamSortVariants[i], streamSortVariants[j]
		if a.isVideoPassthrough != b.isVideoPassthrough {
			return a.isVideoPassthrough
		}

		return a.bitrate > b.bitrate
	})

	unhealthyClientCount := 0
	messageKey := ""
	lowestSupportedBitrate := 0

	// Determine healthy status based on bandwidth speeds of clients. This is
	// skipped when no output variants are configured, as there is no bitrate
	// to compare against.
	if len(streamSortVariants) > 0 {
		// The slice is sorted from highest to lowest quality, so the lowest
		// supported bitrate is the last entry, not the first.
		lowestSupportedBitrate = streamSortVariants[len(streamSortVariants)-1].bitrate

		// Require 10% of headroom above the bitrate.
		minimumSpeed := int(float64(lowestSupportedBitrate) * 1.1)
		for _, speed := range windowedBandwidths {
			if int(speed) < minimumSpeed {
				unhealthyClientCount++
			}
		}

		if unhealthyClientCount > 0 {
			messageKey = "LOWSPEED"
		}
	}

	// If bandwidth is ok, determine healthy status based on error
	// counts of clients. Count the clients reporting errors, not the errors
	// themselves, so the value stays comparable to the total client count.
	if unhealthyClientCount == 0 {
		for _, errors := range windowedErrorCounts {
			if errors > 0 {
				unhealthyClientCount++
			}
		}

		if unhealthyClientCount > 0 {
			messageKey = "PLAYBACK_ERRORS"
		}
	}

	if unhealthyClientCount == 0 {
		return
	}

	percentUnhealthy := (float64(unhealthyClientCount) / float64(totalNumberOfClients)) * 100

	switch messageKey {
	case "LOWSPEED":
		overview.Message = fmt.Sprintf(errorMessages[messageKey], unhealthyClientCount, totalNumberOfClients, int(percentUnhealthy), lowestSupportedBitrate)
	case "PLAYBACK_ERRORS":
		overview.Message = fmt.Sprintf(errorMessages[messageKey], unhealthyClientCount, totalNumberOfClients, int(percentUnhealthy))
	}

	overview.HealthyPercentage = int(100 - percentUnhealthy)
	overview.Healthy = overview.HealthyPercentage > 95
}

View File

@ -11,6 +11,7 @@ import (
// How often we poll for updates. // How often we poll for updates.
const hardwareMetricsPollingInterval = 1 * time.Minute const hardwareMetricsPollingInterval = 1 * time.Minute
const playbackMetricsPollingInterval = 3 * time.Minute
const ( const (
// How often we poll for updates. // How often we poll for updates.
@ -41,13 +42,18 @@ type CollectedMetrics struct {
medianLatency []TimestampedValue `json:"-"` medianLatency []TimestampedValue `json:"-"`
qualityVariantChanges []TimestampedValue `json:"-"` qualityVariantChanges []TimestampedValue `json:"-"`
streamHealthOverview *models.StreamHealthOverview
} }
// Metrics is the shared Metrics instance. // Metrics is the shared Metrics instance.
var metrics *CollectedMetrics var metrics *CollectedMetrics
var _getStatus func() models.Status
// Start will begin the metrics collection and alerting. // Start will begin the metrics collection and alerting.
func Start(getStatus func() models.Status) { func Start(getStatus func() models.Status) {
_getStatus = getStatus
host := data.GetServerURL() host := data.GetServerURL()
if host == "" { if host == "" {
host = "unknown" host = "unknown"
@ -62,9 +68,17 @@ func Start(getStatus func() models.Status) {
metrics = new(CollectedMetrics) metrics = new(CollectedMetrics)
go startViewerCollectionMetrics() go startViewerCollectionMetrics()
for range time.Tick(hardwareMetricsPollingInterval) { go func() {
handlePolling() for range time.Tick(hardwareMetricsPollingInterval) {
} handlePolling()
}
}()
go func() {
for range time.Tick(playbackMetricsPollingInterval) {
handlePlaybackPolling()
}
}()
} }
func handlePolling() { func handlePolling() {
@ -76,12 +90,6 @@ func handlePolling() {
collectRAMUtilization() collectRAMUtilization()
collectDiskUtilization() collectDiskUtilization()
collectPlaybackErrorCount()
collectLatencyValues()
collectSegmentDownloadDuration()
collectLowestBandwidth()
collectQualityVariantChanges()
// Alerting // Alerting
handleAlerting() handleAlerting()
} }

View File

@ -9,48 +9,65 @@ import (
// Playback error counts reported since the last time we collected metrics. // Playback error counts reported since the last time we collected metrics.
var ( var (
windowedErrorCounts = []float64{} windowedErrorCounts = map[string]float64{}
windowedQualityVariantChanges = []float64{} windowedQualityVariantChanges = map[string]float64{}
windowedBandwidths = []float64{} windowedBandwidths = map[string]float64{}
windowedLatencies = []float64{} windowedLatencies = map[string]float64{}
windowedDownloadDurations = []float64{} windowedDownloadDurations = map[string]float64{}
) )
// RegisterPlaybackErrorCount will add to the windowed playback error count. func handlePlaybackPolling() {
func RegisterPlaybackErrorCount(count float64) {
metrics.m.Lock() metrics.m.Lock()
defer metrics.m.Unlock() defer metrics.m.Unlock()
windowedErrorCounts = append(windowedErrorCounts, count)
// Make sure this is fired first before all the values get cleared below.
if _getStatus().Online {
generateStreamHealthOverview()
}
collectPlaybackErrorCount()
collectLatencyValues()
collectSegmentDownloadDuration()
collectLowestBandwidth()
collectQualityVariantChanges()
}
// RegisterPlaybackErrorCount will add to the windowed playback error count.
func RegisterPlaybackErrorCount(clientID string, count float64) {
metrics.m.Lock()
defer metrics.m.Unlock()
windowedErrorCounts[clientID] = count
// windowedErrorCounts = append(windowedErrorCounts, count)
} }
// RegisterQualityVariantChangesCount will add to the windowed quality variant // RegisterQualityVariantChangesCount will add to the windowed quality variant
// change count. // change count.
func RegisterQualityVariantChangesCount(count float64) { func RegisterQualityVariantChangesCount(clientID string, count float64) {
metrics.m.Lock() metrics.m.Lock()
defer metrics.m.Unlock() defer metrics.m.Unlock()
windowedQualityVariantChanges = append(windowedQualityVariantChanges, count) windowedQualityVariantChanges[clientID] = count
} }
// RegisterPlayerBandwidth will add to the windowed playback bandwidth. // RegisterPlayerBandwidth will add to the windowed playback bandwidth.
func RegisterPlayerBandwidth(kbps float64) { func RegisterPlayerBandwidth(clientID string, kbps float64) {
metrics.m.Lock() metrics.m.Lock()
defer metrics.m.Unlock() defer metrics.m.Unlock()
windowedBandwidths = append(windowedBandwidths, kbps) windowedBandwidths[clientID] = kbps
} }
// RegisterPlayerLatency will add to the windowed player latency values. // RegisterPlayerLatency will add to the windowed player latency values.
func RegisterPlayerLatency(seconds float64) { func RegisterPlayerLatency(clientID string, seconds float64) {
metrics.m.Lock() metrics.m.Lock()
defer metrics.m.Unlock() defer metrics.m.Unlock()
windowedLatencies = append(windowedLatencies, seconds) windowedLatencies[clientID] = seconds
} }
// RegisterPlayerSegmentDownloadDuration will add to the windowed player segment // RegisterPlayerSegmentDownloadDuration will add to the windowed player segment
// download duration values. // download duration values.
func RegisterPlayerSegmentDownloadDuration(seconds float64) { func RegisterPlayerSegmentDownloadDuration(clientID string, seconds float64) {
metrics.m.Lock() metrics.m.Lock()
defer metrics.m.Unlock() defer metrics.m.Unlock()
windowedDownloadDurations = append(windowedDownloadDurations, seconds) windowedDownloadDurations[clientID] = seconds
} }
// collectPlaybackErrorCount will take all of the error counts each individual // collectPlaybackErrorCount will take all of the error counts each individual
@ -58,8 +75,9 @@ func RegisterPlayerSegmentDownloadDuration(seconds float64) {
// one person with bad connectivity doesn't make it look like everything is // one person with bad connectivity doesn't make it look like everything is
// horrible for everyone. // horrible for everyone.
func collectPlaybackErrorCount() { func collectPlaybackErrorCount() {
count := utils.Sum(windowedErrorCounts) valueSlice := utils.Float64MapToSlice(windowedErrorCounts)
windowedErrorCounts = []float64{} count := utils.Sum(valueSlice)
windowedErrorCounts = map[string]float64{}
metrics.errorCount = append(metrics.errorCount, TimestampedValue{ metrics.errorCount = append(metrics.errorCount, TimestampedValue{
Time: time.Now(), Time: time.Now(),
@ -79,10 +97,12 @@ func collectSegmentDownloadDuration() {
max := 0.0 max := 0.0
min := 0.0 min := 0.0
if len(windowedDownloadDurations) > 0 { valueSlice := utils.Float64MapToSlice(windowedDownloadDurations)
median = utils.Median(windowedDownloadDurations)
min, max = utils.MinMax(windowedDownloadDurations) if len(valueSlice) > 0 {
windowedDownloadDurations = []float64{} median = utils.Median(valueSlice)
min, max = utils.MinMax(valueSlice)
windowedDownloadDurations = map[string]float64{}
} }
metrics.medianSegmentDownloadSeconds = append(metrics.medianSegmentDownloadSeconds, TimestampedValue{ metrics.medianSegmentDownloadSeconds = append(metrics.medianSegmentDownloadSeconds, TimestampedValue{
@ -138,10 +158,13 @@ func collectLatencyValues() {
min := 0.0 min := 0.0
max := 0.0 max := 0.0
if len(windowedLatencies) > 0 { valueSlice := utils.Float64MapToSlice(windowedLatencies)
median = utils.Median(windowedLatencies) windowedLatencies = map[string]float64{}
min, max = utils.MinMax(windowedLatencies)
windowedLatencies = []float64{} if len(valueSlice) > 0 {
median = utils.Median(valueSlice)
min, max = utils.MinMax(valueSlice)
windowedLatencies = map[string]float64{}
} }
metrics.medianLatency = append(metrics.medianLatency, TimestampedValue{ metrics.medianLatency = append(metrics.medianLatency, TimestampedValue{
@ -207,12 +230,14 @@ func collectLowestBandwidth() {
median := 0.0 median := 0.0
max := 0.0 max := 0.0
valueSlice := utils.Float64MapToSlice(windowedBandwidths)
if len(windowedBandwidths) > 0 { if len(windowedBandwidths) > 0 {
min, max = utils.MinMax(windowedBandwidths) min, max = utils.MinMax(valueSlice)
min = math.Round(min) min = math.Round(min)
max = math.Round(max) max = math.Round(max)
median = utils.Median(windowedBandwidths) median = utils.Median(valueSlice)
windowedBandwidths = []float64{} windowedBandwidths = map[string]float64{}
} }
metrics.lowestBitrate = append(metrics.lowestBitrate, TimestampedValue{ metrics.lowestBitrate = append(metrics.lowestBitrate, TimestampedValue{
@ -286,8 +311,9 @@ func GetMaxDownloadRateOverTime() []TimestampedValue {
} }
func collectQualityVariantChanges() { func collectQualityVariantChanges() {
count := utils.Sum(windowedQualityVariantChanges) valueSlice := utils.Float64MapToSlice(windowedQualityVariantChanges)
windowedQualityVariantChanges = []float64{} count := utils.Sum(valueSlice)
windowedQualityVariantChanges = map[string]float64{}
metrics.qualityVariantChanges = append(metrics.qualityVariantChanges, TimestampedValue{ metrics.qualityVariantChanges = append(metrics.qualityVariantChanges, TimestampedValue{
Time: time.Now(), Time: time.Now(),

8
models/streamHealth.go Normal file
View File

@ -0,0 +1,8 @@
package models
// StreamHealthOverview represents an overview of the current stream health.
type StreamHealthOverview struct {
	// Healthy reports whether the stream is currently considered healthy.
	Healthy bool `json:"healthy"`
	// HealthyPercentage is the whole-number percentage of clients that are
	// considered healthy. Note that it is serialized as "healthPercentage".
	HealthyPercentage int `json:"healthPercentage"`
	// Message is a human readable description of the detected problem. It is
	// empty when no problem was detected.
	Message string `json:"message"`
}

View File

@ -318,7 +318,7 @@ func FindInSlice(slice []string, val string) (int, bool) {
return -1, false return -1, false
} }
// StringSliceToMap is a convinience function to convert a slice of strings into // StringSliceToMap is a convenience function to convert a slice of strings into
// a map using the string as the key. // a map using the string as the key.
func StringSliceToMap(stringSlice []string) map[string]interface{} { func StringSliceToMap(stringSlice []string) map[string]interface{} {
stringMap := map[string]interface{}{} stringMap := map[string]interface{}{}
@ -330,6 +330,17 @@ func StringSliceToMap(stringSlice []string) map[string]interface{} {
return stringMap return stringMap
} }
// Float64MapToSlice is a convenience function to convert a map of floats into
// a slice containing only the map's values. The keys are discarded.
//
// The order of the returned values is unspecified, as it follows Go's map
// iteration order. The returned slice is never nil, even for a nil or empty
// map.
func Float64MapToSlice(float64Map map[string]float64) []float64 {
	// The final length is known up front, so allocate the backing array once.
	float64Slice := make([]float64, 0, len(float64Map))

	for _, val := range float64Map {
		float64Slice = append(float64Slice, val)
	}

	return float64Slice
}
// StringMapKeys returns a slice of string keys from a map. // StringMapKeys returns a slice of string keys from a map.
func StringMapKeys(stringMap map[string]interface{}) []string { func StringMapKeys(stringMap map[string]interface{}) []string {
stringSlice := []string{} stringSlice := []string{}