HLS video handling/storage/state refactor (#151)

* WIP with new transcoder progress monitor

* A whole different WIP in progress monitoring via local PUTs

* Use an actual hls playlist parser to rewrite master playlist

* Cleanup

* Private vs public path for thumbnail generation

* Allow each storage provider to make decisions of how to store different types of files

* Simplify inbound file writes

* Revert

* Split out set stream as connected/disconnected state methods

* Update videojs

* Add comment about the hls handler

* Rework of the offline stream state.  For #85

* Delete old unreferenced video segment files from disk

* Cleanup all segments and revert to a completely offline state after 5min

* Stop thumbnail generation on stream stop. Copy logo to thumbnail on cleanup.

* Update transcoder test

* Add comment

* Return http 200 on success to transcoder. Tweak how files are written to disk

* Force pixel color format in transcoder

* Add debugging info for S3 transfers. Add default ACL.

* Fix cleanup timer

* Reset session stats when we cleanup the session.

* Put log file back

* Update test

* File should not be a part of this commit

* Add centralized shared performance timer for use anywhere

* Post-rebase cleanup

* Support returning nil from storage provider save

* Updates to reflect package changes + other updates in master

* Fix storage providers being overwritten

* Do not return pointer in save. Support cache headers with S3 providers

* Split out videojs + vhs and point to specific working versions of them

* Bump vjs and vhs versions

* Fix test

* Remove unused

* Update upload warning message

* No longer valid comment

* Pin videojs and vhs versions
This commit is contained in:
Gabe Kangas
2020-10-14 14:07:38 -07:00
committed by GitHub
parent 57f2e4b567
commit 6ea9affce0
43 changed files with 30296 additions and 56701 deletions

View File

@@ -5,7 +5,6 @@ import (
"path"
"path/filepath"
"strconv"
"time"
log "github.com/sirupsen/logrus"
@@ -18,13 +17,16 @@ import (
)
var (
_stats *models.Stats
_storage models.ChunkStorageProvider
_cleanupTimer *time.Timer
_yp *yp.YP
_broadcaster *models.Broadcaster
_stats *models.Stats
_storage models.StorageProvider
_transcoder *ffmpeg.Transcoder
_yp *yp.YP
_broadcaster *models.Broadcaster
)
var handler ffmpeg.HLSHandler
var fileWriter = ffmpeg.FileWriterReceiverService{}
//Start starts up the core processing
func Start() error {
resetDirectories()
@@ -39,6 +41,13 @@ func Start() error {
return err
}
// The HLS handler takes the written HLS playlists and segments
// and makes storage decisions. It's rather simple right now
// but will play more useful when recordings come into play.
handler = ffmpeg.HLSHandler{}
handler.Storage = _storage
fileWriter.SetupFileWriterReceiverService(&handler)
if err := createInitialOfflineState(); err != nil {
log.Error("failed to create the initial offline state")
return err
@@ -63,31 +72,26 @@ func createInitialOfflineState() error {
}
}
ffmpeg.ShowStreamOfflineState()
transitionToOfflineVideoStreamContent()
return nil
}
func startCleanupTimer() {
_cleanupTimer = time.NewTimer(5 * time.Minute)
go func() {
for {
select {
case <-_cleanupTimer.C:
// Reset the session count since the session is over
_stats.SessionMaxViewerCount = 0
resetDirectories()
ffmpeg.ShowStreamOfflineState()
}
}
}()
}
// transitionToOfflineVideoStreamContent will overwrite the current stream with the
// offline video stream state only. No live stream HLS segments will continue to be
// referenced.
func transitionToOfflineVideoStreamContent() {
log.Traceln("Firing transcoder with offline stream state")
// StopCleanupTimer will stop the previous cleanup timer
func stopCleanupTimer() {
if _cleanupTimer != nil {
_cleanupTimer.Stop()
}
offlineFilename := "offline.ts"
offlineFilePath := "static/" + offlineFilename
_transcoder := ffmpeg.NewTranscoder()
_transcoder.SetSegmentLength(10)
_transcoder.SetInput(offlineFilePath)
_transcoder.Start()
// Copy the logo to be the thumbnail
utils.Copy(filepath.Join("webroot", config.Config.InstanceDetails.Logo.Large), "webroot/thumbnail.jpg")
}
func resetDirectories() {
@@ -112,4 +116,7 @@ func resetDirectories() {
os.MkdirAll(path.Join(config.PrivateHLSStoragePath, strconv.Itoa(0)), 0777)
os.MkdirAll(path.Join(config.PublicHLSStoragePath, strconv.Itoa(0)), 0777)
}
// Remove the previous thumbnail
utils.Copy(config.Config.InstanceDetails.Logo.Large, "webroot/thumbnail.jpg")
}

View File

@@ -1,14 +0,0 @@
package ffmpeg
import (
"github.com/owncast/owncast/config"
)
//ShowStreamOfflineState generates and shows the stream's offline state
func ShowStreamOfflineState() {
transcoder := NewTranscoder()
transcoder.SetSegmentLength(10)
transcoder.SetAppendToStream(true)
transcoder.SetInput(config.Config.GetOfflineContentPath())
transcoder.Start()
}

View File

@@ -0,0 +1,103 @@
package ffmpeg
import (
"bytes"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"net/http"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
// FileWriterReceiverServiceCallback are to be fired when transcoder responses are written to disk
type FileWriterReceiverServiceCallback interface {
SegmentWritten(localFilePath string)
VariantPlaylistWritten(localFilePath string)
MasterPlaylistWritten(localFilePath string)
}
// FileWriterReceiverService accepts transcoder responses via HTTP and fires the callbacks
type FileWriterReceiverService struct {
callbacks FileWriterReceiverServiceCallback
}
// SetupFileWriterReceiverService will start listening for transcoder responses
func (s *FileWriterReceiverService) SetupFileWriterReceiverService(callbacks FileWriterReceiverServiceCallback) {
s.callbacks = callbacks
httpServer := http.NewServeMux()
httpServer.HandleFunc("/", s.uploadHandler)
localListenerAddress := "127.0.0.1:" + strconv.Itoa(config.Config.GetPublicWebServerPort()+1)
go http.ListenAndServe(localListenerAddress, httpServer)
log.Traceln("Transcoder response listening on: " + localListenerAddress)
}
func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
path := r.URL.Path
writePath := filepath.Join(config.PrivateHLSStoragePath, path)
var buf bytes.Buffer
io.Copy(&buf, r.Body)
data := buf.Bytes()
f, err := os.Create(writePath)
if err != nil {
returnError(err, w, r)
return
}
defer f.Close()
_, err = f.Write(data)
if err != nil {
returnError(err, w, r)
return
}
s.fileWritten(writePath)
w.WriteHeader(http.StatusOK)
}
var _inWarningState = false
func (s *FileWriterReceiverService) fileWritten(path string) {
index := utils.GetIndexFromFilePath(path)
if utils.GetRelativePathFromAbsolutePath(path) == "hls/stream.m3u8" {
s.callbacks.MasterPlaylistWritten(path)
} else if strings.HasSuffix(path, ".ts") {
performanceMonitorKey := "segmentWritten-" + index
averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)
utils.StartPerformanceMonitor(performanceMonitorKey)
s.callbacks.SegmentWritten(path)
if averagePerformance != 0 && averagePerformance > float64(float64(config.Config.GetVideoSegmentSecondsLength())) {
if !_inWarningState {
log.Warnln("slow encoding for variant", index, "if this continues you may see buffering or errors. troubleshoot this issue by visiting https://owncast.online/docs/troubleshooting/")
_inWarningState = true
}
} else {
_inWarningState = false
}
} else if strings.HasSuffix(path, ".m3u8") {
s.callbacks.VariantPlaylistWritten(path)
}
}
func returnError(err error, w http.ResponseWriter, r *http.Request) {
log.Errorln(err)
http.Error(w, http.StatusText(http.StatusInternalServerError)+": "+err.Error(), http.StatusInternalServerError)
}

View File

@@ -0,0 +1,63 @@
package ffmpeg
import (
log "github.com/sirupsen/logrus"
"os"
"path/filepath"
"sort"
"github.com/owncast/owncast/config"
)
// Cleanup will delete old files off disk that are no longer being referenced
// in the stream.
func Cleanup(directoryPath string) {
// Determine how many files we should keep on disk
maxNumber := config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist()
buffer := 10
files, err := getSegmentFiles(directoryPath)
if err != nil {
log.Fatal(err)
}
if len(files) < maxNumber+buffer {
return
}
// Delete old files on disk
filesToDelete := files[maxNumber+buffer:]
for _, file := range filesToDelete {
os.Remove(filepath.Join(directoryPath, file.Name()))
}
}
func getSegmentFiles(dirname string) ([]os.FileInfo, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
list, err := f.Readdir(-1) // -1 says to get a list of all files
f.Close()
if err != nil {
return nil, err
}
filteredList := make([]os.FileInfo, 0)
// Filter out playlists because we don't want to clean them up
for _, file := range list {
if filepath.Ext(file.Name()) == ".m3u8" {
continue
}
filteredList = append(filteredList, file)
}
// Sort by date so we can delete old files
sort.Slice(filteredList, func(i, j int) bool {
return filteredList[i].ModTime().UnixNano() > filteredList[j].ModTime().UnixNano()
})
return filteredList, nil
}

25
core/ffmpeg/hlsHandler.go Normal file
View File

@@ -0,0 +1,25 @@
package ffmpeg
import (
"github.com/owncast/owncast/models"
)
// HLSHandler gets told about available HLS playlists and segments
type HLSHandler struct {
Storage models.StorageProvider
}
// SegmentWritten is fired when a HLS segment is written to disk
func (h *HLSHandler) SegmentWritten(localFilePath string) {
h.Storage.SegmentWritten(localFilePath)
}
// VariantPlaylistWritten is fired when a HLS variant playlist is written to disk
func (h *HLSHandler) VariantPlaylistWritten(localFilePath string) {
h.Storage.VariantPlaylistWritten(localFilePath)
}
// MasterPlaylistWritten is fired when a HLS master playlist is written to disk
func (h *HLSHandler) MasterPlaylistWritten(localFilePath string) {
h.Storage.MasterPlaylistWritten(localFilePath)
}

View File

@@ -13,36 +13,44 @@ import (
"github.com/owncast/owncast/config"
)
var _timer *time.Ticker
func StopThumbnailGenerator() {
if _timer != nil {
_timer.Stop()
}
}
//StartThumbnailGenerator starts generating thumbnails
func StartThumbnailGenerator(chunkPath string, variantIndex int) {
// Every 20 seconds create a thumbnail from the most
// recent video segment.
ticker := time.NewTicker(20 * time.Second)
_timer = time.NewTicker(20 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
case <-_timer.C:
if err := fireThumbnailGenerator(chunkPath, variantIndex); err != nil {
log.Errorln("Unable to generate thumbnail:", err)
}
case <-quit:
//TODO: evaluate if this is ever stopped
log.Debug("thumbnail generator has stopped")
ticker.Stop()
_timer.Stop()
return
}
}
}()
}
func fireThumbnailGenerator(chunkPath string, variantIndex int) error {
func fireThumbnailGenerator(segmentPath string, variantIndex int) error {
// JPG takes less time to encode than PNG
outputFile := path.Join(config.WebRoot, "thumbnail.jpg")
previewGifFile := path.Join(config.WebRoot, "preview.gif")
framePath := path.Join(chunkPath, strconv.Itoa(variantIndex))
framePath := path.Join(segmentPath, strconv.Itoa(variantIndex))
files, err := ioutil.ReadDir(framePath)
if err != nil {
return err

View File

@@ -3,7 +3,6 @@ package ffmpeg
import (
"fmt"
"os/exec"
"path"
"strconv"
"strings"
@@ -27,6 +26,8 @@ type Transcoder struct {
appendToStream bool
ffmpegPath string
segmentIdentifier string
internalListenerPort int
TranscoderCompleted func(error)
}
// HLSVariant is a combination of settings that results in a single HLS stream
@@ -91,16 +92,27 @@ func (t *Transcoder) Start() {
log.Panicln(err, command)
}
err = _commandExec.Wait()
if t.TranscoderCompleted != nil {
t.TranscoderCompleted(err)
}
return
}
func (t *Transcoder) getString() string {
hlsOptionFlags := []string{
"delete_segments",
"program_date_time",
"temp_file",
var port int
if config.Config != nil {
port = config.Config.GetPublicWebServerPort() + 1
} else if t.internalListenerPort != 0 {
port = t.internalListenerPort
} else {
log.Panicln("A internal port must be set for transcoder callback")
}
localListenerAddress := "http://127.0.0.1:" + strconv.Itoa(port)
hlsOptionFlags := []string{}
if t.appendToStream {
hlsOptionFlags = append(hlsOptionFlags, "append_list")
}
@@ -109,32 +121,43 @@ func (t *Transcoder) getString() string {
t.segmentIdentifier = shortid.MustGenerate()
}
hlsOptionsString := ""
if len(hlsOptionFlags) > 0 {
hlsOptionsString = "-hls_flags " + strings.Join(hlsOptionFlags, "+")
}
ffmpegFlags := []string{
"cat", t.input, "|",
t.ffmpegPath,
"-hide_banner",
"-i pipe:",
"-loglevel warning",
"-i ", t.input,
t.getVariantsString(),
// HLS Output
"-f", "hls",
"-hls_time", strconv.Itoa(t.segmentLengthSeconds), // Length of each segment
"-hls_list_size", strconv.Itoa(t.hlsPlaylistLength), // Max # in variant playlist
"-hls_delete_threshold", "10", // Start deleting files after hls_list_size + 10
"-hls_flags", strings.Join(hlsOptionFlags, "+"), // Specific options in HLS generation
hlsOptionsString,
// Video settings
"-tune", "zerolatency", // Option used for good for fast encoding and low-latency streaming (always includes iframes in each segment)
// "-profile:v", "high", // Main for standard definition (SD) to 640×480, High for high definition (HD) to 1920×1080
"-pix_fmt", "yuv420p", // Force yuv420p color format
"-profile:v", "high", // Main for standard definition (SD) to 640×480, High for high definition (HD) to 1920×1080
"-sc_threshold", "0", // Disable scene change detection for creating segments
// Filenames
"-master_pl_name", "stream.m3u8",
"-strftime 1", // Support the use of strftime in filenames
"-hls_segment_filename", path.Join(t.segmentOutputPath, "/%v/stream-%s-"+t.segmentIdentifier+".ts"), // Each segment's filename
"-strftime 1", // Support the use of strftime in filenames
"-hls_segment_filename", localListenerAddress + "/%v/stream-" + t.segmentIdentifier + "%s.ts", // Send HLS segments back to us over HTTP
"-max_muxing_queue_size", "400", // Workaround for Too many packets error: https://trac.ffmpeg.org/ticket/6375?cversion=0
path.Join(t.segmentOutputPath, "/%v/stream.m3u8"), // Each variant's playlist
"2> transcoder.log",
"-method PUT -http_persistent 1", // HLS results sent back to us will be over PUTs
"-fflags +genpts", // Generate presentation time stamp if missing
localListenerAddress + "/%v/stream.m3u8", // Send HLS playlists back to us over HTTP
"2> transcoder.log", // Log to a file for debugging
}
return strings.Join(ffmpegFlags, " ")
@@ -180,7 +203,7 @@ func getVariantFromConfigQuality(quality config.StreamQuality, index int) HLSVar
}
// NewTranscoder will return a new Transcoder, populated by the config
func NewTranscoder() Transcoder {
func NewTranscoder() *Transcoder {
transcoder := new(Transcoder)
transcoder.ffmpegPath = config.Config.GetFFMpegPath()
transcoder.hlsPlaylistLength = config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist()
@@ -207,7 +230,7 @@ func NewTranscoder() Transcoder {
transcoder.AddVariant(variant)
}
return *transcoder
return transcoder
}
// Uses `map` https://www.ffmpeg.org/ffmpeg-all.html#Stream-specifiers-1 https://www.ffmpeg.org/ffmpeg-all.html#Advanced-options
@@ -364,3 +387,7 @@ func (t *Transcoder) SetAppendToStream(append bool) {
func (t *Transcoder) SetIdentifier(output string) {
t.segmentIdentifier = output
}
func (t *Transcoder) SetInternalHTTPPort(port int) {
t.internalListenerPort = port
}

View File

@@ -12,6 +12,7 @@ func TestFFmpegCommand(t *testing.T) {
transcoder.SetOutputPath("fakeOutput")
transcoder.SetHLSPlaylistLength(10)
transcoder.SetIdentifier("jdofFGg")
transcoder.SetInternalHTTPPort(8123)
variant := HLSVariant{}
variant.videoBitrate = 1200
@@ -23,7 +24,7 @@ func TestFFmpegCommand(t *testing.T) {
cmd := transcoder.getString()
expected := `cat fakecontent.flv | /fake/path/ffmpeg -hide_banner -i pipe: -map v:0 -c:v:0 libx264 -b:v:0 1200k -maxrate:v:0 1272k -bufsize:v:0 1440k -g:v:0 119 -x264-params:v:0 "scenecut=0:open_gop=0:min-keyint=119:keyint=119" -map a:0 -c:a:0 copy -r 30 -preset veryfast -var_stream_map "v:0,a:0 " -f hls -hls_time 4 -hls_list_size 10 -hls_delete_threshold 10 -hls_flags delete_segments+program_date_time+temp_file -tune zerolatency -sc_threshold 0 -master_pl_name stream.m3u8 -strftime 1 -hls_segment_filename fakeOutput/%v/stream-%s-jdofFGg.ts -max_muxing_queue_size 400 fakeOutput/%v/stream.m3u8 2> transcoder.log`
expected := `/fake/path/ffmpeg -hide_banner -loglevel warning -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1200k -maxrate:v:0 1272k -bufsize:v:0 1440k -g:v:0 119 -x264-params:v:0 "scenecut=0:open_gop=0:min-keyint=119:keyint=119" -map a:0 -c:a:0 copy -r 30 -preset veryfast -var_stream_map "v:0,a:0 " -f hls -hls_time 4 -hls_list_size 10 -hls_delete_threshold 10 -tune zerolatency -pix_fmt yuv420p -profile:v high -sc_threshold 0 -master_pl_name stream.m3u8 -strftime 1 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg%s.ts -max_muxing_queue_size 400 -method PUT -http_persistent 1 -fflags +genpts http://127.0.0.1:8123/%v/stream.m3u8 2> transcoder.log`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected. Got %s, want: %s", cmd, expected)

View File

@@ -1,157 +0,0 @@
package playlist
import (
"io/ioutil"
"path"
"path/filepath"
"strconv"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/radovskyb/watcher"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
)
var (
_storage models.ChunkStorageProvider
variants []models.Variant
)
//StartVideoContentMonitor starts the video content monitor
func StartVideoContentMonitor(storage models.ChunkStorageProvider) error {
_storage = storage
pathToMonitor := config.PrivateHLSStoragePath
// Create at least one structure to store the segments for the different stream variants
variants = make([]models.Variant, len(config.Config.VideoSettings.StreamQualities))
if len(config.Config.VideoSettings.StreamQualities) > 0 {
for index := range variants {
variants[index] = models.Variant{
VariantIndex: index,
Segments: make(map[string]*models.Segment),
}
}
} else {
variants[0] = models.Variant{
VariantIndex: 0,
Segments: make(map[string]*models.Segment),
}
}
// log.Printf("Using directory %s for storing files with %d variants...\n", pathToMonitor, len(variants))
w := watcher.New()
go func() {
for {
select {
case event := <-w.Event:
relativePath := utils.GetRelativePathFromAbsolutePath(event.Path)
if path.Ext(relativePath) == ".tmp" {
continue
}
// Ignore removals
if event.Op == watcher.Remove {
continue
}
// Handle updates to the master playlist by copying it to webroot
if relativePath == path.Join(config.PrivateHLSStoragePath, "stream.m3u8") {
utils.Copy(event.Path, path.Join(config.PublicHLSStoragePath, "stream.m3u8"))
} else if filepath.Ext(event.Path) == ".m3u8" {
// Handle updates to playlists, but not the master playlist
updateVariantPlaylist(event.Path)
} else if filepath.Ext(event.Path) == ".ts" {
segment, err := getSegmentFromPath(event.Path)
if err != nil {
log.Error("failed to get the segment from path")
panic(err)
}
newObjectPathChannel := make(chan string, 1)
go func() {
newObjectPath, err := storage.Save(path.Join(config.PrivateHLSStoragePath, segment.RelativeUploadPath), 0)
if err != nil {
log.Errorln("failed to save the file to the chunk storage.", err)
}
newObjectPathChannel <- newObjectPath
}()
newObjectPath := <-newObjectPathChannel
segment.RemoteID = newObjectPath
// fmt.Println("Uploaded", segment.RelativeUploadPath, "as", newObjectPath)
variants[segment.VariantIndex].Segments[filepath.Base(segment.RelativeUploadPath)] = &segment
// Force a variant's playlist to be updated after a file is uploaded.
associatedVariantPlaylist := strings.ReplaceAll(event.Path, path.Base(event.Path), "stream.m3u8")
updateVariantPlaylist(associatedVariantPlaylist)
}
case err := <-w.Error:
panic(err)
case <-w.Closed:
return
}
}
}()
// Watch the hls segment storage folder recursively for changes.
w.FilterOps(watcher.Write, watcher.Rename, watcher.Create)
if err := w.AddRecursive(pathToMonitor); err != nil {
return err
}
return w.Start(time.Millisecond * 200)
}
func getSegmentFromPath(fullDiskPath string) (models.Segment, error) {
segment := models.Segment{
FullDiskPath: fullDiskPath,
RelativeUploadPath: utils.GetRelativePathFromAbsolutePath(fullDiskPath),
}
index, err := strconv.Atoi(segment.RelativeUploadPath[0:1])
if err != nil {
return segment, err
}
segment.VariantIndex = index
return segment, nil
}
func getVariantIndexFromPath(fullDiskPath string) (int, error) {
return strconv.Atoi(fullDiskPath[0:1])
}
func updateVariantPlaylist(fullPath string) error {
relativePath := utils.GetRelativePathFromAbsolutePath(fullPath)
variantIndex, err := getVariantIndexFromPath(relativePath)
if err != nil {
return err
}
variant := variants[variantIndex]
playlistBytes, err := ioutil.ReadFile(fullPath)
if err != nil {
return err
}
playlistString := string(playlistBytes)
playlistString = _storage.GenerateRemotePlaylist(playlistString, variant)
return WritePlaylist(playlistString, path.Join(config.PublicHLSStoragePath, relativePath))
}

View File

@@ -17,7 +17,6 @@ import (
"github.com/nareix/joy5/format/rtmp"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
)
@@ -27,7 +26,6 @@ var (
_isConnected = false
)
var _transcoder ffmpeg.Transcoder
var _pipe *os.File
var _rtmpConnection net.Conn
@@ -115,9 +113,6 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) {
pipePath := utils.GetTemporaryPipePath()
syscall.Mkfifo(pipePath, 0666)
_transcoder = ffmpeg.NewTranscoder()
go _transcoder.Start()
_isConnected = true
core.SetStreamAsConnected()
_rtmpConnection = nc
@@ -153,9 +148,6 @@ func handleDisconnect(conn net.Conn) {
conn.Close()
_pipe.Close()
_isConnected = false
_transcoder.Stop()
_rtmpConnection = nil
core.SetStreamAsDisconnected()
}
// Disconnect will force disconnect the current inbound RTMP connection.

View File

@@ -115,7 +115,9 @@ func SetClientActive(client models.Client) {
func RemoveClient(clientID string) {
log.Trace("Removing the client:", clientID)
l.Lock()
delete(_stats.Clients, clientID)
l.Unlock()
}
func GetClients() []models.Client {

View File

@@ -1,12 +1,7 @@
package core
import (
"time"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
)
//GetStatus gets the status of the system
@@ -25,40 +20,6 @@ func GetStatus() models.Status {
}
}
//SetStreamAsConnected sets the stream as connected
func SetStreamAsConnected() {
stopCleanupTimer()
_stats.StreamConnected = true
_stats.LastConnectTime = utils.NullTime{time.Now(), true}
_stats.LastDisconnectTime = utils.NullTime{time.Now(), false}
chunkPath := config.PublicHLSStoragePath
if usingExternalStorage {
chunkPath = config.PrivateHLSStoragePath
}
if _yp != nil {
_yp.Start()
}
ffmpeg.StartThumbnailGenerator(chunkPath, config.Config.VideoSettings.HighestQualityStreamIndex)
}
//SetStreamAsDisconnected sets the stream as disconnected
func SetStreamAsDisconnected() {
_stats.StreamConnected = false
_stats.LastDisconnectTime = utils.NullTime{time.Now(), true}
_broadcaster = nil
if _yp != nil {
_yp.Stop()
}
ffmpeg.ShowStreamOfflineState()
startCleanupTimer()
}
// SetBroadcaster will store the current inbound broadcasting details
func SetBroadcaster(broadcaster models.Broadcaster) {
_broadcaster = &broadcaster

View File

@@ -2,7 +2,6 @@ package core
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/playlist"
"github.com/owncast/owncast/core/storageproviders"
)
@@ -11,17 +10,16 @@ var (
)
func setupStorage() error {
handler.Storage = _storage
if config.Config.S3.Enabled {
_storage = &storageproviders.S3Storage{}
usingExternalStorage = true
} else {
_storage = &storageproviders.LocalStorage{}
}
if usingExternalStorage {
if err := _storage.Setup(); err != nil {
return err
}
go playlist.StartVideoContentMonitor(_storage)
if err := _storage.Setup(); err != nil {
return err
}
return nil

View File

@@ -0,0 +1,63 @@
package storageproviders
import (
"path/filepath"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/utils"
)
type LocalStorage struct {
}
// Setup configures this storage provider
func (s *LocalStorage) Setup() error {
// no-op
return nil
}
// SegmentWritten is called when a single segment of video is written
func (s *LocalStorage) SegmentWritten(localFilePath string) {
s.Save(localFilePath, 0)
}
// VariantPlaylistWritten is called when a variant hls playlist is written
func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) {
_, error := s.Save(localFilePath, 0)
if error != nil {
log.Errorln(error)
return
}
}
// MasterPlaylistWritten is called when the master hls playlist is written
func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) {
s.Save(localFilePath, 0)
}
// Save will save a local filepath using the storage provider
func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) {
newPath := ""
// This is a hack
if filePath == "hls/stream.m3u8" {
newPath = filepath.Join(config.PublicHLSStoragePath, filepath.Base(filePath))
} else {
newPath = filepath.Join(config.WebRoot, filePath)
}
// Move video segments to the destination directory.
// Copy playlists to the destination directory so they can still be referenced in
// the private hls working directory.
if filepath.Ext(filePath) == ".m3u8" {
utils.Copy(filePath, newPath)
} else {
utils.Move(filePath, newPath)
ffmpeg.Cleanup(filepath.Dir(newPath))
}
return newPath, nil
}

View File

@@ -4,8 +4,10 @@ import (
"bufio"
"fmt"
"os"
"strings"
"path/filepath"
"github.com/owncast/owncast/core/playlist"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
@@ -14,9 +16,14 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
"github.com/grafov/m3u8"
)
// If we try to upload a playlist but it is not yet on disk
// then keep a reference to it here.
var _queuedPlaylistUpdates = make(map[string]string, 0)
//S3Storage is the s3 implementation of the ChunkStorageProvider
type S3Storage struct {
sess *session.Session
@@ -31,10 +38,18 @@ type S3Storage struct {
s3ACL string
}
var _uploader *s3manager.Uploader
//Setup sets up the s3 storage for saving the video to s3
func (s *S3Storage) Setup() error {
log.Trace("Setting up S3 for external storage of video...")
if config.Config.S3.ServingEndpoint != "" {
s.host = config.Config.S3.ServingEndpoint
} else {
s.host = fmt.Sprintf("%s/%s", config.Config.S3.Endpoint, config.Config.S3.Bucket)
}
s.s3Endpoint = config.Config.S3.Endpoint
s.s3ServingEndpoint = config.Config.S3.ServingEndpoint
s.s3Region = config.Config.S3.Region
@@ -45,68 +60,113 @@ func (s *S3Storage) Setup() error {
s.sess = s.connectAWS()
_uploader = s3manager.NewUploader(s.sess)
return nil
}
//Save saves the file to the s3 bucket
func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
// fmt.Println("Saving", filePath)
// SegmentWritten is called when a single segment of video is written
func (s *S3Storage) SegmentWritten(localFilePath string) {
index := utils.GetIndexFromFilePath(localFilePath)
performanceMonitorKey := "s3upload-" + index
utils.StartPerformanceMonitor(performanceMonitorKey)
// Upload the segment
_, error := s.Save(localFilePath, 0)
if error != nil {
log.Errorln(error)
return
}
averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)
// Warn the user about long-running save operations
if averagePerformance != 0 {
if averagePerformance > float64(config.Config.GetVideoSegmentSecondsLength())*0.9 {
log.Warnln("Possible slow uploads: average upload S3 save duration", averagePerformance, "ms. troubleshoot this issue by visiting https://owncast.online/docs/troubleshooting/")
}
log.Traceln(localFilePath, "uploaded to S3")
}
// Upload the variant playlist for this segment
// so the segments and the HLS playlist referencing
// them are in sync.
playlist := filepath.Join(filepath.Dir(localFilePath), "stream.m3u8")
_, error = s.Save(playlist, 0)
if error != nil {
_queuedPlaylistUpdates[playlist] = playlist
if pErr, ok := error.(*os.PathError); ok {
log.Debugln(pErr.Path, "does not yet exist locally when trying to upload to S3 storage.")
return
}
}
// If a segment file was successfully uploaded then we can delete
// it from the local filesystem.
os.Remove(localFilePath)
}
// VariantPlaylistWritten is called when a variant hls playlist is written
func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
// We are uploading the variant playlist after uploading the segment
// to make sure we're not refering to files in a playlist that don't
// yet exist. See SegmentWritten.
if _, ok := _queuedPlaylistUpdates[localFilePath]; ok {
_, error := s.Save(localFilePath, 0)
if error != nil {
log.Errorln(error)
_queuedPlaylistUpdates[localFilePath] = localFilePath
}
delete(_queuedPlaylistUpdates, localFilePath)
}
}
// MasterPlaylistWritten is called when the master hls playlist is written
func (s *S3Storage) MasterPlaylistWritten(localFilePath string) {
// Rewrite the playlist to use absolute remote S3 URLs
s.rewriteRemotePlaylist(localFilePath)
}
// Save saves the file to the s3 bucket
func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", err
}
defer file.Close()
uploader := s3manager.NewUploader(s.sess)
maxAgeSeconds := utils.GetCacheDurationSecondsForPath(filePath)
cacheControlHeader := fmt.Sprintf("Cache-Control: max-age=%d", maxAgeSeconds)
uploadInput := &s3manager.UploadInput{
Bucket: aws.String(s.s3Bucket), // Bucket to be used
Key: aws.String(filePath), // Name of the file to be saved
Body: file, // File
Bucket: aws.String(s.s3Bucket), // Bucket to be used
Key: aws.String(filePath), // Name of the file to be saved
Body: file, // File
CacheControl: &cacheControlHeader,
}
if s.s3ACL != "" {
uploadInput.ACL = aws.String(s.s3ACL)
} else {
// Default ACL
uploadInput.ACL = aws.String("public-read")
}
response, err := uploader.Upload(uploadInput)
response, err := _uploader.Upload(uploadInput)
if err != nil {
log.Trace("error uploading:", err.Error())
log.Traceln("error uploading:", filePath, err.Error())
if retryCount < 4 {
log.Trace("Retrying...")
log.Traceln("Retrying...")
return s.Save(filePath, retryCount+1)
} else {
log.Warnln("Giving up on", filePath, err)
return "", fmt.Errorf("Giving up on %s", filePath)
}
}
// fmt.Println("Uploaded", filePath, "to", response.Location)
return response.Location, nil
}
//GenerateRemotePlaylist implements the 'GenerateRemotePlaylist' method
func (s *S3Storage) GenerateRemotePlaylist(playlist string, variant models.Variant) string {
var newPlaylist = ""
scanner := bufio.NewScanner(strings.NewReader(playlist))
for scanner.Scan() {
line := scanner.Text()
if line[0:1] != "#" {
fullRemotePath := variant.GetSegmentForFilename(line)
if fullRemotePath == nil {
line = ""
} else if s.s3ServingEndpoint != "" {
line = fmt.Sprintf("%s/%s/%s", s.s3ServingEndpoint, config.PrivateHLSStoragePath, fullRemotePath.RelativeUploadPath)
} else {
line = fullRemotePath.RemoteID
}
}
newPlaylist = newPlaylist + line + "\n"
}
return newPlaylist
}
func (s S3Storage) connectAWS() *session.Session {
func (s *S3Storage) connectAWS() *session.Session {
creds := credentials.NewStaticCredentials(s.s3AccessKey, s.s3Secret, "")
_, err := creds.Get()
if err != nil {
@@ -127,3 +187,24 @@ func (s S3Storage) connectAWS() *session.Session {
}
return sess
}
// rewriteRemotePlaylist will take a local playlist and rewrite it to have absolute URLs to remote locations.
func (s *S3Storage) rewriteRemotePlaylist(filePath string) error {
f, err := os.Open(filePath)
if err != nil {
panic(err)
}
p := m3u8.NewMasterPlaylist()
err = p.DecodeFrom(bufio.NewReader(f), false)
for _, item := range p.Variants {
item.URI = s.host + filepath.Join("/hls", item.URI)
}
publicPath := filepath.Join(config.PublicHLSStoragePath, filepath.Base(filePath))
newPlaylist := p.String()
return playlist.WritePlaylist(newPlaylist, publicPath)
}

132
core/streamState.go Normal file
View File

@@ -0,0 +1,132 @@
package core
import (
"bufio"
"fmt"
"os"
"path/filepath"
"time"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/utils"
"github.com/grafov/m3u8"
)
var _cleanupTimer *time.Timer
//SetStreamAsConnected sets the stream as connected
func SetStreamAsConnected() {
_stats.StreamConnected = true
_stats.LastConnectTime = utils.NullTime{time.Now(), true}
_stats.LastDisconnectTime = utils.NullTime{time.Now(), false}
StopCleanupTimer()
segmentPath := config.PublicHLSStoragePath
if config.Config.S3.Enabled {
segmentPath = config.PrivateHLSStoragePath
}
go func() {
_transcoder = ffmpeg.NewTranscoder()
_transcoder.TranscoderCompleted = func(error) {
SetStreamAsDisconnected()
}
_transcoder.Start()
}()
ffmpeg.StartThumbnailGenerator(segmentPath, config.Config.VideoSettings.HighestQualityStreamIndex)
}
//SetStreamAsDisconnected sets the stream as disconnected.
func SetStreamAsDisconnected() {
_stats.StreamConnected = false
_stats.LastDisconnectTime = utils.NullTime{time.Now(), true}
offlineFilename := "offline.ts"
offlineFilePath := "static/" + offlineFilename
ffmpeg.StopThumbnailGenerator()
for index := range config.Config.GetVideoStreamQualities() {
playlistFilePath := fmt.Sprintf(filepath.Join(config.PrivateHLSStoragePath, "%d/stream.m3u8"), index)
segmentFilePath := fmt.Sprintf(filepath.Join(config.PrivateHLSStoragePath, "%d/%s"), index, offlineFilename)
utils.Copy(offlineFilePath, segmentFilePath)
_storage.Save(segmentFilePath, 0)
if utils.DoesFileExists(playlistFilePath) {
f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm)
defer f.Close()
if err != nil {
log.Errorln(err)
}
playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true)
variantPlaylist := playlist.(*m3u8.MediaPlaylist)
if len(variantPlaylist.Segments) > config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist() {
variantPlaylist.Segments = variantPlaylist.Segments[:len(variantPlaylist.Segments)]
}
err = variantPlaylist.Append(offlineFilename, 8.0, "")
variantPlaylist.SetDiscontinuity()
_, err = f.WriteAt(variantPlaylist.Encode().Bytes(), 0)
if err != nil {
log.Errorln(err)
}
} else {
p, err := m3u8.NewMediaPlaylist(1, 1)
if err != nil {
log.Errorln(err)
}
// If "offline" content gets changed then change the duration below
err = p.Append(offlineFilename, 8.0, "")
if err != nil {
log.Errorln(err)
}
p.Close()
f, err := os.Create(playlistFilePath)
if err != nil {
log.Errorln(err)
}
defer f.Close()
_, err = f.Write(p.Encode().Bytes())
if err != nil {
log.Errorln(err)
}
}
_storage.Save(playlistFilePath, 0)
}
StartCleanupTimer()
}
// StartCleanupTimer will fire a cleanup after n minutes being disconnected
func StartCleanupTimer() {
_cleanupTimer = time.NewTimer(5 * time.Minute)
go func() {
for {
select {
case <-_cleanupTimer.C:
// Reset the session count since the session is over
_stats.SessionMaxViewerCount = 0
resetDirectories()
transitionToOfflineVideoStreamContent()
}
}
}()
}
// StopCleanupTimer will stop the previous cleanup timer
func StopCleanupTimer() {
if _cleanupTimer != nil {
_cleanupTimer.Stop()
}
}