when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
17 changed files with 589 additions and 3 deletions
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/lbryio/reflector.go/db"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/lbryio/reflector.go/lite_db"
|
||||
"github.com/lbryio/reflector.go/meta"
|
||||
"github.com/lbryio/reflector.go/peer"
|
||||
"github.com/lbryio/reflector.go/peer/http3"
|
||||
|
@ -101,6 +102,8 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
|
|||
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
|
||||
metricsServer.Start()
|
||||
defer metricsServer.Shutdown()
|
||||
defer underlyingStore.Shutdown()
|
||||
defer outerStore.Shutdown()
|
||||
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||
|
@ -171,10 +174,16 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
|
|||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
localDb := new(lite_db.SQL)
|
||||
localDb.TrackAccessTime = true
|
||||
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
wrapped = store.NewCachingStore(
|
||||
"reflector",
|
||||
wrapped,
|
||||
store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), realCacheSize),
|
||||
store.NewLiteDBBackedStore("hdd", store.NewDiskStore(diskCachePath, 2), localDb, int(realCacheSize)),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
348
lite_db/db.go
Normal file
348
lite_db/db.go
Normal file
|
@ -0,0 +1,348 @@
|
|||
package lite_db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
qt "github.com/lbryio/lbry.go/v2/extras/query"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/volatiletech/null"
|
||||
)
|
||||
|
||||
// SdBlob is a special blob that contains information on the rest of the blobs in the stream
|
||||
type SdBlob struct {
|
||||
StreamName string `json:"stream_name"`
|
||||
Blobs []struct {
|
||||
Length int `json:"length"`
|
||||
BlobNum int `json:"blob_num"`
|
||||
BlobHash string `json:"blob_hash,omitempty"`
|
||||
IV string `json:"iv"`
|
||||
} `json:"blobs"`
|
||||
StreamType string `json:"stream_type"`
|
||||
Key string `json:"key"`
|
||||
SuggestedFileName string `json:"suggested_file_name"`
|
||||
StreamHash string `json:"stream_hash"`
|
||||
}
|
||||
|
||||
// SQL implements the DB interface
|
||||
type SQL struct {
|
||||
conn *sql.DB
|
||||
|
||||
TrackAccessTime bool
|
||||
}
|
||||
|
||||
func logQuery(query string, args ...interface{}) {
|
||||
s, err := qt.InterpolateParams(query, args...)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
} else {
|
||||
log.Debugln(s)
|
||||
}
|
||||
}
|
||||
|
||||
// Connect will create a connection to the database
|
||||
func (s *SQL) Connect(dsn string) error {
|
||||
var err error
|
||||
// interpolateParams is necessary. otherwise uploading a stream with thousands of blobs
|
||||
// will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all
|
||||
// opened inside a transaction. closing them manually doesn't seem to help
|
||||
dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1"
|
||||
s.conn, err = sql.Open("mysql", dsn)
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
s.conn.SetMaxIdleConns(12)
|
||||
|
||||
return errors.Err(s.conn.Ping())
|
||||
}
|
||||
|
||||
// AddBlob adds a blob to the database.
|
||||
func (s *SQL) AddBlob(hash string, length int) error {
|
||||
if s.conn == nil {
|
||||
return errors.Err("not connected")
|
||||
}
|
||||
|
||||
_, err := s.insertBlob(hash, length)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *SQL) insertBlob(hash string, length int) (int64, error) {
|
||||
if length <= 0 {
|
||||
return 0, errors.Err("length must be positive")
|
||||
}
|
||||
const isStored = true
|
||||
now := time.Now()
|
||||
args := []interface{}{hash, isStored, length, now}
|
||||
blobID, err := s.exec(
|
||||
"INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at=VALUES(last_accessed_at)",
|
||||
args...,
|
||||
)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if blobID == 0 {
|
||||
err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
|
||||
if err != nil {
|
||||
return 0, errors.Err(err)
|
||||
}
|
||||
if blobID == 0 {
|
||||
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
|
||||
}
|
||||
}
|
||||
|
||||
return blobID, nil
|
||||
}
|
||||
|
||||
// HasBlob checks if the database contains the blob information.
|
||||
func (s *SQL) HasBlob(hash string) (bool, error) {
|
||||
exists, err := s.HasBlobs([]string{hash})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return exists[hash], nil
|
||||
}
|
||||
|
||||
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
||||
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
||||
exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
|
||||
s.touch(streamsNeedingTouch)
|
||||
return exists, err
|
||||
}
|
||||
|
||||
func (s *SQL) touch(blobIDs []uint64) error {
|
||||
if len(blobIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
|
||||
args := make([]interface{}, len(blobIDs)+1)
|
||||
args[0] = time.Now()
|
||||
for i := range blobIDs {
|
||||
args[i+1] = blobIDs[i]
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
_, err := s.exec(query, args...)
|
||||
log.Debugf("blobs access query touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
|
||||
if s.conn == nil {
|
||||
return nil, nil, errors.Err("not connected")
|
||||
}
|
||||
|
||||
var (
|
||||
hash string
|
||||
blobID uint64
|
||||
lastAccessedAt null.Time
|
||||
)
|
||||
|
||||
var needsTouch []uint64
|
||||
exists := make(map[string]bool)
|
||||
|
||||
touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time
|
||||
maxBatchSize := 10000
|
||||
doneIndex := 0
|
||||
|
||||
for len(hashes) > doneIndex {
|
||||
sliceEnd := doneIndex + maxBatchSize
|
||||
if sliceEnd > len(hashes) {
|
||||
sliceEnd = len(hashes)
|
||||
}
|
||||
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
|
||||
batch := hashes[doneIndex:sliceEnd]
|
||||
|
||||
// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
|
||||
|
||||
query := `SELECT hash, id, last_accessed_at
|
||||
FROM blob_
|
||||
WHERE is_stored = ? and hash IN (` + qt.Qs(len(batch)) + `)`
|
||||
args := make([]interface{}, len(batch)+1)
|
||||
args[0] = true
|
||||
for i := range batch {
|
||||
args[i+1] = batch[i]
|
||||
}
|
||||
|
||||
logQuery(query, args...)
|
||||
|
||||
err := func() error {
|
||||
startTime := time.Now()
|
||||
rows, err := s.conn.Query(query, args...)
|
||||
log.Debugf("hashes query took %s", time.Since(startTime))
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
defer closeRows(rows)
|
||||
|
||||
for rows.Next() {
|
||||
err := rows.Scan(&hash, &blobID, &lastAccessedAt)
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
exists[hash] = true
|
||||
if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
|
||||
needsTouch = append(needsTouch, blobID)
|
||||
}
|
||||
}
|
||||
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
doneIndex += len(batch)
|
||||
return nil
|
||||
}()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return exists, needsTouch, nil
|
||||
}
|
||||
|
||||
// Delete will remove the blob from the db
|
||||
func (s *SQL) Delete(hash string) error {
|
||||
_, err := s.exec("UPDATE blob_ set is_stored = ? WHERE hash = ?", 0, hash)
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
// AddSDBlob insert the SD blob and all the content blobs. The content blobs are marked as "not stored",
|
||||
// but they are tracked so reflector knows what it is missing.
|
||||
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int) error {
|
||||
if s.conn == nil {
|
||||
return errors.Err("not connected")
|
||||
}
|
||||
|
||||
_, err := s.insertBlob(sdHash, sdBlobLength)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetHashRange gets the smallest and biggest hashes in the db
|
||||
func (s *SQL) GetLRUBlobs(maxBlobs int) ([]string, error) {
|
||||
if s.conn == nil {
|
||||
return nil, errors.Err("not connected")
|
||||
}
|
||||
|
||||
query := "SELECT hash from blob_ where is_stored = ? order by last_accessed_at limit ?"
|
||||
const isStored = true
|
||||
logQuery(query, isStored, maxBlobs)
|
||||
rows, err := s.conn.Query(query, isStored, maxBlobs)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
defer closeRows(rows)
|
||||
blobs := make([]string, 0, maxBlobs)
|
||||
for rows.Next() {
|
||||
var hash string
|
||||
err := rows.Scan(&hash)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
blobs = append(blobs, hash)
|
||||
}
|
||||
return blobs, nil
|
||||
}
|
||||
|
||||
func (s *SQL) AllBlobs() ([]string, error) {
|
||||
if s.conn == nil {
|
||||
return nil, errors.Err("not connected")
|
||||
}
|
||||
|
||||
query := "SELECT hash from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
|
||||
const isStored = true
|
||||
logQuery(query, isStored)
|
||||
rows, err := s.conn.Query(query, isStored)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
defer closeRows(rows)
|
||||
totalBlobs, err := s.BlobsCount()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
blobs := make([]string, 0, totalBlobs)
|
||||
for rows.Next() {
|
||||
var hash string
|
||||
err := rows.Scan(&hash)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
blobs = append(blobs, hash)
|
||||
}
|
||||
return blobs, nil
|
||||
}
|
||||
|
||||
func (s *SQL) BlobsCount() (int, error) {
|
||||
if s.conn == nil {
|
||||
return 0, errors.Err("not connected")
|
||||
}
|
||||
|
||||
query := "SELECT count(id) from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
|
||||
const isStored = true
|
||||
logQuery(query, isStored)
|
||||
var count int
|
||||
err := s.conn.QueryRow(query, isStored).Scan(&count)
|
||||
return count, errors.Err(err)
|
||||
}
|
||||
|
||||
func closeRows(rows *sql.Rows) {
|
||||
if rows != nil {
|
||||
err := rows.Close()
|
||||
if err != nil {
|
||||
log.Error("error closing rows: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
|
||||
logQuery(query, args...)
|
||||
attempt, maxAttempts := 0, 3
|
||||
Retry:
|
||||
attempt++
|
||||
result, err := s.conn.Exec(query, args...)
|
||||
if isLockTimeoutError(err) {
|
||||
if attempt <= maxAttempts {
|
||||
//Error 1205: Lock wait timeout exceeded; try restarting transaction
|
||||
goto Retry
|
||||
}
|
||||
err = errors.Prefix("Lock timeout for query "+query, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return 0, errors.Err(err)
|
||||
}
|
||||
|
||||
lastID, err := result.LastInsertId()
|
||||
return lastID, errors.Err(err)
|
||||
}
|
||||
|
||||
func isLockTimeoutError(err error) bool {
|
||||
e, ok := err.(*mysql.MySQLError)
|
||||
return ok && e != nil && e.Number == 1205
|
||||
}
|
||||
|
||||
/* SQL schema
|
||||
|
||||
in prod make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space.
|
||||
|
||||
CREATE TABLE `blob_` (
|
||||
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
|
||||
`hash` char(96) NOT NULL,
|
||||
`is_stored` tinyint(1) NOT NULL DEFAULT '0',
|
||||
`length` bigint unsigned DEFAULT NULL,
|
||||
`last_accessed_at` datetime DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `id` (`id`),
|
||||
UNIQUE KEY `blob_hash_idx` (`hash`),
|
||||
KEY `blob_last_accessed_idx` (`last_accessed_at`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=latin1
|
||||
|
||||
*/
|
|
@ -91,3 +91,8 @@ func (p *Store) PutSD(hash string, blob stream.Blob) error {
|
|||
func (p *Store) Delete(hash string) error {
|
||||
panic("http3Store cannot put or delete blobs")
|
||||
}
|
||||
|
||||
// Delete is not supported
|
||||
func (p *Store) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -66,3 +66,8 @@ func (p *Store) PutSD(hash string, blob stream.Blob) error {
|
|||
func (p *Store) Delete(hash string) error {
|
||||
panic("PeerStore cannot put or delete blobs")
|
||||
}
|
||||
|
||||
// Delete is not supported
|
||||
func (p *Store) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
|
|||
go func() {
|
||||
err = c.cache.Put(hash, blob)
|
||||
if err != nil {
|
||||
log.Errorf("error saving blob to underlying cache: %s", err.Error())
|
||||
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
|
||||
}
|
||||
}()
|
||||
return blob, nil
|
||||
|
@ -100,3 +100,10 @@ func (c *CachingStore) Delete(hash string) error {
|
|||
}
|
||||
return c.cache.Delete(hash)
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (c *CachingStore) Shutdown() {
|
||||
c.origin.Shutdown()
|
||||
c.cache.Shutdown()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -103,3 +103,8 @@ func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
|
|||
func (c *CloudFrontROStore) Delete(_ string) error {
|
||||
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (c *CloudFrontROStore) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -48,3 +48,10 @@ func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
|
|||
func (c *CloudFrontRWStore) Delete(hash string) error {
|
||||
return c.s3.Delete(hash)
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (c *CloudFrontRWStore) Shutdown() {
|
||||
c.s3.Shutdown()
|
||||
c.cf.Shutdown()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -193,3 +193,9 @@ func (d *DBBackedStore) initBlocked() error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (d *DBBackedStore) Shutdown() {
|
||||
d.blobs.Shutdown()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -147,3 +147,8 @@ func (d *DiskStore) initOnce() error {
|
|||
d.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (d *DiskStore) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Infof("read %d files from disk", len(existing))
|
||||
logrus.Infof("read %d files from underlying store", len(existing))
|
||||
|
||||
added := 0
|
||||
for _, h := range existing {
|
||||
|
@ -121,3 +121,8 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (l *LFUDAStore) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
160
store/litedbbacked.go
Normal file
160
store/litedbbacked.go
Normal file
|
@ -0,0 +1,160 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||
"github.com/lbryio/reflector.go/db"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/lbryio/reflector.go/lite_db"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
|
||||
type LiteDBBackedStore struct {
|
||||
blobs BlobStore
|
||||
db *lite_db.SQL
|
||||
maxItems int
|
||||
stopper *stop.Stopper
|
||||
component string
|
||||
}
|
||||
|
||||
// NewDBBackedStore returns an initialized store pointer.
|
||||
func NewLiteDBBackedStore(component string, blobs BlobStore, db *lite_db.SQL, maxItems int) *LiteDBBackedStore {
|
||||
instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component}
|
||||
go func() {
|
||||
instance.selfClean()
|
||||
}()
|
||||
return instance
|
||||
}
|
||||
|
||||
const nameLiteDBBacked = "lite-db-backed"
|
||||
|
||||
// Name is the cache type name
|
||||
func (d *LiteDBBackedStore) Name() string { return nameDBBacked }
|
||||
|
||||
// Has returns true if the blob is in the store
|
||||
func (d *LiteDBBackedStore) Has(hash string) (bool, error) {
|
||||
return d.db.HasBlob(hash)
|
||||
}
|
||||
|
||||
// Get gets the blob
|
||||
func (d *LiteDBBackedStore) Get(hash string) (stream.Blob, error) {
|
||||
has, err := d.db.HasBlob(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !has {
|
||||
return nil, ErrBlobNotFound
|
||||
}
|
||||
b, err := d.blobs.Get(hash)
|
||||
if err != nil && errors.Is(err, ErrBlobNotFound) {
|
||||
e2 := d.db.Delete(hash)
|
||||
if e2 != nil {
|
||||
log.Errorf("error while deleting blob from db: %s", errors.FullTrace(e2))
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
return d.blobs.Get(hash)
|
||||
}
|
||||
|
||||
// Put stores the blob in the S3 store and stores the blob information in the DB.
|
||||
func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error {
|
||||
err := d.blobs.Put(hash, blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.db.AddBlob(hash, len(blob))
|
||||
}
|
||||
|
||||
// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
|
||||
// there is an error storing the blob information in the DB.
|
||||
func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error {
|
||||
var blobContents db.SdBlob
|
||||
err := json.Unmarshal(blob, &blobContents)
|
||||
if err != nil {
|
||||
return errors.Err(err)
|
||||
}
|
||||
if blobContents.StreamHash == "" {
|
||||
return errors.Err("sd blob is missing stream hash")
|
||||
}
|
||||
|
||||
err = d.blobs.PutSD(hash, blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.db.AddSDBlob(hash, len(blob))
|
||||
}
|
||||
|
||||
func (d *LiteDBBackedStore) Delete(hash string) error {
|
||||
err := d.blobs.Delete(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.db.Delete(hash)
|
||||
}
|
||||
|
||||
// list returns the hashes of blobs that already exist in the database
|
||||
func (d *LiteDBBackedStore) list() ([]string, error) {
|
||||
blobs, err := d.db.AllBlobs()
|
||||
return blobs, err
|
||||
}
|
||||
|
||||
func (d *LiteDBBackedStore) selfClean() {
|
||||
d.stopper.Add(1)
|
||||
defer d.stopper.Done()
|
||||
lastCleanup := time.Now()
|
||||
const cleanupInterval = 10 * time.Second
|
||||
for {
|
||||
select {
|
||||
case <-d.stopper.Ch():
|
||||
log.Infoln("stopping self cleanup")
|
||||
return
|
||||
default:
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
if time.Since(lastCleanup) < cleanupInterval {
|
||||
continue
|
||||
}
|
||||
blobsCount, err := d.db.BlobsCount()
|
||||
if err != nil {
|
||||
log.Errorf(errors.FullTrace(err))
|
||||
}
|
||||
if blobsCount >= d.maxItems {
|
||||
itemsToDelete := blobsCount / 100 * 10
|
||||
blobs, err := d.db.GetLRUBlobs(itemsToDelete)
|
||||
if err != nil {
|
||||
log.Errorf(errors.FullTrace(err))
|
||||
}
|
||||
for _, hash := range blobs {
|
||||
select {
|
||||
case <-d.stopper.Ch():
|
||||
return
|
||||
default:
|
||||
|
||||
}
|
||||
err = d.Delete(hash)
|
||||
if err != nil {
|
||||
log.Errorf(errors.FullTrace(err))
|
||||
}
|
||||
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
|
||||
}
|
||||
}
|
||||
lastCleanup = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (d *LiteDBBackedStore) Shutdown() {
|
||||
d.stopper.StopAndWait()
|
||||
return
|
||||
}
|
|
@ -123,3 +123,8 @@ func (l *LRUStore) loadExisting(store lister, maxItems int) error {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (l *LRUStore) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -71,3 +71,8 @@ func (m *MemStore) Debug() map[string]stream.Blob {
|
|||
defer m.mu.RUnlock()
|
||||
return m.blobs
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (m *MemStore) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -13,3 +13,4 @@ func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
|
|||
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
|
||||
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
|
||||
func (n *NoopStore) Delete(_ string) error { return nil }
|
||||
func (n *NoopStore) Shutdown() { return }
|
||||
|
|
|
@ -158,3 +158,8 @@ func (s *S3Store) initOnce() error {
|
|||
s.session = sess
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (s *S3Store) Shutdown() {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -65,3 +65,9 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
|
|||
return blob, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown shuts down the store gracefully
|
||||
func (s *singleflightStore) Shutdown() {
|
||||
s.BlobStore.Shutdown()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ type BlobStore interface {
|
|||
PutSD(hash string, blob stream.Blob) error
|
||||
// Delete the blob from the store.
|
||||
Delete(hash string) error
|
||||
// Shutdown the store gracefully
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
// Blocklister is a store that supports blocking blobs to prevent their inclusion in the store.
|
||||
|
|
Loading…
Reference in a new issue