User repository (#3795)
* It builds with the new user repository * fix(test): fix broken test * fix(api): fix registration endpoint that was broken after the change * fix(test): update test to reflect new user repository * fix: use interface type instead of concrete type * fix: restore commented out code
This commit is contained in:
@@ -11,6 +11,8 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/owncast/owncast/persistence/tables"
|
||||
|
||||
"github.com/owncast/owncast/config"
|
||||
"github.com/owncast/owncast/utils"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -74,8 +76,8 @@ func SetupPersistence(file string) error {
|
||||
_, _ = db.Exec("pragma wal_checkpoint(full)")
|
||||
|
||||
createWebhooksTable()
|
||||
createUsersTable(db)
|
||||
createAccessTokenTable(db)
|
||||
tables.CreateUsersTable(db)
|
||||
tables.CreateAccessTokenTable(db)
|
||||
|
||||
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS config (
|
||||
"key" string NOT NULL PRIMARY KEY,
|
||||
@@ -108,7 +110,7 @@ func SetupPersistence(file string) error {
|
||||
|
||||
// is database schema outdated?
|
||||
if version < schemaVersion {
|
||||
if err := migrateDatabaseSchema(db, version, schemaVersion); err != nil {
|
||||
if err := tables.MigrateDatabaseSchema(db, version, schemaVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,32 +9,6 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// CreateMessagesTable will create the chat messages table if needed.
|
||||
func CreateMessagesTable(db *sql.DB) {
|
||||
createTableSQL := `CREATE TABLE IF NOT EXISTS messages (
|
||||
"id" string NOT NULL,
|
||||
"user_id" TEXT,
|
||||
"body" TEXT,
|
||||
"eventType" TEXT,
|
||||
"hidden_at" DATETIME,
|
||||
"timestamp" DATETIME,
|
||||
"title" TEXT,
|
||||
"subtitle" TEXT,
|
||||
"image" TEXT,
|
||||
"link" TEXT,
|
||||
PRIMARY KEY (id)
|
||||
);`
|
||||
MustExec(createTableSQL, db)
|
||||
|
||||
// Create indexes
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS user_id_hidden_at_timestamp ON messages (id, user_id, hidden_at, timestamp);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_id ON messages (id);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_user_id ON messages (user_id);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_hidden_at ON messages (hidden_at);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_timestamp ON messages (timestamp);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_messages_hidden_at_timestamp on messages(hidden_at, timestamp);`, db)
|
||||
}
|
||||
|
||||
// GetMessagesCount will return the number of messages in the database.
|
||||
func GetMessagesCount() int64 {
|
||||
query := `SELECT COUNT(*) FROM messages`
|
||||
|
||||
@@ -1,371 +0,0 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/owncast/owncast/config"
|
||||
"github.com/owncast/owncast/utils"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/teris-io/shortid"
|
||||
)
|
||||
|
||||
func migrateDatabaseSchema(db *sql.DB, from, to int) error {
|
||||
log.Printf("Migrating database from version %d to %d", from, to)
|
||||
dbBackupFile := filepath.Join(config.BackupDirectory, fmt.Sprintf("owncast-v%d.bak", from))
|
||||
utils.Backup(db, dbBackupFile)
|
||||
for v := from; v < to; v++ {
|
||||
log.Tracef("Migration step from %d to %d\n", v, v+1)
|
||||
switch v {
|
||||
case 0:
|
||||
migrateToSchema1(db)
|
||||
case 1:
|
||||
migrateToSchema2(db)
|
||||
case 2:
|
||||
migrateToSchema3(db)
|
||||
case 3:
|
||||
migrateToSchema4(db)
|
||||
case 4:
|
||||
migrateToSchema5(db)
|
||||
case 5:
|
||||
migrateToSchema6(db)
|
||||
case 6:
|
||||
migrateToSchema7(db)
|
||||
default:
|
||||
log.Fatalln("missing database migration step")
|
||||
}
|
||||
}
|
||||
|
||||
_, err := db.Exec("UPDATE config SET value = ? WHERE key = ?", to, "version")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func migrateToSchema7(db *sql.DB) {
|
||||
log.Println("Migrating users. This may take time if you have lots of users...")
|
||||
|
||||
var ids []string
|
||||
|
||||
rows, err := db.Query(`SELECT id FROM users`)
|
||||
if err != nil {
|
||||
log.Errorln("error migrating access tokens to schema v5", err)
|
||||
return
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
log.Errorln("error migrating users to schema v7", rows.Err())
|
||||
return
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
log.Error("There is a problem reading the database when migrating users.", err)
|
||||
return
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
tx, _ := db.Begin()
|
||||
stmt, _ := tx.Prepare("update users set display_color=? WHERE id=?")
|
||||
defer stmt.Close()
|
||||
|
||||
for _, id := range ids {
|
||||
displayColor := utils.GenerateRandomDisplayColor(config.MaxUserColor)
|
||||
|
||||
if _, err := stmt.Exec(displayColor, id); err != nil {
|
||||
log.Panic(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Panicln(err)
|
||||
}
|
||||
}
|
||||
|
||||
func migrateToSchema6(db *sql.DB) {
|
||||
// Fix chat messages table schema. Since chat is ephemeral we can drop
|
||||
// the table and recreate it.
|
||||
// Drop the old messages table
|
||||
MustExec(`DROP TABLE messages`, db)
|
||||
|
||||
// Recreate it
|
||||
CreateMessagesTable(db)
|
||||
}
|
||||
|
||||
// nolint:cyclop
|
||||
func migrateToSchema5(db *sql.DB) {
|
||||
// Create the access tokens table.
|
||||
createAccessTokenTable(db)
|
||||
|
||||
// 1. Authenticated bool added to the users table.
|
||||
// 2. Access tokens are now stored in their own table.
|
||||
//
|
||||
// Long story short, the access_token used to be the primary key of the users
|
||||
// table. However, now it's going to live in its own table. However, you
|
||||
// cannot change the primary key. So we need to create a copy table, then
|
||||
// migrate the access tokens, and then move the copy into place.
|
||||
createTempTable := `CREATE TABLE IF NOT EXISTS users_copy (
|
||||
"id" TEXT,
|
||||
"display_name" TEXT NOT NULL,
|
||||
"display_color" NUMBER NOT NULL,
|
||||
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
"disabled_at" TIMESTAMP,
|
||||
"previous_names" TEXT DEFAULT '',
|
||||
"namechanged_at" TIMESTAMP,
|
||||
"authenticated_at" TIMESTAMP,
|
||||
"scopes" TEXT,
|
||||
"type" TEXT DEFAULT 'STANDARD',
|
||||
"last_used" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id)
|
||||
);CREATE INDEX user_id_disabled_at_index ON users (id, disabled_at);
|
||||
CREATE INDEX user_id_index ON users (id);
|
||||
CREATE INDEX user_id_disabled_index ON users (id, disabled_at);
|
||||
CREATE INDEX user_disabled_at_index ON USERS (disabled_at);`
|
||||
_, err := db.Exec(createTempTable)
|
||||
if err != nil {
|
||||
log.Errorln("error running migration, you may experience issues: ", err)
|
||||
}
|
||||
|
||||
// Start insert transaction
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Migrate the users table to the new users_copy table.
|
||||
rows, err := tx.Query(`SELECT id, access_token, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used FROM users`)
|
||||
if err != nil {
|
||||
log.Errorln("error migrating access tokens to schema v5", err)
|
||||
return
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
log.Errorln("error migrating access tokens to schema v5", rows.Err())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
defer tx.Rollback() //nolint:errcheck
|
||||
|
||||
log.Println("Migrating users. This may take time if you have lots of users...")
|
||||
|
||||
for rows.Next() {
|
||||
var id string
|
||||
var accessToken string
|
||||
var displayName string
|
||||
var displayColor int
|
||||
var createdAt time.Time
|
||||
var disabledAt *time.Time
|
||||
var previousNames string
|
||||
var namechangedAt *time.Time
|
||||
var scopes *string
|
||||
var userType string
|
||||
var lastUsed *time.Time
|
||||
|
||||
if err := rows.Scan(&id, &accessToken, &displayName, &displayColor, &createdAt, &disabledAt, &previousNames, &namechangedAt, &scopes, &userType, &lastUsed); err != nil {
|
||||
log.Error("There is a problem reading the database when migrating users.", err)
|
||||
return
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(`INSERT INTO users_copy (id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(id, displayName, displayColor, createdAt, disabledAt, previousNames, namechangedAt, scopes, userType, lastUsed); err != nil {
|
||||
log.Errorln(err)
|
||||
return
|
||||
}
|
||||
|
||||
stmt, err = tx.Prepare(`INSERT INTO user_access_tokens(token, user_id, timestamp) VALUES (?, ?, ?) ON CONFLICT DO NOTHING`)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err := stmt.Exec(accessToken, id, createdAt); err != nil {
|
||||
log.Errorln(err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
log.Errorln(err)
|
||||
}
|
||||
|
||||
_, err = db.Exec(`PRAGMA foreign_keys = OFF;DROP TABLE "users";ALTER TABLE "users_copy" RENAME TO users;PRAGMA foreign_keys = ON;`)
|
||||
if err != nil {
|
||||
log.Errorln("error running migration, you may experience issues: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func migrateToSchema4(db *sql.DB) {
|
||||
// We now save the follow request object.
|
||||
stmt, err := db.Prepare("ALTER TABLE ap_followers ADD COLUMN request_object BLOB")
|
||||
if err != nil {
|
||||
log.Errorln("Error running migration. This may be because you have already been running a dev version.", err)
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
func migrateToSchema3(db *sql.DB) {
|
||||
// Since it's just a backlog of chat messages let's wipe the old messages
|
||||
// and recreate the table.
|
||||
|
||||
// Drop the old messages table
|
||||
stmt, err := db.Prepare("DROP TABLE messages")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
|
||||
// Recreate it
|
||||
CreateMessagesTable(db)
|
||||
}
|
||||
|
||||
func migrateToSchema2(db *sql.DB) {
|
||||
// Since it's just a backlog of chat messages let's wipe the old messages
|
||||
// and recreate the table.
|
||||
|
||||
// Drop the old messages table
|
||||
stmt, err := db.Prepare("DROP TABLE messages")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
|
||||
// Recreate it
|
||||
CreateMessagesTable(db)
|
||||
}
|
||||
|
||||
func migrateToSchema1(db *sql.DB) {
|
||||
// Since it's just a backlog of chat messages let's wipe the old messages
|
||||
// and recreate the table.
|
||||
|
||||
// Drop the old messages table
|
||||
stmt, err := db.Prepare("DROP TABLE messages")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
|
||||
// Recreate it
|
||||
CreateMessagesTable(db)
|
||||
|
||||
// Migrate access tokens to become chat users
|
||||
type oldAccessToken struct {
|
||||
createdAt time.Time
|
||||
lastUsedAt *time.Time
|
||||
accessToken string
|
||||
displayName string
|
||||
scopes string
|
||||
}
|
||||
|
||||
oldAccessTokens := make([]oldAccessToken, 0)
|
||||
|
||||
query := `SELECT * FROM access_tokens`
|
||||
|
||||
rows, err := db.Query(query)
|
||||
if err != nil || rows.Err() != nil {
|
||||
log.Errorln("error migrating access tokens to schema v1", err, rows.Err())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var token string
|
||||
var name string
|
||||
var scopes string
|
||||
var timestampString string
|
||||
var lastUsedString *string
|
||||
|
||||
if err := rows.Scan(&token, &name, &scopes, ×tampString, &lastUsedString); err != nil {
|
||||
log.Error("There is a problem reading the database.", err)
|
||||
return
|
||||
}
|
||||
|
||||
timestamp, err := time.Parse(time.RFC3339, timestampString)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var lastUsed *time.Time
|
||||
if lastUsedString != nil {
|
||||
lastUsedTime, _ := time.Parse(time.RFC3339, *lastUsedString)
|
||||
lastUsed = &lastUsedTime
|
||||
}
|
||||
|
||||
oldToken := oldAccessToken{
|
||||
accessToken: token,
|
||||
displayName: name,
|
||||
scopes: scopes,
|
||||
createdAt: timestamp,
|
||||
lastUsedAt: lastUsed,
|
||||
}
|
||||
|
||||
oldAccessTokens = append(oldAccessTokens, oldToken)
|
||||
}
|
||||
|
||||
// Recreate them as users
|
||||
for _, token := range oldAccessTokens {
|
||||
color := utils.GenerateRandomDisplayColor(config.MaxUserColor)
|
||||
if err := insertAPIToken(db, token.accessToken, token.displayName, color, token.scopes); err != nil {
|
||||
log.Errorln("Error migrating access token", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func insertAPIToken(db *sql.DB, token string, name string, color int, scopes string) error {
|
||||
log.Debugln("Adding new access token:", name)
|
||||
|
||||
id := shortid.MustGenerate()
|
||||
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stmt, err := tx.Prepare("INSERT INTO users(id, access_token, display_name, display_color, scopes, type) values(?, ?, ?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
if _, err = stmt.Exec(id, token, name, color, scopes, "API"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func createAccessTokenTable(db *sql.DB) {
|
||||
createTableSQL := `CREATE TABLE IF NOT EXISTS user_access_tokens (
|
||||
"token" TEXT NOT NULL PRIMARY KEY,
|
||||
"user_id" TEXT NOT NULL,
|
||||
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES users(id)
|
||||
);`
|
||||
|
||||
stmt, err := db.Prepare(createTableSQL)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
|
||||
func createUsersTable(db *sql.DB) {
|
||||
log.Traceln("Creating users table...")
|
||||
|
||||
createTableSQL := `CREATE TABLE IF NOT EXISTS users (
|
||||
"id" TEXT,
|
||||
"display_name" TEXT NOT NULL,
|
||||
"display_color" NUMBER NOT NULL,
|
||||
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
"disabled_at" TIMESTAMP,
|
||||
"previous_names" TEXT DEFAULT '',
|
||||
"namechanged_at" TIMESTAMP,
|
||||
"authenticated_at" TIMESTAMP,
|
||||
"scopes" TEXT,
|
||||
"type" TEXT DEFAULT 'STANDARD',
|
||||
"last_used" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id)
|
||||
);`
|
||||
|
||||
MustExec(createTableSQL, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_user_id ON users (id);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_user_id_disabled ON users (id, disabled_at);`, db)
|
||||
MustExec(`CREATE INDEX IF NOT EXISTS idx_user_disabled_at ON users (disabled_at);`, db)
|
||||
}
|
||||
|
||||
// GetUsersCount will return the number of users in the database.
|
||||
func GetUsersCount() int64 {
|
||||
query := `SELECT COUNT(*) FROM users`
|
||||
rows, err := _db.Query(query)
|
||||
if err != nil || rows.Err() != nil {
|
||||
return 0
|
||||
}
|
||||
defer rows.Close()
|
||||
var count int64
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&count); err != nil {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// MustExec will execute a SQL statement on a provided database instance.
|
||||
func MustExec(s string, db *sql.DB) {
|
||||
stmt, err := db.Prepare(s)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user