node finder is its own thing. simplify exported dht api

This commit is contained in:
Alex Grintsvayg 2018-04-03 14:00:35 -04:00
parent ea8d0d1eed
commit 5bb275afaa
3 changed files with 243 additions and 226 deletions

View file

@ -1,7 +1,6 @@
package dht
import (
"context"
"net"
"sync"
"time"
@ -135,8 +134,6 @@ func New(config *Config) (*DHT, error) {
// init initializes global variables.
func (dht *DHT) init() error {
log.Debugf("Initializing DHT on %s (node id %s)", dht.conf.Address, dht.node.id.HexShort())
listener, err := net.ListenPacket(network, dht.conf.Address)
if err != nil {
return errors.Err(err)
@ -203,7 +200,7 @@ func (dht *DHT) join() {
}
// now call iterativeFind on yourself
_, _, err := dht.Get(dht.node.id)
_, err := dht.Get(dht.node.id)
if err != nil {
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
}
@ -237,7 +234,7 @@ func (dht *DHT) Start() {
go dht.runHandler()
dht.join()
log.Infof("[%s] DHT ready", dht.node.id.HexShort())
log.Infof("[%s] DHT ready on %s", dht.node.id.HexShort(), dht.node.Addr().String())
}
// Shutdown shuts down the dht
@ -249,26 +246,21 @@ func (dht *DHT) Shutdown() {
log.Infof("[%s] DHT stopped", dht.node.id.HexShort())
}
func printState(dht *DHT) {
t := time.NewTicker(60 * time.Second)
for {
log.Printf("DHT state at %s", time.Now().Format(time.RFC822Z))
log.Printf("Outstanding transactions: %d", dht.tm.Count())
log.Printf("Known nodes: %d", dht.store.CountKnownNodes())
log.Printf("Buckets: \n%s", dht.rt.BucketInfo())
<-t.C
}
}
func (dht *DHT) Get(hash bitmap) ([]Node, bool, error) {
// Get returns the list of nodes that have the blob for the given hash
func (dht *DHT) Get(hash bitmap) ([]Node, error) {
nf := newNodeFinder(dht, hash, true)
res, err := nf.Find()
if err != nil {
return nil, false, err
return nil, err
}
return res.Nodes, res.Found, nil
if res.Found {
return res.Nodes, nil
}
return nil, nil
}
// Put announces to the DHT that this node has the blob for the given hash
func (dht *DHT) Put(hash bitmap) error {
nf := newNodeFinder(dht, hash, false)
res, err := nf.Find()
@ -293,208 +285,13 @@ func (dht *DHT) Put(hash bitmap) error {
return nil
}
type nodeFinder struct {
findValue bool // true if we're using findValue
target bitmap
dht *DHT
done *stopOnce.Stopper
findValueMutex *sync.Mutex
findValueResult []Node
activeNodesMutex *sync.Mutex
activeNodes []Node
shortlistContactedMutex *sync.Mutex
shortlist []Node
contacted map[bitmap]bool
}
type findNodeResponse struct {
Found bool
Nodes []Node
}
func newNodeFinder(dht *DHT, target bitmap, findValue bool) *nodeFinder {
return &nodeFinder{
dht: dht,
target: target,
findValue: findValue,
findValueMutex: &sync.Mutex{},
activeNodesMutex: &sync.Mutex{},
shortlistContactedMutex: &sync.Mutex{},
contacted: make(map[bitmap]bool),
done: stopOnce.New(),
}
}
func (nf *nodeFinder) Find() (findNodeResponse, error) {
log.Debugf("[%s] starting an iterative Find() for %s (findValue is %t)", nf.dht.node.id.HexShort(), nf.target.HexShort(), nf.findValue)
nf.appendNewToShortlist(nf.dht.rt.GetClosest(nf.target, alpha))
if len(nf.shortlist) == 0 {
return findNodeResponse{}, errors.Err("no nodes in routing table")
}
wg := &sync.WaitGroup{}
for i := 0; i < alpha; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
nf.iterationWorker(i + 1)
}(i)
}
wg.Wait()
// TODO: what to do if we have less than K active nodes, shortlist is empty, but we
// TODO: have other nodes in our routing table whom we have not contacted. prolly contact them?
result := findNodeResponse{}
if nf.findValue && len(nf.findValueResult) > 0 {
result.Found = true
result.Nodes = nf.findValueResult
} else {
result.Nodes = nf.activeNodes
if len(result.Nodes) > bucketSize {
result.Nodes = result.Nodes[:bucketSize]
}
}
return result, nil
}
func (nf *nodeFinder) iterationWorker(num int) {
log.Debugf("[%s] starting worker %d", nf.dht.node.id.HexShort(), num)
defer func() { log.Debugf("[%s] stopping worker %d", nf.dht.node.id.HexShort(), num) }()
func printState(dht *DHT) {
t := time.NewTicker(60 * time.Second)
for {
maybeNode := nf.popFromShortlist()
if maybeNode == nil {
// TODO: block if there are pending requests out from other workers. there may be more shortlist values coming
log.Debugf("[%s] no more nodes in shortlist", nf.dht.node.id.HexShort())
return
}
node := *maybeNode
if node.id.Equals(nf.dht.node.id) {
continue // cannot contact self
}
req := &Request{Args: []string{nf.target.RawString()}}
if nf.findValue {
req.Method = findValueMethod
} else {
req.Method = findNodeMethod
}
log.Debugf("[%s] contacting %s", nf.dht.node.id.HexShort(), node.id.HexShort())
var res *Response
ctx, cancel := context.WithCancel(context.Background())
resCh := nf.dht.tm.SendAsync(ctx, node, req)
select {
case res = <-resCh:
case <-nf.done.Chan():
log.Debugf("[%s] worker %d: canceled", nf.dht.node.id.HexShort(), num)
cancel()
return
}
if res == nil {
// nothing to do, response timed out
} else if nf.findValue && res.FindValueKey != "" {
log.Debugf("[%s] worker %d: got value", nf.dht.node.id.HexShort(), num)
nf.findValueMutex.Lock()
nf.findValueResult = res.FindNodeData
nf.findValueMutex.Unlock()
nf.done.Stop()
return
} else {
log.Debugf("[%s] worker %d: got more contacts", nf.dht.node.id.HexShort(), num)
nf.insertIntoActiveList(node)
nf.appendNewToShortlist(res.FindNodeData)
}
if nf.isSearchFinished() {
log.Debugf("[%s] worker %d: search is finished", nf.dht.node.id.HexShort(), num)
nf.done.Stop()
return
}
log.Printf("DHT state at %s", time.Now().Format(time.RFC822Z))
log.Printf("Outstanding transactions: %d", dht.tm.Count())
log.Printf("Known nodes: %d", dht.store.CountKnownNodes())
log.Printf("Buckets: \n%s", dht.rt.BucketInfo())
<-t.C
}
}
func (nf *nodeFinder) appendNewToShortlist(nodes []Node) {
nf.shortlistContactedMutex.Lock()
defer nf.shortlistContactedMutex.Unlock()
notContacted := []Node{}
for _, n := range nodes {
if _, ok := nf.contacted[n.id]; !ok {
notContacted = append(notContacted, n)
}
}
nf.shortlist = append(nf.shortlist, notContacted...)
sortNodesInPlace(nf.shortlist, nf.target)
}
func (nf *nodeFinder) popFromShortlist() *Node {
nf.shortlistContactedMutex.Lock()
defer nf.shortlistContactedMutex.Unlock()
if len(nf.shortlist) == 0 {
return nil
}
first := nf.shortlist[0]
nf.shortlist = nf.shortlist[1:]
nf.contacted[first.id] = true
return &first
}
func (nf *nodeFinder) insertIntoActiveList(node Node) {
nf.activeNodesMutex.Lock()
defer nf.activeNodesMutex.Unlock()
inserted := false
for i, n := range nf.activeNodes {
if node.id.Xor(nf.target).Less(n.id.Xor(nf.target)) {
nf.activeNodes = append(nf.activeNodes[:i], append([]Node{node}, nf.activeNodes[i:]...)...)
inserted = true
}
}
if !inserted {
nf.activeNodes = append(nf.activeNodes, node)
}
}
func (nf *nodeFinder) isSearchFinished() bool {
if nf.findValue && len(nf.findValueResult) > 0 {
return true
}
select {
case <-nf.done.Chan():
return true
default:
}
nf.shortlistContactedMutex.Lock()
defer nf.shortlistContactedMutex.Unlock()
if len(nf.shortlist) == 0 {
return true
}
nf.activeNodesMutex.Lock()
defer nf.activeNodesMutex.Unlock()
if len(nf.activeNodes) >= bucketSize && nf.activeNodes[bucketSize-1].id.Xor(nf.target).Less(nf.shortlist[0].id.Xor(nf.target)) {
// we have at least K active nodes, and we don't have any closer nodes yet to contact
return true
}
return false
}

217
dht/node_finder.go Normal file
View file

@ -0,0 +1,217 @@
package dht
import (
"context"
"sync"
"github.com/lbryio/errors.go"
"github.com/lbryio/lbry.go/stopOnce"
log "github.com/sirupsen/logrus"
)
type nodeFinder struct {
findValue bool // true if we're using findValue
target bitmap
dht *DHT
done *stopOnce.Stopper
findValueMutex *sync.Mutex
findValueResult []Node
activeNodesMutex *sync.Mutex
activeNodes []Node
shortlistContactedMutex *sync.Mutex
shortlist []Node
contacted map[bitmap]bool
}
type findNodeResponse struct {
Found bool
Nodes []Node
}
func newNodeFinder(dht *DHT, target bitmap, findValue bool) *nodeFinder {
return &nodeFinder{
dht: dht,
target: target,
findValue: findValue,
findValueMutex: &sync.Mutex{},
activeNodesMutex: &sync.Mutex{},
shortlistContactedMutex: &sync.Mutex{},
contacted: make(map[bitmap]bool),
done: stopOnce.New(),
}
}
func (nf *nodeFinder) Find() (findNodeResponse, error) {
log.Debugf("[%s] starting an iterative Find() for %s (findValue is %t)", nf.dht.node.id.HexShort(), nf.target.HexShort(), nf.findValue)
nf.appendNewToShortlist(nf.dht.rt.GetClosest(nf.target, alpha))
if len(nf.shortlist) == 0 {
return findNodeResponse{}, errors.Err("no nodes in routing table")
}
wg := &sync.WaitGroup{}
for i := 0; i < alpha; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
nf.iterationWorker(i + 1)
}(i)
}
wg.Wait()
// TODO: what to do if we have less than K active nodes, shortlist is empty, but we
// TODO: have other nodes in our routing table whom we have not contacted. prolly contact them?
result := findNodeResponse{}
if nf.findValue && len(nf.findValueResult) > 0 {
result.Found = true
result.Nodes = nf.findValueResult
} else {
result.Nodes = nf.activeNodes
if len(result.Nodes) > bucketSize {
result.Nodes = result.Nodes[:bucketSize]
}
}
return result, nil
}
func (nf *nodeFinder) iterationWorker(num int) {
log.Debugf("[%s] starting worker %d", nf.dht.node.id.HexShort(), num)
defer func() { log.Debugf("[%s] stopping worker %d", nf.dht.node.id.HexShort(), num) }()
for {
maybeNode := nf.popFromShortlist()
if maybeNode == nil {
// TODO: block if there are pending requests out from other workers. there may be more shortlist values coming
log.Debugf("[%s] no more nodes in shortlist", nf.dht.node.id.HexShort())
return
}
node := *maybeNode
if node.id.Equals(nf.dht.node.id) {
continue // cannot contact self
}
req := &Request{Args: []string{nf.target.RawString()}}
if nf.findValue {
req.Method = findValueMethod
} else {
req.Method = findNodeMethod
}
log.Debugf("[%s] contacting %s", nf.dht.node.id.HexShort(), node.id.HexShort())
var res *Response
ctx, cancel := context.WithCancel(context.Background())
resCh := nf.dht.tm.SendAsync(ctx, node, req)
select {
case res = <-resCh:
case <-nf.done.Chan():
log.Debugf("[%s] worker %d: canceled", nf.dht.node.id.HexShort(), num)
cancel()
return
}
if res == nil {
// nothing to do, response timed out
} else if nf.findValue && res.FindValueKey != "" {
log.Debugf("[%s] worker %d: got value", nf.dht.node.id.HexShort(), num)
nf.findValueMutex.Lock()
nf.findValueResult = res.FindNodeData
nf.findValueMutex.Unlock()
nf.done.Stop()
return
} else {
log.Debugf("[%s] worker %d: got more contacts", nf.dht.node.id.HexShort(), num)
nf.insertIntoActiveList(node)
nf.appendNewToShortlist(res.FindNodeData)
}
if nf.isSearchFinished() {
log.Debugf("[%s] worker %d: search is finished", nf.dht.node.id.HexShort(), num)
nf.done.Stop()
return
}
}
}
func (nf *nodeFinder) appendNewToShortlist(nodes []Node) {
nf.shortlistContactedMutex.Lock()
defer nf.shortlistContactedMutex.Unlock()
notContacted := []Node{}
for _, n := range nodes {
if _, ok := nf.contacted[n.id]; !ok {
notContacted = append(notContacted, n)
}
}
nf.shortlist = append(nf.shortlist, notContacted...)
sortNodesInPlace(nf.shortlist, nf.target)
}
func (nf *nodeFinder) popFromShortlist() *Node {
nf.shortlistContactedMutex.Lock()
defer nf.shortlistContactedMutex.Unlock()
if len(nf.shortlist) == 0 {
return nil
}
first := nf.shortlist[0]
nf.shortlist = nf.shortlist[1:]
nf.contacted[first.id] = true
return &first
}
func (nf *nodeFinder) insertIntoActiveList(node Node) {
nf.activeNodesMutex.Lock()
defer nf.activeNodesMutex.Unlock()
inserted := false
for i, n := range nf.activeNodes {
if node.id.Xor(nf.target).Less(n.id.Xor(nf.target)) {
nf.activeNodes = append(nf.activeNodes[:i], append([]Node{node}, nf.activeNodes[i:]...)...)
inserted = true
}
}
if !inserted {
nf.activeNodes = append(nf.activeNodes, node)
}
}
func (nf *nodeFinder) isSearchFinished() bool {
if nf.findValue && len(nf.findValueResult) > 0 {
return true
}
select {
case <-nf.done.Chan():
return true
default:
}
nf.shortlistContactedMutex.Lock()
defer nf.shortlistContactedMutex.Unlock()
if len(nf.shortlist) == 0 {
return true
}
nf.activeNodesMutex.Lock()
defer nf.activeNodesMutex.Unlock()
if len(nf.activeNodes) >= bucketSize && nf.activeNodes[bucketSize-1].id.Xor(nf.target).Less(nf.shortlist[0].id.Xor(nf.target)) {
// we have at least K active nodes, and we don't have any closer nodes yet to contact
return true
}
return false
}

View file

@ -6,7 +6,7 @@ import (
"time"
)
func TestDHT_FindNodes(t *testing.T) {
func TestNodeFinder_FindNodes(t *testing.T) {
id1 := newRandomBitmap()
id2 := newRandomBitmap()
id3 := newRandomBitmap()
@ -40,11 +40,12 @@ func TestDHT_FindNodes(t *testing.T) {
time.Sleep(1 * time.Second) // give dhts a chance to connect
foundNodes, found, err := dht3.Get(newRandomBitmap())
nf := newNodeFinder(dht3, newRandomBitmap(), false)
res, err := nf.Find()
if err != nil {
t.Fatal(err)
}
foundNodes, found := res.Nodes, res.Found
if found {
t.Fatal("something was found, but it should not have been")
@ -74,7 +75,7 @@ func TestDHT_FindNodes(t *testing.T) {
}
}
func TestDHT_Get(t *testing.T) {
func TestNodeFinder_FindValue(t *testing.T) {
id1 := newRandomBitmap()
id2 := newRandomBitmap()
id3 := newRandomBitmap()
@ -111,10 +112,12 @@ func TestDHT_Get(t *testing.T) {
nodeToFind := Node{id: newRandomBitmap(), ip: net.IPv4(1, 2, 3, 4), port: 5678}
dht1.store.Upsert(nodeToFind.id.RawString(), nodeToFind)
foundNodes, found, err := dht3.Get(nodeToFind.id)
nf := newNodeFinder(dht3, nodeToFind.id, true)
res, err := nf.Find()
if err != nil {
t.Fatal(err)
}
foundNodes, found := res.Nodes, res.Found
if !found {
t.Fatal("node was not found")