diff --git a/controllers/admin/viewers.go b/controllers/admin/viewers.go index 9133f3da3..e2f48b1c1 100644 --- a/controllers/admin/viewers.go +++ b/controllers/admin/viewers.go @@ -3,6 +3,8 @@ package admin import ( "encoding/json" "net/http" + "strconv" + "time" "github.com/owncast/owncast/controllers" "github.com/owncast/owncast/core" @@ -14,9 +16,19 @@ import ( // GetViewersOverTime will return the number of viewers at points in time. func GetViewersOverTime(w http.ResponseWriter, r *http.Request) { - viewersOverTime := metrics.Metrics.Viewers + windowStartAtStr := r.URL.Query().Get("windowStart") + windowStartAtUnix, err := strconv.Atoi(windowStartAtStr) + if err != nil { + controllers.WriteSimpleResponse(w, false, err.Error()) + return + } + + windowStartAt := time.Unix(int64(windowStartAtUnix), 0) + windowEnd := time.Now() + + viewersOverTime := metrics.GetViewersOverTime(windowStartAt, windowEnd) w.Header().Set("Content-Type", "application/json") - err := json.NewEncoder(w).Encode(viewersOverTime) + err = json.NewEncoder(w).Encode(viewersOverTime) if err != nil { log.Errorln(err) } diff --git a/go.mod b/go.mod index 6b5a83e93..2ef043bc3 100644 --- a/go.mod +++ b/go.mod @@ -57,4 +57,6 @@ require ( google.golang.org/protobuf v1.26.0 // indirect ) +require github.com/nakabonne/tstorage v0.3.5 + replace github.com/go-fed/activity => github.com/owncast/activity v1.0.1-0.20211229051252-7821289d4026 diff --git a/go.sum b/go.sum index c79a21c29..377e1506b 100644 --- a/go.sum +++ b/go.sum @@ -191,6 +191,8 @@ github.com/mvdan/xurls v1.1.0 h1:OpuDelGQ1R1ueQ6sSryzi6P+1RtBpfQHM8fJwlE45ww= github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nakabonne/tstorage v0.3.5 h1:AmXhEn6SM94sMy1+bwAs9xg3cuefXBXakcYOMQuQlqI= +github.com/nakabonne/tstorage v0.3.5/go.mod h1:dgOHx150reQ3xHCqyoU19TImAU0PY78bfwUIG24xNzY= github.com/nareix/joy5 v0.0.0-20200712071056-a55089207c88 h1:CXq5QLPMcfGEZMx8uBMyLdDiUNV72vlkSiyqg+jf7AI= github.com/nareix/joy5 v0.0.0-20200712071056-a55089207c88/go.mod h1:XmAOs6UJXpNXRwKk+KY/nv5kL6xXYXyellk+A1pTlko= github.com/oschwald/geoip2-golang v1.6.1 h1:GKxT3yaWWNXSb7vj6D7eoJBns+lGYgx08QO0UcNm0YY= diff --git a/metrics/metrics.go b/metrics/metrics.go index 6a8db5e95..8de152c22 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -13,15 +13,11 @@ import ( // How often we poll for updates. const metricsPollingInterval = 1 * time.Minute -var _getStatus func() models.Status - // CollectedMetrics stores different collected + timestamped values. type CollectedMetrics struct { CPUUtilizations []timestampedValue `json:"cpu"` RAMUtilizations []timestampedValue `json:"memory"` DiskUtilizations []timestampedValue `json:"disk"` - - Viewers []timestampedValue `json:"-"` } // Metrics is the shared Metrics instance. @@ -29,8 +25,6 @@ var Metrics *CollectedMetrics // Start will begin the metrics collection and alerting. func Start(getStatus func() models.Status) { - _getStatus = getStatus - host := data.GetServerURL() if host == "" { host = "unknown" @@ -72,7 +66,6 @@ func Start(getStatus func() models.Status) { Metrics = new(CollectedMetrics) go startViewerCollectionMetrics() - handlePolling() for range time.Tick(metricsPollingInterval) { handlePolling() diff --git a/metrics/timestampedValue.go b/metrics/timestampedValue.go index 35e560141..0eba30a84 100644 --- a/metrics/timestampedValue.go +++ b/metrics/timestampedValue.go @@ -1,8 +1,21 @@ package metrics -import "time" +import ( + "time" + + "github.com/nakabonne/tstorage" +) type timestampedValue struct { Time time.Time `json:"time"` Value int `json:"value"` } + +func makeTimestampedValuesFromDatapoints(dp []*tstorage.DataPoint) []timestampedValue { + tv := []timestampedValue{} + for _, d := range dp { + tv = append(tv, timestampedValue{Time: time.Unix(d.Timestamp, 0), Value: int(d.Value)}) + } + + return tv +} diff --git a/metrics/viewers.go b/metrics/viewers.go index f1bdbac72..83958f927 100644 --- a/metrics/viewers.go +++ b/metrics/viewers.go @@ -3,15 +3,27 @@ package metrics import ( "time" + "github.com/nakabonne/tstorage" + "github.com/owncast/owncast/core" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" + log "github.com/sirupsen/logrus" ) // How often we poll for updates. const viewerMetricsPollingInterval = 2 * time.Minute +var storage tstorage.Storage + func startViewerCollectionMetrics() { + storage, _ = tstorage.NewStorage( + tstorage.WithTimestampPrecision(tstorage.Seconds), + tstorage.WithDataPath("./data/metrics"), + ) + defer storage.Close() + collectViewerCount() + handlePolling() for range time.Tick(viewerMetricsPollingInterval) { collectViewerCount() @@ -19,16 +31,15 @@ func startViewerCollectionMetrics() { } func collectViewerCount() { - if len(Metrics.Viewers) > maxCollectionValues { - Metrics.Viewers = Metrics.Viewers[1:] + // Don't collect metrics for viewers if there's no stream active. + if !core.GetStatus().Online { + return } +} - count := _getStatus().ViewerCount - value := timestampedValue{ - Value: count, - Time: time.Now(), - } - Metrics.Viewers = append(Metrics.Viewers, value) +func collectChatClientCount() { + count := len(chat.GetClients()) + activeChatClientCount.Set(float64(count)) // Save to our Prometheus collector. activeViewerCount.Set(float64(count)) @@ -40,9 +51,24 @@ func collectViewerCount() { // Total user count uc := data.GetUsersCount() chatUserCount.Set(float64(uc)) + + if err := storage.InsertRows([]tstorage.Row{ + { + Metric: "viewercount", + DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)}, + }, + }); err != nil { + log.Errorln(err) + } } -func collectChatClientCount() { - count := len(chat.GetClients()) - activeChatClientCount.Set(float64(count)) +// GetViewersOverTime will return a window of viewer counts over time. +func GetViewersOverTime(start, end time.Time) []timestampedValue { + p, err := storage.Select("viewercount", nil, start.Unix(), end.Unix()) + if err != nil && err != tstorage.ErrNoDataPoints { + log.Errorln(err) + } + datapoints := makeTimestampedValuesFromDatapoints(p) + + return datapoints }