0.0.6 -> Master (#731)

* Implement webhook events for external integrations (#574)

* Implement webhook events for external integrations

Reference #556

* move message type to models and remove duplicate

* add json header so content type can be determined

* Pass at migrating webhooks to datastore + management apis (#589)

* Pass at migrating webhooks to datastore + management apis

* Support nil lastUsed timestamps and return back the new webhook on create

* Cleanup from review feedback

* Simplify a bit

Co-authored-by: Aaron Ogle <aaron@geekgonecrazy.com>

Co-authored-by: Gabe Kangas <gabek@real-ity.com>

* Webhook query cleanup

* Access tokens + Send system message external API (#585)

* New add, get and delete access token APIs

* Create auth token middleware

* Update last_used timestamp when using an access token

* Add auth'ed endpoint for sending system messages

* Cleanup

* Update api spec for new apis

* Commit updated API documentation

* Add auth'ed endpoint for sending user chat messages

* Return access token string

* Commit updated API documentation

* Fix route

* Support nil lastUsed time

* Commit updated Javascript packages

* Remove duplicate function post rebase

* Fix msg id generation

* Update controllers/admin/chat.go

Co-authored-by: Aaron Ogle <geekgonecrazy@users.noreply.github.com>

* Webhook query cleanup

* Add SystemMessageSent to EventType

Co-authored-by: Owncast <owncast@owncast.online>
Co-authored-by: Aaron Ogle <geekgonecrazy@users.noreply.github.com>

* Set webhook as used on completion. Closes #610

* Display webhook errors as errors

* Commit updated API documentation

* Add user joined chat event

* Change integration API paths. Update API spec

* Update development version of admin that supports integration apis

* Commit updated API documentation

* Add automated tests for external integration APIs

* check error

* quiet this test for now

* Route up some additional 3rd party apis. #638

* Commit updated API documentation

* Save username on user joined event

* Add missing scope to valid scopes list

* Add generic chat action event API for 3rd parties. Closes #666

* Commit updated API documentation

* First pass at moving WIP config framework into project for #234

* Only support exported fields in custom types

* Using YP get/set key as a first pass at using the data layer. Fixes + integration.

* Ignore test db

* Start adding getters and setters for config values

* More get/set config work. Starting to populate api with data

* Wire up some config edit endpoints

* More endpoints

* Disable cors middleware

* Add more endpoints and add test to test them

* Remove the in-memory change APIs

* Add endpoint for changing tags

* Add more config endpoints

* Starting to point more things away from config file and to the datastore

* Populate YP with db data

* Create new util method for parsing page body markdown and return it in api

* Verify proposed path to ffmpeg

* For development purposes show the config key in logs

* Move stats values to datastore

* Moving over more values to the datastore

* Move S3 config to datastore

* First pass the config -> db migrator

* Add the start of the video config apis

* It builds pointing everything away from the config

* Tweak ffmpeg path error message

* Backup database every hour. Closes #549

* Config + defaults + migration work for db

* Cleanup logging

* Remove all the old config structs

* Add descriptive info about migration

* Tweak ffmpeg validation logic

* Fix db backup path. backup on db version migration

* Set video and s3 configurations

* Update api spec with new config endpoints

* Add migrator for stats file

* Commit updated API documentation

* Use a dynamic system port for internal HLS writes. Closes #577 (#626)

* Use a dynamic system port for internal HLS writes. Closes #577

* Cleanup

* YP key migration to datastore

* Create a backup directory if needed before migrations

* Remove config test that no longer makes sense. Cleanup.

* Change number types from float32 to float64

* Update automated test suite

* Allow restoring a database backup via command line flags. Closes #549

* Add new hls segment config api

* Commit updated API documentation

* Update apis to require a value container property

* add socialHandles api

* Commit updated API documentation

* Add new latancy level setting to replace segment settings

* Commit updated API documentation

* Fix spelling

* Commit updated API documentation

* hardcode a json api of available social platforms

* Add additional icons

* Return social handles in server config api

* Add socialhandles validation to test

* Move list of hard coded social platforms to an api

* Remove audio only code from transcoder since we do not use it

* Add latency levels api + snapshot of video settings as current broadcast

* Add config/serverurl endpoint

* Return 404 on YP api if disabled

* Surface stream title in YP response

* Add stream title to web ui

* Cleanup log message. Closes #520

* Rename ffmpeg package to transcoder

* Add ws package for testing

* Reduce chat backlog to past 5hrs, max 50. Closes #548

* Fix error formatting

* Add endpoint for resetting yp registration

* Add yp/reset to api spec. return status in response

* Return zero viewer count if stream is offline. Closes #422

* Post-rebase fixes

* Fix merge conflict in openapi file

* Commit updated API documentation

* Standardize controller names

* Support setting the stream key via the command line. Closes #665

* Return social handles with YP data. First half of https://github.com/owncast/owncast-yp/issues/28

* Give the YP package access to server status regardless if enabled or not

* Change delay in automated tests

* Add stream title integration API. For #638

* Commit updated API documentation

* Add storage to the migrator

* Missing returning NSFW value in server config

* Add flag to ignore websocket client. Closes #537

* Add error for parsing broadcaster metadata

* Add support for a cli specified http server port. Closes #674

* Add cpu usage levels and a temporary mapping between it and libx264 presets

* Test for valid url endpoint when saving s3 config

* Re-configure storage on every stream to allow changing storage providers

* After 5 minutes of a stream being stopped clear the stream title

* Hide viewer count once stream goes offline instead of when player stops

* Pull steamTitle from the status that gets updated instead of the config

* Commit updated API documentation

* Optionally show stream title in the header

* Reset stream title when server starts

* Show chat action when stream title is updated

* Allow system messages to come back in persistence

* Split out getting chat history for moderation + fix tests

* Remove server title and standardize on name only

* Commit updated API documentation

* Bump github.com/aws/aws-sdk-go from 1.37.1 to 1.37.2 (#680)

Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.37.1 to 1.37.2.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.37.1...v1.37.2)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Add video variant and stream latency config file migrator

* Remove mostly unused disable upgrade check bool

* Commit updated API documentation

* Allow bundling the admin from the 0.0.6 branch

* Fix saving port numbers

* Use name instead of old title on window focus

* Work on latency levels. Fix test to use levels. Clean up transcoder to only reference levels

* Another place where title -> name

* Fix test

* Bump github.com/aws/aws-sdk-go from 1.37.2 to 1.37.3 (#690)

Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.37.2 to 1.37.3.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.37.2...v1.37.3)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Update dependabot config

* Bump github.com/aws/aws-sdk-go from 1.37.3 to 1.37.5 (#693)

Bumps [github.com/aws/aws-sdk-go](https://github.com/aws/aws-sdk-go) from 1.37.3 to 1.37.5.
- [Release notes](https://github.com/aws/aws-sdk-go/releases)
- [Changelog](https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/aws/aws-sdk-go/compare/v1.37.3...v1.37.5)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump video.js from 7.10.2 to 7.11.4 in /build/javascript (#694)

* Bump video.js from 7.10.2 to 7.11.4 in /build/javascript

Bumps [video.js](https://github.com/videojs/video.js) from 7.10.2 to 7.11.4.
- [Release notes](https://github.com/videojs/video.js/releases)
- [Changelog](https://github.com/videojs/video.js/blob/main/CHANGELOG.md)
- [Commits](https://github.com/videojs/video.js/compare/v7.10.2...v7.11.4)

Signed-off-by: dependabot[bot] <support@github.com>

* Commit updated Javascript packages

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Owncast <owncast@owncast.online>

* Make the latency migrator dynamic so I can tweak values easier

* Split out fetching ffmpeg path from validating the path so it can be changed in the admin

* Some commenting and linter cleanup

* Validate the path for a logo change and throw an error if it does not exist

* Logo change requests have to be a real file now

* Cleanup, making linter happy

* Format javascript on push

* Only format js in master

* Tweak latency level values

* Remove unused config file examples

* Fix thumbnail generation after messing with the ffmpeg path getter

* Reduce how often we report high hardware utilization warnings

* Bundle the 0.0.6 branch version of the admin

* Return validated ffmpeg path in admin server config

* Change the logo to be stored in the data directory instead of webroot

* Bump postcss from 8.2.4 to 8.2.5 in /build/javascript (#702)

Bumps [postcss](https://github.com/postcss/postcss) from 8.2.4 to 8.2.5.
- [Release notes](https://github.com/postcss/postcss/releases)
- [Changelog](https://github.com/postcss/postcss/blob/main/CHANGELOG.md)
- [Commits](https://github.com/postcss/postcss/compare/8.2.4...8.2.5)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Default config file no longer used

* don't show stream title when offline

addresses https://github.com/owncast/owncast/issues/677

* Remove auto-clearing stream title. #677

* webroot -> data when using logo as thumbnail

* Do not list websocket/access token create/delete as integration APIs

* Commit updated API documentation

* Bundle updated admin

* Remove pointing to the 0.0.6 admin branch

* Linter cleanup

* Linter cleanup

* Add donations and follow links to show up under social handles

* Prettified Code!

* More linter cleanup

* Update admin bundle

* Remove use of platforms.js and return icons with social handles. Closes #732

* Update admin bundle

* Support custom config path for use in migration

* Remove unused platform-logos.gif

* Reduce log level of message

* Remove unused logo files in static dir

* Handle dev vs. release build info

* Restore logo.png for initial thumbnail

* Cleanup some files from the build process that are not needed

* Fix incorrect build-time injection var

* Fix missing file getting copied to the build

* Remove console directory message.

* Update admin bundle

* Fix comment

* Report storage setup error

* add some value set error checking

* Use validated dynamic ffmpeg path for animated gif preview

* Make chat message links be white so they don't hide in the bg. Closes #599

* Restore conditional that was accidentally removed

Co-authored-by: Aaron Ogle <geekgonecrazy@users.noreply.github.com>
Co-authored-by: Owncast <owncast@owncast.online>
Co-authored-by: Ginger Wong <omqmail@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: nebunez <uoj2y7wak869@opayq.net>
Co-authored-by: gabek <gabek@users.noreply.github.com>
This commit is contained in:
Gabe Kangas
2021-02-18 23:05:52 -08:00
committed by GitHub
parent 05ec74a1e3
commit bc2caadb74
125 changed files with 5544 additions and 1510 deletions

View File

@@ -60,12 +60,16 @@ func SendMessage(message models.ChatEvent) {
}
// GetMessages gets all of the messages.
func GetMessages(filtered bool) []models.ChatEvent {
func GetMessages() []models.ChatEvent {
if _server == nil {
return []models.ChatEvent{}
}
return getChatHistory(filtered)
return getChatHistory()
}
func GetModerationChatMessages() []models.ChatEvent {
return getChatModerationHistory()
}
func GetClient(clientID string) *Client {

View File

@@ -28,36 +28,38 @@ type Client struct {
Username *string
ClientID string // How we identify unique viewers when counting viewer counts.
Geo *geoip.GeoDetails `json:"geo"`
Ignore bool // If set to true this will not be treated as a viewer
socketID string // How we identify a single websocket client.
ws *websocket.Conn
ch chan models.ChatEvent
pingch chan models.PingMessage
usernameChangeChannel chan models.NameChangeEvent
userJoinedChannel chan models.UserJoinedEvent
doneCh chan bool
rateLimiter *rate.Limiter
}
const (
CHAT = "CHAT"
NAMECHANGE = "NAME_CHANGE"
PING = "PING"
PONG = "PONG"
VISIBILITYUPDATE = "VISIBILITY-UPDATE"
)
// NewClient creates a new chat client.
func NewClient(ws *websocket.Conn) *Client {
if ws == nil {
log.Panicln("ws cannot be nil")
}
var ignoreClient = false
for _, extraData := range ws.Config().Protocol {
if extraData == "IGNORE_CLIENT" {
ignoreClient = true
}
}
ch := make(chan models.ChatEvent, channelBufSize)
doneCh := make(chan bool)
pingch := make(chan models.PingMessage)
usernameChangeChannel := make(chan models.NameChangeEvent)
userJoinedChannel := make(chan models.UserJoinedEvent)
ipAddress := utils.GetIPAddressFromRequest(ws.Request())
userAgent := ws.Request().UserAgent()
@@ -66,7 +68,7 @@ func NewClient(ws *websocket.Conn) *Client {
rateLimiter := rate.NewLimiter(0.6, 5)
return &Client{time.Now(), 0, userAgent, ipAddress, nil, clientID, nil, socketID, ws, ch, pingch, usernameChangeChannel, doneCh, rateLimiter}
return &Client{time.Now(), 0, userAgent, ipAddress, nil, clientID, nil, ignoreClient, socketID, ws, ch, pingch, usernameChangeChannel, userJoinedChannel, doneCh, rateLimiter}
}
func (c *Client) write(msg models.ChatEvent) {
@@ -105,6 +107,12 @@ func (c *Client) listenWrite() {
if err != nil {
c.handleClientSocketError(err)
}
case msg := <-c.userJoinedChannel:
err := websocket.JSON.Send(c.ws, msg)
if err != nil {
c.handleClientSocketError(err)
}
// receive done request
case <-c.doneCh:
_server.removeClient(c)
@@ -157,28 +165,46 @@ func (c *Client) listenRead() {
log.Errorln(err)
}
messageType := messageTypeCheck["type"]
messageType := messageTypeCheck["type"].(string)
if !c.passesRateLimit() {
continue
}
if messageType == CHAT {
if messageType == models.MessageSent {
c.chatMessageReceived(data)
} else if messageType == NAMECHANGE {
} else if messageType == models.UserNameChanged {
c.userChangedName(data)
} else if messageType == models.UserJoined {
c.userJoined(data)
}
}
}
}
func (c *Client) userJoined(data []byte) {
var msg models.UserJoinedEvent
if err := json.Unmarshal(data, &msg); err != nil {
log.Errorln(err)
return
}
msg.ID = shortid.MustGenerate()
msg.Type = models.UserJoined
msg.Timestamp = time.Now()
c.Username = &msg.Username
_server.userJoined(msg)
}
func (c *Client) userChangedName(data []byte) {
var msg models.NameChangeEvent
err := json.Unmarshal(data, &msg)
if err != nil {
log.Errorln(err)
}
msg.Type = NAMECHANGE
msg.Type = models.UserNameChanged
msg.ID = shortid.MustGenerate()
_server.usernameChanged(msg)
c.Username = &msg.NewName
@@ -191,10 +217,7 @@ func (c *Client) chatMessageReceived(data []byte) {
log.Errorln(err)
}
id, _ := shortid.Generate()
msg.ID = id
msg.Timestamp = time.Now()
msg.Visible = true
msg.SetDefaults()
c.MessageCount++
c.Username = &msg.Author

View File

@@ -1,6 +1,8 @@
package chat
import (
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
@@ -20,8 +22,10 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error {
log.Errorln(err)
continue
}
message.MessageType = VISIBILITYUPDATE
message.MessageType = models.VisibiltyToggled
_server.sendAll(message)
go webhooks.SendChatEvent(message)
}
return nil

View File

@@ -61,15 +61,8 @@ func addMessage(message models.ChatEvent) {
}
}
func getChatHistory(filtered bool) []models.ChatEvent {
func getChat(query string) []models.ChatEvent {
history := make([]models.ChatEvent, 0)
// Get all messages sent within the past day
var query = "SELECT * FROM messages WHERE messageType != 'SYSTEM' AND datetime(timestamp) >=datetime('now', '-1 Day')"
if filtered {
query = query + " AND visible = 1"
}
rows, err := _db.Query(query)
if err != nil {
log.Fatal(err)
@@ -80,7 +73,7 @@ func getChatHistory(filtered bool) []models.ChatEvent {
var id string
var author string
var body string
var messageType string
var messageType models.EventType
var visible int
var timestamp time.Time
@@ -109,6 +102,17 @@ func getChatHistory(filtered bool) []models.ChatEvent {
return history
}
func getChatModerationHistory() []models.ChatEvent {
var query = "SELECT * FROM messages WHERE messageType == 'CHAT' AND datetime(timestamp) >=datetime('now', '-5 Hour')"
return getChat(query)
}
func getChatHistory() []models.ChatEvent {
// Get all messages sent within the past 5hrs, max 50
var query = "SELECT * FROM messages WHERE datetime(timestamp) >=datetime('now', '-5 Hour') AND visible = 1 LIMIT 50"
return getChat(query)
}
func saveMessageVisibility(messageIDs []string, visible bool) error {
tx, err := _db.Begin()
if err != nil {
@@ -150,7 +154,7 @@ func getMessageById(messageID string) (models.ChatEvent, error) {
var id string
var author string
var body string
var messageType string
var messageType models.EventType
var visible int
var timestamp time.Time

View File

@@ -9,7 +9,8 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
)
@@ -61,7 +62,7 @@ func (s *server) sendAll(msg models.ChatEvent) {
}
func (s *server) ping() {
ping := models.PingMessage{MessageType: PING}
ping := models.PingMessage{MessageType: models.PING}
for _, c := range s.Clients {
c.pingch <- ping
}
@@ -71,6 +72,16 @@ func (s *server) usernameChanged(msg models.NameChangeEvent) {
for _, c := range s.Clients {
c.usernameChangeChannel <- msg
}
go webhooks.SendChatEventUsernameChanged(msg)
}
func (s *server) userJoined(msg models.UserJoinedEvent) {
for _, c := range s.Clients {
c.userJoinedChannel <- msg
}
go webhooks.SendChatEventUserJoined(msg)
}
func (s *server) onConnection(ws *websocket.Conn) {
@@ -103,8 +114,10 @@ func (s *server) Listen() {
s.Clients[c.socketID] = c
l.Unlock()
s.listener.ClientAdded(c.GetViewerClientFromChatClient())
s.sendWelcomeMessageToClient(c)
if !c.Ignore {
s.listener.ClientAdded(c.GetViewerClientFromChatClient())
s.sendWelcomeMessageToClient(c)
}
// remove a client
case c := <-s.delCh:
@@ -122,7 +135,11 @@ func (s *server) Listen() {
s.sendAll(msg)
// Store in the message history
msg.SetDefaults()
addMessage(msg)
// Send webhooks
go webhooks.SendChatEvent(msg)
}
case ping := <-s.pingCh:
fmt.Println("PING?", ping)
@@ -154,8 +171,8 @@ func (s *server) sendWelcomeMessageToClient(c *Client) {
// Add an artificial delay so people notice this message come in.
time.Sleep(7 * time.Second)
initialChatMessageText := fmt.Sprintf("Welcome to %s! %s", config.Config.InstanceDetails.Title, config.Config.InstanceDetails.Summary)
initialMessage := models.ChatEvent{ClientID: "owncast-server", Author: config.Config.InstanceDetails.Name, Body: initialChatMessageText, ID: "initial-message-1", MessageType: "SYSTEM", Visible: true, Timestamp: time.Now()}
initialChatMessageText := fmt.Sprintf("Welcome to %s! %s", data.GetServerName(), data.GetServerSummary())
initialMessage := models.ChatEvent{ClientID: "owncast-server", Author: data.GetServerName(), Body: initialChatMessageText, ID: "initial-message-1", MessageType: "SYSTEM", Visible: true, Timestamp: time.Now()}
c.write(initialMessage)
}()
}

View File

@@ -1,8 +1,6 @@
package core
import (
"errors"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/models"
)
@@ -26,16 +24,16 @@ func (cl ChatListenerImpl) MessageSent(message models.ChatEvent) {
// SendMessageToChat sends a message to the chat server.
func SendMessageToChat(message models.ChatEvent) error {
if !message.Valid() {
return errors.New("invalid chat message; id, author, and body are required")
}
chat.SendMessage(message)
return nil
}
// GetAllChatMessages gets all of the chat messages.
func GetAllChatMessages(filtered bool) []models.ChatEvent {
return chat.GetMessages(filtered)
func GetAllChatMessages() []models.ChatEvent {
return chat.GetMessages()
}
func GetModerationChatMessages() []models.ChatEvent {
return chat.GetModerationChatMessages()
}

View File

@@ -10,8 +10,9 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/rtmp"
"github.com/owncast/owncast/core/transcoder"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
"github.com/owncast/owncast/yp"
@@ -20,33 +21,41 @@ import (
var (
_stats *models.Stats
_storage models.StorageProvider
_transcoder *ffmpeg.Transcoder
_transcoder *transcoder.Transcoder
_yp *yp.YP
_broadcaster *models.Broadcaster
)
var handler ffmpeg.HLSHandler
var fileWriter = ffmpeg.FileWriterReceiverService{}
var handler transcoder.HLSHandler
var fileWriter = transcoder.FileWriterReceiverService{}
// Start starts up the core processing.
func Start() error {
resetDirectories()
data.PopulateDefaults()
// Once a couple versions pass we can remove the old data migrators.
data.RunMigrations()
if err := data.VerifySettings(); err != nil {
log.Error(err)
return err
}
if err := setupStats(); err != nil {
log.Error("failed to setup the stats")
return err
}
if err := setupStorage(); err != nil {
log.Error("failed to setup the storage")
return err
}
// The HLS handler takes the written HLS playlists and segments
// and makes storage decisions. It's rather simple right now
// but will play more useful when recordings come into play.
handler = ffmpeg.HLSHandler{}
handler.Storage = _storage
handler = transcoder.HLSHandler{}
if err := setupStorage(); err != nil {
log.Errorln("storage error", err)
}
fileWriter.SetupFileWriterReceiverService(&handler)
if err := createInitialOfflineState(); err != nil {
@@ -54,10 +63,8 @@ func Start() error {
return err
}
if config.Config.YP.Enabled {
if data.GetDirectoryEnabled() {
_yp = yp.NewYP(GetStatus)
} else {
yp.DisplayInstructions()
}
chat.Setup(ChatListenerImpl{})
@@ -65,8 +72,8 @@ func Start() error {
// start the rtmp server
go rtmp.Start(setStreamAsConnected, setBroadcaster)
port := config.Config.GetPublicWebServerPort()
rtmpPort := config.Config.GetRTMPServerPort()
port := config.WebServerPort
rtmpPort := data.GetRTMPPortNumber()
log.Infof("Web server is listening on port %d, RTMP is accepting inbound streams on port %d.", port, rtmpPort)
log.Infoln("The web admin interface is available at /admin.")
@@ -94,13 +101,13 @@ func transitionToOfflineVideoStreamContent() {
offlineFilename := "offline.ts"
offlineFilePath := "static/" + offlineFilename
_transcoder := ffmpeg.NewTranscoder()
_transcoder.SetSegmentLength(10)
_transcoder := transcoder.NewTranscoder()
_transcoder.SetInput(offlineFilePath)
_transcoder.Start()
// Copy the logo to be the thumbnail
err := utils.Copy(filepath.Join("webroot", config.Config.InstanceDetails.Logo), "webroot/thumbnail.jpg")
logo := data.GetLogoPath()
err := utils.Copy(filepath.Join("data", logo), "webroot/thumbnail.jpg")
if err != nil {
log.Warnln(err)
}
@@ -129,8 +136,8 @@ func resetDirectories() {
os.Remove(filepath.Join(config.WebRoot, "thumbnail.jpg"))
// Create private hls data dirs
if len(config.Config.VideoSettings.StreamQualities) != 0 {
for index := range config.Config.VideoSettings.StreamQualities {
if len(data.GetStreamOutputVariants()) != 0 {
for index := range data.GetStreamOutputVariants() {
err = os.MkdirAll(path.Join(config.PrivateHLSStoragePath, strconv.Itoa(index)), 0777)
if err != nil {
log.Fatalln(err)
@@ -154,7 +161,8 @@ func resetDirectories() {
}
// Remove the previous thumbnail
err = utils.Copy(path.Join(config.WebRoot, config.Config.InstanceDetails.Logo), "webroot/thumbnail.jpg")
logo := data.GetLogoPath()
err = utils.Copy(path.Join("data", logo), "webroot/thumbnail.jpg")
if err != nil {
log.Warnln(err)
}

199
core/data/accessTokens.go Normal file
View File

@@ -0,0 +1,199 @@
package data
import (
"errors"
"strings"
"time"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
func createAccessTokensTable() {
log.Traceln("Creating access_tokens table...")
createTableSQL := `CREATE TABLE IF NOT EXISTS access_tokens (
"token" string NOT NULL PRIMARY KEY,
"name" string,
"scopes" TEXT,
"timestamp" DATETIME DEFAULT CURRENT_TIMESTAMP,
"last_used" DATETIME
);`
stmt, err := _db.Prepare(createTableSQL)
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln(err)
}
}
// InsertToken will add a new token to the database.
func InsertToken(token string, name string, scopes []string) error {
log.Println("Adding new access token:", name)
scopesString := strings.Join(scopes, ",")
tx, err := _db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare("INSERT INTO access_tokens(token, name, scopes) values(?, ?, ?)")
if err != nil {
return err
}
defer stmt.Close()
if _, err = stmt.Exec(token, name, scopesString); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
// DeleteToken will delete a token from the database.
func DeleteToken(token string) error {
log.Println("Deleting access token:", token)
tx, err := _db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare("DELETE FROM access_tokens WHERE token = ?")
if err != nil {
return err
}
defer stmt.Close()
result, err := stmt.Exec(token)
if err != nil {
return err
}
if rowsDeleted, _ := result.RowsAffected(); rowsDeleted == 0 {
tx.Rollback() //nolint
return errors.New(token + " not found")
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
// DoesTokenSupportScope will determine if a specific token has access to perform a scoped action.
func DoesTokenSupportScope(token string, scope string) (bool, error) {
// This will split the scopes from comma separated to individual rows
// so we can efficiently find if a token supports a single scope.
// This is SQLite specific, so if we ever support other database
// backends we need to support other methods.
var query = `SELECT count(*) FROM (
WITH RECURSIVE split(token, scope, rest) AS (
SELECT token, '', scopes || ',' FROM access_tokens
UNION ALL
SELECT token,
substr(rest, 0, instr(rest, ',')),
substr(rest, instr(rest, ',')+1)
FROM split
WHERE rest <> '')
SELECT token, scope
FROM split
WHERE scope <> ''
ORDER BY token, scope
) AS token WHERE token.token = ? AND token.scope = ?;`
row := _db.QueryRow(query, token, scope)
var count = 0
err := row.Scan(&count)
return count > 0, err
}
// GetAccessTokens will return all access tokens.
func GetAccessTokens() ([]models.AccessToken, error) { //nolint
tokens := make([]models.AccessToken, 0)
// Get all messages sent within the past day
var query = "SELECT * FROM access_tokens"
rows, err := _db.Query(query)
if err != nil {
return tokens, err
}
defer rows.Close()
for rows.Next() {
var token string
var name string
var scopes string
var timestampString string
var lastUsedString *string
if err := rows.Scan(&token, &name, &scopes, &timestampString, &lastUsedString); err != nil {
log.Error("There is a problem reading the database.", err)
return tokens, err
}
timestamp, err := time.Parse(time.RFC3339, timestampString)
if err != nil {
return tokens, err
}
var lastUsed *time.Time = nil
if lastUsedString != nil {
lastUsedTime, _ := time.Parse(time.RFC3339, *lastUsedString)
lastUsed = &lastUsedTime
}
singleToken := models.AccessToken{
Name: name,
Token: token,
Scopes: strings.Split(scopes, ","),
Timestamp: timestamp,
LastUsed: lastUsed,
}
tokens = append(tokens, singleToken)
}
if err := rows.Err(); err != nil {
return tokens, err
}
return tokens, nil
}
// SetAccessTokenAsUsed will update the last used timestamp for a token.
func SetAccessTokenAsUsed(token string) error {
tx, err := _db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare("UPDATE access_tokens SET last_used = CURRENT_TIMESTAMP WHERE token = ?")
if err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(token); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}

18
core/data/cache.go Normal file
View File

@@ -0,0 +1,18 @@
package data
import "errors"
// GetCachedValue will return a value for key from the cache.
func (ds *Datastore) GetCachedValue(key string) ([]byte, error) {
// Check for a cached value
if val, ok := ds.cache[key]; ok {
return val, nil
}
return nil, errors.New(key + " not found in cache")
}
// SetCachedValue will set a value for key in the cache.
func (ds *Datastore) SetCachedValue(key string, b []byte) {
ds.cache[key] = b
}

450
core/data/config.go Normal file
View File

@@ -0,0 +1,450 @@
package data
import (
"errors"
"sort"
"strings"
"time"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
const extraContentKey = "extra_page_content"
const streamTitleKey = "stream_title"
const streamKeyKey = "stream_key"
const logoPathKey = "logo_path"
const serverSummaryKey = "server_summary"
const serverNameKey = "server_name"
const serverURLKey = "server_url"
const httpPortNumberKey = "http_port_number"
const rtmpPortNumberKey = "rtmp_port_number"
const serverMetadataTagsKey = "server_metadata_tags"
const directoryEnabledKey = "directory_enabled"
const directoryRegistrationKeyKey = "directory_registration_key"
const socialHandlesKey = "social_handles"
const peakViewersSessionKey = "peak_viewers_session"
const peakViewersOverallKey = "peak_viewers_overall"
const lastDisconnectTimeKey = "last_disconnect_time"
const ffmpegPathKey = "ffmpeg_path"
const nsfwKey = "nsfw"
const s3StorageEnabledKey = "s3_storage_enabled"
const s3StorageConfigKey = "s3_storage_config"
const videoLatencyLevel = "video_latency_level"
const videoStreamOutputVariantsKey = "video_stream_output_variants"
// GetExtraPageBodyContent will return the user-supplied body content.
func GetExtraPageBodyContent() string {
content, err := _datastore.GetString(extraContentKey)
if err != nil {
log.Errorln(extraContentKey, err)
return config.GetDefaults().PageBodyContent
}
return content
}
// SetExtraPageBodyContent will set the user-supplied body content.
func SetExtraPageBodyContent(content string) error {
return _datastore.SetString(extraContentKey, content)
}
// GetStreamTitle will return the name of the current stream.
func GetStreamTitle() string {
title, err := _datastore.GetString(streamTitleKey)
if err != nil {
return ""
}
return title
}
// SetStreamTitle will set the name of the current stream.
func SetStreamTitle(title string) error {
return _datastore.SetString(streamTitleKey, title)
}
// GetStreamKey will return the inbound streaming password.
func GetStreamKey() string {
key, err := _datastore.GetString(streamKeyKey)
if err != nil {
log.Errorln(streamKeyKey, err)
return ""
}
return key
}
// SetStreamKey will set the inbound streaming password.
func SetStreamKey(key string) error {
return _datastore.SetString(streamKeyKey, key)
}
// GetLogoPath will return the path for the logo, relative to webroot.
func GetLogoPath() string {
logo, err := _datastore.GetString(logoPathKey)
if err != nil {
log.Errorln(logoPathKey, err)
return config.GetDefaults().Logo
}
if logo == "" {
return config.GetDefaults().Logo
}
return logo
}
// SetLogoPath will set the path for the logo, relative to webroot.
func SetLogoPath(logo string) error {
return _datastore.SetString(logoPathKey, logo)
}
// GetServerSummary will return the server summary text.
func GetServerSummary() string {
summary, err := _datastore.GetString(serverSummaryKey)
if err != nil {
log.Errorln(serverSummaryKey, err)
return ""
}
return summary
}
// SetServerSummary will set the server summary text.
func SetServerSummary(summary string) error {
return _datastore.SetString(serverSummaryKey, summary)
}
// GetServerName will return the server name text.
func GetServerName() string {
name, err := _datastore.GetString(serverNameKey)
if err != nil {
log.Errorln(serverNameKey, err)
return ""
}
return name
}
// SetServerName will set the server name text.
func SetServerName(name string) error {
return _datastore.SetString(serverNameKey, name)
}
// GetServerURL will return the server URL.
func GetServerURL() string {
url, err := _datastore.GetString(serverURLKey)
if err != nil {
return ""
}
return url
}
// SetServerURL will set the server URL.
func SetServerURL(url string) error {
return _datastore.SetString(serverURLKey, url)
}
// GetHTTPPortNumber will return the server HTTP port.
func GetHTTPPortNumber() int {
port, err := _datastore.GetNumber(httpPortNumberKey)
if err != nil {
log.Errorln(httpPortNumberKey, err)
return config.GetDefaults().WebServerPort
}
if port == 0 {
return config.GetDefaults().WebServerPort
}
return int(port)
}
// SetHTTPPortNumber will set the server HTTP port.
func SetHTTPPortNumber(port float64) error {
return _datastore.SetNumber(httpPortNumberKey, port)
}
// GetRTMPPortNumber will return the server RTMP port.
func GetRTMPPortNumber() int {
port, err := _datastore.GetNumber(rtmpPortNumberKey)
if err != nil {
log.Errorln(rtmpPortNumberKey, err)
return config.GetDefaults().RTMPServerPort
}
if port == 0 {
return config.GetDefaults().RTMPServerPort
}
return int(port)
}
// SetRTMPPortNumber will set the server RTMP port.
func SetRTMPPortNumber(port float64) error {
return _datastore.SetNumber(rtmpPortNumberKey, port)
}
// GetServerMetadataTags will return the metadata tags.
func GetServerMetadataTags() []string {
tagsString, err := _datastore.GetString(serverMetadataTagsKey)
if err != nil {
log.Errorln(serverMetadataTagsKey, err)
return []string{}
}
return strings.Split(tagsString, ",")
}
// SetServerMetadataTags will return the metadata tags.
func SetServerMetadataTags(tags []string) error {
tagString := strings.Join(tags, ",")
return _datastore.SetString(serverMetadataTagsKey, tagString)
}
// GetDirectoryEnabled will return if this server should register to YP.
func GetDirectoryEnabled() bool {
enabled, err := _datastore.GetBool(directoryEnabledKey)
if err != nil {
return config.GetDefaults().YPEnabled
}
return enabled
}
// SetDirectoryEnabled will set if this server should register to YP.
func SetDirectoryEnabled(enabled bool) error {
return _datastore.SetBool(directoryEnabledKey, enabled)
}
// SetDirectoryRegistrationKey will set the YP protocol registration key.
func SetDirectoryRegistrationKey(key string) error {
return _datastore.SetString(directoryRegistrationKeyKey, key)
}
// GetDirectoryRegistrationKey will return the YP protocol registration key.
func GetDirectoryRegistrationKey() string {
key, _ := _datastore.GetString(directoryRegistrationKeyKey)
return key
}
// GetSocialHandles will return the external social links.
func GetSocialHandles() []models.SocialHandle {
var socialHandles []models.SocialHandle
configEntry, err := _datastore.Get(socialHandlesKey)
if err != nil {
log.Errorln(socialHandlesKey, err)
return socialHandles
}
if err := configEntry.getObject(&socialHandles); err != nil {
log.Errorln(err)
return socialHandles
}
return socialHandles
}
// SetSocialHandles will set the external social links.
func SetSocialHandles(socialHandles []models.SocialHandle) error {
var configEntry = ConfigEntry{Key: socialHandlesKey, Value: socialHandles}
return _datastore.Save(configEntry)
}
// GetPeakSessionViewerCount will return the max number of viewers for this stream.
func GetPeakSessionViewerCount() int {
count, err := _datastore.GetNumber(peakViewersSessionKey)
if err != nil {
return 0
}
return int(count)
}
// SetPeakSessionViewerCount will set the max number of viewers for this stream.
func SetPeakSessionViewerCount(count int) error {
return _datastore.SetNumber(peakViewersSessionKey, float64(count))
}
// GetPeakOverallViewerCount will return the overall max number of viewers.
func GetPeakOverallViewerCount() int {
count, err := _datastore.GetNumber(peakViewersOverallKey)
if err != nil {
return 0
}
return int(count)
}
// SetPeakOverallViewerCount will set the overall max number of viewers.
func SetPeakOverallViewerCount(count int) error {
return _datastore.SetNumber(peakViewersOverallKey, float64(count))
}
// GetLastDisconnectTime will return the time the last stream ended.
func GetLastDisconnectTime() (time.Time, error) {
var disconnectTime time.Time
configEntry, err := _datastore.Get(lastDisconnectTimeKey)
if err != nil {
return disconnectTime, err
}
if err := configEntry.getObject(disconnectTime); err != nil {
return disconnectTime, err
}
return disconnectTime, nil
}
// SetLastDisconnectTime will set the time the last stream ended.
func SetLastDisconnectTime(disconnectTime time.Time) error {
var configEntry = ConfigEntry{Key: lastDisconnectTimeKey, Value: disconnectTime}
return _datastore.Save(configEntry)
}
// SetNSFW will set if this stream has NSFW content.
func SetNSFW(isNSFW bool) error {
return _datastore.SetBool(nsfwKey, isNSFW)
}
// GetNSFW will return if this stream has NSFW content.
func GetNSFW() bool {
nsfw, err := _datastore.GetBool(nsfwKey)
if err != nil {
return false
}
return nsfw
}
// SetFfmpegPath will set the custom ffmpeg path.
func SetFfmpegPath(path string) error {
return _datastore.SetString(ffmpegPathKey, path)
}
// GetFfMpegPath will return the ffmpeg path.
func GetFfMpegPath() string {
path, err := _datastore.GetString(ffmpegPathKey)
if err != nil {
return ""
}
return path
}
// GetS3Config will return the external storage configuration.
func GetS3Config() models.S3 {
configEntry, err := _datastore.Get(s3StorageConfigKey)
if err != nil {
return models.S3{Enabled: false}
}
var s3Config models.S3
if err := configEntry.getObject(&s3Config); err != nil {
return models.S3{Enabled: false}
}
return s3Config
}
// SetS3Config will set the external storage configuration.
func SetS3Config(config models.S3) error {
var configEntry = ConfigEntry{Key: s3StorageConfigKey, Value: config}
return _datastore.Save(configEntry)
}
// GetS3StorageEnabled will return if external storage is enabled.
func GetS3StorageEnabled() bool {
enabled, err := _datastore.GetBool(s3StorageEnabledKey)
if err != nil {
log.Errorln(err)
return false
}
return enabled
}
// SetS3StorageEnabled will enable or disable external storage.
func SetS3StorageEnabled(enabled bool) error {
return _datastore.SetBool(s3StorageEnabledKey, enabled)
}
// GetStreamLatencyLevel will return the stream latency level.
func GetStreamLatencyLevel() models.LatencyLevel {
level, err := _datastore.GetNumber(videoLatencyLevel)
if err != nil || level == 0 {
level = 4
}
return models.GetLatencyLevel(int(level))
}
// SetStreamLatencyLevel will set the stream latency level.
func SetStreamLatencyLevel(level float64) error {
return _datastore.SetNumber(videoLatencyLevel, level)
}
// GetStreamOutputVariants will return all of the stream output variants.
func GetStreamOutputVariants() []models.StreamOutputVariant {
configEntry, err := _datastore.Get(videoStreamOutputVariantsKey)
if err != nil {
return config.GetDefaults().StreamVariants
}
var streamOutputVariants []models.StreamOutputVariant
if err := configEntry.getObject(&streamOutputVariants); err != nil {
return config.GetDefaults().StreamVariants
}
if len(streamOutputVariants) == 0 {
return config.GetDefaults().StreamVariants
}
return streamOutputVariants
}
// SetStreamOutputVariants will set the stream output variants.
func SetStreamOutputVariants(variants []models.StreamOutputVariant) error {
var configEntry = ConfigEntry{Key: videoStreamOutputVariantsKey, Value: variants}
return _datastore.Save(configEntry)
}
// VerifySettings will perform a sanity check for specific settings values.
func VerifySettings() error {
if GetStreamKey() == "" {
return errors.New("no stream key set. Please set one in your config file")
}
return nil
}
// FindHighestVideoQualityIndex will return the highest quality from a slice of variants.
func FindHighestVideoQualityIndex(qualities []models.StreamOutputVariant) int {
type IndexedQuality struct {
index int
quality models.StreamOutputVariant
}
if len(qualities) < 2 {
return 0
}
indexedQualities := make([]IndexedQuality, 0)
for index, quality := range qualities {
indexedQuality := IndexedQuality{index, quality}
indexedQualities = append(indexedQualities, indexedQuality)
}
sort.Slice(indexedQualities, func(a, b int) bool {
if indexedQualities[a].quality.IsVideoPassthrough && !indexedQualities[b].quality.IsVideoPassthrough {
return true
}
if !indexedQualities[a].quality.IsVideoPassthrough && indexedQualities[b].quality.IsVideoPassthrough {
return false
}
return indexedQualities[a].quality.VideoBitrate > indexedQualities[b].quality.VideoBitrate
})
return indexedQualities[0].index
}

46
core/data/configEntry.go Normal file
View File

@@ -0,0 +1,46 @@
package data
import (
"bytes"
"encoding/gob"
)
// ConfigEntry is the actual object saved to the database.
// The Value is encoded using encoding/gob.
type ConfigEntry struct {
Key string
Value interface{}
}
func (c *ConfigEntry) getString() (string, error) {
decoder := c.getDecoder()
var result string
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getNumber() (float64, error) {
decoder := c.getDecoder()
var result float64
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getBool() (bool, error) {
decoder := c.getDecoder()
var result bool
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getObject(result interface{}) error {
decoder := c.getDecoder()
err := decoder.Decode(result)
return err
}
func (c *ConfigEntry) getDecoder() *gob.Decoder {
valueBytes := c.Value.([]byte)
decoder := gob.NewDecoder(bytes.NewBuffer(valueBytes))
return decoder
}

View File

@@ -8,25 +8,32 @@ import (
"database/sql"
"fmt"
"os"
"time"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
const (
schemaVersion = 0
backupFile = "backup/owncastdb.bak"
)
var _db *sql.DB
var _datastore *Datastore
// GetDatabase will return the shared instance of the actual database.
func GetDatabase() *sql.DB {
return _db
}
func SetupPersistence() error {
file := config.Config.DatabaseFilePath
// GetStore will return the shared instance of the read/write datastore.
func GetStore() *Datastore {
return _datastore
}
// SetupPersistence will open the datastore and make it available.
func SetupPersistence(file string) error {
// Create empty DB file if it doesn't exist.
if !utils.DoesFileExists(file) {
log.Traceln("Creating new database at", file)
@@ -79,11 +86,26 @@ func SetupPersistence() error {
}
_db = db
createWebhooksTable()
createAccessTokensTable()
_datastore = &Datastore{}
_datastore.Setup()
dbBackupTicker := time.NewTicker(1 * time.Hour)
go func() {
for range dbBackupTicker.C {
utils.Backup(_db, backupFile)
}
}()
return nil
}
func migrateDatabase(db *sql.DB, from, to int) error {
log.Printf("Migrating database from version %d to %d\n", from, to)
utils.Backup(db, fmt.Sprintf("backup/owncast-v%d.bak", from))
for v := from; v < to; v++ {
switch v {
case 0:

115
core/data/data_test.go Normal file
View File

@@ -0,0 +1,115 @@
package data
import (
"fmt"
"testing"
)
func TestMain(m *testing.M) {
dbFile := "../../test/test.db"
SetupPersistence(dbFile)
m.Run()
}
func TestString(t *testing.T) {
const testKey = "test string key"
const testValue = "test string value"
err := _datastore.SetString(testKey, testValue)
if err != nil {
panic(err)
}
// Get the config entry from the database
stringTestResult, err := _datastore.GetString(testKey)
if err != nil {
panic(err)
}
if stringTestResult != testValue {
t.Error("expected", testValue, "but test returned", stringTestResult)
}
}
func TestNumber(t *testing.T) {
const testKey = "test number key"
const testValue = 42
err := _datastore.SetNumber(testKey, testValue)
if err != nil {
panic(err)
}
// Get the config entry from the database
numberTestResult, err := _datastore.GetNumber(testKey)
if err != nil {
panic(err)
}
fmt.Println(numberTestResult)
if numberTestResult != testValue {
t.Error("expected", testValue, "but test returned", numberTestResult)
}
}
func TestBool(t *testing.T) {
const testKey = "test bool key"
const testValue = true
err := _datastore.SetBool(testKey, testValue)
if err != nil {
panic(err)
}
// Get the config entry from the database
numberTestResult, err := _datastore.GetBool(testKey)
if err != nil {
panic(err)
}
fmt.Println(numberTestResult)
if numberTestResult != testValue {
t.Error("expected", testValue, "but test returned", numberTestResult)
}
}
func TestCustomType(t *testing.T) {
const testKey = "test custom type key"
// Test an example struct with a slice
testStruct := TestStruct{
Test: "Test string 123 in test struct",
TestSlice: []string{"test string 1", "test string 2"},
}
// Save config entry to the database
if err := _datastore.Save(ConfigEntry{testKey, &testStruct}); err != nil {
t.Error(err)
}
// Get the config entry from the database
entryResult, err := _datastore.Get(testKey)
if err != nil {
t.Error(err)
}
// Get a typed struct out of it
var testResult TestStruct
if err := entryResult.getObject(&testResult); err != nil {
t.Error(err)
}
fmt.Printf("%+v", testResult)
if testResult.TestSlice[0] != testStruct.TestSlice[0] {
t.Error("expected", testStruct.TestSlice[0], "but test returned", testResult.TestSlice[0])
}
}
// Custom type for testing
type TestStruct struct {
Test string
TestSlice []string
privateProperty string
}

43
core/data/defaults.go Normal file
View File

@@ -0,0 +1,43 @@
package data
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
)
// HasPopulatedDefaults will determine if the defaults have been inserted into the database.
func HasPopulatedDefaults() bool {
hasPopulated, err := _datastore.GetBool("HAS_POPULATED_DEFAULTS")
if err != nil {
return false
}
return hasPopulated
}
// PopulateDefaults will set default values in the database.
func PopulateDefaults() {
defaults := config.GetDefaults()
if HasPopulatedDefaults() {
return
}
_ = SetStreamKey(defaults.StreamKey)
_ = SetHTTPPortNumber(float64(defaults.WebServerPort))
_ = SetRTMPPortNumber(float64(defaults.RTMPServerPort))
_ = SetLogoPath(defaults.Logo)
_ = SetServerMetadataTags([]string{"owncast", "streaming"})
_ = SetServerSummary("Welcome to your new Owncast server! This description can be changed in the admin")
_ = SetServerName("Owncast")
_ = SetStreamKey(defaults.StreamKey)
_ = SetExtraPageBodyContent("This is your page's content that can be edited in the admin.")
_ = SetSocialHandles([]models.SocialHandle{
{
Platform: "github",
URL: "https://github.com/owncast/owncast",
},
})
_datastore.warmCache()
_ = _datastore.SetBool("HAS_POPULATED_DEFAULTS", true)
}

266
core/data/migrator.go Normal file
View File

@@ -0,0 +1,266 @@
package data
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
)
// RunMigrations will start the migration process from the config file.
func RunMigrations() {
if !utils.DoesFileExists(config.BackupDirectory) {
if err := os.Mkdir(config.BackupDirectory, 0700); err != nil {
log.Errorln("Unable to create backup directory", err)
return
}
}
migrateConfigFile()
migrateStatsFile()
migrateYPKey()
}
func migrateStatsFile() {
oldStats := models.Stats{
Clients: make(map[string]models.Client),
}
if !utils.DoesFileExists(config.StatsFile) {
return
}
log.Infoln("Migrating", config.StatsFile, "to new datastore")
jsonFile, err := ioutil.ReadFile(config.StatsFile)
if err != nil {
log.Errorln(err)
return
}
if err := json.Unmarshal(jsonFile, &oldStats); err != nil {
log.Errorln(err)
return
}
_ = SetPeakSessionViewerCount(oldStats.SessionMaxViewerCount)
_ = SetPeakOverallViewerCount(oldStats.OverallMaxViewerCount)
if err := utils.Move(config.StatsFile, "backup/stats.old"); err != nil {
log.Warnln(err)
}
}
func migrateYPKey() {
filePath := ".yp.key"
if !utils.DoesFileExists(filePath) {
return
}
log.Infoln("Migrating", filePath, "to new datastore")
keyFile, err := ioutil.ReadFile(filePath)
if err != nil {
log.Errorln("Unable to migrate", keyFile, "there may be issues registering with the directory")
}
if err := SetDirectoryRegistrationKey(string(keyFile)); err != nil {
log.Errorln("Unable to migrate", keyFile, "there may be issues registering with the directory")
return
}
if err := utils.Move(filePath, "backup/yp.key.old"); err != nil {
log.Warnln(err)
}
}
func migrateConfigFile() {
filePath := config.ConfigFilePath
if !utils.DoesFileExists(filePath) {
return
}
log.Infoln("Migrating", filePath, "to new datastore")
var oldConfig configFile
yamlFile, err := ioutil.ReadFile(filePath)
if err != nil {
log.Errorln("config file err", err)
return
}
if err := yaml.Unmarshal(yamlFile, &oldConfig); err != nil {
log.Errorln("Error reading the config file.", err)
return
}
_ = SetServerName(oldConfig.InstanceDetails.Name)
_ = SetServerSummary(oldConfig.InstanceDetails.Summary)
_ = SetServerMetadataTags(oldConfig.InstanceDetails.Tags)
_ = SetStreamKey(oldConfig.VideoSettings.StreamingKey)
_ = SetNSFW(oldConfig.InstanceDetails.NSFW)
_ = SetServerURL(oldConfig.YP.InstanceURL)
_ = SetDirectoryEnabled(oldConfig.YP.Enabled)
_ = SetSocialHandles(oldConfig.InstanceDetails.SocialHandles)
_ = SetFfmpegPath(oldConfig.FFMpegPath)
_ = SetHTTPPortNumber(float64(oldConfig.WebServerPort))
_ = SetRTMPPortNumber(float64(oldConfig.RTMPServerPort))
// Migrate logo
if logo := oldConfig.InstanceDetails.Logo; logo != "" {
filename := filepath.Base(logo)
newPath := filepath.Join("data", filename)
err := utils.Copy(filepath.Join("webroot", logo), newPath)
log.Traceln("Copying logo from", logo, "to", newPath)
if err != nil {
log.Errorln("Error moving logo", logo, err)
} else {
_ = SetLogoPath(filename)
}
}
// Migrate video variants
variants := []models.StreamOutputVariant{}
for _, variant := range oldConfig.VideoSettings.StreamQualities {
migratedVariant := models.StreamOutputVariant{}
migratedVariant.IsAudioPassthrough = true
migratedVariant.IsVideoPassthrough = variant.IsVideoPassthrough
migratedVariant.Framerate = variant.Framerate
migratedVariant.VideoBitrate = variant.VideoBitrate
migratedVariant.ScaledHeight = variant.ScaledHeight
migratedVariant.ScaledWidth = variant.ScaledWidth
presetMapping := map[string]int{
"ultrafast": 1,
"superfast": 2,
"veryfast": 3,
"faster": 4,
"fast": 5,
}
migratedVariant.CPUUsageLevel = presetMapping[variant.EncoderPreset]
variants = append(variants, migratedVariant)
}
_ = SetStreamOutputVariants(variants)
// Migrate latency level
level := 4
oldSegmentLength := oldConfig.VideoSettings.ChunkLengthInSeconds
oldNumberOfSegments := oldConfig.Files.MaxNumberInPlaylist
latencyLevels := models.GetLatencyConfigs()
if oldSegmentLength == latencyLevels[1].SecondsPerSegment && oldNumberOfSegments == latencyLevels[1].SegmentCount {
level = 1
} else if oldSegmentLength == latencyLevels[2].SecondsPerSegment && oldNumberOfSegments == latencyLevels[2].SegmentCount {
level = 2
} else if oldSegmentLength == latencyLevels[3].SecondsPerSegment && oldNumberOfSegments == latencyLevels[3].SegmentCount {
level = 3
} else if oldSegmentLength == latencyLevels[5].SecondsPerSegment && oldNumberOfSegments == latencyLevels[5].SegmentCount {
level = 5
} else if oldSegmentLength >= latencyLevels[6].SecondsPerSegment && oldNumberOfSegments >= latencyLevels[6].SegmentCount {
level = 6
}
_ = SetStreamLatencyLevel(float64(level))
// Migrate storage config
_ = SetS3Config(models.S3(oldConfig.Storage))
// Migrate the old content.md file
content, err := ioutil.ReadFile(config.ExtraInfoFile)
if err == nil && len(content) > 0 {
_ = SetExtraPageBodyContent(string(content))
}
if err := utils.Move(filePath, "backup/config.old"); err != nil {
log.Warnln(err)
}
log.Infoln("Your old config file can be found in the backup directory for reference. For all future configuration use the web admin.")
}
type configFile struct {
DatabaseFilePath string `yaml:"databaseFile"`
EnableDebugFeatures bool `yaml:"-"`
FFMpegPath string
Files files `yaml:"files"`
InstanceDetails instanceDetails `yaml:"instanceDetails"`
VersionInfo string `yaml:"-"` // For storing the version/build number
VersionNumber string `yaml:"-"`
VideoSettings videoSettings `yaml:"videoSettings"`
WebServerPort int
RTMPServerPort int
YP yp `yaml:"yp"`
Storage s3 `yaml:"s3"`
}
// instanceDetails defines the user-visible information about this particular instance.
type instanceDetails struct {
Name string `yaml:"name"`
Title string `yaml:"title"`
Summary string `yaml:"summary"`
Logo string `yaml:"logo"`
Tags []string `yaml:"tags"`
Version string `yaml:"version"`
NSFW bool `yaml:"nsfw"`
ExtraPageContent string `yaml:"extraPageContent"`
StreamTitle string `yaml:"streamTitle"`
SocialHandles []models.SocialHandle `yaml:"socialHandles"`
}
type videoSettings struct {
ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"`
StreamingKey string `yaml:"streamingKey"`
StreamQualities []streamQuality `yaml:"streamQualities"`
HighestQualityStreamIndex int `yaml:"-"`
}
// yp allows registration to the central Owncast yp (Yellow pages) service operating as a directory.
type yp struct {
Enabled bool `yaml:"enabled"`
InstanceURL string `yaml:"instanceUrl"` // The public URL the directory should link to
}
// streamQuality defines the specifics of a single HLS stream variant.
type streamQuality struct {
// Enable passthrough to copy the video and/or audio directly from the
// incoming stream and disable any transcoding. It will ignore any of
// the below settings.
IsVideoPassthrough bool `yaml:"videoPassthrough" json:"videoPassthrough"`
IsAudioPassthrough bool `yaml:"audioPassthrough" json:"audioPassthrough"`
VideoBitrate int `yaml:"videoBitrate" json:"videoBitrate"`
AudioBitrate int `yaml:"audioBitrate" json:"audioBitrate"`
// Set only one of these in order to keep your current aspect ratio.
// Or set neither to not scale the video.
ScaledWidth int `yaml:"scaledWidth" json:"scaledWidth,omitempty"`
ScaledHeight int `yaml:"scaledHeight" json:"scaledHeight,omitempty"`
Framerate int `yaml:"framerate" json:"framerate"`
EncoderPreset string `yaml:"encoderPreset" json:"encoderPreset"`
}
type files struct {
MaxNumberInPlaylist int `yaml:"maxNumberInPlaylist"`
}
// s3 is for configuring the s3 integration.
type s3 struct {
Enabled bool `yaml:"enabled" json:"enabled"`
Endpoint string `yaml:"endpoint" json:"endpoint,omitempty"`
ServingEndpoint string `yaml:"servingEndpoint" json:"servingEndpoint,omitempty"`
AccessKey string `yaml:"accessKey" json:"accessKey,omitempty"`
Secret string `yaml:"secret" json:"secret,omitempty"`
Bucket string `yaml:"bucket" json:"bucket,omitempty"`
Region string `yaml:"region" json:"region,omitempty"`
ACL string `yaml:"acl" json:"acl,omitempty"`
}

152
core/data/persistence.go Normal file
View File

@@ -0,0 +1,152 @@
package data
import (
"bytes"
"database/sql"
"encoding/gob"
// sqlite requires a blank import.
_ "github.com/mattn/go-sqlite3"
log "github.com/sirupsen/logrus"
)
// Datastore is the global key/value store for configuration values.
type Datastore struct {
db *sql.DB
cache map[string][]byte
}
func (ds *Datastore) warmCache() {
log.Traceln("Warming config value cache")
res, err := ds.db.Query("SELECT key, value FROM datastore")
if err != nil || res.Err() != nil {
log.Errorln("error warming config cache", err, res.Err())
}
defer res.Close()
for res.Next() {
var rowKey string
var rowValue []byte
if err := res.Scan(&rowKey, &rowValue); err != nil {
log.Errorln("error pre-caching config row", err)
}
ds.cache[rowKey] = rowValue
}
}
// Get will query the database for the key and return the entry.
func (ds *Datastore) Get(key string) (ConfigEntry, error) {
cachedValue, err := ds.GetCachedValue(key)
if err == nil {
return ConfigEntry{
Key: key,
Value: cachedValue,
}, nil
}
var resultKey string
var resultValue []byte
row := ds.db.QueryRow("SELECT key, value FROM datastore WHERE key = ? LIMIT 1", key)
if err := row.Scan(&resultKey, &resultValue); err != nil {
return ConfigEntry{}, err
}
result := ConfigEntry{
Key: resultKey,
Value: resultValue,
}
return result, nil
}
// Save will save the ConfigEntry to the database.
func (ds *Datastore) Save(e ConfigEntry) error {
var dataGob bytes.Buffer
enc := gob.NewEncoder(&dataGob)
if err := enc.Encode(e.Value); err != nil {
return err
}
tx, err := ds.db.Begin()
if err != nil {
return err
}
var stmt *sql.Stmt
var count int
row := ds.db.QueryRow("SELECT COUNT(*) FROM datastore WHERE key = ? LIMIT 1", e.Key)
if err := row.Scan(&count); err != nil {
return err
}
if count == 0 {
stmt, err = tx.Prepare("INSERT INTO datastore(key, value) values(?, ?)")
if err != nil {
return err
}
_, err = stmt.Exec(e.Key, dataGob.Bytes())
} else {
stmt, err = tx.Prepare("UPDATE datastore SET value=? WHERE key=?")
if err != nil {
return err
}
_, err = stmt.Exec(dataGob.Bytes(), e.Key)
}
if err != nil {
return err
}
defer stmt.Close()
if err = tx.Commit(); err != nil {
log.Fatalln(err)
}
ds.SetCachedValue(e.Key, dataGob.Bytes())
return nil
}
// Setup will create the datastore table and perform initial initialization.
func (ds *Datastore) Setup() {
ds.cache = make(map[string][]byte)
ds.db = GetDatabase()
createTableSQL := `CREATE TABLE IF NOT EXISTS datastore (
"key" string NOT NULL PRIMARY KEY,
"value" BLOB,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL
);`
stmt, err := ds.db.Prepare(createTableSQL)
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Fatalln(err)
}
if !HasPopulatedDefaults() {
PopulateDefaults()
}
}
// Reset will delete all config entries in the datastore and start over.
func (ds *Datastore) Reset() {
sql := "DELETE FROM datastore"
stmt, err := ds.db.Prepare(sql)
if err != nil {
log.Fatalln(err)
}
defer stmt.Close()
if _, err = stmt.Exec(); err != nil {
log.Fatalln(err)
}
PopulateDefaults()
}

46
core/data/types.go Normal file
View File

@@ -0,0 +1,46 @@
package data
// GetString will return the string value for a key.
func (ds *Datastore) GetString(key string) (string, error) {
configEntry, err := ds.Get(key)
if err != nil {
return "", err
}
return configEntry.getString()
}
// SetString will set the string value for a key.
func (ds *Datastore) SetString(key string, value string) error {
configEntry := ConfigEntry{key, value}
return ds.Save(configEntry)
}
// GetNumber will return the numeric value for a key.
func (ds *Datastore) GetNumber(key string) (float64, error) {
configEntry, err := ds.Get(key)
if err != nil {
return 0, err
}
return configEntry.getNumber()
}
// SetNumber will set the numeric value for a key.
func (ds *Datastore) SetNumber(key string, value float64) error {
configEntry := ConfigEntry{key, value}
return ds.Save(configEntry)
}
// GetBool will return the boolean value for a key.
func (ds *Datastore) GetBool(key string) (bool, error) {
configEntry, err := ds.Get(key)
if err != nil {
return false, err
}
return configEntry.getBool()
}
// SetBool will set the boolean value for a key.
func (ds *Datastore) SetBool(key string, value bool) error {
configEntry := ConfigEntry{key, value}
return ds.Save(configEntry)
}

220
core/data/webhooks.go Normal file
View File

@@ -0,0 +1,220 @@
package data
import (
"errors"
"fmt"
"strings"
"time"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
func createWebhooksTable() {
log.Traceln("Creating webhooks table...")
createTableSQL := `CREATE TABLE IF NOT EXISTS webhooks (
"id" INTEGER PRIMARY KEY AUTOINCREMENT,
"url" string NOT NULL,
"events" TEXT NOT NULL,
"timestamp" DATETIME DEFAULT CURRENT_TIMESTAMP,
"last_used" DATETIME
);`
stmt, err := _db.Prepare(createTableSQL)
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln(err)
}
}
// InsertWebhook will add a new webhook to the database.
func InsertWebhook(url string, events []models.EventType) (int, error) {
log.Println("Adding new webhook:", url)
eventsString := strings.Join(events, ",")
tx, err := _db.Begin()
if err != nil {
return 0, err
}
stmt, err := tx.Prepare("INSERT INTO webhooks(url, events) values(?, ?)")
if err != nil {
return 0, err
}
defer stmt.Close()
insertResult, err := stmt.Exec(url, eventsString)
if err != nil {
return 0, err
}
if err = tx.Commit(); err != nil {
return 0, err
}
newID, err := insertResult.LastInsertId()
if err != nil {
return 0, err
}
return int(newID), err
}
// DeleteWebhook will delete a webhook from the database.
func DeleteWebhook(id int) error {
log.Println("Deleting webhook:", id)
tx, err := _db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare("DELETE FROM webhooks WHERE id = ?")
if err != nil {
return err
}
defer stmt.Close()
result, err := stmt.Exec(id)
if err != nil {
return err
}
if rowsDeleted, _ := result.RowsAffected(); rowsDeleted == 0 {
tx.Rollback() //nolint
return errors.New(fmt.Sprint(id) + " not found")
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
// GetWebhooksForEvent will return all of the webhooks that want to be notified about an event type.
func GetWebhooksForEvent(event models.EventType) []models.Webhook {
webhooks := make([]models.Webhook, 0)
var query = `SELECT * FROM (
WITH RECURSIVE split(url, event, rest) AS (
SELECT url, '', events || ',' FROM webhooks
UNION ALL
SELECT url,
substr(rest, 0, instr(rest, ',')),
substr(rest, instr(rest, ',')+1)
FROM split
WHERE rest <> '')
SELECT url, event
FROM split
WHERE event <> ''
) AS webhook WHERE event IS "` + event + `"`
rows, err := _db.Query(query)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
for rows.Next() {
var url string
err = rows.Scan(&url, &event)
if err != nil {
log.Debugln(err)
log.Error("There is a problem with the database.")
break
}
singleWebhook := models.Webhook{
URL: url,
}
webhooks = append(webhooks, singleWebhook)
}
return webhooks
}
// GetWebhooks will return all the webhooks.
func GetWebhooks() ([]models.Webhook, error) { //nolint
webhooks := make([]models.Webhook, 0)
var query = "SELECT * FROM webhooks"
rows, err := _db.Query(query)
if err != nil {
return webhooks, err
}
defer rows.Close()
for rows.Next() {
var id int
var url string
var events string
var timestampString string
var lastUsedString *string
if err := rows.Scan(&id, &url, &events, &timestampString, &lastUsedString); err != nil {
log.Error("There is a problem reading the database.", err)
return webhooks, err
}
timestamp, err := time.Parse(time.RFC3339, timestampString)
if err != nil {
return webhooks, err
}
var lastUsed *time.Time = nil
if lastUsedString != nil {
lastUsedTime, _ := time.Parse(time.RFC3339, *lastUsedString)
lastUsed = &lastUsedTime
}
singleWebhook := models.Webhook{
ID: id,
URL: url,
Events: strings.Split(events, ","),
Timestamp: timestamp,
LastUsed: lastUsed,
}
webhooks = append(webhooks, singleWebhook)
}
if err := rows.Err(); err != nil {
return webhooks, err
}
return webhooks, nil
}
// SetWebhookAsUsed will update the last used time for a webhook.
func SetWebhookAsUsed(id string) error {
tx, err := _db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare("UPDATE webhooks SET last_used = CURRENT_TIMESTAMP WHERE id = ?")
if err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(id); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}

View File

@@ -11,7 +11,7 @@ import (
func setCurrentBroadcasterInfo(t flvio.Tag, remoteAddr string) {
data, err := getInboundDetailsFromMetadata(t.DebugFields())
if err != nil {
log.Traceln("RTMP meadata:", err)
log.Warnln("Unable to parse inbound broadcaster details:", err)
}
broadcaster := models.Broadcaster{

View File

@@ -8,14 +8,13 @@ import (
"strings"
"syscall"
"time"
"unsafe"
"github.com/nareix/joy5/format/flv"
"github.com/nareix/joy5/format/flv/flvio"
log "github.com/sirupsen/logrus"
"github.com/nareix/joy5/format/rtmp"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
)
@@ -35,7 +34,7 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster))
_setStreamAsConnected = setStreamAsConnected
_setBroadcaster = setBroadcaster
port := config.Config.GetRTMPServerPort()
port := data.GetRTMPPortNumber()
s := rtmp.NewServer()
var lis net.Listener
var err error
@@ -45,7 +44,7 @@ func Start(setStreamAsConnected func(), setBroadcaster func(models.Broadcaster))
s.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) {
es := rtmp.EventString[e]
log.Traceln(unsafe.Pointer(c), nc.LocalAddr(), nc.RemoteAddr(), es)
log.Traceln("RTMP", nc.LocalAddr(), nc.RemoteAddr(), es)
}
s.HandleConn = HandleConn
@@ -81,7 +80,7 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) {
streamingKeyComponents := strings.Split(c.URL.Path, "/")
streamingKey := streamingKeyComponents[len(streamingKeyComponents)-1]
if streamingKey != config.Config.VideoSettings.StreamingKey {
if streamingKey != data.GetStreamKey() {
log.Errorln("invalid streaming key; rejecting incoming stream")
nc.Close()
return

View File

@@ -1,17 +1,14 @@
package core
import (
"encoding/json"
"io/ioutil"
"math"
"os"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/geoip"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
@@ -20,17 +17,13 @@ import (
var l = sync.Mutex{}
func setupStats() error {
s, err := getSavedStats()
if err != nil {
return err
}
s := getSavedStats()
_stats = &s
statsSaveTimer := time.NewTicker(1 * time.Minute)
go func() {
for range statsSaveTimer.C {
if err := saveStatsToFile(); err != nil {
if err := saveStats(); err != nil {
panic(err)
}
}
@@ -48,7 +41,8 @@ func IsStreamConnected() bool {
// Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available.
// So account for that with an artificial buffer of four segments.
timeSinceLastConnected := time.Since(_stats.LastConnectTime.Time).Seconds()
if timeSinceLastConnected < float64(config.Config.GetVideoSegmentSecondsLength())*2.3 {
waitTime := math.Max(float64(data.GetStreamLatencyLevel().SecondsPerSegment)*3.0, 7)
if timeSinceLastConnected < waitTime {
return false
}
@@ -103,42 +97,27 @@ func GetClients() []models.Client {
return clients
}
func saveStatsToFile() error {
jsonData, err := json.Marshal(_stats)
if err != nil {
return err
func saveStats() error {
if err := data.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil {
log.Errorln("error saving viewer count", err)
}
f, err := os.Create(config.StatsFile)
if err != nil {
return err
if err := data.SetPeakSessionViewerCount(_stats.SessionMaxViewerCount); err != nil {
log.Errorln("error saving viewer count", err)
}
defer f.Close()
if _, err := f.Write(jsonData); err != nil {
return err
if err := data.SetLastDisconnectTime(_stats.LastConnectTime.Time); err != nil {
log.Errorln("error saving disconnect time", err)
}
return nil
}
func getSavedStats() (models.Stats, error) {
func getSavedStats() models.Stats {
savedLastDisconnectTime, savedLastDisconnectTimeErr := data.GetLastDisconnectTime()
result := models.Stats{
Clients: make(map[string]models.Client),
}
if !utils.DoesFileExists(config.StatsFile) {
return result, nil
}
jsonFile, err := ioutil.ReadFile(config.StatsFile)
if err != nil {
return result, err
}
if err := json.Unmarshal(jsonFile, &result); err != nil {
return result, err
Clients: make(map[string]models.Client),
SessionMaxViewerCount: data.GetPeakSessionViewerCount(),
OverallMaxViewerCount: data.GetPeakOverallViewerCount(),
LastDisconnectTime: utils.NullTime{Time: savedLastDisconnectTime, Valid: savedLastDisconnectTimeErr == nil},
}
// If the stats were saved > 5min ago then ignore the
@@ -147,5 +126,5 @@ func getSavedStats() (models.Stats, error) {
result.SessionMaxViewerCount = 0
}
return result, err
return result
}

View File

@@ -2,6 +2,7 @@ package core
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
)
@@ -11,17 +12,27 @@ func GetStatus() models.Status {
return models.Status{}
}
viewerCount := 0
if IsStreamConnected() {
viewerCount = len(_stats.Clients)
}
return models.Status{
Online: IsStreamConnected(),
ViewerCount: len(_stats.Clients),
ViewerCount: viewerCount,
OverallMaxViewerCount: _stats.OverallMaxViewerCount,
SessionMaxViewerCount: _stats.SessionMaxViewerCount,
LastDisconnectTime: _stats.LastDisconnectTime,
LastConnectTime: _stats.LastConnectTime,
VersionNumber: config.Config.VersionNumber,
VersionNumber: config.VersionNumber,
StreamTitle: data.GetStreamTitle(),
}
}
func GetCurrentBroadcast() *models.CurrentBroadcast {
return _currentBroadcast
}
// setBroadcaster will store the current inbound broadcasting details.
func setBroadcaster(broadcaster models.Broadcaster) {
_broadcaster = &broadcaster

View File

@@ -1,14 +1,14 @@
package core
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/storageproviders"
)
func setupStorage() error {
handler.Storage = _storage
s3Config := data.GetS3Config()
if config.Config.S3.Enabled {
if s3Config.Enabled {
_storage = &storageproviders.S3Storage{}
} else {
_storage = &storageproviders.LocalStorage{}
@@ -18,5 +18,7 @@ func setupStorage() error {
return err
}
handler.Storage = _storage
return nil
}

View File

@@ -7,7 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/core/transcoder"
"github.com/owncast/owncast/utils"
)
@@ -24,7 +24,7 @@ func (s *LocalStorage) Setup() error {
_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
go func() {
for range _onlineCleanupTicker.C {
ffmpeg.CleanupOldContent(config.PublicHLSStoragePath)
transcoder.CleanupOldContent(config.PublicHLSStoragePath)
}
}()
return nil

View File

@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/playlist"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
@@ -44,19 +45,20 @@ var _uploader *s3manager.Uploader
func (s *S3Storage) Setup() error {
log.Trace("Setting up S3 for external storage of video...")
if config.Config.S3.ServingEndpoint != "" {
s.host = config.Config.S3.ServingEndpoint
s3Config := data.GetS3Config()
if s3Config.ServingEndpoint != "" {
s.host = s3Config.ServingEndpoint
} else {
s.host = fmt.Sprintf("%s/%s", config.Config.S3.Endpoint, config.Config.S3.Bucket)
s.host = fmt.Sprintf("%s/%s", s3Config.Endpoint, s3Config.Bucket)
}
s.s3Endpoint = config.Config.S3.Endpoint
s.s3ServingEndpoint = config.Config.S3.ServingEndpoint
s.s3Region = config.Config.S3.Region
s.s3Bucket = config.Config.S3.Bucket
s.s3AccessKey = config.Config.S3.AccessKey
s.s3Secret = config.Config.S3.Secret
s.s3ACL = config.Config.S3.ACL
s.s3Endpoint = s3Config.Endpoint
s.s3ServingEndpoint = s3Config.ServingEndpoint
s.s3Region = s3Config.Region
s.s3Bucket = s3Config.Bucket
s.s3AccessKey = s3Config.AccessKey
s.s3Secret = s3Config.Secret
s.s3ACL = s3Config.ACL
s.sess = s.connectAWS()
@@ -81,7 +83,7 @@ func (s *S3Storage) SegmentWritten(localFilePath string) {
// Warn the user about long-running save operations
if averagePerformance != 0 {
if averagePerformance > float64(config.Config.GetVideoSegmentSecondsLength())*0.9 {
if averagePerformance > float64(data.GetStreamLatencyLevel().SecondsPerSegment)*0.9 {
log.Warnln("Possible slow uploads: average upload S3 save duration", averagePerformance, "s. troubleshoot this issue by visiting https://owncast.online/docs/troubleshooting/")
}
}

View File

@@ -10,8 +10,11 @@ import (
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/ffmpeg"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/rtmp"
"github.com/owncast/owncast/core/transcoder"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
"github.com/grafov/m3u8"
@@ -23,12 +26,20 @@ var _offlineCleanupTimer *time.Timer
// While a stream takes place cleanup old HLS content every N min.
var _onlineCleanupTicker *time.Ticker
var _currentBroadcast *models.CurrentBroadcast
// setStreamAsConnected sets the stream as connected.
func setStreamAsConnected() {
_stats.StreamConnected = true
_stats.LastConnectTime = utils.NullTime{Time: time.Now(), Valid: true}
_stats.LastDisconnectTime = utils.NullTime{Time: time.Now(), Valid: false}
_currentBroadcast = &models.CurrentBroadcast{
LatencyLevel: data.GetStreamLatencyLevel(),
OutputSettings: data.GetStreamOutputVariants(),
}
StopOfflineCleanupTimer()
startOnlineCleanupTimer()
@@ -37,23 +48,28 @@ func setStreamAsConnected() {
}
segmentPath := config.PublicHLSStoragePath
if config.Config.S3.Enabled {
s3Config := data.GetS3Config()
if err := setupStorage(); err != nil {
log.Fatalln("failed to setup the storage", err)
}
if s3Config.Enabled {
segmentPath = config.PrivateHLSStoragePath
}
go func() {
_transcoder = ffmpeg.NewTranscoder()
if _broadcaster != nil {
_transcoder.SetVideoOnly(_broadcaster.StreamDetails.VideoOnly)
}
_transcoder = transcoder.NewTranscoder()
_transcoder.TranscoderCompleted = func(error) {
SetStreamAsDisconnected()
_transcoder = nil
_currentBroadcast = nil
}
_transcoder.Start()
}()
ffmpeg.StartThumbnailGenerator(segmentPath, config.Config.VideoSettings.HighestQualityStreamIndex)
go webhooks.SendStreamStatusEvent(models.StreamStarted)
transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings))
}
// SetStreamAsDisconnected sets the stream as disconnected.
@@ -65,14 +81,14 @@ func SetStreamAsDisconnected() {
offlineFilename := "offline.ts"
offlineFilePath := "static/" + offlineFilename
ffmpeg.StopThumbnailGenerator()
transcoder.StopThumbnailGenerator()
rtmp.Disconnect()
if _yp != nil {
_yp.Stop()
}
for index := range config.Config.GetVideoStreamQualities() {
for index := range data.GetStreamOutputVariants() {
playlistFilePath := fmt.Sprintf(filepath.Join(config.PrivateHLSStoragePath, "%d/stream.m3u8"), index)
segmentFilePath := fmt.Sprintf(filepath.Join(config.PrivateHLSStoragePath, "%d/%s"), index, offlineFilename)
@@ -97,7 +113,7 @@ func SetStreamAsDisconnected() {
}
variantPlaylist := playlist.(*m3u8.MediaPlaylist)
if len(variantPlaylist.Segments) > config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist() {
if len(variantPlaylist.Segments) > int(data.GetStreamLatencyLevel().SegmentCount) {
variantPlaylist.Segments = variantPlaylist.Segments[:len(variantPlaylist.Segments)]
}
@@ -144,6 +160,8 @@ func SetStreamAsDisconnected() {
StartOfflineCleanupTimer()
stopOnlineCleanupTimer()
go webhooks.SendStreamStatusEvent(models.StreamStopped)
}
// StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.
@@ -153,6 +171,7 @@ func StartOfflineCleanupTimer() {
for range _offlineCleanupTimer.C {
// Reset the session count since the session is over
_stats.SessionMaxViewerCount = 0
// Set video to offline state
resetDirectories()
transitionToOfflineVideoStreamContent()
}
@@ -170,7 +189,7 @@ func startOnlineCleanupTimer() {
_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
go func() {
for range _onlineCleanupTicker.C {
ffmpeg.CleanupOldContent(config.PrivateHLSStoragePath)
transcoder.CleanupOldContent(config.PrivateHLSStoragePath)
}
}()
}

View File

@@ -1,11 +1,11 @@
package ffmpeg
package transcoder
import (
"bytes"
"io"
"net"
"os"
"path/filepath"
"strconv"
"strings"
"net/http"
@@ -34,15 +34,22 @@ func (s *FileWriterReceiverService) SetupFileWriterReceiverService(callbacks Fil
httpServer := http.NewServeMux()
httpServer.HandleFunc("/", s.uploadHandler)
localListenerAddress := "127.0.0.1:" + strconv.Itoa(config.Config.GetPublicWebServerPort()+1)
localListenerAddress := "127.0.0.1:0"
go func() {
if err := http.ListenAndServe(localListenerAddress, httpServer); err != nil {
log.Fatal(err)
listener, err := net.Listen("tcp", localListenerAddress)
if err != nil {
log.Fatalln("Unable to start internal video writing service", err)
}
listenerPort := strings.Split(listener.Addr().String(), ":")[1]
config.InternalHLSListenerPort = listenerPort
log.Traceln("Transcoder response service listening on: " + listenerPort)
if err := http.Serve(listener, httpServer); err != nil {
log.Fatalln("Unable to start internal video writing service", err)
}
}()
log.Traceln("Transcoder response listening on: " + localListenerAddress)
}
func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http.Request) {
@@ -86,6 +93,6 @@ func (s *FileWriterReceiverService) fileWritten(path string) {
}
func returnError(err error, w http.ResponseWriter) {
log.Errorln(err)
log.Debugln(err)
http.Error(w, http.StatusText(http.StatusInternalServerError)+": "+err.Error(), http.StatusInternalServerError)
}

View File

@@ -1,4 +1,4 @@
package ffmpeg
package transcoder
import (
log "github.com/sirupsen/logrus"
@@ -7,14 +7,14 @@ import (
"path/filepath"
"sort"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
)
// CleanupOldContent will delete old files from the private dir that are no longer being referenced
// in the stream.
func CleanupOldContent(baseDirectory string) {
// Determine how many files we should keep on disk
maxNumber := config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist()
maxNumber := int(data.GetStreamLatencyLevel().SegmentCount)
buffer := 10
files, err := getAllFilesRecursive(baseDirectory)

View File

@@ -1,4 +1,4 @@
package ffmpeg
package transcoder
import (
"github.com/owncast/owncast/models"

View File

@@ -1,4 +1,4 @@
package ffmpeg
package transcoder
import (
"io/ioutil"
@@ -11,6 +11,8 @@ import (
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/utils"
)
var _timer *time.Ticker
@@ -78,9 +80,10 @@ func fireThumbnailGenerator(segmentPath string, variantIndex int) error {
}
mostRecentFile := path.Join(framePath, names[0])
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
thumbnailCmdFlags := []string{
config.Config.GetFFMpegPath(),
ffmpegPath,
"-y", // Overwrite file
"-threads 1", // Low priority processing
"-t 1", // Pull from frame 1
@@ -96,7 +99,7 @@ func fireThumbnailGenerator(segmentPath string, variantIndex int) error {
}
// If YP support is enabled also create an animated GIF preview
if config.Config.YP.Enabled {
if data.GetDirectoryEnabled() {
makeAnimatedGifPreview(mostRecentFile, previewGifFile)
}
@@ -104,9 +107,11 @@ func fireThumbnailGenerator(segmentPath string, variantIndex int) error {
}
func makeAnimatedGifPreview(sourceFile string, outputFile string) {
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
// Filter is pulled from https://engineering.giphy.com/how-to-make-gifs-with-ffmpeg/
animatedGifFlags := []string{
config.Config.GetFFMpegPath(),
ffmpegPath,
"-y", // Overwrite file
"-threads 1", // Low priority processing
"-i", sourceFile, // Input

View File

@@ -1,4 +1,4 @@
package ffmpeg
package transcoder
import (
"fmt"
@@ -10,6 +10,8 @@ import (
"github.com/teris-io/shortid"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
)
@@ -21,14 +23,15 @@ type Transcoder struct {
segmentOutputPath string
playlistOutputPath string
variants []HLSVariant
hlsPlaylistLength int
segmentLengthSeconds int
appendToStream bool
ffmpegPath string
segmentIdentifier string
internalListenerPort int
videoOnly bool // If true ignore any audio, if any
TranscoderCompleted func(error)
internalListenerPort string
currentStreamOutputSettings []models.StreamOutputVariant
currentLatencyLevel models.LatencyLevel
TranscoderCompleted func(error)
}
// HLSVariant is a combination of settings that results in a single HLS stream.
@@ -81,11 +84,8 @@ func (t *Transcoder) Start() {
command := t.getString()
log.Tracef("Video transcoder started with %d stream variants.", len(t.variants))
if t.videoOnly {
log.Tracef("Transcoder requested to operate on video only, ignoring audio.")
}
if config.Config.EnableDebugFeatures {
if config.EnableDebugFeatures {
log.Println(command)
}
@@ -103,16 +103,8 @@ func (t *Transcoder) Start() {
}
func (t *Transcoder) getString() string {
var port int
if config.Config != nil {
port = config.Config.GetPublicWebServerPort() + 1
} else if t.internalListenerPort != 0 {
port = t.internalListenerPort
} else {
log.Panicln("A internal port must be set for transcoder callback")
}
localListenerAddress := "http://127.0.0.1:" + strconv.Itoa(port)
var port = t.internalListenerPort
localListenerAddress := "http://127.0.0.1:" + port
hlsOptionFlags := []string{}
@@ -139,8 +131,8 @@ func (t *Transcoder) getString() string {
// HLS Output
"-f", "hls",
"-hls_time", strconv.Itoa(t.segmentLengthSeconds), // Length of each segment
"-hls_list_size", strconv.Itoa(t.hlsPlaylistLength), // Max # in variant playlist
"-hls_time", strconv.Itoa(t.currentLatencyLevel.SecondsPerSegment), // Length of each segment
"-hls_list_size", strconv.Itoa(t.currentLatencyLevel.SegmentCount), // Max # in variant playlist
"-hls_delete_threshold", "10", // Start deleting files after hls_list_size + 10
hlsOptionsString,
@@ -165,7 +157,7 @@ func (t *Transcoder) getString() string {
return strings.Join(ffmpegFlags, " ")
}
func getVariantFromConfigQuality(quality config.StreamQuality, index int) HLSVariant {
func getVariantFromConfigQuality(quality models.StreamOutputVariant, index int) HLSVariant {
variant := HLSVariant{}
variant.index = index
variant.isAudioPassthrough = quality.IsAudioPassthrough
@@ -202,12 +194,17 @@ func getVariantFromConfigQuality(quality config.StreamQuality, index int) HLSVar
// NewTranscoder will return a new Transcoder, populated by the config.
func NewTranscoder() *Transcoder {
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
transcoder := new(Transcoder)
transcoder.ffmpegPath = config.Config.GetFFMpegPath()
transcoder.hlsPlaylistLength = config.Config.GetMaxNumberOfReferencedSegmentsInPlaylist()
transcoder.ffmpegPath = ffmpegPath
transcoder.internalListenerPort = config.InternalHLSListenerPort
transcoder.currentStreamOutputSettings = data.GetStreamOutputVariants()
transcoder.currentLatencyLevel = data.GetStreamLatencyLevel()
var outputPath string
if config.Config.S3.Enabled {
if data.GetS3Config().Enabled {
// Segments are not available via the local HTTP server
outputPath = config.PrivateHLSStoragePath
} else {
@@ -220,10 +217,8 @@ func NewTranscoder() *Transcoder {
transcoder.playlistOutputPath = config.PublicHLSStoragePath
transcoder.input = utils.GetTemporaryPipePath()
transcoder.segmentLengthSeconds = config.Config.GetVideoSegmentSecondsLength()
qualities := config.Config.GetVideoStreamQualities()
for index, quality := range qualities {
for index, quality := range transcoder.currentStreamOutputSettings {
variant := getVariantFromConfigQuality(quality, index)
transcoder.AddVariant(variant)
}
@@ -257,12 +252,7 @@ func (t *Transcoder) getVariantsString() string {
for _, variant := range t.variants {
variantsCommandFlags = variantsCommandFlags + " " + variant.getVariantString(t)
singleVariantMap := ""
if t.videoOnly {
singleVariantMap = fmt.Sprintf("v:%d ", variant.index)
} else {
singleVariantMap = fmt.Sprintf("v:%d,a:%d ", variant.index, variant.index)
}
singleVariantMap = fmt.Sprintf("v:%d,a:%d ", variant.index, variant.index)
variantsStreamMaps = variantsStreamMaps + singleVariantMap
}
variantsCommandFlags = variantsCommandFlags + " " + variantsStreamMaps + "\""
@@ -306,7 +296,7 @@ func (v *HLSVariant) getVideoQualityString(t *Transcoder) string {
// -1 to work around segments being generated slightly larger than expected.
// https://trac.ffmpeg.org/ticket/6915?replyto=58#comment:57
gop := (t.segmentLengthSeconds * v.framerate) - 1
gop := (t.currentLatencyLevel.SecondsPerSegment * v.framerate) - 1
// For limiting the output bitrate
// https://trac.ffmpeg.org/wiki/Limiting%20the%20output%20bitrate
@@ -374,16 +364,6 @@ func (t *Transcoder) SetOutputPath(output string) {
t.segmentOutputPath = output
}
// SetHLSPlaylistLength will set the max number of items in a HLS variant's playlist.
func (t *Transcoder) SetHLSPlaylistLength(length int) {
t.hlsPlaylistLength = length
}
// SetSegmentLength Specifies the number of seconds each segment should be.
func (t *Transcoder) SetSegmentLength(seconds int) {
t.segmentLengthSeconds = seconds
}
// SetAppendToStream enables appending to the HLS stream instead of overwriting.
func (t *Transcoder) SetAppendToStream(append bool) {
t.appendToStream = append
@@ -394,11 +374,6 @@ func (t *Transcoder) SetIdentifier(output string) {
t.segmentIdentifier = output
}
func (t *Transcoder) SetInternalHTTPPort(port int) {
func (t *Transcoder) SetInternalHTTPPort(port string) {
t.internalListenerPort = port
}
// SetVideoOnly will ignore any audio streams, if any.
func (t *Transcoder) SetVideoOnly(videoOnly bool) {
t.videoOnly = videoOnly
}

View File

@@ -1,18 +1,21 @@
package ffmpeg
package transcoder
import (
"testing"
"github.com/owncast/owncast/models"
)
func TestFFmpegCommand(t *testing.T) {
latencyLevel := models.GetLatencyLevel(3)
transcoder := new(Transcoder)
transcoder.ffmpegPath = "/fake/path/ffmpeg"
transcoder.SetSegmentLength(4)
transcoder.SetInput("fakecontent.flv")
transcoder.SetOutputPath("fakeOutput")
transcoder.SetHLSPlaylistLength(10)
transcoder.SetIdentifier("jdofFGg")
transcoder.SetInternalHTTPPort(8123)
transcoder.SetInternalHTTPPort("8123")
transcoder.currentLatencyLevel = latencyLevel
variant := HLSVariant{}
variant.videoBitrate = 1200
@@ -35,7 +38,7 @@ func TestFFmpegCommand(t *testing.T) {
cmd := transcoder.getString()
expected := `/fake/path/ffmpeg -hide_banner -loglevel warning -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1200k -maxrate:v:0 1272k -bufsize:v:0 1440k -g:v:0 119 -profile:v:0 high -r:v:0 30 -x264-params:v:0 "scenecut=0:open_gop=0:min-keyint=119:keyint=119" -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 libx264 -b:v:1 3500k -maxrate:v:1 3710k -bufsize:v:1 4200k -g:v:1 95 -profile:v:1 high -r:v:1 24 -x264-params:v:1 "scenecut=0:open_gop=0:min-keyint=95:keyint=95" -map a:0? -c:a:1 copy -preset faster -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 4 -hls_list_size 10 -hls_delete_threshold 10 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -strftime 1 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg%s.ts -max_muxing_queue_size 400 -method PUT -http_persistent 0 -fflags +genpts http://127.0.0.1:8123/%v/stream.m3u8 2> transcoder.log`
expected := `/fake/path/ffmpeg -hide_banner -loglevel warning -i fakecontent.flv -map v:0 -c:v:0 libx264 -b:v:0 1200k -maxrate:v:0 1272k -bufsize:v:0 1440k -g:v:0 89 -profile:v:0 high -r:v:0 30 -x264-params:v:0 "scenecut=0:open_gop=0:min-keyint=89:keyint=89" -map a:0? -c:a:0 copy -preset veryfast -map v:0 -c:v:1 libx264 -b:v:1 3500k -maxrate:v:1 3710k -bufsize:v:1 4200k -g:v:1 71 -profile:v:1 high -r:v:1 24 -x264-params:v:1 "scenecut=0:open_gop=0:min-keyint=71:keyint=71" -map a:0? -c:a:1 copy -preset faster -map v:0 -c:v:2 copy -map a:0? -c:a:2 copy -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2 " -f hls -hls_time 3 -hls_list_size 3 -hls_delete_threshold 10 -tune zerolatency -pix_fmt yuv420p -sc_threshold 0 -master_pl_name stream.m3u8 -strftime 1 -hls_segment_filename http://127.0.0.1:8123/%v/stream-jdofFGg%s.ts -max_muxing_queue_size 400 -method PUT -http_persistent 0 -fflags +genpts http://127.0.0.1:8123/%v/stream.m3u8 2> transcoder.log`
if cmd != expected {
t.Errorf("ffmpeg command does not match expected.\nGot %s\n, want: %s", cmd, expected)

39
core/webhooks/chat.go Normal file
View File

@@ -0,0 +1,39 @@
package webhooks
import (
"github.com/owncast/owncast/models"
)
func SendChatEvent(chatEvent models.ChatEvent) {
webhookEvent := WebhookEvent{
Type: chatEvent.MessageType,
EventData: &WebhookChatMessage{
Author: chatEvent.Author,
Body: chatEvent.Body,
RawBody: chatEvent.RawBody,
ID: chatEvent.ID,
Visible: chatEvent.Visible,
Timestamp: &chatEvent.Timestamp,
},
}
SendEventToWebhooks(webhookEvent)
}
func SendChatEventUsernameChanged(event models.NameChangeEvent) {
webhookEvent := WebhookEvent{
Type: models.UserNameChanged,
EventData: event,
}
SendEventToWebhooks(webhookEvent)
}
func SendChatEventUserJoined(event models.UserJoinedEvent) {
webhookEvent := WebhookEvent{
Type: models.UserNameChanged,
EventData: event,
}
SendEventToWebhooks(webhookEvent)
}

7
core/webhooks/stream.go Normal file
View File

@@ -0,0 +1,7 @@
package webhooks
import "github.com/owncast/owncast/models"
func SendStreamStatusEvent(eventType models.EventType) {
SendEventToWebhooks(WebhookEvent{Type: eventType})
}

67
core/webhooks/webhooks.go Normal file
View File

@@ -0,0 +1,67 @@
package webhooks
import (
"bytes"
"encoding/json"
"net/http"
"time"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
)
type WebhookEvent struct {
Type models.EventType `json:"type"` // messageSent | userJoined | userNameChange
EventData interface{} `json:"eventData,omitempty"`
}
type WebhookChatMessage struct {
Author string `json:"author,omitempty"`
Body string `json:"body,omitempty"`
RawBody string `json:"rawBody,omitempty"`
ID string `json:"id,omitempty"`
Visible bool `json:"visible"`
Timestamp *time.Time `json:"timestamp,omitempty"`
}
func SendEventToWebhooks(payload WebhookEvent) {
webhooks := data.GetWebhooksForEvent(payload.Type)
for _, webhook := range webhooks {
log.Debugf("Event %s sent to Webhook %s", payload.Type, webhook.URL)
if err := sendWebhook(webhook.URL, payload); err != nil {
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", payload.Type, webhook.URL, err)
}
}
}
func sendWebhook(url string, payload WebhookEvent) error {
jsonText, err := json.Marshal(payload)
if err != nil {
return err
}
req, err := http.NewRequest("POST", url, bytes.NewReader(jsonText))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if err := data.SetWebhookAsUsed(url); err != nil {
log.Warnln(err)
}
return nil
}