-Added travis support -updated travis to analyze code beneath the root. -refactored upload.go to fix travis errors. -gocyclo should ignore test files. $GOFILES needed to be adjusted. -fix rows.Close() ignoring error. Created func to handle so defer can be used when needed also. -fixed ignored errors. -fixed unit test that was not passing correctly to anonymous function. -fixed govet error for passing param inside go func. -removed returned error, in favor of logging instead. -added error logging for ignored error. -fixed potential race conditions. -removed unused append -fixed time usage to align with go standards. -removed unused variables -made changes for code review. -code comments for exported functions. -Documented bitmap.go and insert into contact list. -Documented dht, message, bootstrap -Fixed comment typos -Documented message,node, routing_table, testing in DHT package. -Documented server, client, prism, server and shared in peer and reflector packages. -Documented the stores in Store package. -made defer adjustments inline and deleted the separate function. -adjusted method in upload to take the only parameter it requires.
273 lines
7.2 KiB
273 lines
7.2 KiB
package dht
import (
log "github.com/sirupsen/logrus"
// TODO: iterativeFindValue may be stopping early. if it gets a response with one peer, it should keep going because other nodes may know about more peers that have that blob
// TODO: or, it should try a tcp handshake with peers as it finds them, to make sure they are still online and have the blob
type contactFinder struct {
findValue bool // true if we're using findValue
target Bitmap
node *Node
stop *stopOnce.Stopper
findValueMutex *sync.Mutex
findValueResult []Contact
activeContactsMutex *sync.Mutex
activeContacts []Contact
shortlistMutex *sync.Mutex
shortlist []Contact
shortlistAdded map[Bitmap]bool
outstandingRequestsMutex *sync.RWMutex
outstandingRequests uint
type findNodeResponse struct {
Found bool
Contacts []Contact
func newContactFinder(node *Node, target Bitmap, findValue bool) *contactFinder {
return &contactFinder{
node: node,
target: target,
findValue: findValue,
findValueMutex: &sync.Mutex{},
activeContactsMutex: &sync.Mutex{},
shortlistMutex: &sync.Mutex{},
shortlistAdded: make(map[Bitmap]bool),
stop: stopOnce.New(),
outstandingRequestsMutex: &sync.RWMutex{},
func (cf *contactFinder) Cancel() {
func (cf *contactFinder) Find() (findNodeResponse, error) {
if cf.findValue {
log.Debugf("[%s] starting an iterative Find for the value %s", cf.node.id.HexShort(), cf.target.HexShort())
} else {
log.Debugf("[%s] starting an iterative Find for contacts near %s", cf.node.id.HexShort(), cf.target.HexShort())
cf.appendNewToShortlist(cf.node.rt.GetClosest(cf.target, alpha))
if len(cf.shortlist) == 0 {
return findNodeResponse{}, errors.Err("no contacts in routing table")
for i := 0; i < alpha; i++ {
go func(i int) {
defer cf.stop.Done()
cf.iterationWorker(i + 1)
// TODO: what to do if we have less than K active contacts, shortlist is empty, but we
// TODO: have other contacts in our routing table whom we have not contacted. prolly contact them
result := findNodeResponse{}
if cf.findValue && len(cf.findValueResult) > 0 {
result.Found = true
result.Contacts = cf.findValueResult
} else {
result.Contacts = cf.activeContacts
if len(result.Contacts) > bucketSize {
result.Contacts = result.Contacts[:bucketSize]
return result, nil
func (cf *contactFinder) iterationWorker(num int) {
log.Debugf("[%s] starting worker %d", cf.node.id.HexShort(), num)
defer func() { log.Debugf("[%s] stopping worker %d", cf.node.id.HexShort(), num) }()
for {
maybeContact := cf.popFromShortlist()
if maybeContact == nil {
// TODO: block if there are pending requests out from other workers. there may be more shortlist values coming
log.Debugf("[%s] worker %d: no contacts in shortlist, waiting...", cf.node.id.HexShort(), num)
time.Sleep(100 * time.Millisecond)
} else {
contact := *maybeContact
if contact.ID.Equals(cf.node.id) {
continue // cannot contact self
req := Request{Arg: &cf.target}
if cf.findValue {
req.Method = findValueMethod
} else {
req.Method = findNodeMethod
log.Debugf("[%s] worker %d: contacting %s", cf.node.id.HexShort(), num, contact.ID.HexShort())
var res *Response
resCh, cancel := cf.node.SendCancelable(contact, req)
select {
case res = <-resCh:
case <-cf.stop.Ch():
log.Debugf("[%s] worker %d: canceled", cf.node.id.HexShort(), num)
if res == nil {
// nothing to do, response timed out
log.Debugf("[%s] worker %d: search canceled or timed out waiting for %s", cf.node.id.HexShort(), num, contact.ID.HexShort())
} else if cf.findValue && res.FindValueKey != "" {
log.Debugf("[%s] worker %d: got value", cf.node.id.HexShort(), num)
cf.findValueResult = res.Contacts
} else {
log.Debugf("[%s] worker %d: got contacts", cf.node.id.HexShort(), num)
cf.decrementOutstanding() // this is all the way down here because we need to add to shortlist first
if cf.isSearchFinished() {
log.Debugf("[%s] worker %d: search is finished", cf.node.id.HexShort(), num)
func (cf *contactFinder) appendNewToShortlist(contacts []Contact) {
defer cf.shortlistMutex.Unlock()
for _, c := range contacts {
if _, ok := cf.shortlistAdded[c.ID]; !ok {
cf.shortlist = append(cf.shortlist, c)
cf.shortlistAdded[c.ID] = true
sortInPlace(cf.shortlist, cf.target)
func (cf *contactFinder) popFromShortlist() *Contact {
defer cf.shortlistMutex.Unlock()
if len(cf.shortlist) == 0 {
return nil
first := cf.shortlist[0]
cf.shortlist = cf.shortlist[1:]
return &first
func (cf *contactFinder) insertIntoActiveList(contact Contact) {
defer cf.activeContactsMutex.Unlock()
inserted := false
for i, n := range cf.activeContacts {
// 5000ft: insert contact into sorted active contacts list
// Detail: if diff between new contact id and the target id has fewer changes than the n contact from target
// it should be inserted in between the previous and current.
if contact.ID.Xor(cf.target).Less(n.ID.Xor(cf.target)) {
cf.activeContacts = append(cf.activeContacts[:i], append([]Contact{contact}, cf.activeContacts[i:]...)...)
inserted = true
if !inserted {
cf.activeContacts = append(cf.activeContacts, contact)
func (cf *contactFinder) isSearchFinished() bool {
if cf.findValue && len(cf.findValueResult) > 0 {
return true
select {
case <-cf.stop.Ch():
return true
if !cf.areRequestsOutstanding() {
defer cf.shortlistMutex.Unlock()
if len(cf.shortlist) == 0 {
return true
defer cf.activeContactsMutex.Unlock()
if len(cf.activeContacts) >= bucketSize && cf.activeContacts[bucketSize-1].ID.Xor(cf.target).Less(cf.shortlist[0].ID.Xor(cf.target)) {
// we have at least K active contacts, and we don't have any closer contacts to ping
return true
return false
func (cf *contactFinder) incrementOutstanding() {
defer cf.outstandingRequestsMutex.Unlock()
func (cf *contactFinder) decrementOutstanding() {
defer cf.outstandingRequestsMutex.Unlock()
if cf.outstandingRequests > 0 {
func (cf *contactFinder) areRequestsOutstanding() bool {
defer cf.outstandingRequestsMutex.RUnlock()
return cf.outstandingRequests > 0
func sortInPlace(contacts []Contact, target Bitmap) {
toSort := make([]sortedContact, len(contacts))
for i, n := range contacts {
toSort[i] = sortedContact{n, n.ID.Xor(target)}
for i, c := range toSort {
contacts[i] = c.contact