0

refactor: use io.Pipe and ffmpeg's stdin (#1148)

This removes the usage of `syscall.Mkfifo` which was previously used and
won't work on Windows systems and opens the door for other processes on
the computer to interfere in the rtmp stream (dumping bad content in the
fifo, removing the file, blocking the file in offline status).
Instead, this patch introduces an `io.Pipe` which pipes the RTMP stream
to the ffmpeg command while staying in Owncast.

Further links:
* ffmpeg on using `pipe:0` as an input: https://ffmpeg.org/ffmpeg-protocols.html#pipe
This commit is contained in:
Jannik 2021-07-03 21:28:25 +02:00 committed by GitHub
parent 0858e2ed52
commit 3f9f4a151c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 29 deletions

View File

@ -4,8 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"os"
"syscall"
"time" "time"
"github.com/nareix/joy5/format/flv" "github.com/nareix/joy5/format/flv"
@ -15,21 +13,20 @@ import (
"github.com/nareix/joy5/format/rtmp" "github.com/nareix/joy5/format/rtmp"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
) )
var ( var (
_hasInboundRTMPConnection = false _hasInboundRTMPConnection = false
) )
var _pipe *os.File var _pipe *io.PipeWriter
var _rtmpConnection net.Conn var _rtmpConnection net.Conn
var _setStreamAsConnected func() var _setStreamAsConnected func(*io.PipeReader)
var _setBroadcaster func(models.Broadcaster) var _setBroadcaster func(models.Broadcaster)
// Start starts the rtmp service, listening on specified RTMP port. // Start starts the rtmp service, listening on specified RTMP port.
func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster)) { func Start(setStreamAsConnected func(*io.PipeReader), setBroadcaster func(models.Broadcaster)) {
_setStreamAsConnected = setStreamAsConnected _setStreamAsConnected = setStreamAsConnected
_setBroadcaster = setBroadcaster _setBroadcaster = setBroadcaster
@ -83,27 +80,15 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) {
return return
} }
rtmpOut, rtmpIn := io.Pipe()
_pipe = rtmpIn
log.Infoln("Inbound stream connected.") log.Infoln("Inbound stream connected.")
_setStreamAsConnected() _setStreamAsConnected(rtmpOut)
pipePath := utils.GetTemporaryPipePath(fmt.Sprint(data.GetRTMPPortNumber()))
if !utils.DoesFileExists(pipePath) {
err := syscall.Mkfifo(pipePath, 0666)
if err != nil {
log.Fatalln(err)
}
}
_hasInboundRTMPConnection = true _hasInboundRTMPConnection = true
_rtmpConnection = nc _rtmpConnection = nc
f, err := os.OpenFile(pipePath, os.O_RDWR, os.ModeNamedPipe) w := flv.NewMuxer(rtmpIn)
_pipe = f
if err != nil {
log.Fatalln("unable to open", pipePath, "and will exit")
}
w := flv.NewMuxer(f)
for { for {
if !_hasInboundRTMPConnection { if !_hasInboundRTMPConnection {

View File

@ -3,6 +3,7 @@ package core
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@ -29,7 +30,7 @@ var _onlineCleanupTicker *time.Ticker
var _currentBroadcast *models.CurrentBroadcast var _currentBroadcast *models.CurrentBroadcast
// setStreamAsConnected sets the stream as connected. // setStreamAsConnected sets the stream as connected.
func setStreamAsConnected() { func setStreamAsConnected(rtmpOut *io.PipeReader) {
_stats.StreamConnected = true _stats.StreamConnected = true
_stats.LastConnectTime = utils.NullTime{Time: time.Now(), Valid: true} _stats.LastConnectTime = utils.NullTime{Time: time.Now(), Valid: true}
_stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: false} _stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: false}
@ -65,6 +66,7 @@ func setStreamAsConnected() {
_transcoder = nil _transcoder = nil
_currentBroadcast = nil _currentBroadcast = nil
} }
_transcoder.SetStdin(rtmpOut)
_transcoder.Start() _transcoder.Start()
}() }()

View File

@ -3,6 +3,7 @@ package transcoder
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"io"
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
@ -22,6 +23,7 @@ var _commandExec *exec.Cmd
// Transcoder is a single instance of a video transcoder. // Transcoder is a single instance of a video transcoder.
type Transcoder struct { type Transcoder struct {
input string input string
stdin *io.PipeReader
segmentOutputPath string segmentOutputPath string
playlistOutputPath string playlistOutputPath string
variants []HLSVariant variants []HLSVariant
@ -95,6 +97,11 @@ func (t *Transcoder) Start() {
} }
_commandExec = exec.Command("sh", "-c", command) _commandExec = exec.Command("sh", "-c", command)
if t.stdin != nil {
_commandExec.Stdin = t.stdin
}
stdout, err := _commandExec.StderrPipe() stdout, err := _commandExec.StderrPipe()
if err != nil { if err != nil {
panic(err) panic(err)
@ -240,7 +247,7 @@ func NewTranscoder() *Transcoder {
// Playlists are available via the local HTTP server // Playlists are available via the local HTTP server
transcoder.playlistOutputPath = config.PublicHLSStoragePath transcoder.playlistOutputPath = config.PublicHLSStoragePath
transcoder.input = utils.GetTemporaryPipePath(fmt.Sprint(data.GetRTMPPortNumber())) transcoder.input = "pipe:0" // stdin
for index, quality := range transcoder.currentStreamOutputSettings { for index, quality := range transcoder.currentStreamOutputSettings {
variant := getVariantFromConfigQuality(quality, index) variant := getVariantFromConfigQuality(quality, index)
@ -389,6 +396,11 @@ func (t *Transcoder) SetInput(input string) {
t.input = input t.input = input
} }
// SetStdin sets the Stdin of the ffmpeg command.
func (t *Transcoder) SetStdin(rtmp *io.PipeReader) {
t.stdin = rtmp
}
// SetOutputPath sets the root directory that should include playlists and video segments. // SetOutputPath sets the root directory that should include playlists and video segments.
func (t *Transcoder) SetOutputPath(output string) { func (t *Transcoder) SetOutputPath(output string) {
t.segmentOutputPath = output t.segmentOutputPath = output

View File

@ -20,11 +20,6 @@ import (
"mvdan.cc/xurls" "mvdan.cc/xurls"
) )
// GetTemporaryPipePath gets the temporary path for the streampipe.flv file.
func GetTemporaryPipePath(identifier string) string {
return filepath.Join(os.TempDir(), "streampipe."+identifier)
}
// DoesFileExists checks if the file exists. // DoesFileExists checks if the file exists.
func DoesFileExists(name string) bool { func DoesFileExists(name string) bool {
if _, err := os.Stat(name); err != nil { if _, err := os.Stat(name); err != nil {