grin's cleanup and some WIP

This commit is contained in:
Alex Grintsvayg 2018-07-10 17:30:47 -04:00
parent a3ac49182c
commit 5cdcdfdd09
4 changed files with 147 additions and 232 deletions

View file

@ -344,11 +344,6 @@ func MaxP() Bitmap {
return FromHexP(strings.Repeat("f", NumBytes*2))
}
// MinP returns a bitmap with all bits set to 0
func MinP() Bitmap {
return FromHexP(strings.Repeat("0", NumBytes*2))
}
// Rand generates a cryptographically random bitmap with the confines of the parameters specified.
func Rand() Bitmap {
var id Bitmap

View file

@ -61,3 +61,7 @@ func (r Range) intervalStart(n, num int) *big.Int {
// IntervalSize returns the size of the range, computed as End - Start.
func (r Range) IntervalSize() *big.Int {
	size := new(big.Int)
	return size.Sub(r.End.Big(), r.Start.Big())
}
// Contains reports whether b falls within the range, inclusive of both ends.
func (r Range) Contains(b Bitmap) bool {
	if r.Start.Cmp(b) > 0 {
		return false // b is below the start of the range
	}
	return r.End.Cmp(b) >= 0
}

View file

@ -3,13 +3,14 @@ package dht
import (
"encoding/json"
"fmt"
"math/big"
"net"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/dht/bits"
@ -28,7 +29,7 @@ type peer struct {
// LastRequested time.Time
// LastFailure time.Time
// SecondLastFailure time.Time
NumFailures int
NumFailures int
//<lastPublished>,
//<originallyPublished>
@ -57,10 +58,18 @@ func (p *peer) Fail() {
}
type bucket struct {
lock *sync.RWMutex
peers []peer
lastUpdate time.Time
bucketRange bits.Range
lock *sync.RWMutex
peers []peer
lastUpdate time.Time
Range bits.Range // capitalized because `range` is a keyword
}
// newBucket creates an empty bucket covering the given range, preallocating
// room for bucketSize peers.
func newBucket(r bits.Range) *bucket {
	b := bucket{
		lock:  new(sync.RWMutex),
		peers: make([]peer, 0, bucketSize),
		Range: r,
	}
	return &b
}
// Len returns the number of peers in the bucket
@ -70,6 +79,17 @@ func (b bucket) Len() int {
return len(b.peers)
}
// Contains reports whether the bucket currently holds the given contact
// (compared with Contact.Equals using the exact-match flag).
func (b bucket) Contains(c Contact) bool {
	b.lock.RLock()
	defer b.lock.RUnlock()
	for i := range b.peers {
		if b.peers[i].Contact.Equals(c, true) {
			return true
		}
	}
	return false
}
// Contacts returns a slice of the bucket's contacts
func (b bucket) Contacts() []Contact {
b.lock.RLock()
@ -85,22 +105,26 @@ func (b bucket) Contacts() []Contact {
func (b *bucket) UpdateContact(c Contact, insertIfNew bool) {
b.lock.Lock()
defer b.lock.Unlock()
fmt.Printf("updating contact %s\n", c.ID)
// TODO: verify the peer is in the bucket key range
peerIndex := find(c.ID, b.peers)
if peerIndex >= 0 {
fmt.Println("exists, moving to back")
b.lastUpdate = time.Now()
b.peers[peerIndex].Touch()
moveToBack(b.peers, peerIndex)
} else if insertIfNew {
fmt.Println("inserting new")
hasRoom := true
if len(b.peers) >= bucketSize {
fmt.Println("no room")
hasRoom = false
for i := range b.peers {
if b.peers[i].IsBad(maxPeerFails) {
fmt.Println("dropping bad peer to make room")
// TODO: Ping contact first. Only remove if it does not respond
b.peers = append(b.peers[:i], b.peers[i+1:]...)
hasRoom = true
@ -110,10 +134,13 @@ func (b *bucket) UpdateContact(c Contact, insertIfNew bool) {
}
if hasRoom {
fmt.Println("actually adding")
b.lastUpdate = time.Now()
peer := peer{Contact: c}
peer.Touch()
b.peers = append(b.peers, peer)
} else {
fmt.Println("no room, dropping")
}
}
}
@ -146,37 +173,59 @@ func (b *bucket) NeedsRefresh(refreshInterval time.Duration) bool {
return time.Since(b.lastUpdate) > refreshInterval
}
// Split divides the bucket's range into two halves and distributes the
// bucket's peers between them, returning the (left, right) halves.
// If every peer lands in one half, the non-empty half is recursively split
// again and the resulting outer range is stretched back to cover the full
// original range, so the pair always tiles b.Range exactly.
func (b *bucket) Split() (*bucket, *bucket) {
b.lock.Lock()
defer b.lock.Unlock()
// assumes IntervalP(1,2) and IntervalP(2,2) are the two contiguous halves
// of the range — TODO confirm against the bits package
left := newBucket(b.Range.IntervalP(1, 2))
right := newBucket(b.Range.IntervalP(2, 2))
left.lastUpdate = b.lastUpdate
right.lastUpdate = b.lastUpdate
for _, p := range b.peers {
if left.Range.Contains(p.Contact.ID) {
left.peers = append(left.peers, p)
} else {
right.peers = append(right.peers, p)
}
}
// if one half is empty, split the other half again and widen the outer
// boundary so the combined coverage still equals the original range
if len(left.peers) == 0 {
left, right = right.Split()
left.Range.Start = b.Range.Start
} else if len(right.peers) == 0 {
left, right = left.Split()
right.Range.End = b.Range.End
}
return left, right
}
type routingTable struct {
id bits.Bitmap
buckets []bucket
lock *sync.RWMutex
buckets []*bucket
mu *sync.RWMutex // this mutex is write-locked only when CHANGING THE NUMBER OF BUCKETS in the table
}
func newRoutingTable(id bits.Bitmap) *routingTable {
var rt routingTable
rt.id = id
rt.lock = &sync.RWMutex{}
rt := routingTable{
id: id,
mu: &sync.RWMutex{},
}
rt.reset()
return &rt
}
func (rt *routingTable) reset() {
rt.Lock()
defer rt.Unlock()
newBucketLock := &sync.RWMutex{}
newBucketLock.Lock()
rt.buckets = []bucket{}
rt.buckets = append(rt.buckets, bucket{
peers: make([]peer, 0, bucketSize),
lock: newBucketLock,
bucketRange: bits.Range{
Start: bits.MinP(),
End: bits.MaxP(),
},
})
rt.mu.Lock()
defer rt.mu.Unlock()
rt.buckets = []*bucket{newBucket(bits.MaxRange())}
}
func (rt *routingTable) BucketInfo() string {
rt.mu.RLock()
defer rt.mu.RUnlock()
var bucketInfo []string
for i, b := range rt.buckets {
if b.Len() > 0 {
@ -196,52 +245,53 @@ func (rt *routingTable) BucketInfo() string {
// Update inserts or refreshes a contact
func (rt *routingTable) Update(c Contact) {
rt.insertContact(c)
rt.mu.Lock() // write lock, because updates may cause bucket splits
defer rt.mu.Unlock()
if rt.shouldSplit(c) {
spew.Dump("splitting")
i := rt.bucketNumFor(c.ID)
left, right := rt.buckets[i].Split()
rt.buckets = append(rt.buckets[:i], append([]*bucket{left, right}, rt.buckets[i+1:]...)...)
} else {
spew.Dump("no split")
}
rt.buckets[rt.bucketNumFor(c.ID)].UpdateContact(c, true)
}
// Fresh refreshes a contact if it's already in the routing table
func (rt *routingTable) Fresh(c Contact) {
// read lock is sufficient: the number of buckets does not change here
rt.mu.RLock()
defer rt.mu.RUnlock()
rt.bucketFor(c.ID).UpdateContact(c, false)
}
// Fail marks a contact as having failed, and removes it if it failed too many times
func (rt *routingTable) Fail(c Contact) {
// read lock is sufficient: the number of buckets does not change here
rt.mu.RLock()
defer rt.mu.RUnlock()
rt.bucketFor(c.ID).FailContact(c.ID)
}
func (rt *routingTable) getClosestToUs(limit int) []Contact {
contacts := []Contact{}
toSort := []sortedContact{}
rt.lock.RLock()
defer rt.lock.RUnlock()
for _, bucket := range rt.buckets {
toSort = []sortedContact{}
toSort = appendContacts(toSort, bucket, rt.id)
sort.Sort(byXorDistance(toSort))
for _, sorted := range toSort {
contacts = append(contacts, sorted.contact)
if len(contacts) >= limit {
break
}
}
}
return contacts
// GetClosest returns the closest `limit` contacts from the routing table.
// This is a locking wrapper around getClosest(); the read lock is sufficient
// because the number of buckets does not change here.
func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact {
rt.mu.RLock()
defer rt.mu.RUnlock()
return rt.getClosest(target, limit)
}
// GetClosest returns the closest `limit` contacts from the routing table
// It marks each bucket it accesses as having been accessed
func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact {
if target == rt.id {
return rt.getClosestToUs(limit)
}
rt.lock.RLock()
defer rt.lock.RUnlock()
toSort := []sortedContact{}
// getClosest returns the closest `limit` contacts from the routing table
func (rt *routingTable) getClosest(target bits.Bitmap, limit int) []Contact {
var toSort []sortedContact
for _, b := range rt.buckets {
toSort = appendContacts(toSort, b, target)
for _, c := range b.Contacts() {
toSort = append(toSort, sortedContact{c, c.ID.Xor(target)})
}
}
sort.Sort(byXorDistance(toSort))
contacts := []Contact{}
var contacts []Contact
for _, sorted := range toSort {
contacts = append(contacts, sorted.contact)
if len(contacts) >= limit {
@ -251,18 +301,11 @@ func (rt *routingTable) GetClosest(target bits.Bitmap, limit int) []Contact {
return contacts
}
// appendContacts appends each of bucket b's contacts to the given slice,
// paired with its XOR distance from target, and returns the extended slice.
func appendContacts(contacts []sortedContact, b bucket, target bits.Bitmap) []sortedContact {
	for _, c := range b.Contacts() {
		entry := sortedContact{c, c.ID.Xor(target)}
		contacts = append(contacts, entry)
	}
	return contacts
}
// Count returns the number of contacts in the routing table
func (rt *routingTable) Count() int {
rt.mu.RLock()
defer rt.mu.RUnlock()
count := 0
rt.lock.RLock()
defer rt.lock.RUnlock()
for _, bucket := range rt.buckets {
count += bucket.Len()
}
@ -271,32 +314,30 @@ func (rt *routingTable) Count() int {
// Len returns the number of buckets in the routing table
func (rt *routingTable) Len() int {
rt.lock.RLock()
defer rt.lock.RUnlock()
rt.mu.RLock()
defer rt.mu.RUnlock()
return len(rt.buckets)
}
// BucketRanges returns a slice of ranges, where the `start` of each range is the smallest id that can
// go in that bucket, and the `end` is the largest id
func (rt *routingTable) BucketRanges() []bits.Range {
rt.lock.RLock()
defer rt.lock.RUnlock()
rt.mu.RLock()
defer rt.mu.RUnlock()
ranges := make([]bits.Range, len(rt.buckets))
for i, b := range rt.buckets {
ranges[i] = b.bucketRange
ranges[i] = b.Range
}
return ranges
}
func (rt *routingTable) bucketNumFor(target bits.Bitmap) int {
rt.lock.RLock()
defer rt.lock.RUnlock()
if rt.id.Equals(target) {
panic("routing table does not have a bucket for its own id")
}
distance := target.Xor(rt.id)
for i, b := range rt.buckets {
if b.bucketRange.Start.Cmp(distance) <= 0 && b.bucketRange.End.Cmp(distance) >= 0 {
if b.Range.Contains(distance) {
return i
}
}
@ -304,164 +345,36 @@ func (rt *routingTable) bucketNumFor(target bits.Bitmap) int {
}
func (rt *routingTable) bucketFor(target bits.Bitmap) *bucket {
bucketIndex := rt.bucketNumFor(target)
rt.lock.RLock()
defer rt.lock.RUnlock()
return &rt.buckets[bucketIndex]
return rt.buckets[rt.bucketNumFor(target)]
}
func (rt *routingTable) shouldSplit(target bits.Bitmap) bool {
b := rt.bucketFor(target)
func (rt *routingTable) shouldSplit(c Contact) bool {
b := rt.bucketFor(c.ID)
if b.Contains(c) {
return false
}
if b.Len() >= bucketSize {
if b.bucketRange.Start.Equals(bits.MinP()) { // this is the bucket covering our node id
if b.Range.Start.Equals(bits.Bitmap{}) { // this is the bucket covering our node id
return true
}
kClosest := rt.GetClosest(rt.id, bucketSize)
kthClosest := kClosest[len(kClosest) - 1]
if target.Xor(rt.id).Cmp(kthClosest.ID.Xor(rt.id)) < 0 {
return true // the kth closest contact is further than this one
kClosest := rt.getClosest(rt.id, bucketSize)
kthClosest := kClosest[len(kClosest)-1]
if rt.id.Closer(c.ID, kthClosest.ID) {
return true
}
}
return false
}
// insertContact adds the contact to the bucket covering its ID. If that
// bucket is full and eligible for splitting, the bucket is split, the insert
// is retried, and any buckets left empty by the split are removed.
func (rt *routingTable) insertContact(c Contact) {
	bucketIndex := rt.bucketNumFor(c.ID)
	if rt.buckets[bucketIndex].Len() < bucketSize {
		// reuse the index computed above instead of recomputing it
		rt.buckets[bucketIndex].UpdateContact(c, true)
	} else if rt.shouldSplit(c.ID) {
		// the bucket is full (checked above), so the redundant re-check of
		// Len() >= bucketSize was dropped; split and retry the insert
		rt.splitBucket(bucketIndex)
		rt.insertContact(c)
		rt.popEmptyBuckets()
	}
}
// Lock write-locks the routing table and every bucket in it.
// (gofmt fix: `rt * routingTable` → `rt *routingTable`)
func (rt *routingTable) Lock() {
	rt.lock.Lock()
	// b is a copy of the bucket value, but lock is a pointer field, so
	// locking through the copy locks the real bucket's mutex
	for _, b := range rt.buckets {
		b.lock.Lock()
	}
}
// Unlock releases the routing table's write lock and every bucket's lock.
// (gofmt fix: `rt * routingTable` → `rt *routingTable`)
func (rt *routingTable) Unlock() {
	rt.lock.Unlock()
	// lock is a pointer field, so unlocking through the value copy unlocks
	// the real bucket's mutex
	for _, b := range rt.buckets {
		b.lock.Unlock()
	}
}
// splitBucket splits the bucket at bucketIndex in two: the existing bucket is
// shrunk to cover [min, midpoint] and a new bucket is inserted after it
// covering [midpoint+1, max], then the peers that were in the original bucket
// are re-inserted.
func (rt *routingTable) splitBucket(bucketIndex int) {
rt.Lock()
defer rt.Unlock()
// NOTE(review): b is a VALUE copy of the bucket; mutating b.peers below
// does not affect the bucket stored in rt.buckets
b := rt.buckets[bucketIndex]
min := b.bucketRange.Start.Big()
max := b.bucketRange.End.Big()
// midpoint = min + (max-min)/2
midpoint := &big.Int{}
midpoint.Sub(max, min)
midpoint.Div(midpoint, big.NewInt(2))
midpoint.Add(midpoint, min)
midpointPlusOne := &big.Int{}
// NOTE(review): this Add is dead code — the next line overwrites
// midpointPlusOne entirely
midpointPlusOne.Add(midpointPlusOne, min)
midpointPlusOne.Add(midpoint, big.NewInt(1))
first_half := rt.buckets[:bucketIndex+1]
second_half := []bucket{}
for i := bucketIndex + 1; i < len(rt.buckets); i++ {
second_half = append(second_half, rt.buckets[i])
}
// NOTE(review): copy into a zero-length slice copies nothing, so
// copiedPeers is always empty and the re-insert loop below never runs;
// should be make([]peer, len(b.peers)). Fixing it is not safe in isolation:
// re-inserting here would call insertContact → bucketNumFor, which
// re-acquires rt.lock while this function holds it write-locked (RWMutex is
// not reentrant) — verify before changing.
copiedPeers := []peer{}
copy(copiedPeers, b.peers)
b.peers = []peer{}
rt.buckets = []bucket{}
for _, buk := range first_half {
rt.buckets = append(rt.buckets, buk)
}
newBucketLock := &sync.RWMutex{}
newBucketLock.Lock() // will be unlocked by the deferred rt.Unlock()
newBucket := bucket{
peers: make([]peer, 0, bucketSize),
lock: newBucketLock,
bucketRange: bits.Range{
Start: bits.FromBigP(midpointPlusOne),
End: bits.FromBigP(max),
},
}
rt.buckets = append(rt.buckets, newBucket)
for _, buk := range second_half {
rt.buckets = append(rt.buckets, buk)
}
// re-size the bucket to be split
rt.buckets[bucketIndex].bucketRange.Start = bits.FromBigP(min)
rt.buckets[bucketIndex].bucketRange.End = bits.FromBigP(midpoint)
// re-insert the contacts that were in the re-sized bucket
for _, p := range copiedPeers {
rt.insertContact(p.Contact)
}
}
func (rt *routingTable) printBucketInfo() {
for i, b := range rt.buckets {
fmt.Printf("bucket %d, %d contacts\n", i + 1, len(b.peers))
fmt.Printf(" start : %s\n", b.bucketRange.Start.String())
fmt.Printf(" stop : %s\n", b.bucketRange.End.String())
fmt.Printf("bucket %d, %d contacts\n", i+1, len(b.peers))
fmt.Printf(" start : %s\n", b.Range.Start.String())
fmt.Printf(" stop : %s\n", b.Range.End.String())
fmt.Println("")
}
}
// popBucket removes the bucket at bucketIndex, extending the neighboring
// buckets' ranges so the keyspace stays fully covered. If the bucket is the
// only one in the table, nothing happens.
func (rt *routingTable) popBucket(bucketIndex int) {
	canGoLower := bucketIndex >= 1
	canGoHigher := len(rt.buckets)-1 > bucketIndex
	if canGoLower && !canGoHigher {
		// raise the end of bucket[bucketIndex-1]
		rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.End.Big())
	} else if !canGoLower && canGoHigher {
		// lower the start of bucket[bucketIndex+1]
		rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(rt.buckets[bucketIndex].bucketRange.Start.Big())
	} else if canGoLower && canGoHigher {
		// raise the end of bucket[bucketIndex-1] and lower the start of
		// bucket[bucketIndex+1] to the midpoint of the range covered by
		// bucket[bucketIndex]
		midpoint := &big.Int{}
		midpoint.Sub(rt.buckets[bucketIndex].bucketRange.End.Big(), rt.buckets[bucketIndex].bucketRange.Start.Big())
		midpoint.Div(midpoint, big.NewInt(2))
		// BUG FIX: the midpoint is start + (end-start)/2; the previous code
		// omitted adding the start offset back (splitBucket does add it),
		// which broke range continuity for buckets not starting at zero
		midpoint.Add(midpoint, rt.buckets[bucketIndex].bucketRange.Start.Big())
		midpointPlusOne := &big.Int{}
		midpointPlusOne.Add(midpoint, big.NewInt(1))
		rt.buckets[bucketIndex-1].bucketRange.End = bits.FromBigP(midpoint)
		rt.buckets[bucketIndex+1].bucketRange.Start = bits.FromBigP(midpointPlusOne)
	} else {
		// only one bucket in the table; it cannot be popped
		return
	}
	// pop the bucket
	rt.buckets = rt.buckets[:bucketIndex+copy(rt.buckets[bucketIndex:], rt.buckets[bucketIndex+1:])]
}
// popNextEmptyBucket removes the first empty bucket it finds and reports
// whether one was removed.
func (rt *routingTable) popNextEmptyBucket() bool {
	for i := range rt.buckets {
		if len(rt.buckets[i].peers) != 0 {
			continue
		}
		rt.popBucket(i)
		return true
	}
	return false
}
// popEmptyBuckets repeatedly removes empty buckets until none remain,
// never removing the last remaining bucket.
func (rt *routingTable) popEmptyBuckets() {
	rt.Lock()
	defer rt.Unlock()

	if len(rt.buckets) <= 1 {
		return
	}
	// idiom fix: loop directly on the bool result instead of `== true`
	for rt.popNextEmptyBucket() {
	}
}
func (rt *routingTable) GetIDsForRefresh(refreshInterval time.Duration) []bits.Bitmap {
var bitmaps []bits.Bitmap
for i, bucket := range rt.buckets {

View file

@ -2,38 +2,39 @@ package dht
import (
"encoding/json"
"fmt"
"math/big"
"net"
"strconv"
"strings"
"testing"
"github.com/lbryio/reflector.go/dht/bits"
"github.com/sebdah/goldie"
)
func checkBucketCount(rt *routingTable, t *testing.T, correctSize, correctCount, testCaseIndex int) {
func checkBucketCount(t *testing.T, rt *routingTable, correctSize, correctCount, testCaseIndex int) {
if len(rt.buckets) != correctSize {
t.Errorf("failed test case %d. there should be %d buckets, got %d", testCaseIndex + 1, correctSize, len(rt.buckets))
t.Errorf("failed test case %d. there should be %d buckets, got %d", testCaseIndex+1, correctSize, len(rt.buckets))
}
if rt.Count() != correctCount {
t.Errorf("failed test case %d. there should be %d contacts, got %d", testCaseIndex + 1, correctCount, rt.Count())
t.Errorf("failed test case %d. there should be %d contacts, got %d", testCaseIndex+1, correctCount, rt.Count())
}
}
func checkRangeContinuity(rt *routingTable, t *testing.T) {
func checkRangeContinuity(t *testing.T, rt *routingTable) {
position := big.NewInt(0)
for i, bucket := range rt.buckets {
bucketStart := bucket.bucketRange.Start.Big()
bucketStart := bucket.Range.Start.Big()
if bucketStart.Cmp(position) != 0 {
t.Errorf("invalid start of bucket range: %s vs %s", position.String(), bucketStart.String())
}
if bucketStart.Cmp(bucket.bucketRange.End.Big()) != -1 {
if bucketStart.Cmp(bucket.Range.End.Big()) != -1 {
t.Error("range start is not less than bucket end")
}
position = bucket.bucketRange.End.Big()
if i != len(rt.buckets) - 1 {
position = bucket.Range.End.Big()
if i != len(rt.buckets)-1 {
position.Add(position, big.NewInt(1))
}
}
@ -52,8 +53,8 @@ func TestSplitBuckets(t *testing.T) {
}
var tests = []struct {
id bits.Bitmap
expectedBucketCount int
id bits.Bitmap
expectedBucketCount int
expectedTotalContacts int
}{
//fill first bucket
@ -92,10 +93,13 @@ func TestSplitBuckets(t *testing.T) {
{bits.FromHexP("A00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 26},
{bits.FromHexP("B00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), 4, 27},
}
for i, testCase := range tests {
fmt.Printf("\n\n\ncase %d\n", i)
rt.Update(Contact{testCase.id, net.ParseIP("127.0.0.1"), 8000 + i})
checkBucketCount(rt, t, testCase.expectedBucketCount, testCase.expectedTotalContacts, i)
checkRangeContinuity(rt, t)
//spew.Dump(rt.buckets)
checkBucketCount(t, rt, testCase.expectedBucketCount, testCase.expectedTotalContacts, i)
checkRangeContinuity(t, rt)
}
var testRanges = []struct {
@ -204,7 +208,6 @@ func TestRoutingTable_MoveToBack(t *testing.T) {
}
}
func TestRoutingTable_Save(t *testing.T) {
id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41")
rt := newRoutingTable(id)