lbry.go/dht/bootstrap.go

213 lines
4.6 KiB
Go
Raw Normal View History

2018-04-28 02:16:12 +02:00
package dht
2018-05-13 22:02:46 +02:00
import (
"math/rand"
"net"
"sync"
"time"
2018-06-14 17:48:02 +02:00
"github.com/lbryio/reflector.go/dht/bits"
2018-05-13 22:02:46 +02:00
)
const (
bootstrapDefaultRefreshDuration = 15 * time.Minute
)
// BootstrapNode is a configured node setup for testing.
2018-04-28 02:16:12 +02:00
type BootstrapNode struct {
2018-05-13 22:02:46 +02:00
Node
initialPingInterval time.Duration
checkInterval time.Duration
nlock *sync.RWMutex
2018-07-12 20:34:24 +02:00
peers map[bits.Bitmap]*peer
nodeIDs []bits.Bitmap // necessary for efficient random ID selection
2018-05-13 22:02:46 +02:00
}
// NewBootstrapNode returns a BootstrapNode pointer.
2018-06-14 17:48:02 +02:00
func NewBootstrapNode(id bits.Bitmap, initialPingInterval, rePingInterval time.Duration) *BootstrapNode {
2018-05-13 22:02:46 +02:00
b := &BootstrapNode{
Node: *NewNode(id),
initialPingInterval: initialPingInterval,
checkInterval: rePingInterval,
nlock: &sync.RWMutex{},
2018-07-12 20:34:24 +02:00
peers: make(map[bits.Bitmap]*peer),
nodeIDs: make([]bits.Bitmap, 0),
2018-05-13 22:02:46 +02:00
}
b.requestHandler = b.handleRequest
return b
}
// Add manually adds a contact
func (b *BootstrapNode) Add(c Contact) {
b.upsert(c)
}
// Connect connects to the given connection and starts any background threads necessary
func (b *BootstrapNode) Connect(conn UDPConn) error {
err := b.Node.Connect(conn)
if err != nil {
return err
}
2018-08-07 17:00:04 +02:00
log.Infof("[%s] bootstrap: node connected", b.id.HexShort())
2018-05-13 22:02:46 +02:00
go func() {
t := time.NewTicker(b.checkInterval / 5)
for {
select {
case <-t.C:
b.check()
case <-b.grp.Ch():
2018-05-13 22:02:46 +02:00
return
}
}
}()
return nil
}
// upsert adds the contact to the list, or updates the lastPinged time
2018-05-13 22:02:46 +02:00
func (b *BootstrapNode) upsert(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
2018-07-12 20:34:24 +02:00
if peer, exists := b.peers[c.ID]; exists {
log.Debugf("[%s] bootstrap: touching contact %s", b.id.HexShort(), peer.Contact.ID.HexShort())
peer.Touch()
2018-05-13 22:02:46 +02:00
return
}
log.Debugf("[%s] bootstrap: adding new contact %s", b.id.HexShort(), c.ID.HexShort())
2018-07-12 20:34:24 +02:00
b.peers[c.ID] = &peer{c, b.id.Xor(c.ID), time.Now(), 0}
b.nodeIDs = append(b.nodeIDs, c.ID)
2018-05-13 22:02:46 +02:00
}
// remove removes the contact from the list
func (b *BootstrapNode) remove(c Contact) {
b.nlock.Lock()
defer b.nlock.Unlock()
2018-07-12 20:34:24 +02:00
_, exists := b.peers[c.ID]
2018-05-13 22:02:46 +02:00
if !exists {
return
}
log.Debugf("[%s] bootstrap: removing contact %s", b.id.HexShort(), c.ID.HexShort())
2018-07-12 20:34:24 +02:00
delete(b.peers, c.ID)
for i := range b.nodeIDs {
if b.nodeIDs[i].Equals(c.ID) {
b.nodeIDs = append(b.nodeIDs[:i], b.nodeIDs[i+1:]...)
break
}
}
2018-05-13 22:02:46 +02:00
}
// get returns up to `limit` random contacts from the list
func (b *BootstrapNode) get(limit int) []Contact {
b.nlock.RLock()
defer b.nlock.RUnlock()
2018-07-12 20:34:24 +02:00
if len(b.peers) < limit {
limit = len(b.peers)
2018-05-13 22:02:46 +02:00
}
ret := make([]Contact, limit)
for i, k := range randKeys(len(b.nodeIDs))[:limit] {
2018-07-12 20:34:24 +02:00
ret[i] = b.peers[b.nodeIDs[k]].Contact
2018-05-13 22:02:46 +02:00
}
return ret
}
// ping pings a node. if the node responds, it is added to the list. otherwise, it is removed
func (b *BootstrapNode) ping(c Contact) {
log.Debugf("[%s] bootstrap: pinging %s", b.id.HexShort(), c.ID.HexShort())
b.grp.Add(1)
defer b.grp.Done()
2018-05-13 22:02:46 +02:00
2018-06-25 21:56:45 +02:00
resCh := b.SendAsync(c, Request{Method: pingMethod})
2018-05-13 22:02:46 +02:00
var res *Response
select {
case res = <-resCh:
case <-b.grp.Ch():
2018-05-13 22:02:46 +02:00
return
}
if res != nil && res.Data == pingSuccessResponse {
b.upsert(c)
} else {
b.remove(c)
}
}
func (b *BootstrapNode) check() {
b.nlock.RLock()
defer b.nlock.RUnlock()
2018-07-12 20:34:24 +02:00
for i := range b.peers {
if !b.peers[i].ActiveInLast(b.checkInterval) {
go b.ping(b.peers[i].Contact)
2018-05-13 22:02:46 +02:00
}
}
}
// handleRequest handles the requests received from udp.
func (b *BootstrapNode) handleRequest(addr *net.UDPAddr, request Request) {
switch request.Method {
case pingMethod:
err := b.sendMessage(addr, Response{ID: request.ID, NodeID: b.id, Data: pingSuccessResponse})
if err != nil {
log.Error("error sending response message - ", err)
}
2018-05-13 22:02:46 +02:00
case findNodeMethod:
if request.Arg == nil {
log.Errorln("request is missing arg")
return
}
err := b.sendMessage(addr, Response{
2018-05-13 22:02:46 +02:00
ID: request.ID,
NodeID: b.id,
Contacts: b.get(bucketSize),
})
if err != nil {
log.Error("error sending 'findnodemethod' response message - ", err)
}
2018-05-13 22:02:46 +02:00
}
go func() {
b.nlock.RLock()
2018-07-12 20:34:24 +02:00
_, exists := b.peers[request.NodeID]
b.nlock.RUnlock()
if !exists {
log.Debugf("[%s] bootstrap: queuing %s to ping", b.id.HexShort(), request.NodeID.HexShort())
<-time.After(b.initialPingInterval)
b.nlock.RLock()
2018-07-12 20:34:24 +02:00
_, exists = b.peers[request.NodeID]
b.nlock.RUnlock()
if !exists {
b.ping(Contact{ID: request.NodeID, IP: addr.IP, Port: addr.Port})
}
}
2018-05-13 22:02:46 +02:00
}()
}
func randKeys(max int) []int {
keys := make([]int, max)
for k := range keys {
keys[k] = k
}
rand.Shuffle(max, func(i, j int) {
keys[i], keys[j] = keys[j], keys[i]
})
return keys
2018-04-28 02:16:12 +02:00
}