when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
7 changed files with 357 additions and 447 deletions
|
@ -8,9 +8,10 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||||
"github.com/lbryio/reflector.go/db"
|
"github.com/lbryio/reflector.go/db"
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
"github.com/lbryio/reflector.go/lite_db"
|
|
||||||
"github.com/lbryio/reflector.go/meta"
|
"github.com/lbryio/reflector.go/meta"
|
||||||
"github.com/lbryio/reflector.go/peer"
|
"github.com/lbryio/reflector.go/peer"
|
||||||
"github.com/lbryio/reflector.go/peer/http3"
|
"github.com/lbryio/reflector.go/peer/http3"
|
||||||
|
@ -68,10 +69,11 @@ func init() {
|
||||||
|
|
||||||
func reflectorCmd(cmd *cobra.Command, args []string) {
|
func reflectorCmd(cmd *cobra.Command, args []string) {
|
||||||
log.Printf("reflector %s", meta.VersionString())
|
log.Printf("reflector %s", meta.VersionString())
|
||||||
|
cleanerStopper := stop.New()
|
||||||
|
|
||||||
// the blocklist logic requires the db backed store to be the outer-most store
|
// the blocklist logic requires the db backed store to be the outer-most store
|
||||||
underlyingStore := setupStore()
|
underlyingStore := setupStore()
|
||||||
outerStore := wrapWithCache(underlyingStore)
|
outerStore := wrapWithCache(underlyingStore, cleanerStopper)
|
||||||
|
|
||||||
if !disableUploads {
|
if !disableUploads {
|
||||||
reflectorServer := reflector.NewServer(underlyingStore)
|
reflectorServer := reflector.NewServer(underlyingStore)
|
||||||
|
@ -102,13 +104,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
|
||||||
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
|
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
|
||||||
metricsServer.Start()
|
metricsServer.Start()
|
||||||
defer metricsServer.Shutdown()
|
defer metricsServer.Shutdown()
|
||||||
defer underlyingStore.Shutdown()
|
|
||||||
defer outerStore.Shutdown()
|
defer outerStore.Shutdown()
|
||||||
|
defer underlyingStore.Shutdown()
|
||||||
|
|
||||||
interruptChan := make(chan os.Signal, 1)
|
interruptChan := make(chan os.Signal, 1)
|
||||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||||
<-interruptChan
|
<-interruptChan
|
||||||
// deferred shutdowns happen now
|
// deferred shutdowns happen now
|
||||||
|
cleanerStopper.StopAndWait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupStore() store.BlobStore {
|
func setupStore() store.BlobStore {
|
||||||
|
@ -149,20 +152,20 @@ func setupStore() store.BlobStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
if useDB {
|
if useDB {
|
||||||
db := new(db.SQL)
|
dbInst := new(db.SQL)
|
||||||
db.TrackAccessTime = true
|
dbInst.TrackAccess = db.TrackAccessStreams
|
||||||
err := db.Connect(globalConfig.DBConn)
|
err := dbInst.Connect(globalConfig.DBConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
s = store.NewDBBackedStore(s, db, false)
|
s = store.NewDBBackedStore(s, dbInst, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func wrapWithCache(s store.BlobStore) store.BlobStore {
|
func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
|
||||||
wrapped := s
|
wrapped := s
|
||||||
|
|
||||||
diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
|
diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
|
||||||
|
@ -174,17 +177,22 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
localDb := new(lite_db.SQL)
|
|
||||||
localDb.TrackAccessTime = true
|
localDb := new(db.SQL)
|
||||||
|
localDb.SoftDelete = true
|
||||||
|
localDb.TrackAccess = db.TrackAccessBlobs
|
||||||
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false)
|
||||||
wrapped = store.NewCachingStore(
|
wrapped = store.NewCachingStore(
|
||||||
"reflector",
|
"reflector",
|
||||||
wrapped,
|
wrapped,
|
||||||
store.NewLiteDBBackedStore("hdd", store.NewDiskStore(diskCachePath, 2), localDb, int(realCacheSize)),
|
store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper)
|
||||||
}
|
}
|
||||||
|
|
||||||
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
|
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
|
||||||
|
@ -238,3 +246,49 @@ func diskCacheParams(diskParams string) (int, string) {
|
||||||
}
|
}
|
||||||
return int(maxSize), path
|
return int(maxSize), path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
|
||||||
|
const cleanupInterval = 10 * time.Second
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-stopper.Ch():
|
||||||
|
log.Infoln("stopping self cleanup")
|
||||||
|
return
|
||||||
|
case <-time.After(cleanupInterval):
|
||||||
|
err := doClean(maxItems, db, store, stopper)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(errors.FullTrace(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
|
||||||
|
blobsCount, err := db.Count()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if blobsCount >= maxItems {
|
||||||
|
itemsToDelete := blobsCount / 10
|
||||||
|
blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, hash := range blobs {
|
||||||
|
select {
|
||||||
|
case <-stopper.Ch():
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
err = store.Delete(hash)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
264
db/db.go
264
db/db.go
|
@ -30,11 +30,26 @@ type SdBlob struct {
|
||||||
StreamHash string `json:"stream_hash"`
|
StreamHash string `json:"stream_hash"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type trackAccess int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Don't track accesses
|
||||||
|
TrackAccessNone trackAccess = iota
|
||||||
|
// Track accesses at the stream level
|
||||||
|
TrackAccessStreams
|
||||||
|
// Track accesses at the blob level
|
||||||
|
TrackAccessBlobs
|
||||||
|
)
|
||||||
|
|
||||||
// SQL implements the DB interface
|
// SQL implements the DB interface
|
||||||
type SQL struct {
|
type SQL struct {
|
||||||
conn *sql.DB
|
conn *sql.DB
|
||||||
|
|
||||||
TrackAccessTime bool
|
// Track the approx last time a blob or stream was accessed
|
||||||
|
TrackAccess trackAccess
|
||||||
|
|
||||||
|
// Instead of deleting a blob, marked it as not stored in the db
|
||||||
|
SoftDelete bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func logQuery(query string, args ...interface{}) {
|
func logQuery(query string, args ...interface{}) {
|
||||||
|
@ -78,11 +93,19 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
|
||||||
return 0, errors.Err("length must be positive")
|
return 0, errors.Err("length must be positive")
|
||||||
}
|
}
|
||||||
|
|
||||||
args := []interface{}{hash, isStored, length}
|
var (
|
||||||
blobID, err := s.exec(
|
q string
|
||||||
"INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
|
args []interface{}
|
||||||
args...,
|
|
||||||
)
|
)
|
||||||
|
if s.TrackAccess == TrackAccessBlobs {
|
||||||
|
args = []interface{}{hash, isStored, length, time.Now()}
|
||||||
|
q = "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)"
|
||||||
|
} else {
|
||||||
|
args = []interface{}{hash, isStored, length}
|
||||||
|
q = "INSERT INTO blob_ (hash, is_stored, length) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))"
|
||||||
|
}
|
||||||
|
|
||||||
|
blobID, err := s.exec(q, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -95,17 +118,33 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
|
||||||
if blobID == 0 {
|
if blobID == 0 {
|
||||||
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
|
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.TrackAccess == TrackAccessBlobs {
|
||||||
|
err := s.touchBlobs([]uint64{uint64(blobID)})
|
||||||
|
if err != nil {
|
||||||
|
return 0, errors.Err(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return blobID, nil
|
return blobID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
|
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
|
||||||
args := []interface{}{hash, sdBlobID, time.Now()}
|
var (
|
||||||
streamID, err := s.exec(
|
q string
|
||||||
"INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")",
|
args []interface{}
|
||||||
args...,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if s.TrackAccess == TrackAccessStreams {
|
||||||
|
args = []interface{}{hash, sdBlobID, time.Now()}
|
||||||
|
q = "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ")"
|
||||||
|
} else {
|
||||||
|
args = []interface{}{hash, sdBlobID}
|
||||||
|
q = "INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (" + qt.Qs(len(args)) + ")"
|
||||||
|
}
|
||||||
|
|
||||||
|
streamID, err := s.exec(q, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Err(err)
|
return 0, errors.Err(err)
|
||||||
}
|
}
|
||||||
|
@ -119,8 +158,8 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
|
||||||
return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
|
return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.TrackAccessTime {
|
if s.TrackAccess == TrackAccessStreams {
|
||||||
err := s.touch([]uint64{uint64(streamID)})
|
err := s.touchStreams([]uint64{uint64(streamID)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Err(err)
|
return 0, errors.Err(err)
|
||||||
}
|
}
|
||||||
|
@ -140,12 +179,36 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
|
||||||
|
|
||||||
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
||||||
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
||||||
exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
|
exists, idsNeedingTouch, err := s.hasBlobs(hashes)
|
||||||
s.touch(streamsNeedingTouch)
|
|
||||||
|
if s.TrackAccess == TrackAccessBlobs {
|
||||||
|
s.touchBlobs(idsNeedingTouch)
|
||||||
|
} else if s.TrackAccess == TrackAccessStreams {
|
||||||
|
s.touchStreams(idsNeedingTouch)
|
||||||
|
}
|
||||||
|
|
||||||
return exists, err
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQL) touch(streamIDs []uint64) error {
|
func (s *SQL) touchBlobs(blobIDs []uint64) error {
|
||||||
|
if len(blobIDs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
|
||||||
|
args := make([]interface{}, len(blobIDs)+1)
|
||||||
|
args[0] = time.Now()
|
||||||
|
for i := range blobIDs {
|
||||||
|
args[i+1] = blobIDs[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
startTime := time.Now()
|
||||||
|
_, err := s.exec(query, args...)
|
||||||
|
log.Debugf("touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SQL) touchStreams(streamIDs []uint64) error {
|
||||||
if len(streamIDs) == 0 {
|
if len(streamIDs) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -159,7 +222,7 @@ func (s *SQL) touch(streamIDs []uint64) error {
|
||||||
|
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
_, err := s.exec(query, args...)
|
_, err := s.exec(query, args...)
|
||||||
log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime))
|
log.Debugf("touched %d streams and took %s", len(streamIDs), time.Since(startTime))
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,7 +233,8 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
hash string
|
hash string
|
||||||
streamID uint64
|
blobID uint64
|
||||||
|
streamID null.Uint64
|
||||||
lastAccessedAt null.Time
|
lastAccessedAt null.Time
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -189,17 +253,23 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
|
||||||
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
|
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
|
||||||
batch := hashes[doneIndex:sliceEnd]
|
batch := hashes[doneIndex:sliceEnd]
|
||||||
|
|
||||||
// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
|
var lastAccessedAtSelect string
|
||||||
|
if s.TrackAccess == TrackAccessBlobs {
|
||||||
|
lastAccessedAtSelect = "b.last_accessed_at"
|
||||||
|
} else if s.TrackAccess == TrackAccessStreams {
|
||||||
|
lastAccessedAtSelect = "s.last_accessed_at"
|
||||||
|
} else {
|
||||||
|
lastAccessedAtSelect = "NULL"
|
||||||
|
}
|
||||||
|
|
||||||
query := `SELECT b.hash, s.id, s.last_accessed_at
|
query := `SELECT b.hash, b.id, s.id, ` + lastAccessedAtSelect + `
|
||||||
FROM blob_ b
|
FROM blob_ b
|
||||||
LEFT JOIN stream_blob sb ON b.id = sb.blob_id
|
LEFT JOIN stream_blob sb ON b.id = sb.blob_id
|
||||||
INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
|
LEFT JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
|
||||||
WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
|
WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
|
||||||
args := make([]interface{}, len(batch)+1)
|
args := make([]interface{}, len(batch))
|
||||||
args[0] = true
|
|
||||||
for i := range batch {
|
for i := range batch {
|
||||||
args[i+1] = batch[i]
|
args[i] = batch[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
logQuery(query, args...)
|
logQuery(query, args...)
|
||||||
|
@ -214,13 +284,17 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
|
||||||
defer closeRows(rows)
|
defer closeRows(rows)
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
err := rows.Scan(&hash, &streamID, &lastAccessedAt)
|
err := rows.Scan(&hash, &blobID, &streamID, &lastAccessedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
exists[hash] = true
|
exists[hash] = true
|
||||||
if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
|
if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
|
||||||
needsTouch = append(needsTouch, streamID)
|
if s.TrackAccess == TrackAccessBlobs {
|
||||||
|
needsTouch = append(needsTouch, blobID)
|
||||||
|
} else if s.TrackAccess == TrackAccessStreams && !streamID.IsZero() {
|
||||||
|
needsTouch = append(needsTouch, streamID.Uint64)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,8 +314,14 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
|
||||||
return exists, needsTouch, nil
|
return exists, needsTouch, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete will remove the blob from the db
|
// Delete will remove (or soft-delete) the blob from the db
|
||||||
|
// NOTE: If SoftDelete is enabled, streams will never be deleted
|
||||||
func (s *SQL) Delete(hash string) error {
|
func (s *SQL) Delete(hash string) error {
|
||||||
|
if s.SoftDelete {
|
||||||
|
_, err := s.exec("UPDATE blob_ SET is_stored = 0 WHERE hash = ?", hash)
|
||||||
|
return errors.Err(err)
|
||||||
|
}
|
||||||
|
|
||||||
_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
|
_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
|
@ -251,6 +331,88 @@ func (s *SQL) Delete(hash string) error {
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetHashRange gets the smallest and biggest hashes in the db
|
||||||
|
func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
|
||||||
|
if s.conn == nil {
|
||||||
|
return nil, errors.Err("not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.TrackAccess != TrackAccessBlobs {
|
||||||
|
return nil, errors.Err("blob access tracking is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
|
||||||
|
logQuery(query, maxBlobs)
|
||||||
|
|
||||||
|
rows, err := s.conn.Query(query, maxBlobs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Err(err)
|
||||||
|
}
|
||||||
|
defer closeRows(rows)
|
||||||
|
|
||||||
|
blobs := make([]string, 0, maxBlobs)
|
||||||
|
for rows.Next() {
|
||||||
|
var hash string
|
||||||
|
err := rows.Scan(&hash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Err(err)
|
||||||
|
}
|
||||||
|
blobs = append(blobs, hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
return blobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllHashes writes all hashes from the db into the channel.
|
||||||
|
// It does not close the channel when it finishes.
|
||||||
|
//func (s *SQL) AllHashes(ch chan<- string) error {
|
||||||
|
// if s.conn == nil {
|
||||||
|
// return errors.Err("not connected")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// query := "SELECT hash from blob_"
|
||||||
|
// if s.SoftDelete {
|
||||||
|
// query += " where is_stored = 1"
|
||||||
|
// }
|
||||||
|
// logQuery(query)
|
||||||
|
//
|
||||||
|
// rows, err := s.conn.Query(query)
|
||||||
|
// if err != nil {
|
||||||
|
// return errors.Err(err)
|
||||||
|
// }
|
||||||
|
// defer closeRows(rows)
|
||||||
|
//
|
||||||
|
// for rows.Next() {
|
||||||
|
// var hash string
|
||||||
|
// err := rows.Scan(&hash)
|
||||||
|
// if err != nil {
|
||||||
|
// return errors.Err(err)
|
||||||
|
// }
|
||||||
|
// ch <- hash
|
||||||
|
// // TODO: this needs testing
|
||||||
|
// // TODO: need a way to cancel this early (e.g. in case of shutdown)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// close(ch)
|
||||||
|
// return nil
|
||||||
|
//}
|
||||||
|
|
||||||
|
func (s *SQL) Count() (int, error) {
|
||||||
|
if s.conn == nil {
|
||||||
|
return 0, errors.Err("not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
query := "SELECT count(id) from blob_"
|
||||||
|
if s.SoftDelete {
|
||||||
|
query += " where is_stored = 1"
|
||||||
|
}
|
||||||
|
logQuery(query)
|
||||||
|
|
||||||
|
var count int
|
||||||
|
err := s.conn.QueryRow(query).Scan(&count)
|
||||||
|
return count, errors.Err(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Block will mark a blob as blocked
|
// Block will mark a blob as blocked
|
||||||
func (s *SQL) Block(hash string) error {
|
func (s *SQL) Block(hash string) error {
|
||||||
query := "INSERT IGNORE INTO blocked SET hash = ?"
|
query := "INSERT IGNORE INTO blocked SET hash = ?"
|
||||||
|
@ -528,8 +690,10 @@ CREATE TABLE blob_ (
|
||||||
hash char(96) NOT NULL,
|
hash char(96) NOT NULL,
|
||||||
is_stored TINYINT(1) NOT NULL DEFAULT 0,
|
is_stored TINYINT(1) NOT NULL DEFAULT 0,
|
||||||
length bigint(20) unsigned DEFAULT NULL,
|
length bigint(20) unsigned DEFAULT NULL,
|
||||||
|
last_accessed_at TIMESTAMP NULL DEFAULT NULL,
|
||||||
PRIMARY KEY (id),
|
PRIMARY KEY (id),
|
||||||
UNIQUE KEY blob_hash_idx (hash)
|
UNIQUE KEY blob_hash_idx (hash),
|
||||||
|
KEY `blob_last_accessed_idx` (`last_accessed_at`)
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE stream (
|
CREATE TABLE stream (
|
||||||
|
@ -560,3 +724,47 @@ CREATE TABLE blocked (
|
||||||
);
|
);
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
//func (d *LiteDBBackedStore) selfClean() {
|
||||||
|
// d.stopper.Add(1)
|
||||||
|
// defer d.stopper.Done()
|
||||||
|
// lastCleanup := time.Now()
|
||||||
|
// const cleanupInterval = 10 * time.Second
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// case <-d.stopper.Ch():
|
||||||
|
// log.Infoln("stopping self cleanup")
|
||||||
|
// return
|
||||||
|
// default:
|
||||||
|
// time.Sleep(1 * time.Second)
|
||||||
|
// }
|
||||||
|
// if time.Since(lastCleanup) < cleanupInterval {
|
||||||
|
// continue
|
||||||
|
//
|
||||||
|
// blobsCount, err := d.db.BlobsCount()
|
||||||
|
// if err != nil {
|
||||||
|
// log.Errorf(errors.FullTrace(err))
|
||||||
|
// }
|
||||||
|
// if blobsCount >= d.maxItems {
|
||||||
|
// itemsToDelete := blobsCount / 100 * 10
|
||||||
|
// blobs, err := d.db.GetLRUBlobs(itemsToDelete)
|
||||||
|
// if err != nil {
|
||||||
|
// log.Errorf(errors.FullTrace(err))
|
||||||
|
// }
|
||||||
|
// for _, hash := range blobs {
|
||||||
|
// select {
|
||||||
|
// case <-d.stopper.Ch():
|
||||||
|
// return
|
||||||
|
// default:
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
// err = d.Delete(hash)
|
||||||
|
// if err != nil {
|
||||||
|
// log.Errorf(errors.FullTrace(err))
|
||||||
|
// }
|
||||||
|
// metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// lastCleanup = time.Now()
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
4
go.mod
4
go.mod
|
@ -12,8 +12,7 @@ require (
|
||||||
github.com/davecgh/go-spew v1.1.1
|
github.com/davecgh/go-spew v1.1.1
|
||||||
github.com/go-sql-driver/mysql v1.4.1
|
github.com/go-sql-driver/mysql v1.4.1
|
||||||
github.com/golang/protobuf v1.4.2
|
github.com/golang/protobuf v1.4.2
|
||||||
github.com/google/btree v1.0.0 // indirect
|
github.com/google/gops v0.3.7
|
||||||
github.com/google/gops v0.3.7 // indirect
|
|
||||||
github.com/gorilla/mux v1.7.4
|
github.com/gorilla/mux v1.7.4
|
||||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||||
github.com/hashicorp/golang-lru v0.5.4
|
github.com/hashicorp/golang-lru v0.5.4
|
||||||
|
@ -33,7 +32,6 @@ require (
|
||||||
github.com/spf13/afero v1.4.1 // indirect
|
github.com/spf13/afero v1.4.1 // indirect
|
||||||
github.com/spf13/cast v1.3.0
|
github.com/spf13/cast v1.3.0
|
||||||
github.com/spf13/cobra v0.0.3
|
github.com/spf13/cobra v0.0.3
|
||||||
github.com/spf13/pflag v1.0.3 // indirect
|
|
||||||
github.com/spf13/viper v1.7.1 // indirect
|
github.com/spf13/viper v1.7.1 // indirect
|
||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
github.com/volatiletech/null v8.0.0+incompatible
|
github.com/volatiletech/null v8.0.0+incompatible
|
||||||
|
|
348
lite_db/db.go
348
lite_db/db.go
|
@ -1,348 +0,0 @@
|
||||||
package lite_db
|
|
||||||
|
|
||||||
import (
|
|
||||||
"database/sql"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
||||||
qt "github.com/lbryio/lbry.go/v2/extras/query"
|
|
||||||
|
|
||||||
"github.com/go-sql-driver/mysql"
|
|
||||||
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
"github.com/volatiletech/null"
|
|
||||||
)
|
|
||||||
|
|
||||||
// SdBlob is a special blob that contains information on the rest of the blobs in the stream
|
|
||||||
type SdBlob struct {
|
|
||||||
StreamName string `json:"stream_name"`
|
|
||||||
Blobs []struct {
|
|
||||||
Length int `json:"length"`
|
|
||||||
BlobNum int `json:"blob_num"`
|
|
||||||
BlobHash string `json:"blob_hash,omitempty"`
|
|
||||||
IV string `json:"iv"`
|
|
||||||
} `json:"blobs"`
|
|
||||||
StreamType string `json:"stream_type"`
|
|
||||||
Key string `json:"key"`
|
|
||||||
SuggestedFileName string `json:"suggested_file_name"`
|
|
||||||
StreamHash string `json:"stream_hash"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// SQL implements the DB interface
|
|
||||||
type SQL struct {
|
|
||||||
conn *sql.DB
|
|
||||||
|
|
||||||
TrackAccessTime bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func logQuery(query string, args ...interface{}) {
|
|
||||||
s, err := qt.InterpolateParams(query, args...)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorln(err)
|
|
||||||
} else {
|
|
||||||
log.Debugln(s)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect will create a connection to the database
|
|
||||||
func (s *SQL) Connect(dsn string) error {
|
|
||||||
var err error
|
|
||||||
// interpolateParams is necessary. otherwise uploading a stream with thousands of blobs
|
|
||||||
// will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all
|
|
||||||
// opened inside a transaction. closing them manually doesn't seem to help
|
|
||||||
dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1"
|
|
||||||
s.conn, err = sql.Open("mysql", dsn)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.conn.SetMaxIdleConns(12)
|
|
||||||
|
|
||||||
return errors.Err(s.conn.Ping())
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddBlob adds a blob to the database.
|
|
||||||
func (s *SQL) AddBlob(hash string, length int) error {
|
|
||||||
if s.conn == nil {
|
|
||||||
return errors.Err("not connected")
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := s.insertBlob(hash, length)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) insertBlob(hash string, length int) (int64, error) {
|
|
||||||
if length <= 0 {
|
|
||||||
return 0, errors.Err("length must be positive")
|
|
||||||
}
|
|
||||||
const isStored = true
|
|
||||||
now := time.Now()
|
|
||||||
args := []interface{}{hash, isStored, length, now}
|
|
||||||
blobID, err := s.exec(
|
|
||||||
"INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at=VALUES(last_accessed_at)",
|
|
||||||
args...,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if blobID == 0 {
|
|
||||||
err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
|
|
||||||
if err != nil {
|
|
||||||
return 0, errors.Err(err)
|
|
||||||
}
|
|
||||||
if blobID == 0 {
|
|
||||||
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return blobID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasBlob checks if the database contains the blob information.
|
|
||||||
func (s *SQL) HasBlob(hash string) (bool, error) {
|
|
||||||
exists, err := s.HasBlobs([]string{hash})
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
return exists[hash], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
|
||||||
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
|
||||||
exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
|
|
||||||
s.touch(streamsNeedingTouch)
|
|
||||||
return exists, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) touch(blobIDs []uint64) error {
|
|
||||||
if len(blobIDs) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
|
|
||||||
args := make([]interface{}, len(blobIDs)+1)
|
|
||||||
args[0] = time.Now()
|
|
||||||
for i := range blobIDs {
|
|
||||||
args[i+1] = blobIDs[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
startTime := time.Now()
|
|
||||||
_, err := s.exec(query, args...)
|
|
||||||
log.Debugf("blobs access query touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
|
|
||||||
if s.conn == nil {
|
|
||||||
return nil, nil, errors.Err("not connected")
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
hash string
|
|
||||||
blobID uint64
|
|
||||||
lastAccessedAt null.Time
|
|
||||||
)
|
|
||||||
|
|
||||||
var needsTouch []uint64
|
|
||||||
exists := make(map[string]bool)
|
|
||||||
|
|
||||||
touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time
|
|
||||||
maxBatchSize := 10000
|
|
||||||
doneIndex := 0
|
|
||||||
|
|
||||||
for len(hashes) > doneIndex {
|
|
||||||
sliceEnd := doneIndex + maxBatchSize
|
|
||||||
if sliceEnd > len(hashes) {
|
|
||||||
sliceEnd = len(hashes)
|
|
||||||
}
|
|
||||||
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
|
|
||||||
batch := hashes[doneIndex:sliceEnd]
|
|
||||||
|
|
||||||
// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
|
|
||||||
|
|
||||||
query := `SELECT hash, id, last_accessed_at
|
|
||||||
FROM blob_
|
|
||||||
WHERE is_stored = ? and hash IN (` + qt.Qs(len(batch)) + `)`
|
|
||||||
args := make([]interface{}, len(batch)+1)
|
|
||||||
args[0] = true
|
|
||||||
for i := range batch {
|
|
||||||
args[i+1] = batch[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
logQuery(query, args...)
|
|
||||||
|
|
||||||
err := func() error {
|
|
||||||
startTime := time.Now()
|
|
||||||
rows, err := s.conn.Query(query, args...)
|
|
||||||
log.Debugf("hashes query took %s", time.Since(startTime))
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
defer closeRows(rows)
|
|
||||||
|
|
||||||
for rows.Next() {
|
|
||||||
err := rows.Scan(&hash, &blobID, &lastAccessedAt)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
exists[hash] = true
|
|
||||||
if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
|
|
||||||
needsTouch = append(needsTouch, blobID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = rows.Err()
|
|
||||||
if err != nil {
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
doneIndex += len(batch)
|
|
||||||
return nil
|
|
||||||
}()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return exists, needsTouch, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete will remove the blob from the db
|
|
||||||
func (s *SQL) Delete(hash string) error {
|
|
||||||
_, err := s.exec("UPDATE blob_ set is_stored = ? WHERE hash = ?", 0, hash)
|
|
||||||
return errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddSDBlob insert the SD blob and all the content blobs. The content blobs are marked as "not stored",
|
|
||||||
// but they are tracked so reflector knows what it is missing.
|
|
||||||
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int) error {
|
|
||||||
if s.conn == nil {
|
|
||||||
return errors.Err("not connected")
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := s.insertBlob(sdHash, sdBlobLength)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetHashRange gets the smallest and biggest hashes in the db
|
|
||||||
func (s *SQL) GetLRUBlobs(maxBlobs int) ([]string, error) {
|
|
||||||
if s.conn == nil {
|
|
||||||
return nil, errors.Err("not connected")
|
|
||||||
}
|
|
||||||
|
|
||||||
query := "SELECT hash from blob_ where is_stored = ? order by last_accessed_at limit ?"
|
|
||||||
const isStored = true
|
|
||||||
logQuery(query, isStored, maxBlobs)
|
|
||||||
rows, err := s.conn.Query(query, isStored, maxBlobs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
defer closeRows(rows)
|
|
||||||
blobs := make([]string, 0, maxBlobs)
|
|
||||||
for rows.Next() {
|
|
||||||
var hash string
|
|
||||||
err := rows.Scan(&hash)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
blobs = append(blobs, hash)
|
|
||||||
}
|
|
||||||
return blobs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) AllBlobs() ([]string, error) {
|
|
||||||
if s.conn == nil {
|
|
||||||
return nil, errors.Err("not connected")
|
|
||||||
}
|
|
||||||
|
|
||||||
query := "SELECT hash from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
|
|
||||||
const isStored = true
|
|
||||||
logQuery(query, isStored)
|
|
||||||
rows, err := s.conn.Query(query, isStored)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
defer closeRows(rows)
|
|
||||||
totalBlobs, err := s.BlobsCount()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
blobs := make([]string, 0, totalBlobs)
|
|
||||||
for rows.Next() {
|
|
||||||
var hash string
|
|
||||||
err := rows.Scan(&hash)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Err(err)
|
|
||||||
}
|
|
||||||
blobs = append(blobs, hash)
|
|
||||||
}
|
|
||||||
return blobs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) BlobsCount() (int, error) {
|
|
||||||
if s.conn == nil {
|
|
||||||
return 0, errors.Err("not connected")
|
|
||||||
}
|
|
||||||
|
|
||||||
query := "SELECT count(id) from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
|
|
||||||
const isStored = true
|
|
||||||
logQuery(query, isStored)
|
|
||||||
var count int
|
|
||||||
err := s.conn.QueryRow(query, isStored).Scan(&count)
|
|
||||||
return count, errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func closeRows(rows *sql.Rows) {
|
|
||||||
if rows != nil {
|
|
||||||
err := rows.Close()
|
|
||||||
if err != nil {
|
|
||||||
log.Error("error closing rows: ", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
|
|
||||||
logQuery(query, args...)
|
|
||||||
attempt, maxAttempts := 0, 3
|
|
||||||
Retry:
|
|
||||||
attempt++
|
|
||||||
result, err := s.conn.Exec(query, args...)
|
|
||||||
if isLockTimeoutError(err) {
|
|
||||||
if attempt <= maxAttempts {
|
|
||||||
//Error 1205: Lock wait timeout exceeded; try restarting transaction
|
|
||||||
goto Retry
|
|
||||||
}
|
|
||||||
err = errors.Prefix("Lock timeout for query "+query, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return 0, errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lastID, err := result.LastInsertId()
|
|
||||||
return lastID, errors.Err(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func isLockTimeoutError(err error) bool {
|
|
||||||
e, ok := err.(*mysql.MySQLError)
|
|
||||||
return ok && e != nil && e.Number == 1205
|
|
||||||
}
|
|
||||||
|
|
||||||
/* SQL schema

in prod make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space.

CREATE TABLE `blob_` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `hash` char(96) NOT NULL,
  `is_stored` tinyint(1) NOT NULL DEFAULT '0',
  `length` bigint unsigned DEFAULT NULL,
  `last_accessed_at` datetime DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `id` (`id`),
  UNIQUE KEY `blob_hash_idx` (`hash`),
  KEY `blob_last_accessed_idx` (`last_accessed_at`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

*/
|
|
|
@ -168,3 +168,7 @@ func (s *SlowBlobStore) Delete(hash string) error {
|
||||||
time.Sleep(s.delay)
|
time.Sleep(s.delay)
|
||||||
return s.mem.Delete(hash)
|
return s.mem.Delete(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SlowBlobStore) Shutdown() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
@ -4,10 +4,9 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/db"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/db"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
|
@ -2,12 +2,9 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||||
"github.com/lbryio/reflector.go/db"
|
"github.com/lbryio/reflector.go/db"
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
|
||||||
"github.com/lbryio/reflector.go/lite_db"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
@ -18,18 +15,15 @@ import (
|
||||||
// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
|
// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
|
||||||
type LiteDBBackedStore struct {
|
type LiteDBBackedStore struct {
|
||||||
blobs BlobStore
|
blobs BlobStore
|
||||||
db *lite_db.SQL
|
db *db.SQL
|
||||||
maxItems int
|
maxItems int
|
||||||
stopper *stop.Stopper
|
stopper *stop.Stopper
|
||||||
component string
|
component string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDBBackedStore returns an initialized store pointer.
|
// NewDBBackedStore returns an initialized store pointer.
|
||||||
func NewLiteDBBackedStore(component string, blobs BlobStore, db *lite_db.SQL, maxItems int) *LiteDBBackedStore {
|
func NewLiteDBBackedStore(component string, blobs BlobStore, db *db.SQL, maxItems int) *LiteDBBackedStore {
|
||||||
instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component}
|
instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component}
|
||||||
go func() {
|
|
||||||
instance.selfClean()
|
|
||||||
}()
|
|
||||||
return instance
|
return instance
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,7 +65,7 @@ func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.db.AddBlob(hash, len(blob))
|
return d.db.AddBlob(hash, len(blob), true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
|
// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
|
||||||
|
@ -91,7 +85,7 @@ func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.db.AddSDBlob(hash, len(blob))
|
return d.db.AddSDBlob(hash, len(blob), blobContents)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *LiteDBBackedStore) Delete(hash string) error {
|
func (d *LiteDBBackedStore) Delete(hash string) error {
|
||||||
|
@ -105,53 +99,54 @@ func (d *LiteDBBackedStore) Delete(hash string) error {
|
||||||
|
|
||||||
// list returns the hashes of blobs that already exist in the database
|
// list returns the hashes of blobs that already exist in the database
|
||||||
func (d *LiteDBBackedStore) list() ([]string, error) {
|
func (d *LiteDBBackedStore) list() ([]string, error) {
|
||||||
blobs, err := d.db.AllBlobs()
|
//blobs, err := d.db.AllBlobs()
|
||||||
return blobs, err
|
//return blobs, err
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *LiteDBBackedStore) selfClean() {
|
//func (d *LiteDBBackedStore) selfClean() {
|
||||||
d.stopper.Add(1)
|
// d.stopper.Add(1)
|
||||||
defer d.stopper.Done()
|
// defer d.stopper.Done()
|
||||||
lastCleanup := time.Now()
|
// lastCleanup := time.Now()
|
||||||
const cleanupInterval = 10 * time.Second
|
// const cleanupInterval = 10 * time.Second
|
||||||
for {
|
// for {
|
||||||
select {
|
// select {
|
||||||
case <-d.stopper.Ch():
|
// case <-d.stopper.Ch():
|
||||||
log.Infoln("stopping self cleanup")
|
// log.Infoln("stopping self cleanup")
|
||||||
return
|
// return
|
||||||
default:
|
// default:
|
||||||
time.Sleep(1 * time.Second)
|
// time.Sleep(1 * time.Second)
|
||||||
}
|
// }
|
||||||
if time.Since(lastCleanup) < cleanupInterval {
|
// if time.Since(lastCleanup) < cleanupInterval {
|
||||||
continue
|
// continue
|
||||||
}
|
// }
|
||||||
blobsCount, err := d.db.BlobsCount()
|
// blobsCount, err := d.db.BlobsCount()
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
log.Errorf(errors.FullTrace(err))
|
// log.Errorf(errors.FullTrace(err))
|
||||||
}
|
// }
|
||||||
if blobsCount >= d.maxItems {
|
// if blobsCount >= d.maxItems {
|
||||||
itemsToDelete := blobsCount / 100 * 10
|
// itemsToDelete := blobsCount / 100 * 10
|
||||||
blobs, err := d.db.GetLRUBlobs(itemsToDelete)
|
// blobs, err := d.db.GetLRUBlobs(itemsToDelete)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
log.Errorf(errors.FullTrace(err))
|
// log.Errorf(errors.FullTrace(err))
|
||||||
}
|
// }
|
||||||
for _, hash := range blobs {
|
// for _, hash := range blobs {
|
||||||
select {
|
// select {
|
||||||
case <-d.stopper.Ch():
|
// case <-d.stopper.Ch():
|
||||||
return
|
// return
|
||||||
default:
|
// default:
|
||||||
|
//
|
||||||
}
|
// }
|
||||||
err = d.Delete(hash)
|
// err = d.Delete(hash)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
log.Errorf(errors.FullTrace(err))
|
// log.Errorf(errors.FullTrace(err))
|
||||||
}
|
// }
|
||||||
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
|
// metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
lastCleanup = time.Now()
|
// lastCleanup = time.Now()
|
||||||
}
|
// }
|
||||||
}
|
//}
|
||||||
|
|
||||||
// Shutdown shuts down the store gracefully
|
// Shutdown shuts down the store gracefully
|
||||||
func (d *LiteDBBackedStore) Shutdown() {
|
func (d *LiteDBBackedStore) Shutdown() {
|
||||||
|
|
Loading…
Reference in a new issue