reflector.go/reflector/server.go
2021-05-20 17:21:35 -04:00

452 lines
10 KiB
Go

package reflector
import (
"bufio"
"crypto/sha512"
"encoding/hex"
"encoding/json"
"io"
"io/ioutil"
"net"
"time"
"github.com/google/gops/agent"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
const (
// DefaultPort is the port the reflector server listens on if not passed in.
DefaultPort = 5566
// DefaultTimeout is the default timeout to read or write the next message
DefaultTimeout = 5 * time.Second
network = "tcp4"
protocolVersion1 = 0
protocolVersion2 = 1
maxBlobSize = stream.MaxBlobSize
)
var ErrBlobTooBig = errors.Base("blob must be at most %d bytes", maxBlobSize)
// Server is and instance of the reflector server. It houses the blob store and listener.
type Server struct {
Timeout time.Duration // timeout to read or write next message
EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled
underlyingStore store.BlobStore
outerStore store.BlobStore
grp *stop.Group
}
// NewServer returns an initialized reflector server pointer.
func NewServer(underlying store.BlobStore, outer store.BlobStore) *Server {
return &Server{
Timeout: DefaultTimeout,
underlyingStore: underlying,
outerStore: outer,
grp: stop.New(),
}
}
// Shutdown shuts down the reflector server gracefully.
func (s *Server) Shutdown() {
log.Println("shutting down reflector server...")
s.grp.StopAndWait()
log.Println("reflector server stopped")
}
//Start starts the server to handle connections.
func (s *Server) Start(address string) error {
l, err := net.Listen(network, address)
if err != nil {
return errors.Err(err)
}
log.Println("reflector listening on " + address)
if err := agent.Listen(agent.Options{}); err != nil {
log.Fatal(err)
}
s.grp.Add(1)
go func() {
<-s.grp.Ch()
err := l.Close()
if err != nil {
log.Error(errors.Prefix("closing listener", err))
}
s.grp.Done()
}()
s.grp.Add(1)
go func() {
s.listenAndServe(l)
s.grp.Done()
}()
if s.EnableBlocklist {
if b, ok := s.underlyingStore.(store.Blocklister); ok {
s.grp.Add(1)
go func() {
s.enableBlocklist(b)
s.grp.Done()
}()
} else {
//s.Shutdown()
return errors.Err("blocklist is enabled but blob store does not support blocklisting")
}
}
return nil
}
func (s *Server) listenAndServe(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
if s.quitting() {
return
}
log.Error(err)
} else {
s.grp.Add(1)
go func() {
s.handleConn(conn)
s.grp.Done()
}()
}
}
}
func (s *Server) handleConn(conn net.Conn) {
// all this stuff is to close the connections correctly when we're shutting down the server
connNeedsClosing := make(chan struct{})
defer func() {
close(connNeedsClosing)
}()
s.grp.Add(1)
go func() {
defer s.grp.Done()
select {
case <-connNeedsClosing:
case <-s.grp.Ch():
}
err := conn.Close()
if err != nil {
log.Error(errors.Prefix("closing peer conn", err))
}
}()
err := s.doHandshake(conn)
if err != nil {
if errors.Is(err, io.EOF) || s.quitting() {
return
}
err := s.doError(conn, err)
if err != nil {
log.Error(errors.Prefix("sending handshake error", err))
}
return
}
for {
err = s.receiveBlob(conn)
if err != nil {
if errors.Is(err, io.EOF) || s.quitting() {
return
}
err := s.doError(conn, err)
if err != nil {
log.Error(errors.Prefix("sending blob receive error", err))
}
return
}
}
}
func (s *Server) doError(conn net.Conn, err error) error {
if err == nil {
return nil
}
shouldLog := metrics.TrackError(metrics.DirectionUpload, err)
if shouldLog {
log.Errorln(errors.FullTrace(err))
}
if e2, ok := err.(*json.SyntaxError); ok {
log.Errorf("syntax error at byte offset %d", e2.Offset)
}
//resp, err := json.Marshal(errorResponse{Error: err.Error()})
//if err != nil {
// return err
//}
//return s.write(conn, resp)
return nil
}
func (s *Server) receiveBlob(conn net.Conn) error {
blobSize, blobHash, isSdBlob, err := s.readBlobRequest(conn)
if err != nil {
return err
}
var wantsBlob bool
if bl, ok := s.underlyingStore.(store.Blocklister); ok {
wantsBlob, err = bl.Wants(blobHash)
if err != nil {
return err
}
} else {
blobExists, err := s.underlyingStore.Has(blobHash)
if err != nil {
return err
}
wantsBlob = !blobExists
}
var neededBlobs []string
if isSdBlob && !wantsBlob {
if nbc, ok := s.underlyingStore.(neededBlobChecker); ok {
neededBlobs, err = nbc.MissingBlobsForKnownStream(blobHash)
if err != nil {
return err
}
} else {
// if we can't check for blobs in a stream, we have to say that the sd blob is
// missing. if we say we have the sd blob, they wont try to send any content blobs
wantsBlob = true
}
}
err = s.sendBlobResponse(conn, wantsBlob, isSdBlob, neededBlobs)
if err != nil {
return err
}
if !wantsBlob {
return nil
}
blob, err := s.readRawBlob(conn, blobSize)
if err != nil {
sendErr := s.sendTransferResponse(conn, false, isSdBlob)
if sendErr != nil {
return sendErr
}
return errors.Prefix("error reading blob "+blobHash[:8], err)
}
receivedBlobHash := BlobHash(blob)
if blobHash != receivedBlobHash {
sendErr := s.sendTransferResponse(conn, false, isSdBlob)
if sendErr != nil {
return sendErr
}
return errors.Err("hash of received blob data does not match hash from send request")
// this can also happen if the blob size is wrong, because the server will read the wrong number of bytes from the stream
}
log.Debugln("Got blob " + blobHash[:8])
if isSdBlob {
err = s.outerStore.PutSD(blobHash, blob)
} else {
err = s.outerStore.Put(blobHash, blob)
}
if err != nil {
return err
}
metrics.MtrInBytesReflector.Add(float64(len(blob)))
metrics.BlobUploadCount.Inc()
if isSdBlob {
metrics.SDBlobUploadCount.Inc()
}
return s.sendTransferResponse(conn, true, isSdBlob)
}
func (s *Server) doHandshake(conn net.Conn) error {
var handshake handshakeRequestResponse
err := s.read(conn, &handshake)
if err != nil {
return err
} else if handshake.Version == nil {
return errors.Err("handshake is missing protocol version")
} else if *handshake.Version != protocolVersion1 && *handshake.Version != protocolVersion2 {
return errors.Err("protocol version not supported")
}
resp, err := json.Marshal(handshakeRequestResponse{Version: handshake.Version})
if err != nil {
return err
}
return s.write(conn, resp)
}
func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
var sendRequest sendBlobRequest
err := s.read(conn, &sendRequest)
if err != nil {
return 0, "", false, err
}
var blobHash string
var blobSize int
isSdBlob := sendRequest.SdBlobHash != ""
if isSdBlob {
blobSize = sendRequest.SdBlobSize
blobHash = sendRequest.SdBlobHash
} else {
blobSize = sendRequest.BlobSize
blobHash = sendRequest.BlobHash
}
if blobHash == "" {
return blobSize, blobHash, isSdBlob, errors.Err("blob hash is empty")
}
if blobSize > maxBlobSize {
return blobSize, blobHash, isSdBlob, errors.Err(ErrBlobTooBig)
}
if blobSize == 0 {
return blobSize, blobHash, isSdBlob, errors.Err("0-byte blob received")
}
return blobSize, blobHash, isSdBlob, nil
}
func (s *Server) sendBlobResponse(conn net.Conn, shouldSendBlob, isSdBlob bool, neededBlobs []string) error {
var response []byte
var err error
if isSdBlob {
response, err = json.Marshal(sendSdBlobResponse{SendSdBlob: shouldSendBlob, NeededBlobs: neededBlobs})
} else {
response, err = json.Marshal(sendBlobResponse{SendBlob: shouldSendBlob})
}
if err != nil {
return err
}
return s.write(conn, response)
}
func (s *Server) sendTransferResponse(conn net.Conn, receivedBlob, isSdBlob bool) error {
var response []byte
var err error
if isSdBlob {
response, err = json.Marshal(sdBlobTransferResponse{ReceivedSdBlob: receivedBlob})
} else {
response, err = json.Marshal(blobTransferResponse{ReceivedBlob: receivedBlob})
}
if err != nil {
return err
}
return s.write(conn, response)
}
func (s *Server) read(conn net.Conn, v interface{}) error {
err := conn.SetReadDeadline(time.Now().Add(s.Timeout))
if err != nil {
return errors.Err(err)
}
dec := json.NewDecoder(conn)
err = dec.Decode(v)
if err != nil {
data, _ := ioutil.ReadAll(dec.Buffered())
if len(data) > 0 {
return errors.Err("%s. Data: %s", err.Error(), hex.EncodeToString(data))
}
return errors.Err(err)
}
return nil
}
func (s *Server) readRawBlob(conn net.Conn, blobSize int) ([]byte, error) {
err := conn.SetReadDeadline(time.Now().Add(s.Timeout))
if err != nil {
return nil, errors.Err(err)
}
blob := make([]byte, blobSize)
_, err = io.ReadFull(bufio.NewReader(conn), blob)
return blob, errors.Err(err)
}
func (s *Server) write(conn net.Conn, b []byte) error {
err := conn.SetWriteDeadline(time.Now().Add(s.Timeout))
if err != nil {
return errors.Err(err)
}
n, err := conn.Write(b)
if err == nil && n != len(b) {
err = io.ErrShortWrite
}
return errors.Err(err)
}
func (s *Server) quitting() bool {
select {
case <-s.grp.Ch():
return true
default:
return false
}
}
// BlobHash returns the sha512 hash hex encoded string of the blob byte slice.
func BlobHash(blob []byte) string {
hashBytes := sha512.Sum384(blob)
return hex.EncodeToString(hashBytes[:])
}
func IsValidJSON(b []byte) bool {
var r json.RawMessage
return json.Unmarshal(b, &r) == nil
}
//type errorResponse struct {
// Error string `json:"error"`
//}
type handshakeRequestResponse struct {
Version *int `json:"version"`
}
type sendBlobRequest struct {
BlobHash string `json:"blob_hash,omitempty"`
BlobSize int `json:"blob_size,omitempty"`
SdBlobHash string `json:"sd_blob_hash,omitempty"`
SdBlobSize int `json:"sd_blob_size,omitempty"`
}
type sendBlobResponse struct {
SendBlob bool `json:"send_blob"`
}
type sendSdBlobResponse struct {
SendSdBlob bool `json:"send_sd_blob"`
NeededBlobs []string `json:"needed_blobs,omitempty"`
}
type blobTransferResponse struct {
ReceivedBlob bool `json:"received_blob"`
}
type sdBlobTransferResponse struct {
ReceivedSdBlob bool `json:"received_sd_blob"`
}
// neededBlobChecker can check which blobs from a known stream are not uploaded yet
type neededBlobChecker interface {
MissingBlobsForKnownStream(string) ([]string, error)
}