reflector.go/peer/client.go

211 lines
4.8 KiB
Go
Raw Permalink Normal View History

package peer
import (
"bufio"
"encoding/hex"
"encoding/json"
"io"
"net"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
2021-01-09 05:08:20 +01:00
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store"
2019-11-14 01:11:35 +01:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
// ErrBlobExists is a default error for when a blob already exists on the reflector server.
var ErrBlobExists = errors.Base("blob exists on server")
// Client is an instance of a client connected to a server.
type Client struct {
Timeout time.Duration
conn net.Conn
buf *bufio.Reader
connected bool
}
// Connect connects to a specific clients and errors if it cannot be contacted.
func (c *Client) Connect(address string) error {
var err error
if c.Timeout == 0 {
c.Timeout = 5 * time.Second
}
c.conn, err = net.Dial("tcp4", address)
if err != nil {
return err
}
c.connected = true
c.buf = bufio.NewReader(c.conn)
return nil
}
// Close closes the connection with the client.
func (c *Client) Close() error {
c.connected = false
return c.conn.Close()
}
// GetStream gets a stream
func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
if !c.connected {
return nil, errors.Err("not connected")
}
var sd stream.SDBlob
2021-01-09 05:08:20 +01:00
b, trace, err := c.GetBlob(sdHash)
if err != nil {
return nil, err
}
2021-01-09 05:08:20 +01:00
log.Debug(trace.String())
err = sd.FromBlob(b)
if err != nil {
return nil, err
}
s := make(stream.Stream, len(sd.BlobInfos)+1-1) // +1 for sd blob, -1 for last null blob
s[0] = b
for i := 0; i < len(sd.BlobInfos)-1; i++ {
2021-01-09 05:08:20 +01:00
s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
return nil, err
}
2021-01-09 05:08:20 +01:00
log.Debug(trace.String())
}
return s, nil
}
// HasBlob checks if the blob is available
func (c *Client) HasBlob(hash string) (bool, error) {
if !c.connected {
return false, errors.Err("not connected")
}
sendRequest, err := json.Marshal(availabilityRequest{
RequestedBlobs: []string{hash},
})
if err != nil {
return false, err
}
err = c.write(sendRequest)
if err != nil {
return false, err
}
var resp availabilityResponse
err = c.read(&resp)
if err != nil {
return false, err
}
for _, h := range resp.AvailableBlobs {
if h == hash {
return true, nil
}
}
return false, nil
}
// GetBlob gets a blob
2021-01-09 05:08:20 +01:00
func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
if !c.connected {
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), errors.Err("not connected")
}
sendRequest, err := json.Marshal(blobRequest{
RequestedBlob: hash,
})
if err != nil {
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
}
err = c.write(sendRequest)
if err != nil {
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
}
var resp blobResponse
err = c.read(&resp)
if err != nil {
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
}
2021-01-09 05:08:20 +01:00
trace := shared.NewBlobTrace(time.Since(start), "tcp")
if resp.RequestTrace != nil {
trace = *resp.RequestTrace
}
if resp.IncomingBlob.Error != "" {
2021-01-09 05:08:20 +01:00
return nil, trace, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
}
if resp.IncomingBlob.BlobHash != hash {
2021-01-09 05:08:20 +01:00
return nil, trace.Stack(time.Since(start), "tcp"), errors.Prefix(hash[:8], "blob hash in response does not match requested hash")
}
if resp.IncomingBlob.Length <= 0 {
2021-01-09 05:08:20 +01:00
return nil, trace, errors.Prefix(hash[:8], "length reported as <= 0")
}
2019-12-16 15:52:51 +01:00
log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr())
blob, err := c.readRawBlob(resp.IncomingBlob.Length)
if err != nil {
2021-01-09 05:08:20 +01:00
return nil, (*resp.RequestTrace).Stack(time.Since(start), "tcp"), err
}
metrics.MtrInBytesTcp.Add(float64(len(blob)))
2021-01-09 05:08:20 +01:00
return blob, trace.Stack(time.Since(start), "tcp"), nil
}
func (c *Client) read(v interface{}) error {
err := c.conn.SetReadDeadline(time.Now().Add(c.Timeout))
if err != nil {
return errors.Err(err)
}
m, err := readNextMessage(c.buf)
if err != nil {
return err
}
2019-12-16 15:52:51 +01:00
log.Debugf("read %d bytes from %s", len(m), c.conn.RemoteAddr())
err = json.Unmarshal(m, v)
return errors.Err(err)
}
func (c *Client) readRawBlob(blobSize int) ([]byte, error) {
err := c.conn.SetReadDeadline(time.Now().Add(c.Timeout))
if err != nil {
return nil, errors.Err(err)
}
blob := make([]byte, blobSize)
n, err := io.ReadFull(c.buf, blob)
2019-12-16 15:52:51 +01:00
log.Debugf("read %d bytes from %s", n, c.conn.RemoteAddr())
return blob, errors.Err(err)
}
func (c *Client) write(b []byte) error {
err := c.conn.SetWriteDeadline(time.Now().Add(c.Timeout))
if err != nil {
return errors.Err(err)
}
2019-12-16 15:52:51 +01:00
log.Debugf("writing %d bytes to %s", len(b), c.conn.RemoteAddr())
n, err := c.conn.Write(b)
if err == nil && n != len(b) {
err = io.ErrShortWrite
}
return errors.Err(err)
}