Project restructure (#18)

* First pass at restructuring the project; untested but it does compile

* Restructure builds and runs 🎉

* Add the dist folder to the gitignore

* Update core/playlist/monitor.go

* golint and reorganize the monitor.go file

Co-authored-by: Gabe Kangas <gabek@real-ity.com>
This commit is contained in:
Bradley Hilton
2020-06-22 20:11:56 -05:00
committed by GitHub
parent b0768de6c0
commit 487bd12444
42 changed files with 1309 additions and 1000 deletions

116
core/chat/client.go Normal file
View File

@@ -0,0 +1,116 @@
package chat
import (
"fmt"
"io"
log "github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
"github.com/gabek/owncast/models"
"github.com/gabek/owncast/utils"
)
const channelBufSize = 100
//Client represents a chat client.
type Client struct {
id string
ws *websocket.Conn
server *Server
ch chan models.ChatMessage
pingch chan models.PingMessage
doneCh chan bool
}
//NewClient creates a new chat client
func NewClient(ws *websocket.Conn, server *Server) *Client {
if ws == nil {
log.Panicln("ws cannot be nil")
}
if server == nil {
log.Panicln("server cannot be nil")
}
ch := make(chan models.ChatMessage, channelBufSize)
doneCh := make(chan bool)
pingch := make(chan models.PingMessage)
clientID := utils.GenerateClientIDFromRequest(ws.Request())
return &Client{clientID, ws, server, ch, pingch, doneCh}
}
//GetConnection gets the connection for the client
func (c *Client) GetConnection() *websocket.Conn {
return c.ws
}
func (c *Client) Write(msg models.ChatMessage) {
select {
case c.ch <- msg:
default:
c.server.Remove(c)
c.server.Err(fmt.Errorf("client %s is disconnected", c.id))
}
}
//Done marks the client as done
func (c *Client) Done() {
c.doneCh <- true
}
// Listen Write and Read request via chanel
func (c *Client) Listen() {
go c.listenWrite()
c.listenRead()
}
// Listen write request via chanel
func (c *Client) listenWrite() {
for {
select {
// Send a PING keepalive
case msg := <-c.pingch:
websocket.JSON.Send(c.ws, msg)
// send message to the client
case msg := <-c.ch:
msg.MessageType = "CHAT"
// log.Println("Send:", msg)
websocket.JSON.Send(c.ws, msg)
// receive done request
case <-c.doneCh:
c.server.Remove(c)
c.doneCh <- true // for listenRead method
return
}
}
}
// Listen read request via chanel
func (c *Client) listenRead() {
for {
select {
// receive done request
case <-c.doneCh:
c.server.Remove(c)
c.doneCh <- true // for listenWrite method
return
// read data from websocket connection
default:
var msg models.ChatMessage
if err := websocket.JSON.Receive(c.ws, &msg); err == io.EOF {
c.doneCh <- true
} else if err != nil {
c.server.Err(err)
} else {
c.server.SendToAll(msg)
}
}
}
}

170
core/chat/server.go Normal file
View File

@@ -0,0 +1,170 @@
package chat
import (
"fmt"
"net/http"
"time"
"github.com/gabek/owncast/core"
"github.com/gabek/owncast/models"
log "github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
)
//Server represents the server which handles the chat
type Server struct {
Pattern string
Messages []models.ChatMessage
Clients map[string]*Client
addCh chan *Client
delCh chan *Client
sendAllCh chan models.ChatMessage
pingCh chan models.PingMessage
doneCh chan bool
errCh chan error
}
//NewServer creates a new chat server
func NewServer(pattern string) *Server {
messages := []models.ChatMessage{}
clients := make(map[string]*Client)
addCh := make(chan *Client)
delCh := make(chan *Client)
sendAllCh := make(chan models.ChatMessage)
pingCh := make(chan models.PingMessage)
doneCh := make(chan bool)
errCh := make(chan error)
// Demo messages only. Remove me eventually!!!
messages = append(messages, models.ChatMessage{"Tom Nook", "I'll be there with Bells on! Ho ho!", "https://gamepedia.cursecdn.com/animalcrossingpocketcamp_gamepedia_en/thumb/4/4f/Timmy_Icon.png/120px-Timmy_Icon.png?version=87b38d7d6130411d113486c2db151385", "demo-message-1", "ChatMessage"})
messages = append(messages, models.ChatMessage{"Redd", "Fool me once, shame on you. Fool me twice, stop foolin' me.", "https://vignette.wikia.nocookie.net/animalcrossing/images/3/3d/Redd2.gif/revision/latest?cb=20100710004252", "demo-message-2", "ChatMessage"})
messages = append(messages, models.ChatMessage{"Kevin", "You just caught me before I was about to go work out weeweewee!", "https://vignette.wikia.nocookie.net/animalcrossing/images/2/20/NH-Kevin_poster.png/revision/latest/scale-to-width-down/100?cb=20200410185817", "demo-message-3", "ChatMessage"})
messages = append(messages, models.ChatMessage{"Isabelle", " Isabelle is the mayor's highly capable secretary. She can be forgetful sometimes, but you can always count on her for information about the town. She wears her hair up in a bun that makes her look like a shih tzu. Mostly because she is one! She also has a twin brother named Digby.", "https://dodo.ac/np/images/thumb/7/7b/IsabelleTrophyWiiU.png/200px-IsabelleTrophyWiiU.png", "demo-message-4", "ChatMessage"})
messages = append(messages, models.ChatMessage{"Judy", "myohmy, I'm dancing my dreams away.", "https://vignette.wikia.nocookie.net/animalcrossing/images/5/50/NH-Judy_poster.png/revision/latest/scale-to-width-down/100?cb=20200522063219", "demo-message-5", "ChatMessage"})
messages = append(messages, models.ChatMessage{"Blathers", "Blathers is an owl with brown feathers. His face is white and he has a yellow beak. His arms are wing shaped and he has yellow talons. His eyes are very big with small black irises. He also has big pink cheek circles on his cheeks. His belly appears to be checkered in diamonds with light brown and white squares, similar to an argyle vest, which is traditionally associated with academia. His green bowtie further alludes to his academic nature.", "https://vignette.wikia.nocookie.net/animalcrossing/images/b/b3/NH-character-Blathers.png/revision/latest?cb=20200229053519", "demo-message-6", "ChatMessage"})
server := &Server{
pattern,
messages,
clients,
addCh,
delCh,
sendAllCh,
pingCh,
doneCh,
errCh,
}
ticker := time.NewTicker(30 * time.Second)
go func() {
for {
select {
case <-ticker.C:
server.ping()
}
}
}()
return server
}
//Add adds a client to the server
func (s *Server) Add(c *Client) {
s.addCh <- c
}
//Remove removes a client from the server
func (s *Server) Remove(c *Client) {
s.delCh <- c
}
//SendToAll sends a message to all of the connected clients
func (s *Server) SendToAll(msg models.ChatMessage) {
s.sendAllCh <- msg
}
//Done marks the server as done
func (s *Server) Done() {
s.doneCh <- true
}
//Err handles an error
func (s *Server) Err(err error) {
s.errCh <- err
}
func (s *Server) sendPastMessages(c *Client) {
for _, msg := range s.Messages {
c.Write(msg)
}
}
func (s *Server) sendAll(msg models.ChatMessage) {
for _, c := range s.Clients {
c.Write(msg)
}
}
func (s *Server) ping() {
// fmt.Println("Start pinging....", len(s.clients))
ping := models.PingMessage{MessageType: "PING"}
for _, c := range s.Clients {
c.pingch <- ping
}
}
// Listen and serve.
// It serves client connection and broadcast request.
func (s *Server) Listen() {
// websocket handler
onConnected := func(ws *websocket.Conn) {
defer func() {
err := ws.Close()
if err != nil {
s.errCh <- err
}
}()
client := NewClient(ws, s)
s.Add(client)
client.Listen()
}
http.Handle(s.Pattern, websocket.Handler(onConnected))
log.Printf("Starting the websocket listener on: %s", s.Pattern)
for {
select {
// add new a client
case c := <-s.addCh:
s.Clients[c.id] = c
core.SetClientActive(c.id)
s.sendPastMessages(c)
// remove a client
case c := <-s.delCh:
delete(s.Clients, c.id)
core.RemoveClient(c.id)
// broadcast a message to all clients
case msg := <-s.sendAllCh:
log.Println("Sending a message to all:", msg)
s.Messages = append(s.Messages, msg)
s.sendAll(msg)
case ping := <-s.pingCh:
fmt.Println("PING?", ping)
case err := <-s.errCh:
log.Println("Error:", err.Error())
case <-s.doneCh:
return
}
}
}

20
core/constants.go Normal file
View File

@@ -0,0 +1,20 @@
package core
import (
"fmt"
)
// the following are injected at build-time
var (
//GitCommit is the commit which this version of owncast is running
GitCommit = "unknown"
//BuildVersion is the version
BuildVersion = "0.0.0"
//BuildType is the type of build
BuildType = "localdev"
)
//GetVersion gets the version string
func GetVersion() string {
return fmt.Sprintf("Owncast v%s-%s (%s)", BuildVersion, BuildType, GitCommit)
}

76
core/core.go Normal file
View File

@@ -0,0 +1,76 @@
package core
import (
"os"
"path"
"strconv"
log "github.com/sirupsen/logrus"
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/core/ffmpeg"
"github.com/gabek/owncast/models"
"github.com/gabek/owncast/utils"
)
var (
_stats *models.Stats
_storage models.ChunkStorageProvider
)
//Start starts up the core processing
func Start() error {
resetDirectories()
if err := setupStats(); err != nil {
log.Println("failed to setup the stats")
return err
}
if err := setupStorage(); err != nil {
log.Println("failed to setup the storage")
return err
}
if err := createInitialOfflineState(); err != nil {
log.Println("failed to create the initial offline state")
return err
}
return nil
}
func createInitialOfflineState() error {
// Provide default files
if !utils.DoesFileExists("webroot/thumbnail.jpg") {
if err := utils.Copy("static/logo.png", "webroot/thumbnail.jpg"); err != nil {
return err
}
}
return ffmpeg.ShowStreamOfflineState()
}
func resetDirectories() {
log.Println("Resetting file directories to a clean slate.")
// Wipe the public, web-accessible hls data directory
os.RemoveAll(config.Config.PublicHLSPath)
os.RemoveAll(config.Config.PrivateHLSPath)
os.MkdirAll(config.Config.PublicHLSPath, 0777)
os.MkdirAll(config.Config.PrivateHLSPath, 0777)
// Remove the previous thumbnail
os.Remove("webroot/thumbnail.jpg")
// Create private hls data dirs
if !config.Config.VideoSettings.EnablePassthrough || len(config.Config.VideoSettings.StreamQualities) == 0 {
for index := range config.Config.VideoSettings.StreamQualities {
os.MkdirAll(path.Join(config.Config.PrivateHLSPath, strconv.Itoa(index)), 0777)
os.MkdirAll(path.Join(config.Config.PublicHLSPath, strconv.Itoa(index)), 0777)
}
} else {
os.MkdirAll(path.Join(config.Config.PrivateHLSPath, strconv.Itoa(0)), 0777)
os.MkdirAll(path.Join(config.Config.PublicHLSPath, strconv.Itoa(0)), 0777)
}
}

200
core/ffmpeg/ffmpeg.go Normal file
View File

@@ -0,0 +1,200 @@
package ffmpeg
import (
"fmt"
"math"
"os"
"os/exec"
"path"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/utils"
)
//ShowStreamOfflineState generates and shows the stream's offline state
func ShowStreamOfflineState() error {
log.Println("----- Stream offline! Showing offline state!")
var outputDir = config.Config.PublicHLSPath
var variantPlaylistPath = config.Config.PublicHLSPath
if config.Config.IPFS.Enabled || config.Config.S3.Enabled {
outputDir = config.Config.PrivateHLSPath
variantPlaylistPath = config.Config.PrivateHLSPath
}
outputDir = path.Join(outputDir, "%v")
var variantPlaylistName = path.Join(variantPlaylistPath, "%v", "stream.m3u8")
var videoMaps = make([]string, 0)
var streamMaps = make([]string, 0)
var videoMapsString = ""
var streamMappingString = ""
if config.Config.VideoSettings.EnablePassthrough || len(config.Config.VideoSettings.StreamQualities) == 0 {
log.Println("Enabling passthrough video for offline state")
videoMapsString = "-b:v 1200k -b:a 128k" // Since we're compositing multiple sources we can't infer bitrate, so pick something reasonable.
streamMaps = append(streamMaps, fmt.Sprintf("v:%d", 0))
} else {
for index, quality := range config.Config.VideoSettings.StreamQualities {
maxRate := math.Floor(float64(quality.Bitrate) * 0.8)
videoMaps = append(videoMaps, fmt.Sprintf("-map v:0 -c:v:%d libx264 -b:v:%d %dk -maxrate %dk -bufsize %dk", index, index, int(quality.Bitrate), int(maxRate), int(maxRate)))
streamMaps = append(streamMaps, fmt.Sprintf("v:%d", index))
videoMapsString = strings.Join(videoMaps, " ")
}
}
framerate := 25
streamMappingString = "-var_stream_map \"" + strings.Join(streamMaps, " ") + "\""
ffmpegFlags := []string{
"-hide_banner",
// "-stream_loop 100",
// "-fflags", "+genpts",
"-i", config.Config.VideoSettings.OfflineImage,
"-i", "webroot/thumbnail.jpg",
"-filter_complex", "\"[0:v]scale=2640:2360[bg];[bg][1:v]overlay=200:250:enable='between(t,0,3)'\"",
videoMapsString, // All the different video variants
"-f hls",
// "-hls_list_size " + strconv.Itoa(config.Config.Files.MaxNumberInPlaylist),
"-hls_time 4", // + strconv.Itoa(config.Config.VideoSettings.ChunkLengthInSeconds),
"-hls_playlist_type", "event",
"-master_pl_name", "stream.m3u8",
"-strftime 1",
"-use_localtime 1",
"-hls_flags temp_file",
"-tune", "zerolatency",
"-g " + strconv.Itoa(framerate*2), " -keyint_min " + strconv.Itoa(framerate*2), // multiply your output frame rate * 2. For example, if your input is -framerate 30, then use -g 60
"-framerate " + strconv.Itoa(framerate),
"-preset " + config.Config.VideoSettings.EncoderPreset,
"-sc_threshold 0", // don't create key frames on scene change - only according to -g
"-profile:v", "main", // Main for standard definition (SD) to 640×480, High for high definition (HD) to 1920×1080
// "-movflags +faststart",
"-pix_fmt yuv420p",
streamMappingString,
"-hls_segment_filename " + path.Join(outputDir, "offline-%s.ts"),
// "-s", "720x480", // size
variantPlaylistName,
}
ffmpegFlagsString := strings.Join(ffmpegFlags, " ")
ffmpegCmd := config.Config.FFMpegPath + " " + ffmpegFlagsString
// log.Println(ffmpegCmd)
_, err := exec.Command("sh", "-c", ffmpegCmd).Output()
return err
}
//Start starts the ffmpeg process
func Start() error {
var outputDir = config.Config.PublicHLSPath
var variantPlaylistPath = config.Config.PublicHLSPath
if config.Config.IPFS.Enabled || config.Config.S3.Enabled {
outputDir = config.Config.PrivateHLSPath
variantPlaylistPath = config.Config.PrivateHLSPath
}
outputDir = path.Join(outputDir, "%v")
var variantPlaylistName = path.Join(variantPlaylistPath, "%v", "stream.m3u8")
log.Printf("Starting transcoder saving to /%s.", variantPlaylistName)
pipePath := utils.GetTemporaryPipePath()
var videoMaps = make([]string, 0)
var streamMaps = make([]string, 0)
var audioMaps = make([]string, 0)
var videoMapsString = ""
var audioMapsString = ""
var streamMappingString = ""
var profileString = ""
if config.Config.VideoSettings.EnablePassthrough || len(config.Config.VideoSettings.StreamQualities) == 0 {
log.Println("Enabling passthrough video for stream")
streamMaps = append(streamMaps, fmt.Sprintf("v:%d,a:%d", 0, 0))
videoMaps = append(videoMaps, "-map v:0 -c:v copy")
videoMapsString = strings.Join(videoMaps, " ")
audioMaps = append(audioMaps, "-map a:0")
audioMapsString = strings.Join(audioMaps, " ") + " -c:a copy" // Pass through audio for all the variants, don't reencode
} else {
for index, quality := range config.Config.VideoSettings.StreamQualities {
maxRate := math.Floor(float64(quality.Bitrate) * 0.8)
videoMaps = append(videoMaps, fmt.Sprintf("-map v:0 -c:v:%d libx264 -b:v:%d %dk -maxrate %dk -bufsize %dk", index, index, int(quality.Bitrate), int(maxRate), int(maxRate)))
streamMaps = append(streamMaps, fmt.Sprintf("v:%d,a:%d", index, index))
videoMapsString = strings.Join(videoMaps, " ")
audioMaps = append(audioMaps, "-map a:0")
audioMapsString = strings.Join(audioMaps, " ") + " -c:a copy" // Pass through audio for all the variants, don't reencode
profileString = "-profile:v high" // Main for standard definition (SD) to 640×480, High for high definition (HD) to 1920×1080
}
}
framerate := 25
streamMappingString = "-var_stream_map \"" + strings.Join(streamMaps, " ") + "\""
ffmpegFlags := []string{
"-hide_banner",
// "-re",
"-fflags", "+genpts",
"-i pipe:",
// "-vf scale=900:-2", // Re-enable in the future with a config to togging resizing?
// "-sws_flags fast_bilinear",
videoMapsString, // All the different video variants
audioMapsString,
"-master_pl_name stream.m3u8",
"-framerate " + strconv.Itoa(framerate),
"-g " + strconv.Itoa(framerate*2), " -keyint_min " + strconv.Itoa(framerate*2), // multiply your output frame rate * 2. For example, if your input is -framerate 30, then use -g 60
// "-r 25",
"-preset " + config.Config.VideoSettings.EncoderPreset,
"-sc_threshold 0", // don't create key frames on scene change - only according to -g
profileString,
"-movflags +faststart",
"-pix_fmt yuv420p",
"-f hls",
"-hls_list_size " + strconv.Itoa(config.Config.Files.MaxNumberInPlaylist),
"-hls_delete_threshold 10", // Keep 10 unreferenced segments on disk before they're deleted.
"-hls_time " + strconv.Itoa(config.Config.VideoSettings.ChunkLengthInSeconds),
"-strftime 1",
"-use_localtime 1",
"-hls_segment_filename " + path.Join(outputDir, "stream-%Y%m%d-%s.ts"),
"-hls_flags delete_segments+program_date_time+temp_file",
"-tune zerolatency",
// "-s", "720x480", // size
streamMappingString,
variantPlaylistName,
}
ffmpegFlagsString := strings.Join(ffmpegFlags, " ")
ffmpegCmd := "cat " + pipePath + " | " + config.Config.FFMpegPath + " " + ffmpegFlagsString
// fmt.Println(ffmpegCmd)
_, err := exec.Command("sh", "-c", ffmpegCmd).Output()
return err
}
//WritePlaylist writes the playlist to disk
func WritePlaylist(data string, filePath string) error {
f, err := os.Create(filePath)
if err != nil {
return err
}
defer f.Close()
if _, err := f.WriteString(data); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,93 @@
package ffmpeg
import (
"io/ioutil"
"os/exec"
"path"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/gabek/owncast/config"
)
//StartThumbnailGenerator starts generating thumbnails
func StartThumbnailGenerator(chunkPath string) {
// Every 20 seconds create a thumbnail from the most
// recent video segment.
ticker := time.NewTicker(20 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
if err := fireThumbnailGenerator(chunkPath); err != nil {
log.Errorln("Unable to generate thumbnail:", err)
}
case <-quit:
//TODO: evaluate if this is ever stopped
log.Println("thumbnail generator has stopped")
ticker.Stop()
return
}
}
}()
}
func fireThumbnailGenerator(chunkPath string) error {
// JPG takes less time to encode than PNG
outputFile := path.Join("webroot", "thumbnail.jpg")
framePath := path.Join(chunkPath, "0")
files, err := ioutil.ReadDir(framePath)
if err != nil {
return err
}
var modTime time.Time
var names []string
for _, fi := range files {
if path.Ext(fi.Name()) != ".ts" {
continue
}
if fi.Mode().IsRegular() {
if !fi.ModTime().Before(modTime) {
if fi.ModTime().After(modTime) {
modTime = fi.ModTime()
names = names[:0]
}
names = append(names, fi.Name())
}
}
}
if len(names) == 0 {
return nil
}
mostRecentFile := path.Join(framePath, names[0])
thumbnailCmdFlags := []string{
config.Config.FFMpegPath,
"-y", // Overwrite file
"-threads 1", // Low priority processing
"-t 1", // Pull from frame 1
"-i", mostRecentFile, // Input
"-f image2", // format
"-vframes 1", // Single frame
outputFile,
}
ffmpegCmd := strings.Join(thumbnailCmdFlags, " ")
// fmt.Println(ffmpegCmd)
if _, err := exec.Command("sh", "-c", ffmpegCmd).Output(); err != nil {
return err
}
return nil
}

