2018-04-28 02:16:12 +02:00
package dht
import (
"context"
"encoding/hex"
"net"
"strings"
"sync"
"time"
"github.com/lbryio/errors.go"
"github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/lbry.go/util"
2018-06-14 17:48:02 +02:00
"github.com/lbryio/reflector.go/dht/bits"
2018-04-28 02:16:12 +02:00
"github.com/davecgh/go-spew/spew"
"github.com/lyoshenka/bencode"
log "github.com/sirupsen/logrus"
)
// packet represents the information receive from udp.
type packet struct {
data [ ] byte
raddr * net . UDPAddr
}
// UDPConn allows using a mocked connection to test sending/receiving data
2018-05-13 22:02:46 +02:00
// TODO: stop mocking this and use the real thing
2018-04-28 02:16:12 +02:00
type UDPConn interface {
ReadFromUDP ( [ ] byte ) ( int , * net . UDPAddr , error )
WriteToUDP ( [ ] byte , * net . UDPAddr ) ( int , error )
SetReadDeadline ( time . Time ) error
SetWriteDeadline ( time . Time ) error
Close ( ) error
}
2018-05-30 03:38:55 +02:00
// RequestHandlerFunc is exported handler for requests.
2018-05-13 22:02:46 +02:00
type RequestHandlerFunc func ( addr * net . UDPAddr , request Request )
2018-05-30 03:38:55 +02:00
// Node is a type representation of a node on the network.
2018-04-28 02:16:12 +02:00
type Node struct {
2018-05-01 22:18:38 +02:00
// the node's id
2018-06-14 17:48:02 +02:00
id bits . Bitmap
2018-04-28 02:16:12 +02:00
// UDP connection for sending and receiving data
conn UDPConn
2018-05-24 23:49:43 +02:00
// true if we've closed the connection on purpose
connClosed bool
2018-04-28 02:16:12 +02:00
// token manager
tokens * tokenManager
2018-05-01 22:18:38 +02:00
// map of outstanding transactions + mutex
2018-04-28 02:16:12 +02:00
txLock * sync . RWMutex
transactions map [ messageID ] * transaction
// routing table
2018-05-19 19:05:30 +02:00
rt * routingTable
2018-04-28 02:16:12 +02:00
// data store
2018-05-19 19:05:30 +02:00
store * contactStore
2018-05-13 22:02:46 +02:00
// overrides for request handlers
requestHandler RequestHandlerFunc
2018-04-28 02:16:12 +02:00
2018-05-13 22:02:46 +02:00
// stop the node neatly and clean up after itself
2018-05-24 23:49:43 +02:00
stop * stopOnce . Stopper
2018-04-28 02:16:12 +02:00
}
2018-05-30 03:38:55 +02:00
// NewNode returns an initialized Node's pointer.
2018-06-14 17:48:02 +02:00
func NewNode ( id bits . Bitmap ) * Node {
2018-05-13 22:02:46 +02:00
return & Node {
2018-04-28 02:16:12 +02:00
id : id ,
rt : newRoutingTable ( id ) ,
2018-05-13 22:02:46 +02:00
store : newStore ( ) ,
2018-04-28 02:16:12 +02:00
txLock : & sync . RWMutex { } ,
transactions : make ( map [ messageID ] * transaction ) ,
stop : stopOnce . New ( ) ,
tokens : & tokenManager { } ,
}
}
2018-05-13 22:02:46 +02:00
// Connect connects to the given connection and starts any background threads necessary
2018-04-28 02:16:12 +02:00
func ( n * Node ) Connect ( conn UDPConn ) error {
n . conn = conn
2018-05-13 22:02:46 +02:00
n . tokens . Start ( tokenSecretRotationInterval )
2018-05-24 23:49:43 +02:00
go func ( ) {
// stop tokens and close the connection when we're shutting down
<- n . stop . Ch ( )
n . tokens . Stop ( )
n . connClosed = true
2018-06-13 18:45:47 +02:00
err := n . conn . Close ( )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error closing node connection on shutdown - " , err )
}
2018-05-24 23:49:43 +02:00
} ( )
2018-04-28 02:16:12 +02:00
packets := make ( chan packet )
2018-06-13 18:45:47 +02:00
2018-05-30 03:38:55 +02:00
n . stop . Add ( 1 )
2018-04-28 02:16:12 +02:00
go func ( ) {
2018-05-24 23:49:43 +02:00
defer n . stop . Done ( )
2018-04-28 02:16:12 +02:00
buf := make ( [ ] byte , udpMaxMessageLength )
for {
2018-05-14 03:17:29 +02:00
bytesRead , raddr , err := n . conn . ReadFromUDP ( buf )
2018-04-28 02:16:12 +02:00
if err != nil {
2018-05-24 23:49:43 +02:00
if n . connClosed {
return
2018-04-28 02:16:12 +02:00
}
2018-05-24 23:49:43 +02:00
log . Errorf ( "udp read error: %v" , err )
2018-04-28 02:16:12 +02:00
continue
} else if raddr == nil {
log . Errorf ( "udp read with no raddr" )
continue
}
2018-05-14 03:17:29 +02:00
data := make ( [ ] byte , bytesRead )
copy ( data , buf [ : bytesRead ] ) // slices use the same underlying array, so we need a new one for each packet
2018-04-28 02:16:12 +02:00
2018-05-14 03:17:29 +02:00
select { // needs select here because packet consumer can quit and the packets channel gets filled up and blocks
case packets <- packet { data : data , raddr : raddr } :
2018-05-24 19:05:05 +02:00
case <- n . stop . Ch ( ) :
2018-05-24 23:49:43 +02:00
return
2018-05-14 03:17:29 +02:00
}
2018-04-28 02:16:12 +02:00
}
} ( )
2018-06-13 18:45:47 +02:00
2018-05-30 03:38:55 +02:00
n . stop . Add ( 1 )
2018-04-28 02:16:12 +02:00
go func ( ) {
2018-05-24 23:49:43 +02:00
defer n . stop . Done ( )
2018-04-28 02:16:12 +02:00
var pkt packet
for {
select {
case pkt = <- packets :
n . handlePacket ( pkt )
2018-05-24 19:05:05 +02:00
case <- n . stop . Ch ( ) :
2018-04-28 02:16:12 +02:00
return
}
}
} ( )
2018-06-13 18:45:47 +02:00
n . stop . Add ( 1 )
go func ( ) {
defer n . stop . Done ( )
n . startRoutingTableGrooming ( )
} ( )
2018-05-13 22:02:46 +02:00
2018-04-28 02:16:12 +02:00
return nil
}
// Shutdown shuts down the node
func ( n * Node ) Shutdown ( ) {
log . Debugf ( "[%s] node shutting down" , n . id . HexShort ( ) )
2018-05-24 23:49:43 +02:00
n . stop . StopAndWait ( )
2018-04-28 02:16:12 +02:00
log . Debugf ( "[%s] node stopped" , n . id . HexShort ( ) )
}
// handlePacket handles packets received from udp.
func ( n * Node ) handlePacket ( pkt packet ) {
//log.Debugf("[%s] Received message from %s (%d bytes) %s", n.id.HexShort(), pkt.raddr.String(), len(pkt.data), hex.EncodeToString(pkt.data))
if ! util . InSlice ( string ( pkt . data [ 0 : 5 ] ) , [ ] string { "d1:0i" , "di0ei" } ) {
log . Errorf ( "[%s] data is not a well-formatted dict: (%d bytes) %s" , n . id . HexShort ( ) , len ( pkt . data ) , hex . EncodeToString ( pkt . data ) )
return
}
// the following is a bit of a hack, but it lets us avoid decoding every message twice
// it depends on the data being a dict with 0 as the first key (so it starts with "d1:0i") and the message type as the first value
2018-05-13 22:02:46 +02:00
// TODO: test this more thoroughly
2018-04-28 02:16:12 +02:00
switch pkt . data [ 5 ] {
case '0' + requestType :
request := Request { }
err := bencode . DecodeBytes ( pkt . data , & request )
if err != nil {
log . Errorf ( "[%s] error decoding request from %s: %s: (%d bytes) %s" , n . id . HexShort ( ) , pkt . raddr . String ( ) , err . Error ( ) , len ( pkt . data ) , hex . EncodeToString ( pkt . data ) )
return
}
2018-05-30 03:38:55 +02:00
log . Debugf ( "[%s] query %s: received request from %s: %s(%s)" , n . id . HexShort ( ) , request . ID . HexShort ( ) , request . NodeID . HexShort ( ) , request . Method , request . argsDebug ( ) )
2018-04-28 02:16:12 +02:00
n . handleRequest ( pkt . raddr , request )
case '0' + responseType :
response := Response { }
err := bencode . DecodeBytes ( pkt . data , & response )
if err != nil {
log . Errorf ( "[%s] error decoding response from %s: %s: (%d bytes) %s" , n . id . HexShort ( ) , pkt . raddr . String ( ) , err . Error ( ) , len ( pkt . data ) , hex . EncodeToString ( pkt . data ) )
return
}
2018-05-30 03:38:55 +02:00
log . Debugf ( "[%s] query %s: received response from %s: %s" , n . id . HexShort ( ) , response . ID . HexShort ( ) , response . NodeID . HexShort ( ) , response . argsDebug ( ) )
2018-04-28 02:16:12 +02:00
n . handleResponse ( pkt . raddr , response )
case '0' + errorType :
e := Error { }
err := bencode . DecodeBytes ( pkt . data , & e )
if err != nil {
log . Errorf ( "[%s] error decoding error from %s: %s: (%d bytes) %s" , n . id . HexShort ( ) , pkt . raddr . String ( ) , err . Error ( ) , len ( pkt . data ) , hex . EncodeToString ( pkt . data ) )
return
}
log . Debugf ( "[%s] query %s: received error from %s: %s" , n . id . HexShort ( ) , e . ID . HexShort ( ) , e . NodeID . HexShort ( ) , e . ExceptionType )
n . handleError ( pkt . raddr , e )
default :
log . Errorf ( "[%s] invalid message type: %s" , n . id . HexShort ( ) , pkt . data [ 5 ] )
return
}
}
// handleRequest handles the requests received from udp.
func ( n * Node ) handleRequest ( addr * net . UDPAddr , request Request ) {
if request . NodeID . Equals ( n . id ) {
log . Warn ( "ignoring self-request" )
return
}
2018-05-13 22:02:46 +02:00
// if a handler is overridden, call it instead
if n . requestHandler != nil {
n . requestHandler ( addr , request )
return
}
2018-04-28 02:16:12 +02:00
switch request . Method {
default :
2018-05-13 22:02:46 +02:00
//n.sendMessage(addr, Error{ID: request.ID, NodeID: n.id, ExceptionType: "invalid-request-method"})
2018-04-28 02:16:12 +02:00
log . Errorln ( "invalid request method" )
return
case pingMethod :
2018-06-15 04:30:37 +02:00
err := n . sendMessage ( addr , Response { ID : request . ID , NodeID : n . id , Data : pingSuccessResponse } )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error sending 'pingmethod' response message - " , err )
}
2018-04-28 02:16:12 +02:00
case storeMethod :
// TODO: we should be sending the IP in the request, not just using the sender's IP
// TODO: should we be using StoreArgs.NodeID or StoreArgs.Value.LbryID ???
if n . tokens . Verify ( request . StoreArgs . Value . Token , request . NodeID , addr ) {
2018-05-22 18:27:49 +02:00
n . Store ( request . StoreArgs . BlobHash , Contact { ID : request . StoreArgs . NodeID , IP : addr . IP , Port : request . StoreArgs . Value . Port } )
2018-06-15 04:30:37 +02:00
err := n . sendMessage ( addr , Response { ID : request . ID , NodeID : n . id , Data : storeSuccessResponse } )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error sending 'storemethod' response message - " , err )
}
2018-04-28 02:16:12 +02:00
} else {
2018-06-15 04:30:37 +02:00
err := n . sendMessage ( addr , Error { ID : request . ID , NodeID : n . id , ExceptionType : "invalid-token" } )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error sending 'storemethod'response message for invalid-token - " , err )
}
2018-04-28 02:16:12 +02:00
}
case findNodeMethod :
if request . Arg == nil {
log . Errorln ( "request is missing arg" )
return
}
2018-06-15 04:30:37 +02:00
err := n . sendMessage ( addr , Response {
2018-04-28 02:16:12 +02:00
ID : request . ID ,
NodeID : n . id ,
Contacts : n . rt . GetClosest ( * request . Arg , bucketSize ) ,
2018-06-15 04:30:37 +02:00
} )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error sending 'findnodemethod' response message - " , err )
}
2018-04-28 02:16:12 +02:00
case findValueMethod :
if request . Arg == nil {
log . Errorln ( "request is missing arg" )
return
}
res := Response {
ID : request . ID ,
NodeID : n . id ,
Token : n . tokens . Get ( request . NodeID , addr ) ,
}
if contacts := n . store . Get ( * request . Arg ) ; len ( contacts ) > 0 {
2018-06-19 19:47:13 +02:00
res . FindValueKey = request . Arg . RawString ( )
2018-04-28 02:16:12 +02:00
res . Contacts = contacts
} else {
res . Contacts = n . rt . GetClosest ( * request . Arg , bucketSize )
}
2018-06-15 04:30:37 +02:00
err := n . sendMessage ( addr , res )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error sending 'findvaluemethod' response message - " , err )
}
2018-04-28 02:16:12 +02:00
}
// nodes that send us requests should not be inserted, only refreshed.
// the routing table must only contain "good" nodes, which are nodes that reply to our requests
// if a node is already good (aka in the table), its fine to refresh it
// http://www.bittorrent.org/beps/bep_0005.html#routing-table
2018-05-19 19:05:30 +02:00
n . rt . Fresh ( Contact { ID : request . NodeID , IP : addr . IP , Port : addr . Port } )
2018-04-28 02:16:12 +02:00
}
// handleResponse handles responses received from udp.
func ( n * Node ) handleResponse ( addr * net . UDPAddr , response Response ) {
2018-06-19 19:47:13 +02:00
tx := n . txFind ( response . ID , Contact { ID : response . NodeID , IP : addr . IP , Port : addr . Port } )
2018-04-28 02:16:12 +02:00
if tx != nil {
2018-06-21 17:26:48 +02:00
select {
case tx . res <- response :
default :
log . Errorf ( "[%s] query %s: response received but tx has no listener" , n . id . HexShort ( ) , response . ID . HexShort ( ) )
}
2018-04-28 02:16:12 +02:00
}
2018-05-19 19:05:30 +02:00
n . rt . Update ( Contact { ID : response . NodeID , IP : addr . IP , Port : addr . Port } )
2018-04-28 02:16:12 +02:00
}
// handleError handles errors received from udp.
func ( n * Node ) handleError ( addr * net . UDPAddr , e Error ) {
spew . Dump ( e )
2018-05-19 19:05:30 +02:00
n . rt . Fresh ( Contact { ID : e . NodeID , IP : addr . IP , Port : addr . Port } )
2018-04-28 02:16:12 +02:00
}
// send sends data to a udp address
2018-05-01 22:18:38 +02:00
func ( n * Node ) sendMessage ( addr * net . UDPAddr , data Message ) error {
2018-04-28 02:16:12 +02:00
encoded , err := bencode . EncodeBytes ( data )
if err != nil {
return errors . Err ( err )
}
if req , ok := data . ( Request ) ; ok {
log . Debugf ( "[%s] query %s: sending request to %s (%d bytes) %s(%s)" ,
2018-05-30 03:38:55 +02:00
n . id . HexShort ( ) , req . ID . HexShort ( ) , addr . String ( ) , len ( encoded ) , req . Method , req . argsDebug ( ) )
2018-04-28 02:16:12 +02:00
} else if res , ok := data . ( Response ) ; ok {
log . Debugf ( "[%s] query %s: sending response to %s (%d bytes) %s" ,
2018-05-30 03:38:55 +02:00
n . id . HexShort ( ) , res . ID . HexShort ( ) , addr . String ( ) , len ( encoded ) , res . argsDebug ( ) )
2018-04-28 02:16:12 +02:00
} else {
log . Debugf ( "[%s] (%d bytes) %s" , n . id . HexShort ( ) , len ( encoded ) , spew . Sdump ( data ) )
}
2018-06-15 04:30:37 +02:00
err = n . conn . SetWriteDeadline ( time . Now ( ) . Add ( 5 * time . Second ) )
if err != nil {
2018-05-30 03:38:55 +02:00
log . Error ( "error setting write deadline - " , err )
}
2018-04-28 02:16:12 +02:00
_ , err = n . conn . WriteToUDP ( encoded , addr )
return errors . Err ( err )
}
// transaction represents a single query to the dht. it stores the queried contact, the request, and the response channel
type transaction struct {
2018-06-19 19:47:13 +02:00
contact Contact
req Request
res chan Response
skipIDCheck bool
2018-04-28 02:16:12 +02:00
}
// insert adds a transaction to the manager.
func ( n * Node ) txInsert ( tx * transaction ) {
n . txLock . Lock ( )
defer n . txLock . Unlock ( )
n . transactions [ tx . req . ID ] = tx
}
// delete removes a transaction from the manager.
func ( n * Node ) txDelete ( id messageID ) {
n . txLock . Lock ( )
defer n . txLock . Unlock ( )
delete ( n . transactions , id )
}
2018-06-19 19:47:13 +02:00
// Find finds a transaction for the given id and contact
func ( n * Node ) txFind ( id messageID , c Contact ) * transaction {
2018-04-28 02:16:12 +02:00
n . txLock . RLock ( )
defer n . txLock . RUnlock ( )
t , ok := n . transactions [ id ]
2018-06-19 19:47:13 +02:00
if ! ok || ! t . contact . Equals ( c , ! t . skipIDCheck ) {
2018-04-28 02:16:12 +02:00
return nil
}
return t
}
2018-06-19 19:47:13 +02:00
// SendOptions controls the behavior of send calls
type SendOptions struct {
skipIDCheck bool
}
2018-04-28 02:16:12 +02:00
// SendAsync sends a transaction and returns a channel that will eventually contain the transaction response
// The response channel is closed when the transaction is completed or times out.
2018-06-19 19:47:13 +02:00
func ( n * Node ) SendAsync ( ctx context . Context , contact Contact , req Request , options ... SendOptions ) <- chan * Response {
2018-05-19 19:05:30 +02:00
if contact . ID . Equals ( n . id ) {
2018-04-28 02:16:12 +02:00
log . Error ( "sending query to self" )
return nil
}
ch := make ( chan * Response , 1 )
go func ( ) {
defer close ( ch )
req . ID = newMessageID ( )
req . NodeID = n . id
tx := & transaction {
contact : contact ,
req : req ,
res : make ( chan Response ) ,
}
2018-06-19 19:47:13 +02:00
if len ( options ) > 0 && options [ 0 ] . skipIDCheck {
tx . skipIDCheck = true
}
2018-04-28 02:16:12 +02:00
n . txInsert ( tx )
defer n . txDelete ( tx . req . ID )
for i := 0 ; i < udpRetry ; i ++ {
2018-06-15 04:30:37 +02:00
err := n . sendMessage ( contact . Addr ( ) , tx . req )
if err != nil {
2018-04-28 02:16:12 +02:00
if ! strings . Contains ( err . Error ( ) , "use of closed network connection" ) { // this only happens on localhost. real UDP has no connections
log . Error ( "send error: " , err )
}
2018-05-01 22:18:38 +02:00
continue
2018-04-28 02:16:12 +02:00
}
select {
case res := <- tx . res :
ch <- & res
return
case <- ctx . Done ( ) :
return
case <- time . After ( udpTimeout ) :
}
}
2018-05-13 22:02:46 +02:00
// notify routing table about a failure to respond
n . rt . Fail ( tx . contact )
2018-04-28 02:16:12 +02:00
} ( )
return ch
}
// Send sends a transaction and blocks until the response is available. It returns a response, or nil
// if the transaction timed out.
2018-06-19 19:47:13 +02:00
func ( n * Node ) Send ( contact Contact , req Request , options ... SendOptions ) * Response {
return <- n . SendAsync ( context . Background ( ) , contact , req , options ... )
2018-04-28 02:16:12 +02:00
}
2018-05-19 19:05:30 +02:00
// SendCancelable sends the transaction asynchronously and allows the transaction to be canceled
2018-06-19 19:47:13 +02:00
func ( n * Node ) SendCancelable ( contact Contact , req Request , options ... SendOptions ) ( <- chan * Response , context . CancelFunc ) {
2018-05-19 19:05:30 +02:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2018-06-19 19:47:13 +02:00
return n . SendAsync ( ctx , contact , req , options ... ) , cancel
2018-05-19 19:05:30 +02:00
}
2018-05-30 03:38:55 +02:00
// CountActiveTransactions returns the number of transactions in the manager
2018-04-28 02:16:12 +02:00
func ( n * Node ) CountActiveTransactions ( ) int {
n . txLock . Lock ( )
defer n . txLock . Unlock ( )
return len ( n . transactions )
}
2018-05-13 22:02:46 +02:00
func ( n * Node ) startRoutingTableGrooming ( ) {
2018-06-13 18:45:47 +02:00
refreshTicker := time . NewTicker ( tRefresh / 5 ) // how often to check for buckets that need to be refreshed
for {
select {
case <- refreshTicker . C :
RoutingTableRefresh ( n , tRefresh , n . stop . Ch ( ) )
case <- n . stop . Ch ( ) :
return
2018-05-13 22:02:46 +02:00
}
2018-06-13 18:45:47 +02:00
}
2018-05-13 22:02:46 +02:00
}
2018-05-22 18:27:49 +02:00
2018-05-30 03:38:55 +02:00
// Store stores a node contact in the node's contact store.
2018-06-14 17:48:02 +02:00
func ( n * Node ) Store ( hash bits . Bitmap , c Contact ) {
2018-05-22 18:27:49 +02:00
n . store . Upsert ( hash , c )
}