0

Refactor the offline clip handling.

More stable, reduced function complexity.
This commit is contained in:
Gabe Kangas 2021-11-26 20:53:27 -08:00
parent 3ed7035e39
commit 73e58a7801
4 changed files with 158 additions and 94 deletions

View File

@ -1,7 +1,6 @@
package core package core
import ( import (
"io"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@ -16,7 +15,6 @@ import (
"github.com/owncast/owncast/core/user" "github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/static"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
"github.com/owncast/owncast/yp" "github.com/owncast/owncast/yp"
) )
@ -29,8 +27,10 @@ var (
_broadcaster *models.Broadcaster _broadcaster *models.Broadcaster
) )
var handler transcoder.HLSHandler var (
var fileWriter = transcoder.FileWriterReceiverService{} handler transcoder.HLSHandler
fileWriter = transcoder.FileWriterReceiverService{}
)
// Start starts up the core processing. // Start starts up the core processing.
func Start() error { func Start() error {
@ -95,23 +95,22 @@ func createInitialOfflineState() error {
func transitionToOfflineVideoStreamContent() { func transitionToOfflineVideoStreamContent() {
log.Traceln("Firing transcoder with offline stream state") log.Traceln("Firing transcoder with offline stream state")
r, w := io.Pipe()
_transcoder := transcoder.NewTranscoder() _transcoder := transcoder.NewTranscoder()
_transcoder.SetInput("pipe:0")
_transcoder.SetStdin(r)
_transcoder.SetIdentifier("offline") _transcoder.SetIdentifier("offline")
go _transcoder.Start() _transcoder.SetLatencyLevel(models.GetLatencyLevel(4))
_transcoder.SetIsEvent(true)
d := static.GetOfflineSegment() offlineFilePath, err := saveOfflineClipToDisk("offline.ts")
if _, err := w.Write(d); err != nil { if err != nil {
log.Errorln(err) log.Fatalln("unable to save offline clip:", err)
} }
_transcoder.SetInput(offlineFilePath)
go _transcoder.Start()
// Copy the logo to be the thumbnail // Copy the logo to be the thumbnail
logo := data.GetLogoPath() logo := data.GetLogoPath()
err := utils.Copy(filepath.Join("data", logo), "webroot/thumbnail.jpg") if err = utils.Copy(filepath.Join("data", logo), "webroot/thumbnail.jpg"); err != nil {
if err != nil {
log.Warnln(err) log.Warnln(err)
} }

119
core/offlineState.go Normal file
View File

@ -0,0 +1,119 @@
package core
import (
"bufio"
"fmt"
"os"
"path/filepath"
"github.com/grafov/m3u8"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/static"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
func appendOfflineToVariantPlaylist(index int, playlistFilePath string) {
f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm) //nolint
if err != nil {
log.Fatalln(err)
}
playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true)
if err != nil {
log.Fatalln(err)
}
if err := f.Close(); err != nil {
log.Errorln("error closing playlist file", err)
}
variantPlaylist := playlist.(*m3u8.MediaPlaylist)
variantPlaylist.MediaType = m3u8.EVENT
tmpFileName := fmt.Sprintf("tmp-stream-%d.m3u8", index)
atomicWriteTmpPlaylistFile, err := os.CreateTemp(os.TempDir(), tmpFileName)
if err != nil {
log.Errorln("error creating tmp playlist file to write to", playlistFilePath, err)
return
}
if _, err := atomicWriteTmpPlaylistFile.Write(variantPlaylist.Encode().Bytes()); err != nil {
log.Errorln(err)
}
// Manually add the offline clip to the end of the media playlist.
_, _ = atomicWriteTmpPlaylistFile.WriteString("#EXT-X-DISCONTINUITY\n")
// If "offline" content gets changed then change the duration below
_, _ = atomicWriteTmpPlaylistFile.WriteString("#EXTINF:8.000000,\n")
_, _ = atomicWriteTmpPlaylistFile.WriteString("offline.ts\n")
_, _ = atomicWriteTmpPlaylistFile.WriteString("#EXT-X-ENDLIST\n")
if err := atomicWriteTmpPlaylistFile.Close(); err != nil {
log.Errorln(err)
}
if err := utils.Move(atomicWriteTmpPlaylistFile.Name(), playlistFilePath); err != nil {
log.Errorln("error moving temp playlist to overwrite existing one", err)
}
}
func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename string) {
playlistFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/stream.m3u8"), index)
segmentFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/%s"), index, offlineFilename)
if err := utils.Copy(offlineFilePath, segmentFilePath); err != nil {
log.Warnln(err)
}
if _, err := _storage.Save(segmentFilePath, 0); err != nil {
log.Warnln(err)
}
if utils.DoesFileExists(playlistFilePath) {
appendOfflineToVariantPlaylist(index, playlistFilePath)
} else {
createEmptyOfflinePlaylist(playlistFilePath, offlineFilename)
}
if _, err := _storage.Save(playlistFilePath, 0); err != nil {
log.Warnln(err)
}
}
func createEmptyOfflinePlaylist(playlistFilePath string, offlineFilename string) {
p, err := m3u8.NewMediaPlaylist(1, 1)
if err != nil {
log.Errorln(err)
}
// If "offline" content gets changed then change the duration below
if err := p.Append(offlineFilename, 8.0, ""); err != nil {
log.Errorln(err)
}
p.Close()
f, err := os.Create(playlistFilePath)
if err != nil {
log.Errorln(err)
}
defer f.Close()
if _, err := f.Write(p.Encode().Bytes()); err != nil {
log.Errorln(err)
}
}
func saveOfflineClipToDisk(offlineFilename string) (string, error) {
offlineFileData := static.GetOfflineSegment()
offlineTmpFile, err := os.CreateTemp(os.TempDir(), offlineFilename)
if err != nil {
log.Errorln("unable to create temp file for offline video segment", err)
}
if _, err = offlineTmpFile.Write(offlineFileData); err != nil {
return "", fmt.Errorf("unable to write offline segment to disk: %s", err)
}
offlineFilePath := offlineTmpFile.Name()
return offlineFilePath, nil
}

View File

@ -1,11 +1,7 @@
package core package core
import ( import (
"bufio"
"fmt"
"io" "io"
"os"
"path/filepath"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -17,10 +13,7 @@ import (
"github.com/owncast/owncast/core/transcoder" "github.com/owncast/owncast/core/transcoder"
"github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/owncast/owncast/static"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
"github.com/grafov/m3u8"
) )
// After the stream goes offline this timer fires a full cleanup after N min. // After the stream goes offline this timer fires a full cleanup after N min.
@ -85,19 +78,14 @@ func SetStreamAsDisconnected() {
_stats.LastConnectTime = nil _stats.LastConnectTime = nil
_broadcaster = nil _broadcaster = nil
offlineFileData := static.GetOfflineSegment()
offlineFilename := "offline.ts" offlineFilename := "offline.ts"
offlineTmpFile, err := os.CreateTemp(os.TempDir(), offlineFilename)
offlineFilePath, err := saveOfflineClipToDisk(offlineFilename)
if err != nil { if err != nil {
log.Errorln("unable to create temp file for offline video segment") log.Errorln(err)
return
} }
if _, err = offlineTmpFile.Write(offlineFileData); err != nil {
log.Errorln("unable to write offline segment to disk", err)
}
offlineFilePath := offlineTmpFile.Name()
transcoder.StopThumbnailGenerator() transcoder.StopThumbnailGenerator()
rtmp.Disconnect() rtmp.Disconnect()
@ -111,69 +99,12 @@ func SetStreamAsDisconnected() {
if _currentBroadcast == nil { if _currentBroadcast == nil {
stopOnlineCleanupTimer() stopOnlineCleanupTimer()
transitionToOfflineVideoStreamContent() transitionToOfflineVideoStreamContent()
log.Errorln("unexpected nil _currentBroadcast")
return return
} }
for index := range _currentBroadcast.OutputSettings { for index := range _currentBroadcast.OutputSettings {
playlistFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/stream.m3u8"), index) makeVariantIndexOffline(index, offlineFilePath, offlineFilename)
segmentFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/%s"), index, offlineFilename)
if err := utils.Copy(offlineFilePath, segmentFilePath); err != nil {
log.Warnln(err)
}
if _, err := _storage.Save(segmentFilePath, 0); err != nil {
log.Warnln(err)
}
if utils.DoesFileExists(playlistFilePath) {
f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm) //nolint
if err != nil {
log.Errorln(err)
}
defer f.Close()
playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true)
if err != nil {
log.Fatalln(err)
}
variantPlaylist := playlist.(*m3u8.MediaPlaylist)
if len(variantPlaylist.Segments) > data.GetStreamLatencyLevel().SegmentCount {
variantPlaylist.Segments = variantPlaylist.Segments[:len(variantPlaylist.Segments)]
}
if err := variantPlaylist.Append(offlineFilename, 8.0, ""); err != nil {
log.Fatalln(err)
}
if err := variantPlaylist.SetDiscontinuity(); err != nil {
log.Fatalln(err)
}
if _, err := f.WriteAt(variantPlaylist.Encode().Bytes(), 0); err != nil {
log.Errorln(err)
}
} else {
p, err := m3u8.NewMediaPlaylist(1, 1)
if err != nil {
log.Errorln(err)
}
// If "offline" content gets changed then change the duration below
if err := p.Append(offlineFilename, 8.0, ""); err != nil {
log.Errorln(err)
}
p.Close()
f, err := os.Create(playlistFilePath)
if err != nil {
log.Errorln(err)
}
defer f.Close()
if _, err := f.Write(p.Encode().Bytes()); err != nil {
log.Errorln(err)
}
}
if _, err := _storage.Save(playlistFilePath, 0); err != nil {
log.Warnln(err)
}
} }
StartOfflineCleanupTimer() StartOfflineCleanupTimer()

View File

@ -35,6 +35,7 @@ type Transcoder struct {
currentStreamOutputSettings []models.StreamOutputVariant currentStreamOutputSettings []models.StreamOutputVariant
currentLatencyLevel models.LatencyLevel currentLatencyLevel models.LatencyLevel
isEvent bool
TranscoderCompleted func(error) TranscoderCompleted func(error)
} }
@ -153,6 +154,16 @@ func (t *Transcoder) Start() {
} }
} }
// SetLatencyLevel will set the latency level for the instance of the transcoder.
func (t *Transcoder) SetLatencyLevel(level models.LatencyLevel) {
t.currentLatencyLevel = level
}
// SetIsEvent will allow you to set a stream as an "event".
func (t *Transcoder) SetIsEvent(isEvent bool) {
t.isEvent = isEvent
}
func (t *Transcoder) getString() string { func (t *Transcoder) getString() string {
port := t.internalListenerPort port := t.internalListenerPort
localListenerAddress := "http://127.0.0.1:" + port localListenerAddress := "http://127.0.0.1:" + port
@ -170,6 +181,14 @@ func (t *Transcoder) getString() string {
t.segmentIdentifier = shortid.MustGenerate() t.segmentIdentifier = shortid.MustGenerate()
} }
hlsEventString := ""
if t.isEvent {
hlsEventString = "-hls_playlist_type event"
} else {
// Don't let the transcoder close the playlist. We do it manually.
hlsOptionFlags = append(hlsOptionFlags, "omit_endlist")
}
hlsOptionsString := "" hlsOptionsString := ""
if len(hlsOptionFlags) > 0 { if len(hlsOptionFlags) > 0 {
hlsOptionsString = "-hls_flags " + strings.Join(hlsOptionFlags, "+") hlsOptionsString = "-hls_flags " + strings.Join(hlsOptionFlags, "+")
@ -191,6 +210,7 @@ func (t *Transcoder) getString() string {
"-hls_time", strconv.Itoa(t.currentLatencyLevel.SecondsPerSegment), // Length of each segment "-hls_time", strconv.Itoa(t.currentLatencyLevel.SecondsPerSegment), // Length of each segment
"-hls_list_size", strconv.Itoa(t.currentLatencyLevel.SegmentCount), // Max # in variant playlist "-hls_list_size", strconv.Itoa(t.currentLatencyLevel.SegmentCount), // Max # in variant playlist
hlsOptionsString, hlsOptionsString,
hlsEventString,
"-segment_format_options", "mpegts_flags=mpegts_copyts=1", "-segment_format_options", "mpegts_flags=mpegts_copyts=1",
// Video settings // Video settings
@ -411,11 +431,6 @@ func (t *Transcoder) SetOutputPath(output string) {
t.segmentOutputPath = output t.segmentOutputPath = output
} }
// SetAppendToStream enables appending to the HLS stream instead of overwriting.
func (t *Transcoder) SetAppendToStream(append bool) {
t.appendToStream = append
}
// SetIdentifier enables appending a unique identifier to segment file name. // SetIdentifier enables appending a unique identifier to segment file name.
func (t *Transcoder) SetIdentifier(output string) { func (t *Transcoder) SetIdentifier(output string) {
t.segmentIdentifier = output t.segmentIdentifier = output