135 lines
2.9 KiB
Go
135 lines
2.9 KiB
Go
package rtmp
|
|
|
|
import (
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/nareix/joy4/av/avutil"
|
|
"github.com/nareix/joy4/format/ts"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/gabek/owncast/config"
|
|
"github.com/gabek/owncast/core"
|
|
"github.com/gabek/owncast/core/ffmpeg"
|
|
"github.com/gabek/owncast/utils"
|
|
|
|
"github.com/nareix/joy4/format"
|
|
"github.com/nareix/joy4/format/rtmp"
|
|
)
|
|
|
|
// _isConnected reports whether an inbound RTMP stream is currently being
// handled. It starts out false; handlePublish sets it and handleDisconnect
// clears it. Other packages read it through IsConnected.
var _isConnected bool
|
|
|
|
func init() {
|
|
format.RegisterAll()
|
|
}
|
|
|
|
//Start starts the rtmp service, listening on port 1935
|
|
func Start() {
|
|
port := 1935
|
|
server := &rtmp.Server{}
|
|
|
|
server.HandlePublish = handlePublish
|
|
|
|
error := server.ListenAndServe()
|
|
if error != nil {
|
|
log.Panicln(error)
|
|
}
|
|
log.Printf("RTMP server is listening for incoming stream on port: %d", port)
|
|
}
|
|
|
|
func handlePublish(conn *rtmp.Conn) {
|
|
if _isConnected {
|
|
log.Errorln("stream already running; can not overtake an existing stream")
|
|
conn.Close()
|
|
return
|
|
}
|
|
|
|
streamingKeyComponents := strings.Split(conn.URL.Path, "/")
|
|
streamingKey := streamingKeyComponents[len(streamingKeyComponents)-1]
|
|
if streamingKey != config.Config.VideoSettings.StreamingKey {
|
|
log.Errorln("invalid streaming key; rejecting incoming stream")
|
|
conn.Close()
|
|
return
|
|
}
|
|
|
|
log.Infoln("Incoming RTMP connected.")
|
|
|
|
pipePath := utils.GetTemporaryPipePath()
|
|
syscall.Mkfifo(pipePath, 0666)
|
|
transcoder := ffmpeg.NewTranscoder()
|
|
go transcoder.Start()
|
|
|
|
_isConnected = true
|
|
core.SetStreamAsConnected()
|
|
|
|
f, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Is this too fast? Are there downsides to peeking
|
|
// into the stream so frequently?
|
|
ticker := time.NewTicker(500 * time.Millisecond)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
error := connCheck(conn.NetConn())
|
|
if error == io.EOF {
|
|
handleDisconnect(conn)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
muxer := ts.NewMuxer(f)
|
|
avutil.CopyFile(muxer, conn)
|
|
}
|
|
|
|
// connCheck proactively checks if the RTMP connection is still active or not.
// Taken from https://stackoverflow.com/a/58664631.
//
// It performs a non-blocking one-byte peek (MSG_PEEK|MSG_DONTWAIT) on the
// connection's file descriptor, so no data is consumed from the stream.
//
// Returns:
//   - io.EOF when the peek reads zero bytes without an error,
//   - nil when the peek would block (EAGAIN/EWOULDBLOCK) or data is pending,
//   - syscall.ENOTSUP when conn does not implement syscall.Conn (this includes
//     a nil conn), instead of panicking on the type assertion,
//   - any other error reported by SyscallConn, the raw Read or recvfrom.
func connCheck(conn net.Conn) error {
	sc, ok := conn.(syscall.Conn)
	if !ok {
		return syscall.ENOTSUP
	}

	rc, err := sc.SyscallConn()
	if err != nil {
		return err
	}

	var sysErr error
	err = rc.Read(func(fd uintptr) bool {
		buf := []byte{0}
		n, _, err := syscall.Recvfrom(int(fd), buf, syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
		switch {
		case n == 0 && err == nil:
			sysErr = io.EOF
		case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
			sysErr = nil
		default:
			sysErr = err
		}
		// Always report "done" so Read never waits for the descriptor to
		// become readable.
		return true
	})
	if err != nil {
		return err
	}

	return sysErr
}
|
|
|
|
func handleDisconnect(conn *rtmp.Conn) {
|
|
log.Infoln("RTMP disconnected.")
|
|
conn.Close()
|
|
_isConnected = false
|
|
core.SetStreamAsDisconnected()
|
|
}
|
|
|
|
//IsConnected gets whether there is an rtmp connection or not
|
|
//this is only a getter since it is controlled by the rtmp handler
|
|
func IsConnected() bool {
|
|
return _isConnected
|
|
}
|