Replace QUIC with HTTP3/QUIC #43

Merged
nikooo777 merged 18 commits from http3 into master 2020-07-09 15:17:35 +02:00
9 changed files with 744 additions and 8 deletions
Showing only changes of commit fb0004bac4 - Show all commits

View file

@ -24,7 +24,7 @@ install: true
# set -e enabled in bash.
before_script:
# All the .go files, excluding vendor/ and model (auto generated)
- GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ )
# - GO_FILES=$(find . -iname '*.go' ! -iname '*_test.go' -type f | grep -v /vendor/ ) #i wish we were this crazy :p
- go get golang.org/x/tools/cmd/goimports # Used in build script for generated files
# - go get github.com/golang/lint/golint # Linter
# - go get honnef.co/go/tools/cmd/megacheck # Badass static analyzer/linter

View file

@ -7,10 +7,12 @@ import (
"syscall"
"time"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/peer/quic"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
@ -19,6 +21,10 @@ import (
)
var reflectorCmdCacheDir string
var peerPort int
var quicPeerPort int
var reflectorPort int
var metricsPort int
func init() {
var cmd = &cobra.Command{
@ -27,6 +33,10 @@ func init() {
Run: reflectorCmd,
}
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory")
cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from")
cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol")
cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
rootCmd.AddCommand(cmd)
}
@ -57,7 +67,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = true
err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort))
err = reflectorServer.Start(":" + strconv.Itoa(reflectorPort))
if err != nil {
log.Fatal(err)
}
@ -72,12 +82,18 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
}
peerServer := peer.NewServer(blobStore)
err = peerServer.Start(":5567")
err = peerServer.Start(":"+ strconv.Itoa(peerPort))
if err != nil {
log.Fatal(err)
}
metricsServer := metrics.NewServer(":2112", "/metrics")
quicPeerServer := quic.NewServer(blobStore)
err = quicPeerServer.Start(":"+ strconv.Itoa(quicPeerPort))
if err != nil {
log.Fatal(err)
}
metricsServer := metrics.NewServer(":" + strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start()
interruptChan := make(chan os.Signal, 1)
@ -85,6 +101,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
<-interruptChan
metricsServer.Shutdown()
peerServer.Shutdown()
quicPeerServer.Shutdown()
if reflectorServer != nil {
reflectorServer.Shutdown()
}

1
go.mod
View file

@ -20,6 +20,7 @@ require (
github.com/lbryio/lbry.go/v2 v2.4.5
github.com/lbryio/lbryschema.go v0.0.0-20190602173230-6d2f69a36f46
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec
github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.4.2

15
go.sum
View file

@ -31,6 +31,8 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@ -60,6 +62,8 @@ github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
@ -126,6 +130,7 @@ github.com/hashicorp/memberlist v0.1.4 h1:gkyML/r71w3FL8gUi74Vk76avkj/9lYAY9lvg0
github.com/hashicorp/memberlist v0.1.4/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf h1:WfD7VjIE6z8dIvMsI4/s+1qr5EL+zoIGev1BQj1eoJ8=
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf/go.mod h1:hyb9oH7vZsitZCiBt0ZvifOrB+qc8PS5IiilCIb87rg=
@ -179,6 +184,8 @@ github.com/lbryio/types v0.0.0-20191009145016-1bb8107e04f8/go.mod h1:CG3wsDv5BiV
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec h1:2xk/qg4VTOCJ8RzV/ED5AKqDcJ00zVb08ltf9V+sr3c=
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9 h1:tbuodUh2vuhOVZAdW3NEUvosFHUMJwUNl7jk/VSEiwc=
github.com/lucas-clemente/quic-go v0.7.1-0.20190401152353-907071221cf9/go.mod h1:PpMmPfPKO9nKJ/psF49ESTAGQSdfXxlg1otPbEB2nOw=
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5 h1:AsEBgzv3DhuYHI/GiQh2HxvTP71HCCE9E/tzGUzGdtU=
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5/go.mod h1:c2mYKRyMb1BPkO5St0c/ps62L4S0W2NAkaTXj9qEI+0=
github.com/lusis/slack-test v0.0.0-20180109053238-3c758769bfa6 h1:iOAVXzZyXtW408TMYejlUPo6BIn92HmOacWtIfNyYns=
@ -187,6 +194,8 @@ github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5 h1:mG83tLXWSRdcX
github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCWffGOaDcjkw1iB7W9DVLp6GXmfcJY/7YZCWPA4=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/marten-seemann/qtls v0.2.3 h1:0yWJ43C62LsZt08vuQJDK1uC1czUc3FJeCLPoNAI4vA=
github.com/marten-seemann/qtls v0.2.3/go.mod h1:xzjG7avBwGGbdZ8dTGxlBnLArsVKLvwmjgmPuiQEcYk=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
@ -210,8 +219,10 @@ github.com/nlopes/slack v0.6.0 h1:jt0jxVQGhssx1Ib7naAOZEZcGdtIhTzkP0nopK0AsRA=
github.com/nlopes/slack v0.6.0/go.mod h1:JzQ9m3PMAqcpeCam7UaHSuBuupz7CmpjehYMayT6YOk=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.2 h1:uqH7bpe+ERSiDa34FDOF7RikN6RzXgduUF8yarlZp94=
github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
@ -307,6 +318,7 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -350,6 +362,7 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190520201301-c432e742b0af/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -396,6 +409,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gorp.v1 v1.7.1 h1:GBB9KrWRATQZh95HJyVGUZrWwOPswitEYEyqlK8JbAA=
gopkg.in/gorp.v1 v1.7.1/go.mod h1:Wo3h+DBQZIxATwftsglhdD/62zRFPhGhTiu5jUJmCaw=
@ -404,6 +418,7 @@ gopkg.in/ini.v1 v1.48.0 h1:URjZc+8ugRY5mL5uUeQH/a63JcHwdX9xZaWvmNWD7z8=
gopkg.in/ini.v1 v1.48.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79 h1:FpCr9V8wuOei4BAen+93HtVJ+XSi+KPbaPKm0Vj5R64=
gopkg.in/nullbio/null.v6 v6.0.0-20161116030900-40264a2e6b79/go.mod h1:gWkaRU7CoXpezCBWfWjm3999QqS+1pYPXGbqQCTMzo8=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=

View file

@ -70,6 +70,7 @@ const (
errReadConnReset = "read_conn_reset"
errWriteConnReset = "write_conn_reset"
errReadConnTimedOut = "read_conn_timed_out"
errNoNetworkActivity = "no_network_activity"
errWriteConnTimedOut = "write_conn_timed_out"
errWriteBrokenPipe = "write_broken_pipe"
errEPipe = "e_pipe"
@ -133,6 +134,9 @@ func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a
} else if strings.Contains(err.Error(), "read: connection timed out") { // the other side closed the connection using TCP reset
//log.Warnln("read conn timed out is not the same as ETIMEDOUT")
errType = errReadConnTimedOut
}else if strings.Contains(err.Error(), "NO_ERROR: No recent network activity") { // the other side closed the QUIC connection
//log.Warnln("read conn timed out is not the same as ETIMEDOUT")
errType = errNoNetworkActivity
} else if strings.Contains(err.Error(), "write: connection timed out") {
errType = errWriteConnTimedOut
} else if errors.Is(e, io.ErrUnexpectedEOF) {

213
peer/quic/client.go Normal file
View file

@ -0,0 +1,213 @@
package quic
import (
"bufio"
"crypto/tls"
"encoding/hex"
"encoding/json"
"io"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
// ErrBlobExists is a default error for when a blob already exists on the reflector server.
var ErrBlobExists = errors.Base("blob exists on server")
// Client is an instance of a client connected to a server.
type Client struct {
Timeout time.Duration
conn quic.Session
stream quic.Stream
buf *bufio.Reader
connected bool
}
// Connect connects to a specific clients and errors if it cannot be contacted.
func (c *Client) Connect(address string) error {
var err error
if c.Timeout == 0 {
c.Timeout = 5 * time.Second
}
tlsConf := &tls.Config{
InsecureSkipVerify: true,
NextProtos: []string{"quic-echo-example"},
}
c.conn, err = quic.DialAddr(address, tlsConf, nil)
if err != nil {
return err
}
c.connected = true
c.stream, err = c.conn.OpenStream()
if err != nil {
return errors.Err(err)
}
c.buf = bufio.NewReader(c.stream)
return nil
}
// Close closes the connection with the client.
func (c *Client) Close() error {
c.connected = false
return c.conn.Close()
}
// GetStream gets a stream
func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
if !c.connected {
return nil, errors.Err("not connected")
}
var sd stream.SDBlob
b, err := c.GetBlob(sdHash)
if err != nil {
return nil, err
}
err = sd.FromBlob(b)
if err != nil {
return nil, err
}
s := make(stream.Stream, len(sd.BlobInfos)+1-1) // +1 for sd blob, -1 for last null blob
s[0] = b
for i := 0; i < len(sd.BlobInfos)-1; i++ {
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
return nil, err
}
}
return s, nil
}
// HasBlob checks if the blob is available
func (c *Client) HasBlob(hash string) (bool, error) {
if !c.connected {
return false, errors.Err("not connected")
}
sendRequest, err := json.Marshal(availabilityRequest{
RequestedBlobs: []string{hash},
})
if err != nil {
return false, err
}
err = c.write(sendRequest)
if err != nil {
return false, err
}
var resp availabilityResponse
err = c.read(&resp)
if err != nil {
return false, err
}
for _, h := range resp.AvailableBlobs {
if h == hash {
return true, nil
}
}
return false, nil
}
// GetBlob gets a blob
func (c *Client) GetBlob(hash string) (stream.Blob, error) {
if !c.connected {
return nil, errors.Err("not connected")
}
sendRequest, err := json.Marshal(blobRequest{
RequestedBlob: hash,
})
if err != nil {
return nil, err
}
err = c.write(sendRequest)
if err != nil {
return nil, err
}
var resp blobResponse
err = c.read(&resp)
if err != nil {
return nil, err
}
if resp.IncomingBlob.Error != "" {
return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
}
if resp.IncomingBlob.BlobHash != hash {
return nil, errors.Prefix(hash[:8], "blob hash in response does not match requested hash")
}
if resp.IncomingBlob.Length <= 0 {
return nil, errors.Prefix(hash[:8], "length reported as <= 0")
}
log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr())
blob, err := c.readRawBlob(resp.IncomingBlob.Length)
if err != nil {
return nil, err
}
return stream.Blob(blob), nil
}
func (c *Client) read(v interface{}) error {
err := c.stream.SetReadDeadline(time.Now().Add(c.Timeout))
if err != nil {
return errors.Err(err)
}
m, err := readNextMessage(c.buf)
if err != nil {
return err
}
log.Debugf("read %d bytes from %s", len(m), c.conn.RemoteAddr())
err = json.Unmarshal(m, v)
return errors.Err(err)
}
func (c *Client) readRawBlob(blobSize int) ([]byte, error) {
err := c.stream.SetReadDeadline(time.Now().Add(c.Timeout))
if err != nil {
return nil, errors.Err(err)
}
blob := make([]byte, blobSize)
n, err := io.ReadFull(c.buf, blob)
log.Debugf("read %d bytes from %s", n, c.conn.RemoteAddr())
return blob, errors.Err(err)
}
func (c *Client) write(b []byte) error {
err := c.stream.SetWriteDeadline(time.Now().Add(c.Timeout))
if err != nil {
return errors.Err(err)
}
log.Debugf("writing %d bytes to %s", len(b), c.conn.RemoteAddr())
n, err := c.stream.Write(b)
if err == nil && n != len(b) {
err = io.ErrShortWrite
}
return errors.Err(err)
}

427
peer/quic/server.go Normal file
View file

@ -0,0 +1,427 @@
package quic
import (
"bufio"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"io"
"math/big"
"strings"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
const (
// DefaultPort is the port the peer server listens on if not passed in.
DefaultPort = 3333
// LbrycrdAddress to be used when paying for data. Not implemented yet.
LbrycrdAddress = "bJxKvpD96kaJLriqVajZ7SaQTsWWyrGQct"
)
// Server is an instance of a peer server that houses the listener and store.
type Server struct {
store store.BlobStore
closed bool
grp *stop.Group
}
// NewServer returns an initialized Server pointer.
func NewServer(store store.BlobStore) *Server {
return &Server{
store: store,
grp: stop.New(),
}
}
// Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() {
log.Debug("shutting down peer server")
s.grp.StopAndWait()
log.Debug("peer server stopped")
}
// Start starts the server listener to handle connections.
func (s *Server) Start(address string) error {
log.Println("QUIC peer listening on " + address)
l, err := quic.ListenAddr(address, generateTLSConfig(), nil)
if err != nil {
return err
}
go s.listenForShutdown(l)
s.grp.Add(1)
go func() {
s.listenAndServe(l)
s.grp.Done()
}()
return nil
}
func (s *Server) listenForShutdown(listener quic.Listener) {
<-s.grp.Ch()
s.closed = true
err := listener.Close()
if err != nil {
log.Error("error closing listener for peer server - ", err)
}
}
func (s *Server) listenAndServe(listener quic.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
if s.closed {
return
}
log.Error(errors.Prefix("accepting conn", err))
} else {
s.grp.Add(1)
go func() {
s.handleConnection(conn)
s.grp.Done()
}()
}
}
}
func (s *Server) handleConnection(conn quic.Session) {
defer func() {
if err := conn.Close(); err != nil {
log.Error(errors.Prefix("closing peer conn", err))
}
}()
timeoutDuration := 1 * time.Minute
stream, err := conn.AcceptStream()
if err != nil {
log.Error(err)
return
}
buf := bufio.NewReader(stream)
for {
var request []byte
var response []byte
err := stream.SetReadDeadline(time.Now().Add(timeoutDuration))
if err != nil {
log.Error(errors.FullTrace(err))
}
request, err = readNextMessage(buf)
if err != nil {
if err != io.EOF {
s.logError(err)
}
return
}
err = stream.SetReadDeadline(time.Time{})
if err != nil {
log.Error(errors.FullTrace(err))
}
response, err = s.handleCompositeRequest(request)
if err != nil {
log.Error(errors.FullTrace(err))
return
}
err = stream.SetWriteDeadline(time.Now().Add(timeoutDuration))
if err != nil {
log.Error(errors.FullTrace(err))
}
n, err := stream.Write(response)
if err != nil {
if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
s.logError(err)
}
return
} else if n != len(response) {
log.Errorln(io.ErrShortWrite)
return
}
err = stream.SetWriteDeadline(time.Time{})
if err != nil {
log.Error(errors.FullTrace(err))
}
}
}
func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
var request availabilityRequest
err := json.Unmarshal(data, &request)
if err != nil {
return nil, errors.Err(err)
}
availableBlobs := []string{}
for _, blobHash := range request.RequestedBlobs {
exists, err := s.store.Has(blobHash)
if err != nil {
return nil, err
}
if exists {
availableBlobs = append(availableBlobs, blobHash)
}
}
return json.Marshal(availabilityResponse{LbrycrdAddress: LbrycrdAddress, AvailableBlobs: availableBlobs})
}
//func (s *Server) handlePaymentRateNegotiation(data []byte) ([]byte, error) {
// var request paymentRateRequest
// err := json.Unmarshal(data, &request)
// if err != nil {
// return nil, err
// }
//
// offerReply := paymentRateAccepted
// if request.BlobDataPaymentRate < 0 {
// offerReply = paymentRateTooLow
// }
//
// return json.Marshal(paymentRateResponse{BlobDataPaymentRate: offerReply})
//}
//
//func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
// var request blobRequest
// err := json.Unmarshal(data, &request)
// if err != nil {
// return nil, err
// }
//
// log.Debugln("Sending blob " + request.RequestedBlob[:8])
//
// blob, err := s.store.Get(request.RequestedBlob)
// if err != nil {
// return nil, err
// }
//
// response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
// BlobHash: reflector.BlobHash(blob),
// Length: len(blob),
// }})
// if err != nil {
// return nil, err
// }
//
// return append(response, blob...), nil
//}
func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
var request compositeRequest
err := json.Unmarshal(data, &request)
if err != nil {
return nil, errors.Err(err)
}
response := compositeResponse{
LbrycrdAddress: LbrycrdAddress,
}
if len(request.RequestedBlobs) > 0 {
var availableBlobs []string
for _, blobHash := range request.RequestedBlobs {
exists, err := s.store.Has(blobHash)
if err != nil {
return nil, err
}
if exists {
availableBlobs = append(availableBlobs, blobHash)
}
}
response.AvailableBlobs = availableBlobs
}
response.BlobDataPaymentRate = paymentRateAccepted
if request.BlobDataPaymentRate < 0 {
response.BlobDataPaymentRate = paymentRateTooLow
}
var blob []byte
if request.RequestedBlob != "" {
if len(request.RequestedBlob) != stream.BlobHashHexLength {
return nil, errors.Err("Invalid blob hash length")
}
log.Debugln("Sending blob " + request.RequestedBlob[:8])
blob, err = s.store.Get(request.RequestedBlob)
if errors.Is(err, store.ErrBlobNotFound) {
response.IncomingBlob = incomingBlob{
Error: err.Error(),
}
} else if err != nil {
return nil, err
} else {
response.IncomingBlob = incomingBlob{
BlobHash: reflector.BlobHash(blob),
Length: len(blob),
}
metrics.BlobDownloadCount.Inc()
}
}
respData, err := json.Marshal(response)
if err != nil {
return nil, err
}
return append(respData, blob...), nil
}
func (s *Server) logError(e error) {
if e == nil {
return
}
shouldLog := metrics.TrackError(metrics.DirectionDownload, e)
if shouldLog {
log.Errorln(errors.FullTrace(e))
}
}
func readNextMessage(buf *bufio.Reader) ([]byte, error) {
msg := make([]byte, 0)
eof := false
for {
chunk, err := buf.ReadBytes('}')
if err != nil {
if err != io.EOF {
//log.Errorln("readBytes error:", err) // logged by caller
return msg, err
}
eof = true
}
//log.Debugln("got", len(chunk), "bytes.")
//spew.Dump(chunk)
if len(chunk) > 0 {
msg = append(msg, chunk...)
if len(msg) > maxRequestSize {
return msg, errRequestTooLarge
}
// yes, this is how the peer protocol knows when the request finishes
if reflector.IsValidJSON(msg) {
break
}
}
if eof {
break
}
}
//log.Debugln("total size:", len(request))
//if len(request) > 0 {
// spew.Dump(request)
//}
if len(msg) == 0 && eof {
return nil, io.EOF
}
return msg, nil
}
const (
maxRequestSize = 64 * (2 ^ 10) // 64kb
paymentRateAccepted = "RATE_ACCEPTED"
paymentRateTooLow = "RATE_TOO_LOW"
//ToDo: paymentRateUnset is not used but exists in the protocol.
//paymentRateUnset = "RATE_UNSET"
)
var errRequestTooLarge = errors.Base("request is too large")
type availabilityRequest struct {
LbrycrdAddress bool `json:"lbrycrd_address"`
RequestedBlobs []string `json:"requested_blobs"`
}
type availabilityResponse struct {
LbrycrdAddress string `json:"lbrycrd_address"`
AvailableBlobs []string `json:"available_blobs"`
}
type paymentRateRequest struct {
BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
}
type paymentRateResponse struct {
BlobDataPaymentRate string `json:"blob_data_payment_rate"`
}
type blobRequest struct {
RequestedBlob string `json:"requested_blob"`
}
type incomingBlob struct {
Error string `json:"error,omitempty"`
BlobHash string `json:"blob_hash"`
Length int `json:"length"`
}
type blobResponse struct {
IncomingBlob incomingBlob `json:"incoming_blob"`
}
type compositeRequest struct {
LbrycrdAddress bool `json:"lbrycrd_address"`
RequestedBlobs []string `json:"requested_blobs"`
BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
RequestedBlob string `json:"requested_blob"`
}
type compositeResponse struct {
LbrycrdAddress string `json:"lbrycrd_address,omitempty"`
AvailableBlobs []string `json:"available_blobs,omitempty"`
BlobDataPaymentRate string `json:"blob_data_payment_rate,omitempty"`
IncomingBlob incomingBlob `json:"incoming_blob,omitempty"`
}
// Setup a bare-bones TLS config for the server
func generateTLSConfig() *tls.Config {
key, err := rsa.GenerateKey(rand.Reader, 1024)
if err != nil {
panic(err)
}
template := x509.Certificate{SerialNumber: big.NewInt(1)}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
if err != nil {
panic(err)
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
panic(err)
}
return &tls.Config{
Certificates: []tls.Certificate{tlsCert},
NextProtos: []string{"quic-echo-example"},
}
}

