when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
3 changed files with 34 additions and 35 deletions
51
db/db.go
51
db/db.go
|
@ -111,6 +111,26 @@ func (s *SQL) AddBlobs(hash []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SQL) insertBlobs(hashes []string) error {
|
||||||
|
var (
|
||||||
|
q string
|
||||||
|
args []interface{}
|
||||||
|
)
|
||||||
|
dayAgo := time.Now().AddDate(0, 0, -1)
|
||||||
|
q = "insert into blob_ (hash, is_stored, length, last_accessed_at) values "
|
||||||
|
for _, hash := range hashes {
|
||||||
|
q += "(?,?,?,?),"
|
||||||
|
args = append(args, hash, true, stream.MaxBlobSize, dayAgo)
|
||||||
|
}
|
||||||
|
q = strings.TrimSuffix(q, ",")
|
||||||
|
_, err := s.exec(q, args...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
|
func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
|
||||||
if length <= 0 {
|
if length <= 0 {
|
||||||
return 0, errors.Err("length must be positive")
|
return 0, errors.Err("length must be positive")
|
||||||
|
@ -153,26 +173,6 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
|
||||||
return blobID, nil
|
return blobID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQL) insertBlobs(hashes []string) error {
|
|
||||||
var (
|
|
||||||
q string
|
|
||||||
args []interface{}
|
|
||||||
)
|
|
||||||
dayAgo := time.Now().AddDate(0, 0, -1)
|
|
||||||
q = "insert into blob_ (hash, is_stored, length, last_accessed_at) values "
|
|
||||||
for _, hash := range hashes {
|
|
||||||
q += "(?,?,?,?),"
|
|
||||||
args = append(args, hash, true, stream.MaxBlobSize, dayAgo)
|
|
||||||
}
|
|
||||||
q = strings.TrimSuffix(q, ",")
|
|
||||||
_, err := s.exec(q, args...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
|
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
|
||||||
var (
|
var (
|
||||||
q string
|
q string
|
||||||
|
@ -212,8 +212,8 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasBlob checks if the database contains the blob information.
|
// HasBlob checks if the database contains the blob information.
|
||||||
func (s *SQL) HasBlob(hash string) (bool, error) {
|
func (s *SQL) HasBlob(hash string, touch bool) (bool, error) {
|
||||||
exists, err := s.HasBlobs([]string{hash})
|
exists, err := s.HasBlobs([]string{hash}, touch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -221,15 +221,16 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
// HasBlobs checks if the database contains the set of blobs and returns a bool map.
|
||||||
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
|
func (s *SQL) HasBlobs(hashes []string, touch bool) (map[string]bool, error) {
|
||||||
exists, idsNeedingTouch, err := s.hasBlobs(hashes)
|
exists, idsNeedingTouch, err := s.hasBlobs(hashes)
|
||||||
go func() {
|
|
||||||
|
if touch {
|
||||||
if s.TrackAccess == TrackAccessBlobs {
|
if s.TrackAccess == TrackAccessBlobs {
|
||||||
s.touchBlobs(idsNeedingTouch)
|
s.touchBlobs(idsNeedingTouch)
|
||||||
} else if s.TrackAccess == TrackAccessStreams {
|
} else if s.TrackAccess == TrackAccessStreams {
|
||||||
s.touchStreams(idsNeedingTouch)
|
s.touchStreams(idsNeedingTouch)
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
|
|
||||||
return exists, err
|
return exists, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ func (u *Uploader) Upload(dirOrFilePath string) error {
|
||||||
|
|
||||||
var exists map[string]bool
|
var exists map[string]bool
|
||||||
if !u.skipExistsCheck {
|
if !u.skipExistsCheck {
|
||||||
exists, err = u.db.HasBlobs(hashes)
|
exists, err = u.db.HasBlobs(hashes, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,12 +32,12 @@ func (d *DBBackedStore) Name() string { return nameDBBacked }
|
||||||
|
|
||||||
// Has returns true if the blob is in the store
|
// Has returns true if the blob is in the store
|
||||||
func (d *DBBackedStore) Has(hash string) (bool, error) {
|
func (d *DBBackedStore) Has(hash string) (bool, error) {
|
||||||
return d.db.HasBlob(hash)
|
return d.db.HasBlob(hash, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get gets the blob
|
// Get gets the blob
|
||||||
func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
|
func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
|
||||||
has, err := d.db.HasBlob(hash)
|
has, err := d.db.HasBlob(hash, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -46,12 +46,10 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := d.blobs.Get(hash)
|
b, err := d.blobs.Get(hash)
|
||||||
if d.deleteOnMiss {
|
if d.deleteOnMiss && errors.Is(err, ErrBlobNotFound) {
|
||||||
if err != nil && errors.Is(err, ErrBlobNotFound) {
|
e2 := d.Delete(hash)
|
||||||
e2 := d.Delete(hash)
|
if e2 != nil {
|
||||||
if e2 != nil {
|
log.Errorf("error while deleting blob from db: %s", errors.FullTrace(err))
|
||||||
log.Errorf("error while deleting blob from db: %s", errors.FullTrace(err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,7 +108,7 @@ func (d *DBBackedStore) Block(hash string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
has, err := d.db.HasBlob(hash)
|
has, err := d.db.HasBlob(hash, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue