done, but NEEDS MIGRATION AND TESTING

This commit is contained in:
Alex Grintsvayg 2018-09-11 07:41:29 -04:00
parent 6fd0526376
commit 9fb824790b
9 changed files with 198 additions and 58 deletions

View file

@ -1,8 +1,11 @@
package cmd
import (
"github.com/davecgh/go-spew/spew"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -16,5 +19,43 @@ func init() {
}
func testCmd(cmd *cobra.Command, args []string) {
spew.Dump(reflector.BlockedSdHashes())
db := new(db.SQL)
err := db.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db)
values, err := reflector.BlockedSdHashes()
if err != nil {
log.Fatal(err)
}
for _, v := range values {
if v.Err != nil {
continue
}
has, err := db.HasBlob(v.Value)
if err != nil {
log.Error(err)
continue
}
if !has {
continue
}
err = combo.Delete(v.Value)
if err != nil {
log.Error(err)
}
err = db.Block(v.Value)
if err != nil {
log.Error(err)
}
}
}

View file

@ -89,7 +89,10 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
return false, errors.Err("not connected")
}
query := "SELECT EXISTS(SELECT 1 FROM blob_ WHERE hash = ? AND is_stored = ?)"
query := `SELECT EXISTS(SELECT 1
FROM blob_ b
LEFT JOIN blocked bl ON b.hash = bl.hash
WHERE b.hash = ? AND b.is_stored = ? AND bl.hash IS NULL)`
args := []interface{}{hash, true}
logQuery(query, args...)
@ -158,6 +161,32 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
return exists, nil
}
// Delete will remove the blob from the db
func (s *SQL) Delete(hash string) error {
args := []interface{}{hash}
query := "DELETE FROM stream WHERE sd_hash = ?"
logQuery(query, args...)
_, err := s.conn.Exec(query, args...)
if err != nil {
return errors.Err(err)
}
query = "DELETE FROM blob_ WHERE hash = ?"
logQuery(query, args...)
_, err = s.conn.Exec(query, args...)
return errors.Err(err)
}
// Block will mark a blob as blocked
func (s *SQL) Block(hash string) error {
query := "INSERT IGNORE INTO blocked SET hash = ?"
args := []interface{}{hash}
logQuery(query, args...)
_, err := s.conn.Exec(query, args...)
return errors.Err(err)
}
// MissingBlobsForKnownStream returns missing blobs for an existing stream
// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
// like no blobs are missing
@ -396,6 +425,11 @@ CREATE TABLE stream_blob (
FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE TABLE blocked (
hash char(96) NOT NULL,
PRIMARY KEY (hash)
);
could add UNIQUE KEY (stream_hash, num) to stream_blob ...
*/

View file

@ -25,9 +25,7 @@ type blockListResponse struct {
} `json:"data"`
}
func BlockedSdHashes() (map[string]string, error) {
blocked := make(map[string]string)
func BlockedSdHashes() (map[string]ValueResp, error) {
resp, err := http.Get(blocklistURL)
if err != nil {
return nil, errors.Err(err)
@ -43,70 +41,86 @@ func BlockedSdHashes() (map[string]string, error) {
return nil, errors.Prefix("list_blocked API call", r.Error)
}
for _, outpoint := range r.Data.Outpoints {
sdHash, err := sdHashForOutpoint(outpoint)
if err != nil {
blocked[outpoint] = err.Error()
} else {
blocked[outpoint] = sdHash
}
}
return blocked, nil
return sdHashesForOutpoints(r.Data.Outpoints)
}
type ValueResp struct {
Value string
Err error
}
// sdHashForOutpoint queries wallet server for the sd hash in a given outpoint
func sdHashForOutpoint(outpoint string) (string, error) {
val, err := valueForOutpoint(outpoint)
vals, err := sdHashesForOutpoints([]string{outpoint})
if err != nil {
return "", err
}
return sdHashForValue(val)
val, ok := vals[outpoint]
if !ok {
return "", errors.Err("outpoint not in response")
}
return val.Value, val.Err
}
// decodeValue decodes a protobuf-encoded claim and returns the sd hash
func sdHashForValue(value []byte) (string, error) {
claim := &types.Claim{}
err := proto.Unmarshal(value, claim)
if err != nil {
return "", errors.Err(err)
}
if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash {
return "", errors.Err("source is nil or source type is not lbry_sd_hash")
}
return hex.EncodeToString(claim.GetStream().GetSource().GetSource()), nil
}
// valueForOutpoint queries wallet server for the value of the claim at the given outpoint
func valueForOutpoint(outpoint string) ([]byte, error) {
parts := strings.Split(outpoint, ":")
if len(parts) != 2 {
return nil, errors.Err("invalid outpoint format")
}
nout, err := strconv.Atoi(parts[1])
if err != nil {
return nil, errors.Prefix("invalid nout", err)
}
// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints
func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) {
values := make(map[string]ValueResp)
node := wallet.NewNode()
err = node.ConnectTCP("victor.lbry.tech:50001")
err := node.ConnectTCP("victor.lbry.tech:50001")
if err != nil {
return nil, err
}
resp, err := node.GetClaimsInTx(parts[0])
if err != nil {
return nil, err
}
for _, tx := range resp.Result {
if tx.Nout == nout {
return hex.DecodeString(tx.Value)
for _, outpoint := range outpoints {
parts := strings.Split(outpoint, ":")
if len(parts) != 2 {
values[outpoint] = ValueResp{Err: errors.Err("invalid outpoint format")}
continue
}
nout, err := strconv.Atoi(parts[1])
if err != nil {
values[outpoint] = ValueResp{Err: errors.Prefix("invalid nout", err)}
continue
}
resp, err := node.GetClaimsInTx(parts[0])
if err != nil {
values[outpoint] = ValueResp{Err: err}
continue
}
var value []byte
for _, tx := range resp.Result {
if tx.Nout != nout {
continue
}
value, err = hex.DecodeString(tx.Value)
break
}
if err != nil {
values[outpoint] = ValueResp{Err: err}
continue
}
claim := &types.Claim{}
err = proto.Unmarshal(value, claim)
if err != nil {
values[outpoint] = ValueResp{Err: err}
continue
}
if claim.GetStream().GetSource().GetSourceType() != types.Source_lbry_sd_hash {
values[outpoint] = ValueResp{Err: errors.Err("source is nil or source type is not lbry_sd_hash")}
continue
}
values[outpoint] = ValueResp{Value: hex.EncodeToString(claim.GetStream().GetSource().GetSource())}
}
return nil, errors.Err("outpoint not found")
return values, nil
}

View file

@ -58,6 +58,15 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
return d.db.AddSDBlob(hash, len(blob), blobContents)
}
func (d *DBBackedS3Store) Delete(hash string) error {
err := d.s3.Delete(hash)
if err != nil {
return err
}
return d.db.Delete(hash)
}
// MissingBlobsForKnownStream returns missing blobs for an existing stream
// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
// like no blobs are missing

View file

@ -95,3 +95,21 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error {
func (f *FileBlobStore) PutSD(hash string, blob []byte) error {
return f.Put(hash, blob)
}
// Delete deletes the blob from the store
func (f *FileBlobStore) Delete(hash string) error {
err := f.initOnce()
if err != nil {
return err
}
has, err := f.Has(hash)
if err != nil {
return err
}
if !has {
return nil
}
return os.Remove(f.path(hash))
}

View file

@ -41,3 +41,9 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error {
return m.Put(hash, blob)
}
// Delete deletes the blob from the store
func (m *MemoryBlobStore) Delete(hash string) error {
delete(m.blobs, hash)
return nil
}

View file

@ -133,3 +133,19 @@ func (s *S3BlobStore) PutSD(hash string, blob []byte) error {
//Todo - handle missing stream for consistency
return s.Put(hash, blob)
}
func (s *S3BlobStore) Delete(hash string) error {
err := s.initOnce()
if err != nil {
return err
}
log.Debugf("Deleting %s from S3", hash[:8])
_, err = s3.New(s.session).DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(hash),
})
return err
}

View file

@ -8,6 +8,7 @@ type BlobStore interface {
Get(string) ([]byte, error)
Put(string, []byte) error
PutSD(string, []byte) error
Delete(string) error
}
//ErrBlobNotFound is a standard error when a blob is not found in the store.

View file

@ -5,8 +5,9 @@ package wallet
import (
"bufio"
"crypto/tls"
"log"
"net"
log "github.com/sirupsen/logrus"
)
type TCPTransport struct {
@ -44,7 +45,7 @@ func NewSSLTransport(addr string, config *tls.Config) (*TCPTransport, error) {
}
func (t *TCPTransport) SendMessage(body []byte) error {
log.Printf("%s <- %s", t.conn.RemoteAddr(), body)
log.Debugf("%s <- %s", t.conn.RemoteAddr(), body)
_, err := t.conn.Write(body)
return err
}
@ -58,10 +59,10 @@ func (t *TCPTransport) listen() {
line, err := reader.ReadBytes(delim)
if err != nil {
t.errors <- err
log.Printf("error %s", err)
log.Error(err)
break
}
log.Printf("%s -> %s", t.conn.RemoteAddr(), line)
log.Debugf("%s -> %s", t.conn.RemoteAddr(), line)
t.responses <- line
}
}