refactor code

This commit is contained in:
Niko Storni 2020-06-30 01:14:52 +02:00
parent 8a5f57b14f
commit 09c7718f30
5 changed files with 43 additions and 43 deletions

View file

@ -7,12 +7,11 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/peer/http3"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/peer/http3"
"github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
@ -21,14 +20,14 @@ import (
) )
var reflectorCmdCacheDir string var reflectorCmdCacheDir string
var peerPort int var tcpPeerPort int
var quicPeerPort int var http3PeerPort int
var reflectorPort int var receiverPort int
var metricsPort int var metricsPort int
var disableUploads bool var disableUploads bool
var reflectorServerAddress string var proxyAddress string
var reflectorServerPort string var proxyPort string
var reflectorServerProtocol string var proxyProtocol string
var useDB bool var useDB bool
func init() { func init() {
@ -37,13 +36,13 @@ func init() {
Short: "Run reflector server", Short: "Run reflector server",
Run: reflectorCmd, Run: reflectorCmd,
} }
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory") cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
cmd.Flags().StringVar(&reflectorServerAddress, "reflector-server-address", "", "address of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&reflectorServerPort, "reflector-server-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&reflectorServerProtocol, "reflector-server-protocol", "tcp", "protocol used to fetch blobs from another reflector server (tcp/udp)") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from") cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
@ -54,18 +53,20 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339))
var blobStore store.BlobStore var blobStore store.BlobStore
if reflectorServerAddress != "" { if proxyAddress != "" {
switch reflectorServerProtocol { switch proxyProtocol {
case "tcp": case "tcp":
blobStore = peer.NewStore(peer.StoreOpts{ blobStore = peer.NewStore(peer.StoreOpts{
Address: reflectorServerAddress + ":" + reflectorServerPort, Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
}) })
case "udp": case "http3":
blobStore = http3.NewStore(http3.StoreOpts{ blobStore = http3.NewStore(http3.StoreOpts{
Address: reflectorServerAddress + ":" + reflectorServerPort, Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second, Timeout: 30 * time.Second,
}) })
default:
log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
} }
} else { } else {
blobStore = store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) blobStore = store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
@ -87,7 +88,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
reflectorServer.Timeout = 3 * time.Minute reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = true reflectorServer.EnableBlocklist = true
err = reflectorServer.Start(":" + strconv.Itoa(reflectorPort)) err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -102,13 +103,13 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
} }
peerServer := peer.NewServer(blobStore) peerServer := peer.NewServer(blobStore)
err = peerServer.Start(":" + strconv.Itoa(peerPort)) err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
http3PeerServer := http3.NewServer(blobStore) http3PeerServer := http3.NewServer(blobStore)
err = http3PeerServer.Start(":" + strconv.Itoa(quicPeerPort)) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -122,7 +123,6 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
metricsServer.Shutdown() metricsServer.Shutdown()
peerServer.Shutdown() peerServer.Shutdown()
http3PeerServer.Shutdown() http3PeerServer.Shutdown()
log.Infoln("done shutting down?")
if reflectorServer != nil { if reflectorServer != nil {
reflectorServer.Shutdown() reflectorServer.Shutdown()
} }

View file

@ -9,15 +9,12 @@ import (
"time" "time"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lucas-clemente/quic-go/http3"
log "github.com/sirupsen/logrus"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
)
// ErrBlobExists is a default error for when a blob already exists on the reflector server. "github.com/lucas-clemente/quic-go/http3"
var ErrBlobExists = errors.Base("blob exists on server") )
// Client is an instance of a client connected to a server. // Client is an instance of a client connected to a server.
type Client struct { type Client struct {
@ -91,7 +88,5 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
log.Infof("downloaded %s with HTTP3", hash)
return body.Bytes(), nil return body.Bytes(), nil
} }

View file

@ -10,11 +10,13 @@ import (
"math/big" "math/big"
"net/http" "net/http"
"github.com/gorilla/mux"
"github.com/lbryio/lbry.go/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/gorilla/mux"
"github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/http3" "github.com/lucas-clemente/quic-go/http3"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -23,8 +25,7 @@ import (
// Server is an instance of a peer server that houses the listener and store. // Server is an instance of a peer server that houses the listener and store.
type Server struct { type Server struct {
store store.BlobStore store store.BlobStore
grp *stop.Group
grp *stop.Group
} }
// NewServer returns an initialized Server pointer. // NewServer returns an initialized Server pointer.
@ -41,6 +42,7 @@ func (s *Server) Shutdown() {
s.grp.StopAndWait() s.grp.StopAndWait()
log.Debug("peer server stopped") log.Debug("peer server stopped")
} }
func (s *Server) logError(e error) { func (s *Server) logError(e error) {
if e == nil { if e == nil {
return return
@ -142,7 +144,7 @@ func generateTLSConfig() *tls.Config {
} }
return &tls.Config{ return &tls.Config{
Certificates: []tls.Certificate{tlsCert}, Certificates: []tls.Certificate{tlsCert},
NextProtos: []string{"quic-echo-example"}, NextProtos: []string{"http3-reflector-server"},
} }
} }

View file

@ -42,11 +42,11 @@ func (p *Store) getClient() (*Client, error) {
}, },
QuicConfig: &qconf, QuicConfig: &qconf,
} }
hclient := &http.Client{ connection := &http.Client{
Transport: roundTripper, Transport: roundTripper,
} }
c := &Client{ c := &Client{
conn: hclient, conn: connection,
roundTripper: roundTripper, roundTripper: roundTripper,
ServerAddr: p.opts.Address, ServerAddr: p.opts.Address,
} }
@ -75,15 +75,15 @@ func (p *Store) Get(hash string) (stream.Blob, error) {
// Put is not supported // Put is not supported
func (p *Store) Put(hash string, blob stream.Blob) error { func (p *Store) Put(hash string, blob stream.Blob) error {
panic("PeerStore cannot put or delete blobs") panic("http3Store cannot put or delete blobs")
} }
// PutSD is not supported // PutSD is not supported
func (p *Store) PutSD(hash string, blob stream.Blob) error { func (p *Store) PutSD(hash string, blob stream.Blob) error {
panic("PeerStore cannot put or delete blobs") panic("http3Store cannot put or delete blobs")
} }
// Delete is not supported // Delete is not supported
func (p *Store) Delete(hash string) error { func (p *Store) Delete(hash string) error {
panic("PeerStore cannot put or delete blobs") panic("http3Store cannot put or delete blobs")
} }

View file

@ -96,6 +96,9 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) {
defer node.Shutdown() defer node.Shutdown()
err := node.Connect([]string{ err := node.Connect([]string{
"spv25.lbry.com:50001", "spv25.lbry.com:50001",
"spv26.lbry.com:50001",
"spv19.lbry.com:50001",
"spv14.lbry.com:50001",
}, nil) }, nil)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)