From 914d2bfc61d7b42727e2da044323cbe63d78d5a3 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 30 Oct 2021 01:27:25 -0400 Subject: [PATCH] Rework federation to be synchronous, and add tests --- go.mod | 1 + internal/metrics/metrics.go | 8 + main.go | 29 ++- server/args.go | 21 +- server/federation.go | 405 +++++++++++++++++++++++------------- server/federation_test.go | 371 ++++++++++++++++++++++++++------- server/server.go | 114 +++++++--- 7 files changed, 675 insertions(+), 274 deletions(-) diff --git a/go.mod b/go.mod index a2cf2c5..6d23115 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/lbryio/lbry.go/v2 v2.7.2-0.20210625145058-2b155597bf57 github.com/olivere/elastic/v7 v7.0.24 github.com/prometheus/client_golang v1.11.0 + github.com/prometheus/client_model v0.2.0 golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect golang.org/x/text v0.3.6 google.golang.org/genproto v0.0.0-20210524171403-669157292da3 // indirect diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index c78f03e..b2f2256 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -22,5 +22,13 @@ var ( Help: "Histogram of query times", Buckets: HistogramBuckets, }, []string{"method"}) + PeersKnown = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "peers_known", + Help: "Number of peers we know about.", + }) + PeersSubscribed = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "peers_subbed", + Help: "Number of peers that are subscribed to us.", + }) ) diff --git a/main.go b/main.go index 242e726..f99ddc8 100644 --- a/main.go +++ b/main.go @@ -4,14 +4,12 @@ import ( "context" "fmt" "log" - "net" "time" pb "github.com/lbryio/hub/protobuf/go" "github.com/lbryio/hub/server" "github.com/lbryio/lbry.go/v2/extras/util" "google.golang.org/grpc" - "google.golang.org/grpc/reflection" ) func main() { @@ -26,20 +24,21 @@ func main() { ctxWCancel, cancel := context.WithCancel(ctx) defer cancel() - l, err := net.Listen("tcp", ":"+args.Port) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - s := server.MakeHubServer(ctxWCancel, args) - pb.RegisterHubServer(s.GrpcServer, s) - reflection.Register(s.GrpcServer) - - log.Printf("listening on %s\n", l.Addr().String()) - log.Println(s.Args) - if err := s.GrpcServer.Serve(l); err != nil { - log.Fatalf("failed to serve: %v", err) - } + s.Run() + //l, err := net.Listen("tcp", ":"+args.Port) + //if err != nil { + // log.Fatalf("failed to listen: %v", err) + //} + // + //pb.RegisterHubServer(s.GrpcServer, s) + //reflection.Register(s.GrpcServer) + // + //log.Printf("listening on %s\n", l.Addr().String()) + //log.Println(s.Args) + //if err := s.GrpcServer.Serve(l); err != nil { + // log.Fatalf("failed to serve: %v", err) + //} return } diff --git a/server/args.go b/server/args.go index 12cdea4..b0374ba 100644 --- a/server/args.go +++ b/server/args.go @@ -22,15 +22,15 @@ type Args struct { UDPPort string EsHost string EsPort string - DisableEs bool PrometheusPort string EsIndex string - Debug bool RefreshDelta int CacheTTL int PeerFile string Country string - StartPeerAdder bool + DisableEs bool + Debug bool + LoadPeers bool StartPrometheus bool StartUDP bool WritePeers bool @@ -48,7 +48,7 @@ const ( DefaultCacheTTL = 5 DefaultPeerFile = "peers.txt" DefaultCountry = "US" - DefaultStartPeerAdder = true + DefaultLoadPeers = true DefaultStartPrometheus = true DefaultStartUDP = true DefaultWritePeers = true @@ -83,8 +83,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { serveCmd := parser.NewCommand("serve", "start the hub server") searchCmd := parser.NewCommand("search", "claim search") - debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false}) - disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false}) host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost}) port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort}) @@ -97,7 +95,10 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { cacheTTL := parser.Int("", "cachettl", &argparse.Options{Required: false, Help: "Cache TTL in minutes", Default: DefaultCacheTTL}) peerFile := parser.String("", "peerfile", &argparse.Options{Required: false, Help: "Initial peer file for federation", Default: DefaultPeerFile}) country := parser.String("", "country", &argparse.Options{Required: false, Help: "Country this node is running in. Default US.", Default: DefaultCountry}) - startPeerAdder := parser.Flag("", "start-peer-adder", &argparse.Options{Required: false, Help: "Start peer adder service", Default: DefaultStartPeerAdder}) + + debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false}) + disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false}) + loadPeers := parser.Flag("", "load-peers", &argparse.Options{Required: false, Help: "load peers from disk at startup", Default: DefaultLoadPeers}) startPrometheus := parser.Flag("", "start-prometheus", &argparse.Options{Required: false, Help: "Start prometheus server", Default: DefaultStartPrometheus}) startUdp := parser.Flag("", "start-udp", &argparse.Options{Required: false, Help: "Start UDP ping server", Default: DefaultStartUDP}) writePeers := parser.Flag("", "write-peers", &argparse.Options{Required: false, Help: "Write peer to disk as we learn about them", Default: DefaultWritePeers}) @@ -125,15 +126,15 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args { EsHost: *esHost, EsPort: *esPort, UDPPort: *udpPort, - DisableEs: *disableEs, PrometheusPort: *prometheusPort, EsIndex: *esIndex, - Debug: *debug, RefreshDelta: *refreshDelta, CacheTTL: *cacheTTL, PeerFile: *peerFile, Country: *country, - StartPeerAdder: *startPeerAdder, + DisableEs: *disableEs, + Debug: *debug, + LoadPeers: *loadPeers, StartPrometheus: *startPrometheus, StartUDP: *startUdp, WritePeers: *writePeers, diff --git a/server/federation.go b/server/federation.go index 2837284..290c820 100644 --- a/server/federation.go +++ b/server/federation.go @@ -6,19 +6,14 @@ import ( "log" "os" "strings" + "sync/atomic" "time" + "github.com/lbryio/hub/internal/metrics" pb "github.com/lbryio/hub/protobuf/go" "google.golang.org/grpc" ) -// peerAddMsg is an internal structure for use in the channel communicating -// to the peerAdder gorountine. -type peerAddMsg struct { - msg *pb.ServerMessage - ping bool -} - // FederatedServer hold relevant information about peers that we known about. type FederatedServer struct { Address string @@ -26,29 +21,56 @@ type FederatedServer struct { Ts time.Time } +var ( + localHosts = map[string]bool{ + "127.0.0.1": true, + "0.0.0.0": true, + "localhost": true, + } +) + + // peerKey takes a ServerMessage object and returns the key that for that peer // in our peer table. func peerKey(msg *pb.ServerMessage) string { return msg.Address + ":" + msg.Port } +func (s *Server) incNumPeers() { + atomic.AddInt64(s.NumPeerServers, 1) +} + +func (s *Server) decNumPeers() { + atomic.AddInt64(s.NumPeerServers, -1) +} + +func (s *Server) getNumPeers() int64 { + return *s.NumPeerServers +} + +func (s *Server) incNumSubs() { + atomic.AddInt64(s.NumPeerSubs, 1) +} + +func (s *Server) decNumSubs() { + atomic.AddInt64(s.NumPeerSubs, -1) +} + +func (s *Server) getNumSubs() int64 { + return *s.NumPeerSubs +} + // loadPeers takes the arguments given to the hub at startup and loads the // previously known peers from disk and verifies their existence before // storing them as known peers. Returns a map of peerKey -> object -func loadPeers(args *Args) map[string]*FederatedServer { - localHosts := map[string]bool { - "127.0.0.1": true, - "0.0.0.0": true, - "localhost": true, - } - servers := make(map[string]*FederatedServer) - peerFile := args.PeerFile - port := args.Port +func (s *Server) loadPeers() error { + peerFile := s.Args.PeerFile + port := s.Args.Port f, err := os.Open(peerFile) if err != nil { log.Println(err) - return map[string]*FederatedServer{} + return err } scanner := bufio.NewScanner(f) scanner.Split(bufio.ScanLines) @@ -68,25 +90,164 @@ func loadPeers(args *Args) map[string]*FederatedServer { continue } // If the peer is us, skip - log.Println(args) log.Println(ipPort) if ipPort[1] == port && localHosts[ipPort[0]] { log.Println("Self peer, skipping ...") continue } - server := &FederatedServer{ + srvMsg := &pb.ServerMessage{ Address: ipPort[0], Port: ipPort[1], - Ts: time.Now(), } - log.Println("pinging peer", server) - if helloPeer(server, args) { - servers[line] = server + log.Printf("pinging peer %+v\n", srvMsg) + err := s.addPeer(srvMsg, true) + if err != nil { + log.Println(err) } } log.Println("Returning from loadPeers") - return servers + return nil +} + + +// getFastestPeer determines the fastest peer in its list of peers by sending +// out udp pings and seeing who responds first. This is currently not +// implemented and just returns the first peer. +func (s *Server) getFastestPeer() *FederatedServer { + var fastestPeer *FederatedServer + + s.PeerServers.Range(func(_, v interface{}) bool { + fastestPeer = v.(*FederatedServer) + return false + }) + + return fastestPeer +} + +// subscribeToFastestPeer is a convenience function to find and subscribe to +// the fastest peer we know about. +func (s *Server) subscribeToFastestPeer() { + peer := s.getFastestPeer() + if peer != nil { + err := s.subscribeToPeer(peer) + if err != nil { + log.Println(err) + } + } else { + log.Println("No peers found, not subscribed to any.") + } +} + +// subscribeToPeer subscribes us to a peer to we'll get updates about their +// known peers. +func (s *Server) subscribeToPeer(peer *FederatedServer) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, + peer.Address+":"+peer.Port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + if err != nil { + return err + } + defer conn.Close() + + msg := &pb.ServerMessage{ + Address: s.Args.Host, + Port: s.Args.Port, + } + + c := pb.NewHubClient(conn) + + log.Printf("%s:%s subscribing to %+v\n", s.Args.Host, s.Args.Port, peer) + _, err = c.PeerSubscribe(ctx, msg) + if err != nil { + return err + } + + s.Subscribed = true + + return nil +} + +// helloPeer takes a peer to say hello to and sends a hello message +// containing all the peers we know about and information about us. +// This is used to confirm existence of peers on start and let them +// know about us. Returns the response from the server on success, +// nil otherwise. +func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) { + log.Println("In helloPeer") + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + conn, err := grpc.DialContext(ctx, + server.Address+":"+server.Port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + if err != nil { + log.Println(err) + return nil, err + } + defer conn.Close() + + c := pb.NewHubClient(conn) + + msg := &pb.HelloMessage{ + Port: s.Args.Port, + Host: s.Args.Host, + Servers: []*pb.ServerMessage{}, + } + + log.Printf("%s:%s saying hello to %+v\n", s.Args.Host, s.Args.Port, server) + res, err := c.Hello(ctx, msg) + if err != nil { + log.Println(err) + return nil, err + } + + log.Println(res) + + return res, nil +} + +// writePeers writes our current known peers to disk. +func (s *Server) writePeers() { + if !s.Args.WritePeers { + return + } + f, err := os.Create(s.Args.PeerFile) + if err != nil { + log.Println(err) + return + } + writer := bufio.NewWriter(f) + + s.PeerServers.Range(func(k, _ interface{}) bool { + key, ok := k.(string) + if !ok { + log.Println("Failed to cast key when writing peers: ", k) + return true + } + line := key + "\n" + _, err := writer.WriteString(line) + if err != nil { + log.Println(err) + } + return true + }) + + err = writer.Flush() + if err != nil { + log.Println(err) + } + err = f.Close() + if err != nil { + log.Println(err) + } } // notifyPeer takes a peer to notify and a new peer we just learned about @@ -120,115 +281,8 @@ func notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error { return nil } -// helloPeer takes a peer to say hello to and sends a hello message -// containing all the peers we know about and information about us. -// This is used to confirm existence of peers on start and let them -// know about us. Returns true is call was successful, false otherwise. -func helloPeer(server *FederatedServer, args *Args) bool { - log.Println("In helloPeer") - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - conn, err := grpc.DialContext(ctx, - server.Address+":"+server.Port, - grpc.WithInsecure(), - grpc.WithBlock(), - ) - if err != nil { - log.Println(err) - return false - } - defer conn.Close() - - - c := pb.NewHubClient(conn) - - msg := &pb.HelloMessage{ - Port: args.Port, - Host: args.Host, - Servers: []*pb.ServerMessage{}, - } - res, err := c.Hello(ctx, msg) - if err != nil { - log.Println(err) - return false - } - - log.Println(res) - - return true -} - -// writePeers writes our current known peers to disk. -func (s *Server) writePeers() { - if !s.Args.WritePeers { - return - } - failedCreat := "WARNING: Peer writer failed to create peer file, it's still running but may not be working!" - failedWrite := "WARNING: Peer writer failed to write a line, it's still running but may not be working!" - failedFlush := "WARNING: Peer writer failed to flush, it's still running but may not be working!" - failedClose := "WARNING: Peer writer failed to close the peer file, it's still running but may not be working!" - f, err := os.Create(s.Args.PeerFile) - if err != nil { - log.Println(failedCreat) - log.Println(err) - } - writer := bufio.NewWriter(f) - - for _, peer := range s.Servers { - line := peer.Address + ":" + peer.Port + "\n" - _, err := writer.WriteString(line) - if err != nil { - log.Println(failedWrite) - log.Println(err) - } - } - - err = writer.Flush() - if err != nil { - log.Println(failedFlush) - log.Println(err) - } - err = f.Close() - if err != nil { - log.Println(failedClose) - log.Println(err) - } -} - -// peerAdder is a goroutine which listens for new peers added and then -// optionally checks if they're online and adds them to our map of -// peers in a thread safe manner. -func (s *Server) peerAdder(ctx context.Context) { - for { - select { - case chanMsg := <-s.peerChannel: - msg := chanMsg.msg - ping := chanMsg.ping - - k := msg.Address + ":" + msg.Port - if _, ok := s.Servers[k]; !ok { - newServer := &FederatedServer{ - Address: msg.Address, - Port: msg.Port, - Ts: time.Now(), - } - if !ping || helloPeer(newServer, s.Args) { - s.Servers[k] = newServer - s.writePeers() - s.notifyPeerSubs(newServer) - } - } else { - s.Servers[k].Ts = time.Now() - } - case <-ctx.Done(): - log.Println("context finished, peerAdder shutting down.") - return - } - - } -} - +// notifyPeerSubs takes a new peer server we just learned about and notifies +// all the peers that have subscribed to us about it. func (s *Server) notifyPeerSubs(newServer *FederatedServer) { var unsubscribe []string s.PeerSubs.Range(func(k, v interface{}) bool { @@ -254,37 +308,96 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) { }) for _, key := range unsubscribe { + s.decNumSubs() + metrics.PeersSubscribed.Dec() s.PeerSubs.Delete(key) } } -// addPeer is an internal function to add a peer to this hub. -func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) { - s.peerChannel <- &peerAddMsg{msg, ping} +// addPeer takes a new peer as a pb.ServerMessage, optionally checks to see +// if they're online, and adds them to our list of peer. If we're not currently +// subscribed to a peer, it will also subscribe to it. +func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error { + if s.Args.Port == msg.Port && + (localHosts[msg.Address] || msg.Address == s.Args.Host) { + log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.Args.Host, s.Args.Port) + return nil + } + k := peerKey(msg) + newServer := &FederatedServer{ + Address: msg.Address, + Port: msg.Port, + Ts: time.Now(), + } + log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg) + if oldServer, loaded := s.PeerServers.LoadOrStore(k, newServer); !loaded { + if ping { + _, err := s.helloPeer(newServer) + if err != nil { + s.PeerServers.Delete(k) + return err + } + } + + s.incNumPeers() + metrics.PeersKnown.Inc() + s.writePeers() + s.notifyPeerSubs(newServer) + + // If aren't subscribed to a server yet, subscribe to + // this one. + if !s.Subscribed { + err := s.subscribeToPeer(newServer) + if err != nil { + s.PeerServers.Delete(k) + return err + } else { + s.Subscribed = true + } + } + } else { + oldServerCast, ok := oldServer.(*FederatedServer) + // This shouldn't happen, but if it does, I guess delete the key + // and try adding this one since it's new. + if !ok { + log.Println("Error casting map value: ", oldServer) + s.PeerServers.Delete(k) + return s.addPeer(msg, ping) + } + oldServerCast.Ts = time.Now() + } + return nil } // mergeFederatedServers is an internal convenience function to add a list of // peers. func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) { for _, srvMsg := range servers { - s.peerChannel <- &peerAddMsg{srvMsg, false} + err := s.addPeer(srvMsg, false) + // This shouldn't happen because we're not pinging them. + if err != nil { + log.Println(err) + } } } // makeHelloMessage makes a message for this hub to call the Hello endpoint // on another hub. func (s *Server) makeHelloMessage() *pb.HelloMessage { - n := len(s.Servers) - servers := make([]*pb.ServerMessage, n) + servers := make([]*pb.ServerMessage, 0, 10) - var i = 0 - for _, v := range s.Servers { - servers[i] = &pb.ServerMessage{ - Address: v.Address, - Port: v.Port, + s.PeerServers.Range(func(_, v interface{}) bool { + peer, ok := v.(*FederatedServer) + if !ok { + log.Println("Failed to cast value in makeHelloMessage", v) + return true } - i += 1 - } + servers = append(servers, &pb.ServerMessage{ + Address: peer.Address, + Port: peer.Port, + }) + return true + }) return &pb.HelloMessage{ Port: s.Args.Port, diff --git a/server/federation_test.go b/server/federation_test.go index ac48639..02ac4d2 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -8,9 +8,11 @@ import ( "os" "strings" "testing" - "time" + "github.com/lbryio/hub/internal/metrics" pb "github.com/lbryio/hub/protobuf/go" + dto "github.com/prometheus/client_model/go" + "google.golang.org/grpc" ) // lineCountFile takes a fileName and counts the number of lines in it. @@ -40,9 +42,7 @@ func removeFile(fileName string) { } } -// TestPeerAdder tests the peer adder goroutine. -func TestPeerAdder(t *testing.T) { - ctx := context.Background() +func makeDefaultArgs() *Args { args := &Args{ CmdType: ServeCmd, Host: DefaultHost, @@ -50,20 +50,28 @@ func TestPeerAdder(t *testing.T) { EsHost: DefaultEsHost, EsPort: DefaultEsPort, UDPPort: DefaultUdpPort, - DisableEs: true, PrometheusPort: DefaultPrometheusPort, EsIndex: DefaultEsIndex, - Debug: true, RefreshDelta: DefaultRefreshDelta, CacheTTL: DefaultCacheTTL, PeerFile: DefaultPeerFile, Country: DefaultCountry, - StartPeerAdder: false, + DisableEs: true, + Debug: true, + LoadPeers: false, StartPrometheus: false, StartUDP: false, WritePeers: false, } + return args +} + +// TestAddPeer tests the ability to add peers +func TestAddPeer(t *testing.T) { + ctx := context.Background() + args := makeDefaultArgs() + tests := []struct { name string want int @@ -81,67 +89,47 @@ func TestPeerAdder(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T){ server := MakeHubServer(ctx, args) - ctxWCancel, cancel := context.WithCancel(ctx) + server.Subscribed = true + metrics.PeersKnown.Set(0) - go server.peerAdder(ctxWCancel) for i := 0; i < 10; i++ { - var msg *peerAddMsg + var msg *pb.ServerMessage if strings.Contains(tt.name, "1 unique") { - msg = &peerAddMsg{ - msg: &pb.ServerMessage{ - Address: "1.1.1.1", - Port: "50051", - }, - ping: false, + msg = &pb.ServerMessage{ + Address: "1.1.1.1", + Port: "50051", } } else { - msg = &peerAddMsg{ - msg: &pb.ServerMessage{ - Address: fmt.Sprintf("%d.%d.%d.%d", i, i, i, i), - Port: "50051", - }, - ping: false, + x := i + 1 + msg = &pb.ServerMessage{ + Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x), + Port: "50051", } } - server.peerChannel <- msg + //log.Printf("Adding peer %+v\n", msg) + err := server.addPeer(msg, false) + if err != nil { + log.Println(err) + } } - // Have to give it a second to update peers since it's in - // another thread. - time.Sleep(time.Second) - got := len(server.Servers) + var m = &dto.Metric{} + if err := metrics.PeersKnown.Write(m); err != nil { + t.Errorf("Error getting metrics %+v\n", err) + } + got := int(*m.Gauge.Value) if got != tt.want { - t.Errorf("len(server.Servers) = %d, want %d", got, tt.want) + t.Errorf("len(server.PeerServers) = %d, want %d\n", got, tt.want) } - cancel() }) } } -// TestPeerWriter tests that the peerAdder goroutine writes the peer file -// properly when set to do so. +// TestPeerWriter tests that peers get written properly func TestPeerWriter(t *testing.T) { ctx := context.Background() - args := &Args{ - CmdType: ServeCmd, - Host: DefaultHost, - Port: DefaultPort, - EsHost: DefaultEsHost, - EsPort: DefaultEsPort, - UDPPort: DefaultUdpPort, - DisableEs: true, - PrometheusPort: DefaultPrometheusPort, - EsIndex: DefaultEsIndex, - Debug: true, - RefreshDelta: DefaultRefreshDelta, - CacheTTL: DefaultCacheTTL, - PeerFile: DefaultPeerFile, - Country: DefaultCountry, - StartPeerAdder: false, - StartPrometheus: false, - StartUDP: false, - WritePeers: true, - } + args := makeDefaultArgs() + args.WritePeers = true tests := []struct { name string @@ -160,40 +148,279 @@ func TestPeerWriter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T){ server := MakeHubServer(ctx, args) - ctxWCancel, cancel := context.WithCancel(ctx) + server.Subscribed = true - go server.peerAdder(ctxWCancel) for i := 0; i < 10; i++ { - var msg *peerAddMsg + var msg *pb.ServerMessage if strings.Contains(tt.name, "1 unique") { - msg = &peerAddMsg{ - msg: &pb.ServerMessage{ - Address: "1.1.1.1", - Port: "50051", - }, - ping: false, + msg = &pb.ServerMessage{ + Address: "1.1.1.1", + Port: "50051", } } else { - msg = &peerAddMsg{ - msg: &pb.ServerMessage{ - Address: fmt.Sprintf("%d.%d.%d.%d", i, i, i, i), - Port: "50051", - }, - ping: false, + msg = &pb.ServerMessage{ + Address: fmt.Sprintf("%d.%d.%d.%d", i, i, i, i), + Port: "50051", } } - server.peerChannel <- msg + //log.Printf("Adding peer %+v\n", msg) + err := server.addPeer(msg, false) + if err != nil { + log.Println(err) + } } - // Have to give it a second to update peers since it's in - // another thread. - time.Sleep(time.Second * 1) + //log.Println("Counting lines...") got := lineCountFile(server.Args.PeerFile) if got != tt.want { - t.Errorf("len(server.Servers) = %d, want %d", got, tt.want) + t.Errorf("lineCountFile(peers.txt) = %d, want %d", got, tt.want) } - cancel() }) } removeFile(args.PeerFile) } + +// TestAddPeerEndpoint tests the ability to add peers +func TestAddPeerEndpoint(t *testing.T) { + ctx := context.Background() + args := makeDefaultArgs() + args2 := makeDefaultArgs() + args2.Port = "50052" + + + tests := []struct { + name string + wantServerOne int64 + wantServerTwo int64 + } { + { + // outside -> server1.AddPeer(server2, ping=true) : server1 = 1, server2 = 0 + // server1 -> server2.Hello(server1) : server1 = 1, server2 = 0 + // server2 -> server2.addPeer(server1, ping=false) : server1 = 1, server2 = 1 + // server2 -> server1.PeerSubscribe(server2) : server1 = 1, server2 = 1 + // server1 <- server2.makeHelloMessage() : server1 = 1, server2 = 1 + // server1.notifyPeer() : server1 = 1, server2 = 1 + // server1 -> server2.AddPeer(server2) : server1 = 1, server2 = 1 + // server2 self peer, skipping : server1 = 1, server2 = 1 + // server1 -> server2.PeerSubscribe(server1) : server1 = 1, server2 = 1 + name: "Add 1 peer", + wantServerOne: 1, + wantServerTwo: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T){ + server := MakeHubServer(ctx, args) + server2 := MakeHubServer(ctx, args2) + metrics.PeersKnown.Set(0) + go server.Run() + go server2.Run() + //go server.Run() + conn, err := grpc.Dial("localhost:"+args.Port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + + c := pb.NewHubClient(conn) + + msg := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50052", + } + + _, err = c.AddPeer(context.Background(), msg) + if err != nil { + log.Println(err) + } + + server.GrpcServer.GracefulStop() + server2.GrpcServer.GracefulStop() + got1 := server.getNumPeers() + got2 := server2.getNumPeers() + if got1 != tt.wantServerOne { + t.Errorf("len(server.PeerServers) = %d, want %d\n", got1, tt.wantServerOne) + } + if got2 != tt.wantServerTwo { + t.Errorf("len(server2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo) + } + }) + } + +} + +// TestAddPeerEndpoint2 tests the ability to add peers +func TestAddPeerEndpoint2(t *testing.T) { + ctx := context.Background() + args := makeDefaultArgs() + args2 := makeDefaultArgs() + args3 := makeDefaultArgs() + args2.Port = "50052" + args3.Port = "50053" + + + tests := []struct { + name string + wantServerOne int64 + wantServerTwo int64 + wantServerThree int64 + } { + { + name: "Add 2 peers", + wantServerOne: 2, + wantServerTwo: 2, + wantServerThree: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T){ + server := MakeHubServer(ctx, args) + server2 := MakeHubServer(ctx, args2) + server3 := MakeHubServer(ctx, args3) + metrics.PeersKnown.Set(0) + go server.Run() + go server2.Run() + go server3.Run() + conn, err := grpc.Dial("localhost:"+args.Port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + + c := pb.NewHubClient(conn) + + msg := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50052", + } + + msg2 := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50053", + } + + _, err = c.AddPeer(context.Background(), msg) + if err != nil { + log.Println(err) + } + _, err = c.AddPeer(context.Background(), msg2) + if err != nil { + log.Println(err) + } + + server.GrpcServer.GracefulStop() + server2.GrpcServer.GracefulStop() + server3.GrpcServer.GracefulStop() + got1 := server.getNumPeers() + got2 := server2.getNumPeers() + got3 := server3.getNumPeers() + if got1 != tt.wantServerOne { + t.Errorf("len(server.PeerServers) = %d, want %d\n", got1, tt.wantServerOne) + } + if got2 != tt.wantServerTwo { + t.Errorf("len(server2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo) + } + if got3 != tt.wantServerThree { + t.Errorf("len(server3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree) + } + }) + } + +} + + +// TestAddPeerEndpoint3 tests the ability to add peers +func TestAddPeerEndpoint3(t *testing.T) { + ctx := context.Background() + args := makeDefaultArgs() + args2 := makeDefaultArgs() + args3 := makeDefaultArgs() + args2.Port = "50052" + args3.Port = "50053" + + + tests := []struct { + name string + wantServerOne int64 + wantServerTwo int64 + wantServerThree int64 + } { + { + name: "Add 1 peer to each", + wantServerOne: 2, + wantServerTwo: 2, + wantServerThree: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T){ + server := MakeHubServer(ctx, args) + server2 := MakeHubServer(ctx, args2) + server3 := MakeHubServer(ctx, args3) + metrics.PeersKnown.Set(0) + go server.Run() + go server2.Run() + go server3.Run() + conn, err := grpc.Dial("localhost:"+args.Port, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + conn2, err := grpc.Dial("localhost:50052", + grpc.WithInsecure(), + grpc.WithBlock(), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + + c := pb.NewHubClient(conn) + c2 := pb.NewHubClient(conn2) + + msg := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50052", + } + + msg2 := &pb.ServerMessage{ + Address: "0.0.0.0", + Port: "50053", + } + + _, err = c.AddPeer(context.Background(), msg) + if err != nil { + log.Println(err) + } + _, err = c2.AddPeer(context.Background(), msg2) + if err != nil { + log.Println(err) + } + + server.GrpcServer.GracefulStop() + server2.GrpcServer.GracefulStop() + server3.GrpcServer.GracefulStop() + got1 := server.getNumPeers() + got2 := server2.getNumPeers() + got3 := server3.getNumPeers() + if got1 != tt.wantServerOne { + t.Errorf("len(server.PeerServers) = %d, want %d\n", got1, tt.wantServerOne) + } + if got2 != tt.wantServerTwo { + t.Errorf("len(server2.PeerServers) = %d, want %d\n", got2, tt.wantServerTwo) + } + if got3 != tt.wantServerThree { + t.Errorf("len(server3.PeerServers) = %d, want %d\n", got3, tt.wantServerThree) + } + }) + } + +} diff --git a/server/server.go b/server/server.go index efc8a17..01de7d9 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "fmt" "hash" "log" + "net" "net/http" "os" "regexp" @@ -20,22 +21,25 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) type Server struct { - GrpcServer *grpc.Server - Args *Args - MultiSpaceRe *regexp.Regexp - WeirdCharsRe *regexp.Regexp - EsClient *elastic.Client - Servers map[string]*FederatedServer - QueryCache *ttlcache.Cache - S256 *hash.Hash + GrpcServer *grpc.Server + Args *Args + MultiSpaceRe *regexp.Regexp + WeirdCharsRe *regexp.Regexp + EsClient *elastic.Client + QueryCache *ttlcache.Cache + S256 *hash.Hash LastRefreshCheck time.Time RefreshDelta time.Duration NumESRefreshes int64 - PeerSubs sync.Map - peerChannel chan *peerAddMsg + PeerServers sync.Map //map[string]*FederatedServer + NumPeerServers *int64 + PeerSubs sync.Map + NumPeerSubs *int64 + Subscribed bool pb.UnimplementedHubServer } @@ -83,14 +87,28 @@ func getVersion() string { 'blockchain.address.unsubscribe' */ +func (s *Server) Run() { + l, err := net.Listen("tcp", ":"+s.Args.Port) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + pb.RegisterHubServer(s.GrpcServer, s) + reflection.Register(s.GrpcServer) + + log.Printf("listening on %s\n", l.Addr().String()) + log.Println(s.Args) + if err := s.GrpcServer.Serve(l); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} + // MakeHubServer takes the arguments given to a hub when it's started and // initializes everything. It loads information about previously known peers, // creates needed internal data structures, and initializes goroutines. func MakeHubServer(ctx context.Context, args *Args) *Server { grpcServer := grpc.NewServer(grpc.NumStreamWorkers(10)) - peerChannel := make(chan *peerAddMsg) - multiSpaceRe, err := regexp.Compile(`\s{2,}`) if err != nil { log.Fatal(err) @@ -101,8 +119,6 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { log.Fatal(err) } - servers := loadPeers(args) - var client *elastic.Client if !args.DisableEs { esUrl := args.EsHost + ":" + args.EsPort @@ -134,26 +150,30 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { refreshDelta = time.Second * 0 } + numPeers := new(int64) + *numPeers = 0 + numSubs := new(int64) + *numSubs = 0 + s := &Server{ GrpcServer: grpcServer, - Args: args, - MultiSpaceRe: multiSpaceRe, - WeirdCharsRe: weirdCharsRe, - EsClient: client, - QueryCache: cache, - S256: &s256, + Args: args, + MultiSpaceRe: multiSpaceRe, + WeirdCharsRe: weirdCharsRe, + EsClient: client, + QueryCache: cache, + S256: &s256, LastRefreshCheck: time.Now(), - RefreshDelta: refreshDelta, + RefreshDelta: refreshDelta, NumESRefreshes: 0, - Servers: servers, + PeerServers: sync.Map{}, + NumPeerServers: numPeers, PeerSubs: sync.Map{}, - peerChannel: peerChannel, + NumPeerSubs: numSubs, + Subscribed: false, } // Start up our background services - if args.StartPeerAdder { - go s.peerAdder(ctx) - } if args.StartPrometheus { go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics") } @@ -165,6 +185,20 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { } }() } + // Load peers from disk and subscribe to one if there are any + if args.LoadPeers { + // We Subscribed to true, so we don't try subscribing to peers as we + // add them, we'll find the best one after + s.Subscribed = true + err = s.loadPeers() + if err != nil { + log.Println(err) + } + // subscribe to the fastest peer we know (if there are any) for updates + // about their peers. + s.Subscribed = false + s.subscribeToFastestPeer() + } return s } @@ -182,6 +216,7 @@ func (s *Server) prometheusEndpoint(port string, endpoint string) { // The passed message includes information about the other hub, and all // of its peers which are added to the knowledge of this hub. func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMessage, error) { + metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc() port := args.Port host := args.Host server := &FederatedServer{ @@ -191,7 +226,11 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes } log.Println(server) - s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false) + err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false) + // They just contacted us, so this shouldn't happen + if err != nil { + log.Println(err) + } s.mergeFederatedServers(args.Servers) s.writePeers() s.notifyPeerSubs(server) @@ -202,21 +241,34 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes // PeerSubscribe adds a peer hub to the list of subscribers to update about // new peers. func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) { + metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc() + var msg = "Success" peer := &FederatedServer{ Address: in.Address, Port: in.Port, Ts: time.Now(), } - s.PeerSubs.Store(peerKey(in), peer) + if _, loaded := s.PeerSubs.LoadOrStore(peerKey(in), peer); !loaded { + s.incNumSubs() + metrics.PeersSubscribed.Inc() + } else { + msg = "Already subscribed" + } - return &pb.StringValue{Value: "Success"}, nil + return &pb.StringValue{Value: msg}, nil } // AddPeer is a grpc endpoint to tell this hub about another hub in the network. func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) { - s.addPeer(args, true) - return &pb.StringValue{Value: "Success!"}, nil + metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc() + var msg = "Success" + err := s.addPeer(args, true) + if err != nil { + log.Println(err) + msg = "Failed" + } + return &pb.StringValue{Value: msg}, err } // Ping is a grpc endpoint that returns a short message.