fix partial stream upload

Alex Grintsvayg 2018-08-15 20:17:02 -04:00
parent 391f983630
commit 75886211b1
4 changed files with 171 additions and 53 deletions

View file

@@ -158,30 +158,47 @@ func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 	return exists, nil
 }
 
-// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs)
-func (s *SQL) HasFullStream(sdHash string) (bool, error) {
+// MissingBlobsForKnownStream returns missing blobs for an existing stream
+// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
+// like no blobs are missing
+func (s *SQL) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
 	if s.conn == nil {
-		return false, errors.Err("not connected")
+		return nil, errors.Err("not connected")
 	}
 
-	query := `SELECT EXISTS(
-		SELECT 1 FROM stream s
-		LEFT JOIN stream_blob sb ON s.hash = sb.stream_hash
-		LEFT JOIN blob_ b ON b.hash = sb.blob_hash
-		WHERE s.sd_hash = ?
-		GROUP BY s.sd_hash
-		HAVING min(b.is_stored = 1)
-	);`
+	query := `
+		SELECT b.hash FROM blob_ b
+		INNER JOIN stream_blob sb ON b.hash = sb.blob_hash
+		INNER JOIN stream s ON s.hash = sb.stream_hash AND s.sd_hash = ?
+		WHERE b.is_stored = 0
+	`
 	args := []interface{}{sdHash}
 
 	logQuery(query, args...)
 
-	row := s.conn.QueryRow(query, args...)
+	rows, err := s.conn.Query(query, args...)
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+	defer closeRows(rows)
 
-	exists := false
-	err := row.Scan(&exists)
+	var missingBlobs []string
+	var hash string
 
-	return exists, errors.Err(err)
+	for rows.Next() {
+		err := rows.Scan(&hash)
+		if err != nil {
+			return nil, errors.Err(err)
+		}
+		missingBlobs = append(missingBlobs, hash)
+	}
+
+	err = rows.Err()
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+
+	return missingBlobs, errors.Err(err)
 }
 
 // AddSDBlob takes the SD Hash number of blobs and the set of blobs. In a single db tx it inserts the sdblob information
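
Note on the WARNING above: the new query returns an empty result both when every blob of the stream is stored and when the stream row does not exist at all, so callers should only trust an empty list after confirming the sd blob is known. A minimal caller-side sketch under that assumption follows (the blobChecker interface and blobsStillNeeded helper are illustrative, not part of this commit; Has is the existing per-blob existence check the reflector server uses below):

package sketch

import "errors"

// blobChecker is the subset of store behavior this sketch assumes: Has reports
// whether a single blob is stored, MissingBlobsForKnownStream is the new query.
type blobChecker interface {
	Has(hash string) (bool, error)
	MissingBlobsForKnownStream(sdHash string) ([]string, error)
}

// blobsStillNeeded guards against the warning above: an empty "missing" list is
// only meaningful once we know the stream's sd blob is actually stored.
func blobsStillNeeded(s blobChecker, sdHash string) ([]string, error) {
	exists, err := s.Has(sdHash)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, errors.New("sd blob not stored; stream is unknown")
	}
	return s.MissingBlobsForKnownStream(sdHash)
}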

View file

@@ -165,23 +165,27 @@ func (s *Server) receiveBlob(conn net.Conn) error {
 		return err
 	}
 
-	// fullStreamChecker can check if the full stream has been uploaded
-	type fullStreamChecker interface {
-		HasFullStream(string) (bool, error)
-	}
-
-	blobExists := false
-	if fsc, ok := s.store.(fullStreamChecker); ok && isSdBlob {
-		blobExists, err = fsc.HasFullStream(blobHash)
-	} else {
-		// if we can't confirm that we have the full stream, we have to say that the sd blob is missing. if we say we have it, they wont try to send any content blobs
-		blobExists, err = s.store.Has(blobHash)
-	}
+	blobExists, err := s.store.Has(blobHash)
 	if err != nil {
 		return err
 	}
 
-	err = s.sendBlobResponse(conn, blobExists, isSdBlob)
+	var neededBlobs []string
+
+	if isSdBlob && blobExists {
+		if fsc, ok := s.store.(neededBlobChecker); ok {
+			neededBlobs, err = fsc.MissingBlobsForKnownStream(blobHash)
+			if err != nil {
+				return err
+			}
+		} else {
+			// if we can't confirm that we have the full stream, we have to say that the sd blob is
+			// missing. if we say we have it, they wont try to send any content blobs
+			blobExists = false
+		}
+	}
+
+	err = s.sendBlobResponse(conn, blobExists, isSdBlob, neededBlobs)
 	if err != nil {
 		return err
 	}
@@ -264,12 +268,12 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
 	return blobSize, blobHash, isSdBlob, nil
 }
 
-func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool) error {
+func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool, neededBlobs []string) error {
 	var response []byte
 	var err error
 
 	if isSdBlob {
-		response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists})
+		response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: !blobExists, NeededBlobs: neededBlobs})
 	} else {
 		response, err = json.Marshal(sendBlobResponse{SendBlob: !blobExists})
 	}
@@ -374,3 +378,8 @@ type blobTransferResponse struct {
 type sdBlobTransferResponse struct {
 	ReceivedSdBlob bool `json:"received_sd_blob"`
 }
+
+// neededBlobChecker can check which blobs from a known stream are not uploaded yet
+type neededBlobChecker interface {
+	MissingBlobsForKnownStream(string) ([]string, error)
+}
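
With this change, when a client offers an sd blob the server already has, the response no longer just says not to send it; it also lists which content blobs of that stream are still missing, so the client can resume a partial upload instead of re-sending everything. A rough sketch of how an uploader might act on those fields, written as if it lived in this package next to the types above (the contentBlobs map is an assumption standing in for however the client tracks the stream's blobs; it is not the real Client API):

// blobsToSend picks which blobs still have to be uploaded based on the server's
// sd-blob response. The second return value is false when the server has no
// record of the stream and the whole stream must be sent.
func blobsToSend(resp sendSdBlobResponse, contentBlobs map[string][]byte) ([][]byte, bool) {
	if resp.SendSdBlob {
		// server wants the sd blob itself, so it knows nothing about this stream yet
		return nil, false
	}
	// server has the sd blob; only the hashes it listed are still missing
	var out [][]byte
	for _, hash := range resp.NeededBlobs {
		if blob, ok := contentBlobs[hash]; ok {
			out = append(out, blob)
		}
	}
	return out, true
}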

View file

@@ -3,6 +3,7 @@ package reflector
 import (
 	"crypto/rand"
 	"io"
+	"math"
 	"strconv"
 	"testing"
 	"time"
@@ -10,6 +11,11 @@ import (
 	"github.com/davecgh/go-spew/spew"
 	"github.com/lbryio/reflector.go/store"
 
+	"encoding/json"
+	"sort"
+
+	"github.com/lbryio/reflector.go/dht/bits"
+
 	"github.com/phayes/freeport"
 )
@@ -62,11 +68,7 @@ func TestServer_MediumBlob(t *testing.T) {
 		t.Fatal("error connecting client to server", err)
 	}
 
-	blob := make([]byte, 1000)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(1000)
 
 	err = c.SendBlob(blob)
 	if err != nil {
@@ -84,11 +86,7 @@ func TestServer_FullBlob(t *testing.T) {
 		t.Fatal("error connecting client to server", err)
 	}
 
-	blob := make([]byte, maxBlobSize)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(maxBlobSize)
 
 	err = c.SendBlob(blob)
 	if err != nil {
@@ -106,11 +104,7 @@ func TestServer_TooBigBlob(t *testing.T) {
 		t.Fatal("error connecting client to server", err)
 	}
 
-	blob := make([]byte, maxBlobSize+1)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(maxBlobSize + 1)
 
 	err = c.SendBlob(blob)
 	if err == nil {
@@ -144,11 +138,7 @@ func TestServer_Timeout(t *testing.T) {
 	time.Sleep(testTimeout * 2)
 
-	blob := make([]byte, 10)
-	_, err = rand.Read(blob)
-	if err != nil {
-		t.Fatal("failed to make random blob")
-	}
+	blob := randBlob(10)
 
 	err = c.SendBlob(blob)
 	t.Log(spew.Sdump(err))
@@ -156,3 +146,103 @@ func TestServer_Timeout(t *testing.T) {
 		t.Error("server should have timed out by now")
 	}
 }
+
+type mockPartialStore struct {
+	store.MemoryBlobStore
+	missing []string
+}
+
+func (m mockPartialStore) MissingBlobsForKnownStream(hash string) ([]string, error) {
+	return m.missing, nil
+}
+
+func TestServer_PartialUpload(t *testing.T) {
+	port, err := freeport.GetFreePort()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sdHash := bits.Rand().String()
+	missing := make([]string, 4)
+	for i := range missing {
+		missing[i] = bits.Rand().String()
+	}
+
+	var st store.BlobStore
+	st = &mockPartialStore{missing: missing}
+	if _, ok := st.(neededBlobChecker); !ok {
+		t.Fatal("mock does not implement the relevant interface")
+	}
+	st.Put(sdHash, randBlob(10))
+
+	srv := NewServer(st)
+	err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer srv.Shutdown()
+
+	c := Client{}
+	err = c.Connect(":" + strconv.Itoa(port))
+	if err != nil {
+		t.Fatal("error connecting client to server", err)
+	}
+
+	sendRequest, err := json.Marshal(sendBlobRequest{
+		SdBlobHash: sdHash,
+		SdBlobSize: len(sdHash),
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = c.conn.Write(sendRequest)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var sendResp sendSdBlobResponse
+	err = json.NewDecoder(c.conn).Decode(&sendResp)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if sendResp.SendSdBlob {
+		t.Errorf("expected SendSdBlob = false, got true")
+	}
+
+	if len(sendResp.NeededBlobs) != len(missing) {
+		t.Fatalf("got %d needed blobs, expected %d", len(sendResp.NeededBlobs), len(missing))
+	}
+
+	sort.Strings(sendResp.NeededBlobs)
+	sort.Strings(missing)
+	for i := range missing {
+		if missing[i] != sendResp.NeededBlobs[i] {
+			t.Errorf("needed blobs mismatch: %s != %s", missing[i], sendResp.NeededBlobs[i])
+		}
+	}
+}
+
+func MakeRandStream(size int) ([]byte, [][]byte) {
+	blobs := make([][]byte, int(math.Ceil(float64(size)/maxBlobSize)))
+	for i := 0; i < len(blobs); i++ {
+		blobs[i] = randBlob(int(math.Min(maxBlobSize, float64(size))))
+		size -= maxBlobSize
+	}
+
+	return nil, blobs
+}
+
+func randBlob(size int) []byte {
+	//if size > maxBlobSize {
+	//	panic("blob size too big")
+	//}
+
+	blob := make([]byte, size)
+	_, err := rand.Read(blob)
+	if err != nil {
		panic("failed to make random blob")
+	}
+	return blob
+}

View file

@@ -58,7 +58,9 @@ func (d *DBBackedS3Store) PutSD(hash string, blob []byte) error {
 	return d.db.AddSDBlob(hash, len(blob), blobContents)
 }
 
-// HasFullStream checks if the full stream has been uploaded (i.e. if we have the sd blob and all the content blobs)
-func (d *DBBackedS3Store) HasFullStream(sdHash string) (bool, error) {
-	return d.db.HasFullStream(sdHash)
+// MissingBlobsForKnownStream returns missing blobs for an existing stream
+// WARNING: if the stream does NOT exist, no blob hashes will be returned, which looks
+// like no blobs are missing
+func (d *DBBackedS3Store) MissingBlobsForKnownStream(sdHash string) ([]string, error) {
+	return d.db.MissingBlobsForKnownStream(sdHash)
 }
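
The reflector server discovers this capability through a type assertion on its unexported neededBlobChecker interface rather than through the BlobStore interface, so a rename like the one above from HasFullStream to MissingBlobsForKnownStream can silently stop matching. A compile-time check along these lines (illustrative, not part of this commit) would catch that in the store package:

// Illustrative only: assert at compile time that *DBBackedS3Store keeps the
// method signature the reflector server looks for via type assertion.
var _ interface {
	MissingBlobsForKnownStream(string) ([]string, error)
} = (*DBBackedS3Store)(nil)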