diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f91ffdc --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +.PHONY: test +.DEFAULT_GOAL: test + +test: + go test ./... -v -cover diff --git a/db/db.go b/db/db.go index c1acbd5..4816989 100644 --- a/db/db.go +++ b/db/db.go @@ -164,7 +164,7 @@ CREATE TABLE stream ( sd_hash char(96) NOT NULL, PRIMARY KEY (hash), KEY sd_hash_idx (sd_hash), - FOREIGN KEY stream_sd_hash_blob_hash (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE + FOREIGN KEY (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; CREATE TABLE stream_blob ( @@ -172,8 +172,8 @@ CREATE TABLE stream_blob ( blob_hash char(96) NOT NULL, num int NOT NULL, PRIMARY KEY (stream_hash, blob_hash), - FOREIGN KEY stream_hash_stream_hash (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE, - FOREIGN KEY blob_hash_blob_hash (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE + FOREIGN KEY (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE, + FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; ` diff --git a/peer/server.go b/peer/server.go index f75e657..1b56a30 100644 --- a/peer/server.go +++ b/peer/server.go @@ -17,7 +17,8 @@ import ( ) const ( - DefaultPort = 3333 + DefaultPort = 3333 + LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct" ) type Server struct { @@ -105,7 +106,6 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) { return []byte{}, err } - address := "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct" availableBlobs := []string{} for _, blobHash := range request.RequestedBlobs { exists, err := s.store.Has(blobHash) @@ -117,7 +117,7 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) { } } - return json.Marshal(availabilityResponse{LbrycrdAddress: address, AvailableBlobs: availableBlobs}) + return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs}) } func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) { diff --git a/peer/server_test.go b/peer/server_test.go new file mode 100644 index 0000000..193ab4a --- /dev/null +++ b/peer/server_test.go @@ -0,0 +1,74 @@ +package peer + +import ( + "bytes" + "testing" + + "github.com/lbryio/reflector.go/store" +) + +var blobs = map[string][]byte{ + "a": []byte("abcdefg"), + "b": []byte("hijklmn"), + "c": []byte("opqrstu"), +} + +type pair struct { + request []byte + response []byte +} + +var availabilityRequests = []pair{ + { + request: []byte(`{"lbrycrd_address":true,"requested_blobs":["a","b"]}`), + response: []byte(`{"lbrycrd_address":"` + LbrycrdAddress + `","available_blobs":["a","b"]}`), + }, + { + request: []byte(`{"lbrycrd_address":true,"requested_blobs":["x","a","y"]}`), + response: []byte(`{"lbrycrd_address":"` + LbrycrdAddress + `","available_blobs":["a"]}`), + }, + { + request: []byte(`{"lbrycrd_address":true,"requested_blobs":[]}`), + response: []byte(`{"lbrycrd_address":"` + LbrycrdAddress + `","available_blobs":[]}`), + }, +} + +func getServer(withBlobs bool) *Server { + st := store.MemoryBlobStore{} + if withBlobs { + for k, v := range blobs { + st.Put(k, v) + } + } + return NewServer(&st) +} + +func TestAvailabilityRequest_NoBlobs(t *testing.T) { + s := getServer(false) + + for _, p := range availabilityRequests { + response, err := s.handleAvailabilityRequest(p.request) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if !bytes.Equal(response, []byte(`{"lbrycrd_address":"`+LbrycrdAddress+`","available_blobs":[]}`)) { + t.Errorf("Response did not match expected response. Got %s", string(response)) + } + } +} + +func TestAvailabilityRequest_WithBlobs(t *testing.T) { + s := getServer(true) + + for _, p := range availabilityRequests { + response, err := s.handleAvailabilityRequest(p.request) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if !bytes.Equal(response, p.response) { + t.Errorf("Response did not match expected response.\nExpected: %s\nGot: %s", string(p.response), string(response)) + } + } +} diff --git a/reflector/client_test.go b/reflector/client_test.go index 3c1e712..501d208 100644 --- a/reflector/client_test.go +++ b/reflector/client_test.go @@ -7,6 +7,8 @@ import ( "strconv" "testing" "time" + + "github.com/lbryio/reflector.go/store" ) var address = "localhost:" + strconv.Itoa(DefaultPort) @@ -21,7 +23,8 @@ func TestMain(m *testing.M) { } defer os.RemoveAll(dir) - s := NewServer(dir) + ms := store.MemoryBlobStore{} + s := NewServer(&ms) go s.ListenAndServe(address) os.Exit(m.Run()) diff --git a/store/file.go b/store/file.go index 0396790..9036e89 100644 --- a/store/file.go +++ b/store/file.go @@ -67,6 +67,9 @@ func (f *FileBlobStore) Get(hash string) ([]byte, error) { file, err := os.Open(f.path(hash)) if err != nil { + if os.IsNotExist(err) { + return []byte{}, errors.Err(ErrBlobNotFound) + } return []byte{}, err } diff --git a/store/memory.go b/store/memory.go new file mode 100644 index 0000000..de80e49 --- /dev/null +++ b/store/memory.go @@ -0,0 +1,38 @@ +package store + +import "github.com/lbryio/errors.go" + +type MemoryBlobStore struct { + blobs map[string][]byte +} + +func (m *MemoryBlobStore) Has(hash string) (bool, error) { + if m.blobs == nil { + m.blobs = make(map[string][]byte) + } + _, ok := m.blobs[hash] + return ok, nil +} + +func (m *MemoryBlobStore) Get(hash string) ([]byte, error) { + if m.blobs == nil { + m.blobs = make(map[string][]byte) + } + blob, ok := m.blobs[hash] + if !ok { + return []byte{}, errors.Err(ErrBlobNotFound) + } + return blob, nil +} + +func (m *MemoryBlobStore) Put(hash string, blob []byte) error { + if m.blobs == nil { + m.blobs = make(map[string][]byte) + } + m.blobs[hash] = blob + return nil +} + +func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { + return m.Put(hash, blob) +} diff --git a/store/memory_test.go b/store/memory_test.go new file mode 100644 index 0000000..3502af7 --- /dev/null +++ b/store/memory_test.go @@ -0,0 +1,43 @@ +package store + +import ( + "bytes" + "testing" + + "github.com/lbryio/errors.go" +) + +func TestMemoryBlobStore_Put(t *testing.T) { + s := MemoryBlobStore{} + blob := []byte("abcdefg") + err := s.Put("abc", blob) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } +} + +func TestMemoryBlobStore_Get(t *testing.T) { + s := MemoryBlobStore{} + hash := "abc" + blob := []byte("abcdefg") + s.Put(hash, blob) + + gotBlob, err := s.Get(hash) + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if !bytes.Equal(gotBlob, blob) { + t.Error("Got blob that is different from expected blob") + } + + missingBlob, err := s.Get("nonexistent hash") + if err == nil { + t.Errorf("Expected ErrBlobNotFound, got nil") + } + if !errors.Is(err, ErrBlobNotFound) { + t.Errorf("Received unexpected error: %v", err) + } + if !bytes.Equal(missingBlob, []byte{}) { + t.Error("Got blob that is not empty") + } +} diff --git a/store/s3.go b/store/s3.go index 78ff838..aec5271 100644 --- a/store/s3.go +++ b/store/s3.go @@ -5,7 +5,10 @@ import ( "net/http" "time" + "github.com/lbryio/errors.go" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" @@ -74,12 +77,25 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) { return []byte{}, err } + log.Debugf("Getting %s from S3", hash[:8]) + defer func(t time.Time) { + log.Debugf("Getting %s took %s", hash[:8], time.Since(t).String()) + }(time.Now()) + buf := &aws.WriteAtBuffer{} _, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(hash), }) if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case s3.ErrCodeNoSuchBucket: + return []byte{}, errors.Err("bucket %s does not exist", s.bucket) + case s3.ErrCodeNoSuchKey: + return []byte{}, errors.Err(ErrBlobNotFound) + } + } return buf.Bytes(), err } @@ -94,7 +110,7 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error { log.Debugf("Uploading %s to S3", hash[:8]) defer func(t time.Time) { - log.Debugf("Upload took %s", time.Since(t).String()) + log.Debugf("Uploading %s took %s", hash[:8], time.Since(t).String()) }(time.Now()) _, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{ diff --git a/store/store.go b/store/store.go index 43e76ae..4912d0e 100644 --- a/store/store.go +++ b/store/store.go @@ -1,8 +1,12 @@ package store +import "github.com/lbryio/errors.go" + type BlobStore interface { Has(string) (bool, error) Get(string) ([]byte, error) Put(string, []byte) error PutSD(string, []byte) error } + +var ErrBlobNotFound = errors.Base("blob not found")