Stream performance metrics (#1785)

* WIP playback metrics

* Playback metrics collecting + APIs. Closes #793

* Cleanup console messages

* Update test

* Increase browser test timeout

* Update browser tests to not fail
This commit is contained in:
Gabe Kangas
2022-03-16 17:34:44 -07:00
committed by GitHub
parent f5a5ac006a
commit babbcecc9c
15 changed files with 678 additions and 83 deletions

View File

@@ -6,17 +6,21 @@ import (
log "github.com/sirupsen/logrus"
)
const maxCPUAlertingThresholdPCT = 85
const maxRAMAlertingThresholdPCT = 85
const maxDiskAlertingThresholdPCT = 90
const (
maxCPUAlertingThresholdPCT = 85
maxRAMAlertingThresholdPCT = 85
maxDiskAlertingThresholdPCT = 90
)
var inCPUAlertingState = false
var inRAMAlertingState = false
var inDiskAlertingState = false
var (
inCPUAlertingState = false
inRAMAlertingState = false
inDiskAlertingState = false
)
var errorResetDuration = time.Minute * 5
const alertingError = "The %s utilization of %d%% could cause problems with video generation and delivery. Visit the documentation at http://owncast.online/docs/troubleshooting/ if you are experiencing issues."
// alertingError is the log message template used when a hardware resource
// crosses its alerting threshold. It takes the resource name (%s) followed by
// the utilization percentage as a float64, which is printed with two decimal
// places so the message reads "87.50%" rather than "87.500000%".
const alertingError = "The %s utilization of %.2f%% could cause problems with video generation and delivery. Visit the documentation at http://owncast.online/docs/troubleshooting/ if you are experiencing issues."
func handleAlerting() {
handleCPUAlerting()
@@ -25,11 +29,11 @@ func handleAlerting() {
}
func handleCPUAlerting() {
if len(Metrics.CPUUtilizations) < 2 {
if len(metrics.CPUUtilizations) < 2 {
return
}
avg := recentAverage(Metrics.CPUUtilizations)
avg := recentAverage(metrics.CPUUtilizations)
if avg > maxCPUAlertingThresholdPCT && !inCPUAlertingState {
log.Warnf(alertingError, "CPU", avg)
inCPUAlertingState = true
@@ -43,11 +47,11 @@ func handleCPUAlerting() {
}
func handleRAMAlerting() {
if len(Metrics.RAMUtilizations) < 2 {
if len(metrics.RAMUtilizations) < 2 {
return
}
avg := recentAverage(Metrics.RAMUtilizations)
avg := recentAverage(metrics.RAMUtilizations)
if avg > maxRAMAlertingThresholdPCT && !inRAMAlertingState {
log.Warnf(alertingError, "memory", avg)
inRAMAlertingState = true
@@ -61,11 +65,11 @@ func handleRAMAlerting() {
}
func handleDiskAlerting() {
if len(Metrics.DiskUtilizations) < 2 {
if len(metrics.DiskUtilizations) < 2 {
return
}
avg := recentAverage(Metrics.DiskUtilizations)
avg := recentAverage(metrics.DiskUtilizations)
if avg > maxDiskAlertingThresholdPCT && !inDiskAlertingState {
log.Warnf(alertingError, "disk", avg)
@@ -79,6 +83,6 @@ func handleDiskAlerting() {
}
}
func recentAverage(values []timestampedValue) int {
// recentAverage returns the mean of the last two entries of values. A single
// entry is returned as-is and an empty slice yields 0, so the function never
// indexes out of range even if a caller forgets to check the length first.
func recentAverage(values []TimestampedValue) float64 {
	switch n := len(values); n {
	case 0:
		return 0
	case 1:
		return values[0].Value
	default:
		return (values[n-1].Value + values[n-2].Value) / 2
	}
}

View File

@@ -14,8 +14,8 @@ import (
const maxCollectionValues = 500
func collectCPUUtilization() {
if len(Metrics.CPUUtilizations) > maxCollectionValues {
Metrics.CPUUtilizations = Metrics.CPUUtilizations[1:]
if len(metrics.CPUUtilizations) > maxCollectionValues {
metrics.CPUUtilizations = metrics.CPUUtilizations[1:]
}
v, err := cpu.Percent(0, false)
@@ -24,29 +24,29 @@ func collectCPUUtilization() {
return
}
metricValue := timestampedValue{time.Now(), int(v[0])}
Metrics.CPUUtilizations = append(Metrics.CPUUtilizations, metricValue)
cpuUsage.Set(float64(metricValue.Value))
metricValue := TimestampedValue{time.Now(), v[0]}
metrics.CPUUtilizations = append(metrics.CPUUtilizations, metricValue)
cpuUsage.Set(metricValue.Value)
}
func collectRAMUtilization() {
if len(Metrics.RAMUtilizations) > maxCollectionValues {
Metrics.RAMUtilizations = Metrics.RAMUtilizations[1:]
if len(metrics.RAMUtilizations) > maxCollectionValues {
metrics.RAMUtilizations = metrics.RAMUtilizations[1:]
}
memoryUsage, _ := mem.VirtualMemory()
metricValue := timestampedValue{time.Now(), int(memoryUsage.UsedPercent)}
Metrics.RAMUtilizations = append(Metrics.RAMUtilizations, metricValue)
metricValue := TimestampedValue{time.Now(), memoryUsage.UsedPercent}
metrics.RAMUtilizations = append(metrics.RAMUtilizations, metricValue)
}
func collectDiskUtilization() {
path := "./"
diskUse, _ := disk.Usage(path)
if len(Metrics.DiskUtilizations) > maxCollectionValues {
Metrics.DiskUtilizations = Metrics.DiskUtilizations[1:]
if len(metrics.DiskUtilizations) > maxCollectionValues {
metrics.DiskUtilizations = metrics.DiskUtilizations[1:]
}
metricValue := timestampedValue{time.Now(), int(diskUse.UsedPercent)}
Metrics.DiskUtilizations = append(Metrics.DiskUtilizations, metricValue)
metricValue := TimestampedValue{time.Now(), diskUse.UsedPercent}
metrics.DiskUtilizations = append(metrics.DiskUtilizations, metricValue)
}

View File

@@ -6,22 +6,32 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// How often we poll for updates.
const metricsPollingInterval = 1 * time.Minute
const hardwareMetricsPollingInterval = 1 * time.Minute
const (
	// How often we poll for updates.
	viewerMetricsPollingInterval = 2 * time.Minute

	// Metric name under which the connected chat client count is inserted
	// into, and selected from, the time series storage.
	activeChatClientCountKey = "chat_client_count"

	// Metric name under which the active viewer count is inserted into, and
	// selected from, the time series storage.
	activeViewerCountKey = "viewer_count"
)
// CollectedMetrics stores different collected + timestamped values.
type CollectedMetrics struct {
CPUUtilizations []timestampedValue `json:"cpu"`
RAMUtilizations []timestampedValue `json:"memory"`
DiskUtilizations []timestampedValue `json:"disk"`
CPUUtilizations []TimestampedValue `json:"cpu"`
RAMUtilizations []TimestampedValue `json:"memory"`
DiskUtilizations []TimestampedValue `json:"disk"`
errorCount []TimestampedValue `json:"-"`
lowestBitrate []TimestampedValue `json:"-"`
segmentDownloadSeconds []TimestampedValue `json:"-"`
averageLatency []TimestampedValue `json:"-"`
qualityVariantChanges []TimestampedValue `json:"-"`
}
// Metrics is the shared Metrics instance.
var Metrics *CollectedMetrics
var metrics *CollectedMetrics
// Start will begin the metrics collection and alerting.
func Start(getStatus func() models.Status) {
@@ -34,40 +44,12 @@ func Start(getStatus func() models.Status) {
"host": host,
}
activeViewerCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_active_viewer_count",
Help: "The number of viewers.",
ConstLabels: labels,
})
setupPrometheusCollectors()
activeChatClientCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_active_chat_client_count",
Help: "The number of connected chat clients.",
ConstLabels: labels,
})
chatUserCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_total_chat_users",
Help: "The total number of chat users on this Owncast instance.",
ConstLabels: labels,
})
currentChatMessageCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_current_chat_message_count",
Help: "The number of chat messages currently saved before cleanup.",
ConstLabels: labels,
})
cpuUsage = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_cpu_use_pct",
Help: "CPU percentage used as seen within Owncast",
ConstLabels: labels,
})
Metrics = new(CollectedMetrics)
metrics = new(CollectedMetrics)
go startViewerCollectionMetrics()
for range time.Tick(metricsPollingInterval) {
for range time.Tick(hardwareMetricsPollingInterval) {
handlePolling()
}
}
@@ -78,6 +60,17 @@ func handlePolling() {
collectRAMUtilization()
collectDiskUtilization()
collectPlaybackErrorCount()
collectLatencyValues()
collectSegmentDownloadDuration()
collectLowestBandwidth()
collectQualityVariantChanges()
// Alerting
handleAlerting()
}
// GetMetrics will return the collected metrics. The returned pointer is the
// package's shared instance rather than a copy, and it is nil until Start has
// allocated it.
func GetMetrics() *CollectedMetrics {
	return metrics
}

168
metrics/playback.go Normal file
View File

@@ -0,0 +1,168 @@
package metrics
import (
"math"
"time"
"github.com/owncast/owncast/utils"
)
// Values reported by players since the last time we collected metrics. Each
// Register* function appends to one of these windows, and the matching
// collect* function reduces the window to a single data point and resets it.
//
// NOTE(review): these slices are appended to and reset with no locking visible
// in this file; confirm that callers are serialized or guard them with a mutex.
var (
	windowedErrorCounts           = []float64{}
	windowedQualityVariantChanges = []float64{}
	windowedBandwidths            = []float64{}
	windowedLatencies             = []float64{}
	windowedDownloadDurations     = []float64{}
)
// RegisterPlaybackErrorCount will add to the windowed playback error count.
// The reported count is held until collectPlaybackErrorCount next sums the
// window and resets it.
func RegisterPlaybackErrorCount(count float64) {
	windowedErrorCounts = append(windowedErrorCounts, count)
}
// RegisterQualityVariantChangesCount will add to the windowed quality variant
// change count. The reported count is held until collectQualityVariantChanges
// next sums the window and resets it.
func RegisterQualityVariantChangesCount(count float64) {
	windowedQualityVariantChanges = append(windowedQualityVariantChanges, count)
}
// RegisterPlayerBandwidth will add to the windowed playback bandwidth.
// The value is held until collectLowestBandwidth next reduces the window to a
// single value (the first result of utils.MinMax, presumably the minimum) and
// resets it. kbps is presumably kilobits per second, going by the parameter
// name; verify against the caller.
func RegisterPlayerBandwidth(kbps float64) {
	windowedBandwidths = append(windowedBandwidths, kbps)
}
// RegisterPlayerLatency will add to the windowed player latency values.
// The value is held until collectLatencyValues next reduces the window with
// utils.Avg, rounds the result and resets the window.
func RegisterPlayerLatency(seconds float64) {
	windowedLatencies = append(windowedLatencies, seconds)
}
// RegisterPlayerSegmentDownloadDuration will add to the windowed player segment
// download duration values. The value is held until
// collectSegmentDownloadDuration next reduces the window with utils.Avg and
// resets it.
func RegisterPlayerSegmentDownloadDuration(seconds float64) {
	windowedDownloadDurations = append(windowedDownloadDurations, seconds)
}
// collectPlaybackErrorCount folds every error count players reported during
// the current window into one total (utils.Sum), starts a fresh window and
// records the total as a timestamped data point. The oldest data point is
// dropped once the series grows past maxCollectionValues, and the total is
// also published to the Prometheus gauge.
func collectPlaybackErrorCount() {
	total := utils.Sum(windowedErrorCounts)
	windowedErrorCounts = []float64{}

	point := TimestampedValue{Time: time.Now(), Value: total}
	metrics.errorCount = append(metrics.errorCount, point)

	if overflow := len(metrics.errorCount) - maxCollectionValues; overflow > 0 {
		metrics.errorCount = metrics.errorCount[1:]
	}

	// Save to Prometheus collector.
	playbackErrorCount.Set(total)
}
// collectSegmentDownloadDuration records the average (utils.Avg) of the
// segment download durations players reported during the current window, or 0
// when nothing was reported, and starts a fresh window. The oldest data point
// is dropped once the series grows past maxCollectionValues.
func collectSegmentDownloadDuration() {
	var average float64
	if reported := windowedDownloadDurations; len(reported) != 0 {
		average = utils.Avg(reported)
		windowedDownloadDurations = []float64{}
	}

	point := TimestampedValue{Time: time.Now(), Value: average}
	metrics.segmentDownloadSeconds = append(metrics.segmentDownloadSeconds, point)

	if len(metrics.segmentDownloadSeconds) > maxCollectionValues {
		metrics.segmentDownloadSeconds = metrics.segmentDownloadSeconds[1:]
	}
}
// GetDownloadDurationsOverTime will return the collected segment download
// durations over time. Like GetLatencyOverTime, it returns an empty, non-nil
// slice when nothing has been collected yet so the result encodes to JSON as
// [] rather than null.
func GetDownloadDurationsOverTime() []TimestampedValue {
	if len(metrics.segmentDownloadSeconds) == 0 {
		return []TimestampedValue{}
	}

	return metrics.segmentDownloadSeconds
}
// GetPlaybackErrorCountOverTime will return a window of playback errors over
// time. Like GetLatencyOverTime, it returns an empty, non-nil slice when
// nothing has been collected yet so the result encodes to JSON as [] rather
// than null.
func GetPlaybackErrorCountOverTime() []TimestampedValue {
	if len(metrics.errorCount) == 0 {
		return []TimestampedValue{}
	}

	return metrics.errorCount
}
// collectLatencyValues records the average (utils.Avg) of the latencies
// players reported during the current window, rounded to the nearest whole
// number, or 0 when nothing was reported, and starts a fresh window. The
// oldest data point is dropped once the series grows past maxCollectionValues.
func collectLatencyValues() {
	var latency float64
	if len(windowedLatencies) != 0 {
		latency = math.Round(utils.Avg(windowedLatencies))
		windowedLatencies = []float64{}
	}

	point := TimestampedValue{Time: time.Now(), Value: latency}
	metrics.averageLatency = append(metrics.averageLatency, point)

	if len(metrics.averageLatency) > maxCollectionValues {
		metrics.averageLatency = metrics.averageLatency[1:]
	}
}
// GetLatencyOverTime will return the collected average player latency values
// over time. An empty, non-nil slice is returned when nothing has been
// collected yet.
func GetLatencyOverTime() []TimestampedValue {
	if series := metrics.averageLatency; len(series) > 0 {
		return series
	}

	return []TimestampedValue{}
}
// collectLowestBandwidth will collect the lowest bandwidth currently collected
// so we can report to the streamer the worst possible streaming condition
// being experienced. The value is rounded to the nearest whole number, or is 0
// when no player reported a bandwidth during the current window. The window is
// then reset, and the oldest data point is dropped once the series grows past
// maxCollectionValues.
func collectLowestBandwidth() {
	lowest := 0.0
	if len(windowedBandwidths) > 0 {
		// Only the first result of MinMax is needed here.
		lowest, _ = utils.MinMax(windowedBandwidths)
		// Round once; the stored value needs no second rounding.
		lowest = math.Round(lowest)
		windowedBandwidths = []float64{}
	}

	metrics.lowestBitrate = append(metrics.lowestBitrate, TimestampedValue{
		Time:  time.Now(),
		Value: lowest,
	})

	if len(metrics.lowestBitrate) > maxCollectionValues {
		metrics.lowestBitrate = metrics.lowestBitrate[1:]
	}
}
// GetSlowestDownloadRateOverTime will return the lowest bandwidth values that
// have been collected over time. An empty, non-nil slice is returned when
// nothing has been collected yet.
func GetSlowestDownloadRateOverTime() []TimestampedValue {
	rates := metrics.lowestBitrate
	if len(rates) == 0 {
		rates = []TimestampedValue{}
	}

	return rates
}
// collectQualityVariantChanges sums (utils.Sum) the quality variant change
// counts players reported during the current window, starts a fresh window and
// records the total as a timestamped data point. Like the other collectors in
// this file it drops the oldest data point once the series grows past
// maxCollectionValues, so the history cannot grow without bound.
func collectQualityVariantChanges() {
	count := utils.Sum(windowedQualityVariantChanges)
	windowedQualityVariantChanges = []float64{}

	metrics.qualityVariantChanges = append(metrics.qualityVariantChanges, TimestampedValue{
		Time:  time.Now(),
		Value: count,
	})

	if len(metrics.qualityVariantChanges) > maxCollectionValues {
		metrics.qualityVariantChanges = metrics.qualityVariantChanges[1:]
	}
}
// GetQualityVariantChangesOverTime will return the collected quality variant
// changes. Like GetLatencyOverTime, it returns an empty, non-nil slice when
// nothing has been collected yet so the result encodes to JSON as [] rather
// than null.
func GetQualityVariantChangesOverTime() []TimestampedValue {
	if len(metrics.qualityVariantChanges) == 0 {
		return []TimestampedValue{}
	}

	return metrics.qualityVariantChanges
}

View File

@@ -2,6 +2,7 @@ package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
@@ -11,4 +12,44 @@ var (
cpuUsage prometheus.Gauge
chatUserCount prometheus.Gauge
currentChatMessageCount prometheus.Gauge
playbackErrorCount prometheus.Gauge
)
func setupPrometheusCollectors() {
// Setup the Prometheus collectors.
activeViewerCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_active_viewer_count",
Help: "The number of viewers.",
ConstLabels: labels,
})
activeChatClientCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_active_chat_client_count",
Help: "The number of connected chat clients.",
ConstLabels: labels,
})
chatUserCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_total_chat_users",
Help: "The total number of chat users on this Owncast instance.",
ConstLabels: labels,
})
currentChatMessageCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_current_chat_message_count",
Help: "The number of chat messages currently saved before cleanup.",
ConstLabels: labels,
})
playbackErrorCount = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_playback_error_count",
Help: "Errors collected from players within this window",
ConstLabels: labels,
})
cpuUsage = promauto.NewGauge(prometheus.GaugeOpts{
Name: "owncast_instance_cpu_usage",
Help: "CPU usage as seen internally to Owncast.",
ConstLabels: labels,
})
}