View File

@@ -0,0 +1,32 @@
package ffmpeg
import (
"errors"
"fmt"
"os"
)
//VerifyFFMpegPath verifies that the path exists, is a file, and is executable
func VerifyFFMpegPath(path string) error {
stat, err := os.Stat(path)
if os.IsNotExist(err) {
return errors.New("ffmpeg path does not exist")
}
if err != nil {
return fmt.Errorf("error while verifying the ffmpeg path: %s", err.Error())
}
if stat.IsDir() {
return errors.New("ffmpeg path can not be a folder")
}
mode := stat.Mode()
//source: https://stackoverflow.com/a/60128480
if mode&0111 == 0 {
return errors.New("ffmpeg path is not executable")
}
return nil
}

163
core/playlist/monitor.go Normal file
View File

@@ -0,0 +1,163 @@
package playlist
import (
"io/ioutil"
"path"
"path/filepath"
"strconv"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/radovskyb/watcher"
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/core/ffmpeg"
"github.com/gabek/owncast/models"
"github.com/gabek/owncast/utils"
)
var (
_storage models.ChunkStorageProvider
variants []models.Variant
)
//StartVideoContentMonitor starts the video content monitor
func StartVideoContentMonitor(storage models.ChunkStorageProvider) error {
_storage = storage
pathToMonitor := config.Config.PrivateHLSPath
// Create at least one structure to store the segments for the different stream variants
variants = make([]models.Variant, len(config.Config.VideoSettings.StreamQualities))
if len(config.Config.VideoSettings.StreamQualities) > 0 && !config.Config.VideoSettings.EnablePassthrough {
for index := range variants {
variants[index] = models.Variant{
VariantIndex: index,
Segments: make(map[string]*models.Segment),
}
}
} else {
variants[0] = models.Variant{
VariantIndex: 0,
Segments: make(map[string]*models.Segment),
}
}
// log.Printf("Using directory %s for storing files with %d variants...\n", pathToMonitor, len(variants))
w := watcher.New()
go func() {
for {
select {
case event := <-w.Event:
relativePath := utils.GetRelativePathFromAbsolutePath(event.Path)
if path.Ext(relativePath) == ".tmp" {
continue
}
// Ignore removals
if event.Op == watcher.Remove {
continue
}
// fmt.Println(event.Op, relativePath)
// Handle updates to the master playlist by copying it to webroot
if relativePath == path.Join(config.Config.PrivateHLSPath, "stream.m3u8") {
utils.Copy(event.Path, path.Join(config.Config.PublicHLSPath, "stream.m3u8"))
} else if filepath.Ext(event.Path) == ".m3u8" {
// Handle updates to playlists, but not the master playlist
updateVariantPlaylist(event.Path)
} else if filepath.Ext(event.Path) == ".ts" {
segment, err := getSegmentFromPath(event.Path)
if err != nil {
log.Println("failed to get the segment from path")
panic(err)
}
newObjectPathChannel := make(chan string, 1)
go func() {
newObjectPath, err := storage.Save(path.Join(config.Config.PrivateHLSPath, segment.RelativeUploadPath), 0)
if err != nil {
log.Println("failed to save the file to the chunk storage")
panic(err)
}
newObjectPathChannel <- newObjectPath
}()
newObjectPath := <-newObjectPathChannel
segment.RemoteID = newObjectPath
// fmt.Println("Uploaded", segment.RelativeUploadPath, "as", newObjectPath)
variants[segment.VariantIndex].Segments[filepath.Base(segment.RelativeUploadPath)] = &segment
// Force a variant's playlist to be updated after a file is uploaded.
associatedVariantPlaylist := strings.ReplaceAll(event.Path, path.Base(event.Path), "stream.m3u8")
updateVariantPlaylist(associatedVariantPlaylist)
}
case err := <-w.Error:
panic(err)
case <-w.Closed:
return
}
}
}()
// Watch the hls segment storage folder recursively for changes.
w.FilterOps(watcher.Write, watcher.Rename, watcher.Create)
if err := w.AddRecursive(pathToMonitor); err != nil {
return err
}
return w.Start(time.Millisecond * 200)
}
func getSegmentFromPath(fullDiskPath string) (models.Segment, error) {
segment := models.Segment{
FullDiskPath: fullDiskPath,
RelativeUploadPath: utils.GetRelativePathFromAbsolutePath(fullDiskPath),
}
index, err := strconv.Atoi(segment.RelativeUploadPath[0:1])
if err != nil {
return segment, err
}
segment.VariantIndex = index
return segment, nil
}
func getVariantIndexFromPath(fullDiskPath string) (int, error) {
return strconv.Atoi(fullDiskPath[0:1])
}
func updateVariantPlaylist(fullPath string) error {
relativePath := utils.GetRelativePathFromAbsolutePath(fullPath)
variantIndex, err := getVariantIndexFromPath(relativePath)
if err != nil {
return err
}
variant := variants[variantIndex]
playlistBytes, err := ioutil.ReadFile(fullPath)
if err != nil {
return err
}
playlistString := string(playlistBytes)
// fmt.Println("Rewriting playlist", relativePath, "to", path.Join(config.Config.PublicHLSPath, relativePath))
playlistString = _storage.GenerateRemotePlaylist(playlistString, variant)
return ffmpeg.WritePlaylist(playlistString, path.Join(config.Config.PublicHLSPath, relativePath))
}

