0

Persist time series viewer metrics (#1752)

* WIP persisting time series viewer metrics. Closes #1478

* Remove unused var, move around initial collection
This commit is contained in:
Gabe Kangas 2022-03-06 19:43:57 -08:00 committed by GitHub
parent 1f05783d9a
commit 1ed1cc01eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 69 additions and 21 deletions

View File

@ -3,6 +3,8 @@ package admin
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"strconv"
"time"
"github.com/owncast/owncast/controllers" "github.com/owncast/owncast/controllers"
"github.com/owncast/owncast/core" "github.com/owncast/owncast/core"
@ -14,9 +16,19 @@ import (
// GetViewersOverTime will return the number of viewers at points in time. // GetViewersOverTime will return the number of viewers at points in time.
func GetViewersOverTime(w http.ResponseWriter, r *http.Request) { func GetViewersOverTime(w http.ResponseWriter, r *http.Request) {
viewersOverTime := metrics.Metrics.Viewers windowStartAtStr := r.URL.Query().Get("windowStart")
windowStartAtUnix, err := strconv.Atoi(windowStartAtStr)
if err != nil {
controllers.WriteSimpleResponse(w, false, err.Error())
return
}
windowStartAt := time.Unix(int64(windowStartAtUnix), 0)
windowEnd := time.Now()
viewersOverTime := metrics.GetViewersOverTime(windowStartAt, windowEnd)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(viewersOverTime) err = json.NewEncoder(w).Encode(viewersOverTime)
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
} }

2
go.mod
View File

@ -57,4 +57,6 @@ require (
google.golang.org/protobuf v1.26.0 // indirect google.golang.org/protobuf v1.26.0 // indirect
) )
require github.com/nakabonne/tstorage v0.3.5
replace github.com/go-fed/activity => github.com/owncast/activity v1.0.1-0.20211229051252-7821289d4026 replace github.com/go-fed/activity => github.com/owncast/activity v1.0.1-0.20211229051252-7821289d4026

2
go.sum
View File

@ -191,6 +191,8 @@ github.com/mvdan/xurls v1.1.0 h1:OpuDelGQ1R1ueQ6sSryzi6P+1RtBpfQHM8fJwlE45ww=
github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU= github.com/mvdan/xurls v1.1.0/go.mod h1:tQlNn3BED8bE/15hnSL2HLkDeLWpNPAwtw7wkEq44oU=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nakabonne/tstorage v0.3.5 h1:AmXhEn6SM94sMy1+bwAs9xg3cuefXBXakcYOMQuQlqI=
github.com/nakabonne/tstorage v0.3.5/go.mod h1:dgOHx150reQ3xHCqyoU19TImAU0PY78bfwUIG24xNzY=
github.com/nareix/joy5 v0.0.0-20200712071056-a55089207c88 h1:CXq5QLPMcfGEZMx8uBMyLdDiUNV72vlkSiyqg+jf7AI= github.com/nareix/joy5 v0.0.0-20200712071056-a55089207c88 h1:CXq5QLPMcfGEZMx8uBMyLdDiUNV72vlkSiyqg+jf7AI=
github.com/nareix/joy5 v0.0.0-20200712071056-a55089207c88/go.mod h1:XmAOs6UJXpNXRwKk+KY/nv5kL6xXYXyellk+A1pTlko= github.com/nareix/joy5 v0.0.0-20200712071056-a55089207c88/go.mod h1:XmAOs6UJXpNXRwKk+KY/nv5kL6xXYXyellk+A1pTlko=
github.com/oschwald/geoip2-golang v1.6.1 h1:GKxT3yaWWNXSb7vj6D7eoJBns+lGYgx08QO0UcNm0YY= github.com/oschwald/geoip2-golang v1.6.1 h1:GKxT3yaWWNXSb7vj6D7eoJBns+lGYgx08QO0UcNm0YY=

View File

@ -13,15 +13,11 @@ import (
// How often we poll for updates. // How often we poll for updates.
const metricsPollingInterval = 1 * time.Minute const metricsPollingInterval = 1 * time.Minute
var _getStatus func() models.Status
// CollectedMetrics stores different collected + timestamped values. // CollectedMetrics stores different collected + timestamped values.
type CollectedMetrics struct { type CollectedMetrics struct {
CPUUtilizations []timestampedValue `json:"cpu"` CPUUtilizations []timestampedValue `json:"cpu"`
RAMUtilizations []timestampedValue `json:"memory"` RAMUtilizations []timestampedValue `json:"memory"`
DiskUtilizations []timestampedValue `json:"disk"` DiskUtilizations []timestampedValue `json:"disk"`
Viewers []timestampedValue `json:"-"`
} }
// Metrics is the shared Metrics instance. // Metrics is the shared Metrics instance.
@ -29,8 +25,6 @@ var Metrics *CollectedMetrics
// Start will begin the metrics collection and alerting. // Start will begin the metrics collection and alerting.
func Start(getStatus func() models.Status) { func Start(getStatus func() models.Status) {
_getStatus = getStatus
host := data.GetServerURL() host := data.GetServerURL()
if host == "" { if host == "" {
host = "unknown" host = "unknown"
@ -72,7 +66,6 @@ func Start(getStatus func() models.Status) {
Metrics = new(CollectedMetrics) Metrics = new(CollectedMetrics)
go startViewerCollectionMetrics() go startViewerCollectionMetrics()
handlePolling()
for range time.Tick(metricsPollingInterval) { for range time.Tick(metricsPollingInterval) {
handlePolling() handlePolling()

View File

@ -1,8 +1,21 @@
package metrics package metrics
import "time" import (
"time"
"github.com/nakabonne/tstorage"
)
type timestampedValue struct { type timestampedValue struct {
Time time.Time `json:"time"` Time time.Time `json:"time"`
Value int `json:"value"` Value int `json:"value"`
} }
func makeTimestampedValuesFromDatapoints(dp []*tstorage.DataPoint) []timestampedValue {
tv := []timestampedValue{}
for _, d := range dp {
tv = append(tv, timestampedValue{Time: time.Unix(d.Timestamp, 0), Value: int(d.Value)})
}
return tv
}

View File

@ -3,15 +3,27 @@ package metrics
import ( import (
"time" "time"
"github.com/nakabonne/tstorage"
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
) )
// How often we poll for updates. // How often we poll for updates.
const viewerMetricsPollingInterval = 2 * time.Minute const viewerMetricsPollingInterval = 2 * time.Minute
var storage tstorage.Storage
func startViewerCollectionMetrics() { func startViewerCollectionMetrics() {
storage, _ = tstorage.NewStorage(
tstorage.WithTimestampPrecision(tstorage.Seconds),
tstorage.WithDataPath("./data/metrics"),
)
defer storage.Close()
collectViewerCount() collectViewerCount()
handlePolling()
for range time.Tick(viewerMetricsPollingInterval) { for range time.Tick(viewerMetricsPollingInterval) {
collectViewerCount() collectViewerCount()
@ -19,16 +31,15 @@ func startViewerCollectionMetrics() {
} }
func collectViewerCount() { func collectViewerCount() {
if len(Metrics.Viewers) > maxCollectionValues { // Don't collect metrics for viewers if there's no stream active.
Metrics.Viewers = Metrics.Viewers[1:] if !core.GetStatus().Online {
return
} }
}
count := _getStatus().ViewerCount func collectChatClientCount() {
value := timestampedValue{ count := len(chat.GetClients())
Value: count, activeChatClientCount.Set(float64(count))
Time: time.Now(),
}
Metrics.Viewers = append(Metrics.Viewers, value)
// Save to our Prometheus collector. // Save to our Prometheus collector.
activeViewerCount.Set(float64(count)) activeViewerCount.Set(float64(count))
@ -40,9 +51,24 @@ func collectViewerCount() {
// Total user count // Total user count
uc := data.GetUsersCount() uc := data.GetUsersCount()
chatUserCount.Set(float64(uc)) chatUserCount.Set(float64(uc))
if err := storage.InsertRows([]tstorage.Row{
{
Metric: "viewercount",
DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)},
},
}); err != nil {
log.Errorln(err)
}
} }
func collectChatClientCount() { // GetViewersOverTime will return a window of viewer counts over time.
count := len(chat.GetClients()) func GetViewersOverTime(start, end time.Time) []timestampedValue {
activeChatClientCount.Set(float64(count)) p, err := storage.Select("viewercount", nil, start.Unix(), end.Unix())
if err != nil && err != tstorage.ErrNoDataPoints {
log.Errorln(err)
}
datapoints := makeTimestampedValuesFromDatapoints(p)
return datapoints
} }