* refactor: use worker pool to limit webhooks to 10 concurrent http executions (#1510) * chore: try to please go linter
This commit is contained in:
parent
06f3644b11
commit
da199e5775
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/owncast/owncast/core/rtmp"
|
"github.com/owncast/owncast/core/rtmp"
|
||||||
"github.com/owncast/owncast/core/transcoder"
|
"github.com/owncast/owncast/core/transcoder"
|
||||||
"github.com/owncast/owncast/core/user"
|
"github.com/owncast/owncast/core/user"
|
||||||
|
"github.com/owncast/owncast/core/webhooks"
|
||||||
"github.com/owncast/owncast/models"
|
"github.com/owncast/owncast/models"
|
||||||
"github.com/owncast/owncast/static"
|
"github.com/owncast/owncast/static"
|
||||||
"github.com/owncast/owncast/utils"
|
"github.com/owncast/owncast/utils"
|
||||||
@ -77,6 +78,8 @@ func Start() error {
|
|||||||
rtmpPort := data.GetRTMPPortNumber()
|
rtmpPort := data.GetRTMPPortNumber()
|
||||||
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
|
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
|
||||||
|
|
||||||
|
webhooks.InitWorkerPool()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,16 +1,10 @@
|
|||||||
package webhooks
|
package webhooks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/owncast/owncast/core/user"
|
|
||||||
|
|
||||||
"github.com/owncast/owncast/core/data"
|
"github.com/owncast/owncast/core/data"
|
||||||
|
"github.com/owncast/owncast/core/user"
|
||||||
"github.com/owncast/owncast/models"
|
"github.com/owncast/owncast/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,40 +30,6 @@ func SendEventToWebhooks(payload WebhookEvent) {
|
|||||||
webhooks := data.GetWebhooksForEvent(payload.Type)
|
webhooks := data.GetWebhooksForEvent(payload.Type)
|
||||||
|
|
||||||
for _, webhook := range webhooks {
|
for _, webhook := range webhooks {
|
||||||
go func(webhook models.Webhook, payload WebhookEvent) {
|
go addToQueue(webhook, payload)
|
||||||
log.Debugf("Event %s sent to Webhook %s", payload.Type, webhook.URL)
|
|
||||||
if err := sendWebhook(webhook, payload); err != nil {
|
|
||||||
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", payload.Type, webhook.URL, err)
|
|
||||||
}
|
|
||||||
}(webhook, payload)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendWebhook(webhook models.Webhook, payload WebhookEvent) error {
|
|
||||||
jsonText, err := json.Marshal(payload)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", webhook.URL, bytes.NewReader(jsonText))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Header.Set("Content-Type", "application/json")
|
|
||||||
|
|
||||||
client := &http.Client{}
|
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if err := data.SetWebhookAsUsed(webhook); err != nil {
|
|
||||||
log.Warnln(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
82
core/webhooks/workerpool.go
Normal file
82
core/webhooks/workerpool.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
package webhooks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/owncast/owncast/core/data"
|
||||||
|
"github.com/owncast/owncast/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// webhookWorkerPoolSize defines the number of concurrent HTTP webhook requests.
|
||||||
|
webhookWorkerPoolSize = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
// Job struct bundling the webhook and the payload in one struct.
|
||||||
|
type Job struct {
|
||||||
|
webhook models.Webhook
|
||||||
|
payload WebhookEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
var queue chan Job
|
||||||
|
|
||||||
|
// InitWorkerPool starts n go routines that await webhook jobs.
|
||||||
|
func InitWorkerPool() {
|
||||||
|
queue = make(chan Job)
|
||||||
|
|
||||||
|
// start workers
|
||||||
|
for i := 1; i <= webhookWorkerPoolSize; i++ {
|
||||||
|
go worker(i, queue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func addToQueue(webhook models.Webhook, payload WebhookEvent) {
|
||||||
|
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
|
||||||
|
queue <- Job{webhook, payload}
|
||||||
|
}
|
||||||
|
|
||||||
|
func worker(workerID int, queue <-chan Job) {
|
||||||
|
log.Debugf("Started Webhook worker %d", workerID)
|
||||||
|
|
||||||
|
for job := range queue {
|
||||||
|
log.Debugf("Event %s sent to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
|
||||||
|
|
||||||
|
if err := sendWebhook(job); err != nil {
|
||||||
|
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
|
||||||
|
}
|
||||||
|
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendWebhook(job Job) error {
|
||||||
|
jsonText, err := json.Marshal(job.payload)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", job.webhook.URL, bytes.NewReader(jsonText))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
client := &http.Client{}
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if err := data.SetWebhookAsUsed(job.webhook); err != nil {
|
||||||
|
log.Warnln(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user