add tests for webhook events (#2180)
* add tests for webhook events * atomic.Uint32 is not in Go 1.18
This commit is contained in:
parent
155d671df0
commit
10055664bb
@ -29,13 +29,7 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error {
|
|||||||
return errors.New("error broadcasting message visibility payload " + err.Error())
|
return errors.New("error broadcasting message visibility payload " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send webhook
|
webhooks.SendChatEventSetMessageVisibility(event)
|
||||||
wh := webhooks.WebhookEvent{
|
|
||||||
EventData: payload,
|
|
||||||
Type: event.GetMessageType(),
|
|
||||||
}
|
|
||||||
|
|
||||||
webhooks.SendEventToWebhooks(wh)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -42,3 +42,14 @@ func SendChatEventUserJoined(event events.UserJoinedEvent) {
|
|||||||
|
|
||||||
SendEventToWebhooks(webhookEvent)
|
SendEventToWebhooks(webhookEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more
|
||||||
|
// messages has changed.
|
||||||
|
func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) {
|
||||||
|
webhookEvent := WebhookEvent{
|
||||||
|
Type: models.VisibiltyToggled,
|
||||||
|
EventData: event,
|
||||||
|
}
|
||||||
|
|
||||||
|
SendEventToWebhooks(webhookEvent)
|
||||||
|
}
|
||||||
|
185
core/webhooks/chat_test.go
Normal file
185
core/webhooks/chat_test.go
Normal file
@ -0,0 +1,185 @@
|
|||||||
|
package webhooks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/owncast/owncast/core/chat/events"
|
||||||
|
"github.com/owncast/owncast/core/user"
|
||||||
|
"github.com/owncast/owncast/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSendChatEvent(t *testing.T) {
|
||||||
|
timestamp := time.Unix(72, 6).UTC()
|
||||||
|
user := user.User{
|
||||||
|
ID: "user id",
|
||||||
|
DisplayName: "display name",
|
||||||
|
DisplayColor: 4,
|
||||||
|
CreatedAt: time.Unix(3, 26).UTC(),
|
||||||
|
DisabledAt: nil,
|
||||||
|
PreviousNames: []string{"somebody"},
|
||||||
|
NameChangedAt: nil,
|
||||||
|
Scopes: []string{},
|
||||||
|
IsBot: false,
|
||||||
|
AuthenticatedAt: nil,
|
||||||
|
Authenticated: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
checkPayload(t, models.MessageSent, func() {
|
||||||
|
SendChatEvent(&events.UserMessageEvent{
|
||||||
|
Event: events.Event{
|
||||||
|
Type: events.MessageSent,
|
||||||
|
ID: "id",
|
||||||
|
Timestamp: timestamp,
|
||||||
|
},
|
||||||
|
UserEvent: events.UserEvent{
|
||||||
|
User: &user,
|
||||||
|
ClientID: 51,
|
||||||
|
HiddenAt: nil,
|
||||||
|
},
|
||||||
|
MessageEvent: events.MessageEvent{
|
||||||
|
OutboundEvent: nil,
|
||||||
|
Body: "body",
|
||||||
|
RawBody: "raw body",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}, `{
|
||||||
|
"body": "body",
|
||||||
|
"clientId": 51,
|
||||||
|
"id": "id",
|
||||||
|
"rawBody": "raw body",
|
||||||
|
"timestamp": "1970-01-01T00:01:12.000000006Z",
|
||||||
|
"user": {
|
||||||
|
"authenticated": false,
|
||||||
|
"createdAt": "1970-01-01T00:00:03.000000026Z",
|
||||||
|
"displayColor": 4,
|
||||||
|
"displayName": "display name",
|
||||||
|
"id": "user id",
|
||||||
|
"isBot": false,
|
||||||
|
"previousNames": ["somebody"]
|
||||||
|
},
|
||||||
|
"visible": true
|
||||||
|
}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendChatEventUsernameChanged(t *testing.T) {
|
||||||
|
timestamp := time.Unix(72, 6).UTC()
|
||||||
|
user := user.User{
|
||||||
|
ID: "user id",
|
||||||
|
DisplayName: "display name",
|
||||||
|
DisplayColor: 4,
|
||||||
|
CreatedAt: time.Unix(3, 26).UTC(),
|
||||||
|
DisabledAt: nil,
|
||||||
|
PreviousNames: []string{"somebody"},
|
||||||
|
NameChangedAt: nil,
|
||||||
|
Scopes: []string{},
|
||||||
|
IsBot: false,
|
||||||
|
AuthenticatedAt: nil,
|
||||||
|
Authenticated: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
checkPayload(t, models.UserNameChanged, func() {
|
||||||
|
SendChatEventUsernameChanged(events.NameChangeEvent{
|
||||||
|
Event: events.Event{
|
||||||
|
Type: events.UserNameChanged,
|
||||||
|
ID: "id",
|
||||||
|
Timestamp: timestamp,
|
||||||
|
},
|
||||||
|
UserEvent: events.UserEvent{
|
||||||
|
User: &user,
|
||||||
|
ClientID: 51,
|
||||||
|
HiddenAt: nil,
|
||||||
|
},
|
||||||
|
NewName: "new name",
|
||||||
|
})
|
||||||
|
}, `{
|
||||||
|
"clientId": 51,
|
||||||
|
"id": "id",
|
||||||
|
"newName": "new name",
|
||||||
|
"timestamp": "1970-01-01T00:01:12.000000006Z",
|
||||||
|
"type": "NAME_CHANGE",
|
||||||
|
"user": {
|
||||||
|
"authenticated": false,
|
||||||
|
"createdAt": "1970-01-01T00:00:03.000000026Z",
|
||||||
|
"displayColor": 4,
|
||||||
|
"displayName": "display name",
|
||||||
|
"id": "user id",
|
||||||
|
"isBot": false,
|
||||||
|
"previousNames": ["somebody"]
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendChatEventUserJoined(t *testing.T) {
|
||||||
|
timestamp := time.Unix(72, 6).UTC()
|
||||||
|
user := user.User{
|
||||||
|
ID: "user id",
|
||||||
|
DisplayName: "display name",
|
||||||
|
DisplayColor: 4,
|
||||||
|
CreatedAt: time.Unix(3, 26).UTC(),
|
||||||
|
DisabledAt: nil,
|
||||||
|
PreviousNames: []string{"somebody"},
|
||||||
|
NameChangedAt: nil,
|
||||||
|
Scopes: []string{},
|
||||||
|
IsBot: false,
|
||||||
|
AuthenticatedAt: nil,
|
||||||
|
Authenticated: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
checkPayload(t, models.UserJoined, func() {
|
||||||
|
SendChatEventUserJoined(events.UserJoinedEvent{
|
||||||
|
Event: events.Event{
|
||||||
|
Type: events.UserJoined,
|
||||||
|
ID: "id",
|
||||||
|
Timestamp: timestamp,
|
||||||
|
},
|
||||||
|
UserEvent: events.UserEvent{
|
||||||
|
User: &user,
|
||||||
|
ClientID: 51,
|
||||||
|
HiddenAt: nil,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}, `{
|
||||||
|
"clientId": 51,
|
||||||
|
"id": "id",
|
||||||
|
"type": "USER_JOINED",
|
||||||
|
"timestamp": "1970-01-01T00:01:12.000000006Z",
|
||||||
|
"user": {
|
||||||
|
"authenticated": false,
|
||||||
|
"createdAt": "1970-01-01T00:00:03.000000026Z",
|
||||||
|
"displayColor": 4,
|
||||||
|
"displayName": "display name",
|
||||||
|
"id": "user id",
|
||||||
|
"isBot": false,
|
||||||
|
"previousNames": ["somebody"]
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendChatEventSetMessageVisibility(t *testing.T) {
|
||||||
|
timestamp := time.Unix(72, 6).UTC()
|
||||||
|
|
||||||
|
checkPayload(t, models.VisibiltyToggled, func() {
|
||||||
|
SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{
|
||||||
|
Event: events.Event{
|
||||||
|
Type: events.VisibiltyUpdate,
|
||||||
|
ID: "id",
|
||||||
|
Timestamp: timestamp,
|
||||||
|
},
|
||||||
|
UserMessageEvent: events.UserMessageEvent{},
|
||||||
|
MessageIDs: []string{"message1", "message2"},
|
||||||
|
Visible: false,
|
||||||
|
})
|
||||||
|
}, `{
|
||||||
|
"MessageIDs": [
|
||||||
|
"message1",
|
||||||
|
"message2"
|
||||||
|
],
|
||||||
|
"Visible": false,
|
||||||
|
"body": "",
|
||||||
|
"id": "id",
|
||||||
|
"timestamp": "1970-01-01T00:01:12.000000006Z",
|
||||||
|
"type": "VISIBILITY-UPDATE",
|
||||||
|
"user": null
|
||||||
|
}`)
|
||||||
|
}
|
@ -10,14 +10,18 @@ import (
|
|||||||
|
|
||||||
// SendStreamStatusEvent will send all webhook destinations the current stream status.
|
// SendStreamStatusEvent will send all webhook destinations the current stream status.
|
||||||
func SendStreamStatusEvent(eventType models.EventType) {
|
func SendStreamStatusEvent(eventType models.EventType) {
|
||||||
|
sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now())
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) {
|
||||||
SendEventToWebhooks(WebhookEvent{
|
SendEventToWebhooks(WebhookEvent{
|
||||||
Type: eventType,
|
Type: eventType,
|
||||||
EventData: map[string]interface{}{
|
EventData: map[string]interface{}{
|
||||||
"id": shortid.MustGenerate(),
|
"id": id,
|
||||||
"name": data.GetServerName(),
|
"name": data.GetServerName(),
|
||||||
"summary": data.GetServerSummary(),
|
"summary": data.GetServerSummary(),
|
||||||
"streamTitle": data.GetStreamTitle(),
|
"streamTitle": data.GetStreamTitle(),
|
||||||
"timestamp": time.Now(),
|
"timestamp": timestamp,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
26
core/webhooks/stream_test.go
Normal file
26
core/webhooks/stream_test.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package webhooks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/owncast/owncast/core/chat/events"
|
||||||
|
"github.com/owncast/owncast/core/data"
|
||||||
|
"github.com/owncast/owncast/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSendStreamStatusEvent(t *testing.T) {
|
||||||
|
data.SetServerName("my server")
|
||||||
|
data.SetServerSummary("my server where I stream")
|
||||||
|
data.SetStreamTitle("my stream")
|
||||||
|
|
||||||
|
checkPayload(t, models.StreamStarted, func() {
|
||||||
|
sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC())
|
||||||
|
}, `{
|
||||||
|
"id": "id",
|
||||||
|
"name": "my server",
|
||||||
|
"streamTitle": "my stream",
|
||||||
|
"summary": "my server where I stream",
|
||||||
|
"timestamp": "1970-01-01T00:01:12.000000006Z"
|
||||||
|
}`)
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
package webhooks
|
package webhooks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/owncast/owncast/core/data"
|
"github.com/owncast/owncast/core/data"
|
||||||
@ -27,9 +28,17 @@ type WebhookChatMessage struct {
|
|||||||
|
|
||||||
// SendEventToWebhooks will send a single webhook event to all webhook destinations.
|
// SendEventToWebhooks will send a single webhook event to all webhook destinations.
|
||||||
func SendEventToWebhooks(payload WebhookEvent) {
|
func SendEventToWebhooks(payload WebhookEvent) {
|
||||||
|
sendEventToWebhooks(payload, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) {
|
||||||
webhooks := data.GetWebhooksForEvent(payload.Type)
|
webhooks := data.GetWebhooksForEvent(payload.Type)
|
||||||
|
|
||||||
for _, webhook := range webhooks {
|
for _, webhook := range webhooks {
|
||||||
go addToQueue(webhook, payload)
|
// Use wg to track the number of notifications to be sent.
|
||||||
|
if wg != nil {
|
||||||
|
wg.Add(1)
|
||||||
|
}
|
||||||
|
addToQueue(webhook, payload, wg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
347
core/webhooks/webhooks_test.go
Normal file
347
core/webhooks/webhooks_test.go
Normal file
@ -0,0 +1,347 @@
|
|||||||
|
package webhooks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/owncast/owncast/core/data"
|
||||||
|
"github.com/owncast/owncast/models"
|
||||||
|
jsonpatch "gopkg.in/evanphx/json-patch.v5"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
dbFile.Close()
|
||||||
|
defer os.Remove(dbFile.Name())
|
||||||
|
|
||||||
|
if err := data.SetupPersistence(dbFile.Name()); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
InitWorkerPool()
|
||||||
|
defer close(queue)
|
||||||
|
|
||||||
|
m.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Because the other tests use `sendEventToWebhooks` with a `WaitGroup` to know when the test completes,
|
||||||
|
// this test ensures that `SendToWebhooks` without a `WaitGroup` doesn't panic.
|
||||||
|
func TestPublicSend(t *testing.T) {
|
||||||
|
// Send enough events to be sure at least one worker delivers a second event.
|
||||||
|
const eventsCount = webhookWorkerPoolSize + 1
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(eventsCount)
|
||||||
|
|
||||||
|
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
wg.Done()
|
||||||
|
}))
|
||||||
|
defer svr.Close()
|
||||||
|
|
||||||
|
hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := data.DeleteWebhook(hook); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for i := 0; i < eventsCount; i++ {
|
||||||
|
wh := WebhookEvent{
|
||||||
|
EventData: struct{}{},
|
||||||
|
Type: models.MessageSent,
|
||||||
|
}
|
||||||
|
SendEventToWebhooks(wh)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that events are only sent to interested endpoints.
|
||||||
|
func TestRouting(t *testing.T) {
|
||||||
|
eventTypes := []models.EventType{models.PING, models.PONG}
|
||||||
|
|
||||||
|
calls := map[models.EventType]int{}
|
||||||
|
var lock sync.Mutex
|
||||||
|
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if len(r.URL.Path) < 1 || r.URL.Path[0] != '/' {
|
||||||
|
t.Fatalf("Got unexpected path %v", r.URL.Path)
|
||||||
|
}
|
||||||
|
pathType := r.URL.Path[1:]
|
||||||
|
var body WebhookEvent
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if body.Type != pathType {
|
||||||
|
t.Fatalf("Got %v payload on %v endpoint", body.Type, pathType)
|
||||||
|
}
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
calls[pathType] += 1
|
||||||
|
}))
|
||||||
|
defer svr.Close()
|
||||||
|
|
||||||
|
for _, eventType := range eventTypes {
|
||||||
|
hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := data.DeleteWebhook(hook); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for _, eventType := range eventTypes {
|
||||||
|
wh := WebhookEvent{
|
||||||
|
EventData: struct{}{},
|
||||||
|
Type: eventType,
|
||||||
|
}
|
||||||
|
sendEventToWebhooks(wh, &wg)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
for _, eventType := range eventTypes {
|
||||||
|
if calls[eventType] != 1 {
|
||||||
|
t.Errorf("Expected %v to be called exactly once but it was called %v times", eventType, calls[eventType])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that events are sent to all interested endpoints.
|
||||||
|
func TestMultiple(t *testing.T) {
|
||||||
|
const times = 2
|
||||||
|
|
||||||
|
var calls uint32
|
||||||
|
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
atomic.AddUint32(&calls, 1)
|
||||||
|
}))
|
||||||
|
defer svr.Close()
|
||||||
|
|
||||||
|
for i := 0; i < times; i++ {
|
||||||
|
hook, err := data.InsertWebhook(fmt.Sprintf("%v/%v", svr.URL, i), []models.EventType{models.MessageSent})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := data.DeleteWebhook(hook); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wh := WebhookEvent{
|
||||||
|
EventData: struct{}{},
|
||||||
|
Type: models.MessageSent,
|
||||||
|
}
|
||||||
|
sendEventToWebhooks(wh, &wg)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if atomic.LoadUint32(&calls) != times {
|
||||||
|
t.Errorf("Expected event to be sent exactly %v times but it was sent %v times", times, atomic.LoadUint32(&calls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure when a webhook is used its last used timestamp is updated.
|
||||||
|
func TestTimestamps(t *testing.T) {
|
||||||
|
const tolerance = time.Second
|
||||||
|
start := time.Now()
|
||||||
|
eventTypes := []models.EventType{models.PING, models.PONG}
|
||||||
|
handlerIds := []int{0, 0}
|
||||||
|
handlers := []*models.Webhook{nil, nil}
|
||||||
|
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
}))
|
||||||
|
defer svr.Close()
|
||||||
|
|
||||||
|
for i, eventType := range eventTypes {
|
||||||
|
hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
handlerIds[i] = hook
|
||||||
|
defer func() {
|
||||||
|
if err := data.DeleteWebhook(hook); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wh := WebhookEvent{
|
||||||
|
EventData: struct{}{},
|
||||||
|
Type: eventTypes[0],
|
||||||
|
}
|
||||||
|
sendEventToWebhooks(wh, &wg)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
hooks, err := data.GetWebhooks()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for h, hook := range hooks {
|
||||||
|
for i, handlerId := range handlerIds {
|
||||||
|
if hook.ID == handlerId {
|
||||||
|
handlers[i] = &hooks[h]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if handlers[0] == nil {
|
||||||
|
t.Fatal("First handler was not found in registered handlers")
|
||||||
|
}
|
||||||
|
if handlers[1] == nil {
|
||||||
|
t.Fatal("Second handler was not found in registered handlers")
|
||||||
|
}
|
||||||
|
|
||||||
|
end := time.Now()
|
||||||
|
|
||||||
|
if handlers[0].Timestamp.Add(tolerance).Before(start) {
|
||||||
|
t.Errorf("First handler timestamp %v should not be before start of test %v", handlers[0].Timestamp, start)
|
||||||
|
}
|
||||||
|
|
||||||
|
if handlers[0].Timestamp.Add(tolerance).Before(handlers[1].Timestamp) {
|
||||||
|
t.Errorf("Second handler timestamp %v should not be before first handler timestamp %v", handlers[1].Timestamp, handlers[0].Timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if end.Add(tolerance).Before(handlers[1].Timestamp) {
|
||||||
|
t.Errorf("End of test %v should not be before second handler timestamp %v", end, handlers[1].Timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
if handlers[0].LastUsed == nil {
|
||||||
|
t.Error("First handler last used should have been set")
|
||||||
|
} else if handlers[0].LastUsed.Add(tolerance).Before(handlers[1].Timestamp) {
|
||||||
|
t.Errorf("First handler last used %v should not be before second handler timestamp %v", handlers[0].LastUsed, handlers[1].Timestamp)
|
||||||
|
} else if end.Add(tolerance).Before(*handlers[0].LastUsed) {
|
||||||
|
t.Errorf("End of test %v should not be before first handler last used %v", end, handlers[0].LastUsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
if handlers[1].LastUsed != nil {
|
||||||
|
t.Error("Second handler last used should not have been set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure up to the expected number of events can be fired in parallel.
|
||||||
|
func TestParallel(t *testing.T) {
|
||||||
|
var calls uint32
|
||||||
|
|
||||||
|
var wgStart sync.WaitGroup
|
||||||
|
finished := make(chan int)
|
||||||
|
wgStart.Add(webhookWorkerPoolSize)
|
||||||
|
|
||||||
|
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
myId := atomic.AddUint32(&calls, 1)
|
||||||
|
|
||||||
|
// We made it to the pool size + 1 event, so we're done with the test.
|
||||||
|
if myId == webhookWorkerPoolSize+1 {
|
||||||
|
close(finished)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until all the handlers are started.
|
||||||
|
wgStart.Done()
|
||||||
|
wgStart.Wait()
|
||||||
|
|
||||||
|
// The first handler just returns so the pool size + 1 event can be handled.
|
||||||
|
if myId != 1 {
|
||||||
|
// The other handlers will wait for pool size + 1.
|
||||||
|
_ = <-finished
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer svr.Close()
|
||||||
|
|
||||||
|
hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := data.DeleteWebhook(hook); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wgMessages sync.WaitGroup
|
||||||
|
|
||||||
|
for i := 0; i < webhookWorkerPoolSize+1; i++ {
|
||||||
|
wh := WebhookEvent{
|
||||||
|
EventData: struct{}{},
|
||||||
|
Type: models.MessageSent,
|
||||||
|
}
|
||||||
|
sendEventToWebhooks(wh, &wgMessages)
|
||||||
|
}
|
||||||
|
|
||||||
|
wgMessages.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send an event, capture it, and verify that it has the expected payload.
|
||||||
|
func checkPayload(t *testing.T, eventType models.EventType, send func(), expectedJson string) {
|
||||||
|
eventChannel := make(chan WebhookEvent)
|
||||||
|
|
||||||
|
// Set up a server.
|
||||||
|
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
data := WebhookEvent{}
|
||||||
|
json.NewDecoder(r.Body).Decode(&data)
|
||||||
|
eventChannel <- data
|
||||||
|
}))
|
||||||
|
defer svr.Close()
|
||||||
|
|
||||||
|
// Subscribe to the webhook.
|
||||||
|
hook, err := data.InsertWebhook(svr.URL, []models.EventType{eventType})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := data.DeleteWebhook(hook); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send and capture the event.
|
||||||
|
send()
|
||||||
|
event := <-eventChannel
|
||||||
|
|
||||||
|
if event.Type != eventType {
|
||||||
|
t.Errorf("Got event type %v but expected %v", event.Type, eventType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compare.
|
||||||
|
payloadJson, err := json.MarshalIndent(event.EventData, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Logf("Actual payload:\n%s", payloadJson)
|
||||||
|
|
||||||
|
if !jsonpatch.Equal(payloadJson, []byte(expectedJson)) {
|
||||||
|
diff, err := jsonpatch.CreateMergePatch(payloadJson, []byte(expectedJson))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
var out bytes.Buffer
|
||||||
|
if err := json.Indent(&out, diff, "", " "); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Errorf("Expected difference from actual payload:\n%s", out.Bytes())
|
||||||
|
}
|
||||||
|
}
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
@ -20,6 +21,7 @@ const (
|
|||||||
type Job struct {
|
type Job struct {
|
||||||
webhook models.Webhook
|
webhook models.Webhook
|
||||||
payload WebhookEvent
|
payload WebhookEvent
|
||||||
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
var queue chan Job
|
var queue chan Job
|
||||||
@ -34,9 +36,9 @@ func InitWorkerPool() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func addToQueue(webhook models.Webhook, payload WebhookEvent) {
|
func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) {
|
||||||
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
|
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
|
||||||
queue <- Job{webhook, payload}
|
queue <- Job{webhook, payload, wg}
|
||||||
}
|
}
|
||||||
|
|
||||||
func worker(workerID int, queue <-chan Job) {
|
func worker(workerID int, queue <-chan Job) {
|
||||||
@ -49,6 +51,9 @@ func worker(workerID int, queue <-chan Job) {
|
|||||||
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
|
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
|
||||||
}
|
}
|
||||||
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
|
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
|
||||||
|
if job.wg != nil {
|
||||||
|
job.wg.Done()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
1
go.mod
1
go.mod
@ -53,6 +53,7 @@ require (
|
|||||||
github.com/prometheus/procfs v0.8.0 // indirect
|
github.com/prometheus/procfs v0.8.0 // indirect
|
||||||
golang.org/x/text v0.3.7 // indirect
|
golang.org/x/text v0.3.7 // indirect
|
||||||
google.golang.org/protobuf v1.28.1 // indirect
|
google.golang.org/protobuf v1.28.1 // indirect
|
||||||
|
gopkg.in/evanphx/json-patch.v5 v5.6.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
2
go.sum
2
go.sum
@ -689,6 +689,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
|
|||||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||||
|
gopkg.in/evanphx/json-patch.v5 v5.6.0 h1:BMT6KIwBD9CaU91PJCZIe46bDmBWa9ynTQgJIOpfQBk=
|
||||||
|
gopkg.in/evanphx/json-patch.v5 v5.6.0/go.mod h1:/kvTRh1TVm5wuM6OkHxqXtE/1nUZZpihg29RtuIyfvk=
|
||||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
Loading…
x
Reference in New Issue
Block a user