diff --git a/cmd/reflector.go b/cmd/reflector.go index 5f4305d..b4e19f4 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -9,6 +9,7 @@ import ( "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/meta" + "github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" @@ -49,8 +50,15 @@ func reflectorCmd(cmd *cobra.Command, args []string) { log.Fatal(err) } + peerServer := peer.NewServer(combo) + err = peerServer.Start(":5567") + if err != nil { + log.Fatal(err) + } + interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan + peerServer.Shutdown() reflectorServer.Shutdown() } diff --git a/peer/server.go b/peer/server.go index 031ab66..73002c5 100644 --- a/peer/server.go +++ b/peer/server.go @@ -7,14 +7,12 @@ import ( "encoding/json" "io" "net" - "strings" "time" "github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/stop" "github.com/lbryio/reflector.go/store" - "github.com/davecgh/go-spew/spew" log "github.com/sirupsen/logrus" ) @@ -124,20 +122,21 @@ func (s *Server) handleConnection(conn net.Conn) { log.Error(errors.FullTrace(err)) } - if strings.Contains(string(request), `"requested_blobs"`) { - log.Debugln("received availability request") - response, err = s.handleAvailabilityRequest(request) - } else if strings.Contains(string(request), `"blob_data_payment_rate"`) { - log.Debugln("received rate negotiation request") - response, err = s.handlePaymentRateNegotiation(request) - } else if strings.Contains(string(request), `"requested_blob"`) { - log.Debugln("received blob request") - response, err = s.handleBlobRequest(request) - } else { - log.Errorln("invalid request") - spew.Dump(request) - return - } + //if strings.Contains(string(request), `"requested_blobs"`) { + // log.Debugln("received availability request") + // response, err = s.handleAvailabilityRequest(request) + //} else if strings.Contains(string(request), `"blob_data_payment_rate"`) { + // log.Debugln("received rate negotiation request") + // response, err = s.handlePaymentRateNegotiation(request) + //} else if strings.Contains(string(request), `"requested_blob"`) { + // log.Debugln("received blob request") + // response, err = s.handleBlobRequest(request) + //} else { + // log.Errorln("invalid request") + // spew.Dump(request) + // return + //} + response, err = s.handleCompositeRequest(request) if err != nil { log.Error(err) return @@ -215,6 +214,63 @@ func (s *Server) handleBlobRequest(data []byte) ([]byte, error) { return append(response, blob...), nil } +func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { + var request compositeRequest + err := json.Unmarshal(data, &request) + if err != nil { + return []byte{}, err + } + + response := compositeResponse{ + LbrycrdAddress: LbrycrdAddress, + } + + if len(request.RequestedBlobs) > 0 { + var availableBlobs []string + for _, blobHash := range request.RequestedBlobs { + exists, err := s.store.Has(blobHash) + if err != nil { + return []byte{}, err + } + if exists { + availableBlobs = append(availableBlobs, blobHash) + } + } + response.AvailableBlobs = availableBlobs + } + + response.BlobDataPaymentRate = paymentRateAccepted + if request.BlobDataPaymentRate < 0 { + response.BlobDataPaymentRate = paymentRateTooLow + } + + var blob []byte + if request.RequestedBlob != "" { + log.Println("Sending blob " + request.RequestedBlob[:8]) + + blob, err = s.store.Get(request.RequestedBlob) + if errors.Is(err, store.ErrBlobNotFound) { + response.IncomingBlob = incomingBlob{ + Error: err.Error(), + } + } else if err != nil { + return []byte{}, err + } else { + response.IncomingBlob = incomingBlob{ + BlobHash: GetBlobHash(blob), + Length: len(blob), + } + } + } + + respData, err := json.Marshal(response) + if err != nil { + return []byte{}, err + } + + return append(respData, blob...), nil +} + func readNextRequest(conn net.Conn) ([]byte, error) { request := make([]byte, 0) eof := false @@ -314,3 +370,17 @@ type incomingBlob struct { type blobResponse struct { IncomingBlob incomingBlob `json:"incoming_blob"` } + +type compositeRequest struct { + LbrycrdAddress bool `json:"lbrycrd_address"` + RequestedBlobs []string `json:"requested_blobs"` + BlobDataPaymentRate float64 `json:"blob_data_payment_rate"` + RequestedBlob string `json:"requested_blob"` +} + +type compositeResponse struct { + LbrycrdAddress string `json:"lbrycrd_address,omitempty"` + AvailableBlobs []string `json:"available_blobs,omitempty"` + BlobDataPaymentRate string `json:"blob_data_payment_rate,omitempty"` + IncomingBlob incomingBlob `json:"incoming_blob,omitempty"` +}