0

feat(webhooks): add stream status to webhook (#2934)

Expand the payload sent for stream status webhooks. Closes #2881
This commit is contained in:
Gabe Kangas 2023-05-30 11:32:05 -07:00 committed by GitHub
parent b9508ba1c8
commit 209756fed3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 6 deletions

View File

@ -78,7 +78,7 @@ func Start() error {
rtmpPort := data.GetRTMPPortNumber() rtmpPort := data.GetRTMPPortNumber()
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort) log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
webhooks.InitWorkerPool() webhooks.SetupWebhooks(GetStatus)
notifications.Setup(data.GetStore()) notifications.Setup(data.GetStore())

View File

@ -21,6 +21,7 @@ func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time
"name": data.GetServerName(), "name": data.GetServerName(),
"summary": data.GetServerSummary(), "summary": data.GetServerSummary(),
"streamTitle": data.GetStreamTitle(), "streamTitle": data.GetStreamTitle(),
"status": getStatus(),
"timestamp": timestamp, "timestamp": timestamp,
}, },
}) })

View File

@ -21,6 +21,16 @@ func TestSendStreamStatusEvent(t *testing.T) {
"name": "my server", "name": "my server",
"streamTitle": "my stream", "streamTitle": "my stream",
"summary": "my server where I stream", "summary": "my server where I stream",
"timestamp": "1970-01-01T00:01:12.000000006Z" "timestamp": "1970-01-01T00:01:12.000000006Z",
"status": {
"lastConnectTime": null,
"lastDisconnectTime": null,
"online": true,
"overallMaxViewerCount": 420,
"sessionMaxViewerCount": 69,
"streamTitle": "my stream",
"versionNumber": "1.2.3",
"viewerCount": 5
}
}`) }`)
} }

View File

@ -17,6 +17,17 @@ import (
jsonpatch "gopkg.in/evanphx/json-patch.v5" jsonpatch "gopkg.in/evanphx/json-patch.v5"
) )
func fakeGetStatus() models.Status {
return models.Status{
Online: true,
ViewerCount: 5,
OverallMaxViewerCount: 420,
SessionMaxViewerCount: 69,
StreamTitle: "my stream",
VersionNumber: "1.2.3",
}
}
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db") dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db")
if err != nil { if err != nil {
@ -29,7 +40,8 @@ func TestMain(m *testing.M) {
panic(err) panic(err)
} }
InitWorkerPool() SetupWebhooks(fakeGetStatus)
defer close(queue) defer close(queue)
m.Run() m.Run()

View File

@ -24,10 +24,19 @@ type Job struct {
wg *sync.WaitGroup wg *sync.WaitGroup
} }
var queue chan Job var (
queue chan Job
getStatus func() models.Status
)
// InitWorkerPool starts n go routines that await webhook jobs. // SetupWebhooks initializes the webhook worker pool and sets the function to get the current status.
func InitWorkerPool() { func SetupWebhooks(getStatusFunc func() models.Status) {
getStatus = getStatusFunc
initWorkerPool()
}
// initWorkerPool starts n go routines that await webhook jobs.
func initWorkerPool() {
queue = make(chan Job) queue = make(chan Job)
// start workers // start workers