Start cleaning up linter errors. (#358)
* Start cleaning up linter errors. For #357 * Fix unmarshalling NullTime values * More linter fixes * Remove commented code * Move defer up * Consolidate error check lines * Move error check to make sure row iteration was successful * Cleaner error check + do not recreate pipe if it exists * Consolidate hashing to generate client id
This commit is contained in:
@@ -40,11 +40,8 @@ func Start() error {
|
||||
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
_server.ping()
|
||||
}
|
||||
for range ticker.C {
|
||||
_server.ping()
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -94,13 +94,21 @@ func (c *Client) listenWrite() {
|
||||
select {
|
||||
// Send a PING keepalive
|
||||
case msg := <-c.pingch:
|
||||
websocket.JSON.Send(c.ws, msg)
|
||||
err := websocket.JSON.Send(c.ws, msg)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
}
|
||||
// send message to the client
|
||||
case msg := <-c.ch:
|
||||
// log.Println("Send:", msg)
|
||||
websocket.JSON.Send(c.ws, msg)
|
||||
err := websocket.JSON.Send(c.ws, msg)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
}
|
||||
case msg := <-c.usernameChangeChannel:
|
||||
websocket.JSON.Send(c.ws, msg)
|
||||
err := websocket.JSON.Send(c.ws, msg)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
}
|
||||
// receive done request
|
||||
case <-c.doneCh:
|
||||
_server.remove(c)
|
||||
@@ -114,7 +122,6 @@ func (c *Client) listenWrite() {
|
||||
func (c *Client) listenRead() {
|
||||
for {
|
||||
select {
|
||||
|
||||
// receive done request
|
||||
case <-c.doneCh:
|
||||
_server.remove(c)
|
||||
|
||||
@@ -32,7 +32,10 @@ func createTable() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
stmt.Exec()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
func addMessage(message models.ChatMessage) {
|
||||
@@ -41,16 +44,20 @@ func addMessage(message models.ChatMessage) {
|
||||
log.Fatal(err)
|
||||
}
|
||||
stmt, err := tx.Prepare("INSERT INTO messages(id, author, body, messageType, visible, timestamp) values(?, ?, ?, ?, ?, ?)")
|
||||
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
_, err = stmt.Exec(message.ID, message.Author, message.Body, message.MessageType, 1, message.Timestamp)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
tx.Commit()
|
||||
|
||||
defer stmt.Close()
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func getChatHistory() []models.ChatMessage {
|
||||
@@ -89,5 +96,9 @@ func getChatHistory() []models.ChatMessage {
|
||||
history = append(history, message)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
return history
|
||||
}
|
||||
|
||||
@@ -46,11 +46,6 @@ func (s *server) SendToAll(msg models.ChatMessage) {
|
||||
s.sendAllCh <- msg
|
||||
}
|
||||
|
||||
// Done marks the server as done.
|
||||
func (s *server) done() {
|
||||
s.doneCh <- true
|
||||
}
|
||||
|
||||
// Err handles an error.
|
||||
func (s *server) err(err error) {
|
||||
s.errCh <- err
|
||||
@@ -140,18 +135,7 @@ func (s *server) sendWelcomeMessageToClient(c *Client) {
|
||||
time.Sleep(7 * time.Second)
|
||||
|
||||
initialChatMessageText := fmt.Sprintf("Welcome to %s! %s", config.Config.InstanceDetails.Title, config.Config.InstanceDetails.Summary)
|
||||
initialMessage := models.ChatMessage{"owncast-server", config.Config.InstanceDetails.Name, initialChatMessageText, "initial-message-1", "SYSTEM", true, time.Now()}
|
||||
initialMessage := models.ChatMessage{ClientID: "owncast-server", Author: config.Config.InstanceDetails.Name, Body: initialChatMessageText, ID: "initial-message-1", MessageType: "SYSTEM", Visible: true, Timestamp: time.Now()}
|
||||
c.Write(initialMessage)
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func (s *server) getClientForClientID(clientID string) *Client {
|
||||
for _, client := range s.Clients {
|
||||
if client.ClientID == clientID {
|
||||
return client
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
43
core/core.go
43
core/core.go
@@ -99,7 +99,10 @@ func transitionToOfflineVideoStreamContent() {
|
||||
_transcoder.Start()
|
||||
|
||||
// Copy the logo to be the thumbnail
|
||||
utils.Copy(filepath.Join("webroot", config.Config.InstanceDetails.Logo.Large), "webroot/thumbnail.jpg")
|
||||
err := utils.Copy(filepath.Join("webroot", config.Config.InstanceDetails.Logo.Large), "webroot/thumbnail.jpg")
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
|
||||
// Delete the preview Gif
|
||||
os.Remove(path.Join(config.WebRoot, "preview.gif"))
|
||||
@@ -111,8 +114,15 @@ func resetDirectories() {
|
||||
// Wipe the public, web-accessible hls data directory
|
||||
os.RemoveAll(config.PublicHLSStoragePath)
|
||||
os.RemoveAll(config.PrivateHLSStoragePath)
|
||||
os.MkdirAll(config.PublicHLSStoragePath, 0777)
|
||||
os.MkdirAll(config.PrivateHLSStoragePath, 0777)
|
||||
err := os.MkdirAll(config.PublicHLSStoragePath, 0777)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
err = os.MkdirAll(config.PrivateHLSStoragePath, 0777)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
// Remove the previous thumbnail
|
||||
os.Remove(filepath.Join(config.WebRoot, "thumbnail.jpg"))
|
||||
@@ -120,14 +130,31 @@ func resetDirectories() {
|
||||
// Create private hls data dirs
|
||||
if len(config.Config.VideoSettings.StreamQualities) != 0 {
|
||||
for index := range config.Config.VideoSettings.StreamQualities {
|
||||
os.MkdirAll(path.Join(config.PrivateHLSStoragePath, strconv.Itoa(index)), 0777)
|
||||
os.MkdirAll(path.Join(config.PublicHLSStoragePath, strconv.Itoa(index)), 0777)
|
||||
err = os.MkdirAll(path.Join(config.PrivateHLSStoragePath, strconv.Itoa(index)), 0777)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
err = os.MkdirAll(path.Join(config.PublicHLSStoragePath, strconv.Itoa(index)), 0777)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
os.MkdirAll(path.Join(config.PrivateHLSStoragePath, strconv.Itoa(0)), 0777)
|
||||
os.MkdirAll(path.Join(config.PublicHLSStoragePath, strconv.Itoa(0)), 0777)
|
||||
err = os.MkdirAll(path.Join(config.PrivateHLSStoragePath, strconv.Itoa(0)), 0777)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
err = os.MkdirAll(path.Join(config.PublicHLSStoragePath, strconv.Itoa(0)), 0777)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the previous thumbnail
|
||||
utils.Copy(config.Config.InstanceDetails.Logo.Large, "webroot/thumbnail.jpg")
|
||||
err = utils.Copy(path.Join(config.WebRoot, config.Config.InstanceDetails.Logo.Large), "webroot/thumbnail.jpg")
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,14 +60,14 @@ func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http
|
||||
|
||||
f, err := os.Create(writePath)
|
||||
if err != nil {
|
||||
returnError(err, w, r)
|
||||
returnError(err, w)
|
||||
return
|
||||
}
|
||||
|
||||
defer f.Close()
|
||||
_, err = f.Write(data)
|
||||
if err != nil {
|
||||
returnError(err, w, r)
|
||||
returnError(err, w)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -82,7 +82,6 @@ func (s *FileWriterReceiverService) fileWritten(path string) {
|
||||
|
||||
if utils.GetRelativePathFromAbsolutePath(path) == "hls/stream.m3u8" {
|
||||
s.callbacks.MasterPlaylistWritten(path)
|
||||
|
||||
} else if strings.HasSuffix(path, ".ts") {
|
||||
performanceMonitorKey := "segmentWritten-" + index
|
||||
averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)
|
||||
@@ -98,13 +97,12 @@ func (s *FileWriterReceiverService) fileWritten(path string) {
|
||||
} else {
|
||||
_inWarningState = false
|
||||
}
|
||||
|
||||
} else if strings.HasSuffix(path, ".m3u8") {
|
||||
s.callbacks.VariantPlaylistWritten(path)
|
||||
}
|
||||
}
|
||||
|
||||
func returnError(err error, w http.ResponseWriter, r *http.Request) {
|
||||
func returnError(err error, w http.ResponseWriter) {
|
||||
log.Errorln(err)
|
||||
http.Error(w, http.StatusText(http.StatusInternalServerError)+": "+err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
@@ -90,7 +90,12 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) {
|
||||
log.Infoln("Incoming RTMP connected.")
|
||||
|
||||
pipePath := utils.GetTemporaryPipePath()
|
||||
syscall.Mkfifo(pipePath, 0666)
|
||||
if !utils.DoesFileExists(pipePath) {
|
||||
err := syscall.Mkfifo(pipePath, 0666)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
}
|
||||
|
||||
_hasInboundRTMPConnection = true
|
||||
_setStreamAsConnected()
|
||||
@@ -119,7 +124,6 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func handleDisconnect(conn net.Conn) {
|
||||
|
||||
@@ -6,8 +6,8 @@ import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/owncast/owncast/models"
|
||||
"github.com/nareix/joy5/format/flv/flvio"
|
||||
"github.com/owncast/owncast/models"
|
||||
)
|
||||
|
||||
func getInboundDetailsFromMetadata(metadata []interface{}) (models.RTMPStreamMetadata, error) {
|
||||
@@ -21,8 +21,8 @@ func getInboundDetailsFromMetadata(metadata []interface{}) (models.RTMPStreamMet
|
||||
|
||||
metadataJSONString := submatchall[0]
|
||||
var details models.RTMPStreamMetadata
|
||||
json.Unmarshal([]byte(metadataJSONString), &details)
|
||||
return details, nil
|
||||
err := json.Unmarshal([]byte(metadataJSONString), &details)
|
||||
return details, err
|
||||
}
|
||||
|
||||
func getAudioCodec(codec interface{}) string {
|
||||
|
||||
@@ -29,12 +29,9 @@ func setupStats() error {
|
||||
|
||||
statsSaveTimer := time.NewTicker(1 * time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-statsSaveTimer.C:
|
||||
if err := saveStatsToFile(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for range statsSaveTimer.C {
|
||||
if err := saveStatsToFile(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -137,12 +134,12 @@ func getSavedStats() (models.Stats, error) {
|
||||
|
||||
jsonFile, err := ioutil.ReadFile(config.StatsFile)
|
||||
if err != nil {
|
||||
return result, nil
|
||||
return result, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(jsonFile, &result); err != nil {
|
||||
return result, nil
|
||||
return result, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return result, err
|
||||
}
|
||||
|
||||
@@ -5,10 +5,6 @@ import (
|
||||
"github.com/owncast/owncast/core/storageproviders"
|
||||
)
|
||||
|
||||
var (
|
||||
usingExternalStorage = false
|
||||
)
|
||||
|
||||
func setupStorage() error {
|
||||
handler.Storage = _storage
|
||||
|
||||
|
||||
@@ -23,11 +23,8 @@ func (s *LocalStorage) Setup() error {
|
||||
// as all HLS segments have to be publicly available on disk to keep a recording of them.
|
||||
_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-_onlineCleanupTicker.C:
|
||||
ffmpeg.CleanupOldContent(config.PublicHLSStoragePath)
|
||||
}
|
||||
for range _onlineCleanupTicker.C {
|
||||
ffmpeg.CleanupOldContent(config.PublicHLSStoragePath)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
@@ -35,7 +32,10 @@ func (s *LocalStorage) Setup() error {
|
||||
|
||||
// SegmentWritten is called when a single segment of video is written.
|
||||
func (s *LocalStorage) SegmentWritten(localFilePath string) {
|
||||
s.Save(localFilePath, 0)
|
||||
_, err := s.Save(localFilePath, 0)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
// VariantPlaylistWritten is called when a variant hls playlist is written.
|
||||
@@ -49,7 +49,10 @@ func (s *LocalStorage) VariantPlaylistWritten(localFilePath string) {
|
||||
|
||||
// MasterPlaylistWritten is called when the master hls playlist is written.
|
||||
func (s *LocalStorage) MasterPlaylistWritten(localFilePath string) {
|
||||
s.Save(localFilePath, 0)
|
||||
_, err := s.Save(localFilePath, 0)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Save will save a local filepath using the storage provider.
|
||||
@@ -63,7 +66,6 @@ func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) {
|
||||
newPath = filepath.Join(config.WebRoot, filePath)
|
||||
}
|
||||
|
||||
utils.Copy(filePath, newPath)
|
||||
|
||||
return newPath, nil
|
||||
err := utils.Copy(filePath, newPath)
|
||||
return newPath, err
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
|
||||
// If we try to upload a playlist but it is not yet on disk
|
||||
// then keep a reference to it here.
|
||||
var _queuedPlaylistUpdates = make(map[string]string, 0)
|
||||
var _queuedPlaylistUpdates = make(map[string]string)
|
||||
|
||||
// S3Storage is the s3 implementation of the ChunkStorageProvider.
|
||||
type S3Storage struct {
|
||||
@@ -118,7 +118,10 @@ func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
|
||||
// MasterPlaylistWritten is called when the master hls playlist is written.
|
||||
func (s *S3Storage) MasterPlaylistWritten(localFilePath string) {
|
||||
// Rewrite the playlist to use absolute remote S3 URLs
|
||||
s.rewriteRemotePlaylist(localFilePath)
|
||||
err := s.rewriteRemotePlaylist(localFilePath)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Save saves the file to the s3 bucket.
|
||||
@@ -192,6 +195,9 @@ func (s *S3Storage) rewriteRemotePlaylist(filePath string) error {
|
||||
|
||||
p := m3u8.NewMasterPlaylist()
|
||||
err = p.DecodeFrom(bufio.NewReader(f), false)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
|
||||
for _, item := range p.Variants {
|
||||
item.URI = s.host + filepath.Join("/hls", item.URI)
|
||||
|
||||
@@ -26,8 +26,8 @@ var _onlineCleanupTicker *time.Ticker
|
||||
// setStreamAsConnected sets the stream as connected.
|
||||
func setStreamAsConnected() {
|
||||
_stats.StreamConnected = true
|
||||
_stats.LastConnectTime = utils.NullTime{time.Now(), true}
|
||||
_stats.LastDisconnectTime = utils.NullTime{time.Now(), false}
|
||||
_stats.LastConnectTime = utils.NullTime{Time: time.Now(), Valid: true}
|
||||
_stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: false}
|
||||
|
||||
StopOfflineCleanupTimer()
|
||||
startOnlineCleanupTimer()
|
||||
@@ -44,7 +44,6 @@ func setStreamAsConnected() {
|
||||
go func() {
|
||||
_transcoder = ffmpeg.NewTranscoder()
|
||||
_transcoder.TranscoderCompleted = func(error) {
|
||||
|
||||
SetStreamAsDisconnected()
|
||||
}
|
||||
_transcoder.Start()
|
||||
@@ -56,7 +55,7 @@ func setStreamAsConnected() {
|
||||
// SetStreamAsDisconnected sets the stream as disconnected.
|
||||
func SetStreamAsDisconnected() {
|
||||
_stats.StreamConnected = false
|
||||
_stats.LastDisconnectTime = utils.NullTime{time.Now(), true}
|
||||
_stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: true}
|
||||
_broadcaster = nil
|
||||
|
||||
offlineFilename := "offline.ts"
|
||||
@@ -73,9 +72,14 @@ func SetStreamAsDisconnected() {
|
||||
playlistFilePath := fmt.Sprintf(filepath.Join(config.PrivateHLSStoragePath, "%d/stream.m3u8"), index)
|
||||
segmentFilePath := fmt.Sprintf(filepath.Join(config.PrivateHLSStoragePath, "%d/%s"), index, offlineFilename)
|
||||
|
||||
utils.Copy(offlineFilePath, segmentFilePath)
|
||||
_storage.Save(segmentFilePath, 0)
|
||||
|
||||
err := utils.Copy(offlineFilePath, segmentFilePath)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
_, err = _storage.Save(segmentFilePath, 0)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
if utils.DoesFileExists(playlistFilePath) {
|
||||
f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm)
|
||||
if err != nil {
|
||||
@@ -84,13 +88,23 @@ func SetStreamAsDisconnected() {
|
||||
defer f.Close()
|
||||
|
||||
playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
variantPlaylist := playlist.(*m3u8.MediaPlaylist)
|
||||
if len(variantPlaylist.Segments) > config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist() {
|
||||
variantPlaylist.Segments = variantPlaylist.Segments[:len(variantPlaylist.Segments)]
|
||||
}
|
||||
|
||||
err = variantPlaylist.Append(offlineFilename, 8.0, "")
|
||||
variantPlaylist.SetDiscontinuity()
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
err = variantPlaylist.SetDiscontinuity()
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
_, err = f.WriteAt(variantPlaylist.Encode().Bytes(), 0)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
@@ -118,7 +132,10 @@ func SetStreamAsDisconnected() {
|
||||
log.Errorln(err)
|
||||
}
|
||||
}
|
||||
_storage.Save(playlistFilePath, 0)
|
||||
_, err = _storage.Save(playlistFilePath, 0)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
StartOfflineCleanupTimer()
|
||||
@@ -129,14 +146,11 @@ func SetStreamAsDisconnected() {
|
||||
func StartOfflineCleanupTimer() {
|
||||
_offlineCleanupTimer = time.NewTimer(5 * time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-_offlineCleanupTimer.C:
|
||||
// Reset the session count since the session is over
|
||||
_stats.SessionMaxViewerCount = 0
|
||||
resetDirectories()
|
||||
transitionToOfflineVideoStreamContent()
|
||||
}
|
||||
for range _offlineCleanupTimer.C {
|
||||
// Reset the session count since the session is over
|
||||
_stats.SessionMaxViewerCount = 0
|
||||
resetDirectories()
|
||||
transitionToOfflineVideoStreamContent()
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -151,11 +165,8 @@ func StopOfflineCleanupTimer() {
|
||||
func startOnlineCleanupTimer() {
|
||||
_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-_onlineCleanupTicker.C:
|
||||
ffmpeg.CleanupOldContent(config.PrivateHLSStoragePath)
|
||||
}
|
||||
for range _onlineCleanupTicker.C {
|
||||
ffmpeg.CleanupOldContent(config.PrivateHLSStoragePath)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user