UDPServer / ip address resolution #24
2
.github/workflows/build-short.yml
vendored
|
@ -11,4 +11,4 @@ jobs:
|
|||
with:
|
||||
go-version: 1.16.5
|
||||
- run: go build .
|
||||
- run: cd server && go test -v -race
|
||||
- run: go test -v -race ./...
|
2
.github/workflows/build.yml
vendored
|
@ -30,4 +30,4 @@ jobs:
|
|||
- run: go get github.com/golang/protobuf/protoc-gen-go google.golang.org/grpc/cmd/protoc-gen-go-grpc
|
||||
- run: go build .
|
||||
- run: ./protobuf/build.sh
|
||||
- run: cd server && go test -v -race
|
||||
- run: go test -v -race ./...
|
||||
|
|
|
@ -18,8 +18,8 @@ var (
|
|||
Help: "Number of errors by type",
|
||||
}, []string{"error_type"})
|
||||
QueryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Name: "query_time",
|
||||
Help: "Histogram of query times",
|
||||
Name: "query_time",
|
||||
Help: "Histogram of query times",
|
||||
Buckets: HistogramBuckets,
|
||||
}, []string{"method"})
|
||||
PeersKnown = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
|
@ -31,4 +31,3 @@ var (
|
|||
Help: "Number of peers that are subscribed to us.",
|
||||
})
|
||||
)
|
||||
|
||||
|
|
14
main.go
|
@ -26,19 +26,7 @@ func main() {
|
|||
|
||||
s := server.MakeHubServer(ctxWCancel, args)
|
||||
s.Run()
|
||||
//l, err := net.Listen("tcp", ":"+args.Port)
|
||||
//if err != nil {
|
||||
// log.Fatalf("failed to listen: %v", err)
|
||||
//}
|
||||
//
|
||||
//pb.RegisterHubServer(s.GrpcServer, s)
|
||||
//reflection.Register(s.GrpcServer)
|
||||
//
|
||||
//log.Printf("listening on %s\n", l.Addr().String())
|
||||
//log.Println(s.Args)
|
||||
//if err := s.GrpcServer.Serve(l); err != nil {
|
||||
// log.Fatalf("failed to serve: %v", err)
|
||||
//}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
1051
protobuf/definitions/claim.proto
Normal file
5031
protobuf/go/claim.pb.go
Normal file
116
server/args.go
|
@ -16,42 +16,42 @@ const (
|
|||
|
||||
// Args struct contains the arguments to the hub server.
|
||||
type Args struct {
|
||||
CmdType int
|
||||
Host string
|
||||
Port string
|
||||
UDPPort string
|
||||
EsHost string
|
||||
EsPort string
|
||||
PrometheusPort string
|
||||
EsIndex string
|
||||
RefreshDelta int
|
||||
CacheTTL int
|
||||
PeerFile string
|
||||
Country string
|
||||
DisableEs bool
|
||||
Debug bool
|
||||
LoadPeers bool
|
||||
StartPrometheus bool
|
||||
StartUDP bool
|
||||
WritePeers bool
|
||||
CmdType int
|
||||
Host string
|
||||
Port string
|
||||
EsHost string
|
||||
EsPort string
|
||||
PrometheusPort string
|
||||
EsIndex string
|
||||
RefreshDelta int
|
||||
CacheTTL int
|
||||
PeerFile string
|
||||
Country string
|
||||
DisableEs bool
|
||||
Debug bool
|
||||
DisableLoadPeers bool
|
||||
DisableStartPrometheus bool
|
||||
DisableStartUDP bool
|
||||
DisableWritePeers bool
|
||||
DisableFederation bool
|
||||
}
|
||||
|
||||
const (
|
||||
DefaultHost = "0.0.0.0"
|
||||
DefaultPort = "50051"
|
||||
DefaultUdpPort = "41119"
|
||||
DefaultEsHost = "http://localhost"
|
||||
DefaultEsIndex = "claims"
|
||||
DefaultEsPort = "9200"
|
||||
DefaultPrometheusPort = "2112"
|
||||
DefaultRefreshDelta = 5
|
||||
DefaultCacheTTL = 5
|
||||
DefaultPeerFile = "peers.txt"
|
||||
DefaultCountry = "US"
|
||||
DefaultLoadPeers = true
|
||||
DefaultStartPrometheus = true
|
||||
DefaultStartUDP = true
|
||||
DefaultWritePeers = true
|
||||
DefaultHost = "0.0.0.0"
|
||||
DefaultPort = "50051"
|
||||
DefaultEsHost = "http://localhost"
|
||||
DefaultEsIndex = "claims"
|
||||
DefaultEsPort = "9200"
|
||||
DefaultPrometheusPort = "2112"
|
||||
DefaultRefreshDelta = 5
|
||||
DefaultCacheTTL = 5
|
||||
DefaultPeerFile = "peers.txt"
|
||||
DefaultCountry = "US"
|
||||
DefaultDisableLoadPeers = false
|
||||
DefaultDisableStartPrometheus = false
|
||||
DefaultDisableStartUDP = false
|
||||
DefaultDisableWritePeers = false
|
||||
DefaultDisableFederation = false
|
||||
)
|
||||
|
||||
// GetEnvironment takes the environment variables as an array of strings
|
||||
|
@ -88,7 +88,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
|
||||
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
|
||||
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
|
||||
udpPort := parser.String("", "uspport", &argparse.Options{Required: false, Help: "udp ping port", Default: DefaultUdpPort})
|
||||
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
|
||||
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
|
||||
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
|
||||
|
@ -98,10 +97,11 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
|
||||
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
|
||||
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
|
||||
loadPeers := parser.Flag("", "load-peers", &argparse.Options{Required: false, Help: "load peers from disk at startup", Default: DefaultLoadPeers})
|
||||
startPrometheus := parser.Flag("", "start-prometheus", &argparse.Options{Required: false, Help: "Start prometheus server", Default: DefaultStartPrometheus})
|
||||
startUdp := parser.Flag("", "start-udp", &argparse.Options{Required: false, Help: "Start UDP ping server", Default: DefaultStartUDP})
|
||||
writePeers := parser.Flag("", "write-peers", &argparse.Options{Required: false, Help: "Write peer to disk as we learn about them", Default: DefaultWritePeers})
|
||||
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
|
||||
disableStartPrometheus := parser.Flag("", "disable-start-prometheus", &argparse.Options{Required: false, Help: "Disable start prometheus server", Default: DefaultDisableStartPrometheus})
|
||||
disableStartUdp := parser.Flag("", "disable-start-udp", &argparse.Options{Required: false, Help: "Disable start UDP ping server", Default: DefaultDisableStartUDP})
|
||||
disableWritePeers := parser.Flag("", "disable-write-peers", &argparse.Options{Required: false, Help: "Disable write peer to disk as we learn about them", Default: DefaultDisableWritePeers})
|
||||
disableFederation := parser.Flag("", "disable-federation", &argparse.Options{Required: false, Help: "Disable server federation", Default: DefaultDisableFederation})
|
||||
|
||||
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
|
||||
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
|
||||
|
@ -120,24 +120,24 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
}
|
||||
|
||||
args := &Args{
|
||||
CmdType: SearchCmd,
|
||||
Host: *host,
|
||||
Port: *port,
|
||||
EsHost: *esHost,
|
||||
EsPort: *esPort,
|
||||
UDPPort: *udpPort,
|
||||
PrometheusPort: *prometheusPort,
|
||||
EsIndex: *esIndex,
|
||||
RefreshDelta: *refreshDelta,
|
||||
CacheTTL: *cacheTTL,
|
||||
PeerFile: *peerFile,
|
||||
Country: *country,
|
||||
DisableEs: *disableEs,
|
||||
Debug: *debug,
|
||||
LoadPeers: *loadPeers,
|
||||
StartPrometheus: *startPrometheus,
|
||||
StartUDP: *startUdp,
|
||||
WritePeers: *writePeers,
|
||||
CmdType: SearchCmd,
|
||||
Host: *host,
|
||||
Port: *port,
|
||||
EsHost: *esHost,
|
||||
EsPort: *esPort,
|
||||
PrometheusPort: *prometheusPort,
|
||||
EsIndex: *esIndex,
|
||||
RefreshDelta: *refreshDelta,
|
||||
CacheTTL: *cacheTTL,
|
||||
PeerFile: *peerFile,
|
||||
Country: *country,
|
||||
DisableEs: *disableEs,
|
||||
Debug: *debug,
|
||||
DisableLoadPeers: *disableLoadPeers,
|
||||
DisableStartPrometheus: *disableStartPrometheus,
|
||||
DisableStartUDP: *disableStartUdp,
|
||||
DisableWritePeers: *disableWritePeers,
|
||||
DisableFederation: *disableFederation,
|
||||
}
|
||||
|
||||
if esHost, ok := environment["ELASTIC_HOST"]; ok {
|
||||
|
@ -157,7 +157,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
}
|
||||
|
||||
/*
|
||||
Verify no invalid argument combinations
|
||||
Verify no invalid argument combinations
|
||||
*/
|
||||
if len(*channelIds) > 0 && *channelId != "" {
|
||||
log.Fatal("Cannot specify both channel_id and channel_ids")
|
||||
|
@ -198,4 +198,4 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
|
|||
}
|
||||
|
||||
return args
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"bufio"
|
||||
"context"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
@ -14,31 +16,31 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// FederatedServer hold relevant information about peers that we known about.
|
||||
type FederatedServer struct {
|
||||
Address string
|
||||
Port string
|
||||
Ts time.Time
|
||||
// Peer holds relevant information about peers that we know about.
|
||||
type Peer struct {
|
||||
Address string
|
||||
Port string
|
||||
LastSeen time.Time
|
||||
}
|
||||
|
||||
This happens when the net.IP type is empty and gets turned into a string. This happens when the net.IP type is empty and gets turned into a string.
|
||||
var (
|
||||
localHosts = map[string]bool{
|
||||
"127.0.0.1": true,
|
||||
"0.0.0.0": true,
|
||||
"0.0.0.0": true,
|
||||
"localhost": true,
|
||||
"<nil>": true, // Empty net.IP turned into a string
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
// peerKey takes a ServerMessage object and returns the key that for that peer
|
||||
// peerKey takes a peer and returns the key that for that peer
|
||||
// in our peer table.
|
||||
func peerKey(msg *pb.ServerMessage) string {
|
||||
return msg.Address + ":" + msg.Port
|
||||
func peerKey(peer *Peer) string {
|
||||
return peer.Address + ":" + peer.Port
|
||||
}
|
||||
|
||||
// peerKey is a function on a FederatedServer struct to return the key for that
|
||||
// peer is out peer table.
|
||||
func (peer *FederatedServer) peerKey() string {
|
||||
func (peer *Peer) peerKey() string {
|
||||
return peer.Address + ":" + peer.Port
|
||||
}
|
||||
|
||||
|
@ -66,6 +68,19 @@ func (s *Server) getNumSubs() int64 {
|
|||
return *s.NumPeerSubs
|
||||
}
|
||||
|
||||
// getAndSetExternalIp detects the server's external IP and stores it.
|
||||
func (s *Server) getAndSetExternalIp(ip, port string) error {
|
||||
pong, err := UDPPing(ip, port)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
myIp := pong.DecodeAddress()
|
||||
log.Println("my ip: ", myIp)
|
||||
s.ExternalIP = myIp
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadPeers takes the arguments given to the hub at startup and loads the
|
||||
// previously known peers from disk and verifies their existence before
|
||||
// storing them as known peers. Returns a map of peerKey -> object
|
||||
|
@ -73,6 +88,32 @@ func (s *Server) loadPeers() error {
|
|||
peerFile := s.Args.PeerFile
|
||||
port := s.Args.Port
|
||||
|
||||
// First we make sure our server has come up, so we can answer back to peers.
|
||||
var failures = 0
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
retry:
|
||||
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
|
||||
conn, err := grpc.DialContext(ctx,
|
||||
"0.0.0.0:"+port,
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock(),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if failures > 3 {
|
||||
log.Println("Warning! Our endpoint doesn't seem to have come up, didn't load peers")
|
||||
return err
|
||||
}
|
||||
failures += 1
|
||||
goto retry
|
||||
}
|
||||
if err = conn.Close(); err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
cancel()
|
||||
|
||||
f, err := os.Open(peerFile)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
@ -90,26 +131,30 @@ func (s *Server) loadPeers() error {
|
|||
}
|
||||
|
||||
for _, line := range text {
|
||||
ipPort := strings.Split(line,":")
|
||||
ipPort := strings.Split(line, ":")
|
||||
if len(ipPort) != 2 {
|
||||
log.Println("Malformed entry in peer file")
|
||||
continue
|
||||
}
|
||||
// If the peer is us, skip
|
||||
log.Println(ipPort)
|
||||
if ipPort[1] == port && localHosts[ipPort[0]] {
|
||||
if ipPort[1] == port &&
|
||||
(localHosts[ipPort[0]] || ipPort[0] == s.ExternalIP.String()) {
|
||||
log.Println("Self peer, skipping ...")
|
||||
continue
|
||||
}
|
||||
srvMsg := &pb.ServerMessage{
|
||||
Address: ipPort[0],
|
||||
Port: ipPort[1],
|
||||
|
||||
newPeer := &Peer{
|
||||
Address: ipPort[0],
|
||||
Port: ipPort[1],
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
log.Printf("pinging peer %+v\n", srvMsg)
|
||||
err := s.addPeer(srvMsg, true)
|
||||
log.Printf("pinging peer %+v\n", newPeer)
|
||||
err = s.addPeer(newPeer, true, true)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log.Println("Returning from loadPeers")
|
||||
|
@ -118,7 +163,7 @@ func (s *Server) loadPeers() error {
|
|||
|
||||
// subscribeToPeer subscribes us to a peer to we'll get updates about their
|
||||
// known peers.
|
||||
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
|
||||
func (s *Server) subscribeToPeer(peer *Peer) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
@ -133,20 +178,18 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error {
|
|||
defer conn.Close()
|
||||
|
||||
msg := &pb.ServerMessage{
|
||||
Address: s.Args.Host,
|
||||
Address: s.ExternalIP.String(),
|
||||
Port: s.Args.Port,
|
||||
}
|
||||
|
||||
c := pb.NewHubClient(conn)
|
||||
|
||||
log.Printf("%s:%s subscribing to %+v\n", s.Args.Host, s.Args.Port, peer)
|
||||
log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer)
|
||||
_, err = c.PeerSubscribe(ctx, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.Subscribed = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -155,13 +198,13 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error {
|
|||
// This is used to confirm existence of peers on start and let them
|
||||
// know about us. Returns the response from the server on success,
|
||||
// nil otherwise.
|
||||
func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
|
||||
func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
|
||||
log.Println("In helloPeer")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx,
|
||||
server.Address+":"+server.Port,
|
||||
peer.Address+":"+peer.Port,
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock(),
|
||||
)
|
||||
|
@ -174,12 +217,12 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
|
|||
c := pb.NewHubClient(conn)
|
||||
|
||||
msg := &pb.HelloMessage{
|
||||
Port: s.Args.Port,
|
||||
Host: s.Args.Host,
|
||||
Port: s.Args.Port,
|
||||
Host: s.ExternalIP.String(),
|
||||
Servers: []*pb.ServerMessage{},
|
||||
}
|
||||
|
||||
log.Printf("%s:%s saying hello to %+v\n", s.Args.Host, s.Args.Port, server)
|
||||
log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
|
||||
res, err := c.Hello(ctx, msg)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
@ -193,7 +236,7 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
|
|||
|
||||
// writePeers writes our current known peers to disk.
|
||||
func (s *Server) writePeers() {
|
||||
if !s.Args.WritePeers {
|
||||
if s.Args.DisableWritePeers {
|
||||
return
|
||||
}
|
||||
f, err := os.Create(s.Args.PeerFile)
|
||||
|
@ -222,8 +265,11 @@ func (s *Server) writePeers() {
|
|||
}
|
||||
|
||||
in general, comments should describe the interface and not the implementation. say what it means to notify a peer, not what functions are called inside this one. thats more informative and you can change the implementation without changing the comment. in general, comments should describe the interface and not the implementation. say what it means to notify a peer, not what functions are called inside this one. thats more informative and you can change the implementation without changing the comment.
if you haven't read Philosophy of Software Design, i strongly recommend it (esp the chapters on comments). i can buy you a copy if you'd like if you haven't read Philosophy of Software Design, i strongly recommend it (esp the chapters on comments). i can buy you a copy if you'd like
Makes sense! I'll take a look at that book. Makes sense! I'll take a look at that book.
|
||||
// notifyPeer takes a peer to notify and a new peer we just learned about
|
||||
// and calls AddPeer on the first.
|
||||
func notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error {
|
||||
// and informs the already known peer about the new peer.
|
||||
func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
|
||||
if s.Args.DisableFederation {
|
||||
return nil
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
@ -254,12 +300,12 @@ func notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error {
|
|||
|
||||
// notifyPeerSubs takes a new peer server we just learned about and notifies
|
||||
// all the peers that have subscribed to us about it.
|
||||
func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
|
||||
func (s *Server) notifyPeerSubs(newPeer *Peer) {
|
||||
var unsubscribe []string
|
||||
s.PeerSubsMut.RLock()
|
||||
for key, peer := range s.PeerSubs {
|
||||
log.Printf("Notifying peer %s of new node %+v\n", key, newServer)
|
||||
err := notifyPeer(peer, newServer)
|
||||
log.Printf("Notifying peer %s of new node %+v\n", key, newPeer)
|
||||
err := s.notifyPeer(peer, newPeer)
|
||||
if err != nil {
|
||||
log.Println("Failed to send data to ", key)
|
||||
log.Println(err)
|
||||
|
@ -279,25 +325,36 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
|
|||
s.PeerSubsMut.Unlock()
|
||||
}
|
||||
|
||||
// addPeer takes a new peer as a pb.ServerMessage, optionally checks to see
|
||||
// if they're online, and adds them to our list of peer. If we're not currently
|
||||
// subscribed to a peer, it will also subscribe to it.
|
||||
func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error {
|
||||
if s.Args.Port == msg.Port &&
|
||||
(localHosts[msg.Address] || msg.Address == s.Args.Host) {
|
||||
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.Args.Host, s.Args.Port)
|
||||
// addPeer takes a new peer, optionally checks to see if they're online, and
|
||||
this is a good comment, though i'd delete "takes a new peer as a pb.ServerMessage" since you can tell that from the signature (though maybe rename this is a good comment, though i'd delete "takes a new peer as a pb.ServerMessage" since you can tell that from the signature (though maybe rename `msg` to `newPeer` since that's what it is). and in fact, maybe you need a Peer type so pb.ServerMessage is not doing double duty
I agree. I actually do have a type exactly for this, but it's still called FederatedServer because that's what I called it months ago when I started this. I think I'll actually do a little refactor and rename that to Peer throughout the code base. It definitely makes more sense. I agree. I actually do have a type exactly for this, but it's still called FederatedServer because that's what I called it months ago when I started this. I think I'll actually do a little refactor and rename that to Peer throughout the code base. It definitely makes more sense.
|
||||
// adds them to our list of peers. It will also optionally subscribe to it.
|
||||
func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
|
||||
if s.Args.DisableFederation {
|
||||
return nil
|
||||
}
|
||||
k := peerKey(msg)
|
||||
newServer := &FederatedServer{
|
||||
Address: msg.Address,
|
||||
Port: msg.Port,
|
||||
Ts: time.Now(),
|
||||
// First thing we get our external ip if we don't have it, otherwise we
|
||||
// could end up subscribed to our self, which is silly.
|
||||
nilIP := net.IP{}
|
||||
localIP1 := net.IPv4(127, 0, 0, 1)
|
||||
if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) {
|
||||
err := s.getAndSetExternalIp(newPeer.Address, newPeer.Port)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host)
|
||||
}
|
||||
}
|
||||
log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg)
|
||||
if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded {
|
||||
|
||||
if s.Args.Port == newPeer.Port &&
|
||||
(localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
|
||||
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
|
||||
return nil
|
||||
}
|
||||
|
||||
k := peerKey(newPeer)
|
||||
|
||||
log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
|
||||
if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
|
||||
if ping {
|
||||
_, err := s.helloPeer(newServer)
|
||||
_, err := s.helloPeer(newPeer)
|
||||
if err != nil {
|
||||
s.PeerServersMut.Lock()
|
||||
delete(s.PeerServers, k)
|
||||
|
@ -309,26 +366,31 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error {
|
|||
s.incNumPeers()
|
||||
metrics.PeersKnown.Inc()
|
||||
s.writePeers()
|
||||
s.notifyPeerSubs(newServer)
|
||||
s.notifyPeerSubs(newPeer)
|
||||
|
||||
// Subscribe to all our peers for now
|
||||
err := s.subscribeToPeer(newServer)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
s.Subscribed = true
|
||||
if subscribe {
|
||||
err := s.subscribeToPeer(newPeer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
oldServer.Ts = time.Now()
|
||||
oldServer.LastSeen = time.Now()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// mergeFederatedServers is an internal convenience function to add a list of
|
||||
// mergePeers is an internal convenience function to add a list of
|
||||
// peers.
|
||||
func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) {
|
||||
func (s *Server) mergePeers(servers []*pb.ServerMessage) {
|
||||
for _, srvMsg := range servers {
|
||||
err := s.addPeer(srvMsg, false)
|
||||
newPeer := &Peer{
|
||||
Address: srvMsg.Address,
|
||||
Port: srvMsg.Port,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
err := s.addPeer(newPeer, false, true)
|
||||
// This shouldn't happen because we're not pinging them.
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
@ -345,14 +407,14 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage {
|
|||
for _, peer := range s.PeerServers {
|
||||
servers = append(servers, &pb.ServerMessage{
|
||||
Address: peer.Address,
|
||||
Port: peer.Port,
|
||||
Port: peer.Port,
|
||||
})
|
||||
}
|
||||
s.PeerServersMut.RUnlock()
|
||||
|
||||
return &pb.HelloMessage{
|
||||
Port: s.Args.Port,
|
||||
Host: s.Args.Host,
|
||||
Port: s.Args.Port,
|
||||
Host: s.ExternalIP.String(),
|
||||
Servers: servers,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -44,24 +45,23 @@ func removeFile(fileName string) {
|
|||
|
||||
func makeDefaultArgs() *Args {
|
||||
args := &Args{
|
||||
CmdType: ServeCmd,
|
||||
Host: DefaultHost,
|
||||
Port: DefaultPort,
|
||||
EsHost: DefaultEsHost,
|
||||
EsPort: DefaultEsPort,
|
||||
UDPPort: DefaultUdpPort,
|
||||
PrometheusPort: DefaultPrometheusPort,
|
||||
EsIndex: DefaultEsIndex,
|
||||
RefreshDelta: DefaultRefreshDelta,
|
||||
CacheTTL: DefaultCacheTTL,
|
||||
PeerFile: DefaultPeerFile,
|
||||
Country: DefaultCountry,
|
||||
DisableEs: true,
|
||||
Debug: true,
|
||||
LoadPeers: false,
|
||||
StartPrometheus: false,
|
||||
StartUDP: false,
|
||||
WritePeers: false,
|
||||
CmdType: ServeCmd,
|
||||
Host: DefaultHost,
|
||||
Port: DefaultPort,
|
||||
EsHost: DefaultEsHost,
|
||||
EsPort: DefaultEsPort,
|
||||
PrometheusPort: DefaultPrometheusPort,
|
||||
EsIndex: DefaultEsIndex,
|
||||
RefreshDelta: DefaultRefreshDelta,
|
||||
CacheTTL: DefaultCacheTTL,
|
||||
PeerFile: DefaultPeerFile,
|
||||
Country: DefaultCountry,
|
||||
DisableEs: true,
|
||||
Debug: true,
|
||||
DisableLoadPeers: true,
|
||||
DisableStartPrometheus: true,
|
||||
DisableStartUDP: true,
|
||||
DisableWritePeers: true,
|
||||
}
|
||||
|
||||
return args
|
||||
|
@ -75,7 +75,7 @@ func TestAddPeer(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
want int
|
||||
} {
|
||||
}{
|
||||
{
|
||||
name: "Add 10 peers",
|
||||
want: 10,
|
||||
|
@ -87,27 +87,27 @@ func TestAddPeer(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T){
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := MakeHubServer(ctx, args)
|
||||
server.Subscribed = true
|
||||
server.ExternalIP = net.IPv4(0, 0, 0, 0)
|
||||
metrics.PeersKnown.Set(0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
var msg *pb.ServerMessage
|
||||
var peer *Peer
|
||||
if strings.Contains(tt.name, "1 unique") {
|
||||
msg = &pb.ServerMessage{
|
||||
peer = &Peer{
|
||||
Address: "1.1.1.1",
|
||||
Port: "50051",
|
||||
}
|
||||
} else {
|
||||
x := i + 1
|
||||
msg = &pb.ServerMessage{
|
||||
peer = &Peer{
|
||||
Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x),
|
||||
Port: "50051",
|
||||
}
|
||||
}
|
||||
//log.Printf("Adding peer %+v\n", msg)
|
||||
err := server.addPeer(msg, false)
|
||||
err := server.addPeer(peer, false, false)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
@ -129,12 +129,12 @@ func TestAddPeer(t *testing.T) {
|
|||
func TestPeerWriter(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
args.WritePeers = true
|
||||
args.DisableWritePeers = false
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
want int
|
||||
} {
|
||||
}{
|
||||
{
|
||||
name: "Add 10 peers",
|
||||
want: 10,
|
||||
|
@ -146,26 +146,26 @@ func TestPeerWriter(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T){
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := MakeHubServer(ctx, args)
|
||||
server.Subscribed = true
|
||||
server.ExternalIP = net.IPv4(0, 0, 0, 0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
var msg *pb.ServerMessage
|
||||
var peer *Peer
|
||||
if strings.Contains(tt.name, "1 unique") {
|
||||
msg = &pb.ServerMessage{
|
||||
peer = &Peer{
|
||||
Address: "1.1.1.1",
|
||||
Port: "50051",
|
||||
}
|
||||
} else {
|
||||
x := i + 1
|
||||
msg = &pb.ServerMessage{
|
||||
peer = &Peer{
|
||||
Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x),
|
||||
Port: "50051",
|
||||
}
|
||||
}
|
||||
//log.Printf("Adding peer %+v\n", msg)
|
||||
err := server.addPeer(msg, false)
|
||||
//log.Printf("Adding peer %+v\n", peer)
|
||||
err := server.addPeer(peer, false, false)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
@ -188,12 +188,11 @@ func TestAddPeerEndpoint(t *testing.T) {
|
|||
args2 := makeDefaultArgs()
|
||||
args2.Port = "50052"
|
||||
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
name string
|
||||
wantServerOne int64
|
||||
wantServerTwo int64
|
||||
} {
|
||||
}{
|
||||
{
|
||||
// outside -> server1.AddPeer(server2, ping=true) : server1 = 1, server2 = 0
|
||||
// server1 -> server2.Hello(server1) : server1 = 1, server2 = 0
|
||||
|
@ -204,14 +203,14 @@ func TestAddPeerEndpoint(t *testing.T) {
|
|||
// server1 -> server2.AddPeer(server2) : server1 = 1, server2 = 1
|
||||
// server2 self peer, skipping : server1 = 1, server2 = 1
|
||||
// server1 -> server2.PeerSubscribe(server1) : server1 = 1, server2 = 1
|
||||
name: "Add 1 peer",
|
||||
name: "Add 1 peer",
|
||||
wantServerOne: 1,
|
||||
wantServerTwo: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T){
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := MakeHubServer(ctx, args)
|
||||
server2 := MakeHubServer(ctx, args2)
|
||||
metrics.PeersKnown.Set(0)
|
||||
|
@ -262,15 +261,14 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
|||
args2.Port = "50052"
|
||||
args3.Port = "50053"
|
||||
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
name string
|
||||
wantServerOne int64
|
||||
wantServerTwo int64
|
||||
wantServerThree int64
|
||||
} {
|
||||
}{
|
||||
{
|
||||
name: "Add 2 peers",
|
||||
name: "Add 2 peers",
|
||||
wantServerOne: 2,
|
||||
wantServerTwo: 2,
|
||||
wantServerThree: 2,
|
||||
|
@ -278,7 +276,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T){
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := MakeHubServer(ctx, args)
|
||||
server2 := MakeHubServer(ctx, args2)
|
||||
server3 := MakeHubServer(ctx, args3)
|
||||
|
@ -335,7 +333,6 @@ func TestAddPeerEndpoint2(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
|
||||
// TestAddPeerEndpoint3 tests the ability to add peers
|
||||
func TestAddPeerEndpoint3(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
@ -345,15 +342,14 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
|||
args2.Port = "50052"
|
||||
args3.Port = "50053"
|
||||
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
name string
|
||||
wantServerOne int64
|
||||
wantServerTwo int64
|
||||
wantServerThree int64
|
||||
} {
|
||||
}{
|
||||
{
|
||||
name: "Add 1 peer to each",
|
||||
name: "Add 1 peer to each",
|
||||
wantServerOne: 2,
|
||||
wantServerTwo: 2,
|
||||
wantServerThree: 2,
|
||||
|
@ -361,7 +357,7 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T){
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := MakeHubServer(ctx, args)
|
||||
server2 := MakeHubServer(ctx, args2)
|
||||
server3 := MakeHubServer(ctx, args3)
|
||||
|
@ -425,3 +421,58 @@ func TestAddPeerEndpoint3(t *testing.T) {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
// TestAddPeer tests the ability to add peers
|
||||
func TestUDPServer(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
args := makeDefaultArgs()
|
||||
args.DisableStartUDP = false
|
||||
args2 := makeDefaultArgs()
|
||||
args2.Port = "50052"
|
||||
args2.DisableStartUDP = false
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "hubs server external ip",
|
||||
want: "127.0.0.1",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
server := MakeHubServer(ctx, args)
|
||||
server2 := MakeHubServer(ctx, args2)
|
||||
go server.Run()
|
||||
go server2.Run()
|
||||
metrics.PeersKnown.Set(0)
|
||||
|
||||
peer := &Peer{
|
||||
Address: "0.0.0.0",
|
||||
Port: "50052",
|
||||
}
|
||||
|
||||
err := server.addPeer(peer, true, true)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
server.GrpcServer.GracefulStop()
|
||||
server2.GrpcServer.GracefulStop()
|
||||
|
||||
got1 := server.ExternalIP.String()
|
||||
if got1 != tt.want {
|
||||
t.Errorf("server.ExternalIP = %s, want %s\n", got1, tt.want)
|
||||
t.Errorf("server.Args.Port = %s\n", server.Args.Port)
|
||||
}
|
||||
got2 := server2.ExternalIP.String()
|
||||
if got2 != tt.want {
|
||||
t.Errorf("server2.ExternalIP = %s, want %s\n", got2, tt.want)
|
||||
t.Errorf("server2.Args.Port = %s\n", server2.Args.Port)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -275,16 +275,16 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
setPageVars(in, &pageSize, &from)
|
||||
|
||||
/*
|
||||
cache based on search request params
|
||||
include from value and number of results.
|
||||
When another search request comes in with same search params
|
||||
and same or increased offset (which we currently don't even use?)
|
||||
that will be a cache hit.
|
||||
FIXME: For now the cache is turned off when in debugging mode
|
||||
(for unit tests) because it breaks on some of them.
|
||||
FIXME: Currently the cache just skips the initial search,
|
||||
the mgets and post processing are still done. There's probably
|
||||
a more efficient way to store the final result.
|
||||
cache based on search request params
|
||||
include from value and number of results.
|
||||
When another search request comes in with same search params
|
||||
and same or increased offset (which we currently don't even use?)
|
||||
that will be a cache hit.
|
||||
FIXME: For now the cache is turned off when in debugging mode
|
||||
(for unit tests) because it breaks on some of them.
|
||||
FIXME: Currently the cache just skips the initial search,
|
||||
the mgets and post processing are still done. There's probably
|
||||
a more efficient way to store the final result.
|
||||
*/
|
||||
|
||||
if val, err := s.QueryCache.Get(cacheKey); err != nil {
|
||||
|
@ -518,15 +518,15 @@ func (s *Server) setupEsQuery(
|
|||
}
|
||||
|
||||
replacements := map[string]string{
|
||||
"name": "normalized_name",
|
||||
"normalized": "normalized_name",
|
||||
"claim_name": "normalized_name",
|
||||
"txid": "tx_id",
|
||||
"nout": "tx_nout",
|
||||
"reposted": "repost_count",
|
||||
"name": "normalized_name",
|
||||
"normalized": "normalized_name",
|
||||
"claim_name": "normalized_name",
|
||||
"txid": "tx_id",
|
||||
"nout": "tx_nout",
|
||||
"reposted": "repost_count",
|
||||
"valid_channel_signature": "is_signature_valid",
|
||||
"claim_id": "_id",
|
||||
"signature_digest": "signature",
|
||||
"claim_id": "_id",
|
||||
"signature_digest": "signature",
|
||||
}
|
||||
|
||||
textFields := map[string]bool{
|
||||
|
@ -967,4 +967,3 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B
|
|||
|
||||
return newHits, blockedHits, blockedChannels
|
||||
}
|
||||
|
||||
|
|
|
@ -35,17 +35,16 @@ type Server struct {
|
|||
LastRefreshCheck time.Time
|
||||
RefreshDelta time.Duration
|
||||
NumESRefreshes int64
|
||||
PeerServers map[string]*FederatedServer
|
||||
PeerServers map[string]*Peer
|
||||
PeerServersMut sync.RWMutex
|
||||
NumPeerServers *int64
|
||||
there's a there's a `net.IP` type that you should use for IPs
|
||||
PeerSubs map[string]*FederatedServer
|
||||
PeerSubs map[string]*Peer
|
||||
PeerSubsMut sync.RWMutex
|
||||
NumPeerSubs *int64
|
||||
Subscribed bool
|
||||
ExternalIP net.IP
|
||||
pb.UnimplementedHubServer
|
||||
}
|
||||
|
||||
nice nice
|
||||
|
||||
func getVersion() string {
|
||||
return meta.Version
|
||||
}
|
||||
|
@ -89,7 +88,7 @@ func getVersion() string {
|
|||
'blockchain.address.unsubscribe'
|
||||
*/
|
||||
|
||||
func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
|
||||
func (s *Server) PeerSubsLoadOrStore(peer *Peer) (actual *Peer, loaded bool) {
|
||||
key := peer.peerKey()
|
||||
s.PeerSubsMut.RLock()
|
||||
if actual, ok := s.PeerSubs[key]; ok {
|
||||
|
@ -104,7 +103,7 @@ func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedSe
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) PeerServersLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
|
||||
func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool) {
|
||||
key := peer.peerKey()
|
||||
s.PeerServersMut.RLock()
|
||||
if actual, ok := s.PeerServers[key]; ok {
|
||||
|
@ -196,20 +195,20 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
|||
LastRefreshCheck: time.Now(),
|
||||
RefreshDelta: refreshDelta,
|
||||
NumESRefreshes: 0,
|
||||
PeerServers: make(map[string]*FederatedServer),
|
||||
PeerServers: make(map[string]*Peer),
|
||||
PeerServersMut: sync.RWMutex{},
|
||||
NumPeerServers: numPeers,
|
||||
PeerSubs: make(map[string]*FederatedServer),
|
||||
PeerSubs: make(map[string]*Peer),
|
||||
PeerSubsMut: sync.RWMutex{},
|
||||
NumPeerSubs: numSubs,
|
||||
Subscribed: false,
|
||||
ExternalIP: net.IPv4(127, 0, 0, 1),
|
||||
}
|
||||
|
||||
// Start up our background services
|
||||
if args.StartPrometheus {
|
||||
if !args.DisableStartPrometheus {
|
||||
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics")
|
||||
}
|
||||
if args.StartUDP {
|
||||
if !args.DisableStartUDP {
|
||||
go func() {
|
||||
err := UDPServer(args)
|
||||
if err != nil {
|
||||
|
@ -218,11 +217,13 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
|||
}()
|
||||
}
|
||||
// Load peers from disk and subscribe to one if there are any
|
||||
if args.LoadPeers {
|
||||
err = s.loadPeers()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
if !args.DisableLoadPeers {
|
||||
go func() {
|
||||
err := s.loadPeers()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return s
|
||||
|
@ -244,21 +245,21 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes
|
|||
metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc()
|
||||
port := args.Port
|
||||
host := args.Host
|
||||
server := &FederatedServer{
|
||||
Address: host,
|
||||
Port: port,
|
||||
Ts: time.Now(),
|
||||
newPeer := &Peer{
|
||||
Address: host,
|
||||
Port: port,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
log.Println(server)
|
||||
log.Println(newPeer)
|
||||
|
||||
err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false)
|
||||
err := s.addPeer(newPeer, false, true)
|
||||
// They just contacted us, so this shouldn't happen
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
s.mergeFederatedServers(args.Servers)
|
||||
s.mergePeers(args.Servers)
|
||||
s.writePeers()
|
||||
s.notifyPeerSubs(server)
|
||||
s.notifyPeerSubs(newPeer)
|
||||
|
||||
return s.makeHelloMessage(), nil
|
||||
}
|
||||
|
@ -268,10 +269,10 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes
|
|||
func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) {
|
||||
metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc()
|
||||
var msg = "Success"
|
||||
peer := &FederatedServer{
|
||||
Address: in.Address,
|
||||
Port: in.Port,
|
||||
Ts: time.Now(),
|
||||
peer := &Peer{
|
||||
Address: in.Address,
|
||||
Port: in.Port,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
|
||||
if _, loaded := s.PeerSubsLoadOrStore(peer); !loaded {
|
||||
|
@ -288,7 +289,12 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S
|
|||
func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) {
|
||||
metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc()
|
||||
var msg = "Success"
|
||||
err := s.addPeer(args, true)
|
||||
newPeer := &Peer{
|
||||
Address: args.Address,
|
||||
Port: args.Port,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
err := s.addPeer(newPeer, true, true)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
msg = "Failed"
|
||||
|
|
|
@ -2,19 +2,25 @@ package server
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
pb "github.com/lbryio/hub/protobuf/go"
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
)
|
||||
|
||||
const maxBufferSize = 1024
|
||||
const maxBufferSize = 1024
|
||||
|
||||
// genesis blocktime (which is actually wrong)
|
||||
const magic = 1446058291
|
||||
// magic constant for the UDPPing protocol. The above comment is taken from
|
||||
// the python code this was implemented off of.
|
||||
// https://github.com/lbryio/lbry-sdk/blob/7d49b046d44a4b7067d5dc1d6cd65ff0475c71c8/lbry/wallet/server/udp.py#L12
|
||||
const magic = 1446058291
|
||||
const protocolVersion = 1
|
||||
const defaultFlags = 0b00000000
|
||||
const availableFlag = 0b00000001
|
||||
can you add a comment describing what this is for? why is the (wrong) genesis blocktime stored in a var called can you add a comment describing what this is for? why is the (wrong) genesis blocktime stored in a var called `magic`?
Sure, so the comment is just stolen from the Python implementation and it's an arbitrary number needed for the UDPPing protocol, but I'll add a comment saying that. Sure, so the comment is just stolen from the Python implementation and it's an arbitrary number needed for the UDPPing protocol, but I'll add a comment saying that.
|
||||
|
||||
// SPVPing is a struct for the format of how to ping another hub over udp.
|
||||
// format b'!lB64s'
|
||||
|
@ -31,7 +37,7 @@ type SPVPong struct {
|
|||
flags byte
|
||||
height uint32
|
||||
tip []byte // 32
|
||||
srcAddrRaw []byte // 4
|
||||
srcAddrRaw []byte // 4
|
||||
country uint16
|
||||
}
|
||||
|
||||
|
@ -55,7 +61,7 @@ func decodeSPVPing(data []byte) *SPVPing {
|
|||
parsedMagic := binary.BigEndian.Uint32(data)
|
||||
parsedProtocalVersion := data[4]
|
||||
return &SPVPing{
|
||||
magic: parsedMagic,
|
||||
magic: parsedMagic,
|
||||
version: parsedProtocalVersion,
|
||||
}
|
||||
}
|
||||
|
@ -65,7 +71,7 @@ func decodeSPVPing(data []byte) *SPVPing {
|
|||
func (pong *SPVPong) Encode() []byte {
|
||||
data := make([]byte, 44)
|
||||
|
||||
data[0] = pong.protocolVersion
|
||||
data[0] = pong.protocolVersion
|
||||
data[1] = pong.flags
|
||||
binary.BigEndian.PutUint32(data[2:], pong.height)
|
||||
copy(data[6:], pong.tip)
|
||||
|
@ -76,10 +82,13 @@ func (pong *SPVPong) Encode() []byte {
|
|||
}
|
||||
|
||||
// makeSPVPong creates an SPVPong struct according to given parameters.
|
||||
// FIXME: Currently, does not correctly encode the country.
|
||||
func makeSPVPong(flags int, height int, tip []byte, sourceAddr string, country string) *SPVPong {
|
||||
byteAddr := EncodeAddress(sourceAddr)
|
||||
countryInt := 1
|
||||
var countryInt int32
|
||||
var ok bool
|
||||
if countryInt, ok = pb.Location_Country_value[country]; !ok {
|
||||
countryInt = int32(pb.Location_UNKNOWN_COUNTRY)
|
||||
}
|
||||
return &SPVPong{
|
||||
protocolVersion: protocolVersion,
|
||||
flags: byte(flags),
|
||||
|
@ -99,19 +108,19 @@ func decodeSPVPong(data []byte) *SPVPong {
|
|||
|
||||
parsedProtocalVersion := data[0]
|
||||
flags := data[1]
|
||||
height := binary.BigEndian.Uint32(data[:2])
|
||||
height := binary.BigEndian.Uint32(data[2:])
|
||||
tip := make([]byte, 32)
|
||||
copy(tip, data[6:38])
|
||||
srcRawAddr := make([]byte, 4)
|
||||
copy(srcRawAddr, data[38:42])
|
||||
country := binary.BigEndian.Uint16(data[:42])
|
||||
country := binary.BigEndian.Uint16(data[42:])
|
||||
return &SPVPong{
|
||||
protocolVersion: parsedProtocalVersion,
|
||||
flags: flags,
|
||||
height: height,
|
||||
tip: tip,
|
||||
srcAddrRaw: srcRawAddr,
|
||||
country: country,
|
||||
flags: flags,
|
||||
height: height,
|
||||
tip: tip,
|
||||
srcAddrRaw: srcRawAddr,
|
||||
country: country,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -137,8 +146,8 @@ func EncodeAddress(addr string) []byte {
|
|||
}
|
||||
|
||||
// DecodeAddress gets the string ipv4 address from an SPVPong struct.
|
||||
func (pong *SPVPong) DecodeAddress() string {
|
||||
return fmt.Sprintf("%d.%d.%d.%d",
|
||||
func (pong *SPVPong) DecodeAddress() net.IP {
|
||||
return net.IPv4(
|
||||
pong.srcAddrRaw[0],
|
||||
pong.srcAddrRaw[1],
|
||||
pong.srcAddrRaw[2],
|
||||
|
@ -146,53 +155,72 @@ func (pong *SPVPong) DecodeAddress() string {
|
|||
)
|
||||
}
|
||||
|
||||
func (pong *SPVPong) DecodeCountry() string {
|
||||
return pb.Location_Country_name[int32(pong.country)]
|
||||
}
|
||||
|
||||
func (pong *SPVPong) DecodeProtocolVersion() int {
|
||||
return int(pong.protocolVersion)
|
||||
}
|
||||
|
||||
func (pong *SPVPong) DecodeHeight() int {
|
||||
return int(pong.height)
|
||||
}
|
||||
|
||||
func (pong *SPVPong) DecodeTip() []byte {
|
||||
return pong.tip
|
||||
}
|
||||
|
||||
func (pong *SPVPong) DecodeFlags() byte {
|
||||
return pong.flags
|
||||
}
|
||||
|
||||
// UDPPing sends a ping over udp to another hub and returns the ip address of
|
||||
// this hub.
|
||||
func UDPPing(address string) (string, error) {
|
||||
func UDPPing(ip, port string) (*SPVPong, error) {
|
||||
address := ip + ":" + port
|
||||
addr, err := net.ResolveUDPAddr("udp", address)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := net.DialUDP("udp", nil, addr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer conn.Close()
|
||||
|
||||
_, err = conn.Write(encodeSPVPing())
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buffer := make([]byte, maxBufferSize)
|
||||
deadline := time.Now().Add(time.Second)
|
||||
err = conn.SetReadDeadline(deadline)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
n, _, err := conn.ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pong := decodeSPVPong(buffer[:n])
|
||||
|
||||
if pong == nil {
|
||||
return "", errors.Base("Pong decoding failed")
|
||||
return nil, errors.Base("Pong decoding failed")
|
||||
}
|
||||
|
||||
myAddr := pong.DecodeAddress()
|
||||
|
||||
return myAddr, nil
|
||||
return pong, nil
|
||||
}
|
||||
|
||||
// UDPServer is a goroutine that starts an udp server that implements the hubs
|
||||
// Ping/Pong protocol to find out about each other without making full TCP
|
||||
// connections.
|
||||
func UDPServer(args *Args) error {
|
||||
address := ":" + args.UDPPort
|
||||
address := ":" + args.Port
|
||||
tip := make([]byte, 32)
|
||||
addr, err := net.ResolveUDPAddr("udp", address)
|
||||
if err != nil {
|
||||
|
@ -215,7 +243,7 @@ func UDPServer(args *Args) error {
|
|||
}
|
||||
|
||||
sAddr := addr.IP.String()
|
||||
pong := makeSPVPong(0,0, tip, sAddr, args.Country)
|
||||
pong := makeSPVPong(defaultFlags|availableFlag, 0, tip, sAddr, args.Country)
|
||||
data := pong.Encode()
|
||||
|
||||
_, err = conn.WriteToUDP(data, addr)
|
||||
|
|
83
server/udp_test.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestUDPPing tests UDPPing correctness against prod server.
|
||||
func TestUDPPing(t *testing.T) {
|
||||
args := makeDefaultArgs()
|
||||
args.DisableStartUDP = true
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
wantIP string
|
||||
wantCountry string
|
||||
wantProtocolVersion int
|
||||
wantHeightMin int
|
||||
wantFlags byte
|
||||
}{
|
||||
{
|
||||
name: "Correctly parse information from production server.",
|
||||
wantIP: "SETME",
|
||||
wantCountry: "US",
|
||||
wantProtocolVersion: 1,
|
||||
wantHeightMin: 1060000,
|
||||
wantFlags: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
toAddr := "spv16.lbry.com"
|
||||
toPort := "50001"
|
||||
|
||||
pong, err := UDPPing(toAddr, toPort)
|
||||
gotCountry := pong.DecodeCountry()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
res, err := exec.Command("dig", "@resolver4.opendns.com", "myip.opendns.com", "+short").Output()
|
||||
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
digIP := strings.TrimSpace(string(res))
|
||||
udpIP := pong.DecodeAddress().String()
|
||||
tt.wantIP = digIP
|
||||
|
||||
log.Println("Height:", pong.DecodeHeight())
|
||||
log.Printf("Flags: %x\n", pong.DecodeFlags())
|
||||
log.Println("ProtocolVersion:", pong.DecodeProtocolVersion())
|
||||
log.Printf("Tip: %x\n", pong.DecodeTip())
|
||||
|
||||
gotHeight := pong.DecodeHeight()
|
||||
gotProtocolVersion := pong.DecodeProtocolVersion()
|
||||
gotFlags := pong.DecodeFlags()
|
||||
gotIP := udpIP
|
||||
|
||||
if gotIP != tt.wantIP {
|
||||
t.Errorf("ip: got: '%s', want: '%s'\n", gotIP, tt.wantIP)
|
||||
}
|
||||
if gotCountry != tt.wantCountry {
|
||||
t.Errorf("country: got: '%s', want: '%s'\n", gotCountry, tt.wantCountry)
|
||||
}
|
||||
if gotHeight < tt.wantHeightMin {
|
||||
t.Errorf("height: got: %d, want >=: %d\n", gotHeight, tt.wantHeightMin)
|
||||
}
|
||||
if gotProtocolVersion != tt.wantProtocolVersion {
|
||||
t.Errorf("protocolVersion: got: %d, want: %d\n", gotProtocolVersion, tt.wantProtocolVersion)
|
||||
}
|
||||
if gotFlags != tt.wantFlags {
|
||||
t.Errorf("flags: got: %d, want: %d\n", gotFlags, tt.wantFlags)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
weird. how does that happen?