188
core/rtmp/handler.go Normal file
View File

@@ -0,0 +1,188 @@
package rtmp
import (
"bytes"
"errors"
"io"
"os"
"syscall"
log "github.com/sirupsen/logrus"
"github.com/yutopp/go-flv"
flvtag "github.com/yutopp/go-flv/tag"
yutmp "github.com/yutopp/go-rtmp"
rtmpmsg "github.com/yutopp/go-rtmp/message"
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/core"
"github.com/gabek/owncast/core/ffmpeg"
"github.com/gabek/owncast/utils"
)
var _ yutmp.Handler = (*Handler)(nil)
// Handler An RTMP connection handler
type Handler struct {
yutmp.DefaultHandler
flvFile *os.File
flvEnc *flv.Encoder
}
//OnServe handles the "OnServe" of the rtmp service
func (h *Handler) OnServe(conn *yutmp.Conn) {
}
//OnConnect handles the "OnConnect" of the rtmp service
func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error {
// log.Printf("OnConnect: %#v", cmd)
return nil
}
//OnCreateStream handles the "OnCreateStream" of the rtmp service
func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error {
// log.Printf("OnCreateStream: %#v", cmd)
return nil
}
//OnPublish handles the "OnPublish" of the rtmp service
func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error {
// log.Printf("OnPublish: %#v", cmd)
log.Println("Incoming stream connected.")
if cmd.PublishingName != config.Config.VideoSettings.StreamingKey {
return errors.New("invalid streaming key; rejecting incoming stream")
}
if _isConnected {
return errors.New("stream already running; can not overtake an existing stream")
}
// Record streams as FLV
p := utils.GetTemporaryPipePath()
syscall.Mkfifo(p, 0666)
f, err := os.OpenFile(p, os.O_RDWR, os.ModeNamedPipe)
if err != nil {
return err
}
h.flvFile = f
enc, err := flv.NewEncoder(f, flv.FlagsAudio|flv.FlagsVideo)
if err != nil {
_ = f.Close()
return err
}
h.flvEnc = enc
//TODO: why is this turned into a goroutine?
go ffmpeg.Start()
_isConnected = true
core.SetStreamAsConnected()
return nil
}
//OnSetDataFrame handles the setting of the data frame
func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error {
r := bytes.NewReader(data.Payload)
var script flvtag.ScriptData
if err := flvtag.DecodeScriptData(r, &script); err != nil {
log.Printf("Failed to decode script data: Err = %+v", err)
return nil // ignore
}
// log.Printf("SetDataFrame: Script = %#v", script)
if err := h.flvEnc.Encode(&flvtag.FlvTag{
TagType: flvtag.TagTypeScriptData,
Timestamp: timestamp,
Data: &script,
}); err != nil {
log.Printf("Failed to write script data: Err = %+v", err)
}
return nil
}
//OnAudio handles when we get audio from the rtmp service
func (h *Handler) OnAudio(timestamp uint32, payload io.Reader) error {
var audio flvtag.AudioData
if err := flvtag.DecodeAudioData(payload, &audio); err != nil {
return err
}
flvBody := new(bytes.Buffer)
if _, err := io.Copy(flvBody, audio.Data); err != nil {
return err
}
audio.Data = flvBody
// log.Printf("FLV Audio Data: Timestamp = %d, SoundFormat = %+v, SoundRate = %+v, SoundSize = %+v, SoundType = %+v, AACPacketType = %+v, Data length = %+v",
// timestamp,
// audio.SoundFormat,
// audio.SoundRate,
// audio.SoundSize,
// audio.SoundType,
// audio.AACPacketType,
// len(flvBody.Bytes()),
// )
if err := h.flvEnc.Encode(&flvtag.FlvTag{
TagType: flvtag.TagTypeAudio,
Timestamp: timestamp,
Data: &audio,
}); err != nil {
log.Printf("Failed to write audio: Err = %+v", err)
}
return nil
}
//OnVideo handles when we video from the rtmp service
func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error {
var video flvtag.VideoData
if err := flvtag.DecodeVideoData(payload, &video); err != nil {
return err
}
flvBody := new(bytes.Buffer)
if _, err := io.Copy(flvBody, video.Data); err != nil {
return err
}
video.Data = flvBody
// log.Printf("FLV Video Data: Timestamp = %d, FrameType = %+v, CodecID = %+v, AVCPacketType = %+v, CT = %+v, Data length = %+v",
// timestamp,
// video.FrameType,
// video.CodecID,
// video.AVCPacketType,
// video.CompositionTime,
// len(flvBody.Bytes()),
// )
if err := h.flvEnc.Encode(&flvtag.FlvTag{
TagType: flvtag.TagTypeVideo,
Timestamp: timestamp,
Data: &video,
}); err != nil {
log.Printf("Failed to write video: Err = %+v", err)
}
return nil
}
//OnClose handles the closing of the rtmp connection
func (h *Handler) OnClose() {
log.Printf("OnClose of the rtmp service")
if h.flvFile != nil {
_ = h.flvFile.Close()
}
_isConnected = false
core.SetStreamAsDisconnected()
}

