Config repository (#3988)

* WIP

* fix(test): fix ap test failing

* fix: fix unkeyed fields being used

* chore(tests): clean up browser tests by splitting out federation UI tests
This commit is contained in:
Gabe Kangas
2024-11-15 19:20:58 -08:00
committed by GitHub
parent 56d52c283c
commit 0b5d7c8a4d
88 changed files with 2078 additions and 1643 deletions

View File

@@ -7,8 +7,8 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
@@ -23,6 +23,8 @@ var (
func Start(getStatusFunc func() models.Status) error {
setupPersistence()
configRepository := configrepository.Get()
getStatus = getStatusFunc
_server = NewChat()
@@ -35,7 +37,7 @@ func Start(getStatusFunc func() models.Status) error {
Help: "The number of chat messages incremented over time.",
ConstLabels: map[string]string{
"version": config.VersionNumber,
"host": data.GetServerURL(),
"host": configRepository.GetServerURL(),
},
})

View File

@@ -13,8 +13,8 @@ import (
"github.com/gorilla/websocket"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/services/geoip"
)
@@ -133,7 +133,9 @@ func (c *Client) readPump() {
}
// Check if this message passes the optional language filter
if data.GetChatSlurFilterEnabled() && !c.messageFilter.Allow(string(message)) {
configRepository := configrepository.Get()
if configRepository.GetChatSlurFilterEnabled() && !c.messageFilter.Allow(string(message)) {
c.sendAction("Sorry, that message contained language that is not allowed in this chat.")
continue
}
@@ -209,9 +211,11 @@ func (c *Client) close() {
}
func (c *Client) passesRateLimit() bool {
configRepository := configrepository.Get()
// If spam rate limiting is disabled, or the user is a moderator, always
// allow the message.
if !data.GetChatSpamProtectionEnabled() || c.User.IsModerator() {
if !configRepository.GetChatSpamProtectionEnabled() || c.User.IsModerator() {
return true
}

View File

@@ -8,8 +8,8 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/persistence/userrepository"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
@@ -22,10 +22,12 @@ func (s *Server) userNameChanged(eventData chatClientEvent) {
return
}
configRepository := configrepository.Get()
proposedUsername := receivedEvent.NewName
// Check if name is on the blocklist
blocklist := data.GetForbiddenUsernameList()
blocklist := configRepository.GetForbiddenUsernameList()
// Names have a max length
proposedUsername = utils.MakeSafeStringOfLength(proposedUsername, config.MaxChatDisplayNameLength)

View File

@@ -1,6 +1,8 @@
package events
import "github.com/owncast/owncast/core/data"
import (
"github.com/owncast/owncast/persistence/configrepository"
)
// FediverseEngagementEvent is a message displayed in chat on representing an action on the Fediverse.
type FediverseEngagementEvent struct {
@@ -13,6 +15,8 @@ type FediverseEngagementEvent struct {
// GetBroadcastPayload will return the object to send to all chat users.
func (e *FediverseEngagementEvent) GetBroadcastPayload() EventPayload {
configRepository := configrepository.Get()
return EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
@@ -22,7 +26,7 @@ func (e *FediverseEngagementEvent) GetBroadcastPayload() EventPayload {
"title": e.UserAccountName,
"link": e.Link,
"user": EventPayload{
"displayName": data.GetServerName(),
"displayName": configRepository.GetServerName(),
},
}
}

View File

@@ -1,6 +1,8 @@
package events
import "github.com/owncast/owncast/core/data"
import (
"github.com/owncast/owncast/persistence/configrepository"
)
// SystemMessageEvent is a message displayed in chat on behalf of the server.
type SystemMessageEvent struct {
@@ -10,13 +12,15 @@ type SystemMessageEvent struct {
// GetBroadcastPayload will return the object to send to all chat users.
func (e *SystemMessageEvent) GetBroadcastPayload() EventPayload {
configRepository := configrepository.Get()
return EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
"type": SystemMessageSent,
"user": EventPayload{
"displayName": data.GetServerName(),
"displayName": configRepository.GetServerName(),
},
}
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/authrepository"
"github.com/owncast/owncast/persistence/tables"
log "github.com/sirupsen/logrus"
@@ -24,7 +25,9 @@ const (
func setupPersistence() {
_datastore = data.GetDatastore()
tables.CreateMessagesTable(_datastore.DB)
data.CreateBanIPTable(_datastore.DB)
authRepository := authrepository.Get()
authRepository.CreateBanIPTable(_datastore.DB)
chatDataPruner := time.NewTicker(5 * time.Minute)
go func() {

View File

@@ -13,9 +13,10 @@ import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/authrepository"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/persistence/userrepository"
"github.com/owncast/owncast/services/geoip"
"github.com/owncast/owncast/utils"
@@ -95,7 +96,9 @@ func (s *Server) Addclient(conn *websocket.Conn, user *models.User, accessToken
ConnectedAt: time.Now(),
}
shouldSendJoinedMessages := data.GetChatJoinPartMessagesEnabled()
configRepository := configrepository.Get()
shouldSendJoinedMessages := configRepository.GetChatJoinPartMessagesEnabled()
// If there are existing clients connected for this user do not send
// a user joined message. Do not put this under a mutex, as
@@ -186,8 +189,10 @@ func (s *Server) sendUserPartedMessage(c *Client) {
userPartEvent.User = c.User
userPartEvent.ClientID = c.Id
configRepository := configrepository.Get()
// If part messages are disabled.
if data.GetChatJoinPartMessagesEnabled() {
if configRepository.GetChatJoinPartMessagesEnabled() {
if err := s.Broadcast(userPartEvent.GetBroadcastPayload()); err != nil {
log.Errorln("error sending chat part message", err)
}
@@ -198,14 +203,17 @@ func (s *Server) sendUserPartedMessage(c *Client) {
// HandleClientConnection is fired when a single client connects to the websocket.
func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request) {
if data.GetChatDisabled() {
configRepository := configrepository.Get()
authRepository := authrepository.Get()
if configRepository.GetChatDisabled() {
_, _ = w.Write([]byte(events.ChatDisabled))
return
}
ipAddress := utils.GetIPAddressFromRequest(r)
// Check if this client's IP address is banned. If so send a rejection.
if blocked, err := data.IsIPAddressBanned(ipAddress); blocked {
if blocked, err := authRepository.IsIPAddressBanned(ipAddress); blocked {
log.Debugln("Client ip address has been blocked. Rejecting.")
w.WriteHeader(http.StatusForbidden)
@@ -377,12 +385,14 @@ func SendActionToUser(userID string, text string) error {
}
func (s *Server) eventReceived(event chatClientEvent) {
configRepository := configrepository.Get()
c := event.client
u := c.User
// If established chat user only mode is enabled and the user is not old
// enough then reject this event and send them an informative message.
if u != nil && data.GetChatEstbalishedUsersOnlyMode() && time.Since(event.client.User.CreatedAt) < config.GetDefaults().ChatEstablishedUserModeTimeDuration && !u.IsModerator() {
if u != nil && configRepository.GetChatEstbalishedUsersOnlyMode() && time.Since(event.client.User.CreatedAt) < config.GetDefaults().ChatEstablishedUserModeTimeDuration && !u.IsModerator() {
s.sendActionToClient(c, "You have not been an established chat participant long enough to take part in chat. Please enjoy the stream and try again later.")
return
}
@@ -409,10 +419,12 @@ func (s *Server) eventReceived(event chatClientEvent) {
}
func (s *Server) sendWelcomeMessageToClient(c *Client) {
configRepository := configrepository.Get()
// Add an artificial delay so people notice this message come in.
time.Sleep(7 * time.Second)
welcomeMessage := utils.RenderSimpleMarkdown(data.GetServerWelcomeMessage())
welcomeMessage := utils.RenderSimpleMarkdown(configRepository.GetServerWelcomeMessage())
if welcomeMessage != "" {
s.sendSystemMessageToClient(c, welcomeMessage)
@@ -420,7 +432,9 @@ func (s *Server) sendWelcomeMessageToClient(c *Client) {
}
func (s *Server) sendAllWelcomeMessage() {
welcomeMessage := utils.RenderSimpleMarkdown(data.GetServerWelcomeMessage())
configRepository := configrepository.Get()
welcomeMessage := utils.RenderSimpleMarkdown(configRepository.GetServerWelcomeMessage())
if welcomeMessage != "" {
clientMessage := events.SystemMessageEvent{

View File

@@ -16,6 +16,7 @@ import (
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/notifications"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/persistence/tables"
"github.com/owncast/owncast/utils"
"github.com/owncast/owncast/yp"
@@ -34,10 +35,10 @@ var (
// Start starts up the core processing.
func Start() error {
resetDirectories()
configRepository := configrepository.Get()
// configRepository.PopulateDefaults()
data.PopulateDefaults()
if err := data.VerifySettings(); err != nil {
if err := configRepository.VerifySettings(); err != nil {
log.Error(err)
return err
}
@@ -75,7 +76,7 @@ func Start() error {
// start the rtmp server
go rtmp.Start(setStreamAsConnected, setBroadcaster)
rtmpPort := data.GetRTMPPortNumber()
rtmpPort := configRepository.GetRTMPPortNumber()
if rtmpPort != 1935 {
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
}
@@ -113,7 +114,8 @@ func transitionToOfflineVideoStreamContent() {
go _transcoder.Start(false)
// Copy the logo to be the thumbnail
logo := data.GetLogoPath()
configRepository := configrepository.Get()
logo := configRepository.GetLogoPath()
dst := filepath.Join(config.TempDir, "thumbnail.jpg")
if err = utils.Copy(filepath.Join("data", logo), dst); err != nil {
log.Warnln(err)
@@ -130,7 +132,8 @@ func resetDirectories() {
utils.CleanupDirectory(config.HLSStoragePath)
// Remove the previous thumbnail
logo := data.GetLogoPath()
configRepository := configrepository.Get()
logo := configRepository.GetLogoPath()
if utils.DoesFileExists(logo) {
err := utils.Copy(path.Join("data", logo), filepath.Join(config.DataDirectory, "thumbnail.jpg"))
if err != nil {

View File

@@ -1,13 +0,0 @@
package data
// GetFederatedInboxMap is a mapping between account names and their outbox.
func GetFederatedInboxMap() map[string]string {
return map[string]string{
GetDefaultFederationUsername(): GetDefaultFederationUsername(),
}
}
// GetDefaultFederationUsername will return the username used for sending federation activities.
func GetDefaultFederationUsername() string {
return GetFederationUsername()
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,60 +0,0 @@
package data
import (
"bytes"
"encoding/gob"
)
// ConfigEntry is the actual object saved to the database.
// The Value is encoded using encoding/gob.
type ConfigEntry struct {
Value interface{}
Key string
}
func (c *ConfigEntry) getStringSlice() ([]string, error) {
decoder := c.getDecoder()
var result []string
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getStringMap() (map[string]string, error) {
decoder := c.getDecoder()
var result map[string]string
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getString() (string, error) {
decoder := c.getDecoder()
var result string
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getNumber() (float64, error) {
decoder := c.getDecoder()
var result float64
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getBool() (bool, error) {
decoder := c.getDecoder()
var result bool
err := decoder.Decode(&result)
return result, err
}
func (c *ConfigEntry) getObject(result interface{}) error {
decoder := c.getDecoder()
err := decoder.Decode(result)
return err
}
func (c *ConfigEntry) getDecoder() *gob.Decoder {
valueBytes := c.Value.([]byte)
decoder := gob.NewDecoder(bytes.NewBuffer(valueBytes))
return decoder
}

View File

@@ -1,23 +0,0 @@
package data
// GetPublicKey will return the public key.
func GetPublicKey() string {
value, _ := _datastore.GetString(publicKeyKey)
return value
}
// SetPublicKey will save the public key.
func SetPublicKey(key string) error {
return _datastore.SetString(publicKeyKey, key)
}
// GetPrivateKey will return the private key.
func GetPrivateKey() string {
value, _ := _datastore.GetString(privateKeyKey)
return value
}
// SetPrivateKey will save the private key.
func SetPrivateKey(key string) error {
return _datastore.SetString(privateKeyKey, key)
}

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"os"
"testing"
"github.com/owncast/owncast/models"
)
func TestMain(m *testing.M) {
@@ -89,7 +91,7 @@ func TestCustomType(t *testing.T) {
}
// Save config entry to the database
if err := _datastore.Save(ConfigEntry{&testStruct, testKey}); err != nil {
if err := _datastore.Save(models.ConfigEntry{&testStruct, testKey}); err != nil {
t.Error(err)
}
@@ -101,7 +103,7 @@ func TestCustomType(t *testing.T) {
// Get a typed struct out of it
var testResult TestStruct
if err := entryResult.getObject(&testResult); err != nil {
if err := entryResult.GetObject(&testResult); err != nil {
t.Error(err)
}
@@ -121,7 +123,7 @@ func TestStringMap(t *testing.T) {
}
// Save config entry to the database
if err := _datastore.Save(ConfigEntry{&testMap, testKey}); err != nil {
if err := _datastore.Save(models.ConfigEntry{Value: &testMap, Key: testKey}); err != nil {
t.Error(err)
}
@@ -131,7 +133,7 @@ func TestStringMap(t *testing.T) {
t.Error(err)
}
testResult, err := entryResult.getStringMap()
testResult, err := entryResult.GetStringMap()
if err != nil {
t.Error(err)
}

View File

@@ -5,12 +5,11 @@ import (
"database/sql"
"encoding/gob"
"sync"
"time"
// sqlite requires a blank import.
_ "github.com/mattn/go-sqlite3"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
@@ -21,7 +20,8 @@ type Datastore struct {
DbLock *sync.Mutex
}
func (ds *Datastore) warmCache() {
// WarmCache pre-caches all configuration values in memory.
func (ds *Datastore) WarmCache() {
log.Traceln("Warming config value cache")
res, err := ds.DB.Query("SELECT key, value FROM datastore")
@@ -46,10 +46,10 @@ func (ds *Datastore) GetQueries() *db.Queries {
}
// Get will query the database for the key and return the entry.
func (ds *Datastore) Get(key string) (ConfigEntry, error) {
func (ds *Datastore) Get(key string) (models.ConfigEntry, error) {
cachedValue, err := ds.GetCachedValue(key)
if err == nil {
return ConfigEntry{
return models.ConfigEntry{
Key: key,
Value: cachedValue,
}, nil
@@ -60,10 +60,10 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) {
row := ds.DB.QueryRow("SELECT key, value FROM datastore WHERE key = ? LIMIT 1", key)
if err := row.Scan(&resultKey, &resultValue); err != nil {
return ConfigEntry{}, err
return models.ConfigEntry{}, err
}
result := ConfigEntry{
result := models.ConfigEntry{
Key: resultKey,
Value: resultValue,
}
@@ -73,7 +73,7 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) {
}
// Save will save the ConfigEntry to the database.
func (ds *Datastore) Save(e ConfigEntry) error {
func (ds *Datastore) Save(e models.ConfigEntry) error {
ds.DbLock.Lock()
defer ds.DbLock.Unlock()
@@ -93,7 +93,6 @@ func (ds *Datastore) Save(e ConfigEntry) error {
return err
}
_, err = stmt.Exec(e.Key, dataGob.Bytes())
if err != nil {
return err
}
@@ -121,26 +120,6 @@ func (ds *Datastore) Setup() {
);`
ds.MustExec(createTableSQL)
if !HasPopulatedDefaults() {
PopulateDefaults()
}
if !hasPopulatedFederationDefaults() {
if err := SetFederationGoLiveMessage(config.GetDefaults().FederationGoLiveMessage); err != nil {
log.Errorln(err)
}
if err := _datastore.SetBool("HAS_POPULATED_FEDERATION_DEFAULTS", true); err != nil {
log.Errorln(err)
}
}
// Set the server initialization date if needed.
if hasSetInitDate, _ := GetServerInitTime(); hasSetInitDate == nil || !hasSetInitDate.Valid {
_ = SetServerInitTime(time.Now())
}
migrateDatastoreValues(_datastore)
}
// Reset will delete all config entries in the datastore and start over.
@@ -156,8 +135,6 @@ func (ds *Datastore) Reset() {
if _, err = stmt.Exec(); err != nil {
log.Fatalln(err)
}
PopulateDefaults()
}
// GetDatastore returns the shared instance of the owncast datastore.

View File

@@ -1,86 +0,0 @@
package data
import (
"strings"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
const (
datastoreValuesVersion = 4
datastoreValueVersionKey = "DATA_STORE_VERSION"
)
func migrateDatastoreValues(datastore *Datastore) {
currentVersion, _ := _datastore.GetNumber(datastoreValueVersionKey)
if currentVersion == 0 {
currentVersion = datastoreValuesVersion
}
for v := currentVersion; v < datastoreValuesVersion; v++ {
log.Infof("Migration datastore values from %d to %d\n", int(v), int(v+1))
switch v {
case 0:
migrateToDatastoreValues1(datastore)
case 1:
migrateToDatastoreValues2(datastore)
case 2:
migrateToDatastoreValues3ServingEndpoint3(datastore)
case 3:
migrateToDatastoreValues4(datastore)
default:
log.Fatalln("missing datastore values migration step")
}
}
if err := _datastore.SetNumber(datastoreValueVersionKey, datastoreValuesVersion); err != nil {
log.Errorln("error setting datastore value version:", err)
}
}
func migrateToDatastoreValues1(datastore *Datastore) {
// Migrate the forbidden usernames to be a slice instead of a string.
forbiddenUsernamesString, _ := datastore.GetString(blockedUsernamesKey)
if forbiddenUsernamesString != "" {
forbiddenUsernamesSlice := strings.Split(forbiddenUsernamesString, ",")
if err := datastore.SetStringSlice(blockedUsernamesKey, forbiddenUsernamesSlice); err != nil {
log.Errorln("error migrating blocked username list:", err)
}
}
// Migrate the suggested usernames to be a slice instead of a string.
suggestedUsernamesString, _ := datastore.GetString(suggestedUsernamesKey)
if suggestedUsernamesString != "" {
suggestedUsernamesSlice := strings.Split(suggestedUsernamesString, ",")
if err := datastore.SetStringSlice(suggestedUsernamesKey, suggestedUsernamesSlice); err != nil {
log.Errorln("error migrating suggested username list:", err)
}
}
}
func migrateToDatastoreValues2(datastore *Datastore) {
oldAdminPassword, _ := datastore.GetString("stream_key")
// Avoids double hashing the password
_ = datastore.SetString("admin_password_key", oldAdminPassword)
_ = SetStreamKeys([]models.StreamKey{
{Key: oldAdminPassword, Comment: "Default stream key"},
})
}
func migrateToDatastoreValues3ServingEndpoint3(_ *Datastore) {
s3Config := GetS3Config()
if !s3Config.Enabled {
return
}
_ = SetVideoServingEndpoint(s3Config.ServingEndpoint)
}
func migrateToDatastoreValues4(datastore *Datastore) {
unhashed_pass, _ := datastore.GetString("admin_password_key")
err := SetAdminPassword(unhashed_pass)
if err != nil {
log.Fatalln("error migrating admin password:", err)
}
}

View File

@@ -1,54 +0,0 @@
package data
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/models"
)
// HasPopulatedDefaults will determine if the defaults have been inserted into the database.
func HasPopulatedDefaults() bool {
hasPopulated, err := _datastore.GetBool("HAS_POPULATED_DEFAULTS")
if err != nil {
return false
}
return hasPopulated
}
func hasPopulatedFederationDefaults() bool {
hasPopulated, err := _datastore.GetBool("HAS_POPULATED_FEDERATION_DEFAULTS")
if err != nil {
return false
}
return hasPopulated
}
// PopulateDefaults will set default values in the database.
func PopulateDefaults() {
_datastore.warmCache()
defaults := config.GetDefaults()
if HasPopulatedDefaults() {
return
}
_ = SetAdminPassword(defaults.AdminPassword)
_ = SetStreamKeys(defaults.StreamKeys)
_ = SetHTTPPortNumber(float64(defaults.WebServerPort))
_ = SetRTMPPortNumber(float64(defaults.RTMPServerPort))
_ = SetLogoPath(defaults.Logo)
_ = SetServerMetadataTags([]string{"owncast", "streaming"})
_ = SetServerSummary(defaults.Summary)
_ = SetServerWelcomeMessage("")
_ = SetServerName(defaults.Name)
_ = SetExtraPageBodyContent(defaults.PageBodyContent)
_ = SetFederationGoLiveMessage(defaults.FederationGoLiveMessage)
_ = SetSocialHandles([]models.SocialHandle{
{
Platform: "github",
URL: "https://github.com/owncast/owncast",
},
})
_ = _datastore.SetBool("HAS_POPULATED_DEFAULTS", true)
}

View File

@@ -1,14 +1,5 @@
package data
import (
"context"
"database/sql"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
// GetMessagesCount will return the number of messages in the database.
func GetMessagesCount() int64 {
query := `SELECT COUNT(*) FROM messages`
@@ -25,58 +16,3 @@ func GetMessagesCount() int64 {
}
return count
}
// CreateBanIPTable will create the IP ban table if needed.
func CreateBanIPTable(db *sql.DB) {
createTableSQL := ` CREATE TABLE IF NOT EXISTS ip_bans (
"ip_address" TEXT NOT NULL PRIMARY KEY,
"notes" TEXT,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`
stmt, err := db.Prepare(createTableSQL)
if err != nil {
log.Fatal("error creating ip ban table", err)
}
defer stmt.Close()
if _, err := stmt.Exec(); err != nil {
log.Fatal("error creating ip ban table", err)
}
}
// BanIPAddress will persist a new IP address ban to the datastore.
func BanIPAddress(address, note string) error {
return _datastore.GetQueries().BanIPAddress(context.Background(), db.BanIPAddressParams{
IpAddress: address,
Notes: sql.NullString{String: note, Valid: true},
})
}
// IsIPAddressBanned will return if an IP address has been previously blocked.
func IsIPAddressBanned(address string) (bool, error) {
blocked, error := _datastore.GetQueries().IsIPAddressBlocked(context.Background(), address)
return blocked > 0, error
}
// GetIPAddressBans will return all the banned IP addresses.
func GetIPAddressBans() ([]models.IPAddress, error) {
result, err := _datastore.GetQueries().GetIPAddressBans(context.Background())
if err != nil {
return nil, err
}
response := []models.IPAddress{}
for _, ip := range result {
response = append(response, models.IPAddress{
IPAddress: ip.IpAddress,
Notes: ip.Notes.String,
CreatedAt: ip.CreatedAt.Time,
})
}
return response, err
}
// RemoveIPAddressBan will remove a previously banned IP address.
func RemoveIPAddressBan(address string) error {
return _datastore.GetQueries().RemoveIPAddressBan(context.Background(), address)
}

View File

@@ -1,17 +1,19 @@
package data
import "github.com/owncast/owncast/models"
// GetStringSlice will return the string slice value for a key.
func (ds *Datastore) GetStringSlice(key string) ([]string, error) {
configEntry, err := ds.Get(key)
if err != nil {
return []string{}, err
}
return configEntry.getStringSlice()
return configEntry.GetStringSlice()
}
// SetStringSlice will set the string slice value for a key.
func (ds *Datastore) SetStringSlice(key string, value []string) error {
configEntry := ConfigEntry{value, key}
configEntry := models.ConfigEntry{Value: value, Key: key}
return ds.Save(configEntry)
}
@@ -21,12 +23,12 @@ func (ds *Datastore) GetString(key string) (string, error) {
if err != nil {
return "", err
}
return configEntry.getString()
return configEntry.GetString()
}
// SetString will set the string value for a key.
func (ds *Datastore) SetString(key string, value string) error {
configEntry := ConfigEntry{value, key}
configEntry := models.ConfigEntry{Value: value, Key: key}
return ds.Save(configEntry)
}
@@ -36,12 +38,12 @@ func (ds *Datastore) GetNumber(key string) (float64, error) {
if err != nil {
return 0, err
}
return configEntry.getNumber()
return configEntry.GetNumber()
}
// SetNumber will set the numeric value for a key.
func (ds *Datastore) SetNumber(key string, value float64) error {
configEntry := ConfigEntry{value, key}
configEntry := models.ConfigEntry{Value: value, Key: key}
return ds.Save(configEntry)
}
@@ -51,12 +53,12 @@ func (ds *Datastore) GetBool(key string) (bool, error) {
if err != nil {
return false, err
}
return configEntry.getBool()
return configEntry.GetBool()
}
// SetBool will set the boolean value for a key.
func (ds *Datastore) SetBool(key string, value bool) error {
configEntry := ConfigEntry{value, key}
configEntry := models.ConfigEntry{Value: value, Key: key}
return ds.Save(configEntry)
}
@@ -66,11 +68,11 @@ func (ds *Datastore) GetStringMap(key string) (map[string]string, error) {
if err != nil {
return map[string]string{}, err
}
return configEntry.getStringMap()
return configEntry.GetStringMap()
}
// SetStringMap will set the string map value for a key.
func (ds *Datastore) SetStringMap(key string, value map[string]string) error {
configEntry := ConfigEntry{value, key}
configEntry := models.ConfigEntry{Value: value, Key: key}
return ds.Save(configEntry)
}

View File

@@ -12,8 +12,8 @@ import (
"github.com/nareix/joy5/format/rtmp"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
)
var _hasInboundRTMPConnection = false
@@ -33,7 +33,9 @@ func Start(setStreamAsConnected func(*io.PipeReader), setBroadcaster func(models
_setStreamAsConnected = setStreamAsConnected
_setBroadcaster = setBroadcaster
port := data.GetRTMPPortNumber()
configRepository := configrepository.Get()
port := configRepository.GetRTMPPortNumber()
s := rtmp.NewServer()
var lis net.Listener
var err error
@@ -78,8 +80,10 @@ func HandleConn(c *rtmp.Conn, nc net.Conn) {
return
}
configRepository := configrepository.Get()
accessGranted := false
validStreamingKeys := data.GetStreamKeys()
validStreamingKeys := configRepository.GetStreamKeys()
// If a stream key override was specified then use that instead.
if config.TemporaryStreamKey != "" {

View File

@@ -7,8 +7,8 @@ import (
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/services/geoip"
)
@@ -48,7 +48,8 @@ func IsStreamConnected() bool {
// Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available.
// So account for that with an artificial buffer of four segments.
timeSinceLastConnected := time.Since(_stats.LastConnectTime.Time).Seconds()
waitTime := math.Max(float64(data.GetStreamLatencyLevel().SecondsPerSegment)*3.0, 7)
configRepository := configrepository.Get()
waitTime := math.Max(float64(configRepository.GetStreamLatencyLevel().SecondsPerSegment)*3.0, 7)
if timeSinceLastConnected < waitTime {
return false
}
@@ -75,7 +76,7 @@ func SetViewerActive(viewer *models.Viewer) {
l.Lock()
defer l.Unlock()
// Asynchronously, optionally, fetch GeoIP data.
// Asynchronously, optionally, fetch GeoIP configRepository.
go func(viewer *models.Viewer) {
viewer.Geo = _geoIPClient.GetGeoFromIP(viewer.IPAddress)
}(viewer)
@@ -111,27 +112,29 @@ func pruneViewerCount() {
}
func saveStats() {
if err := data.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil {
configRepository := configrepository.Get()
if err := configRepository.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil {
log.Errorln("error saving viewer count", err)
}
if err := data.SetPeakSessionViewerCount(_stats.SessionMaxViewerCount); err != nil {
if err := configRepository.SetPeakSessionViewerCount(_stats.SessionMaxViewerCount); err != nil {
log.Errorln("error saving viewer count", err)
}
if _stats.LastDisconnectTime != nil && _stats.LastDisconnectTime.Valid {
if err := data.SetLastDisconnectTime(_stats.LastDisconnectTime.Time); err != nil {
if err := configRepository.SetLastDisconnectTime(_stats.LastDisconnectTime.Time); err != nil {
log.Errorln("error saving disconnect time", err)
}
}
}
func getSavedStats() models.Stats {
savedLastDisconnectTime, _ := data.GetLastDisconnectTime()
configRepository := configrepository.Get()
savedLastDisconnectTime, _ := configRepository.GetLastDisconnectTime()
result := models.Stats{
ChatClients: make(map[string]models.Client),
Viewers: make(map[string]*models.Viewer),
SessionMaxViewerCount: data.GetPeakSessionViewerCount(),
OverallMaxViewerCount: data.GetPeakOverallViewerCount(),
SessionMaxViewerCount: configRepository.GetPeakSessionViewerCount(),
OverallMaxViewerCount: configRepository.GetPeakOverallViewerCount(),
LastDisconnectTime: savedLastDisconnectTime,
}

View File

@@ -2,8 +2,8 @@ package core
import (
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
)
// GetStatus gets the status of the system.
@@ -17,6 +17,7 @@ func GetStatus() models.Status {
viewerCount = len(_stats.Viewers)
}
configRepository := configrepository.Get()
return models.Status{
Online: IsStreamConnected(),
ViewerCount: viewerCount,
@@ -25,7 +26,7 @@ func GetStatus() models.Status {
LastDisconnectTime: _stats.LastDisconnectTime,
LastConnectTime: _stats.LastConnectTime,
VersionNumber: config.VersionNumber,
StreamTitle: data.GetStreamTitle(),
StreamTitle: configRepository.GetStreamTitle(),
}
}

View File

@@ -1,12 +1,13 @@
package core
import (
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/storageproviders"
"github.com/owncast/owncast/persistence/configrepository"
)
func setupStorage() error {
s3Config := data.GetS3Config()
configRepository := configrepository.Get()
s3Config := configRepository.GetS3Config()
if s3Config.Enabled {
_storage = storageproviders.NewS3Storage()

View File

@@ -5,9 +5,8 @@ import (
"path/filepath"
"sort"
"github.com/owncast/owncast/persistence/configrepository"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/core/data"
)
// LocalStorage represents an instance of the local storage provider for HLS video.
@@ -22,7 +21,8 @@ func NewLocalStorage() *LocalStorage {
// Setup configures this storage provider.
func (s *LocalStorage) Setup() error {
s.host = data.GetVideoServingEndpoint()
configRepository := configrepository.Get()
s.host = configRepository.GetVideoServingEndpoint()
return nil
}
@@ -63,7 +63,8 @@ func (s *LocalStorage) Save(filePath string, retryCount int) (string, error) {
// Cleanup will remove old files from the storage provider.
func (s *LocalStorage) Cleanup() error {
// Determine how many files we should keep on disk
maxNumber := data.GetStreamLatencyLevel().SegmentCount
configRepository := configrepository.Get()
maxNumber := configRepository.GetStreamLatencyLevel().SegmentCount
buffer := 10
return localCleanup(maxNumber + buffer)
}

View File

@@ -11,7 +11,7 @@ import (
"sync"
"time"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@@ -64,9 +64,9 @@ func NewS3Storage() *S3Storage {
// Setup sets up the s3 storage for saving the video to s3.
func (s *S3Storage) Setup() error {
log.Trace("Setting up S3 for external storage of video...")
s3Config := data.GetS3Config()
customVideoServingEndpoint := data.GetVideoServingEndpoint()
configRepository := configrepository.Get()
s3Config := configRepository.GetS3Config()
customVideoServingEndpoint := configRepository.GetVideoServingEndpoint()
if customVideoServingEndpoint != "" {
s.host = customVideoServingEndpoint
@@ -106,8 +106,9 @@ func (s *S3Storage) SegmentWritten(localFilePath string) {
averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)
// Warn the user about long-running save operations
configRepository := configrepository.Get()
if averagePerformance != 0 {
if averagePerformance > float64(data.GetStreamLatencyLevel().SecondsPerSegment)*0.9 {
if averagePerformance > float64(configRepository.GetStreamLatencyLevel().SecondsPerSegment)*0.9 {
log.Warnln("Possible slow uploads: average upload S3 save duration", averagePerformance, "s. troubleshoot this issue by visiting https://owncast.online/docs/troubleshooting/")
}
}
@@ -220,7 +221,8 @@ func (s *S3Storage) Cleanup() error {
// RemoteCleanup will remove old files from the remote storage provider.
func (s *S3Storage) RemoteCleanup() error {
// Determine how many files we should keep on S3 storage
maxNumber := data.GetStreamLatencyLevel().SegmentCount
configRepository := configrepository.Get()
maxNumber := configRepository.GetStreamLatencyLevel().SegmentCount
buffer := 20
keys, err := s.getDeletableVideoSegmentsWithOffset(maxNumber + buffer)

View File

@@ -16,6 +16,7 @@ import (
"github.com/owncast/owncast/core/webhooks"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/notifications"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/utils"
)
@@ -39,9 +40,11 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) {
_stats.LastConnectTime = &now
_stats.SessionMaxViewerCount = 0
configRepository := configrepository.Get()
_currentBroadcast = &models.CurrentBroadcast{
LatencyLevel: data.GetStreamLatencyLevel(),
OutputSettings: data.GetStreamOutputVariants(),
LatencyLevel: configRepository.GetStreamLatencyLevel(),
OutputSettings: configRepository.GetStreamOutputVariants(),
}
StopOfflineCleanupTimer()
@@ -69,7 +72,7 @@ func setStreamAsConnected(rtmpOut *io.PipeReader) {
}()
go webhooks.SendStreamStatusEvent(models.StreamStarted)
selectedThumbnailVideoQualityIndex, isVideoPassthrough := data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings)
selectedThumbnailVideoQualityIndex, isVideoPassthrough := configRepository.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings)
transcoder.StartThumbnailGenerator(segmentPath, selectedThumbnailVideoQualityIndex, isVideoPassthrough)
_ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
@@ -176,8 +179,9 @@ func startLiveStreamNotificationsTimer() context.CancelFunc {
return
}
configRepository := configrepository.Get()
// Send Fediverse message.
if data.GetFederationEnabled() {
if configRepository.GetFederationEnabled() {
log.Traceln("Sending Federated Go Live message.")
if err := activitypub.SendLive(); err != nil {
log.Errorln(err)

View File

@@ -11,7 +11,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/utils"
)
@@ -88,9 +88,9 @@ func fireThumbnailGenerator(segmentPath string, variantIndex int) error {
if len(names) == 0 {
return nil
}
configRepository := configrepository.Get()
mostRecentFile := path.Join(framePath, names[0])
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
ffmpegPath := utils.ValidatedFfmpegPath(configRepository.GetFfMpegPath())
outputFileTemp := path.Join(config.TempDir, "tempthumbnail.jpg")
thumbnailCmdFlags := []string{
@@ -120,7 +120,8 @@ func fireThumbnailGenerator(segmentPath string, variantIndex int) error {
}
func makeAnimatedGifPreview(sourceFile string, outputFile string) {
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
configRepository := configrepository.Get()
ffmpegPath := utils.ValidatedFfmpegPath(configRepository.GetFfMpegPath())
outputFileTemp := path.Join(config.TempDir, "temppreview.gif")
// Filter is pulled from https://engineering.giphy.com/how-to-make-gifs-with-ffmpeg/

View File

@@ -12,9 +12,9 @@ import (
"github.com/teris-io/shortid"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/logging"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/utils"
)
@@ -275,15 +275,16 @@ func getVariantFromConfigQuality(quality models.StreamOutputVariant, index int)
// NewTranscoder will return a new Transcoder, populated by the config.
func NewTranscoder() *Transcoder {
ffmpegPath := utils.ValidatedFfmpegPath(data.GetFfMpegPath())
configRepository := configrepository.Get()
ffmpegPath := utils.ValidatedFfmpegPath(configRepository.GetFfMpegPath())
transcoder := new(Transcoder)
transcoder.ffmpegPath = ffmpegPath
transcoder.internalListenerPort = config.InternalHLSListenerPort
transcoder.currentStreamOutputSettings = data.GetStreamOutputVariants()
transcoder.currentLatencyLevel = data.GetStreamLatencyLevel()
transcoder.codec = getCodec(data.GetVideoCodec())
transcoder.currentStreamOutputSettings = configRepository.GetStreamOutputVariants()
transcoder.currentLatencyLevel = configRepository.GetStreamLatencyLevel()
transcoder.codec = getCodec(configRepository.GetVideoCodec())
transcoder.segmentOutputPath = config.HLSStoragePath
transcoder.playlistOutputPath = config.HLSStoragePath

View File

@@ -8,7 +8,7 @@ import (
"sync"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
@@ -99,9 +99,9 @@ func handleTranscoderMessage(message string) {
func createVariantDirectories() {
// Create private hls data dirs
utils.CleanupDirectory(config.HLSStoragePath)
if len(data.GetStreamOutputVariants()) != 0 {
for index := range data.GetStreamOutputVariants() {
configRepository := configrepository.Get()
if len(configRepository.GetStreamOutputVariants()) != 0 {
for index := range configRepository.GetStreamOutputVariants() {
if err := os.MkdirAll(path.Join(config.HLSStoragePath, strconv.Itoa(index)), 0o750); err != nil {
log.Fatalln(err)
}

View File

@@ -3,8 +3,8 @@ package webhooks
import (
"time"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
"github.com/teris-io/shortid"
)
@@ -14,13 +14,15 @@ func SendStreamStatusEvent(eventType models.EventType) {
}
func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) {
configRepository := configrepository.Get()
SendEventToWebhooks(WebhookEvent{
Type: eventType,
EventData: map[string]interface{}{
"id": id,
"name": data.GetServerName(),
"summary": data.GetServerSummary(),
"streamTitle": data.GetStreamTitle(),
"name": configRepository.GetServerName(),
"summary": configRepository.GetServerSummary(),
"streamTitle": configRepository.GetStreamTitle(),
"status": getStatus(),
"timestamp": timestamp,
},

View File

@@ -5,14 +5,16 @@ import (
"time"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/configrepository"
)
func TestSendStreamStatusEvent(t *testing.T) {
data.SetServerName("my server")
data.SetServerSummary("my server where I stream")
data.SetStreamTitle("my stream")
configRepository := configrepository.Get()
configRepository.SetServerName("my server")
configRepository.SetServerSummary("my server where I stream")
configRepository.SetStreamTitle("my stream")
checkPayload(t, models.StreamStarted, func() {
sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC())