reflector.go/dht/node_finder.go

272 lines
6.8 KiB
Go
Raw Normal View History

package dht
import (
"context"
2018-04-28 02:16:12 +02:00
"sort"
"sync"
2018-04-05 17:35:57 +02:00
"time"
"github.com/lbryio/errors.go"
"github.com/lbryio/lbry.go/stopOnce"
log "github.com/sirupsen/logrus"
)
2018-04-28 02:16:12 +02:00
type contactFinder struct {
findValue bool // true if we're using findValue
target Bitmap
2018-04-28 02:16:12 +02:00
node *Node
2018-05-13 22:02:46 +02:00
done *stopOnce.Stopper
doneWG *sync.WaitGroup
findValueMutex *sync.Mutex
2018-04-28 02:16:12 +02:00
findValueResult []Contact
2018-04-28 02:16:12 +02:00
activeContactsMutex *sync.Mutex
activeContacts []Contact
2018-04-05 17:35:57 +02:00
shortlistMutex *sync.Mutex
2018-04-28 02:16:12 +02:00
shortlist []Contact
shortlistAdded map[Bitmap]bool
outstandingRequestsMutex *sync.RWMutex
outstandingRequests uint
}
type findNodeResponse struct {
2018-04-28 02:16:12 +02:00
Found bool
Contacts []Contact
}
2018-04-28 02:16:12 +02:00
func newContactFinder(node *Node, target Bitmap, findValue bool) *contactFinder {
return &contactFinder{
node: node,
target: target,
findValue: findValue,
findValueMutex: &sync.Mutex{},
activeContactsMutex: &sync.Mutex{},
shortlistMutex: &sync.Mutex{},
shortlistAdded: make(map[Bitmap]bool),
done: stopOnce.New(),
2018-05-13 22:02:46 +02:00
doneWG: &sync.WaitGroup{},
outstandingRequestsMutex: &sync.RWMutex{},
}
}
2018-05-13 22:02:46 +02:00
// Cancel aborts the lookup: it signals every worker through done and then
// blocks until all workers registered on doneWG have returned.
func (cf *contactFinder) Cancel() {
	// Signal first, then wait; the workers only exit once done is stopped.
	cf.done.Stop()

	cf.doneWG.Wait()
}
2018-04-28 02:16:12 +02:00
func (cf *contactFinder) Find() (findNodeResponse, error) {
if cf.findValue {
log.Debugf("[%s] starting an iterative Find for the value %s", cf.node.id.HexShort(), cf.target.HexShort())
} else {
2018-04-28 02:16:12 +02:00
log.Debugf("[%s] starting an iterative Find for contacts near %s", cf.node.id.HexShort(), cf.target.HexShort())
}
2018-04-28 02:16:12 +02:00
cf.appendNewToShortlist(cf.node.rt.GetClosest(cf.target, alpha))
if len(cf.shortlist) == 0 {
return findNodeResponse{}, errors.Err("no contacts in routing table")
}
for i := 0; i < alpha; i++ {
2018-05-13 22:02:46 +02:00
cf.doneWG.Add(1)
go func(i int) {
2018-05-13 22:02:46 +02:00
defer cf.doneWG.Done()
2018-04-28 02:16:12 +02:00
cf.iterationWorker(i + 1)
}(i)
}
2018-05-13 22:02:46 +02:00
cf.doneWG.Wait()
2018-04-28 02:16:12 +02:00
// TODO: what to do if we have less than K active contacts, shortlist is empty, but we
// TODO: have other contacts in our routing table whom we have not contacted. prolly contact them
result := findNodeResponse{}
2018-04-28 02:16:12 +02:00
if cf.findValue && len(cf.findValueResult) > 0 {
result.Found = true
2018-04-28 02:16:12 +02:00
result.Contacts = cf.findValueResult
} else {
2018-04-28 02:16:12 +02:00
result.Contacts = cf.activeContacts
if len(result.Contacts) > bucketSize {
result.Contacts = result.Contacts[:bucketSize]
}
}
return result, nil
}
2018-04-28 02:16:12 +02:00
func (cf *contactFinder) iterationWorker(num int) {
log.Debugf("[%s] starting worker %d", cf.node.id.HexShort(), num)
defer func() { log.Debugf("[%s] stopping worker %d", cf.node.id.HexShort(), num) }()
for {
2018-04-28 02:16:12 +02:00
maybeContact := cf.popFromShortlist()
if maybeContact == nil {
// TODO: block if there are pending requests out from other workers. there may be more shortlist values coming
2018-04-28 02:16:12 +02:00
log.Debugf("[%s] worker %d: no contacts in shortlist, waiting...", cf.node.id.HexShort(), num)
time.Sleep(100 * time.Millisecond)
} else {
2018-04-28 02:16:12 +02:00
contact := *maybeContact
2018-04-05 17:35:57 +02:00
2018-04-28 02:16:12 +02:00
if contact.id.Equals(cf.node.id) {
2018-04-05 17:35:57 +02:00
continue // cannot contact self
}
2018-04-28 02:16:12 +02:00
req := Request{Arg: &cf.target}
if cf.findValue {
2018-04-05 17:35:57 +02:00
req.Method = findValueMethod
} else {
req.Method = findNodeMethod
}
2018-04-28 02:16:12 +02:00
log.Debugf("[%s] worker %d: contacting %s", cf.node.id.HexShort(), num, contact.id.HexShort())
2018-04-28 02:16:12 +02:00
cf.incrementOutstanding()
2018-04-05 17:35:57 +02:00
var res *Response
ctx, cancel := context.WithCancel(context.Background())
2018-04-28 02:16:12 +02:00
resCh := cf.node.SendAsync(ctx, contact, req)
2018-04-05 17:35:57 +02:00
select {
case res = <-resCh:
2018-04-28 02:16:12 +02:00
case <-cf.done.Chan():
log.Debugf("[%s] worker %d: canceled", cf.node.id.HexShort(), num)
2018-04-05 17:35:57 +02:00
cancel()
return
}
if res == nil {
// nothing to do, response timed out
2018-05-13 22:02:46 +02:00
log.Debugf("[%s] worker %d: search canceled or timed out waiting for %s", cf.node.id.HexShort(), num, contact.id.HexShort())
2018-04-28 02:16:12 +02:00
} else if cf.findValue && res.FindValueKey != "" {
log.Debugf("[%s] worker %d: got value", cf.node.id.HexShort(), num)
cf.findValueMutex.Lock()
cf.findValueResult = res.Contacts
cf.findValueMutex.Unlock()
cf.done.Stop()
2018-04-05 17:35:57 +02:00
return
} else {
2018-04-28 02:16:12 +02:00
log.Debugf("[%s] worker %d: got contacts", cf.node.id.HexShort(), num)
cf.insertIntoActiveList(contact)
cf.appendNewToShortlist(res.Contacts)
2018-04-05 17:35:57 +02:00
}
2018-04-28 02:16:12 +02:00
cf.decrementOutstanding() // this is all the way down here because we need to add to shortlist first
}
2018-04-28 02:16:12 +02:00
if cf.isSearchFinished() {
log.Debugf("[%s] worker %d: search is finished", cf.node.id.HexShort(), num)
cf.done.Stop()
return
}
}
}
2018-04-28 02:16:12 +02:00
func (cf *contactFinder) appendNewToShortlist(contacts []Contact) {
cf.shortlistMutex.Lock()
defer cf.shortlistMutex.Unlock()
2018-04-28 02:16:12 +02:00
for _, c := range contacts {
if _, ok := cf.shortlistAdded[c.id]; !ok {
cf.shortlist = append(cf.shortlist, c)
cf.shortlistAdded[c.id] = true
}
}
2018-04-28 02:16:12 +02:00
sortInPlace(cf.shortlist, cf.target)
}
2018-04-28 02:16:12 +02:00
func (cf *contactFinder) popFromShortlist() *Contact {
cf.shortlistMutex.Lock()
defer cf.shortlistMutex.Unlock()
2018-04-28 02:16:12 +02:00
if len(cf.shortlist) == 0 {
return nil
}
2018-04-28 02:16:12 +02:00
first := cf.shortlist[0]
cf.shortlist = cf.shortlist[1:]
return &first
}
2018-04-28 02:16:12 +02:00
func (cf *contactFinder) insertIntoActiveList(contact Contact) {
cf.activeContactsMutex.Lock()
defer cf.activeContactsMutex.Unlock()
inserted := false
2018-04-28 02:16:12 +02:00
for i, n := range cf.activeContacts {
if contact.id.Xor(cf.target).Less(n.id.Xor(cf.target)) {
cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...)
inserted = true
2018-04-05 17:35:57 +02:00
break
}
}
if !inserted {
2018-04-28 02:16:12 +02:00
cf.activeContacts = append(cf.activeContacts, contact)
}
}
2018-04-28 02:16:12 +02:00
func (cf *contactFinder) isSearchFinished() bool {
if cf.findValue && len(cf.findValueResult) > 0 {
return true
}
select {
2018-04-28 02:16:12 +02:00
case <-cf.done.Chan():
return true
default:
}
2018-04-28 02:16:12 +02:00
if !cf.areRequestsOutstanding() {
cf.shortlistMutex.Lock()
defer cf.shortlistMutex.Unlock()
2018-04-28 02:16:12 +02:00
if len(cf.shortlist) == 0 {
return true
}
2018-04-28 02:16:12 +02:00
cf.activeContactsMutex.Lock()
defer cf.activeContactsMutex.Unlock()
2018-04-28 02:16:12 +02:00
if len(cf.activeContacts) >= bucketSize && cf.activeContacts[bucketSize-1].id.Xor(cf.target).Less(cf.shortlist[0].id.Xor(cf.target)) {
// we have at least K active contacts, and we don't have any closer contacts to ping
return true
}
}
return false
}
2018-04-28 02:16:12 +02:00
// incrementOutstanding records that one more request is in flight.
func (cf *contactFinder) incrementOutstanding() {
	cf.outstandingRequestsMutex.Lock()
	cf.outstandingRequests++
	cf.outstandingRequestsMutex.Unlock()
}
2018-04-28 02:16:12 +02:00
// decrementOutstanding records that one in-flight request has completed.
// The counter is unsigned, so it is left at zero rather than wrapping around
// if it is already zero.
func (cf *contactFinder) decrementOutstanding() {
	cf.outstandingRequestsMutex.Lock()
	defer cf.outstandingRequestsMutex.Unlock()

	if cf.outstandingRequests == 0 {
		return
	}
	cf.outstandingRequests--
}
2018-04-28 02:16:12 +02:00
// areRequestsOutstanding reports whether at least one request is in flight.
// It only takes the read side of the lock.
func (cf *contactFinder) areRequestsOutstanding() bool {
	cf.outstandingRequestsMutex.RLock()
	pending := cf.outstandingRequests
	cf.outstandingRequestsMutex.RUnlock()

	return pending != 0
}
// sortInPlace reorders contacts, in place, according to byXorDistance applied
// to each contact's XOR distance from target. The distance is computed once
// per contact before sorting rather than on every comparison.
func sortInPlace(contacts []Contact, target Bitmap) {
	withDistance := make([]sortedContact, 0, len(contacts))
	for i := range contacts {
		withDistance = append(withDistance, sortedContact{contacts[i], contacts[i].id.Xor(target)})
	}

	sort.Sort(byXorDistance(withDistance))

	// Write the sorted order back into the caller's slice.
	for i := range withDistance {
		contacts[i] = withDistance[i].contact
	}
}