Merge pull request #24 from lbryio/feature/18/jeffreypicard/udp-endpoint

UDPServer / ip address resolution
This commit is contained in:
Jeffrey Picard 2021-12-06 11:57:47 -05:00 committed by GitHub
commit 516b95d96b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 6563 additions and 265 deletions

View file

@ -11,4 +11,4 @@ jobs:
with:
go-version: 1.16.5
- run: go build .
- run: cd server && go test -v -race
- run: go test -v -race ./...

View file

@ -30,4 +30,4 @@ jobs:
- run: go get github.com/golang/protobuf/protoc-gen-go google.golang.org/grpc/cmd/protoc-gen-go-grpc
- run: go build .
- run: ./protobuf/build.sh
- run: cd server && go test -v -race
- run: go test -v -race ./...

View file

@ -18,8 +18,8 @@ var (
Help: "Number of errors by type",
}, []string{"error_type"})
QueryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "query_time",
Help: "Histogram of query times",
Name: "query_time",
Help: "Histogram of query times",
Buckets: HistogramBuckets,
}, []string{"method"})
PeersKnown = promauto.NewGauge(prometheus.GaugeOpts{
@ -31,4 +31,3 @@ var (
Help: "Number of peers that are subscribed to us.",
})
)

14
main.go
View file

@ -26,19 +26,7 @@ func main() {
s := server.MakeHubServer(ctxWCancel, args)
s.Run()
//l, err := net.Listen("tcp", ":"+args.Port)
//if err != nil {
// log.Fatalf("failed to listen: %v", err)
//}
//
//pb.RegisterHubServer(s.GrpcServer, s)
//reflection.Register(s.GrpcServer)
//
//log.Printf("listening on %s\n", l.Addr().String())
//log.Println(s.Args)
//if err := s.GrpcServer.Serve(l); err != nil {
// log.Fatalf("failed to serve: %v", err)
//}
return
}

File diff suppressed because it is too large Load diff

