make MemoryStore consistent with the New...() pattern
This commit is contained in:
parent
2ca83139df
commit
69f1e0f4ca
8 changed files with 34 additions and 38 deletions
|
@ -29,7 +29,7 @@ func init() {
|
||||||
func testCmd(cmd *cobra.Command, args []string) {
|
func testCmd(cmd *cobra.Command, args []string) {
|
||||||
log.Printf("reflector version %s", meta.Version)
|
log.Printf("reflector version %s", meta.Version)
|
||||||
|
|
||||||
memStore := &store.MemoryBlobStore{}
|
memStore := store.NewMemoryBlobStore()
|
||||||
|
|
||||||
reflectorServer := reflector.NewServer(memStore)
|
reflectorServer := reflector.NewServer(memStore)
|
||||||
reflectorServer.Timeout = 3 * time.Minute
|
reflectorServer.Timeout = 3 * time.Minute
|
||||||
|
|
|
@ -8,13 +8,12 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/stream"
|
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
"github.com/lbryio/reflector.go/reflector"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/extras/errors"
|
"github.com/lbryio/lbry.go/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/extras/stop"
|
"github.com/lbryio/lbry.go/extras/stop"
|
||||||
|
"github.com/lbryio/lbry.go/stream"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -169,14 +168,14 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
||||||
var request availabilityRequest
|
var request availabilityRequest
|
||||||
err := json.Unmarshal(data, &request)
|
err := json.Unmarshal(data, &request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
availableBlobs := []string{}
|
availableBlobs := []string{}
|
||||||
for _, blobHash := range request.RequestedBlobs {
|
for _, blobHash := range request.RequestedBlobs {
|
||||||
exists, err := s.store.Has(blobHash)
|
exists, err := s.store.Has(blobHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if exists {
|
if exists {
|
||||||
availableBlobs = append(availableBlobs, blobHash)
|
availableBlobs = append(availableBlobs, blobHash)
|
||||||
|
@ -190,7 +189,7 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
||||||
// var request paymentRateRequest
|
// var request paymentRateRequest
|
||||||
// err := json.Unmarshal(data, &request)
|
// err := json.Unmarshal(data, &request)
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// return []byte{}, err
|
// return nil, err
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// offerReply := paymentRateAccepted
|
// offerReply := paymentRateAccepted
|
||||||
|
@ -205,14 +204,14 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
||||||
// var request blobRequest
|
// var request blobRequest
|
||||||
// err := json.Unmarshal(data, &request)
|
// err := json.Unmarshal(data, &request)
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// return []byte{}, err
|
// return nil, err
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
// log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
||||||
//
|
//
|
||||||
// blob, err := s.store.Get(request.RequestedBlob)
|
// blob, err := s.store.Get(request.RequestedBlob)
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// return []byte{}, err
|
// return nil, err
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
|
// response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
|
||||||
|
@ -220,7 +219,7 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
||||||
// Length: len(blob),
|
// Length: len(blob),
|
||||||
// }})
|
// }})
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// return []byte{}, err
|
// return nil, err
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// return append(response, blob...), nil
|
// return append(response, blob...), nil
|
||||||
|
@ -230,7 +229,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
var request compositeRequest
|
var request compositeRequest
|
||||||
err := json.Unmarshal(data, &request)
|
err := json.Unmarshal(data, &request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
response := compositeResponse{
|
response := compositeResponse{
|
||||||
|
@ -242,7 +241,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
for _, blobHash := range request.RequestedBlobs {
|
for _, blobHash := range request.RequestedBlobs {
|
||||||
exists, err := s.store.Has(blobHash)
|
exists, err := s.store.Has(blobHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if exists {
|
if exists {
|
||||||
availableBlobs = append(availableBlobs, blobHash)
|
availableBlobs = append(availableBlobs, blobHash)
|
||||||
|
@ -270,7 +269,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
Error: err.Error(),
|
Error: err.Error(),
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return []byte{}, err
|
return nil, err
|
||||||
} else {
|
} else {
|
||||||
response.IncomingBlob = incomingBlob{
|
response.IncomingBlob = incomingBlob{
|
||||||
BlobHash: reflector.BlobHash(blob),
|
BlobHash: reflector.BlobHash(blob),
|
||||||
|
@ -282,7 +281,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
|
|
||||||
respData, err := json.Marshal(response)
|
respData, err := json.Marshal(response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return append(respData, blob...), nil
|
return append(respData, blob...), nil
|
||||||
|
|
|
@ -34,7 +34,7 @@ var availabilityRequests = []pair{
|
||||||
}
|
}
|
||||||
|
|
||||||
func getServer(t *testing.T, withBlobs bool) *Server {
|
func getServer(t *testing.T, withBlobs bool) *Server {
|
||||||
st := store.MemoryBlobStore{}
|
st := store.NewMemoryBlobStore()
|
||||||
if withBlobs {
|
if withBlobs {
|
||||||
for k, v := range blobs {
|
for k, v := range blobs {
|
||||||
err := st.Put(k, v)
|
err := st.Put(k, v)
|
||||||
|
@ -43,7 +43,7 @@ func getServer(t *testing.T, withBlobs bool) *Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NewServer(&st)
|
return NewServer(st)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAvailabilityRequest_NoBlobs(t *testing.T) {
|
func TestAvailabilityRequest_NoBlobs(t *testing.T) {
|
||||||
|
|
|
@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(&store.MemoryBlobStore{})
|
srv := NewServer(store.NewMemoryBlobStore())
|
||||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(&store.MemoryBlobStore{})
|
srv := NewServer(store.NewMemoryBlobStore())
|
||||||
srv.Timeout = testTimeout
|
srv.Timeout = testTimeout
|
||||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
|
||||||
//}
|
//}
|
||||||
|
|
||||||
type mockPartialStore struct {
|
type mockPartialStore struct {
|
||||||
store.MemoryBlobStore
|
*store.MemoryBlobStore
|
||||||
missing []string
|
missing []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
|
||||||
missing[i] = bits.Rand().String()
|
missing[i] = bits.Rand().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
st := store.BlobStore(&mockPartialStore{missing: missing})
|
st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing})
|
||||||
if _, ok := st.(neededBlobChecker); !ok {
|
if _, ok := st.(neededBlobChecker); !ok {
|
||||||
t.Fatal("mock does not implement the relevant interface")
|
t.Fatal("mock does not implement the relevant interface")
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,8 +6,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCachingBlobStore_Put(t *testing.T) {
|
func TestCachingBlobStore_Put(t *testing.T) {
|
||||||
origin := &MemoryBlobStore{}
|
origin := NewMemoryBlobStore()
|
||||||
cache := &MemoryBlobStore{}
|
cache := NewMemoryBlobStore()
|
||||||
s := NewCachingBlobStore(origin, cache)
|
s := NewCachingBlobStore(origin, cache)
|
||||||
|
|
||||||
b := []byte("this is a blob of stuff")
|
b := []byte("this is a blob of stuff")
|
||||||
|
@ -36,8 +36,8 @@ func TestCachingBlobStore_Put(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCachingBlobStore_CacheMiss(t *testing.T) {
|
func TestCachingBlobStore_CacheMiss(t *testing.T) {
|
||||||
origin := &MemoryBlobStore{}
|
origin := NewMemoryBlobStore()
|
||||||
cache := &MemoryBlobStore{}
|
cache := NewMemoryBlobStore()
|
||||||
s := NewCachingBlobStore(origin, cache)
|
s := NewCachingBlobStore(origin, cache)
|
||||||
|
|
||||||
b := []byte("this is a blob of stuff")
|
b := []byte("this is a blob of stuff")
|
||||||
|
|
|
@ -7,35 +7,32 @@ import (
|
||||||
|
|
||||||
// MemoryBlobStore is an in memory only blob store with no persistence.
|
// MemoryBlobStore is an in memory only blob store with no persistence.
|
||||||
type MemoryBlobStore struct {
|
type MemoryBlobStore struct {
|
||||||
blobs map[string][]byte
|
blobs map[string]stream.Blob
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMemoryBlobStore() *MemoryBlobStore {
|
||||||
|
return &MemoryBlobStore{
|
||||||
|
blobs: make(map[string]stream.Blob),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Has returns T/F if the blob is currently stored. It will never error.
|
// Has returns T/F if the blob is currently stored. It will never error.
|
||||||
func (m *MemoryBlobStore) Has(hash string) (bool, error) {
|
func (m *MemoryBlobStore) Has(hash string) (bool, error) {
|
||||||
if m.blobs == nil {
|
|
||||||
m.blobs = make(map[string][]byte)
|
|
||||||
}
|
|
||||||
_, ok := m.blobs[hash]
|
_, ok := m.blobs[hash]
|
||||||
return ok, nil
|
return ok, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the blob byte slice if present and errors if the blob is not found.
|
// Get returns the blob byte slice if present and errors if the blob is not found.
|
||||||
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
|
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
|
||||||
if m.blobs == nil {
|
|
||||||
m.blobs = make(map[string][]byte)
|
|
||||||
}
|
|
||||||
blob, ok := m.blobs[hash]
|
blob, ok := m.blobs[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
return []byte{}, errors.Err(ErrBlobNotFound)
|
return nil, errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
return blob, nil
|
return blob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob in memory
|
// Put stores the blob in memory
|
||||||
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
|
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
|
||||||
if m.blobs == nil {
|
|
||||||
m.blobs = make(map[string][]byte)
|
|
||||||
}
|
|
||||||
m.blobs[hash] = blob
|
m.blobs[hash] = blob
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -52,6 +49,6 @@ func (m *MemoryBlobStore) Delete(hash string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Debug returns the blobs in memory. It's useful for testing and debugging.
|
// Debug returns the blobs in memory. It's useful for testing and debugging.
|
||||||
func (m *MemoryBlobStore) Debug() map[string][]byte {
|
func (m *MemoryBlobStore) Debug() map[string]stream.Blob {
|
||||||
return m.blobs
|
return m.blobs
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMemoryBlobStore_Put(t *testing.T) {
|
func TestMemoryBlobStore_Put(t *testing.T) {
|
||||||
s := MemoryBlobStore{}
|
s := NewMemoryBlobStore()
|
||||||
blob := []byte("abcdefg")
|
blob := []byte("abcdefg")
|
||||||
err := s.Put("abc", blob)
|
err := s.Put("abc", blob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -17,7 +17,7 @@ func TestMemoryBlobStore_Put(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMemoryBlobStore_Get(t *testing.T) {
|
func TestMemoryBlobStore_Get(t *testing.T) {
|
||||||
s := MemoryBlobStore{}
|
s := NewMemoryBlobStore()
|
||||||
hash := "abc"
|
hash := "abc"
|
||||||
blob := []byte("abcdefg")
|
blob := []byte("abcdefg")
|
||||||
err := s.Put(hash, blob)
|
err := s.Put(hash, blob)
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
"github.com/lbryio/lbry.go/stream"
|
"github.com/lbryio/lbry.go/stream"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BlobStore is an interface with methods for consistently handling blob storage.
|
// BlobStore is an interface for handling blob storage.
|
||||||
type BlobStore interface {
|
type BlobStore interface {
|
||||||
// Does blob exist in the store
|
// Does blob exist in the store
|
||||||
Has(hash string) (bool, error)
|
Has(hash string) (bool, error)
|
||||||
|
|
Loading…
Reference in a new issue