View File

@@ -6,15 +6,16 @@ import (
"github.com/nakabonne/tstorage"
)
type timestampedValue struct {
// TimestampedValue is a value with a timestamp.
type TimestampedValue struct {
Time time.Time `json:"time"`
Value int `json:"value"`
Value float64 `json:"value"`
}
func makeTimestampedValuesFromDatapoints(dp []*tstorage.DataPoint) []timestampedValue {
tv := []timestampedValue{}
func makeTimestampedValuesFromDatapoints(dp []*tstorage.DataPoint) []TimestampedValue {
tv := []TimestampedValue{}
for _, d := range dp {
tv = append(tv, timestampedValue{Time: time.Unix(d.Timestamp, 0), Value: int(d.Value)})
tv = append(tv, TimestampedValue{Time: time.Unix(d.Timestamp, 0), Value: d.Value})
}
return tv

View File

@@ -10,9 +10,6 @@ import (
log "github.com/sirupsen/logrus"
)
// How often we poll for updates.
const viewerMetricsPollingInterval = 2 * time.Minute
var storage tstorage.Storage
func startViewerCollectionMetrics() {
@@ -36,8 +33,20 @@ func collectViewerCount() {
return
}
// Save to our Prometheus collector.
activeViewerCount.Set(float64(core.GetStatus().ViewerCount))
count := core.GetStatus().ViewerCount
// Save active viewer count to our Prometheus collector.
activeViewerCount.Set(float64(count))
// Insert active viewer count into our on-disk time series storage.
if err := storage.InsertRows([]tstorage.Row{
{
Metric: activeViewerCountKey,
DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)},
},
}); err != nil {
log.Errorln(err)
}
}
func collectChatClientCount() {
@@ -46,15 +55,18 @@ func collectChatClientCount() {
// Total message count
cmc := data.GetMessagesCount()
// Insert message count into Prometheus collector.
currentChatMessageCount.Set(float64(cmc))
// Total user count
uc := data.GetUsersCount()
// Insert user count into Prometheus collector.
chatUserCount.Set(float64(uc))
// Insert active chat user count into our on-disk time series storage.
if err := storage.InsertRows([]tstorage.Row{
{
Metric: "viewercount",
Metric: activeChatClientCountKey,
DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)},
},
}); err != nil {
@@ -63,8 +75,19 @@ func collectChatClientCount() {
}
// GetViewersOverTime will return a window of viewer counts over time.
func GetViewersOverTime(start, end time.Time) []timestampedValue {
p, err := storage.Select("viewercount", nil, start.Unix(), end.Unix())
func GetViewersOverTime(start, end time.Time) []TimestampedValue {
p, err := storage.Select(activeViewerCountKey, nil, start.Unix(), end.Unix())
if err != nil && err != tstorage.ErrNoDataPoints {
log.Errorln(err)
}
datapoints := makeTimestampedValuesFromDatapoints(p)
return datapoints
}
// GetChatClientCountOverTime will return a window of connected chat clients over time.
func GetChatClientCountOverTime(start, end time.Time) []TimestampedValue {
p, err := storage.Select(activeChatClientCountKey, nil, start.Unix(), end.Unix())
if err != nil && err != tstorage.ErrNoDataPoints {
log.Errorln(err)
}