5031
protobuf/go/claim.pb.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -16,42 +16,42 @@ const (
// Args struct contains the arguments to the hub server.
type Args struct {
CmdType int
Host string
Port string
UDPPort string
EsHost string
EsPort string
PrometheusPort string
EsIndex string
RefreshDelta int
CacheTTL int
PeerFile string
Country string
DisableEs bool
Debug bool
LoadPeers bool
StartPrometheus bool
StartUDP bool
WritePeers bool
CmdType int
Host string
Port string
EsHost string
EsPort string
PrometheusPort string
EsIndex string
RefreshDelta int
CacheTTL int
PeerFile string
Country string
DisableEs bool
Debug bool
DisableLoadPeers bool
DisableStartPrometheus bool
DisableStartUDP bool
DisableWritePeers bool
DisableFederation bool
}
const (
DefaultHost = "0.0.0.0"
DefaultPort = "50051"
DefaultUdpPort = "41119"
DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims"
DefaultEsPort = "9200"
DefaultPrometheusPort = "2112"
DefaultRefreshDelta = 5
DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt"
DefaultCountry = "US"
DefaultLoadPeers = true
DefaultStartPrometheus = true
DefaultStartUDP = true
DefaultWritePeers = true
DefaultHost = "0.0.0.0"
DefaultPort = "50051"
DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims"
DefaultEsPort = "9200"
DefaultPrometheusPort = "2112"
DefaultRefreshDelta = 5
DefaultCacheTTL = 5
DefaultPeerFile = "peers.txt"
DefaultCountry = "US"
DefaultDisableLoadPeers = false
DefaultDisableStartPrometheus = false
DefaultDisableStartUDP = false
DefaultDisableWritePeers = false
DefaultDisableFederation = false
)
// GetEnvironment takes the environment variables as an array of strings
@ -88,7 +88,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
udpPort := parser.String("", "uspport", &argparse.Options{Required: false, Help: "udp ping port", Default: DefaultUdpPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: DefaultEsIndex})
refreshDelta := parser.Int("", "refresh-delta", &argparse.Options{Required: false, Help: "elasticsearch index refresh delta in seconds", Default: DefaultRefreshDelta})
@ -98,10 +97,11 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
disableEs := parser.Flag("", "disable-es", &argparse.Options{Required: false, Help: "Disable elastic search, for running/testing independently", Default: false})
loadPeers := parser.Flag("", "load-peers", &argparse.Options{Required: false, Help: "load peers from disk at startup", Default: DefaultLoadPeers})
startPrometheus := parser.Flag("", "start-prometheus", &argparse.Options{Required: false, Help: "Start prometheus server", Default: DefaultStartPrometheus})
startUdp := parser.Flag("", "start-udp", &argparse.Options{Required: false, Help: "Start UDP ping server", Default: DefaultStartUDP})
writePeers := parser.Flag("", "write-peers", &argparse.Options{Required: false, Help: "Write peer to disk as we learn about them", Default: DefaultWritePeers})
disableLoadPeers := parser.Flag("", "disable-load-peers", &argparse.Options{Required: false, Help: "Disable load peers from disk at startup", Default: DefaultDisableLoadPeers})
disableStartPrometheus := parser.Flag("", "disable-start-prometheus", &argparse.Options{Required: false, Help: "Disable start prometheus server", Default: DefaultDisableStartPrometheus})
disableStartUdp := parser.Flag("", "disable-start-udp", &argparse.Options{Required: false, Help: "Disable start UDP ping server", Default: DefaultDisableStartUDP})
disableWritePeers := parser.Flag("", "disable-write-peers", &argparse.Options{Required: false, Help: "Disable write peer to disk as we learn about them", Default: DefaultDisableWritePeers})
disableFederation := parser.Flag("", "disable-federation", &argparse.Options{Required: false, Help: "Disable server federation", Default: DefaultDisableFederation})
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@ -120,24 +120,24 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
}
args := &Args{
CmdType: SearchCmd,
Host: *host,
Port: *port,
EsHost: *esHost,
EsPort: *esPort,
UDPPort: *udpPort,
PrometheusPort: *prometheusPort,
EsIndex: *esIndex,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
PeerFile: *peerFile,
Country: *country,
DisableEs: *disableEs,
Debug: *debug,
LoadPeers: *loadPeers,
StartPrometheus: *startPrometheus,
StartUDP: *startUdp,
WritePeers: *writePeers,
CmdType: SearchCmd,
Host: *host,
Port: *port,
EsHost: *esHost,
EsPort: *esPort,
PrometheusPort: *prometheusPort,
EsIndex: *esIndex,
RefreshDelta: *refreshDelta,
CacheTTL: *cacheTTL,
PeerFile: *peerFile,
Country: *country,
DisableEs: *disableEs,
Debug: *debug,
DisableLoadPeers: *disableLoadPeers,
DisableStartPrometheus: *disableStartPrometheus,
DisableStartUDP: *disableStartUdp,
DisableWritePeers: *disableWritePeers,
DisableFederation: *disableFederation,
}
if esHost, ok := environment["ELASTIC_HOST"]; ok {
@ -157,7 +157,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
}
/*
Verify no invalid argument combinations
Verify no invalid argument combinations
*/
if len(*channelIds) > 0 && *channelId != "" {
log.Fatal("Cannot specify both channel_id and channel_ids")

View file

@ -4,6 +4,8 @@ import (
"bufio"
"context"
"log"
"math"
"net"
"os"
"strings"
"sync/atomic"
@ -14,31 +16,31 @@ import (
"google.golang.org/grpc"
)
// FederatedServer hold relevant information about peers that we known about.
type FederatedServer struct {
Address string
Port string
Ts time.Time
// Peer holds relevant information about peers that we know about.
type Peer struct {
Address string
Port string
LastSeen time.Time
}
var (
localHosts = map[string]bool{
"127.0.0.1": true,
"0.0.0.0": true,
"0.0.0.0": true,
"localhost": true,
"<nil>": true, // Empty net.IP turned into a string
}
)
// peerKey takes a ServerMessage object and returns the key that for that peer
// peerKey takes a peer and returns the key that for that peer
// in our peer table.
func peerKey(msg *pb.ServerMessage) string {
return msg.Address + ":" + msg.Port
func peerKey(peer *Peer) string {
return peer.Address + ":" + peer.Port
}
// peerKey is a function on a FederatedServer struct to return the key for that
// peer is out peer table.
func (peer *FederatedServer) peerKey() string {
func (peer *Peer) peerKey() string {
return peer.Address + ":" + peer.Port
}
@ -66,6 +68,19 @@ func (s *Server) getNumSubs() int64 {
return *s.NumPeerSubs
}
// getAndSetExternalIp detects the server's external IP and stores it.
func (s *Server) getAndSetExternalIp(ip, port string) error {
pong, err := UDPPing(ip, port)
if err != nil {
return err
}
myIp := pong.DecodeAddress()
log.Println("my ip: ", myIp)
s.ExternalIP = myIp
return nil
}
// loadPeers takes the arguments given to the hub at startup and loads the
// previously known peers from disk and verifies their existence before
// storing them as known peers. Returns a map of peerKey -> object
@ -73,6 +88,32 @@ func (s *Server) loadPeers() error {
peerFile := s.Args.PeerFile
port := s.Args.Port
// First we make sure our server has come up, so we can answer back to peers.
var failures = 0
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
retry:
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
conn, err := grpc.DialContext(ctx,
"0.0.0.0:"+port,
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
if failures > 3 {
log.Println("Warning! Our endpoint doesn't seem to have come up, didn't load peers")
return err
}
failures += 1
goto retry
}
if err = conn.Close(); err != nil {
log.Println(err)
}
cancel()
f, err := os.Open(peerFile)
if err != nil {
log.Println(err)
@ -90,26 +131,30 @@ func (s *Server) loadPeers() error {
}
for _, line := range text {
ipPort := strings.Split(line,":")
ipPort := strings.Split(line, ":")
if len(ipPort) != 2 {
log.Println("Malformed entry in peer file")
continue
}
// If the peer is us, skip
log.Println(ipPort)
if ipPort[1] == port && localHosts[ipPort[0]] {
if ipPort[1] == port &&
(localHosts[ipPort[0]] || ipPort[0] == s.ExternalIP.String()) {
log.Println("Self peer, skipping ...")
continue
}
srvMsg := &pb.ServerMessage{
Address: ipPort[0],
Port: ipPort[1],
newPeer := &Peer{
Address: ipPort[0],
Port: ipPort[1],
LastSeen: time.Now(),
}
log.Printf("pinging peer %+v\n", srvMsg)
err := s.addPeer(srvMsg, true)
log.Printf("pinging peer %+v\n", newPeer)
err = s.addPeer(newPeer, true, true)
if err != nil {
log.Println(err)
}
}
log.Println("Returning from loadPeers")
@ -118,7 +163,7 @@ func (s *Server) loadPeers() error {
// subscribeToPeer subscribes us to a peer to we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *FederatedServer) error {
func (s *Server) subscribeToPeer(peer *Peer) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@ -133,20 +178,18 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error {
defer conn.Close()
msg := &pb.ServerMessage{
Address: s.Args.Host,
Address: s.ExternalIP.String(),
Port: s.Args.Port,
}
c := pb.NewHubClient(conn)
log.Printf("%s:%s subscribing to %+v\n", s.Args.Host, s.Args.Port, peer)
log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer)
_, err = c.PeerSubscribe(ctx, msg)
if err != nil {
return err
}
s.Subscribed = true
return nil
}
@ -155,13 +198,13 @@ func (s *Server) subscribeToPeer(peer *FederatedServer) error {
// This is used to confirm existence of peers on start and let them
// know about us. Returns the response from the server on success,
// nil otherwise.
func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
log.Println("In helloPeer")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx,
server.Address+":"+server.Port,
peer.Address+":"+peer.Port,
grpc.WithInsecure(),
grpc.WithBlock(),
)
@ -174,12 +217,12 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
c := pb.NewHubClient(conn)
msg := &pb.HelloMessage{
Port: s.Args.Port,
Host: s.Args.Host,
Port: s.Args.Port,
Host: s.ExternalIP.String(),
Servers: []*pb.ServerMessage{},
}
log.Printf("%s:%s saying hello to %+v\n", s.Args.Host, s.Args.Port, server)
log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
res, err := c.Hello(ctx, msg)
if err != nil {
log.Println(err)
@ -193,7 +236,7 @@ func (s *Server) helloPeer(server *FederatedServer) (*pb.HelloMessage, error) {
// writePeers writes our current known peers to disk.
func (s *Server) writePeers() {
if !s.Args.WritePeers {
if s.Args.DisableWritePeers {
return
}
f, err := os.Create(s.Args.PeerFile)
@ -222,8 +265,11 @@ func (s *Server) writePeers() {
}
// notifyPeer takes a peer to notify and a new peer we just learned about
// and calls AddPeer on the first.
func notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error {
// and informs the already known peer about the new peer.
func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
if s.Args.DisableFederation {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@ -254,12 +300,12 @@ func notifyPeer(peerToNotify *FederatedServer, newPeer *FederatedServer) error {
// notifyPeerSubs takes a new peer server we just learned about and notifies
// all the peers that have subscribed to us about it.
func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
func (s *Server) notifyPeerSubs(newPeer *Peer) {
var unsubscribe []string
s.PeerSubsMut.RLock()
for key, peer := range s.PeerSubs {
log.Printf("Notifying peer %s of new node %+v\n", key, newServer)
err := notifyPeer(peer, newServer)
log.Printf("Notifying peer %s of new node %+v\n", key, newPeer)
err := s.notifyPeer(peer, newPeer)
if err != nil {
log.Println("Failed to send data to ", key)
log.Println(err)
@ -279,25 +325,36 @@ func (s *Server) notifyPeerSubs(newServer *FederatedServer) {
s.PeerSubsMut.Unlock()
}
// addPeer takes a new peer as a pb.ServerMessage, optionally checks to see
// if they're online, and adds them to our list of peer. If we're not currently
// subscribed to a peer, it will also subscribe to it.
func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error {
if s.Args.Port == msg.Port &&
(localHosts[msg.Address] || msg.Address == s.Args.Host) {
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.Args.Host, s.Args.Port)
// addPeer takes a new peer, optionally checks to see if they're online, and
// adds them to our list of peers. It will also optionally subscribe to it.
func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
if s.Args.DisableFederation {
return nil
}
k := peerKey(msg)
newServer := &FederatedServer{
Address: msg.Address,
Port: msg.Port,
Ts: time.Now(),
// First thing we get our external ip if we don't have it, otherwise we
// could end up subscribed to our self, which is silly.
nilIP := net.IP{}
localIP1 := net.IPv4(127, 0, 0, 1)
if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) {
err := s.getAndSetExternalIp(newPeer.Address, newPeer.Port)
if err != nil {
log.Println(err)
log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host)
}
}
log.Printf("%s:%s adding peer %+v\n", s.Args.Host, s.Args.Port, msg)
if oldServer, loaded := s.PeerServersLoadOrStore(newServer); !loaded {
if s.Args.Port == newPeer.Port &&
(localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
return nil
}
k := peerKey(newPeer)
log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
if ping {
_, err := s.helloPeer(newServer)
_, err := s.helloPeer(newPeer)
if err != nil {
s.PeerServersMut.Lock()
delete(s.PeerServers, k)
@ -309,26 +366,31 @@ func (s *Server) addPeer(msg *pb.ServerMessage, ping bool) error {
s.incNumPeers()
metrics.PeersKnown.Inc()
s.writePeers()
s.notifyPeerSubs(newServer)
s.notifyPeerSubs(newPeer)
// Subscribe to all our peers for now
err := s.subscribeToPeer(newServer)
if err != nil {
return err
} else {
s.Subscribed = true
if subscribe {
err := s.subscribeToPeer(newPeer)
if err != nil {
return err
}
}
} else {
oldServer.Ts = time.Now()
oldServer.LastSeen = time.Now()
}
return nil
}
// mergeFederatedServers is an internal convenience function to add a list of
// mergePeers is an internal convenience function to add a list of
// peers.
func (s *Server) mergeFederatedServers(servers []*pb.ServerMessage) {
func (s *Server) mergePeers(servers []*pb.ServerMessage) {
for _, srvMsg := range servers {
err := s.addPeer(srvMsg, false)
newPeer := &Peer{
Address: srvMsg.Address,
Port: srvMsg.Port,
LastSeen: time.Now(),
}
err := s.addPeer(newPeer, false, true)
// This shouldn't happen because we're not pinging them.
if err != nil {
log.Println(err)
@ -345,14 +407,14 @@ func (s *Server) makeHelloMessage() *pb.HelloMessage {
for _, peer := range s.PeerServers {
servers = append(servers, &pb.ServerMessage{
Address: peer.Address,
Port: peer.Port,
Port: peer.Port,
})
}
s.PeerServersMut.RUnlock()
return &pb.HelloMessage{
Port: s.Args.Port,
Host: s.Args.Host,
Port: s.Args.Port,
Host: s.ExternalIP.String(),
Servers: servers,
}
}

View file

@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log"
"net"
"os"
"strings"
"testing"
@ -44,24 +45,23 @@ func removeFile(fileName string) {
func makeDefaultArgs() *Args {
args := &Args{
CmdType: ServeCmd,
Host: DefaultHost,
Port: DefaultPort,
EsHost: DefaultEsHost,
EsPort: DefaultEsPort,
UDPPort: DefaultUdpPort,
PrometheusPort: DefaultPrometheusPort,
EsIndex: DefaultEsIndex,
RefreshDelta: DefaultRefreshDelta,
CacheTTL: DefaultCacheTTL,
PeerFile: DefaultPeerFile,
Country: DefaultCountry,
DisableEs: true,
Debug: true,
LoadPeers: false,
StartPrometheus: false,
StartUDP: false,
WritePeers: false,
CmdType: ServeCmd,
Host: DefaultHost,
Port: DefaultPort,
EsHost: DefaultEsHost,
EsPort: DefaultEsPort,
PrometheusPort: DefaultPrometheusPort,
EsIndex: DefaultEsIndex,
RefreshDelta: DefaultRefreshDelta,
CacheTTL: DefaultCacheTTL,
PeerFile: DefaultPeerFile,
Country: DefaultCountry,
DisableEs: true,
Debug: true,
DisableLoadPeers: true,
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
}
return args
@ -75,7 +75,7 @@ func TestAddPeer(t *testing.T) {
tests := []struct {
name string
want int
} {
}{
{
name: "Add 10 peers",
want: 10,
@ -87,27 +87,27 @@ func TestAddPeer(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T){
t.Run(tt.name, func(t *testing.T) {
server := MakeHubServer(ctx, args)
server.Subscribed = true
server.ExternalIP = net.IPv4(0, 0, 0, 0)
metrics.PeersKnown.Set(0)
for i := 0; i < 10; i++ {
var msg *pb.ServerMessage
var peer *Peer
if strings.Contains(tt.name, "1 unique") {
msg = &pb.ServerMessage{
peer = &Peer{
Address: "1.1.1.1",
Port: "50051",
}
} else {
x := i + 1
msg = &pb.ServerMessage{
peer = &Peer{
Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x),
Port: "50051",
}
}
//log.Printf("Adding peer %+v\n", msg)
err := server.addPeer(msg, false)
err := server.addPeer(peer, false, false)
if err != nil {
log.Println(err)
}
@ -129,12 +129,12 @@ func TestAddPeer(t *testing.T) {
func TestPeerWriter(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args.WritePeers = true
args.DisableWritePeers = false
tests := []struct {
name string
want int
} {
}{
{
name: "Add 10 peers",
want: 10,
@ -146,26 +146,26 @@ func TestPeerWriter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T){
t.Run(tt.name, func(t *testing.T) {
server := MakeHubServer(ctx, args)
server.Subscribed = true
server.ExternalIP = net.IPv4(0, 0, 0, 0)
for i := 0; i < 10; i++ {
var msg *pb.ServerMessage
var peer *Peer
if strings.Contains(tt.name, "1 unique") {
msg = &pb.ServerMessage{
peer = &Peer{
Address: "1.1.1.1",
Port: "50051",
}
} else {
x := i + 1
msg = &pb.ServerMessage{
peer = &Peer{
Address: fmt.Sprintf("%d.%d.%d.%d", x, x, x, x),
Port: "50051",
}
}
//log.Printf("Adding peer %+v\n", msg)
err := server.addPeer(msg, false)
//log.Printf("Adding peer %+v\n", peer)
err := server.addPeer(peer, false, false)
if err != nil {
log.Println(err)
}
@ -188,12 +188,11 @@ func TestAddPeerEndpoint(t *testing.T) {
args2 := makeDefaultArgs()
args2.Port = "50052"
tests := []struct {
name string
name string
wantServerOne int64
wantServerTwo int64
} {
}{
{
// outside -> server1.AddPeer(server2, ping=true) : server1 = 1, server2 = 0
// server1 -> server2.Hello(server1) : server1 = 1, server2 = 0
@ -204,14 +203,14 @@ func TestAddPeerEndpoint(t *testing.T) {
// server1 -> server2.AddPeer(server2) : server1 = 1, server2 = 1
// server2 self peer, skipping : server1 = 1, server2 = 1
// server1 -> server2.PeerSubscribe(server1) : server1 = 1, server2 = 1
name: "Add 1 peer",
name: "Add 1 peer",
wantServerOne: 1,
wantServerTwo: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T){
t.Run(tt.name, func(t *testing.T) {
server := MakeHubServer(ctx, args)
server2 := MakeHubServer(ctx, args2)
metrics.PeersKnown.Set(0)
@ -262,15 +261,14 @@ func TestAddPeerEndpoint2(t *testing.T) {
args2.Port = "50052"
args3.Port = "50053"
tests := []struct {
name string
name string
wantServerOne int64
wantServerTwo int64
wantServerThree int64
} {
}{
{
name: "Add 2 peers",
name: "Add 2 peers",
wantServerOne: 2,
wantServerTwo: 2,
wantServerThree: 2,
@ -278,7 +276,7 @@ func TestAddPeerEndpoint2(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T){
t.Run(tt.name, func(t *testing.T) {
server := MakeHubServer(ctx, args)
server2 := MakeHubServer(ctx, args2)
server3 := MakeHubServer(ctx, args3)
@ -335,7 +333,6 @@ func TestAddPeerEndpoint2(t *testing.T) {
}
// TestAddPeerEndpoint3 tests the ability to add peers
func TestAddPeerEndpoint3(t *testing.T) {
ctx := context.Background()
@ -345,15 +342,14 @@ func TestAddPeerEndpoint3(t *testing.T) {
args2.Port = "50052"
args3.Port = "50053"
tests := []struct {
name string
name string
wantServerOne int64
wantServerTwo int64
wantServerThree int64
} {
}{
{
name: "Add 1 peer to each",
name: "Add 1 peer to each",
wantServerOne: 2,
wantServerTwo: 2,
wantServerThree: 2,
@ -361,7 +357,7 @@ func TestAddPeerEndpoint3(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T){
t.Run(tt.name, func(t *testing.T) {
server := MakeHubServer(ctx, args)
server2 := MakeHubServer(ctx, args2)
server3 := MakeHubServer(ctx, args3)
@ -425,3 +421,58 @@ func TestAddPeerEndpoint3(t *testing.T) {
}
}
// TestAddPeer tests the ability to add peers
func TestUDPServer(t *testing.T) {
ctx := context.Background()
args := makeDefaultArgs()
args.DisableStartUDP = false
args2 := makeDefaultArgs()
args2.Port = "50052"
args2.DisableStartUDP = false
tests := []struct {
name string
want string
}{
{
name: "hubs server external ip",
want: "127.0.0.1",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := MakeHubServer(ctx, args)
server2 := MakeHubServer(ctx, args2)
go server.Run()
go server2.Run()
metrics.PeersKnown.Set(0)
peer := &Peer{
Address: "0.0.0.0",
Port: "50052",
}
err := server.addPeer(peer, true, true)
if err != nil {
log.Println(err)
}
server.GrpcServer.GracefulStop()
server2.GrpcServer.GracefulStop()
got1 := server.ExternalIP.String()
if got1 != tt.want {
t.Errorf("server.ExternalIP = %s, want %s\n", got1, tt.want)
t.Errorf("server.Args.Port = %s\n", server.Args.Port)
}
got2 := server2.ExternalIP.String()
if got2 != tt.want {
t.Errorf("server2.ExternalIP = %s, want %s\n", got2, tt.want)
t.Errorf("server2.Args.Port = %s\n", server2.Args.Port)
}
})
}
}

View file

@ -275,16 +275,16 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
setPageVars(in, &pageSize, &from)
/*
cache based on search request params
include from value and number of results.
When another search request comes in with same search params
and same or increased offset (which we currently don't even use?)
that will be a cache hit.
FIXME: For now the cache is turned off when in debugging mode
(for unit tests) because it breaks on some of them.
FIXME: Currently the cache just skips the initial search,
the mgets and post processing are still done. There's probably
a more efficient way to store the final result.
cache based on search request params
include from value and number of results.
When another search request comes in with same search params
and same or increased offset (which we currently don't even use?)
that will be a cache hit.
FIXME: For now the cache is turned off when in debugging mode
(for unit tests) because it breaks on some of them.
FIXME: Currently the cache just skips the initial search,
the mgets and post processing are still done. There's probably
a more efficient way to store the final result.
*/
if val, err := s.QueryCache.Get(cacheKey); err != nil {
@ -518,15 +518,15 @@ func (s *Server) setupEsQuery(
}
replacements := map[string]string{
"name": "normalized_name",
"normalized": "normalized_name",
"claim_name": "normalized_name",
"txid": "tx_id",
"nout": "tx_nout",
"reposted": "repost_count",
"name": "normalized_name",
"normalized": "normalized_name",
"claim_name": "normalized_name",
"txid": "tx_id",
"nout": "tx_nout",
"reposted": "repost_count",
"valid_channel_signature": "is_signature_valid",
"claim_id": "_id",
"signature_digest": "signature",
"claim_id": "_id",
"signature_digest": "signature",
}
textFields := map[string]bool{
@ -967,4 +967,3 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.B
return newHits, blockedHits, blockedChannels
}

View file

@ -35,17 +35,16 @@ type Server struct {
LastRefreshCheck time.Time
RefreshDelta time.Duration
NumESRefreshes int64
PeerServers map[string]*FederatedServer
PeerServers map[string]*Peer
PeerServersMut sync.RWMutex
NumPeerServers *int64
PeerSubs map[string]*FederatedServer
PeerSubs map[string]*Peer
PeerSubsMut sync.RWMutex
NumPeerSubs *int64
Subscribed bool
ExternalIP net.IP
pb.UnimplementedHubServer
}
func getVersion() string {
return meta.Version
}
@ -89,7 +88,7 @@ func getVersion() string {
'blockchain.address.unsubscribe'
*/
func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
func (s *Server) PeerSubsLoadOrStore(peer *Peer) (actual *Peer, loaded bool) {
key := peer.peerKey()
s.PeerSubsMut.RLock()
if actual, ok := s.PeerSubs[key]; ok {
@ -104,7 +103,7 @@ func (s *Server) PeerSubsLoadOrStore(peer *FederatedServer) (actual *FederatedSe
}
}
func (s *Server) PeerServersLoadOrStore(peer *FederatedServer) (actual *FederatedServer, loaded bool) {
func (s *Server) PeerServersLoadOrStore(peer *Peer) (actual *Peer, loaded bool) {
key := peer.peerKey()
s.PeerServersMut.RLock()
if actual, ok := s.PeerServers[key]; ok {
@ -196,20 +195,20 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
LastRefreshCheck: time.Now(),
RefreshDelta: refreshDelta,
NumESRefreshes: 0,
PeerServers: make(map[string]*FederatedServer),
PeerServers: make(map[string]*Peer),
PeerServersMut: sync.RWMutex{},
NumPeerServers: numPeers,
PeerSubs: make(map[string]*FederatedServer),
PeerSubs: make(map[string]*Peer),
PeerSubsMut: sync.RWMutex{},
NumPeerSubs: numSubs,
Subscribed: false,
ExternalIP: net.IPv4(127, 0, 0, 1),
}
// Start up our background services
if args.StartPrometheus {
if !args.DisableStartPrometheus {
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics")
}
if args.StartUDP {
if !args.DisableStartUDP {
go func() {
err := UDPServer(args)
if err != nil {
@ -218,11 +217,13 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
}()
}
// Load peers from disk and subscribe to one if there are any
if args.LoadPeers {
err = s.loadPeers()
if err != nil {
log.Println(err)
}
if !args.DisableLoadPeers {
go func() {
err := s.loadPeers()
if err != nil {
log.Println(err)
}
}()
}
return s
@ -244,21 +245,21 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes
metrics.RequestsCount.With(prometheus.Labels{"method": "hello"}).Inc()
port := args.Port
host := args.Host
server := &FederatedServer{
Address: host,
Port: port,
Ts: time.Now(),
newPeer := &Peer{
Address: host,
Port: port,
LastSeen: time.Now(),
}
log.Println(server)
log.Println(newPeer)
err := s.addPeer(&pb.ServerMessage{Address: host, Port: port}, false)
err := s.addPeer(newPeer, false, true)
// They just contacted us, so this shouldn't happen
if err != nil {
log.Println(err)
}
s.mergeFederatedServers(args.Servers)
s.mergePeers(args.Servers)
s.writePeers()
s.notifyPeerSubs(server)
s.notifyPeerSubs(newPeer)
return s.makeHelloMessage(), nil
}
@ -268,10 +269,10 @@ func (s *Server) Hello(ctx context.Context, args *pb.HelloMessage) (*pb.HelloMes
func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "peer_subscribe"}).Inc()
var msg = "Success"
peer := &FederatedServer{
Address: in.Address,
Port: in.Port,
Ts: time.Now(),
peer := &Peer{
Address: in.Address,
Port: in.Port,
LastSeen: time.Now(),
}
if _, loaded := s.PeerSubsLoadOrStore(peer); !loaded {
@ -288,7 +289,12 @@ func (s *Server) PeerSubscribe(ctx context.Context, in *pb.ServerMessage) (*pb.S
func (s *Server) AddPeer(ctx context.Context, args *pb.ServerMessage) (*pb.StringValue, error) {
metrics.RequestsCount.With(prometheus.Labels{"method": "add_peer"}).Inc()
var msg = "Success"
err := s.addPeer(args, true)
newPeer := &Peer{
Address: args.Address,
Port: args.Port,
LastSeen: time.Now(),
}
err := s.addPeer(newPeer, true, true)
if err != nil {
log.Println(err)
msg = "Failed"

View file

@ -2,19 +2,25 @@ package server
import (
"encoding/binary"
"fmt"
"net"
"strconv"
"strings"
"time"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/lbry.go/v2/extras/errors"
)
const maxBufferSize = 1024
const maxBufferSize = 1024
// genesis blocktime (which is actually wrong)
const magic = 1446058291
// magic constant for the UDPPing protocol. The above comment is taken from
// the python code this was implemented off of.
// https://github.com/lbryio/lbry-sdk/blob/7d49b046d44a4b7067d5dc1d6cd65ff0475c71c8/lbry/wallet/server/udp.py#L12
const magic = 1446058291
const protocolVersion = 1
const defaultFlags = 0b00000000
const availableFlag = 0b00000001
// SPVPing is a struct for the format of how to ping another hub over udp.
// format b'!lB64s'
@ -31,7 +37,7 @@ type SPVPong struct {
flags byte
height uint32
tip []byte // 32
srcAddrRaw []byte // 4
srcAddrRaw []byte // 4
country uint16
}
@ -55,7 +61,7 @@ func decodeSPVPing(data []byte) *SPVPing {
parsedMagic := binary.BigEndian.Uint32(data)
parsedProtocalVersion := data[4]
return &SPVPing{
magic: parsedMagic,
magic: parsedMagic,
version: parsedProtocalVersion,
}
}
@ -65,7 +71,7 @@ func decodeSPVPing(data []byte) *SPVPing {
func (pong *SPVPong) Encode() []byte {
data := make([]byte, 44)
data[0] = pong.protocolVersion
data[0] = pong.protocolVersion
data[1] = pong.flags
binary.BigEndian.PutUint32(data[2:], pong.height)
copy(data[6:], pong.tip)
@ -76,10 +82,13 @@ func (pong *SPVPong) Encode() []byte {
}
// makeSPVPong creates an SPVPong struct according to given parameters.
// FIXME: Currently, does not correctly encode the country.
func makeSPVPong(flags int, height int, tip []byte, sourceAddr string, country string) *SPVPong {
byteAddr := EncodeAddress(sourceAddr)
countryInt := 1
var countryInt int32
var ok bool
if countryInt, ok = pb.Location_Country_value[country]; !ok {
countryInt = int32(pb.Location_UNKNOWN_COUNTRY)
}
return &SPVPong{
protocolVersion: protocolVersion,
flags: byte(flags),
@ -99,19 +108,19 @@ func decodeSPVPong(data []byte) *SPVPong {
parsedProtocalVersion := data[0]
flags := data[1]
height := binary.BigEndian.Uint32(data[:2])
height := binary.BigEndian.Uint32(data[2:])
tip := make([]byte, 32)
copy(tip, data[6:38])
srcRawAddr := make([]byte, 4)
copy(srcRawAddr, data[38:42])
country := binary.BigEndian.Uint16(data[:42])
country := binary.BigEndian.Uint16(data[42:])
return &SPVPong{
protocolVersion: parsedProtocalVersion,
flags: flags,
height: height,
tip: tip,
srcAddrRaw: srcRawAddr,
country: country,
flags: flags,
height: height,
tip: tip,
srcAddrRaw: srcRawAddr,
country: country,
}
}
@ -137,8 +146,8 @@ func EncodeAddress(addr string) []byte {
}
// DecodeAddress gets the string ipv4 address from an SPVPong struct.
func (pong *SPVPong) DecodeAddress() string {
return fmt.Sprintf("%d.%d.%d.%d",
func (pong *SPVPong) DecodeAddress() net.IP {
return net.IPv4(
pong.srcAddrRaw[0],
pong.srcAddrRaw[1],
pong.srcAddrRaw[2],
@ -146,53 +155,72 @@ func (pong *SPVPong) DecodeAddress() string {
)
}
func (pong *SPVPong) DecodeCountry() string {
return pb.Location_Country_name[int32(pong.country)]
}
func (pong *SPVPong) DecodeProtocolVersion() int {
return int(pong.protocolVersion)
}
func (pong *SPVPong) DecodeHeight() int {
return int(pong.height)
}
func (pong *SPVPong) DecodeTip() []byte {
return pong.tip
}
func (pong *SPVPong) DecodeFlags() byte {
return pong.flags
}
// UDPPing sends a ping over udp to another hub and returns the ip address of
// this hub.
func UDPPing(address string) (string, error) {
func UDPPing(ip, port string) (*SPVPong, error) {
address := ip + ":" + port
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return "", err
return nil, err
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return "", err
return nil, err
}
defer conn.Close()
_, err = conn.Write(encodeSPVPing())
if err != nil {
return "", err
return nil, err
}
buffer := make([]byte, maxBufferSize)
deadline := time.Now().Add(time.Second)
err = conn.SetReadDeadline(deadline)
if err != nil {
return "", err
return nil, err
}
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
return "", err
return nil, err
}
pong := decodeSPVPong(buffer[:n])
if pong == nil {
return "", errors.Base("Pong decoding failed")
return nil, errors.Base("Pong decoding failed")
}
myAddr := pong.DecodeAddress()
return myAddr, nil
return pong, nil
}
// UDPServer is a goroutine that starts an udp server that implements the hubs
// Ping/Pong protocol to find out about each other without making full TCP
// connections.
func UDPServer(args *Args) error {
address := ":" + args.UDPPort
address := ":" + args.Port
tip := make([]byte, 32)
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
@ -215,7 +243,7 @@ func UDPServer(args *Args) error {
}
sAddr := addr.IP.String()
pong := makeSPVPong(0,0, tip, sAddr, args.Country)
pong := makeSPVPong(defaultFlags|availableFlag, 0, tip, sAddr, args.Country)
data := pong.Encode()
_, err = conn.WriteToUDP(data, addr)

83
server/udp_test.go Normal file
View file

@ -0,0 +1,83 @@
package server
import (
"log"
"os/exec"
"strings"
"testing"
)
// TestUDPPing tests UDPPing correctness against prod server.
func TestUDPPing(t *testing.T) {
args := makeDefaultArgs()
args.DisableStartUDP = true
tests := []struct {
name string
wantIP string
wantCountry string
wantProtocolVersion int
wantHeightMin int
wantFlags byte
}{
{
name: "Correctly parse information from production server.",
wantIP: "SETME",
wantCountry: "US",
wantProtocolVersion: 1,
wantHeightMin: 1060000,
wantFlags: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
toAddr := "spv16.lbry.com"
toPort := "50001"
pong, err := UDPPing(toAddr, toPort)
gotCountry := pong.DecodeCountry()
if err != nil {
log.Println(err)
}
res, err := exec.Command("dig", "@resolver4.opendns.com", "myip.opendns.com", "+short").Output()
if err != nil {
log.Println(err)
}
digIP := strings.TrimSpace(string(res))
udpIP := pong.DecodeAddress().String()
tt.wantIP = digIP
log.Println("Height:", pong.DecodeHeight())
log.Printf("Flags: %x\n", pong.DecodeFlags())
log.Println("ProtocolVersion:", pong.DecodeProtocolVersion())
log.Printf("Tip: %x\n", pong.DecodeTip())
gotHeight := pong.DecodeHeight()
gotProtocolVersion := pong.DecodeProtocolVersion()
gotFlags := pong.DecodeFlags()
gotIP := udpIP
if gotIP != tt.wantIP {
t.Errorf("ip: got: '%s', want: '%s'\n", gotIP, tt.wantIP)
}
if gotCountry != tt.wantCountry {
t.Errorf("country: got: '%s', want: '%s'\n", gotCountry, tt.wantCountry)
}
if gotHeight < tt.wantHeightMin {
t.Errorf("height: got: %d, want >=: %d\n", gotHeight, tt.wantHeightMin)
}
if gotProtocolVersion != tt.wantProtocolVersion {
t.Errorf("protocolVersion: got: %d, want: %d\n", gotProtocolVersion, tt.wantProtocolVersion)
}
if gotFlags != tt.wantFlags {
t.Errorf("flags: got: %d, want: %d\n", gotFlags, tt.wantFlags)
}
})
}
}