diff --git a/activitypub/activitypub.go b/activitypub/activitypub.go index 0d5af327a..287ea7e52 100644 --- a/activitypub/activitypub.go +++ b/activitypub/activitypub.go @@ -1,6 +1,8 @@ package activitypub import ( + "math" + "github.com/owncast/owncast/activitypub/crypto" "github.com/owncast/owncast/activitypub/inbox" "github.com/owncast/owncast/activitypub/outbox" @@ -17,7 +19,9 @@ import ( func Start(datastore *data.Datastore) { configRepository := configrepository.Get() persistence.Setup(datastore) - workerpool.InitOutboundWorkerPool() + + outboundWorkerPoolSize := getOutboundWorkerPoolSize() + workerpool.InitOutboundWorkerPool(outboundWorkerPoolSize) inbox.InitInboxWorkerPool() // Generate the keys for signing federated activity if needed. @@ -31,6 +35,17 @@ func Start(datastore *data.Datastore) { } } +func getOutboundWorkerPoolSize() int { + var followerCount int64 + fc, err := persistence.GetFollowerCount() + if err != nil { + log.Errorln("Unable to get follower count", err) + fc = 50 // Arbitrary fallback value. + } + followerCount = int64(math.Max(float64(fc), 50)) + return int(followerCount * 5) +} + // SendLive will send a "Go Live" message to followers. func SendLive() error { return outbox.SendLive() diff --git a/activitypub/workerpool/outbound.go b/activitypub/workerpool/outbound.go index 8d8f347f5..0681869e0 100644 --- a/activitypub/workerpool/outbound.go +++ b/activitypub/workerpool/outbound.go @@ -2,14 +2,10 @@ package workerpool import ( "net/http" - "runtime" log "github.com/sirupsen/logrus" ) -// workerPoolSize defines the number of concurrent HTTP ActivityPub requests. -var workerPoolSize = runtime.GOMAXPROCS(0) - // Job struct bundling the ActivityPub and the payload in one struct. type Job struct { request *http.Request @@ -18,8 +14,8 @@ type Job struct { var queue chan Job // InitOutboundWorkerPool starts n go routines that await ActivityPub jobs. -func InitOutboundWorkerPool() { - queue = make(chan Job, workerPoolSize*5) // arbitrary buffer value. +func InitOutboundWorkerPool(workerPoolSize int) { + queue = make(chan Job, workerPoolSize) // start workers for i := 1; i <= workerPoolSize; i++ { @@ -32,7 +28,7 @@ func AddToOutboundQueue(req *http.Request) { select { case queue <- Job{req}: default: - log.Warnln("Outbound ActivityPub job queue is full") + log.Debugln("Outbound ActivityPub job queue is full") queue <- Job{req} // will block until received by a worker at this point } log.Tracef("Queued request for ActivityPub destination %s", req.RequestURI)