reflector.go/server/peer/server.go

405 lines
9.2 KiB
Go
Raw Normal View History

2018-01-29 20:37:26 +01:00
package peer
import (
"bufio"
2020-03-20 15:15:41 +01:00
"encoding/hex"
2018-01-29 20:37:26 +01:00
"encoding/json"
2020-03-20 15:15:41 +01:00
ee "errors"
2018-01-29 20:37:26 +01:00
"io"
"net"
"strings"
"time"
2018-01-29 20:37:26 +01:00
2019-12-29 02:42:03 +01:00
"github.com/lbryio/reflector.go/internal/metrics"
2019-02-08 20:56:41 +01:00
"github.com/lbryio/reflector.go/reflector"
2021-01-09 05:08:20 +01:00
"github.com/lbryio/reflector.go/shared"
2019-02-08 20:56:41 +01:00
"github.com/lbryio/reflector.go/store"
2019-11-14 01:11:35 +01:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/stream"
2018-01-29 20:37:26 +01:00
log "github.com/sirupsen/logrus"
)
const (
// DefaultPort is the port the peer server listens on if not passed in.
DefaultPort = 3333
// LbrycrdAddress to be used when paying for data. Not implemented yet.
2018-02-07 21:21:20 +01:00
LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct"
2018-01-29 20:37:26 +01:00
)
// Server is an instance of a peer server that houses the listener and store.
2018-01-29 20:37:26 +01:00
type Server struct {
store store.BlobStore
2021-08-02 08:23:54 +02:00
disk *store.DiskStore
closed bool
2020-01-02 19:27:34 +01:00
grp *stop.Group
2018-01-29 20:37:26 +01:00
}
// NewServer returns an initialized Server pointer.
2021-08-02 08:23:54 +02:00
func NewServer(blobstore store.BlobStore) *Server {
2018-01-29 20:37:26 +01:00
return &Server{
2021-08-02 08:23:54 +02:00
store: blobstore,
disk: store.NewDiskStore("/external/blobs", 2),
grp: stop.New(),
2018-01-29 20:37:26 +01:00
}
}
// Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() {
2019-12-29 02:42:03 +01:00
log.Debug("shutting down peer server")
s.grp.StopAndWait()
log.Debug("peer server stopped")
}
// Start starts the server listener to handle connections.
func (s *Server) Start(address string) error {
log.Println("peer listening on " + address)
l, err := net.Listen("tcp4", address)
2018-01-29 20:37:26 +01:00
if err != nil {
return err
}
go s.listenForShutdown(l)
s.grp.Add(1)
go func() {
s.listenAndServe(l)
s.grp.Done()
}()
return nil
}
func (s *Server) listenForShutdown(listener net.Listener) {
<-s.grp.Ch()
s.closed = true
err := listener.Close()
if err != nil {
log.Error("error closing listener for peer server - ", err)
}
}
func (s *Server) listenAndServe(listener net.Listener) {
2018-01-29 20:37:26 +01:00
for {
conn, err := listener.Accept()
2018-01-29 20:37:26 +01:00
if err != nil {
if s.closed {
return
}
2019-02-08 23:28:09 +01:00
log.Error(errors.Prefix("accepting conn", err))
2018-01-29 20:37:26 +01:00
} else {
s.grp.Add(1)
2021-05-21 00:12:30 +02:00
metrics.RoutinesQueue.WithLabelValues("peer", "server-handleconn").Inc()
go func() {
2021-05-21 00:12:30 +02:00
defer metrics.RoutinesQueue.WithLabelValues("peer", "server-handleconn").Dec()
s.handleConnection(conn)
s.grp.Done()
}()
2018-01-29 20:37:26 +01:00
}
}
}
func (s *Server) handleConnection(conn net.Conn) {
defer func() {
if err := conn.Close(); err != nil {
log.Error(errors.Prefix("closing peer conn", err))
}
}()
2019-03-22 04:22:31 +01:00
timeoutDuration := 1 * time.Minute
buf := bufio.NewReader(conn)
2018-01-29 20:37:26 +01:00
for {
var request []byte
var response []byte
err := conn.SetReadDeadline(time.Now().Add(timeoutDuration))
if err != nil {
log.Error(errors.FullTrace(err))
}
request, err = readNextMessage(buf)
2018-01-29 20:37:26 +01:00
if err != nil {
if err != io.EOF {
s.logError(err)
2018-01-29 20:37:26 +01:00
}
return
}
err = conn.SetReadDeadline(time.Time{})
if err != nil {
log.Error(errors.FullTrace(err))
}
2021-08-02 08:23:54 +02:00
response, blobHash, err := s.handleCompositeRequest(request)
if err != nil {
2020-02-26 00:22:22 +01:00
log.Error(errors.FullTrace(err))
return
}
err = conn.SetWriteDeadline(time.Now().Add(timeoutDuration))
if err != nil {
log.Error(errors.FullTrace(err))
}
n, err := conn.Write(response)
2021-08-02 08:23:54 +02:00
s.disk.Sendfile(blobHash, conn)
if err != nil {
if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
s.logError(err)
}
return
} else if n != len(response) {
log.Errorln(io.ErrShortWrite)
return
}
err = conn.SetWriteDeadline(time.Time{})
if err != nil {
log.Error(errors.FullTrace(err))
}
2018-01-29 20:37:26 +01:00
}
}
func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
2018-01-29 20:37:26 +01:00
var request availabilityRequest
err := json.Unmarshal(data, &request)
2018-01-29 20:37:26 +01:00
if err != nil {
2020-02-25 21:49:51 +01:00
return nil, errors.Err(err)
2018-01-29 20:37:26 +01:00
}
availableBlobs := []string{}
for _, blobHash := range request.RequestedBlobs {
exists, err := s.store.Has(blobHash)
if err != nil {
return nil, err
2018-01-29 20:37:26 +01:00
}
if exists {
availableBlobs = append(availableBlobs, blobHash)
}
}
2018-02-07 21:21:20 +01:00
return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs})
2018-01-29 20:37:26 +01:00
}
//func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) {
// var request paymentRateRequest
// err := json.Unmarshal(data, &request)
// if err != nil {
// return nil, err
// }
//
// offerReply := paymentRateAccepted
// if request.BlobDataPaymentRate < 0 {
// offerReply = paymentRateTooLow
// }
//
// return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply})
//}
//
//func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
// var request blobRequest
// err := json.Unmarshal(data, &request)
// if err != nil {
// return nil, err
// }
//
// log.Debugln("Sending blob " + request.RequestedBlob[:8])
//
// blob, err := s.store.Get(request.RequestedBlob)
// if err != nil {
// return nil, err
// }
//
// response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
// BlobHash: reflector.BlobHash(blob),
// Length: len(blob),
// }})
// if err != nil {
// return nil, err
// }
//
// return append(response, blob...), nil
//}
2018-01-29 20:37:26 +01:00
2021-08-02 08:23:54 +02:00
func (s *Server) handleCompositeRequest(data []byte) ([]byte, string, error) {
2019-01-15 20:23:08 +01:00
var request compositeRequest
err := json.Unmarshal(data, &request)
if err != nil {
2020-03-30 22:05:08 +02:00
var je *json.SyntaxError
2020-03-20 15:15:41 +01:00
if ee.As(err, &je) {
2021-08-02 08:23:54 +02:00
return nil, "", errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data))
2020-03-20 15:15:41 +01:00
}
2021-08-02 08:23:54 +02:00
return nil, "", errors.Err(err)
2019-01-15 20:23:08 +01:00
}
response := compositeResponse{
LbrycrdAddress: LbrycrdAddress,
}
if len(request.RequestedBlobs) > 0 {
var availableBlobs []string
for _, blobHash := range request.RequestedBlobs {
exists, err := s.store.Has(blobHash)
if err != nil {
2021-08-02 08:23:54 +02:00
return nil, "", err
2019-01-15 20:23:08 +01:00
}
if exists {
availableBlobs = append(availableBlobs, blobHash)
}
}
response.AvailableBlobs = availableBlobs
}
response.BlobDataPaymentRate = paymentRateAccepted
if request.BlobDataPaymentRate < 0 {
response.BlobDataPaymentRate = paymentRateTooLow
}
var blob []byte
if request.RequestedBlob != "" {
2019-09-10 23:18:44 +02:00
if len(request.RequestedBlob) != stream.BlobHashHexLength {
2021-08-02 08:23:54 +02:00
return nil, "", errors.Err("Invalid blob hash length")
2019-09-10 23:18:44 +02:00
}
2019-01-17 22:49:19 +01:00
log.Debugln("Sending blob " + request.RequestedBlob[:8])
2019-01-15 20:23:08 +01:00
2021-08-02 08:23:54 +02:00
size, err := s.disk.Size(request.RequestedBlob)
if err != nil {
2019-01-15 20:23:08 +01:00
response.IncomingBlob = incomingBlob{
Error: err.Error(),
}
} else {
response.IncomingBlob = incomingBlob{
2021-08-02 08:23:54 +02:00
BlobHash: request.RequestedBlob,
Length: int(size),
2019-01-15 20:23:08 +01:00
}
}
2021-08-02 08:23:54 +02:00
metrics.MtrOutBytesTcp.Add(float64(len(blob)))
metrics.BlobDownloadCount.Inc()
metrics.PeerDownloadCount.Inc()
2019-01-15 20:23:08 +01:00
}
respData, err := json.Marshal(response)
if err != nil {
2021-08-02 08:23:54 +02:00
return nil, "", err
2019-01-15 20:23:08 +01:00
}
2021-08-02 08:23:54 +02:00
return respData, request.RequestedBlob, nil
2019-01-15 20:23:08 +01:00
}
func (s *Server) logError(e error) {
if e == nil {
return
}
2019-12-29 17:57:43 +01:00
shouldLog := metrics.TrackError(metrics.DirectionDownload, e)
2019-02-08 20:56:41 +01:00
if shouldLog {
log.Errorln(errors.FullTrace(e))
}
}
func readNextMessage(buf *bufio.Reader) ([]byte, error) {
msg := make([]byte, 0)
eof := false
2018-01-29 20:37:26 +01:00
for {
chunk, err := buf.ReadBytes('}')
2018-01-29 20:37:26 +01:00
if err != nil {
if err != io.EOF {
2019-02-09 02:19:58 +01:00
//log.Errorln("readBytes error:", err) // logged by caller
return msg, err
}
eof = true
}
//log.Debugln("got", len(chunk), "bytes.")
//spew.Dump(chunk)
if len(chunk) > 0 {
msg = append(msg, chunk...)
if len(msg) > maxRequestSize {
return msg, errRequestTooLarge
}
// yes, this is how the peer protocol knows when the request finishes
if reflector.IsValidJSON(msg) {
break
2018-01-29 20:37:26 +01:00
}
}
if eof {
2018-01-29 20:37:26 +01:00
break
}
}
//log.Debugln("total size:", len(request))
//if len(request) > 0 {
// spew.Dump(request)
//}
if len(msg) == 0 && eof {
return nil, io.EOF
2018-01-29 20:37:26 +01:00
}
return msg, nil
}
const (
maxRequestSize = 64 * (2 ^ 10) // 64kb
paymentRateAccepted = "RATE_ACCEPTED"
paymentRateTooLow = "RATE_TOO_LOW"
//ToDo: paymentRateUnset is not used but exists in the protocol.
//paymentRateUnset = "RATE_UNSET"
)
var errRequestTooLarge = errors.Base("request is too large")
type availabilityRequest struct {
LbrycrdAddress bool `json:"lbrycrd_address"`
RequestedBlobs []string `json:"requested_blobs"`
}
type availabilityResponse struct {
LbrycrdAddress string `json:"lbrycrd_address"`
AvailableBlobs []string `json:"available_blobs"`
}
type paymentRateRequest struct {
BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
}
type paymentRateResponse struct {
BlobDataPaymentRate string `json:"blob_data_payment_rate"`
}
type blobRequest struct {
RequestedBlob string `json:"requested_blob"`
}
type incomingBlob struct {
Error string `json:"error,omitempty"`
BlobHash string `json:"blob_hash"`
Length int `json:"length"`
}
type blobResponse struct {
IncomingBlob incomingBlob `json:"incoming_blob"`
2021-01-09 05:08:20 +01:00
RequestTrace *shared.BlobTrace
}
2019-01-15 20:23:08 +01:00
type compositeRequest struct {
LbrycrdAddress bool `json:"lbrycrd_address"`
RequestedBlobs []string `json:"requested_blobs"`
BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
RequestedBlob string `json:"requested_blob"`
}
type compositeResponse struct {
LbrycrdAddress string `json:"lbrycrd_address,omitempty"`
AvailableBlobs []string `json:"available_blobs,omitempty"`
BlobDataPaymentRate string `json:"blob_data_payment_rate,omitempty"`
IncomingBlob incomingBlob `json:"incoming_blob,omitempty"`
}