fix getstream command to write as it downloads (or else it runs out of memory)

This commit is contained in:
Alex Grintsvayg 2019-09-27 14:38:49 -04:00
parent cb669eb1a7
commit c1e8e7481f
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
2 changed files with 87 additions and 38 deletions

View file

@ -1,10 +1,9 @@
package cmd package cmd
import ( import (
"io/ioutil"
"os" "os"
"github.com/lbryio/lbry.go/stream" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer"
@ -33,39 +32,16 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
log.Fatal("error connecting client to server: ", err) log.Fatal("error connecting client to server: ", err)
} }
s, err := c.GetStream(sdHash) cache := store.NewFileBlobStore("/tmp/lbry_downloaded_blobs")
if err != nil {
log.Error(errors.FullTrace(err))
return
}
var sd stream.SDBlob
err = sd.FromBlob(s[0])
if err != nil {
log.Error(errors.FullTrace(err))
return
}
log.Printf("Downloading %d blobs for %s", len(sd.BlobInfos)-1, sd.SuggestedFileName)
data, err := s.Data()
if err != nil {
log.Error(errors.FullTrace(err))
return
}
wd, err := os.Getwd() wd, err := os.Getwd()
if err != nil { if err != nil {
log.Error(errors.FullTrace(err)) log.Fatal(err)
return
} }
filename := wd + "/" + sd.SuggestedFileName err = c.WriteStream(sdHash, wd, cache)
err = ioutil.WriteFile(filename, data, 0644)
if err != nil { if err != nil {
log.Error(errors.FullTrace(err)) log.Error(errors.FullTrace(err))
return return
} }
log.Printf("Wrote %d bytes to %s\n", len(data), filename)
} }

View file

@ -6,8 +6,11 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"net" "net"
"os"
"time" "time"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/stream" "github.com/lbryio/lbry.go/stream"
"github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/errors"
@ -47,15 +50,65 @@ func (c *Client) Close() error {
return c.conn.Close() return c.conn.Close()
} }
// WriteStream downloads and writes a stream to file
func (c *Client) WriteStream(sdHash, dir string, blobStore store.BlobStore) error {
if !c.connected {
return errors.Err("not connected")
}
var sd stream.SDBlob
sdb, err := c.getBlobWithCache(sdHash, blobStore)
if err != nil {
return err
}
err = sd.FromBlob(sdb)
if err != nil {
return err
}
info, err := os.Stat(dir)
if err != nil {
return errors.Prefix("cannot stat "+dir, err)
} else if !info.IsDir() {
return errors.Err(dir + " must be a directory")
}
f, err := os.Create(dir + "/" + sd.SuggestedFileName)
if err != nil {
return err
}
for i := 0; i < len(sd.BlobInfos)-1; i++ {
b, err := c.getBlobWithCache(hex.EncodeToString(sd.BlobInfos[i].BlobHash), blobStore)
if err != nil {
return err
}
data, err := b.Plaintext(sd.Key, sd.BlobInfos[i].IV)
if err != nil {
return err
}
_, err = f.Write(data)
if err != nil {
return err
}
}
return nil
}
// GetStream gets a stream // GetStream gets a stream
func (c *Client) GetStream(sdHash string) (stream.Stream, error) { func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
if !c.connected { if !c.connected {
return nil, errors.Err("not connected") return nil, errors.Err("not connected")
} }
var sd stream.SDBlob var sd stream.SDBlob
b, err := c.GetBlob(sdHash) b, err := c.getBlobWithCache(sdHash, blobCache)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -69,7 +122,7 @@ func (c *Client) GetStream(sdHash string) (stream.Stream, error) {
s[0] = b s[0] = b
for i := 0; i < len(sd.BlobInfos)-1; i++ { for i := 0; i < len(sd.BlobInfos)-1; i++ {
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) s[i+1], err = c.getBlobWithCache(hex.EncodeToString(sd.BlobInfos[i].BlobHash), blobCache)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -78,14 +131,34 @@ func (c *Client) GetStream(sdHash string) (stream.Stream, error) {
return s, nil return s, nil
} }
func (c *Client) getBlobWithCache(hash string, blobCache store.BlobStore) (stream.Blob, error) {
if blobCache == nil {
return c.GetBlob(hash)
}
blob, err := blobCache.Get(hash)
if err == nil || !errors.Is(err, store.ErrBlobNotFound) {
return blob, err
}
blob, err = c.GetBlob(hash)
if err != nil {
return nil, err
}
err = blobCache.Put(hash, blob)
return blob, err
}
// GetBlob gets a blob // GetBlob gets a blob
func (c *Client) GetBlob(blobHash string) (stream.Blob, error) { func (c *Client) GetBlob(hash string) (stream.Blob, error) {
if !c.connected { if !c.connected {
return nil, errors.Err("not connected") return nil, errors.Err("not connected")
} }
sendRequest, err := json.Marshal(blobRequest{ sendRequest, err := json.Marshal(blobRequest{
RequestedBlob: blobHash, RequestedBlob: hash,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -103,16 +176,16 @@ func (c *Client) GetBlob(blobHash string) (stream.Blob, error) {
} }
if resp.IncomingBlob.Error != "" { if resp.IncomingBlob.Error != "" {
return nil, errors.Prefix(blobHash[:8], resp.IncomingBlob.Error) return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
} }
if resp.IncomingBlob.BlobHash != blobHash { if resp.IncomingBlob.BlobHash != hash {
return nil, errors.Prefix(blobHash[:8], "Blob hash in response does not match requested hash") return nil, errors.Prefix(hash[:8], "Blob hash in response does not match requested hash")
} }
if resp.IncomingBlob.Length <= 0 { if resp.IncomingBlob.Length <= 0 {
return nil, errors.Prefix(blobHash[:8], "Length reported as <= 0") return nil, errors.Prefix(hash[:8], "Length reported as <= 0")
} }
log.Println("Receiving blob " + blobHash[:8]) log.Println("Receiving blob " + hash[:8])
blob, err := c.readRawBlob(resp.IncomingBlob.Length) blob, err := c.readRawBlob(resp.IncomingBlob.Length)
if err != nil { if err != nil {