0
owncast/core/core.go
Jannik da199e5775
use worker pool to limit webhooks to 10 concurrent http executions (#1510) (#1525)
* refactor: use worker pool to limit webhooks to 10 concurrent http executions (#1510)

* chore: try to please go linter
2021-11-14 10:02:52 -08:00

137 lines
3.2 KiB
Go

package core
import (
"io"
"os"
"path"
"path/filepath"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/rtmp"
"github.com/owncast/owncast/core/transcoder"
"github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/static"
"github.com/owncast/owncast/utils"
"github.com/owncast/owncast/yp"
)
var (
_stats *models.Stats
_storage models.StorageProvider
_transcoder *transcoder.Transcoder
_yp *yp.YP
_broadcaster *models.Broadcaster
)
var handler transcoder.HLSHandler
var fileWriter = transcoder.FileWriterReceiverService{}
// Start starts up the core processing.
func Start() error {
resetDirectories()
data.PopulateDefaults()
if err := data.VerifySettings(); err != nil {
log.Error(err)
return err
}
if err := setupStats(); err != nil {
log.Error("failed to setup the stats")
return err
}
// The HLS handler takes the written HLS playlists and segments
// and makes storage decisions. It's rather simple right now
// but will play more useful when recordings come into play.
handler = transcoder.HLSHandler{}
if err := setupStorage(); err != nil {
log.Errorln("storage error", err)
}
user.SetupUsers()
fileWriter.SetupFileWriterReceiverService(&handler)
if err := createInitialOfflineState(); err != nil {
log.Error("failed to create the initial offline state")
return err
}
_yp = yp.NewYP(GetStatus)
if err := chat.Start(GetStatus); err != nil {
log.Errorln(err)
}
// start the rtmp server
go rtmp.Start(setStreamAsConnected, setBroadcaster)
rtmpPort := data.GetRTMPPortNumber()
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
webhooks.InitWorkerPool()
return nil
}
func createInitialOfflineState() error {
transitionToOfflineVideoStreamContent()
return nil
}
// transitionToOfflineVideoStreamContent will overwrite the current stream with the
// offline video stream state only. No live stream HLS segments will continue to be
// referenced.
func transitionToOfflineVideoStreamContent() {
log.Traceln("Firing transcoder with offline stream state")
r, w := io.Pipe()
_transcoder := transcoder.NewTranscoder()
_transcoder.SetInput("pipe:0")
_transcoder.SetStdin(r)
_transcoder.SetIdentifier("offline")
go _transcoder.Start()
d := static.GetOfflineSegment()
if _, err := w.Write(d); err != nil {
log.Errorln(err)
}
// Copy the logo to be the thumbnail
logo := data.GetLogoPath()
err := utils.Copy(filepath.Join("data", logo), "webroot/thumbnail.jpg")
if err != nil {
log.Warnln(err)
}
// Delete the preview Gif
_ = os.Remove(path.Join(config.WebRoot, "preview.gif"))
}
func resetDirectories() {
log.Trace("Resetting file directories to a clean slate.")
// Wipe hls data directory
utils.CleanupDirectory(config.HLSStoragePath)
// Remove the previous thumbnail
logo := data.GetLogoPath()
if utils.DoesFileExists(logo) {
err := utils.Copy(path.Join("data", logo), filepath.Join(config.WebRoot, "thumbnail.jpg"))
if err != nil {
log.Warnln(err)
}
}
}