expand empty buckets

This commit is contained in:
Jack Robison 2018-06-27 18:09:10 -04:00
parent 1b41525f4b
commit 7b8ab21b6c
2 changed files with 192 additions and 61 deletions

View file

@ -3,17 +3,16 @@ package dht
import (
"encoding/json"
"fmt"
"math/big"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/dht/bits"
"math/big"
)
// TODO: if routing table is ever empty (aka the node is isolated), it should re-bootstrap
@ -25,7 +24,12 @@ import (
type peer struct {
Contact Contact
LastActivity time.Time
// LastReplied time.Time
// LastRequested time.Time
// LastFailure time.Time
// SecondLastFailure time.Time
NumFailures int
//<lastPublished>,
//<originallyPublished>
// <originalPublisherID>
@ -56,7 +60,7 @@ type bucket struct {
lock *sync.RWMutex
peers []peer
lastUpdate time.Time
bucketRange *bits.Range
bucketRange bits.Range
}
// Len returns the number of peers in the bucket
@ -163,7 +167,7 @@ func (rt *routingTable) reset() {
rt.buckets = append(rt.buckets, bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
bucketRange: &bits.Range{
bucketRange: bits.Range{
Start: bits.FromBigP(start),
End: bits.FromBigP(end),
},
@ -179,7 +183,7 @@ func (rt *routingTable) BucketInfo() string {
for j, c := range contacts {
s[j] = c.ID.HexShort()
}
bucketInfo = append(bucketInfo, fmt.Sprintf("Bucket %d: (%d) %s", i, len(contacts), strings.Join(s, ", ")))
bucketInfo = append(bucketInfo, fmt.Sprintf("bucket %d: (%d) %s", i, len(contacts), strings.Join(s, ", ")))
}
}
if len(bucketInfo) == 0 {
@ -242,7 +246,7 @@ func (rt *routingTable) Count() int {
func (rt *routingTable) BucketRanges() []bits.Range {
ranges := make([]bits.Range, len(rt.buckets))
for i, b := range rt.buckets {
ranges[i] = *b.bucketRange
ranges[i] = b.bucketRange
}
return ranges
}
@ -280,59 +284,122 @@ func (rt *routingTable) shouldSplit(target bits.Bitmap) bool {
}
func (rt *routingTable) insertContact(c Contact) {
if len(rt.buckets[rt.bucketNumFor(c.ID)].peers) < bucketSize {
bucketIndex := rt.bucketNumFor(c.ID)
peersInBucket := int(len(rt.buckets[bucketIndex].peers))
if peersInBucket < bucketSize {
rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true)
} else if rt.shouldSplit(c.ID) {
rt.recursiveInsertContact(c)
} else if peersInBucket >= bucketSize && rt.shouldSplit(c.ID) {
rt.splitBucket(bucketIndex)
rt.insertContact(c)
}
rt.popEmptyBuckets()
}
func (rt *routingTable) recursiveInsertContact(c Contact) {
bucketIndex := rt.bucketNumFor(c.ID)
func (rt *routingTable) splitBucket(bucketIndex int) {
b := rt.buckets[bucketIndex]
min := b.bucketRange.Start.Big()
max := b.bucketRange.End.Big()
midpoint := max.Sub(max, min)
midpoint := &big.Int{}
midpoint.Sub(max, min)
midpoint.Div(midpoint, big.NewInt(2))
midpoint.Add(midpoint, min)
midpointPlusOne := &big.Int{}
midpointPlusOne.Add(midpointPlusOne, min)
midpointPlusOne.Add(midpoint, big.NewInt(1))
// re-size the bucket to be split
b.bucketRange.Start = bits.FromBigP(min)
b.bucketRange.End = bits.FromBigP(midpoint.Sub(midpoint, big.NewInt(1)))
first_half := rt.buckets[:bucketIndex+1]
movedPeers := []peer{}
resizedPeers := []peer{}
// set the re-sized bucket to only have peers still in range
for _, p := range b.peers {
if rt.bucketNumFor(p.Contact.ID) != bucketIndex {
movedPeers = append(movedPeers, p)
} else {
resizedPeers = append(resizedPeers, p)
}
second_half := []bucket{}
for i := bucketIndex + 1; i < len(rt.buckets); i++ {
second_half = append(second_half, rt.buckets[i])
}
b.peers = resizedPeers
// add the new bucket
insert := bucket{
copiedPeers := []peer{}
copy(copiedPeers, b.peers)
b.peers = []peer{}
rt.buckets = []bucket{}
for _, i := range first_half {
rt.buckets = append(rt.buckets, i)
}
newBucket := bucket{
peers: make([]peer, 0, bucketSize),
lock: &sync.RWMutex{},
bucketRange: &bits.Range{
Start: bits.FromBigP(midpoint),
bucketRange: bits.Range{
Start: bits.FromBigP(midpointPlusOne),
End: bits.FromBigP(max),
},
}
rt.buckets = append(rt.buckets[:bucketIndex], append([]bucket{insert}, rt.buckets[bucketIndex:]...)...)
rt.buckets = append(rt.buckets, newBucket)
for _, i := range second_half {
rt.buckets = append(rt.buckets, i)
}
// re-size the bucket to be split
rt.buckets[bucketIndex].bucketRange.Start = bits.FromBigP(min)
rt.buckets[bucketIndex].bucketRange.End = bits.FromBigP(midpoint)
// re-insert the contacts that where out of range of the split bucket
for _, p := range movedPeers {
// re-insert the contacts that were in the re-sized bucket
for _, p := range copiedPeers {
rt.insertContact(p.Contact)
}
// insert the new contact
rt.insertContact(c)
}
func (rt *routingTable) printBucketInfo() {
for i, b := range rt.buckets {
fmt.Printf("bucket %d, %d contacts\n", i + 1, len(b.peers))
fmt.Printf(" start : %s\n", b.bucketRange.Start.String())
fmt.Printf(" stop : %s\n", b.bucketRange.End.String())
fmt.Println("")
}
}
func (rt *routingTable) popBucket(bucketIndex int) {
canGoLower := bucketIndex >= 1
canGoHigher := len(rt.buckets) - 1 > bucketIndex
if canGoLower && !canGoHigher {
// raise the end of bucket[bucketIndex-1]
rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.End.Big())
} else if !canGoLower && canGoHigher {
// lower the start of bucket[bucketIndex+1]
rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.Start.Big())
} else if canGoLower && canGoHigher {
// raise the end of bucket[bucketIndex-1] and lower the start of bucket[bucketIndex+1] to the
// midpoint of the range covered by bucket[bucketIndex]
midpoint := &big.Int{}
midpoint.Sub(rt.buckets[bucketIndex].bucketRange.End.Big(), rt.buckets[bucketIndex].bucketRange.Start.Big())
midpoint.Div(midpoint, big.NewInt(2))
midpointPlusOne := &big.Int{}
midpointPlusOne.Add(midpoint, big.NewInt(1))
rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(midpoint)
rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(midpointPlusOne)
} else {
return
}
// pop the bucket
rt.buckets = rt.buckets[:bucketIndex+copy(rt.buckets[bucketIndex:], rt.buckets[bucketIndex+1:])]
}
func (rt *routingTable) popNextEmptyBucket() bool {
for bucketIndex := 0; bucketIndex < len(rt.buckets); bucketIndex += 1 {
if len(rt.buckets[bucketIndex].peers) == 0 {
rt.popBucket(bucketIndex)
return true
}
}
return false
}
func (rt *routingTable) popEmptyBuckets() {
if len(rt.buckets) > 1 {
popBuckets := rt.popNextEmptyBucket()
for popBuckets == true {
popBuckets = rt.popNextEmptyBucket()
}
}
}
func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap {
var bitmaps []bits.Bitmap

View file

@ -2,11 +2,11 @@ package dht
import (
"encoding/json"
"math/big"
"net"
"strconv"
"strings"
"testing"
"github.com/lbryio/reflector.go/dht/bits"
"github.com/sebdah/goldie"
)
@ -36,29 +36,92 @@ func TestRoutingTable_bucketFor(t *testing.T) {
}
}
func TestRoutingTableFillBuckets(t *testing.T) {
n1 := bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n2 := bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n3 := bits.FromHexP("111111110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n4 := bits.FromHexP("111111120000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n5 := bits.FromHexP("111111130000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n6 := bits.FromHexP("111111140000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n7 := bits.FromHexP("111111150000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n8 := bits.FromHexP("111111160000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
n9 := bits.FromHexP("111111070000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
func checkBucketCount(rt *routingTable, t *testing.T, correctSize, correctCount, testCaseIndex int) {
if len(rt.buckets) != correctSize {
t.Errorf("failed test case %d. there should be %d buckets, got %d", testCaseIndex + 1, correctSize, len(rt.buckets))
}
if rt.Count() != correctCount {
t.Errorf("failed test case %d. there should be %d contacts, got %d", testCaseIndex + 1, correctCount, rt.Count())
}
rt := newRoutingTable(n1)
rt.Update(Contact{n2, net.ParseIP("127.0.0.1"), 8001})
rt.Update(Contact{n3, net.ParseIP("127.0.0.1"), 8002})
rt.Update(Contact{n4, net.ParseIP("127.0.0.1"), 8003})
rt.Update(Contact{n5, net.ParseIP("127.0.0.1"), 8004})
rt.Update(Contact{n6, net.ParseIP("127.0.0.1"), 8005})
rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8006})
rt.Update(Contact{n7, net.ParseIP("127.0.0.1"), 8007})
rt.Update(Contact{n8, net.ParseIP("127.0.0.1"), 8008})
rt.Update(Contact{n9, net.ParseIP("127.0.0.1"), 8009})
}
log.Printf(rt.BucketInfo())
func checkRangeContinuity(rt *routingTable, t *testing.T) {
position := big.NewInt(0)
for i, bucket := range rt.buckets {
bucketStart := bucket.bucketRange.Start.Big()
if bucketStart.Cmp(position) != 0 {
t.Errorf("invalid start of bucket range: %s vs %s", position.String(), bucketStart.String())
}
if bucketStart.Cmp(bucket.bucketRange.End.Big()) != -1 {
t.Error("range start is not less than bucket end")
}
position = bucket.bucketRange.End.Big()
if i != len(rt.buckets) - 1 {
position.Add(position, big.NewInt(1))
}
}
if position.Cmp(bits.MaxP().Big()) != 0 {
t.Errorf("range does not cover the whole keyspace, %s vs %s", bits.FromBigP(position).String(), bits.MaxP().String())
}
}
func TestSplitBuckets(t *testing.T) {
rt := newRoutingTable(bits.FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"))
if len(rt.buckets) != 1 {
t.Errorf("there should only be one bucket so far")
}
if len(rt.buckets[0].peers) != 0 {
t.Errorf("there should be no contacts yet")
}
var tests = []struct {
id bits.Bitmap
expectedBucketCount int
expectedTotalContacts int
}{
//fill first bucket
{bits.FromHexP("F00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 1},
{bits.FromHexP("FF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 2},
{bits.FromHexP("FFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 3},
{bits.FromHexP("FFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 4},
{bits.FromHexP("FFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 5},
{bits.FromHexP("FFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 6},
{bits.FromHexP("FFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 7},
{bits.FromHexP("FFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 1, 8},
// fill second bucket
{bits.FromHexP("FFFFFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 9},
{bits.FromHexP("FFFFFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 10},
{bits.FromHexP("FFFFFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 11},
{bits.FromHexP("FFFFFFFFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 12},
{bits.FromHexP("FFFFFFFFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 13},
{bits.FromHexP("FFFFFFFFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 14},
{bits.FromHexP("FFFFFFFFFFFFFFF000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 15},
{bits.FromHexP("FFFFFFFFFFFFFFFF00000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 16},
// this should be skipped (no split should occur)
{bits.FromHexP("FFFFFFFFFFFFFFFFF0000000000000000000000000000000000000000000000000000000000000000000000000000000"), 2, 16},
{bits.FromHexP("100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 3, 17},
{bits.FromHexP("200000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 3, 18},
{bits.FromHexP("300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 3, 19},
{bits.FromHexP("400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 20},
{bits.FromHexP("500000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 21},
{bits.FromHexP("600000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 22},
{bits.FromHexP("700000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 23},
{bits.FromHexP("800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 24},
{bits.FromHexP("900000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 25},
{bits.FromHexP("A00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 26},
{bits.FromHexP("B00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 27},
}
for i, testCase := range tests {
rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i})
checkBucketCount(rt, t, testCase.expectedBucketCount, testCase.expectedTotalContacts, i)
checkRangeContinuity(rt, t)
}
rt.printBucketInfo()
}
func TestRoutingTable_GetClosest(t *testing.T) {
@ -77,7 +140,6 @@ func TestRoutingTable_GetClosest(t *testing.T) {
if !contacts[0].ID.Equals(n3) {
t.Error(contacts[0])
}
contacts = rt.GetClosest(n2, 10)
if len(contacts) != 2 {
t.Error(len(contacts))
@ -148,7 +210,8 @@ func TestRoutingTable_MoveToBack(t *testing.T) {
func TestRoutingTable_InitialBucketRange(t *testing.T) {
id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41")
ranges := newRoutingTable(id).BucketRanges()
rt := newRoutingTable(id)
ranges := rt.BucketRanges()
bucketRange := ranges[0]
if len(ranges) != 1 {
t.Error("there should only be one bucket")
@ -169,6 +232,7 @@ func TestRoutingTable_InitialBucketRange(t *testing.T) {
if found != 1000 {
t.Errorf("%d did not appear in any bucket", found)
}
log.Println(rt.Count())
}
func TestRoutingTable_Save(t *testing.T) {