Merge branch 'track_access'

* track_access:
  small changes for easier testing
  track approximate access time for blobs
This commit is contained in:
Alex Grintsvayg 2020-10-05 18:10:49 -04:00
commit 6118dde36c
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
11 changed files with 153 additions and 33 deletions

View file

@ -3,6 +3,7 @@ package cmd
import ( import (
"encoding/hex" "encoding/hex"
"os" "os"
"time"
"github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
@ -49,7 +50,12 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
log.Fatal(err) log.Fatal(err)
} }
f, err := os.Create(wd + "/" + sd.SuggestedFileName) filename := sd.SuggestedFileName
if filename == "" {
filename = "stream_" + time.Now().Format("20060102_150405")
}
f, err := os.Create(wd + "/" + filename)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View file

@ -25,6 +25,7 @@ var http3PeerPort int
var receiverPort int var receiverPort int
var metricsPort int var metricsPort int
var disableUploads bool var disableUploads bool
var disableBlocklist bool
var proxyAddress string var proxyAddress string
var proxyPort string var proxyPort string
var proxyProtocol string var proxyProtocol string
@ -47,12 +48,13 @@ func init() {
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
func reflectorCmd(cmd *cobra.Command, args []string) { func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) log.Printf("reflector %s", meta.VersionString())
var blobStore store.BlobStore var blobStore store.BlobStore
if proxyAddress != "" { if proxyAddress != "" {
@ -84,6 +86,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
if useDB { if useDB {
db := new(db.SQL) db := new(db.SQL)
db.TrackAccessTime = true
err = db.Connect(globalConfig.DBConn) err = db.Connect(globalConfig.DBConn)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -93,7 +96,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
reflectorServer = reflector.NewServer(blobStore) reflectorServer = reflector.NewServer(blobStore)
reflectorServer.Timeout = 3 * time.Minute reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = true reflectorServer.EnableBlocklist = !disableBlocklist
err = reflectorServer.Start(":" + strconv.Itoa(receiverPort)) err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil { if err != nil {

View file

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"os" "os"
"strings"
"github.com/lbryio/lbry.go/v2/dht" "github.com/lbryio/lbry.go/v2/dht"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
@ -68,8 +69,11 @@ func preRun(cmd *cobra.Command, args []string) {
debugLogger.SetOutput(os.Stderr) debugLogger.SetOutput(os.Stderr)
if util.InSlice(verboseAll, verbose) { if util.InSlice(verboseAll, verbose) {
logrus.Info("global verbose logging enabled")
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
verbose = []string{verboseDHT, verboseNodeFinder} verbose = []string{verboseDHT, verboseNodeFinder}
} else if len(verbose) > 0 {
logrus.Infof("verbose logging enabled for: %s", strings.Join(verbose, ", "))
} }
for _, debugType := range verbose { for _, debugType := range verbose {

View file

@ -2,6 +2,8 @@ package cmd
import ( import (
"crypto/rand" "crypto/rand"
"io/ioutil"
"os"
"github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/reflector"
@ -13,9 +15,9 @@ import (
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
Use: "sendblob ADDRESS:PORT", Use: "sendblob ADDRESS:PORT [PATH]",
Short: "Send a random blob to a reflector server", Short: "Send a random blob to a reflector server",
Args: cobra.ExactArgs(1), Args: cobra.RangeArgs(1, 2),
Run: sendBlobCmd, Run: sendBlobCmd,
} }
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
@ -23,6 +25,10 @@ func init() {
func sendBlobCmd(cmd *cobra.Command, args []string) { func sendBlobCmd(cmd *cobra.Command, args []string) {
addr := args[0] addr := args[0]
var path string
if len(args) >= 2 {
path = args[1]
}
c := reflector.Client{} c := reflector.Client{}
err := c.Connect(addr) err := c.Connect(addr)
@ -30,6 +36,7 @@ func sendBlobCmd(cmd *cobra.Command, args []string) {
log.Fatal("error connecting client to server: ", err) log.Fatal("error connecting client to server: ", err)
} }
if path == "" {
blob := make(stream.Blob, 1024) blob := make(stream.Blob, 1024)
_, err = rand.Read(blob) _, err = rand.Read(blob)
if err != nil { if err != nil {
@ -40,4 +47,26 @@ func sendBlobCmd(cmd *cobra.Command, args []string) {
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
return
}
file, err := os.Open(path)
checkErr(err)
data, err := ioutil.ReadAll(file)
checkErr(err)
s, err := stream.New(data)
checkErr(err)
sdBlob := &stream.SDBlob{}
err = sdBlob.FromBlob(s[0])
checkErr(err)
for i, b := range s {
if i == 0 {
err = c.SendSDBlob(b)
} else {
err = c.SendBlob(b)
}
checkErr(err)
}
} }

View file

@ -27,7 +27,7 @@ func init() {
} }
func testCmd(cmd *cobra.Command, args []string) { func testCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector version %s", meta.Version) log.Printf("reflector %s", meta.VersionString())
memStore := store.NewMemoryBlobStore() memStore := store.NewMemoryBlobStore()

View file

@ -2,7 +2,6 @@ package cmd
import ( import (
"fmt" "fmt"
"time"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -18,5 +17,5 @@ func init() {
} }
func versionCmd(cmd *cobra.Command, args []string) { func versionCmd(cmd *cobra.Command, args []string) {
fmt.Printf("version %s (built %s)\n", meta.Version, meta.BuildTime.Format(time.RFC3339)) fmt.Println(meta.VersionString())
} }

View file

@ -32,6 +32,8 @@ type SdBlob struct {
// SQL implements the DB interface // SQL implements the DB interface
type SQL struct { type SQL struct {
conn *sql.DB conn *sql.DB
TrackAccessTime bool
} }
func logQuery(query string, args ...interface{}) { func logQuery(query string, args ...interface{}) {
@ -75,9 +77,10 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
return 0, errors.Err("length must be positive") return 0, errors.Err("length must be positive")
} }
args := []interface{}{hash, isStored, length}
blobID, err := s.exec( blobID, err := s.exec(
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", "INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
hash, isStored, length, args...,
) )
if err != nil { if err != nil {
return 0, err return 0, err
@ -97,9 +100,10 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
} }
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
args := []interface{}{hash, sdBlobID, time.Now()}
streamID, err := s.exec( streamID, err := s.exec(
"INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (?,?)", "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")",
hash, sdBlobID, args...,
) )
if err != nil { if err != nil {
return 0, errors.Err(err) return 0, errors.Err(err)
@ -113,6 +117,13 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
if streamID == 0 { if streamID == 0 {
return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing") return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
} }
if s.TrackAccessTime {
err := s.touch([]uint64{uint64(streamID)})
if err != nil {
return 0, errors.Err(err)
}
}
} }
return streamID, nil return streamID, nil
} }
@ -128,12 +139,44 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
// HasBlobs checks if the database contains the set of blobs and returns a bool map. // HasBlobs checks if the database contains the set of blobs and returns a bool map.
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
if s.conn == nil { exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
return nil, errors.Err("not connected") s.touch(streamsNeedingTouch)
return exists, err
}
func (s *SQL) touch(streamIDs []uint64) error {
if len(streamIDs) == 0 {
return nil
} }
var hash string query := "UPDATE stream SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(streamIDs)) + ")"
args := make([]interface{}, len(streamIDs)+1)
args[0] = time.Now()
for i := range streamIDs {
args[i+1] = streamIDs[i]
}
startTime := time.Now()
_, err := s.exec(query, args...)
log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime))
return errors.Err(err)
}
func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
if s.conn == nil {
return nil, nil, errors.Err("not connected")
}
var (
hash string
streamID uint64
lastAccessedAt time.Time
)
var needsTouch []uint64
exists := make(map[string]bool) exists := make(map[string]bool)
touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time
maxBatchSize := 10000 maxBatchSize := 10000
doneIndex := 0 doneIndex := 0
@ -145,7 +188,13 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes)) log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
batch := hashes[doneIndex:sliceEnd] batch := hashes[doneIndex:sliceEnd]
query := "SELECT hash FROM blob_ WHERE is_stored = ? && hash IN (" + qt.Qs(len(batch)) + ")" // TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
query := `SELECT b.hash, s.id, s.last_accessed_at
FROM blob_ b
LEFT JOIN stream_blob sb ON b.id = sb.blob_id
INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
args := make([]interface{}, len(batch)+1) args := make([]interface{}, len(batch)+1)
args[0] = true args[0] = true
for i := range batch { for i := range batch {
@ -164,11 +213,14 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
defer closeRows(rows) defer closeRows(rows)
for rows.Next() { for rows.Next() {
err := rows.Scan(&hash) err := rows.Scan(&hash, &streamID, &lastAccessedAt)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
exists[hash] = true exists[hash] = true
if s.TrackAccessTime && lastAccessedAt.Before(touchDeadline) {
needsTouch = append(needsTouch, streamID)
}
} }
err = rows.Err() err = rows.Err()
@ -180,11 +232,11 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
return nil return nil
}() }()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
} }
return exists, nil return exists, needsTouch, nil
} }
// Delete will remove the blob from the db // Delete will remove the blob from the db
@ -309,9 +361,10 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
return err return err
} }
args := []interface{}{streamID, blobID, contentBlob.BlobNum}
_, err = s.exec( _, err = s.exec(
"INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES (?,?,?)", "INSERT IGNORE INTO stream_blob (stream_id, blob_id, num) VALUES ("+qt.Qs(len(args))+")",
streamID, blobID, contentBlob.BlobNum, args...,
) )
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -482,9 +535,11 @@ CREATE TABLE stream (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
hash char(96) NOT NULL, hash char(96) NOT NULL,
sd_blob_id BIGINT UNSIGNED NOT NULL, sd_blob_id BIGINT UNSIGNED NOT NULL,
last_accessed_at TIMESTAMP NULL DEFAULT NULL,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY stream_hash_idx (hash), UNIQUE KEY stream_hash_idx (hash),
KEY stream_sd_blob_id_idx (sd_blob_id), KEY stream_sd_blob_id_idx (sd_blob_id),
KEY last_accessed_at_idx (last_accessed_at),
FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE FOREIGN KEY (sd_blob_id) REFERENCES blob_ (id) ON DELETE RESTRICT ON UPDATE CASCADE
); );

View file

@ -1,6 +1,7 @@
package meta package meta
import ( import (
"fmt"
"strconv" "strconv"
"time" "time"
) )
@ -12,9 +13,24 @@ var BuildTime time.Time
func init() { func init() {
if Time != "" { if Time != "" {
t, err := strconv.Atoi(Time) t, err := strconv.Atoi(Time)
if err != nil { if err == nil {
return
}
BuildTime = time.Unix(int64(t), 0).UTC() BuildTime = time.Unix(int64(t), 0).UTC()
} }
}
}
func VersionString() string {
version := Version
if version == "" {
version = "<unset>"
}
var buildTime string
if BuildTime.IsZero() {
buildTime = "<now>"
} else {
buildTime = BuildTime.Format(time.RFC3339)
}
return fmt.Sprintf("version %s, built %s", version, buildTime)
} }

View file

@ -153,7 +153,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
return nil, err return nil, err
} }
return stream.Blob(blob), nil return blob, nil
} }
func (c *Client) read(v interface{}) error { func (c *Client) read(v interface{}) error {

View file

@ -163,7 +163,7 @@ func generateTLSConfig() *tls.Config {
func (s *Server) listenAndServe(server *http3.Server) { func (s *Server) listenAndServe(server *http3.Server) {
err := server.ListenAndServe() err := server.ListenAndServe()
if err != nil { if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorln(errors.FullTrace(err)) log.Errorln(errors.FullTrace(err))
} }
} }

View file

@ -32,6 +32,14 @@ func (d *DBBackedStore) Has(hash string) (bool, error) {
// Get gets the blob // Get gets the blob
func (d *DBBackedStore) Get(hash string) (stream.Blob, error) { func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
has, err := d.db.HasBlob(hash)
if err != nil {
return nil, err
}
if !has {
return nil, ErrBlobNotFound
}
return d.blobs.Get(hash) return d.blobs.Get(hash)
} }