From 09c7718f30340bcb0aae4ea706f1e5fd6141262f Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 30 Jun 2020 01:14:52 +0200 Subject: [PATCH] refactor code --- cmd/reflector.go | 50 +++++++++++++++++++++--------------------- peer/http3/client.go | 9 ++------ peer/http3/server.go | 14 +++++++----- peer/http3/store.go | 10 ++++----- reflector/blocklist.go | 3 +++ 5 files changed, 43 insertions(+), 43 deletions(-) diff --git a/cmd/reflector.go b/cmd/reflector.go index ebdbe5b..5c900a3 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -7,12 +7,11 @@ import ( "syscall" "time" - "github.com/lbryio/reflector.go/peer" - "github.com/lbryio/reflector.go/peer/http3" - "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" + "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/reflector.go/peer/http3" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" @@ -21,14 +20,14 @@ import ( ) var reflectorCmdCacheDir string -var peerPort int -var quicPeerPort int -var reflectorPort int +var tcpPeerPort int +var http3PeerPort int +var receiverPort int var metricsPort int var disableUploads bool -var reflectorServerAddress string -var reflectorServerPort string -var reflectorServerProtocol string +var proxyAddress string +var proxyPort string +var proxyProtocol string var useDB bool func init() { @@ -37,13 +36,13 @@ func init() { Short: "Run reflector server", Run: reflectorCmd, } - cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory") - cmd.Flags().StringVar(&reflectorServerAddress, "reflector-server-address", "", "address of another reflector server where blobs are fetched from") - cmd.Flags().StringVar(&reflectorServerPort, "reflector-server-port", "5567", "port of another reflector server where blobs are fetched from") - cmd.Flags().StringVar(&reflectorServerProtocol, "reflector-server-protocol", "tcp", "protocol used to fetch blobs from another reflector server (tcp/udp)") - cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from") - cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol") - cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from") + cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") + cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from") + cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") + cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") + cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from") + cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") + cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") @@ -54,18 +53,20 @@ func reflectorCmd(cmd *cobra.Command, args []string) { log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) var blobStore store.BlobStore - if reflectorServerAddress != "" { - switch reflectorServerProtocol { + if proxyAddress != "" { + switch proxyProtocol { case "tcp": blobStore = peer.NewStore(peer.StoreOpts{ - Address: reflectorServerAddress + ":" + reflectorServerPort, + Address: proxyAddress + ":" + proxyPort, Timeout: 30 * time.Second, }) - case "udp": + case "http3": blobStore = http3.NewStore(http3.StoreOpts{ - Address: reflectorServerAddress + ":" + reflectorServerPort, + Address: proxyAddress + ":" + proxyPort, Timeout: 30 * time.Second, }) + default: + log.Fatalf("specified protocol is not recognized: %s", proxyProtocol) } } else { blobStore = store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) @@ -87,7 +88,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { reflectorServer.Timeout = 3 * time.Minute reflectorServer.EnableBlocklist = true - err = reflectorServer.Start(":" + strconv.Itoa(reflectorPort)) + err = reflectorServer.Start(":" + strconv.Itoa(receiverPort)) if err != nil { log.Fatal(err) } @@ -102,13 +103,13 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } peerServer := peer.NewServer(blobStore) - err = peerServer.Start(":" + strconv.Itoa(peerPort)) + err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) if err != nil { log.Fatal(err) } http3PeerServer := http3.NewServer(blobStore) - err = http3PeerServer.Start(":" + strconv.Itoa(quicPeerPort)) + err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) if err != nil { log.Fatal(err) } @@ -122,7 +123,6 @@ func reflectorCmd(cmd *cobra.Command, args []string) { metricsServer.Shutdown() peerServer.Shutdown() http3PeerServer.Shutdown() - log.Infoln("done shutting down?") if reflectorServer != nil { reflectorServer.Shutdown() } diff --git a/peer/http3/client.go b/peer/http3/client.go index ee3b97d..ca3d556 100644 --- a/peer/http3/client.go +++ b/peer/http3/client.go @@ -9,15 +9,12 @@ import ( "time" "github.com/lbryio/reflector.go/store" - "github.com/lucas-clemente/quic-go/http3" - log "github.com/sirupsen/logrus" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" -) -// ErrBlobExists is a default error for when a blob already exists on the reflector server. -var ErrBlobExists = errors.Base("blob exists on server") + "github.com/lucas-clemente/quic-go/http3" +) // Client is an instance of a client connected to a server. type Client struct { @@ -91,7 +88,5 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { if err != nil { return nil, errors.Err(err) } - log.Infof("downloaded %s with HTTP3", hash) - return body.Bytes(), nil } diff --git a/peer/http3/server.go b/peer/http3/server.go index a497b63..c04026e 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -10,11 +10,13 @@ import ( "math/big" "net/http" - "github.com/gorilla/mux" - "github.com/lbryio/lbry.go/extras/stop" - "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/extras/stop" + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/gorilla/mux" "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/http3" log "github.com/sirupsen/logrus" @@ -23,8 +25,7 @@ import ( // Server is an instance of a peer server that houses the listener and store. type Server struct { store store.BlobStore - - grp *stop.Group + grp *stop.Group } // NewServer returns an initialized Server pointer. @@ -41,6 +42,7 @@ func (s *Server) Shutdown() { s.grp.StopAndWait() log.Debug("peer server stopped") } + func (s *Server) logError(e error) { if e == nil { return @@ -142,7 +144,7 @@ func generateTLSConfig() *tls.Config { } return &tls.Config{ Certificates: []tls.Certificate{tlsCert}, - NextProtos: []string{"quic-echo-example"}, + NextProtos: []string{"http3-reflector-server"}, } } diff --git a/peer/http3/store.go b/peer/http3/store.go index 3de9a8a..7d5e433 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -42,11 +42,11 @@ func (p *Store) getClient() (*Client, error) { }, QuicConfig: &qconf, } - hclient := &http.Client{ + connection := &http.Client{ Transport: roundTripper, } c := &Client{ - conn: hclient, + conn: connection, roundTripper: roundTripper, ServerAddr: p.opts.Address, } @@ -75,15 +75,15 @@ func (p *Store) Get(hash string) (stream.Blob, error) { // Put is not supported func (p *Store) Put(hash string, blob stream.Blob) error { - panic("PeerStore cannot put or delete blobs") + panic("http3Store cannot put or delete blobs") } // PutSD is not supported func (p *Store) PutSD(hash string, blob stream.Blob) error { - panic("PeerStore cannot put or delete blobs") + panic("http3Store cannot put or delete blobs") } // Delete is not supported func (p *Store) Delete(hash string) error { - panic("PeerStore cannot put or delete blobs") + panic("http3Store cannot put or delete blobs") } diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 7e3adf5..ed2ba9c 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -96,6 +96,9 @@ func sdHashesForOutpoints(outpoints []string) (map[string]valOrErr, error) { defer node.Shutdown() err := node.Connect([]string{ "spv25.lbry.com:50001", + "spv26.lbry.com:50001", + "spv19.lbry.com:50001", + "spv14.lbry.com:50001", }, nil) if err != nil { return nil, errors.Err(err)