2018-04-03 20:00:35 +02:00
package dht
import (
"context"
2018-04-28 02:16:12 +02:00
"sort"
2018-04-03 20:00:35 +02:00
"sync"
2018-04-05 17:35:57 +02:00
"time"
2018-04-03 20:00:35 +02:00
"github.com/lbryio/errors.go"
"github.com/lbryio/lbry.go/stopOnce"
log "github.com/sirupsen/logrus"
)
2018-05-15 02:55:45 +02:00
// TODO: iterativeFindValue may be stopping early. if it gets a response with one peer, it should keep going because other nodes may know about more peers that have that blob
// TODO: or, it should try a tcp handshake with peers as it finds them, to make sure they are still online and have the blob
2018-04-28 02:16:12 +02:00
type contactFinder struct {
2018-04-03 20:00:35 +02:00
findValue bool // true if we're using findValue
2018-04-05 22:05:28 +02:00
target Bitmap
2018-04-28 02:16:12 +02:00
node * Node
2018-04-03 20:00:35 +02:00
2018-05-13 22:02:46 +02:00
done * stopOnce . Stopper
doneWG * sync . WaitGroup
2018-04-03 20:00:35 +02:00
findValueMutex * sync . Mutex
2018-04-28 02:16:12 +02:00
findValueResult [ ] Contact
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
activeContactsMutex * sync . Mutex
activeContacts [ ] Contact
2018-04-03 20:00:35 +02:00
2018-04-05 17:35:57 +02:00
shortlistMutex * sync . Mutex
2018-04-28 02:16:12 +02:00
shortlist [ ] Contact
2018-04-05 22:05:28 +02:00
shortlistAdded map [ Bitmap ] bool
outstandingRequestsMutex * sync . RWMutex
outstandingRequests uint
2018-04-03 20:00:35 +02:00
}
type findNodeResponse struct {
2018-04-28 02:16:12 +02:00
Found bool
Contacts [ ] Contact
2018-04-03 20:00:35 +02:00
}
2018-04-28 02:16:12 +02:00
func newContactFinder ( node * Node , target Bitmap , findValue bool ) * contactFinder {
return & contactFinder {
node : node ,
target : target ,
findValue : findValue ,
findValueMutex : & sync . Mutex { } ,
activeContactsMutex : & sync . Mutex { } ,
shortlistMutex : & sync . Mutex { } ,
shortlistAdded : make ( map [ Bitmap ] bool ) ,
done : stopOnce . New ( ) ,
2018-05-13 22:02:46 +02:00
doneWG : & sync . WaitGroup { } ,
2018-04-05 22:05:28 +02:00
outstandingRequestsMutex : & sync . RWMutex { } ,
2018-04-03 20:00:35 +02:00
}
}
2018-05-13 22:02:46 +02:00
func ( cf * contactFinder ) Cancel ( ) {
cf . done . Stop ( )
cf . doneWG . Wait ( )
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) Find ( ) ( findNodeResponse , error ) {
if cf . findValue {
log . Debugf ( "[%s] starting an iterative Find for the value %s" , cf . node . id . HexShort ( ) , cf . target . HexShort ( ) )
2018-04-05 22:05:28 +02:00
} else {
2018-04-28 02:16:12 +02:00
log . Debugf ( "[%s] starting an iterative Find for contacts near %s" , cf . node . id . HexShort ( ) , cf . target . HexShort ( ) )
2018-04-05 22:05:28 +02:00
}
2018-04-28 02:16:12 +02:00
cf . appendNewToShortlist ( cf . node . rt . GetClosest ( cf . target , alpha ) )
if len ( cf . shortlist ) == 0 {
return findNodeResponse { } , errors . Err ( "no contacts in routing table" )
2018-04-03 20:00:35 +02:00
}
for i := 0 ; i < alpha ; i ++ {
2018-05-13 22:02:46 +02:00
cf . doneWG . Add ( 1 )
2018-04-03 20:00:35 +02:00
go func ( i int ) {
2018-05-13 22:02:46 +02:00
defer cf . doneWG . Done ( )
2018-04-28 02:16:12 +02:00
cf . iterationWorker ( i + 1 )
2018-04-03 20:00:35 +02:00
} ( i )
}
2018-05-13 22:02:46 +02:00
cf . doneWG . Wait ( )
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
// TODO: what to do if we have less than K active contacts, shortlist is empty, but we
// TODO: have other contacts in our routing table whom we have not contacted. prolly contact them
2018-04-03 20:00:35 +02:00
result := findNodeResponse { }
2018-04-28 02:16:12 +02:00
if cf . findValue && len ( cf . findValueResult ) > 0 {
2018-04-03 20:00:35 +02:00
result . Found = true
2018-04-28 02:16:12 +02:00
result . Contacts = cf . findValueResult
2018-04-03 20:00:35 +02:00
} else {
2018-04-28 02:16:12 +02:00
result . Contacts = cf . activeContacts
if len ( result . Contacts ) > bucketSize {
result . Contacts = result . Contacts [ : bucketSize ]
2018-04-03 20:00:35 +02:00
}
}
return result , nil
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) iterationWorker ( num int ) {
log . Debugf ( "[%s] starting worker %d" , cf . node . id . HexShort ( ) , num )
defer func ( ) { log . Debugf ( "[%s] stopping worker %d" , cf . node . id . HexShort ( ) , num ) } ( )
2018-04-03 20:00:35 +02:00
for {
2018-04-28 02:16:12 +02:00
maybeContact := cf . popFromShortlist ( )
if maybeContact == nil {
2018-04-03 20:00:35 +02:00
// TODO: block if there are pending requests out from other workers. there may be more shortlist values coming
2018-04-28 02:16:12 +02:00
log . Debugf ( "[%s] worker %d: no contacts in shortlist, waiting..." , cf . node . id . HexShort ( ) , num )
2018-04-05 22:05:28 +02:00
time . Sleep ( 100 * time . Millisecond )
2018-04-03 20:00:35 +02:00
} else {
2018-04-28 02:16:12 +02:00
contact := * maybeContact
2018-04-05 17:35:57 +02:00
2018-04-28 02:16:12 +02:00
if contact . id . Equals ( cf . node . id ) {
2018-04-05 17:35:57 +02:00
continue // cannot contact self
}
2018-04-28 02:16:12 +02:00
req := Request { Arg : & cf . target }
if cf . findValue {
2018-04-05 17:35:57 +02:00
req . Method = findValueMethod
} else {
req . Method = findNodeMethod
}
2018-04-28 02:16:12 +02:00
log . Debugf ( "[%s] worker %d: contacting %s" , cf . node . id . HexShort ( ) , num , contact . id . HexShort ( ) )
2018-04-05 22:05:28 +02:00
2018-04-28 02:16:12 +02:00
cf . incrementOutstanding ( )
2018-04-05 17:35:57 +02:00
var res * Response
ctx , cancel := context . WithCancel ( context . Background ( ) )
2018-04-28 02:16:12 +02:00
resCh := cf . node . SendAsync ( ctx , contact , req )
2018-04-05 17:35:57 +02:00
select {
case res = <- resCh :
2018-04-28 02:16:12 +02:00
case <- cf . done . Chan ( ) :
log . Debugf ( "[%s] worker %d: canceled" , cf . node . id . HexShort ( ) , num )
2018-04-05 17:35:57 +02:00
cancel ( )
return
}
if res == nil {
// nothing to do, response timed out
2018-05-13 22:02:46 +02:00
log . Debugf ( "[%s] worker %d: search canceled or timed out waiting for %s" , cf . node . id . HexShort ( ) , num , contact . id . HexShort ( ) )
2018-04-28 02:16:12 +02:00
} else if cf . findValue && res . FindValueKey != "" {
log . Debugf ( "[%s] worker %d: got value" , cf . node . id . HexShort ( ) , num )
cf . findValueMutex . Lock ( )
cf . findValueResult = res . Contacts
cf . findValueMutex . Unlock ( )
cf . done . Stop ( )
2018-04-05 17:35:57 +02:00
return
} else {
2018-04-28 02:16:12 +02:00
log . Debugf ( "[%s] worker %d: got contacts" , cf . node . id . HexShort ( ) , num )
cf . insertIntoActiveList ( contact )
cf . appendNewToShortlist ( res . Contacts )
2018-04-05 17:35:57 +02:00
}
2018-04-05 22:05:28 +02:00
2018-04-28 02:16:12 +02:00
cf . decrementOutstanding ( ) // this is all the way down here because we need to add to shortlist first
2018-04-03 20:00:35 +02:00
}
2018-04-28 02:16:12 +02:00
if cf . isSearchFinished ( ) {
log . Debugf ( "[%s] worker %d: search is finished" , cf . node . id . HexShort ( ) , num )
cf . done . Stop ( )
2018-04-03 20:00:35 +02:00
return
}
}
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) appendNewToShortlist ( contacts [ ] Contact ) {
cf . shortlistMutex . Lock ( )
defer cf . shortlistMutex . Unlock ( )
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
for _ , c := range contacts {
if _ , ok := cf . shortlistAdded [ c . id ] ; ! ok {
cf . shortlist = append ( cf . shortlist , c )
cf . shortlistAdded [ c . id ] = true
2018-04-03 20:00:35 +02:00
}
}
2018-04-28 02:16:12 +02:00
sortInPlace ( cf . shortlist , cf . target )
2018-04-03 20:00:35 +02:00
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) popFromShortlist ( ) * Contact {
cf . shortlistMutex . Lock ( )
defer cf . shortlistMutex . Unlock ( )
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
if len ( cf . shortlist ) == 0 {
2018-04-03 20:00:35 +02:00
return nil
}
2018-04-28 02:16:12 +02:00
first := cf . shortlist [ 0 ]
cf . shortlist = cf . shortlist [ 1 : ]
2018-04-03 20:00:35 +02:00
return & first
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) insertIntoActiveList ( contact Contact ) {
cf . activeContactsMutex . Lock ( )
defer cf . activeContactsMutex . Unlock ( )
2018-04-03 20:00:35 +02:00
inserted := false
2018-04-28 02:16:12 +02:00
for i , n := range cf . activeContacts {
if contact . id . Xor ( cf . target ) . Less ( n . id . Xor ( cf . target ) ) {
cf . activeContacts = append ( cf . activeContacts [ : i ] , append ( [ ] Contact { contact } , cf . activeContacts [ i : ] ... ) ... )
2018-04-03 20:00:35 +02:00
inserted = true
2018-04-05 17:35:57 +02:00
break
2018-04-03 20:00:35 +02:00
}
}
if ! inserted {
2018-04-28 02:16:12 +02:00
cf . activeContacts = append ( cf . activeContacts , contact )
2018-04-03 20:00:35 +02:00
}
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) isSearchFinished ( ) bool {
if cf . findValue && len ( cf . findValueResult ) > 0 {
2018-04-03 20:00:35 +02:00
return true
}
select {
2018-04-28 02:16:12 +02:00
case <- cf . done . Chan ( ) :
2018-04-03 20:00:35 +02:00
return true
default :
}
2018-04-28 02:16:12 +02:00
if ! cf . areRequestsOutstanding ( ) {
cf . shortlistMutex . Lock ( )
defer cf . shortlistMutex . Unlock ( )
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
if len ( cf . shortlist ) == 0 {
2018-04-05 22:05:28 +02:00
return true
}
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
cf . activeContactsMutex . Lock ( )
defer cf . activeContactsMutex . Unlock ( )
2018-04-03 20:00:35 +02:00
2018-04-28 02:16:12 +02:00
if len ( cf . activeContacts ) >= bucketSize && cf . activeContacts [ bucketSize - 1 ] . id . Xor ( cf . target ) . Less ( cf . shortlist [ 0 ] . id . Xor ( cf . target ) ) {
// we have at least K active contacts, and we don't have any closer contacts to ping
2018-04-05 22:05:28 +02:00
return true
}
2018-04-03 20:00:35 +02:00
}
return false
}
2018-04-05 22:05:28 +02:00
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) incrementOutstanding ( ) {
cf . outstandingRequestsMutex . Lock ( )
defer cf . outstandingRequestsMutex . Unlock ( )
cf . outstandingRequests ++
2018-04-05 22:05:28 +02:00
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) decrementOutstanding ( ) {
cf . outstandingRequestsMutex . Lock ( )
defer cf . outstandingRequestsMutex . Unlock ( )
if cf . outstandingRequests > 0 {
cf . outstandingRequests --
2018-04-05 22:05:28 +02:00
}
}
2018-04-28 02:16:12 +02:00
func ( cf * contactFinder ) areRequestsOutstanding ( ) bool {
cf . outstandingRequestsMutex . RLock ( )
defer cf . outstandingRequestsMutex . RUnlock ( )
return cf . outstandingRequests > 0
}
func sortInPlace ( contacts [ ] Contact , target Bitmap ) {
toSort := make ( [ ] sortedContact , len ( contacts ) )
for i , n := range contacts {
toSort [ i ] = sortedContact { n , n . id . Xor ( target ) }
}
sort . Sort ( byXorDistance ( toSort ) )
for i , c := range toSort {
contacts [ i ] = c . contact
}
2018-04-05 22:05:28 +02:00
}