Merge branch 'resize-buckets'

* resize-buckets:
  refactor contact sort
  more
  handle peer port correctly
  Revert "add tcp port mapping to data store"
  iterative find value rpc command
  add jack.lbry.tech as a known node for debugging
  add tcp port mapping to data store
  bucket splitting is solid
  add dht start command, run a jsonrpc server to interact with the node
  grin's cleanup and some WIP
  more
  expand empty buckets
  add BucketRange to bucket struct
This commit is contained in:
Alex Grintsvayg 2018-07-26 10:07:47 -04:00
commit dd98b3cdfb
17 changed files with 710 additions and 207 deletions

23
Gopkg.lock generated
View file

@ -73,6 +73,27 @@
packages = ["."]
revision = "3287d94d4c6a48a63e16fffaabf27ab20203af2a"
[[projects]]
name = "github.com/gorilla/context"
packages = ["."]
revision = "08b5f424b9271eedf6f9f0ce86cb9396ed337a42"
version = "v1.1.1"
[[projects]]
name = "github.com/gorilla/mux"
packages = ["."]
revision = "e3702bed27f0d39777b0b37b664b6280e8ef8fbf"
version = "v1.6.2"
[[projects]]
name = "github.com/gorilla/rpc"
packages = [
".",
"json"
]
revision = "22c016f3df3febe0c1f6727598b6389507e03a18"
version = "v1.1.0"
[[projects]]
name = "github.com/gorilla/websocket"
packages = ["."]
@ -268,6 +289,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "4dc432f7df1c1d59d5ee47417ab4f0fe187d26eb9e1f53fecdb6396b3bd1e6e0"
inputs-digest = "6fac5a5bd6eb2f49d18558f8ed96b510e0852f95d7c746e301d53f5df92fffc4"
solver-name = "gps-cdcl"
solver-version = 1

View file

