diff --git a/db/db.go b/db/db.go
index 958c9c0..254c1da 100644
--- a/db/db.go
+++ b/db/db.go
@@ -66,23 +66,55 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
 		return errors.Err("not connected")
 	}
 
-	return addBlob(s.conn, hash, length, isStored)
+	_, err := s.insertBlob(hash, length, isStored)
+	return err
 }
 
-func addBlob(e Executor, hash string, length int, isStored bool) error {
+func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
 	if length <= 0 {
-		return errors.Err("length must be positive")
+		return 0, errors.Err("length must be positive")
 	}
 
-	err := exec(e,
+	blobID, err := s.exec(
 		"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
 		hash, isStored, length,
 	)
 	if err != nil {
-		return errors.Err(err)
+		return 0, err
 	}
 
-	return nil
+	if blobID == 0 {
+		err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
+		if err != nil {
+			return 0, errors.Err(err)
+		}
+		if blobID == 0 {
+			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
+		}
+	}
+
+	return blobID, nil
+}
+
+func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
+	streamID, err := s.exec(
+		"INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (?,?)",
+		hash, sdBlobID,
+	)
+	if err != nil {
+		return 0, errors.Err(err)
+	}
+
+	if streamID == 0 {
+		err = s.conn.QueryRow("SELECT id FROM stream WHERE sd_blob_id = ?", sdBlobID).Scan(&streamID)
+		if err != nil {
+			return 0, errors.Err(err)
+		}
+		if streamID == 0 {
+			return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
+		}
+	}
+	return streamID, nil
 }
 
 // HasBlob checks if the database contains the blob information.
@@ -157,12 +189,12 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 
 // Delete will remove the blob from the db
 func (s *SQL) Delete(hash string) error {
-	err := exec(s.conn, "DELETE FROM stream WHERE sd_hash = ?", hash)
+	_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
 	if err != nil {
 		return errors.Err(err)
 	}
 
-	err = exec(s.conn, "DELETE FROM blob_ WHERE hash = ?", hash)
+	_, err = s.exec("DELETE FROM blob_ WHERE hash = ?", hash)
 	return errors.Err(err)
 }
 
@@ -214,8 +246,9 @@ func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 
 	query := `
 		SELECT b.hash FROM blob_ b
-		INNER JOIN stream_blob sb ON b.hash = sb.blob_hash
-		INNER JOIN stream s ON s.hash = sb.stream_hash AND s.sd_hash = ?
+		INNER JOIN stream_blob sb ON b.id = sb.blob_id
+		INNER JOIN stream s ON s.id = sb.stream_id
+		INNER JOIN blob_ sdb ON sdb.id = s.sd_blob_id AND sdb.hash = ?
 		WHERE b.is_stored = 0
 	`
 	args := []interface{}{sdHash}
@@ -247,27 +280,21 @@ func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 	return missingBlobs, errors.Err(err)
 }
 
-// AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information
-// into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is
-// rolled back and error(s) are returned.
+// AddSDBlob inserts the SD blob and all the content blobs. The content blobs are marked as "not stored",
+// but they are tracked so reflector knows what it is missing.
 func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 	if s.conn == nil {
 		return errors.Err("not connected")
 	}
 
-	// insert sd blob
-	err := addBlob(s.conn, sdHash, sdBlobLength, true)
+	sdBlobID, err := s.insertBlob(sdHash, sdBlobLength, true)
 	if err != nil {
 		return err
 	}
 
-	// insert stream
-	err = exec(s.conn,
-		"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
-		sdBlob.StreamHash, sdHash,
-	)
+	streamID, err := s.insertStream(sdBlob.StreamHash, sdBlobID)
 	if err != nil {
-		return errors.Err(err)
+		return err
 	}
 
 	// insert content blobs and connect them to stream
@@ -277,14 +304,14 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 			continue
 		}
 
-		err := addBlob(s.conn, contentBlob.BlobHash, contentBlob.Length, false)
+		blobID, err := s.insertBlob(contentBlob.BlobHash, contentBlob.Length, false)
 		if err != nil {
 			return err
 		}
 
-		err = exec(s.conn,
-			"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
-			sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum,
+		_, err = s.exec(
+			"INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES (?,?,?)",
+			streamID, blobID, contentBlob.BlobNum,
 		)
 		if err != nil {
 			return errors.Err(err)
@@ -408,53 +435,67 @@ func closeRows(rows *sql.Rows) {
 	}
 }
 
-type Executor interface {
-	Exec(query string, args ...interface{}) (sql.Result, error)
-}
-
-func exec(e Executor, query string, args ...interface{}) error {
+func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
 	logQuery(query, args...)
 
 	attempt, maxAttempts := 0, 3
-	var err error
 Retry:
 	attempt++
-	_, err = e.Exec(query, args...)
-	if e, ok := err.(*mysql.MySQLError); ok && e.Number == 1205 {
+	result, err := s.conn.Exec(query, args...)
+	if isLockTimeoutError(err) {
 		if attempt <= maxAttempts {
 			//Error 1205: Lock wait timeout exceeded; try restarting transaction
 			goto Retry
 		}
-		err = errors.Prefix("Timed out query "+query, err)
+		err = errors.Prefix("Lock timeout for query "+query, err)
 	}
-	return errors.Err(err)
+
+	if err != nil {
+		return 0, errors.Err(err)
+	}
+
+	lastID, err := result.LastInsertId()
+	return lastID, errors.Err(err)
+}
+
+func isLockTimeoutError(err error) bool {
+	e, ok := err.(*mysql.MySQLError)
+	return ok && e != nil && e.Number == 1205
+}
 
 /* SQL schema
 
 in prod, set tx_isolation to READ-COMMITTED to improve db performance
+make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space.
+
+todo: could add UNIQUE KEY (stream_id, num) to stream_blob ...
 
 CREATE TABLE blob_ (
+  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
   hash char(96) NOT NULL,
   is_stored TINYINT(1) NOT NULL DEFAULT 0,
   length bigint(20) unsigned DEFAULT NULL,
-  PRIMARY KEY (hash)
+  PRIMARY KEY (id),
+  UNIQUE KEY blob_hash_idx (hash)
 );
 
 CREATE TABLE stream (
+  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
   hash char(96) NOT NULL,
-  sd_hash char(96) NOT NULL,
-  PRIMARY KEY (hash),
-  KEY sd_hash_idx (sd_hash),
-  FOREIGN KEY (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE
+  sd_blob_id BIGINT UNSIGNED NOT NULL,
+  PRIMARY KEY (id),
+  UNIQUE KEY stream_hash_idx (hash),
+  KEY stream_sd_blob_id_idx (sd_blob_id),
+  FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE
 );
 
 CREATE TABLE stream_blob (
-  stream_hash char(96) NOT NULL,
-  blob_hash char(96) NOT NULL,
+  stream_id BIGINT UNSIGNED NOT NULL,
+  blob_id BIGINT UNSIGNED NOT NULL,
   num int NOT NULL,
-  PRIMARY KEY (stream_hash, blob_hash),
-  FOREIGN KEY (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE,
-  FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
+  PRIMARY KEY (stream_id, blob_id),
+  KEY stream_blob_blob_id_idx (blob_id),
+  FOREIGN KEY (stream_id) REFERENCES stream (id) ON DELETE CASCADE ON UPDATE CASCADE,
+  FOREIGN KEY (blob_id) REFERENCES blob_ (id) ON DELETE CASCADE ON UPDATE CASCADE
 );
 
 CREATE TABLE blocked (
@@ -462,6 +503,4 @@ CREATE TABLE blocked (
   PRIMARY KEY (hash)
 );
 
-could add UNIQUE KEY (stream_hash, num) to stream_blob ...
-
 */
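Note for reviewers (not part of the patch): insertBlob and insertStream both lean on the same insert-or-fetch-ID pattern, and the blobID == 0 fallback is easy to misread. With go-sql-driver/mysql, LastInsertId() comes back 0 when INSERT IGNORE hits a duplicate, and when ON DUPLICATE KEY UPDATE updates an existing row instead of inserting, so the id has to be recovered with a follow-up SELECT on the unique hash. A minimal standalone sketch of the pattern, assuming a trimmed-down blob_ table and a placeholder DSN; upsertBlobID is a hypothetical helper, not in the codebase:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

// upsertBlobID inserts a row keyed by a unique hash and returns its auto-increment
// id whether the row is new or already existed. On a duplicate key, MySQL reports
// a last-insert-id of 0, so the id is recovered with a SELECT.
func upsertBlobID(db *sql.DB, hash string) (int64, error) {
	res, err := db.Exec("INSERT IGNORE INTO blob_ (hash) VALUES (?)", hash)
	if err != nil {
		return 0, err
	}

	id, err := res.LastInsertId()
	if err != nil {
		return 0, err
	}
	if id != 0 {
		return id, nil // a fresh row was inserted
	}

	// Duplicate key: the row already existed, so look up its id by the unique hash.
	err = db.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&id)
	return id, err
}

func main() {
	// DSN is a placeholder; point it at a database containing the blob_ table.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/reflector")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	id, err := upsertBlobID(db, "somehash")
	if err != nil {
		panic(err)
	}
	fmt.Println("blob id:", id)
}

There is a narrow race here: another connection could delete the row between the INSERT and the SELECT, which is why insertBlob and insertStream return an explicit error rather than assuming the second lookup always succeeds.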
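On the design choice itself, some rough numbers (my own estimate, ignoring InnoDB per-record overhead): with latin1, a char(96) hash column is 96 bytes, so the old stream_blob primary key (stream_hash, blob_hash) cost about 192 bytes per entry, and InnoDB repeats the primary-key columns in every secondary index. Swapping to BIGINT UNSIGNED ids brings that primary key down to 16 bytes and turns every foreign-key join into an integer comparison instead of a 96-byte string comparison. It is also why the schema comment warns against utf8mb4: at 4 bytes per character, the same char(96) key would reserve 384 bytes everywhere it is indexed. The new RESTRICT foreign key likewise explains the order of operations in Delete above: the stream row has to go first, because sd_blob_id REFERENCES blob_ (id) ON DELETE RESTRICT blocks deleting an SD blob that a stream still points at.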