Some cleanup based on review and use map + sync.RWMutex instead of

sync.Map
This commit is contained in:
Jeffrey Picard 2021-11-04 20:57:23 -04:00
parent 2ee8d2c3cc
commit c61c8db92a
4 changed files with 81 additions and 70 deletions

View file

@ -11,4 +11,4 @@ jobs:
with: with:
go-version: 1.16.5 go-version: 1.16.5
- run: go build . - run: go build .
- run: cd server && go test -v - run: cd server && go test -v -race

View file

@ -30,4 +30,4 @@ jobs:
- run: go get github.com/golang/protobuf/protoc-gen-go google.golang.org/grpc/cmd/protoc-gen-go-grpc - run: go get github.com/golang/protobuf/protoc-gen-go google.golang.org/grpc/cmd/protoc-gen-go-grpc
- run: go build . - run: go build .
- run: ./protobuf/build.sh - run: ./protobuf/build.sh
- run: cd server && go test -v - run: cd server && go test -v -race

View file

@ -36,6 +36,12 @@ func peerKey(msg *pb.ServerMessage) string {
return msg.Address + ":" + msg.Port return msg.Address + ":" + msg.Port
} }
// peerKey is a function on a FederatedServer struct to return the key for that
// peer is out peer table.
func (peer *FederatedServer) peerKey() string {
return peer.Address + ":" + peer.Port
}
func (s *Server) incNumPeers() { func (s *Server) incNumPeers() {
atomic.AddInt64(s.NumPeerServers, 1) atomic.AddInt64(s.NumPeerServers, 1)
} }
@ -117,10 +123,9 @@ func (s *Server) loadPeers() error {
func (s *Server) getFastestPeer() *FederatedServer { func (s *Server) getFastestPeer() *FederatedServer {
var fastestPeer *FederatedServer var fastestPeer *FederatedServer
s.PeerServers.Range(func(_, v interface{}) bool { for _, v := range s.PeerServers {
fastestPeer = v.(*FederatedServer) return v
return false }
})
return fastestPeer return fastestPeer
} }
@ -226,19 +231,13 @@ func (s *Server) writePeers() {
} }
writer := bufio.NewWriter(f) writer := bufio.NewWriter(f)
s.PeerServers.Range(func(k, _ interface{}) bool { for key, _ := range s.PeerServers {
key, ok := k.(string)
if !ok {
log.Println("Failed to cast key when writing peers: ", k)
return true
}
line := key + "\n" line := key + "\n"
_, err := writer.WriteString(line) _, err := writer.WriteString(line)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
return true }
})
err = writer.Flush() err = writer.Flush()
if err != nil { if err != nil {
@ -285,18 +284,8 @@ func notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error {
// all the peers that have subscribed to us about it. // all the peers that have subscribed to us about it.
func (s *Server) notifyPeerSubs(newServer *FederatedServer) { func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
var unsubscribe []string var unsubscribe []string
s.PeerSubs.Range(func(k, v interface{}) bool { s.PeerSubsMut.RLock()
key, ok := k.(string) for key, peer := range s.PeerSubs {
if !ok {
log.Println("Failed to cast subscriber key: ", v)
return true
}
peer, ok := v.(*FederatedServer)
if !ok {
log.Println("Failed to cast subscriber value: ", v)
return true
}
log.Printf("Notifying peer %s of new node %+v\n", key, newServer) log.Printf("Notifying peer %s of new node %+v\n", key, newServer)
err := notifyPeer(peer, newServer) err := notifyPeer(peer, newServer)
if err != nil { if err != nil {
@ -304,14 +293,18 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
log.Println(err) log.Println(err)
unsubscribe = append(unsubscribe, key) unsubscribe = append(unsubscribe, key)
} }
return true
})
for _, key := range unsubscribe {
s.decNumSubs()
metrics.PeersSubscribed.Dec()
s.PeerSubs.Delete(key)
} }
s.PeerSubsMut.RUnlock()
s.PeerSubsMut.Lock()
for _, key := range unsubscribe {
if _, ok := s.PeerSubs[key]; ok {
delete(s.PeerSubs, key)
s.decNumSubs()
metrics.PeersSubscribed.Dec()
}
}
s.PeerSubsMut.Unlock()
} }
// addPeer takes a new peer as a pb.ServerMessage, optionally checks to see // addPeer takes a new peer as a pb.ServerMessage, optionally checks to see
@ -330,11 +323,13 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error {
Ts: time.Now(), Ts: time.Now(),
} }
log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg) log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg)
if oldServer, loaded := s.PeerServers.LoadOrStore(k, newServer); !loaded { if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded {
if ping { if ping {
_, err := s.helloPeer(newServer) _, err := s.helloPeer(newServer)
if err != nil { if err != nil {
s.PeerServers.Delete(k) s.PeerServersMut.Lock()
delete(s.PeerServers, k)
s.PeerServersMut.Unlock()
return err return err
} }
} }
@ -344,27 +339,15 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error {
s.writePeers() s.writePeers()
s.notifyPeerSubs(newServer) s.notifyPeerSubs(newServer)
// If aren't subscribed to a server yet, subscribe to // Subscribe to all our peers for now
// this one. err := s.subscribeToPeer(newServer)
if !s.Subscribed { if err != nil {
err := s.subscribeToPeer(newServer) return err
if err != nil { } else {
s.PeerServers.Delete(k) s.Subscribed = true
return err
} else {
s.Subscribed = true
}
} }
} else { } else {
oldServerCast, ok := oldServer.(*FederatedServer) oldServer.Ts = time.Now()
// This shouldn't happen, but if it does, I guess delete the key
// and try adding this one since it's new.
if !ok {
log.Println("Error casting map value: ", oldServer)
s.PeerServers.Delete(k)
return s.addPeer(msg, ping)
}
oldServerCast.Ts = time.Now()
} }
return nil return nil
} }
@ -386,18 +369,14 @@ func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) {
func (s *Server) makeHelloMessage() *pb.HelloMessage { func (s *Server) makeHelloMessage() *pb.HelloMessage {
servers := make([]*pb.ServerMessage, 0, 10) servers := make([]*pb.ServerMessage, 0, 10)
s.PeerServers.Range(func(_, v interface{}) bool { s.PeerServersMut.RLock()
peer, ok := v.(*FederatedServer) for _, peer := range s.PeerServers {
if !ok {
log.Println("Failed to cast value in makeHelloMessage", v)
return true
}
servers = append(servers, &pb.ServerMessage{ servers = append(servers, &pb.ServerMessage{
Address: peer.Address, Address: peer.Address,
Port: peer.Port, Port: peer.Port,
}) })
return true }
}) s.PeerServersMut.RUnlock()
return &pb.HelloMessage{ return &pb.HelloMessage{
Port: s.Args.Port, Port: s.Args.Port,

View file

@ -35,9 +35,11 @@ type Server struct {
LastRefreshCheck time.Time LastRefreshCheck time.Time
RefreshDelta time.Duration RefreshDelta time.Duration
NumESRefreshes int64 NumESRefreshes int64
PeerServers sync.Map //map[string]*FederatedServer PeerServers map[string]*FederatedServer
PeerServersMut sync.RWMutex
NumPeerServers *int64 NumPeerServers *int64
PeerSubs sync.Map PeerSubs map[string]*FederatedServer
PeerSubsMut sync.RWMutex
NumPeerSubs *int64 NumPeerSubs *int64
Subscribed bool Subscribed bool
pb.UnimplementedHubServer pb.UnimplementedHubServer
@ -87,6 +89,36 @@ func getVersion() string {
'blockchain.address.unsubscribe' 'blockchain.address.unsubscribe'
*/ */
func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
key := peer.peerKey()
s.PeerSubsMut.RLock()
if actual, ok := s.PeerSubs[key]; ok {
s.PeerSubsMut.RUnlock()
return actual, true
} else {
s.PeerSubsMut.RUnlock()
s.PeerSubsMut.Lock()
s.PeerSubs[key] = peer
s.PeerSubsMut.Unlock()
return peer, false
}
}
func (s *Server) PeerServersLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
key := peer.peerKey()
s.PeerServersMut.RLock()
if actual, ok := s.PeerServers[key]; ok {
s.PeerServersMut.RUnlock()
return actual, true
} else {
s.PeerServersMut.RUnlock()
s.PeerServersMut.Lock()
s.PeerServers[key] = peer
s.PeerServersMut.Unlock()
return peer, false
}
}
func (s *Server) Run() { func (s *Server) Run() {
l, err := net.Listen("tcp", ":"+s.Args.Port) l, err := net.Listen("tcp", ":"+s.Args.Port)
if err != nil { if err != nil {
@ -119,7 +151,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
log.Fatal(err) log.Fatal(err)
} }
var client *elastic.Client var client *elastic.Client = nil
if !args.DisableEs { if !args.DisableEs {
esUrl := args.EsHost + ":" + args.EsPort esUrl := args.EsHost + ":" + args.EsPort
opts := []elastic.ClientOptionFunc{ opts := []elastic.ClientOptionFunc{
@ -135,8 +167,6 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} else {
client = nil
} }
cache := ttlcache.NewCache() cache := ttlcache.NewCache()
@ -166,9 +196,11 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
LastRefreshCheck: time.Now(), LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta, RefreshDelta: refreshDelta,
NumESRefreshes: 0, NumESRefreshes: 0,
PeerServers: sync.Map{}, PeerServers: make(map[string]*FederatedServer),
PeerServersMut: sync.RWMutex{},
NumPeerServers: numPeers, NumPeerServers: numPeers,
PeerSubs: sync.Map{}, PeerSubs: make(map[string]*FederatedServer),
PeerSubsMut: sync.RWMutex{},
NumPeerSubs: numSubs, NumPeerSubs: numSubs,
Subscribed: false, Subscribed: false,
} }
@ -249,7 +281,7 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S
Ts: time.Now(), Ts: time.Now(),
} }
if _, loaded := s.PeerSubs.LoadOrStore(peerKey(in), peer); !loaded { if _, loaded := s.PeerSubsLoadOrStore(peer); !loaded {
s.incNumSubs() s.incNumSubs()
metrics.PeersSubscribed.Inc() metrics.PeersSubscribed.Inc()
} else { } else {