From 529cba84fd43cd016b656cc7306c13d02a1cdb7b Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Sat, 23 Apr 2022 16:31:20 -0700 Subject: [PATCH] Refactor migration to loop over each user instead of bulk inserts --- core/data/migrations.go | 123 +++++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 52 deletions(-) diff --git a/core/data/migrations.go b/core/data/migrations.go index e2f31bfcd..b1b54d476 100644 --- a/core/data/migrations.go +++ b/core/data/migrations.go @@ -2,8 +2,6 @@ package data import ( "database/sql" - "fmt" - "strings" "time" "github.com/owncast/owncast/utils" @@ -11,54 +9,11 @@ import ( "github.com/teris-io/shortid" ) +// nolint:cyclop func migrateToSchema5(db *sql.DB) { // Create the access tokens table. createAccessTokenTable(db) - // Migrate the access tokens from the users table to the access tokens table. - query := `SELECT id, access_token, created_at FROM users` - rows, err := db.Query(query) - if err != nil { - log.Errorln("error migrating access tokens to schema v5", err) - return - } - if rows.Err() != nil { - log.Errorln("error migrating access tokens to schema v5", rows.Err()) - return - } - defer rows.Close() - - valueStrings := []string{} - valueArgs := []interface{}{} - - var token string - var userID string - var timestamp time.Time - for rows.Next() { - if err := rows.Scan(&userID, &token, ×tamp); err != nil { - log.Error("There is a problem reading the database.", err) - return - } - - valueStrings = append(valueStrings, "(?, ?, ?)") - valueArgs = append(valueArgs, token, userID, timestamp) - } - - smt := `INSERT INTO user_access_tokens(token, user_id, timestamp) VALUES %s ON CONFLICT DO NOTHING` - smt = fmt.Sprintf(smt, strings.Join(valueStrings, ",")) - tx, err := db.Begin() - if err != nil { - log.Fatalln("Error starting transaction", err) - } - _, err = tx.Exec(smt, valueArgs...) - if err != nil { - _ = tx.Rollback() - log.Errorln("Error inserting access tokens", err) - } - if err := tx.Commit(); err != nil { - log.Errorln("Error committing transaction", err) - } - // 1. Authenticated bool added to the users table. // 2. Access tokens are now stored in their own table. // @@ -74,24 +29,88 @@ func migrateToSchema5(db *sql.DB) { "disabled_at" TIMESTAMP, "previous_names" TEXT DEFAULT '', "namechanged_at" TIMESTAMP, - "authenticated_at" TIMESTAMP, + "authenticated_at" TIMESTAMP, "scopes" TEXT, "type" TEXT DEFAULT 'STANDARD', "last_used" DATETIME DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) );CREATE INDEX user_id_disabled_at_index ON users (id, disabled_at); CREATE INDEX user_id_index ON users (id); - CREATE INDEX user_id_disabled_index ON users (id, disabled_at); + CREATE INDEX user_id_disabled_index ON users (id, disabled_at); CREATE INDEX user_disabled_at_index ON USERS (disabled_at);` - _, err = db.Exec(createTempTable) + _, err := db.Exec(createTempTable) if err != nil { log.Errorln("error running migration, you may experience issues: ", err) } - _, err = db.Exec(`INSERT INTO users_copy (id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used) - SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used FROM users;`) + // Start insert transaction + tx, err := db.Begin() if err != nil { - log.Errorln("error running migration, you may experience issues: ", err) + log.Errorln(err) + return + } + + // Migrate the users table to the new users_copy table. + rows, err := tx.Query(`SELECT id, access_token, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used FROM users`) + if err != nil { + log.Errorln("error migrating access tokens to schema v5", err) + return + } + if rows.Err() != nil { + log.Errorln("error migrating access tokens to schema v5", rows.Err()) + return + } + defer rows.Close() + + defer tx.Rollback() //nolint:errcheck + + log.Println("Migrating users. This may take time if you have lots of users...") + + for rows.Next() { + var id string + var accessToken string + var displayName string + var displayColor int + var createdAt time.Time + var disabledAt *time.Time + var previousNames string + var namechangedAt *time.Time + var scopes *string + var userType string + var lastUsed *time.Time + + if err := rows.Scan(&id, &accessToken, &displayName, &displayColor, &createdAt, &disabledAt, &previousNames, &namechangedAt, &scopes, &userType, &lastUsed); err != nil { + log.Error("There is a problem reading the database when migrating users.", err) + return + } + + stmt, err := tx.Prepare(`INSERT INTO users_copy (id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) + if err != nil { + log.Errorln(err) + return + } + defer stmt.Close() + + if _, err := stmt.Exec(id, displayName, displayColor, createdAt, disabledAt, previousNames, namechangedAt, scopes, userType, lastUsed); err != nil { + log.Errorln(err) + return + } + + stmt, err = tx.Prepare(`INSERT INTO user_access_tokens(token, user_id, timestamp) VALUES (?, ?, ?) ON CONFLICT DO NOTHING`) + if err != nil { + log.Errorln(err) + return + } + defer stmt.Close() + + if _, err := stmt.Exec(accessToken, id, createdAt); err != nil { + log.Errorln(err) + return + } + } + + if err := tx.Commit(); err != nil { + log.Errorln(err) } _, err = db.Exec(`PRAGMA foreign_keys = OFF;DROP TABLE "users";ALTER TABLE "users_copy" RENAME TO users;PRAGMA foreign_keys = ON;`)