58
core/rtmp/rtmp.go Normal file
View File

@@ -0,0 +1,58 @@
package rtmp
import (
"fmt"
"io"
"net"
log "github.com/sirupsen/logrus"
yutmp "github.com/yutopp/go-rtmp"
)
var (
//IsConnected whether there is a connection or not
_isConnected = false
)
//Start starts the rtmp service, listening on port 1935
func Start() {
port := 1935
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Panicf("Failed to resolve the tcp address for the rtmp service: %+v", err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Panicf("Failed to acquire the tcp listener: %+v", err)
}
srv := yutmp.NewServer(&yutmp.ServerConfig{
OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *yutmp.ConnConfig) {
l := log.StandardLogger()
l.SetLevel(log.WarnLevel)
return conn, &yutmp.ConnConfig{
Handler: &Handler{},
ControlState: yutmp.StreamControlStateConfig{
DefaultBandwidthWindowSize: 6 * 1024 * 1024 / 8,
},
Logger: l,
}
},
})
log.Printf("RTMP server is listening for incoming stream on port: %d", port)
if err := srv.Serve(listener); err != nil {
log.Panicf("Failed to serve the rtmp service: %+v", err)
}
}
//IsConnected gets whether there is an rtmp connection or not
//this is only a getter since it is controlled by the rtmp handler
func IsConnected() bool {
return _isConnected
}

