0

fix(ap): increase outbound worker pool size to use follower count (#4049)

This commit is contained in:
Gabe Kangas 2024-12-17 08:47:15 -08:00 committed by GitHub
parent ae1be1379c
commit 4fbdb3f0cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 8 deletions

View File

@ -1,6 +1,8 @@
package activitypub package activitypub
import ( import (
"math"
"github.com/owncast/owncast/activitypub/crypto" "github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/inbox" "github.com/owncast/owncast/activitypub/inbox"
"github.com/owncast/owncast/activitypub/outbox" "github.com/owncast/owncast/activitypub/outbox"
@ -17,7 +19,9 @@ import (
func Start(datastore *data.Datastore) { func Start(datastore *data.Datastore) {
configRepository := configrepository.Get() configRepository := configrepository.Get()
persistence.Setup(datastore) persistence.Setup(datastore)
workerpool.InitOutboundWorkerPool()
outboundWorkerPoolSize := getOutboundWorkerPoolSize()
workerpool.InitOutboundWorkerPool(outboundWorkerPoolSize)
inbox.InitInboxWorkerPool() inbox.InitInboxWorkerPool()
// Generate the keys for signing federated activity if needed. // Generate the keys for signing federated activity if needed.
@ -31,6 +35,17 @@ func Start(datastore *data.Datastore) {
} }
} }
func getOutboundWorkerPoolSize() int {
var followerCount int64
fc, err := persistence.GetFollowerCount()
if err != nil {
log.Errorln("Unable to get follower count", err)
fc = 50 // Arbitrary fallback value.
}
followerCount = int64(math.Max(float64(fc), 50))
return int(followerCount * 5)
}
// SendLive will send a "Go Live" message to followers. // SendLive will send a "Go Live" message to followers.
func SendLive() error { func SendLive() error {
return outbox.SendLive() return outbox.SendLive()

View File

@ -2,14 +2,10 @@ package workerpool
import ( import (
"net/http" "net/http"
"runtime"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// workerPoolSize defines the number of concurrent HTTP ActivityPub requests.
var workerPoolSize = runtime.GOMAXPROCS(0)
// Job struct bundling the ActivityPub and the payload in one struct. // Job struct bundling the ActivityPub and the payload in one struct.
type Job struct { type Job struct {
request *http.Request request *http.Request
@ -18,8 +14,8 @@ type Job struct {
var queue chan Job var queue chan Job
// InitOutboundWorkerPool starts n go routines that await ActivityPub jobs. // InitOutboundWorkerPool starts n go routines that await ActivityPub jobs.
func InitOutboundWorkerPool() { func InitOutboundWorkerPool(workerPoolSize int) {
queue = make(chan Job, workerPoolSize*5) // arbitrary buffer value. queue = make(chan Job, workerPoolSize)
// start workers // start workers
for i := 1; i <= workerPoolSize; i++ { for i := 1; i <= workerPoolSize; i++ {
@ -32,7 +28,7 @@ func AddToOutboundQueue(req *http.Request) {
select { select {
case queue <- Job{req}: case queue <- Job{req}:
default: default:
log.Warnln("Outbound ActivityPub job queue is full") log.Debugln("Outbound ActivityPub job queue is full")
queue <- Job{req} // will block until received by a worker at this point queue <- Job{req} // will block until received by a worker at this point
} }
log.Tracef("Queued request for ActivityPub destination %s", req.RequestURI) log.Tracef("Queued request for ActivityPub destination %s", req.RequestURI)