348 lines
8.8 KiB
Go
348 lines
8.8 KiB
Go
package lite_db
|
|
|
|
import (
|
|
"database/sql"
|
|
"time"
|
|
|
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
qt "github.com/lbryio/lbry.go/v2/extras/query"
|
|
|
|
"github.com/go-sql-driver/mysql"
|
|
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/volatiletech/null"
|
|
)
|
|
|
|
// SdBlob is a special blob that contains information on the rest of the blobs in the stream.
// It is decoded from / encoded to JSON using the keys given in the field tags.
type SdBlob struct {
	// StreamName is stored under the "stream_name" key.
	StreamName string `json:"stream_name"`
	// Blobs lists one entry per blob referenced by this SD blob.
	Blobs []struct {
		// Length is stored under the "length" key.
		Length int `json:"length"`
		// BlobNum is stored under the "blob_num" key.
		BlobNum int `json:"blob_num"`
		// BlobHash is stored under "blob_hash" and is omitted from the output when empty.
		BlobHash string `json:"blob_hash,omitempty"`
		// IV is stored under the "iv" key.
		IV string `json:"iv"`
	} `json:"blobs"`
	// StreamType is stored under the "stream_type" key.
	StreamType string `json:"stream_type"`
	// Key is stored under the "key" key.
	Key string `json:"key"`
	// SuggestedFileName is stored under the "suggested_file_name" key.
	SuggestedFileName string `json:"suggested_file_name"`
	// StreamHash is stored under the "stream_hash" key.
	StreamHash string `json:"stream_hash"`
}
|
|
|
|
// SQL implements the DB interface (the interface itself is declared outside this file)
// on top of a MySQL connection pool.
type SQL struct {
	// conn is the database handle set by Connect; it is nil until Connect is called.
	conn *sql.DB

	// TrackAccessTime controls whether blob lookups collect blobs whose
	// last_accessed_at is missing or older than one day so that their
	// access time can be refreshed.
	TrackAccessTime bool
}
|
|
|
|
func logQuery(query string, args ...interface{}) {
|
|
s, err := qt.InterpolateParams(query, args...)
|
|
if err != nil {
|
|
log.Errorln(err)
|
|
} else {
|
|
log.Debugln(s)
|
|
}
|
|
}
|
|
|
|
// Connect will create a connection to the database
|
|
func (s *SQL) Connect(dsn string) error {
|
|
var err error
|
|
// interpolateParams is necessary. otherwise uploading a stream with thousands of blobs
|
|
// will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all
|
|
// opened inside a transaction. closing them manually doesn't seem to help
|
|
dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1"
|
|
s.conn, err = sql.Open("mysql", dsn)
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
|
|
s.conn.SetMaxIdleConns(12)
|
|
|
|
return errors.Err(s.conn.Ping())
|
|
}
|
|
|
|
// AddBlob adds a blob to the database.
|
|
func (s *SQL) AddBlob(hash string, length int) error {
|
|
if s.conn == nil {
|
|
return errors.Err("not connected")
|
|
}
|
|
|
|
_, err := s.insertBlob(hash, length)
|
|
return err
|
|
}
|
|
|
|
func (s *SQL) insertBlob(hash string, length int) (int64, error) {
|
|
if length <= 0 {
|
|
return 0, errors.Err("length must be positive")
|
|
}
|
|
const isStored = true
|
|
now := time.Now()
|
|
args := []interface{}{hash, isStored, length, now}
|
|
blobID, err := s.exec(
|
|
"INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at=VALUES(last_accessed_at)",
|
|
args...,
|
|
)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if blobID == 0 {
|
|
err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
|
|
if err != nil {
|
|
return 0, errors.Err(err)
|
|
}
|
|
if blobID == 0 {
|
|
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
|
|
}
|
|
}
|
|
|
|
return blobID, nil
|
|
}
|
|
|
|
// HasBlob checks if the database contains the blob information.
|
|
func (s *SQL) HasBlob(hash string) (bool, error) {
|
|
exists, err := s.HasBlobs([]string{hash})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return exists[hash], nil
|
|
}
|
|
|
|
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
|
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
|
exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
|
|
s.touch(streamsNeedingTouch)
|
|
return exists, err
|
|
}
|
|
|
|
func (s *SQL) touch(blobIDs []uint64) error {
|
|
if len(blobIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
|
|
args := make([]interface{}, len(blobIDs)+1)
|
|
args[0] = time.Now()
|
|
for i := range blobIDs {
|
|
args[i+1] = blobIDs[i]
|
|
}
|
|
|
|
startTime := time.Now()
|
|
_, err := s.exec(query, args...)
|
|
log.Debugf("blobs access query touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
|
|
return errors.Err(err)
|
|
}
|
|
|
|
func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
|
|
if s.conn == nil {
|
|
return nil, nil, errors.Err("not connected")
|
|
}
|
|
|
|
var (
|
|
hash string
|
|
blobID uint64
|
|
lastAccessedAt null.Time
|
|
)
|
|
|
|
var needsTouch []uint64
|
|
exists := make(map[string]bool)
|
|
|
|
touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time
|
|
maxBatchSize := 10000
|
|
doneIndex := 0
|
|
|
|
for len(hashes) > doneIndex {
|
|
sliceEnd := doneIndex + maxBatchSize
|
|
if sliceEnd > len(hashes) {
|
|
sliceEnd = len(hashes)
|
|
}
|
|
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
|
|
batch := hashes[doneIndex:sliceEnd]
|
|
|
|
// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
|
|
|
|
query := `SELECT hash, id, last_accessed_at
|
|
FROM blob_
|
|
WHERE is_stored = ? and hash IN (` + qt.Qs(len(batch)) + `)`
|
|
args := make([]interface{}, len(batch)+1)
|
|
args[0] = true
|
|
for i := range batch {
|
|
args[i+1] = batch[i]
|
|
}
|
|
|
|
logQuery(query, args...)
|
|
|
|
err := func() error {
|
|
startTime := time.Now()
|
|
rows, err := s.conn.Query(query, args...)
|
|
log.Debugf("hashes query took %s", time.Since(startTime))
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
defer closeRows(rows)
|
|
|
|
for rows.Next() {
|
|
err := rows.Scan(&hash, &blobID, &lastAccessedAt)
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
exists[hash] = true
|
|
if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
|
|
needsTouch = append(needsTouch, blobID)
|
|
}
|
|
}
|
|
|
|
err = rows.Err()
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
|
|
doneIndex += len(batch)
|
|
return nil
|
|
}()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
return exists, needsTouch, nil
|
|
}
|
|
|
|
// Delete will remove the blob from the db
|
|
func (s *SQL) Delete(hash string) error {
|
|
_, err := s.exec("UPDATE blob_ set is_stored = ? WHERE hash = ?", 0, hash)
|
|
return errors.Err(err)
|
|
}
|
|
|
|
// AddSDBlob insert the SD blob and all the content blobs. The content blobs are marked as "not stored",
|
|
// but they are tracked so reflector knows what it is missing.
|
|
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int) error {
|
|
if s.conn == nil {
|
|
return errors.Err("not connected")
|
|
}
|
|
|
|
_, err := s.insertBlob(sdHash, sdBlobLength)
|
|
return err
|
|
}
|
|
|
|
// GetHashRange gets the smallest and biggest hashes in the db
|
|
func (s *SQL) GetLRUBlobs(maxBlobs int) ([]string, error) {
|
|
if s.conn == nil {
|
|
return nil, errors.Err("not connected")
|
|
}
|
|
|
|
query := "SELECT hash from blob_ where is_stored = ? order by last_accessed_at limit ?"
|
|
const isStored = true
|
|
logQuery(query, isStored, maxBlobs)
|
|
rows, err := s.conn.Query(query, isStored, maxBlobs)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
defer closeRows(rows)
|
|
blobs := make([]string, 0, maxBlobs)
|
|
for rows.Next() {
|
|
var hash string
|
|
err := rows.Scan(&hash)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
blobs = append(blobs, hash)
|
|
}
|
|
return blobs, nil
|
|
}
|
|
|
|
func (s *SQL) AllBlobs() ([]string, error) {
|
|
if s.conn == nil {
|
|
return nil, errors.Err("not connected")
|
|
}
|
|
|
|
query := "SELECT hash from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
|
|
const isStored = true
|
|
logQuery(query, isStored)
|
|
rows, err := s.conn.Query(query, isStored)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
defer closeRows(rows)
|
|
totalBlobs, err := s.BlobsCount()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
blobs := make([]string, 0, totalBlobs)
|
|
for rows.Next() {
|
|
var hash string
|
|
err := rows.Scan(&hash)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
blobs = append(blobs, hash)
|
|
}
|
|
return blobs, nil
|
|
}
|
|
|
|
func (s *SQL) BlobsCount() (int, error) {
|
|
if s.conn == nil {
|
|
return 0, errors.Err("not connected")
|
|
}
|
|
|
|
query := "SELECT count(id) from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
|
|
const isStored = true
|
|
logQuery(query, isStored)
|
|
var count int
|
|
err := s.conn.QueryRow(query, isStored).Scan(&count)
|
|
return count, errors.Err(err)
|
|
}
|
|
|
|
func closeRows(rows *sql.Rows) {
|
|
if rows != nil {
|
|
err := rows.Close()
|
|
if err != nil {
|
|
log.Error("error closing rows: ", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
|
|
logQuery(query, args...)
|
|
attempt, maxAttempts := 0, 3
|
|
Retry:
|
|
attempt++
|
|
result, err := s.conn.Exec(query, args...)
|
|
if isLockTimeoutError(err) {
|
|
if attempt <= maxAttempts {
|
|
//Error 1205: Lock wait timeout exceeded; try restarting transaction
|
|
goto Retry
|
|
}
|
|
err = errors.Prefix("Lock timeout for query "+query, err)
|
|
}
|
|
|
|
if err != nil {
|
|
return 0, errors.Err(err)
|
|
}
|
|
|
|
lastID, err := result.LastInsertId()
|
|
return lastID, errors.Err(err)
|
|
}
|
|
|
|
func isLockTimeoutError(err error) bool {
|
|
e, ok := err.(*mysql.MySQLError)
|
|
return ok && e != nil && e.Number == 1205
|
|
}
|
|
|
|
/* SQL schema
|
|
|
|
In prod, make sure you use the latin1 or utf8 charset, NOT utf8mb4, which is a waste of space.
|
|
|
|
CREATE TABLE `blob_` (
|
|
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
|
|
`hash` char(96) NOT NULL,
|
|
`is_stored` tinyint(1) NOT NULL DEFAULT '0',
|
|
`length` bigint unsigned DEFAULT NULL,
|
|
`last_accessed_at` datetime DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY (`id`),
|
|
UNIQUE KEY `id` (`id`),
|
|
UNIQUE KEY `blob_hash_idx` (`hash`),
|
|
KEY `blob_last_accessed_idx` (`last_accessed_at`)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=latin1
|
|
|
|
*/
|