Merge branch 'code_cleanup'

* code_cleanup:
  code cleanup
This commit is contained in:
Alex Grintsvayg 2018-06-13 09:36:21 -04:00
commit 8859733101
37 changed files with 615 additions and 239 deletions

.travis.yml Normal file
View file

@ -0,0 +1,71 @@
os: linux
dist: trusty
language: go
# Only the last two Go releases are supported by the Go team with security
# updates. Any versions older than that should be considered deprecated.
# Don't bother testing with them. tip builds your code with the latest
# development version of Go. This can warn you that your code will break
# in the next version of Go. Don't worry! Later we declare that test runs
# are allowed to fail on Go tip.
go:
- 1.10.2
- master
# Skip the install step. Don't `go get` dependencies. Only build with the
# code in vendor/
install: true
matrix:
# It's ok if our code fails on unstable development versions of Go.
allow_failures:
- go: master
# Don't wait for tip tests to finish. Mark the test run green if the
# tests pass on the stable versions of Go.
fast_finish: true
# Don't email me the results of the test runs.
notifications:
email: false
# Anything in before_script that returns a nonzero exit code will
# flunk the build and immediately stop. It's sorta like having
# set -e enabled in bash.
before_script:
# All the .go files, excluding vendor/ and model (auto generated)
- GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ )
- go get golang.org/x/tools/cmd/goimports # Used in build script for generated files
- go get github.com/golang/lint/golint # Linter
- go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter
- go get github.com/jgautheron/gocyclo # Check against high complexity
- go get github.com/mdempsky/unconvert # Identifies unnecessary type conversions
- go get github.com/kisielk/errcheck # Checks for unhandled errors
- go get github.com/opennota/check/cmd/varcheck # Checks for unused vars
- go get github.com/opennota/check/cmd/structcheck # Checks for unused fields in structs
# script commands always run to completion (set +e). All of these code checks are must-haves
# in a modern Go project.
script:
# Build Prism successfully
- make
# Fail if a .go file hasn't been formatted with gofmt
- test -z $(gofmt -s -l $GO_FILES)
# Run unit tests
- go test ./...
# Checks for unused vars and fields on structs
- varcheck ./...
- structcheck ./...
# go vet is the official Go static analyzer
- go vet ./...
# forbid code with huge functions
- gocyclo -ignore "_test.go" -avg -over 19 $GO_FILES
# checks for unhandled errors
- errcheck ./...
# "go vet on steroids" + linter - ignore autogen code
- megacheck -simple.exit-non-zero=true ./...
# check for unnecessary conversions - ignore autogen code
- unconvert ./...
# one last linter - ignore autogen code
- golint -set_exit_status $(go list ./... | grep -v /vendor/ )

View file

@ -14,9 +14,11 @@ import (
)
const (
// DefaultClusterPort is the default port used when starting up a Cluster.
DefaultClusterPort = 17946
)
// Cluster is a management type for Serf which is used to maintain cluster membership of lbry nodes.
type Cluster struct {
name string
port int
@ -27,6 +29,7 @@ type Cluster struct {
stop *stopOnce.Stopper
}
// New returns a new Cluster instance that is not connected.
func New(port int, seedAddr string) *Cluster {
return &Cluster{
name: crypto.RandString(12),
@ -36,6 +39,8 @@ func New(port int, seedAddr string) *Cluster {
}
}
// Connect initializes the Cluster based on a configuration passed via the New function. It then stores the seed
// address, starts gossiping and listens for gossip.
func (c *Cluster) Connect() error {
var err error
@ -66,9 +71,12 @@ func (c *Cluster) Connect() error {
return nil
}
// Shutdown safely shuts down the cluster.
func (c *Cluster) Shutdown() {
c.stop.StopAndWait()
c.s.Leave()
if err := c.s.Leave(); err != nil {
log.Error("error shutting down cluster - ", err)
}
}
func (c *Cluster) listen() {
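Taken together, New, Connect, and Shutdown give the cluster a complete lifecycle. A minimal usage sketch, assuming the cluster package is imported (the seed address below is hypothetical):

c := cluster.New(cluster.DefaultClusterPort, "10.0.0.1:17946") // seed address is illustrative
if err := c.Connect(); err != nil {
    log.Fatal("cluster connect failed - ", err)
}
defer c.Shutdown()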

View file

@ -21,7 +21,7 @@ func init() {
Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs),
Run: clusterCmd,
}
RootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd)
}
func clusterCmd(cmd *cobra.Command, args []string) {

View file

@ -12,7 +12,7 @@ func init() {
Short: "Run interactive dht node",
Run: dhtCmd,
}
RootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd)
}
func dhtCmd(cmd *cobra.Command, args []string) {

View file

@ -17,15 +17,15 @@ func init() {
Short: "Run peer server",
Run: peerCmd,
}
RootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd)
}
func peerCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL)
err := db.Connect(GlobalConfig.DBConn)
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db)
log.Fatal(peer.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(peer.DefaultPort)))
}

View file

@ -17,15 +17,15 @@ func init() {
Short: "Run reflector server",
Run: reflectorCmd,
}
RootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd)
}
func reflectorCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL)
err := db.Connect(GlobalConfig.DBConn)
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db)
log.Fatal(reflector.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(reflector.DefaultPort)))
}

View file

@ -11,6 +11,7 @@ import (
"github.com/spf13/cobra"
)
// Config is the base configuration for Prism when no sub commands are used.
type Config struct {
AwsID string `json:"aws_id"`
AwsSecret string `json:"aws_secret"`
@ -19,25 +20,24 @@ type Config struct {
DBConn string `json:"db_conn"`
}
var Verbose bool
var Conf string
var GlobalConfig Config
var verbose bool
var conf string
var globalConfig Config
// RootCmd represents the base command when called without any subcommands
var RootCmd = &cobra.Command{
var rootCmd = &cobra.Command{
Use: "reflector",
Short: "Reflector accepts blobs, stores them securely, and hosts them on the network",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
if Verbose {
if verbose {
log.SetLevel(log.DebugLevel)
}
var err error
if Conf == "" {
if conf == "" {
log.Errorln("--conf flag required")
os.Exit(1)
} else {
GlobalConfig, err = loadConfig(Conf)
globalConfig, err = loadConfig(conf)
if err != nil {
log.Error(err)
os.Exit(1)
@ -51,14 +51,14 @@ var RootCmd = &cobra.Command{
}
func init() {
RootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "v", false, "Enable verbose logging")
RootCmd.PersistentFlags().StringVar(&Conf, "conf", "config.json", "Path to config")
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging")
rootCmd.PersistentFlags().StringVar(&conf, "conf", "config.json", "Path to config")
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := RootCmd.Execute(); err != nil {
if err := rootCmd.Execute(); err != nil {
log.Errorln(err)
os.Exit(1)
}

View file

@ -20,15 +20,15 @@ func init() {
Run: startCmd,
Args: cobra.RangeArgs(0, 1),
}
RootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd)
}
func startCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL)
err := db.Connect(GlobalConfig.DBConn)
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
comboStore := store.NewDBBackedS3Store(s3, db)
clusterAddr := ""

View file

@ -20,6 +20,23 @@ import (
var workers int
const (
sdInc = 1
blobInc = 2
errInc = 3
)
type uploaderParams struct {
workerWG *sync.WaitGroup
counterWG *sync.WaitGroup
stopper *stopOnce.Stopper
filenameChan chan string
countChan chan int
sdCount int
blobCount int
errCount int
}
func init() {
var cmd = &cobra.Command{
Use: "upload DIR",
@ -28,40 +45,27 @@ func init() {
Run: uploadCmd,
}
cmd.PersistentFlags().IntVar(&workers, "workers", 1, "How many worker threads to run at once")
RootCmd.AddCommand(cmd)
rootCmd.AddCommand(cmd)
}
func uploadCmd(cmd *cobra.Command, args []string) {
startTime := time.Now()
db := new(db.SQL)
err := db.Connect(GlobalConfig.DBConn)
err := db.Connect(globalConfig.DBConn)
checkErr(err)
stopper := stopOnce.New()
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-interruptChan
stopper.Stop()
}()
params := uploaderParams{
workerWG: &sync.WaitGroup{},
counterWG: &sync.WaitGroup{},
filenameChan: make(chan string),
countChan: make(chan int),
stopper: stopOnce.New()}
dir := args[0]
setInterrupt(params.stopper)
f, err := os.Open(dir)
filenames, err := getFileNames(args[0])
checkErr(err)
files, err := f.Readdir(-1)
checkErr(err)
err = f.Close()
checkErr(err)
var filenames []string
for _, file := range files {
if !file.IsDir() {
filenames = append(filenames, file.Name())
}
}
totalCount := len(filenames)
log.Println("checking for existing blobs")
@ -72,96 +76,11 @@ func uploadCmd(cmd *cobra.Command, args []string) {
log.Printf("%d new blobs to upload", totalCount-existsCount)
sdCount := 0
blobCount := 0
errCount := 0
workerWG := &sync.WaitGroup{}
filenameChan := make(chan string)
counterWG := &sync.WaitGroup{}
countChan := make(chan int)
const (
sdInc = 1
blobInc = 2
errInc = 3
)
for i := 0; i < workers; i++ {
go func(i int) {
workerWG.Add(1)
defer workerWG.Done()
defer func(i int) {
log.Printf("worker %d quitting", i)
}(i)
blobStore := newBlobStore()
for {
select {
case <-stopper.Ch():
return
case filename, ok := <-filenameChan:
if !ok {
return
}
blob, err := ioutil.ReadFile(dir + "/" + filename)
checkErr(err)
hash := peer.GetBlobHash(blob)
if hash != filename {
log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", i, filename, hash)
select {
case countChan <- errInc:
case <-stopper.Ch():
}
continue
}
if isJSON(blob) {
log.Printf("worker %d: PUTTING SD BLOB %s", i, hash)
blobStore.PutSD(hash, blob)
select {
case countChan <- sdInc:
case <-stopper.Ch():
}
} else {
log.Printf("worker %d: putting %s", i, hash)
blobStore.Put(hash, blob)
select {
case countChan <- blobInc:
case <-stopper.Ch():
}
}
}
}
}(i)
}
startUploadWorkers(&params, args[0])
params.counterWG.Add(1)
go func() {
counterWG.Add(1)
defer counterWG.Done()
for {
select {
case <-stopper.Ch():
return
case countType, ok := <-countChan:
if !ok {
return
}
switch countType {
case sdInc:
sdCount++
case blobInc:
blobCount++
case errInc:
errCount++
}
}
if (sdCount+blobCount)%50 == 0 {
log.Printf("%d of %d done (%s elapsed, %.3fs per blob)", sdCount+blobCount, totalCount-existsCount, time.Now().Sub(startTime).String(), time.Now().Sub(startTime).Seconds()/float64(sdCount+blobCount))
}
}
defer params.counterWG.Done()
runCountReceiver(&params, startTime, totalCount, existsCount)
}()
Upload:
@ -171,25 +90,25 @@ Upload:
}
select {
case filenameChan <- filename:
case <-stopper.Ch():
case params.filenameChan <- filename:
case <-params.stopper.Ch():
log.Warnln("Caught interrupt, quitting at first opportunity...")
break Upload
}
}
close(filenameChan)
workerWG.Wait()
close(countChan)
counterWG.Wait()
stopper.Stop()
close(params.filenameChan)
params.workerWG.Wait()
close(params.countChan)
params.counterWG.Wait()
params.stopper.Stop()
log.Println("SUMMARY")
log.Printf("%d blobs total", totalCount)
log.Printf("%d SD blobs uploaded", sdCount)
log.Printf("%d content blobs uploaded", blobCount)
log.Printf("%d SD blobs uploaded", params.sdCount)
log.Printf("%d content blobs uploaded", params.blobCount)
log.Printf("%d blobs already stored", existsCount)
log.Printf("%d errors encountered", errCount)
log.Printf("%d errors encountered", params.errCount)
}
func isJSON(data []byte) bool {
@ -199,9 +118,128 @@ func isJSON(data []byte) bool {
func newBlobStore() *store.DBBackedS3Store {
db := new(db.SQL)
err := db.Connect(GlobalConfig.DBConn)
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3BlobStore(GlobalConfig.AwsID, GlobalConfig.AwsSecret, GlobalConfig.BucketRegion, GlobalConfig.BucketName)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
return store.NewDBBackedS3Store(s3, db)
}
func setInterrupt(stopper *stopOnce.Stopper) {
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-interruptChan
stopper.Stop()
}()
}
func startUploadWorkers(params *uploaderParams, dir string) {
for i := 0; i < workers; i++ {
params.workerWG.Add(1)
go func(i int) {
defer params.workerWG.Done()
defer func(i int) {
log.Printf("worker %d quitting", i)
}(i)
blobStore := newBlobStore()
launchFileUploader(params, blobStore, dir, i)
}(i)
}
}
func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, dir string, worker int) {
for {
select {
case <-params.stopper.Ch():
return
case filename, ok := <-params.filenameChan:
if !ok {
return
}
blob, err := ioutil.ReadFile(dir + "/" + filename)
checkErr(err)
hash := peer.GetBlobHash(blob)
if hash != filename {
log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", worker, filename, hash)
select {
case params.countChan <- errInc:
case <-params.stopper.Ch():
}
continue
}
if isJSON(blob) {
log.Printf("worker %d: PUTTING SD BLOB %s", worker, hash)
if err := blobStore.PutSD(hash, blob); err != nil {
log.Error("PutSD Error: ", err)
}
select {
case params.countChan <- sdInc:
case <-params.stopper.Ch():
}
} else {
log.Printf("worker %d: putting %s", worker, hash)
if err := blobStore.Put(hash, blob); err != nil {
log.Error("Put Blob Error: ", err)
}
select {
case params.countChan <- blobInc:
case <-params.stopper.Ch():
}
}
}
}
}
func runCountReceiver(params *uploaderParams, startTime time.Time, totalCount int, existsCount int) {
for {
select {
case <-params.stopper.Ch():
return
case countType, ok := <-params.countChan:
if !ok {
return
}
switch countType {
case sdInc:
params.sdCount++
case blobInc:
params.blobCount++
case errInc:
params.errCount++
}
}
if (params.sdCount+params.blobCount)%50 == 0 {
log.Printf("%d of %d done (%s elapsed, %.3fs per blob)", params.sdCount+params.blobCount, totalCount-existsCount, time.Since(startTime).String(), time.Since(startTime).Seconds()/float64(params.sdCount+params.blobCount))
}
}
}
func getFileNames(dir string) ([]string, error) {
f, err := os.Open(dir)
if err != nil {
return nil, err
}
files, err := f.Readdir(-1)
if err != nil {
return nil, err
}
err = f.Close()
if err != nil {
return nil, err
}
var filenames []string
for _, file := range files {
if !file.IsDir() {
filenames = append(filenames, file.Name())
}
}
return filenames, nil
}
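The refactor above breaks uploadCmd into setInterrupt, startUploadWorkers, launchFileUploader, runCountReceiver, and getFileNames, but the underlying shape is a plain fan-out/fan-in: one goroutine feeds filenames into a channel, several workers drain it, and a single counter goroutine aggregates increments. A stripped-down sketch of that shape, assuming workers and filenames as in uploadCmd; process is a hypothetical per-file function:

jobs := make(chan string)
results := make(chan int)
workerWG := &sync.WaitGroup{}
counterWG := &sync.WaitGroup{}
total := 0

for i := 0; i < workers; i++ {
    workerWG.Add(1) // Add before starting the goroutine, as the refactor above does
    go func() {
        defer workerWG.Done()
        for filename := range jobs {
            results <- process(filename) // process is hypothetical
        }
    }()
}

counterWG.Add(1)
go func() {
    defer counterWG.Done()
    for inc := range results {
        total += inc
    }
}()

for _, f := range filenames {
    jobs <- f
}
close(jobs)     // workers finish the remaining jobs and exit
workerWG.Wait() // only then is it safe to close results
close(results)
counterWG.Wait()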

View file

@ -6,11 +6,13 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/querytools"
"github.com/lbryio/reflector.go/types"
// blank import for db driver
_ "github.com/go-sql-driver/mysql"
log "github.com/sirupsen/logrus"
)
// DB is an interface to a backend database with a simple set of methods that support tracking blobs that are used
// together with a BlobStore. The DB tracks pointers and the BlobStore stores the data.
type DB interface {
Connect(string) error
HasBlob(string) (bool, error)
@ -18,6 +20,7 @@ type DB interface {
AddSDBlob(string, int, types.SdBlob) error
}
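A short sketch of a caller exercising this interface through its SQL implementation; the DSN here is hypothetical (the real commands pass globalConfig.DBConn):

sdb := new(db.SQL)
if err := sdb.Connect("user:pass@tcp(localhost:3306)/reflector"); err != nil {
    log.Fatal(err)
}
exists, err := sdb.HasBlob("somehash") // "somehash" is illustrative
if err != nil {
    log.Fatal(err)
}
log.Println("blob tracked:", exists)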
// SQL is the container for the supporting MySQL database connection.
type SQL struct {
conn *sql.DB
}
@ -31,6 +34,7 @@ func logQuery(query string, args ...interface{}) {
}
}
// Connect will create a connection to the database
func (s *SQL) Connect(dsn string) error {
var err error
dsn += "?parseTime=1&collation=utf8mb4_unicode_ci"
@ -42,6 +46,7 @@ func (s *SQL) Connect(dsn string) error {
return errors.Err(s.conn.Ping())
}
// AddBlob adds a blob's information to the database.
func (s *SQL) AddBlob(hash string, length int, stored bool) error {
if s.conn == nil {
return errors.Err("not connected")
@ -75,6 +80,7 @@ func addBlob(tx *sql.Tx, hash string, length int, stored bool) error {
return nil
}
// HasBlob checks if the database contains the blob information.
func (s *SQL) HasBlob(hash string) (bool, error) {
if s.conn == nil {
return false, errors.Err("not connected")
@ -93,6 +99,7 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
return exists, errors.Err(err)
}
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
if s.conn == nil {
return nil, errors.Err("not connected")
@ -122,14 +129,14 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
rows, err := s.conn.Query(query, args...)
if err != nil {
rows.Close()
closeRows(rows)
return exists, err
}
for rows.Next() {
err := rows.Scan(&hash)
if err != nil {
rows.Close()
closeRows(rows)
return exists, err
}
exists[hash] = true
@ -137,17 +144,20 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
err = rows.Err()
if err != nil {
rows.Close()
closeRows(rows)
return exists, err
}
rows.Close()
closeRows(rows)
doneIndex += len(batch)
}
return exists, nil
}
// AddSDBlob takes the SD hash, the SD blob's length, and the parsed SD blob. In a single db tx it inserts the sd blob
// information into a stream, and inserts the associated blobs' information in the database. If any insert fails, the
// transaction is rolled back and the error(s) are returned.
func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob types.SdBlob) error {
if s.conn == nil {
return errors.Err("not connected")
@ -225,10 +235,14 @@ func withTx(dbOrTx interface{}, f txFunc) (err error) {
}
defer func() {
if p := recover(); p != nil {
tx.Rollback()
if rollBackError := tx.Rollback(); rollBackError != nil {
log.Error("failed to rollback tx on panic - ", rollBackError)
}
panic(p)
} else if err != nil {
tx.Rollback()
if rollBackError := tx.Rollback(); rollBackError != nil {
log.Error("failed to rollback tx on panic - ", rollBackError)
}
} else {
err = errors.Err(tx.Commit())
}
@ -240,6 +254,13 @@ func withTx(dbOrTx interface{}, f txFunc) (err error) {
return f(tx)
}
func closeRows(rows *sql.Rows) {
if err := rows.Close(); err != nil {
log.Error("error closing rows: ", err)
}
}
/*// func to generate schema. SQL below that.
func schema() {
_ = `
CREATE TABLE blob_ (
@ -269,4 +290,35 @@ CREATE TABLE stream_blob (
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
`
}
}*/
/* SQL script to create schema
CREATE TABLE `reflector`.`blob_`
(
`hash` char(96) NOT NULL,
`stored` TINYINT(1) NOT NULL DEFAULT 0,
`length` bigint(20) unsigned DEFAULT NULL,
`last_announced_at` datetime DEFAULT NULL,
PRIMARY KEY (`hash`),
KEY `last_announced_at_idx` (`last_announced_at`)
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `reflector`.`stream`
(
`hash` char(96) NOT NULL,
`sd_hash` char(96) NOT NULL,
PRIMARY KEY (hash),
KEY `sd_hash_idx` (`sd_hash`),
FOREIGN KEY (`sd_hash`) REFERENCES `blob_` (`hash`) ON DELETE RESTRICT ON UPDATE CASCADE
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `reflector`.`stream_blob`
(
`stream_hash` char(96) NOT NULL,
`blob_hash` char(96) NOT NULL,
`num` int NOT NULL,
PRIMARY KEY (`stream_hash`, `blob_hash`),
FOREIGN KEY (`stream_hash`) REFERENCES `stream` (`hash`) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (`blob_hash`) REFERENCES `blob_` (`hash`) ON DELETE CASCADE ON UPDATE CASCADE
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
*/

View file

@ -6,15 +6,19 @@ import (
"encoding/hex"
"strings"
"strconv"
"github.com/lbryio/lbry.go/errors"
"github.com/lyoshenka/bencode"
)
// TODO: http://roaringbitmap.org/
// Bitmap is a generalized representation of an identifier or data that can be sorted and compared quickly. It is used
// by the DHT package to handle the unique identifiers of DHT nodes.
type Bitmap [nodeIDLength]byte
func (b Bitmap) RawString() string {
func (b Bitmap) rawString() string {
return string(b[:])
}
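Node IDs in the DHT are compared by XOR distance, which is why Bitmap carries the full set of ordering and bitwise operations documented below. A small sketch of the typical comparison:

target := dht.RandomBitmapP()
a := dht.RandomBitmapP()
b := dht.RandomBitmapP()
if a.Xor(target).Less(b.Xor(target)) {
    // a is closer to target than b is
}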
@ -31,14 +35,17 @@ func (b Bitmap) BString() string {
return buf.String()
}
// Hex returns a hexadecimal representation of the bitmap.
func (b Bitmap) Hex() string {
return hex.EncodeToString(b[:])
}
// HexShort returns a hexadecimal representation of the first 4 bytes.
func (b Bitmap) HexShort() string {
return hex.EncodeToString(b[:4])
}
// HexSimplified returns the hexadecimal representation with all leading 0's removed
func (b Bitmap) HexSimplified() string {
simple := strings.TrimLeft(b.Hex(), "0")
if simple == "" {
@ -47,6 +54,7 @@ func (b Bitmap) HexSimplified() string {
return simple
}
// Equals returns true if every byte in the two bitmaps is equal.
func (b Bitmap) Equals(other Bitmap) bool {
for k := range b {
if b[k] != other[k] {
@ -56,6 +64,7 @@ func (b Bitmap) Equals(other Bitmap) bool {
return true
}
// Less returns T/F if there exists a byte pair that is not equal AND this bitmap is less than the other.
func (b Bitmap) Less(other interface{}) bool {
for k := range b {
if b[k] != other.(Bitmap)[k] {
@ -65,6 +74,7 @@ func (b Bitmap) Less(other interface{}) bool {
return false
}
// LessOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is less than the other.
func (b Bitmap) LessOrEqual(other interface{}) bool {
if bm, ok := other.(Bitmap); ok && b.Equals(bm) {
return true
@ -72,6 +82,7 @@ func (b Bitmap) LessOrEqual(other interface{}) bool {
return b.Less(other)
}
// Greater returns T/F if there exists a byte pair that is not equal AND this bitmap byte is greater than the other.
func (b Bitmap) Greater(other interface{}) bool {
for k := range b {
if b[k] != other.(Bitmap)[k] {
@ -81,6 +92,7 @@ func (b Bitmap) Greater(other interface{}) bool {
return false
}
// GreaterOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is greater than the other.
func (b Bitmap) GreaterOrEqual(other interface{}) bool {
if bm, ok := other.(Bitmap); ok && b.Equals(bm) {
return true
@ -88,12 +100,15 @@ func (b Bitmap) GreaterOrEqual(other interface{}) bool {
return b.Greater(other)
}
// Copy returns a duplicate value for the bitmap.
func (b Bitmap) Copy() Bitmap {
var ret Bitmap
copy(ret[:], b[:])
return ret
}
// Xor returns a diff bitmap. If the bitmaps are equal, the returned bitmap will be all 0's. If every bit differs, the
// returned bitmap will be all 1's.
func (b Bitmap) Xor(other Bitmap) Bitmap {
var ret Bitmap
for k := range b {
@ -102,6 +117,7 @@ func (b Bitmap) Xor(other Bitmap) Bitmap {
return ret
}
// And returns a bitmap where each byte is the bitwise AND of the corresponding bytes of the two bitmaps
func (b Bitmap) And(other Bitmap) Bitmap {
var ret Bitmap
for k := range b {
@ -110,6 +126,7 @@ func (b Bitmap) And(other Bitmap) Bitmap {
return ret
}
// Or returns a bitmap where each byte is the bitwise OR of the corresponding bytes of the two bitmaps
func (b Bitmap) Or(other Bitmap) Bitmap {
var ret Bitmap
for k := range b {
@ -118,6 +135,7 @@ func (b Bitmap) Or(other Bitmap) Bitmap {
return ret
}
// Not returns the complementary bitmap, with every bit inverted, so b.Not().Not() equals b
func (b Bitmap) Not() Bitmap {
var ret Bitmap
for k := range b {
@ -138,16 +156,21 @@ func (b Bitmap) add(other Bitmap) (Bitmap, bool) {
return ret, carry
}
// Add returns a bitmap that treats both bitmaps as numbers and adds them together. Since the size of a bitmap is
// limited, an overflow is possible when adding bitmaps.
func (b Bitmap) Add(other Bitmap) Bitmap {
ret, carry := b.add(other)
if carry {
panic("overflow in bitmap addition")
panic("overflow in bitmap addition. limited to " + strconv.Itoa(nodeIDBits) + " bits.")
}
return ret
}
// Sub returns a bitmap that treats both bitmaps as numbers and subtracts the other from this one via the two's
// complement of the other: a + (-b). Negative bitmaps are not supported, so this bitmap must not be less than the other.
func (b Bitmap) Sub(other Bitmap) Bitmap {
if b.Less(other) {
// ToDo: Why is this not supported? Should it say not implemented? BitMap might have a generic use case outside of dht.
panic("negative bitmaps not supported")
}
complement, _ := other.Not().add(BitmapFromShortHexP("1"))
@ -155,10 +178,12 @@ func (b Bitmap) Sub(other Bitmap) Bitmap {
return ret
}
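Because the bitmaps behave as fixed-width big-endian integers, the arithmetic is easy to verify on small values. A worked sketch using the short-hex constructor:

five := dht.BitmapFromShortHexP("5")
three := dht.BitmapFromShortHexP("3")
sum := five.Add(three)  // sum.HexSimplified() == "8"
diff := sum.Sub(three)  // diff.Equals(five) == true
_ = diff
// three.Sub(five) would panic: negative bitmaps are not supported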
// Get returns the binary bit at the position passed.
func (b Bitmap) Get(n int) bool {
return getBit(b[:], n)
}
// Set sets the binary bit at the position passed.
func (b Bitmap) Set(n int, one bool) Bitmap {
ret := b.Copy()
setBit(ret[:], n, one)
@ -200,7 +225,7 @@ Outer:
return ret
}
// Syffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false)
// Suffix returns a copy of b with the last n bits set to 1 (if `one` is true) or 0 (if `one` is false)
// https://stackoverflow.com/a/23192263/182709
func (b Bitmap) Suffix(n int, one bool) Bitmap {
ret := b.Copy()
@ -223,11 +248,13 @@ Outer:
return ret
}
// MarshalBencode implements the Marshaller(bencode)/Message interface.
func (b Bitmap) MarshalBencode() ([]byte, error) {
str := string(b[:])
return bencode.EncodeBytes(str)
}
// UnmarshalBencode implements the Marshaller(bencode)/Message interface.
func (b *Bitmap) UnmarshalBencode(encoded []byte) error {
var str string
err := bencode.DecodeBytes(encoded, &str)
@ -241,6 +268,7 @@ func (b *Bitmap) UnmarshalBencode(encoded []byte) error {
return nil
}
// BitmapFromBytes returns a bitmap from a byte slice, erroring unless the slice is exactly the node ID length.
func BitmapFromBytes(data []byte) (Bitmap, error) {
var bmp Bitmap
@ -252,6 +280,8 @@ func BitmapFromBytes(data []byte) (Bitmap, error) {
return bmp, nil
}
// BitmapFromBytesP returns a bitmap from a byte slice, as BitmapFromBytes does, but panics on error instead of
// returning it.
func BitmapFromBytesP(data []byte) Bitmap {
bmp, err := BitmapFromBytes(data)
if err != nil {
@ -260,10 +290,14 @@ func BitmapFromBytesP(data []byte) Bitmap {
return bmp
}
//BitmapFromString converts the string to bytes and returns a bitmap, erroring unless the resulting byte slice is
// exactly the node ID length
func BitmapFromString(data string) (Bitmap, error) {
return BitmapFromBytes([]byte(data))
}
//BitmapFromStringP converts the string to bytes and returns a bitmap, as BitmapFromString does, but panics on error
// instead of returning it.
func BitmapFromStringP(data string) Bitmap {
bmp, err := BitmapFromString(data)
if err != nil {
@ -272,6 +306,8 @@ func BitmapFromStringP(data string) Bitmap {
return bmp
}
//BitmapFromHex decodes the hex string to bytes and returns a bitmap, erroring unless the resulting byte slice is
// exactly the node ID length
func BitmapFromHex(hexStr string) (Bitmap, error) {
decoded, err := hex.DecodeString(hexStr)
if err != nil {
@ -280,6 +316,8 @@ func BitmapFromHex(hexStr string) (Bitmap, error) {
return BitmapFromBytes(decoded)
}
//BitmapFromHexP decodes the hex string to bytes and returns a bitmap, as BitmapFromHex does, but panics on error
// instead of returning it.
func BitmapFromHexP(hexStr string) Bitmap {
bmp, err := BitmapFromHex(hexStr)
if err != nil {
@ -288,10 +326,15 @@ func BitmapFromHexP(hexStr string) Bitmap {
return bmp
}
//BitmapFromShortHex pads the hex string with leading zeros to the full node ID length and then creates a bitmap from
// it, as BitmapFromHex does
func BitmapFromShortHex(hexStr string) (Bitmap, error) {
return BitmapFromHex(strings.Repeat("0", nodeIDLength*2-len(hexStr)) + hexStr)
}
//BitmapFromShortHexP pads the hex string with leading zeros to the full node ID length and creates a bitmap from it,
// as BitmapFromShortHex does, but panics on error instead of returning it.
func BitmapFromShortHexP(hexStr string) Bitmap {
bmp, err := BitmapFromShortHex(hexStr)
if err != nil {
@ -300,6 +343,7 @@ func BitmapFromShortHexP(hexStr string) Bitmap {
return bmp
}
// RandomBitmapP generates a cryptographically random bitmap, panicking if the source of randomness fails.
func RandomBitmapP() Bitmap {
var id Bitmap
_, err := rand.Read(id[:])
@ -309,12 +353,16 @@ func RandomBitmapP() Bitmap {
return id
}
// RandomBitmapInRangeP generates a cryptographically random bitmap and, while it is greater than the difference between
// the high and low bitmaps, subtracts that difference until it no longer is; the low bitmap is then added to the result.
func RandomBitmapInRangeP(low, high Bitmap) Bitmap {
diff := high.Sub(low)
r := RandomBitmapP()
for r.Greater(diff) {
r = r.Sub(diff)
}
//ToDo - Adding the low at this point doesn't guarantee the result will be within the range. Consider bitmaps as
// numbers with a range of 50-100: if r ends up at say 60 and we add 50, we would be at 110. Should protect against this?
return r.Add(low)
}
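A short sketch of the sampler; per the ToDo above, the result is guaranteed to be at least low but not necessarily at most high:

low := dht.BitmapFromShortHexP("32")  // 50 as a bitmap
high := dht.BitmapFromShortHexP("64") // 100 as a bitmap
r := dht.RandomBitmapInRangeP(low, high)
_ = r.GreaterOrEqual(low) // always true; r.LessOrEqual(high) is not guaranteed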

View file

@ -54,13 +54,10 @@ func TestBitmap(t *testing.T) {
func TestBitmap_GetBit(t *testing.T) {
tt := []struct {
hex string
bit int
expected bool
panic bool
}{
//{hex: "0", bit: 385, one: true, expected: "1", panic:true}, // should error
//{hex: "0", bit: 384, one: true, expected: "1", panic:true},
{bit: 383, expected: false, panic: false},
{bit: 382, expected: true, panic: false},
{bit: 381, expected: false, panic: false},

View file

@ -13,6 +13,7 @@ const (
bootstrapDefaultRefreshDuration = 15 * time.Minute
)
// BootstrapNode is a configured node setup for testing.
type BootstrapNode struct {
Node
@ -24,7 +25,7 @@ type BootstrapNode struct {
nodeKeys map[Bitmap]int
}
// New returns a BootstrapNode pointer.
// NewBootstrapNode returns a BootstrapNode pointer.
func NewBootstrapNode(id Bitmap, initialPingInterval, rePingInterval time.Duration) *BootstrapNode {
b := &BootstrapNode{
Node: *NewNode(id),
@ -71,7 +72,7 @@ func (b *BootstrapNode) Connect(conn UDPConn) error {
return nil
}
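A minimal sketch of standing up a bootstrap node, mirroring the test setup later in this diff (the address and intervals are illustrative):

b := dht.NewBootstrapNode(dht.RandomBitmapP(), 10*time.Second, 15*time.Minute)
listener, err := net.ListenPacket("udp4", "127.0.0.1:21000")
if err != nil {
    log.Fatal(err)
}
if err := b.Connect(listener.(*net.UDPConn)); err != nil {
    log.Fatal("error connecting bootstrap node - ", err)
}
defer b.Shutdown()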
// ypsert adds the contact to the list, or updates the lastPinged time
// upsert adds the contact to the list, or updates the lastPinged time
func (b *BootstrapNode) upsert(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
@ -157,17 +158,21 @@ func (b *BootstrapNode) check() {
func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) {
switch request.Method {
case pingMethod:
b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse})
if err := b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse}); err != nil {
log.Error("error sending response message - ", err)
}
case findNodeMethod:
if request.Arg == nil {
log.Errorln("request is missing arg")
return
}
b.sendMessage(addr, Response{
if err := b.sendMessage(addr, Response{
ID: request.ID,
NodeID: b.id,
Contacts: b.get(bucketSize),
})
}); err != nil {
log.Error("error sending 'findnodemethod' response message - ", err)
}
}
go func() {

View file

@ -13,7 +13,9 @@ func TestBootstrapPing(t *testing.T) {
panic(err)
}
b.Connect(listener.(*net.UDPConn))
if err := b.Connect(listener.(*net.UDPConn)); err != nil {
t.Error(err)
}
defer b.Shutdown()
b.Shutdown()

View file

@ -35,8 +35,7 @@ const (
udpMaxMessageLength = 1024 // bytes. I think our longest message is ~676 bytes, so I rounded up
maxPeerFails = 3 // after this many failures, a peer is considered bad and will be removed from the routing table
tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
//tExpire = 60 * time.Minute // the time after which a key/value pair expires; this is a time-to-live (TTL) from the original publication date
tReannounce = 50 * time.Minute // the time after which the original publisher must republish a key/value pair
tRefresh = 1 * time.Hour // the time after which an otherwise unaccessed bucket must be refreshed
//tReplicate = 1 * time.Hour // the interval between Kademlia replication events, when a node is required to publish its entire database
@ -165,6 +164,7 @@ func (dht *DHT) Start() error {
return nil
}
// WaitUntilJoined blocks until the node joins the network.
func (dht *DHT) WaitUntilJoined() {
if dht.joined == nil {
panic("dht not initialized")
@ -181,7 +181,8 @@ func (dht *DHT) Shutdown() {
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
}
// Get returns the list of nodes that have the blob for the given hash
// Ping pings a given address, creates a temporary contact for sending a message, and returns an error if communication
// fails.
func (dht *DHT) Ping(addr string) error {
raddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
@ -254,7 +255,11 @@ func (dht *DHT) startReannouncer() {
case <-tick.C:
dht.lock.RLock()
for h := range dht.announced {
go dht.Announce(h)
go func(bm Bitmap) {
if err := dht.Announce(bm); err != nil {
log.Error("error re-announcing bitmap - ", err)
}
}(h)
}
dht.lock.RUnlock()
}
@ -310,6 +315,8 @@ func (dht *DHT) storeOnNode(hash Bitmap, c Contact) {
}()
}
// PrintState prints the current state of the DHT, including its address, the number of outstanding transactions, the
// stored hashes, and the current bucket information.
func (dht *DHT) PrintState() {
log.Printf("DHT node %s at %s", dht.contact.String(), time.Now().Format(time.RFC822Z))
log.Printf("Outstanding transactions: %d", dht.node.CountActiveTransactions())

View file

@ -8,7 +8,7 @@ import (
)
func TestNodeFinder_FindNodes(t *testing.T) {
bs, dhts := TestingCreateDHT(3, true, false)
bs, dhts := TestingCreateDHT(t, 3, true, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@ -59,7 +59,7 @@ func TestNodeFinder_FindNodes(t *testing.T) {
}
func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
_, dhts := TestingCreateDHT(3, false, false)
_, dhts := TestingCreateDHT(t, 3, false, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@ -74,7 +74,7 @@ func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
}
func TestNodeFinder_FindValue(t *testing.T) {
bs, dhts := TestingCreateDHT(3, true, false)
bs, dhts := TestingCreateDHT(t, 3, true, false)
defer func() {
for i := range dhts {
dhts[i].Shutdown()
@ -108,7 +108,7 @@ func TestNodeFinder_FindValue(t *testing.T) {
func TestDHT_LargeDHT(t *testing.T) {
nodes := 100
bs, dhts := TestingCreateDHT(nodes, true, true)
bs, dhts := TestingCreateDHT(t, nodes, true, true)
defer func() {
for _, d := range dhts {
go d.Shutdown()
@ -121,10 +121,12 @@ func TestDHT_LargeDHT(t *testing.T) {
ids := make([]Bitmap, nodes)
for i := range ids {
ids[i] = RandomBitmapP()
go func(i int) {
wg.Add(1)
wg.Add(1)
go func(index int) {
defer wg.Done()
dhts[i].Announce(ids[i])
if err := dhts[index].Announce(ids[index]); err != nil {
t.Error("error announcing random bitmap - ", err)
}
}(i)
}
wg.Wait()

View file

@ -42,16 +42,19 @@ const (
tokenField = "token"
)
// Message is an extension of the bencode marshalling interface for serialized message passing.
type Message interface {
bencode.Marshaler
}
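Every message type below round-trips through bencode via this interface. A sketch of serializing a ping request, written as if inside the dht package since messageID and the method constants are unexported:

req := Request{
    ID:     newMessageID(),
    NodeID: RandomBitmapP(),
    Method: pingMethod, // "ping" on the wire
}
encoded, err := req.MarshalBencode()
if err != nil {
    log.Fatal(err)
}
log.Printf("%d bytes: %s", len(encoded), encoded)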
type messageID [messageIDLength]byte
// HexShort returns the first 8 hex characters of the hex encoded message id.
func (m messageID) HexShort() string {
return hex.EncodeToString(m[:])[:8]
}
// UnmarshalBencode takes a byte slice and unmarshals the message id.
func (m *messageID) UnmarshalBencode(encoded []byte) error {
var str string
err := bencode.DecodeBytes(encoded, &str)
@ -62,6 +65,7 @@ func (m *messageID) UnmarshalBencode(encoded []byte) error {
return nil
}
// MarshalBencode returns the encoded byte slice of the message id.
func (m messageID) MarshalBencode() ([]byte, error) {
str := string(m[:])
return bencode.EncodeBytes(str)
@ -76,6 +80,7 @@ func newMessageID() messageID {
return m
}
// Request represents the structured request from one node to another.
type Request struct {
ID messageID
NodeID Bitmap
@ -84,6 +89,7 @@ type Request struct {
StoreArgs *storeArgs
}
// MarshalBencode returns the serialized byte slice representation of the request
func (r Request) MarshalBencode() ([]byte, error) {
var args interface{}
if r.StoreArgs != nil {
@ -102,6 +108,7 @@ func (r Request) MarshalBencode() ([]byte, error) {
})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the request.
func (r *Request) UnmarshalBencode(b []byte) error {
var raw struct {
ID messageID `bencode:"1"`
@ -136,7 +143,7 @@ func (r *Request) UnmarshalBencode(b []byte) error {
return nil
}
func (r Request) ArgsDebug() string {
func (r Request) argsDebug() string {
if r.StoreArgs != nil {
return r.StoreArgs.BlobHash.HexShort() + ", " + r.StoreArgs.Value.LbryID.HexShort() + ":" + strconv.Itoa(r.StoreArgs.Value.Port)
} else if r.Arg != nil {
@ -158,6 +165,7 @@ type storeArgs struct {
SelfStore bool // this is an int on the wire
}
// MarshalBencode returns the serialized byte slice representation of the storage arguments.
func (s storeArgs) MarshalBencode() ([]byte, error) {
encodedValue, err := bencode.EncodeString(s.Value)
if err != nil {
@ -177,6 +185,7 @@ func (s storeArgs) MarshalBencode() ([]byte, error) {
})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments.
func (s *storeArgs) UnmarshalBencode(b []byte) error {
var argsInt []bencode.RawMessage
err := bencode.DecodeBytes(b, &argsInt)
@ -219,6 +228,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error {
return nil
}
// Response represents the structured response one node returns to another.
type Response struct {
ID messageID
NodeID Bitmap
@ -228,7 +238,7 @@ type Response struct {
Token string
}
func (r Response) ArgsDebug() string {
func (r Response) argsDebug() string {
if r.Data != "" {
return r.Data
}
@ -251,6 +261,7 @@ func (r Response) ArgsDebug() string {
return str
}
// MarshalBencode returns the serialized byte slice representation of the response.
func (r Response) MarshalBencode() ([]byte, error) {
data := map[string]interface{}{
headerTypeField: responseType,
@ -293,6 +304,7 @@ func (r Response) MarshalBencode() ([]byte, error) {
return bencode.EncodeBytes(data)
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the store arguments.
func (r *Response) UnmarshalBencode(b []byte) error {
var raw struct {
ID messageID `bencode:"1"`
@ -362,6 +374,7 @@ func (r *Response) UnmarshalBencode(b []byte) error {
return nil
}
// Error represents an error message that is returned from one node to another in communication.
type Error struct {
ID messageID
NodeID Bitmap
@ -369,6 +382,7 @@ type Error struct {
Response []string
}
// MarshalBencode returns the serialized byte slice representation of an error message.
func (e Error) MarshalBencode() ([]byte, error) {
return bencode.EncodeBytes(map[string]interface{}{
headerTypeField: errorType,
@ -379,6 +393,7 @@ func (e Error) MarshalBencode() ([]byte, error) {
})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the error message.
func (e *Error) UnmarshalBencode(b []byte) error {
var raw struct {
ID messageID `bencode:"1"`

View file

@ -101,7 +101,7 @@ func TestBencodeFindValueResponse(t *testing.T) {
res := Response{
ID: newMessageID(),
NodeID: RandomBitmapP(),
FindValueKey: RandomBitmapP().RawString(),
FindValueKey: RandomBitmapP().rawString(),
Token: "arst",
Contacts: []Contact{
{ID: RandomBitmapP(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678},

View file

@ -33,8 +33,10 @@ type UDPConn interface {
Close() error
}
// RequestHandlerFunc is an exported handler function for requests.
type RequestHandlerFunc func(addr *net.UDPAddr, request Request)
// Node is a type representation of a node on the network.
type Node struct {
// the node's id
id Bitmap
@ -61,7 +63,7 @@ type Node struct {
stop *stopOnce.Stopper
}
// New returns a Node pointer.
// NewNode returns an initialized Node pointer.
func NewNode(id Bitmap) *Node {
return &Node{
id: id,
@ -87,13 +89,14 @@ func (n *Node) Connect(conn UDPConn) error {
<-n.stop.Ch()
n.tokens.Stop()
n.connClosed = true
n.conn.Close()
if err := n.conn.Close(); err != nil {
log.Error("error closing node connection on shutdown - ", err)
}
}()
packets := make(chan packet)
n.stop.Add(1)
go func() {
n.stop.Add(1)
defer n.stop.Done()
buf := make([]byte, udpMaxMessageLength)
@ -121,9 +124,8 @@ func (n *Node) Connect(conn UDPConn) error {
}
}
}()
n.stop.Add(1)
go func() {
n.stop.Add(1)
defer n.stop.Done()
var pkt packet
@ -171,7 +173,7 @@ func (n *Node) handlePacket(pkt packet) {
log.Errorf("[%s] error decoding request from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data))
return
}
log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.ArgsDebug())
log.Debugf("[%s] query %s: received request from %s: %s(%s)", n.id.HexShort(), request.ID.HexShort(), request.NodeID.HexShort(), request.Method, request.argsDebug())
n.handleRequest(pkt.raddr, request)
case '0' + responseType:
@ -181,7 +183,7 @@ func (n *Node) handlePacket(pkt packet) {
log.Errorf("[%s] error decoding response from %s: %s: (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), err.Error(), len(pkt.data), hex.EncodeToString(pkt.data))
return
}
log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.ArgsDebug())
log.Debugf("[%s] query %s: received response from %s: %s", n.id.HexShort(), response.ID.HexShort(), response.NodeID.HexShort(), response.argsDebug())
n.handleResponse(pkt.raddr, response)
case '0' + errorType:
@ -219,26 +221,34 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
log.Errorln("invalid request method")
return
case pingMethod:
n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse})
if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: pingSuccessResponse}); err != nil {
log.Error("error sending 'pingmethod' response message - ", err)
}
case storeMethod:
// TODO: we should be sending the IP in the request, not just using the sender's IP
// TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ???
if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) {
n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port})
n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse})
if err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse}); err != nil {
log.Error("error sending 'storemethod' response message - ", err)
}
} else {
n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"})
if err := n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-token"}); err != nil {
log.Error("error sending 'storemethod'response message for invalid-token - ", err)
}
}
case findNodeMethod:
if request.Arg == nil {
log.Errorln("request is missing arg")
return
}
n.sendMessage(addr, Response{
if err := n.sendMessage(addr, Response{
ID: request.ID,
NodeID: n.id,
Contacts: n.rt.GetClosest(*request.Arg, bucketSize),
})
}); err != nil {
log.Error("error sending 'findnodemethod' response message - ", err)
}
case findValueMethod:
if request.Arg == nil {
@ -253,13 +263,15 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
}
if contacts := n.store.Get(*request.Arg); len(contacts) > 0 {
res.FindValueKey = request.Arg.RawString()
res.FindValueKey = request.Arg.rawString()
res.Contacts = contacts
} else {
res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize)
}
n.sendMessage(addr, res)
if err := n.sendMessage(addr, res); err != nil {
log.Error("error sending 'findvaluemethod' response message - ", err)
}
}
// nodes that send us requests should not be inserted, only refreshed.
@ -294,15 +306,17 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error {
if req, ok := data.(Request); ok {
log.Debugf("[%s] query %s: sending request to %s (%d bytes) %s(%s)",
n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.ArgsDebug())
n.id.HexShort(), req.ID.HexShort(), addr.String(), len(encoded), req.Method, req.argsDebug())
} else if res, ok := data.(Response); ok {
log.Debugf("[%s] query %s: sending response to %s (%d bytes) %s",
n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.ArgsDebug())
n.id.HexShort(), res.ID.HexShort(), addr.String(), len(encoded), res.argsDebug())
} else {
log.Debugf("[%s] (%d bytes) %s", n.id.HexShort(), len(encoded), spew.Sdump(data))
}
n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
log.Error("error setting write deadline - ", err)
}
_, err = n.conn.WriteToUDP(encoded, addr)
return errors.Err(err)
@ -405,7 +419,7 @@ func (n *Node) SendCancelable(contact Contact, req Request) (<-chan *Response, c
return n.SendAsync(ctx, contact, req), cancel
}
// Count returns the number of transactions in the manager
// CountActiveTransactions returns the number of transactions in the manager
func (n *Node) CountActiveTransactions() int {
n.txLock.Lock()
defer n.txLock.Unlock()
@ -428,6 +442,7 @@ func (n *Node) startRoutingTableGrooming() {
}()
}
// Store stores a node contact in the node's contact store.
func (n *Node) Store(hash Bitmap, c Contact) {
n.store.Upsert(hash, c)
}

View file

@ -195,6 +195,9 @@ func (cf *contactFinder) insertIntoActiveList(contact Contact) {
inserted := false
for i, n := range cf.activeContacts {
// 5000ft: insert contact into sorted active contacts list
// Detail: if the XOR distance between the new contact's id and the target id is less than that of contact n, the new
// contact is inserted before n.
if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) {
cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...)
inserted = true
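The splice on the line above is the standard Go idiom for inserting into a sorted slice. A self-contained sketch of the same logic, with illustrative names:

// insert contact x into active, keeping it sorted by XOR distance to target
inserted := false
for i, n := range active {
    if x.ID.Xor(target).Less(n.ID.Xor(target)) {
        active = append(active[:i], append([]Contact{x}, active[i:]...)...)
        inserted = true
        break
    }
}
if !inserted {
    active = append(active, x) // x is farther from target than everything already in the list
}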

View file

@ -30,7 +30,7 @@ func TestPing(t *testing.T) {
data, err := bencode.EncodeBytes(map[string]interface{}{
headerTypeField: requestType,
headerMessageIDField: messageID,
headerNodeIDField: testNodeID.RawString(),
headerNodeIDField: testNodeID.rawString(),
headerPayloadField: "ping",
headerArgsField: []string{},
})
@ -86,7 +86,7 @@ func TestPing(t *testing.T) {
rNodeID, ok := response[headerNodeIDField].(string)
if !ok {
t.Error("node ID is not a string")
} else if rNodeID != dhtNodeID.RawString() {
} else if rNodeID != dhtNodeID.rawString() {
t.Error("unexpected node ID")
}
}
@ -176,7 +176,7 @@ func TestStore(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {
@ -257,7 +257,7 @@ func TestFindNode(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {
@ -290,10 +290,8 @@ func TestFindValueExisting(t *testing.T) {
defer dht.Shutdown()
nodesToInsert := 3
var nodes []Contact
for i := 0; i < nodesToInsert; i++ {
n := Contact{ID: RandomBitmapP(), IP: net.ParseIP("127.0.0.1"), Port: 10000 + i}
nodes = append(nodes, n)
dht.node.rt.Update(n)
}
@ -333,7 +331,7 @@ func TestFindValueExisting(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {
@ -345,7 +343,7 @@ func TestFindValueExisting(t *testing.T) {
t.Fatal("payload is not a dictionary")
}
compactContacts, ok := payload[valueToFind.RawString()]
compactContacts, ok := payload[valueToFind.rawString()]
if !ok {
t.Fatal("payload is missing key for search value")
}
@ -412,7 +410,7 @@ func TestFindValueFallbackToFindNode(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.RawString())
verifyResponse(t, response, messageID, dhtNodeID.rawString())
_, ok := response[headerPayloadField]
if !ok {

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lyoshenka/bencode"
log "github.com/sirupsen/logrus"
)
// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap
@ -21,24 +22,29 @@ import (
// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg)
// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41
// Contact represents another node on the network that this node is in communication with.
type Contact struct {
ID Bitmap
IP net.IP
Port int
}
// Equals returns true if two contacts have the same ID.
func (c Contact) Equals(other Contact) bool {
return c.ID == other.ID
}
// Addr returns the UDP address of the contact.
func (c Contact) Addr() *net.UDPAddr {
return &net.UDPAddr{IP: c.IP, Port: c.Port}
}
// String returns the contact's short hex encoded ID + "@" + the string representation of its UDP address.
func (c Contact) String() string {
return c.ID.HexShort() + "@" + c.Addr().String()
}
// MarshalCompact returns the compact byte slice representation of a contact.
func (c Contact) MarshalCompact() ([]byte, error) {
if c.IP.To4() == nil {
return nil, errors.Err("ip not set")
@ -60,6 +66,7 @@ func (c Contact) MarshalCompact() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalCompact unmarshals the compact byte slice representation of a contact.
func (c *Contact) UnmarshalCompact(b []byte) error {
if len(b) != compactNodeInfoLength {
return errors.Err("invalid compact length")
@ -70,10 +77,12 @@ func (c *Contact) UnmarshalCompact(b []byte) error {
return nil
}
// MarshalBencode returns the serialized byte slice representation of a contact.
func (c Contact) MarshalBencode() ([]byte, error) {
return bencode.EncodeBytes([]interface{}{c.ID, c.IP.String(), c.Port})
}
// UnmarshalBencode unmarshals the serialized byte slice into the appropriate fields of the contact.
func (c *Contact) UnmarshalBencode(b []byte) error {
var raw []bencode.RawMessage
err := bencode.DecodeBytes(b, &raw)
@ -139,7 +148,7 @@ func (p *peer) Touch() {
// ActiveInLast returns whether a peer has responded in the last `d` duration
// this is used to check if the peer is "good", meaning that we believe the peer will respond to our requests
func (p *peer) ActiveInLast(d time.Duration) bool {
return time.Now().Sub(p.LastActivity) > d
return time.Since(p.LastActivity) > d
}
// IsBad returns whether a peer is "bad", meaning that it has failed to respond to multiple pings in a row
@ -236,7 +245,7 @@ func find(id Bitmap, peers []peer) int {
func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool {
b.lock.RLock()
defer b.lock.RUnlock()
return time.Now().Sub(b.lastUpdate) > refreshInterval
return time.Since(b.lastUpdate) > refreshInterval
}
type routingTable struct {
@ -341,6 +350,7 @@ func (rt *routingTable) Count() int {
return count
}
// Range is a structure that holds a min and a max bitmap. The range is used in bucket sizing.
type Range struct {
start Bitmap
end Bitmap
@ -457,7 +467,9 @@ func RoutingTableRefresh(n *Node, refreshInterval time.Duration, cancel <-chan s
}()
}
nf.Find()
if _, err := nf.Find(); err != nil {
log.Error("error finding contact during routing table refresh - ", err)
}
}(id)
}

View file

@ -13,7 +13,8 @@ import (
var testingDHTIP = "127.0.0.1"
var testingDHTFirstPort = 21000
func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
// TestingCreateDHT initializes a testable DHT network with a specific number of nodes, with bootstrap and concurrent options.
func TestingCreateDHT(t *testing.T, numNodes int, bootstrap, concurrent bool) (*BootstrapNode, []*DHT) {
var bootstrapNode *BootstrapNode
var seeds []string
@ -25,7 +26,9 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode,
if err != nil {
panic(err)
}
bootstrapNode.Connect(listener.(*net.UDPConn))
if err := bootstrapNode.Connect(listener.(*net.UDPConn)); err != nil {
t.Error("error connecting bootstrap node - ", err)
}
}
if numNodes < 1 {
@ -41,7 +44,11 @@ func TestingCreateDHT(numNodes int, bootstrap, concurrent bool) (*BootstrapNode,
panic(err)
}
go dht.Start()
go func() {
if err := dht.Start(); err != nil {
t.Error("error starting dht - ", err)
}
}()
if !concurrent {
dht.WaitUntilJoined()
}
@ -103,7 +110,7 @@ func newTestUDPConn(addr string) *testUDPConn {
func (t testUDPConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error) {
var timeoutCh <-chan time.Time
if !t.readDeadline.IsZero() {
timeoutCh = time.After(t.readDeadline.Sub(time.Now()))
timeoutCh = time.After(time.Until(t.readDeadline))
}
select {
@ -218,7 +225,7 @@ func verifyContacts(t *testing.T, contacts []interface{}, nodes []Contact) {
continue
}
for _, n := range nodes {
if n.ID.RawString() == id {
if n.ID.rawString() == id {
currNode = n
currNodeFound = true
foundNodes[id] = true

View file

@ -18,35 +18,47 @@ import (
)
const (
DefaultPort = 3333
// DefaultPort is the port the peer server listens on if not passed in.
DefaultPort = 3333
// LbrycrdAddress to be used when paying for data. Not implemented yet.
LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct"
)
// Server is an instance of a peer server that houses the listener and store.
type Server struct {
store store.BlobStore
l net.Listener
closed bool
}
// NewServer returns an initialized Server pointer.
func NewServer(store store.BlobStore) *Server {
return &Server{
store: store,
}
}
// Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() {
// TODO: need waitgroup so we can finish whatever we're doing before stopping
s.closed = true
s.l.Close()
if err := s.l.Close(); err != nil {
log.Error("error shuting down peer server - ", err)
}
}
// ListenAndServe starts the server listener to handle connections.
func (s *Server) ListenAndServe(address string) error {
log.Println("Listening on " + address)
l, err := net.Listen("tcp", address)
if err != nil {
return err
}
defer l.Close()
defer func(listener net.Listener) {
if err := listener.Close(); err != nil {
log.Error("error closing listener for peer server - ", err)
}
}(l)
for {
conn, err := l.Accept()
@ -62,7 +74,11 @@ func (s *Server) ListenAndServe(address string) error {
}
func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
defer func(conn net.Conn) {
if err := conn.Close(); err != nil {
log.Error("error closing client connection for peer server - ", err)
}
}(conn)
timeoutDuration := 5 * time.Second
@ -71,7 +87,9 @@ func (s *Server) handleConnection(conn net.Conn) {
var response []byte
var err error
conn.SetReadDeadline(time.Now().Add(timeoutDuration))
if err := conn.SetReadDeadline(time.Now().Add(timeoutDuration)); err != nil {
log.Error("error setting read deadline for client connection - ", err)
}
request, err = readNextRequest(conn)
if err != nil {
if err != io.EOF {
@ -79,7 +97,9 @@ func (s *Server) handleConnection(conn net.Conn) {
}
return
}
conn.SetReadDeadline(time.Time{})
if err := conn.SetReadDeadline(time.Time{}); err != nil {
log.Error("error setting read deadline client connection - ", err)
}
if strings.Contains(string(request), `"requested_blobs"`) {
log.Debugln("received availability request")
@ -225,6 +245,7 @@ func isValidJSON(b []byte) bool {
return json.Unmarshal(b, &r) == nil
}
// GetBlobHash returns the sha512 hash hex encoded string of the blob byte slice.
func GetBlobHash(blob []byte) string {
hashBytes := sha512.Sum384(blob)
return hex.EncodeToString(hashBytes[:])
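Note that sha512.Sum384 is the 384-bit variant of SHA-512, so the hex digest is 96 characters, which matches the char(96) hash columns in the SQL schema earlier in this diff. A sketch of the check upload.go performs with it (dir and filename are illustrative):

blob, err := ioutil.ReadFile(dir + "/" + filename)
if err != nil {
    log.Fatal(err)
}
if peer.GetBlobHash(blob) != filename {
    log.Error("filename does not match hash, skipping")
}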
@ -234,7 +255,8 @@ const (
maxRequestSize = 64 * (1 << 10) // 64kb
paymentRateAccepted = "RATE_ACCEPTED"
paymentRateTooLow = "RATE_TOO_LOW"
paymentRateUnset = "RATE_UNSET"
//ToDo: paymentRateUnset is not used but exists in the protocol.
//paymentRateUnset = "RATE_UNSET"
)
var errRequestTooLarge = errors.Base("request is too large")

View file

@ -33,18 +33,20 @@ var availabilityRequests = []pair{
},
}
func getServer(withBlobs bool) *Server {
func getServer(t *testing.T, withBlobs bool) *Server {
st := store.MemoryBlobStore{}
if withBlobs {
for k, v := range blobs {
st.Put(k, v)
if err := st.Put(k, v); err != nil {
t.Error("error during put operation of memory blobstore - ", err)
}
}
}
return NewServer(&st)
}
func TestAvailabilityRequest_NoBlobs(t *testing.T) {
s := getServer(false)
s := getServer(t, false)
for _, p := range availabilityRequests {
response, err := s.handleAvailabilityRequest(p.request)
@ -59,7 +61,7 @@ func TestAvailabilityRequest_NoBlobs(t *testing.T) {
}
func TestAvailabilityRequest_WithBlobs(t *testing.T) {
s := getServer(true)
s := getServer(t, true)
for _, p := range availabilityRequests {
response, err := s.handleAvailabilityRequest(p.request)

View file

@ -9,11 +9,13 @@ import (
log "github.com/sirupsen/logrus"
)
// Client is an instance of a client connected to a server.
type Client struct {
conn net.Conn
connected bool
}
// Connect connects to the server at the given address and errors if it cannot be contacted.
func (c *Client) Connect(address string) error {
var err error
c.conn, err = net.Dial("tcp", address)
@ -23,11 +25,14 @@ func (c *Client) Connect(address string) error {
c.connected = true
return c.doHandshake(protocolVersion1)
}
// Close closes the connection to the server.
func (c *Client) Close() error {
c.connected = false
return c.conn.Close()
}
// SendBlob sends a send blob request to the client.
func (c *Client) SendBlob(blob []byte) error {
if !c.connected {
return errors.Err("not connected")
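For context, a minimal usage sketch of this client API, assuming a reflector server is already listening on localhost at DefaultPort. The reflector import path is inferred from the repo layout, and the blob contents are illustrative and may be rejected by server-side validation:

package main

import (
	"strconv"

	"github.com/lbryio/reflector.go/reflector" // assumed import path
	log "github.com/sirupsen/logrus"
)

func main() {
	c := reflector.Client{}
	if err := c.Connect("localhost:" + strconv.Itoa(reflector.DefaultPort)); err != nil {
		log.Fatal("error connecting to reflector server - ", err)
	}
	defer func() {
		if err := c.Close(); err != nil {
			log.Error("error closing client connection - ", err)
		}
	}()
	if err := c.SendBlob([]byte("illustrative blob bytes")); err != nil {
		log.Error("error sending blob - ", err)
	}
}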

View file

@ -8,21 +8,29 @@ import (
"testing"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
)
var address = "localhost:" + strconv.Itoa(DefaultPort)
var s Server
func TestMain(m *testing.M) {
dir, err := ioutil.TempDir("", "reflector_client_test")
if err != nil {
panic(err)
log.Panic("could not create temp directory - ", err)
}
defer os.RemoveAll(dir)
defer func(directory string) {
if err := os.RemoveAll(directory); err != nil {
log.Panic("error removing files and directory - ", err)
}
}(dir)
ms := store.MemoryBlobStore{}
s := NewServer(&ms)
go s.ListenAndServe(address)
go func() {
if err := s.ListenAndServe(address); err != nil {
log.Panic("error starting up reflector server - ", err)
}
}()
os.Exit(m.Run())
}
@ -39,7 +47,7 @@ func TestSmallBlob(t *testing.T) {
c := Client{}
err := c.Connect(address)
if err != nil {
t.Error(err)
t.Error("error connecting client to server - ", err)
}
err = c.SendBlob([]byte{})
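Two caveats with this TestMain pattern: os.Exit(m.Run()) skips deferred calls, so the deferred temp-dir cleanup above never actually runs, and starting the server in a goroutine can race the first test's Connect. A hedged sketch of a readiness wait that could run before m.Run() (waitForServer is hypothetical and assumes "net" and "time" are imported):

// retry-dial until the listener accepts, or give up after ~1s
func waitForServer(address string) error {
	var err error
	for i := 0; i < 50; i++ {
		var conn net.Conn
		conn, err = net.Dial("tcp", address)
		if err == nil {
			return conn.Close()
		}
		time.Sleep(20 * time.Millisecond)
	}
	return err
}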

View file

@ -8,6 +8,7 @@ import (
"github.com/lbryio/reflector.go/store"
)
// Prism is the root instance of the application and houses the DHT, Peer Server, Reflector Server, and Cluster.
type Prism struct {
dht *dht.DHT
peer *peer.Server
@ -17,6 +18,7 @@ type Prism struct {
stop *stopOnce.Stopper
}
// NewPrism returns an initialized Prism instance pointer.
func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism {
d, err := dht.New(nil)
if err != nil {
@ -31,6 +33,7 @@ func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism {
}
}
// Connect starts the components of the application.
func (p *Prism) Connect() error {
err := p.dht.Start()
if err != nil {
@ -49,6 +52,7 @@ func (p *Prism) Connect() error {
return nil
}
// Shutdown gracefully shuts down the different prism components before exiting.
func (p *Prism) Shutdown() {
p.stop.StopAndWait()
p.reflector.Shutdown()
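A sketch of how a caller might wire these pieces together. The prism import path is an assumption from the repo layout, and the empty seed address is assumed to mean "start a fresh cluster" rather than joining one:

package main

import (
	"os"
	"os/signal"

	"github.com/lbryio/reflector.go/prism" // assumed import path
	"github.com/lbryio/reflector.go/store"

	log "github.com/sirupsen/logrus"
)

func main() {
	ms := &store.MemoryBlobStore{}
	p := prism.NewPrism(ms, "") // empty seed address: start a new cluster
	if err := p.Connect(); err != nil {
		log.Fatal("error starting prism components - ", err)
	}
	// run until interrupted, then shut down the components gracefully
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
	p.Shutdown()
}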

View file

@ -14,31 +14,43 @@ import (
log "github.com/sirupsen/logrus"
)
// Server is an instance of the reflector server. It houses the blob store and listener.
type Server struct {
store store.BlobStore
l net.Listener
closed bool
}
// NewServer returns an initialized reflector server pointer.
func NewServer(store store.BlobStore) *Server {
return &Server{
store: store,
}
}
// Shutdown shuts down the reflector server gracefully.
func (s *Server) Shutdown() {
// TODO: need waitgroup so we can finish whatever we're doing before stopping
s.closed = true
s.l.Close()
if err := s.l.Close(); err != nil {
log.Error("error shutting down reflector server - ", err)
}
}
// ListenAndServe starts the server listener to handle connections.
func (s *Server) ListenAndServe(address string) error {
//ToDo - We should make this DRY as it is the same code in both servers.
log.Println("Listening on " + address)
l, err := net.Listen("tcp", address)
if err != nil {
return err
}
defer l.Close()
defer func(listener net.Listener) {
if err := listener.Close(); err != nil {
log.Error("error closing reflector server listener - ", err)
}
}(l)
for {
conn, err := l.Accept()
@ -55,14 +67,20 @@ func (s *Server) ListenAndServe(address string) error {
func (s *Server) handleConn(conn net.Conn) {
// TODO: connection should time out eventually
defer conn.Close()
defer func(conn net.Conn) {
if err := conn.Close(); err != nil {
log.Error("error closing reflector client connection - ", err)
}
}(conn)
err := s.doHandshake(conn)
if err != nil {
if err == io.EOF {
return
}
s.doError(conn, err)
if err := s.doError(conn, err); err != nil {
log.Error("error sending error response to reflector client connection - ", err)
}
return
}
@ -70,7 +88,9 @@ func (s *Server) handleConn(conn net.Conn) {
err = s.receiveBlob(conn)
if err != nil {
if err != io.EOF {
s.doError(conn, err)
if err := s.doError(conn, err); err != nil {
log.Error("error sending error response for receiving a blob to reflector client connection - ", err)
}
}
return
}
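On the DRY ToDo above: both servers share the same listen/accept shape, so one possible extraction is a helper that owns listening, close-with-logging, and dispatch, with each server supplying only its connection handler. A sketch, not the committed code, and simplified in that it ignores the servers' closed flag and stops on the first Accept error:

package server // hypothetical shared location

import (
	"net"

	log "github.com/sirupsen/logrus"
)

// serveTCP listens on address and hands each accepted connection to handle.
func serveTCP(address string, handle func(net.Conn)) error {
	log.Println("Listening on " + address)
	l, err := net.Listen("tcp", address)
	if err != nil {
		return err
	}
	defer func() {
		if err := l.Close(); err != nil {
			log.Error("error closing listener - ", err)
		}
	}()
	for {
		conn, err := l.Accept()
		if err != nil {
			return err
		}
		go handle(conn)
	}
}

One wrinkle: both servers stash the listener on the struct so Shutdown can close it, so the helper would also need to hand the listener back (or accept a stop channel) before entering the loop.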

View file

@ -8,6 +8,7 @@ import (
)
const (
// DefaultPort is the port the reflector server listens on if not passed in.
DefaultPort = 5566
maxBlobSize = 2 * 1024 * 1024
@ -16,6 +17,7 @@ const (
protocolVersion2 = 1
)
// ErrBlobExists is a default error for when a blob already exists on the reflector server.
var ErrBlobExists = errors.Base("blob exists on server")
type errorResponse struct {

View file

@ -8,23 +8,28 @@ import (
"github.com/lbryio/reflector.go/types"
)
// DBBackedS3Store is an S3 store backed by a DB that tracks which blobs are stored.
type DBBackedS3Store struct {
s3 *S3BlobStore
db db.DB
}
// NewDBBackedS3Store returns an initialized store pointer.
func NewDBBackedS3Store(s3 *S3BlobStore, db db.DB) *DBBackedS3Store {
return &DBBackedS3Store{s3: s3, db: db}
}
// Has returns T/F, or an error if the DB errors, indicating whether the store contains the blob.
func (d *DBBackedS3Store) Has(hash string) (bool, error) {
return d.db.HasBlob(hash)
}
// Get returns the byte slice of the blob or an error.
func (d *DBBackedS3Store) Get(hash string) ([]byte, error) {
return d.s3.Get(hash)
}
// Put stores the blob in the S3 store and stores the blob information in the DB.
func (d *DBBackedS3Store) Put(hash string, blob []byte) error {
err := d.s3.Put(hash, blob)
if err != nil {
@ -34,6 +39,8 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error {
return d.db.AddBlob(hash, len(blob), true)
}
// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
// there is an error storing the blob information in the DB.
func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
var blobContents types.SdBlob
err := json.Unmarshal(blob, &blobContents)
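The rest of PutSD is cut off by this hunk. Given the doc comment above, its shape is presumably unmarshal, validate, then store in both places; a hedged reconstruction (the StreamHash field, the AddSDBlob method, and the lbry.go/errors import are assumptions, not shown in this diff):

func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
	var blobContents types.SdBlob
	if err := json.Unmarshal(blob, &blobContents); err != nil {
		return err
	}
	if blobContents.StreamHash == "" { // assumed field; the doc comment requires a stream hash
		return errors.Err("sd blob is missing stream hash")
	}
	if err := d.s3.Put(hash, blob); err != nil {
		return err
	}
	return d.db.AddSDBlob(hash, len(blob), blobContents) // hypothetical DB method
}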

View file

@ -8,12 +8,14 @@ import (
"github.com/lbryio/lbry.go/errors"
)
// FileBlobStore is a local disk store.
type FileBlobStore struct {
dir string
initialized bool
}
// NewFileBlobStore returns an initialized file disk store pointer.
func NewFileBlobStore(dir string) *FileBlobStore {
return &FileBlobStore{dir: dir}
}
@ -43,6 +45,7 @@ func (f *FileBlobStore) initOnce() error {
return nil
}
// Has returns T/F indicating whether the blob is already stored. It will error with any IO disk error.
func (f *FileBlobStore) Has(hash string) (bool, error) {
err := f.initOnce()
if err != nil {
@ -59,6 +62,7 @@ func (f *FileBlobStore) Has(hash string) (bool, error) {
return true, nil
}
// Get returns the byte slice of the blob stored or will error if the blob doesn't exist.
func (f *FileBlobStore) Get(hash string) ([]byte, error) {
err := f.initOnce()
if err != nil {
@ -76,6 +80,7 @@ func (f *FileBlobStore) Get(hash string) ([]byte, error) {
return ioutil.ReadAll(file)
}
// Put stores the blob on disk or errors with any IO error.
func (f *FileBlobStore) Put(hash string, blob []byte) error {
err := f.initOnce()
if err != nil {
@ -85,6 +90,8 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error {
return ioutil.WriteFile(f.path(hash), blob, 0644)
}
// PutSD stores the sd blob on the disk or errors with any IO error.
func (f *FileBlobStore) PutSD(hash string, blob []byte) error {
//Todo - need to handle when the stream hash is not present.
return f.Put(hash, blob)
}
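A short usage sketch of the disk store (the hash and contents are illustrative placeholders):

package main

import (
	"io/ioutil"

	"github.com/lbryio/reflector.go/store"
	log "github.com/sirupsen/logrus"
)

func main() {
	dir, err := ioutil.TempDir("", "fileblobstore_example")
	if err != nil {
		log.Fatal("could not create temp directory - ", err)
	}
	f := store.NewFileBlobStore(dir)
	if err := f.Put("abc", []byte("abcdefg")); err != nil {
		log.Fatal("error storing blob - ", err)
	}
	blob, err := f.Get("abc")
	if err != nil {
		log.Fatal("error reading blob back - ", err)
	}
	log.Println("got blob: " + string(blob))
}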

View file

@ -2,10 +2,12 @@ package store
import "github.com/lbryio/lbry.go/errors"
// MemoryBlobStore is an in-memory blob store with no persistence.
type MemoryBlobStore struct {
blobs map[string][]byte
}
// Has returns T/F if the blob is currently stored. It will never error.
func (m *MemoryBlobStore) Has(hash string) (bool, error) {
if m.blobs == nil {
m.blobs = make(map[string][]byte)
@ -14,6 +16,7 @@ func (m *MemoryBlobStore) Has(hash string) (bool, error) {
return ok, nil
}
// Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemoryBlobStore) Get(hash string) ([]byte, error) {
if m.blobs == nil {
m.blobs = make(map[string][]byte)
@ -25,6 +28,7 @@ func (m *MemoryBlobStore) Get(hash string) ([]byte, error) {
return blob, nil
}
// Put stores the blob in memory. It will never error.
func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
if m.blobs == nil {
m.blobs = make(map[string][]byte)
@ -33,6 +37,8 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
return nil
}
// PutSD stores the sd blob in memory. It will never error.
func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error {
//ToDo - need to handle when stream is not present.
return m.Put(hash, blob)
}

View file

@ -20,7 +20,9 @@ func TestMemoryBlobStore_Get(t *testing.T) {
s := MemoryBlobStore{}
hash := "abc"
blob := []byte("abcdefg")
s.Put(hash, blob)
if err := s.Put(hash, blob); err != nil {
t.Error("error getting memory blob - ", err)
}
gotBlob, err := s.Get(hash)
if err != nil {

View file

@ -16,6 +16,7 @@ import (
log "github.com/sirupsen/logrus"
)
// S3BlobStore is an S3 store.
type S3BlobStore struct {
awsID string
awsSecret string
@ -25,6 +26,7 @@ type S3BlobStore struct {
session *session.Session
}
// NewS3BlobStore returns an initialized S3 store pointer.
func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
return &S3BlobStore{
awsID: awsID,
@ -51,6 +53,7 @@ func (s *S3BlobStore) initOnce() error {
return nil
}
// Has returns T/F, or an error from S3, indicating whether the store contains the blob.
func (s *S3BlobStore) Has(hash string) (bool, error) {
err := s.initOnce()
if err != nil {
@ -71,7 +74,9 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
return true, nil
}
// Get returns the blob slice if present or errors on S3.
func (s *S3BlobStore) Get(hash string) ([]byte, error) {
//Todo - need to handle the error when the blob doesn't exist, for consistency.
err := s.initOnce()
if err != nil {
return []byte{}, err
@ -102,6 +107,7 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) {
return buf.Bytes(), nil
}
// Put stores the blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) Put(hash string, blob []byte) error {
err := s.initOnce()
if err != nil {
@ -122,6 +128,8 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error {
return err
}
// PutSD stores the sd blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) PutSD(hash string, blob []byte) error {
//Todo - handle missing stream for consistency
return s.Put(hash, blob)
}
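Construction is pure wiring; the AWS session is established lazily by initOnce on first use. A sketch with placeholder credentials, region, and bucket:

package main

import (
	"github.com/lbryio/reflector.go/store"
	log "github.com/sirupsen/logrus"
)

func main() {
	// all four arguments are placeholders, not real values
	s := store.NewS3BlobStore("AWS_ID", "AWS_SECRET", "us-east-1", "my-blob-bucket")
	if err := s.Put("somehash", []byte("blob bytes")); err != nil {
		log.Fatal("error uploading blob - ", err)
	}
}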

View file

@ -2,6 +2,7 @@ package store
import "github.com/lbryio/lbry.go/errors"
// BlobStore is an interface with methods for consistently handling blob storage.
type BlobStore interface {
Has(string) (bool, error)
Get(string) ([]byte, error)
@ -9,4 +10,5 @@ type BlobStore interface {
PutSD(string, []byte) error
}
//ErrBlobNotFound is a standard error when a blob is not found in the store.
var ErrBlobNotFound = errors.Base("blob not found")
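Because every store in this commit satisfies this interface, callers can be written against BlobStore and handed any backend. A small sketch using the in-memory store:

package main

import (
	"github.com/lbryio/reflector.go/store"
	log "github.com/sirupsen/logrus"
)

// copyBlob moves one blob between any two BlobStore implementations.
func copyBlob(src, dst store.BlobStore, hash string) error {
	blob, err := src.Get(hash)
	if err != nil {
		return err
	}
	return dst.Put(hash, blob)
}

func main() {
	src := &store.MemoryBlobStore{}
	dst := &store.MemoryBlobStore{}
	if err := src.Put("abc", []byte("abcdefg")); err != nil {
		log.Fatal(err)
	}
	if err := copyBlob(src, dst, "abc"); err != nil {
		log.Fatal(err)
	}
	ok, err := dst.Has("abc")
	if err != nil {
		log.Fatal(err)
	}
	log.Println("blob copied:", ok)
}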

View file

@ -1,5 +1,6 @@
package types
// SdBlob is a specialized blob that contains information about the rest of the blobs it is associated with.
type SdBlob struct {
StreamName string `json:"stream_name"`
Blobs []struct {