diff --git a/controllers/admin/status.go b/controllers/admin/status.go index ae56f5ad1..941df655d 100644 --- a/controllers/admin/status.go +++ b/controllers/admin/status.go @@ -6,6 +6,7 @@ import ( "github.com/owncast/owncast/core" "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/metrics" "github.com/owncast/owncast/models" "github.com/owncast/owncast/router/middleware" log "github.com/sirupsen/logrus" @@ -16,11 +17,12 @@ func Status(w http.ResponseWriter, r *http.Request) { broadcaster := core.GetBroadcaster() status := core.GetStatus() currentBroadcast := core.GetCurrentBroadcast() - + health := metrics.GetStreamHealthOverview() response := adminStatusResponse{ Broadcaster: broadcaster, CurrentBroadcast: currentBroadcast, Online: status.Online, + Health: health, ViewerCount: status.ViewerCount, OverallPeakViewerCount: status.OverallMaxViewerCount, SessionPeakViewerCount: status.SessionMaxViewerCount, @@ -38,12 +40,13 @@ func Status(w http.ResponseWriter, r *http.Request) { } type adminStatusResponse struct { - Broadcaster *models.Broadcaster `json:"broadcaster"` - CurrentBroadcast *models.CurrentBroadcast `json:"currentBroadcast"` - Online bool `json:"online"` - ViewerCount int `json:"viewerCount"` - OverallPeakViewerCount int `json:"overallPeakViewerCount"` - SessionPeakViewerCount int `json:"sessionPeakViewerCount"` - StreamTitle string `json:"streamTitle"` - VersionNumber string `json:"versionNumber"` + Broadcaster *models.Broadcaster `json:"broadcaster"` + CurrentBroadcast *models.CurrentBroadcast `json:"currentBroadcast"` + Online bool `json:"online"` + ViewerCount int `json:"viewerCount"` + OverallPeakViewerCount int `json:"overallPeakViewerCount"` + SessionPeakViewerCount int `json:"sessionPeakViewerCount"` + StreamTitle string `json:"streamTitle"` + Health *models.StreamHealthOverview `json:"health"` + VersionNumber string `json:"versionNumber"` } diff --git a/controllers/playbackMetrics.go b/controllers/playbackMetrics.go index 2b2f29df9..00242e7c5 100644 --- a/controllers/playbackMetrics.go +++ b/controllers/playbackMetrics.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/owncast/owncast/metrics" + "github.com/owncast/owncast/utils" log "github.com/sirupsen/logrus" ) @@ -32,9 +33,11 @@ func ReportPlaybackMetrics(w http.ResponseWriter, r *http.Request) { return } - metrics.RegisterPlaybackErrorCount(request.Errors) - metrics.RegisterPlayerBandwidth(request.Bandwidth) - metrics.RegisterPlayerLatency(request.Latency) - metrics.RegisterPlayerSegmentDownloadDuration(request.DownloadDuration) - metrics.RegisterQualityVariantChangesCount(request.QualityVariantChanges) + clientID := utils.GenerateClientIDFromRequest(r) + + metrics.RegisterPlaybackErrorCount(clientID, request.Errors) + metrics.RegisterPlayerBandwidth(clientID, request.Bandwidth) + metrics.RegisterPlayerLatency(clientID, request.Latency) + metrics.RegisterPlayerSegmentDownloadDuration(clientID, request.DownloadDuration) + metrics.RegisterQualityVariantChangesCount(clientID, request.QualityVariantChanges) } diff --git a/metrics/hardware.go b/metrics/hardware.go index 7b749827a..595944006 100644 --- a/metrics/hardware.go +++ b/metrics/hardware.go @@ -11,7 +11,7 @@ import ( ) // Max number of metrics we want to keep. -const maxCollectionValues = 500 +const maxCollectionValues = 300 func collectCPUUtilization() { if len(metrics.CPUUtilizations) > maxCollectionValues { diff --git a/metrics/healthOverview.go b/metrics/healthOverview.go new file mode 100644 index 000000000..897a5c052 --- /dev/null +++ b/metrics/healthOverview.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "fmt" + "sort" + + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/models" +) + +var errorMessages = map[string]string{ + "LOWSPEED": "%d of %d clients (%d%%) are consuming video slower than, or too close to your bitrate of %d kbps.", + "PLAYBACK_ERRORS": "%d of %d clients (%d%%) are experiencing different, unspecified, playback issues.", +} + +// GetStreamHealthOverview will return the stream health overview. +func GetStreamHealthOverview() *models.StreamHealthOverview { + return metrics.streamHealthOverview +} + +func generateStreamHealthOverview() { + overview := models.StreamHealthOverview{ + Healthy: true, + HealthyPercentage: 100, + } + + defer func() { + metrics.streamHealthOverview = &overview + }() + + type singleVariant struct { + isVideoPassthrough bool + bitrate int + } + + outputVariants := data.GetStreamOutputVariants() + + streamSortVariants := make([]singleVariant, len(outputVariants)) + for i, variant := range outputVariants { + variantSort := singleVariant{ + bitrate: variant.VideoBitrate, + isVideoPassthrough: variant.IsVideoPassthrough, + } + streamSortVariants[i] = variantSort + } + + sort.Slice(streamSortVariants, func(i, j int) bool { + if streamSortVariants[i].isVideoPassthrough && !streamSortVariants[j].isVideoPassthrough { + return true + } + + if !streamSortVariants[i].isVideoPassthrough && streamSortVariants[j].isVideoPassthrough { + return false + } + + return streamSortVariants[i].bitrate > streamSortVariants[j].bitrate + }) + + lowestSupportedBitrate := float64(streamSortVariants[0].bitrate) + totalNumberOfClients := len(windowedBandwidths) + + if totalNumberOfClients == 0 { + return + } + + // Determine healthy status based on bandwidth speeds of clients. + unhealthyClientCount := 0 + for _, speed := range windowedBandwidths { + if int(speed) < int(lowestSupportedBitrate*1.1) { + unhealthyClientCount++ + } + } + if unhealthyClientCount > 0 { + overview.Message = fmt.Sprintf(errorMessages["LOWSPEED"], unhealthyClientCount, totalNumberOfClients, int((float64(unhealthyClientCount)/float64(totalNumberOfClients))*100), int(lowestSupportedBitrate)) + } + + // If bandwidth is ok, determine healthy status based on error + // counts of clients. + if unhealthyClientCount == 0 { + for _, errors := range windowedErrorCounts { + unhealthyClientCount += int(errors) + } + if unhealthyClientCount > 0 { + overview.Message = fmt.Sprintf(errorMessages["PLAYBACK_ERRORS"], unhealthyClientCount, totalNumberOfClients, int((float64(unhealthyClientCount)/float64(totalNumberOfClients))*100)) + } + } + + if unhealthyClientCount == 0 { + return + } + + percentUnhealthy := 100 - ((float64(unhealthyClientCount) / float64(totalNumberOfClients)) * 100) + overview.HealthyPercentage = int(percentUnhealthy) + overview.Healthy = overview.HealthyPercentage > 95 +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 377a566e2..513861e24 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -11,6 +11,7 @@ import ( // How often we poll for updates. const hardwareMetricsPollingInterval = 1 * time.Minute +const playbackMetricsPollingInterval = 3 * time.Minute const ( // How often we poll for updates. @@ -41,13 +42,18 @@ type CollectedMetrics struct { medianLatency []TimestampedValue `json:"-"` qualityVariantChanges []TimestampedValue `json:"-"` + + streamHealthOverview *models.StreamHealthOverview } // Metrics is the shared Metrics instance. var metrics *CollectedMetrics +var _getStatus func() models.Status + // Start will begin the metrics collection and alerting. func Start(getStatus func() models.Status) { + _getStatus = getStatus host := data.GetServerURL() if host == "" { host = "unknown" @@ -62,9 +68,17 @@ func Start(getStatus func() models.Status) { metrics = new(CollectedMetrics) go startViewerCollectionMetrics() - for range time.Tick(hardwareMetricsPollingInterval) { - handlePolling() - } + go func() { + for range time.Tick(hardwareMetricsPollingInterval) { + handlePolling() + } + }() + + go func() { + for range time.Tick(playbackMetricsPollingInterval) { + handlePlaybackPolling() + } + }() } func handlePolling() { @@ -76,12 +90,6 @@ func handlePolling() { collectRAMUtilization() collectDiskUtilization() - collectPlaybackErrorCount() - collectLatencyValues() - collectSegmentDownloadDuration() - collectLowestBandwidth() - collectQualityVariantChanges() - // Alerting handleAlerting() } diff --git a/metrics/playback.go b/metrics/playback.go index 45312cd47..280eea9b5 100644 --- a/metrics/playback.go +++ b/metrics/playback.go @@ -9,48 +9,65 @@ import ( // Playback error counts reported since the last time we collected metrics. var ( - windowedErrorCounts = []float64{} - windowedQualityVariantChanges = []float64{} - windowedBandwidths = []float64{} - windowedLatencies = []float64{} - windowedDownloadDurations = []float64{} + windowedErrorCounts = map[string]float64{} + windowedQualityVariantChanges = map[string]float64{} + windowedBandwidths = map[string]float64{} + windowedLatencies = map[string]float64{} + windowedDownloadDurations = map[string]float64{} ) -// RegisterPlaybackErrorCount will add to the windowed playback error count. -func RegisterPlaybackErrorCount(count float64) { +func handlePlaybackPolling() { metrics.m.Lock() defer metrics.m.Unlock() - windowedErrorCounts = append(windowedErrorCounts, count) + + // Make sure this is fired first before all the values get cleared below. + if _getStatus().Online { + generateStreamHealthOverview() + } + + collectPlaybackErrorCount() + collectLatencyValues() + collectSegmentDownloadDuration() + collectLowestBandwidth() + collectQualityVariantChanges() +} + +// RegisterPlaybackErrorCount will add to the windowed playback error count. +func RegisterPlaybackErrorCount(clientID string, count float64) { + metrics.m.Lock() + defer metrics.m.Unlock() + windowedErrorCounts[clientID] = count + // windowedErrorCounts = append(windowedErrorCounts, count) } // RegisterQualityVariantChangesCount will add to the windowed quality variant // change count. -func RegisterQualityVariantChangesCount(count float64) { +func RegisterQualityVariantChangesCount(clientID string, count float64) { metrics.m.Lock() defer metrics.m.Unlock() - windowedQualityVariantChanges = append(windowedQualityVariantChanges, count) + windowedQualityVariantChanges[clientID] = count } // RegisterPlayerBandwidth will add to the windowed playback bandwidth. -func RegisterPlayerBandwidth(kbps float64) { +func RegisterPlayerBandwidth(clientID string, kbps float64) { metrics.m.Lock() defer metrics.m.Unlock() - windowedBandwidths = append(windowedBandwidths, kbps) + windowedBandwidths[clientID] = kbps } // RegisterPlayerLatency will add to the windowed player latency values. -func RegisterPlayerLatency(seconds float64) { +func RegisterPlayerLatency(clientID string, seconds float64) { metrics.m.Lock() defer metrics.m.Unlock() - windowedLatencies = append(windowedLatencies, seconds) + windowedLatencies[clientID] = seconds } // RegisterPlayerSegmentDownloadDuration will add to the windowed player segment // download duration values. -func RegisterPlayerSegmentDownloadDuration(seconds float64) { +func RegisterPlayerSegmentDownloadDuration(clientID string, seconds float64) { metrics.m.Lock() defer metrics.m.Unlock() - windowedDownloadDurations = append(windowedDownloadDurations, seconds) + windowedDownloadDurations[clientID] = seconds } // collectPlaybackErrorCount will take all of the error counts each individual @@ -58,8 +75,9 @@ func RegisterPlayerSegmentDownloadDuration(seconds float64) { // one person with bad connectivity doesn't make it look like everything is // horrible for everyone. func collectPlaybackErrorCount() { - count := utils.Sum(windowedErrorCounts) - windowedErrorCounts = []float64{} + valueSlice := utils.Float64MapToSlice(windowedErrorCounts) + count := utils.Sum(valueSlice) + windowedErrorCounts = map[string]float64{} metrics.errorCount = append(metrics.errorCount, TimestampedValue{ Time: time.Now(), @@ -79,10 +97,12 @@ func collectSegmentDownloadDuration() { max := 0.0 min := 0.0 - if len(windowedDownloadDurations) > 0 { - median = utils.Median(windowedDownloadDurations) - min, max = utils.MinMax(windowedDownloadDurations) - windowedDownloadDurations = []float64{} + valueSlice := utils.Float64MapToSlice(windowedDownloadDurations) + + if len(valueSlice) > 0 { + median = utils.Median(valueSlice) + min, max = utils.MinMax(valueSlice) + windowedDownloadDurations = map[string]float64{} } metrics.medianSegmentDownloadSeconds = append(metrics.medianSegmentDownloadSeconds, TimestampedValue{ @@ -138,10 +158,13 @@ func collectLatencyValues() { min := 0.0 max := 0.0 - if len(windowedLatencies) > 0 { - median = utils.Median(windowedLatencies) - min, max = utils.MinMax(windowedLatencies) - windowedLatencies = []float64{} + valueSlice := utils.Float64MapToSlice(windowedLatencies) + windowedLatencies = map[string]float64{} + + if len(valueSlice) > 0 { + median = utils.Median(valueSlice) + min, max = utils.MinMax(valueSlice) + windowedLatencies = map[string]float64{} } metrics.medianLatency = append(metrics.medianLatency, TimestampedValue{ @@ -207,12 +230,14 @@ func collectLowestBandwidth() { median := 0.0 max := 0.0 + valueSlice := utils.Float64MapToSlice(windowedBandwidths) + if len(windowedBandwidths) > 0 { - min, max = utils.MinMax(windowedBandwidths) + min, max = utils.MinMax(valueSlice) min = math.Round(min) max = math.Round(max) - median = utils.Median(windowedBandwidths) - windowedBandwidths = []float64{} + median = utils.Median(valueSlice) + windowedBandwidths = map[string]float64{} } metrics.lowestBitrate = append(metrics.lowestBitrate, TimestampedValue{ @@ -286,8 +311,9 @@ func GetMaxDownloadRateOverTime() []TimestampedValue { } func collectQualityVariantChanges() { - count := utils.Sum(windowedQualityVariantChanges) - windowedQualityVariantChanges = []float64{} + valueSlice := utils.Float64MapToSlice(windowedQualityVariantChanges) + count := utils.Sum(valueSlice) + windowedQualityVariantChanges = map[string]float64{} metrics.qualityVariantChanges = append(metrics.qualityVariantChanges, TimestampedValue{ Time: time.Now(), diff --git a/models/streamHealth.go b/models/streamHealth.go new file mode 100644 index 000000000..a85b8fb33 --- /dev/null +++ b/models/streamHealth.go @@ -0,0 +1,8 @@ +package models + +// StreamHealthOverview represents an overview of the current stream health. +type StreamHealthOverview struct { + Healthy bool `json:"healthy"` + HealthyPercentage int `json:"healthPercentage"` + Message string `json:"message"` +} diff --git a/utils/utils.go b/utils/utils.go index b68e74b28..a90136214 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -318,7 +318,7 @@ func FindInSlice(slice []string, val string) (int, bool) { return -1, false } -// StringSliceToMap is a convinience function to convert a slice of strings into +// StringSliceToMap is a convenience function to convert a slice of strings into // a map using the string as the key. func StringSliceToMap(stringSlice []string) map[string]interface{} { stringMap := map[string]interface{}{} @@ -330,6 +330,17 @@ func StringSliceToMap(stringSlice []string) map[string]interface{} { return stringMap } +// Float64MapToSlice is a convenience function to convert a map of floats into. +func Float64MapToSlice(float64Map map[string]float64) []float64 { + float64Slice := []float64{} + + for _, val := range float64Map { + float64Slice = append(float64Slice, val) + } + + return float64Slice +} + // StringMapKeys returns a slice of string keys from a map. func StringMapKeys(stringMap map[string]interface{}) []string { stringSlice := []string{}