cluster automatically balances what nodes are announcing what hashes

This commit is contained in:
Alex Grintsvayg 2018-06-19 13:47:13 -04:00
parent b19df481da
commit 5e346cc21a
12 changed files with 227 additions and 75 deletions

View file

@ -23,14 +23,12 @@ const (
// package as a way to handle the unique identifiers of a DHT node.
type Bitmap [NumBytes]byte
func (b Bitmap) String() string {
func (b Bitmap) RawString() string {
return string(b[:])
}
func (b Bitmap) Big() *big.Int {
i := new(big.Int)
i.SetString(b.Hex(), 16)
return i
func (b Bitmap) String() string {
return b.Hex()
}
// BString returns the bitmap as a string of 0s and 1s
@ -61,6 +59,12 @@ func (b Bitmap) HexSimplified() string {
return simple
}
func (b Bitmap) Big() *big.Int {
i := new(big.Int)
i.SetString(b.Hex(), 16)
return i
}
// Equals returns T/F if every byte in bitmap are equal.
func (b Bitmap) Equals(other Bitmap) bool {
for k := range b {
@ -356,7 +360,7 @@ func FromBigP(b *big.Int) Bitmap {
// MaxP returns a bitmap with all bits set to 1
func MaxP() Bitmap {
return FromHexP(strings.Repeat("1", NumBytes*2))
return FromHexP(strings.Repeat("f", NumBytes*2))
}
// Rand generates a cryptographically random bitmap within the confines of the parameters specified.

63
dht/bits/range.go Normal file
View file

@ -0,0 +1,63 @@
package bits
import (
"math/big"
"github.com/lbryio/errors.go"
)
// Range is a span of bitmaps delimited by Start and End.
// Both endpoints are treated as part of the range (the last interval returned by
// IntervalP ends exactly at End).
type Range struct {
	Start, End Bitmap
}
// MaxRange returns the widest possible range: it starts at the zero-value bitmap
// and ends at the bitmap returned by MaxP.
func MaxRange() Range {
	var full Range // Start stays at the zero-value Bitmap
	full.End = MaxP()
	return full
}
// IntervalP splits the range into `num` consecutive intervals and returns the `n`th one (1-based).
// Intervals are approximately the same size, but may differ slightly because of rounding.
// The first interval always starts at the beginning of the range, and the last interval
// always ends at the end of the range. Every other interval ends one below the start of
// the interval that follows it.
// It panics if num < 1 or n is not in [1, num].
func (r Range) IntervalP(n, num int) Range {
	if n < 1 || num < 1 || n > num {
		panic(errors.Err("invalid interval %d of %d", n, num))
	}

	first := r.intervalStart(n, num)

	var last *big.Int
	if n < num {
		// not the final interval: stop right before the next interval begins
		last = r.intervalStart(n+1, num)
		last.Sub(last, big.NewInt(1))
	} else {
		// final interval: pin to the range end so rounding never leaves a gap
		last = r.End.Big()
	}

	return Range{Start: FromBigP(first), End: FromBigP(last)}
}
// intervalStart returns the first value of the `n`th (1-based) of `num` intervals in the range.
//
// formula:
//
//	intervalStart = rangeStart + ((rangeEnd - rangeStart) * (n-1)) / num
//
// This equals size*(n-1) + (rem*(n-1))/num, where size and rem are the quotient and
// remainder of (rangeEnd - rangeStart) / num. The remainder is spread with a division,
// not a modulo, so the result never decreases as n grows and never exceeds rangeEnd.
// The earlier `(rem*(n-1)) % num` form could place a start past the end of a narrow
// range (e.g. splitting 0..5 in 4 produced a 4th start of 6).
//
// Callers are expected to pass num >= 1 and 1 <= n <= num (IntervalP validates this).
func (r Range) intervalStart(n, num int) *big.Int {
	offset := new(big.Int).Sub(r.End.Big(), r.Start.Big())
	offset.Mul(offset, big.NewInt(int64(n-1)))
	offset.Div(offset, big.NewInt(int64(num)))
	return offset.Add(offset, r.Start.Big())
}
// IntervalSize returns the distance between the endpoints of the range, i.e. End - Start.
// Note that this is the difference of the endpoints, not a count of the values between them.
func (r Range) IntervalSize() *big.Int {
	hi, lo := r.End.Big(), r.Start.Big()
	return new(big.Int).Sub(hi, lo)
}

48
dht/bits/range_test.go Normal file
View file

@ -0,0 +1,48 @@
package bits
import (
"math/big"
"testing"
)
// TestMaxRange checks that MaxRange starts at the all-zero bitmap and ends at the all-ones bitmap.
func TestMaxRange(t *testing.T) {
	wantStart := FromHexP("000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000")
	wantEnd := FromHexP("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")

	got := MaxRange()

	if !got.Start.Equals(wantStart) {
		t.Error("max range does not start at the beginning")
	}
	if !got.End.Equals(wantEnd) {
		t.Error("max range does not end at the end")
	}
}
// TestRange_IntervalP splits the maximum range into 97 intervals and verifies that:
//   - the first interval starts at the range start,
//   - the last interval ends at the range end,
//   - each interval starts exactly one past the previous interval's end,
//   - every interval's size is within numIntervals of the average size.
func TestRange_IntervalP(t *testing.T) {
	const numIntervals = 97

	whole := MaxRange()
	tolerance := big.NewInt(int64(numIntervals))
	avg := new(big.Int).Div(whole.IntervalSize(), big.NewInt(int64(numIntervals)))

	// acceptable size window, computed once since it does not depend on the interval
	upper := new(big.Int).Add(avg, tolerance)
	lower := new(big.Int).Sub(avg, tolerance)

	one := FromShortHexP("1")

	var prevEnd Bitmap
	for i := 1; i <= numIntervals; i++ {
		ival := whole.IntervalP(i, numIntervals)

		if i == 1 && !ival.Start.Equals(whole.Start) {
			t.Error("first interval does not start at 0")
		}

		if i == numIntervals && !ival.End.Equals(whole.End) {
			t.Error("last interval does not end at max")
		}

		if i > 1 && !ival.Start.Equals(prevEnd.Add(one)) {
			t.Errorf("interval %d of %d: last end was %s, this start is %s", i, numIntervals, prevEnd.Hex(), ival.Start.Hex())
		}

		size := ival.IntervalSize()
		if size.Cmp(upper) > 0 || size.Cmp(lower) < 0 {
			t.Errorf("interval %d of %d: interval size is outside the normal range", i, numIntervals)
		}

		prevEnd = ival.End
	}
}

View file

@ -21,9 +21,9 @@ type BootstrapNode struct {
initialPingInterval time.Duration
checkInterval time.Duration
nlock *sync.RWMutex
nodes []peer
nodeKeys map[bits.Bitmap]int
nlock *sync.RWMutex
nodes map[bits.Bitmap]*peer
nodeIDs []bits.Bitmap // necessary for efficient random ID selection
}
// NewBootstrapNode returns a BootstrapNode pointer.
@ -34,9 +34,9 @@ func NewBootstrapNode(id bits.Bitmap, initialPingInterval, rePingInterval time.D
initialPingInterval: initialPingInterval,
checkInterval: rePingInterval,
nlock: &sync.RWMutex{},
nodes: make([]peer, 0),
nodeKeys: make(map[bits.Bitmap]int),
nlock: &sync.RWMutex{},
nodes: make(map[bits.Bitmap]*peer),
nodeIDs: make([]bits.Bitmap, 0),
}
b.requestHandler = b.handleRequest
@ -78,15 +78,15 @@ func (b *BootstrapNode) upsert(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
if i, exists := b.nodeKeys[c.ID]; exists {
log.Debugf("[%s] bootstrap: touching contact %s", b.id.HexShort(), b.nodes[i].Contact.ID.HexShort())
b.nodes[i].Touch()
if node, exists := b.nodes[c.ID]; exists {
log.Debugf("[%s] bootstrap: touching contact %s", b.id.HexShort(), node.Contact.ID.HexShort())
node.Touch()
return
}
log.Debugf("[%s] bootstrap: adding new contact %s", b.id.HexShort(), c.ID.HexShort())
b.nodeKeys[c.ID] = len(b.nodes)
b.nodes = append(b.nodes, peer{c, time.Now(), 0})
b.nodes[c.ID] = &peer{c, time.Now(), 0}
b.nodeIDs = append(b.nodeIDs, c.ID)
}
// remove removes the contact from the list
@ -94,14 +94,19 @@ func (b *BootstrapNode) remove(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
i, exists := b.nodeKeys[c.ID]
_, exists := b.nodes[c.ID]
if !exists {
return
}
log.Debugf("[%s] bootstrap: removing contact %s", b.id.HexShort(), c.ID.HexShort())
b.nodes = append(b.nodes[:i], b.nodes[i+1:]...)
delete(b.nodeKeys, c.ID)
delete(b.nodes, c.ID)
for i := range b.nodeIDs {
if b.nodeIDs[i].Equals(c.ID) {
b.nodeIDs = append(b.nodeIDs[:i], b.nodeIDs[i+1:]...)
break
}
}
}
// get returns up to `limit` random contacts from the list
@ -114,8 +119,8 @@ func (b *BootstrapNode) get(limit int) []Contact {
}
ret := make([]Contact, limit)
for i, k := range randKeys(len(b.nodes))[:limit] {
ret[i] = b.nodes[k].Contact
for i, k := range randKeys(len(b.nodeIDs))[:limit] {
ret[i] = b.nodes[b.nodeIDs[k]].Contact
}
return ret
@ -123,6 +128,7 @@ func (b *BootstrapNode) get(limit int) []Contact {
// ping pings a node. if the node responds, it is added to the list. otherwise, it is removed
func (b *BootstrapNode) ping(c Contact) {
log.Debugf("[%s] bootstrap: pinging %s", b.id.HexShort(), c.ID.HexShort())
b.stop.Add(1)
defer b.stop.Done()
@ -180,9 +186,19 @@ func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) {
}
go func() {
log.Debugf("[%s] bootstrap: queuing %s to ping", b.id.HexShort(), request.NodeID.HexShort())
<-time.After(b.initialPingInterval)
b.ping(Contact{ID: request.NodeID, IP: addr.IP, Port: addr.Port})
b.nlock.RLock()
_, exists := b.nodes[request.NodeID]
b.nlock.RUnlock()
if !exists {
log.Debugf("[%s] bootstrap: queuing %s to ping", b.id.HexShort(), request.NodeID.HexShort())
<-time.After(b.initialPingInterval)
b.nlock.RLock()
_, exists = b.nodes[request.NodeID]
b.nlock.RUnlock()
if !exists {
b.ping(Contact{ID: request.NodeID, IP: addr.IP, Port: addr.Port})
}
}
}()
}

View file

@ -200,7 +200,7 @@ func (dht *DHT) Ping(addr string) error {
}
tmpNode := Contact{ID: bits.Rand(), IP: raddr.IP, Port: raddr.Port}
res := dht.node.Send(tmpNode, Request{Method: pingMethod})
res := dht.node.Send(tmpNode, Request{Method: pingMethod}, SendOptions{skipIDCheck: true})
if res == nil {
return errors.Err("no response from node %s", addr)
}
@ -222,15 +222,23 @@ func (dht *DHT) Get(hash bits.Bitmap) ([]Contact, error) {
}
// Add adds the hash to the list of hashes this node has
func (dht *DHT) Add(hash bits.Bitmap) error {
func (dht *DHT) Add(hash bits.Bitmap) {
// TODO: calling Add several times quickly could cause it to be announced multiple times before dht.announced[hash] is set to true
dht.lock.RLock()
exists := dht.announced[hash]
dht.lock.RUnlock()
if exists {
return nil
return
}
return dht.announce(hash)
dht.stop.Add(1)
go func() {
defer dht.stop.Done()
err := dht.announce(hash)
if err != nil {
log.Error(errors.Prefix("error announcing bitmap", err))
}
}()
}
// Announce announces to the DHT that this node has the blob for the given hash
@ -241,7 +249,10 @@ func (dht *DHT) announce(hash bits.Bitmap) error {
}
// if we found less than K contacts, or current node is closer than farthest contact
if len(contacts) < bucketSize || dht.node.id.Xor(hash).Less(contacts[bucketSize-1].ID.Xor(hash)) {
if len(contacts) < bucketSize {
// append self to contacts, and self-store
contacts = append(contacts, dht.contact)
} else if dht.node.id.Xor(hash).Less(contacts[bucketSize-1].ID.Xor(hash)) {
// pop last contact, and self-store instead
contacts[bucketSize-1] = dht.contact
}
@ -289,7 +300,7 @@ func (dht *DHT) startReannouncer() {
func (dht *DHT) storeOnNode(hash bits.Bitmap, c Contact) {
// self-store
if dht.contact.Equals(c) {
if dht.contact.ID == c.ID {
dht.node.Store(hash, c)
return
}

View file

@ -102,7 +102,7 @@ func TestBencodeFindValueResponse(t *testing.T) {
res := Response{
ID: newMessageID(),
NodeID: bits.Rand(),
FindValueKey: bits.Rand().String(),
FindValueKey: bits.Rand().RawString(),
Token: "arst",
Contacts: []Contact{
{ID: bits.Rand(), IP: net.IPv4(1, 2, 3, 4).To4(), Port: 5678},

View file

@ -276,7 +276,7 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
}
if contacts := n.store.Get(*request.Arg); len(contacts) > 0 {
res.FindValueKey = request.Arg.String()
res.FindValueKey = request.Arg.RawString()
res.Contacts = contacts
} else {
res.Contacts = n.rt.GetClosest(*request.Arg, bucketSize)
@ -297,7 +297,7 @@ func (n *Node) handleRequest(addr *net.UDPAddr, request Request) {
// handleResponse handles responses received from udp.
func (n *Node) handleResponse(addr *net.UDPAddr, response Response) {
tx := n.txFind(response.ID, addr)
tx := n.txFind(response.ID, Contact{ID: response.NodeID, IP: addr.IP, Port: addr.Port})
if tx != nil {
tx.res <- response
}
@ -339,9 +339,10 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error {
// transaction represents a single query to the dht. it stores the queried contact, the request, and the response channel
type transaction struct {
contact Contact
req Request
res chan Response
contact Contact
req Request
res chan Response
skipIDCheck bool
}
// insert adds a transaction to the manager.
@ -358,24 +359,27 @@ func (n *Node) txDelete(id messageID) {
delete(n.transactions, id)
}
// Find finds a transaction for the given id. it optionally ensures that addr matches contact from transaction
func (n *Node) txFind(id messageID, addr *net.UDPAddr) *transaction {
// Find finds a transaction for the given id and contact
func (n *Node) txFind(id messageID, c Contact) *transaction {
n.txLock.RLock()
defer n.txLock.RUnlock()
// TODO: also check that the response's nodeid matches the id you thought you sent to?
t, ok := n.transactions[id]
if !ok || (addr != nil && t.contact.Addr().String() != addr.String()) {
if !ok || !t.contact.Equals(c, !t.skipIDCheck) {
return nil
}
return t
}
// SendOptions controls the behavior of send calls.
// It is passed as an optional trailing argument to the send functions; only the
// first SendOptions value supplied is consulted.
type SendOptions struct {
// skipIDCheck, when true, makes response matching compare only the contact's IP and
// port and ignore the node ID. This is for requests sent to a contact whose real
// node ID is not known yet.
skipIDCheck bool
}
// SendAsync sends a transaction and returns a channel that will eventually contain the transaction response
// The response channel is closed when the transaction is completed or times out.
func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-chan *Response {
func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request, options ...SendOptions) <-chan *Response {
if contact.ID.Equals(n.id) {
log.Error("sending query to self")
return nil
@ -394,6 +398,10 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-ch
res: make(chan Response),
}
if len(options) > 0 && options[0].skipIDCheck {
tx.skipIDCheck = true
}
n.txInsert(tx)
defer n.txDelete(tx.req.ID)
@ -425,14 +433,14 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request) <-ch
// Send sends a transaction and blocks until the response is available. It returns a response, or nil
// if the transaction timed out.
func (n *Node) Send(contact Contact, req Request) *Response {
return <-n.SendAsync(context.Background(), contact, req)
func (n *Node) Send(contact Contact, req Request, options ...SendOptions) *Response {
return <-n.SendAsync(context.Background(), contact, req, options...)
}
// SendCancelable sends the transaction asynchronously and allows the transaction to be canceled
func (n *Node) SendCancelable(contact Contact, req Request) (<-chan *Response, context.CancelFunc) {
func (n *Node) SendCancelable(contact Contact, req Request, options ...SendOptions) (<-chan *Response, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
return n.SendAsync(ctx, contact, req), cancel
return n.SendAsync(ctx, contact, req, options...), cancel
}
// CountActiveTransactions returns the number of transactions in the manager

View file

@ -48,9 +48,17 @@ func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop s
stop: stopOnce.New(),
outstandingRequestsMutex: &sync.RWMutex{},
}
if upstreamStop != nil {
cf.stop.Link(upstreamStop)
go func() {
select {
case <-upstreamStop:
cf.Stop()
case <-cf.stop.Ch():
}
}()
}
return cf.Find()
}

View file

@ -28,7 +28,7 @@ func TestPing(t *testing.T) {
data, err := bencode.EncodeBytes(map[string]interface{}{
headerTypeField: requestType,
headerMessageIDField: messageID,
headerNodeIDField: testNodeID.String(),
headerNodeIDField: testNodeID.RawString(),
headerPayloadField: "ping",
headerArgsField: []string{},
})
@ -84,7 +84,7 @@ func TestPing(t *testing.T) {
rNodeID, ok := response[headerNodeIDField].(string)
if !ok {
t.Error("node ID is not a string")
} else if rNodeID != dhtNodeID.String() {
} else if rNodeID != dhtNodeID.RawString() {
t.Error("unexpected node ID")
}
}
@ -171,7 +171,7 @@ func TestStore(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.String())
verifyResponse(t, response, messageID, dhtNodeID.RawString())
_, ok := response[headerPayloadField]
if !ok {
@ -249,7 +249,7 @@ func TestFindNode(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.String())
verifyResponse(t, response, messageID, dhtNodeID.RawString())
_, ok := response[headerPayloadField]
if !ok {
@ -320,7 +320,7 @@ func TestFindValueExisting(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.String())
verifyResponse(t, response, messageID, dhtNodeID.RawString())
_, ok := response[headerPayloadField]
if !ok {
@ -332,7 +332,7 @@ func TestFindValueExisting(t *testing.T) {
t.Fatal("payload is not a dictionary")
}
compactContacts, ok := payload[valueToFind.String()]
compactContacts, ok := payload[valueToFind.RawString()]
if !ok {
t.Fatal("payload is missing key for search value")
}
@ -396,7 +396,7 @@ func TestFindValueFallbackToFindNode(t *testing.T) {
}
}
verifyResponse(t, response, messageID, dhtNodeID.String())
verifyResponse(t, response, messageID, dhtNodeID.RawString())
_, ok := response[headerPayloadField]
if !ok {

View file

@ -32,8 +32,8 @@ type Contact struct {
}
// Equals returns T/F if two contacts are the same.
func (c Contact) Equals(other Contact) bool {
return c.ID == other.ID
func (c Contact) Equals(other Contact, checkID bool) bool {
return c.IP.Equal(other.IP) && c.Port == other.Port && (!checkID || c.ID == other.ID)
}
// Addr returns the UDP address of the contact.
@ -150,7 +150,7 @@ func (p *peer) Touch() {
// ActiveInLast returns whether a peer has responded in the last `d` duration
// this is used to check if the peer is "good", meaning that we believe the peer will respond to our requests
func (p *peer) ActiveInLast(d time.Duration) bool {
return time.Since(p.LastActivity) > d
return time.Since(p.LastActivity) < d
}
// IsBad returns whether a peer is "bad", meaning that it has failed to respond to multiple pings in a row
@ -352,20 +352,14 @@ func (rt *routingTable) Count() int {
return count
}
// Range is a structure that holds a min and max bitmaps. The range is used in bucket sizing.
type Range struct {
start bits.Bitmap
end bits.Bitmap
}
// BucketRanges returns a slice of ranges, where the `start` of each range is the smallest id that can
// go in that bucket, and the `end` is the largest id
func (rt *routingTable) BucketRanges() []Range {
ranges := make([]Range, len(rt.buckets))
func (rt *routingTable) BucketRanges() []bits.Range {
ranges := make([]bits.Range, len(rt.buckets))
for i := range rt.buckets {
ranges[i] = Range{
rt.id.Suffix(i, false).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
rt.id.Suffix(i, true).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
ranges[i] = bits.Range{
Start: rt.id.Suffix(i, false).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
End: rt.id.Suffix(i, true).Set(nodeIDBits-1-i, !rt.id.Get(nodeIDBits-1-i)),
}
}
return ranges

View file

@ -147,14 +147,14 @@ func TestRoutingTable_MoveToBack(t *testing.T) {
func TestRoutingTable_BucketRanges(t *testing.T) {
id := bits.FromHexP("1c8aff71b99462464d9eeac639595ab99664be3482cb91a29d87467515c7d9158fe72aa1f1582dab07d8f8b5db277f41")
ranges := newRoutingTable(id).BucketRanges()
if !ranges[0].start.Equals(ranges[0].end) {
if !ranges[0].Start.Equals(ranges[0].End) {
t.Error("first bucket should only fit exactly one id")
}
for i := 0; i < 1000; i++ {
randID := bits.Rand()
found := -1
for i, r := range ranges {
if r.start.LessOrEqual(randID) && r.end.GreaterOrEqual(randID) {
if r.Start.LessOrEqual(randID) && r.End.GreaterOrEqual(randID) {
if found >= 0 {
t.Errorf("%s appears in buckets %d and %d", randID.Hex(), found, i)
} else {
@ -176,10 +176,10 @@ func TestRoutingTable_Save(t *testing.T) {
for i, r := range ranges {
for j := 0; j < bucketSize; j++ {
toAdd := r.start.Add(bits.FromShortHexP(strconv.Itoa(j)))
if toAdd.LessOrEqual(r.end) {
toAdd := r.Start.Add(bits.FromShortHexP(strconv.Itoa(j)))
if toAdd.LessOrEqual(r.End) {
rt.Update(Contact{
ID: r.start.Add(bits.FromShortHexP(strconv.Itoa(j))),
ID: r.Start.Add(bits.FromShortHexP(strconv.Itoa(j))),
IP: net.ParseIP("1.2.3." + strconv.Itoa(j)),
Port: 1 + i*bucketSize + j,
})

View file

@ -226,7 +226,7 @@ func verifyContacts(t *testing.T, contacts []interface{}, nodes []Contact) {
continue
}
for _, n := range nodes {
if n.ID.String() == id {
if n.ID.RawString() == id {
currNode = n
currNodeFound = true
foundNodes[id] = true