From 2d0b3547d70d9991ba0a89d9e84c83614403ef59 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 26 Jun 2018 13:42:36 -0400 Subject: [PATCH] add BucketRange to bucket struct -initialize the routing table with one bucket covering the entire keyspace --- dht/routing_table.go | 144 +++++++++++++++++++++++++++----------- dht/routing_table_test.go | 56 +++++++++++---- 2 files changed, 146 insertions(+), 54 deletions(-) diff --git a/dht/routing_table.go b/dht/routing_table.go index d3d2818..ec851be 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -13,6 +13,7 @@ import ( "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/stop" "github.com/lbryio/reflector.go/dht/bits" + "math/big" ) // TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap @@ -52,9 +53,10 @@ func (p *peer) Fail() { } type bucket struct { - lock *sync.RWMutex - peers []peer - lastUpdate time.Time + lock *sync.RWMutex + peers []peer + lastUpdate time.Time + bucketRange *bits.Range } // Len returns the number of peers in the bucket @@ -80,6 +82,8 @@ func (b *bucket) UpdateContact(c Contact, insertIfNew bool) { b.lock.Lock() defer b.lock.Unlock() + // TODO: verify the peer is in the bucket key range + peerIndex := find(c.ID, b.peers) if peerIndex >= 0 { b.lastUpdate = time.Now() @@ -140,7 +144,7 @@ func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool { type routingTable struct { id bits.Bitmap - buckets [nodeIDBits]bucket + buckets []bucket } func newRoutingTable(id bits.Bitmap) *routingTable { @@ -151,12 +155,19 @@ func newRoutingTable(id bits.Bitmap) *routingTable { } func (rt *routingTable) reset() { - for i := range rt.buckets { - rt.buckets[i] = bucket{ - peers: make([]peer, 0, bucketSize), - lock: &sync.RWMutex{}, - } - } + start := big.NewInt(0) + end := big.NewInt(1) + end.Lsh(end, bits.NumBits) + end.Sub(end, big.NewInt(1)) + rt.buckets = []bucket{} + rt.buckets = append(rt.buckets, bucket{ + peers: make([]peer, 0, 
bucketSize), + lock: &sync.RWMutex{}, + bucketRange: &bits.Range{ + Start: bits.FromBigP(start), + End: bits.FromBigP(end), + }, + }) } func (rt *routingTable) BucketInfo() string { @@ -179,7 +190,7 @@ func (rt *routingTable) BucketInfo() string { // Update inserts or refreshes a contact func (rt *routingTable) Update(c Contact) { - rt.bucketFor(c.ID).UpdateContact(c, true) + rt.insertContact(c) } // Fresh refreshes a contact if its already in the routing table @@ -195,36 +206,18 @@ func (rt *routingTable) Fail(c Contact) { // GetClosest returns the closest `limit` contacts from the routing table // It marks each bucket it accesses as having been accessed func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact { - var toSort []sortedContact - var bucketNum int - - if rt.id.Equals(target) { - bucketNum = 0 - } else { - bucketNum = rt.bucketNumFor(target) + toSort := []sortedContact{} + for _, b := range rt.buckets { + toSort = appendContacts(toSort, b, target) } - - toSort = appendContacts(toSort, rt.buckets[bucketNum], target) - - for i := 1; (bucketNum-i >= 0 || bucketNum+i < nodeIDBits) && len(toSort) < limit; i++ { - if bucketNum-i >= 0 { - toSort = appendContacts(toSort, rt.buckets[bucketNum-i], target) - } - if bucketNum+i < nodeIDBits { - toSort = appendContacts(toSort, rt.buckets[bucketNum+i], target) - } - } - sort.Sort(byXorDistance(toSort)) - - var contacts []Contact + contacts := []Contact{} for _, sorted := range toSort { contacts = append(contacts, sorted.contact) if len(contacts) >= limit { break } } - return contacts } @@ -248,11 +241,8 @@ func (rt *routingTable) Count() int { // go in that bucket, and the `end` is the largest id func (rt *routingTable) BucketRanges() []bits.Range { ranges := make([]bits.Range, len(rt.buckets)) - for i := range rt.buckets { - ranges[i] = bits.Range{ - Start: rt.id.Suffix(i, false).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)), - End: rt.id.Suffix(i, true).Set(nodeIDBits-1-i, 
!rt.id.Get(nodeIDBits-1-i)), - } + for i, b := range rt.buckets { + ranges[i] = *b.bucketRange } return ranges } @@ -261,13 +251,89 @@ func (rt *routingTable) bucketNumFor(target bits.Bitmap) int { if rt.id.Equals(target) { panic("routing table does not have a bucket for its own id") } - return nodeIDBits - 1 - target.Xor(rt.id).PrefixLen() + distance := target.Xor(rt.id) + for i, b := range rt.buckets { + if b.bucketRange.Start.Cmp(distance) <= 0 && b.bucketRange.End.Cmp(distance) >= 0 { + return i + } + } + panic("target value overflows the key space") } func (rt *routingTable) bucketFor(target bits.Bitmap) *bucket { return &rt.buckets[rt.bucketNumFor(target)] } +func (rt *routingTable) shouldSplit(target bits.Bitmap) bool { + bucketIndex := rt.bucketNumFor(target) + if len(rt.buckets[bucketIndex].peers) >= bucketSize { + if bucketIndex == 0 { // this is the bucket covering our node id + return true + } + kClosest := rt.GetClosest(rt.id, bucketSize) + kthClosest := kClosest[len(kClosest) - 1] + if target.Xor(rt.id).Cmp(kthClosest.ID.Xor(rt.id)) < 0 { + return true // the kth closest contact is further than this one + } + } + return false +} + +func (rt *routingTable) insertContact(c Contact) { + if len(rt.buckets[rt.bucketNumFor(c.ID)].peers) < bucketSize { + rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true) + } else if rt.shouldSplit(c.ID) { + rt.recursiveInsertContact(c) + } +} + +func (rt *routingTable) recursiveInsertContact(c Contact) { + bucketIndex := rt.bucketNumFor(c.ID) + b := rt.buckets[bucketIndex] + min := b.bucketRange.Start.Big() + max := b.bucketRange.End.Big() + + midpoint := max.Sub(max, min) + midpoint.Div(midpoint, big.NewInt(2)) + + // re-size the bucket to be split + b.bucketRange.Start = bits.FromBigP(min) + b.bucketRange.End = bits.FromBigP(midpoint.Sub(midpoint, big.NewInt(1))) + + movedPeers := []peer{} + resizedPeers := []peer{} + + // set the re-sized bucket to only have peers still in range + for _, p := range b.peers { + if 
rt.bucketNumFor(p.Contact.ID) != bucketIndex { + movedPeers = append(movedPeers, p) + } else { + resizedPeers = append(resizedPeers, p) + } + } + b.peers = resizedPeers + + // add the new bucket + insert := bucket{ + peers: make([]peer, 0, bucketSize), + lock: &sync.RWMutex{}, + bucketRange: &bits.Range{ + Start: bits.FromBigP(midpoint), + End: bits.FromBigP(max), + }, + } + rt.buckets = append(rt.buckets[:bucketIndex], append([]bucket{insert}, rt.buckets[bucketIndex:]...)...) + + // re-insert the contacts that were out of range of the split bucket + for _, p := range movedPeers { + rt.insertContact(p.Contact) + } + + // insert the new contact + rt.insertContact(c) +} + + func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap { var bitmaps []bits.Bitmap for i, bucket := range rt.buckets { diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index 8850f98..2418d06 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -36,6 +36,31 @@ func TestRoutingTable_bucketFor(t *testing.T) { } } +func TestRoutingTableFillBuckets(t *testing.T) { + n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n3 := bits.FromHexP("111111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n4 := bits.FromHexP("111111120000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n5 := bits.FromHexP("111111130000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n6 := bits.FromHexP("111111140000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n7 := bits.FromHexP("111111150000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n8 := 
bits.FromHexP("111111160000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + n9 := bits.FromHexP("111111070000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") + + rt := newRoutingTable(n1) + rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001}) + rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002}) + rt.Update(Contact{n4, net.ParseIP("127.0.0.1"), 8003}) + rt.Update(Contact{n5, net.ParseIP("127.0.0.1"), 8004}) + rt.Update(Contact{n6, net.ParseIP("127.0.0.1"), 8005}) + rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8006}) + rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8007}) + rt.Update(Contact{n8, net.ParseIP("127.0.0.1"), 8008}) + rt.Update(Contact{n9, net.ParseIP("127.0.0.1"), 8009}) + + log.Printf(rt.BucketInfo()) +} + func TestRoutingTable_GetClosest(t *testing.T) { n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") @@ -121,28 +146,29 @@ func TestRoutingTable_MoveToBack(t *testing.T) { } } -func TestRoutingTable_BucketRanges(t *testing.T) { +func TestRoutingTable_InitialBucketRange(t *testing.T) { id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41") ranges := newRoutingTable(id).BucketRanges() - if !ranges[0].Start.Equals(ranges[0].End) { - t.Error("first bucket should only fit exactly one id") + bucketRange := ranges[0] + if len(ranges) != 1 { + t.Error("there should only be one bucket") } + if !ranges[0].Start.Equals(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")) { + t.Error("bucket does not cover the lower keyspace") + } + if 
!ranges[0].End.Equals(bits.FromHexP("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) { + t.Error("bucket does not cover the upper keyspace") + } + found := 0 for i := 0; i < 1000; i++ { randID := bits.Rand() - found := -1 - for i, r := range ranges { - if r.Start.Cmp(randID) <= 0 && r.End.Cmp(randID) >= 0 { - if found >= 0 { - t.Errorf("%s appears in buckets %d and %d", randID.Hex(), found, i) - } else { - found = i - } - } - } - if found < 0 { - t.Errorf("%s did not appear in any bucket", randID.Hex()) + if bucketRange.Start.Cmp(randID) <= 0 && bucketRange.End.Cmp(randID) >= 0 { + found += 1 } } + if found != 1000 { + t.Errorf("%d did not appear in any bucket", found) + } } func TestRoutingTable_Save(t *testing.T) {