135
core/stats.go Normal file
View File

@@ -0,0 +1,135 @@
package core
import (
"encoding/json"
"io/ioutil"
"math"
"os"
"time"
log "github.com/sirupsen/logrus"
"github.com/gabek/owncast/models"
"github.com/gabek/owncast/utils"
)
const (
statsFilePath = "stats.json"
)
func setupStats() error {
s, err := getSavedStats()
if err != nil {
return err
}
_stats = &s
statsSaveTimer := time.NewTicker(1 * time.Minute)
go func() {
for {
select {
case <-statsSaveTimer.C:
if err := saveStatsToFile(); err != nil {
panic(err)
}
}
}
}()
staleViewerPurgeTimer := time.NewTicker(3 * time.Second)
go func() {
for {
select {
case <-staleViewerPurgeTimer.C:
purgeStaleViewers()
}
}
}()
return nil
}
func purgeStaleViewers() {
for clientID, lastConnectedtime := range _stats.Clients {
timeSinceLastActive := time.Since(lastConnectedtime).Minutes()
if timeSinceLastActive > 2 {
RemoveClient(clientID)
}
}
}
//IsStreamConnected checks if the stream is connected or not
func IsStreamConnected() bool {
if !_stats.StreamConnected {
return false
}
// Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available.
// So account for that with an artificial buffer.
timeSinceLastConnected := time.Since(_stats.LastConnectTime).Seconds()
if timeSinceLastConnected < 10 {
return false
}
return _stats.StreamConnected
}
//SetClientActive sets a client as active and connected
func SetClientActive(clientID string) {
// if _, ok := s.clients[clientID]; !ok {
// fmt.Println("Marking client active:", clientID, s.GetViewerCount()+1, "clients connected.")
// }
_stats.Clients[clientID] = time.Now()
_stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Clients)), float64(_stats.SessionMaxViewerCount)))
_stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount)))
}
//RemoveClient removes a client from the active clients record
func RemoveClient(clientID string) {
log.Println("Removing the client:", clientID)
delete(_stats.Clients, clientID)
}
func saveStatsToFile() error {
jsonData, err := json.Marshal(_stats)
if err != nil {
return err
}
f, err := os.Create(statsFilePath)
if err != nil {
return err
}
defer f.Close()
if _, err := f.Write(jsonData); err != nil {
return err
}
return nil
}
func getSavedStats() (models.Stats, error) {
result := models.Stats{
Clients: make(map[string]time.Time),
}
if !utils.DoesFileExists(statsFilePath) {
return result, nil
}
jsonFile, err := ioutil.ReadFile(statsFilePath)
if err != nil {
return result, nil
}
if err := json.Unmarshal(jsonFile, &result); err != nil {
return result, nil
}
return result, nil
}

