correct node_finder to use loose parallelism

This commit is contained in:
Alex Grintsvayg 2018-06-25 15:48:57 -04:00
parent ea9b181d16
commit e534f5b972
6 changed files with 257 additions and 170 deletions

View file

@ -65,50 +65,31 @@ func (b Bitmap) Big() *big.Int {
return i
}
// Equals returns T/F if every byte in bitmap are equal.
// Cmp walks b and other from index 0 upward and reports the ordering at the
// first index where their bytes differ:
//
//	-1 if b < other
//	 0 if b == other
//	+1 if b > other
func (b Bitmap) Cmp(other Bitmap) int {
	for i, v := range b {
		switch w := other[i]; {
		case v < w:
			return -1
		case v > w:
			return 1
		}
	}
	// No index differed.
	return 0
}
// Closer reports whether x is strictly nearer to b than y is. The distance
// from b is taken as the Xor with b, and the two distances are ordered with Cmp.
func (b Bitmap) Closer(x, y Bitmap) bool {
	distX := x.Xor(b)
	distY := y.Xor(b)
	return distX.Cmp(distY) < 0
}
// Equals reports whether b and other hold the same byte at every index.
func (b Bitmap) Equals(other Bitmap) bool {
	for i := 0; i < len(b); i++ {
		if b[i] == other[i] {
			continue
		}
		// First mismatch settles it.
		return false
	}
	return true
}
// Less reports whether b orders before other. The first index at which the
// two bitmaps differ decides the result; if no index differs it returns false.
// other is type-asserted to Bitmap and the call panics if it holds anything else.
func (b Bitmap) Less(other interface{}) bool {
	for i := range b {
		o := other.(Bitmap)
		if b[i] == o[i] {
			continue
		}
		return b[i] < o[i]
	}
	return false
}
// LessOrEqual reports whether b is less than or equal to other. It returns
// true straight away when other is a Bitmap that Equals b; in every other case
// the answer is whatever Less(other) yields.
func (b Bitmap) LessOrEqual(other interface{}) bool {
	bm, isBitmap := other.(Bitmap)
	return (isBitmap && b.Equals(bm)) || b.Less(other)
}
// Greater reports whether b orders after other. The first index at which the
// two bitmaps differ decides the result; if no index differs it returns false.
// other is type-asserted to Bitmap and the call panics if it holds anything else.
func (b Bitmap) Greater(other interface{}) bool {
	for i := range b {
		o := other.(Bitmap)
		if b[i] == o[i] {
			continue
		}
		return b[i] > o[i]
	}
	return false
}
// GreaterOrEqual reports whether b is greater than or equal to other.
// It returns true immediately when other is a Bitmap for which b.Equals is
// true; in every other case the result comes from b.Greater(other).
func (b Bitmap) GreaterOrEqual(other interface{}) bool {
// Equality short-circuit; a failed type assertion simply falls through to Greater.
if bm, ok := other.(Bitmap); ok && b.Equals(bm) {
return true
}
return b.Greater(other)
// NOTE(review): this statement follows an unconditional return and can never
// execute — it looks like residue from a merge/diff; confirm and remove.
return b.Cmp(other) == 0
}
// Copy returns a duplicate value for the bitmap.
@ -180,7 +161,7 @@ func (b Bitmap) Add(other Bitmap) Bitmap {
// Sub returns a bitmap that treats both bitmaps as numbers and subtracts then via the inverse of the other and adding
// then together a + (-b). Negative bitmaps are not supported so other must be greater than this.
func (b Bitmap) Sub(other Bitmap) Bitmap {
if b.Less(other) {
if b.Cmp(other) < 0 {
// ToDo: Why is this not supported? Should it say not implemented? BitMap might have a generic use case outside of dht.
panic("negative bitmaps not supported")
}
@ -378,7 +359,7 @@ func Rand() Bitmap {
func RandInRangeP(low, high Bitmap) Bitmap {
diff := high.Sub(low)
r := Rand()
for r.Greater(diff) {
for r.Cmp(diff) > 0 {
r = r.Sub(diff)
}
//ToDo - Adding the low at this point doesn't gurantee it will be within the range. Consider bitmaps as numbers and
@ -401,3 +382,18 @@ func setBit(b []byte, n int, one bool) {
b[i] &= ^(1 << uint(7-j))
}
}
// Closest returns the element of bitmaps that is closest to target, as ordered
// by target.Closer. If no bitmaps are provided, target itself is returned.
// When several candidates are equally close, the earliest one wins.
func Closest(target Bitmap, bitmaps ...Bitmap) Bitmap {
	if len(bitmaps) == 0 {
		return target
	}

	// Track the best candidate by value. Keeping a pointer to the range
	// variable instead would, under pre-Go-1.22 loop semantics, alias every
	// later element: each comparison would then be an element against itself
	// and the last bitmap would always be returned.
	closest := bitmaps[0]
	for _, b := range bitmaps[1:] {
		if target.Closer(b, closest) {
			closest = b
		}
	}
	return closest
}

View file

@ -42,8 +42,12 @@ func TestBitmap(t *testing.T) {
t.Error(c.PrefixLen())
}
if b.Less(a) {
t.Error("bitmap fails lessThan test")
if b.Cmp(a) < 0 {
t.Error("bitmap fails Cmp test")
}
if a.Closer(c, b) || !a.Closer(b, c) || c.Closer(a, b) || c.Closer(b, c) {
t.Error("bitmap fails Closer test")
}
id := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"

View file

@ -35,7 +35,7 @@ const (
nodeIDBits = bits.NumBits // number of bits in node ID
messageIDLength = 20 // bytes.
udpRetry = 3
udpRetry = 1
udpTimeout = 5 * time.Second
udpMaxMessageLength = 4096 // bytes. I think our longest message is ~676 bytes, so I rounded up to 1024
// scratch that. a findValue could return more than K results if a lot of nodes are storing that value, so we need more buffer
@ -263,7 +263,7 @@ func (dht *DHT) announce(hash bits.Bitmap) error {
if len(contacts) < bucketSize {
// append self to contacts, and self-store
contacts = append(contacts, dht.contact)
} else if dht.node.id.Xor(hash).Less(contacts[bucketSize-1].ID.Xor(hash)) {
} else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) {
// pop last contact, and self-store instead
contacts[bucketSize-1] = dht.contact
}

View file

@ -144,11 +144,12 @@ func (n *Node) Connect(conn UDPConn) error {
}
}()
n.stop.Add(1)
go func() {
defer n.stop.Done()
n.startRoutingTableGrooming()
}()
// TODO: turn this back on when you're sure it works right
//n.stop.Add(1)
//go func() {
// defer n.stop.Done()
// n.startRoutingTableGrooming()
//}()
return nil
}
@ -302,7 +303,7 @@ func (n *Node) handleResponse(addr *net.UDPAddr, response Response) {
select {
case tx.res <- response:
default:
log.Errorf("[%s] query %s: response received but tx has no listener", n.id.HexShort(), response.ID.HexShort())
//log.Errorf("[%s] query %s: response received, but tx has no listener or multiple responses to the same tx", n.id.HexShort(), response.ID.HexShort())
}
}
@ -334,6 +335,9 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error {
err = n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err != nil {
if n.connClosed {
return nil
}
log.Error("error setting write deadline - ", err)
}
@ -423,10 +427,10 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request, opti
case res := <-tx.res:
ch <- &res
return
//TODO: does this belong here?
//case <-n.stop.Ch():
// return
case <-n.stop.Ch():
return
case <-ctx.Done():
// TODO: canceling these requests doesn't do much. we can probably stop supporting this feature and just use async
return
case <-time.After(udpTimeout):
}

View file

@ -1,20 +1,33 @@
package dht
import (
"context"
"sort"
"sync"
"time"
"github.com/lbryio/internal-apis/app/crypto"
"github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/reflector.go/dht/bits"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
"github.com/uber-go/atomic"
)
// TODO: iterativeFindValue may be stopping early. if it gets a response with one peer, it should keep going because other nodes may know about more peers that have that blob
// TODO: or, it should try a tcp handshake with peers as it finds them, to make sure they are still online and have the blob
// cfLog is the logger the contact finder writes its debug output to. It starts
// out as logrus's standard logger and can be swapped with NodeFinderUserLogger.
var cfLog = logrus.StandardLogger()

// NodeFinderUserLogger replaces the logger used by the contact finder with l.
// A nil l selects logrus's standard logger, so cfLog is never left nil.
func NodeFinderUserLogger(l *logrus.Logger) {
	if l == nil {
		l = logrus.StandardLogger()
	}
	cfLog = l
}
type contactFinder struct {
findValue bool // true if we're using findValue
target bits.Bitmap
@ -32,8 +45,9 @@ type contactFinder struct {
shortlist []Contact
shortlistAdded map[bits.Bitmap]bool
outstandingRequestsMutex *sync.RWMutex
outstandingRequests uint
closestContactMutex *sync.RWMutex
closestContact *Contact
notGettingCloser *atomic.Bool
}
func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop stopOnce.Chan) ([]Contact, bool, error) {
@ -46,7 +60,8 @@ func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop s
shortlistMutex: &sync.Mutex{},
shortlistAdded: make(map[bits.Bitmap]bool),
stop: stopOnce.New(),
outstandingRequestsMutex: &sync.RWMutex{},
closestContactMutex: &sync.RWMutex{},
notGettingCloser: atomic.NewBool(false),
}
if upstreamStop != nil {
@ -73,23 +88,25 @@ func (cf *contactFinder) Find() ([]Contact, bool, error) {
} else {
cf.debug("starting iterativeFindNode")
}
cf.appendNewToShortlist(cf.node.rt.GetClosest(cf.target, alpha))
if len(cf.shortlist) == 0 {
return nil, false, errors.Err("[%s] find %s: no contacts in routing table", cf.node.id.HexShort(), cf.target.HexShort())
}
for i := 0; i < alpha; i++ {
cf.stop.Add(1)
go func(i int) {
defer cf.stop.Done()
cf.iterationWorker(i + 1)
}(i)
go cf.cycle(false)
timeout := 5 * time.Second
CycleLoop:
for {
select {
case <-time.After(timeout):
go cf.cycle(false)
case <-cf.stop.Ch():
break CycleLoop
}
}
cf.stop.Wait()
// TODO: what to do if we have less than K active contacts, shortlist is empty, but we
// TODO: have other contacts in our routing table whom we have not contacted. prolly contact them
// TODO: what to do if we have less than K active contacts, shortlist is empty, but we have other contacts in our routing table whom we have not contacted. prolly contact them
var contacts []Contact
var found bool
@ -107,25 +124,97 @@ func (cf *contactFinder) Find() ([]Contact, bool, error) {
return contacts, found, nil
}
func (cf *contactFinder) iterationWorker(num int) {
cf.debug("starting worker %d", num)
defer func() {
cf.debug("stopping worker %d", num)
// cycle runs one round of concurrent probes — alpha of them, or bucketSize when
// bigCycle is set — and compares every returned contact against the closest
// contact known when the round started.
//
// After the round:
//   - if isSearchFinished reports true, the finder is stopped;
//   - if a closer contact was seen, it is published (re-checked under the write
//     lock) and a new normal cycle is launched;
//   - if nothing improved in a normal cycle, a big cycle is launched;
//   - if nothing improved in a big cycle, notGettingCloser is set.
func (cf *contactFinder) cycle(bigCycle bool) {
	cycleID := crypto.RandString(6)

	launchMsg := "LAUNCHING CYCLE %s"
	if bigCycle {
		launchMsg = "LAUNCHING CYCLE %s, AND ITS A BIG CYCLE"
	}
	cf.debug(launchMsg, cycleID)
	defer cf.debug("CYCLE %s DONE", cycleID)

	// Snapshot the shared best contact; the rest of the round works on this copy.
	cf.closestContactMutex.RLock()
	best := cf.closestContact
	cf.closestContactMutex.RUnlock()

	numProbes := alpha
	if bigCycle {
		numProbes = bucketSize
	}

	results := make(chan *Contact)
	var probes sync.WaitGroup
	for n := 0; n < numProbes; n++ {
		probes.Add(1)
		go func() {
			defer probes.Done()
			results <- cf.probe(cycleID)
		}()
	}

	// Close the channel once every probe has delivered, ending the range below.
	go func() {
		probes.Wait()
		close(results)
	}()

	improved := false
	for candidate := range results {
		if candidate == nil {
			continue
		}
		if best != nil && !cf.target.Closer(candidate.ID, best.ID) {
			continue
		}
		if best == nil {
			cf.debug("|%s| best contact starting at %s", cycleID, candidate.ID.HexShort())
		} else {
			cf.debug("|%s| best contact improved: %s -> %s", cycleID, best.ID.HexShort(), candidate.ID.HexShort())
		}
		improved = true
		best = candidate
	}

	if cf.isSearchFinished() {
		cf.stop.Stop()
		return
	}

	switch {
	case improved:
		cf.closestContactMutex.Lock()
		// Re-check under the lock: another cycle may have stored a closer contact meanwhile.
		if cf.closestContact == nil || cf.target.Closer(best.ID, cf.closestContact.ID) {
			cf.closestContact = best
		}
		cf.closestContactMutex.Unlock()
		go cf.cycle(false)
	case !bigCycle:
		cf.debug("|%s| no improvement, running big cycle", cycleID)
		go cf.cycle(true)
	default:
		// A big cycle ran and still found nothing closer, so flag the search as stalled.
		cf.debug("|%s| big cycle ran, still no improvement", cycleID)
		cf.notGettingCloser.Store(true)
	}
}
// probe sends a single probe, updates the lists, and returns the closest contact it found
func (cf *contactFinder) probe(cycleID string) *Contact {
maybeContact := cf.popFromShortlist()
if maybeContact == nil {
// TODO: block if there are pending requests out from other workers. there may be more shortlist values coming
//log.Debugf("[%s] worker %d: no contacts in shortlist, waiting...", cf.node.id.HexShort(), num)
time.Sleep(100 * time.Millisecond)
} else {
contact := *maybeContact
if contact.ID.Equals(cf.node.id) {
continue // cannot contact self
cf.debug("|%s| no contacts in shortlist, returning", cycleID)
return nil
}
c := *maybeContact
if c.ID.Equals(cf.node.id) {
return nil
}
cf.debug("|%s| probe %s: launching", cycleID, c.ID.HexShort())
req := Request{Arg: &cf.target}
if cf.findValue {
req.Method = findValueMethod
@ -133,47 +222,53 @@ func (cf *contactFinder) iterationWorker(num int) {
req.Method = findNodeMethod
}
cf.debug("worker %d: contacting %s", num, contact.ID.HexShort())
cf.incrementOutstanding()
var res *Response
resCh, cancel := cf.node.SendCancelable(contact, req)
resCh := cf.node.SendAsync(context.Background(), c, req)
select {
case res = <-resCh:
case <-cf.stop.Ch():
cf.debug("worker %d: canceled", num)
cancel()
return
cf.debug("|%s| probe %s: canceled", cycleID, c.ID.HexShort())
return nil
}
if res == nil {
// nothing to do, response timed out
cf.debug("worker %d: search canceled or timed out waiting for %s", num, contact.ID.HexShort())
} else if cf.findValue && res.FindValueKey != "" {
cf.debug("worker %d: got value", num)
cf.debug("|%s| probe %s: req canceled or timed out", cycleID, c.ID.HexShort())
return nil
}
if cf.findValue && res.FindValueKey != "" {
cf.debug("|%s| probe %s: got value", cycleID, c.ID.HexShort())
cf.findValueMutex.Lock()
cf.findValueResult = res.Contacts
cf.findValueMutex.Unlock()
cf.stop.Stop()
return
} else {
cf.debug("worker %d: got contacts", num)
cf.insertIntoActiveList(contact)
return nil
}
cf.debug("|%s| probe %s: got %s", cycleID, c.ID.HexShort(), res.argsDebug())
cf.insertIntoActiveList(c)
cf.appendNewToShortlist(res.Contacts)
}
cf.decrementOutstanding() // this is all the way down here because we need to add to shortlist first
cf.activeContactsMutex.Lock()
contacts := cf.activeContacts
if len(contacts) > bucketSize {
contacts = contacts[:bucketSize]
}
contactsStr := ""
for _, c := range contacts {
contactsStr += c.ID.HexShort() + ", "
}
cf.activeContactsMutex.Unlock()
if cf.isSearchFinished() {
cf.debug("worker %d: search is finished", num)
cf.stop.Stop()
return
}
}
return cf.closest(res.Contacts...)
}
// probeClosestOutstanding is currently a no-op: its body is empty.
func (cf *contactFinder) probeClosestOutstanding() {
}
// appendNewToShortlist appends any new contacts to the shortlist and sorts it by distance
// contacts that have already been added to the shortlist in the past are ignored
func (cf *contactFinder) appendNewToShortlist(contacts []Contact) {
cf.shortlistMutex.Lock()
defer cf.shortlistMutex.Unlock()
@ -188,6 +283,7 @@ func (cf *contactFinder) appendNewToShortlist(contacts []Contact) {
sortInPlace(cf.shortlist, cf.target)
}
// popFromShortlist pops the first contact off the shortlist and returns it
func (cf *contactFinder) popFromShortlist() *Contact {
cf.shortlistMutex.Lock()
defer cf.shortlistMutex.Unlock()
@ -201,16 +297,14 @@ func (cf *contactFinder) popFromShortlist() *Contact {
return &first
}
// insertIntoActiveList inserts the contact into appropriate place in the list of active contacts (sorted by distance)
func (cf *contactFinder) insertIntoActiveList(contact Contact) {
cf.activeContactsMutex.Lock()
defer cf.activeContactsMutex.Unlock()
inserted := false
for i, n := range cf.activeContacts {
// 5000ft: insert contact into sorted active contacts list
// Detail: if diff between new contact id and the target id has fewer changes than the n contact from target
// it should be inserted in between the previous and current.
if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) {
if cf.target.Closer(contact.ID, n.ID) {
cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...)
inserted = true
break
@ -221,6 +315,7 @@ func (cf *contactFinder) insertIntoActiveList(contact Contact) {
}
}
// isSearchFinished returns true if the search is done and should be stopped
func (cf *contactFinder) isSearchFinished() bool {
if cf.findValue && len(cf.findValueResult) > 0 {
return true
@ -232,47 +327,35 @@ func (cf *contactFinder) isSearchFinished() bool {
default:
}
if !cf.areRequestsOutstanding() {
cf.shortlistMutex.Lock()
defer cf.shortlistMutex.Unlock()
if len(cf.shortlist) == 0 {
if cf.notGettingCloser.Load() {
return true
}
cf.activeContactsMutex.Lock()
defer cf.activeContactsMutex.Unlock()
if len(cf.activeContacts) >= bucketSize && cf.activeContacts[bucketSize-1].ID.Xor(cf.target).Less(cf.shortlist[0].ID.Xor(cf.target)) {
// we have at least K active contacts, and we don't have any closer contacts to ping
if len(cf.activeContacts) >= bucketSize {
return true
}
}
return false
}
// incrementOutstanding adds one to outstandingRequests while holding the write lock.
func (cf *contactFinder) incrementOutstanding() {
	mu := cf.outstandingRequestsMutex
	mu.Lock()
	cf.outstandingRequests++
	mu.Unlock()
}
// decrementOutstanding subtracts one from outstandingRequests while holding the
// write lock. A counter that is already zero is left untouched.
func (cf *contactFinder) decrementOutstanding() {
	mu := cf.outstandingRequestsMutex
	mu.Lock()
	if cf.outstandingRequests > 0 {
		cf.outstandingRequests--
	}
	mu.Unlock()
}
func (cf *contactFinder) areRequestsOutstanding() bool {
cf.outstandingRequestsMutex.RLock()
defer cf.outstandingRequestsMutex.RUnlock()
return cf.outstandingRequests > 0
// debug writes a debug-level message through cfLog. Every message is prefixed
// with the short hex of the local node ID and of the search target, followed by
// the caller's format and args.
func (cf *contactFinder) debug(format string, args ...interface{}) {
	prefixed := make([]interface{}, 0, len(args)+2)
	prefixed = append(prefixed, cf.node.id.HexShort(), cf.target.HexShort())
	prefixed = append(prefixed, args...)
	cfLog.Debugf("[%s] find %s: "+format, prefixed...)
}
func (cf *contactFinder) debug(format string, args ...interface{}) {
args = append([]interface{}{cf.node.id.HexShort()}, append([]interface{}{cf.target.Hex()}, args...)...)
log.Debugf("[%s] find %s: "+format, args...)
// closest returns a pointer to a copy of whichever of contacts has the ID
// nearest to cf.target (as ordered by Closer), or nil when contacts is empty.
// Among equally close contacts the earliest one is kept.
func (cf *contactFinder) closest(contacts ...Contact) *Contact {
	if len(contacts) == 0 {
		return nil
	}

	best := 0
	for i := 1; i < len(contacts); i++ {
		if cf.target.Closer(contacts[i].ID, contacts[best].ID) {
			best = i
		}
	}

	// Hand back a copy so the caller does not alias the input slice.
	winner := contacts[best]
	return &winner
}
func sortInPlace(contacts []Contact, target bits.Bitmap) {

View file

@ -131,7 +131,7 @@ func TestRoutingTable_BucketRanges(t *testing.T) {
randID := bits.Rand()
found := -1
for i, r := range ranges {
if r.Start.LessOrEqual(randID) && r.End.GreaterOrEqual(randID) {
if r.Start.Cmp(randID) <= 0 && r.End.Cmp(randID) >= 0 {
if found >= 0 {
t.Errorf("%s appears in buckets %d and %d", randID.Hex(), found, i)
} else {
@ -154,7 +154,7 @@ func TestRoutingTable_Save(t *testing.T) {
for i, r := range ranges {
for j := 0; j < bucketSize; j++ {
toAdd := r.Start.Add(bits.FromShortHexP(strconv.Itoa(j)))
if toAdd.LessOrEqual(r.End) {
if toAdd.Cmp(r.End) <= 0 {
rt.Update(Contact{
ID: r.Start.Add(bits.FromShortHexP(strconv.Itoa(j))),
IP: net.ParseIP("1.2.3." + strconv.Itoa(j)),