something like this

This commit is contained in:
Alex Grintsvayg 2020-12-23 17:08:13 -05:00 committed by Niko Storni
parent 03df751bc7
commit 6fb0620091
5 changed files with 304 additions and 45 deletions

View file

@ -8,9 +8,10 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/lite_db"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/peer/http3" "github.com/lbryio/reflector.go/peer/http3"
@ -68,10 +69,11 @@ func init() {
func reflectorCmd(cmd *cobra.Command, args []string) { func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString()) log.Printf("reflector %s", meta.VersionString())
cleanerStopper := stop.New()
// the blocklist logic requires the db backed store to be the outer-most store // the blocklist logic requires the db backed store to be the outer-most store
underlyingStore := setupStore() underlyingStore := setupStore()
outerStore := wrapWithCache(underlyingStore) outerStore := wrapWithCache(underlyingStore, cleanerStopper)
if !disableUploads { if !disableUploads {
reflectorServer := reflector.NewServer(underlyingStore) reflectorServer := reflector.NewServer(underlyingStore)
@ -102,13 +104,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start() metricsServer.Start()
defer metricsServer.Shutdown() defer metricsServer.Shutdown()
defer underlyingStore.Shutdown()
defer outerStore.Shutdown() defer outerStore.Shutdown()
defer underlyingStore.Shutdown()
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan <-interruptChan
// deferred shutdowns happen now // deferred shutdowns happen now
cleanerStopper.StopAndWait()
} }
func setupStore() store.BlobStore { func setupStore() store.BlobStore {
@ -149,20 +152,20 @@ func setupStore() store.BlobStore {
} }
if useDB { if useDB {
db := new(db.SQL) dbInst := new(db.SQL)
db.TrackAccessTime = true dbInst.TrackAccess = db.TrackAccessStreams
err := db.Connect(globalConfig.DBConn) err := dbInst.Connect(globalConfig.DBConn)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
s = store.NewDBBackedStore(s, db, false) s = store.NewDBBackedStore(s, dbInst, false)
} }
return s return s
} }
func wrapWithCache(s store.BlobStore) store.BlobStore { func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
wrapped := s wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache) diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
@ -174,8 +177,9 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
localDb := new(lite_db.SQL)
localDb.TrackAccessTime = true localDb := new(db.SQL)
localDb.TrackAccess = db.TrackAccessBlobs
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector") err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -183,8 +187,10 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped = store.NewCachingStore( wrapped = store.NewCachingStore(
"reflector", "reflector",
wrapped, wrapped,
store.NewLiteDBBackedStore("hdd", store.NewDiskStore(diskCachePath, 2), localDb, int(realCacheSize)), store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false),
) )
go cleanOldestBlobs(int(realCacheSize), localDb, wrapped, cleanerStopper)
} }
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache) diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
@ -238,3 +244,48 @@ func diskCacheParams(diskParams string) (int, string) {
} }
return int(maxSize), path return int(maxSize), path
} }
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
const cleanupInterval = 10 * time.Second
for {
select {
case <-stopper.Ch():
log.Infoln("stopping self cleanup")
return
case <-time.After(cleanupInterval):
err := doClean(maxItems, db, store, stopper)
if err != nil {
log.Error(errors.FullTrace(err))
}
}
}
}
func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
blobsCount, err := db.Count()
if err != nil {
return err
}
if blobsCount >= maxItems {
itemsToDelete := blobsCount / 10
blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
if err != nil {
return err
}
for _, hash := range blobs {
select {
case <-stopper.Ch():
return nil
default:
}
err = store.Delete(hash)
if err != nil {
return err
}
}
}
}

265
db/db.go
View file

