actually done now

This commit is contained in:
Alex Grintsvayg 2018-09-20 11:24:36 -04:00
parent 9fb824790b
commit 61e83d86de
13 changed files with 503 additions and 292 deletions

74
Gopkg.lock generated
View file

@ -3,7 +3,7 @@
[[projects]]
branch = "master"
digest = "1:d64110a78451e373c5a952d2625323dbbe3bfe41c67f9652ea9668a6ceb4f645"
digest = "1:354e62d5acb9af138e13ec842f78a846d214a8d4a9f80e578698f1f1565e2ef8"
name = "github.com/armon/go-metrics"
packages = ["."]
pruneopts = ""
@ -11,7 +11,7 @@
[[projects]]
branch = "master"
digest = "1:c0b6dbbb56a745020d5939bdde2197241a1c6109f226cca57b16f46916be5e94"
digest = "1:b1b9627af19ee54d3ed6b069375f0e91baa4a25267cf3b684e80fdefb17f4719"
name = "github.com/aws/aws-sdk-go"
packages = [
"aws",
@ -53,14 +53,14 @@
[[projects]]
branch = "master"
digest = "1:e250643be8120824556f39df6ef128fc2be490fc96e0cb64b1a8ecf96bbe3ce6"
digest = "1:56b87c786a316d6e9b9c7ba8f3dd64e3199ca3b33a55cc596c633023bed20264"
name = "github.com/btcsuite/btcutil"
packages = ["base58"]
pruneopts = ""
revision = "ab6388e0c60ae4834a1f57511e20c17b5f78be4b"
[[projects]]
digest = "1:0a39ec8bf5629610a4bc7873a92039ee509246da3cef1a0ea60f1ed7e5f9cea5"
digest = "1:56c130d885a4aacae1dd9c7b71cfe39912c7ebc1ff7d2b46083c8812996dc43b"
name = "github.com/davecgh/go-spew"
packages = ["spew"]
pruneopts = ""
@ -76,7 +76,7 @@
version = "v1.0.1"
[[projects]]
digest = "1:d19c78214e03e297e9e30d2eb11892f731358b2951f2a5c7374658a156373e4c"
digest = "1:858b7fe7b0f4bc7ef9953926828f2816ea52d01a88d72d1c45bc8c108f23c356"
name = "github.com/go-ini/ini"
packages = ["."]
pruneopts = ""
@ -85,12 +85,20 @@
[[projects]]
branch = "master"
digest = "1:27b11ca1ad214ead955ff5480e8575e74c5df4e4dc02b04256a8d92131e1d3ad"
digest = "1:7150b23ba935d63f7b930d6c5ff20b52649ba623d62e0344971c822615fe57a3"
name = "github.com/go-sql-driver/mysql"
packages = ["."]
pruneopts = ""
revision = "99ff426eb706cffe92ff3d058e168b278cabf7c7"
[[projects]]
digest = "1:3dd078fda7500c341bc26cfbc6c6a34614f295a2457149fc1045cab767cbcf18"
name = "github.com/golang/protobuf"
packages = ["proto"]
pruneopts = ""
revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5"
version = "v1.2.0"
[[projects]]
digest = "1:dbbeb8ddb0be949954c8157ee8439c2adfd8dc1c9510eb44a6e58cb68c3dce28"
name = "github.com/gorilla/context"
@ -108,7 +116,7 @@
version = "v1.6.2"
[[projects]]
digest = "1:b4817bdb0da3054166de058111943ac58b315aead2fed4ee838625a4e304f74c"
digest = "1:91aaeb45b3c10cc9cb68d1450cbc8ac77d0a677cf34a8ed3d4ef4dacb9df8a50"
name = "github.com/gorilla/rpc"
packages = [
"v2",
@ -119,7 +127,7 @@
version = "v1.1.0"
[[projects]]
digest = "1:fe1b4d4cbe48c0d55507c55f8663aa4185576cc58fa0c8be03bb8f19dfe17a9c"
digest = "1:64d212c703a2b94054be0ce470303286b177ad260b2f89a307e3d1bb6c073ef6"
name = "github.com/gorilla/websocket"
packages = ["."]
pruneopts = ""
@ -144,7 +152,7 @@
[[projects]]
branch = "master"
digest = "1:6a611e691e739173805cb54019b5c39bb9d46455526dff31e0e6fe3aaca52776"
digest = "1:6396690228a7560bf9247cb90e5ae9c797bd630b01e7d2acab430bbca9a1ecb3"
name = "github.com/hashicorp/go-msgpack"
packages = ["codec"]
pruneopts = ""
@ -152,7 +160,7 @@
[[projects]]
branch = "master"
digest = "1:deaebb4a98ca748bbad7eb653f3a675749500020823a086448ffcd7ba6b8b02d"
digest = "1:0b5ca7d18e4ded1e4dacbb37ff027cb40a80c0fed969e4e03cf7aff129bc1b44"
name = "github.com/hashicorp/go-multierror"
packages = ["."]
pruneopts = ""
@ -160,7 +168,7 @@
[[projects]]
branch = "master"
digest = "1:74f54e6ef2339f1de1e8c4b6674442118bd89e619b2fbd949ef2337330067994"
digest = "1:fd8ec2359315965bb6b84fd8e45cd5e8b58b80d8430dc96c8c5dfce46d30dbfc"
name = "github.com/hashicorp/go-sockaddr"
packages = ["."]
pruneopts = ""
@ -175,7 +183,7 @@
revision = "0fb14efe8c47ae851c0034ed7a448854d3d34cf3"
[[projects]]
digest = "1:d7ce65372f495908f80fc1f80f4dab5d763d9a1de544abd95aa719e4262d0dd5"
digest = "1:d2c45a353b65012162c7ca22c39b1b0bd06d39362fb375cf42b4e48e1104bfc6"
name = "github.com/hashicorp/memberlist"
packages = ["."]
pruneopts = ""
@ -184,7 +192,7 @@
[[projects]]
branch = "master"
digest = "1:4fd01ac3766b886665cfd335cc63819ec4e4538dcc1180c05d6edc089619962c"
digest = "1:7b8e4a60bfdacc2a79ba4a4ef21b2e86e98fb1dc99d816179e0b4aee75106051"
name = "github.com/hashicorp/serf"
packages = [
"coordinate",
@ -195,7 +203,7 @@
[[projects]]
branch = "master"
digest = "1:0d37c42156531a07a84812a47c27610947b849710ffab6f62be6e98c5112c140"
digest = "1:32c49a8cbcb20989c4fc0825f792cb1ea079af601c11b5ed0a92a48433171db3"
name = "github.com/inconshreveable/go-update"
packages = [
".",
@ -214,7 +222,7 @@
version = "v1.0"
[[projects]]
digest = "1:4f767a115bc8e08576f6d38ab73c376fc1b1cd3bb5041171c9e8668cc7739b52"
digest = "1:6f49eae0c1e5dab1dafafee34b207aeb7a42303105960944828c2079b92fc88e"
name = "github.com/jmespath/go-jmespath"
packages = ["."]
pruneopts = ""
@ -230,7 +238,7 @@
[[projects]]
branch = "master"
digest = "1:132aae3fa5ad407b53f57cf6ebe274e414ec22d1700899023319931fa90b63e2"
digest = "1:7abe0d83ffb4c20fce461c314b1dc858cba274580cf1508f698b5f9fd9e1cde9"
name = "github.com/johntdyer/slackrus"
packages = ["."]
pruneopts = ""
@ -246,7 +254,7 @@
[[projects]]
branch = "master"
digest = "1:76fd7507e6014c598a01f1b3d558774d2a3114c438403bc98123870d2aecec62"
digest = "1:3e990fec1701f7cd3a301cb0fa824f65e35a37c224ff17f4d842720651d2f2fb"
name = "github.com/lbryio/lbry.go"
packages = [
"crypto",
@ -259,6 +267,14 @@
pruneopts = ""
revision = "e2c96944fc485d3ab5e164da78f8439a94c5aa85"
[[projects]]
branch = "master"
digest = "1:dbd7fd543a88da4f81fbb849175b400d69b04f1f82de34c8c2efdc5626b80999"
name = "github.com/lbryio/types"
packages = ["go"]
pruneopts = ""
revision = "0a913ba650dd7d72e2a008b86dac117be3d5f075"
[[projects]]
branch = "master"
digest = "1:cabf2bf5e49edfe0c34cb9c6a256f2a99e6cc8c5e660855c8f3dafe1f81d5dcd"
@ -268,7 +284,7 @@
revision = "b7abd7672df533e627eddbf3a5a529786e8bda7f"
[[projects]]
digest = "1:f0bad0fece0fb73c6ea249c18d8e80ffbe86be0457715b04463068f04686cf39"
digest = "1:4c8d8358c45ba11ab7bb15df749d4df8664ff1582daead28bae58cf8cbe49890"
name = "github.com/miekg/dns"
packages = ["."]
pruneopts = ""
@ -276,7 +292,7 @@
version = "v1.0.8"
[[projects]]
digest = "1:ba3b2eb0ae6fd3deac4386c02fe9d2279c9520738eb9db2f0667e74d5c7a0a61"
digest = "1:e6352ff4bd34c601567ad5e274837275f08e2a933e2688354cf5d44595c13ef9"
name = "github.com/nlopes/slack"
packages = ["."]
pruneopts = ""
@ -285,7 +301,7 @@
[[projects]]
branch = "master"
digest = "1:af967afd3cbc6b0145937f4dcab78bcd93e7b2f2b618fb2bcaf7069ad5c638fa"
digest = "1:d38c630298ac75e214f3caa5c240ea2923c7a089824d175ba4107d0650d56579"
name = "github.com/phayes/freeport"
packages = ["."]
pruneopts = ""
@ -309,7 +325,7 @@
[[projects]]
branch = "master"
digest = "1:1747c026a603e3c9f33f238e1d1390df2c8f48876b6bcb7a9c52c7b479e040f4"
digest = "1:56de39853758a4b6053a3f71e527305bbed11a0d876156e32e8cc7180d36198b"
name = "github.com/sirupsen/logrus"
packages = ["."]
pruneopts = ""
@ -325,7 +341,7 @@
[[projects]]
branch = "master"
digest = "1:b5212c335a490d958a6b1b5b48901b46e682b82fa9af3a238fa88df6eaa60873"
digest = "1:c8f6919ab9f140506fd4ad3f4a9c9c2af9ee7921e190af0c67b2fca2f903083c"
name = "github.com/spf13/cobra"
packages = ["."]
pruneopts = ""
@ -341,7 +357,7 @@
[[projects]]
branch = "master"
digest = "1:f05efcac20bea32e1fcefde9a3cbefb07e02053666c4a67681ad18c8efc682d3"
digest = "1:b7ef38166f9ee44c54ef992b2754950f73fa09daf30355bea7aa510f224c38a6"
name = "github.com/uber-go/atomic"
packages = ["."]
pruneopts = ""
@ -349,7 +365,7 @@
[[projects]]
branch = "master"
digest = "1:16b935c128f178647036048862a21e8bfd66d1e83fb19787a8b356bdcf0de899"
digest = "1:53c4b75f22ea7757dea07eae380ea42de547ae6865a5e3b41866754a8a8219c9"
name = "golang.org/x/crypto"
packages = [
"ed25519",
@ -363,7 +379,7 @@
[[projects]]
branch = "master"
digest = "1:15f2fc8cc79d90b0d4d712f04bd3eb3a3856ff3dd6b610a1425113ead3501610"
digest = "1:9f170ebb5ac75debb7e958e0388545441cc77de4d131a0c170530e948f3e857e"
name = "golang.org/x/net"
packages = [
"bpf",
@ -378,7 +394,7 @@
[[projects]]
branch = "master"
digest = "1:e8f649ecfae7835a0a27ef39fd2180f6d3c12bc422e2ae55cd611f0b283b3e6e"
digest = "1:309d0f514b3f0dd143089ff4ab91c894d3e3f7e771c89b59d4b015b955cbaa5c"
name = "golang.org/x/sys"
packages = [
"unix",
@ -396,7 +412,7 @@
revision = "fbb02b2291d28baffd63558aa44b4b56f178d650"
[[projects]]
digest = "1:eede11c81b63c8f6fd06ef24ba0a640dc077196ec9b7a58ecde03c82eee2f151"
digest = "1:c1771ca6060335f9768dff6558108bc5ef6c58506821ad43377ee23ff059e472"
name = "google.golang.org/appengine"
packages = ["cloudsql"]
pruneopts = ""
@ -404,7 +420,7 @@
version = "v1.1.0"
[[projects]]
digest = "1:05eca53b271663de74078b5484b1995a8d56668a51434a698dc5d0863035d575"
digest = "1:f771bf87a3253de520c2af6fb6e75314dce0fedc0b30b208134fe502932bb15d"
name = "gopkg.in/nullbio/null.v6"
packages = ["convert"]
pruneopts = ""
@ -423,6 +439,7 @@
"github.com/aws/aws-sdk-go/service/s3/s3manager",
"github.com/davecgh/go-spew/spew",
"github.com/go-sql-driver/mysql",
"github.com/golang/protobuf/proto",
"github.com/gorilla/mux",
"github.com/gorilla/rpc/v2",
"github.com/gorilla/rpc/v2/json",
@ -435,6 +452,7 @@
"github.com/lbryio/lbry.go/querytools",
"github.com/lbryio/lbry.go/stop",
"github.com/lbryio/lbry.go/util",
"github.com/lbryio/types/go",
"github.com/lyoshenka/bencode",
"github.com/phayes/freeport",
"github.com/sebdah/goldie",

View file

@ -41,3 +41,7 @@
[[constraint]]
branch = "master"
name = "github.com/inconshreveable/go-update"
[[constraint]]
branch = "master"
name = "github.com/lbryio/types"

View file

@ -1,7 +1,6 @@
package cmd
import (
"log"
"net"
"os"
"os/signal"
@ -12,6 +11,7 @@ import (
"github.com/lbryio/reflector.go/dht"
"github.com/lbryio/reflector.go/dht/bits"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

View file

@ -42,6 +42,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
reflectorServer.StatLogger = log.StandardLogger()
reflectorServer.StatReportFrequency = 1 * time.Hour
}
reflectorServer.EnableBlocklist = true
err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort))
if err != nil {

View file

@ -1,10 +1,6 @@
package cmd
import (
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -19,43 +15,5 @@ func init() {
}
func testCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL)
err := db.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db)
values, err := reflector.BlockedSdHashes()
if err != nil {
log.Fatal(err)
}
for _, v := range values {
if v.Err != nil {
continue
}
has, err := db.HasBlob(v.Value)
if err != nil {
log.Error(err)
continue
}
if !has {
continue
}
err = combo.Delete(v.Value)
if err != nil {
log.Error(err)
}
err = db.Block(v.Value)
if err != nil {
log.Error(err)
}
}
log.Println("test :-)")
}

View file

@ -85,24 +85,11 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error {
// HasBlob checks if the database contains the blob information.
func (s *SQL) HasBlob(hash string) (bool, error) {
if s.conn == nil {
return false, errors.Err("not connected")
exists, err := s.HasBlobs([]string{hash})
if err != nil {
return false, err
}
query := `SELECT EXISTS(SELECT 1
FROM blob_ b
LEFT JOIN blocked bl ON b.hash = bl.hash
WHERE b.hash = ? AND b.is_stored = ? AND bl.hash IS NULL)`
args := []interface{}{hash, true}
logQuery(query, args...)
row := s.conn.QueryRow(query, args...)
exists := false
err := row.Scan(&exists)
return exists, errors.Err(err)
return exists[hash], nil
}
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
@ -133,29 +120,32 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
logQuery(query, args...)
rows, err := s.conn.Query(query, args...)
if err != nil {
closeRows(rows)
return exists, err
}
for rows.Next() {
err := rows.Scan(&hash)
err := func() error {
rows, err := s.conn.Query(query, args...)
if err != nil {
closeRows(rows)
return exists, err
return errors.Err(err)
}
exists[hash] = true
}
defer closeRows(rows)
err = rows.Err()
for rows.Next() {
err := rows.Scan(&hash)
if err != nil {
return errors.Err(err)
}
exists[hash] = true
}
err = rows.Err()
if err != nil {
return errors.Err(err)
}
doneIndex += len(batch)
return nil
}()
if err != nil {
closeRows(rows)
return exists, err
return nil, err
}
closeRows(rows)
doneIndex += len(batch)
}
return exists, nil
@ -187,6 +177,35 @@ func (s *SQL) Block(hash string) error {
return errors.Err(err)
}
// GetBlocked will return a list of blocked hashes
func (s *SQL) GetBlocked() (map[string]bool, error) {
query := "SELECT hash FROM blocked"
logQuery(query)
rows, err := s.conn.Query(query)
if err != nil {
return nil, errors.Err(err)
}
defer closeRows(rows)
blocked := make(map[string]bool)
var hash string
for rows.Next() {
err := rows.Scan(&hash)
if err != nil {
return nil, errors.Err(err)
}
blocked[hash] = true
}
err = rows.Err()
if err != nil {
return nil, errors.Err(err)
}
return blocked, nil
}
// MissingBlobsForKnownStream returns missing blobs for an existing stream
// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
// like no blobs are missing

View file

@ -6,33 +6,67 @@ import (
"net/http"
"strconv"
"strings"
"time"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/reflector.go/wallet"
"github.com/lbryio/lbry.go/errors"
types "github.com/lbryio/types/go"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
)
const blocklistURL = "https://api.lbry.io/file/list_blocked"
type blockListResponse struct {
Success bool `json:"success"`
Error string `json:"error"`
Data struct {
Outpoints []string `json:"outpoints"`
} `json:"data"`
func (s *Server) enableBlocklist(b store.Blocklister) {
updateBlocklist(b)
t := time.NewTicker(12 * time.Hour)
for {
select {
case <-s.grp.Ch():
return
case <-t.C:
updateBlocklist(b)
}
}
}
func BlockedSdHashes() (map[string]ValueResp, error) {
func updateBlocklist(b store.Blocklister) {
values, err := blockedSdHashes()
if err != nil {
log.Error(err)
return
}
for _, v := range values {
if v.Err != nil {
continue
}
err = b.Block(v.Value)
if err != nil {
log.Error(err)
}
}
}
func blockedSdHashes() (map[string]ValueResp, error) {
resp, err := http.Get(blocklistURL)
if err != nil {
return nil, errors.Err(err)
}
defer resp.Body.Close()
var r blockListResponse
var r struct {
Success bool `json:"success"`
Error string `json:"error"`
Data struct {
Outpoints []string `json:"outpoints"`
} `json:"data"`
}
if err = json.NewDecoder(resp.Body).Decode(&r); err != nil {
return nil, errors.Err(err)
}
@ -49,27 +83,16 @@ type ValueResp struct {
Err error
}
// sdHashForOutpoint queries wallet server for the sd hash in a given outpoint
func sdHashForOutpoint(outpoint string) (string, error) {
vals, err := sdHashesForOutpoints([]string{outpoint})
if err != nil {
return "", err
}
val, ok := vals[outpoint]
if !ok {
return "", errors.Err("outpoint not in response")
}
return val.Value, val.Err
}
// sdHashesForOutpoints queries wallet server for the sd hashes in a given outpoints
func sdHashesForOutpoints(outpoints []string) (map[string]ValueResp, error) {
values := make(map[string]ValueResp)
node := wallet.NewNode()
err := node.ConnectTCP("victor.lbry.tech:50001")
err := node.Connect([]string{
"victor.lbry.tech:50001",
//"lbryumx1.lbry.io:50001", // cant use real servers until victor pushes bugfix
//"lbryumx2.lbry.io:50001",
}, nil)
if err != nil {
return nil, err
}

View file

@ -37,6 +37,8 @@ type Server struct {
StatLogger *log.Logger // logger to log stats
StatReportFrequency time.Duration // how often to log stats
EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled
store store.BlobStore
grp *stop.Group
stats *stats
@ -90,6 +92,19 @@ func (s *Server) Start(address string) error {
s.stats.Start()
}
if s.EnableBlocklist {
if b, ok := s.store.(store.Blocklister); ok {
s.grp.Add(1)
go func() {
s.enableBlocklist(b)
s.grp.Done()
}()
} else {
//s.Shutdown()
return errors.Err("blocklist is enabled but blob store does not support blocklisting")
}
}
return nil
}
@ -174,39 +189,46 @@ func (s *Server) doError(conn net.Conn, err error) error {
}
func (s *Server) receiveBlob(conn net.Conn) error {
var err error
blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn)
if err != nil {
return err
}
blobExists, err := s.store.Has(blobHash)
if err != nil {
return err
var wantsBlob bool
if bl, ok := s.store.(store.Blocklister); ok {
wantsBlob, err = bl.Wants(blobHash)
if err != nil {
return err
}
} else {
blobExists, err := s.store.Has(blobHash)
if err != nil {
return err
}
wantsBlob = !blobExists
}
var neededBlobs []string
if isSdBlob && blobExists {
if fsc, ok := s.store.(neededBlobChecker); ok {
neededBlobs, err = fsc.MissingBlobsForKnownStream(blobHash)
if isSdBlob && !wantsBlob {
if nbc, ok := s.store.(neededBlobChecker); ok {
neededBlobs, err = nbc.MissingBlobsForKnownStream(blobHash)
if err != nil {
return err
}
} else {
// if we can't confirm that we have the full stream, we have to say that the sd blob is
// missing. if we say we have it, they wont try to send any content blobs
blobExists = false
wantsBlob = true
}
}
err = s.sendBlobResponse(conn, blobExists, isSdBlob, neededBlobs)
err = s.sendBlobResponse(conn, wantsBlob, isSdBlob, neededBlobs)
if err != nil {
return err
}
if blobExists {
if !wantsBlob {
return nil
}
@ -296,14 +318,14 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
return blobSize, blobHash, isSdBlob, nil
}
func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool, neededBlobs []string) error {
func (s *Server) sendBlobResponse(conn net.Conn, shouldSendBlob, isSdBlob bool, neededBlobs []string) error {
var response []byte
var err error
if isSdBlob {
response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists, NeededBlobs: neededBlobs})
response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: shouldSendBlob, NeededBlobs: neededBlobs})
} else {
response, err = json.Marshal(sendBlobResponse{SendBlob: !blobExists})
response, err = json.Marshal(sendBlobResponse{SendBlob: shouldSendBlob})
}
if err != nil {
return err

View file

@ -2,15 +2,19 @@ package store
import (
"encoding/json"
"sync"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/reflector.go/db"
log "github.com/sirupsen/logrus"
)
// DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored.
type DBBackedS3Store struct {
s3 *S3BlobStore
db *db.SQL
s3 *S3BlobStore
db *db.SQL
blockedMu sync.RWMutex
blocked map[string]bool
}
// NewDBBackedS3Store returns an initialized store pointer.
@ -67,9 +71,98 @@ func (d *DBBackedS3Store) Delete(hash string) error {
return d.db.Delete(hash)
}
// Block deletes the blob and prevents it from being uploaded in the future
func (d *DBBackedS3Store) Block(hash string) error {
if blocked, err := d.isBlocked(hash); blocked || err != nil {
return err
}
log.Debugf("blocking %s", hash)
err := d.db.Block(hash)
if err != nil {
return err
}
has, err := d.db.HasBlob(hash)
if err != nil {
return err
}
if has {
err = d.s3.Delete(hash)
if err != nil {
return err
}
err = d.db.Delete(hash)
if err != nil {
return err
}
}
return d.markBlocked(hash)
}
// Wants returns false if the hash exists or is blocked, true otherwise
func (d *DBBackedS3Store) Wants(hash string) (bool, error) {
has, err := d.Has(hash)
if has || err != nil {
return false, err
}
blocked, err := d.isBlocked(hash)
return !blocked, err
}
// MissingBlobsForKnownStream returns missing blobs for an existing stream
// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
// like no blobs are missing
func (d *DBBackedS3Store) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
return d.db.MissingBlobsForKnownStream(sdHash)
}
func (d *DBBackedS3Store) markBlocked(hash string) error {
err := d.initBlocked()
if err != nil {
return err
}
d.blockedMu.Lock()
defer d.blockedMu.Unlock()
d.blocked[hash] = true
return nil
}
func (d *DBBackedS3Store) isBlocked(hash string) (bool, error) {
err := d.initBlocked()
if err != nil {
return false, err
}
d.blockedMu.RLock()
defer d.blockedMu.RUnlock()
return d.blocked[hash], nil
}
func (d *DBBackedS3Store) initBlocked() error {
// first check without blocking since this is the most likely scenario
if d.blocked != nil {
return nil
}
d.blockedMu.Lock()
defer d.blockedMu.Unlock()
// check again in case of race condition
if d.blocked != nil {
return nil
}
var err error
d.blocked, err = d.db.GetBlocked()
return err
}

View file

@ -4,11 +4,23 @@ import "github.com/lbryio/lbry.go/errors"
// BlobStore is an interface with methods for consistently handling blob storage.
type BlobStore interface {
Has(string) (bool, error)
Get(string) ([]byte, error)
Put(string, []byte) error
PutSD(string, []byte) error
Delete(string) error
// Does blob exist in the store
Has(hash string) (bool, error)
// Get the blob from the store
Get(hase string) ([]byte, error)
// Put the blob into the store
Put(hash string, blob []byte) error
// Put an SD blob into the store
PutSD(hash string, blob []byte) error
// Delete the blob from the store
Delete(hash string) error
}
type Blocklister interface {
// Block deletes the blob and prevents it from being uploaded in the future
Block(hash string) error
// Wants returns false if the hash exists or is blocked, true otherwise
Wants(hash string) (bool, error)
}
//ErrBlobNotFound is a standard error when a blob is not found in the store.

View file

@ -7,8 +7,14 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
"net"
"sync"
"time"
"github.com/lbryio/lbry.go/stop"
log "github.com/sirupsen/logrus"
"github.com/uber-go/atomic"
)
const (
@ -19,16 +25,11 @@ const (
var (
ErrNotImplemented = errors.New("not implemented")
ErrNodeConnected = errors.New("node already connected")
ErrConnectFailed = errors.New("failed to connect")
)
type Transport interface {
SendMessage([]byte) error
Responses() <-chan []byte
Errors() <-chan error
}
type respMetadata struct {
Id int `json:"id"`
type response struct {
Id uint32 `json:"id"`
Method string `json:"method"`
Error struct {
Code int `json:"code"`
@ -37,90 +38,125 @@ type respMetadata struct {
}
type request struct {
Id int `json:"id"`
Id uint32 `json:"id"`
Method string `json:"method"`
Params []string `json:"params"`
}
type Node struct {
Address string
transport *TCPTransport
nextId atomic.Uint32
grp *stop.Group
transport Transport
handlers map[int]chan []byte
handlersLock sync.RWMutex
handlersMu sync.RWMutex
handlers map[uint32]chan []byte
pushHandlers map[string][]chan []byte
pushHandlersLock sync.RWMutex
nextId int
pushHandlersMu sync.RWMutex
pushHandlers map[string][]chan []byte
}
// NewNode creates a new node.
func NewNode() *Node {
n := &Node{
handlers: make(map[int]chan []byte),
return &Node{
handlers: make(map[uint32]chan []byte),
pushHandlers: make(map[string][]chan []byte),
grp: stop.New(),
}
return n
}
// ConnectTCP creates a new TCP connection to the specified address.
func (n *Node) ConnectTCP(addr string) error {
// Connect creates a new connection to the specified address.
func (n *Node) Connect(addrs []string, config *tls.Config) error {
if n.transport != nil {
return ErrNodeConnected
}
n.Address = addr
transport, err := NewTCPTransport(addr)
if err != nil {
// shuffle addresses for load balancing
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
var err error
for _, addr := range addrs {
n.transport, err = NewTransport(addr, config)
if err == nil {
break
}
if e, ok := err.(*net.OpError); ok && e.Err.Error() == "no such host" {
// net.errNoSuchHost is not exported, so we have to string-match
continue
}
return err
}
n.transport = transport
go n.listen()
if n.transport == nil {
return ErrConnectFailed
}
log.Debugf("wallet connected to %s", n.transport.conn.RemoteAddr())
n.grp.Add(1)
go func() {
defer n.grp.Done()
<-n.grp.Ch()
n.transport.Shutdown()
}()
n.grp.Add(1)
go func() {
defer n.grp.Done()
n.handleErrors()
}()
n.grp.Add(1)
go func() {
defer n.grp.Done()
n.listen()
}()
return nil
}
// ConnectSLL creates a new SLL connection to the specified address.
func (n *Node) ConnectSSL(addr string, config *tls.Config) error {
if n.transport != nil {
return ErrNodeConnected
func (n *Node) Shutdown() {
n.grp.StopAndWait()
}
func (n *Node) handleErrors() {
for {
select {
case <-n.grp.Ch():
return
case err := <-n.transport.Errors():
n.err(err)
}
}
n.Address = addr
transport, err := NewSSLTransport(addr, config)
if err != nil {
return err
}
n.transport = transport
go n.listen()
return nil
}
// err handles errors produced by the foreign node.
func (n *Node) err(err error) {
// TODO: Better error handling.
log.Fatal(err)
log.Error(err)
}
// listen processes messages from the server.
func (n *Node) listen() {
for {
select {
case err := <-n.transport.Errors():
n.err(err)
case <-n.grp.Ch():
return
case bytes := <-n.transport.Responses():
msg := &respMetadata{}
msg := &response{}
if err := json.Unmarshal(bytes, msg); err != nil {
n.err(err)
return
continue
}
if len(msg.Error.Message) > 0 {
n.err(fmt.Errorf("error from server: %#v", msg.Error.Message))
return
continue
}
if len(msg.Method) > 0 {
n.pushHandlersLock.RLock()
n.pushHandlersMu.RLock()
handlers := n.pushHandlers[msg.Method]
n.pushHandlersLock.RUnlock()
n.pushHandlersMu.RUnlock()
for _, handler := range handlers {
select {
@ -130,10 +166,9 @@ func (n *Node) listen() {
}
}
n.handlersLock.RLock()
n.handlersMu.RLock()
c, ok := n.handlers[msg.Id]
n.handlersLock.RUnlock()
n.handlersMu.RUnlock()
if ok {
c <- bytes
}
@ -144,8 +179,8 @@ func (n *Node) listen() {
// listenPush returns a channel of messages matching the method.
func (n *Node) listenPush(method string) <-chan []byte {
c := make(chan []byte, 1)
n.pushHandlersLock.Lock()
defer n.pushHandlersLock.Unlock()
n.pushHandlersMu.Lock()
defer n.pushHandlersMu.Unlock()
n.pushHandlers[method] = append(n.pushHandlers[method], c)
return c
}
@ -153,34 +188,34 @@ func (n *Node) listenPush(method string) <-chan []byte {
// request makes a request to the server and unmarshals the response into v.
func (n *Node) request(method string, params []string, v interface{}) error {
msg := request{
Id: n.nextId,
Id: n.nextId.Load(),
Method: method,
Params: params,
}
n.nextId++
n.nextId.Inc()
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
bytes = append(bytes, delim)
if err := n.transport.SendMessage(bytes); err != nil {
bytes = append(bytes, delimiter)
err = n.transport.Send(bytes)
if err != nil {
return err
}
c := make(chan []byte, 1)
n.handlersLock.Lock()
n.handlersMu.Lock()
n.handlers[msg.Id] = c
n.handlersLock.Unlock()
n.handlersMu.Unlock()
resp := <-c
n.handlersLock.Lock()
defer n.handlersLock.Unlock()
n.handlersMu.Lock()
delete(n.handlers, msg.Id)
n.handlersMu.Unlock()
if err := json.Unmarshal(resp, v); err != nil {
return nil
}
return nil
return json.Unmarshal(resp, v)
}

View file

@ -1,75 +0,0 @@
package wallet
// copied from https://github.com/d4l3k/go-electrum
import (
"bufio"
"crypto/tls"
"net"
log "github.com/sirupsen/logrus"
)
type TCPTransport struct {
conn net.Conn
responses chan []byte
errors chan error
}
func NewTCPTransport(addr string) (*TCPTransport, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
t := &TCPTransport{
conn: conn,
responses: make(chan []byte),
errors: make(chan error),
}
go t.listen()
return t, nil
}
func NewSSLTransport(addr string, config *tls.Config) (*TCPTransport, error) {
conn, err := tls.Dial("tcp", addr, config)
if err != nil {
return nil, err
}
t := &TCPTransport{
conn: conn,
responses: make(chan []byte),
errors: make(chan error),
}
go t.listen()
return t, nil
}
func (t *TCPTransport) SendMessage(body []byte) error {
log.Debugf("%s <- %s", t.conn.RemoteAddr(), body)
_, err := t.conn.Write(body)
return err
}
const delim = byte('\n')
func (t *TCPTransport) listen() {
defer t.conn.Close()
reader := bufio.NewReader(t.conn)
for {
line, err := reader.ReadBytes(delim)
if err != nil {
t.errors <- err
log.Error(err)
break
}
log.Debugf("%s -> %s", t.conn.RemoteAddr(), line)
t.responses <- line
}
}
func (t *TCPTransport) Responses() <-chan []byte {
return t.responses
}
func (t *TCPTransport) Errors() <-chan error {
return t.errors
}

101
wallet/transport.go Normal file
View file

@ -0,0 +1,101 @@
package wallet
// copied from https://github.com/d4l3k/go-electrum
import (
"bufio"
"crypto/tls"
"net"
"time"
"github.com/lbryio/lbry.go/stop"
log "github.com/sirupsen/logrus"
)
type TCPTransport struct {
conn net.Conn
responses chan []byte
errors chan error
grp *stop.Group
}
func NewTransport(addr string, config *tls.Config) (*TCPTransport, error) {
var conn net.Conn
var err error
timeout := 5 * time.Second
if config != nil {
conn, err = tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, config)
} else {
conn, err = net.DialTimeout("tcp", addr, timeout)
}
if err != nil {
return nil, err
}
t := &TCPTransport{
conn: conn,
responses: make(chan []byte),
errors: make(chan error),
grp: stop.New(),
}
t.grp.Add(1)
go func() {
defer t.grp.Done()
<-t.grp.Ch()
t.close()
}()
t.grp.Add(1)
go func() {
t.grp.Done()
t.listen()
}()
return t, nil
}
const delimiter = byte('\n')
func (t *TCPTransport) listen() {
reader := bufio.NewReader(t.conn)
for {
line, err := reader.ReadBytes(delimiter)
if err != nil {
t.error(err)
return
}
log.Debugf("%s -> %s", t.conn.RemoteAddr(), line)
t.responses <- line
}
}
func (t *TCPTransport) Send(body []byte) error {
log.Debugf("%s <- %s", t.conn.RemoteAddr(), body)
_, err := t.conn.Write(body)
return err
}
func (t *TCPTransport) error(err error) {
select {
case t.errors <- err:
default:
}
}
func (t *TCPTransport) Responses() <-chan []byte { return t.responses }
func (t *TCPTransport) Errors() <-chan error { return t.errors }
func (t *TCPTransport) Shutdown() {
t.grp.StopAndWait()
}
func (t *TCPTransport) close() {
err := t.conn.Close()
if err != nil {
t.error(err)
}
}