Refactor and fixes related to PR comments.

This commit is contained in:
Jeffrey Picard 2021-12-01 19:32:23 -05:00
parent 1c1d288654
commit 2e52c1639c
5 changed files with 74 additions and 75 deletions

View file

@ -16,8 +16,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
// FederatedServer hold relevant information about peers that we known about. // Peer holds relevant information about peers that we know about.
type FederatedServer struct { type Peer struct {
Address string Address string
Port string Port string
Ts time.Time Ts time.Time
@ -28,19 +28,19 @@ var (
"127.0.0.1": true, "127.0.0.1": true,
"0.0.0.0": true, "0.0.0.0": true,
"localhost": true, "localhost": true,
"<nil>": true, "<nil>": true, // Empty net.IP turned into a string
} }
) )
// peerKey takes a ServerMessage object and returns the key that for that peer // peerKey takes a peer and returns the key that for that peer
// in our peer table. // in our peer table.
func peerKey(msg *pb.ServerMessage) string { func peerKey(peer *Peer) string {
return msg.Address + ":" + msg.Port return peer.Address + ":" + peer.Port
} }
// peerKey is a function on a FederatedServer struct to return the key for that // peerKey is a function on a FederatedServer struct to return the key for that
// peer is out peer table. // peer is out peer table.
func (peer *FederatedServer) peerKey() string { func (peer *Peer) peerKey() string {
return peer.Address + ":" + peer.Port return peer.Address + ":" + peer.Port
} }
@ -68,11 +68,10 @@ func (s *Server) getNumSubs() int64 {
return *s.NumPeerSubs return *s.NumPeerSubs
} }
// getAndSetExternalIp takes the address of a peer running a UDP server and // getAndSetExternalIp takes the ip and port of a peer running a UDP server and
// pings it, so we can determine our own external IP address. // pings it, so we can determine our own external IP address.
func (s *Server) getAndSetExternalIp(msg *pb.ServerMessage) error { func (s *Server) getAndSetExternalIp(ip, port string) error {
log.Println(msg) pong, err := UDPPing(ip, port)
pong, err := UDPPing(msg.Address, msg.Port)
if err != nil { if err != nil {
return err return err
} }
@ -146,12 +145,13 @@ retry:
continue continue
} }
srvMsg := &pb.ServerMessage{ newPeer := &Peer{
Address: ipPort[0], Address: ipPort[0],
Port: ipPort[1], Port: ipPort[1],
Ts: time.Now(),
} }
log.Printf("pinging peer %+v\n", srvMsg) log.Printf("pinging peer %+v\n", newPeer)
err = s.addPeer(srvMsg, true, true) err = s.addPeer(newPeer, true, true)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
@ -164,7 +164,7 @@ retry:
// subscribeToPeer subscribes us to a peer to we'll get updates about their // subscribeToPeer subscribes us to a peer to we'll get updates about their
// known peers. // known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error { func (s *Server) subscribeToPeer(peer *Peer) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
@ -199,13 +199,13 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error {
// This is used to confirm existence of peers on start and let them // This is used to confirm existence of peers on start and let them
// know about us. Returns the response from the server on success, // know about us. Returns the response from the server on success,
// nil otherwise. // nil otherwise.
func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) { func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
log.Println("In helloPeer") log.Println("In helloPeer")
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext(ctx, conn, err := grpc.DialContext(ctx,
server.Address+":"+server.Port, peer.Address+":"+peer.Port,
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithBlock(), grpc.WithBlock(),
) )
@ -223,7 +223,7 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
Servers: []*pb.ServerMessage{}, Servers: []*pb.ServerMessage{},
} }
log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, server) log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
res, err := c.Hello(ctx, msg) res, err := c.Hello(ctx, msg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
@ -266,8 +266,8 @@ func (s *Server) writePeers() {
} }
// notifyPeer takes a peer to notify and a new peer we just learned about // notifyPeer takes a peer to notify and a new peer we just learned about
// and calls AddPeer on the first. // and informs the already known peer about the new peer.
func (s *Server) notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error { func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
if s.Args.DisableFederation { if s.Args.DisableFederation {
return nil return nil
} }
@ -301,12 +301,12 @@ func (s *Server) notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedSer
// notifyPeerSubs takes a new peer server we just learned about and notifies // notifyPeerSubs takes a new peer server we just learned about and notifies
// all the peers that have subscribed to us about it. // all the peers that have subscribed to us about it.
func (s *Server) notifyPeerSubs(newServer *FederatedServer) { func (s *Server) notifyPeerSubs(newPeer *Peer) {
var unsubscribe []string var unsubscribe []string
s.PeerSubsMut.RLock() s.PeerSubsMut.RLock()
for key, peer := range s.PeerSubs { for key, peer := range s.PeerSubs {
log.Printf("Notifying peer %s of new node %+v\n", key, newServer) log.Printf("Notifying peer %s of new node %+v\n", key, newPeer)
err := s.notifyPeer(peer, newServer) err := s.notifyPeer(peer, newPeer)
if err != nil { if err != nil {
log.Println("Failed to send data to ", key) log.Println("Failed to send data to ", key)
log.Println(err) log.Println(err)
@ -326,43 +326,36 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
s.PeerSubsMut.Unlock() s.PeerSubsMut.Unlock()
} }
// addPeer takes a new peer as a pb.ServerMessage, optionally checks to see // addPeer takes a new peer, optionally checks to see if they're online, and
// if they're online, and adds them to our list of peer. If we're not currently // adds them to our list of peers. It will also optionally subscribe to it.
// subscribed to a peer, it will also subscribe to it. func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error {
if s.Args.DisableFederation { if s.Args.DisableFederation {
return nil return nil
} }
// First thing we get our external ip if we don't have it, otherwise we // First thing we get our external ip if we don't have it, otherwise we
// could end up subscribed to our self, which is silly. // could end up subscribed to our self, which is silly.
nilIP := net.IP{} nilIP := net.IP{}
//localIP0 := net.IPv4(0,0,0,0)
localIP1 := net.IPv4(127, 0, 0, 1) localIP1 := net.IPv4(127, 0, 0, 1)
if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) { if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) {
err := s.getAndSetExternalIp(msg) err := s.getAndSetExternalIp(newPeer.Address, newPeer.Port)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host) log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host)
} }
} }
if s.Args.Port == msg.Port && if s.Args.Port == newPeer.Port &&
(localHosts[msg.Address] || msg.Address == s.ExternalIP.String()) { (localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port) log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
return nil return nil
} }
k := peerKey(msg) k := peerKey(newPeer)
newServer := &FederatedServer{
Address: msg.Address,
Port: msg.Port,
Ts: time.Now(),
}
log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, msg) log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded { if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
if ping { if ping {
_, err := s.helloPeer(newServer) _, err := s.helloPeer(newPeer)
if err != nil { if err != nil {
s.PeerServersMut.Lock() s.PeerServersMut.Lock()
delete(s.PeerServers, k) delete(s.PeerServers, k)
@ -374,11 +367,11 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error
s.incNumPeers() s.incNumPeers()
metrics.PeersKnown.Inc() metrics.PeersKnown.Inc()
s.writePeers() s.writePeers()
s.notifyPeerSubs(newServer) s.notifyPeerSubs(newPeer)
// Subscribe to all our peers for now // Subscribe to all our peers for now
if subscribe { if subscribe {
err := s.subscribeToPeer(newServer) err := s.subscribeToPeer(newPeer)
if err != nil { if err != nil {
return err return err
} }
@ -389,11 +382,16 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool, subscribe bool) error
return nil return nil
} }
// mergeFederatedServers is an internal convenience function to add a list of // mergePeers is an internal convenience function to add a list of
// peers. // peers.
func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) { func (s *Server) mergePeers(servers []*pb.ServerMessage) {
for _, srvMsg := range servers { for _, srvMsg := range servers {
err := s.addPeer(srvMsg, false, true) newPeer := &Peer{
Address: srvMsg.Address,
Port: srvMsg.Port,
Ts: time.Now(),
}
err := s.addPeer(newPeer, false, true)
// This shouldn't happen because we're not pinging them. // This shouldn't happen because we're not pinging them.
if err != nil { if err != nil {
log.Println(err) log.Println(err)

View file

@ -93,21 +93,21 @@ func TestAddPeer(t *testing.T) {
metrics.PeersKnown.Set(0) metrics.PeersKnown.Set(0)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
var msg *pb.ServerMessage var peer *Peer
if strings.Contains(tt.name, "1 unique") { if strings.Contains(tt.name, "1 unique") {
msg = &pb.ServerMessage{ peer = &Peer{
Address: "1.1.1.1", Address: "1.1.1.1",
Port: "50051", Port: "50051",
} }
} else { } else {
x := i + 1 x := i + 1
msg = &pb.ServerMessage{ peer = &Peer{
Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x), Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x),
Port: "50051", Port: "50051",
} }
} }
//log.Printf("Adding peer %+v\n", msg) //log.Printf("Adding peer %+v\n", msg)
err := server.addPeer(msg, false, false) err := server.addPeer(peer, false, false)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
@ -151,21 +151,21 @@ func TestPeerWriter(t *testing.T) {
server.ExternalIP = net.IPv4(0, 0, 0, 0) server.ExternalIP = net.IPv4(0, 0, 0, 0)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
var msg *pb.ServerMessage var peer *Peer
if strings.Contains(tt.name, "1 unique") { if strings.Contains(tt.name, "1 unique") {
msg = &pb.ServerMessage{ peer = &Peer{
Address: "1.1.1.1", Address: "1.1.1.1",
Port: "50051", Port: "50051",
} }
} else { } else {
x := i + 1 x := i + 1
msg = &pb.ServerMessage{ peer = &Peer{
Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x), Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x),
Port: "50051", Port: "50051",
} }
} }
//log.Printf("Adding peer %+v\n", msg) //log.Printf("Adding peer %+v\n", peer)
err := server.addPeer(msg, false, false) err := server.addPeer(peer, false, false)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
@ -449,12 +449,12 @@ func TestUDPServer(t *testing.T) {
go server2.Run() go server2.Run()
metrics.PeersKnown.Set(0) metrics.PeersKnown.Set(0)
msg := &pb.ServerMessage{ peer := &Peer{
Address: "0.0.0.0", Address: "0.0.0.0",
Port: "50052", Port: "50052",
} }
err := server.addPeer(msg, true, true) err := server.addPeer(peer, true, true)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }

View file

@ -35,10 +35,10 @@ type Server struct {
LastRefreshCheck time.Time LastRefreshCheck time.Time
RefreshDelta time.Duration RefreshDelta time.Duration
NumESRefreshes int64 NumESRefreshes int64
PeerServers map[string]*FederatedServer PeerServers map[string]*Peer
PeerServersMut sync.RWMutex PeerServersMut sync.RWMutex
NumPeerServers *int64 NumPeerServers *int64
PeerSubs map[string]*FederatedServer PeerSubs map[string]*Peer
PeerSubsMut sync.RWMutex PeerSubsMut sync.RWMutex
NumPeerSubs *int64 NumPeerSubs *int64
ExternalIP net.IP ExternalIP net.IP
@ -88,7 +88,7 @@ func getVersion() string {
'blockchain.address.unsubscribe' 'blockchain.address.unsubscribe'
*/ */
func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) { func (s *Server) PeerSubsLoadOrStore(peer *Peer) (actual *Peer, loaded bool) {
key := peer.peerKey() key := peer.peerKey()
s.PeerSubsMut.RLock() s.PeerSubsMut.RLock()
if actual, ok := s.PeerSubs[key]; ok { if actual, ok := s.PeerSubs[key]; ok {
@ -103,7 +103,7 @@ func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedSe
} }
} }
func (s *Server) PeerServersLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) { func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool) {
key := peer.peerKey() key := peer.peerKey()
s.PeerServersMut.RLock() s.PeerServersMut.RLock()
if actual, ok := s.PeerServers[key]; ok { if actual, ok := s.PeerServers[key]; ok {
@ -195,10 +195,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
LastRefreshCheck: time.Now(), LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta, RefreshDelta: refreshDelta,
NumESRefreshes: 0, NumESRefreshes: 0,
PeerServers: make(map[string]*FederatedServer), PeerServers: make(map[string]*Peer),
PeerServersMut: sync.RWMutex{}, PeerServersMut: sync.RWMutex{},
NumPeerServers: numPeers, NumPeerServers: numPeers,
PeerSubs: make(map[string]*FederatedServer), PeerSubs: make(map[string]*Peer),
PeerSubsMut: sync.RWMutex{}, PeerSubsMut: sync.RWMutex{},
NumPeerSubs: numSubs, NumPeerSubs: numSubs,
ExternalIP: net.IPv4(127, 0, 0, 1), ExternalIP: net.IPv4(127, 0, 0, 1),
@ -245,21 +245,21 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes
metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc() metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc()
port := args.Port port := args.Port
host := args.Host host := args.Host
server := &FederatedServer{ newPeer := &Peer{
Address: host, Address: host,
Port: port, Port: port,
Ts: time.Now(), Ts: time.Now(),
} }
log.Println(server) log.Println(newPeer)
err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false, true) err := s.addPeer(newPeer, false, true)
// They just contacted us, so this shouldn't happen // They just contacted us, so this shouldn't happen
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
s.mergeFederatedServers(args.Servers) s.mergePeers(args.Servers)
s.writePeers() s.writePeers()
s.notifyPeerSubs(server) s.notifyPeerSubs(newPeer)
return s.makeHelloMessage(), nil return s.makeHelloMessage(), nil
} }
@ -269,7 +269,7 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes
func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) { func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc() metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc()
var msg = "Success" var msg = "Success"
peer := &FederatedServer{ peer := &Peer{
Address: in.Address, Address: in.Address,
Port: in.Port, Port: in.Port,
Ts: time.Now(), Ts: time.Now(),
@ -289,7 +289,12 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S
func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) { func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc() metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc()
var msg = "Success" var msg = "Success"
err := s.addPeer(args, true, true) newPeer := &Peer{
Address: args.Address,
Port: args.Port,
Ts: time.Now(),
}
err := s.addPeer(newPeer, true, true)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
msg = "Failed" msg = "Failed"

View file

@ -17,8 +17,7 @@ const maxBufferSize = 1024
const magic = 1446058291 const magic = 1446058291
const protocolVersion = 1 const protocolVersion = 1
const defaultFlags = 0b00000000 const defaultFlags = 0b00000000
const availableFlag = 0b00000001
var availableFlag = 0b00000001
// SPVPing is a struct for the format of how to ping another hub over udp. // SPVPing is a struct for the format of how to ping another hub over udp.
// format b'!lB64s' // format b'!lB64s'
@ -211,9 +210,6 @@ func UDPPing(ip, port string) (*SPVPong, error) {
return nil, errors.Base("Pong decoding failed") return nil, errors.Base("Pong decoding failed")
} }
// myAddr := pong.DecodeAddress()
// country := pong.DecodeCountry()
return pong, nil return pong, nil
} }

View file

@ -7,7 +7,7 @@ import (
"testing" "testing"
) )
// TestAddPeer tests the ability to add peers // TestUDPPing tests UDPPing correctness against prod server.
func TestUDPPing(t *testing.T) { func TestUDPPing(t *testing.T) {
args := makeDefaultArgs() args := makeDefaultArgs()
args.DisableStartUDP = true args.DisableStartUDP = true