fix reflector responding correctly when we have the full stream. fixes lbryio/reflector-cluster#60

This commit is contained in:
Alex Grintsvayg 2018-07-26 10:25:47 -04:00
parent dd98b3cdfb
commit 8f395d8743
5 changed files with 53 additions and 14 deletions

View file

@ -20,6 +20,7 @@ type DB interface {
HasBlob(string) (bool, error) HasBlob(string) (bool, error)
AddBlob(string, int, bool) error AddBlob(string, int, bool) error
AddSDBlob(string, int, types.SdBlob) error AddSDBlob(string, int, types.SdBlob) error
HasFullStream(string) (bool, error)
} }
// SQL is the container for the supporting MySQL database connection. // SQL is the container for the supporting MySQL database connection.
@ -157,6 +158,32 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
return exists, nil return exists, nil
} }
// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs)
func (s *SQL) HasFullStream(sdHash string) (bool, error) {
if s.conn == nil {
return false, errors.Err("not connected")
}
query := `SELECT EXISTS(
SELECT 1 FROM stream s
LEFT JOIN stream_blob sb ON s.hash = sb.stream_hash
LEFT JOIN blob_ b ON b.hash = sb.blob_hash
WHERE s.sd_hash = ?
GROUP BY s.sd_hash
HAVING min(b.is_stored = 1)
);`
args := []interface{}{sdHash}
logQuery(query, args...)
row := s.conn.QueryRow(query, args...)
exists := false
err := row.Scan(&exists)
return exists, errors.Err(err)
}
// AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information // AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information
// into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is // into a stream, and inserts the associated blobs' information in the database. If a blob fails the transaction is
// rolled back and error(s) are returned. // rolled back and error(s) are returned.

View file

@ -133,19 +133,27 @@ func (s *Server) doError(conn net.Conn, err error) error {
} }
func (s *Server) receiveBlob(conn net.Conn) error { func (s *Server) receiveBlob(conn net.Conn) error {
var err error
blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn) blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn)
if err != nil { if err != nil {
return err return err
} }
// fullStreamChecker can check if the full stream has been uploaded
type fullStreamChecker interface {
HasFullStream(string) (bool, error)
}
blobExists := false blobExists := false
if !isSdBlob { if fsc, ok := s.store.(fullStreamChecker); ok && isSdBlob {
// we have to say sd blobs are missing because if we say we have it, they wont try to send any content blobs blobExists, err = fsc.HasFullStream(blobHash)
has, err := s.store.Has(blobHash) } else {
if err != nil { // if we can't confirm that we have the full stream, we have to say that the sd blob is missing. if we say we have it, they wont try to send any content blobs
return err blobExists, err = s.store.Has(blobHash)
} }
blobExists = has if err != nil {
return err
} }
err = s.sendBlobResponse(conn, blobExists, isSdBlob) err = s.sendBlobResponse(conn, blobExists, isSdBlob)

View file

@ -58,3 +58,8 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
return d.db.AddSDBlob(hash, len(blob), blobContents) return d.db.AddSDBlob(hash, len(blob), blobContents)
} }
// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs)
func (d *DBBackedS3Store) HasFullStream(sdHash string) (bool, error) {
return d.db.HasFullStream(sdHash)
}

View file

@ -28,7 +28,6 @@ func (f *FileBlobStore) initOnce() error {
if f.initialized { if f.initialized {
return nil return nil
} }
defer func() { f.initialized = true }()
if stat, err := os.Stat(f.dir); err != nil { if stat, err := os.Stat(f.dir); err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -42,6 +41,8 @@ func (f *FileBlobStore) initOnce() error {
} else if !stat.IsDir() { } else if !stat.IsDir() {
return errors.Err("blob dir exists but is not a dir") return errors.Err("blob dir exists but is not a dir")
} }
f.initialized = true
return nil return nil
} }
@ -80,7 +81,7 @@ func (f *FileBlobStore) Get(hash string) ([]byte, error) {
return ioutil.ReadAll(file) return ioutil.ReadAll(file)
} }
// Put stores the blob on disk or errors with any IO error. // Put stores the blob on disk
func (f *FileBlobStore) Put(hash string, blob []byte) error { func (f *FileBlobStore) Put(hash string, blob []byte) error {
err := f.initOnce() err := f.initOnce()
if err != nil { if err != nil {
@ -90,8 +91,7 @@ func (f *FileBlobStore) Put(hash string, blob []byte) error {
return ioutil.WriteFile(f.path(hash), blob, 0644) return ioutil.WriteFile(f.path(hash), blob, 0644)
} }
// PutSD stores the sd blob on the disk or errors with any IO error. // PutSD stores the sd blob on the disk
func (f *FileBlobStore) PutSD(hash string, blob []byte) error { func (f *FileBlobStore) PutSD(hash string, blob []byte) error {
//Todo - need to handle when streaming hash is not present.
return f.Put(hash, blob) return f.Put(hash, blob)
} }

View file

@ -28,7 +28,7 @@ func (m *MemoryBlobStore) Get(hash string) ([]byte, error) {
return blob, nil return blob, nil
} }
// Put stores the blob in memory. It will never error. // Put stores the blob in memory
func (m *MemoryBlobStore) Put(hash string, blob []byte) error { func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
if m.blobs == nil { if m.blobs == nil {
m.blobs = make(map[string][]byte) m.blobs = make(map[string][]byte)
@ -37,8 +37,7 @@ func (m *MemoryBlobStore) Put(hash string, blob []byte) error {
return nil return nil
} }
// PutSD stores the sd blob in memory. It will never error. // PutSD stores the sd blob in memory
func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error { func (m *MemoryBlobStore) PutSD(hash string, blob []byte) error {
//ToDo - need to handle when stream is not present.
return m.Put(hash, blob) return m.Put(hash, blob)
} }