use stream.Blob for BlobStore interface

This commit is contained in:
Alex Grintsvayg 2019-10-03 16:34:57 -04:00
parent 0af6d65d40
commit 2ca83139df
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
8 changed files with 65 additions and 55 deletions

View file

@ -5,11 +5,10 @@ import (
"os"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/stream"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -56,11 +55,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
}
for i := 0; i < len(sd.BlobInfos)-1; i++ {
bb, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
b, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
log.Fatal(err)
}
b := stream.Blob(bb)
data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV)
if err != nil {

View file

@ -2,6 +2,7 @@ package peer
import (
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
)
// Store is a blob store that gets blobs from a peer.
@ -28,7 +29,7 @@ func (p *Store) Has(hash string) (bool, error) {
}
// Get downloads the blob from the peer
func (p *Store) Get(hash string) ([]byte, error) {
func (p *Store) Get(hash string) (stream.Blob, error) {
if p.connErr != nil {
return nil, errors.Prefix("connection error", p.connErr)
}
@ -37,12 +38,12 @@ func (p *Store) Get(hash string) ([]byte, error) {
}
// Put is not supported
func (p *Store) Put(hash string, blob []byte) error {
func (p *Store) Put(hash string, blob stream.Blob) error {
panic("PeerStore cannot put or delete blobs")
}
// PutSD is not supported
func (p *Store) PutSD(hash string, blob []byte) error {
func (p *Store) PutSD(hash string, blob stream.Blob) error {
panic("PeerStore cannot put or delete blobs")
}

View file

@ -2,6 +2,7 @@ package store
import (
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
)
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
@ -27,7 +28,7 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
// from the origin, it is also stored in the cache.
func (c *CachingBlobStore) Get(hash string) ([]byte, error) {
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
blob, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) {
return blob, err
@ -44,7 +45,7 @@ func (c *CachingBlobStore) Get(hash string) ([]byte, error) {
}
// Put stores the blob in the origin and the cache
func (c *CachingBlobStore) Put(hash string, blob []byte) error {
func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
err := c.origin.Put(hash, blob)
if err != nil {
return err
@ -53,7 +54,7 @@ func (c *CachingBlobStore) Put(hash string, blob []byte) error {
}
// PutSD stores the sd blob in the origin and the cache
func (c *CachingBlobStore) PutSD(hash string, blob []byte) error {
func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
err := c.origin.PutSD(hash, blob)
if err != nil {
return err

View file

@ -7,6 +7,8 @@ import (
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
log "github.com/sirupsen/logrus"
)
@ -29,12 +31,12 @@ func (d *DBBackedS3Store) Has(hash string) (bool, error) {
}
// Get gets the blob
func (d *DBBackedS3Store) Get(hash string) ([]byte, error) {
func (d *DBBackedS3Store) Get(hash string) (stream.Blob, error) {
return d.s3.Get(hash)
}
// Put stores the blob in the S3 store and stores the blob information in the DB.
func (d *DBBackedS3Store) Put(hash string, blob []byte) error {
func (d *DBBackedS3Store) Put(hash string, blob stream.Blob) error {
err := d.s3.Put(hash, blob)
if err != nil {
return err
@ -45,7 +47,7 @@ func (d *DBBackedS3Store) Put(hash string, blob []byte) error {
// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
// there is an error storing the blob information in the DB.
func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
func (d *DBBackedS3Store) PutSD(hash string, blob stream.Blob) error {
var blobContents db.SdBlob
err := json.Unmarshal(blob, &blobContents)
if err != nil {

View file

@ -6,6 +6,7 @@ import (
"path"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
)
// DiskBlobStore stores blobs on a local disk
@ -23,43 +24,43 @@ func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore {
return &DiskBlobStore{blobDir: dir, prefixLength: prefixLength}
}
func (f *DiskBlobStore) dir(hash string) string {
if f.prefixLength <= 0 || len(hash) < f.prefixLength {
return f.blobDir
func (d *DiskBlobStore) dir(hash string) string {
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
return d.blobDir
}
return path.Join(f.blobDir, hash[:f.prefixLength])
return path.Join(d.blobDir, hash[:d.prefixLength])
}
func (f *DiskBlobStore) path(hash string) string {
return path.Join(f.dir(hash), hash)
func (d *DiskBlobStore) path(hash string) string {
return path.Join(d.dir(hash), hash)
}
func (f *DiskBlobStore) ensureDirExists(dir string) error {
func (d *DiskBlobStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755))
}
func (f *DiskBlobStore) initOnce() error {
if f.initialized {
func (d *DiskBlobStore) initOnce() error {
if d.initialized {
return nil
}
err := f.ensureDirExists(f.blobDir)
err := d.ensureDirExists(d.blobDir)
if err != nil {
return err
}
f.initialized = true
d.initialized = true
return nil
}
// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
func (f *DiskBlobStore) Has(hash string) (bool, error) {
err := f.initOnce()
func (d *DiskBlobStore) Has(hash string) (bool, error) {
err := d.initOnce()
if err != nil {
return false, err
}
_, err = os.Stat(f.path(hash))
_, err = os.Stat(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
return false, nil
@ -69,14 +70,14 @@ func (f *DiskBlobStore) Has(hash string) (bool, error) {
return true, nil
}
// Get returns the byte slice of the blob stored or will error if the blob doesn't exist.
func (f *DiskBlobStore) Get(hash string) ([]byte, error) {
err := f.initOnce()
// Get returns the blob or an error if the blob doesn't exist.
func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
err := d.initOnce()
if err != nil {
return nil, err
}
file, err := os.Open(f.path(hash))
file, err := os.Open(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
return nil, errors.Err(ErrBlobNotFound)
@ -88,33 +89,33 @@ func (f *DiskBlobStore) Get(hash string) ([]byte, error) {
}
// Put stores the blob on disk
func (f *DiskBlobStore) Put(hash string, blob []byte) error {
err := f.initOnce()
func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
err := d.initOnce()
if err != nil {
return err
}
err = f.ensureDirExists(f.dir(hash))
err = d.ensureDirExists(d.dir(hash))
if err != nil {
return err
}
return ioutil.WriteFile(f.path(hash), blob, 0644)
return ioutil.WriteFile(d.path(hash), blob, 0644)
}
// PutSD stores the sd blob on the disk
func (f *DiskBlobStore) PutSD(hash string, blob []byte) error {
return f.Put(hash, blob)
func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error {
return d.Put(hash, blob)
}
// Delete deletes the blob from the store
func (f *DiskBlobStore) Delete(hash string) error {
err := f.initOnce()
func (d *DiskBlobStore) Delete(hash string) error {
err := d.initOnce()
if err != nil {
return err
}
has, err := f.Has(hash)
has, err := d.Has(hash)
if err != nil {
return err
}
@ -122,5 +123,5 @@ func (f *DiskBlobStore) Delete(hash string) error {
return nil
}
return os.Remove(f.path(hash))
return os.Remove(d.path(hash))
}

View file

@ -1,6 +1,9 @@
package store
import "github.com/lbryio/lbry.go/extras/errors"
import (
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
)
// MemoryBlobStore is an in memory only blob store with no persistence.
type MemoryBlobStore struct {
@ -17,7 +20,7 @@ func (m *MemoryBlobStore) Has(hash string) (bool, error) {
}
// Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemoryBlobStore) Get(hash string) ([]byte, error) {
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
if m.blobs == nil {
m.blobs = make(map[string][]byte)
}
@ -29,7 +32,7 @@ func (m *MemoryBlobStore) Get(hash string) ([]byte, error) {
}
// Put stores the blob in memory
func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
if m.blobs == nil {
m.blobs = make(map[string][]byte)
}
@ -38,7 +41,7 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
}
// PutSD stores the sd blob in memory
func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error {
func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error {
return m.Put(hash, blob)
}

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@ -75,11 +76,11 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
}
// Get returns the blob slice if present or errors on S3.
func (s *S3BlobStore) Get(hash string) ([]byte, error) {
func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
//Todo-Need to handle error for blob doesn't exist for consistency.
err := s.initOnce()
if err != nil {
return []byte{}, err
return nil, err
}
log.Debugf("Getting %s from S3", hash[:8])
@ -96,9 +97,9 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchBucket:
return []byte{}, errors.Err("bucket %s does not exist", s.bucket)
return nil, errors.Err("bucket %s does not exist", s.bucket)
case s3.ErrCodeNoSuchKey:
return []byte{}, errors.Err(ErrBlobNotFound)
return nil, errors.Err(ErrBlobNotFound)
}
}
return buf.Bytes(), err
@ -108,7 +109,7 @@ func (s *S3BlobStore) Get(hash string) ([]byte, error) {
}
// Put stores the blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) Put(hash string, blob []byte) error {
func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
err := s.initOnce()
if err != nil {
return err
@ -130,7 +131,7 @@ func (s *S3BlobStore) Put(hash string, blob []byte) error {
}
// PutSD stores the sd blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) PutSD(hash string, blob []byte) error {
func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error {
//Todo - handle missing stream for consistency
return s.Put(hash, blob)
}

View file

@ -1,17 +1,20 @@
package store
import "github.com/lbryio/lbry.go/extras/errors"
import (
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/stream"
)
// BlobStore is an interface with methods for consistently handling blob storage.
type BlobStore interface {
// Does blob exist in the store
Has(hash string) (bool, error)
// Get the blob from the store
Get(hash string) ([]byte, error)
Get(hash string) (stream.Blob, error)
// Put the blob into the store
Put(hash string, blob []byte) error
Put(hash string, blob stream.Blob) error
// Put an SD blob into the store
PutSD(hash string, blob []byte) error
PutSD(hash string, blob stream.Blob) error
// Delete the blob from the store
Delete(hash string) error
}