From 2e52c1639c25c8d42bd7d3d130773b00103b0174 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Wed, 1 Dec 2021 19:32:23 -0500 Subject: [PATCH] Refactor and fixes related to PR comments. --- server/federation.go | 88 +++++++++++++++++++-------------------- server/federation_test.go | 22 +++++----- server/server.go | 31 ++++++++------ server/udp.go | 6 +-- server/udp_test.go | 2 +- 5 files changed, 74 insertions(+), 75 deletions(-) diff --git a/server/federation.go b/server/federation.go index 304aff6..6348764 100644 --- a/server/federation.go +++ b/server/federation.go @@ -16,8 +16,8 @@ import ( "google.golang.org/grpc" ) -// FederatedServer hold relevant information about peers that we known about. -type FederatedServer struct { +// Peer holds relevant information about peers that we know about. +type Peer struct { Address string Port string Ts time.Time @@ -28,19 +28,19 @@ var ( "127.0.0.1": true, "0.0.0.0": true, "localhost": true, - "": true, + "": true, // Empty net.IP turned into a string } ) -// peerKey takes a ServerMessage object and returns the key that for that peer +// peerKey takes a peer and returns the key that for that peer // in our peer table. -func peerKey(msg *pb.ServerMessage) string { - return msg.Address + ":" + msg.Port +func peerKey(peer *Peer) string { + return peer.Address + ":" + peer.Port } // peerKey is a function on a FederatedServer struct to return the key for that // peer is out peer table. -func (peer *FederatedServer) peerKey() string { +func (peer *Peer) peerKey() string { return peer.Address + ":" + peer.Port } @@ -68,11 +68,10 @@ func (s *Server) getNumSubs() int64 { return *s.NumPeerSubs } -// getAndSetExternalIp takes the address of a peer running a UDP server and +// getAndSetExternalIp takes the ip and port of a peer running a UDP server and // pings it, so we can determine our own external IP address. -func (s *Server) getAndSetExternalIp(msg *pb.ServerMessage) error { - log.Println(msg) - pong, err := UDPPing(msg.Address, msg.Port) +func (s *Server) getAndSetExternalIp(ip, port string) error { + pong, err := UDPPing(ip, port) if err != nil { return err } @@ -146,12 +145,13 @@ retry: continue } - srvMsg := &pb.ServerMessage{ + newPeer := &Peer{ Address: ipPort[0], Port: ipPort[1], + Ts: time.Now(), } - log.Printf("pinging peer %+v\n", srvMsg) - err = s.addPeer(srvMsg, true, true) + log.Printf("pinging peer %+v\n", newPeer) + err = s.addPeer(newPeer, true, true) if err != nil { log.Println(err) } @@ -164,7 +164,7 @@ retry: // subscribeToPeer subscribes us to a peer to we'll get updates about their // known peers. -func (s *Server) subscribeToPeer(peer *FederatedServer) error { +func (s *Server) subscribeToPeer(peer *Peer) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -199,13 +199,13 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error { // This is used to confirm existence of peers on start and let them // know about us. Returns the response from the server on success, // nil otherwise. -func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) { +func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) { log.Println("In helloPeer") ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() conn, err := grpc.DialContext(ctx, - server.Address+":"+server.Port, + peer.Address+":"+peer.Port, grpc.WithInsecure(), grpc.WithBlock(), ) @@ -223,7 +223,7 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) { Servers: []*pb.ServerMessage{}, } - log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, server) + log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer) res, err := c.Hello(ctx, msg) if err != nil { log.Println(err) @@ -266,8 +266,8 @@ func (s *Server) writePeers() { } // notifyPeer takes a peer to notify and a new peer we just learned about -// and calls AddPeer on the first. -func (s *Server) notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error { +// and informs the already known peer about the new peer. +func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error { if s.Args.DisableFederation { return nil } @@ -301,12 +301,12 @@ func (s *Server) notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedSer // notifyPeerSubs takes a new peer server we just learned about and notifies // all the peers that have subscribed to us about it. -func (s *Server) notifyPeerSubs(newServer *FederatedServer) { +func (s *Server) notifyPeerSubs(newPeer *Peer) { var unsubscribe []string s.PeerSubsMut.RLock() for key, peer := range s.PeerSubs { - log.Printf("Notifying peer %s of new node %+v\n", key, newServer) - err := s.notifyPeer(peer, newServer) + log.Printf("Notifying peer %s of new node %+v\n", key, newPeer) + err := s.notifyPeer(peer, newPeer) if err != nil { log.Println("Failed to send data to ", key) log.Println(err) @@ -326,43 +326,36 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) { s.PeerSubsMut.Unlock() } -// addPeer takes a new peer as a pb.ServerMessage, optionally checks to see -// if they're online, and adds them to our list of peer. If we're not currently -// subscribed to a peer, it will also subscribe to it. -func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error { +// addPeer takes a new peer, optionally checks to see if they're online, and +// adds them to our list of peers. It will also optionally subscribe to it. +func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error { if s.Args.DisableFederation { return nil } // First thing we get our external ip if we don't have it, otherwise we // could end up subscribed to our self, which is silly. nilIP := net.IP{} - //localIP0 := net.IPv4(0,0,0,0) localIP1 := net.IPv4(127, 0, 0, 1) if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) { - err := s.getAndSetExternalIp(msg) + err := s.getAndSetExternalIp(newPeer.Address, newPeer.Port) if err != nil { log.Println(err) log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host) } } - if s.Args.Port == msg.Port && - (localHosts[msg.Address] || msg.Address == s.ExternalIP.String()) { + if s.Args.Port == newPeer.Port && + (localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) { log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) return nil } - k := peerKey(msg) - newServer := &FederatedServer{ - Address: msg.Address, - Port: msg.Port, - Ts: time.Now(), - } + k := peerKey(newPeer) - log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, msg) - if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded { + log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer) + if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded { if ping { - _, err := s.helloPeer(newServer) + _, err := s.helloPeer(newPeer) if err != nil { s.PeerServersMut.Lock() delete(s.PeerServers, k) @@ -374,11 +367,11 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error s.incNumPeers() metrics.PeersKnown.Inc() s.writePeers() - s.notifyPeerSubs(newServer) + s.notifyPeerSubs(newPeer) // Subscribe to all our peers for now if subscribe { - err := s.subscribeToPeer(newServer) + err := s.subscribeToPeer(newPeer) if err != nil { return err } @@ -389,11 +382,16 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error return nil } -// mergeFederatedServers is an internal convenience function to add a list of +// mergePeers is an internal convenience function to add a list of // peers. -func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) { +func (s *Server) mergePeers(servers []*pb.ServerMessage) { for _, srvMsg := range servers { - err := s.addPeer(srvMsg, false, true) + newPeer := &Peer{ + Address: srvMsg.Address, + Port: srvMsg.Port, + Ts: time.Now(), + } + err := s.addPeer(newPeer, false, true) // This shouldn't happen because we're not pinging them. if err != nil { log.Println(err) diff --git a/server/federation_test.go b/server/federation_test.go index ed29dd2..33963bb 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -93,21 +93,21 @@ func TestAddPeer(t *testing.T) { metrics.PeersKnown.Set(0) for i := 0; i < 10; i++ { - var msg *pb.ServerMessage + var peer *Peer if strings.Contains(tt.name, "1 unique") { - msg = &pb.ServerMessage{ + peer = &Peer{ Address: "1.1.1.1", Port: "50051", } } else { x := i + 1 - msg = &pb.ServerMessage{ + peer = &Peer{ Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x), Port: "50051", } } //log.Printf("Adding peer %+v\n", msg) - err := server.addPeer(msg, false, false) + err := server.addPeer(peer, false, false) if err != nil { log.Println(err) } @@ -151,21 +151,21 @@ func TestPeerWriter(t *testing.T) { server.ExternalIP = net.IPv4(0, 0, 0, 0) for i := 0; i < 10; i++ { - var msg *pb.ServerMessage + var peer *Peer if strings.Contains(tt.name, "1 unique") { - msg = &pb.ServerMessage{ + peer = &Peer{ Address: "1.1.1.1", Port: "50051", } } else { x := i + 1 - msg = &pb.ServerMessage{ + peer = &Peer{ Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x), Port: "50051", } } - //log.Printf("Adding peer %+v\n", msg) - err := server.addPeer(msg, false, false) + //log.Printf("Adding peer %+v\n", peer) + err := server.addPeer(peer, false, false) if err != nil { log.Println(err) } @@ -449,12 +449,12 @@ func TestUDPServer(t *testing.T) { go server2.Run() metrics.PeersKnown.Set(0) - msg := &pb.ServerMessage{ + peer := &Peer{ Address: "0.0.0.0", Port: "50052", } - err := server.addPeer(msg, true, true) + err := server.addPeer(peer, true, true) if err != nil { log.Println(err) } diff --git a/server/server.go b/server/server.go index 16472ce..fea59c6 100644 --- a/server/server.go +++ b/server/server.go @@ -35,10 +35,10 @@ type Server struct { LastRefreshCheck time.Time RefreshDelta time.Duration NumESRefreshes int64 - PeerServers map[string]*FederatedServer + PeerServers map[string]*Peer PeerServersMut sync.RWMutex NumPeerServers *int64 - PeerSubs map[string]*FederatedServer + PeerSubs map[string]*Peer PeerSubsMut sync.RWMutex NumPeerSubs *int64 ExternalIP net.IP @@ -88,7 +88,7 @@ func getVersion() string { 'blockchain.address.unsubscribe' */ -func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) { +func (s *Server) PeerSubsLoadOrStore(peer *Peer) (actual *Peer, loaded bool) { key := peer.peerKey() s.PeerSubsMut.RLock() if actual, ok := s.PeerSubs[key]; ok { @@ -103,7 +103,7 @@ func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedSe } } -func (s *Server) PeerServersLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) { +func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool) { key := peer.peerKey() s.PeerServersMut.RLock() if actual, ok := s.PeerServers[key]; ok { @@ -195,10 +195,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { LastRefreshCheck: time.Now(), RefreshDelta: refreshDelta, NumESRefreshes: 0, - PeerServers: make(map[string]*FederatedServer), + PeerServers: make(map[string]*Peer), PeerServersMut: sync.RWMutex{}, NumPeerServers: numPeers, - PeerSubs: make(map[string]*FederatedServer), + PeerSubs: make(map[string]*Peer), PeerSubsMut: sync.RWMutex{}, NumPeerSubs: numSubs, ExternalIP: net.IPv4(127, 0, 0, 1), @@ -245,21 +245,21 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc() port := args.Port host := args.Host - server := &FederatedServer{ + newPeer := &Peer{ Address: host, Port: port, Ts: time.Now(), } - log.Println(server) + log.Println(newPeer) - err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false, true) + err := s.addPeer(newPeer, false, true) // They just contacted us, so this shouldn't happen if err != nil { log.Println(err) } - s.mergeFederatedServers(args.Servers) + s.mergePeers(args.Servers) s.writePeers() - s.notifyPeerSubs(server) + s.notifyPeerSubs(newPeer) return s.makeHelloMessage(), nil } @@ -269,7 +269,7 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) { metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc() var msg = "Success" - peer := &FederatedServer{ + peer := &Peer{ Address: in.Address, Port: in.Port, Ts: time.Now(), @@ -289,7 +289,12 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) { metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc() var msg = "Success" - err := s.addPeer(args, true, true) + newPeer := &Peer{ + Address: args.Address, + Port: args.Port, + Ts: time.Now(), + } + err := s.addPeer(newPeer, true, true) if err != nil { log.Println(err) msg = "Failed" diff --git a/server/udp.go b/server/udp.go index 1830977..6e7d454 100644 --- a/server/udp.go +++ b/server/udp.go @@ -17,8 +17,7 @@ const maxBufferSize = 1024 const magic = 1446058291 const protocolVersion = 1 const defaultFlags = 0b00000000 - -var availableFlag = 0b00000001 +const availableFlag = 0b00000001 // SPVPing is a struct for the format of how to ping another hub over udp. // format b'!lB64s' @@ -211,9 +210,6 @@ func UDPPing(ip, port string) (*SPVPong, error) { return nil, errors.Base("Pong decoding failed") } - // myAddr := pong.DecodeAddress() - // country := pong.DecodeCountry() - return pong, nil } diff --git a/server/udp_test.go b/server/udp_test.go index 8b8db8c..1db0566 100644 --- a/server/udp_test.go +++ b/server/udp_test.go @@ -7,7 +7,7 @@ import ( "testing" ) -// TestAddPeer tests the ability to add peers +// TestUDPPing tests UDPPing correctness against prod server. func TestUDPPing(t *testing.T) { args := makeDefaultArgs() args.DisableStartUDP = true