From 3f9f4a151c61f8b9c0ae6fd606349c8811a01b9f Mon Sep 17 00:00:00 2001 From: Jannik Date: Sat, 3 Jul 2021 21:28:25 +0200 Subject: [PATCH] refactor: use io.Pipe and ffmpeg's stdin (#1148) This removes the usage of `syscall.Mkfifo` which was previously used and won't work on Windows systems and opens the door for other processes on the computer to interfere in the rtmp stream (dumping bad content in the fifo, removing the file, blocking the file in offline status). Instead, this patch introduces an `io.Pipe` which pipes the RTMP stream to the ffmpeg command while staying in Owncast. Further links: * ffmpeg on using `pipe:0` as an input: https://ffmpeg.org/ffmpeg-protocols.html#pipe --- core/rtmp/rtmp.go | 29 +++++++---------------------- core/streamState.go | 4 +++- core/transcoder/transcoder.go | 14 +++++++++++++- utils/utils.go | 5 ----- 4 files changed, 23 insertions(+), 29 deletions(-) diff --git a/core/rtmp/rtmp.go b/core/rtmp/rtmp.go index 20b303792..bc089538e 100644 --- a/core/rtmp/rtmp.go +++ b/core/rtmp/rtmp.go @@ -4,8 +4,6 @@ import ( "fmt" "io" "net" - "os" - "syscall" "time" "github.com/nareix/joy5/format/flv" @@ -15,21 +13,20 @@ import ( "github.com/nareix/joy5/format/rtmp" "github.com/owncast/owncast/core/data" "github.com/owncast/owncast/models" - "github.com/owncast/owncast/utils" ) var ( _hasInboundRTMPConnection = false ) -var _pipe *os.File +var _pipe *io.PipeWriter var _rtmpConnection net.Conn -var _setStreamAsConnected func() +var _setStreamAsConnected func(*io.PipeReader) var _setBroadcaster func(models.Broadcaster) // Start starts the rtmp service, listening on specified RTMP port. -func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster)) { +func Start(setStreamAsConnected func(*io.PipeReader), setBroadcaster func(models.Broadcaster)) { _setStreamAsConnected = setStreamAsConnected _setBroadcaster = setBroadcaster @@ -83,27 +80,15 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) { return } + rtmpOut, rtmpIn := io.Pipe() + _pipe = rtmpIn log.Infoln("Inbound stream connected.") - _setStreamAsConnected() - - pipePath := utils.GetTemporaryPipePath(fmt.Sprint(data.GetRTMPPortNumber())) - if !utils.DoesFileExists(pipePath) { - err := syscall.Mkfifo(pipePath, 0666) - if err != nil { - log.Fatalln(err) - } - } + _setStreamAsConnected(rtmpOut) _hasInboundRTMPConnection = true _rtmpConnection = nc - f, err := os.OpenFile(pipePath, os.O_RDWR, os.ModeNamedPipe) - _pipe = f - if err != nil { - log.Fatalln("unable to open", pipePath, "and will exit") - } - - w := flv.NewMuxer(f) + w := flv.NewMuxer(rtmpIn) for { if !_hasInboundRTMPConnection { diff --git a/core/streamState.go b/core/streamState.go index d88c49895..0b13f7749 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -3,6 +3,7 @@ package core import ( "bufio" "fmt" + "io" "os" "path/filepath" "time" @@ -29,7 +30,7 @@ var _onlineCleanupTicker *time.Ticker var _currentBroadcast *models.CurrentBroadcast // setStreamAsConnected sets the stream as connected. -func setStreamAsConnected() { +func setStreamAsConnected(rtmpOut *io.PipeReader) { _stats.StreamConnected = true _stats.LastConnectTime = utils.NullTime{Time: time.Now(), Valid: true} _stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: false} @@ -65,6 +66,7 @@ func setStreamAsConnected() { _transcoder = nil _currentBroadcast = nil } + _transcoder.SetStdin(rtmpOut) _transcoder.Start() }() diff --git a/core/transcoder/transcoder.go b/core/transcoder/transcoder.go index 39b809e5d..4cb8a47a9 100644 --- a/core/transcoder/transcoder.go +++ b/core/transcoder/transcoder.go @@ -3,6 +3,7 @@ package transcoder import ( "bufio" "fmt" + "io" "os/exec" "strconv" "strings" @@ -22,6 +23,7 @@ var _commandExec *exec.Cmd // Transcoder is a single instance of a video transcoder. type Transcoder struct { input string + stdin *io.PipeReader segmentOutputPath string playlistOutputPath string variants []HLSVariant @@ -95,6 +97,11 @@ func (t *Transcoder) Start() { } _commandExec = exec.Command("sh", "-c", command) + + if t.stdin != nil { + _commandExec.Stdin = t.stdin + } + stdout, err := _commandExec.StderrPipe() if err != nil { panic(err) @@ -240,7 +247,7 @@ func NewTranscoder() *Transcoder { // Playlists are available via the local HTTP server transcoder.playlistOutputPath = config.PublicHLSStoragePath - transcoder.input = utils.GetTemporaryPipePath(fmt.Sprint(data.GetRTMPPortNumber())) + transcoder.input = "pipe:0" // stdin for index, quality := range transcoder.currentStreamOutputSettings { variant := getVariantFromConfigQuality(quality, index) @@ -389,6 +396,11 @@ func (t *Transcoder) SetInput(input string) { t.input = input } +// SetStdin sets the Stdin of the ffmpeg command. +func (t *Transcoder) SetStdin(rtmp *io.PipeReader) { + t.stdin = rtmp +} + // SetOutputPath sets the root directory that should include playlists and video segments. func (t *Transcoder) SetOutputPath(output string) { t.segmentOutputPath = output diff --git a/utils/utils.go b/utils/utils.go index 9c4e437be..b0f3a992e 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -20,11 +20,6 @@ import ( "mvdan.cc/xurls" ) -// GetTemporaryPipePath gets the temporary path for the streampipe.flv file. -func GetTemporaryPipePath(identifier string) string { - return filepath.Join(os.TempDir(), "streampipe."+identifier) -} - // DoesFileExists checks if the file exists. func DoesFileExists(name string) bool { if _, err := os.Stat(name); err != nil {