From 661c20a21da6d37dd8b20b5bf9108ef28823bccd Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Thu, 3 Oct 2019 16:58:17 -0400
Subject: [PATCH] make db-backed store more generic (not specific to s3)

---
 cmd/peer.go           |  2 +-
 cmd/reflector.go      |  2 +-
 cmd/start.go          |  2 +-
 cmd/upload.go         |  2 +-
 reflector/uploader.go |  4 ++--
 store/dbbacked.go     | 44 ++++++++++++++++++++++----------------------
 6 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/cmd/peer.go b/cmd/peer.go
index 7a82eae..88c49e0 100644
--- a/cmd/peer.go
+++ b/cmd/peer.go
@@ -37,7 +37,7 @@ func peerCmd(cmd *cobra.Command, args []string) {
 		err = db.Connect(globalConfig.DBConn)
 		checkErr(err)
-		combo := store.NewDBBackedS3Store(s3, db)
+		combo := store.NewDBBackedStore(s3, db)
 		peerServer = peer.NewServer(combo)
 	}
diff --git a/cmd/reflector.go b/cmd/reflector.go
index 34fe4ba..c065f58 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -48,7 +48,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
 		log.Fatal(err)
 	}
-	blobStore = store.NewDBBackedS3Store(s3, db)
+	blobStore = store.NewDBBackedStore(s3, db)
 	reflectorServer = reflector.NewServer(blobStore)
 	reflectorServer.Timeout = 3 * time.Minute
diff --git a/cmd/start.go b/cmd/start.go
index fbb2123..862c10a 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -56,7 +56,7 @@ func startCmd(cmd *cobra.Command, args []string) {
 	err := db.Connect(globalConfig.DBConn)
 	checkErr(err)
 	s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
-	comboStore := store.NewDBBackedS3Store(s3, db)
+	comboStore := store.NewDBBackedStore(s3, db)
 	conf := prism.DefaultConf()
diff --git a/cmd/upload.go b/cmd/upload.go
index bbc3b97..49c5312 100644
--- a/cmd/upload.go
+++ b/cmd/upload.go
@@ -32,7 +32,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
 	err := db.Connect(globalConfig.DBConn)
 	checkErr(err)
-	st := store.NewDBBackedS3Store(
+	st := store.NewDBBackedStore(
 		store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
 		db)
diff --git a/reflector/uploader.go b/reflector/uploader.go
index bf3d2e4..2f1589b 100644
--- a/reflector/uploader.go
+++ b/reflector/uploader.go
@@ -30,7 +30,7 @@ type Summary struct {
 type Uploader struct {
 	db *db.SQL
-	store *store.DBBackedS3Store // could just be store.BlobStore interface
+	store *store.DBBackedStore // could just be store.BlobStore interface
 	workers int
 	skipExistsCheck bool
 	stopper *stop.Group
@@ -39,7 +39,7 @@ type Uploader struct {
 	count Summary
 }
-func NewUploader(db *db.SQL, store *store.DBBackedS3Store, workers int, skipExistsCheck bool) *Uploader {
+func NewUploader(db *db.SQL, store *store.DBBackedStore, workers int, skipExistsCheck bool) *Uploader {
 	return &Uploader{
 		db: db,
 		store: store,
diff --git a/store/dbbacked.go b/store/dbbacked.go
index f5feb0a..1b886dd 100644
--- a/store/dbbacked.go
+++ b/store/dbbacked.go
@@ -12,32 +12,32 @@ import (
 	log "github.com/sirupsen/logrus"
 )
-// DBBackedS3Store is an instance of an S3 Store that is backed by a DB for what is stored.
-type DBBackedS3Store struct {
-	s3 *S3BlobStore
+// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
+type DBBackedStore struct {
+	blobs BlobStore
 	db *db.SQL
 	blockedMu sync.RWMutex
 	blocked map[string]bool
 }
-// NewDBBackedS3Store returns an initialized store pointer.
-func NewDBBackedS3Store(s3 *S3BlobStore, db *db.SQL) *DBBackedS3Store {
-	return &DBBackedS3Store{s3: s3, db: db}
+// NewDBBackedStore returns an initialized store pointer.
+func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
+	return &DBBackedStore{blobs: blobs, db: db}
 }
 // Has returns true if the blob is in the store
-func (d *DBBackedS3Store) Has(hash string) (bool, error) {
+func (d *DBBackedStore) Has(hash string) (bool, error) {
 	return d.db.HasBlob(hash)
 }
 // Get gets the blob
-func (d *DBBackedS3Store) Get(hash string) (stream.Blob, error) {
-	return d.s3.Get(hash)
+func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
+	return d.blobs.Get(hash)
 }
 // Put stores the blob in the S3 store and stores the blob information in the DB.
-func (d *DBBackedS3Store) Put(hash string, blob stream.Blob) error {
-	err := d.s3.Put(hash, blob)
+func (d *DBBackedStore) Put(hash string, blob stream.Blob) error {
+	err := d.blobs.Put(hash, blob)
 	if err != nil {
 		return err
 	}
@@ -47,7 +47,7 @@ func (d *DBBackedS3Store) Put(hash string, blob stream.Blob) error {
 // PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
 // there is an error storing the blob information in the DB.
-func (d *DBBackedS3Store) PutSD(hash string, blob stream.Blob) error {
+func (d *DBBackedStore) PutSD(hash string, blob stream.Blob) error {
 	var blobContents db.SdBlob
 	err := json.Unmarshal(blob, &blobContents)
 	if err != nil {
@@ -57,7 +57,7 @@ func (d *DBBackedS3Store) PutSD(hash string, blob stream.Blob) error {
 		return errors.Err("sd blob is missing stream hash")
 	}
-	err = d.s3.PutSD(hash, blob)
+	err = d.blobs.PutSD(hash, blob)
 	if err != nil {
 		return err
 	}
@@ -65,8 +65,8 @@ func (d *DBBackedS3Store) PutSD(hash string, blob stream.Blob) error {
 	return d.db.AddSDBlob(hash, len(blob), blobContents)
 }
-func (d *DBBackedS3Store) Delete(hash string) error {
-	err := d.s3.Delete(hash)
+func (d *DBBackedStore) Delete(hash string) error {
+	err := d.blobs.Delete(hash)
 	if err != nil {
 		return err
 	}
@@ -75,7 +75,7 @@ func (d *DBBackedS3Store) Delete(hash string) error {
 	return d.db.DeleteBlob(hash)
 }
 // Block deletes the blob and prevents it from being uploaded in the future
-func (d *DBBackedS3Store) Block(hash string) error {
+func (d *DBBackedStore) Block(hash string) error {
 	if blocked, err := d.isBlocked(hash); blocked || err != nil {
 		return err
 	}
@@ -93,7 +93,7 @@ func (d *DBBackedS3Store) Block(hash string) error {
 	}
 	if has {
-		err = d.s3.Delete(hash)
+		err = d.blobs.Delete(hash)
 		if err != nil {
 			return err
 		}
@@ -108,7 +108,7 @@ func (d *DBBackedS3Store) Block(hash string) error {
 }
 // Wants returns false if the hash exists or is blocked, true otherwise
-func (d *DBBackedS3Store) Wants(hash string) (bool, error) {
+func (d *DBBackedStore) Wants(hash string) (bool, error) {
 	blocked, err := d.isBlocked(hash)
 	if blocked || err != nil {
 		return false, err
@@ -121,11 +121,11 @@ func (d *DBBackedS3Store) Wants(hash string) (bool, error) {
 // MissingBlobsForKnownStream returns missing blobs for an existing stream
 // WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
 // like no blobs are missing
-func (d *DBBackedS3Store) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
+func (d *DBBackedStore) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 	return d.db.MissingBlobsForKnownStream(sdHash)
 }
-func (d *DBBackedS3Store) markBlocked(hash string) error {
+func (d *DBBackedStore) markBlocked(hash string) error {
 	err := d.initBlocked()
 	if err != nil {
 		return err
@@ -138,7 +138,7 @@ func (d *DBBackedS3Store) markBlocked(hash string) error {
 	return nil
 }
-func (d *DBBackedS3Store) isBlocked(hash string) (bool, error) {
+func (d *DBBackedStore) isBlocked(hash string) (bool, error) {
 	err := d.initBlocked()
 	if err != nil {
 		return false, err
@@ -150,7 +150,7 @@ func (d *DBBackedS3Store) isBlocked(hash string) (bool, error) {
 	return d.blocked[hash], nil
 }
-func (d *DBBackedS3Store) initBlocked() error {
+func (d *DBBackedStore) initBlocked() error {
 	// first check without blocking since this is the most likely scenario
 	if d.blocked != nil {
 		return nil
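
The heart of the change is in store/dbbacked.go: the wrapper now holds a BlobStore interface value (the blobs field) instead of a concrete *S3BlobStore, so the DB-backed store can sit in front of any blob backend, not just S3. The standalone Go sketch below mirrors the shape of that change with simplified, hypothetical types; it is not the repo's actual store package (the real store.BlobStore includes at least PutSD as well, and db.SQL is a MySQL-backed table rather than a map).

package main

import (
	"errors"
	"fmt"
)

// BlobStore is a minimal stand-in for the interface the wrapper now depends on.
type BlobStore interface {
	Get(hash string) ([]byte, error)
	Put(hash string, blob []byte) error
	Delete(hash string) error
}

// memStore is a toy in-memory BlobStore, used only to show that any
// implementation can back the DB-backed wrapper, not just S3.
type memStore struct{ blobs map[string][]byte }

func newMemStore() *memStore { return &memStore{blobs: map[string][]byte{}} }

func (m *memStore) Get(hash string) ([]byte, error) {
	b, ok := m.blobs[hash]
	if !ok {
		return nil, errors.New("blob not found")
	}
	return b, nil
}
func (m *memStore) Put(hash string, blob []byte) error { m.blobs[hash] = blob; return nil }
func (m *memStore) Delete(hash string) error           { delete(m.blobs, hash); return nil }

// dbBackedStore mirrors store.DBBackedStore after this patch: it holds a
// BlobStore interface value ("blobs") rather than a concrete *S3BlobStore,
// and records what it stored (here a set of hashes stands in for the SQL DB).
type dbBackedStore struct {
	blobs BlobStore
	known map[string]bool
}

func newDBBackedStore(blobs BlobStore) *dbBackedStore {
	return &dbBackedStore{blobs: blobs, known: map[string]bool{}}
}

// Has answers from the DB side, like the real wrapper does.
func (d *dbBackedStore) Has(hash string) (bool, error) { return d.known[hash], nil }

// Put writes through to the underlying store, then records the blob in the DB.
func (d *dbBackedStore) Put(hash string, blob []byte) error {
	if err := d.blobs.Put(hash, blob); err != nil {
		return err
	}
	d.known[hash] = true
	return nil
}

// Get delegates straight to the underlying store.
func (d *dbBackedStore) Get(hash string) ([]byte, error) { return d.blobs.Get(hash) }

func main() {
	// Any BlobStore implementation can now sit behind the DB-backed wrapper.
	combo := newDBBackedStore(newMemStore())
	_ = combo.Put("abc123", []byte("blob contents"))
	has, _ := combo.Has("abc123")
	fmt.Println("has abc123:", has)
}

With this shape, callers like cmd/peer.go can keep handing the wrapper an S3 store, while tests or future commands can pass any other BlobStore implementation without touching the wrapper itself.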