fixes from nikos review

This commit is contained in:
Alex Grintsvayg 2018-09-20 14:24:30 -04:00
parent 61e83d86de
commit 53d3eea8fb
4 changed files with 31 additions and 35 deletions

View file

@ -74,7 +74,7 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error {
err := execTx(tx, err := execTx(tx,
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", "INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
[]interface{}{hash, isStored, length}, hash, isStored, length,
) )
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -153,19 +153,15 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
// Delete will remove the blob from the db // Delete will remove the blob from the db
func (s *SQL) Delete(hash string) error { func (s *SQL) Delete(hash string) error {
args := []interface{}{hash} return withTx(s.conn, func(tx *sql.Tx) error {
err := execTx(tx, "DELETE FROM stream WHERE sd_hash = ?", hash)
query := "DELETE FROM stream WHERE sd_hash = ?"
logQuery(query, args...)
_, err := s.conn.Exec(query, args...)
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
} }
query = "DELETE FROM blob_ WHERE hash = ?" err = execTx(tx, "DELETE FROM blob_ WHERE hash = ?", hash)
logQuery(query, args...)
_, err = s.conn.Exec(query, args...)
return errors.Err(err) return errors.Err(err)
})
} }
// Block will mark a blob as blocked // Block will mark a blob as blocked
@ -267,7 +263,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
// insert stream // insert stream
err = execTx(tx, err = execTx(tx,
"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)", "INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
[]interface{}{sdBlob.StreamHash, sdHash}, sdBlob.StreamHash, sdHash,
) )
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -287,7 +283,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
err = execTx(tx, err = execTx(tx,
"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)", "INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
[]interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum}, sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum,
) )
if err != nil { if err != nil {
return errors.Err(err) return errors.Err(err)
@ -412,7 +408,7 @@ func closeRows(rows *sql.Rows) {
} }
} }
func execTx(tx *sql.Tx, query string, args []interface{}) error { func execTx(tx *sql.Tx, query string, args ...interface{}) error {
logQuery(query, args...) logQuery(query, args...)
_, err := tx.Exec(query, args...) _, err := tx.Exec(query, args...)
return errors.Err(err) return errors.Err(err)

View file

@ -52,7 +52,7 @@ func updateBlocklist(b store.Blocklister) {
} }
} }
func blockedSdHashes() (map[string]ValueResp, error) { func blockedSdHashes() (map[string]valOrErr, error) {
resp, err := http.Get(blocklistURL) resp, err := http.Get(blocklistURL)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
@ -78,14 +78,14 @@ func blockedSdHashes() (map[string]ValueResp, error) {
return sdHashesForOutpoints(r.Data.Outpoints) return sdHashesForOutpoints(r.Data.Outpoints)
} }
type ValueResp struct { type valOrErr struct {
Value string Value string
Err error Err error
} }
// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints // sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints
func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) { func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) {
values := make(map[string]ValueResp) values := make(map[string]valOrErr)
node := wallet.NewNode() node := wallet.NewNode()
err := node.Connect([]string{ err := node.Connect([]string{
@ -100,19 +100,19 @@ func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) {
for _, outpoint := range outpoints { for _, outpoint := range outpoints {
parts := strings.Split(outpoint, ":") parts := strings.Split(outpoint, ":")
if len(parts) != 2 { if len(parts) != 2 {
values[outpoint] = ValueResp{Err: errors.Err("invalid outpoint format")} values[outpoint] = valOrErr{Err: errors.Err("invalid outpoint format")}
continue continue
} }
nout, err := strconv.Atoi(parts[1]) nout, err := strconv.Atoi(parts[1])
if err != nil { if err != nil {
values[outpoint] = ValueResp{Err: errors.Prefix("invalid nout", err)} values[outpoint] = valOrErr{Err: errors.Prefix("invalid nout", err)}
continue continue
} }
resp, err := node.GetClaimsInTx(parts[0]) resp, err := node.GetClaimsInTx(parts[0])
if err != nil { if err != nil {
values[outpoint] = ValueResp{Err: err} values[outpoint] = valOrErr{Err: err}
continue continue
} }
@ -126,23 +126,23 @@ func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) {
break break
} }
if err != nil { if err != nil {
values[outpoint] = ValueResp{Err: err} values[outpoint] = valOrErr{Err: err}
continue continue
} }
claim := &types.Claim{} claim := &types.Claim{}
err = proto.Unmarshal(value, claim) err = proto.Unmarshal(value, claim)
if err != nil { if err != nil {
values[outpoint] = ValueResp{Err: err} values[outpoint] = valOrErr{Err: err}
continue continue
} }
if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash { if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash {
values[outpoint] = ValueResp{Err: errors.Err("source is nil or source type is not lbry_sd_hash")} values[outpoint] = valOrErr{Err: errors.Err("source is nil or source type is not lbry_sd_hash")}
continue continue
} }
values[outpoint] = ValueResp{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())} values[outpoint] = valOrErr{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())}
} }
return values, nil return values, nil

View file

@ -2,7 +2,9 @@ package reflector
import ( import (
"crypto/rand" "crypto/rand"
"encoding/json"
"io" "io"
"sort"
"strconv" "strconv"
"testing" "testing"
"time" "time"
@ -10,10 +12,6 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"encoding/json"
"sort"
"github.com/lbryio/reflector.go/dht/bits" "github.com/lbryio/reflector.go/dht/bits"
"github.com/phayes/freeport" "github.com/phayes/freeport"
) )

View file

@ -48,10 +48,10 @@ type Node struct {
nextId atomic.Uint32 nextId atomic.Uint32
grp *stop.Group grp *stop.Group
handlersMu sync.RWMutex handlersMu *sync.RWMutex
handlers map[uint32]chan []byte handlers map[uint32]chan []byte
pushHandlersMu sync.RWMutex pushHandlersMu *sync.RWMutex
pushHandlers map[string][]chan []byte pushHandlers map[string][]chan []byte
} }
@ -60,6 +60,8 @@ func NewNode() *Node {
return &Node{ return &Node{
handlers: make(map[uint32]chan []byte), handlers: make(map[uint32]chan []byte),
pushHandlers: make(map[string][]chan []byte), pushHandlers: make(map[string][]chan []byte),
handlersMu: &sync.RWMutex{},
pushHandlersMu: &sync.RWMutex{},
grp: stop.New(), grp: stop.New(),
} }
} }