From 53d3eea8fb5e0024517909d8e2eab63c9acfb06a Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 20 Sep 2018 14:24:30 -0400 Subject: [PATCH] fixes from nikos review --- db/db.go | 26 +++++++++++--------------- reflector/blocklist.go | 22 +++++++++++----------- reflector/server_test.go | 6 ++---- wallet/network.go | 12 +++++++----- 4 files changed, 31 insertions(+), 35 deletions(-) diff --git a/db/db.go b/db/db.go index aa2db28..c47e6e1 100644 --- a/db/db.go +++ b/db/db.go @@ -74,7 +74,7 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error { err := execTx(tx, "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", - []interface{}{hash, isStored, length}, + hash, isStored, length, ) if err != nil { return errors.Err(err) @@ -153,19 +153,15 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { // Delete will remove the blob from the db func (s *SQL) Delete(hash string) error { - args := []interface{}{hash} + return withTx(s.conn, func(tx *sql.Tx) error { + err := execTx(tx, "DELETE FROM stream WHERE sd_hash = ?", hash) + if err != nil { + return errors.Err(err) + } - query := "DELETE FROM stream WHERE sd_hash = ?" - logQuery(query, args...) - _, err := s.conn.Exec(query, args...) - if err != nil { + err = execTx(tx, "DELETE FROM blob_ WHERE hash = ?", hash) return errors.Err(err) - } - - query = "DELETE FROM blob_ WHERE hash = ?" - logQuery(query, args...) - _, err = s.conn.Exec(query, args...) - return errors.Err(err) + }) } // Block will mark a blob as blocked @@ -267,7 +263,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { // insert stream err = execTx(tx, "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)", - []interface{}{sdBlob.StreamHash, sdHash}, + sdBlob.StreamHash, sdHash, ) if err != nil { return errors.Err(err) @@ -287,7 +283,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error { err = execTx(tx, "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)", - []interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum}, + sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum, ) if err != nil { return errors.Err(err) @@ -412,7 +408,7 @@ func closeRows(rows *sql.Rows) { } } -func execTx(tx *sql.Tx, query string, args []interface{}) error { +func execTx(tx *sql.Tx, query string, args ...interface{}) error { logQuery(query, args...) _, err := tx.Exec(query, args...) return errors.Err(err) diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 9712828..2e5f829 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -52,7 +52,7 @@ func updateBlocklist(b store.Blocklister) { } } -func blockedSdHashes() (map[string]ValueResp, error) { +func blockedSdHashes() (map[string]valOrErr, error) { resp, err := http.Get(blocklistURL) if err != nil { return nil, errors.Err(err) @@ -78,14 +78,14 @@ func blockedSdHashes() (map[string]ValueResp, error) { return sdHashesForOutpoints(r.Data.Outpoints) } -type ValueResp struct { +type valOrErr struct { Value string Err error } // sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints -func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) { - values := make(map[string]ValueResp) +func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { + values := make(map[string]valOrErr) node := wallet.NewNode() err := node.Connect([]string{ @@ -100,19 +100,19 @@ func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) { for _, outpoint := range outpoints { parts := strings.Split(outpoint, ":") if len(parts) != 2 { - values[outpoint] = ValueResp{Err: errors.Err("invalid outpoint format")} + values[outpoint] = valOrErr{Err: errors.Err("invalid outpoint format")} continue } nout, err := strconv.Atoi(parts[1]) if err != nil { - values[outpoint] = ValueResp{Err: errors.Prefix("invalid nout", err)} + values[outpoint] = valOrErr{Err: errors.Prefix("invalid nout", err)} continue } resp, err := node.GetClaimsInTx(parts[0]) if err != nil { - values[outpoint] = ValueResp{Err: err} + values[outpoint] = valOrErr{Err: err} continue } @@ -126,23 +126,23 @@ func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) { break } if err != nil { - values[outpoint] = ValueResp{Err: err} + values[outpoint] = valOrErr{Err: err} continue } claim := &types.Claim{} err = proto.Unmarshal(value, claim) if err != nil { - values[outpoint] = ValueResp{Err: err} + values[outpoint] = valOrErr{Err: err} continue } if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash { - values[outpoint] = ValueResp{Err: errors.Err("source is nil or source type is not lbry_sd_hash")} + values[outpoint] = valOrErr{Err: errors.Err("source is nil or source type is not lbry_sd_hash")} continue } - values[outpoint] = ValueResp{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())} + values[outpoint] = valOrErr{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())} } return values, nil diff --git a/reflector/server_test.go b/reflector/server_test.go index 5a28624..58ef6dc 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -2,7 +2,9 @@ package reflector import ( "crypto/rand" + "encoding/json" "io" + "sort" "strconv" "testing" "time" @@ -10,10 +12,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lbryio/reflector.go/store" - "encoding/json" - - "sort" - "github.com/lbryio/reflector.go/dht/bits" "github.com/phayes/freeport" ) diff --git a/wallet/network.go b/wallet/network.go index e35257b..32e514c 100644 --- a/wallet/network.go +++ b/wallet/network.go @@ -48,19 +48,21 @@ type Node struct { nextId atomic.Uint32 grp *stop.Group - handlersMu sync.RWMutex + handlersMu *sync.RWMutex handlers map[uint32]chan []byte - pushHandlersMu sync.RWMutex + pushHandlersMu *sync.RWMutex pushHandlers map[string][]chan []byte } // NewNode creates a new node. func NewNode() *Node { return &Node{ - handlers: make(map[uint32]chan []byte), - pushHandlers: make(map[string][]chan []byte), - grp: stop.New(), + handlers: make(map[uint32]chan []byte), + pushHandlers: make(map[string][]chan []byte), + handlersMu: &sync.RWMutex{}, + pushHandlersMu: &sync.RWMutex{}, + grp: stop.New(), } }