when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
7 changed files with 47 additions and 27 deletions
|
@ -33,7 +33,9 @@ func peerCmd(cmd *cobra.Command, args []string) {
|
||||||
peerServer := peer.NewServer(s3)
|
peerServer := peer.NewServer(s3)
|
||||||
|
|
||||||
if !peerNoDB {
|
if !peerNoDB {
|
||||||
db := new(db.SQL)
|
db := &db.SQL{
|
||||||
|
LogQueries: log.GetLevel() == log.DebugLevel,
|
||||||
|
}
|
||||||
err = db.Connect(globalConfig.DBConn)
|
err = db.Connect(globalConfig.DBConn)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
|
|
||||||
|
|
|
@ -30,9 +30,11 @@ func populateDbCmd(cmd *cobra.Command, args []string) {
|
||||||
if diskStorePath == "" {
|
if diskStorePath == "" {
|
||||||
log.Fatal("store-path must be defined")
|
log.Fatal("store-path must be defined")
|
||||||
}
|
}
|
||||||
localDb := new(db.SQL)
|
localDb := &db.SQL{
|
||||||
localDb.SoftDelete = true
|
SoftDelete: true,
|
||||||
localDb.TrackAccess = db.TrackAccessBlobs
|
TrackAccess: db.TrackAccessBlobs,
|
||||||
|
LogQueries: log.GetLevel() == log.DebugLevel,
|
||||||
|
}
|
||||||
err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|
|
@ -152,8 +152,10 @@ func setupStore() store.BlobStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
if useDB {
|
if useDB {
|
||||||
dbInst := new(db.SQL)
|
dbInst := &db.SQL{
|
||||||
dbInst.TrackAccess = db.TrackAccessStreams
|
TrackAccess: db.TrackAccessStreams,
|
||||||
|
LogQueries: log.GetLevel() == log.DebugLevel,
|
||||||
|
}
|
||||||
err := dbInst.Connect(globalConfig.DBConn)
|
err := dbInst.Connect(globalConfig.DBConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
@ -178,9 +180,11 @@ func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStor
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
localDb := new(db.SQL)
|
localDb := &db.SQL{
|
||||||
localDb.SoftDelete = true
|
SoftDelete: true,
|
||||||
localDb.TrackAccess = db.TrackAccessBlobs
|
TrackAccess: db.TrackAccessBlobs,
|
||||||
|
LogQueries: log.GetLevel() == log.DebugLevel,
|
||||||
|
}
|
||||||
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
|
|
|
@ -52,7 +52,9 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func startCmd(cmd *cobra.Command, args []string) {
|
func startCmd(cmd *cobra.Command, args []string) {
|
||||||
db := new(db.SQL)
|
db := &db.SQL{
|
||||||
|
LogQueries: log.GetLevel() == log.DebugLevel,
|
||||||
|
}
|
||||||
err := db.Connect(globalConfig.DBConn)
|
err := db.Connect(globalConfig.DBConn)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/lbryio/reflector.go/db"
|
"github.com/lbryio/reflector.go/db"
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
"github.com/lbryio/reflector.go/reflector"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
@ -30,7 +31,9 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func uploadCmd(cmd *cobra.Command, args []string) {
|
func uploadCmd(cmd *cobra.Command, args []string) {
|
||||||
db := new(db.SQL)
|
db := &db.SQL{
|
||||||
|
LogQueries: log.GetLevel() == log.DebugLevel,
|
||||||
|
}
|
||||||
err := db.Connect(globalConfig.DBConn)
|
err := db.Connect(globalConfig.DBConn)
|
||||||
checkErr(err)
|
checkErr(err)
|
||||||
|
|
||||||
|
|
33
db/db.go
33
db/db.go
|
@ -52,14 +52,21 @@ type SQL struct {
|
||||||
|
|
||||||
// Instead of deleting a blob, marked it as not stored in the db
|
// Instead of deleting a blob, marked it as not stored in the db
|
||||||
SoftDelete bool
|
SoftDelete bool
|
||||||
|
|
||||||
|
// Log executed queries. qt.InterpolateParams is cpu-heavy. This avoids that call if not needed.
|
||||||
|
LogQueries bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func logQuery(query string, args ...interface{}) {
|
func (s SQL) logQuery(query string, args ...interface{}) {
|
||||||
s, err := qt.InterpolateParams(query, args...)
|
if !s.LogQueries {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
qStr, err := qt.InterpolateParams(query, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorln(err)
|
log.Errorln(err)
|
||||||
} else {
|
} else {
|
||||||
log.Debugln(s)
|
log.Debugln(qStr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,7 +327,7 @@ WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
|
||||||
args[i] = batch[i]
|
args[i] = batch[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
logQuery(query, args...)
|
s.logQuery(query, args...)
|
||||||
|
|
||||||
err := func() error {
|
err := func() error {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
@ -390,7 +397,7 @@ func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
|
query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
|
||||||
logQuery(query, maxBlobs)
|
s.logQuery(query, maxBlobs)
|
||||||
|
|
||||||
rows, err := s.conn.Query(query, maxBlobs)
|
rows, err := s.conn.Query(query, maxBlobs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -422,7 +429,7 @@ func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
|
||||||
// if s.SoftDelete {
|
// if s.SoftDelete {
|
||||||
// query += " where is_stored = 1"
|
// query += " where is_stored = 1"
|
||||||
// }
|
// }
|
||||||
// logQuery(query)
|
// s.logQuery(query)
|
||||||
//
|
//
|
||||||
// rows, err := s.conn.Query(query)
|
// rows, err := s.conn.Query(query)
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
|
@ -454,7 +461,7 @@ func (s *SQL) Count() (int, error) {
|
||||||
if s.SoftDelete {
|
if s.SoftDelete {
|
||||||
query += " where is_stored = 1"
|
query += " where is_stored = 1"
|
||||||
}
|
}
|
||||||
logQuery(query)
|
s.logQuery(query)
|
||||||
|
|
||||||
var count int
|
var count int
|
||||||
err := s.conn.QueryRow(query).Scan(&count)
|
err := s.conn.QueryRow(query).Scan(&count)
|
||||||
|
@ -465,7 +472,7 @@ func (s *SQL) Count() (int, error) {
|
||||||
func (s *SQL) Block(hash string) error {
|
func (s *SQL) Block(hash string) error {
|
||||||
query := "INSERT IGNORE INTO blocked SET hash = ?"
|
query := "INSERT IGNORE INTO blocked SET hash = ?"
|
||||||
args := []interface{}{hash}
|
args := []interface{}{hash}
|
||||||
logQuery(query, args...)
|
s.logQuery(query, args...)
|
||||||
_, err := s.conn.Exec(query, args...)
|
_, err := s.conn.Exec(query, args...)
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
@ -473,7 +480,7 @@ func (s *SQL) Block(hash string) error {
|
||||||
// GetBlocked will return a list of blocked hashes
|
// GetBlocked will return a list of blocked hashes
|
||||||
func (s *SQL) GetBlocked() (map[string]bool, error) {
|
func (s *SQL) GetBlocked() (map[string]bool, error) {
|
||||||
query := "SELECT hash FROM blocked"
|
query := "SELECT hash FROM blocked"
|
||||||
logQuery(query)
|
s.logQuery(query)
|
||||||
rows, err := s.conn.Query(query)
|
rows, err := s.conn.Query(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Err(err)
|
return nil, errors.Err(err)
|
||||||
|
@ -516,7 +523,7 @@ func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
|
||||||
`
|
`
|
||||||
args := []interface{}{sdHash}
|
args := []interface{}{sdHash}
|
||||||
|
|
||||||
logQuery(query, args...)
|
s.logQuery(query, args...)
|
||||||
|
|
||||||
rows, err := s.conn.Query(query, args...)
|
rows, err := s.conn.Query(query, args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -595,7 +602,7 @@ func (s *SQL) GetHashRange() (string, string, error) {
|
||||||
|
|
||||||
query := "SELECT MIN(hash), MAX(hash) from blob_"
|
query := "SELECT MIN(hash), MAX(hash) from blob_"
|
||||||
|
|
||||||
logQuery(query)
|
s.logQuery(query)
|
||||||
|
|
||||||
err := s.conn.QueryRow(query).Scan(&min, &max)
|
err := s.conn.QueryRow(query).Scan(&min, &max)
|
||||||
return min, max, err
|
return min, max, err
|
||||||
|
@ -619,7 +626,7 @@ func (s *SQL) GetStoredHashesInRange(ctx context.Context, start, end bits.Bitmap
|
||||||
query := "SELECT hash FROM blob_ WHERE hash >= ? AND hash <= ? AND is_stored = 1"
|
query := "SELECT hash FROM blob_ WHERE hash >= ? AND hash <= ? AND is_stored = 1"
|
||||||
args := []interface{}{start.Hex(), end.Hex()}
|
args := []interface{}{start.Hex(), end.Hex()}
|
||||||
|
|
||||||
logQuery(query, args...)
|
s.logQuery(query, args...)
|
||||||
|
|
||||||
rows, err := s.conn.Query(query, args...)
|
rows, err := s.conn.Query(query, args...)
|
||||||
defer closeRows(rows)
|
defer closeRows(rows)
|
||||||
|
@ -700,7 +707,7 @@ func closeRows(rows *sql.Rows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
|
func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
|
||||||
logQuery(query, args...)
|
s.logQuery(query, args...)
|
||||||
attempt, maxAttempts := 0, 3
|
attempt, maxAttempts := 0, 3
|
||||||
Retry:
|
Retry:
|
||||||
attempt++
|
attempt++
|
||||||
|
|
|
@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(store.NewMemStore())
|
srv := NewServer(store.NewMemStore(), store.NewMemStore())
|
||||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(store.NewMemStore())
|
srv := NewServer(store.NewMemStore(), store.NewMemStore())
|
||||||
srv.Timeout = testTimeout
|
srv.Timeout = testTimeout
|
||||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -190,7 +190,7 @@ func TestServer_PartialUpload(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(st)
|
srv := NewServer(st, st)
|
||||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
Loading…
Reference in a new issue