Insert in tx #51
18 changed files with 380 additions and 43 deletions
@@ -8,6 +8,8 @@ import (
 	"syscall"
+	"time"

 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/extras/stop"
+	"github.com/lbryio/reflector.go/db"
 	"github.com/lbryio/reflector.go/internal/metrics"
 	"github.com/lbryio/reflector.go/meta"

@@ -67,10 +69,11 @@ func init() {

 func reflectorCmd(cmd *cobra.Command, args []string) {
 	log.Printf("reflector %s", meta.VersionString())
+	cleanerStopper := stop.New()

 	// the blocklist logic requires the db backed store to be the outer-most store
 	underlyingStore := setupStore()
-	outerStore := wrapWithCache(underlyingStore)
+	outerStore := wrapWithCache(underlyingStore, cleanerStopper)

 	if !disableUploads {
 		reflectorServer := reflector.NewServer(underlyingStore)

@@ -101,11 +104,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
 	metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
 	metricsServer.Start()
 	defer metricsServer.Shutdown()
+	defer outerStore.Shutdown()
+	defer underlyingStore.Shutdown()

 	interruptChan := make(chan os.Signal, 1)
 	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
 	<-interruptChan
+	// deferred shutdowns happen now
+	cleanerStopper.StopAndWait()
 }

 func setupStore() store.BlobStore {

@@ -146,20 +152,20 @@ func setupStore() store.BlobStore {
 	}

 	if useDB {
-		db := new(db.SQL)
-		db.TrackAccessTime = true
-		err := db.Connect(globalConfig.DBConn)
+		dbInst := new(db.SQL)
+		dbInst.TrackAccess = db.TrackAccessStreams
+		err := dbInst.Connect(globalConfig.DBConn)
 		if err != nil {
 			log.Fatal(err)
 		}

-		s = store.NewDBBackedStore(s, db, false)
+		s = store.NewDBBackedStore(s, dbInst, false)
 	}

 	return s
 }

-func wrapWithCache(s store.BlobStore) store.BlobStore {
+func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
 	wrapped := s

 	diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)

@@ -171,11 +177,20 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		if err != nil {
 			log.Fatal(err)
 		}

+		localDb := new(db.SQL)
+		localDb.TrackAccess = db.TrackAccessBlobs
+		err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
+		if err != nil {
+			log.Fatal(err)
+		}
 		wrapped = store.NewCachingStore(
 			"reflector",
 			wrapped,
-			store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), realCacheSize),
+			store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false),
 		)
+
+		go cleanOldestBlobs(int(realCacheSize), localDb, wrapped, cleanerStopper)
 	}

 	diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)

@@ -229,3 +244,48 @@ func diskCacheParams(diskParams string) (int, string) {
 	}
 	return int(maxSize), path
 }
+
+func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
+	const cleanupInterval = 10 * time.Second
+
+	for {
+		select {
+		case <-stopper.Ch():
+			log.Infoln("stopping self cleanup")
+			return
+		case <-time.After(cleanupInterval):
+			err := doClean(maxItems, db, store, stopper)
+			if err != nil {
+				log.Error(errors.FullTrace(err))
+			}
+		}
+	}
+}
+
+func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
+	blobsCount, err := db.Count()
+	if err != nil {
+		return err
+	}
+
+	if blobsCount >= maxItems {
+		itemsToDelete := blobsCount / 10
+		blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
+		if err != nil {
+			return err
+		}
+
+		for _, hash := range blobs {
+			select {
+			case <-stopper.Ch():
+				return nil
+			default:
+			}
+
+			err = store.Delete(hash)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}

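Annotation: the cleanup goroutine's lifecycle hinges on stop.Group. StopAndWait in reflectorCmd closes the channel returned by Ch() and then waits on the group's WaitGroup. A minimal standalone sketch of the same pattern, assuming the stop.Group API used in the diff (New, Add/Done, Ch, StopAndWait); the one-second tick is a stand-in for doClean:

	package main

	import (
		"fmt"
		"time"

		"github.com/lbryio/lbry.go/v2/extras/stop"
	)

	func main() {
		stopper := stop.New()

		stopper.Add(1) // register so StopAndWait blocks until this goroutine exits
		go func() {
			defer stopper.Done()
			for {
				select {
				case <-stopper.Ch(): // closed by Stop/StopAndWait
					fmt.Println("stopping cleanup")
					return
				case <-time.After(time.Second):
					fmt.Println("cleanup pass") // doClean stand-in
				}
			}
		}()

		time.Sleep(3 * time.Second)
		stopper.StopAndWait()
	}

Note that cleanOldestBlobs itself is launched with a bare go statement and never calls Add/Done, so StopAndWait only signals it; it does not wait for an in-flight doClean pass to finish.
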
265	db/db.go

@@ -30,11 +30,26 @@ type SdBlob struct {
 	StreamHash string `json:"stream_hash"`
 }

+type trackAccess int
+
+const (
+	// TrackAccessNone does not track accesses
+	TrackAccessNone trackAccess = iota
+	// TrackAccessStreams tracks accesses at the stream level
+	TrackAccessStreams
+	// TrackAccessBlobs tracks accesses at the blob level
+	TrackAccessBlobs
+)
+
 // SQL implements the DB interface
 type SQL struct {
 	conn *sql.DB

-	TrackAccessTime bool
+	// Track the approx last time a blob or stream was accessed
+	TrackAccess trackAccess
+
+	// Instead of deleting a blob, mark it as not stored in the db
+	SoftDelete bool
 }

 func logQuery(query string, args ...interface{}) {

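Annotation: TrackAccess replaces the old TrackAccessTime bool with a three-way mode, and SoftDelete changes what Delete does. A hedged configuration sketch (the DSN is a placeholder, not from the diff):

	dbInst := new(db.SQL)
	dbInst.TrackAccess = db.TrackAccessBlobs // or TrackAccessStreams / TrackAccessNone
	dbInst.SoftDelete = true                 // Delete() marks is_stored = 0 instead of removing rows
	if err := dbInst.Connect("user:pass@tcp(localhost:3306)/reflector"); err != nil { // placeholder DSN
		log.Fatal(err)
	}

The trackAccess type is unexported while its constants are exported, so callers can pick one of the three modes but cannot invent new values.
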
@@ -78,11 +93,19 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
 		return 0, errors.Err("length must be positive")
 	}

-	args := []interface{}{hash, isStored, length}
-	blobID, err := s.exec(
-		"INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
-		args...,
-	)
+	var (
+		q    string
+		args []interface{}
+	)
+	if s.TrackAccess == TrackAccessBlobs {
+		args = []interface{}{hash, isStored, length, time.Now()}
+		q = "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)"
+	} else {
+		args = []interface{}{hash, isStored, length}
+		q = "INSERT INTO blob_ (hash, is_stored, length) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))"
+	}
+
+	blobID, err := s.exec(q, args...)
 	if err != nil {
 		return 0, err
 	}

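Annotation: for reference, the blob upsert in blob-tracking mode renders roughly as below (assuming qt.Qs(4) expands to "?,?,?,?"). The OR keeps is_stored sticky: a duplicate insert can flip a blob to stored but never back to unstored, and the last_accessed_at refresh rides along in the same statement:

	// INSERT INTO blob_ (hash, is_stored, length, last_accessed_at)
	// VALUES (?,?,?,?)
	// ON DUPLICATE KEY UPDATE
	//   is_stored = (is_stored or VALUES(is_stored)),
	//   last_accessed_at = VALUES(last_accessed_at)
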
@@ -95,17 +118,33 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
 		if blobID == 0 {
 			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
 		}
+
+		if s.TrackAccess == TrackAccessBlobs {
+			err := s.touchBlobs([]uint64{uint64(blobID)})
+			if err != nil {
+				return 0, errors.Err(err)
+			}
+		}
 	}

 	return blobID, nil
 }

 func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
-	args := []interface{}{hash, sdBlobID, time.Now()}
-	streamID, err := s.exec(
-		"INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")",
-		args...,
-	)
+	var (
+		q    string
+		args []interface{}
+	)
+
+	if s.TrackAccess == TrackAccessStreams {
+		args = []interface{}{hash, sdBlobID, time.Now()}
+		q = "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ")"
+	} else {
+		args = []interface{}{hash, sdBlobID}
+		q = "INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (" + qt.Qs(len(args)) + ")"
+	}
+
+	streamID, err := s.exec(q, args...)
 	if err != nil {
 		return 0, errors.Err(err)
 	}

@@ -119,8 +158,8 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
 		return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
 	}

-	if s.TrackAccessTime {
-		err := s.touch([]uint64{uint64(streamID)})
+	if s.TrackAccess == TrackAccessStreams {
+		err := s.touchStreams([]uint64{uint64(streamID)})
 		if err != nil {
 			return 0, errors.Err(err)
 		}

@@ -140,12 +179,36 @@ func (s *SQL) HasBlob(hash string) (bool, error) {

 // HasBlobs checks if the database contains the set of blobs and returns a bool map.
 func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
-	exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
-	s.touch(streamsNeedingTouch)
+	exists, idsNeedingTouch, err := s.hasBlobs(hashes)
+
+	if s.TrackAccess == TrackAccessBlobs {
+		s.touchBlobs(idsNeedingTouch)
+	} else if s.TrackAccess == TrackAccessStreams {
+		s.touchStreams(idsNeedingTouch)
+	}
+
 	return exists, err
 }

-func (s *SQL) touch(streamIDs []uint64) error {
+func (s *SQL) touchBlobs(blobIDs []uint64) error {
+	if len(blobIDs) == 0 {
+		return nil
+	}
+
+	query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
+	args := make([]interface{}, len(blobIDs)+1)
+	args[0] = time.Now()
+	for i := range blobIDs {
+		args[i+1] = blobIDs[i]
+	}
+
+	startTime := time.Now()
+	_, err := s.exec(query, args...)
+	log.Debugf("touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
+	return errors.Err(err)
+}
+
+func (s *SQL) touchStreams(streamIDs []uint64) error {
 	if len(streamIDs) == 0 {
 		return nil
 	}

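Annotation: touchBlobs batches the access-time refresh into a single UPDATE per HasBlobs batch rather than one statement per blob. For three IDs the query expands as below (again assuming qt.Qs joins "?" placeholders with commas), with args = [now, id1, id2, id3]:

	// UPDATE blob_ SET last_accessed_at = ? WHERE id IN (?,?,?)

HasBlobs also discards the touch error, presumably deliberately: a failed timestamp refresh should not fail a read.
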
@@ -159,7 +222,7 @@ func (s *SQL) touch(streamIDs []uint64) error {

 	startTime := time.Now()
 	_, err := s.exec(query, args...)
-	log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime))
+	log.Debugf("touched %d streams and took %s", len(streamIDs), time.Since(startTime))
 	return errors.Err(err)
 }

@@ -169,9 +232,9 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
 	}

 	var (
-		hash           string
-		streamID       uint64
-		lastAccessedAt null.Time
+		hash             string
+		blobID, streamID uint64
+		lastAccessedAt   null.Time
 	)

 	var needsTouch []uint64

@@ -189,17 +252,23 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
 		log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
 		batch := hashes[doneIndex:sliceEnd]

-		// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
-		query := `SELECT b.hash, s.id, s.last_accessed_at
+		var lastAccessedAtSelect string
+		if s.TrackAccess == TrackAccessBlobs {
+			lastAccessedAtSelect = "b.last_accessed_at"
+		} else if s.TrackAccess == TrackAccessStreams {
+			lastAccessedAtSelect = "s.last_accessed_at"
+		} else {
+			lastAccessedAtSelect = "NULL"
+		}
+
+		query := `SELECT b.hash, b.id, s.id, ` + lastAccessedAtSelect + `
 FROM blob_ b
 LEFT JOIN stream_blob sb ON b.id = sb.blob_id
-INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
-WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
-		args := make([]interface{}, len(batch)+1)
-		args[0] = true
+LEFT JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
+WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
+		args := make([]interface{}, len(batch))
 		for i := range batch {
-			args[i+1] = batch[i]
+			args[i] = batch[i]
 		}

 		logQuery(query, args...)

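Annotation: the join change is what retires the old TODO. With the stream join made LEFT, a stored blob with no matching stream row at all still comes back from the query, which matters once blobs (not just streams) are tracked and evicted. The mode-dependent select column decides whose timestamp drives touching, and TrackAccessNone selects a literal NULL so nothing ever qualifies. In blob-tracking mode the query renders roughly as:

	// SELECT b.hash, b.id, s.id, b.last_accessed_at
	// FROM blob_ b
	// LEFT JOIN stream_blob sb ON b.id = sb.blob_id
	// LEFT JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
	// WHERE b.is_stored = 1 and b.hash IN (?,?)   -- two hashes in the batch

Hard-coding is_stored = 1 instead of binding it also lets args hold only the hashes, which simplifies the index arithmetic in the loop.
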
@@ -214,13 +283,17 @@ WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
 		defer closeRows(rows)

 		for rows.Next() {
-			err := rows.Scan(&hash, &streamID, &lastAccessedAt)
+			err := rows.Scan(&hash, &blobID, &streamID, &lastAccessedAt)
 			if err != nil {
 				return errors.Err(err)
 			}
 			exists[hash] = true
-			if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
-				needsTouch = append(needsTouch, streamID)
+			if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
+				if s.TrackAccess == TrackAccessBlobs {
+					needsTouch = append(needsTouch, blobID)
+				} else if s.TrackAccess == TrackAccessStreams {
+					needsTouch = append(needsTouch, streamID)
+				}
 			}
 		}

@@ -240,8 +313,14 @@ WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
 	return exists, needsTouch, nil
 }

-// Delete will remove the blob from the db
+// Delete will remove (or soft-delete) the blob from the db
+// NOTE: If SoftDelete is enabled, streams will never be deleted
 func (s *SQL) Delete(hash string) error {
+	if s.SoftDelete {
+		_, err := s.exec("UPDATE blob_ SET is_stored = 0 WHERE hash = ?", hash)
+		return errors.Err(err)
+	}
+
 	_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
 	if err != nil {
 		return errors.Err(err)

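Annotation: soft delete turns Delete into a one-column UPDATE, so the blob row (hash, length) survives and a later re-upload flips is_stored back to 1 via the upsert shown earlier. LeastRecentlyAccessedHashes always filters on is_stored = 1, and Count does so when SoftDelete is set, so soft-deleted rows drop out of the eviction math. A hedged sketch:

	dbInst.SoftDelete = true
	if err := dbInst.Delete(hash); err != nil { // executes: UPDATE blob_ SET is_stored = 0 WHERE hash = ?
		return err
	}
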
@@ -251,6 +330,88 @@ func (s *SQL) Delete(hash string) error {
 	return errors.Err(err)
 }

+// LeastRecentlyAccessedHashes gets the hashes of the least recently accessed stored blobs
+func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
+	if s.conn == nil {
+		return nil, errors.Err("not connected")
+	}
+
+	if s.TrackAccess != TrackAccessBlobs {
+		return nil, errors.Err("blob access tracking is disabled")
+	}
+
+	query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
+	logQuery(query, maxBlobs)
+
+	rows, err := s.conn.Query(query, maxBlobs)
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+	defer closeRows(rows)
+
+	blobs := make([]string, 0, maxBlobs)
+	for rows.Next() {
+		var hash string
+		err := rows.Scan(&hash)
+		if err != nil {
+			return nil, errors.Err(err)
+		}
+		blobs = append(blobs, hash)
+	}
+
+	return blobs, nil
+}
+
+// AllHashes writes all hashes from the db into the channel.
+// It does not close the channel when it finishes.
+//func (s *SQL) AllHashes(ch chan<- string) error {
+//	if s.conn == nil {
+//		return errors.Err("not connected")
+//	}
+//
+//	query := "SELECT hash from blob_"
+//	if s.SoftDelete {
+//		query += " where is_stored = 1"
+//	}
+//	logQuery(query)
+//
+//	rows, err := s.conn.Query(query)
+//	if err != nil {
+//		return errors.Err(err)
+//	}
+//	defer closeRows(rows)
+//
+//	for rows.Next() {
+//		var hash string
+//		err := rows.Scan(&hash)
+//		if err != nil {
+//			return errors.Err(err)
+//		}
+//		ch <- hash
+//		// TODO: this needs testing
+//		// TODO: need a way to cancel this early (e.g. in case of shutdown)
+//	}
+//
+//	close(ch)
+//	return nil
+//}
+
+func (s *SQL) Count() (int, error) {
+	if s.conn == nil {
+		return 0, errors.Err("not connected")
+	}
+
+	query := "SELECT count(id) from blob_"
+	if s.SoftDelete {
+		query += " where is_stored = 1"
+	}
+	logQuery(query)
+
+	var count int
+	err := s.conn.QueryRow(query).Scan(&count)
+	return count, errors.Err(err)
+}
+
 // Block will mark a blob as blocked
 func (s *SQL) Block(hash string) error {
 	query := "INSERT IGNORE INTO blocked SET hash = ?"

@@ -528,8 +689,10 @@ CREATE TABLE blob_ (
   hash char(96) NOT NULL,
   is_stored TINYINT(1) NOT NULL DEFAULT 0,
   length bigint(20) unsigned DEFAULT NULL,
+  last_accessed_at TIMESTAMP NULL DEFAULT NULL,
   PRIMARY KEY (id),
-  UNIQUE KEY blob_hash_idx (hash)
+  UNIQUE KEY blob_hash_idx (hash),
+  KEY `blob_last_accessed_idx` (`last_accessed_at`)
 );

 CREATE TABLE stream (

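Annotation: the schema additions pair up with LeastRecentlyAccessedHashes. The nullable TIMESTAMP records the last touch, and blob_last_accessed_idx lets MySQL walk the eviction query in index order instead of filesorting the whole table:

	// SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?

Since MySQL sorts NULLs first in ascending order, rows that predate the column (or were never touched) are presumably first in line for eviction.
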
@@ -560,3 +723,47 @@ CREATE TABLE blocked (
 );

 */
+
+//func (d *LiteDBBackedStore) selfClean() {
+//	d.stopper.Add(1)
+//	defer d.stopper.Done()
+//	lastCleanup := time.Now()
+//	const cleanupInterval = 10 * time.Second
+//	for {
+//		select {
+//		case <-d.stopper.Ch():
+//			log.Infoln("stopping self cleanup")
+//			return
+//		default:
+//			time.Sleep(1 * time.Second)
+//		}
+//		if time.Since(lastCleanup) < cleanupInterval {
+//			continue
+//		}
+//
+//		blobsCount, err := d.db.BlobsCount()
+//		if err != nil {
+//			log.Errorf(errors.FullTrace(err))
+//		}
+//		if blobsCount >= d.maxItems {
+//			itemsToDelete := blobsCount / 100 * 10
+//			blobs, err := d.db.GetLRUBlobs(itemsToDelete)
+//			if err != nil {
+//				log.Errorf(errors.FullTrace(err))
+//			}
+//			for _, hash := range blobs {
+//				select {
+//				case <-d.stopper.Ch():
+//					return
+//				default:
+//				}
+//				err = d.Delete(hash)
+//				if err != nil {
+//					log.Errorf(errors.FullTrace(err))
+//				}
+//				metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
+//			}
+//		}
+//		lastCleanup = time.Now()
+//	}
+//}

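Annotation: one detail worth noting against the live doClean above: this commented-out sketch computed its batch as blobsCount / 100 * 10, where integer division truncates to the nearest hundred before scaling, while doClean's blobsCount / 10 keeps the extra precision. A two-line illustration:

	fmt.Println(199 / 100 * 10) // 10
	fmt.Println(199 / 10)       // 19 (what doClean uses)
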
4	go.mod

@@ -12,8 +12,7 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/go-sql-driver/mysql v1.4.1
 	github.com/golang/protobuf v1.4.2
-	github.com/google/btree v1.0.0 // indirect
-	github.com/google/gops v0.3.7 // indirect
+	github.com/google/gops v0.3.7
 	github.com/gorilla/mux v1.7.4
 	github.com/hashicorp/go-msgpack v0.5.5 // indirect
 	github.com/hashicorp/golang-lru v0.5.4

@@ -33,7 +32,6 @@ require (
 	github.com/spf13/afero v1.4.1 // indirect
 	github.com/spf13/cast v1.3.0
 	github.com/spf13/cobra v0.0.3
 	github.com/spf13/pflag v1.0.3 // indirect
-	github.com/spf13/viper v1.7.1 // indirect
 	github.com/stretchr/testify v1.4.0
 	github.com/volatiletech/null v8.0.0+incompatible

@@ -91,3 +91,8 @@ func (p *Store) PutSD(hash string, blob stream.Blob) error {
 func (p *Store) Delete(hash string) error {
 	panic("http3Store cannot put or delete blobs")
 }
+
+// Shutdown is not supported
+func (p *Store) Shutdown() {
+	return
+}

@@ -66,3 +66,8 @@ func (p *Store) PutSD(hash string, blob stream.Blob) error {
 func (p *Store) Delete(hash string) error {
 	panic("PeerStore cannot put or delete blobs")
 }
+
+// Shutdown is not supported
+func (p *Store) Shutdown() {
+	return
+}

@@ -68,7 +68,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
 	go func() {
 		err = c.cache.Put(hash, blob)
 		if err != nil {
-			log.Errorf("error saving blob to underlying cache: %s", err.Error())
+			log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
 		}
 	}()
 	return blob, nil

@@ -100,3 +100,10 @@ func (c *CachingStore) Delete(hash string) error {
 	}
 	return c.cache.Delete(hash)
 }
+
+// Shutdown shuts down the store gracefully
+func (c *CachingStore) Shutdown() {
+	c.origin.Shutdown()
+	c.cache.Shutdown()
+	return
+}

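Annotation: CachingStore shuts down the origin first, then the cache, while Get (in the hunk above) populates the cache from a detached goroutine; nothing waits for those writers, so a late write can race the cache's shutdown. If that ever matters, one hedged way to drain writers before delegating (drainingStore and its WaitGroup are illustrative, not part of the diff):

	// illustrative wrapper, not part of the diff
	type drainingStore struct {
		store.BlobStore
		writers sync.WaitGroup // incremented around each async cache write
	}

	func (d *drainingStore) Shutdown() {
		d.writers.Wait() // let in-flight writes finish
		d.BlobStore.Shutdown()
	}
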
@@ -168,3 +168,7 @@ func (s *SlowBlobStore) Delete(hash string) error {
 	time.Sleep(s.delay)
 	return s.mem.Delete(hash)
 }
+
+func (s *SlowBlobStore) Shutdown() {
+	return
+}

@@ -103,3 +103,8 @@ func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
 func (c *CloudFrontROStore) Delete(_ string) error {
 	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
 }
+
+// Shutdown shuts down the store gracefully
+func (c *CloudFrontROStore) Shutdown() {
+	return
+}

@@ -48,3 +48,10 @@ func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
 func (c *CloudFrontRWStore) Delete(hash string) error {
 	return c.s3.Delete(hash)
 }
+
+// Shutdown shuts down the store gracefully
+func (c *CloudFrontRWStore) Shutdown() {
+	c.s3.Shutdown()
+	c.cf.Shutdown()
+	return
+}

@@ -4,10 +4,9 @@ import (
 	"encoding/json"
 	"sync"

-	"github.com/lbryio/reflector.go/db"
-
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/db"

 	log "github.com/sirupsen/logrus"
 )

@@ -193,3 +192,9 @@ func (d *DBBackedStore) initBlocked() error {

 	return err
 }
+
+// Shutdown shuts down the store gracefully
+func (d *DBBackedStore) Shutdown() {
+	d.blobs.Shutdown()
+	return
+}

@@ -147,3 +147,8 @@ func (d *DiskStore) initOnce() error {
 	d.initialized = true
 	return nil
 }
+
+// Shutdown shuts down the store gracefully
+func (d *DiskStore) Shutdown() {
+	return
+}

@@ -109,7 +109,7 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
 	if err != nil {
 		return err
 	}
-	logrus.Infof("read %d files from disk", len(existing))
+	logrus.Infof("read %d files from underlying store", len(existing))

 	added := 0
 	for _, h := range existing {

@@ -121,3 +121,8 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
 	}
 	return nil
 }
+
+// Shutdown shuts down the store gracefully
+func (l *LFUDAStore) Shutdown() {
+	return
+}

@@ -123,3 +123,8 @@ func (l *LRUStore) loadExisting(store lister, maxItems int) error {
 	}
 	return nil
 }
+
+// Shutdown shuts down the store gracefully
+func (l *LRUStore) Shutdown() {
+	return
+}

@@ -71,3 +71,8 @@ func (m *MemStore) Debug() map[string]stream.Blob {
 	defer m.mu.RUnlock()
 	return m.blobs
 }
+
+// Shutdown shuts down the store gracefully
+func (m *MemStore) Shutdown() {
+	return
+}

@@ -13,3 +13,4 @@ func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
 func (n *NoopStore) Put(_ string, _ stream.Blob) error    { return nil }
 func (n *NoopStore) PutSD(_ string, _ stream.Blob) error  { return nil }
 func (n *NoopStore) Delete(_ string) error                { return nil }
+func (n *NoopStore) Shutdown()                            { return }

@@ -158,3 +158,8 @@ func (s *S3Store) initOnce() error {
 	s.session = sess
 	return nil
 }
+
+// Shutdown shuts down the store gracefully
+func (s *S3Store) Shutdown() {
+	return
+}

@@ -65,3 +65,9 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
 		return blob, nil
 	}
 }
+
+// Shutdown shuts down the store gracefully
+func (s *singleflightStore) Shutdown() {
+	s.BlobStore.Shutdown()
+	return
+}

@@ -19,6 +19,8 @@ type BlobStore interface {
 	PutSD(hash string, blob stream.Blob) error
 	// Delete the blob from the store.
 	Delete(hash string) error
+	// Shutdown the store gracefully
+	Shutdown()
 }

 // Blocklister is a store that supports blocking blobs to prevent their inclusion in the store.

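Annotation: with Shutdown added to the interface, every BlobStore implementation must supply it, even as a no-op (see the leaf stores above); composite stores fan it out to their children. A sketch of a wrapper store satisfying the widened interface — the Name and Has methods are assumed from the parts of the interface outside this hunk, and countingStore itself is illustrative:

	// countingStore wraps another BlobStore and counts Gets.
	type countingStore struct {
		inner store.BlobStore
		gets  int64
	}

	func (c *countingStore) Name() string                   { return "counting" }       // assumed method
	func (c *countingStore) Has(hash string) (bool, error)  { return c.inner.Has(hash) } // assumed method
	func (c *countingStore) Get(hash string) (stream.Blob, error) {
		atomic.AddInt64(&c.gets, 1) // import "sync/atomic"
		return c.inner.Get(hash)
	}
	func (c *countingStore) Put(hash string, blob stream.Blob) error   { return c.inner.Put(hash, blob) }
	func (c *countingStore) PutSD(hash string, blob stream.Blob) error { return c.inner.PutSD(hash, blob) }
	func (c *countingStore) Delete(hash string) error                  { return c.inner.Delete(hash) }

	// Shutdown propagates to the wrapped store, mirroring CachingStore above.
	func (c *countingStore) Shutdown() { c.inner.Shutdown() }
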
Review comment: What's the benefit of just logging and re-panicking here and below? I would just return a wrapped error, i.e.