61
peer/quic/store.go Normal file
View file

@ -0,0 +1,61 @@
package quic
import (
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
)
// Store is a blob store that gets blobs from a peer.
// It satisfies the store.BlobStore interface but cannot put or delete blobs.
type Store struct {
client *Client
connErr error
}
// StoreOpts allows to set options for a new Store.
type StoreOpts struct {
Address string
Timeout time.Duration
}
// NewStore makes a new peer store.
func NewStore(opts StoreOpts) *Store {
c := &Client{Timeout: opts.Timeout}
err := c.Connect(opts.Address)
return &Store{client: c, connErr: err}
}
// Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) {
if p.connErr != nil {
return false, errors.Prefix("connection error", p.connErr)
}
return p.client.HasBlob(hash)
}
// Get downloads the blob from the peer
func (p *Store) Get(hash string) (stream.Blob, error) {
if p.connErr != nil {
return nil, errors.Prefix("connection error", p.connErr)
}
return p.client.GetBlob(hash)
}
// Put is not supported
func (p *Store) Put(hash string, blob stream.Blob) error {
panic("PeerStore cannot put or delete blobs")
}
// PutSD is not supported
func (p *Store) PutSD(hash string, blob stream.Blob) error {
panic("PeerStore cannot put or delete blobs")
}
// Delete is not supported
func (p *Store) Delete(hash string) error {
panic("PeerStore cannot put or delete blobs")
}

View file

@ -41,7 +41,7 @@ func updateBlocklist(b store.Blocklister) {
for name, v := range values {
if v.Err != nil {
log.Error(errors.Err("blocklist: %s: %s", name, v.Err))
log.Error(errors.FullTrace(errors.Err("blocklist: %s: %s", name, v.Err)))
continue
}
@ -101,9 +101,7 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) {
"spv9.lbry.com:50001",
"spv15.lbry.com:50001",
"spv17.lbry.com:50001",
"spv19.1bry.com:50001",
"spv25.lbry.com:50001",
"spv26.lbry.com:50001",
}, nil)
if err != nil {
return nil, errors.Err(err)