@ -1,7 +1,10 @@
package cmd
import (
"log"
"math/big"
"net"
"net/http"
"os"
"os/signal"
"strconv"
@ -11,41 +14,76 @@ import (
"github.com/lbryio/reflector.go/dht"
"github.com/lbryio/reflector.go/dht/bits"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
type NodeRPC string
type PingArgs struct {
nodeID string
address string
port int
}
type PingResult string
func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error {
*result = PingResult("pong")
return nil
}
var dhtPort int
var rpcPort int
func init() {
var cmd = &cobra.Command{
Use: "dht [start|bootstrap]",
Use: "dht [bootstrap|connect]",
Short: "Run dht node",
ValidArgs: []string{"start", "bootstrap"},
Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs),
Run: dhtCmd,
}
cmd.PersistentFlags().StringP("nodeID", "n", "", "nodeID in hex")
cmd.PersistentFlags().IntVar(&dhtPort, "port", 4567, "Port to start DHT on")
cmd.PersistentFlags().IntVar(&rpcPort, "rpc_port", 1234, "Port to listen for rpc commands on")
rootCmd.AddCommand(cmd)
}
func dhtCmd(cmd *cobra.Command, args []string) {
if args[0] == "bootstrap" {
node := dht.NewBootstrapNode(bits.Rand(), 1*time.Millisecond, 1*time.Minute)
listener, err := net.ListenPacket(dht.Network, "127.0.0.1:"+strconv.Itoa(dhtPort))
checkErr(err)
conn := listener.(*net.UDPConn)
err = node.Connect(conn)
checkErr(err)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
log.Printf("shutting down bootstrap node")
node.Shutdown()
} else {
log.Fatal("not implemented")
nodeIDStr := cmd.Flag("nodeID").Value.String()
nodeID := bits.Bitmap{}
if nodeIDStr == "" {
nodeID = bits.Rand()
} else {
nodeID = bits.FromHexP(nodeIDStr)
}
log.Println(nodeID.String())
node := dht.NewBootstrapNode(nodeID, 1*time.Millisecond, 1*time.Minute)
listener, err := net.ListenPacket(dht.Network, "127.0.0.1:"+strconv.Itoa(dhtPort))
checkErr(err)
conn := listener.(*net.UDPConn)
err = node.Connect(conn)
checkErr(err)
log.Println("started node")
_, _, err = dht.FindContacts(&node.Node, nodeID.Sub(bits.FromBigP(big.NewInt(1))), false, nil)
rpcServer := dht.RunRPCServer("127.0.0.1:"+strconv.Itoa(rpcPort), "/", node)
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
<-interruptChan
rpcServer.Wg.Done()
node.Shutdown()
}
}

View file

@ -27,7 +27,7 @@ var verbose []string
const (
verboseAll = "all"
verboseDHT = "dht"
verboseNodeFinder = "nodefinder"
verboseNodeFinder = "node_finder"
)
var conf string

View file

@ -61,3 +61,7 @@ func (r Range) intervalStart(n, num int) *big.Int {
func (r Range) IntervalSize() *big.Int {
return (&big.Int{}).Sub(r.End.Big(), r.Start.Big())
}
func (r Range) Contains(b Bitmap) bool {
return r.Start.Cmp(b) <= 0 && r.End.Cmp(b) >= 0
}

View file

@ -21,7 +21,7 @@ type BootstrapNode struct {
checkInterval time.Duration
nlock *sync.RWMutex
nodes map[bits.Bitmap]*peer
peers map[bits.Bitmap]*peer
nodeIDs []bits.Bitmap // necessary for efficient random ID selection
}
@ -34,7 +34,7 @@ func NewBootstrapNode(id bits.Bitmap, initialPingInterval, rePingInterval time.D
checkInterval: rePingInterval,
nlock: &sync.RWMutex{},
nodes: make(map[bits.Bitmap]*peer),
peers: make(map[bits.Bitmap]*peer),
nodeIDs: make([]bits.Bitmap, 0),
}
@ -48,6 +48,10 @@ func (b *BootstrapNode) Add(c Contact) {
b.upsert(c)
}
func (b *BootstrapNode) AddKnownNode(c Contact) {
b.Node.rt.Update(c)
}
// Connect connects to the given connection and starts any background threads necessary
func (b *BootstrapNode) Connect(conn UDPConn) error {
err := b.Node.Connect(conn)
@ -77,14 +81,14 @@ func (b *BootstrapNode) upsert(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
if node, exists := b.nodes[c.ID]; exists {
log.Debugf("[%s] bootstrap: touching contact %s", b.id.HexShort(), node.Contact.ID.HexShort())
node.Touch()
if peer, exists := b.peers[c.ID]; exists {
log.Debugf("[%s] bootstrap: touching contact %s", b.id.HexShort(), peer.Contact.ID.HexShort())
peer.Touch()
return
}
log.Debugf("[%s] bootstrap: adding new contact %s", b.id.HexShort(), c.ID.HexShort())
b.nodes[c.ID] = &peer{c, time.Now(), 0}
b.peers[c.ID] = &peer{c, b.id.Xor(c.ID), time.Now(), 0}
b.nodeIDs = append(b.nodeIDs, c.ID)
}
@ -93,13 +97,13 @@ func (b *BootstrapNode) remove(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
_, exists := b.nodes[c.ID]
_, exists := b.peers[c.ID]
if !exists {
return
}
log.Debugf("[%s] bootstrap: removing contact %s", b.id.HexShort(), c.ID.HexShort())
delete(b.nodes, c.ID)
delete(b.peers, c.ID)
for i := range b.nodeIDs {
if b.nodeIDs[i].Equals(c.ID) {
b.nodeIDs = append(b.nodeIDs[:i], b.nodeIDs[i+1:]...)
@ -113,13 +117,13 @@ func (b *BootstrapNode) get(limit int) []Contact {
b.nlock.RLock()
defer b.nlock.RUnlock()
if len(b.nodes) < limit {
limit = len(b.nodes)
if len(b.peers) < limit {
limit = len(b.peers)
}
ret := make([]Contact, limit)
for i, k := range randKeys(len(b.nodeIDs))[:limit] {
ret[i] = b.nodes[b.nodeIDs[k]].Contact
ret[i] = b.peers[b.nodeIDs[k]].Contact
}
return ret
@ -152,9 +156,9 @@ func (b *BootstrapNode) check() {
b.nlock.RLock()
defer b.nlock.RUnlock()
for i := range b.nodes {
if !b.nodes[i].ActiveInLast(b.checkInterval) {
go b.ping(b.nodes[i].Contact)
for i := range b.peers {
if !b.peers[i].ActiveInLast(b.checkInterval) {
go b.ping(b.peers[i].Contact)
}
}
}
@ -185,13 +189,13 @@ func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) {
go func() {
b.nlock.RLock()
_, exists := b.nodes[request.NodeID]
_, exists := b.peers[request.NodeID]
b.nlock.RUnlock()
if !exists {
log.Debugf("[%s] bootstrap: queuing %s to ping", b.id.HexShort(), request.NodeID.HexShort())
<-time.After(b.initialPingInterval)
b.nlock.RLock()
_, exists = b.nodes[request.NodeID]
_, exists = b.peers[request.NodeID]
b.nlock.RUnlock()
if !exists {
b.ping(Contact{ID: request.NodeID, IP: addr.IP, Port: addr.Port})

View file

@ -3,6 +3,8 @@ package dht
import (
"bytes"
"net"
"sort"
"strconv"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/reflector.go/dht/bits"
@ -12,44 +14,47 @@ import (
// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap
// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg)
// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41
// Contact is a type representation of another node that a specific node is in communication with.
// Contact contains information for contacting another node on the network
type Contact struct {
ID bits.Bitmap
IP net.IP
Port int
ID bits.Bitmap
IP net.IP
Port int // the udp port used for the dht
PeerPort int // the tcp port a peer can be contacted on for blob requests
}
// Equals returns T/F if two contacts are the same.
// Equals returns true if two contacts are the same.
func (c Contact) Equals(other Contact, checkID bool) bool {
return c.IP.Equal(other.IP) && c.Port == other.Port && (!checkID || c.ID == other.ID)
}
// Addr returns the UPD Address of the contact.
// Addr returns the address of the contact.
func (c Contact) Addr() *net.UDPAddr {
return &net.UDPAddr{IP: c.IP, Port: c.Port}
}
// String returns the concatenated short hex encoded string of its ID + @ + string represention of its UPD Address.
// String returns a short string representation of the contact
func (c Contact) String() string {
return c.ID.HexShort() + "@" + c.Addr().String()
str := c.ID.HexShort() + "@" + c.Addr().String()
if c.PeerPort != 0 {
str += "(" + strconv.Itoa(c.PeerPort) + ")"
}
return str
}
// MarshalCompact returns the compact byte slice representation of a contact.
// MarshalCompact returns a compact byteslice representation of the contact
// NOTE: The compact representation always uses the tcp PeerPort, not the udp Port. This is dumb, but that's how the python daemon does it
func (c Contact) MarshalCompact() ([]byte, error) {
if c.IP.To4() == nil {
return nil, errors.Err("ip not set")
}
if c.Port < 0 || c.Port > 65535 {
if c.PeerPort < 0 || c.PeerPort > 65535 {
return nil, errors.Err("invalid port")
}
var buf bytes.Buffer
buf.Write(c.IP.To4())
buf.WriteByte(byte(c.Port >> 8))
buf.WriteByte(byte(c.Port))
buf.WriteByte(byte(c.PeerPort >> 8))
buf.WriteByte(byte(c.PeerPort))
buf.Write(c.ID[:])
if buf.Len() != compactNodeInfoLength {
@ -59,13 +64,14 @@ func (c Contact) MarshalCompact() ([]byte, error) {
return buf.Bytes(), nil
}
// UnmarshalCompact unmarshals the compact byte slice representation of a contact.
// UnmarshalCompact unmarshals the compact byteslice representation of a contact.
// NOTE: The compact representation always uses the tcp PeerPort, not the udp Port. This is dumb, but that's how the python daemon does it
func (c *Contact) UnmarshalCompact(b []byte) error {
if len(b) != compactNodeInfoLength {
return errors.Err("invalid compact length")
}
c.IP = net.IPv4(b[0], b[1], b[2], b[3]).To4()
c.Port = int(uint16(b[5]) | uint16(b[4])<<8)
c.PeerPort = int(uint16(b[5]) | uint16(b[4])<<8)
c.ID = bits.FromBytesP(b[6:])
return nil
}
@ -110,15 +116,8 @@ func (c *Contact) UnmarshalBencode(b []byte) error {
return nil
}
type sortedContact struct {
contact Contact
xorDistanceToTarget bits.Bitmap
}
type byXorDistance []sortedContact
func (a byXorDistance) Len() int { return len(a) }
func (a byXorDistance) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byXorDistance) Less(i, j int) bool {
return a[i].xorDistanceToTarget.Cmp(a[j].xorDistanceToTarget) < 0
func sortByDistance(contacts []Contact, target bits.Bitmap) {
sort.Slice(contacts, func(i, j int) bool {
return contacts[i].ID.Xor(target).Cmp(contacts[j].ID.Xor(target)) < 0
})
}

View file

@ -10,9 +10,9 @@ import (
func TestCompactEncoding(t *testing.T) {
c := Contact{
ID: bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41"),
IP: net.ParseIP("1.2.3.4"),
Port: int(55<<8 + 66),
ID: bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41"),
IP: net.ParseIP("1.2.3.4"),
PeerPort: int(55<<8 + 66),
}
var compact []byte

View file

@ -39,7 +39,6 @@ const (
alpha = 3 // this is the constant alpha in the spec
bucketSize = 8 // this is the constant k in the spec
nodeIDLength = bits.NumBytes // bytes. this is the constant B in the spec
nodeIDBits = bits.NumBits // number of bits in node ID
messageIDLength = 20 // bytes.
udpRetry = 1
@ -319,6 +318,7 @@ func (dht *DHT) startReannouncer() {
func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
// self-store
if dht.contact.ID == c.ID {
c.PeerPort = dht.conf.PeerProtocolPort
dht.node.Store(hash, c)
return
}

View file

@ -10,6 +10,10 @@ import (
)
func TestNodeFinder_FindNodes(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow nodeFinder test")
}
bs, dhts := TestingCreateDHT(t, 3, true, false)
defer func() {
for i := range dhts {
@ -73,6 +77,10 @@ func TestNodeFinder_FindNodes_NoBootstrap(t *testing.T) {
}
func TestNodeFinder_FindValue(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow nodeFinder test")
}
bs, dhts := TestingCreateDHT(t, 3, true, false)
defer func() {
for i := range dhts {
@ -104,6 +112,10 @@ func TestNodeFinder_FindValue(t *testing.T) {
}
func TestDHT_LargeDHT(t *testing.T) {
if testing.Short() {
t.Skip("skipping large DHT test")
}
nodes := 100
bs, dhts := TestingCreateDHT(t, nodes, true, true)
defer func() {

View file

@ -44,7 +44,7 @@ const (
protocolVersionField = "protocolVersion"
)
// Message is an extension of the bencode marshalling interface for serialized message passing.
// Message is a DHT message
type Message interface {
bencode.Marshaler
}
@ -82,7 +82,7 @@ func newMessageID() messageID {
return m
}
// Request represents the structured request from one node to another.
// Request represents a DHT request message
type Request struct {
ID messageID
NodeID bits.Bitmap
@ -261,7 +261,7 @@ func (s *storeArgs) UnmarshalBencode(b []byte) error {
return nil
}
// Response represents the structured response one node returns to another.
// Response represents a DHT response message
type Response struct {
ID messageID
NodeID bits.Bitmap
@ -416,7 +416,7 @@ func (r *Response) UnmarshalBencode(b []byte) error {
return nil
}
// Error represents an error message that is returned from one node to another in communication.
// Error represents a DHT error response
type Error struct {
ID messageID
NodeID bits.Bitmap

View file

@ -103,9 +103,9 @@ func TestBencodeFindValueResponse(t *testing.T) {
ID: newMessageID(),
NodeID: bits.Rand(),
FindValueKey: bits.Rand().RawString(),
Token: "arst",
Token: "arstarstarst",
Contacts: []Contact{
{ID: bits.Rand(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678},
{ID: bits.Rand(), IP: net.IPv4(1, 2, 3, 4).To4(), PeerPort: 8765},
},
}

View file

@ -143,11 +143,11 @@ func (n *Node) Connect(conn UDPConn) error {
}()
// TODO: turn this back on when you're sure it works right
//n.stop.Add(1)
//go func() {
// defer n.stop.Done()
// n.startRoutingTableGrooming()
//}()
n.grp.Add(1)
go func() {
defer n.grp.Done()
n.startRoutingTableGrooming()
}()
return nil
}
@ -236,7 +236,7 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
// TODO: we should be sending the IP in the request, not just using the sender's IP
// TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ???
if n.tokens.Verify(request.StoreArgs.Value.Token, request.NodeID, addr) {
n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: request.StoreArgs.Value.Port})
n.Store(request.StoreArgs.BlobHash, Contact{ID: request.StoreArgs.NodeID, IP: addr.IP, Port: addr.Port, PeerPort: request.StoreArgs.Value.Port})
err := n.sendMessage(addr, Response{ID: request.ID, NodeID: n.id, Data: storeSuccessResponse})
if err != nil {

View file

@ -1,7 +1,6 @@
package dht
import (
"sort"
"sync"
"time"
@ -268,7 +267,7 @@ func (cf *contactFinder) appendNewToShortlist(contacts []Contact) {
}
}
sortInPlace(cf.shortlist, cf.target)
sortByDistance(cf.shortlist, cf.target)
}
// popFromShortlist pops the first contact off the shortlist and returns it
@ -345,17 +344,3 @@ func (cf *contactFinder) closest(contacts ...Contact) *Contact {
}
return &closest
}
func sortInPlace(contacts []Contact, target bits.Bitmap) {
toSort := make([]sortedContact, len(contacts))
for i, n := range contacts {
toSort[i] = sortedContact{n, n.ID.Xor(target)}
}
sort.Sort(byXorDistance(toSort))
for i, c := range toSort {
contacts[i] = c.contact
}
}

221
dht/node_rpc.go Normal file
View file

@ -0,0 +1,221 @@
package dht
import (
"errors"
"net"
"net/http"
"sync"
"github.com/gorilla/mux"
"github.com/gorilla/rpc"
"github.com/gorilla/rpc/json"
"github.com/lbryio/reflector.go/dht/bits"
)
type NodeRPCServer struct {
Wg sync.WaitGroup
Node *BootstrapNode
}
var mut sync.Mutex
var rpcServer *NodeRPCServer
type NodeRPC int
type PingArgs struct {
NodeID string
IP string
Port int
}
type PingResult string
func (n *NodeRPC) Ping(r *http.Request, args *PingArgs, result *PingResult) error {
if rpcServer == nil {
return errors.New("no node set up")
}
toQuery, err := bits.FromHex(args.NodeID)
if err != nil {
return err
}
c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port}
req := Request{Method: "ping"}
nodeResponse := rpcServer.Node.Send(c, req)
if nodeResponse != nil {
*result = PingResult(nodeResponse.Data)
}
return nil
}
type FindArgs struct {
Key string
NodeID string
IP string
Port int
}
type ContactResponse struct {
NodeID string
IP string
Port int
}
type FindNodeResult []ContactResponse
func (n *NodeRPC) FindNode(r *http.Request, args *FindArgs, result *FindNodeResult) error {
if rpcServer == nil {
return errors.New("no node set up")
}
key, err := bits.FromHex(args.Key)
if err != nil {
return err
}
toQuery, err := bits.FromHex(args.NodeID)
if err != nil {
return err
}
c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port}
req := Request{Arg: &key, Method: "findNode"}
nodeResponse := rpcServer.Node.Send(c, req)
contacts := []ContactResponse{}
if nodeResponse != nil && nodeResponse.Contacts != nil {
for _, foundContact := range nodeResponse.Contacts {
contacts = append(contacts, ContactResponse{foundContact.ID.Hex(), foundContact.IP.String(), foundContact.Port})
}
}
*result = FindNodeResult(contacts)
return nil
}
type FindValueResult struct {
Contacts []ContactResponse
Value string
}
func (n *NodeRPC) FindValue(r *http.Request, args *FindArgs, result *FindValueResult) error {
if rpcServer == nil {
return errors.New("no node set up")
}
key, err := bits.FromHex(args.Key)
if err != nil {
return err
}
toQuery, err := bits.FromHex(args.NodeID)
if err != nil {
return err
}
c := Contact{ID: toQuery, IP: net.ParseIP(args.IP), Port: args.Port}
req := Request{Arg: &key, Method: "findValue"}
nodeResponse := rpcServer.Node.Send(c, req)
contacts := []ContactResponse{}
if nodeResponse != nil && nodeResponse.FindValueKey != "" {
*result = FindValueResult{Value: nodeResponse.FindValueKey}
return nil
} else if nodeResponse != nil && nodeResponse.Contacts != nil {
for _, foundContact := range nodeResponse.Contacts {
contacts = append(contacts, ContactResponse{foundContact.ID.Hex(), foundContact.IP.String(), foundContact.Port})
}
*result = FindValueResult{Contacts: contacts}
return nil
}
return errors.New("not sure what happened")
}
type IterativeFindValueArgs struct {
Key string
}
type IterativeFindValueResult struct {
Contacts []ContactResponse
FoundValue bool
}
func (n *NodeRPC) IterativeFindValue(r *http.Request, args *IterativeFindValueArgs, result *IterativeFindValueResult) error {
if rpcServer == nil {
return errors.New("no node set up")
}
key, err := bits.FromHex(args.Key)
if err != nil {
return err
}
foundContacts, found, err := FindContacts(&rpcServer.Node.Node, key, false, nil)
contacts := []ContactResponse{}
result.FoundValue = found
for _, foundContact := range foundContacts {
contacts = append(contacts, ContactResponse{foundContact.ID.Hex(), foundContact.IP.String(), foundContact.Port})
}
result.Contacts = contacts
return nil
}
type BucketResponse struct {
Start string
End string
Count int
Contacts []ContactResponse
}
type RoutingTableResponse struct {
NodeID string
Count int
Buckets []BucketResponse
}
type GetRoutingTableArgs struct{}
func (n *NodeRPC) GetRoutingTable(r *http.Request, args *GetRoutingTableArgs, result *RoutingTableResponse) error {
if rpcServer == nil {
return errors.New("no node set up")
}
result.NodeID = rpcServer.Node.id.String()
result.Count = len(rpcServer.Node.rt.buckets)
for _, b := range rpcServer.Node.rt.buckets {
bucketInfo := []ContactResponse{}
for _, c := range b.Contacts() {
bucketInfo = append(bucketInfo, ContactResponse{c.ID.String(), c.IP.String(), c.Port})
}
result.Buckets = append(result.Buckets, BucketResponse{
Start: b.Range.Start.String(), End: b.Range.End.String(), Contacts: bucketInfo,
Count: b.Len(),
})
}
return nil
}
type AddKnownNodeResponse struct{}
func (n *NodeRPC) AddKnownNode(r *http.Request, args *ContactResponse, result *AddKnownNodeResponse) error {
if rpcServer == nil {
return errors.New("no node set up")
}
rpcServer.Node.AddKnownNode(
Contact{
bits.FromHexP(args.NodeID),
net.ParseIP(args.IP), args.Port, 0,
})
return nil
}
func RunRPCServer(address, rpcPath string, node *BootstrapNode) NodeRPCServer {
mut.Lock()
defer mut.Unlock()
rpcServer = &NodeRPCServer{
Wg: sync.WaitGroup{},
Node: node,
}
rpcServer.Wg.Add(1)
go func() {
s := rpc.NewServer()
s.RegisterCodec(json.NewCodec(), "application/json")
s.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8")
node := new(NodeRPC)
s.RegisterService(node, "")
r := mux.NewRouter()
r.Handle(rpcPath, s)
server := &http.Server{Addr: address, Handler: r}
log.Println("rpc listening on " + address)
server.ListenAndServe()
}()
return *rpcServer
}

View file

@ -289,7 +289,7 @@ func TestFindValueExisting(t *testing.T) {
messageID := newMessageID()
valueToFind := bits.Rand()
nodeToFind := Contact{ID: bits.Rand(), IP: net.ParseIP("1.2.3.4"), Port: 1286}
nodeToFind := Contact{ID: bits.Rand(), IP: net.ParseIP("1.2.3.4"), PeerPort: 1286}
dht.node.store.Upsert(valueToFind, nodeToFind)
dht.node.store.Upsert(valueToFind, nodeToFind)
dht.node.store.Upsert(valueToFind, nodeToFind)

View file

@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"net"
"sort"
"strconv"
"strings"
"sync"
@ -20,11 +19,17 @@ import (
// TODO: use a tree with bucket splitting instead of a fixed bucket list. include jack's optimization (see link in commit mesg)
// https://github.com/lbryio/lbry/pull/1211/commits/341b27b6d21ac027671d42458826d02735aaae41
// peer is a contact with extra freshness information
// peer is a contact with extra information
type peer struct {
Contact Contact
Distance bits.Bitmap
LastActivity time.Time
NumFailures int
// LastReplied time.Time
// LastRequested time.Time
// LastFailure time.Time
// SecondLastFailure time.Time
NumFailures int
//<lastPublished>,
//<originallyPublished>
// <originalPublisherID>
@ -55,6 +60,15 @@ type bucket struct {
lock *sync.RWMutex
peers []peer
lastUpdate time.Time
Range bits.Range // capitalized because `range` is a keyword
}
func newBucket(r bits.Range) *bucket {
return &bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
Range: r,
}
}
// Len returns the number of peers in the bucket
@ -64,6 +78,17 @@ func (b bucket) Len() int {
return len(b.peers)
}
func (b bucket) Has(c Contact) bool {
b.lock.RLock()
defer b.lock.RUnlock()
for _, p := range b.peers {
if p.Contact.Equals(c, true) {
return true
}
}
return false
}
// Contacts returns a slice of the bucket's contacts
func (b bucket) Contacts() []Contact {
b.lock.RLock()
@ -75,17 +100,20 @@ func (b bucket) Contacts() []Contact {
return contacts
}
// UpdateContact marks a contact as having been successfully contacted. if insertIfNew and the contact is does not exist yet, it is inserted
func (b *bucket) UpdateContact(c Contact, insertIfNew bool) {
// UpdatePeer marks a contact as having been successfully contacted. if insertIfNew and the contact is does not exist yet, it is inserted
func (b *bucket) UpdatePeer(p peer, insertIfNew bool) error {
b.lock.Lock()
defer b.lock.Unlock()
peerIndex := find(c.ID, b.peers)
if !b.Range.Contains(p.Distance) {
return errors.Err("this bucket range does not cover this peer")
}
peerIndex := find(p.Contact.ID, b.peers)
if peerIndex >= 0 {
b.lastUpdate = time.Now()
b.peers[peerIndex].Touch()
moveToBack(b.peers, peerIndex)
} else if insertIfNew {
hasRoom := true
@ -103,11 +131,12 @@ func (b *bucket) UpdateContact(c Contact, insertIfNew bool) {
if hasRoom {
b.lastUpdate = time.Now()
peer := peer{Contact: c}
peer.Touch()
b.peers = append(b.peers, peer)
p.Touch()
b.peers = append(b.peers, p)
}
}
return nil
}
// FailContact marks a contact as having failed, and removes it if it failed too many times
@ -138,28 +167,61 @@ func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool {
return time.Since(b.lastUpdate) > refreshInterval
}
func (b *bucket) Split() (*bucket, *bucket) {
b.lock.Lock()
defer b.lock.Unlock()
left := newBucket(b.Range.IntervalP(1, 2))
right := newBucket(b.Range.IntervalP(2, 2))
left.lastUpdate = b.lastUpdate
right.lastUpdate = b.lastUpdate
for _, p := range b.peers {
if left.Range.Contains(p.Distance) {
left.peers = append(left.peers, p)
} else {
right.peers = append(right.peers, p)
}
}
if len(b.peers) > 1 {
if len(left.peers) == 0 {
left, right = right.Split()
left.Range.Start = b.Range.Start
} else if len(right.peers) == 0 {
left, right = left.Split()
right.Range.End = b.Range.End
}
}
return left, right
}
type routingTable struct {
id bits.Bitmap
buckets [nodeIDBits]bucket
buckets []*bucket
mu *sync.RWMutex // this mutex is write-locked only when CHANGING THE NUMBER OF BUCKETS in the table
}
func newRoutingTable(id bits.Bitmap) *routingTable {
var rt routingTable
rt.id = id
rt := routingTable{
id: id,
mu: &sync.RWMutex{},
}
rt.reset()
return &rt
}
func (rt *routingTable) reset() {
for i := range rt.buckets {
rt.buckets[i] = bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
}
}
rt.mu.Lock()
defer rt.mu.Unlock()
rt.buckets = []*bucket{newBucket(bits.MaxRange())}
}
func (rt *routingTable) BucketInfo() string {
rt.mu.RLock()
defer rt.mu.RUnlock()
var bucketInfo []string
for i, b := range rt.buckets {
if b.Len() > 0 {
@ -168,7 +230,7 @@ func (rt *routingTable) BucketInfo() string {
for j, c := range contacts {
s[j] = c.ID.HexShort()
}
bucketInfo = append(bucketInfo, fmt.Sprintf("Bucket %d: (%d) %s", i, len(contacts), strings.Join(s, ", ")))
bucketInfo = append(bucketInfo, fmt.Sprintf("bucket %d: (%d) %s", i, len(contacts), strings.Join(s, ", ")))
}
}
if len(bucketInfo) == 0 {
@ -179,64 +241,72 @@ func (rt *routingTable) BucketInfo() string {
// Update inserts or refreshes a contact
func (rt *routingTable) Update(c Contact) {
rt.bucketFor(c.ID).UpdateContact(c, true)
rt.mu.Lock() // write lock, because updates may cause bucket splits
defer rt.mu.Unlock()
b := rt.bucketFor(c.ID)
if rt.shouldSplit(b, c) {
left, right := b.Split()
for i := range rt.buckets {
if rt.buckets[i].Range.Start.Equals(left.Range.Start) {
rt.buckets = append(rt.buckets[:i], append([]*bucket{left, right}, rt.buckets[i+1:]...)...)
break
}
}
if left.Range.Contains(c.ID) {
b = left
} else {
b = right
}
}
b.UpdatePeer(peer{Contact: c, Distance: rt.id.Xor(c.ID)}, true)
}
// Fresh refreshes a contact if its already in the routing table
func (rt *routingTable) Fresh(c Contact) {
rt.bucketFor(c.ID).UpdateContact(c, false)
rt.mu.RLock()
defer rt.mu.RUnlock()
rt.bucketFor(c.ID).UpdatePeer(peer{Contact: c, Distance: rt.id.Xor(c.ID)}, false)
}
// FailContact marks a contact as having failed, and removes it if it failed too many times
func (rt *routingTable) Fail(c Contact) {
rt.mu.RLock()
defer rt.mu.RUnlock()
rt.bucketFor(c.ID).FailContact(c.ID)
}
// GetClosest returns the closest `limit` contacts from the routing table
// It marks each bucket it accesses as having been accessed
// GetClosest returns the closest `limit` contacts from the routing table.
// This is a locking wrapper around getClosest()
func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact {
var toSort []sortedContact
var bucketNum int
if rt.id.Equals(target) {
bucketNum = 0
} else {
bucketNum = rt.bucketNumFor(target)
}
toSort = appendContacts(toSort, rt.buckets[bucketNum], target)
for i := 1; (bucketNum-i >= 0 || bucketNum+i < nodeIDBits) && len(toSort) < limit; i++ {
if bucketNum-i >= 0 {
toSort = appendContacts(toSort, rt.buckets[bucketNum-i], target)
}
if bucketNum+i < nodeIDBits {
toSort = appendContacts(toSort, rt.buckets[bucketNum+i], target)
}
}
sort.Sort(byXorDistance(toSort))
var contacts []Contact
for _, sorted := range toSort {
contacts = append(contacts, sorted.contact)
if len(contacts) >= limit {
break
}
}
return contacts
rt.mu.RLock()
defer rt.mu.RUnlock()
return rt.getClosest(target, limit)
}
func appendContacts(contacts []sortedContact, b bucket, target bits.Bitmap) []sortedContact {
for _, contact := range b.Contacts() {
contacts = append(contacts, sortedContact{contact, contact.ID.Xor(target)})
// getClosest returns the closest `limit` contacts from the routing table
func (rt *routingTable) getClosest(target bits.Bitmap, limit int) []Contact {
var contacts []Contact
for _, b := range rt.buckets {
contacts = append(contacts, b.Contacts()...)
}
sortByDistance(contacts, target)
if len(contacts) > limit {
contacts = contacts[:limit]
}
return contacts
}
// Count returns the number of contacts in the routing table
func (rt *routingTable) Count() int {
rt.mu.RLock()
defer rt.mu.RUnlock()
count := 0
for _, bucket := range rt.buckets {
count += bucket.Len()
@ -244,28 +314,51 @@ func (rt *routingTable) Count() int {
return count
}
// BucketRanges returns a slice of ranges, where the `start` of each range is the smallest id that can
// go in that bucket, and the `end` is the largest id
func (rt *routingTable) BucketRanges() []bits.Range {
ranges := make([]bits.Range, len(rt.buckets))
for i := range rt.buckets {
ranges[i] = bits.Range{
Start: rt.id.Suffix(i, false).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
End: rt.id.Suffix(i, true).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
}
}
return ranges
}
func (rt *routingTable) bucketNumFor(target bits.Bitmap) int {
if rt.id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
return nodeIDBits - 1 - target.Xor(rt.id).PrefixLen()
// Len returns the number of buckets in the routing table
func (rt *routingTable) Len() int {
rt.mu.RLock()
defer rt.mu.RUnlock()
return len(rt.buckets)
}
func (rt *routingTable) bucketFor(target bits.Bitmap) *bucket {
return &rt.buckets[rt.bucketNumFor(target)]
if rt.id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
distance := target.Xor(rt.id)
for _, b := range rt.buckets {
if b.Range.Contains(distance) {
return b
}
}
panic("target is not contained in any buckets")
}
func (rt *routingTable) shouldSplit(b *bucket, c Contact) bool {
if b.Has(c) {
return false
}
if b.Len() >= bucketSize {
if b.Range.Start.Equals(bits.Bitmap{}) { // this is the bucket covering our node id
return true
}
kClosest := rt.getClosest(rt.id, bucketSize)
kthClosest := kClosest[len(kClosest)-1]
if rt.id.Closer(c.ID, kthClosest.ID) {
return true
}
}
return false
}
func (rt *routingTable) printBucketInfo() {
fmt.Printf("there are %d contacts in %d buckets\n", rt.Count(), rt.Len())
for i, b := range rt.buckets {
fmt.Printf("bucket %d, %d contacts\n", i+1, len(b.peers))
fmt.Printf(" start : %s\n", b.Range.Start.String())
fmt.Printf(" stop : %s\n", b.Range.End.String())
fmt.Println("")
}
}
func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap {

View file

@ -2,47 +2,198 @@ package dht
import (
"encoding/json"
"math/big"
"net"
"strconv"
"strings"
"testing"
"github.com/lbryio/reflector.go/dht/bits"
"github.com/sebdah/goldie"
)
func TestRoutingTable_bucketFor(t *testing.T) {
func TestBucket_Split(t *testing.T) {
rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
if len(rt.buckets) != 1 {
t.Errorf("there should only be one bucket so far")
}
if len(rt.buckets[0].peers) != 0 {
t.Errorf("there should be no contacts yet")
}
var tests = []struct {
name string
id bits.Bitmap
expectedBucketCount int
expectedTotalContacts int
}{
//fill first bucket
{"b1-one", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100"), 1, 1},
{"b1-two", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200"), 1, 2},
{"b1-three", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000300"), 1, 3},
{"b1-four", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000400"), 1, 4},
{"b1-five", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000500"), 1, 5},
{"b1-six", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000600"), 1, 6},
{"b1-seven", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000700"), 1, 7},
{"b1-eight", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800"), 1, 8},
// split off second bucket and fill it
{"b2-one", bits.FromHexP("001000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 9},
{"b2-two", bits.FromHexP("002000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 10},
{"b2-three", bits.FromHexP("003000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 11},
{"b2-four", bits.FromHexP("004000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 12},
{"b2-five", bits.FromHexP("005000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 13},
{"b2-six", bits.FromHexP("006000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 14},
{"b2-seven", bits.FromHexP("007000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 15},
// at this point there are two buckets. the first has 7 contacts, the second has 8
// inserts into the second bucket should be skipped
{"dont-split", bits.FromHexP("009000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 15},
// ... unless the ID is closer than the kth-closest contact
{"split-kth-closest", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), 2, 16},
{"b3-two", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), 3, 17},
{"b3-three", bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), 3, 18},
}
for i, testCase := range tests {
rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i, 0})
if len(rt.buckets) != testCase.expectedBucketCount {
t.Errorf("failed test case %s. there should be %d buckets, got %d", testCase.name, testCase.expectedBucketCount, len(rt.buckets))
}
if rt.Count() != testCase.expectedTotalContacts {
t.Errorf("failed test case %s. there should be %d contacts, got %d", testCase.name, testCase.expectedTotalContacts, rt.Count())
}
}
var testRanges = []struct {
id bits.Bitmap
expected int
}{
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), 0},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), 1},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), 1},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), 2},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005"), 2},
{bits.FromHexP("00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f"), 3},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010"), 4},
{bits.FromHexP("F00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 383},
{bits.FromHexP("F0000000000000000000000000000000F0000000000000000000000000F0000000000000000000000000000000000000"), 383},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000005"), 0},
{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000410"), 1},
{bits.FromHexP("0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000007f0"), 1},
{bits.FromHexP("F00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000800"), 2},
{bits.FromHexP("F00000000000000000000000000000000000000000000000000F00000000000000000000000000000000000000000000"), 2},
{bits.FromHexP("F0000000000000000000000000000000F0000000000000000000000000F0000000000000000000000000000000000000"), 2},
}
for _, tt := range tests {
bucket := rt.bucketNumFor(tt.id)
for _, tt := range testRanges {
bucket := bucketNumFor(rt, tt.id)
if bucket != tt.expected {
t.Errorf("bucketFor(%s, %s) => %d, want %d", tt.id.Hex(), rt.id.Hex(), bucket, tt.expected)
t.Errorf("bucketFor(%s, %s) => got %d, expected %d", tt.id.Hex(), rt.id.Hex(), bucket, tt.expected)
}
}
}
func bucketNumFor(rt *routingTable, target bits.Bitmap) int {
if rt.id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
distance := target.Xor(rt.id)
for i := range rt.buckets {
if rt.buckets[i].Range.Contains(distance) {
return i
}
}
panic("target is not contained in any buckets")
}
func TestBucket_Split_Continuous(t *testing.T) {
b := newBucket(bits.MaxRange())
left, right := b.Split()
if !left.Range.Start.Equals(b.Range.Start) {
t.Errorf("left bucket start does not align with original bucket start. got %s, expected %s", left.Range.Start, b.Range.Start)
}
if !right.Range.End.Equals(b.Range.End) {
t.Errorf("right bucket end does not align with original bucket end. got %s, expected %s", right.Range.End, b.Range.End)
}
leftEndNext := (&big.Int{}).Add(left.Range.End.Big(), big.NewInt(1))
if !bits.FromBigP(leftEndNext).Equals(right.Range.Start) {
t.Errorf("there's a gap between left bucket end and right bucket start. end is %s, start is %s", left.Range.End, right.Range.Start)
}
}
func TestBucket_Split_KthClosest_DoSplit(t *testing.T) {
rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
// add 4 low IDs
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), net.ParseIP("127.0.0.1"), 8001, 0})
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), net.ParseIP("127.0.0.1"), 8002, 0})
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), net.ParseIP("127.0.0.1"), 8003, 0})
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), net.ParseIP("127.0.0.1"), 8004, 0})
// add 4 high IDs
rt.Update(Contact{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8001, 0})
rt.Update(Contact{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8002, 0})
rt.Update(Contact{bits.FromHexP("a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8003, 0})
rt.Update(Contact{bits.FromHexP("b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8004, 0})
// split the bucket and fill the high bucket
rt.Update(Contact{bits.FromHexP("c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8005, 0})
rt.Update(Contact{bits.FromHexP("d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8006, 0})
rt.Update(Contact{bits.FromHexP("e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8007, 0})
rt.Update(Contact{bits.FromHexP("f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8008, 0})
// add a high ID. it should split because the high ID is closer than the Kth closest ID
rt.Update(Contact{bits.FromHexP("910000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.1"), 8009, 0})
if len(rt.buckets) != 3 {
t.Errorf("expected 3 buckets, got %d", len(rt.buckets))
}
if rt.Count() != 13 {
t.Errorf("expected 13 contacts, got %d", rt.Count())
}
}
func TestBucket_Split_KthClosest_DontSplit(t *testing.T) {
rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
// add 4 low IDs
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001"), net.ParseIP("127.0.0.1"), 8001, 0})
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002"), net.ParseIP("127.0.0.1"), 8002, 0})
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000003"), net.ParseIP("127.0.0.1"), 8003, 0})
rt.Update(Contact{bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004"), net.ParseIP("127.0.0.1"), 8004, 0})
// add 4 high IDs
rt.Update(Contact{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8001, 0})
rt.Update(Contact{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8002, 0})
rt.Update(Contact{bits.FromHexP("a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8003, 0})
rt.Update(Contact{bits.FromHexP("b00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8004, 0})
// split the bucket and fill the high bucket
rt.Update(Contact{bits.FromHexP("c00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8005, 0})
rt.Update(Contact{bits.FromHexP("d00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8006, 0})
rt.Update(Contact{bits.FromHexP("e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8007, 0})
rt.Update(Contact{bits.FromHexP("f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.2"), 8008, 0})
// add a really high ID. this should not split because its not closer than the Kth closest ID
rt.Update(Contact{bits.FromHexP("ffff00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), net.ParseIP("127.0.0.1"), 8009, 0})
if len(rt.buckets) != 2 {
t.Errorf("expected 2 buckets, got %d", len(rt.buckets))
}
if rt.Count() != 12 {
t.Errorf("expected 12 contacts, got %d", rt.Count())
}
}
func TestRoutingTable_GetClosest(t *testing.T) {
n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n3 := bits.FromHexP("111111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
rt := newRoutingTable(n1)
rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001})
rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002})
rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001, 0})
rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002, 0})
contacts := rt.GetClosest(bits.FromHexP("222222220000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1)
if len(contacts) != 1 {
@ -52,7 +203,6 @@ func TestRoutingTable_GetClosest(t *testing.T) {
if !contacts[0].ID.Equals(n3) {
t.Error(contacts[0])
}
contacts = rt.GetClosest(n2, 10)
if len(contacts) != 2 {
t.Error(len(contacts))
@ -121,42 +271,17 @@ func TestRoutingTable_MoveToBack(t *testing.T) {
}
}
func TestRoutingTable_BucketRanges(t *testing.T) {
id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41")
ranges := newRoutingTable(id).BucketRanges()
if !ranges[0].Start.Equals(ranges[0].End) {
t.Error("first bucket should only fit exactly one id")
}
for i := 0; i < 1000; i++ {
randID := bits.Rand()
found := -1
for i, r := range ranges {
if r.Start.Cmp(randID) <= 0 && r.End.Cmp(randID) >= 0 {
if found >= 0 {
t.Errorf("%s appears in buckets %d and %d", randID.Hex(), found, i)
} else {
found = i
}
}
}
if found < 0 {
t.Errorf("%s did not appear in any bucket", randID.Hex())
}
}
}
func TestRoutingTable_Save(t *testing.T) {
t.Skip("fix me")
id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41")
rt := newRoutingTable(id)
ranges := rt.BucketRanges()
for i, r := range ranges {
for i, b := range rt.buckets {
for j := 0; j < bucketSize; j++ {
toAdd := r.Start.Add(bits.FromShortHexP(strconv.Itoa(j)))
if toAdd.Cmp(r.End) <= 0 {
toAdd := b.Range.Start.Add(bits.FromShortHexP(strconv.Itoa(j)))
if toAdd.Cmp(b.Range.End) <= 0 {
rt.Update(Contact{
ID: r.Start.Add(bits.FromShortHexP(strconv.Itoa(j))),
ID: b.Range.Start.Add(bits.FromShortHexP(strconv.Itoa(j))),
IP: net.ParseIP("1.2.3." + strconv.Itoa(j)),
Port: 1 + i*bucketSize + j,
})
@ -173,6 +298,7 @@ func TestRoutingTable_Save(t *testing.T) {
}
func TestRoutingTable_Load_ID(t *testing.T) {
t.Skip("fix me")
id := "1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41"
data := []byte(`{"id": "` + id + `","contacts": []}`)