51
core/status.go Normal file
View File

@@ -0,0 +1,51 @@
package core
import (
"time"
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/core/ffmpeg"
"github.com/gabek/owncast/models"
)
//GetStatus gets the status of the system
func GetStatus() models.Status {
if _stats == nil {
return models.Status{}
}
return models.Status{
Online: IsStreamConnected(),
ViewerCount: len(_stats.Clients),
OverallMaxViewerCount: _stats.OverallMaxViewerCount,
SessionMaxViewerCount: _stats.SessionMaxViewerCount,
}
}
//SetStreamAsConnected sets the stream as connected
func SetStreamAsConnected() {
_stats.StreamConnected = true
_stats.LastConnectTime = time.Now()
timeSinceDisconnect := time.Since(_stats.LastDisconnectTime).Minutes()
if timeSinceDisconnect > 15 {
_stats.SessionMaxViewerCount = 0
}
chunkPath := config.Config.PublicHLSPath
if usingExternalStorage {
chunkPath = config.Config.PrivateHLSPath
}
ffmpeg.StartThumbnailGenerator(chunkPath)
}
//SetStreamAsDisconnected sets the stream as disconnected
func SetStreamAsDisconnected() {
_stats.StreamConnected = false
_stats.LastDisconnectTime = time.Now()
if config.Config.EnableOfflineImage {
ffmpeg.ShowStreamOfflineState()
}
}

