add getstream command to download a stream from a peer

This commit is contained in:
Alex Grintsvayg 2019-09-11 12:30:01 -04:00
parent 1a6b862c96
commit 08df3b167c
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
4 changed files with 252 additions and 8 deletions

71
cmd/getstream.go Normal file
View file

@ -0,0 +1,71 @@
package cmd
import (
"io/ioutil"
"os"
"github.com/lbryio/lbry.go/stream"
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/reflector.go/peer"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
func init() {
var cmd = &cobra.Command{
Use: "getstream ADDRESS:PORT SDHASH",
Short: "Get a stream from a reflector server",
Args: cobra.ExactArgs(2),
Run: getStreamCmd,
}
rootCmd.AddCommand(cmd)
}
func getStreamCmd(cmd *cobra.Command, args []string) {
addr := args[0]
sdHash := args[1]
c := peer.Client{}
err := c.Connect(addr)
if err != nil {
log.Fatal("error connecting client to server: ", err)
}
s, err := c.GetStream(sdHash)
if err != nil {
log.Error(errors.FullTrace(err))
return
}
var sd stream.SDBlob
err = sd.FromBlob(s[0])
if err != nil {
log.Error(errors.FullTrace(err))
return
}
log.Printf("Downloading %d blobs for %s", len(sd.BlobInfos)-1, sd.SuggestedFileName)
data, err := s.Data()
if err != nil {
log.Error(errors.FullTrace(err))
return
}
wd, err := os.Getwd()
if err != nil {
log.Error(errors.FullTrace(err))
return
}
filename := wd + "/" + sd.SuggestedFileName
err = ioutil.WriteFile(filename, data, 0644)
if err != nil {
log.Error(errors.FullTrace(err))
return
}
log.Printf("Wrote %d bytes to %s\n", len(data), filename)
}

View file

@ -14,23 +14,32 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
var peerNoDB bool
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
Use: "peer", Use: "peer",
Short: "Run peer server", Short: "Run peer server",
Run: peerCmd, Run: peerCmd,
} }
cmd.Flags().BoolVar(&peerNoDB, "nodb", false, "Don't connect to a db and don't use a db-backed blob store")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
func peerCmd(cmd *cobra.Command, args []string) { func peerCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL) var err error
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db) peerServer := peer.NewServer(s3)
peerServer := peer.NewServer(combo)
if !peerNoDB {
db := new(db.SQL)
err = db.Connect(globalConfig.DBConn)
checkErr(err)
combo := store.NewDBBackedS3Store(s3, db)
peerServer = peer.NewServer(combo)
}
err = peerServer.Start(":" + strconv.Itoa(peer.DefaultPort)) err = peerServer.Start(":" + strconv.Itoa(peer.DefaultPort))
if err != nil { if err != nil {

164
peer/client.go Normal file
View file

@ -0,0 +1,164 @@
package peer
import (
"bufio"
"encoding/hex"
"encoding/json"
"io"
"net"
"time"
"github.com/lbryio/lbry.go/stream"
"github.com/lbryio/lbry.go/extras/errors"
log "github.com/sirupsen/logrus"
)
// ErrBlobExists is a default error for when a blob already exists on the reflector server.
var ErrBlobExists = errors.Base("blob exists on server")
// Client is an instance of a client connected to a server.
type Client struct {
Timeout time.Duration
conn net.Conn
buf *bufio.Reader
connected bool
}
// Connect connects to a specific clients and errors if it cannot be contacted.
func (c *Client) Connect(address string) error {
var err error
if c.Timeout == 0 {
c.Timeout = 5 * time.Second
}
c.conn, err = net.Dial("tcp4", address)
if err != nil {
return err
}
c.connected = true
c.buf = bufio.NewReader(c.conn)
return nil
}
// Close closes the connection with the client.
func (c *Client) Close() error {
c.connected = false
return c.conn.Close()
}
// GetStream gets a stream
func (c *Client) GetStream(sdHash string) (stream.Stream, error) {
if !c.connected {
return nil, errors.Err("not connected")
}
var sd stream.SDBlob
b, err := c.GetBlob(sdHash)
if err != nil {
return nil, err
}
err = sd.FromBlob(b)
if err != nil {
return nil, err
}
s := make(stream.Stream, len(sd.BlobInfos)+1-1) // +1 for sd blob, -1 for last null blob
s[0] = b
for i := 0; i < len(sd.BlobInfos)-1; i++ {
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
return nil, err
}
}
return s, nil
}
// GetBlob gets a blob
func (c *Client) GetBlob(blobHash string) (stream.Blob, error) {
if !c.connected {
return nil, errors.Err("not connected")
}
sendRequest, err := json.Marshal(blobRequest{
RequestedBlob: blobHash,
})
if err != nil {
return nil, err
}
err = c.write(sendRequest)
if err != nil {
return nil, err
}
var resp blobResponse
err = c.read(&resp)
if err != nil {
return nil, err
}
if resp.IncomingBlob.Error != "" {
return nil, errors.Prefix(blobHash[:8], resp.IncomingBlob.Error)
}
if resp.IncomingBlob.BlobHash != blobHash {
return nil, errors.Prefix(blobHash[:8], "Blob hash in response does not match requested hash")
}
if resp.IncomingBlob.Length <= 0 {
return nil, errors.Prefix(blobHash[:8], "Length reported as <= 0")
}
log.Println("Receiving blob " + blobHash[:8])
blob, err := c.readRawBlob(resp.IncomingBlob.Length)
if err != nil {
return nil, err
}
return stream.Blob(blob), nil
}
func (c *Client) read(v interface{}) error {
err := c.conn.SetReadDeadline(time.Now().Add(c.Timeout))
if err != nil {
return errors.Err(err)
}
m, err := readNextMessage(c.buf)
if err != nil {
return err
}
err = json.Unmarshal(m, v)
return errors.Err(err)
}
func (c *Client) readRawBlob(blobSize int) ([]byte, error) {
err := c.conn.SetReadDeadline(time.Now().Add(c.Timeout))
if err != nil {
return nil, errors.Err(err)
}
blob := make([]byte, blobSize)
_, err = io.ReadFull(c.buf, blob)
return blob, errors.Err(err)
}
func (c *Client) write(b []byte) error {
err := c.conn.SetWriteDeadline(time.Now().Add(c.Timeout))
if err != nil {
return errors.Err(err)
}
log.Debugf("Writing %d bytes", len(b))
n, err := c.conn.Write(b)
if err == nil && n != len(b) {
err = io.ErrShortWrite
}
return errors.Err(err)
}

View file

@ -112,6 +112,7 @@ func (s *Server) handleConnection(conn net.Conn) {
}() }()
timeoutDuration := 1 * time.Minute timeoutDuration := 1 * time.Minute
buf := bufio.NewReader(conn)
for { for {
var request []byte var request []byte
@ -122,7 +123,7 @@ func (s *Server) handleConnection(conn net.Conn) {
log.Error(errors.FullTrace(err)) log.Error(errors.FullTrace(err))
} }
request, err = readNextRequest(conn) request, err = readNextMessage(buf)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
s.logError(err) s.logError(err)
@ -314,10 +315,9 @@ func (s *Server) logError(e error) {
//log.Error(errors.FullTrace(e)) //log.Error(errors.FullTrace(e))
} }
func readNextRequest(conn net.Conn) ([]byte, error) { func readNextMessage(buf *bufio.Reader) ([]byte, error) {
request := make([]byte, 0) request := make([]byte, 0)
eof := false eof := false
buf := bufio.NewReader(conn)
for { for {
chunk, err := buf.ReadBytes('}') chunk, err := buf.ReadBytes('}')