diff --git a/core/chat/messages.go b/core/chat/messages.go index c5a4b2655..5e05b5796 100644 --- a/core/chat/messages.go +++ b/core/chat/messages.go @@ -29,13 +29,7 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error { return errors.New("error broadcasting message visibility payload " + err.Error()) } - // Send webhook - wh := webhooks.WebhookEvent{ - EventData: payload, - Type: event.GetMessageType(), - } - - webhooks.SendEventToWebhooks(wh) + webhooks.SendChatEventSetMessageVisibility(event) return nil } diff --git a/core/webhooks/chat.go b/core/webhooks/chat.go index 3647956b6..1fb5287f3 100644 --- a/core/webhooks/chat.go +++ b/core/webhooks/chat.go @@ -42,3 +42,14 @@ func SendChatEventUserJoined(event events.UserJoinedEvent) { SendEventToWebhooks(webhookEvent) } + +// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more +// messages has changed. +func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) { + webhookEvent := WebhookEvent{ + Type: models.VisibiltyToggled, + EventData: event, + } + + SendEventToWebhooks(webhookEvent) +} diff --git a/core/webhooks/chat_test.go b/core/webhooks/chat_test.go new file mode 100644 index 000000000..eb8689a70 --- /dev/null +++ b/core/webhooks/chat_test.go @@ -0,0 +1,185 @@ +package webhooks + +import ( + "testing" + "time" + + "github.com/owncast/owncast/core/chat/events" + "github.com/owncast/owncast/core/user" + "github.com/owncast/owncast/models" +) + +func TestSendChatEvent(t *testing.T) { + timestamp := time.Unix(72, 6).UTC() + user := user.User{ + ID: "user id", + DisplayName: "display name", + DisplayColor: 4, + CreatedAt: time.Unix(3, 26).UTC(), + DisabledAt: nil, + PreviousNames: []string{"somebody"}, + NameChangedAt: nil, + Scopes: []string{}, + IsBot: false, + AuthenticatedAt: nil, + Authenticated: false, + } + + checkPayload(t, models.MessageSent, func() { + SendChatEvent(&events.UserMessageEvent{ + Event: events.Event{ + Type: events.MessageSent, + ID: "id", + Timestamp: timestamp, + }, + UserEvent: events.UserEvent{ + User: &user, + ClientID: 51, + HiddenAt: nil, + }, + MessageEvent: events.MessageEvent{ + OutboundEvent: nil, + Body: "body", + RawBody: "raw body", + }, + }) + }, `{ + "body": "body", + "clientId": 51, + "id": "id", + "rawBody": "raw body", + "timestamp": "1970-01-01T00:01:12.000000006Z", + "user": { + "authenticated": false, + "createdAt": "1970-01-01T00:00:03.000000026Z", + "displayColor": 4, + "displayName": "display name", + "id": "user id", + "isBot": false, + "previousNames": ["somebody"] + }, + "visible": true + }`) +} + +func TestSendChatEventUsernameChanged(t *testing.T) { + timestamp := time.Unix(72, 6).UTC() + user := user.User{ + ID: "user id", + DisplayName: "display name", + DisplayColor: 4, + CreatedAt: time.Unix(3, 26).UTC(), + DisabledAt: nil, + PreviousNames: []string{"somebody"}, + NameChangedAt: nil, + Scopes: []string{}, + IsBot: false, + AuthenticatedAt: nil, + Authenticated: false, + } + + checkPayload(t, models.UserNameChanged, func() { + SendChatEventUsernameChanged(events.NameChangeEvent{ + Event: events.Event{ + Type: events.UserNameChanged, + ID: "id", + Timestamp: timestamp, + }, + UserEvent: events.UserEvent{ + User: &user, + ClientID: 51, + HiddenAt: nil, + }, + NewName: "new name", + }) + }, `{ + "clientId": 51, + "id": "id", + "newName": "new name", + "timestamp": "1970-01-01T00:01:12.000000006Z", + "type": "NAME_CHANGE", + "user": { + "authenticated": false, + "createdAt": "1970-01-01T00:00:03.000000026Z", + "displayColor": 4, + "displayName": "display name", + "id": "user id", + "isBot": false, + "previousNames": ["somebody"] + } + }`) +} + +func TestSendChatEventUserJoined(t *testing.T) { + timestamp := time.Unix(72, 6).UTC() + user := user.User{ + ID: "user id", + DisplayName: "display name", + DisplayColor: 4, + CreatedAt: time.Unix(3, 26).UTC(), + DisabledAt: nil, + PreviousNames: []string{"somebody"}, + NameChangedAt: nil, + Scopes: []string{}, + IsBot: false, + AuthenticatedAt: nil, + Authenticated: false, + } + + checkPayload(t, models.UserJoined, func() { + SendChatEventUserJoined(events.UserJoinedEvent{ + Event: events.Event{ + Type: events.UserJoined, + ID: "id", + Timestamp: timestamp, + }, + UserEvent: events.UserEvent{ + User: &user, + ClientID: 51, + HiddenAt: nil, + }, + }) + }, `{ + "clientId": 51, + "id": "id", + "type": "USER_JOINED", + "timestamp": "1970-01-01T00:01:12.000000006Z", + "user": { + "authenticated": false, + "createdAt": "1970-01-01T00:00:03.000000026Z", + "displayColor": 4, + "displayName": "display name", + "id": "user id", + "isBot": false, + "previousNames": ["somebody"] + } + }`) +} + +func TestSendChatEventSetMessageVisibility(t *testing.T) { + timestamp := time.Unix(72, 6).UTC() + + checkPayload(t, models.VisibiltyToggled, func() { + SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{ + Event: events.Event{ + Type: events.VisibiltyUpdate, + ID: "id", + Timestamp: timestamp, + }, + UserMessageEvent: events.UserMessageEvent{}, + MessageIDs: []string{"message1", "message2"}, + Visible: false, + }) + }, `{ + "MessageIDs": [ + "message1", + "message2" + ], + "Visible": false, + "body": "", + "id": "id", + "timestamp": "1970-01-01T00:01:12.000000006Z", + "type": "VISIBILITY-UPDATE", + "user": null + }`) +} diff --git a/core/webhooks/stream.go b/core/webhooks/stream.go index c2b282445..a0184ec59 100644 --- a/core/webhooks/stream.go +++ b/core/webhooks/stream.go @@ -10,14 +10,18 @@ import ( // SendStreamStatusEvent will send all webhook destinations the current stream status. func SendStreamStatusEvent(eventType models.EventType) { + sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now()) +} + +func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) { SendEventToWebhooks(WebhookEvent{ Type: eventType, EventData: map[string]interface{}{ - "id": shortid.MustGenerate(), + "id": id, "name": data.GetServerName(), "summary": data.GetServerSummary(), "streamTitle": data.GetStreamTitle(), - "timestamp": time.Now(), + "timestamp": timestamp, }, }) } diff --git a/core/webhooks/stream_test.go b/core/webhooks/stream_test.go new file mode 100644 index 000000000..a9802623d --- /dev/null +++ b/core/webhooks/stream_test.go @@ -0,0 +1,26 @@ +package webhooks + +import ( + "testing" + "time" + + "github.com/owncast/owncast/core/chat/events" + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/models" +) + +func TestSendStreamStatusEvent(t *testing.T) { + data.SetServerName("my server") + data.SetServerSummary("my server where I stream") + data.SetStreamTitle("my stream") + + checkPayload(t, models.StreamStarted, func() { + sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC()) + }, `{ + "id": "id", + "name": "my server", + "streamTitle": "my stream", + "summary": "my server where I stream", + "timestamp": "1970-01-01T00:01:12.000000006Z" + }`) +} diff --git a/core/webhooks/webhooks.go b/core/webhooks/webhooks.go index 3fa6ee67b..9e7be6e9f 100644 --- a/core/webhooks/webhooks.go +++ b/core/webhooks/webhooks.go @@ -1,6 +1,7 @@ package webhooks import ( + "sync" "time" "github.com/owncast/owncast/core/data" @@ -27,9 +28,17 @@ type WebhookChatMessage struct { // SendEventToWebhooks will send a single webhook event to all webhook destinations. func SendEventToWebhooks(payload WebhookEvent) { + sendEventToWebhooks(payload, nil) +} + +func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) { webhooks := data.GetWebhooksForEvent(payload.Type) for _, webhook := range webhooks { - go addToQueue(webhook, payload) + // Use wg to track the number of notifications to be sent. + if wg != nil { + wg.Add(1) + } + addToQueue(webhook, payload, wg) } } diff --git a/core/webhooks/webhooks_test.go b/core/webhooks/webhooks_test.go new file mode 100644 index 000000000..c46254066 --- /dev/null +++ b/core/webhooks/webhooks_test.go @@ -0,0 +1,347 @@ +package webhooks + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/models" + jsonpatch "gopkg.in/evanphx/json-patch.v5" +) + +func TestMain(m *testing.M) { + dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db") + if err != nil { + panic(err) + } + dbFile.Close() + defer os.Remove(dbFile.Name()) + + if err := data.SetupPersistence(dbFile.Name()); err != nil { + panic(err) + } + + InitWorkerPool() + defer close(queue) + + m.Run() +} + +// Because the other tests use `sendEventToWebhooks` with a `WaitGroup` to know when the test completes, +// this test ensures that `SendToWebhooks` without a `WaitGroup` doesn't panic. +func TestPublicSend(t *testing.T) { + // Send enough events to be sure at least one worker delivers a second event. + const eventsCount = webhookWorkerPoolSize + 1 + + var wg sync.WaitGroup + wg.Add(eventsCount) + + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wg.Done() + })) + defer svr.Close() + + hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent}) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := data.DeleteWebhook(hook); err != nil { + t.Error(err) + } + }() + + for i := 0; i < eventsCount; i++ { + wh := WebhookEvent{ + EventData: struct{}{}, + Type: models.MessageSent, + } + SendEventToWebhooks(wh) + } + + wg.Wait() +} + +// Make sure that events are only sent to interested endpoints. +func TestRouting(t *testing.T) { + eventTypes := []models.EventType{models.PING, models.PONG} + + calls := map[models.EventType]int{} + var lock sync.Mutex + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if len(r.URL.Path) < 1 || r.URL.Path[0] != '/' { + t.Fatalf("Got unexpected path %v", r.URL.Path) + } + pathType := r.URL.Path[1:] + var body WebhookEvent + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatal(err) + } + if body.Type != pathType { + t.Fatalf("Got %v payload on %v endpoint", body.Type, pathType) + } + lock.Lock() + defer lock.Unlock() + calls[pathType] += 1 + })) + defer svr.Close() + + for _, eventType := range eventTypes { + hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType}) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := data.DeleteWebhook(hook); err != nil { + t.Error(err) + } + }() + } + + var wg sync.WaitGroup + + for _, eventType := range eventTypes { + wh := WebhookEvent{ + EventData: struct{}{}, + Type: eventType, + } + sendEventToWebhooks(wh, &wg) + } + + wg.Wait() + + for _, eventType := range eventTypes { + if calls[eventType] != 1 { + t.Errorf("Expected %v to be called exactly once but it was called %v times", eventType, calls[eventType]) + } + } +} + +// Make sure that events are sent to all interested endpoints. +func TestMultiple(t *testing.T) { + const times = 2 + + var calls uint32 + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddUint32(&calls, 1) + })) + defer svr.Close() + + for i := 0; i < times; i++ { + hook, err := data.InsertWebhook(fmt.Sprintf("%v/%v", svr.URL, i), []models.EventType{models.MessageSent}) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := data.DeleteWebhook(hook); err != nil { + t.Error(err) + } + }() + } + + var wg sync.WaitGroup + + wh := WebhookEvent{ + EventData: struct{}{}, + Type: models.MessageSent, + } + sendEventToWebhooks(wh, &wg) + + wg.Wait() + + if atomic.LoadUint32(&calls) != times { + t.Errorf("Expected event to be sent exactly %v times but it was sent %v times", times, atomic.LoadUint32(&calls)) + } +} + +// Make sure when a webhook is used its last used timestamp is updated. +func TestTimestamps(t *testing.T) { + const tolerance = time.Second + start := time.Now() + eventTypes := []models.EventType{models.PING, models.PONG} + handlerIds := []int{0, 0} + handlers := []*models.Webhook{nil, nil} + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + defer svr.Close() + + for i, eventType := range eventTypes { + hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType}) + if err != nil { + t.Fatal(err) + } + handlerIds[i] = hook + defer func() { + if err := data.DeleteWebhook(hook); err != nil { + t.Error(err) + } + }() + } + + var wg sync.WaitGroup + + wh := WebhookEvent{ + EventData: struct{}{}, + Type: eventTypes[0], + } + sendEventToWebhooks(wh, &wg) + + wg.Wait() + + hooks, err := data.GetWebhooks() + if err != nil { + t.Fatal(err) + } + + for h, hook := range hooks { + for i, handlerId := range handlerIds { + if hook.ID == handlerId { + handlers[i] = &hooks[h] + } + } + } + + if handlers[0] == nil { + t.Fatal("First handler was not found in registered handlers") + } + if handlers[1] == nil { + t.Fatal("Second handler was not found in registered handlers") + } + + end := time.Now() + + if handlers[0].Timestamp.Add(tolerance).Before(start) { + t.Errorf("First handler timestamp %v should not be before start of test %v", handlers[0].Timestamp, start) + } + + if handlers[0].Timestamp.Add(tolerance).Before(handlers[1].Timestamp) { + t.Errorf("Second handler timestamp %v should not be before first handler timestamp %v", handlers[1].Timestamp, handlers[0].Timestamp) + } + + if end.Add(tolerance).Before(handlers[1].Timestamp) { + t.Errorf("End of test %v should not be before second handler timestamp %v", end, handlers[1].Timestamp) + } + + if handlers[0].LastUsed == nil { + t.Error("First handler last used should have been set") + } else if handlers[0].LastUsed.Add(tolerance).Before(handlers[1].Timestamp) { + t.Errorf("First handler last used %v should not be before second handler timestamp %v", handlers[0].LastUsed, handlers[1].Timestamp) + } else if end.Add(tolerance).Before(*handlers[0].LastUsed) { + t.Errorf("End of test %v should not be before first handler last used %v", end, handlers[0].LastUsed) + } + + if handlers[1].LastUsed != nil { + t.Error("Second handler last used should not have been set") + } +} + +// Make sure up to the expected number of events can be fired in parallel. +func TestParallel(t *testing.T) { + var calls uint32 + + var wgStart sync.WaitGroup + finished := make(chan int) + wgStart.Add(webhookWorkerPoolSize) + + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + myId := atomic.AddUint32(&calls, 1) + + // We made it to the pool size + 1 event, so we're done with the test. + if myId == webhookWorkerPoolSize+1 { + close(finished) + return + } + + // Wait until all the handlers are started. + wgStart.Done() + wgStart.Wait() + + // The first handler just returns so the pool size + 1 event can be handled. + if myId != 1 { + // The other handlers will wait for pool size + 1. + _ = <-finished + } + })) + defer svr.Close() + + hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent}) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := data.DeleteWebhook(hook); err != nil { + t.Error(err) + } + }() + + var wgMessages sync.WaitGroup + + for i := 0; i < webhookWorkerPoolSize+1; i++ { + wh := WebhookEvent{ + EventData: struct{}{}, + Type: models.MessageSent, + } + sendEventToWebhooks(wh, &wgMessages) + } + + wgMessages.Wait() +} + +// Send an event, capture it, and verify that it has the expected payload. +func checkPayload(t *testing.T, eventType models.EventType, send func(), expectedJson string) { + eventChannel := make(chan WebhookEvent) + + // Set up a server. + svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + data := WebhookEvent{} + json.NewDecoder(r.Body).Decode(&data) + eventChannel <- data + })) + defer svr.Close() + + // Subscribe to the webhook. + hook, err := data.InsertWebhook(svr.URL, []models.EventType{eventType}) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := data.DeleteWebhook(hook); err != nil { + t.Error(err) + } + }() + + // Send and capture the event. + send() + event := <-eventChannel + + if event.Type != eventType { + t.Errorf("Got event type %v but expected %v", event.Type, eventType) + } + + // Compare. + payloadJson, err := json.MarshalIndent(event.EventData, "", " ") + if err != nil { + t.Fatal(err) + } + t.Logf("Actual payload:\n%s", payloadJson) + + if !jsonpatch.Equal(payloadJson, []byte(expectedJson)) { + diff, err := jsonpatch.CreateMergePatch(payloadJson, []byte(expectedJson)) + if err != nil { + t.Fatal(err) + } + var out bytes.Buffer + if err := json.Indent(&out, diff, "", " "); err != nil { + t.Fatal(err) + } + t.Errorf("Expected difference from actual payload:\n%s", out.Bytes()) + } +} diff --git a/core/webhooks/workerpool.go b/core/webhooks/workerpool.go index cfca428c4..838688340 100644 --- a/core/webhooks/workerpool.go +++ b/core/webhooks/workerpool.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "net/http" + "sync" log "github.com/sirupsen/logrus" @@ -20,6 +21,7 @@ const ( type Job struct { webhook models.Webhook payload WebhookEvent + wg *sync.WaitGroup } var queue chan Job @@ -34,9 +36,9 @@ func InitWorkerPool() { } } -func addToQueue(webhook models.Webhook, payload WebhookEvent) { +func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) { log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL) - queue <- Job{webhook, payload} + queue <- Job{webhook, payload, wg} } func worker(workerID int, queue <-chan Job) { @@ -49,6 +51,9 @@ func worker(workerID int, queue <-chan Job) { log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err) } log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) + if job.wg != nil { + job.wg.Done() + } } } diff --git a/go.mod b/go.mod index 546fb101e..5519cd071 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/evanphx/json-patch.v5 v5.6.0 // indirect ) require ( diff --git a/go.sum b/go.sum index 08d50a81d..eefb8e473 100644 --- a/go.sum +++ b/go.sum @@ -689,6 +689,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/evanphx/json-patch.v5 v5.6.0 h1:BMT6KIwBD9CaU91PJCZIe46bDmBWa9ynTQgJIOpfQBk= +gopkg.in/evanphx/json-patch.v5 v5.6.0/go.mod h1:/kvTRh1TVm5wuM6OkHxqXtE/1nUZZpihg29RtuIyfvk= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=