31
core/storage.go Normal file
View File

@@ -0,0 +1,31 @@
package core
import (
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/core/playlist"
"github.com/gabek/owncast/core/storageproviders"
)
var (
usingExternalStorage = false
)
func setupStorage() error {
if config.Config.IPFS.Enabled {
_storage = &storageproviders.IPFSStorage{}
usingExternalStorage = true
} else if config.Config.S3.Enabled {
_storage = &storageproviders.S3Storage{}
usingExternalStorage = true
}
if usingExternalStorage {
if err := _storage.Setup(); err != nil {
return err
}
go playlist.StartVideoContentMonitor(_storage)
}
return nil
}

View File

@@ -0,0 +1,319 @@
package storageproviders
import (
"bufio"
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
log "github.com/sirupsen/logrus"
config "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/corehttp"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
ownconfig "github.com/gabek/owncast/config"
"github.com/gabek/owncast/models"
)
//IPFSStorage is the ipfs implementation of the ChunkStorageProvider
type IPFSStorage struct {
ipfs *icore.CoreAPI
node *core.IpfsNode
ctx context.Context
directoryHash string
gateway string
}
//Setup sets up the ipfs storage for saving the video to ipfs
func (s *IPFSStorage) Setup() error {
log.Println("Setting up IPFS for external storage of video. Please wait..")
s.gateway = ownconfig.Config.IPFS.Gateway
s.ctx = context.Background()
ipfsInstance, node, err := s.createIPFSInstance()
if err != nil {
return err
}
s.ipfs = ipfsInstance
s.node = node
return s.createIPFSDirectory("./hls")
}
//Save saves the file to the ipfs storage
func (s *IPFSStorage) Save(filePath string, retryCount int) (string, error) {
someFile, err := getUnixfsNode(filePath)
if err != nil {
return "", err
}
defer someFile.Close()
opts := []options.UnixfsAddOption{
options.Unixfs.Pin(false),
// options.Unixfs.CidVersion(1),
// options.Unixfs.RawLeaves(false),
// options.Unixfs.Nocopy(false),
}
cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...)
if err != nil {
return "", err
}
// fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String())
newHash, err := s.addFileToDirectory(cidFile, filepath.Base(filePath))
if err != nil {
return "", err
}
return s.gateway + newHash, nil
}
//GenerateRemotePlaylist implements the 'GenerateRemotePlaylist' method
func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, variant models.Variant) string {
var newPlaylist = ""
scanner := bufio.NewScanner(strings.NewReader(playlist))
for scanner.Scan() {
line := scanner.Text()
if line[0:1] != "#" {
fullRemotePath := variant.GetSegmentForFilename(line)
if fullRemotePath != nil {
line = ""
} else {
line = fullRemotePath.RemoteID
}
}
newPlaylist = newPlaylist + line + "\n"
}
return newPlaylist
}
func setupPlugins(externalPluginsPath string) error {
// Load any external plugins if available on externalPluginsPath
plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}
// Load preloaded and external plugins
if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
return nil
}
// Creates an IPFS node and returns its coreAPI
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) {
// Open the repo
repo, err := fsrepo.Open(repoPath)
if err != nil {
return nil, nil, err
}
// Construct the node
nodeOptions := &core.BuildCfg{
Online: true,
Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records)
// Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records)
Repo: repo,
}
node, err := core.NewNode(ctx, nodeOptions)
node.IsDaemon = true
if err != nil {
return nil, nil, err
}
// Attach the Core API to the constructed node
coreAPI, err := coreapi.NewCoreAPI(node)
if err != nil {
return nil, nil, err
}
return coreAPI, node, nil
}
func createTempRepo(ctx context.Context) (string, error) {
repoPath, err := ioutil.TempDir("", "ipfs-shell")
if err != nil {
return "", fmt.Errorf("failed to get temp dir: %s", err)
}
// Create a config with default options and a 2048 bit key
cfg, err := config.Init(ioutil.Discard, 2048)
if err != nil {
return "", err
}
// Create the repo with the config
err = fsrepo.Init(repoPath, cfg)
if err != nil {
return "", fmt.Errorf("failed to init ephemeral node: %s", err)
}
return repoPath, nil
}
// Spawns a node to be used just for this run (i.e. creates a tmp repo)
func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) {
if err := setupPlugins(""); err != nil {
return nil, nil, err
}
// Create a Temporary Repo
repoPath, err := createTempRepo(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create temp repo: %s", err)
}
// Spawning an ephemeral IPFS node
return createNode(ctx, repoPath)
}
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
var wg sync.WaitGroup
peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers))
for _, addrStr := range peers {
addr, err := ma.NewMultiaddr(addrStr)
if err != nil {
return err
}
pii, err := peerstore.InfoFromP2pAddr(addr)
if err != nil {
return err
}
pi, ok := peerInfos[pii.ID]
if !ok {
pi = &peerstore.PeerInfo{ID: pii.ID}
peerInfos[pi.ID] = pi
}
pi.Addrs = append(pi.Addrs, pii.Addrs...)
}
wg.Add(len(peerInfos))
for _, peerInfo := range peerInfos {
go func(peerInfo *peerstore.PeerInfo) {
defer wg.Done()
err := ipfs.Swarm().Connect(ctx, *peerInfo)
if err != nil {
log.Printf("failed to connect to %s: %s", peerInfo.ID, err)
}
}(peerInfo)
}
wg.Wait()
return nil
}
func getUnixfsNode(path string) (files.Node, error) {
st, err := os.Stat(path)
if err != nil {
return nil, err
}
f, err := files.NewSerialFile(path, false, st)
if err != nil {
return nil, err
}
return f, nil
}
func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) (string, error) {
// fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String())
directoryToAddToPath := path.New(s.directoryHash)
newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)
return newDirectoryHash.String() + "/" + filename, err
}
func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
// Spawn a node using a temporary path, creating a temporary repo for the run
api, node, error := spawnEphemeral(s.ctx)
// api, node, error := spawnDefault(ctx)
return &api, node, error
}
func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI {
defer log.Println("IPFS node exited")
log.Println("IPFS node is running")
bootstrapNodes := []string{
// IPFS Bootstrapper nodes.
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
// IPFS Cluster Pinning nodes
"/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
// You can add more nodes here, for example, another IPFS node you might have running locally, mine was:
// "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
// "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
}
go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes)
addr := "/ip4/127.0.0.1/tcp/5001"
var opts = []corehttp.ServeOption{
corehttp.GatewayOption(true, "/ipfs", "/ipns"),
}
if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil {
return
}
}
func (s *IPFSStorage) createIPFSDirectory(directoryName string) error {
directory, err := getUnixfsNode(directoryName)
if err != nil {
return err
}
defer directory.Close()
newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory)
if err != nil {
return err
}
s.directoryHash = newlyCreatedDirectoryHash.String()
return nil
}

