add BucketRange to bucket struct

-initialize the routing table with one bucket covering the entire keyspace
This commit is contained in:
Jack Robison 2018-06-26 13:42:36 -04:00
parent c7717add23
commit 2d0b3547d7
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 146 additions and 54 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/dht/bits"
"math/big"
)
// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap
@ -52,9 +53,10 @@ func (p *peer) Fail() {
}
type bucket struct {
lock *sync.RWMutex
peers []peer
lastUpdate time.Time
lock *sync.RWMutex
peers []peer
lastUpdate time.Time
bucketRange *bits.Range
}
// Len returns the number of peers in the bucket
@ -80,6 +82,8 @@ func (b *bucket) UpdateContact(c Contact, insertIfNew bool) {
b.lock.Lock()
defer b.lock.Unlock()
// TODO: verify the peer is in the bucket key range
peerIndex := find(c.ID, b.peers)
if peerIndex >= 0 {
b.lastUpdate = time.Now()
@ -140,7 +144,7 @@ func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool {
type routingTable struct {
id bits.Bitmap
buckets [nodeIDBits]bucket
buckets []bucket
}
func newRoutingTable(id bits.Bitmap) *routingTable {
@ -151,12 +155,19 @@ func newRoutingTable(id bits.Bitmap) *routingTable {
}
func (rt *routingTable) reset() {
for i := range rt.buckets {
rt.buckets[i] = bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
}
}
start := big.NewInt(0)
end := big.NewInt(1)
end.Lsh(end, bits.NumBits)
end.Sub(end, big.NewInt(1))
rt.buckets = []bucket{}
rt.buckets = append(rt.buckets, bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
bucketRange: &bits.Range{
Start: bits.FromBigP(start),
End: bits.FromBigP(end),
},
})
}
func (rt *routingTable) BucketInfo() string {
@ -179,7 +190,7 @@ func (rt *routingTable) BucketInfo() string {
// Update inserts or refreshes a contact
func (rt *routingTable) Update(c Contact) {
rt.bucketFor(c.ID).UpdateContact(c, true)
rt.insertContact(c)
}
// Fresh refreshes a contact if its already in the routing table
@ -195,36 +206,18 @@ func (rt *routingTable) Fail(c Contact) {
// GetClosest returns the closest `limit` contacts from the routing table
// It marks each bucket it accesses as having been accessed
func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact {
var toSort []sortedContact
var bucketNum int
if rt.id.Equals(target) {
bucketNum = 0
} else {
bucketNum = rt.bucketNumFor(target)
toSort := []sortedContact{}
for _, b := range rt.buckets {
toSort = appendContacts(toSort, b, target)
}
toSort = appendContacts(toSort, rt.buckets[bucketNum], target)
for i := 1; (bucketNum-i >= 0 || bucketNum+i < nodeIDBits) && len(toSort) < limit; i++ {
if bucketNum-i >= 0 {
toSort = appendContacts(toSort, rt.buckets[bucketNum-i], target)
}
if bucketNum+i < nodeIDBits {
toSort = appendContacts(toSort, rt.buckets[bucketNum+i], target)
}
}
sort.Sort(byXorDistance(toSort))
var contacts []Contact
contacts := []Contact{}
for _, sorted := range toSort {
contacts = append(contacts, sorted.contact)
if len(contacts) >= limit {
break
}
}
return contacts
}
@ -248,11 +241,8 @@ func (rt *routingTable) Count() int {
// go in that bucket, and the `end` is the largest id
func (rt *routingTable) BucketRanges() []bits.Range {
ranges := make([]bits.Range, len(rt.buckets))
for i := range rt.buckets {
ranges[i] = bits.Range{
Start: rt.id.Suffix(i, false).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
End: rt.id.Suffix(i, true).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
}
for i, b := range rt.buckets {
ranges[i] = *b.bucketRange
}
return ranges
}
@ -261,13 +251,89 @@ func (rt *routingTable) bucketNumFor(target bits.Bitmap) int {
if rt.id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
return nodeIDBits - 1 - target.Xor(rt.id).PrefixLen()
distance := target.Xor(rt.id)
for i, b := range rt.buckets {
if b.bucketRange.Start.Cmp(distance) <= 0 && b.bucketRange.End.Cmp(distance) >= 0 {
return i
}
}
panic("target value overflows the key space")
}
func (rt *routingTable) bucketFor(target bits.Bitmap) *bucket {
return &rt.buckets[rt.bucketNumFor(target)]
}
func (rt *routingTable) shouldSplit(target bits.Bitmap) bool {
bucketIndex := rt.bucketNumFor(target)
if len(rt.buckets[bucketIndex].peers) >= bucketSize {
if bucketIndex == 0 { // this is the bucket covering our node id
return true
}
kClosest := rt.GetClosest(rt.id, bucketSize)
kthClosest := kClosest[len(kClosest) - 1]
if target.Xor(rt.id).Cmp(kthClosest.ID.Xor(rt.id)) < 0 {
return true // the kth closest contact is further than this one
}
}
return false
}
func (rt *routingTable) insertContact(c Contact) {
if len(rt.buckets[rt.bucketNumFor(c.ID)].peers) < bucketSize {
rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true)
} else if rt.shouldSplit(c.ID) {
rt.recursiveInsertContact(c)
}
}
func (rt *routingTable) recursiveInsertContact(c Contact) {
bucketIndex := rt.bucketNumFor(c.ID)
b := rt.buckets[bucketIndex]
min := b.bucketRange.Start.Big()
max := b.bucketRange.End.Big()
midpoint := max.Sub(max, min)
midpoint.Div(midpoint, big.NewInt(2))
// re-size the bucket to be split
b.bucketRange.Start = bits.FromBigP(min)
b.bucketRange.End = bits.FromBigP(midpoint.Sub(midpoint, big.NewInt(1)))
movedPeers := []peer{}
resizedPeers := []peer{}
// set the re-sized bucket to only have peers still in range
for _, p := range b.peers {
if rt.bucketNumFor(p.Contact.ID) != bucketIndex {
movedPeers = append(movedPeers, p)
} else {
resizedPeers = append(resizedPeers, p)
}
}
b.peers = resizedPeers
// add the new bucket
insert := bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
bucketRange: &bits.Range{
Start: bits.FromBigP(midpoint),
End: bits.FromBigP(max),
},
}
rt.buckets = append(rt.buckets[:bucketIndex], append([]bucket{insert}, rt.buckets[bucketIndex:]...)...)
// re-insert the contacts that where out of range of the split bucket
for _, p := range movedPeers {
rt.insertContact(p.Contact)
}
// insert the new contact
rt.insertContact(c)
}
func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap {
var bitmaps []bits.Bitmap
for i, bucket := range rt.buckets {

View file

@ -36,6 +36,31 @@ func TestRoutingTable_bucketFor(t *testing.T) {
}
}
func TestRoutingTableFillBuckets(t *testing.T) {
n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n3 := bits.FromHexP("111111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n4 := bits.FromHexP("111111120000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n5 := bits.FromHexP("111111130000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n6 := bits.FromHexP("111111140000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n7 := bits.FromHexP("111111150000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n8 := bits.FromHexP("111111160000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n9 := bits.FromHexP("111111070000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
rt := newRoutingTable(n1)
rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001})
rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002})
rt.Update(Contact{n4, net.ParseIP("127.0.0.1"), 8003})
rt.Update(Contact{n5, net.ParseIP("127.0.0.1"), 8004})
rt.Update(Contact{n6, net.ParseIP("127.0.0.1"), 8005})
rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8006})
rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8007})
rt.Update(Contact{n8, net.ParseIP("127.0.0.1"), 8008})
rt.Update(Contact{n9, net.ParseIP("127.0.0.1"), 8009})
log.Printf(rt.BucketInfo())
}
func TestRoutingTable_GetClosest(t *testing.T) {
n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
@ -121,28 +146,29 @@ func TestRoutingTable_MoveToBack(t *testing.T) {
}
}
func TestRoutingTable_BucketRanges(t *testing.T) {
func TestRoutingTable_InitialBucketRange(t *testing.T) {
id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41")
ranges := newRoutingTable(id).BucketRanges()
if !ranges[0].Start.Equals(ranges[0].End) {
t.Error("first bucket should only fit exactly one id")
bucketRange := ranges[0]
if len(ranges) != 1 {
t.Error("there should only be one bucket")
}
if !ranges[0].Start.Equals(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")) {
t.Error("bucket does not cover the lower keyspace")
}
if !ranges[0].End.Equals(bits.FromHexP("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) {
t.Error("bucket does not cover the upper keyspace")
}
found := 0
for i := 0; i < 1000; i++ {
randID := bits.Rand()
found := -1
for i, r := range ranges {
if r.Start.Cmp(randID) <= 0 && r.End.Cmp(randID) >= 0 {
if found >= 0 {
t.Errorf("%s appears in buckets %d and %d", randID.Hex(), found, i)
} else {
found = i
}
}
}
if found < 0 {
t.Errorf("%s did not appear in any bucket", randID.Hex())
if bucketRange.Start.Cmp(randID) <= 0 && bucketRange.End.Cmp(randID) >= 0 {
found += 1
}
}
if found != 1000 {
t.Errorf("%d did not appear in any bucket", found)
}
}
func TestRoutingTable_Save(t *testing.T) {