// herald.go/server/federation.go

package server
import (
"bufio"
"context"
"log"
"math"
"net"
"os"
"strings"
"sync/atomic"
"time"
"github.com/lbryio/herald.go/internal/metrics"
pb "github.com/lbryio/herald.go/protobuf/go"
"google.golang.org/grpc"
)
// Peer holds relevant information about peers that we know about.
type Peer struct {
	// Address is the peer's host, kept as text; it is joined with Port to
	// form both the dial target and the peer-table key.
	Address string
	// Port is the peer's port, kept as text.
	Port string
	// LastSeen is set when the peer is created and refreshed when a peer
	// that is already known is added again.
	LastSeen time.Time
}
var (
	// localHosts is the set of host strings that refer to this machine. It
	// is combined with a port comparison to recognise a peer entry that is
	// really ourselves.
	localHosts = map[string]bool{
		"127.0.0.1": true,
		"0.0.0.0":   true,
		"localhost": true,
		"<nil>":     true, // Empty net.IP turned into a string
	}
)
// peerKey takes a peer and returns the key for that peer in our peer
// table: its address and port separated by a colon.
func peerKey(peer *Peer) string {
	addr, port := peer.Address, peer.Port
	return addr + ":" + port
}
// peerKey is a method on a Peer that returns the key for that peer in our
// peer table: its address and port separated by a colon.
func (peer *Peer) peerKey() string {
	const sep = ":"
	return peer.Address + sep + peer.Port
}
// incNumPeers atomically adds one to the counter of known peer servers.
func (s *Server) incNumPeers() {
	counter := s.NumPeerServers
	atomic.AddInt64(counter, 1)
}
// decNumPeers atomically subtracts one from the counter of known peer
// servers.
func (s *Server) decNumPeers() {
	counter := s.NumPeerServers
	atomic.AddInt64(counter, -1)
}
// getNumPeers returns the current value of the known-peer-server counter.
// The counter is modified with atomic adds, so it is read with an atomic
// load as well; a plain dereference would race with those writers.
func (s *Server) getNumPeers() int64 {
	return atomic.LoadInt64(s.NumPeerServers)
}
// incNumSubs atomically adds one to the counter of subscribed peers.
func (s *Server) incNumSubs() {
	counter := s.NumPeerSubs
	atomic.AddInt64(counter, 1)
}
// decNumSubs atomically subtracts one from the counter of subscribed peers.
func (s *Server) decNumSubs() {
	counter := s.NumPeerSubs
	atomic.AddInt64(counter, -1)
}
func (s *Server) getNumSubs() int64 {
return *s.NumPeerSubs
}
2021-12-03 17:52:21 +01:00
// getAndSetExternalIp sends a UDP ping to the given ip and port, decodes the
// address from the reply, logs it and stores it as the server's external IP.
// If the ping fails its error is returned and ExternalIP is left unchanged.
func (s *Server) getAndSetExternalIp(ip, port string) error {
	reply, pingErr := UDPPing(ip, port)
	if pingErr != nil {
		return pingErr
	}

	externalIP := reply.DecodeAddress()
	log.Println("my ip: ", externalIP)
	s.ExternalIP = externalIP

	return nil
}
// loadPeers takes the arguments given to the hub at startup and loads the
// previously known peers from disk and verifies their existence before
// storing them as known peers. Returns a map of peerKey -> object
func (s *Server) loadPeers() error {
peerFile := s.Args.PeerFile
port := s.Args.Port
// First we make sure our server has come up, so we can answer back to peers.
var failures = 0
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rocksdb (#29) * Initial rocksdb commit Basic reading from rocksdb works * Try github action thing * try local dockerfile * asdf * qwer * asdf * Try adding test db with git-lfs * update action * cleanup * Don't hardcode stop on read * Progress of reading rocksdb * fixes and arg test * asdf * Fix rocksdb iterator and tests * update script * asdf * Better iterator. Need to implement a lot of keys next, and tests, maybe tests needed. * asdf * asdf * asdf * Implementation, testing, and cleanup. Implemented more prefixes. Figured out a good test that should work for all prefixes. Removed binary databases so we can just store human readable csv files. * more tests, prefixes and small refactor * Another prefix * EffectiveAmount * ActiveAmount * ActivatedClaimAndSupport * PendingActivation * ClaimTakeover * ClaimExpiration * SupportToClaim * ClaimToSupport * Fix bug with variable length keys * ChannelToClaim * ClaimToChannel * ClaimShortID * TXOToClaim * ClaimToTXO * BlockHeader * BlockHash * Undo * HashXHistory * Tx and big refactor * rest the the keys * Refactor and starting to add resolve * asdf * Refactor tests and add column families * changes * more work on implementing resolve * code cleanup, function tests * small code refactoring * start building pieces of the test data set for full resolve. 
* Export constant, add test * another test * TestGetTxHash * more tests * more tests * More tests * Refactor db functions into three files * added slice backed stack, need to fix tests * fix some issues with test suite * some cleanup and adding arguments and db load / refresh to server command * fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy * logrus, protobuf updates, resolve grpc endpoint * don't run integration test with unit tests * signal handling and cleanup functions * signal handling code files * Unit tests for db stack * reorganize bisect function so we lock it properly * fix txcounts loading * cleanup some logic around iterators and fix a bug where I was running two detect changes threads * add some metrics * cleanup * blocking and filtering implemented * add params for blocking and filtering channels and streams * updates and fixes for integration tests * use newer version of lbry.go when possible * Add height endpoint and move string functions internal * remove gitattributes, unused * some cleanup * more cleanup / refactor. almost ready for another review * More cleanup * use chainhash.Hash types from lbcd where appropriate * update github action to go-1.17.8 * update go version needed * trying to fix these builds * cleanup * trying to fix memory leak * fix memory leak (iterator never finished so cleanup didn't run) * changes per code review * remove lbry.go v2 * rename sort.go search.go * fix test
2022-04-29 17:04:01 +02:00
// log.Println("loadPeers #### waiting for server to come up")
retry:
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
conn, err := grpc.DialContext(ctx,
"0.0.0.0:"+port,
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
if failures > 3 {
log.Println("Warning! Our endpoint doesn't seem to have come up, didn't load peers")
return err
}
failures += 1
goto retry
}
if err = conn.Close(); err != nil {
log.Println(err)
}
cancel()
rocksdb (#29) * Initial rocksdb commit Basic reading from rocksdb works * Try github action thing * try local dockerfile * asdf * qwer * asdf * Try adding test db with git-lfs * update action * cleanup * Don't hardcode stop on read * Progress of reading rocksdb * fixes and arg test * asdf * Fix rocksdb iterator and tests * update script * asdf * Better iterator. Need to implement a lot of keys next, and tests, maybe tests needed. * asdf * asdf * asdf * Implementation, testing, and cleanup. Implemented more prefixes. Figured out a good test that should work for all prefixes. Removed binary databases so we can just store human readable csv files. * more tests, prefixes and small refactor * Another prefix * EffectiveAmount * ActiveAmount * ActivatedClaimAndSupport * PendingActivation * ClaimTakeover * ClaimExpiration * SupportToClaim * ClaimToSupport * Fix bug with variable length keys * ChannelToClaim * ClaimToChannel * ClaimShortID * TXOToClaim * ClaimToTXO * BlockHeader * BlockHash * Undo * HashXHistory * Tx and big refactor * rest the the keys * Refactor and starting to add resolve * asdf * Refactor tests and add column families * changes * more work on implementing resolve * code cleanup, function tests * small code refactoring * start building pieces of the test data set for full resolve. 
* Export constant, add test * another test * TestGetTxHash * more tests * more tests * More tests * Refactor db functions into three files * added slice backed stack, need to fix tests * fix some issues with test suite * some cleanup and adding arguments and db load / refresh to server command * fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy * logrus, protobuf updates, resolve grpc endpoint * don't run integration test with unit tests * signal handling and cleanup functions * signal handling code files * Unit tests for db stack * reorganize bisect function so we lock it properly * fix txcounts loading * cleanup some logic around iterators and fix a bug where I was running two detect changes threads * add some metrics * cleanup * blocking and filtering implemented * add params for blocking and filtering channels and streams * updates and fixes for integration tests * use newer version of lbry.go when possible * Add height endpoint and move string functions internal * remove gitattributes, unused * some cleanup * more cleanup / refactor. almost ready for another review * More cleanup * use chainhash.Hash types from lbcd where appropriate * update github action to go-1.17.8 * update go version needed * trying to fix these builds * cleanup * trying to fix memory leak * fix memory leak (iterator never finished so cleanup didn't run) * changes per code review * remove lbry.go v2 * rename sort.go search.go * fix test
2022-04-29 17:04:01 +02:00
// log.Println("loadPeers #### Past checking for server to come up")
f, err := os.Open(peerFile)
if err != nil {
rocksdb (#29) * Initial rocksdb commit Basic reading from rocksdb works * Try github action thing * try local dockerfile * asdf * qwer * asdf * Try adding test db with git-lfs * update action * cleanup * Don't hardcode stop on read * Progress of reading rocksdb * fixes and arg test * asdf * Fix rocksdb iterator and tests * update script * asdf * Better iterator. Need to implement a lot of keys next, and tests, maybe tests needed. * asdf * asdf * asdf * Implementation, testing, and cleanup. Implemented more prefixes. Figured out a good test that should work for all prefixes. Removed binary databases so we can just store human readable csv files. * more tests, prefixes and small refactor * Another prefix * EffectiveAmount * ActiveAmount * ActivatedClaimAndSupport * PendingActivation * ClaimTakeover * ClaimExpiration * SupportToClaim * ClaimToSupport * Fix bug with variable length keys * ChannelToClaim * ClaimToChannel * ClaimShortID * TXOToClaim * ClaimToTXO * BlockHeader * BlockHash * Undo * HashXHistory * Tx and big refactor * rest the the keys * Refactor and starting to add resolve * asdf * Refactor tests and add column families * changes * more work on implementing resolve * code cleanup, function tests * small code refactoring * start building pieces of the test data set for full resolve. 
* Export constant, add test * another test * TestGetTxHash * more tests * more tests * More tests * Refactor db functions into three files * added slice backed stack, need to fix tests * fix some issues with test suite * some cleanup and adding arguments and db load / refresh to server command * fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy * logrus, protobuf updates, resolve grpc endpoint * don't run integration test with unit tests * signal handling and cleanup functions * signal handling code files * Unit tests for db stack * reorganize bisect function so we lock it properly * fix txcounts loading * cleanup some logic around iterators and fix a bug where I was running two detect changes threads * add some metrics * cleanup * blocking and filtering implemented * add params for blocking and filtering channels and streams * updates and fixes for integration tests * use newer version of lbry.go when possible * Add height endpoint and move string functions internal * remove gitattributes, unused * some cleanup * more cleanup / refactor. almost ready for another review * More cleanup * use chainhash.Hash types from lbcd where appropriate * update github action to go-1.17.8 * update go version needed * trying to fix these builds * cleanup * trying to fix memory leak * fix memory leak (iterator never finished so cleanup didn't run) * changes per code review * remove lbry.go v2 * rename sort.go search.go * fix test
2022-04-29 17:04:01 +02:00
// log.Println(err)
return err
}
scanner := bufio.NewScanner(f)
scanner.Split(bufio.ScanLines)
var text []string
for scanner.Scan() {
text = append(text, scanner.Text())
}
err = f.Close()
if err != nil {
log.Println("peer file failed to close: ", err)
}
for _, line := range text {
2021-11-25 00:24:06 +01:00
ipPort := strings.Split(line, ":")
if len(ipPort) != 2 {
log.Println("Malformed entry in peer file")
continue
}
// If the peer is us, skip
log.Println(ipPort)
if ipPort[1] == port &&
2021-11-15 14:52:32 +01:00
(localHosts[ipPort[0]] || ipPort[0] == s.ExternalIP.String()) {
log.Println("Self peer, skipping ...")
continue
}
newPeer := &Peer{
2021-12-03 17:52:21 +01:00
Address: ipPort[0],
Port: ipPort[1],
LastSeen: time.Now(),
}
log.Printf("pinging peer %+v\n", newPeer)
err = s.addPeer(newPeer, true, true)
if err != nil {
log.Println(err)
}
}
log.Println("Returning from loadPeers")
return nil
}
// subscribeToPeer subscribes us to a peer to we'll get updates about their
// known peers.
func (s *Server) subscribeToPeer(peer *Peer) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx,
peer.Address+":"+peer.Port,
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
return err
}
defer conn.Close()
msg := &pb.ServerMessage{
2021-11-15 14:52:32 +01:00
Address: s.ExternalIP.String(),
Port: s.Args.Port,
}
c := pb.NewHubClient(conn)
log.Printf("%s:%s subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer)
_, err = c.PeerSubscribe(ctx, msg)
if err != nil {
return err
}
return nil
}
// helloPeer takes a peer to say hello to and sends a hello message
// containing all the peers we know about and information about us.
// This is used to confirm existence of peers on start and let them
// know about us. Returns the response from the server on success,
// nil otherwise.
func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
log.Println("In helloPeer")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx,
peer.Address+":"+peer.Port,
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
log.Println(err)
return nil, err
}
defer conn.Close()
c := pb.NewHubClient(conn)
msg := &pb.HelloMessage{
2021-11-25 00:24:06 +01:00
Port: s.Args.Port,
Host: s.ExternalIP.String(),
Servers: []*pb.ServerMessage{},
}
log.Printf("%s:%s saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
res, err := c.Hello(ctx, msg)
if err != nil {
log.Println(err)
return nil, err
}
log.Println(res)
return res, nil
}
2021-10-29 23:27:27 +02:00
// writePeers writes our current known peers to disk.
func (s *Server) writePeers() {
2021-11-25 00:56:34 +01:00
if s.Args.DisableWritePeers {
return
}
f, err := os.Create(s.Args.PeerFile)
if err != nil {
log.Println(err)
return
}
writer := bufio.NewWriter(f)
rocksdb (#29) * Initial rocksdb commit Basic reading from rocksdb works * Try github action thing * try local dockerfile * asdf * qwer * asdf * Try adding test db with git-lfs * update action * cleanup * Don't hardcode stop on read * Progress of reading rocksdb * fixes and arg test * asdf * Fix rocksdb iterator and tests * update script * asdf * Better iterator. Need to implement a lot of keys next, and tests, maybe tests needed. * asdf * asdf * asdf * Implementation, testing, and cleanup. Implemented more prefixes. Figured out a good test that should work for all prefixes. Removed binary databases so we can just store human readable csv files. * more tests, prefixes and small refactor * Another prefix * EffectiveAmount * ActiveAmount * ActivatedClaimAndSupport * PendingActivation * ClaimTakeover * ClaimExpiration * SupportToClaim * ClaimToSupport * Fix bug with variable length keys * ChannelToClaim * ClaimToChannel * ClaimShortID * TXOToClaim * ClaimToTXO * BlockHeader * BlockHash * Undo * HashXHistory * Tx and big refactor * rest the the keys * Refactor and starting to add resolve * asdf * Refactor tests and add column families * changes * more work on implementing resolve * code cleanup, function tests * small code refactoring * start building pieces of the test data set for full resolve. 
* Export constant, add test * another test * TestGetTxHash * more tests * more tests * More tests * Refactor db functions into three files * added slice backed stack, need to fix tests * fix some issues with test suite * some cleanup and adding arguments and db load / refresh to server command * fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy * logrus, protobuf updates, resolve grpc endpoint * don't run integration test with unit tests * signal handling and cleanup functions * signal handling code files * Unit tests for db stack * reorganize bisect function so we lock it properly * fix txcounts loading * cleanup some logic around iterators and fix a bug where I was running two detect changes threads * add some metrics * cleanup * blocking and filtering implemented * add params for blocking and filtering channels and streams * updates and fixes for integration tests * use newer version of lbry.go when possible * Add height endpoint and move string functions internal * remove gitattributes, unused * some cleanup * more cleanup / refactor. almost ready for another review * More cleanup * use chainhash.Hash types from lbcd where appropriate * update github action to go-1.17.8 * update go version needed * trying to fix these builds * cleanup * trying to fix memory leak * fix memory leak (iterator never finished so cleanup didn't run) * changes per code review * remove lbry.go v2 * rename sort.go search.go * fix test
2022-04-29 17:04:01 +02:00
for key := range s.PeerServers {
line := key + "\n"
_, err := writer.WriteString(line)
if err != nil {
log.Println(err)
}
}
err = writer.Flush()
if err != nil {
log.Println(err)
}
err = f.Close()
if err != nil {
log.Println(err)
}
}
// notifyPeer tells an already known peer (peerToNotify) about a peer we just
// learned of (newPeer) by dialing it and calling its AddPeer endpoint.
// Dialing and the call share a single one-second deadline. It is a no-op
// returning nil when federation is disabled; otherwise any dial or RPC
// error is returned.
func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
	if s.Args.DisableFederation {
		return nil
	}

	dialCtx, stop := context.WithTimeout(context.Background(), time.Second)
	defer stop()

	target := peerToNotify.Address + ":" + peerToNotify.Port
	conn, dialErr := grpc.DialContext(dialCtx, target, grpc.WithInsecure(), grpc.WithBlock())
	if dialErr != nil {
		return dialErr
	}
	defer conn.Close()

	announcement := &pb.ServerMessage{
		Address: newPeer.Address,
		Port:    newPeer.Port,
	}

	client := pb.NewHubClient(conn)
	_, rpcErr := client.AddPeer(dialCtx, announcement)
	return rpcErr
}
// notifyPeerSubs takes a new peer server we just learned about and notifies
// every peer subscribed to us about it. Subscribers that cannot be notified
// are collected during the pass and afterwards removed from the subscriber
// table, with the subscriber counter and metric decremented for each one.
func (s *Server) notifyPeerSubs(newPeer *Peer) {
	var failed []string

	// Notification pass: only reads the subscriber table.
	s.PeerSubsMut.RLock()
	for key, sub := range s.PeerSubs {
		log.Printf("Notifying peer %s of new node %+v\n", key, newPeer)
		if err := s.notifyPeer(sub, newPeer); err != nil {
			log.Println("Failed to send data to ", key)
			log.Println(err)
			failed = append(failed, key)
		}
	}
	s.PeerSubsMut.RUnlock()

	// Cleanup pass: drop the subscribers we could not reach. The presence
	// check guards against an entry having been removed in between.
	s.PeerSubsMut.Lock()
	defer s.PeerSubsMut.Unlock()
	for _, key := range failed {
		if _, present := s.PeerSubs[key]; !present {
			continue
		}
		delete(s.PeerSubs, key)
		s.decNumSubs()
		metrics.PeersSubscribed.Dec()
	}
}
// addPeer takes a new peer, optionally checks to see if they're online, and
// adds them to our list of peers. It will also optionally subscribe to it.
func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
if s.Args.DisableFederation {
return nil
}
// First thing we get our external ip if we don't have it, otherwise we
// could end up subscribed to our self, which is silly.
2021-11-15 14:52:32 +01:00
nilIP := net.IP{}
2021-11-25 00:24:06 +01:00
localIP1 := net.IPv4(127, 0, 0, 1)
2021-11-15 14:52:32 +01:00
if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) {
err := s.getAndSetExternalIp(newPeer.Address, newPeer.Port)
if err != nil {
log.Println(err)
log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host)
}
}
if s.Args.Port == newPeer.Port &&
(localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
log.Printf("%s:%s addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
return nil
}
k := peerKey(newPeer)
log.Printf("%s:%s adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
if ping {
_, err := s.helloPeer(newPeer)
if err != nil {
s.PeerServersMut.Lock()
delete(s.PeerServers, k)
s.PeerServersMut.Unlock()
return err
}
}
s.incNumPeers()
metrics.PeersKnown.Inc()
s.writePeers()
s.notifyPeerSubs(newPeer)
// Subscribe to all our peers for now
if subscribe {
err := s.subscribeToPeer(newPeer)
if err != nil {
return err
}
}
} else {
2021-12-03 17:52:21 +01:00
oldServer.LastSeen = time.Now()
}
return nil
}
// mergePeers is an internal convenience function to add a list of
// peers.
func (s *Server) mergePeers(servers []*pb.ServerMessage) {
for _, srvMsg := range servers {
newPeer := &Peer{
2021-12-03 17:52:21 +01:00
Address: srvMsg.Address,
Port: srvMsg.Port,
LastSeen: time.Now(),
}
err := s.addPeer(newPeer, false, true)
// This shouldn't happen because we're not pinging them.
if err != nil {
log.Println(err)
}
}
}
// makeHelloMessage makes a message for this hub to call the Hello endpoint
// on another hub.
func (s *Server) makeHelloMessage() *pb.HelloMessage {
servers := make([]*pb.ServerMessage, 0, 10)
s.PeerServersMut.RLock()
for _, peer := range s.PeerServers {
servers = append(servers, &pb.ServerMessage{
Address: peer.Address,
2021-11-25 00:24:06 +01:00
Port: peer.Port,
})
}
s.PeerServersMut.RUnlock()
return &pb.HelloMessage{
2021-11-25 00:24:06 +01:00
Port: s.Args.Port,
Host: s.ExternalIP.String(),
Servers: servers,
}
}