use IDs instead of hashes for join table
parent fce8beea4d
commit fd39b09e95

1 changed file with 86 additions and 47 deletions

db/db.go (133 lines changed)
@@ -66,23 +66,55 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
 		return errors.Err("not connected")
 	}
 
-	return addBlob(s.conn, hash, length, isStored)
+	_, err := s.insertBlob(hash, length, isStored)
+	return err
 }
 
-func addBlob(e Executor, hash string, length int, isStored bool) error {
+func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
 	if length <= 0 {
-		return errors.Err("length must be positive")
+		return 0, errors.Err("length must be positive")
 	}
 
-	err := exec(e,
+	blobID, err := s.exec(
 		"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
 		hash, isStored, length,
 	)
 	if err != nil {
-		return errors.Err(err)
+		return 0, err
 	}
 
-	return nil
+	if blobID == 0 {
+		err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
+		if err != nil {
+			return 0, errors.Err(err)
+		}
+		if blobID == 0 {
+			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
+		}
+	}
+
+	return blobID, nil
+}
+
+func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
+	streamID, err := s.exec(
+		"INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (?,?)",
+		hash, sdBlobID,
+	)
+	if err != nil {
+		return 0, errors.Err(err)
+	}
+
+	if streamID == 0 {
+		err = s.conn.QueryRow("SELECT id FROM stream WHERE sd_blob_id = ?", sdBlobID).Scan(&streamID)
+		if err != nil {
+			return 0, errors.Err(err)
+		}
+		if streamID == 0 {
+			return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
+		}
+	}
+	return streamID, nil
 }
 
 // HasBlob checks if the database contains the blob information.
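Both new helpers lean on the same driver detail: when INSERT IGNORE or INSERT ... ON DUPLICATE KEY UPDATE hits an already-existing row, LastInsertId() can come back as 0, so the pre-existing row's ID has to be recovered with a follow-up SELECT. A minimal standalone sketch of that pattern (hypothetical `item` table and `upsertID` helper, assuming database/sql with the go-sql-driver/mysql driver; not code from this commit):

// upsertID inserts hash into a hypothetical `item` table and returns the row's
// auto-increment ID, falling back to a SELECT when the row already existed.
func upsertID(conn *sql.DB, hash string) (int64, error) {
	res, err := conn.Exec("INSERT IGNORE INTO item (hash) VALUES (?)", hash)
	if err != nil {
		return 0, err
	}
	id, err := res.LastInsertId()
	if err != nil || id != 0 {
		return id, err
	}
	// duplicate row: nothing was inserted, so look up the existing ID
	err = conn.QueryRow("SELECT id FROM item WHERE hash = ?", hash).Scan(&id)
	return id, err
}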
@@ -157,12 +189,12 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 
 // Delete will remove the blob from the db
 func (s *SQL) Delete(hash string) error {
-	err := exec(s.conn, "DELETE FROM stream WHERE sd_hash = ?", hash)
+	_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
 	if err != nil {
 		return errors.Err(err)
 	}
 
-	err = exec(s.conn, "DELETE FROM blob_ WHERE hash = ?", hash)
+	_, err = s.exec("DELETE FROM blob_ WHERE hash = ?", hash)
 	return errors.Err(err)
 }
 
@@ -214,8 +246,9 @@ func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 
 	query := `
 		SELECT b.hash FROM blob_ b
-		INNER JOIN stream_blob sb ON b.hash = sb.blob_hash
-		INNER JOIN stream s ON s.hash = sb.stream_hash AND s.sd_hash = ?
+		INNER JOIN stream_blob sb ON b.id = sb.blob_id
+		INNER JOIN stream s ON s.id = sb.stream_id
+		INNER JOIN blob_ sdb ON sdb.id = s.sd_blob_id AND sdb.hash = ?
 		WHERE b.is_stored = 0
 	`
 	args := []interface{}{sdHash}
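Since stream no longer carries sd_hash, the SD hash now lives only in blob_, which is why the rewritten query needs the third join through the sd_blob_id foreign key. The new join chain reads the same as this subquery form (an illustrative sketch, not code from the commit):

SELECT b.hash
FROM blob_ b
INNER JOIN stream_blob sb ON b.id = sb.blob_id
WHERE b.is_stored = 0
  AND sb.stream_id IN (
    SELECT s.id FROM stream s
    INNER JOIN blob_ sdb ON sdb.id = s.sd_blob_id
    WHERE sdb.hash = ?
  );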
@@ -247,27 +280,21 @@ func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 	return missingBlobs, errors.Err(err)
 }
 
-// AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information
-// into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is
-// rolled back and error(s) are returned.
+// AddSDBlob inserts the SD blob and all the content blobs. The content blobs are marked as "not stored",
+// but they are tracked so reflector knows what it is missing.
 func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 	if s.conn == nil {
 		return errors.Err("not connected")
 	}
 
-	// insert sd blob
-	err := addBlob(s.conn, sdHash, sdBlobLength, true)
+	sdBlobID, err := s.insertBlob(sdHash, sdBlobLength, true)
 	if err != nil {
 		return err
 	}
 
-	// insert stream
-	err = exec(s.conn,
-		"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
-		sdBlob.StreamHash, sdHash,
-	)
+	streamID, err := s.insertStream(sdBlob.StreamHash, sdBlobID)
 	if err != nil {
-		return errors.Err(err)
+		return err
 	}
 
 	// insert content blobs and connect them to stream
@@ -277,14 +304,14 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 			continue
 		}
 
-		err := addBlob(s.conn, contentBlob.BlobHash, contentBlob.Length, false)
+		blobID, err := s.insertBlob(contentBlob.BlobHash, contentBlob.Length, false)
 		if err != nil {
 			return err
 		}
 
-		err = exec(s.conn,
-			"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
-			sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum,
+		_, err = s.exec(
+			"INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES (?,?,?)",
+			streamID, blobID, contentBlob.BlobNum,
 		)
 		if err != nil {
 			return errors.Err(err)
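Note the ordering AddSDBlob now has to respect: the SD blob row must exist before the stream row (stream.sd_blob_id references blob_.id), and the stream row before the stream_blob rows. For context, a hypothetical caller (not part of this commit) might wire it together like this, assuming the SD blob arrives as JSON and unmarshals into SdBlob:

// sketch: store an SD blob and track its content blobs as known-but-missing;
// dbConn is a connected *db.SQL, rawSDBlob holds the SD blob's JSON bytes
var sdBlob db.SdBlob
if err := json.Unmarshal(rawSDBlob, &sdBlob); err != nil {
	return errors.Err(err)
}
return dbConn.AddSDBlob(sdHash, len(rawSDBlob), sdBlob)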
@@ -408,53 +435,67 @@ func closeRows(rows *sql.Rows) {
 	}
 }
 
-type Executor interface {
-	Exec(query string, args ...interface{}) (sql.Result, error)
-}
-
-func exec(e Executor, query string, args ...interface{}) error {
+func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
 	logQuery(query, args...)
 	attempt, maxAttempts := 0, 3
-	var err error
 Retry:
 	attempt++
-	_, err = e.Exec(query, args...)
-	if e, ok := err.(*mysql.MySQLError); ok && e.Number == 1205 {
+	result, err := s.conn.Exec(query, args...)
+	if isLockTimeoutError(err) {
 		if attempt <= maxAttempts {
 			//Error 1205: Lock wait timeout exceeded; try restarting transaction
 			goto Retry
 		}
-		err = errors.Prefix("Timed out query "+query, err)
+		err = errors.Prefix("Lock timeout for query "+query, err)
 	}
-	return errors.Err(err)
+
+	if err != nil {
+		return 0, errors.Err(err)
+	}
+
+	lastID, err := result.LastInsertId()
+	return lastID, errors.Err(err)
+}
+
+func isLockTimeoutError(err error) bool {
+	e, ok := err.(*mysql.MySQLError)
+	return ok && e != nil && e.Number == 1205
 }
 
 /* SQL schema
 
 in prod, set tx_isolation to READ-COMMITTED to improve db performance
+make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space.
+
+todo: could add UNIQUE KEY (stream_hash, num) to stream_blob ...
 
 CREATE TABLE blob_ (
+  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
   hash char(96) NOT NULL,
   is_stored TINYINT(1) NOT NULL DEFAULT 0,
   length bigint(20) unsigned DEFAULT NULL,
-  PRIMARY KEY (hash)
+  PRIMARY KEY (id),
+  UNIQUE KEY blob_hash_idx (hash)
 );
 
 CREATE TABLE stream (
+  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
   hash char(96) NOT NULL,
-  sd_hash char(96) NOT NULL,
-  PRIMARY KEY (hash),
-  KEY sd_hash_idx (sd_hash),
-  FOREIGN KEY (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE
+  sd_blob_id BIGINT UNSIGNED NOT NULL,
+  PRIMARY KEY (id),
+  UNIQUE KEY stream_hash_idx (hash),
+  KEY stream_sd_blob_id_idx (sd_blob_id),
+  FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE
 );
 
 CREATE TABLE stream_blob (
-  stream_hash char(96) NOT NULL,
-  blob_hash char(96) NOT NULL,
+  stream_id BIGINT UNSIGNED NOT NULL,
+  blob_id BIGINT UNSIGNED NOT NULL,
   num int NOT NULL,
-  PRIMARY KEY (stream_hash, blob_hash),
-  FOREIGN KEY (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE,
-  FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
+  PRIMARY KEY (stream_id, blob_id),
+  KEY stream_blob_blob_id_idx (blob_id),
+  FOREIGN KEY (stream_id) REFERENCES stream (id) ON DELETE CASCADE ON UPDATE CASCADE,
+  FOREIGN KEY (blob_id) REFERENCES blob_ (id) ON DELETE CASCADE ON UPDATE CASCADE
 );
 
 CREATE TABLE blocked (
@@ -462,6 +503,4 @@ CREATE TABLE blocked (
   PRIMARY KEY (hash)
 );
 
-could add UNIQUE KEY (stream_hash, num) to stream_blob ...
-
 */
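The commit ships only the new schema; an existing deployment would presumably need a backfill before the old hash columns can be dropped. One possible migration sketch (assumes the old hash columns and the new id columns coexist temporarily and the old foreign keys have already been removed; none of this is part of the commit):

-- populate the new FK columns from the old hash-based relationships
UPDATE stream s
  INNER JOIN blob_ b ON b.hash = s.sd_hash
  SET s.sd_blob_id = b.id;

UPDATE stream_blob sb
  INNER JOIN stream s ON s.hash = sb.stream_hash
  INNER JOIN blob_ b ON b.hash = sb.blob_hash
  SET sb.stream_id = s.id, sb.blob_id = b.id;

-- once verified, drop the old columns
ALTER TABLE stream DROP COLUMN sd_hash;
ALTER TABLE stream_blob DROP COLUMN stream_hash, DROP COLUMN blob_hash;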