diff --git a/.gitignore b/.gitignore index 79d1dfeb3..00bf51bb9 100644 --- a/.gitignore +++ b/.gitignore @@ -15,10 +15,11 @@ vendor/ #owncast specific -config/config.yaml -config/stats.json +/config.yaml +/stats.json owncast webroot/thumbnail.jpg webroot/hls webroot/static/content.md -hls/ \ No newline at end of file +hls/ +dist/ diff --git a/README.md b/README.md index b758936a9..2bd1127a3 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ The goal is to have a single service that you can run and it works out of the bo ### Installation 1. **TODO: Once it's installable add directions here.** -1. Copy config/config-example.yaml to config/config.yaml. +1. Copy config-example.yaml to config.yaml. 1. Edit the config file and point it to `ffmpeg` 1. Set a custom streaming key by editing `streamingKey` in your config. @@ -91,8 +91,8 @@ The goal is to have a single service that you can run and it works out of the bo If you want a simpler way to run an instance of owncast you can run it in a container with the supplied Dockerfile. -1. Copy `config/config-example.yaml` to `config/config.yaml` -1. Edit `config/config.yaml` and change the path of ffmpeg to `/usr/bin/ffmpeg` +1. Copy `config-example.yaml` to `config.yaml` +1. Edit `config.yaml` and change the path of ffmpeg to `/usr/bin/ffmpeg` 1. Make any other config changes. 1. Run `docker build -t owncast .` and wait. It may take a few minutes to build. 1. Run `docker run -p 8080:8080 -p 1935:1935 -it owncast` @@ -162,7 +162,7 @@ Here's a list of some things you can do to increase performance and make things 1. Install the [Go toolchain](https://golang.org/dl/). 1. Clone the repo. `git clone https://github.com/gabek/owncast` 1. Follow the above [Getting Started](#getting-started) instructions, making sure ffmpeg exists and your config file is set. -1. `go run *.go` on the first run will download the required packages needed for the application to build. +1. `go run main.go` on the first run will download the required packages needed for the application to build. 1. It will start running the same as in the above [Usage](#usage) instructions and you can point [OBS to your localhost](#usage-with-obs) instance of it. ## Roadmap diff --git a/chunkStorage.go b/chunkStorage.go deleted file mode 100644 index 16ba6dbdb..000000000 --- a/chunkStorage.go +++ /dev/null @@ -1,7 +0,0 @@ -package main - -type ChunkStorage interface { - Setup(config Config) - Save(filePath string, retryCount int) string - GenerateRemotePlaylist(playlist string, variant Variant) string -} diff --git a/config/config-example.yaml b/config-example.yaml similarity index 100% rename from config/config-example.yaml rename to config-example.yaml diff --git a/config.go b/config.go deleted file mode 100644 index d77701772..000000000 --- a/config.go +++ /dev/null @@ -1,112 +0,0 @@ -package main - -import ( - "fmt" - "io/ioutil" - - log "github.com/sirupsen/logrus" - - "gopkg.in/yaml.v2" -) - -// Config struct -type Config struct { - IPFS IPFS `yaml:"ipfs"` - PublicHLSPath string `yaml:"publicHLSPath"` - PrivateHLSPath string `yaml:"privateHLSPath"` - VideoSettings VideoSettings `yaml:"videoSettings"` - Files Files `yaml:"files"` - FFMpegPath string `yaml:"ffmpegPath"` - WebServerPort int `yaml:"webServerPort"` - S3 S3 `yaml:"s3"` - EnableOfflineImage bool `yaml:"enableOfflineImage"` -} - -type VideoSettings struct { - ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"` - StreamingKey string `yaml:"streamingKey"` - EncoderPreset string `yaml:"encoderPreset"` - StreamQualities []StreamQuality `yaml:"streamQualities"` - EnablePassthrough bool `yaml:"passthrough"` - OfflineImage string `yaml:"offlineImage"` -} - -type StreamQuality struct { - Bitrate int `yaml:"bitrate"` -} - -// MaxNumberOnDisk must be at least as large as MaxNumberInPlaylist -type Files struct { - MaxNumberInPlaylist int `yaml:"maxNumberInPlaylist"` -} - -type IPFS struct { - Enabled bool `yaml:"enabled"` - Gateway string `yaml:"gateway"` -} - -type S3 struct { - Enabled bool `yaml:"enabled"` - Endpoint string `yaml:"endpoint"` - AccessKey string `yaml:"accessKey"` - Secret string `yaml:"secret"` - Bucket string `yaml:"bucket"` - Region string `yaml:"region"` -} - -func getConfig() Config { - filePath := "config/config.yaml" - - if !fileExists(filePath) { - log.Fatal("ERROR: valid config/config.yaml is required. Copy config/config-example.yaml to config/config.yaml and edit.") - } - - yamlFile, err := ioutil.ReadFile(filePath) - - var config Config - err = yaml.Unmarshal(yamlFile, &config) - if err != nil { - log.Panicln(err) - } - - // fmt.Printf("%+v\n", config) - - return config -} - -func checkConfig(config Config) { - if config.S3.Enabled && config.IPFS.Enabled { - log.Panicln("S3 and IPFS support cannot be enabled at the same time. Choose one.") - } - - if config.S3.Enabled { - if config.S3.AccessKey == "" || config.S3.Secret == "" { - log.Panicln("S3 support requires an access key and secret.") - } - - if config.S3.Region == "" || config.S3.Endpoint == "" { - log.Panicln("S3 support requires a region and endpoint.") - } - - if config.S3.Bucket == "" { - log.Panicln("S3 support requires a bucket created for storing public video segments.") - } - } - - // if !fileExists(config.PrivateHLSPath) { - // os.MkdirAll(path.Join(config.PrivateHLSPath, strconv.Itoa(0)), 0777) - // } - - // if !fileExists(config.PublicHLSPath) { - // os.MkdirAll(path.Join(config.PublicHLSPath, strconv.Itoa(0)), 0777) - // } - - if !fileExists(config.FFMpegPath) { - log.Panicln(fmt.Sprintf("ffmpeg does not exist at %s.", config.FFMpegPath)) - } - - if config.VideoSettings.EncoderPreset == "" { - log.Panicln("A video encoder preset is required to be set in the config file.") - } - -} diff --git a/config/config.go b/config/config.go new file mode 100644 index 000000000..259b8d277 --- /dev/null +++ b/config/config.go @@ -0,0 +1,127 @@ +package config + +import ( + "errors" + "fmt" + "io/ioutil" + + log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" + + "github.com/gabek/owncast/utils" +) + +//Config contains a reference to the configuration +var Config *config + +type config struct { + IPFS ipfs `yaml:"ipfs"` + PublicHLSPath string `yaml:"publicHLSPath"` + PrivateHLSPath string `yaml:"privateHLSPath"` + VideoSettings videoSettings `yaml:"videoSettings"` + Files files `yaml:"files"` + FFMpegPath string `yaml:"ffmpegPath"` + WebServerPort int `yaml:"webServerPort"` + S3 s3 `yaml:"s3"` + EnableOfflineImage bool `yaml:"enableOfflineImage"` +} + +type videoSettings struct { + ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"` + StreamingKey string `yaml:"streamingKey"` + EncoderPreset string `yaml:"encoderPreset"` + StreamQualities []streamQuality `yaml:"streamQualities"` + EnablePassthrough bool `yaml:"passthrough"` + OfflineImage string `yaml:"offlineImage"` +} + +type streamQuality struct { + Bitrate int `yaml:"bitrate"` +} + +type files struct { + MaxNumberInPlaylist int `yaml:"maxNumberInPlaylist"` +} + +type ipfs struct { + Enabled bool `yaml:"enabled"` + Gateway string `yaml:"gateway"` +} + +//s3 is for configuring the s3 integration +type s3 struct { + Enabled bool `yaml:"enabled"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"accessKey"` + Secret string `yaml:"secret"` + Bucket string `yaml:"bucket"` + Region string `yaml:"region"` +} + +func (c *config) load(filePath string) error { + if !utils.DoesFileExists(filePath) { + log.Fatal("ERROR: valid config/config.yaml is required. Copy config-example.yaml to config.yaml and edit") + } + + yamlFile, err := ioutil.ReadFile(filePath) + if err != nil { + log.Printf("yamlFile.Get err #%v ", err) + return err + } + + if err := yaml.Unmarshal(yamlFile, c); err != nil { + log.Fatalf("Unmarshal: %v", err) + return err + } + + return nil +} + +func (c *config) verifySettings() error { + if c.S3.Enabled && c.IPFS.Enabled { + return errors.New("s3 and IPFS support cannot be enabled at the same time; choose one") + } + + if c.S3.Enabled { + if c.S3.AccessKey == "" || c.S3.Secret == "" { + return errors.New("s3 support requires an access key and secret") + } + + if c.S3.Region == "" || c.S3.Endpoint == "" { + return errors.New("s3 support requires a region and endpoint") + } + + if c.S3.Bucket == "" { + return errors.New("s3 support requires a bucket created for storing public video segments") + } + } + + // if !fileExists(config.PrivateHLSPath) { + // os.MkdirAll(path.Join(config.PrivateHLSPath, strconv.Itoa(0)), 0777) + // } + + // if !fileExists(config.PublicHLSPath) { + // os.MkdirAll(path.Join(config.PublicHLSPath, strconv.Itoa(0)), 0777) + // } + + if !utils.DoesFileExists(c.FFMpegPath) { + return fmt.Errorf("ffmpeg does not exist at: %s", c.FFMpegPath) + } + + if c.VideoSettings.EncoderPreset == "" { + return errors.New("a video encoder preset is required to be set in the config file") + } + + return nil +} + +//Load tries to load the configuration file +func Load(filePath string) error { + Config = new(config) + + if err := Config.load(filePath); err != nil { + return err + } + + return Config.verifySettings() +} diff --git a/controllers/index.go b/controllers/index.go new file mode 100644 index 000000000..fa42cd132 --- /dev/null +++ b/controllers/index.go @@ -0,0 +1,24 @@ +package controllers + +import ( + "net/http" + "path" + + "github.com/gabek/owncast/core" + "github.com/gabek/owncast/router/middleware" + "github.com/gabek/owncast/utils" +) + +//IndexHandler handles the default index route +func IndexHandler(w http.ResponseWriter, r *http.Request) { + middleware.EnableCors(&w) + + http.ServeFile(w, r, path.Join("webroot", r.URL.Path)) + + if path.Ext(r.URL.Path) == ".m3u8" { + middleware.DisableCache(&w) + + clientID := utils.GenerateClientIDFromRequest(r) + core.SetClientActive(clientID) + } +} diff --git a/controllers/status.go b/controllers/status.go new file mode 100644 index 000000000..b372651a5 --- /dev/null +++ b/controllers/status.go @@ -0,0 +1,18 @@ +package controllers + +import ( + "encoding/json" + "net/http" + + "github.com/gabek/owncast/core" + "github.com/gabek/owncast/router/middleware" +) + +//GetStatus gets the status of the server +func GetStatus(w http.ResponseWriter, r *http.Request) { + middleware.EnableCors(&w) + + status := core.GetStatus() + + json.NewEncoder(w).Encode(status) +} diff --git a/client.go b/core/chat/client.go similarity index 64% rename from client.go rename to core/chat/client.go index 5f3bd4700..6dab147f0 100644 --- a/client.go +++ b/core/chat/client.go @@ -1,4 +1,4 @@ -package main +package chat import ( "fmt" @@ -6,24 +6,26 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/net/websocket" + + "github.com/gabek/owncast/models" + "github.com/gabek/owncast/utils" ) const channelBufSize = 100 -// Chat client. +//Client represents a chat client. type Client struct { id string ws *websocket.Conn server *Server - ch chan *ChatMessage - pingch chan *PingMessage + ch chan models.ChatMessage + pingch chan models.PingMessage doneCh chan bool } -// Create new chat client. +//NewClient creates a new chat client func NewClient(ws *websocket.Conn, server *Server) *Client { - if ws == nil { log.Panicln("ws cannot be nil") } @@ -32,27 +34,29 @@ func NewClient(ws *websocket.Conn, server *Server) *Client { log.Panicln("server cannot be nil") } - ch := make(chan *ChatMessage, channelBufSize) + ch := make(chan models.ChatMessage, channelBufSize) doneCh := make(chan bool) - pingch := make(chan *PingMessage) - clientID := getClientIDFromRequest(ws.Request()) + pingch := make(chan models.PingMessage) + clientID := utils.GenerateClientIDFromRequest(ws.Request()) + return &Client{clientID, ws, server, ch, pingch, doneCh} } -func (c *Client) Conn() *websocket.Conn { +//GetConnection gets the connection for the client +func (c *Client) GetConnection() *websocket.Conn { return c.ws } -func (c *Client) Write(msg *ChatMessage) { +func (c *Client) Write(msg models.ChatMessage) { select { case c.ch <- msg: default: - c.server.Del(c) - err := fmt.Errorf("client %d is disconnected.", c.id) - c.server.Err(err) + c.server.Remove(c) + c.server.Err(fmt.Errorf("client %s is disconnected", c.id)) } } +//Done marks the client as done func (c *Client) Done() { c.doneCh <- true } @@ -78,7 +82,7 @@ func (c *Client) listenWrite() { // receive done request case <-c.doneCh: - c.server.Del(c) + c.server.Remove(c) c.doneCh <- true // for listenRead method return } @@ -92,20 +96,20 @@ func (c *Client) listenRead() { // receive done request case <-c.doneCh: - c.server.Del(c) + c.server.Remove(c) c.doneCh <- true // for listenWrite method return // read data from websocket connection default: - var msg ChatMessage - err := websocket.JSON.Receive(c.ws, &msg) - if err == io.EOF { + var msg models.ChatMessage + + if err := websocket.JSON.Receive(c.ws, &msg); err == io.EOF { c.doneCh <- true } else if err != nil { c.server.Err(err) } else { - c.server.SendAll(&msg) + c.server.SendToAll(msg) } } } diff --git a/core/chat/server.go b/core/chat/server.go new file mode 100644 index 000000000..f4b1ddde7 --- /dev/null +++ b/core/chat/server.go @@ -0,0 +1,170 @@ +package chat + +import ( + "fmt" + "net/http" + "time" + + "github.com/gabek/owncast/core" + "github.com/gabek/owncast/models" + log "github.com/sirupsen/logrus" + + "golang.org/x/net/websocket" +) + +//Server represents the server which handles the chat +type Server struct { + Pattern string + Messages []models.ChatMessage + Clients map[string]*Client + + addCh chan *Client + delCh chan *Client + sendAllCh chan models.ChatMessage + pingCh chan models.PingMessage + doneCh chan bool + errCh chan error +} + +//NewServer creates a new chat server +func NewServer(pattern string) *Server { + messages := []models.ChatMessage{} + clients := make(map[string]*Client) + addCh := make(chan *Client) + delCh := make(chan *Client) + sendAllCh := make(chan models.ChatMessage) + pingCh := make(chan models.PingMessage) + doneCh := make(chan bool) + errCh := make(chan error) + + // Demo messages only. Remove me eventually!!! + messages = append(messages, models.ChatMessage{"Tom Nook", "I'll be there with Bells on! Ho ho!", "https://gamepedia.cursecdn.com/animalcrossingpocketcamp_gamepedia_en/thumb/4/4f/Timmy_Icon.png/120px-Timmy_Icon.png?version=87b38d7d6130411d113486c2db151385", "demo-message-1", "ChatMessage"}) + messages = append(messages, models.ChatMessage{"Redd", "Fool me once, shame on you. Fool me twice, stop foolin' me.", "https://vignette.wikia.nocookie.net/animalcrossing/images/3/3d/Redd2.gif/revision/latest?cb=20100710004252", "demo-message-2", "ChatMessage"}) + messages = append(messages, models.ChatMessage{"Kevin", "You just caught me before I was about to go work out weeweewee!", "https://vignette.wikia.nocookie.net/animalcrossing/images/2/20/NH-Kevin_poster.png/revision/latest/scale-to-width-down/100?cb=20200410185817", "demo-message-3", "ChatMessage"}) + messages = append(messages, models.ChatMessage{"Isabelle", " Isabelle is the mayor's highly capable secretary. She can be forgetful sometimes, but you can always count on her for information about the town. She wears her hair up in a bun that makes her look like a shih tzu. Mostly because she is one! She also has a twin brother named Digby.", "https://dodo.ac/np/images/thumb/7/7b/IsabelleTrophyWiiU.png/200px-IsabelleTrophyWiiU.png", "demo-message-4", "ChatMessage"}) + messages = append(messages, models.ChatMessage{"Judy", "myohmy, I'm dancing my dreams away.", "https://vignette.wikia.nocookie.net/animalcrossing/images/5/50/NH-Judy_poster.png/revision/latest/scale-to-width-down/100?cb=20200522063219", "demo-message-5", "ChatMessage"}) + messages = append(messages, models.ChatMessage{"Blathers", "Blathers is an owl with brown feathers. His face is white and he has a yellow beak. His arms are wing shaped and he has yellow talons. His eyes are very big with small black irises. He also has big pink cheek circles on his cheeks. His belly appears to be checkered in diamonds with light brown and white squares, similar to an argyle vest, which is traditionally associated with academia. His green bowtie further alludes to his academic nature.", "https://vignette.wikia.nocookie.net/animalcrossing/images/b/b3/NH-character-Blathers.png/revision/latest?cb=20200229053519", "demo-message-6", "ChatMessage"}) + + server := &Server{ + pattern, + messages, + clients, + addCh, + delCh, + sendAllCh, + pingCh, + doneCh, + errCh, + } + + ticker := time.NewTicker(30 * time.Second) + go func() { + for { + select { + case <-ticker.C: + server.ping() + } + } + }() + + return server +} + +//Add adds a client to the server +func (s *Server) Add(c *Client) { + s.addCh <- c +} + +//Remove removes a client from the server +func (s *Server) Remove(c *Client) { + s.delCh <- c +} + +//SendToAll sends a message to all of the connected clients +func (s *Server) SendToAll(msg models.ChatMessage) { + s.sendAllCh <- msg +} + +//Done marks the server as done +func (s *Server) Done() { + s.doneCh <- true +} + +//Err handles an error +func (s *Server) Err(err error) { + s.errCh <- err +} + +func (s *Server) sendPastMessages(c *Client) { + for _, msg := range s.Messages { + c.Write(msg) + } +} + +func (s *Server) sendAll(msg models.ChatMessage) { + for _, c := range s.Clients { + c.Write(msg) + } +} + +func (s *Server) ping() { + // fmt.Println("Start pinging....", len(s.clients)) + + ping := models.PingMessage{MessageType: "PING"} + for _, c := range s.Clients { + c.pingch <- ping + } +} + +// Listen and serve. +// It serves client connection and broadcast request. +func (s *Server) Listen() { + // websocket handler + onConnected := func(ws *websocket.Conn) { + defer func() { + err := ws.Close() + if err != nil { + s.errCh <- err + } + }() + + client := NewClient(ws, s) + s.Add(client) + client.Listen() + } + + http.Handle(s.Pattern, websocket.Handler(onConnected)) + + log.Printf("Starting the websocket listener on: %s", s.Pattern) + + for { + select { + // add new a client + case c := <-s.addCh: + s.Clients[c.id] = c + + core.SetClientActive(c.id) + s.sendPastMessages(c) + + // remove a client + case c := <-s.delCh: + delete(s.Clients, c.id) + core.RemoveClient(c.id) + + // broadcast a message to all clients + case msg := <-s.sendAllCh: + log.Println("Sending a message to all:", msg) + s.Messages = append(s.Messages, msg) + s.sendAll(msg) + + case ping := <-s.pingCh: + fmt.Println("PING?", ping) + + case err := <-s.errCh: + log.Println("Error:", err.Error()) + + case <-s.doneCh: + return + } + } +} diff --git a/core/constants.go b/core/constants.go new file mode 100644 index 000000000..a724e1942 --- /dev/null +++ b/core/constants.go @@ -0,0 +1,20 @@ +package core + +import ( + "fmt" +) + +// the following are injected at build-time +var ( + //GitCommit is the commit which this version of owncast is running + GitCommit = "unknown" + //BuildVersion is the version + BuildVersion = "0.0.0" + //BuildType is the type of build + BuildType = "localdev" +) + +//GetVersion gets the version string +func GetVersion() string { + return fmt.Sprintf("Owncast v%s-%s (%s)", BuildVersion, BuildType, GitCommit) +} diff --git a/core/core.go b/core/core.go new file mode 100644 index 000000000..a99926eb1 --- /dev/null +++ b/core/core.go @@ -0,0 +1,76 @@ +package core + +import ( + "os" + "path" + "strconv" + + log "github.com/sirupsen/logrus" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/core/ffmpeg" + "github.com/gabek/owncast/models" + "github.com/gabek/owncast/utils" +) + +var ( + _stats *models.Stats + _storage models.ChunkStorageProvider +) + +//Start starts up the core processing +func Start() error { + resetDirectories() + + if err := setupStats(); err != nil { + log.Println("failed to setup the stats") + return err + } + + if err := setupStorage(); err != nil { + log.Println("failed to setup the storage") + return err + } + + if err := createInitialOfflineState(); err != nil { + log.Println("failed to create the initial offline state") + return err + } + + return nil +} + +func createInitialOfflineState() error { + // Provide default files + if !utils.DoesFileExists("webroot/thumbnail.jpg") { + if err := utils.Copy("static/logo.png", "webroot/thumbnail.jpg"); err != nil { + return err + } + } + + return ffmpeg.ShowStreamOfflineState() +} + +func resetDirectories() { + log.Println("Resetting file directories to a clean slate.") + + // Wipe the public, web-accessible hls data directory + os.RemoveAll(config.Config.PublicHLSPath) + os.RemoveAll(config.Config.PrivateHLSPath) + os.MkdirAll(config.Config.PublicHLSPath, 0777) + os.MkdirAll(config.Config.PrivateHLSPath, 0777) + + // Remove the previous thumbnail + os.Remove("webroot/thumbnail.jpg") + + // Create private hls data dirs + if !config.Config.VideoSettings.EnablePassthrough || len(config.Config.VideoSettings.StreamQualities) == 0 { + for index := range config.Config.VideoSettings.StreamQualities { + os.MkdirAll(path.Join(config.Config.PrivateHLSPath, strconv.Itoa(index)), 0777) + os.MkdirAll(path.Join(config.Config.PublicHLSPath, strconv.Itoa(index)), 0777) + } + } else { + os.MkdirAll(path.Join(config.Config.PrivateHLSPath, strconv.Itoa(0)), 0777) + os.MkdirAll(path.Join(config.Config.PublicHLSPath, strconv.Itoa(0)), 0777) + } +} diff --git a/ffmpeg.go b/core/ffmpeg/ffmpeg.go similarity index 71% rename from ffmpeg.go rename to core/ffmpeg/ffmpeg.go index 60b1945a7..477586977 100644 --- a/ffmpeg.go +++ b/core/ffmpeg/ffmpeg.go @@ -1,4 +1,4 @@ -package main +package ffmpeg import ( "fmt" @@ -10,17 +10,21 @@ import ( "strings" log "github.com/sirupsen/logrus" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/utils" ) -func showStreamOfflineState(configuration Config) { - fmt.Println("----- Stream offline! Showing offline state!") +//ShowStreamOfflineState generates and shows the stream's offline state +func ShowStreamOfflineState() error { + log.Println("----- Stream offline! Showing offline state!") - var outputDir = configuration.PublicHLSPath - var variantPlaylistPath = configuration.PublicHLSPath + var outputDir = config.Config.PublicHLSPath + var variantPlaylistPath = config.Config.PublicHLSPath - if configuration.IPFS.Enabled || configuration.S3.Enabled { - outputDir = configuration.PrivateHLSPath - variantPlaylistPath = configuration.PrivateHLSPath + if config.Config.IPFS.Enabled || config.Config.S3.Enabled { + outputDir = config.Config.PrivateHLSPath + variantPlaylistPath = config.Config.PrivateHLSPath } outputDir = path.Join(outputDir, "%v") @@ -30,12 +34,12 @@ func showStreamOfflineState(configuration Config) { var streamMaps = make([]string, 0) var videoMapsString = "" var streamMappingString = "" - if configuration.VideoSettings.EnablePassthrough || len(configuration.VideoSettings.StreamQualities) == 0 { - fmt.Println("Enabling passthrough video") + if config.Config.VideoSettings.EnablePassthrough || len(config.Config.VideoSettings.StreamQualities) == 0 { + log.Println("Enabling passthrough video for offline state") videoMapsString = "-b:v 1200k -b:a 128k" // Since we're compositing multiple sources we can't infer bitrate, so pick something reasonable. streamMaps = append(streamMaps, fmt.Sprintf("v:%d", 0)) } else { - for index, quality := range configuration.VideoSettings.StreamQualities { + for index, quality := range config.Config.VideoSettings.StreamQualities { maxRate := math.Floor(float64(quality.Bitrate) * 0.8) videoMaps = append(videoMaps, fmt.Sprintf("-map v:0 -c:v:%d libx264 -b:v:%d %dk -maxrate %dk -bufsize %dk", index, index, int(quality.Bitrate), int(maxRate), int(maxRate))) streamMaps = append(streamMaps, fmt.Sprintf("v:%d", index)) @@ -51,13 +55,13 @@ func showStreamOfflineState(configuration Config) { "-hide_banner", // "-stream_loop 100", // "-fflags", "+genpts", - "-i", configuration.VideoSettings.OfflineImage, + "-i", config.Config.VideoSettings.OfflineImage, "-i", "webroot/thumbnail.jpg", "-filter_complex", "\"[0:v]scale=2640:2360[bg];[bg][1:v]overlay=200:250:enable='between(t,0,3)'\"", videoMapsString, // All the different video variants "-f hls", - // "-hls_list_size " + strconv.Itoa(configuration.Files.MaxNumberInPlaylist), - "-hls_time 4", // + strconv.Itoa(configuration.VideoSettings.ChunkLengthInSeconds), + // "-hls_list_size " + strconv.Itoa(config.Config.Files.MaxNumberInPlaylist), + "-hls_time 4", // + strconv.Itoa(config.Config.VideoSettings.ChunkLengthInSeconds), "-hls_playlist_type", "event", "-master_pl_name", "stream.m3u8", "-strftime 1", @@ -66,7 +70,7 @@ func showStreamOfflineState(configuration Config) { "-tune", "zerolatency", "-g " + strconv.Itoa(framerate*2), " -keyint_min " + strconv.Itoa(framerate*2), // multiply your output frame rate * 2. For example, if your input is -framerate 30, then use -g 60 "-framerate " + strconv.Itoa(framerate), - "-preset " + configuration.VideoSettings.EncoderPreset, + "-preset " + config.Config.VideoSettings.EncoderPreset, "-sc_threshold 0", // don't create key frames on scene change - only according to -g "-profile:v", "main", // Main – for standard definition (SD) to 640×480, High – for high definition (HD) to 1920×1080 // "-movflags +faststart", @@ -80,29 +84,30 @@ func showStreamOfflineState(configuration Config) { ffmpegFlagsString := strings.Join(ffmpegFlags, " ") - ffmpegCmd := configuration.FFMpegPath + " " + ffmpegFlagsString + ffmpegCmd := config.Config.FFMpegPath + " " + ffmpegFlagsString - // fmt.Println(ffmpegCmd) + // log.Println(ffmpegCmd) _, err := exec.Command("sh", "-c", ffmpegCmd).Output() - fmt.Println(err) - verifyError(err) + + return err } -func startFfmpeg(configuration Config) { - var outputDir = configuration.PublicHLSPath - var variantPlaylistPath = configuration.PublicHLSPath +//Start starts the ffmpeg process +func Start() error { + var outputDir = config.Config.PublicHLSPath + var variantPlaylistPath = config.Config.PublicHLSPath - if configuration.IPFS.Enabled || configuration.S3.Enabled { - outputDir = configuration.PrivateHLSPath - variantPlaylistPath = configuration.PrivateHLSPath + if config.Config.IPFS.Enabled || config.Config.S3.Enabled { + outputDir = config.Config.PrivateHLSPath + variantPlaylistPath = config.Config.PrivateHLSPath } outputDir = path.Join(outputDir, "%v") var variantPlaylistName = path.Join(variantPlaylistPath, "%v", "stream.m3u8") log.Printf("Starting transcoder saving to /%s.", variantPlaylistName) - pipePath := getTempPipePath() + pipePath := utils.GetTemporaryPipePath() var videoMaps = make([]string, 0) var streamMaps = make([]string, 0) @@ -112,8 +117,8 @@ func startFfmpeg(configuration Config) { var streamMappingString = "" var profileString = "" - if configuration.VideoSettings.EnablePassthrough || len(configuration.VideoSettings.StreamQualities) == 0 { - fmt.Println("Enabling passthrough video") + if config.Config.VideoSettings.EnablePassthrough || len(config.Config.VideoSettings.StreamQualities) == 0 { + log.Println("Enabling passthrough video for stream") streamMaps = append(streamMaps, fmt.Sprintf("v:%d,a:%d", 0, 0)) videoMaps = append(videoMaps, "-map v:0 -c:v copy") videoMapsString = strings.Join(videoMaps, " ") @@ -121,7 +126,7 @@ func startFfmpeg(configuration Config) { audioMapsString = strings.Join(audioMaps, " ") + " -c:a copy" // Pass through audio for all the variants, don't reencode } else { - for index, quality := range configuration.VideoSettings.StreamQualities { + for index, quality := range config.Config.VideoSettings.StreamQualities { maxRate := math.Floor(float64(quality.Bitrate) * 0.8) videoMaps = append(videoMaps, fmt.Sprintf("-map v:0 -c:v:%d libx264 -b:v:%d %dk -maxrate %dk -bufsize %dk", index, index, int(quality.Bitrate), int(maxRate), int(maxRate))) streamMaps = append(streamMaps, fmt.Sprintf("v:%d,a:%d", index, index)) @@ -148,15 +153,15 @@ func startFfmpeg(configuration Config) { "-framerate " + strconv.Itoa(framerate), "-g " + strconv.Itoa(framerate*2), " -keyint_min " + strconv.Itoa(framerate*2), // multiply your output frame rate * 2. For example, if your input is -framerate 30, then use -g 60 // "-r 25", - "-preset " + configuration.VideoSettings.EncoderPreset, + "-preset " + config.Config.VideoSettings.EncoderPreset, "-sc_threshold 0", // don't create key frames on scene change - only according to -g profileString, "-movflags +faststart", "-pix_fmt yuv420p", "-f hls", - "-hls_list_size " + strconv.Itoa(configuration.Files.MaxNumberInPlaylist), + "-hls_list_size " + strconv.Itoa(config.Config.Files.MaxNumberInPlaylist), "-hls_delete_threshold 10", // Keep 10 unreferenced segments on disk before they're deleted. - "-hls_time " + strconv.Itoa(configuration.VideoSettings.ChunkLengthInSeconds), + "-hls_time " + strconv.Itoa(config.Config.VideoSettings.ChunkLengthInSeconds), "-strftime 1", "-use_localtime 1", "-hls_segment_filename " + path.Join(outputDir, "stream-%Y%m%d-%s.ts"), @@ -170,26 +175,26 @@ func startFfmpeg(configuration Config) { ffmpegFlagsString := strings.Join(ffmpegFlags, " ") - ffmpegCmd := "cat " + pipePath + " | " + configuration.FFMpegPath + " " + ffmpegFlagsString + ffmpegCmd := "cat " + pipePath + " | " + config.Config.FFMpegPath + " " + ffmpegFlagsString // fmt.Println(ffmpegCmd) _, err := exec.Command("sh", "-c", ffmpegCmd).Output() - fmt.Println(err) - verifyError(err) + + return err } -func writePlaylist(data string, filePath string) { +//WritePlaylist writes the playlist to disk +func WritePlaylist(data string, filePath string) error { f, err := os.Create(filePath) + if err != nil { + return err + } defer f.Close() - if err != nil { - fmt.Println(err) - return - } - _, err = f.WriteString(data) - if err != nil { - fmt.Println(err) - return + if _, err := f.WriteString(data); err != nil { + return err } + + return nil } diff --git a/thumbnailGenerator.go b/core/ffmpeg/thumbnailGenerator.go similarity index 69% rename from thumbnailGenerator.go rename to core/ffmpeg/thumbnailGenerator.go index 4408ffccf..bea74d469 100644 --- a/thumbnailGenerator.go +++ b/core/ffmpeg/thumbnailGenerator.go @@ -1,28 +1,34 @@ -package main +package ffmpeg import ( - "fmt" "io/ioutil" - "os" "os/exec" "path" "strings" "time" log "github.com/sirupsen/logrus" + + "github.com/gabek/owncast/config" ) -func startThumbnailGenerator(chunkPath string) { +//StartThumbnailGenerator starts generating thumbnails +func StartThumbnailGenerator(chunkPath string) { // Every 20 seconds create a thumbnail from the most // recent video segment. ticker := time.NewTicker(20 * time.Second) quit := make(chan struct{}) + go func() { for { select { case <-ticker.C: - fireThumbnailGenerator(chunkPath) + if err := fireThumbnailGenerator(chunkPath); err != nil { + log.Errorln("Unable to generate thumbnail:", err) + } case <-quit: + //TODO: evaluate if this is ever stopped + log.Println("thumbnail generator has stopped") ticker.Stop() return } @@ -30,18 +36,14 @@ func startThumbnailGenerator(chunkPath string) { }() } -func fireThumbnailGenerator(chunkPath string) { - framePath := path.Join(chunkPath, "0") - files, err := ioutil.ReadDir(framePath) - +func fireThumbnailGenerator(chunkPath string) error { // JPG takes less time to encode than PNG outputFile := path.Join("webroot", "thumbnail.jpg") - // fmt.Println("Generating thumbnail from", framePath, "to", outputFile) - + framePath := path.Join(chunkPath, "0") + files, err := ioutil.ReadDir(framePath) if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) + return err } var modTime time.Time @@ -63,13 +65,13 @@ func fireThumbnailGenerator(chunkPath string) { } if len(names) == 0 { - return + return nil } mostRecentFile := path.Join(framePath, names[0]) thumbnailCmdFlags := []string{ - configuration.FFMpegPath, + config.Config.FFMpegPath, "-y", // Overwrite file "-threads 1", // Low priority processing "-t 1", // Pull from frame 1 @@ -83,8 +85,9 @@ func fireThumbnailGenerator(chunkPath string) { // fmt.Println(ffmpegCmd) - _, err = exec.Command("sh", "-c", ffmpegCmd).Output() - if err != nil { - log.Errorln("Unable to generate thumbnail: ", err) + if _, err := exec.Command("sh", "-c", ffmpegCmd).Output(); err != nil { + return err } + + return nil } diff --git a/core/ffmpeg/verifyInstall.go b/core/ffmpeg/verifyInstall.go new file mode 100644 index 000000000..1b8e212a9 --- /dev/null +++ b/core/ffmpeg/verifyInstall.go @@ -0,0 +1,32 @@ +package ffmpeg + +import ( + "errors" + "fmt" + "os" +) + +//VerifyFFMpegPath verifies that the path exists, is a file, and is executable +func VerifyFFMpegPath(path string) error { + stat, err := os.Stat(path) + + if os.IsNotExist(err) { + return errors.New("ffmpeg path does not exist") + } + + if err != nil { + return fmt.Errorf("error while verifying the ffmpeg path: %s", err.Error()) + } + + if stat.IsDir() { + return errors.New("ffmpeg path can not be a folder") + } + + mode := stat.Mode() + //source: https://stackoverflow.com/a/60128480 + if mode&0111 == 0 { + return errors.New("ffmpeg path is not executable") + } + + return nil +} diff --git a/core/playlist/monitor.go b/core/playlist/monitor.go new file mode 100644 index 000000000..dfe847e93 --- /dev/null +++ b/core/playlist/monitor.go @@ -0,0 +1,163 @@ +package playlist + +import ( + "io/ioutil" + "path" + "path/filepath" + "strconv" + "strings" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/radovskyb/watcher" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/core/ffmpeg" + "github.com/gabek/owncast/models" + "github.com/gabek/owncast/utils" +) + +var ( + _storage models.ChunkStorageProvider + variants []models.Variant +) + +//StartVideoContentMonitor starts the video content monitor +func StartVideoContentMonitor(storage models.ChunkStorageProvider) error { + _storage = storage + + pathToMonitor := config.Config.PrivateHLSPath + + // Create at least one structure to store the segments for the different stream variants + variants = make([]models.Variant, len(config.Config.VideoSettings.StreamQualities)) + if len(config.Config.VideoSettings.StreamQualities) > 0 && !config.Config.VideoSettings.EnablePassthrough { + for index := range variants { + variants[index] = models.Variant{ + VariantIndex: index, + Segments: make(map[string]*models.Segment), + } + } + } else { + variants[0] = models.Variant{ + VariantIndex: 0, + Segments: make(map[string]*models.Segment), + } + } + + // log.Printf("Using directory %s for storing files with %d variants...\n", pathToMonitor, len(variants)) + + w := watcher.New() + + go func() { + for { + select { + case event := <-w.Event: + + relativePath := utils.GetRelativePathFromAbsolutePath(event.Path) + if path.Ext(relativePath) == ".tmp" { + continue + } + + // Ignore removals + if event.Op == watcher.Remove { + continue + } + + // fmt.Println(event.Op, relativePath) + + // Handle updates to the master playlist by copying it to webroot + if relativePath == path.Join(config.Config.PrivateHLSPath, "stream.m3u8") { + utils.Copy(event.Path, path.Join(config.Config.PublicHLSPath, "stream.m3u8")) + + } else if filepath.Ext(event.Path) == ".m3u8" { + // Handle updates to playlists, but not the master playlist + updateVariantPlaylist(event.Path) + + } else if filepath.Ext(event.Path) == ".ts" { + segment, err := getSegmentFromPath(event.Path) + if err != nil { + log.Println("failed to get the segment from path") + panic(err) + } + + newObjectPathChannel := make(chan string, 1) + go func() { + newObjectPath, err := storage.Save(path.Join(config.Config.PrivateHLSPath, segment.RelativeUploadPath), 0) + if err != nil { + log.Println("failed to save the file to the chunk storage") + panic(err) + } + + newObjectPathChannel <- newObjectPath + }() + + newObjectPath := <-newObjectPathChannel + segment.RemoteID = newObjectPath + // fmt.Println("Uploaded", segment.RelativeUploadPath, "as", newObjectPath) + + variants[segment.VariantIndex].Segments[filepath.Base(segment.RelativeUploadPath)] = &segment + + // Force a variant's playlist to be updated after a file is uploaded. + associatedVariantPlaylist := strings.ReplaceAll(event.Path, path.Base(event.Path), "stream.m3u8") + updateVariantPlaylist(associatedVariantPlaylist) + } + case err := <-w.Error: + panic(err) + case <-w.Closed: + return + } + } + }() + + // Watch the hls segment storage folder recursively for changes. + w.FilterOps(watcher.Write, watcher.Rename, watcher.Create) + + if err := w.AddRecursive(pathToMonitor); err != nil { + return err + } + + return w.Start(time.Millisecond * 200) +} + +func getSegmentFromPath(fullDiskPath string) (models.Segment, error) { + segment := models.Segment{ + FullDiskPath: fullDiskPath, + RelativeUploadPath: utils.GetRelativePathFromAbsolutePath(fullDiskPath), + } + + index, err := strconv.Atoi(segment.RelativeUploadPath[0:1]) + if err != nil { + return segment, err + } + + segment.VariantIndex = index + + return segment, nil +} + +func getVariantIndexFromPath(fullDiskPath string) (int, error) { + return strconv.Atoi(fullDiskPath[0:1]) +} + +func updateVariantPlaylist(fullPath string) error { + relativePath := utils.GetRelativePathFromAbsolutePath(fullPath) + variantIndex, err := getVariantIndexFromPath(relativePath) + if err != nil { + return err + } + + variant := variants[variantIndex] + + playlistBytes, err := ioutil.ReadFile(fullPath) + if err != nil { + return err + } + + playlistString := string(playlistBytes) + // fmt.Println("Rewriting playlist", relativePath, "to", path.Join(config.Config.PublicHLSPath, relativePath)) + + playlistString = _storage.GenerateRemotePlaylist(playlistString, variant) + + return ffmpeg.WritePlaylist(playlistString, path.Join(config.Config.PublicHLSPath, relativePath)) +} diff --git a/handler.go b/core/rtmp/handler.go similarity index 76% rename from handler.go rename to core/rtmp/handler.go index 4b25358c7..4230e4d9e 100644 --- a/handler.go +++ b/core/rtmp/handler.go @@ -1,79 +1,91 @@ -package main +package rtmp import ( "bytes" + "errors" "io" "os" "syscall" log "github.com/sirupsen/logrus" - - "github.com/pkg/errors" "github.com/yutopp/go-flv" flvtag "github.com/yutopp/go-flv/tag" - "github.com/yutopp/go-rtmp" + yutmp "github.com/yutopp/go-rtmp" rtmpmsg "github.com/yutopp/go-rtmp/message" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/core" + "github.com/gabek/owncast/core/ffmpeg" + "github.com/gabek/owncast/utils" ) -var _ rtmp.Handler = (*Handler)(nil) +var _ yutmp.Handler = (*Handler)(nil) // Handler An RTMP connection handler type Handler struct { - rtmp.DefaultHandler + yutmp.DefaultHandler flvFile *os.File flvEnc *flv.Encoder } -func (h *Handler) OnServe(conn *rtmp.Conn) { +//OnServe handles the "OnServe" of the rtmp service +func (h *Handler) OnServe(conn *yutmp.Conn) { } +//OnConnect handles the "OnConnect" of the rtmp service func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error { // log.Printf("OnConnect: %#v", cmd) return nil } +//OnCreateStream handles the "OnCreateStream" of the rtmp service func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error { // log.Printf("OnCreateStream: %#v", cmd) return nil } +//OnPublish handles the "OnPublish" of the rtmp service func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error { // log.Printf("OnPublish: %#v", cmd) log.Println("Incoming stream connected.") - if cmd.PublishingName != configuration.VideoSettings.StreamingKey { + if cmd.PublishingName != config.Config.VideoSettings.StreamingKey { return errors.New("invalid streaming key; rejecting incoming stream") } - if stats.IsStreamConnected() { + if _isConnected { return errors.New("stream already running; can not overtake an existing stream") } // Record streams as FLV - p := getTempPipePath() + p := utils.GetTemporaryPipePath() syscall.Mkfifo(p, 0666) + f, err := os.OpenFile(p, os.O_RDWR, os.ModeNamedPipe) if err != nil { - return errors.Wrap(err, "Failed to create flv file") + return err } h.flvFile = f enc, err := flv.NewEncoder(f, flv.FlagsAudio|flv.FlagsVideo) if err != nil { _ = f.Close() - return errors.Wrap(err, "Failed to create flv encoder") + return err } h.flvEnc = enc - go startFfmpeg(configuration) + //TODO: why is this turned into a goroutine? + go ffmpeg.Start() - streamConnected() + _isConnected = true + core.SetStreamAsConnected() return nil } +//OnSetDataFrame handles the setting of the data frame func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error { r := bytes.NewReader(data.Payload) @@ -96,6 +108,7 @@ func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDat return nil } +//OnAudio handles when we get audio from the rtmp service func (h *Handler) OnAudio(timestamp uint32, payload io.Reader) error { var audio flvtag.AudioData if err := flvtag.DecodeAudioData(payload, &audio); err != nil { @@ -129,6 +142,7 @@ func (h *Handler) OnAudio(timestamp uint32, payload io.Reader) error { return nil } +//OnVideo handles when we video from the rtmp service func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error { var video flvtag.VideoData if err := flvtag.DecodeVideoData(payload, &video); err != nil { @@ -161,12 +175,14 @@ func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error { return nil } +//OnClose handles the closing of the rtmp connection func (h *Handler) OnClose() { - log.Printf("OnClose") + log.Printf("OnClose of the rtmp service") if h.flvFile != nil { _ = h.flvFile.Close() } - streamDisconnected() + _isConnected = false + core.SetStreamAsDisconnected() } diff --git a/core/rtmp/rtmp.go b/core/rtmp/rtmp.go new file mode 100644 index 000000000..31f3ea701 --- /dev/null +++ b/core/rtmp/rtmp.go @@ -0,0 +1,58 @@ +package rtmp + +import ( + "fmt" + "io" + "net" + + log "github.com/sirupsen/logrus" + yutmp "github.com/yutopp/go-rtmp" +) + +var ( + //IsConnected whether there is a connection or not + _isConnected = false +) + +//Start starts the rtmp service, listening on port 1935 +func Start() { + port := 1935 + + tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Panicf("Failed to resolve the tcp address for the rtmp service: %+v", err) + } + + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + log.Panicf("Failed to acquire the tcp listener: %+v", err) + } + + srv := yutmp.NewServer(&yutmp.ServerConfig{ + OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *yutmp.ConnConfig) { + l := log.StandardLogger() + l.SetLevel(log.WarnLevel) + + return conn, &yutmp.ConnConfig{ + Handler: &Handler{}, + + ControlState: yutmp.StreamControlStateConfig{ + DefaultBandwidthWindowSize: 6 * 1024 * 1024 / 8, + }, + + Logger: l, + } + }, + }) + + log.Printf("RTMP server is listening for incoming stream on port: %d", port) + if err := srv.Serve(listener); err != nil { + log.Panicf("Failed to serve the rtmp service: %+v", err) + } +} + +//IsConnected gets whether there is an rtmp connection or not +//this is only a getter since it is controlled by the rtmp handler +func IsConnected() bool { + return _isConnected +} diff --git a/core/stats.go b/core/stats.go new file mode 100644 index 000000000..fad5d1ccf --- /dev/null +++ b/core/stats.go @@ -0,0 +1,135 @@ +package core + +import ( + "encoding/json" + "io/ioutil" + "math" + "os" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/gabek/owncast/models" + "github.com/gabek/owncast/utils" +) + +const ( + statsFilePath = "stats.json" +) + +func setupStats() error { + s, err := getSavedStats() + if err != nil { + return err + } + + _stats = &s + + statsSaveTimer := time.NewTicker(1 * time.Minute) + go func() { + for { + select { + case <-statsSaveTimer.C: + if err := saveStatsToFile(); err != nil { + panic(err) + } + } + } + }() + + staleViewerPurgeTimer := time.NewTicker(3 * time.Second) + go func() { + for { + select { + case <-staleViewerPurgeTimer.C: + purgeStaleViewers() + } + } + }() + + return nil +} + +func purgeStaleViewers() { + for clientID, lastConnectedtime := range _stats.Clients { + timeSinceLastActive := time.Since(lastConnectedtime).Minutes() + if timeSinceLastActive > 2 { + RemoveClient(clientID) + } + } +} + +//IsStreamConnected checks if the stream is connected or not +func IsStreamConnected() bool { + if !_stats.StreamConnected { + return false + } + + // Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available. + // So account for that with an artificial buffer. + timeSinceLastConnected := time.Since(_stats.LastConnectTime).Seconds() + if timeSinceLastConnected < 10 { + return false + } + + return _stats.StreamConnected +} + +//SetClientActive sets a client as active and connected +func SetClientActive(clientID string) { + // if _, ok := s.clients[clientID]; !ok { + // fmt.Println("Marking client active:", clientID, s.GetViewerCount()+1, "clients connected.") + // } + + _stats.Clients[clientID] = time.Now() + _stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Clients)), float64(_stats.SessionMaxViewerCount))) + _stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount))) +} + +//RemoveClient removes a client from the active clients record +func RemoveClient(clientID string) { + log.Println("Removing the client:", clientID) + + delete(_stats.Clients, clientID) +} + +func saveStatsToFile() error { + jsonData, err := json.Marshal(_stats) + if err != nil { + return err + } + + f, err := os.Create(statsFilePath) + if err != nil { + return err + } + + defer f.Close() + + if _, err := f.Write(jsonData); err != nil { + return err + } + + return nil +} + +func getSavedStats() (models.Stats, error) { + result := models.Stats{ + Clients: make(map[string]time.Time), + } + + if !utils.DoesFileExists(statsFilePath) { + return result, nil + } + + jsonFile, err := ioutil.ReadFile(statsFilePath) + if err != nil { + return result, nil + } + + if err := json.Unmarshal(jsonFile, &result); err != nil { + return result, nil + } + + return result, nil +} diff --git a/core/status.go b/core/status.go new file mode 100644 index 000000000..93e553f56 --- /dev/null +++ b/core/status.go @@ -0,0 +1,51 @@ +package core + +import ( + "time" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/core/ffmpeg" + "github.com/gabek/owncast/models" +) + +//GetStatus gets the status of the system +func GetStatus() models.Status { + if _stats == nil { + return models.Status{} + } + + return models.Status{ + Online: IsStreamConnected(), + ViewerCount: len(_stats.Clients), + OverallMaxViewerCount: _stats.OverallMaxViewerCount, + SessionMaxViewerCount: _stats.SessionMaxViewerCount, + } +} + +//SetStreamAsConnected sets the stream as connected +func SetStreamAsConnected() { + _stats.StreamConnected = true + _stats.LastConnectTime = time.Now() + + timeSinceDisconnect := time.Since(_stats.LastDisconnectTime).Minutes() + if timeSinceDisconnect > 15 { + _stats.SessionMaxViewerCount = 0 + } + + chunkPath := config.Config.PublicHLSPath + if usingExternalStorage { + chunkPath = config.Config.PrivateHLSPath + } + + ffmpeg.StartThumbnailGenerator(chunkPath) +} + +//SetStreamAsDisconnected sets the stream as disconnected +func SetStreamAsDisconnected() { + _stats.StreamConnected = false + _stats.LastDisconnectTime = time.Now() + + if config.Config.EnableOfflineImage { + ffmpeg.ShowStreamOfflineState() + } +} diff --git a/core/storage.go b/core/storage.go new file mode 100644 index 000000000..831e1618b --- /dev/null +++ b/core/storage.go @@ -0,0 +1,31 @@ +package core + +import ( + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/core/playlist" + "github.com/gabek/owncast/core/storageproviders" +) + +var ( + usingExternalStorage = false +) + +func setupStorage() error { + if config.Config.IPFS.Enabled { + _storage = &storageproviders.IPFSStorage{} + usingExternalStorage = true + } else if config.Config.S3.Enabled { + _storage = &storageproviders.S3Storage{} + usingExternalStorage = true + } + + if usingExternalStorage { + if err := _storage.Setup(); err != nil { + return err + } + + go playlist.StartVideoContentMonitor(_storage) + } + + return nil +} diff --git a/ipfsStorage.go b/core/storageproviders/ipfsStorage.go similarity index 87% rename from ipfsStorage.go rename to core/storageproviders/ipfsStorage.go index fe4aa55f4..aafefd178 100644 --- a/ipfsStorage.go +++ b/core/storageproviders/ipfsStorage.go @@ -1,4 +1,4 @@ -package main +package storageproviders import ( "bufio" @@ -27,8 +27,12 @@ import ( "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/plugin/loader" "github.com/ipfs/go-ipfs/repo/fsrepo" + + ownconfig "github.com/gabek/owncast/config" + "github.com/gabek/owncast/models" ) +//IPFSStorage is the ipfs implementation of the ChunkStorageProvider type IPFSStorage struct { ipfs *icore.CoreAPI node *core.IpfsNode @@ -38,27 +42,32 @@ type IPFSStorage struct { gateway string } -func (s *IPFSStorage) Setup(config Config) { +//Setup sets up the ipfs storage for saving the video to ipfs +func (s *IPFSStorage) Setup() error { log.Println("Setting up IPFS for external storage of video. Please wait..") - s.gateway = config.IPFS.Gateway + s.gateway = ownconfig.Config.IPFS.Gateway s.ctx = context.Background() - ipfsInstance, node, _ := s.createIPFSInstance() + ipfsInstance, node, err := s.createIPFSInstance() + if err != nil { + return err + } + s.ipfs = ipfsInstance s.node = node - s.createIPFSDirectory("./hls") + return s.createIPFSDirectory("./hls") } -func (s *IPFSStorage) Save(filePath string, retryCount int) string { +//Save saves the file to the ipfs storage +func (s *IPFSStorage) Save(filePath string, retryCount int) (string, error) { someFile, err := getUnixfsNode(filePath) - defer someFile.Close() - if err != nil { - log.Panicln(fmt.Errorf("Could not get File: %s", err)) + return "", err } + defer someFile.Close() opts := []options.UnixfsAddOption{ options.Unixfs.Pin(false), @@ -68,30 +77,33 @@ func (s *IPFSStorage) Save(filePath string, retryCount int) string { } cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...) - if err != nil { - log.Panicln(fmt.Errorf("Could not add File: %s", err)) + return "", err } // fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String()) - newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath)) + newHash, err := s.addFileToDirectory(cidFile, filepath.Base(filePath)) + if err != nil { + return "", err + } - return s.gateway + newHash + return s.gateway + newHash, nil } -func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, variant Variant) string { +//GenerateRemotePlaylist implements the 'GenerateRemotePlaylist' method +func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, variant models.Variant) string { var newPlaylist = "" scanner := bufio.NewScanner(strings.NewReader(playlist)) for scanner.Scan() { line := scanner.Text() if line[0:1] != "#" { - fullRemotePath := variant.getSegmentForFilename(line) + fullRemotePath := variant.GetSegmentForFilename(line) if fullRemotePath != nil { - line = fullRemotePath.RemoteID - } else { line = "" + } else { + line = fullRemotePath.RemoteID } } @@ -124,8 +136,6 @@ func setupPlugins(externalPluginsPath string) error { func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) { // Open the repo repo, err := fsrepo.Open(repoPath) - verifyError(err) - if err != nil { return nil, nil, err } @@ -189,8 +199,7 @@ func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) } // Spawning an ephemeral IPFS node - coreAPI, node, err := createNode(ctx, repoPath) - return coreAPI, node, err + return createNode(ctx, repoPath) } func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error { @@ -242,13 +251,12 @@ func getUnixfsNode(path string) (files.Node, error) { return f, nil } -func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) string { +func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) (string, error) { // fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String()) directoryToAddToPath := path.New(s.directoryHash) newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath) - verifyError(err) - return newDirectoryHash.String() + "/" + filename + return newDirectoryHash.String() + "/" + filename, err } func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { @@ -293,12 +301,19 @@ func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI { } } -func (s *IPFSStorage) createIPFSDirectory(directoryName string) { +func (s *IPFSStorage) createIPFSDirectory(directoryName string) error { directory, err := getUnixfsNode(directoryName) - verifyError(err) + if err != nil { + return err + } defer directory.Close() newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory) - verifyError(err) + if err != nil { + return err + } + s.directoryHash = newlyCreatedDirectoryHash.String() + + return nil } diff --git a/s3Storage.go b/core/storageproviders/s3Storage.go similarity index 66% rename from s3Storage.go rename to core/storageproviders/s3Storage.go index b3f9b16ed..f29c79483 100644 --- a/s3Storage.go +++ b/core/storageproviders/s3Storage.go @@ -1,4 +1,4 @@ -package main +package storageproviders import ( "bufio" @@ -11,8 +11,12 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/models" ) +//S3Storage is the s3 implementation of the ChunkStorageProvider type S3Storage struct { sess *session.Session host string @@ -24,27 +28,30 @@ type S3Storage struct { s3Secret string } -func (s *S3Storage) Setup(configuration Config) { +//Setup sets up the s3 storage for saving the video to s3 +func (s *S3Storage) Setup() error { log.Println("Setting up S3 for external storage of video...") - s.s3Endpoint = configuration.S3.Endpoint - s.s3Region = configuration.S3.Region - s.s3Bucket = configuration.S3.Bucket - s.s3AccessKey = configuration.S3.AccessKey - s.s3Secret = configuration.S3.Secret + s.s3Endpoint = config.Config.S3.Endpoint + s.s3Region = config.Config.S3.Region + s.s3Bucket = config.Config.S3.Bucket + s.s3AccessKey = config.Config.S3.AccessKey + s.s3Secret = config.Config.S3.Secret s.sess = s.connectAWS() + + return nil } -func (s *S3Storage) Save(filePath string, retryCount int) string { +//Save saves the file to the s3 bucket +func (s *S3Storage) Save(filePath string, retryCount int) (string, error) { // fmt.Println("Saving", filePath) file, err := os.Open(filePath) - defer file.Close() - if err != nil { - log.Errorln(err) + return "", err } + defer file.Close() uploader := s3manager.NewUploader(s.sess) @@ -55,30 +62,31 @@ func (s *S3Storage) Save(filePath string, retryCount int) string { }) if err != nil { - log.Errorln(err) + log.Errorln("error uploading:", err.Error()) if retryCount < 4 { log.Println("Retrying...") - s.Save(filePath, retryCount+1) + return s.Save(filePath, retryCount+1) } } // fmt.Println("Uploaded", filePath, "to", response.Location) - return response.Location + return response.Location, nil } -func (s *S3Storage) GenerateRemotePlaylist(playlist string, variant Variant) string { +//GenerateRemotePlaylist implements the 'GenerateRemotePlaylist' method +func (s *S3Storage) GenerateRemotePlaylist(playlist string, variant models.Variant) string { var newPlaylist = "" scanner := bufio.NewScanner(strings.NewReader(playlist)) for scanner.Scan() { line := scanner.Text() if line[0:1] != "#" { - fullRemotePath := variant.getSegmentForFilename(line) - if fullRemotePath != nil { - line = fullRemotePath.RemoteID - } else { + fullRemotePath := variant.GetSegmentForFilename(line) + if fullRemotePath == nil { line = "" + } else { + line = fullRemotePath.RemoteID } } diff --git a/go.mod b/go.mod index 513d0dce2..a58931127 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/multiformats/go-multiaddr v0.2.2 - github.com/pkg/errors v0.9.1 github.com/radovskyb/watcher v1.0.7 github.com/sirupsen/logrus v1.6.0 github.com/yutopp/go-flv v0.2.0 diff --git a/main.go b/main.go index 1634c90a2..e333a0cdd 100644 --- a/main.go +++ b/main.go @@ -1,121 +1,33 @@ package main import ( - "encoding/json" - "net/http" - "path" - "strconv" - log "github.com/sirupsen/logrus" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/core" + "github.com/gabek/owncast/router" ) -// Build-time injected values -var GitCommit string = "unknown" -var BuildVersion string = "0.0.0" -var BuildType string = "localdev" - -var storage ChunkStorage -var configuration = getConfig() -var server *Server -var stats *Stats - -var usingExternalStorage = false - func main() { // logrus.SetReportCaller(true) - log.StandardLogger().Printf("Owncast v%s/%s (%s)", BuildVersion, BuildType, GitCommit) + log.Println(core.GetVersion()) - checkConfig(configuration) - resetDirectories(configuration) + //TODO: potentially load the config from a flag like: + //configFile := flag.String("configFile", "config.yaml", "Config File full path. Defaults to current folder") + // flag.Parse() - stats = getSavedStats() - stats.Setup() - - if configuration.IPFS.Enabled { - storage = &IPFSStorage{} - usingExternalStorage = true - } else if configuration.S3.Enabled { - storage = &S3Storage{} - usingExternalStorage = true + if err := config.Load("config.yaml"); err != nil { + panic(err) } - if usingExternalStorage { - storage.Setup(configuration) - go monitorVideoContent(configuration.PrivateHLSPath, configuration, storage) + // starts the core + if err := core.Start(); err != nil { + log.Println("failed to start the core package") + panic(err) } - createInitialOfflineState() - go startRTMPService() - - startWebServer() -} - -func startWebServer() { - // websocket server - server = NewServer("/entry") - go server.Listen() - - // static files - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - enableCors(&w) - http.ServeFile(w, r, path.Join("webroot", r.URL.Path)) - - if path.Ext(r.URL.Path) == ".m3u8" { - clientID := getClientIDFromRequest(r) - stats.SetClientActive(clientID) - disableCache(&w) - } - }) - - http.HandleFunc("/status", getStatus) - - log.Printf("Starting public web server on port %d", configuration.WebServerPort) - - log.Fatal(http.ListenAndServe(":"+strconv.Itoa(configuration.WebServerPort), nil)) -} - -func enableCors(w *http.ResponseWriter) { - (*w).Header().Set("Access-Control-Allow-Origin", "*") -} - -func disableCache(w *http.ResponseWriter) { - (*w).Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") - (*w).Header().Set("Expires", "0") -} - -func getStatus(w http.ResponseWriter, r *http.Request) { - enableCors(&w) - - status := Status{ - Online: stats.IsStreamConnected(), - ViewerCount: stats.GetViewerCount(), - OverallMaxViewerCount: stats.GetOverallMaxViewerCount(), - SessionMaxViewerCount: stats.GetSessionMaxViewerCount(), - } - json.NewEncoder(w).Encode(status) -} - -func streamConnected() { - stats.StreamConnected() - - chunkPath := configuration.PublicHLSPath - if usingExternalStorage { - chunkPath = configuration.PrivateHLSPath - } - startThumbnailGenerator(chunkPath) -} - -func streamDisconnected() { - stats.StreamDisconnected() - if configuration.EnableOfflineImage { - showStreamOfflineState(configuration) + if err := router.Start(); err != nil { + log.Println("failed to start/run the router") + panic(err) } } - -func viewerAdded(clientID string) { - stats.SetClientActive(clientID) -} - -func viewerRemoved(clientID string) { - stats.ViewerDisconnected(clientID) -} diff --git a/message.go b/models/chatMessage.go similarity index 55% rename from message.go rename to models/chatMessage.go index 2a2611243..8caffe4ee 100644 --- a/message.go +++ b/models/chatMessage.go @@ -1,5 +1,6 @@ -package main +package models +//ChatMessage represents a single chat message type ChatMessage struct { Author string `json:"author"` Body string `json:"body"` @@ -8,10 +9,8 @@ type ChatMessage struct { MessageType string `json:"type"` } -func (s *ChatMessage) String() string { +//String converts the chat message to string +//TODO: is this required? or can we remove it +func (s ChatMessage) String() string { return s.Author + " says " + s.Body } - -type PingMessage struct { - MessageType string `json:"type"` -} diff --git a/models/pingMessage.go b/models/pingMessage.go new file mode 100644 index 000000000..358115822 --- /dev/null +++ b/models/pingMessage.go @@ -0,0 +1,6 @@ +package models + +//PingMessage represents a ping message between the client and server +type PingMessage struct { + MessageType string `json:"type"` +} diff --git a/models/playlist.go b/models/playlist.go new file mode 100644 index 000000000..9c2a4967b --- /dev/null +++ b/models/playlist.go @@ -0,0 +1,20 @@ +package models + +//Segment represents a segment of the live stream +type Segment struct { + VariantIndex int // The bitrate variant + FullDiskPath string // Where it lives on disk + RelativeUploadPath string // Path it should have remotely + RemoteID string // Used for IPFS +} + +//Variant represents a single bitrate variant and the segments that make it up +type Variant struct { + VariantIndex int + Segments map[string]*Segment +} + +//GetSegmentForFilename gets the segment for the provided filename +func (v *Variant) GetSegmentForFilename(filename string) *Segment { + return v.Segments[filename] +} diff --git a/models/stats.go b/models/stats.go new file mode 100644 index 000000000..1b6f188f0 --- /dev/null +++ b/models/stats.go @@ -0,0 +1,16 @@ +package models + +import ( + "time" +) + +//Stats holds the stats for the system +type Stats struct { + SessionMaxViewerCount int `json:"sessionMaxViewerCount"` + OverallMaxViewerCount int `json:"overallMaxViewerCount"` + LastDisconnectTime time.Time `json:"lastDisconnectTime"` + + StreamConnected bool `json:"-"` + LastConnectTime time.Time `json:"-"` + Clients map[string]time.Time `json:"-"` +} diff --git a/status.go b/models/status.go similarity index 79% rename from status.go rename to models/status.go index 30b3b2d8c..15a720a4a 100644 --- a/status.go +++ b/models/status.go @@ -1,5 +1,6 @@ -package main +package models +//Status represents the status of the system type Status struct { Online bool `json:"online"` ViewerCount int `json:"viewerCount"` diff --git a/models/storageProvider.go b/models/storageProvider.go new file mode 100644 index 000000000..4f1737271 --- /dev/null +++ b/models/storageProvider.go @@ -0,0 +1,8 @@ +package models + +//ChunkStorageProvider is how a chunk storage provider should be implemented +type ChunkStorageProvider interface { + Setup() error + Save(filePath string, retryCount int) (string, error) + GenerateRemotePlaylist(playlist string, variant Variant) string +} diff --git a/playlistMonitor.go b/playlistMonitor.go deleted file mode 100644 index ff29d6f6e..000000000 --- a/playlistMonitor.go +++ /dev/null @@ -1,146 +0,0 @@ -package main - -import ( - "io/ioutil" - "path" - "path/filepath" - "strconv" - "strings" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/radovskyb/watcher" -) - -type Segment struct { - VariantIndex int // The bitrate variant - FullDiskPath string // Where it lives on disk - RelativeUploadPath string // Path it should have remotely - RemoteID string // Used for IPFS -} - -type Variant struct { - VariantIndex int - Segments map[string]*Segment -} - -func (v *Variant) getSegmentForFilename(filename string) *Segment { - return v.Segments[filename] - // for _, segment := range v.Segments { - // if path.Base(segment.FullDiskPath) == filename { - // return &segment - // } - // } - // return nil -} - -func getSegmentFromPath(fullDiskPath string) Segment { - segment := Segment{} - segment.FullDiskPath = fullDiskPath - segment.RelativeUploadPath = getRelativePathFromAbsolutePath(fullDiskPath) - index, error := strconv.Atoi(segment.RelativeUploadPath[0:1]) - verifyError(error) - segment.VariantIndex = index - - return segment -} - -func getVariantIndexFromPath(fullDiskPath string) int { - index, error := strconv.Atoi(fullDiskPath[0:1]) - verifyError(error) - return index -} - -var variants []Variant - -func updateVariantPlaylist(fullPath string) { - relativePath := getRelativePathFromAbsolutePath(fullPath) - variantIndex := getVariantIndexFromPath(relativePath) - variant := variants[variantIndex] - - playlistBytes, err := ioutil.ReadFile(fullPath) - verifyError(err) - playlistString := string(playlistBytes) - // fmt.Println("Rewriting playlist", relativePath, "to", path.Join(configuration.PublicHLSPath, relativePath)) - - playlistString = storage.GenerateRemotePlaylist(playlistString, variant) - - writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, relativePath)) -} - -func monitorVideoContent(pathToMonitor string, configuration Config, storage ChunkStorage) { - // Create at least one structure to store the segments for the different stream variants - variants = make([]Variant, len(configuration.VideoSettings.StreamQualities)) - if len(configuration.VideoSettings.StreamQualities) > 0 && !configuration.VideoSettings.EnablePassthrough { - for index := range variants { - variants[index] = Variant{index, make(map[string]*Segment)} - } - } else { - variants[0] = Variant{0, make(map[string]*Segment)} - } - // log.Printf("Using directory %s for storing files with %d variants...\n", pathToMonitor, len(variants)) - - w := watcher.New() - - go func() { - for { - select { - case event := <-w.Event: - - relativePath := getRelativePathFromAbsolutePath(event.Path) - if path.Ext(relativePath) == ".tmp" { - continue - } - - // Ignore removals - if event.Op == watcher.Remove { - continue - } - - // fmt.Println(event.Op, relativePath) - - // Handle updates to the master playlist by copying it to webroot - if relativePath == path.Join(configuration.PrivateHLSPath, "stream.m3u8") { - copy(event.Path, path.Join(configuration.PublicHLSPath, "stream.m3u8")) - - } else if filepath.Ext(event.Path) == ".m3u8" { - // Handle updates to playlists, but not the master playlist - updateVariantPlaylist(event.Path) - } else if filepath.Ext(event.Path) == ".ts" { - segment := getSegmentFromPath(event.Path) - - newObjectPathChannel := make(chan string, 1) - go func() { - newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, segment.RelativeUploadPath), 0) - newObjectPathChannel <- newObjectPath - }() - newObjectPath := <-newObjectPathChannel - segment.RemoteID = newObjectPath - // fmt.Println("Uploaded", segment.RelativeUploadPath, "as", newObjectPath) - - variants[segment.VariantIndex].Segments[filepath.Base(segment.RelativeUploadPath)] = &segment - - // Force a variant's playlist to be updated after a file is uploaded. - associatedVariantPlaylist := strings.ReplaceAll(event.Path, path.Base(event.Path), "stream.m3u8") - updateVariantPlaylist(associatedVariantPlaylist) - } - case err := <-w.Error: - log.Fatalln(err) - case <-w.Closed: - return - } - } - }() - - // Watch the hls segment storage folder recursively for changes. - w.FilterOps(watcher.Write, watcher.Rename, watcher.Create) - - if err := w.AddRecursive(pathToMonitor); err != nil { - log.Fatalln(err) - } - - if err := w.Start(time.Millisecond * 200); err != nil { - log.Fatalln(err) - } -} diff --git a/router/middleware/cors.go b/router/middleware/cors.go new file mode 100644 index 000000000..50c7f5779 --- /dev/null +++ b/router/middleware/cors.go @@ -0,0 +1,10 @@ +package middleware + +import ( + "net/http" +) + +//EnableCors enables the cors header on the responses +func EnableCors(w *http.ResponseWriter) { + (*w).Header().Set("Access-Control-Allow-Origin", "*") +} diff --git a/router/middleware/disableCache.go b/router/middleware/disableCache.go new file mode 100644 index 000000000..43ea2b719 --- /dev/null +++ b/router/middleware/disableCache.go @@ -0,0 +1,11 @@ +package middleware + +import ( + "net/http" +) + +//DisableCache writes the disable cache header on the responses +func DisableCache(w *http.ResponseWriter) { + (*w).Header().Set("Cache-Control", "no-cache, no-store, must-revalidate") + (*w).Header().Set("Expires", "0") +} diff --git a/router/router.go b/router/router.go new file mode 100644 index 000000000..fbec05b30 --- /dev/null +++ b/router/router.go @@ -0,0 +1,35 @@ +package router + +import ( + "fmt" + "net/http" + + log "github.com/sirupsen/logrus" + + "github.com/gabek/owncast/config" + "github.com/gabek/owncast/controllers" + "github.com/gabek/owncast/core/chat" + "github.com/gabek/owncast/core/rtmp" +) + +//Start starts the router for the http, ws, and rtmp +func Start() error { + // websocket server + chatServer := chat.NewServer("/entry") + go chatServer.Listen() + + // start the rtmp server + go rtmp.Start() + + // static files + http.HandleFunc("/", controllers.IndexHandler) + + // status of the system + http.HandleFunc("/status", controllers.GetStatus) + + port := config.Config.WebServerPort + + log.Printf("Starting public web server on port: %d", port) + + return http.ListenAndServe(fmt.Sprintf(":%d", port), nil) +} diff --git a/rtmp.go b/rtmp.go deleted file mode 100644 index 98ffbaf87..000000000 --- a/rtmp.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "io" - "net" - "strconv" - - "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" - "github.com/yutopp/go-rtmp" -) - -func startRTMPService() { - port := 1935 - log.Printf("RTMP server is listening for incoming stream on port %d.\n", port) - - tcpAddr, err := net.ResolveTCPAddr("tcp", ":"+strconv.Itoa(port)) - if err != nil { - log.Panicf("Failed: %+v", err) - } - - listener, err := net.ListenTCP("tcp", tcpAddr) - if err != nil { - log.Panicf("Failed: %+v", err) - } - - srv := rtmp.NewServer(&rtmp.ServerConfig{ - OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) { - l := log.StandardLogger() - l.SetLevel(logrus.WarnLevel) - - h := &Handler{} - - return conn, &rtmp.ConnConfig{ - Handler: h, - - ControlState: rtmp.StreamControlStateConfig{ - DefaultBandwidthWindowSize: 6 * 1024 * 1024 / 8, - }, - - Logger: l, - } - }, - }) - if err := srv.Serve(listener); err != nil { - log.Panicf("Failed: %+v", err) - } -} diff --git a/scripts/build.sh b/scripts/build.sh index 291b8bc95..f5d4646ff 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -37,12 +37,12 @@ build() { echo "Building ${NAME} (${OS}/${ARCH}) release..." - mkdir -p dist/${NAME}/config + mkdir -p dist/${NAME} mkdir -p dist/${NAME}/webroot/static mkdir -p dist/${NAME}/static # Default files - cp config/config-example.yaml dist/${NAME}/config/config.yaml + cp config-example.yaml dist/${NAME}/config.yaml cp webroot/static/content-example.md dist/${NAME}/webroot/static/content.md cp webroot/img/logo.png dist/${NAME}/static/logo.png @@ -50,7 +50,7 @@ build() { cp -R doc/ dist/${NAME}/doc/ cp README.md dist/${NAME} - env CGO_ENABLED=0 GOOS=$OS GOARCH=$ARCH go build -ldflags "-X main.GitCommit=$GIT_COMMIT -X main.BuildVersion=$VERSION -X main.BuildType=$NAME" -a -o dist/$NAME/owncast + env CGO_ENABLED=0 GOOS=$OS GOARCH=$ARCH go build -ldflags "-X core.GitCommit=$GIT_COMMIT -X core.BuildVersion=$VERSION -X core.BuildType=$NAME" -a -o dist/$NAME/owncast pushd dist/${NAME} >> /dev/null zip -r -q -8 ../owncast-$NAME-$VERSION.zip . diff --git a/server.go b/server.go deleted file mode 100644 index 4d4e63c59..000000000 --- a/server.go +++ /dev/null @@ -1,164 +0,0 @@ -package main - -import ( - "fmt" - "net/http" - "time" - - log "github.com/sirupsen/logrus" - - "golang.org/x/net/websocket" -) - -// Chat server. -type Server struct { - pattern string - messages []*ChatMessage - clients map[string]*Client - addCh chan *Client - delCh chan *Client - sendAllCh chan *ChatMessage - pingCh chan *PingMessage - doneCh chan bool - errCh chan error -} - -// Create new chat server. -func NewServer(pattern string) *Server { - messages := []*ChatMessage{} - clients := make(map[string]*Client) - addCh := make(chan *Client) - delCh := make(chan *Client) - sendAllCh := make(chan *ChatMessage) - pingCh := make(chan *PingMessage) - doneCh := make(chan bool) - errCh := make(chan error) - - // Demo messages only. Remove me eventually!!! - messages = append(messages, &ChatMessage{"Tom Nook", "I'll be there with Bells on! Ho ho!", "https://gamepedia.cursecdn.com/animalcrossingpocketcamp_gamepedia_en/thumb/4/4f/Timmy_Icon.png/120px-Timmy_Icon.png?version=87b38d7d6130411d113486c2db151385", "demo-message-1", "ChatMessage"}) - messages = append(messages, &ChatMessage{"Redd", "Fool me once, shame on you. Fool me twice, stop foolin' me.", "https://vignette.wikia.nocookie.net/animalcrossing/images/3/3d/Redd2.gif/revision/latest?cb=20100710004252", "demo-message-2", "ChatMessage"}) - messages = append(messages, &ChatMessage{"Kevin", "You just caught me before I was about to go work out weeweewee!", "https://vignette.wikia.nocookie.net/animalcrossing/images/2/20/NH-Kevin_poster.png/revision/latest/scale-to-width-down/100?cb=20200410185817", "demo-message-3", "ChatMessage"}) - messages = append(messages, &ChatMessage{"Isabelle", " Isabelle is the mayor's highly capable secretary. She can be forgetful sometimes, but you can always count on her for information about the town. She wears her hair up in a bun that makes her look like a shih tzu. Mostly because she is one! She also has a twin brother named Digby.", "https://dodo.ac/np/images/thumb/7/7b/IsabelleTrophyWiiU.png/200px-IsabelleTrophyWiiU.png", "demo-message-4", "ChatMessage"}) - messages = append(messages, &ChatMessage{"Judy", "myohmy, I'm dancing my dreams away.", "https://vignette.wikia.nocookie.net/animalcrossing/images/5/50/NH-Judy_poster.png/revision/latest/scale-to-width-down/100?cb=20200522063219", "demo-message-5", "ChatMessage"}) - messages = append(messages, &ChatMessage{"Blathers", "Blathers is an owl with brown feathers. His face is white and he has a yellow beak. His arms are wing shaped and he has yellow talons. His eyes are very big with small black irises. He also has big pink cheek circles on his cheeks. His belly appears to be checkered in diamonds with light brown and white squares, similar to an argyle vest, which is traditionally associated with academia. His green bowtie further alludes to his academic nature.", "https://vignette.wikia.nocookie.net/animalcrossing/images/b/b3/NH-character-Blathers.png/revision/latest?cb=20200229053519", "demo-message-6", "ChatMessage"}) - - server := &Server{ - pattern, - messages, - clients, - addCh, - delCh, - sendAllCh, - pingCh, - doneCh, - errCh, - } - - ticker := time.NewTicker(30 * time.Second) - go func() { - for { - select { - case <-ticker.C: - server.ping() - } - } - }() - - return server -} - -func (s *Server) ClientCount() int { - return len(s.clients) -} - -func (s *Server) Add(c *Client) { - s.addCh <- c -} - -func (s *Server) Del(c *Client) { - s.delCh <- c -} - -func (s *Server) SendAll(msg *ChatMessage) { - s.sendAllCh <- msg -} - -func (s *Server) Done() { - s.doneCh <- true -} - -func (s *Server) Err(err error) { - s.errCh <- err -} - -func (s *Server) sendPastMessages(c *Client) { - for _, msg := range s.messages { - c.Write(msg) - } -} - -func (s *Server) sendAll(msg *ChatMessage) { - for _, c := range s.clients { - c.Write(msg) - } -} - -func (s *Server) ping() { - // fmt.Println("Start pinging....", len(s.clients)) - - ping := &PingMessage{"PING"} - for _, c := range s.clients { - c.pingch <- ping - } -} - -// Listen and serve. -// It serves client connection and broadcast request. -func (s *Server) Listen() { - // websocket handler - onConnected := func(ws *websocket.Conn) { - defer func() { - err := ws.Close() - if err != nil { - s.errCh <- err - } - }() - - client := NewClient(ws, s) - s.Add(client) - client.Listen() - } - http.Handle(s.pattern, websocket.Handler(onConnected)) - - for { - select { - - // Add new a client - case c := <-s.addCh: - s.clients[c.id] = c - viewerAdded(c.id) - s.sendPastMessages(c) - - // del a client - case c := <-s.delCh: - delete(s.clients, c.id) - viewerRemoved(c.id) - - // broadcast message for all clients - case msg := <-s.sendAllCh: - log.Println("Send all:", msg) - s.messages = append(s.messages, msg) - s.sendAll(msg) - - case ping := <-s.pingCh: - fmt.Println("PING?", ping) - - case err := <-s.errCh: - log.Println("Error:", err.Error()) - - case <-s.doneCh: - return - } - } - -} diff --git a/stats.go b/stats.go deleted file mode 100644 index 4d12a70d1..000000000 --- a/stats.go +++ /dev/null @@ -1,152 +0,0 @@ -/* -Viewer counting doesn't just count the number of websocket clients that are currently connected, -because people may be watching the stream outside of the web browser via any HLS video client. -Instead we keep track of requests and consider each unique IP as a "viewer". -As a signal, however, we do use the websocket disconnect from a client as a signal that a viewer -dropped and we call ViewerDisconnected(). -*/ - -package main - -import ( - "encoding/json" - "io/ioutil" - "math" - "os" - "time" - - log "github.com/sirupsen/logrus" -) - -type Stats struct { - streamConnected bool `json:"-"` - SessionMaxViewerCount int `json:"sessionMaxViewerCount"` - OverallMaxViewerCount int `json:"overallMaxViewerCount"` - LastDisconnectTime time.Time `json:"lastDisconnectTime"` - lastConnectTime time.Time `json:"-"` - clients map[string]time.Time -} - -func (s *Stats) Setup() { - s.clients = make(map[string]time.Time) - - statsSaveTimer := time.NewTicker(1 * time.Minute) - go func() { - for { - select { - case <-statsSaveTimer.C: - s.save() - } - } - }() - - staleViewerPurgeTimer := time.NewTicker(3 * time.Second) - go func() { - for { - select { - case <-staleViewerPurgeTimer.C: - s.purgeStaleViewers() - } - } - }() -} - -func (s *Stats) purgeStaleViewers() { - for clientID, lastConnectedtime := range s.clients { - timeSinceLastActive := time.Since(lastConnectedtime).Minutes() - if timeSinceLastActive > 2 { - s.ViewerDisconnected(clientID) - } - - } -} - -func (s *Stats) IsStreamConnected() bool { - if !s.streamConnected { - return false - } - - // Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available. - // So account for that with an artificial buffer. - timeSinceLastConnected := time.Since(s.lastConnectTime).Seconds() - if timeSinceLastConnected < 10 { - return false - } - - return s.streamConnected -} - -func (s *Stats) GetViewerCount() int { - return len(s.clients) -} - -func (s *Stats) GetSessionMaxViewerCount() int { - return s.SessionMaxViewerCount -} - -func (s *Stats) GetOverallMaxViewerCount() int { - return s.OverallMaxViewerCount -} - -func (s *Stats) SetClientActive(clientID string) { - // if _, ok := s.clients[clientID]; !ok { - // fmt.Println("Marking client active:", clientID, s.GetViewerCount()+1, "clients connected.") - // } - - s.clients[clientID] = time.Now() - s.SessionMaxViewerCount = int(math.Max(float64(s.GetViewerCount()), float64(s.SessionMaxViewerCount))) - s.OverallMaxViewerCount = int(math.Max(float64(s.SessionMaxViewerCount), float64(s.OverallMaxViewerCount))) - -} - -func (s *Stats) ViewerDisconnected(clientID string) { - log.Println("Removed client", clientID) - - delete(s.clients, clientID) -} - -func (s *Stats) StreamConnected() { - s.streamConnected = true - s.lastConnectTime = time.Now() - - timeSinceDisconnect := time.Since(s.LastDisconnectTime).Minutes() - if timeSinceDisconnect > 15 { - s.SessionMaxViewerCount = 0 - } -} - -func (s *Stats) StreamDisconnected() { - s.streamConnected = false - s.LastDisconnectTime = time.Now() -} - -func (s *Stats) save() { - jsonData, err := json.Marshal(&s) - verifyError(err) - - f, err := os.Create("config/stats.json") - defer f.Close() - - verifyError(err) - - _, err = f.Write(jsonData) - verifyError(err) -} - -func getSavedStats() *Stats { - filePath := "config/stats.json" - - if !fileExists(filePath) { - return &Stats{} - } - - jsonFile, err := ioutil.ReadFile(filePath) - - var stats Stats - err = json.Unmarshal(jsonFile, &stats) - if err != nil { - log.Panicln(err) - } - - return &stats -} diff --git a/utils.go b/utils.go deleted file mode 100644 index 91af770f3..000000000 --- a/utils.go +++ /dev/null @@ -1,104 +0,0 @@ -package main - -import ( - "fmt" - "io/ioutil" - "net/http" - "os" - "path" - "path/filepath" - "strconv" - "strings" - - log "github.com/sirupsen/logrus" -) - -func getTempPipePath() string { - return filepath.Join(os.TempDir(), "streampipe.flv") -} - -func fileExists(name string) bool { - if _, err := os.Stat(name); err != nil { - if os.IsNotExist(err) { - return false - } - } - return true -} - -func getRelativePathFromAbsolutePath(path string) string { - pathComponents := strings.Split(path, "/") - variant := pathComponents[len(pathComponents)-2] - file := pathComponents[len(pathComponents)-1] - return filepath.Join(variant, file) -} - -func verifyError(e error) { - if e != nil { - log.Panic(e) - } -} - -func copy(src, dst string) { - input, err := ioutil.ReadFile(src) - if err != nil { - fmt.Println(err) - return - } - - if err := ioutil.WriteFile(dst, input, 0644); err != nil { - fmt.Println("Error creating", dst) - fmt.Println(err) - return - } -} - -func resetDirectories(configuration Config) { - log.Println("Resetting file directories to a clean slate.") - - // Wipe the public, web-accessible hls data directory - os.RemoveAll(configuration.PublicHLSPath) - os.RemoveAll(configuration.PrivateHLSPath) - os.MkdirAll(configuration.PublicHLSPath, 0777) - os.MkdirAll(configuration.PrivateHLSPath, 0777) - - // Remove the previous thumbnail - os.Remove("webroot/thumbnail.jpg") - - // Create private hls data dirs - if !configuration.VideoSettings.EnablePassthrough || len(configuration.VideoSettings.StreamQualities) == 0 { - for index := range configuration.VideoSettings.StreamQualities { - os.MkdirAll(path.Join(configuration.PrivateHLSPath, strconv.Itoa(index)), 0777) - os.MkdirAll(path.Join(configuration.PublicHLSPath, strconv.Itoa(index)), 0777) - } - } else { - os.MkdirAll(path.Join(configuration.PrivateHLSPath, strconv.Itoa(0)), 0777) - os.MkdirAll(path.Join(configuration.PublicHLSPath, strconv.Itoa(0)), 0777) - } -} - -func createInitialOfflineState() { - // Provide default files - if !fileExists("webroot/thumbnail.jpg") { - copy("static/logo.png", "webroot/thumbnail.jpg") - } - - showStreamOfflineState(configuration) -} - -func getClientIDFromRequest(req *http.Request) string { - var clientID string - xForwardedFor := req.Header.Get("X-FORWARDED-FOR") - if xForwardedFor != "" { - clientID = xForwardedFor - } else { - ipAddressString := req.RemoteAddr - ipAddressComponents := strings.Split(ipAddressString, ":") - ipAddressComponents[len(ipAddressComponents)-1] = "" - clientID = strings.Join(ipAddressComponents, ":") - } - - // fmt.Println("IP address determined to be", ipAddress) - - return clientID + req.UserAgent() -} diff --git a/utils/clientId.go b/utils/clientId.go new file mode 100644 index 000000000..65b8c2aaa --- /dev/null +++ b/utils/clientId.go @@ -0,0 +1,25 @@ +package utils + +import ( + "net/http" + "strings" +) + +//GenerateClientIDFromRequest generates a client id from the provided request +func GenerateClientIDFromRequest(req *http.Request) string { + var clientID string + + xForwardedFor := req.Header.Get("X-FORWARDED-FOR") + if xForwardedFor != "" { + clientID = xForwardedFor + } else { + ipAddressString := req.RemoteAddr + ipAddressComponents := strings.Split(ipAddressString, ":") + ipAddressComponents[len(ipAddressComponents)-1] = "" + clientID = strings.Join(ipAddressComponents, ":") + } + + // fmt.Println("IP address determined to be", ipAddress) + + return clientID + req.UserAgent() +} diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 000000000..43b211d6e --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,43 @@ +package utils + +import ( + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +//GetTemporaryPipePath gets the temporary path for the streampipe.flv file +func GetTemporaryPipePath() string { + return filepath.Join(os.TempDir(), "streampipe.flv") +} + +//DoesFileExists checks if the file exists +func DoesFileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + + return true +} + +//GetRelativePathFromAbsolutePath gets the relative path from the provided absolute path +func GetRelativePathFromAbsolutePath(path string) string { + pathComponents := strings.Split(path, "/") + variant := pathComponents[len(pathComponents)-2] + file := pathComponents[len(pathComponents)-1] + + return filepath.Join(variant, file) +} + +//Copy copies the +func Copy(source, destination string) error { + input, err := ioutil.ReadFile(source) + if err != nil { + return err + } + + return ioutil.WriteFile(destination, input, 0644) +}