track approximate access time for blobs #44

Merged
lyoshenka merged 4 commits from track_access into master 2020-10-06 00:11:16 +02:00
8 changed files with 101 additions and 22 deletions
Showing only changes of commit de0ccd4da7 - Show all commits

View file

@ -52,7 +52,7 @@ func init() {
} }
func reflectorCmd(cmd *cobra.Command, args []string) { func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) log.Printf("reflector %s", meta.VersionString())
var blobStore store.BlobStore var blobStore store.BlobStore
if proxyAddress != "" { if proxyAddress != "" {
@ -84,6 +84,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
if useDB { if useDB {
db := new(db.SQL) db := new(db.SQL)
db.TrackAccessTime = true
err = db.Connect(globalConfig.DBConn) err = db.Connect(globalConfig.DBConn)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -27,7 +27,7 @@ func init() {
} }
func testCmd(cmd *cobra.Command, args []string) { func testCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector version %s", meta.Version) log.Printf("reflector %s", meta.VersionString())
memStore := store.NewMemoryBlobStore() memStore := store.NewMemoryBlobStore()

View file

@ -2,7 +2,6 @@ package cmd
import ( import (
"fmt" "fmt"
"time"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -18,5 +17,5 @@ func init() {
} }
func versionCmd(cmd *cobra.Command, args []string) { func versionCmd(cmd *cobra.Command, args []string) {
fmt.Printf("version %s (built %s)\n", meta.Version, meta.BuildTime.Format(time.RFC3339)) fmt.Println(meta.VersionString())
} }

View file

@ -32,6 +32,8 @@ type SdBlob struct {
// SQL implements the DB interface // SQL implements the DB interface
type SQL struct { type SQL struct {
conn *sql.DB conn *sql.DB
TrackAccessTime bool
} }
func logQuery(query string, args ...interface{}) { func logQuery(query string, args ...interface{}) {
@ -75,9 +77,10 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
return 0, errors.Err("length must be positive") return 0, errors.Err("length must be positive")
} }
args := []interface{}{hash, isStored, length}
blobID, err := s.exec( blobID, err := s.exec(
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", "INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
hash, isStored, length, args...,
) )
if err != nil { if err != nil {
return 0, err return 0, err
@ -97,9 +100,10 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
} }
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
args := []interface{}{hash, sdBlobID, time.Now()}
streamID, err := s.exec( streamID, err := s.exec(
"INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (?,?)", "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")",
hash, sdBlobID, args...,
) )
if err != nil { if err != nil {
return 0, errors.Err(err) return 0, errors.Err(err)
@ -113,6 +117,13 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
if streamID == 0 { if streamID == 0 {
return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing") return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
} }
if s.TrackAccessTime {
err := s.touch([]uint64{uint64(streamID)})
if err != nil {
return 0, errors.Err(err)
}
}
} }
return streamID, nil return streamID, nil
} }
@ -128,12 +139,44 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
// HasBlobs checks if the database contains the set of blobs and returns a bool map. // HasBlobs checks if the database contains the set of blobs and returns a bool map.
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
if s.conn == nil { exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
return nil, errors.Err("not connected") s.touch(streamsNeedingTouch)
return exists, err
} }
var hash string func (s *SQL) touch(streamIDs []uint64) error {
if len(streamIDs) == 0 {
return nil
}
query := "UPDATE stream SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(streamIDs)) + ")"
args := make([]interface{}, len(streamIDs)+1)
args[0] = time.Now()
for i := range streamIDs {
args[i+1] = streamIDs[i]
}
startTime := time.Now()
_, err := s.exec(query, args...)
log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime))
return errors.Err(err)
}
func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
if s.conn == nil {
return nil, nil, errors.Err("not connected")
}
var (
hash string
streamID uint64
lastAccessedAt time.Time
)
var needsTouch []uint64
exists := make(map[string]bool) exists := make(map[string]bool)
touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time
maxBatchSize := 10000 maxBatchSize := 10000
doneIndex := 0 doneIndex := 0
@ -145,7 +188,13 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes)) log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
batch := hashes[doneIndex:sliceEnd] batch := hashes[doneIndex:sliceEnd]
query := "SELECT hash FROM blob_ WHERE is_stored = ? && hash IN (" + qt.Qs(len(batch)) + ")" // TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
query := `SELECT b.hash, s.id, s.last_accessed_at
FROM blob_ b
LEFT JOIN stream_blob sb ON b.id = sb.blob_id
INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
args := make([]interface{}, len(batch)+1) args := make([]interface{}, len(batch)+1)
args[0] = true args[0] = true
for i := range batch { for i := range batch {
@ -164,11 +213,14 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
defer closeRows(rows) defer closeRows(rows)
for rows.Next() { for rows.Next() {
err := rows.Scan(&hash) err := rows.Scan(&hash, &streamID, &lastAccessedAt)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
exists[hash] = true exists[hash] = true
if s.TrackAccessTime && lastAccessedAt.Before(touchDeadline) {
needsTouch = append(needsTouch, streamID)
}
} }
err = rows.Err() err = rows.Err()
@ -180,11 +232,11 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
return nil return nil
}() }()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
} }
return exists, nil return exists, needsTouch, nil
} }
// Delete will remove the blob from the db // Delete will remove the blob from the db
@ -309,9 +361,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
return err return err
} }
args := []interface{}{streamID, blobID, contentBlob.BlobNum}
_, err = s.exec( _, err = s.exec(
"INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES (?,?,?)", "INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES ("+qt.Qs(len(args))+")",
streamID, blobID, contentBlob.BlobNum, args...,
) )
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -482,9 +535,11 @@ CREATE TABLE stream (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
hash char(96) NOT NULL, hash char(96) NOT NULL,
sd_blob_id BIGINT UNSIGNED NOT NULL, sd_blob_id BIGINT UNSIGNED NOT NULL,
last_accessed_at TIMESTAMP NULL DEFAULT NULL,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY stream_hash_idx (hash), UNIQUE KEY stream_hash_idx (hash),
KEY stream_sd_blob_id_idx (sd_blob_id), KEY stream_sd_blob_id_idx (sd_blob_id),
KEY last_accessed_at_idx (last_accessed_at),
FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE
); );

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"fmt"
"strconv" "strconv"
"time" "time"
) )
@ -12,9 +13,24 @@ var BuildTime time.Time
func init() { func init() {
if Time != "" { if Time != "" {
t, err := strconv.Atoi(Time) t, err := strconv.Atoi(Time)
if err != nil { if err == nil {
return
}
BuildTime = time.Unix(int64(t), 0).UTC() BuildTime = time.Unix(int64(t), 0).UTC()
} }
} }
}
// VersionString returns a human-readable summary of the build in the form
// "version <Version>, built <BuildTime as RFC 3339>".
//
// Placeholders are substituted for values that were not set: "<unset>" when
// Version is the empty string, and "<now>" when BuildTime is the zero time.
func VersionString() string {
	// Start from the placeholders and overwrite them only when real values exist.
	v, built := "<unset>", "<now>"
	if Version != "" {
		v = Version
	}
	if !BuildTime.IsZero() {
		built = BuildTime.Format(time.RFC3339)
	}
	return fmt.Sprintf("version %s, built %s", v, built)
}

View file

@ -153,7 +153,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
return nil, err return nil, err
} }
return stream.Blob(blob), nil return blob, nil
} }
func (c *Client) read(v interface{}) error { func (c *Client) read(v interface{}) error {

View file

@ -163,7 +163,7 @@ func generateTLSConfig() *tls.Config {
func (s *Server) listenAndServe(server *http3.Server) { func (s *Server) listenAndServe(server *http3.Server) {
err := server.ListenAndServe() err := server.ListenAndServe()
if err != nil { if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorln(errors.FullTrace(err)) log.Errorln(errors.FullTrace(err))
} }
} }

View file

@ -32,6 +32,14 @@ func (d *DBBackedStore) Has(hash string) (bool, error) {
// Get gets the blob // Get gets the blob
func (d *DBBackedStore) Get(hash string) (stream.Blob, error) { func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
has, err := d.db.HasBlob(hash)
if err != nil {
return nil, err
}
if !has {
return nil, ErrBlobNotFound
}
return d.blobs.Get(hash) return d.blobs.Get(hash)
} }