From e534f5b972f3a28a808da081eec5cbfbac85c318 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Mon, 25 Jun 2018 15:48:57 -0400 Subject: [PATCH] correct node_finder to use loose parallelism --- dht/bits/bitmap.go | 86 ++++++----- dht/bits/bitmap_test.go | 8 +- dht/dht.go | 4 +- dht/node.go | 22 +-- dht/node_finder.go | 303 ++++++++++++++++++++++++-------------- dht/routing_table_test.go | 4 +- 6 files changed, 257 insertions(+), 170 deletions(-) diff --git a/dht/bits/bitmap.go b/dht/bits/bitmap.go index 4edffcc..0605fb6 100644 --- a/dht/bits/bitmap.go +++ b/dht/bits/bitmap.go @@ -65,50 +65,31 @@ func (b Bitmap) Big() *big.Int { return i } -// Equals returns T/F if every byte in bitmap are equal. +// Cmp compares b and other and returns: +// +// -1 if b < other +// 0 if b == other +// +1 if b > other +// +func (b Bitmap) Cmp(other Bitmap) int { + for k := range b { + if b[k] < other[k] { + return -1 + } else if b[k] > other[k] { + return 1 + } + } + return 0 +} + +// Closer returns true if dist(b,x) < dist(b,y) +func (b Bitmap) Closer(x, y Bitmap) bool { + return x.Xor(b).Cmp(y.Xor(b)) < 0 +} + +// Equals returns true if every byte in bitmap are equal, false otherwise func (b Bitmap) Equals(other Bitmap) bool { - for k := range b { - if b[k] != other[k] { - return false - } - } - return true -} - -// Less returns T/F if there exists a byte pair that is not equal AND this bitmap is less than the other. -func (b Bitmap) Less(other interface{}) bool { - for k := range b { - if b[k] != other.(Bitmap)[k] { - return b[k] < other.(Bitmap)[k] - } - } - return false -} - -// LessOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is less than the other. -func (b Bitmap) LessOrEqual(other interface{}) bool { - if bm, ok := other.(Bitmap); ok && b.Equals(bm) { - return true - } - return b.Less(other) -} - -// Greater returns T/F if there exists a byte pair that is not equal AND this bitmap byte is greater than the other. -func (b Bitmap) Greater(other interface{}) bool { - for k := range b { - if b[k] != other.(Bitmap)[k] { - return b[k] > other.(Bitmap)[k] - } - } - return false -} - -// GreaterOrEqual returns true if the bitmaps are equal, otherwise it checks if this bitmap is greater than the other. -func (b Bitmap) GreaterOrEqual(other interface{}) bool { - if bm, ok := other.(Bitmap); ok && b.Equals(bm) { - return true - } - return b.Greater(other) + return b.Cmp(other) == 0 } // Copy returns a duplicate value for the bitmap. @@ -180,7 +161,7 @@ func (b Bitmap) Add(other Bitmap) Bitmap { // Sub returns a bitmap that treats both bitmaps as numbers and subtracts then via the inverse of the other and adding // then together a + (-b). Negative bitmaps are not supported so other must be greater than this. func (b Bitmap) Sub(other Bitmap) Bitmap { - if b.Less(other) { + if b.Cmp(other) < 0 { // ToDo: Why is this not supported? Should it say not implemented? BitMap might have a generic use case outside of dht. panic("negative bitmaps not supported") } @@ -378,7 +359,7 @@ func Rand() Bitmap { func RandInRangeP(low, high Bitmap) Bitmap { diff := high.Sub(low) r := Rand() - for r.Greater(diff) { + for r.Cmp(diff) > 0 { r = r.Sub(diff) } //ToDo - Adding the low at this point doesn't gurantee it will be within the range. Consider bitmaps as numbers and @@ -401,3 +382,18 @@ func setBit(b []byte, n int, one bool) { b[i] &= ^(1 << uint(7-j)) } } + +// CLosest returns the closest bitmap to target. if no bitmaps are provided, target itself is returned +func Closest(target Bitmap, bitmaps ...Bitmap) Bitmap { + if len(bitmaps) == 0 { + return target + } + + var closest *Bitmap + for _, b := range bitmaps { + if closest == nil || target.Closer(b, *closest) { + closest = &b + } + } + return *closest +} diff --git a/dht/bits/bitmap_test.go b/dht/bits/bitmap_test.go index 482d0d5..d709d7a 100644 --- a/dht/bits/bitmap_test.go +++ b/dht/bits/bitmap_test.go @@ -42,8 +42,12 @@ func TestBitmap(t *testing.T) { t.Error(c.PrefixLen()) } - if b.Less(a) { - t.Error("bitmap fails lessThan test") + if b.Cmp(a) < 0 { + t.Error("bitmap fails Cmp test") + } + + if a.Closer(c, b) || !a.Closer(b, c) || c.Closer(a, b) || c.Closer(b, c) { + t.Error("bitmap fails Closer test") } id := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" diff --git a/dht/dht.go b/dht/dht.go index 2873d43..a37b42f 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -35,7 +35,7 @@ const ( nodeIDBits = bits.NumBits // number of bits in node ID messageIDLength = 20 // bytes. - udpRetry = 3 + udpRetry = 1 udpTimeout = 5 * time.Second udpMaxMessageLength = 4096 // bytes. I think our longest message is ~676 bytes, so I rounded up to 1024 // scratch that. a findValue could return more than K results if a lot of nodes are storing that value, so we need more buffer @@ -263,7 +263,7 @@ func (dht *DHT) announce(hash bits.Bitmap) error { if len(contacts) < bucketSize { // append self to contacts, and self-store contacts = append(contacts, dht.contact) - } else if dht.node.id.Xor(hash).Less(contacts[bucketSize-1].ID.Xor(hash)) { + } else if hash.Closer(dht.node.id, contacts[bucketSize-1].ID) { // pop last contact, and self-store instead contacts[bucketSize-1] = dht.contact } diff --git a/dht/node.go b/dht/node.go index 0bb5cc6..e2343e5 100644 --- a/dht/node.go +++ b/dht/node.go @@ -144,11 +144,12 @@ func (n *Node) Connect(conn UDPConn) error { } }() - n.stop.Add(1) - go func() { - defer n.stop.Done() - n.startRoutingTableGrooming() - }() + // TODO: turn this back on when you're sure it works right + //n.stop.Add(1) + //go func() { + // defer n.stop.Done() + // n.startRoutingTableGrooming() + //}() return nil } @@ -302,7 +303,7 @@ func (n *Node) handleResponse(addr *net.UDPAddr, response Response) { select { case tx.res <- response: default: - log.Errorf("[%s] query %s: response received but tx has no listener", n.id.HexShort(), response.ID.HexShort()) + //log.Errorf("[%s] query %s: response received, but tx has no listener or multiple responses to the same tx", n.id.HexShort(), response.ID.HexShort()) } } @@ -334,6 +335,9 @@ func (n *Node) sendMessage(addr *net.UDPAddr, data Message) error { err = n.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err != nil { + if n.connClosed { + return nil + } log.Error("error setting write deadline - ", err) } @@ -423,10 +427,10 @@ func (n *Node) SendAsync(ctx context.Context, contact Contact, req Request, opti case res := <-tx.res: ch <- &res return - //TODO: does this belong here? - //case <-n.stop.Ch(): - // return + case <-n.stop.Ch(): + return case <-ctx.Done(): + // TODO: canceling these requests doesn't do much. we can probably stop supporting this feature and just use async return case <-time.After(udpTimeout): } diff --git a/dht/node_finder.go b/dht/node_finder.go index ad5f894..00c5c2a 100644 --- a/dht/node_finder.go +++ b/dht/node_finder.go @@ -1,20 +1,33 @@ package dht import ( + "context" "sort" "sync" "time" + "github.com/lbryio/internal-apis/app/crypto" "github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/reflector.go/dht/bits" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" + "github.com/uber-go/atomic" ) // TODO: iterativeFindValue may be stopping early. if it gets a response with one peer, it should keep going because other nodes may know about more peers that have that blob // TODO: or, it should try a tcp handshake with peers as it finds them, to make sure they are still online and have the blob +var cfLog *logrus.Logger + +func init() { + cfLog = logrus.StandardLogger() +} + +func NodeFinderUserLogger(l *logrus.Logger) { + cfLog = l +} + type contactFinder struct { findValue bool // true if we're using findValue target bits.Bitmap @@ -32,8 +45,9 @@ type contactFinder struct { shortlist []Contact shortlistAdded map[bits.Bitmap]bool - outstandingRequestsMutex *sync.RWMutex - outstandingRequests uint + closestContactMutex *sync.RWMutex + closestContact *Contact + notGettingCloser *atomic.Bool } func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop stopOnce.Chan) ([]Contact, bool, error) { @@ -46,7 +60,8 @@ func FindContacts(node *Node, target bits.Bitmap, findValue bool, upstreamStop s shortlistMutex: &sync.Mutex{}, shortlistAdded: make(map[bits.Bitmap]bool), stop: stopOnce.New(), - outstandingRequestsMutex: &sync.RWMutex{}, + closestContactMutex: &sync.RWMutex{}, + notGettingCloser: atomic.NewBool(false), } if upstreamStop != nil { @@ -73,23 +88,25 @@ func (cf *contactFinder) Find() ([]Contact, bool, error) { } else { cf.debug("starting iterativeFindNode") } + cf.appendNewToShortlist(cf.node.rt.GetClosest(cf.target, alpha)) if len(cf.shortlist) == 0 { return nil, false, errors.Err("[%s] find %s: no contacts in routing table", cf.node.id.HexShort(), cf.target.HexShort()) } - for i := 0; i < alpha; i++ { - cf.stop.Add(1) - go func(i int) { - defer cf.stop.Done() - cf.iterationWorker(i + 1) - }(i) + go cf.cycle(false) + timeout := 5 * time.Second +CycleLoop: + for { + select { + case <-time.After(timeout): + go cf.cycle(false) + case <-cf.stop.Ch(): + break CycleLoop + } } - cf.stop.Wait() - - // TODO: what to do if we have less than K active contacts, shortlist is empty, but we - // TODO: have other contacts in our routing table whom we have not contacted. prolly contact them + // TODO: what to do if we have less than K active contacts, shortlist is empty, but we have other contacts in our routing table whom we have not contacted. prolly contact them var contacts []Contact var found bool @@ -107,73 +124,151 @@ func (cf *contactFinder) Find() ([]Contact, bool, error) { return contacts, found, nil } -func (cf *contactFinder) iterationWorker(num int) { - cf.debug("starting worker %d", num) - defer func() { - cf.debug("stopping worker %d", num) +// cycle does a single cycle of sending alpha probes and checking results against closestNode +func (cf *contactFinder) cycle(bigCycle bool) { + cycleID := crypto.RandString(6) + if bigCycle { + cf.debug("LAUNCHING CYCLE %s, AND ITS A BIG CYCLE", cycleID) + } else { + cf.debug("LAUNCHING CYCLE %s", cycleID) + } + defer cf.debug("CYCLE %s DONE", cycleID) + + cf.closestContactMutex.RLock() + closestContact := cf.closestContact + cf.closestContactMutex.RUnlock() + + var wg sync.WaitGroup + ch := make(chan *Contact) + + limit := alpha + if bigCycle { + limit = bucketSize + } + + for i := 0; i < limit; i++ { + wg.Add(1) + go func() { + defer wg.Done() + ch <- cf.probe(cycleID) + }() + } + + go func() { + wg.Wait() + close(ch) }() + foundCloser := false for { - maybeContact := cf.popFromShortlist() - if maybeContact == nil { - // TODO: block if there are pending requests out from other workers. there may be more shortlist values coming - //log.Debugf("[%s] worker %d: no contacts in shortlist, waiting...", cf.node.id.HexShort(), num) - time.Sleep(100 * time.Millisecond) - } else { - contact := *maybeContact - - if contact.ID.Equals(cf.node.id) { - continue // cannot contact self - } - - req := Request{Arg: &cf.target} - if cf.findValue { - req.Method = findValueMethod - } else { - req.Method = findNodeMethod - } - - cf.debug("worker %d: contacting %s", num, contact.ID.HexShort()) - - cf.incrementOutstanding() - - var res *Response - resCh, cancel := cf.node.SendCancelable(contact, req) - select { - case res = <-resCh: - case <-cf.stop.Ch(): - cf.debug("worker %d: canceled", num) - cancel() - return - } - - if res == nil { - // nothing to do, response timed out - cf.debug("worker %d: search canceled or timed out waiting for %s", num, contact.ID.HexShort()) - } else if cf.findValue && res.FindValueKey != "" { - cf.debug("worker %d: got value", num) - cf.findValueMutex.Lock() - cf.findValueResult = res.Contacts - cf.findValueMutex.Unlock() - cf.stop.Stop() - return - } else { - cf.debug("worker %d: got contacts", num) - cf.insertIntoActiveList(contact) - cf.appendNewToShortlist(res.Contacts) - } - - cf.decrementOutstanding() // this is all the way down here because we need to add to shortlist first + c, more := <-ch + if !more { + break } - - if cf.isSearchFinished() { - cf.debug("worker %d: search is finished", num) - cf.stop.Stop() - return + if c != nil && (closestContact == nil || cf.target.Closer(c.ID, closestContact.ID)) { + if closestContact != nil { + cf.debug("|%s| best contact improved: %s -> %s", cycleID, closestContact.ID.HexShort(), c.ID.HexShort()) + } else { + cf.debug("|%s| best contact starting at %s", cycleID, c.ID.HexShort()) + } + foundCloser = true + closestContact = c } } + + if cf.isSearchFinished() { + cf.stop.Stop() + return + } + + if foundCloser { + cf.closestContactMutex.Lock() + // have to check again after locking in case other probes found a closer one in the meantime + if cf.closestContact == nil || cf.target.Closer(closestContact.ID, cf.closestContact.ID) { + cf.closestContact = closestContact + } + cf.closestContactMutex.Unlock() + go cf.cycle(false) + } else if !bigCycle { + cf.debug("|%s| no improvement, running big cycle", cycleID) + go cf.cycle(true) + } else { + // big cycle ran and there was no improvement, so we're done + cf.debug("|%s| big cycle ran, still no improvement", cycleID) + cf.notGettingCloser.Store(true) + } } +// probe sends a single probe, updates the lists, and returns the closest contact it found +func (cf *contactFinder) probe(cycleID string) *Contact { + maybeContact := cf.popFromShortlist() + if maybeContact == nil { + cf.debug("|%s| no contacts in shortlist, returning", cycleID) + return nil + } + + c := *maybeContact + + if c.ID.Equals(cf.node.id) { + return nil + } + + cf.debug("|%s| probe %s: launching", cycleID, c.ID.HexShort()) + + req := Request{Arg: &cf.target} + if cf.findValue { + req.Method = findValueMethod + } else { + req.Method = findNodeMethod + } + + var res *Response + resCh := cf.node.SendAsync(context.Background(), c, req) + select { + case res = <-resCh: + case <-cf.stop.Ch(): + cf.debug("|%s| probe %s: canceled", cycleID, c.ID.HexShort()) + return nil + } + + if res == nil { + cf.debug("|%s| probe %s: req canceled or timed out", cycleID, c.ID.HexShort()) + return nil + } + + if cf.findValue && res.FindValueKey != "" { + cf.debug("|%s| probe %s: got value", cycleID, c.ID.HexShort()) + cf.findValueMutex.Lock() + cf.findValueResult = res.Contacts + cf.findValueMutex.Unlock() + cf.stop.Stop() + return nil + } + + cf.debug("|%s| probe %s: got %s", cycleID, c.ID.HexShort(), res.argsDebug()) + cf.insertIntoActiveList(c) + cf.appendNewToShortlist(res.Contacts) + + cf.activeContactsMutex.Lock() + contacts := cf.activeContacts + if len(contacts) > bucketSize { + contacts = contacts[:bucketSize] + } + contactsStr := "" + for _, c := range contacts { + contactsStr += c.ID.HexShort() + ", " + } + cf.activeContactsMutex.Unlock() + + return cf.closest(res.Contacts...) +} + +func (cf *contactFinder) probeClosestOutstanding() { + +} + +// appendNewToShortlist appends any new contacts to the shortlist and sorts it by distance +// contacts that have already been added to the shortlist in the past are ignored func (cf *contactFinder) appendNewToShortlist(contacts []Contact) { cf.shortlistMutex.Lock() defer cf.shortlistMutex.Unlock() @@ -188,6 +283,7 @@ func (cf *contactFinder) appendNewToShortlist(contacts []Contact) { sortInPlace(cf.shortlist, cf.target) } +// popFromShortlist pops the first contact off the shortlist and returns it func (cf *contactFinder) popFromShortlist() *Contact { cf.shortlistMutex.Lock() defer cf.shortlistMutex.Unlock() @@ -201,16 +297,14 @@ func (cf *contactFinder) popFromShortlist() *Contact { return &first } +// insertIntoActiveList inserts the contact into appropriate place in the list of active contacts (sorted by distance) func (cf *contactFinder) insertIntoActiveList(contact Contact) { cf.activeContactsMutex.Lock() defer cf.activeContactsMutex.Unlock() inserted := false for i, n := range cf.activeContacts { - // 5000ft: insert contact into sorted active contacts list - // Detail: if diff between new contact id and the target id has fewer changes than the n contact from target - // it should be inserted in between the previous and current. - if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) { + if cf.target.Closer(contact.ID, n.ID) { cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...) inserted = true break @@ -221,6 +315,7 @@ func (cf *contactFinder) insertIntoActiveList(contact Contact) { } } +// isSearchFinished returns true if the search is done and should be stopped func (cf *contactFinder) isSearchFinished() bool { if cf.findValue && len(cf.findValueResult) > 0 { return true @@ -232,47 +327,35 @@ func (cf *contactFinder) isSearchFinished() bool { default: } - if !cf.areRequestsOutstanding() { - cf.shortlistMutex.Lock() - defer cf.shortlistMutex.Unlock() + if cf.notGettingCloser.Load() { + return true + } - if len(cf.shortlist) == 0 { - return true - } - - cf.activeContactsMutex.Lock() - defer cf.activeContactsMutex.Unlock() - - if len(cf.activeContacts) >= bucketSize && cf.activeContacts[bucketSize-1].ID.Xor(cf.target).Less(cf.shortlist[0].ID.Xor(cf.target)) { - // we have at least K active contacts, and we don't have any closer contacts to ping - return true - } + cf.activeContactsMutex.Lock() + defer cf.activeContactsMutex.Unlock() + if len(cf.activeContacts) >= bucketSize { + return true } return false } -func (cf *contactFinder) incrementOutstanding() { - cf.outstandingRequestsMutex.Lock() - defer cf.outstandingRequestsMutex.Unlock() - cf.outstandingRequests++ -} -func (cf *contactFinder) decrementOutstanding() { - cf.outstandingRequestsMutex.Lock() - defer cf.outstandingRequestsMutex.Unlock() - if cf.outstandingRequests > 0 { - cf.outstandingRequests-- - } -} -func (cf *contactFinder) areRequestsOutstanding() bool { - cf.outstandingRequestsMutex.RLock() - defer cf.outstandingRequestsMutex.RUnlock() - return cf.outstandingRequests > 0 +func (cf *contactFinder) debug(format string, args ...interface{}) { + args = append([]interface{}{cf.node.id.HexShort()}, append([]interface{}{cf.target.HexShort()}, args...)...) + cfLog.Debugf("[%s] find %s: "+format, args...) } -func (cf *contactFinder) debug(format string, args ...interface{}) { - args = append([]interface{}{cf.node.id.HexShort()}, append([]interface{}{cf.target.Hex()}, args...)...) - log.Debugf("[%s] find %s: "+format, args...) +func (cf *contactFinder) closest(contacts ...Contact) *Contact { + if len(contacts) == 0 { + return nil + } + closest := contacts[0] + for _, c := range contacts { + if cf.target.Closer(c.ID, closest.ID) { + closest = c + } + } + return &closest } func sortInPlace(contacts []Contact, target bits.Bitmap) { diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index ff5c866..8850f98 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -131,7 +131,7 @@ func TestRoutingTable_BucketRanges(t *testing.T) { randID := bits.Rand() found := -1 for i, r := range ranges { - if r.Start.LessOrEqual(randID) && r.End.GreaterOrEqual(randID) { + if r.Start.Cmp(randID) <= 0 && r.End.Cmp(randID) >= 0 { if found >= 0 { t.Errorf("%s appears in buckets %d and %d", randID.Hex(), found, i) } else { @@ -154,7 +154,7 @@ func TestRoutingTable_Save(t *testing.T) { for i, r := range ranges { for j := 0; j < bucketSize; j++ { toAdd := r.Start.Add(bits.FromShortHexP(strconv.Itoa(j))) - if toAdd.LessOrEqual(r.End) { + if toAdd.Cmp(r.End) <= 0 { rt.Update(Contact{ ID: r.Start.Add(bits.FromShortHexP(strconv.Itoa(j))), IP: net.ParseIP("1.2.3." + strconv.Itoa(j)),