add a few tests, better error handling
This commit is contained in:
parent
7b3ac43fff
commit
0d458aefc3
10 changed files with 194 additions and 8 deletions
5
Makefile
Normal file
5
Makefile
Normal file
|
@ -0,0 +1,5 @@
|
|||
.PHONY: test
|
||||
.DEFAULT_GOAL: test
|
||||
|
||||
test:
|
||||
go test ./... -v -cover
|
6
db/db.go
6
db/db.go
|
@ -164,7 +164,7 @@ CREATE TABLE stream (
|
|||
sd_hash char(96) NOT NULL,
|
||||
PRIMARY KEY (hash),
|
||||
KEY sd_hash_idx (sd_hash),
|
||||
FOREIGN KEY stream_sd_hash_blob_hash (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE
|
||||
FOREIGN KEY (sd_hash) REFERENCES blob_ (hash) ON DELETE RESTRICT ON UPDATE CASCADE
|
||||
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
CREATE TABLE stream_blob (
|
||||
|
@ -172,8 +172,8 @@ CREATE TABLE stream_blob (
|
|||
blob_hash char(96) NOT NULL,
|
||||
num int NOT NULL,
|
||||
PRIMARY KEY (stream_hash, blob_hash),
|
||||
FOREIGN KEY stream_hash_stream_hash (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE,
|
||||
FOREIGN KEY blob_hash_blob_hash (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
FOREIGN KEY (stream_hash) REFERENCES stream (hash) ON DELETE CASCADE ON UPDATE CASCADE,
|
||||
FOREIGN KEY (blob_hash) REFERENCES blob_ (hash) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
`
|
||||
|
|
|
@ -17,7 +17,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
DefaultPort = 3333
|
||||
DefaultPort = 3333
|
||||
LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
|
@ -105,7 +106,6 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
|||
return []byte{}, err
|
||||
}
|
||||
|
||||
address := "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct"
|
||||
availableBlobs := []string{}
|
||||
for _, blobHash := range request.RequestedBlobs {
|
||||
exists, err := s.store.Has(blobHash)
|
||||
|
@ -117,7 +117,7 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return json.Marshal(availabilityResponse{LbrycrdAddress: address, AvailableBlobs: availableBlobs})
|
||||
return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs})
|
||||
}
|
||||
|
||||
func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) {
|
||||
|
|
74
peer/server_test.go
Normal file
74
peer/server_test.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package peer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
)
|
||||
|
||||
var blobs = map[string][]byte{
|
||||
"a": []byte("abcdefg"),
|
||||
"b": []byte("hijklmn"),
|
||||
"c": []byte("opqrstu"),
|
||||
}
|
||||
|
||||
type pair struct {
|
||||
request []byte
|
||||
response []byte
|
||||
}
|
||||
|
||||
var availabilityRequests = []pair{
|
||||
{
|
||||
request: []byte(`{"lbrycrd_address":true,"requested_blobs":["a","b"]}`),
|
||||
response: []byte(`{"lbrycrd_address":"` + LbrycrdAddress + `","available_blobs":["a","b"]}`),
|
||||
},
|
||||
{
|
||||
request: []byte(`{"lbrycrd_address":true,"requested_blobs":["x","a","y"]}`),
|
||||
response: []byte(`{"lbrycrd_address":"` + LbrycrdAddress + `","available_blobs":["a"]}`),
|
||||
},
|
||||
{
|
||||
request: []byte(`{"lbrycrd_address":true,"requested_blobs":[]}`),
|
||||
response: []byte(`{"lbrycrd_address":"` + LbrycrdAddress + `","available_blobs":[]}`),
|
||||
},
|
||||
}
|
||||
|
||||
func getServer(withBlobs bool) *Server {
|
||||
st := store.MemoryBlobStore{}
|
||||
if withBlobs {
|
||||
for k, v := range blobs {
|
||||
st.Put(k, v)
|
||||
}
|
||||
}
|
||||
return NewServer(&st)
|
||||
}
|
||||
|
||||
func TestAvailabilityRequest_NoBlobs(t *testing.T) {
|
||||
s := getServer(false)
|
||||
|
||||
for _, p := range availabilityRequests {
|
||||
response, err := s.handleAvailabilityRequest(p.request)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
if !bytes.Equal(response, []byte(`{"lbrycrd_address":"`+LbrycrdAddress+`","available_blobs":[]}`)) {
|
||||
t.Errorf("Response did not match expected response. Got %s", string(response))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAvailabilityRequest_WithBlobs(t *testing.T) {
|
||||
s := getServer(true)
|
||||
|
||||
for _, p := range availabilityRequests {
|
||||
response, err := s.handleAvailabilityRequest(p.request)
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
if !bytes.Equal(response, p.response) {
|
||||
t.Errorf("Response did not match expected response.\nExpected: %s\nGot: %s", string(p.response), string(response))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -7,6 +7,8 @@ import (
|
|||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
)
|
||||
|
||||
var address = "localhost:" + strconv.Itoa(DefaultPort)
|
||||
|
@ -21,7 +23,8 @@ func TestMain(m *testing.M) {
|
|||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
s := NewServer(dir)
|
||||
ms := store.MemoryBlobStore{}
|
||||
s := NewServer(&ms)
|
||||
go s.ListenAndServe(address)
|
||||
|
||||
os.Exit(m.Run())
|
||||
|
|
|
@ -67,6 +67,9 @@ func (f *FileBlobStore) Get(hash string) ([]byte, error) {
|
|||
|
||||
file, err := os.Open(f.path(hash))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return []byte{}, errors.Err(ErrBlobNotFound)
|
||||
}
|
||||
return []byte{}, err
|
||||
}
|
||||
|
||||
|
|
38
store/memory.go
Normal file
38
store/memory.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package store
|
||||
|
||||
import "github.com/lbryio/errors.go"
|
||||
|
||||
type MemoryBlobStore struct {
|
||||
blobs map[string][]byte
|
||||
}
|
||||
|
||||
func (m *MemoryBlobStore) Has(hash string) (bool, error) {
|
||||
if m.blobs == nil {
|
||||
m.blobs = make(map[string][]byte)
|
||||
}
|
||||
_, ok := m.blobs[hash]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
func (m *MemoryBlobStore) Get(hash string) ([]byte, error) {
|
||||
if m.blobs == nil {
|
||||
m.blobs = make(map[string][]byte)
|
||||
}
|
||||
blob, ok := m.blobs[hash]
|
||||
if !ok {
|
||||
return []byte{}, errors.Err(ErrBlobNotFound)
|
||||
}
|
||||
return blob, nil
|
||||
}
|
||||
|
||||
func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
|
||||
if m.blobs == nil {
|
||||
m.blobs = make(map[string][]byte)
|
||||
}
|
||||
m.blobs[hash] = blob
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error {
|
||||
return m.Put(hash, blob)
|
||||
}
|
43
store/memory_test.go
Normal file
43
store/memory_test.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/lbryio/errors.go"
|
||||
)
|
||||
|
||||
func TestMemoryBlobStore_Put(t *testing.T) {
|
||||
s := MemoryBlobStore{}
|
||||
blob := []byte("abcdefg")
|
||||
err := s.Put("abc", blob)
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryBlobStore_Get(t *testing.T) {
|
||||
s := MemoryBlobStore{}
|
||||
hash := "abc"
|
||||
blob := []byte("abcdefg")
|
||||
s.Put(hash, blob)
|
||||
|
||||
gotBlob, err := s.Get(hash)
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
if !bytes.Equal(gotBlob, blob) {
|
||||
t.Error("Got blob that is different from expected blob")
|
||||
}
|
||||
|
||||
missingBlob, err := s.Get("nonexistent hash")
|
||||
if err == nil {
|
||||
t.Errorf("Expected ErrBlobNotFound, got nil")
|
||||
}
|
||||
if !errors.Is(err, ErrBlobNotFound) {
|
||||
t.Errorf("Received unexpected error: %v", err)
|
||||
}
|
||||
if !bytes.Equal(missingBlob, []byte{}) {
|
||||
t.Error("Got blob that is not empty")
|
||||
}
|
||||
}
|
18
store/s3.go
18
store/s3.go
|
@ -5,7 +5,10 @@ import (
|
|||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/errors.go"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
|
@ -74,12 +77,25 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) {
|
|||
return []byte{}, err
|
||||
}
|
||||
|
||||
log.Debugf("Getting %s from S3", hash[:8])
|
||||
defer func(t time.Time) {
|
||||
log.Debugf("Getting %s took %s", hash[:8], time.Since(t).String())
|
||||
}(time.Now())
|
||||
|
||||
buf := &aws.WriteAtBuffer{}
|
||||
_, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(hash),
|
||||
})
|
||||
if err != nil {
|
||||
if aerr, ok := err.(awserr.Error); ok {
|
||||
switch aerr.Code() {
|
||||
case s3.ErrCodeNoSuchBucket:
|
||||
return []byte{}, errors.Err("bucket %s does not exist", s.bucket)
|
||||
case s3.ErrCodeNoSuchKey:
|
||||
return []byte{}, errors.Err(ErrBlobNotFound)
|
||||
}
|
||||
}
|
||||
return buf.Bytes(), err
|
||||
}
|
||||
|
||||
|
@ -94,7 +110,7 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error {
|
|||
|
||||
log.Debugf("Uploading %s to S3", hash[:8])
|
||||
defer func(t time.Time) {
|
||||
log.Debugf("Upload took %s", time.Since(t).String())
|
||||
log.Debugf("Uploading %s took %s", hash[:8], time.Since(t).String())
|
||||
}(time.Now())
|
||||
|
||||
_, err = s3manager.NewUploader(s.session).Upload(&s3manager.UploadInput{
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
package store
|
||||
|
||||
import "github.com/lbryio/errors.go"
|
||||
|
||||
type BlobStore interface {
|
||||
Has(string) (bool, error)
|
||||
Get(string) ([]byte, error)
|
||||
Put(string, []byte) error
|
||||
PutSD(string, []byte) error
|
||||
}
|
||||
|
||||
var ErrBlobNotFound = errors.Base("blob not found")
|
||||
|
|
Loading…
Reference in a new issue