@ -30,11 +30,26 @@ type SdBlob struct {
StreamHash string `json:"stream_hash"` StreamHash string `json:"stream_hash"`
} }
type trackAccess int
const (
// Don't track accesses
TrackAccessNone trackAccess = iota
// Track accesses at the stream level
TrackAccessStreams
// Track accesses at the blob level
TrackAccessBlobs
)
// SQL implements the DB interface // SQL implements the DB interface
type SQL struct { type SQL struct {
conn *sql.DB conn *sql.DB
TrackAccessTime bool // Track the approx last time a blob or stream was accessed
TrackAccess trackAccess
// Instead of deleting a blob, marked it as not stored in the db
SoftDelete bool
} }
func logQuery(query string, args ...interface{}) { func logQuery(query string, args ...interface{}) {
@ -78,11 +93,19 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
return 0, errors.Err("length must be positive") return 0, errors.Err("length must be positive")
} }
args := []interface{}{hash, isStored, length} var (
blobID, err := s.exec( q string
"INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", args []interface{}
args...,
) )
if s.TrackAccess == TrackAccessBlobs {
args = []interface{}{hash, isStored, length, time.Now()}
q = "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)"
} else {
args = []interface{}{hash, isStored, length}
q = "INSERT INTO blob_ (hash, is_stored, length) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))"
}
blobID, err := s.exec(q, args...)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -95,17 +118,33 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
if blobID == 0 { if blobID == 0 {
return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing") return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
} }
if s.TrackAccess == TrackAccessBlobs {
err := s.touchBlobs([]uint64{uint64(blobID)})
if err != nil {
return 0, errors.Err(err)
}
}
} }
return blobID, nil return blobID, nil
} }
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
args := []interface{}{hash, sdBlobID, time.Now()} var (
streamID, err := s.exec( q string
"INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")", args []interface{}
args...,
) )
if s.TrackAccess == TrackAccessStreams {
args = []interface{}{hash, sdBlobID, time.Now()}
q = "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ")"
} else {
args = []interface{}{hash, sdBlobID}
q = "INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (" + qt.Qs(len(args)) + ")"
}
streamID, err := s.exec(q, args...)
if err != nil { if err != nil {
return 0, errors.Err(err) return 0, errors.Err(err)
} }
@ -119,8 +158,8 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing") return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
} }
if s.TrackAccessTime { if s.TrackAccess == TrackAccessStreams {
err := s.touch([]uint64{uint64(streamID)}) err := s.touchStreams([]uint64{uint64(streamID)})
if err != nil { if err != nil {
return 0, errors.Err(err) return 0, errors.Err(err)
} }
@ -140,12 +179,36 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
// HasBlobs checks if the database contains the set of blobs and returns a bool map. // HasBlobs checks if the database contains the set of blobs and returns a bool map.
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
exists, streamsNeedingTouch, err := s.hasBlobs(hashes) exists, idsNeedingTouch, err := s.hasBlobs(hashes)
s.touch(streamsNeedingTouch)
if s.TrackAccess == TrackAccessBlobs {
s.touchBlobs(idsNeedingTouch)
} else if s.TrackAccess == TrackAccessStreams {
s.touchStreams(idsNeedingTouch)
}
return exists, err return exists, err
} }
func (s *SQL) touch(streamIDs []uint64) error { func (s *SQL) touchBlobs(blobIDs []uint64) error {
if len(blobIDs) == 0 {
return nil
}
query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
args := make([]interface{}, len(blobIDs)+1)
args[0] = time.Now()
for i := range blobIDs {
args[i+1] = blobIDs[i]
}
startTime := time.Now()
_, err := s.exec(query, args...)
log.Debugf("touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
return errors.Err(err)
}
func (s *SQL) touchStreams(streamIDs []uint64) error {
if len(streamIDs) == 0 { if len(streamIDs) == 0 {
return nil return nil
} }
@ -159,7 +222,7 @@ func (s *SQL) touch(streamIDs []uint64) error {
startTime := time.Now() startTime := time.Now()
_, err := s.exec(query, args...) _, err := s.exec(query, args...)
log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime)) log.Debugf("touched %d streams and took %s", len(streamIDs), time.Since(startTime))
return errors.Err(err) return errors.Err(err)
} }
@ -169,9 +232,9 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
} }
var ( var (
hash string hash string
streamID uint64 blobID, streamID uint64
lastAccessedAt null.Time lastAccessedAt null.Time
) )
var needsTouch []uint64 var needsTouch []uint64
@ -189,17 +252,23 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes)) log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
batch := hashes[doneIndex:sliceEnd] batch := hashes[doneIndex:sliceEnd]
// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table var lastAccessedAtSelect string
if s.TrackAccess == TrackAccessBlobs {
lastAccessedAtSelect = "b.last_accessed_at"
} else if s.TrackAccess == TrackAccessStreams {
lastAccessedAtSelect = "s.last_accessed_at"
} else {
lastAccessedAtSelect = "NULL"
}
query := `SELECT b.hash, s.id, s.last_accessed_at query := `SELECT b.hash, b.id, s.id, ` + lastAccessedAtSelect + `
FROM blob_ b FROM blob_ b
LEFT JOIN stream_blob sb ON b.id = sb.blob_id LEFT JOIN stream_blob sb ON b.id = sb.blob_id
INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id) LEFT JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)` WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
args := make([]interface{}, len(batch)+1) args := make([]interface{}, len(batch))
args[0] = true
for i := range batch { for i := range batch {
args[i+1] = batch[i] args[i] = batch[i]
} }
logQuery(query, args...) logQuery(query, args...)
@ -214,13 +283,17 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
defer closeRows(rows) defer closeRows(rows)
for rows.Next() { for rows.Next() {
err := rows.Scan(&hash, &streamID, &lastAccessedAt) err := rows.Scan(&hash, &blobID, &streamID, &lastAccessedAt)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
exists[hash] = true exists[hash] = true
if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) { if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
needsTouch = append(needsTouch, streamID) if s.TrackAccess == TrackAccessBlobs {
needsTouch = append(needsTouch, blobID)
} else if s.TrackAccess == TrackAccessStreams {
needsTouch = append(needsTouch, streamID)
}
} }
} }
@ -240,8 +313,14 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
return exists, needsTouch, nil return exists, needsTouch, nil
} }
// Delete will remove the blob from the db // Delete will remove (or soft-delete) the blob from the db
// NOTE: If SoftDelete is enabled, streams will never be deleted
func (s *SQL) Delete(hash string) error { func (s *SQL) Delete(hash string) error {
if s.SoftDelete {
_, err := s.exec("UPDATE blob_ SET is_stored = 0 WHERE hash = ?", hash)
return errors.Err(err)
}
_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash) _, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -251,6 +330,88 @@ func (s *SQL) Delete(hash string) error {
return errors.Err(err) return errors.Err(err)
} }
// GetHashRange gets the smallest and biggest hashes in the db
func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
if s.conn == nil {
return nil, errors.Err("not connected")
}
if s.TrackAccess != TrackAccessBlobs {
return nil, errors.Err("blob access tracking is disabled")
}
query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
logQuery(query, maxBlobs)
rows, err := s.conn.Query(query, maxBlobs)
if err != nil {
return nil, errors.Err(err)
}
defer closeRows(rows)
blobs := make([]string, 0, maxBlobs)
for rows.Next() {
var hash string
err := rows.Scan(&hash)
if err != nil {
return nil, errors.Err(err)
}
blobs = append(blobs, hash)
}
return blobs, nil
}
// AllHashes writes all hashes from the db into the channel.
// It does not close the channel when it finishes.
//func (s *SQL) AllHashes(ch chan<- string) error {
// if s.conn == nil {
// return errors.Err("not connected")
// }
//
// query := "SELECT hash from blob_"
// if s.SoftDelete {
// query += " where is_stored = 1"
// }
// logQuery(query)
//
// rows, err := s.conn.Query(query)
// if err != nil {
// return errors.Err(err)
// }
// defer closeRows(rows)
//
// for rows.Next() {
// var hash string
// err := rows.Scan(&hash)
// if err != nil {
// return errors.Err(err)
// }
// ch <- hash
// // TODO: this needs testing
// // TODO: need a way to cancel this early (e.g. in case of shutdown)
// }
//
// close(ch)
// return nil
//}
func (s *SQL) Count() (int, error) {
if s.conn == nil {
return 0, errors.Err("not connected")
}
query := "SELECT count(id) from blob_"
if s.SoftDelete {
query += " where is_stored = 1"
}
logQuery(query)
var count int
err := s.conn.QueryRow(query).Scan(&count)
return count, errors.Err(err)
}
// Block will mark a blob as blocked // Block will mark a blob as blocked
func (s *SQL) Block(hash string) error { func (s *SQL) Block(hash string) error {
query := "INSERT IGNORE INTO blocked SET hash = ?" query := "INSERT IGNORE INTO blocked SET hash = ?"
@ -528,8 +689,10 @@ CREATE TABLE blob_ (
hash char(96) NOT NULL, hash char(96) NOT NULL,
is_stored TINYINT(1) NOT NULL DEFAULT 0, is_stored TINYINT(1) NOT NULL DEFAULT 0,
length bigint(20) unsigned DEFAULT NULL, length bigint(20) unsigned DEFAULT NULL,
last_accessed_at TIMESTAMP NULL DEFAULT NULL,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY blob_hash_idx (hash) UNIQUE KEY blob_hash_idx (hash)
KEY `blob_last_accessed_idx` (`last_accessed_at`)
); );
CREATE TABLE stream ( CREATE TABLE stream (
@ -560,3 +723,47 @@ CREATE TABLE blocked (
); );
*/ */
//func (d *LiteDBBackedStore) selfClean() {
// d.stopper.Add(1)
// defer d.stopper.Done()
// lastCleanup := time.Now()
// const cleanupInterval = 10 * time.Second
// for {
// select {
// case <-d.stopper.Ch():
// log.Infoln("stopping self cleanup")
// return
// default:
// time.Sleep(1 * time.Second)
// }
// if time.Since(lastCleanup) < cleanupInterval {
// continue
//
// blobsCount, err := d.db.BlobsCount()
// if err != nil {
// log.Errorf(errors.FullTrace(err))
// }
// if blobsCount >= d.maxItems {
// itemsToDelete := blobsCount / 100 * 10
// blobs, err := d.db.GetLRUBlobs(itemsToDelete)
// if err != nil {
// log.Errorf(errors.FullTrace(err))
// }
// for _, hash := range blobs {
// select {
// case <-d.stopper.Ch():
// return
// default:
//
// }
// err = d.Delete(hash)
// if err != nil {
// log.Errorf(errors.FullTrace(err))
// }
// metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
// }
// }
// lastCleanup = time.Now()
// }
//}

4
go.mod
View file

@ -12,8 +12,7 @@ require (
github.com/davecgh/go-spew v1.1.1 github.com/davecgh/go-spew v1.1.1
github.com/go-sql-driver/mysql v1.4.1 github.com/go-sql-driver/mysql v1.4.1
github.com/golang/protobuf v1.4.2 github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0 // indirect github.com/google/gops v0.3.7
github.com/google/gops v0.3.7 // indirect
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
@ -33,7 +32,6 @@ require (
github.com/spf13/afero v1.4.1 // indirect github.com/spf13/afero v1.4.1 // indirect
github.com/spf13/cast v1.3.0 github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.3 github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
github.com/spf13/viper v1.7.1 // indirect github.com/spf13/viper v1.7.1 // indirect
github.com/stretchr/testify v1.4.0 github.com/stretchr/testify v1.4.0
github.com/volatiletech/null v8.0.0+incompatible github.com/volatiletech/null v8.0.0+incompatible

View file

@ -168,3 +168,7 @@ func (s *SlowBlobStore) Delete(hash string) error {
time.Sleep(s.delay) time.Sleep(s.delay)
return s.mem.Delete(hash) return s.mem.Delete(hash)
} }
func (s *SlowBlobStore) Shutdown() {
return
}

View file

@ -4,10 +4,9 @@ import (
"encoding/json" "encoding/json"
"sync" "sync"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/db"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )