2021-10-25 03:39:37 +02:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"context"
|
2021-11-10 01:39:13 +01:00
|
|
|
"math"
|
2021-11-15 14:52:32 +01:00
|
|
|
"net"
|
2021-10-25 03:39:37 +02:00
|
|
|
"os"
|
2022-12-06 22:14:28 +01:00
|
|
|
"strconv"
|
2021-10-25 03:39:37 +02:00
|
|
|
"strings"
|
2021-10-30 07:27:25 +02:00
|
|
|
"sync/atomic"
|
2021-10-25 03:39:37 +02:00
|
|
|
"time"
|
|
|
|
|
2022-08-09 13:43:01 +02:00
|
|
|
"github.com/lbryio/herald.go/internal/metrics"
|
|
|
|
pb "github.com/lbryio/herald.go/protobuf/go"
|
2022-12-07 17:01:36 +01:00
|
|
|
log "github.com/sirupsen/logrus"
|
2021-10-25 03:39:37 +02:00
|
|
|
"google.golang.org/grpc"
|
2022-12-07 17:01:36 +01:00
|
|
|
"google.golang.org/grpc/credentials/insecure"
|
2021-10-25 03:39:37 +02:00
|
|
|
)
|
|
|
|
|
2021-12-02 01:32:23 +01:00
|
|
|
// Peer describes a remote hub this server knows about: the address and port
// it is reachable on, and the time it was last observed.
type Peer struct {
	Address, Port string
	LastSeen      time.Time
}
|
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
// localHosts is the set of address strings that are treated as pointing at
// this machine when deciding whether a peer entry is really ourselves.
var localHosts = map[string]bool{
	"127.0.0.1": true,
	"0.0.0.0":   true,
	"localhost": true,
	"<nil>":     true, // Empty net.IP turned into a string
}
|
|
|
|
|
2021-12-02 01:32:23 +01:00
|
|
|
// peerKey takes a peer and returns the key that for that peer
|
2021-10-25 03:39:37 +02:00
|
|
|
// in our peer table.
|
2021-12-02 01:32:23 +01:00
|
|
|
func peerKey(peer *Peer) string {
|
|
|
|
return peer.Address + ":" + peer.Port
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
2021-11-05 01:57:23 +01:00
|
|
|
// peerKey is a function on a FederatedServer struct to return the key for that
|
|
|
|
// peer is out peer table.
|
2021-12-02 01:32:23 +01:00
|
|
|
func (peer *Peer) peerKey() string {
|
2021-11-05 01:57:23 +01:00
|
|
|
return peer.Address + ":" + peer.Port
|
|
|
|
}
|
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
func (s *Server) incNumPeers() {
|
|
|
|
atomic.AddInt64(s.NumPeerServers, 1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) decNumPeers() {
|
|
|
|
atomic.AddInt64(s.NumPeerServers, -1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) getNumPeers() int64 {
|
|
|
|
return *s.NumPeerServers
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) incNumSubs() {
|
|
|
|
atomic.AddInt64(s.NumPeerSubs, 1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) decNumSubs() {
|
|
|
|
atomic.AddInt64(s.NumPeerSubs, -1)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) getNumSubs() int64 {
|
|
|
|
return *s.NumPeerSubs
|
|
|
|
}
|
|
|
|
|
2021-12-03 17:52:21 +01:00
|
|
|
// getAndSetExternalIp detects the server's external IP and stores it.
|
2021-12-02 01:32:23 +01:00
|
|
|
func (s *Server) getAndSetExternalIp(ip, port string) error {
|
|
|
|
pong, err := UDPPing(ip, port)
|
2021-11-10 01:39:13 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-11-24 23:58:05 +01:00
|
|
|
myIp := pong.DecodeAddress()
|
2021-11-10 01:39:13 +01:00
|
|
|
log.Println("my ip: ", myIp)
|
|
|
|
s.ExternalIP = myIp
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-10-25 03:39:37 +02:00
|
|
|
// loadPeers takes the arguments given to the hub at startup and loads the
|
|
|
|
// previously known peers from disk and verifies their existence before
|
|
|
|
// storing them as known peers. Returns a map of peerKey -> object
|
2021-10-30 07:27:25 +02:00
|
|
|
func (s *Server) loadPeers() error {
|
|
|
|
peerFile := s.Args.PeerFile
|
2022-12-06 22:14:28 +01:00
|
|
|
port := strconv.Itoa(s.Args.Port)
|
2021-10-25 03:39:37 +02:00
|
|
|
|
2021-11-10 01:39:13 +01:00
|
|
|
// First we make sure our server has come up, so we can answer back to peers.
|
|
|
|
var failures = 0
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
|
defer cancel()
|
|
|
|
|
rocksdb (#29)
* Initial rocksdb commit
Basic reading from rocksdb works
* Try github action thing
* try local dockerfile
* asdf
* qwer
* asdf
* Try adding test db with git-lfs
* update action
* cleanup
* Don't hardcode stop on read
* Progress of reading rocksdb
* fixes and arg test
* asdf
* Fix rocksdb iterator and tests
* update script
* asdf
* Better iterator. Need to implement a lot of keys next, and tests, maybe
tests needed.
* asdf
* asdf
* asdf
* Implementation, testing, and cleanup.
Implemented more prefixes. Figured out a good test that should work for
all prefixes. Removed binary databases so we can just store human
readable csv files.
* more tests, prefixes and small refactor
* Another prefix
* EffectiveAmount
* ActiveAmount
* ActivatedClaimAndSupport
* PendingActivation
* ClaimTakeover
* ClaimExpiration
* SupportToClaim
* ClaimToSupport
* Fix bug with variable length keys
* ChannelToClaim
* ClaimToChannel
* ClaimShortID
* TXOToClaim
* ClaimToTXO
* BlockHeader
* BlockHash
* Undo
* HashXHistory
* Tx and big refactor
* rest the the keys
* Refactor and starting to add resolve
* asdf
* Refactor tests and add column families
* changes
* more work on implementing resolve
* code cleanup, function tests
* small code refactoring
* start building pieces of the test data set for full resolve.
* Export constant, add test
* another test
* TestGetTxHash
* more tests
* more tests
* More tests
* Refactor db functions into three files
* added slice backed stack, need to fix tests
* fix some issues with test suite
* some cleanup and adding arguments and db load / refresh to server command
* fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy
* logrus, protobuf updates, resolve grpc endpoint
* don't run integration test with unit tests
* signal handling and cleanup functions
* signal handling code files
* Unit tests for db stack
* reorganize bisect function so we lock it properly
* fix txcounts loading
* cleanup some logic around iterators and fix a bug where I was running two detect changes threads
* add some metrics
* cleanup
* blocking and filtering implemented
* add params for blocking and filtering channels and streams
* updates and fixes for integration tests
* use newer version of lbry.go when possible
* Add height endpoint and move string functions internal
* remove gitattributes, unused
* some cleanup
* more cleanup / refactor. almost ready for another review
* More cleanup
* use chainhash.Hash types from lbcd where appropriate
* update github action to go-1.17.8
* update go version needed
* trying to fix these builds
* cleanup
* trying to fix memory leak
* fix memory leak (iterator never finished so cleanup didn't run)
* changes per code review
* remove lbry.go v2
* rename sort.go search.go
* fix test
2022-04-29 17:04:01 +02:00
|
|
|
// log.Println("loadPeers #### waiting for server to come up")
|
2021-11-10 01:39:13 +01:00
|
|
|
retry:
|
|
|
|
time.Sleep(time.Second * time.Duration(math.Pow(float64(failures), 2)))
|
|
|
|
conn, err := grpc.DialContext(ctx,
|
|
|
|
"0.0.0.0:"+port,
|
2022-12-07 17:01:36 +01:00
|
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
2021-11-10 01:39:13 +01:00
|
|
|
grpc.WithBlock(),
|
|
|
|
)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
if failures > 3 {
|
|
|
|
log.Println("Warning! Our endpoint doesn't seem to have come up, didn't load peers")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
failures += 1
|
|
|
|
goto retry
|
|
|
|
}
|
|
|
|
if err = conn.Close(); err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
cancel()
|
rocksdb (#29)
* Initial rocksdb commit
Basic reading from rocksdb works
* Try github action thing
* try local dockerfile
* asdf
* qwer
* asdf
* Try adding test db with git-lfs
* update action
* cleanup
* Don't hardcode stop on read
* Progress of reading rocksdb
* fixes and arg test
* asdf
* Fix rocksdb iterator and tests
* update script
* asdf
* Better iterator. Need to implement a lot of keys next, and tests, maybe
tests needed.
* asdf
* asdf
* asdf
* Implementation, testing, and cleanup.
Implemented more prefixes. Figured out a good test that should work for
all prefixes. Removed binary databases so we can just store human
readable csv files.
* more tests, prefixes and small refactor
* Another prefix
* EffectiveAmount
* ActiveAmount
* ActivatedClaimAndSupport
* PendingActivation
* ClaimTakeover
* ClaimExpiration
* SupportToClaim
* ClaimToSupport
* Fix bug with variable length keys
* ChannelToClaim
* ClaimToChannel
* ClaimShortID
* TXOToClaim
* ClaimToTXO
* BlockHeader
* BlockHash
* Undo
* HashXHistory
* Tx and big refactor
* rest the the keys
* Refactor and starting to add resolve
* asdf
* Refactor tests and add column families
* changes
* more work on implementing resolve
* code cleanup, function tests
* small code refactoring
* start building pieces of the test data set for full resolve.
* Export constant, add test
* another test
* TestGetTxHash
* more tests
* more tests
* More tests
* Refactor db functions into three files
* added slice backed stack, need to fix tests
* fix some issues with test suite
* some cleanup and adding arguments and db load / refresh to server command
* fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy
* logrus, protobuf updates, resolve grpc endpoint
* don't run integration test with unit tests
* signal handling and cleanup functions
* signal handling code files
* Unit tests for db stack
* reorganize bisect function so we lock it properly
* fix txcounts loading
* cleanup some logic around iterators and fix a bug where I was running two detect changes threads
* add some metrics
* cleanup
* blocking and filtering implemented
* add params for blocking and filtering channels and streams
* updates and fixes for integration tests
* use newer version of lbry.go when possible
* Add height endpoint and move string functions internal
* remove gitattributes, unused
* some cleanup
* more cleanup / refactor. almost ready for another review
* More cleanup
* use chainhash.Hash types from lbcd where appropriate
* update github action to go-1.17.8
* update go version needed
* trying to fix these builds
* cleanup
* trying to fix memory leak
* fix memory leak (iterator never finished so cleanup didn't run)
* changes per code review
* remove lbry.go v2
* rename sort.go search.go
* fix test
2022-04-29 17:04:01 +02:00
|
|
|
// log.Println("loadPeers #### Past checking for server to come up")
|
2021-11-10 01:39:13 +01:00
|
|
|
|
2021-10-25 03:39:37 +02:00
|
|
|
f, err := os.Open(peerFile)
|
|
|
|
if err != nil {
|
rocksdb (#29)
* Initial rocksdb commit
Basic reading from rocksdb works
* Try github action thing
* try local dockerfile
* asdf
* qwer
* asdf
* Try adding test db with git-lfs
* update action
* cleanup
* Don't hardcode stop on read
* Progress of reading rocksdb
* fixes and arg test
* asdf
* Fix rocksdb iterator and tests
* update script
* asdf
* Better iterator. Need to implement a lot of keys next, and tests, maybe
tests needed.
* asdf
* asdf
* asdf
* Implementation, testing, and cleanup.
Implemented more prefixes. Figured out a good test that should work for
all prefixes. Removed binary databases so we can just store human
readable csv files.
* more tests, prefixes and small refactor
* Another prefix
* EffectiveAmount
* ActiveAmount
* ActivatedClaimAndSupport
* PendingActivation
* ClaimTakeover
* ClaimExpiration
* SupportToClaim
* ClaimToSupport
* Fix bug with variable length keys
* ChannelToClaim
* ClaimToChannel
* ClaimShortID
* TXOToClaim
* ClaimToTXO
* BlockHeader
* BlockHash
* Undo
* HashXHistory
* Tx and big refactor
* rest the the keys
* Refactor and starting to add resolve
* asdf
* Refactor tests and add column families
* changes
* more work on implementing resolve
* code cleanup, function tests
* small code refactoring
* start building pieces of the test data set for full resolve.
* Export constant, add test
* another test
* TestGetTxHash
* more tests
* more tests
* More tests
* Refactor db functions into three files
* added slice backed stack, need to fix tests
* fix some issues with test suite
* some cleanup and adding arguments and db load / refresh to server command
* fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy
* logrus, protobuf updates, resolve grpc endpoint
* don't run integration test with unit tests
* signal handling and cleanup functions
* signal handling code files
* Unit tests for db stack
* reorganize bisect function so we lock it properly
* fix txcounts loading
* cleanup some logic around iterators and fix a bug where I was running two detect changes threads
* add some metrics
* cleanup
* blocking and filtering implemented
* add params for blocking and filtering channels and streams
* updates and fixes for integration tests
* use newer version of lbry.go when possible
* Add height endpoint and move string functions internal
* remove gitattributes, unused
* some cleanup
* more cleanup / refactor. almost ready for another review
* More cleanup
* use chainhash.Hash types from lbcd where appropriate
* update github action to go-1.17.8
* update go version needed
* trying to fix these builds
* cleanup
* trying to fix memory leak
* fix memory leak (iterator never finished so cleanup didn't run)
* changes per code review
* remove lbry.go v2
* rename sort.go search.go
* fix test
2022-04-29 17:04:01 +02:00
|
|
|
// log.Println(err)
|
2021-10-30 07:27:25 +02:00
|
|
|
return err
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
|
|
scanner.Split(bufio.ScanLines)
|
|
|
|
var text []string
|
|
|
|
for scanner.Scan() {
|
|
|
|
text = append(text, scanner.Text())
|
|
|
|
}
|
|
|
|
err = f.Close()
|
|
|
|
if err != nil {
|
|
|
|
log.Println("peer file failed to close: ", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, line := range text {
|
2021-11-25 00:24:06 +01:00
|
|
|
ipPort := strings.Split(line, ":")
|
2021-10-25 03:39:37 +02:00
|
|
|
if len(ipPort) != 2 {
|
|
|
|
log.Println("Malformed entry in peer file")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// If the peer is us, skip
|
|
|
|
log.Println(ipPort)
|
2021-11-10 01:39:13 +01:00
|
|
|
if ipPort[1] == port &&
|
2021-11-15 14:52:32 +01:00
|
|
|
(localHosts[ipPort[0]] || ipPort[0] == s.ExternalIP.String()) {
|
2021-10-25 03:39:37 +02:00
|
|
|
log.Println("Self peer, skipping ...")
|
|
|
|
continue
|
|
|
|
}
|
2021-11-10 01:39:13 +01:00
|
|
|
|
2021-12-02 01:32:23 +01:00
|
|
|
newPeer := &Peer{
|
2021-12-03 17:52:21 +01:00
|
|
|
Address: ipPort[0],
|
|
|
|
Port: ipPort[1],
|
|
|
|
LastSeen: time.Now(),
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
2021-12-02 01:32:23 +01:00
|
|
|
log.Printf("pinging peer %+v\n", newPeer)
|
|
|
|
err = s.addPeer(newPeer, true, true)
|
2021-10-30 07:27:25 +02:00
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
2021-11-10 01:39:13 +01:00
|
|
|
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
log.Println("Returning from loadPeers")
|
2021-10-30 07:27:25 +02:00
|
|
|
return nil
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
// subscribeToPeer subscribes us to a peer to we'll get updates about their
|
|
|
|
// known peers.
|
2021-12-02 01:32:23 +01:00
|
|
|
func (s *Server) subscribeToPeer(peer *Peer) error {
|
2021-10-25 03:39:37 +02:00
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
conn, err := grpc.DialContext(ctx,
|
2021-10-30 07:27:25 +02:00
|
|
|
peer.Address+":"+peer.Port,
|
2022-12-07 17:01:36 +01:00
|
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
2021-10-25 03:39:37 +02:00
|
|
|
grpc.WithBlock(),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
msg := &pb.ServerMessage{
|
2021-11-15 14:52:32 +01:00
|
|
|
Address: s.ExternalIP.String(),
|
2022-12-06 22:14:28 +01:00
|
|
|
Port: strconv.Itoa(s.Args.Port),
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
c := pb.NewHubClient(conn)
|
|
|
|
|
2022-12-06 22:14:28 +01:00
|
|
|
log.Printf("%s:%d subscribing to %+v\n", s.ExternalIP, s.Args.Port, peer)
|
2021-10-30 07:27:25 +02:00
|
|
|
_, err = c.PeerSubscribe(ctx, msg)
|
2021-10-25 03:39:37 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// helloPeer takes a peer to say hello to and sends a hello message
|
|
|
|
// containing all the peers we know about and information about us.
|
|
|
|
// This is used to confirm existence of peers on start and let them
|
2021-10-30 07:27:25 +02:00
|
|
|
// know about us. Returns the response from the server on success,
|
|
|
|
// nil otherwise.
|
2021-12-02 01:32:23 +01:00
|
|
|
func (s *Server) helloPeer(peer *Peer) (*pb.HelloMessage, error) {
|
2021-10-25 03:39:37 +02:00
|
|
|
log.Println("In helloPeer")
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
conn, err := grpc.DialContext(ctx,
|
2021-12-02 01:32:23 +01:00
|
|
|
peer.Address+":"+peer.Port,
|
2022-12-07 17:01:36 +01:00
|
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
2021-10-25 03:39:37 +02:00
|
|
|
grpc.WithBlock(),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
2021-10-30 07:27:25 +02:00
|
|
|
return nil, err
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
c := pb.NewHubClient(conn)
|
|
|
|
|
|
|
|
msg := &pb.HelloMessage{
|
2022-12-06 22:14:28 +01:00
|
|
|
Port: strconv.Itoa(s.Args.Port),
|
2021-11-25 00:24:06 +01:00
|
|
|
Host: s.ExternalIP.String(),
|
2021-10-25 03:39:37 +02:00
|
|
|
Servers: []*pb.ServerMessage{},
|
|
|
|
}
|
2021-10-30 07:27:25 +02:00
|
|
|
|
2022-12-06 22:14:28 +01:00
|
|
|
log.Printf("%s:%d saying hello to %+v\n", s.ExternalIP, s.Args.Port, peer)
|
2021-10-25 03:39:37 +02:00
|
|
|
res, err := c.Hello(ctx, msg)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
2021-10-30 07:27:25 +02:00
|
|
|
return nil, err
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
log.Println(res)
|
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
return res, nil
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
2021-10-29 23:27:27 +02:00
|
|
|
// writePeers writes our current known peers to disk.
|
2021-10-25 03:39:37 +02:00
|
|
|
func (s *Server) writePeers() {
|
2021-11-25 00:56:34 +01:00
|
|
|
if s.Args.DisableWritePeers {
|
2021-10-25 03:39:37 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
f, err := os.Create(s.Args.PeerFile)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
2021-10-30 07:27:25 +02:00
|
|
|
return
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
writer := bufio.NewWriter(f)
|
|
|
|
|
rocksdb (#29)
* Initial rocksdb commit
Basic reading from rocksdb works
* Try github action thing
* try local dockerfile
* asdf
* qwer
* asdf
* Try adding test db with git-lfs
* update action
* cleanup
* Don't hardcode stop on read
* Progress of reading rocksdb
* fixes and arg test
* asdf
* Fix rocksdb iterator and tests
* update script
* asdf
* Better iterator. Need to implement a lot of keys next, and tests, maybe
tests needed.
* asdf
* asdf
* asdf
* Implementation, testing, and cleanup.
Implemented more prefixes. Figured out a good test that should work for
all prefixes. Removed binary databases so we can just store human
readable csv files.
* more tests, prefixes and small refactor
* Another prefix
* EffectiveAmount
* ActiveAmount
* ActivatedClaimAndSupport
* PendingActivation
* ClaimTakeover
* ClaimExpiration
* SupportToClaim
* ClaimToSupport
* Fix bug with variable length keys
* ChannelToClaim
* ClaimToChannel
* ClaimShortID
* TXOToClaim
* ClaimToTXO
* BlockHeader
* BlockHash
* Undo
* HashXHistory
* Tx and big refactor
* rest the the keys
* Refactor and starting to add resolve
* asdf
* Refactor tests and add column families
* changes
* more work on implementing resolve
* code cleanup, function tests
* small code refactoring
* start building pieces of the test data set for full resolve.
* Export constant, add test
* another test
* TestGetTxHash
* more tests
* more tests
* More tests
* Refactor db functions into three files
* added slice backed stack, need to fix tests
* fix some issues with test suite
* some cleanup and adding arguments and db load / refresh to server command
* fix some bugs, start using logrus for leveled logging, upgrade to go 1.17, run go mod tidy
* logrus, protobuf updates, resolve grpc endpoint
* don't run integration test with unit tests
* signal handling and cleanup functions
* signal handling code files
* Unit tests for db stack
* reorganize bisect function so we lock it properly
* fix txcounts loading
* cleanup some logic around iterators and fix a bug where I was running two detect changes threads
* add some metrics
* cleanup
* blocking and filtering implemented
* add params for blocking and filtering channels and streams
* updates and fixes for integration tests
* use newer version of lbry.go when possible
* Add height endpoint and move string functions internal
* remove gitattributes, unused
* some cleanup
* more cleanup / refactor. almost ready for another review
* More cleanup
* use chainhash.Hash types from lbcd where appropriate
* update github action to go-1.17.8
* update go version needed
* trying to fix these builds
* cleanup
* trying to fix memory leak
* fix memory leak (iterator never finished so cleanup didn't run)
* changes per code review
* remove lbry.go v2
* rename sort.go search.go
* fix test
2022-04-29 17:04:01 +02:00
|
|
|
for key := range s.PeerServers {
|
2021-10-30 07:27:25 +02:00
|
|
|
line := key + "\n"
|
2021-10-25 03:39:37 +02:00
|
|
|
_, err := writer.WriteString(line)
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
2021-11-05 01:57:23 +01:00
|
|
|
}
|
2021-10-25 03:39:37 +02:00
|
|
|
|
|
|
|
err = writer.Flush()
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
err = f.Close()
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
// notifyPeer takes a peer to notify and a new peer we just learned about
|
2021-12-02 01:32:23 +01:00
|
|
|
// and informs the already known peer about the new peer.
|
|
|
|
func (s *Server) notifyPeer(peerToNotify *Peer, newPeer *Peer) error {
|
2021-11-25 00:36:19 +01:00
|
|
|
if s.Args.DisableFederation {
|
|
|
|
return nil
|
|
|
|
}
|
2021-10-30 07:27:25 +02:00
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
|
|
|
defer cancel()
|
2021-10-25 03:39:37 +02:00
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
conn, err := grpc.DialContext(ctx,
|
|
|
|
peerToNotify.Address+":"+peerToNotify.Port,
|
2022-12-07 17:01:36 +01:00
|
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
2021-10-30 07:27:25 +02:00
|
|
|
grpc.WithBlock(),
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
2021-10-30 07:27:25 +02:00
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
msg := &pb.ServerMessage{
|
|
|
|
Address: newPeer.Address,
|
|
|
|
Port: newPeer.Port,
|
|
|
|
}
|
|
|
|
|
|
|
|
c := pb.NewHubClient(conn)
|
|
|
|
|
|
|
|
_, err = c.AddPeer(ctx, msg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
2021-10-30 07:27:25 +02:00
|
|
|
// notifyPeerSubs takes a new peer server we just learned about and notifies
|
|
|
|
// all the peers that have subscribed to us about it.
|
2021-12-02 01:32:23 +01:00
|
|
|
func (s *Server) notifyPeerSubs(newPeer *Peer) {
|
2021-10-25 03:39:37 +02:00
|
|
|
var unsubscribe []string
|
2021-11-05 01:57:23 +01:00
|
|
|
s.PeerSubsMut.RLock()
|
|
|
|
for key, peer := range s.PeerSubs {
|
2021-12-02 01:32:23 +01:00
|
|
|
log.Printf("Notifying peer %s of new node %+v\n", key, newPeer)
|
|
|
|
err := s.notifyPeer(peer, newPeer)
|
2021-10-25 03:39:37 +02:00
|
|
|
if err != nil {
|
|
|
|
log.Println("Failed to send data to ", key)
|
|
|
|
log.Println(err)
|
|
|
|
unsubscribe = append(unsubscribe, key)
|
|
|
|
}
|
2021-11-05 01:57:23 +01:00
|
|
|
}
|
|
|
|
s.PeerSubsMut.RUnlock()
|
2021-10-25 03:39:37 +02:00
|
|
|
|
2021-11-05 01:57:23 +01:00
|
|
|
s.PeerSubsMut.Lock()
|
2021-10-25 03:39:37 +02:00
|
|
|
for _, key := range unsubscribe {
|
2021-11-05 01:57:23 +01:00
|
|
|
if _, ok := s.PeerSubs[key]; ok {
|
|
|
|
delete(s.PeerSubs, key)
|
|
|
|
s.decNumSubs()
|
|
|
|
metrics.PeersSubscribed.Dec()
|
|
|
|
}
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
2021-11-05 01:57:23 +01:00
|
|
|
s.PeerSubsMut.Unlock()
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
2021-12-02 01:32:23 +01:00
|
|
|
// addPeer takes a new peer, optionally checks to see if they're online, and
|
|
|
|
// adds them to our list of peers. It will also optionally subscribe to it.
|
|
|
|
func (s *Server) addPeer(newPeer *Peer, ping bool, subscribe bool) error {
|
2021-11-25 00:36:19 +01:00
|
|
|
if s.Args.DisableFederation {
|
|
|
|
return nil
|
|
|
|
}
|
2021-11-10 01:39:13 +01:00
|
|
|
// First thing we get our external ip if we don't have it, otherwise we
|
|
|
|
// could end up subscribed to our self, which is silly.
|
2021-11-15 14:52:32 +01:00
|
|
|
nilIP := net.IP{}
|
2021-11-25 00:24:06 +01:00
|
|
|
localIP1 := net.IPv4(127, 0, 0, 1)
|
2021-11-15 14:52:32 +01:00
|
|
|
if s.ExternalIP.Equal(nilIP) || s.ExternalIP.Equal(localIP1) {
|
2021-12-02 01:32:23 +01:00
|
|
|
err := s.getAndSetExternalIp(newPeer.Address, newPeer.Port)
|
2021-11-10 01:39:13 +01:00
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
log.Println("WARNING: can't determine external IP, continuing with ", s.Args.Host)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-12-06 22:14:28 +01:00
|
|
|
if strconv.Itoa(s.Args.Port) == newPeer.Port &&
|
2021-12-02 01:32:23 +01:00
|
|
|
(localHosts[newPeer.Address] || newPeer.Address == s.ExternalIP.String()) {
|
2022-12-06 22:14:28 +01:00
|
|
|
log.Printf("%s:%d addPeer: Self peer, skipping...\n", s.ExternalIP, s.Args.Port)
|
2021-10-30 07:27:25 +02:00
|
|
|
return nil
|
|
|
|
}
|
2021-11-10 01:39:13 +01:00
|
|
|
|
2021-12-02 01:32:23 +01:00
|
|
|
k := peerKey(newPeer)
|
2021-11-10 01:39:13 +01:00
|
|
|
|
2022-12-06 22:14:28 +01:00
|
|
|
log.Printf("%s:%d adding peer %+v\n", s.ExternalIP, s.Args.Port, newPeer)
|
2021-12-02 01:32:23 +01:00
|
|
|
if oldServer, loaded := s.PeerServersLoadOrStore(newPeer); !loaded {
|
2021-10-30 07:27:25 +02:00
|
|
|
if ping {
|
2021-12-02 01:32:23 +01:00
|
|
|
_, err := s.helloPeer(newPeer)
|
2021-10-30 07:27:25 +02:00
|
|
|
if err != nil {
|
2021-11-05 01:57:23 +01:00
|
|
|
s.PeerServersMut.Lock()
|
|
|
|
delete(s.PeerServers, k)
|
|
|
|
s.PeerServersMut.Unlock()
|
2021-10-30 07:27:25 +02:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
s.incNumPeers()
|
|
|
|
metrics.PeersKnown.Inc()
|
|
|
|
s.writePeers()
|
2021-12-02 01:32:23 +01:00
|
|
|
s.notifyPeerSubs(newPeer)
|
2022-12-07 17:01:36 +01:00
|
|
|
// This is weird because we're doing grpc and jsonrpc here.
|
|
|
|
// Do we still want to custom grpc?
|
|
|
|
log.Warn("Sending peer to NotifierChan")
|
|
|
|
s.NotifierChan <- peerNotification{newPeer.Address, newPeer.Port}
|
2021-10-30 07:27:25 +02:00
|
|
|
|
2021-11-05 01:57:23 +01:00
|
|
|
// Subscribe to all our peers for now
|
2021-11-10 01:39:13 +01:00
|
|
|
if subscribe {
|
2021-12-02 01:32:23 +01:00
|
|
|
err := s.subscribeToPeer(newPeer)
|
2021-11-10 01:39:13 +01:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-10-30 07:27:25 +02:00
|
|
|
}
|
|
|
|
} else {
|
2021-12-03 17:52:21 +01:00
|
|
|
oldServer.LastSeen = time.Now()
|
2021-10-30 07:27:25 +02:00
|
|
|
}
|
|
|
|
return nil
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
|
2021-12-02 01:32:23 +01:00
|
|
|
// mergePeers is an internal convenience function to add a list of
|
2021-10-25 03:39:37 +02:00
|
|
|
// peers.
|
2021-12-02 01:32:23 +01:00
|
|
|
func (s *Server) mergePeers(servers []*pb.ServerMessage) {
|
2021-10-25 03:39:37 +02:00
|
|
|
for _, srvMsg := range servers {
|
2021-12-02 01:32:23 +01:00
|
|
|
newPeer := &Peer{
|
2021-12-03 17:52:21 +01:00
|
|
|
Address: srvMsg.Address,
|
|
|
|
Port: srvMsg.Port,
|
|
|
|
LastSeen: time.Now(),
|
2021-12-02 01:32:23 +01:00
|
|
|
}
|
|
|
|
err := s.addPeer(newPeer, false, true)
|
2021-10-30 07:27:25 +02:00
|
|
|
// This shouldn't happen because we're not pinging them.
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
}
|
2021-10-25 03:39:37 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// makeHelloMessage makes a message for this hub to call the Hello endpoint
|
|
|
|
// on another hub.
|
|
|
|
func (s *Server) makeHelloMessage() *pb.HelloMessage {
|
2021-10-30 07:27:25 +02:00
|
|
|
servers := make([]*pb.ServerMessage, 0, 10)
|
|
|
|
|
2021-11-05 01:57:23 +01:00
|
|
|
s.PeerServersMut.RLock()
|
|
|
|
for _, peer := range s.PeerServers {
|
2021-10-30 07:27:25 +02:00
|
|
|
servers = append(servers, &pb.ServerMessage{
|
|
|
|
Address: peer.Address,
|
2021-11-25 00:24:06 +01:00
|
|
|
Port: peer.Port,
|
2021-10-30 07:27:25 +02:00
|
|
|
})
|
2021-11-05 01:57:23 +01:00
|
|
|
}
|
|
|
|
s.PeerServersMut.RUnlock()
|
2021-10-25 03:39:37 +02:00
|
|
|
|
|
|
|
return &pb.HelloMessage{
|
2022-12-06 22:14:28 +01:00
|
|
|
Port: strconv.Itoa(s.Args.Port),
|
2021-11-25 00:24:06 +01:00
|
|
|
Host: s.ExternalIP.String(),
|
2021-10-25 03:39:37 +02:00
|
|
|
Servers: servers,
|
|
|
|
}
|
|
|
|
}
|