From 5bb275afaae1421e5286c72398da596f3a74acba Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 3 Apr 2018 14:00:35 -0400 Subject: [PATCH] node finder is its own thing. simplify exported dht api --- dht/dht.go | 239 ++--------------------- dht/node_finder.go | 217 ++++++++++++++++++++ dht/{dht_test.go => node_finder_test.go} | 13 +- 3 files changed, 243 insertions(+), 226 deletions(-) create mode 100644 dht/node_finder.go rename dht/{dht_test.go => node_finder_test.go} (88%) diff --git a/dht/dht.go b/dht/dht.go index 63cc1ad..3d13722 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -1,7 +1,6 @@ package dht import ( - "context" "net" "sync" "time" @@ -135,8 +134,6 @@ func New(config *Config) (*DHT, error) { // init initializes global variables. func (dht *DHT) init() error { - log.Debugf("Initializing DHT on %s (node id %s)", dht.conf.Address, dht.node.id.HexShort()) - listener, err := net.ListenPacket(network, dht.conf.Address) if err != nil { return errors.Err(err) @@ -203,7 +200,7 @@ func (dht *DHT) join() { } // now call iterativeFind on yourself - _, _, err := dht.Get(dht.node.id) + _, err := dht.Get(dht.node.id) if err != nil { log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) } @@ -237,7 +234,7 @@ func (dht *DHT) Start() { go dht.runHandler() dht.join() - log.Infof("[%s] DHT ready", dht.node.id.HexShort()) + log.Infof("[%s] DHT ready on %s", dht.node.id.HexShort(), dht.node.Addr().String()) } // Shutdown shuts down the dht @@ -249,26 +246,21 @@ func (dht *DHT) Shutdown() { log.Infof("[%s] DHT stopped", dht.node.id.HexShort()) } -func printState(dht *DHT) { - t := time.NewTicker(60 * time.Second) - for { - log.Printf("DHT state at %s", time.Now().Format(time.RFC822Z)) - log.Printf("Outstanding transactions: %d", dht.tm.Count()) - log.Printf("Known nodes: %d", dht.store.CountKnownNodes()) - log.Printf("Buckets: \n%s", dht.rt.BucketInfo()) - <-t.C - } -} - -func (dht *DHT) Get(hash bitmap) ([]Node, bool, error) { +// Get returns the list of nodes that have the blob for the given hash +func (dht *DHT) Get(hash bitmap) ([]Node, error) { nf := newNodeFinder(dht, hash, true) res, err := nf.Find() if err != nil { - return nil, false, err + return nil, err } - return res.Nodes, res.Found, nil + + if res.Found { + return res.Nodes, nil + } + return nil, nil } +// Put announces to the DHT that this node has the blob for the given hash func (dht *DHT) Put(hash bitmap) error { nf := newNodeFinder(dht, hash, false) res, err := nf.Find() @@ -293,208 +285,13 @@ func (dht *DHT) Put(hash bitmap) error { return nil } -type nodeFinder struct { - findValue bool // true if we're using findValue - target bitmap - dht *DHT - - done *stopOnce.Stopper - - findValueMutex *sync.Mutex - findValueResult []Node - - activeNodesMutex *sync.Mutex - activeNodes []Node - - shortlistContactedMutex *sync.Mutex - shortlist []Node - contacted map[bitmap]bool -} - -type findNodeResponse struct { - Found bool - Nodes []Node -} - -func newNodeFinder(dht *DHT, target bitmap, findValue bool) *nodeFinder { - return &nodeFinder{ - dht: dht, - target: target, - findValue: findValue, - findValueMutex: &sync.Mutex{}, - activeNodesMutex: &sync.Mutex{}, - shortlistContactedMutex: &sync.Mutex{}, - contacted: make(map[bitmap]bool), - done: stopOnce.New(), - } -} - -func (nf *nodeFinder) Find() (findNodeResponse, error) { - log.Debugf("[%s] starting an iterative Find() for %s (findValue is %t)", nf.dht.node.id.HexShort(), nf.target.HexShort(), nf.findValue) - nf.appendNewToShortlist(nf.dht.rt.GetClosest(nf.target, alpha)) - if len(nf.shortlist) == 0 { - return findNodeResponse{}, errors.Err("no nodes in routing table") - } - - wg := &sync.WaitGroup{} - - for i := 0; i < alpha; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - nf.iterationWorker(i + 1) - }(i) - } - - wg.Wait() - - // TODO: what to do if we have less than K active nodes, shortlist is empty, but we - // TODO: have other nodes in our routing table whom we have not contacted. prolly contact them? - - result := findNodeResponse{} - if nf.findValue && len(nf.findValueResult) > 0 { - result.Found = true - result.Nodes = nf.findValueResult - } else { - result.Nodes = nf.activeNodes - if len(result.Nodes) > bucketSize { - result.Nodes = result.Nodes[:bucketSize] - } - } - - return result, nil -} - -func (nf *nodeFinder) iterationWorker(num int) { - log.Debugf("[%s] starting worker %d", nf.dht.node.id.HexShort(), num) - defer func() { log.Debugf("[%s] stopping worker %d", nf.dht.node.id.HexShort(), num) }() - +func printState(dht *DHT) { + t := time.NewTicker(60 * time.Second) for { - maybeNode := nf.popFromShortlist() - if maybeNode == nil { - // TODO: block if there are pending requests out from other workers. there may be more shortlist values coming - log.Debugf("[%s] no more nodes in shortlist", nf.dht.node.id.HexShort()) - return - } - node := *maybeNode - - if node.id.Equals(nf.dht.node.id) { - continue // cannot contact self - } - - req := &Request{Args: []string{nf.target.RawString()}} - if nf.findValue { - req.Method = findValueMethod - } else { - req.Method = findNodeMethod - } - - log.Debugf("[%s] contacting %s", nf.dht.node.id.HexShort(), node.id.HexShort()) - - var res *Response - ctx, cancel := context.WithCancel(context.Background()) - resCh := nf.dht.tm.SendAsync(ctx, node, req) - select { - case res = <-resCh: - case <-nf.done.Chan(): - log.Debugf("[%s] worker %d: canceled", nf.dht.node.id.HexShort(), num) - cancel() - return - } - - if res == nil { - // nothing to do, response timed out - } else if nf.findValue && res.FindValueKey != "" { - log.Debugf("[%s] worker %d: got value", nf.dht.node.id.HexShort(), num) - nf.findValueMutex.Lock() - nf.findValueResult = res.FindNodeData - nf.findValueMutex.Unlock() - nf.done.Stop() - return - } else { - log.Debugf("[%s] worker %d: got more contacts", nf.dht.node.id.HexShort(), num) - nf.insertIntoActiveList(node) - nf.appendNewToShortlist(res.FindNodeData) - } - - if nf.isSearchFinished() { - log.Debugf("[%s] worker %d: search is finished", nf.dht.node.id.HexShort(), num) - nf.done.Stop() - return - } + log.Printf("DHT state at %s", time.Now().Format(time.RFC822Z)) + log.Printf("Outstanding transactions: %d", dht.tm.Count()) + log.Printf("Known nodes: %d", dht.store.CountKnownNodes()) + log.Printf("Buckets: \n%s", dht.rt.BucketInfo()) + <-t.C } } - -func (nf *nodeFinder) appendNewToShortlist(nodes []Node) { - nf.shortlistContactedMutex.Lock() - defer nf.shortlistContactedMutex.Unlock() - - notContacted := []Node{} - for _, n := range nodes { - if _, ok := nf.contacted[n.id]; !ok { - notContacted = append(notContacted, n) - } - } - - nf.shortlist = append(nf.shortlist, notContacted...) - sortNodesInPlace(nf.shortlist, nf.target) -} - -func (nf *nodeFinder) popFromShortlist() *Node { - nf.shortlistContactedMutex.Lock() - defer nf.shortlistContactedMutex.Unlock() - - if len(nf.shortlist) == 0 { - return nil - } - - first := nf.shortlist[0] - nf.shortlist = nf.shortlist[1:] - nf.contacted[first.id] = true - return &first -} - -func (nf *nodeFinder) insertIntoActiveList(node Node) { - nf.activeNodesMutex.Lock() - defer nf.activeNodesMutex.Unlock() - - inserted := false - for i, n := range nf.activeNodes { - if node.id.Xor(nf.target).Less(n.id.Xor(nf.target)) { - nf.activeNodes = append(nf.activeNodes[:i], append([]Node{node}, nf.activeNodes[i:]...)...) - inserted = true - } - } - if !inserted { - nf.activeNodes = append(nf.activeNodes, node) - } -} - -func (nf *nodeFinder) isSearchFinished() bool { - if nf.findValue && len(nf.findValueResult) > 0 { - return true - } - - select { - case <-nf.done.Chan(): - return true - default: - } - - nf.shortlistContactedMutex.Lock() - defer nf.shortlistContactedMutex.Unlock() - - if len(nf.shortlist) == 0 { - return true - } - - nf.activeNodesMutex.Lock() - defer nf.activeNodesMutex.Unlock() - - if len(nf.activeNodes) >= bucketSize && nf.activeNodes[bucketSize-1].id.Xor(nf.target).Less(nf.shortlist[0].id.Xor(nf.target)) { - // we have at least K active nodes, and we don't have any closer nodes yet to contact - return true - } - - return false -} diff --git a/dht/node_finder.go b/dht/node_finder.go new file mode 100644 index 0000000..14cfbc4 --- /dev/null +++ b/dht/node_finder.go @@ -0,0 +1,217 @@ +package dht + +import ( + "context" + "sync" + + "github.com/lbryio/errors.go" + "github.com/lbryio/lbry.go/stopOnce" + + log "github.com/sirupsen/logrus" +) + +type nodeFinder struct { + findValue bool // true if we're using findValue + target bitmap + dht *DHT + + done *stopOnce.Stopper + + findValueMutex *sync.Mutex + findValueResult []Node + + activeNodesMutex *sync.Mutex + activeNodes []Node + + shortlistContactedMutex *sync.Mutex + shortlist []Node + contacted map[bitmap]bool +} + +type findNodeResponse struct { + Found bool + Nodes []Node +} + +func newNodeFinder(dht *DHT, target bitmap, findValue bool) *nodeFinder { + return &nodeFinder{ + dht: dht, + target: target, + findValue: findValue, + findValueMutex: &sync.Mutex{}, + activeNodesMutex: &sync.Mutex{}, + shortlistContactedMutex: &sync.Mutex{}, + contacted: make(map[bitmap]bool), + done: stopOnce.New(), + } +} + +func (nf *nodeFinder) Find() (findNodeResponse, error) { + log.Debugf("[%s] starting an iterative Find() for %s (findValue is %t)", nf.dht.node.id.HexShort(), nf.target.HexShort(), nf.findValue) + nf.appendNewToShortlist(nf.dht.rt.GetClosest(nf.target, alpha)) + if len(nf.shortlist) == 0 { + return findNodeResponse{}, errors.Err("no nodes in routing table") + } + + wg := &sync.WaitGroup{} + + for i := 0; i < alpha; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + nf.iterationWorker(i + 1) + }(i) + } + + wg.Wait() + + // TODO: what to do if we have less than K active nodes, shortlist is empty, but we + // TODO: have other nodes in our routing table whom we have not contacted. prolly contact them? + + result := findNodeResponse{} + if nf.findValue && len(nf.findValueResult) > 0 { + result.Found = true + result.Nodes = nf.findValueResult + } else { + result.Nodes = nf.activeNodes + if len(result.Nodes) > bucketSize { + result.Nodes = result.Nodes[:bucketSize] + } + } + + return result, nil +} + +func (nf *nodeFinder) iterationWorker(num int) { + log.Debugf("[%s] starting worker %d", nf.dht.node.id.HexShort(), num) + defer func() { log.Debugf("[%s] stopping worker %d", nf.dht.node.id.HexShort(), num) }() + + for { + maybeNode := nf.popFromShortlist() + if maybeNode == nil { + // TODO: block if there are pending requests out from other workers. there may be more shortlist values coming + log.Debugf("[%s] no more nodes in shortlist", nf.dht.node.id.HexShort()) + return + } + node := *maybeNode + + if node.id.Equals(nf.dht.node.id) { + continue // cannot contact self + } + + req := &Request{Args: []string{nf.target.RawString()}} + if nf.findValue { + req.Method = findValueMethod + } else { + req.Method = findNodeMethod + } + + log.Debugf("[%s] contacting %s", nf.dht.node.id.HexShort(), node.id.HexShort()) + + var res *Response + ctx, cancel := context.WithCancel(context.Background()) + resCh := nf.dht.tm.SendAsync(ctx, node, req) + select { + case res = <-resCh: + case <-nf.done.Chan(): + log.Debugf("[%s] worker %d: canceled", nf.dht.node.id.HexShort(), num) + cancel() + return + } + + if res == nil { + // nothing to do, response timed out + } else if nf.findValue && res.FindValueKey != "" { + log.Debugf("[%s] worker %d: got value", nf.dht.node.id.HexShort(), num) + nf.findValueMutex.Lock() + nf.findValueResult = res.FindNodeData + nf.findValueMutex.Unlock() + nf.done.Stop() + return + } else { + log.Debugf("[%s] worker %d: got more contacts", nf.dht.node.id.HexShort(), num) + nf.insertIntoActiveList(node) + nf.appendNewToShortlist(res.FindNodeData) + } + + if nf.isSearchFinished() { + log.Debugf("[%s] worker %d: search is finished", nf.dht.node.id.HexShort(), num) + nf.done.Stop() + return + } + } +} + +func (nf *nodeFinder) appendNewToShortlist(nodes []Node) { + nf.shortlistContactedMutex.Lock() + defer nf.shortlistContactedMutex.Unlock() + + notContacted := []Node{} + for _, n := range nodes { + if _, ok := nf.contacted[n.id]; !ok { + notContacted = append(notContacted, n) + } + } + + nf.shortlist = append(nf.shortlist, notContacted...) + sortNodesInPlace(nf.shortlist, nf.target) +} + +func (nf *nodeFinder) popFromShortlist() *Node { + nf.shortlistContactedMutex.Lock() + defer nf.shortlistContactedMutex.Unlock() + + if len(nf.shortlist) == 0 { + return nil + } + + first := nf.shortlist[0] + nf.shortlist = nf.shortlist[1:] + nf.contacted[first.id] = true + return &first +} + +func (nf *nodeFinder) insertIntoActiveList(node Node) { + nf.activeNodesMutex.Lock() + defer nf.activeNodesMutex.Unlock() + + inserted := false + for i, n := range nf.activeNodes { + if node.id.Xor(nf.target).Less(n.id.Xor(nf.target)) { + nf.activeNodes = append(nf.activeNodes[:i], append([]Node{node}, nf.activeNodes[i:]...)...) + inserted = true + } + } + if !inserted { + nf.activeNodes = append(nf.activeNodes, node) + } +} + +func (nf *nodeFinder) isSearchFinished() bool { + if nf.findValue && len(nf.findValueResult) > 0 { + return true + } + + select { + case <-nf.done.Chan(): + return true + default: + } + + nf.shortlistContactedMutex.Lock() + defer nf.shortlistContactedMutex.Unlock() + + if len(nf.shortlist) == 0 { + return true + } + + nf.activeNodesMutex.Lock() + defer nf.activeNodesMutex.Unlock() + + if len(nf.activeNodes) >= bucketSize && nf.activeNodes[bucketSize-1].id.Xor(nf.target).Less(nf.shortlist[0].id.Xor(nf.target)) { + // we have at least K active nodes, and we don't have any closer nodes yet to contact + return true + } + + return false +} diff --git a/dht/dht_test.go b/dht/node_finder_test.go similarity index 88% rename from dht/dht_test.go rename to dht/node_finder_test.go index fd4c76c..f72fc99 100644 --- a/dht/dht_test.go +++ b/dht/node_finder_test.go @@ -6,7 +6,7 @@ import ( "time" ) -func TestDHT_FindNodes(t *testing.T) { +func TestNodeFinder_FindNodes(t *testing.T) { id1 := newRandomBitmap() id2 := newRandomBitmap() id3 := newRandomBitmap() @@ -40,11 +40,12 @@ func TestDHT_FindNodes(t *testing.T) { time.Sleep(1 * time.Second) // give dhts a chance to connect - foundNodes, found, err := dht3.Get(newRandomBitmap()) - + nf := newNodeFinder(dht3, newRandomBitmap(), false) + res, err := nf.Find() if err != nil { t.Fatal(err) } + foundNodes, found := res.Nodes, res.Found if found { t.Fatal("something was found, but it should not have been") @@ -74,7 +75,7 @@ func TestDHT_FindNodes(t *testing.T) { } } -func TestDHT_Get(t *testing.T) { +func TestNodeFinder_FindValue(t *testing.T) { id1 := newRandomBitmap() id2 := newRandomBitmap() id3 := newRandomBitmap() @@ -111,10 +112,12 @@ func TestDHT_Get(t *testing.T) { nodeToFind := Node{id: newRandomBitmap(), ip: net.IPv4(1, 2, 3, 4), port: 5678} dht1.store.Upsert(nodeToFind.id.RawString(), nodeToFind) - foundNodes, found, err := dht3.Get(nodeToFind.id) + nf := newNodeFinder(dht3, nodeToFind.id, true) + res, err := nf.Find() if err != nil { t.Fatal(err) } + foundNodes, found := res.Nodes, res.Found if !found { t.Fatal("node was not found")