diff --git a/db/db.go b/db/db.go index 1475fe7..c6793d9 100644 --- a/db/db.go +++ b/db/db.go @@ -20,6 +20,7 @@ type DB interface { HasBlob(string) (bool, error) AddBlob(string, int, bool) error AddSDBlob(string, int, types.SdBlob) error + HasFullStream(string) (bool, error) } // SQL is the container for the supporting MySQL database connection. @@ -157,6 +158,32 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { return exists, nil } +// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs) +func (s *SQL) HasFullStream(sdHash string) (bool, error) { + if s.conn == nil { + return false, errors.Err("not connected") + } + + query := `SELECT EXISTS( + SELECT 1 FROM stream s + LEFT JOIN stream_blob sb ON s.hash = sb.stream_hash + LEFT JOIN blob_ b ON b.hash = sb.blob_hash + WHERE s.sd_hash = ? + GROUP BY s.sd_hash + HAVING min(b.is_stored = 1) + );` + args := []interface{}{sdHash} + + logQuery(query, args...) + + row := s.conn.QueryRow(query, args...) + + exists := false + err := row.Scan(&exists) + + return exists, errors.Err(err) +} + // AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information // into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is // rolled back and error(s) are returned. diff --git a/reflector/server.go b/reflector/server.go index 8b9357f..5f0f2e9 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -133,19 +133,27 @@ func (s *Server) doError(conn net.Conn, err error) error { } func (s *Server) receiveBlob(conn net.Conn) error { + var err error + blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn) if err != nil { return err } + // fullStreamChecker can check if the full stream has been uploaded + type fullStreamChecker interface { + HasFullStream(string) (bool, error) + } + blobExists := false - if !isSdBlob { - // we have to say sd blobs are missing because if we say we have it, they wont try to send any content blobs - has, err := s.store.Has(blobHash) - if err != nil { - return err - } - blobExists = has + if fsc, ok := s.store.(fullStreamChecker); ok && isSdBlob { + blobExists, err = fsc.HasFullStream(blobHash) + } else { + // if we can't confirm that we have the full stream, we have to say that the sd blob is missing. if we say we have it, they wont try to send any content blobs + blobExists, err = s.store.Has(blobHash) + } + if err != nil { + return err } err = s.sendBlobResponse(conn, blobExists, isSdBlob) diff --git a/store/dbbacked.go b/store/dbbacked.go index f5d2e60..916fe68 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -58,3 +58,8 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error { return d.db.AddSDBlob(hash, len(blob), blobContents) } + +// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs) +func (d *DBBackedS3Store) HasFullStream(sdHash string) (bool, error) { + return d.db.HasFullStream(sdHash) +} diff --git a/store/file.go b/store/file.go index a6acdbe..0e06beb 100644 --- a/store/file.go +++ b/store/file.go @@ -28,7 +28,6 @@ func (f *FileBlobStore) initOnce() error { if f.initialized { return nil } - defer func() { f.initialized = true }() if stat, err := os.Stat(f.dir); err != nil { if os.IsNotExist(err) { @@ -42,6 +41,8 @@ func (f *FileBlobStore) initOnce() error { } else if !stat.IsDir() { return errors.Err("blob dir exists but is not a dir") } + + f.initialized = true return nil } @@ -80,7 +81,7 @@ func (f *FileBlobStore) Get(hash string) ([]byte, error) { return ioutil.ReadAll(file) } -// Put stores the blob on disk or errors with any IO error. +// Put stores the blob on disk func (f *FileBlobStore) Put(hash string, blob []byte) error { err := f.initOnce() if err != nil { @@ -90,8 +91,7 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error { return ioutil.WriteFile(f.path(hash), blob, 0644) } -// PutSD stores the sd blob on the disk or errors with any IO error. +// PutSD stores the sd blob on the disk func (f *FileBlobStore) PutSD(hash string, blob []byte) error { - //Todo - need to handle when streaming hash is not present. return f.Put(hash, blob) } diff --git a/store/memory.go b/store/memory.go index 0a52e0c..7757bd0 100644 --- a/store/memory.go +++ b/store/memory.go @@ -28,7 +28,7 @@ func (m *MemoryBlobStore) Get(hash string) ([]byte, error) { return blob, nil } -// Put stores the blob in memory. It will never error. +// Put stores the blob in memory func (m *MemoryBlobStore) Put(hash string, blob []byte) error { if m.blobs == nil { m.blobs = make(map[string][]byte) @@ -37,8 +37,7 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error { return nil } -// PutSD stores the sd blob in memory. It will never error. +// PutSD stores the sd blob in memory func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { - //ToDo - need to handle when stream is not present. return m.Put(hash, blob) }