diff --git a/cmd/reflector.go b/cmd/reflector.go index 58c3f6f..3bb6b4e 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -10,6 +10,7 @@ import ( "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/lite_db" "github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer/http3" @@ -101,6 +102,8 @@ func reflectorCmd(cmd *cobra.Command, args []string) { metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() defer metricsServer.Shutdown() + defer underlyingStore.Shutdown() + defer outerStore.Shutdown() interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) @@ -171,10 +174,16 @@ func wrapWithCache(s store.BlobStore) store.BlobStore { if err != nil { log.Fatal(err) } + localDb := new(lite_db.SQL) + localDb.TrackAccessTime = true + err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector") + if err != nil { + log.Fatal(err) + } wrapped = store.NewCachingStore( "reflector", wrapped, - store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), realCacheSize), + store.NewLiteDBBackedStore("hdd", store.NewDiskStore(diskCachePath, 2), localDb, int(realCacheSize)), ) } diff --git a/lite_db/db.go b/lite_db/db.go new file mode 100644 index 0000000..6be9837 --- /dev/null +++ b/lite_db/db.go @@ -0,0 +1,348 @@ +package lite_db + +import ( + "database/sql" + "time" + + "github.com/lbryio/lbry.go/v2/extras/errors" + qt "github.com/lbryio/lbry.go/v2/extras/query" + + "github.com/go-sql-driver/mysql" + _ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used + log "github.com/sirupsen/logrus" + "github.com/volatiletech/null" +) + +// SdBlob is a special blob that contains information on the rest of the blobs in the stream +type SdBlob struct { + StreamName string `json:"stream_name"` + Blobs []struct { + Length int `json:"length"` + BlobNum int `json:"blob_num"` + BlobHash string `json:"blob_hash,omitempty"` + IV string `json:"iv"` + } `json:"blobs"` + StreamType string `json:"stream_type"` + Key string `json:"key"` + SuggestedFileName string `json:"suggested_file_name"` + StreamHash string `json:"stream_hash"` +} + +// SQL implements the DB interface +type SQL struct { + conn *sql.DB + + TrackAccessTime bool +} + +func logQuery(query string, args ...interface{}) { + s, err := qt.InterpolateParams(query, args...) + if err != nil { + log.Errorln(err) + } else { + log.Debugln(s) + } +} + +// Connect will create a connection to the database +func (s *SQL) Connect(dsn string) error { + var err error + // interpolateParams is necessary. otherwise uploading a stream with thousands of blobs + // will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all + // opened inside a transaction. closing them manually doesn't seem to help + dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1" + s.conn, err = sql.Open("mysql", dsn) + if err != nil { + return errors.Err(err) + } + + s.conn.SetMaxIdleConns(12) + + return errors.Err(s.conn.Ping()) +} + +// AddBlob adds a blob to the database. +func (s *SQL) AddBlob(hash string, length int) error { + if s.conn == nil { + return errors.Err("not connected") + } + + _, err := s.insertBlob(hash, length) + return err +} + +func (s *SQL) insertBlob(hash string, length int) (int64, error) { + if length <= 0 { + return 0, errors.Err("length must be positive") + } + const isStored = true + now := time.Now() + args := []interface{}{hash, isStored, length, now} + blobID, err := s.exec( + "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at=VALUES(last_accessed_at)", + args..., + ) + if err != nil { + return 0, err + } + + if blobID == 0 { + err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID) + if err != nil { + return 0, errors.Err(err) + } + if blobID == 0 { + return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing") + } + } + + return blobID, nil +} + +// HasBlob checks if the database contains the blob information. +func (s *SQL) HasBlob(hash string) (bool, error) { + exists, err := s.HasBlobs([]string{hash}) + if err != nil { + return false, err + } + return exists[hash], nil +} + +// HasBlobs checks if the database contains the set of blobs and returns a bool map. +func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { + exists, streamsNeedingTouch, err := s.hasBlobs(hashes) + s.touch(streamsNeedingTouch) + return exists, err +} + +func (s *SQL) touch(blobIDs []uint64) error { + if len(blobIDs) == 0 { + return nil + } + + query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")" + args := make([]interface{}, len(blobIDs)+1) + args[0] = time.Now() + for i := range blobIDs { + args[i+1] = blobIDs[i] + } + + startTime := time.Now() + _, err := s.exec(query, args...) + log.Debugf("blobs access query touched %d blobs and took %s", len(blobIDs), time.Since(startTime)) + return errors.Err(err) +} + +func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) { + if s.conn == nil { + return nil, nil, errors.Err("not connected") + } + + var ( + hash string + blobID uint64 + lastAccessedAt null.Time + ) + + var needsTouch []uint64 + exists := make(map[string]bool) + + touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time + maxBatchSize := 10000 + doneIndex := 0 + + for len(hashes) > doneIndex { + sliceEnd := doneIndex + maxBatchSize + if sliceEnd > len(hashes) { + sliceEnd = len(hashes) + } + log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes)) + batch := hashes[doneIndex:sliceEnd] + + // TODO: this query doesn't work for SD blobs, which are not in the stream_blob table + + query := `SELECT hash, id, last_accessed_at +FROM blob_ +WHERE is_stored = ? and hash IN (` + qt.Qs(len(batch)) + `)` + args := make([]interface{}, len(batch)+1) + args[0] = true + for i := range batch { + args[i+1] = batch[i] + } + + logQuery(query, args...) + + err := func() error { + startTime := time.Now() + rows, err := s.conn.Query(query, args...) + log.Debugf("hashes query took %s", time.Since(startTime)) + if err != nil { + return errors.Err(err) + } + defer closeRows(rows) + + for rows.Next() { + err := rows.Scan(&hash, &blobID, &lastAccessedAt) + if err != nil { + return errors.Err(err) + } + exists[hash] = true + if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) { + needsTouch = append(needsTouch, blobID) + } + } + + err = rows.Err() + if err != nil { + return errors.Err(err) + } + + doneIndex += len(batch) + return nil + }() + if err != nil { + return nil, nil, err + } + } + + return exists, needsTouch, nil +} + +// Delete will remove the blob from the db +func (s *SQL) Delete(hash string) error { + _, err := s.exec("UPDATE blob_ set is_stored = ? WHERE hash = ?", 0, hash) + return errors.Err(err) +} + +// AddSDBlob insert the SD blob and all the content blobs. The content blobs are marked as "not stored", +// but they are tracked so reflector knows what it is missing. +func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int) error { + if s.conn == nil { + return errors.Err("not connected") + } + + _, err := s.insertBlob(sdHash, sdBlobLength) + return err +} + +// GetHashRange gets the smallest and biggest hashes in the db +func (s *SQL) GetLRUBlobs(maxBlobs int) ([]string, error) { + if s.conn == nil { + return nil, errors.Err("not connected") + } + + query := "SELECT hash from blob_ where is_stored = ? order by last_accessed_at limit ?" + const isStored = true + logQuery(query, isStored, maxBlobs) + rows, err := s.conn.Query(query, isStored, maxBlobs) + if err != nil { + return nil, errors.Err(err) + } + defer closeRows(rows) + blobs := make([]string, 0, maxBlobs) + for rows.Next() { + var hash string + err := rows.Scan(&hash) + if err != nil { + return nil, errors.Err(err) + } + blobs = append(blobs, hash) + } + return blobs, nil +} + +func (s *SQL) AllBlobs() ([]string, error) { + if s.conn == nil { + return nil, errors.Err("not connected") + } + + query := "SELECT hash from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense? + const isStored = true + logQuery(query, isStored) + rows, err := s.conn.Query(query, isStored) + if err != nil { + return nil, errors.Err(err) + } + defer closeRows(rows) + totalBlobs, err := s.BlobsCount() + if err != nil { + return nil, err + } + blobs := make([]string, 0, totalBlobs) + for rows.Next() { + var hash string + err := rows.Scan(&hash) + if err != nil { + return nil, errors.Err(err) + } + blobs = append(blobs, hash) + } + return blobs, nil +} + +func (s *SQL) BlobsCount() (int, error) { + if s.conn == nil { + return 0, errors.Err("not connected") + } + + query := "SELECT count(id) from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense? + const isStored = true + logQuery(query, isStored) + var count int + err := s.conn.QueryRow(query, isStored).Scan(&count) + return count, errors.Err(err) +} + +func closeRows(rows *sql.Rows) { + if rows != nil { + err := rows.Close() + if err != nil { + log.Error("error closing rows: ", err) + } + } +} + +func (s *SQL) exec(query string, args ...interface{}) (int64, error) { + logQuery(query, args...) + attempt, maxAttempts := 0, 3 +Retry: + attempt++ + result, err := s.conn.Exec(query, args...) + if isLockTimeoutError(err) { + if attempt <= maxAttempts { + //Error 1205: Lock wait timeout exceeded; try restarting transaction + goto Retry + } + err = errors.Prefix("Lock timeout for query "+query, err) + } + + if err != nil { + return 0, errors.Err(err) + } + + lastID, err := result.LastInsertId() + return lastID, errors.Err(err) +} + +func isLockTimeoutError(err error) bool { + e, ok := err.(*mysql.MySQLError) + return ok && e != nil && e.Number == 1205 +} + +/* SQL schema + +in prod make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space. + +CREATE TABLE `blob_` ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT, + `hash` char(96) NOT NULL, + `is_stored` tinyint(1) NOT NULL DEFAULT '0', + `length` bigint unsigned DEFAULT NULL, + `last_accessed_at` datetime DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `id` (`id`), + UNIQUE KEY `blob_hash_idx` (`hash`), + KEY `blob_last_accessed_idx` (`last_accessed_at`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1 + +*/ diff --git a/peer/http3/store.go b/peer/http3/store.go index 76a58c1..b681443 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -91,3 +91,8 @@ func (p *Store) PutSD(hash string, blob stream.Blob) error { func (p *Store) Delete(hash string) error { panic("http3Store cannot put or delete blobs") } + +// Delete is not supported +func (p *Store) Shutdown() { + return +} diff --git a/peer/store.go b/peer/store.go index b8abedd..3ef9622 100644 --- a/peer/store.go +++ b/peer/store.go @@ -66,3 +66,8 @@ func (p *Store) PutSD(hash string, blob stream.Blob) error { func (p *Store) Delete(hash string) error { panic("PeerStore cannot put or delete blobs") } + +// Delete is not supported +func (p *Store) Shutdown() { + return +} diff --git a/store/caching.go b/store/caching.go index c8e7d45..53b692c 100644 --- a/store/caching.go +++ b/store/caching.go @@ -68,7 +68,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) { go func() { err = c.cache.Put(hash, blob) if err != nil { - log.Errorf("error saving blob to underlying cache: %s", err.Error()) + log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err)) } }() return blob, nil @@ -100,3 +100,10 @@ func (c *CachingStore) Delete(hash string) error { } return c.cache.Delete(hash) } + +// Shutdown shuts down the store gracefully +func (c *CachingStore) Shutdown() { + c.origin.Shutdown() + c.cache.Shutdown() + return +} diff --git a/store/cloudfront_ro.go b/store/cloudfront_ro.go index a914285..3dcffc0 100644 --- a/store/cloudfront_ro.go +++ b/store/cloudfront_ro.go @@ -103,3 +103,8 @@ func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error { func (c *CloudFrontROStore) Delete(_ string) error { panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") } + +// Shutdown shuts down the store gracefully +func (c *CloudFrontROStore) Shutdown() { + return +} diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go index ee771da..6de64af 100644 --- a/store/cloudfront_rw.go +++ b/store/cloudfront_rw.go @@ -48,3 +48,10 @@ func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error { func (c *CloudFrontRWStore) Delete(hash string) error { return c.s3.Delete(hash) } + +// Shutdown shuts down the store gracefully +func (c *CloudFrontRWStore) Shutdown() { + c.s3.Shutdown() + c.cf.Shutdown() + return +} diff --git a/store/dbbacked.go b/store/dbbacked.go index 25cc446..0ebaaed 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -193,3 +193,9 @@ func (d *DBBackedStore) initBlocked() error { return err } + +// Shutdown shuts down the store gracefully +func (d *DBBackedStore) Shutdown() { + d.blobs.Shutdown() + return +} diff --git a/store/disk.go b/store/disk.go index 2234971..03cc393 100644 --- a/store/disk.go +++ b/store/disk.go @@ -147,3 +147,8 @@ func (d *DiskStore) initOnce() error { d.initialized = true return nil } + +// Shutdown shuts down the store gracefully +func (d *DiskStore) Shutdown() { + return +} diff --git a/store/lfuda.go b/store/lfuda.go index cfdb155..0eb7296 100644 --- a/store/lfuda.go +++ b/store/lfuda.go @@ -109,7 +109,7 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error { if err != nil { return err } - logrus.Infof("read %d files from disk", len(existing)) + logrus.Infof("read %d files from underlying store", len(existing)) added := 0 for _, h := range existing { @@ -121,3 +121,8 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error { } return nil } + +// Shutdown shuts down the store gracefully +func (l *LFUDAStore) Shutdown() { + return +} diff --git a/store/litedbbacked.go b/store/litedbbacked.go new file mode 100644 index 0000000..93790d6 --- /dev/null +++ b/store/litedbbacked.go @@ -0,0 +1,160 @@ +package store + +import ( + "encoding/json" + "time" + + "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/lite_db" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + log "github.com/sirupsen/logrus" +) + +// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store. +type LiteDBBackedStore struct { + blobs BlobStore + db *lite_db.SQL + maxItems int + stopper *stop.Stopper + component string +} + +// NewDBBackedStore returns an initialized store pointer. +func NewLiteDBBackedStore(component string, blobs BlobStore, db *lite_db.SQL, maxItems int) *LiteDBBackedStore { + instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component} + go func() { + instance.selfClean() + }() + return instance +} + +const nameLiteDBBacked = "lite-db-backed" + +// Name is the cache type name +func (d *LiteDBBackedStore) Name() string { return nameDBBacked } + +// Has returns true if the blob is in the store +func (d *LiteDBBackedStore) Has(hash string) (bool, error) { + return d.db.HasBlob(hash) +} + +// Get gets the blob +func (d *LiteDBBackedStore) Get(hash string) (stream.Blob, error) { + has, err := d.db.HasBlob(hash) + if err != nil { + return nil, err + } + if !has { + return nil, ErrBlobNotFound + } + b, err := d.blobs.Get(hash) + if err != nil && errors.Is(err, ErrBlobNotFound) { + e2 := d.db.Delete(hash) + if e2 != nil { + log.Errorf("error while deleting blob from db: %s", errors.FullTrace(e2)) + } + return b, err + } + + return d.blobs.Get(hash) +} + +// Put stores the blob in the S3 store and stores the blob information in the DB. +func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error { + err := d.blobs.Put(hash, blob) + if err != nil { + return err + } + + return d.db.AddBlob(hash, len(blob)) +} + +// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if +// there is an error storing the blob information in the DB. +func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error { + var blobContents db.SdBlob + err := json.Unmarshal(blob, &blobContents) + if err != nil { + return errors.Err(err) + } + if blobContents.StreamHash == "" { + return errors.Err("sd blob is missing stream hash") + } + + err = d.blobs.PutSD(hash, blob) + if err != nil { + return err + } + + return d.db.AddSDBlob(hash, len(blob)) +} + +func (d *LiteDBBackedStore) Delete(hash string) error { + err := d.blobs.Delete(hash) + if err != nil { + return err + } + + return d.db.Delete(hash) +} + +// list returns the hashes of blobs that already exist in the database +func (d *LiteDBBackedStore) list() ([]string, error) { + blobs, err := d.db.AllBlobs() + return blobs, err +} + +func (d *LiteDBBackedStore) selfClean() { + d.stopper.Add(1) + defer d.stopper.Done() + lastCleanup := time.Now() + const cleanupInterval = 10 * time.Second + for { + select { + case <-d.stopper.Ch(): + log.Infoln("stopping self cleanup") + return + default: + time.Sleep(1 * time.Second) + } + if time.Since(lastCleanup) < cleanupInterval { + continue + } + blobsCount, err := d.db.BlobsCount() + if err != nil { + log.Errorf(errors.FullTrace(err)) + } + if blobsCount >= d.maxItems { + itemsToDelete := blobsCount / 100 * 10 + blobs, err := d.db.GetLRUBlobs(itemsToDelete) + if err != nil { + log.Errorf(errors.FullTrace(err)) + } + for _, hash := range blobs { + select { + case <-d.stopper.Ch(): + return + default: + + } + err = d.Delete(hash) + if err != nil { + log.Errorf(errors.FullTrace(err)) + } + metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc() + } + } + lastCleanup = time.Now() + } +} + +// Shutdown shuts down the store gracefully +func (d *LiteDBBackedStore) Shutdown() { + d.stopper.StopAndWait() + return +} diff --git a/store/lru.go b/store/lru.go index 071e2b9..ce0d83d 100644 --- a/store/lru.go +++ b/store/lru.go @@ -123,3 +123,8 @@ func (l *LRUStore) loadExisting(store lister, maxItems int) error { } return nil } + +// Shutdown shuts down the store gracefully +func (l *LRUStore) Shutdown() { + return +} diff --git a/store/memory.go b/store/memory.go index f462a8d..62b93b6 100644 --- a/store/memory.go +++ b/store/memory.go @@ -71,3 +71,8 @@ func (m *MemStore) Debug() map[string]stream.Blob { defer m.mu.RUnlock() return m.blobs } + +// Shutdown shuts down the store gracefully +func (m *MemStore) Shutdown() { + return +} diff --git a/store/noop.go b/store/noop.go index 9e5d815..2792a0d 100644 --- a/store/noop.go +++ b/store/noop.go @@ -13,3 +13,4 @@ func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil } func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil } func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil } func (n *NoopStore) Delete(_ string) error { return nil } +func (n *NoopStore) Shutdown() { return } diff --git a/store/s3.go b/store/s3.go index 53451be..02a23c5 100644 --- a/store/s3.go +++ b/store/s3.go @@ -158,3 +158,8 @@ func (s *S3Store) initOnce() error { s.session = sess return nil } + +// Shutdown shuts down the store gracefully +func (s *S3Store) Shutdown() { + return +} diff --git a/store/singleflight.go b/store/singleflight.go index cca1e2f..7d0139d 100644 --- a/store/singleflight.go +++ b/store/singleflight.go @@ -65,3 +65,9 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) { return blob, nil } } + +// Shutdown shuts down the store gracefully +func (s *singleflightStore) Shutdown() { + s.BlobStore.Shutdown() + return +} diff --git a/store/store.go b/store/store.go index 200ff61..36d388d 100644 --- a/store/store.go +++ b/store/store.go @@ -19,6 +19,8 @@ type BlobStore interface { PutSD(hash string, blob stream.Blob) error // Delete the blob from the store. Delete(hash string) error + // Shutdown the store gracefully + Shutdown() } // Blocklister is a store that supports blocking blobs to prevent their inclusion in the store.