From de0ccd4da7e2c5652e838029ba8127751f079aee Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Mon, 5 Oct 2020 14:05:00 -0400 Subject: [PATCH] track approximate access time for blobs --- cmd/reflector.go | 3 +- cmd/test.go | 2 +- cmd/version.go | 3 +- db/db.go | 81 +++++++++++++++++++++++++++++++++++++------- meta/meta.go | 22 ++++++++++-- peer/client.go | 2 +- peer/http3/server.go | 2 +- store/dbbacked.go | 8 +++++ 8 files changed, 101 insertions(+), 22 deletions(-) diff --git a/cmd/reflector.go b/cmd/reflector.go index 3aee294..41f6646 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -52,7 +52,7 @@ func init() { } func reflectorCmd(cmd *cobra.Command, args []string) { - log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) + log.Printf("reflector %s", meta.VersionString()) var blobStore store.BlobStore if proxyAddress != "" { @@ -84,6 +84,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { if useDB { db := new(db.SQL) + db.TrackAccessTime = true err = db.Connect(globalConfig.DBConn) if err != nil { log.Fatal(err) diff --git a/cmd/test.go b/cmd/test.go index c4c3c45..051ddf7 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -27,7 +27,7 @@ func init() { } func testCmd(cmd *cobra.Command, args []string) { - log.Printf("reflector version %s", meta.Version) + log.Printf("reflector %s", meta.VersionString()) memStore := store.NewMemoryBlobStore() diff --git a/cmd/version.go b/cmd/version.go index 4254143..b3ad97c 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "time" "github.com/lbryio/reflector.go/meta" "github.com/spf13/cobra" @@ -18,5 +17,5 @@ func init() { } func versionCmd(cmd *cobra.Command, args []string) { - fmt.Printf("version %s (built %s)\n", meta.Version, meta.BuildTime.Format(time.RFC3339)) + fmt.Println(meta.VersionString()) } diff --git a/db/db.go b/db/db.go index 26a959d..5720e32 100644 --- a/db/db.go +++ b/db/db.go @@ -32,6 +32,8 @@ type SdBlob struct { // SQL implements the DB interface type SQL struct { conn *sql.DB + + TrackAccessTime bool } func logQuery(query string, args ...interface{}) { @@ -75,9 +77,10 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) return 0, errors.Err("length must be positive") } + args := []interface{}{hash, isStored, length} blobID, err := s.exec( - "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", - hash, isStored, length, + "INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", + args..., ) if err != nil { return 0, err @@ -97,9 +100,10 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) } func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { + args := []interface{}{hash, sdBlobID, time.Now()} streamID, err := s.exec( - "INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (?,?)", - hash, sdBlobID, + "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")", + args..., ) if err != nil { return 0, errors.Err(err) @@ -113,6 +117,13 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { if streamID == 0 { return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing") } + + if s.TrackAccessTime { + err := s.touch([]uint64{uint64(streamID)}) + if err != nil { + return 0, errors.Err(err) + } + } } return streamID, nil } @@ -128,12 +139,44 @@ func (s *SQL) HasBlob(hash string) (bool, error) { // HasBlobs checks if the database contains the set of blobs and returns a bool map. func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { - if s.conn == nil { - return nil, errors.Err("not connected") + exists, streamsNeedingTouch, err := s.hasBlobs(hashes) + s.touch(streamsNeedingTouch) + return exists, err +} + +func (s *SQL) touch(streamIDs []uint64) error { + if len(streamIDs) == 0 { + return nil } - var hash string + query := "UPDATE stream SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(streamIDs)) + ")" + args := make([]interface{}, len(streamIDs)+1) + args[0] = time.Now() + for i := range streamIDs { + args[i+1] = streamIDs[i] + } + + startTime := time.Now() + _, err := s.exec(query, args...) + log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime)) + return errors.Err(err) +} + +func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) { + if s.conn == nil { + return nil, nil, errors.Err("not connected") + } + + var ( + hash string + streamID uint64 + lastAccessedAt time.Time + ) + + var needsTouch []uint64 exists := make(map[string]bool) + + touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time maxBatchSize := 10000 doneIndex := 0 @@ -145,7 +188,13 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes)) batch := hashes[doneIndex:sliceEnd] - query := "SELECT hash FROM blob_ WHERE is_stored = ? && hash IN (" + qt.Qs(len(batch)) + ")" + // TODO: this query doesn't work for SD blobs, which are not in the stream_blob table + + query := `SELECT b.hash, s.id, s.last_accessed_at +FROM blob_ b +LEFT JOIN stream_blob sb ON b.id = sb.blob_id +INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id) +WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)` args := make([]interface{}, len(batch)+1) args[0] = true for i := range batch { @@ -164,11 +213,14 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { defer closeRows(rows) for rows.Next() { - err := rows.Scan(&hash) + err := rows.Scan(&hash, &streamID, &lastAccessedAt) if err != nil { return errors.Err(err) } exists[hash] = true + if s.TrackAccessTime && lastAccessedAt.Before(touchDeadline) { + needsTouch = append(needsTouch, streamID) + } } err = rows.Err() @@ -180,11 +232,11 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { return nil }() if err != nil { - return nil, err + return nil, nil, err } } - return exists, nil + return exists, needsTouch, nil } // Delete will remove the blob from the db @@ -309,9 +361,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { return err } + args := []interface{}{streamID, blobID, contentBlob.BlobNum} _, err = s.exec( - "INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES (?,?,?)", - streamID, blobID, contentBlob.BlobNum, + "INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES ("+qt.Qs(len(args))+")", + args..., ) if err != nil { return errors.Err(err) @@ -482,9 +535,11 @@ CREATE TABLE stream ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, hash char(96) NOT NULL, sd_blob_id BIGINT UNSIGNED NOT NULL, + last_accessed_at TIMESTAMP NULL DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY stream_hash_idx (hash), KEY stream_sd_blob_id_idx (sd_blob_id), + KEY last_accessed_at_idx (last_accessed_at), FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE ); diff --git a/meta/meta.go b/meta/meta.go index e3129c2..daf7a15 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -1,6 +1,7 @@ package meta import ( + "fmt" "strconv" "time" ) @@ -12,9 +13,24 @@ var BuildTime time.Time func init() { if Time != "" { t, err := strconv.Atoi(Time) - if err != nil { - return + if err == nil { + BuildTime = time.Unix(int64(t), 0).UTC() } - BuildTime = time.Unix(int64(t), 0).UTC() } } + +func VersionString() string { + version := Version + if version == "" { + version = "" + } + + var buildTime string + if BuildTime.IsZero() { + buildTime = "" + } else { + buildTime = BuildTime.Format(time.RFC3339) + } + + return fmt.Sprintf("version %s, built %s", version, buildTime) +} diff --git a/peer/client.go b/peer/client.go index 134c1f9..faa0fa2 100644 --- a/peer/client.go +++ b/peer/client.go @@ -153,7 +153,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { return nil, err } - return stream.Blob(blob), nil + return blob, nil } func (c *Client) read(v interface{}) error { diff --git a/peer/http3/server.go b/peer/http3/server.go index 74d30c7..30b510c 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -163,7 +163,7 @@ func generateTLSConfig() *tls.Config { func (s *Server) listenAndServe(server *http3.Server) { err := server.ListenAndServe() - if err != nil { + if err != nil && !errors.Is(err, http.ErrServerClosed) { log.Errorln(errors.FullTrace(err)) } } diff --git a/store/dbbacked.go b/store/dbbacked.go index e5912d6..1b554ca 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -32,6 +32,14 @@ func (d *DBBackedStore) Has(hash string) (bool, error) { // Get gets the blob func (d *DBBackedStore) Get(hash string) (stream.Blob, error) { + has, err := d.db.HasBlob(hash) + if err != nil { + return nil, err + } + if !has { + return nil, ErrBlobNotFound + } + return d.blobs.Get(hash) }