diff --git a/.travis.yml b/.travis.yml index c9f1b44..933ce39 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,7 @@ install: true # set -e enabled in bash. before_script: # All the .go files, excluding vendor/ and model (auto generated) - - GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ ) +# - GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ ) #i wish we were this crazy :p - go get golang.org/x/tools/cmd/goimports # Used in build script for generated files # - go get github.com/golang/lint/golint # Linter # - go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter diff --git a/cmd/reflector.go b/cmd/reflector.go index 8177d1e..19802f0 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -7,10 +7,12 @@ import ( "syscall" "time" + "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/reflector.go/peer/quic" + "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" - "github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" @@ -19,6 +21,10 @@ import ( ) var reflectorCmdCacheDir string +var peerPort int +var quicPeerPort int +var reflectorPort int +var metricsPort int func init() { var cmd = &cobra.Command{ @@ -27,6 +33,10 @@ func init() { Run: reflectorCmd, } cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory") + cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from") + cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol") + cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from") + cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") rootCmd.AddCommand(cmd) } @@ -57,7 +67,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { reflectorServer.Timeout = 3 * time.Minute reflectorServer.EnableBlocklist = true - err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) + err = reflectorServer.Start(":" + strconv.Itoa(reflectorPort)) if err != nil { log.Fatal(err) } @@ -72,12 +82,18 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } peerServer := peer.NewServer(blobStore) - err = peerServer.Start(":5567") + err = peerServer.Start(":"+ strconv.Itoa(peerPort)) if err != nil { log.Fatal(err) } - metricsServer := metrics.NewServer(":2112", "/metrics") + quicPeerServer := quic.NewServer(blobStore) + err = quicPeerServer.Start(":"+ strconv.Itoa(quicPeerPort)) + if err != nil { + log.Fatal(err) + } + + metricsServer := metrics.NewServer(":" + strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() interruptChan := make(chan os.Signal, 1) @@ -85,6 +101,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { <-interruptChan metricsServer.Shutdown() peerServer.Shutdown() + quicPeerServer.Shutdown() if reflectorServer != nil { reflectorServer.Shutdown() } diff --git a/go.mod b/go.mod index a83d35d..73abece 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/lbryio/lbry.go/v2 v2.4.5 github.com/lbryio/lbryschema.go v0.0.0-20190602173230-6d2f69a36f46 github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec + github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.2 diff --git a/go.sum b/go.sum index 0ff2261..785df1f 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= +github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -60,6 +62,8 @@ github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4 github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= +github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= @@ -126,6 +130,7 @@ github.com/hashicorp/memberlist v0.1.4 h1:gkyML/r71w3FL8gUi74Vk76avkj/9lYAY9lvg0 github.com/hashicorp/memberlist v0.1.4/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf h1:WfD7VjIE6z8dIvMsI4/s+1qr5EL+zoIGev1BQj1eoJ8= github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg= @@ -179,6 +184,8 @@ github.com/lbryio/types v0.0.0-20191009145016-1bb8107e04f8/go.mod h1:CG3wsDv5BiV github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec h1:2xk/qg4VTOCJ8RzV/ED5AKqDcJ00zVb08ltf9V+sr3c= github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE= github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc= +github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw= github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5 h1:AsEBgzv3DhuYHI/GiQh2HxvTP71HCCE9E/tzGUzGdtU= github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5/go.mod h1:c2mYKRyMb1BPkO5St0c/ps62L4S0W2NAkaTXj9qEI+0= github.com/lusis/slack-test v0.0.0-20180109053238-3c758769bfa6 h1:iOAVXzZyXtW408TMYejlUPo6BIn92HmOacWtIfNyYns= @@ -187,6 +194,8 @@ github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5 h1:mG83tLXWSRdcX github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4= github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA= +github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk= github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= @@ -210,8 +219,10 @@ github.com/nlopes/slack v0.6.0 h1:jt0jxVQGhssx1Ib7naAOZEZcGdtIhTzkP0nopK0AsRA= github.com/nlopes/slack v0.6.0/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.2 h1:uqH7bpe+ERSiDa34FDOF7RikN6RzXgduUF8yarlZp94= github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= @@ -307,6 +318,7 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -350,6 +362,7 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190520201301-c432e742b0af/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -396,6 +409,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gorp.v1 v1.7.1 h1:GBB9KrWRATQZh95HJyVGUZrWwOPswitEYEyqlK8JbAA= gopkg.in/gorp.v1 v1.7.1/go.mod h1:Wo3h+DBQZIxATwftsglhdD/62zRFPhGhTiu5jUJmCaw= @@ -404,6 +418,7 @@ gopkg.in/ini.v1 v1.48.0 h1:URjZc+8ugRY5mL5uUeQH/a63JcHwdX9xZaWvmNWD7z8= gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79 h1:FpCr9V8wuOei4BAen+93HtVJ+XSi+KPbaPKm0Vj5R64= gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79/go.mod h1:gWkaRU7CoXpezCBWfWjm3999QqS+1pYPXGbqQCTMzo8= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 93e0756..617bd55 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -70,6 +70,7 @@ const ( errReadConnReset = "read_conn_reset" errWriteConnReset = "write_conn_reset" errReadConnTimedOut = "read_conn_timed_out" + errNoNetworkActivity = "no_network_activity" errWriteConnTimedOut = "write_conn_timed_out" errWriteBrokenPipe = "write_broken_pipe" errEPipe = "e_pipe" @@ -133,6 +134,9 @@ func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a } else if strings.Contains(err.Error(), "read: connection timed out") { // the other side closed the connection using TCP reset //log.Warnln("read conn timed out is not the same as ETIMEDOUT") errType = errReadConnTimedOut + }else if strings.Contains(err.Error(), "NO_ERROR: No recent network activity") { // the other side closed the QUIC connection + //log.Warnln("read conn timed out is not the same as ETIMEDOUT") + errType = errNoNetworkActivity } else if strings.Contains(err.Error(), "write: connection timed out") { errType = errWriteConnTimedOut } else if errors.Is(e, io.ErrUnexpectedEOF) { diff --git a/peer/quic/client.go b/peer/quic/client.go new file mode 100644 index 0000000..08eb753 --- /dev/null +++ b/peer/quic/client.go @@ -0,0 +1,213 @@ +package quic + +import ( + "bufio" + "crypto/tls" + "encoding/hex" + "encoding/json" + "io" + "time" + + "github.com/lucas-clemente/quic-go" + + "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + log "github.com/sirupsen/logrus" +) + +// ErrBlobExists is a default error for when a blob already exists on the reflector server. +var ErrBlobExists = errors.Base("blob exists on server") + +// Client is an instance of a client connected to a server. +type Client struct { + Timeout time.Duration + conn quic.Session + stream quic.Stream + buf *bufio.Reader + connected bool +} + +// Connect connects to a specific clients and errors if it cannot be contacted. +func (c *Client) Connect(address string) error { + var err error + if c.Timeout == 0 { + c.Timeout = 5 * time.Second + } + tlsConf := &tls.Config{ + InsecureSkipVerify: true, + NextProtos: []string{"quic-echo-example"}, + } + + c.conn, err = quic.DialAddr(address, tlsConf, nil) + if err != nil { + return err + } + c.connected = true + c.stream, err = c.conn.OpenStream() + if err != nil { + return errors.Err(err) + } + c.buf = bufio.NewReader(c.stream) + return nil +} + +// Close closes the connection with the client. +func (c *Client) Close() error { + c.connected = false + return c.conn.Close() +} + +// GetStream gets a stream +func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) { + if !c.connected { + return nil, errors.Err("not connected") + } + + var sd stream.SDBlob + + b, err := c.GetBlob(sdHash) + if err != nil { + return nil, err + } + + err = sd.FromBlob(b) + if err != nil { + return nil, err + } + + s := make(stream.Stream, len(sd.BlobInfos)+1-1) // +1 for sd blob, -1 for last null blob + s[0] = b + + for i := 0; i < len(sd.BlobInfos)-1; i++ { + s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + if err != nil { + return nil, err + } + } + + return s, nil +} + +// HasBlob checks if the blob is available +func (c *Client) HasBlob(hash string) (bool, error) { + if !c.connected { + return false, errors.Err("not connected") + } + + sendRequest, err := json.Marshal(availabilityRequest{ + RequestedBlobs: []string{hash}, + }) + if err != nil { + return false, err + } + + err = c.write(sendRequest) + if err != nil { + return false, err + } + + var resp availabilityResponse + err = c.read(&resp) + if err != nil { + return false, err + } + + for _, h := range resp.AvailableBlobs { + if h == hash { + return true, nil + } + } + + return false, nil +} + +// GetBlob gets a blob +func (c *Client) GetBlob(hash string) (stream.Blob, error) { + if !c.connected { + return nil, errors.Err("not connected") + } + + sendRequest, err := json.Marshal(blobRequest{ + RequestedBlob: hash, + }) + if err != nil { + return nil, err + } + + err = c.write(sendRequest) + if err != nil { + return nil, err + } + + var resp blobResponse + err = c.read(&resp) + if err != nil { + return nil, err + } + + if resp.IncomingBlob.Error != "" { + return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error) + } + if resp.IncomingBlob.BlobHash != hash { + return nil, errors.Prefix(hash[:8], "blob hash in response does not match requested hash") + } + if resp.IncomingBlob.Length <= 0 { + return nil, errors.Prefix(hash[:8], "length reported as <= 0") + } + + log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr()) + + blob, err := c.readRawBlob(resp.IncomingBlob.Length) + if err != nil { + return nil, err + } + + return stream.Blob(blob), nil +} + +func (c *Client) read(v interface{}) error { + err := c.stream.SetReadDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return errors.Err(err) + } + + m, err := readNextMessage(c.buf) + if err != nil { + return err + } + + log.Debugf("read %d bytes from %s", len(m), c.conn.RemoteAddr()) + + err = json.Unmarshal(m, v) + return errors.Err(err) +} + +func (c *Client) readRawBlob(blobSize int) ([]byte, error) { + err := c.stream.SetReadDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return nil, errors.Err(err) + } + + blob := make([]byte, blobSize) + n, err := io.ReadFull(c.buf, blob) + log.Debugf("read %d bytes from %s", n, c.conn.RemoteAddr()) + return blob, errors.Err(err) +} + +func (c *Client) write(b []byte) error { + err := c.stream.SetWriteDeadline(time.Now().Add(c.Timeout)) + if err != nil { + return errors.Err(err) + } + + log.Debugf("writing %d bytes to %s", len(b), c.conn.RemoteAddr()) + + n, err := c.stream.Write(b) + if err == nil && n != len(b) { + err = io.ErrShortWrite + } + return errors.Err(err) +} diff --git a/peer/quic/server.go b/peer/quic/server.go new file mode 100644 index 0000000..601e904 --- /dev/null +++ b/peer/quic/server.go @@ -0,0 +1,427 @@ +package quic + +import ( + "bufio" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/json" + "encoding/pem" + "io" + "math/big" + "strings" + "time" + + "github.com/lucas-clemente/quic-go" + + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/reflector" + "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/lbryio/lbry.go/v2/stream" + + log "github.com/sirupsen/logrus" +) + +const ( + // DefaultPort is the port the peer server listens on if not passed in. + DefaultPort = 3333 + // LbrycrdAddress to be used when paying for data. Not implemented yet. + LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct" +) + +// Server is an instance of a peer server that houses the listener and store. +type Server struct { + store store.BlobStore + closed bool + + grp *stop.Group +} + +// NewServer returns an initialized Server pointer. +func NewServer(store store.BlobStore) *Server { + return &Server{ + store: store, + grp: stop.New(), + } +} + +// Shutdown gracefully shuts down the peer server. +func (s *Server) Shutdown() { + log.Debug("shutting down peer server") + s.grp.StopAndWait() + log.Debug("peer server stopped") +} + +// Start starts the server listener to handle connections. +func (s *Server) Start(address string) error { + log.Println("QUIC peer listening on " + address) + + l, err := quic.ListenAddr(address, generateTLSConfig(), nil) + if err != nil { + return err + } + + go s.listenForShutdown(l) + s.grp.Add(1) + go func() { + s.listenAndServe(l) + s.grp.Done() + }() + + return nil +} + +func (s *Server) listenForShutdown(listener quic.Listener) { + <-s.grp.Ch() + s.closed = true + err := listener.Close() + if err != nil { + log.Error("error closing listener for peer server - ", err) + } +} + +func (s *Server) listenAndServe(listener quic.Listener) { + for { + conn, err := listener.Accept() + if err != nil { + if s.closed { + return + } + log.Error(errors.Prefix("accepting conn", err)) + } else { + s.grp.Add(1) + go func() { + s.handleConnection(conn) + s.grp.Done() + }() + } + } +} + +func (s *Server) handleConnection(conn quic.Session) { + defer func() { + if err := conn.Close(); err != nil { + log.Error(errors.Prefix("closing peer conn", err)) + } + }() + + timeoutDuration := 1 * time.Minute + stream, err := conn.AcceptStream() + if err != nil { + log.Error(err) + return + } + buf := bufio.NewReader(stream) + for { + var request []byte + var response []byte + + err := stream.SetReadDeadline(time.Now().Add(timeoutDuration)) + if err != nil { + log.Error(errors.FullTrace(err)) + } + + request, err = readNextMessage(buf) + if err != nil { + if err != io.EOF { + s.logError(err) + } + return + } + + err = stream.SetReadDeadline(time.Time{}) + if err != nil { + log.Error(errors.FullTrace(err)) + } + + response, err = s.handleCompositeRequest(request) + if err != nil { + log.Error(errors.FullTrace(err)) + return + } + + err = stream.SetWriteDeadline(time.Now().Add(timeoutDuration)) + if err != nil { + log.Error(errors.FullTrace(err)) + } + + n, err := stream.Write(response) + if err != nil { + if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset + s.logError(err) + } + return + } else if n != len(response) { + log.Errorln(io.ErrShortWrite) + return + } + + err = stream.SetWriteDeadline(time.Time{}) + if err != nil { + log.Error(errors.FullTrace(err)) + } + } +} + +func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) { + var request availabilityRequest + err := json.Unmarshal(data, &request) + if err != nil { + return nil, errors.Err(err) + } + + availableBlobs := []string{} + for _, blobHash := range request.RequestedBlobs { + exists, err := s.store.Has(blobHash) + if err != nil { + return nil, err + } + if exists { + availableBlobs = append(availableBlobs, blobHash) + } + } + + return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs}) +} + +//func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) { +// var request paymentRateRequest +// err := json.Unmarshal(data, &request) +// if err != nil { +// return nil, err +// } +// +// offerReply := paymentRateAccepted +// if request.BlobDataPaymentRate < 0 { +// offerReply = paymentRateTooLow +// } +// +// return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply}) +//} +// +//func (s *Server) handleBlobRequest(data []byte) ([]byte, error) { +// var request blobRequest +// err := json.Unmarshal(data, &request) +// if err != nil { +// return nil, err +// } +// +// log.Debugln("Sending blob " + request.RequestedBlob[:8]) +// +// blob, err := s.store.Get(request.RequestedBlob) +// if err != nil { +// return nil, err +// } +// +// response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{ +// BlobHash: reflector.BlobHash(blob), +// Length: len(blob), +// }}) +// if err != nil { +// return nil, err +// } +// +// return append(response, blob...), nil +//} + +func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { + var request compositeRequest + err := json.Unmarshal(data, &request) + if err != nil { + return nil, errors.Err(err) + } + + response := compositeResponse{ + LbrycrdAddress: LbrycrdAddress, + } + + if len(request.RequestedBlobs) > 0 { + var availableBlobs []string + for _, blobHash := range request.RequestedBlobs { + exists, err := s.store.Has(blobHash) + if err != nil { + return nil, err + } + if exists { + availableBlobs = append(availableBlobs, blobHash) + } + } + response.AvailableBlobs = availableBlobs + } + + response.BlobDataPaymentRate = paymentRateAccepted + if request.BlobDataPaymentRate < 0 { + response.BlobDataPaymentRate = paymentRateTooLow + } + + var blob []byte + if request.RequestedBlob != "" { + if len(request.RequestedBlob) != stream.BlobHashHexLength { + return nil, errors.Err("Invalid blob hash length") + } + + log.Debugln("Sending blob " + request.RequestedBlob[:8]) + + blob, err = s.store.Get(request.RequestedBlob) + if errors.Is(err, store.ErrBlobNotFound) { + response.IncomingBlob = incomingBlob{ + Error: err.Error(), + } + } else if err != nil { + return nil, err + } else { + response.IncomingBlob = incomingBlob{ + BlobHash: reflector.BlobHash(blob), + Length: len(blob), + } + metrics.BlobDownloadCount.Inc() + } + } + + respData, err := json.Marshal(response) + if err != nil { + return nil, err + } + + return append(respData, blob...), nil +} + +func (s *Server) logError(e error) { + if e == nil { + return + } + shouldLog := metrics.TrackError(metrics.DirectionDownload, e) + if shouldLog { + log.Errorln(errors.FullTrace(e)) + } +} + +func readNextMessage(buf *bufio.Reader) ([]byte, error) { + msg := make([]byte, 0) + eof := false + + for { + chunk, err := buf.ReadBytes('}') + if err != nil { + if err != io.EOF { + //log.Errorln("readBytes error:", err) // logged by caller + return msg, err + } + eof = true + } + + //log.Debugln("got", len(chunk), "bytes.") + //spew.Dump(chunk) + + if len(chunk) > 0 { + msg = append(msg, chunk...) + + if len(msg) > maxRequestSize { + return msg, errRequestTooLarge + } + + // yes, this is how the peer protocol knows when the request finishes + if reflector.IsValidJSON(msg) { + break + } + } + + if eof { + break + } + } + + //log.Debugln("total size:", len(request)) + //if len(request) > 0 { + // spew.Dump(request) + //} + + if len(msg) == 0 && eof { + return nil, io.EOF + } + + return msg, nil +} + +const ( + maxRequestSize = 64 * (2 ^ 10) // 64kb + paymentRateAccepted = "RATE_ACCEPTED" + paymentRateTooLow = "RATE_TOO_LOW" + //ToDo: paymentRateUnset is not used but exists in the protocol. + //paymentRateUnset = "RATE_UNSET" +) + +var errRequestTooLarge = errors.Base("request is too large") + +type availabilityRequest struct { + LbrycrdAddress bool `json:"lbrycrd_address"` + RequestedBlobs []string `json:"requested_blobs"` +} + +type availabilityResponse struct { + LbrycrdAddress string `json:"lbrycrd_address"` + AvailableBlobs []string `json:"available_blobs"` +} + +type paymentRateRequest struct { + BlobDataPaymentRate float64 `json:"blob_data_payment_rate"` +} + +type paymentRateResponse struct { + BlobDataPaymentRate string `json:"blob_data_payment_rate"` +} + +type blobRequest struct { + RequestedBlob string `json:"requested_blob"` +} + +type incomingBlob struct { + Error string `json:"error,omitempty"` + BlobHash string `json:"blob_hash"` + Length int `json:"length"` +} +type blobResponse struct { + IncomingBlob incomingBlob `json:"incoming_blob"` +} + +type compositeRequest struct { + LbrycrdAddress bool `json:"lbrycrd_address"` + RequestedBlobs []string `json:"requested_blobs"` + BlobDataPaymentRate float64 `json:"blob_data_payment_rate"` + RequestedBlob string `json:"requested_blob"` +} + +type compositeResponse struct { + LbrycrdAddress string `json:"lbrycrd_address,omitempty"` + AvailableBlobs []string `json:"available_blobs,omitempty"` + BlobDataPaymentRate string `json:"blob_data_payment_rate,omitempty"` + IncomingBlob incomingBlob `json:"incoming_blob,omitempty"` +} + +// Setup a bare-bones TLS config for the server +func generateTLSConfig() *tls.Config { + key, err := rsa.GenerateKey(rand.Reader, 1024) + if err != nil { + panic(err) + } + template := x509.Certificate{SerialNumber: big.NewInt(1)} + certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) + if err != nil { + panic(err) + } + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) + + tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + panic(err) + } + return &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + NextProtos: []string{"quic-echo-example"}, + } +} diff --git a/peer/quic/store.go b/peer/quic/store.go new file mode 100644 index 0000000..dd3ffe5 --- /dev/null +++ b/peer/quic/store.go @@ -0,0 +1,61 @@ +package quic + +import ( + "time" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" +) + +// Store is a blob store that gets blobs from a peer. +// It satisfies the store.BlobStore interface but cannot put or delete blobs. +type Store struct { + client *Client + connErr error +} + +// StoreOpts allows to set options for a new Store. +type StoreOpts struct { + Address string + Timeout time.Duration +} + +// NewStore makes a new peer store. +func NewStore(opts StoreOpts) *Store { + c := &Client{Timeout: opts.Timeout} + err := c.Connect(opts.Address) + return &Store{client: c, connErr: err} +} + +// Has asks the peer if they have a hash +func (p *Store) Has(hash string) (bool, error) { + if p.connErr != nil { + return false, errors.Prefix("connection error", p.connErr) + } + + return p.client.HasBlob(hash) +} + +// Get downloads the blob from the peer +func (p *Store) Get(hash string) (stream.Blob, error) { + if p.connErr != nil { + return nil, errors.Prefix("connection error", p.connErr) + } + + return p.client.GetBlob(hash) +} + +// Put is not supported +func (p *Store) Put(hash string, blob stream.Blob) error { + panic("PeerStore cannot put or delete blobs") +} + +// PutSD is not supported +func (p *Store) PutSD(hash string, blob stream.Blob) error { + panic("PeerStore cannot put or delete blobs") +} + +// Delete is not supported +func (p *Store) Delete(hash string) error { + panic("PeerStore cannot put or delete blobs") +} diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 9f8f4ad..42aa2ce 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -41,7 +41,7 @@ func updateBlocklist(b store.Blocklister) { for name, v := range values { if v.Err != nil { - log.Error(errors.Err("blocklist: %s: %s", name, v.Err)) + log.Error(errors.FullTrace(errors.Err("blocklist: %s: %s", name, v.Err))) continue } @@ -101,9 +101,7 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { "spv9.lbry.com:50001", "spv15.lbry.com:50001", "spv17.lbry.com:50001", - "spv19.1bry.com:50001", "spv25.lbry.com:50001", - "spv26.lbry.com:50001", }, nil) if err != nil { return nil, errors.Err(err)