Merge branch 'db_restructure'

* db_restructure:
  make new changes backwards-compatible while we migrate
  use IDs instead of hashes for join table
commit 47425c2c50
Author: Alex Grintsvayg
Date:   2019-07-10 12:48:15 -04:00

db/db.go | 128 lines changed

@@ -66,23 +66,55 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
 		return errors.Err("not connected")
 	}
-	return addBlob(s.conn, hash, length, isStored)
+	_, err := s.insertBlob(hash, length, isStored)
+	return err
 }
 
-func addBlob(e Executor, hash string, length int, isStored bool) error {
+func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
 	if length <= 0 {
-		return errors.Err("length must be positive")
+		return 0, errors.Err("length must be positive")
 	}
 
-	err := exec(e,
+	blobID, err := s.exec(
 		"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
 		hash, isStored, length,
 	)
 	if err != nil {
-		return errors.Err(err)
+		return 0, err
 	}
-	return nil
+
+	if blobID == 0 {
+		err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
+		if err != nil {
+			return 0, errors.Err(err)
+		}
+		if blobID == 0 {
+			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
+		}
+	}
+
+	return blobID, nil
+}
+
+func (s *SQL) insertStream(hash, sdHash string, sdBlobID int64) (int64, error) {
+	streamID, err := s.exec(
+		"INSERT IGNORE INTO stream (hash, sd_hash, sd_blob_id) VALUES (?,?,?)",
+		hash, sdHash, sdBlobID,
+	)
+	if err != nil {
+		return 0, errors.Err(err)
+	}
+
+	if streamID == 0 {
+		err = s.conn.QueryRow("SELECT id FROM stream WHERE sd_blob_id = ?", sdBlobID).Scan(&streamID)
+		if err != nil {
+			return 0, errors.Err(err)
+		}
+		if streamID == 0 {
+			return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
+		}
+	}
+
+	return streamID, nil
 }
 
 // HasBlob checks if the database contains the blob information.
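Note on the new insertBlob: the fallback SELECT is needed because MySQL can report LastInsertId() as 0 when INSERT ... ON DUPLICATE KEY UPDATE hits an existing row and no new AUTO_INCREMENT value is generated. A common single-round-trip alternative is to fold the id into the update clause with LAST_INSERT_ID(). A minimal sketch, not part of this commit (the method name is hypothetical; the table and helpers are the ones from the diff):

// insertBlobOneRoundTrip is a hypothetical variant of insertBlob. Assigning
// id = LAST_INSERT_ID(id) in the update clause makes MySQL report the
// existing row's id through LastInsertId(), so no fallback SELECT is needed.
func (s *SQL) insertBlobOneRoundTrip(hash string, length int, isStored bool) (int64, error) {
	if length <= 0 {
		return 0, errors.Err("length must be positive")
	}
	return s.exec(
		"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) "+
			"ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), id = LAST_INSERT_ID(id)",
		hash, isStored, length,
	)
}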
@@ -157,12 +189,12 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 
 // Delete will remove the blob from the db
 func (s *SQL) Delete(hash string) error {
-	err := exec(s.conn, "DELETE FROM stream WHERE sd_hash = ?", hash)
+	_, err := s.exec("DELETE FROM stream WHERE sd_hash = ?", hash)
 	if err != nil {
 		return errors.Err(err)
 	}
 
-	err = exec(s.conn, "DELETE FROM blob_ WHERE hash = ?", hash)
+	_, err = s.exec("DELETE FROM blob_ WHERE hash = ?", hash)
 	return errors.Err(err)
 }
@@ -247,27 +279,21 @@ func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 	return missingBlobs, errors.Err(err)
 }
 
-// AddSDBlob takes the SD hash, the number of blobs, and the set of blobs. In a single db tx it inserts the sdblob
-// information into a stream, and inserts the associated blobs' information in the database. If a blob fails, the
-// transaction is rolled back and error(s) are returned.
+// AddSDBlob inserts the SD blob and all the content blobs. The content blobs are marked as "not stored",
+// but they are tracked so reflector knows what it is missing.
 func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 	if s.conn == nil {
 		return errors.Err("not connected")
 	}
 
-	// insert sd blob
-	err := addBlob(s.conn, sdHash, sdBlobLength, true)
+	sdBlobID, err := s.insertBlob(sdHash, sdBlobLength, true)
 	if err != nil {
 		return err
 	}
 
-	// insert stream
-	err = exec(s.conn,
-		"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
-		sdBlob.StreamHash, sdHash,
-	)
+	streamID, err := s.insertStream(sdBlob.StreamHash, sdHash, sdBlobID)
 	if err != nil {
-		return errors.Err(err)
+		return err
 	}
 
 	// insert content blobs and connect them to stream
@@ -277,14 +303,14 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 			continue
 		}
 
-		err := addBlob(s.conn, contentBlob.BlobHash, contentBlob.Length, false)
+		blobID, err := s.insertBlob(contentBlob.BlobHash, contentBlob.Length, false)
 		if err != nil {
 			return err
 		}
 
-		err = exec(s.conn,
-			"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
-			sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum,
-		)
+		_, err = s.exec(
+			"INSERT IGNORE INTO stream_blob (stream_id, stream_hash, blob_id, blob_hash, num) VALUES (?,?,?,?,?)",
+			streamID, sdBlob.StreamHash, blobID, contentBlob.BlobHash, contentBlob.BlobNum,
+		)
 		if err != nil {
 			return errors.Err(err)
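The stream_blob INSERT above is deliberately transitional: it writes both the legacy hash columns and the new id columns, which is what keeps this commit backwards-compatible while the migration runs. A hypothetical post-migration version, once the hash columns are dropped (not part of this commit), shrinks to the surrogate keys only:

		// hypothetical: stream_blob insert after stream_hash/blob_hash are dropped
		_, err = s.exec(
			"INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES (?,?,?)",
			streamID, blobID, contentBlob.BlobNum,
		)
		if err != nil {
			return errors.Err(err)
		}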
@@ -408,53 +434,67 @@ func closeRows(rows *sql.Rows) {
 	}
 }
 
-type Executor interface {
-	Exec(query string, args ...interface{}) (sql.Result, error)
-}
-
-func exec(e Executor, query string, args ...interface{}) error {
+func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
 	logQuery(query, args...)
 	attempt, maxAttempts := 0, 3
-	var err error
 Retry:
 	attempt++
-	_, err = e.Exec(query, args...)
-	if e, ok := err.(*mysql.MySQLError); ok && e.Number == 1205 {
+	result, err := s.conn.Exec(query, args...)
+	if isLockTimeoutError(err) {
 		if attempt <= maxAttempts {
 			//Error 1205: Lock wait timeout exceeded; try restarting transaction
 			goto Retry
 		}
-		err = errors.Prefix("Timed out query "+query, err)
+		err = errors.Prefix("Lock timeout for query "+query, err)
 	}
-	return errors.Err(err)
+
+	if err != nil {
+		return 0, errors.Err(err)
+	}
+
+	lastID, err := result.LastInsertId()
+	return lastID, errors.Err(err)
+}
+
+func isLockTimeoutError(err error) bool {
+	e, ok := err.(*mysql.MySQLError)
+	return ok && e != nil && e.Number == 1205
 }
 
 /* SQL schema
 
 in prod, set tx_isolation to READ-COMMITTED to improve db performance
+make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space.
+
+todo: could add UNIQUE KEY (stream_hash, num) to stream_blob ...
 
 CREATE TABLE blob_ (
+  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
   hash char(96) NOT NULL,
   is_stored TINYINT(1) NOT NULL DEFAULT 0,
   length bigint(20) unsigned DEFAULT NULL,
-  PRIMARY KEY (hash)
+  PRIMARY KEY (id),
+  UNIQUE KEY blob_hash_idx (hash)
 );
 
 CREATE TABLE stream (
+  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
   hash char(96) NOT NULL,
-  sd_hash char(96) NOT NULL,
-  PRIMARY KEY (hash),
-  KEY sd_hash_idx (sd_hash),
-  FOREIGN KEY (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE
+  sd_blob_id BIGINT UNSIGNED NOT NULL,
+  PRIMARY KEY (id),
+  UNIQUE KEY stream_hash_idx (hash),
+  KEY stream_sd_blob_id_idx (sd_blob_id),
+  FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE
 );
 
 CREATE TABLE stream_blob (
-  stream_hash char(96) NOT NULL,
-  blob_hash char(96) NOT NULL,
+  stream_id BIGINT UNSIGNED NOT NULL,
+  blob_id BIGINT UNSIGNED NOT NULL,
   num int NOT NULL,
-  PRIMARY KEY (stream_hash, blob_hash),
-  FOREIGN KEY (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE,
-  FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
+  PRIMARY KEY (stream_id, blob_id),
+  KEY stream_blob_blob_id_idx (blob_id),
+  FOREIGN KEY (stream_id) REFERENCES stream (id) ON DELETE CASCADE ON UPDATE CASCADE,
+  FOREIGN KEY (blob_id) REFERENCES blob_ (id) ON DELETE CASCADE ON UPDATE CASCADE
 );
 
 CREATE TABLE blocked (
@@ -462,6 +502,4 @@ CREATE TABLE blocked (
   PRIMARY KEY (hash)
 );
 
-could add UNIQUE KEY (stream_hash, num) to stream_blob ...
-
 */
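Aside: the goto-based retry in the new s.exec is compact but easy to misread. Below is a behavior-equivalent loop form, shown only for reference: one initial attempt plus up to maxAttempts retries on MySQL error 1205, then LastInsertId from the final result. The function name is hypothetical; everything else comes from the diff.

// execWithRetry mirrors s.exec from the diff, with the goto replaced by a loop.
func (s *SQL) execWithRetry(query string, args ...interface{}) (int64, error) {
	logQuery(query, args...)
	const maxAttempts = 3
	var result sql.Result
	var err error
	for attempt := 1; ; attempt++ {
		result, err = s.conn.Exec(query, args...)
		if !isLockTimeoutError(err) {
			break
		}
		if attempt > maxAttempts {
			// Error 1205: Lock wait timeout exceeded; give up after maxAttempts retries
			err = errors.Prefix("Lock timeout for query "+query, err)
			break
		}
		// otherwise re-run the statement
	}
	if err != nil {
		return 0, errors.Err(err)
	}
	lastID, err := result.LastInsertId()
	return lastID, errors.Err(err)
}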
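For context on "make new changes backwards-compatible while we migrate": the schema above is the target state, while the INSERTs in the Go code still populate the legacy hash columns. A backfill for the cutover might look like the sketch below. This is an assumption about the migration, not something this commit contains; column and index names are taken from the schema comment, and "database/sql" plus a connected *sql.DB are assumed.

// migrateToIDs is a hypothetical backfill: add the new id columns, then
// populate them from the existing hashes before switching reads over.
func migrateToIDs(conn *sql.DB) error {
	stmts := []string{
		"ALTER TABLE blob_ ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE",
		"ALTER TABLE stream ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE",
		"ALTER TABLE stream ADD COLUMN sd_blob_id BIGINT UNSIGNED NULL",
		"ALTER TABLE stream_blob ADD COLUMN stream_id BIGINT UNSIGNED NULL",
		"ALTER TABLE stream_blob ADD COLUMN blob_id BIGINT UNSIGNED NULL",
		// backfill the new columns from the existing hashes
		"UPDATE stream s JOIN blob_ b ON s.sd_hash = b.hash SET s.sd_blob_id = b.id",
		"UPDATE stream_blob sb JOIN stream s ON sb.stream_hash = s.hash SET sb.stream_id = s.id",
		"UPDATE stream_blob sb JOIN blob_ b ON sb.blob_hash = b.hash SET sb.blob_id = b.id",
	}
	for _, stmt := range stmts {
		if _, err := conn.Exec(stmt); err != nil {
			return err
		}
	}
	return nil
}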