View File

@@ -0,0 +1,119 @@
package storageproviders
import (
"bufio"
"os"
"strings"
log "github.com/sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/gabek/owncast/config"
"github.com/gabek/owncast/models"
)
//S3Storage is the s3 implementation of the ChunkStorageProvider
type S3Storage struct {
sess *session.Session
host string
s3Endpoint string
s3Region string
s3Bucket string
s3AccessKey string
s3Secret string
}
//Setup sets up the s3 storage for saving the video to s3
func (s *S3Storage) Setup() error {
log.Println("Setting up S3 for external storage of video...")
s.s3Endpoint = config.Config.S3.Endpoint
s.s3Region = config.Config.S3.Region
s.s3Bucket = config.Config.S3.Bucket
s.s3AccessKey = config.Config.S3.AccessKey
s.s3Secret = config.Config.S3.Secret
s.sess = s.connectAWS()
return nil
}
//Save saves the file to the s3 bucket
func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
// fmt.Println("Saving", filePath)
file, err := os.Open(filePath)
if err != nil {
return "", err
}
defer file.Close()
uploader := s3manager.NewUploader(s.sess)
response, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.s3Bucket), // Bucket to be used
Key: aws.String(filePath), // Name of the file to be saved
Body: file, // File
})
if err != nil {
log.Errorln("error uploading:", err.Error())
if retryCount < 4 {
log.Println("Retrying...")
return s.Save(filePath, retryCount+1)
}
}
// fmt.Println("Uploaded", filePath, "to", response.Location)
return response.Location, nil
}
//GenerateRemotePlaylist implements the 'GenerateRemotePlaylist' method
func (s *S3Storage) GenerateRemotePlaylist(playlist string, variant models.Variant) string {
var newPlaylist = ""
scanner := bufio.NewScanner(strings.NewReader(playlist))
for scanner.Scan() {
line := scanner.Text()
if line[0:1] != "#" {
fullRemotePath := variant.GetSegmentForFilename(line)
if fullRemotePath == nil {
line = ""
} else {
line = fullRemotePath.RemoteID
}
}
newPlaylist = newPlaylist + line + "\n"
}
return newPlaylist
}
func (s S3Storage) connectAWS() *session.Session {
creds := credentials.NewStaticCredentials(s.s3AccessKey, s.s3Secret, "")
_, err := creds.Get()
if err != nil {
log.Panicln(err)
}
sess, err := session.NewSession(
&aws.Config{
Region: aws.String(s.s3Region),
Credentials: creds,
Endpoint: aws.String(s.s3Endpoint),
S3ForcePathStyle: aws.Bool(true),
},
)
if err != nil {
log.Panicln(err)
}
return sess
}