diff --git a/core/rtmp/rtmp.go b/core/rtmp/rtmp.go index 20b303792..bc089538e 100644 --- a/core/rtmp/rtmp.go +++ b/core/rtmp/rtmp.go @@ -4,8 +4,6 @@ import ( "fmt" "io" "net" - "os" - "syscall" "time" "github.com/nareix/joy5/format/flv" @@ -15,21 +13,20 @@ import ( "github.com/nareix/joy5/format/rtmp" "github.com/owncast/owncast/core/data" "github.com/owncast/owncast/models" - "github.com/owncast/owncast/utils" ) var ( _hasInboundRTMPConnection = false ) -var _pipe *os.File +var _pipe *io.PipeWriter var _rtmpConnection net.Conn -var _setStreamAsConnected func() +var _setStreamAsConnected func(*io.PipeReader) var _setBroadcaster func(models.Broadcaster) // Start starts the rtmp service, listening on specified RTMP port. -func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster)) { +func Start(setStreamAsConnected func(*io.PipeReader), setBroadcaster func(models.Broadcaster)) { _setStreamAsConnected = setStreamAsConnected _setBroadcaster = setBroadcaster @@ -83,27 +80,15 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) { return } + rtmpOut, rtmpIn := io.Pipe() + _pipe = rtmpIn log.Infoln("Inbound stream connected.") - _setStreamAsConnected() - - pipePath := utils.GetTemporaryPipePath(fmt.Sprint(data.GetRTMPPortNumber())) - if !utils.DoesFileExists(pipePath) { - err := syscall.Mkfifo(pipePath, 0666) - if err != nil { - log.Fatalln(err) - } - } + _setStreamAsConnected(rtmpOut) _hasInboundRTMPConnection = true _rtmpConnection = nc - f, err := os.OpenFile(pipePath, os.O_RDWR, os.ModeNamedPipe) - _pipe = f - if err != nil { - log.Fatalln("unable to open", pipePath, "and will exit") - } - - w := flv.NewMuxer(f) + w := flv.NewMuxer(rtmpIn) for { if !_hasInboundRTMPConnection { diff --git a/core/streamState.go b/core/streamState.go index d88c49895..0b13f7749 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -3,6 +3,7 @@ package core import ( "bufio" "fmt" + "io" "os" "path/filepath" "time" @@ -29,7 +30,7 @@ var _onlineCleanupTicker *time.Ticker var _currentBroadcast *models.CurrentBroadcast // setStreamAsConnected sets the stream as connected. -func setStreamAsConnected() { +func setStreamAsConnected(rtmpOut *io.PipeReader) { _stats.StreamConnected = true _stats.LastConnectTime = utils.NullTime{Time: time.Now(), Valid: true} _stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: false} @@ -65,6 +66,7 @@ func setStreamAsConnected() { _transcoder = nil _currentBroadcast = nil } + _transcoder.SetStdin(rtmpOut) _transcoder.Start() }() diff --git a/core/transcoder/transcoder.go b/core/transcoder/transcoder.go index 39b809e5d..4cb8a47a9 100644 --- a/core/transcoder/transcoder.go +++ b/core/transcoder/transcoder.go @@ -3,6 +3,7 @@ package transcoder import ( "bufio" "fmt" + "io" "os/exec" "strconv" "strings" @@ -22,6 +23,7 @@ var _commandExec *exec.Cmd // Transcoder is a single instance of a video transcoder. type Transcoder struct { input string + stdin *io.PipeReader segmentOutputPath string playlistOutputPath string variants []HLSVariant @@ -95,6 +97,11 @@ func (t *Transcoder) Start() { } _commandExec = exec.Command("sh", "-c", command) + + if t.stdin != nil { + _commandExec.Stdin = t.stdin + } + stdout, err := _commandExec.StderrPipe() if err != nil { panic(err) @@ -240,7 +247,7 @@ func NewTranscoder() *Transcoder { // Playlists are available via the local HTTP server transcoder.playlistOutputPath = config.PublicHLSStoragePath - transcoder.input = utils.GetTemporaryPipePath(fmt.Sprint(data.GetRTMPPortNumber())) + transcoder.input = "pipe:0" // stdin for index, quality := range transcoder.currentStreamOutputSettings { variant := getVariantFromConfigQuality(quality, index) @@ -389,6 +396,11 @@ func (t *Transcoder) SetInput(input string) { t.input = input } +// SetStdin sets the Stdin of the ffmpeg command. +func (t *Transcoder) SetStdin(rtmp *io.PipeReader) { + t.stdin = rtmp +} + // SetOutputPath sets the root directory that should include playlists and video segments. func (t *Transcoder) SetOutputPath(output string) { t.segmentOutputPath = output diff --git a/utils/utils.go b/utils/utils.go index 9c4e437be..b0f3a992e 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -20,11 +20,6 @@ import ( "mvdan.cc/xurls" ) -// GetTemporaryPipePath gets the temporary path for the streampipe.flv file. -func GetTemporaryPipePath(identifier string) string { - return filepath.Join(os.TempDir(), "streampipe."+identifier) -} - // DoesFileExists checks if the file exists. func DoesFileExists(name string) bool { if _, err := os.Stat(name); err != nil {