Merge branch 'dht-tests'

This commit is contained in:
Jack Robison 2017-10-11 13:06:43 -04:00
commit b95a89ec99
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
29 changed files with 957 additions and 650 deletions

View file

@ -19,6 +19,13 @@ at anytime.
* Fixed concurrent reflects opening too many files
* Fixed cases when reflecting would fail on error conditions
* Fixed deadlocks from occuring during blob writes
* Fixed and updated`lbrynet.tests.dht`
* Fixed redundant dht id
* Fixed dht `ping` method
* Fixed raising remote exceptions in dht
* Fixed hanging delayedCall in dht node class
* Fixed logging error in dht when calling or receiving methods with no arguments
* Fixed IndexError in routingTable.findCloseNodes which would cause an empty list to be returned
### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.

View file

@ -37,18 +37,18 @@ class Session(object):
"""
def __init__(self, blob_data_payment_rate, db_dir=None,
lbryid=None, peer_manager=None, dht_node_port=None,
node_id=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None,
hash_announcer=None, blob_dir=None,
blob_manager=None, peer_port=None, use_upnp=True,
rate_limiter=None, wallet=None,
dht_node_class=node.Node, blob_tracker_class=None,
payment_rate_manager_class=None, is_generous=True):
payment_rate_manager_class=None, is_generous=True, external_ip=None):
"""@param blob_data_payment_rate: The default payment rate for blob data
@param db_dir: The directory in which levelDB files should be stored
@param lbryid: The unique ID of this node
@param node_id: The unique ID of this node
@param peer_manager: An object which keeps track of all known
peers. If None, a PeerManager will be created
@ -101,7 +101,7 @@ class Session(object):
"""
self.db_dir = db_dir
self.lbryid = lbryid
self.node_id = node_id
self.peer_manager = peer_manager
@ -124,7 +124,7 @@ class Session(object):
self.rate_limiter = rate_limiter
self.external_ip = '127.0.0.1'
self.external_ip = external_ip
self.upnp_redirects = []
@ -142,8 +142,8 @@ class Session(object):
log.debug("Starting session.")
if self.lbryid is None:
self.lbryid = generate_id()
if self.node_id is None:
self.node_id = generate_id()
if self.wallet is None:
from lbrynet.core.PTCWallet import PTCWallet
@ -274,7 +274,7 @@ class Session(object):
self.dht_node = self.dht_node_class(
udpPort=self.dht_node_port,
lbryid=self.lbryid,
node_id=self.node_id,
externalIP=self.external_ip
)
self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager)

View file

@ -9,7 +9,7 @@ import json
import textwrap
import random
import signal
from copy import deepcopy
from twisted.web import server
from twisted.internet import defer, threads, error, reactor
from twisted.internet.task import LoopingCall
@ -206,7 +206,7 @@ class Daemon(AuthJSONRPCServer):
# of the daemon, but I don't want to deal with that now
self.analytics_manager = analytics_manager
self.lbryid = conf.settings.node_id
self.node_id = conf.settings.node_id
self.wallet_user = None
self.wallet_password = None
@ -562,7 +562,7 @@ class Daemon(AuthJSONRPCServer):
self.session = Session(
conf.settings['data_rate'],
db_dir=self.db_dir,
lbryid=self.lbryid,
node_id=self.node_id,
blob_dir=self.blobfile_dir,
dht_node_port=self.dht_node_port,
known_dht_nodes=conf.settings['known_dht_nodes'],
@ -1054,7 +1054,7 @@ class Daemon(AuthJSONRPCServer):
best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None
response = {
'lbry_id': base58.b58encode(self.lbryid),
'lbry_id': base58.b58encode(self.node_id),
'installation_id': conf.settings.installation_id,
'is_running': self.announced_startup,
'is_first_run': self.session.wallet.is_first_run if has_wallet else None,
@ -2658,6 +2658,78 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(lambda r: self._render_response(r))
return d
def jsonrpc_routing_table_get(self):
"""
Get DHT routing information
Usage:
routing_table_get
Returns:
(dict) dictionary containing routing and contact information
{
"buckets": {
<bucket index>: [
{
"address": (str) peer address,
"node_id": (str) peer node id,
"blobs": (list) blob hashes announced by peer
}
"contacts": (list) contact node ids,
"blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets
"node_id": (str) the local dht node id
"""
result = {}
data_store = deepcopy(self.session.dht_node._dataStore._dict)
datastore_len = len(data_store)
hosts = {}
if datastore_len:
for k, v in data_store.iteritems():
for value, lastPublished, originallyPublished, originalPublisherID in v:
try:
contact = self.session.dht_node._routingTable.getContact(
originalPublisherID)
except ValueError:
continue
if contact in hosts:
blobs = hosts[contact]
else:
blobs = []
blobs.append(k.encode('hex'))
hosts[contact] = blobs
contact_set = []
blob_hashes = []
result['buckets'] = {}
for i in range(len(self.session.dht_node._routingTable._buckets)):
for contact in self.session.dht_node._routingTable._buckets[i]._contacts:
contacts = result['buckets'].get(i, [])
if contact in hosts:
blobs = hosts[contact]
del hosts[contact]
else:
blobs = []
host = {
"address": contact.address,
"node_id": contact.id.encode("hex"),
"blobs": blobs,
}
for blob_hash in blobs:
if blob_hash not in blob_hashes:
blob_hashes.append(blob_hash)
contacts.append(host)
result['buckets'][i] = contacts
if contact.id.encode('hex') not in contact_set:
contact_set.append(contact.id.encode("hex"))
result['contacts'] = contact_set
result['blob_hashes'] = blob_hashes
result['node_id'] = self.session.dht_node.node_id.encode('hex')
return self._render_response(result)
@defer.inlineCallbacks
def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None):
"""

View file

@ -21,9 +21,15 @@ alpha = 3
#: Maximum number of contacts stored in a bucket; this should be an even number
k = 8
#: Maximum number of contacts stored in the replacement cache
replacementCacheSize = 8
#: Timeout for network operations (in seconds)
rpcTimeout = 5
# number of rpc attempts to make before a timeout results in the node being removed as a contact
rpcAttempts = 5
# Delay between iterations of iterative node lookups (for loose parallelism) (in seconds)
iterativeLookupDelay = rpcTimeout / 2

View file

@ -1,13 +1,3 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
class Contact(object):
""" Encapsulation for remote contact

View file

@ -1,33 +1,13 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import UserDict
import time
import constants
from interface import IDataStore
from zope.interface import implements
class DataStore(UserDict.DictMixin):
""" Interface for classes implementing physical storage (for data
published via the "STORE" RPC) for the Kademlia DHT
@note: This provides an interface for a dict-like object
"""
def keys(self):
""" Return a list of the keys in this data store """
def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
pass
class DictDataStore(DataStore):
class DictDataStore(UserDict.DictMixin):
""" A datastore using an in-memory Python dictionary """
implements(IDataStore)
def __init__(self):
# Dictionary format:
@ -64,3 +44,9 @@ class DictDataStore(DataStore):
def getPeersForBlob(self, key):
if key in self._dict:
return [val[0] for val in self._dict[key]]
def removePeer(self, value):
for key in self._dict:
self._dict[key] = [val for val in self._dict[key] if val[0] != value]
if not self._dict[key]:
del self._dict[key]

22
lbrynet/dht/delay.py Normal file
View file

@ -0,0 +1,22 @@
import time
class Delay(object):
maxToSendDelay = 10 ** -3 # 0.05
minToSendDelay = 10 ** -5 # 0.01
def __init__(self, start=0):
self._next = start
# TODO: explain why this logic is like it is. And add tests that
# show that it actually does what it needs to do.
def __call__(self):
ts = time.time()
delay = 0
if ts >= self._next:
delay = self.minToSendDelay
self._next = ts + self.minToSendDelay
else:
delay = (self._next - ts) + self.maxToSendDelay
self._next += self.maxToSendDelay
return delay

View file

@ -1,17 +1,4 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
class DecodeError(Exception):
""" Should be raised by an C{Encoding} implementation if decode operation
fails
"""
from error import DecodeError
class Encoding(object):

38
lbrynet/dht/error.py Normal file
View file

@ -0,0 +1,38 @@
import binascii
import exceptions
# this is a dict of {"exceptions.<exception class name>": exception class} items used to raise
# remote built-in exceptions locally
BUILTIN_EXCEPTIONS = {
"exceptions.%s" % e: getattr(exceptions, e) for e in dir(exceptions) if not e.startswith("_")
}
class DecodeError(Exception):
"""
Should be raised by an C{Encoding} implementation if decode operation
fails
"""
pass
class BucketFull(Exception):
"""
Raised when the bucket is full
"""
pass
class UnknownRemoteException(Exception):
pass
class TimeoutError(Exception):
""" Raised when a RPC times out """
def __init__(self, remote_contact_id):
# remote_contact_id is a binary blob so we need to convert it
# into something more readable
msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id))
Exception.__init__(self, msg)
self.remote_contact_id = remote_contact_id

117
lbrynet/dht/interface.py Normal file
View file

@ -0,0 +1,117 @@
from zope.interface import Interface
class IDataStore(Interface):
""" Interface for classes implementing physical storage (for data
published via the "STORE" RPC) for the Kademlia DHT
@note: This provides an interface for a dict-like object
"""
def keys(self):
""" Return a list of the keys in this data store """
pass
def removeExpiredPeers(self):
pass
def hasPeersForBlob(self, key):
pass
def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
pass
def getPeersForBlob(self, key):
pass
def removePeer(self, key):
pass
class IRoutingTable(Interface):
""" Interface for RPC message translators/formatters
Classes inheriting from this should provide a suitable routing table for
a parent Node object (i.e. the local entity in the Kademlia network)
"""
def __init__(self, parentNodeID):
"""
@param parentNodeID: The n-bit node ID of the node to which this
routing table belongs
@type parentNodeID: str
"""
def addContact(self, contact):
""" Add the given contact to the correct k-bucket; if it already
exists, its status will be updated
@param contact: The contact to add to this node's k-buckets
@type contact: kademlia.contact.Contact
"""
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the
specified key.
@param key: the n-bit key (i.e. the node or value ID) to search for
@type key: str
@param count: the amount of contacts to return
@type count: int
@param _rpcNodeID: Used during RPC, this is be the sender's Node ID
Whatever ID is passed in the paramater will get
excluded from the list of returned contacts.
@type _rpcNodeID: str
@return: A list of node contacts (C{kademlia.contact.Contact instances})
closest to the specified key.
This method will return C{k} (or C{count}, if specified)
contacts if at all possible; it will only return fewer if the
node is returning all of the contacts that it knows of.
@rtype: list
"""
def getContact(self, contactID):
""" Returns the (known) contact with the specified node ID
@raise ValueError: No contact with the specified contact ID is known
by this node
"""
def getRefreshList(self, startIndex=0, force=False):
""" Finds all k-buckets that need refreshing, starting at the
k-bucket with the specified index, and returns IDs to be searched for
in order to refresh those k-buckets
@param startIndex: The index of the bucket to start refreshing at;
this bucket and those further away from it will
be refreshed. For example, when joining the
network, this node will set this to the index of
the bucket after the one containing it's closest
neighbour.
@type startIndex: index
@param force: If this is C{True}, all buckets (in the specified range)
will be refreshed, regardless of the time they were last
accessed.
@type force: bool
@return: A list of node ID's that the parent node should search for
in order to refresh the routing Table
@rtype: list
"""
def removeContact(self, contactID):
""" Remove the contact with the specified node ID from the routing
table
@param contactID: The node ID of the contact to remove
@type contactID: str
"""
def touchKBucket(self, key):
""" Update the "last accessed" timestamp of the k-bucket which covers
the range containing the specified key in the key/ID space
@param key: A key in the range of the target k-bucket
@type key: str
"""

View file

@ -1,17 +1,5 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import constants
class BucketFull(Exception):
""" Raised when the bucket is full """
from error import BucketFull
class KBucket(object):

View file

@ -12,7 +12,7 @@ import operator
import struct
import time
from twisted.internet import defer, error, reactor, threads
from twisted.internet import defer, error, reactor, threads, task
import constants
import routingtable
@ -49,8 +49,8 @@ class Node(object):
application is performed via this class (or a subclass).
"""
def __init__(self, id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None, lbryid=None,
def __init__(self, node_id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None,
externalIP=None):
"""
@param dataStore: The data store to use. This must be class inheriting
@ -74,11 +74,7 @@ class Node(object):
being transmitted.
@type networkProtocol: entangled.kademlia.protocol.KademliaProtocol
"""
if id != None:
self.id = id
else:
self.id = self._generateID()
self.lbryid = lbryid
self.node_id = node_id or self._generateID()
self.port = udpPort
self._listeningPort = None # object implementing Twisted
# IListeningPort This will contain a deferred created when
@ -88,12 +84,12 @@ class Node(object):
# operations before the node has finished joining the network)
self._joinDeferred = None
self.next_refresh_call = None
self.next_change_token_call = None
self.change_token_lc = task.LoopingCall(self.change_token)
# Create k-buckets (for storing contacts)
if routingTableClass is None:
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id)
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id)
else:
self._routingTable = routingTableClass(self.id)
self._routingTable = routingTableClass(self.node_id)
# Initialize this node's network access mechanisms
if networkProtocol is None:
@ -103,7 +99,6 @@ class Node(object):
# Initialize the data storage mechanism used by this node
self.token_secret = self._generateID()
self.old_token_secret = None
self.change_token()
if dataStore is None:
self._dataStore = datastore.DictDataStore()
else:
@ -111,7 +106,7 @@ class Node(object):
# Try to restore the node's state...
if 'nodeState' in self._dataStore:
state = self._dataStore['nodeState']
self.id = state['id']
self.node_id = state['id']
for contactTriple in state['closestNodes']:
contact = Contact(
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
@ -128,9 +123,8 @@ class Node(object):
if self.next_refresh_call is not None:
self.next_refresh_call.cancel()
self.next_refresh_call = None
if self.next_change_token_call is not None:
self.next_change_token_call.cancel()
self.next_change_token_call = None
if self.change_token_lc.running:
self.change_token_lc.stop()
if self._listeningPort is not None:
self._listeningPort.stopListening()
self.hash_watcher.stop()
@ -163,8 +157,12 @@ class Node(object):
bootstrapContacts.append(contact)
else:
bootstrapContacts = None
# Start the token looping call
self.change_token_lc.start(constants.tokenSecretChangeInterval)
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
@ -173,18 +171,27 @@ class Node(object):
self.hash_watcher.tick()
yield self._joinDeferred
@property
def contacts(self):
def _inner():
for i in range(len(self._routingTable._buckets)):
for contact in self._routingTable._buckets[i]._contacts:
yield contact
return list(_inner())
def printContacts(self, *args):
print '\n\nNODE CONTACTS\n==============='
for i in range(len(self._routingTable._buckets)):
print "bucket %i" % i
for contact in self._routingTable._buckets[i]._contacts:
print contact
print " %s:%i" % (contact.address, contact.port)
print '=================================='
def getApproximateTotalDHTNodes(self):
# get the deepest bucket and the number of contacts in that bucket and multiply it
# by the number of equivalently deep buckets in the whole DHT to get a really bad
# estimate!
bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)]
bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.node_id)]
num_in_bucket = len(bucket._contacts)
factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin)
return num_in_bucket * factor
@ -200,30 +207,24 @@ class Node(object):
return num_in_data_store * self.getApproximateTotalDHTNodes() / 8
def announceHaveBlob(self, key, port):
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid})
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id})
@defer.inlineCallbacks
def getPeersForBlob(self, blob_hash):
def expand_and_filter(result):
result = yield self.iterativeFindValue(blob_hash)
expanded_peers = []
if isinstance(result, dict):
if result:
if blob_hash in result:
for peer in result[blob_hash]:
if self.lbryid != peer[6:]:
if self.node_id != peer[6:]:
host = ".".join([str(ord(d)) for d in peer[:4]])
if host == "127.0.0.1":
if "from_peer" in result:
if result["from_peer"] != "self":
if host == "127.0.0.1" and "from_peer" in result \
and result["from_peer"] != "self":
host = result["from_peer"]
port, = struct.unpack('>H', peer[4:6])
if (host, port) not in expanded_peers:
expanded_peers.append((host, port))
return expanded_peers
def find_failed(err):
return []
d = self.iterativeFindValue(blob_hash)
d.addCallbacks(expand_and_filter, find_failed)
return d
defer.returnValue(expanded_peers)
def get_most_popular_hashes(self, num_to_return):
return self.hash_watcher.most_popular_hashes(num_to_return)
@ -263,7 +264,7 @@ class Node(object):
result = responseMsg.response
if 'token' in result:
value['token'] = result['token']
d = n.store(blob_hash, value, self.id, 0)
d = n.store(blob_hash, value, self.node_id, 0)
d.addCallback(log_success)
d.addErrback(log_error, n)
else:
@ -272,12 +273,12 @@ class Node(object):
def requestPeers(contacts):
if self.externalIP is not None and len(contacts) >= constants.k:
is_closer = Distance(blob_hash).is_closer(self.id, contacts[-1].id)
is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
if is_closer:
contacts.pop()
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
elif self.externalIP is not None:
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
ds = []
for contact in contacts:
known_nodes[contact.id] = contact
@ -295,8 +296,6 @@ class Node(object):
def change_token(self):
self.old_token_secret = self.token_secret
self.token_secret = self._generateID()
self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval,
self.change_token)
def make_token(self, compact_ip):
h = hashlib.new('sha384')
@ -463,14 +462,13 @@ class Node(object):
raise TypeError, 'No NodeID given. Therefore we can\'t store this node'
if self_store is True and self.externalIP:
contact = Contact(self.id, self.externalIP, self.port, None, None)
contact = Contact(self.node_id, self.externalIP, self.port, None, None)
compact_ip = contact.compact_ip()
elif '_rpcNodeContact' in kwargs:
contact = kwargs['_rpcNodeContact']
compact_ip = contact.compact_ip()
else:
return 'Not OK'
# raise TypeError, 'No contact info available'
raise TypeError, 'No contact info available'
if ((self_store is False) and
('token' not in value or not self.verify_token(value['token'], compact_ip))):
@ -513,6 +511,7 @@ class Node(object):
node is returning all of the contacts that it knows of.
@rtype: list
"""
# Get the sender's ID (if any)
if '_rpcNodeID' in kwargs:
rpc_sender_id = kwargs['_rpcNodeID']
@ -589,11 +588,12 @@ class Node(object):
findValue = rpc != 'findNode'
if startupShortlist is None:
shortlist = self._routingTable.findCloseNodes(key, constants.alpha)
if key != self.id:
shortlist = self._routingTable.findCloseNodes(key, constants.k)
if key != self.node_id:
# Update the "last accessed" timestamp for the appropriate k-bucket
self._routingTable.touchKBucket(key)
if len(shortlist) == 0:
log.warning("This node doesnt know any other nodes")
# This node doesn't know of any other nodes
fakeDf = defer.Deferred()
fakeDf.callback([])
@ -686,7 +686,7 @@ class _IterativeFindHelper(object):
responseMsg = responseTuple[0]
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't
if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id:
if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.node_id:
return responseMsg.nodeID
# Mark this node as active

View file

@ -1,62 +1,21 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import logging
import binascii
import time
import socket
import errno
from twisted.internet import protocol, defer, error, reactor, task
from twisted.python import failure
import constants
import encoding
import msgtypes
import msgformat
from contact import Contact
from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError
from delay import Delay
log = logging.getLogger(__name__)
class TimeoutError(Exception):
""" Raised when a RPC times out """
def __init__(self, remote_contact_id):
# remote_contact_id is a binary blob so we need to convert it
# into something more readable
msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id))
Exception.__init__(self, msg)
self.remote_contact_id = remote_contact_id
class Delay(object):
maxToSendDelay = 10 ** -3 # 0.05
minToSendDelay = 10 ** -5 # 0.01
def __init__(self, start=0):
self._next = start
# TODO: explain why this logic is like it is. And add tests that
# show that it actually does what it needs to do.
def __call__(self):
ts = time.time()
delay = 0
if ts >= self._next:
delay = self.minToSendDelay
self._next = ts + self.minToSendDelay
else:
delay = (self._next - ts) + self.maxToSendDelay
self._next += self.maxToSendDelay
return delay
class KademliaProtocol(protocol.DatagramProtocol):
""" Implements all low-level network-related functions of a Kademlia node """
@ -195,11 +154,14 @@ class KademliaProtocol(protocol.DatagramProtocol):
C{ErrorMessage}).
@rtype: twisted.internet.defer.Deferred
"""
msg = msgtypes.RequestMessage(self._node.id, method, args)
msg = msgtypes.RequestMessage(self._node.node_id, method, args)
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
if args:
log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex'))
else:
log.debug("DHT SEND CALL %s", method)
df = defer.Deferred()
if rawResponse:
@ -209,7 +171,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id)
# Transmit the data
self._send(encodedMsg, msg.id, (contact.address, contact.port))
self._sentMessages[msg.id] = (contact.id, df, timeoutCall)
self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args)
return df
def startProtocol(self):
@ -286,7 +248,12 @@ class KademliaProtocol(protocol.DatagramProtocol):
df.callback((message, address))
elif isinstance(message, msgtypes.ErrorMessage):
# The RPC request raised a remote exception; raise it locally
remoteException = Exception(message.response)
if message.exceptionType in BUILTIN_EXCEPTIONS:
exception_type = BUILTIN_EXCEPTIONS[message.exceptionType]
else:
exception_type = UnknownRemoteException
remoteException = exception_type(message.response)
log.error("Remote exception (%s): %s", address, remoteException)
df.errback(remoteException)
else:
# We got a result from the RPC
@ -377,7 +344,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _sendResponse(self, contact, rpcID, response):
""" Send a RPC response to the specified contact
"""
msg = msgtypes.ResponseMessage(rpcID, self._node.id, response)
msg = msgtypes.ResponseMessage(rpcID, self._node.node_id, response)
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
self._send(encodedMsg, rpcID, (contact.address, contact.port))
@ -385,7 +352,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _sendError(self, contact, rpcID, exceptionType, exceptionMessage):
""" Send an RPC error message to the specified contact
"""
msg = msgtypes.ErrorMessage(rpcID, self._node.id, exceptionType, exceptionMessage)
msg = msgtypes.ErrorMessage(rpcID, self._node.node_id, exceptionType, exceptionMessage)
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
self._send(encodedMsg, rpcID, (contact.address, contact.port))
@ -408,48 +375,56 @@ class KademliaProtocol(protocol.DatagramProtocol):
func = getattr(self._node, method, None)
if callable(func) and hasattr(func, 'rpcmethod'):
# Call the exposed Node method and return the result to the deferred callback chain
if args:
log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'),
senderContact.address, senderContact.port)
else:
log.debug("DHT RECV CALL %s %s:%i", method, senderContact.address,
senderContact.port)
try:
if method != 'ping':
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
result = func(*args, **kwargs)
else:
result = func()
except Exception, e:
df.errback(failure.Failure(e))
log.exception("error handling request for %s: %s", senderContact.address, method)
df.errback(e)
else:
df.callback(result)
else:
# No such exposed method
df.errback(failure.Failure(AttributeError('Invalid method: %s' % method)))
df.errback(AttributeError('Invalid method: %s' % method))
def _msgTimeout(self, messageID):
""" Called when an RPC request message times out """
# Find the message that timed out
if not self._sentMessages.has_key(messageID):
if messageID not in self._sentMessages:
# This should never be reached
log.error("deferred timed out, but is not present in sent messages list!")
return
remoteContactID, df = self._sentMessages[messageID][0:2]
remoteContactID, df, timeout_call, method, args = self._sentMessages[messageID]
if self._partialMessages.has_key(messageID):
# We are still receiving this message
self._msgTimeoutInProgress(messageID, remoteContactID, df)
self._msgTimeoutInProgress(messageID, remoteContactID, df, method, args)
return
del self._sentMessages[messageID]
# The message's destination node is now considered to be dead;
# raise an (asynchronous) TimeoutError exception and update the host node
self._node.removeContact(remoteContactID)
df.errback(failure.Failure(TimeoutError(remoteContactID)))
df.errback(TimeoutError(remoteContactID))
def _msgTimeoutInProgress(self, messageID, remoteContactID, df):
def _msgTimeoutInProgress(self, messageID, remoteContactID, df, method, args):
# See if any progress has been made; if not, kill the message
if self._hasProgressBeenMade(messageID):
# Reset the RPC timeout timer
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, messageID)
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall)
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args)
else:
# No progress has been made
del self._partialMessagesProgress[messageID]
del self._partialMessages[messageID]
df.errback(failure.Failure(TimeoutError(remoteContactID)))
df.errback(TimeoutError(remoteContactID))
def _hasProgressBeenMade(self, messageID):
return (

View file

@ -7,102 +7,17 @@
import time
import random
from zope.interface import implements
import constants
import kbucket
from interface import IRoutingTable
from error import TimeoutError
import logging
from protocol import TimeoutError
log = logging.getLogger(__name__)
class RoutingTable(object):
""" Interface for RPC message translators/formatters
Classes inheriting from this should provide a suitable routing table for
a parent Node object (i.e. the local entity in the Kademlia network)
"""
def __init__(self, parentNodeID):
"""
@param parentNodeID: The n-bit node ID of the node to which this
routing table belongs
@type parentNodeID: str
"""
def addContact(self, contact):
""" Add the given contact to the correct k-bucket; if it already
exists, its status will be updated
@param contact: The contact to add to this node's k-buckets
@type contact: kademlia.contact.Contact
"""
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the
specified key.
@param key: the n-bit key (i.e. the node or value ID) to search for
@type key: str
@param count: the amount of contacts to return
@type count: int
@param _rpcNodeID: Used during RPC, this is be the sender's Node ID
Whatever ID is passed in the paramater will get
excluded from the list of returned contacts.
@type _rpcNodeID: str
@return: A list of node contacts (C{kademlia.contact.Contact instances})
closest to the specified key.
This method will return C{k} (or C{count}, if specified)
contacts if at all possible; it will only return fewer if the
node is returning all of the contacts that it knows of.
@rtype: list
"""
def getContact(self, contactID):
""" Returns the (known) contact with the specified node ID
@raise ValueError: No contact with the specified contact ID is known
by this node
"""
def getRefreshList(self, startIndex=0, force=False):
""" Finds all k-buckets that need refreshing, starting at the
k-bucket with the specified index, and returns IDs to be searched for
in order to refresh those k-buckets
@param startIndex: The index of the bucket to start refreshing at;
this bucket and those further away from it will
be refreshed. For example, when joining the
network, this node will set this to the index of
the bucket after the one containing it's closest
neighbour.
@type startIndex: index
@param force: If this is C{True}, all buckets (in the specified range)
will be refreshed, regardless of the time they were last
accessed.
@type force: bool
@return: A list of node ID's that the parent node should search for
in order to refresh the routing Table
@rtype: list
"""
def removeContact(self, contactID):
""" Remove the contact with the specified node ID from the routing
table
@param contactID: The node ID of the contact to remove
@type contactID: str
"""
def touchKBucket(self, key):
""" Update the "last accessed" timestamp of the k-bucket which covers
the range containing the specified key in the key/ID space
@param key: A key in the range of the target k-bucket
@type key: str
"""
class TreeRoutingTable(RoutingTable):
class TreeRoutingTable(object):
""" This class implements a routing table used by a Node class.
The Kademlia routing table is a binary tree whose leaves are k-buckets,
@ -117,6 +32,7 @@ class TreeRoutingTable(RoutingTable):
C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of
that paper.
"""
implements(IRoutingTable)
def __init__(self, parentNodeID):
"""
@ -168,7 +84,7 @@ class TreeRoutingTable(RoutingTable):
@type failure: twisted.python.failure.Failure
"""
failure.trap(TimeoutError)
print '==replacing contact=='
log.warning('==replacing contact==')
# Remove the old contact...
deadContactID = failure.getErrorMessage()
try:
@ -207,7 +123,11 @@ class TreeRoutingTable(RoutingTable):
@rtype: list
"""
bucketIndex = self._kbucketIndex(key)
closestNodes = self._buckets[bucketIndex].getContacts(constants.k, _rpcNodeID)
if bucketIndex < len(self._buckets):
closestNodes = self._buckets[bucketIndex].getContacts(count, _rpcNodeID)
else:
closestNodes = []
# This method must return k contacts (even if we have the node
# with the specified key as node ID), unless there is less
# than k remote nodes in the routing table
@ -215,7 +135,7 @@ class TreeRoutingTable(RoutingTable):
canGoLower = bucketIndex - i >= 0
canGoHigher = bucketIndex + i < len(self._buckets)
# Fill up the node list to k nodes, starting with the closest neighbouring nodes known
while len(closestNodes) < constants.k and (canGoLower or canGoHigher):
while len(closestNodes) < min(count, constants.k) and (canGoLower or canGoHigher):
# TODO: this may need to be optimized
if canGoLower:
closestNodes.extend(
@ -224,8 +144,8 @@ class TreeRoutingTable(RoutingTable):
canGoLower = bucketIndex - (i + 1) >= 0
if canGoHigher:
closestNodes.extend(
self._buckets[bucketIndex + i].getContacts(
constants.k - len(closestNodes), _rpcNodeID))
self._buckets[bucketIndex + i].getContacts(constants.k - len(closestNodes),
_rpcNodeID))
canGoHigher = bucketIndex + (i + 1) < len(self._buckets)
i += 1
return closestNodes
@ -404,10 +324,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
self._replacementCache[bucketIndex] = []
if contact in self._replacementCache[bucketIndex]:
self._replacementCache[bucketIndex].remove(contact)
# TODO: Using k to limit the size of the contact
# replacement cache - maybe define a separate value for
# this in constants.py?
elif len(self._replacementCache[bucketIndex]) >= constants.k:
elif len(self._replacementCache[bucketIndex]) >= constants.replacementCacheSize:
self._replacementCache[bucketIndex].pop(0)
self._replacementCache[bucketIndex].append(contact)
@ -424,7 +341,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
except ValueError:
return
contact.failedRPCs += 1
if contact.failedRPCs >= 5:
if contact.failedRPCs >= constants.rpcAttempts:
self._buckets[bucketIndex].removeContact(contactID)
# Replace this stale contact with one from our replacement cache, if we have any
if bucketIndex in self._replacementCache:

View file

View file

@ -8,10 +8,13 @@ import hashlib
import unittest
import struct
from twisted.internet import protocol, defer, selectreactor
from lbrynet.dht.msgtypes import ResponseMessage
import lbrynet.dht.node
import lbrynet.dht.constants
import lbrynet.dht.datastore
class NodeIDTest(unittest.TestCase):
""" Test case for the Node class's ID """
def setUp(self):
@ -19,12 +22,13 @@ class NodeIDTest(unittest.TestCase):
def testAutoCreatedID(self):
""" Tests if a new node has a valid node ID """
self.failUnlessEqual(type(self.node.id), str, 'Node does not have a valid ID')
self.failUnlessEqual(len(self.node.id), 20, 'Node ID length is incorrect! Expected 160 bits, got %d bits.' % (len(self.node.id)*8))
self.failUnlessEqual(type(self.node.node_id), str, 'Node does not have a valid ID')
self.failUnlessEqual(len(self.node.node_id), 48, 'Node ID length is incorrect! '
'Expected 384 bits, got %d bits.' %
(len(self.node.node_id) * 8))
def testUniqueness(self):
""" Tests the uniqueness of the values created by the NodeID generator
"""
""" Tests the uniqueness of the values created by the NodeID generator """
generatedIDs = []
for i in range(100):
newID = self.node._generateID()
@ -37,42 +41,45 @@ class NodeIDTest(unittest.TestCase):
for i in range(20):
id = self.node._generateID()
# Key length: 20 bytes == 160 bits
self.failUnlessEqual(len(id), 20, 'Length of generated ID is incorrect! Expected 160 bits, got %d bits.' % (len(id)*8))
self.failUnlessEqual(len(id), 48,
'Length of generated ID is incorrect! Expected 384 bits, '
'got %d bits.' % (len(id)*8))
class NodeDataTest(unittest.TestCase):
""" Test case for the Node class's data-related functions """
def setUp(self):
import lbrynet.dht.contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('test')
self.node = lbrynet.dht.node.Node()
self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, self.node._protocol)
self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345,
self.node._protocol)
self.token = self.node.make_token(self.contact.compact_ip())
self.cases = []
for i in xrange(5):
h.update(str(i))
self.cases.append((h.digest(), 5000+2*i))
self.cases.append((h.digest(), 5001+2*i))
<<<<<<< Updated upstream
#(('a', 'hello there\nthis is a test'),
# ('aMuchLongerKeyThanAnyOfThePreviousOnes', 'some data'))
=======
>>>>>>> Stashed changes
def testStore(self):
def check_val_in_result(r, peer_info):
self.failUnless
""" Tests if the node can store (and privately retrieve) some data """
for key, value in self.cases:
self.node.store(key, {'port': value, 'bbid': self.contact.id, 'token': self.token}, self.contact.id, _rpcNodeContact=self.contact)
request = {
'port': value,
'lbryid': self.contact.id,
'token': self.token
}
self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact)
for key, value in self.cases:
expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + self.contact.id
self.failUnless(self.node._dataStore.hasPeersForBlob(key), 'Stored key not found in node\'s DataStore: "%s"' % key)
self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key), 'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s' % (key, value, self.node._dataStore.getPeersForBlob(key)))
expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \
self.contact.id
self.failUnless(self.node._dataStore.hasPeersForBlob(key),
'Stored key not found in node\'s DataStore: "%s"' % key)
self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key),
'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s'
% (key, value, self.node._dataStore.getPeersForBlob(key)))
class NodeContactTest(unittest.TestCase):
""" Test case for the Node class's contact management-related functions """
@ -91,36 +98,24 @@ class NodeContactTest(unittest.TestCase):
self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()')
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; '
'expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing '
'_findCloseNodes()')
def testAddSelfAsContact(self):
""" Tests the node's behaviour when attempting to add itself as a contact """
import lbrynet.dht.contact
# Create a contact with the same ID as the local node's ID
contact = lbrynet.dht.contact.Contact(self.node.id, '127.0.0.1', 91824, None)
contact = lbrynet.dht.contact.Contact(self.node.node_id, '127.0.0.1', 91824, None)
# Now try to add it
self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(self.node.id, lbrynet.dht.constants.k)
closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id,
lbrynet.dht.constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
<<<<<<< Updated upstream
# """ Test case for the Node class's iterative node lookup algorithm """
# """ Ugly brute-force test to see if the iterative node lookup algorithm runs without failing """
=======
>>>>>>> Stashed changes
"""Some scaffolding for the NodeLookupTest class. Allows isolated
node testing by simulating remote node responses"""
from twisted.internet import protocol, defer, selectreactor
from lbrynet.dht.msgtypes import ResponseMessage
class FakeRPCProtocol(protocol.DatagramProtocol):
def __init__(self):
self.reactor = selectreactor.SelectReactor()
@ -128,30 +123,30 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
self.network = None
def createNetwork(self, contactNetwork):
""" set up a list of contacts together with their closest contacts
@param contactNetwork: a sequence of tuples, each containing a contact together with its closest
contacts: C{(<contact>, <closest contact 1, ...,closest contact n>)}
"""
set up a list of contacts together with their closest contacts
@param contactNetwork: a sequence of tuples, each containing a contact together with its
closest contacts: C{(<contact>, <closest contact 1, ...,closest contact n>)}
"""
self.network = contactNetwork
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs """
def sendRPC(self, contact, method, args, rawResponse=False):
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs"""
if method == "findNode":
# get the specific contacts closest contacts
closestContacts = []
closestContactsList = []
for contactTuple in self.network:
if contact == contactTuple[0]:
# get the list of closest contacts for this contact
closestContactsList = contactTuple[1]
# Pack the closest contacts into a ResponseMessage
for closeContact in closestContactsList:
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
message = ResponseMessage("rpcId", contact.id, closestContacts)
df = defer.Deferred()
df.callback((message,(contact.address, contact.port)))
df.callback((message, (contact.address, contact.port)))
return df
elif method == "findValue":
for contactTuple in self.network:
@ -160,12 +155,10 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
dataDict = contactTuple[2]
dataKey = dataDict.keys()[0]
data = dataDict.get(dataKey)
# Check if this contact has the requested value
if dataKey == args[0]:
# Return the data value
response = dataDict
print "data found at contact: " + contact.id
else:
# Return the closest contact to the requested data key
@ -173,59 +166,49 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
closeContacts = contactTuple[1]
closestContacts = []
for closeContact in closeContacts:
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
closestContacts.append((closeContact.id, closeContact.address,
closeContact.port))
response = closestContacts
# Create the response message
message = ResponseMessage("rpcId", contact.id, response)
df = defer.Deferred()
df.callback((message,(contact.address, contact.port)))
df.callback((message, (contact.address, contact.port)))
return df
def _send(self, data, rpcID, address):
""" fake sending data """
class NodeLookupTest(unittest.TestCase):
""" Test case for the Node class's iterativeFind node lookup algorithm """
def setUp(self):
# create a fake protocol to imitate communication with other nodes
self._protocol = FakeRPCProtocol()
# Note: The reactor is never started for this test. All deferred calls run sequentially,
# since there is no asynchronous network communication
# create the node to be tested in isolation
self.node = lbrynet.dht.node.Node(None, 4000, None, None, self._protocol)
self.node = lbrynet.dht.node.Node('12345678901234567800', 4000, None, None, self._protocol)
self.updPort = 81173
<<<<<<< Updated upstream
# create a dummy reactor
=======
>>>>>>> Stashed changes
self.contactsAmount = 80
# set the node ID manually for testing
self.node.id = '12345678901234567800'
# Reinitialise the routing table
self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(self.node.id)
self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(
self.node.node_id)
# create 160 bit node ID's for test purposes
self.testNodeIDs = []
idNum = int(self.node.id)
idNum = int(self.node.node_id)
for i in range(self.contactsAmount):
# create the testNodeIDs in ascending order, away from the actual node ID, with regards to the distance metric
# create the testNodeIDs in ascending order, away from the actual node ID,
# with regards to the distance metric
self.testNodeIDs.append(idNum + i + 1)
# generate contacts
self.contacts = []
for i in range(self.contactsAmount):
contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1", self.updPort + i + 1, self._protocol)
contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1",
self.updPort + i + 1, self._protocol)
self.contacts.append(contact)
# create the network of contacts in format: (contact, closest contacts)
@ -254,43 +237,27 @@ class NodeLookupTest(unittest.TestCase):
contacts_with_datastores = []
for contact_tuple in contactNetwork:
contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], lbrynet.dht.datastore.DictDataStore()))
contacts_with_datastores.append((contact_tuple[0], contact_tuple[1],
lbrynet.dht.datastore.DictDataStore()))
self._protocol.createNetwork(contacts_with_datastores)
def testNodeBootStrap(self):
""" Test bootstrap with the closest possible contacts """
df = self.node._iterativeFind(self.node.id, self.contacts[0:8])
df = self.node._iterativeFind(self.node.node_id, self.contacts[0:8])
# Set the expected result
expectedResult = []
for item in self.contacts[0:6]:
expectedResult.append(item.id)
# Get the result from the deferred
activeContacts = df.result
# Check the length of the active contacts
self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), \
"More active contacts should exist, there should be %d contacts" %expectedResult.__len__())
self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(),
"More active contacts should exist, there should be %d "
"contacts" % expectedResult.__len__())
# Check that the received active contacts are the same as the input contacts
self.failUnlessEqual(activeContacts, expectedResult, \
"Active should only contain the closest possible contacts which were used as input for the boostrap")
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(NodeIDTest))
suite.addTest(unittest.makeSuite(NodeDataTest))
suite.addTest(unittest.makeSuite(NodeContactTest))
suite.addTest(unittest.makeSuite(NodeLookupTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())
self.failUnlessEqual(activeContacts, expectedResult,
"Active should only contain the closest possible contacts"
" which were used as input for the boostrap")

View file

@ -1,88 +1,22 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import time
import unittest
from twisted.internet import defer
from twisted.python import failure
import twisted.internet.selectreactor
from twisted.internet.protocol import DatagramProtocol
import lbrynet.dht.protocol
import lbrynet.dht.contact
import lbrynet.dht.constants
import lbrynet.dht.msgtypes
from lbrynet.dht.node import rpcmethod
class FakeNode(object):
""" A fake node object implementing some RPC and non-RPC methods to
test the Kademlia protocol's behaviour
"""
def __init__(self, id):
self.id = id
self.contacts = []
@rpcmethod
def ping(self):
return 'pong'
def pingNoRPC(self):
return 'pong'
@rpcmethod
def echo(self, value):
return value
def addContact(self, contact):
self.contacts.append(contact)
def removeContact(self, contact):
self.contacts.remove(contact)
def indirectPingContact(self, protocol, contact):
""" Pings the given contact (using the specified KademliaProtocol
object, not the direct Contact API), and removes the contact
on a timeout """
df = protocol.sendRPC(contact, 'ping', {})
def handleError(f):
if f.check(lbrynet.dht.protocol.TimeoutError):
self.removeContact(contact)
return f
else:
# This is some other error
return f
df.addErrback(handleError)
return df
class ClientDatagramProtocol(lbrynet.dht.protocol.KademliaProtocol):
data = ''
msgID = ''
destination = ('127.0.0.1', 9182)
def __init__(self):
lbrynet.dht.protocol.KademliaProtocol.__init__(self, None)
def startProtocol(self):
self.sendDatagram()
def sendDatagram(self):
if len(self.data):
self._send(self.data, self.msgID, self.destination)
from lbrynet.dht.error import TimeoutError
from lbrynet.dht.node import Node, rpcmethod
class KademliaProtocolTest(unittest.TestCase):
""" Test case for the Protocol class """
def setUp(self):
del lbrynet.dht.protocol.reactor
lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor()
self.node = FakeNode('node1')
self.node = Node(node_id='node1', udpPort=9182, externalIP="127.0.0.1")
self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node)
def testReactor(self):
@ -93,35 +27,65 @@ class KademliaProtocolTest(unittest.TestCase):
def testRPCTimeout(self):
""" Tests if a RPC message sent to a dead remote node times out correctly """
@rpcmethod
def fake_ping(*args, **kwargs):
time.sleep(lbrynet.dht.constants.rpcTimeout + 1)
return 'pong'
real_ping = self.node.ping
real_timeout = lbrynet.dht.constants.rpcTimeout
real_attempts = lbrynet.dht.constants.rpcAttempts
lbrynet.dht.constants.rpcAttempts = 1
lbrynet.dht.constants.rpcTimeout = 1
self.node.ping = fake_ping
deadContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
self.node.addContact(deadContact)
# Make sure the contact was added
self.failIf(deadContact not in self.node.contacts, 'Contact not added to fake node (error in test code)')
# Set the timeout to 0 for testing
tempTimeout = lbrynet.dht.constants.rpcTimeout
lbrynet.dht.constants.rpcTimeout = 0
lbrynet.dht.protocol.reactor.listenUDP(0, self.protocol)
# Run the PING RPC (which should timeout)
df = self.node.indirectPingContact(self.protocol, deadContact)
self.failIf(deadContact not in self.node.contacts,
'Contact not added to fake node (error in test code)')
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Run the PING RPC (which should raise a timeout error)
df = self.protocol.sendRPC(deadContact, 'ping', {})
def check_timeout(err):
self.assertEqual(type(err), TimeoutError)
df.addErrback(check_timeout)
def reset_values():
self.node.ping = real_ping
lbrynet.dht.constants.rpcTimeout = real_timeout
lbrynet.dht.constants.rpcAttempts = real_attempts
# See if the contact was removed due to the timeout
def check_removed_contact():
self.failIf(deadContact in self.node.contacts,
'Contact was not removed after RPC timeout; check exception types.')
df.addCallback(lambda _: reset_values())
# Stop the reactor if a result arrives (timeout or not)
df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
df.addCallback(lambda _: check_removed_contact())
lbrynet.dht.protocol.reactor.run()
# See if the contact was removed due to the timeout
self.failIf(deadContact in self.node.contacts, 'Contact was not removed after RPC timeout; check exception types.')
# Restore the global timeout
lbrynet.dht.constants.rpcTimeout = tempTimeout
def testRPCRequest(self):
""" Tests if a valid RPC request is executed and responded to correctly """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result)
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \
% (expectedResult, result)
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
@ -132,17 +96,19 @@ class KademliaProtocolTest(unittest.TestCase):
lbrynet.dht.protocol.reactor.run()
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
self.failUnlessEqual(len(self.protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
def testRPCAccess(self):
""" Tests invalid RPC requests
Verifies that a RPC request for an existing but unpublished
method is denied, and that the associated (remote) exception gets
raised locally """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
try:
f.raiseException()
@ -150,10 +116,13 @@ class KademliaProtocolTest(unittest.TestCase):
# This is the expected outcome since the remote node did not publish the method
self.error = None
except Exception, e:
self.error = 'The remote method failed, but the wrong exception was raised; expected AttributeError, got %s' % type(e)
self.error = 'The remote method failed, but the wrong exception was raised; ' \
'expected AttributeError, got %s' % type(e)
def handleResult(result):
self.error = 'The remote method executed successfully, returning: "%s"; this RPC should not have been allowed.' % result
self.error = 'The remote method executed successfully, returning: "%s"; ' \
'this RPC should not have been allowed.' % result
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
@ -164,37 +133,35 @@ class KademliaProtocolTest(unittest.TestCase):
lbrynet.dht.protocol.reactor.run()
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
self.failUnlessEqual(len(self.protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
def testRPCRequestArgs(self):
""" Tests if an RPC requiring arguments is executed correctly """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'This should be returned.'
if result != 'This should be returned.':
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result)
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \
(expectedResult, result)
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
df = remoteContact.echo('This should be returned.')
df = remoteContact.ping()
df.addCallback(handleResult)
df.addErrback(handleError)
df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
lbrynet.dht.protocol.reactor.run()
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(KademliaProtocolTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())
self.failUnlessEqual(len(self.protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')

View file

@ -10,6 +10,8 @@ import unittest
import lbrynet.dht.constants
import lbrynet.dht.routingtable
import lbrynet.dht.contact
import lbrynet.dht.node
class FakeRPCProtocol(object):
""" Fake RPC protocol; allows lbrynet.dht.contact.Contact objects to "send" RPCs """
@ -21,6 +23,7 @@ class FakeDeferred(object):
""" Fake Twisted Deferred object; allows the routing table to add callbacks that do nothing """
def addCallback(self, *args, **kwargs):
return
def addErrback(self, *args, **kwargs):
return
@ -28,7 +31,7 @@ class FakeDeferred(object):
class TreeRoutingTableTest(unittest.TestCase):
""" Test case for the RoutingTable class """
def setUp(self):
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node1')
self.nodeID = h.digest()
self.protocol = FakeRPCProtocol()
@ -38,24 +41,26 @@ class TreeRoutingTableTest(unittest.TestCase):
""" Test to see if distance method returns correct result"""
# testList holds a couple 3-tuple (variable1, variable2, result)
basicTestList = [('123456789','123456789', 0L), ('12345', '98765', 34527773184L)]
basicTestList = [('123456789', '123456789', 0L), ('12345', '98765', 34527773184L)]
for test in basicTestList:
result = self.routingTable.distance(test[0], test[1])
self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % (test[2], result))
result = lbrynet.dht.node.Distance(test[0])(test[1])
self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' %
(test[2], result))
baseIp = '146.64.19.111'
ipTestList = ['146.64.29.222', '192.68.19.333']
distanceOne = self.routingTable.distance(baseIp, ipTestList[0])
distanceTwo = self.routingTable.distance(baseIp, ipTestList[1])
distanceOne = lbrynet.dht.node.Distance(baseIp)(ipTestList[0])
distanceTwo = lbrynet.dht.node.Distance(baseIp)(ipTestList[1])
self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % (ipTestList[0], baseIp, ipTestList[1]))
self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' %
(ipTestList[0], baseIp, ipTestList[1]))
def testAddContact(self):
""" Tests if a contact can be added and retrieved correctly """
# Create the contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
@ -63,12 +68,14 @@ class TreeRoutingTableTest(unittest.TestCase):
self.routingTable.addContact(contact)
# ...and request the closest nodes to it (will retrieve it)
closestNodes = self.routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()')
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,'
' got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing '
'_findCloseNodes()')
def testGetContact(self):
""" Tests if a specific existing contact can be retrieved correctly """
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
@ -79,7 +86,10 @@ class TreeRoutingTableTest(unittest.TestCase):
self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact')
def testAddParentNodeAsContact(self):
""" Tests the routing table's behaviour when attempting to add its parent node as a contact """
"""
Tests the routing table's behaviour when attempting to add its parent node as a contact
"""
# Create a contact with the same ID as the local node's ID
contact = lbrynet.dht.contact.Contact(self.nodeID, '127.0.0.1', 91824, self.protocol)
# Now try to add it
@ -91,7 +101,7 @@ class TreeRoutingTableTest(unittest.TestCase):
def testRemoveContact(self):
""" Tests contact removal """
# Create the contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
@ -105,54 +115,73 @@ class TreeRoutingTableTest(unittest.TestCase):
def testSplitBucket(self):
""" Tests if the the routing table correctly dynamically splits k-buckets """
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'Initial k-bucket range should be 0 <= range < 2**160')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'Initial k-bucket range should be 0 <= range < 2**384')
# Add k contacts
for i in range(lbrynet.dht.constants.k):
h = hashlib.sha1()
h = hashlib.sha384()
h.update('remote node %d' % i)
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, but should not yet be split')
self.failUnlessEqual(len(self.routingTable._buckets), 1,
'Only k nodes have been added; the first k-bucket should now '
'be full, but should not yet be split')
# Now add 1 more contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('yet another remote node')
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2, 'k+1 nodes have been added; the first k-bucket should have been split into two new buckets')
self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'K-bucket was split, but its range was not properly adjusted')
self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**160, 'K-bucket was split, but the second (new) bucket\'s max range was not set properly')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, self.routingTable._buckets[1].rangeMin, 'K-bucket was split, but the min/max ranges were not divided properly')
self.failUnlessEqual(len(self.routingTable._buckets), 2,
'k+1 nodes have been added; the first k-bucket should have been '
'split into two new buckets')
self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'K-bucket was split, but its range was not properly adjusted')
self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**384,
'K-bucket was split, but the second (new) bucket\'s '
'max range was not set properly')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax,
self.routingTable._buckets[1].rangeMin,
'K-bucket was split, but the min/max ranges were '
'not divided properly')
def testFullBucketNoSplit(self):
""" Test that a bucket is not split if it full, but does not cover the range containing the parent node's ID """
self.routingTable._parentNodeID = 21*'a' # more than 160 bits; this will not be in the range of _any_ k-bucket
"""
Test that a bucket is not split if it full, but does not cover the range
containing the parent node's ID
"""
self.routingTable._parentNodeID = 49 * 'a'
# more than 384 bits; this will not be in the range of _any_ k-bucket
# Add k contacts
for i in range(lbrynet.dht.constants.k):
h = hashlib.sha1()
h = hashlib.sha384()
h.update('remote node %d' % i)
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, and there should not be more than 1 bucket')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts)))
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; '
'the first k-bucket should now be '
'full, and there should not be '
'more than 1 bucket')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k,
'Bucket should have k contacts; expected %d got %d' %
(lbrynet.dht.constants.k,
len(self.routingTable._buckets[0]._contacts)))
# Now add 1 more contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('yet another remote node')
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'There should not be more than 1 bucket, since the bucket should not have been split (parent node ID not in range)')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts)))
self.failIf(contact in self.routingTable._buckets[0]._contacts, 'New contact should have been discarded (since RPC is faked in this test)')
self.failUnlessEqual(len(self.routingTable._buckets), 1,
'There should not be more than 1 bucket, since the bucket '
'should not have been split (parent node ID not in range)')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts),
lbrynet.dht.constants.k, 'Bucket should have k contacts; '
'expected %d got %d' %
(lbrynet.dht.constants.k,
len(self.routingTable._buckets[0]._contacts)))
self.failIf(contact in self.routingTable._buckets[0]._contacts,
'New contact should have been discarded (since RPC is faked in this test)')
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TreeRoutingTableTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())

View file

@ -46,6 +46,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.CRITICAL, format=log_format)
def require_system(system):
def wrapper(fn):
return fn
@ -115,10 +116,10 @@ class LbryUploader(object):
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir,
lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous)
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(
self.session, stream_info_manager, self.sd_identifier)
@ -218,12 +219,13 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd" + str(n),
node_id="abcd" + str(n),
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
stream_info_manager = TempEncryptedFileMetadataManager()
@ -330,12 +332,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="efgh",
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
if slow is True:
session.rate_limiter.set_ul_limit(2 ** 11)
@ -508,11 +511,11 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous)
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -599,12 +602,12 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1")
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -678,11 +681,12 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd", peer_finder=peer_finder,
node_id="abcd", peer_finder=peer_finder,
hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager,
@ -800,11 +804,12 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd", peer_finder=peer_finder,
node_id="abcd", peer_finder=peer_finder,
hash_announcer=hash_announcer, blob_dir=blob_dir,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter,
wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
self.stream_info_manager = TempEncryptedFileMetadataManager()

View file

@ -9,7 +9,6 @@ from lbrynet.core import PeerManager
from lbrynet.core import RateLimiter
from lbrynet.core import Session
from lbrynet.core import StreamDescriptor
from lbrynet.dht.node import Node
from lbrynet.lbry_file import EncryptedFileMetadataManager
from lbrynet.lbry_file.client import EncryptedFileOptions
from lbrynet.file_manager import EncryptedFileCreator
@ -18,6 +17,7 @@ from lbrynet.file_manager import EncryptedFileManager
from lbrynet.tests import mocks
from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir
class TestReflector(unittest.TestCase):
def setUp(self):
mocks.mock_conf_settings(self)
@ -57,7 +57,7 @@ class TestReflector(unittest.TestCase):
self.session = Session.Session(
conf.settings['data_rate'],
db_dir=self.db_dir,
lbryid="abcd",
node_id="abcd",
peer_finder=peer_finder,
hash_announcer=hash_announcer,
blob_dir=self.blob_dir,
@ -66,7 +66,7 @@ class TestReflector(unittest.TestCase):
rate_limiter=rate_limiter,
wallet=wallet,
blob_tracker_class=mocks.BlobAvailabilityTracker,
dht_node_class=Node
external_ip="127.0.0.1"
)
self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(

View file

@ -72,12 +72,12 @@ class TestStreamify(TestCase):
os.mkdir(blob_dir)
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=self.is_generous
is_generous=self.is_generous, external_ip="127.0.0.1"
)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -128,11 +128,11 @@ class TestStreamify(TestCase):
os.mkdir(blob_dir)
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker
blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1"
)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)

View file

@ -18,6 +18,7 @@ def shell_command(command):
FNULL = open(os.devnull, 'w')
p = subprocess.Popen(command,shell=False,stdout=FNULL,stderr=subprocess.STDOUT)
def lbrynet_cli(commands):
cli_cmd=['lbrynet-cli']
for cmd in commands:
@ -65,7 +66,6 @@ class TestIntegration(unittest.TestCase):
out = json.loads(out)
self.assertTrue(out['is_running'])
def test_cli_docopts(self):
out,err = lbrynet_cli(['cli_test_command'])
self.assertEqual('',out)
@ -83,7 +83,6 @@ class TestIntegration(unittest.TestCase):
out = json.loads(out)
self.assertEqual([1,[],1,None,False,False], out)
out,err = lbrynet_cli(['cli_test_command','1', '--pos_arg2=2','--pos_arg3=3'])
out = json.loads(out)
self.assertEqual([1,[],2,3,False,False], out)
@ -93,7 +92,6 @@ class TestIntegration(unittest.TestCase):
# TODO: variable length arguments don't have guess_type() on them
self.assertEqual([1,['2','3'],None,None,False,False], out)
out,err = lbrynet_cli(['cli_test_command','1','-a'])
out = json.loads(out)
self.assertEqual([1,[],None,None,True,False], out)
@ -102,13 +100,10 @@ class TestIntegration(unittest.TestCase):
out = json.loads(out)
self.assertEqual([1,[],None,None,True,False], out)
out,err = lbrynet_cli(['cli_test_command','1','-a','-b'])
out = json.loads(out)
self.assertEqual([1,[],None,None,True,True], out)
def test_status(self):
out = lbrynet.status()
self.assertTrue(out['is_running'])

View file

@ -15,6 +15,10 @@ from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailability
from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed
import logging
logging.getLogger("lbryum").setLevel(logging.WARNING)
def get_test_daemon(data_rate=None, generous=True, with_fee=False):
if data_rate is None:
data_rate = conf.ADJUSTABLE_SETTINGS['data_rate'][1]
@ -68,7 +72,6 @@ class TestCostEst(unittest.TestCase):
size = 10000000
correct_result = 4.5
daemon = get_test_daemon(generous=True, with_fee=True)
print daemon.get_est_cost("test", size)
self.assertEquals(daemon.get_est_cost("test", size).result, correct_result)
def test_fee_and_ungenerous_data(self):

104
scripts/dht_monitor.py Normal file
View file

@ -0,0 +1,104 @@
import curses
import time
from jsonrpc.proxy import JSONRPCProxy
import logging
log = logging.getLogger(__name__)
log.addHandler(logging.FileHandler("dht contacts.log"))
# log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
stdscr = curses.initscr()
api = JSONRPCProxy.from_url("http://localhost:5279")
def init_curses():
curses.noecho()
curses.cbreak()
stdscr.nodelay(1)
stdscr.keypad(1)
def teardown_curses():
curses.nocbreak()
stdscr.keypad(0)
curses.echo()
curses.endwin()
def refresh(last_contacts, last_blobs):
height, width = stdscr.getmaxyx()
try:
routing_table_info = api.routing_table_get()
node_id = routing_table_info['node_id']
except:
node_id = "UNKNOWN"
routing_table_info = {
'buckets': {},
'contacts': [],
'blob_hashes': []
}
for y in range(height):
stdscr.addstr(y, 0, " " * (width - 1))
buckets = routing_table_info['buckets']
stdscr.addstr(0, 0, "node id: %s" % node_id)
stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" %
(len(buckets), len(routing_table_info['contacts']),
len(routing_table_info['blob_hashes'])))
y = 3
for i in sorted(buckets.keys()):
stdscr.addstr(y, 0, "bucket %s" % i)
y += 1
for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')):
stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['node_id'], h['address'],
len(h['blobs'])))
y += 1
y += 1
new_contacts = set(routing_table_info['contacts']) - last_contacts
lost_contacts = last_contacts - set(routing_table_info['contacts'])
if new_contacts:
for c in new_contacts:
log.debug("added contact %s", c)
if lost_contacts:
for c in lost_contacts:
log.info("lost contact %s", c)
new_blobs = set(routing_table_info['blob_hashes']) - last_blobs
lost_blobs = last_blobs - set(routing_table_info['blob_hashes'])
if new_blobs:
for c in new_blobs:
log.debug("added blob %s", c)
if lost_blobs:
for c in lost_blobs:
log.info("lost blob %s", c)
stdscr.addstr(y + 1, 0, str(time.time()))
stdscr.refresh()
return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes'])
def do_main():
c = None
last_contacts, last_blobs = set(), set()
while c not in [ord('q'), ord('Q')]:
last_contacts, last_blobs = refresh(last_contacts, last_blobs)
c = stdscr.getch()
time.sleep(0.1)
def main():
try:
init_curses()
do_main()
finally:
teardown_curses()
if __name__ == "__main__":
main()

View file

@ -22,7 +22,7 @@ def join_network(udp_port, known_nodes):
lbryid = generate_id()
log.info('Creating node')
node = Node(udpPort=udp_port, lbryid=lbryid)
node = Node(udpPort=udp_port, node_id=lbryid)
log.info('Joining network')
yield node.joinNetwork(known_nodes)

View file

@ -150,7 +150,7 @@ if __name__ == '__main__':
# If you wish to have a pure Kademlia network, use the
# entangled.kademlia.node.Node class instead
print 'Creating Node'
node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid)
node = Node(udpPort=int(sys.argv[1]), node_id=lbryid)
# Schedule the node to join the Kademlia/Entangled DHT
node.joinNetwork(knownNodes)

View file

@ -51,7 +51,7 @@ def main(args=None):
session = Session.Session(
0,
db_dir=db_dir,
lbryid=utils.generate_id(),
node_id=utils.generate_id(),
blob_dir=blob_dir,
dht_node_port=4444,
known_dht_nodes=conf.settings['known_dht_nodes'],

View file

@ -1,82 +1,214 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# Thanks to Paul Cannon for IP-address resolution functions (taken from aspn.activestate.com)
"""
Launch a DHT node which can respond to RPC commands.
"""
import logging
import requests
import miniupnpc
import argparse
from lbrynet.dht.node import Node
from txjsonrpc.web import jsonrpc
from twisted.web import server
from copy import deepcopy
from twisted.internet import reactor, defer
from twisted.web import resource
from twisted.web.server import Site
from lbrynet import conf
from lbrynet.core.log_support import configure_console
from lbrynet.dht.error import TimeoutError
conf.initialize_settings()
log = logging.getLogger("dht tool")
configure_console()
log.setLevel(logging.INFO)
from lbrynet.dht.node import Node
from lbrynet.dht.contact import Contact
from lbrynet.daemon.auth.server import AuthJSONRPCServer
from lbrynet.core.utils import generate_id
def get_external_ip_and_setup_upnp():
try:
u = miniupnpc.UPnP()
u.discoverdelay = 200
u.discover()
u.selectigd()
if u.getspecificportmapping(4444, "UDP"):
u.deleteportmapping(4444, "UDP")
log.info("Removed UPnP redirect for UDP 4444.")
u.addportmapping(4444, 'UDP', u.lanaddr, 4444, 'LBRY DHT port', '')
log.info("got external ip from upnp")
return u.externalipaddress()
except Exception:
log.exception("derp")
r = requests.get('https://api.ipify.org', {'format': 'json'})
log.info("got external ip from ipify.org")
return r.json()['ip']
class RPCNode(jsonrpc.JSONRPC):
def __init__(self, node, shut_down_cb):
jsonrpc.JSONRPC.__init__(self)
self.node = node
self.shut_down_cb = shut_down_cb
class NodeRPC(AuthJSONRPCServer):
def __init__(self, lbryid, seeds, node_port, rpc_port):
AuthJSONRPCServer.__init__(self, False)
self.root = None
self.port = None
self.seeds = seeds
self.node_port = node_port
self.rpc_port = rpc_port
if lbryid:
lbryid = lbryid.decode('hex')
else:
lbryid = generate_id()
self.node_id = lbryid
self.external_ip = get_external_ip_and_setup_upnp()
self.node_port = node_port
def jsonrpc_total_dht_nodes(self):
return self.node.getApproximateTotalDHTNodes()
@defer.inlineCallbacks
def setup(self):
self.node = Node(node_id=self.node_id, udpPort=self.node_port,
externalIP=self.external_ip)
hosts = []
for hostname, hostport in self.seeds:
host_ip = yield reactor.resolve(hostname)
hosts.append((host_ip, hostport))
log.info("connecting to dht")
yield self.node.joinNetwork(tuple(hosts))
log.info("connected to dht")
if not self.announced_startup:
self.announced_startup = True
self.start_api()
log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'), len(self.node.node_id))
def jsonrpc_total_dht_hashes(self):
return self.node.getApproximateTotalHashes()
def start_api(self):
root = resource.Resource()
root.putChild('', self)
self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost')
log.info("started jsonrpc server")
def jsonrpc_stop(self):
self.shut_down_cb()
return "fine"
@defer.inlineCallbacks
def jsonrpc_node_id_set(self, node_id):
old_id = self.node.node_id
self.node.stop()
del self.node
self.node_id = node_id.decode('hex')
yield self.setup()
msg = "changed dht id from %s to %s" % (old_id.encode('hex'),
self.node.node_id.encode('hex'))
defer.returnValue(msg)
def jsonrpc_node_id_get(self):
return self._render_response(self.node.node_id.encode('hex'))
@defer.inlineCallbacks
def jsonrpc_peer_find(self, node_id):
node_id = node_id.decode('hex')
contact = yield self.node.findContact(node_id)
result = None
if contact:
result = (contact.address, contact.port)
defer.returnValue(result)
@defer.inlineCallbacks
def jsonrpc_peer_list_for_blob(self, blob_hash):
peers = yield self.node.getPeersForBlob(blob_hash.decode('hex'))
defer.returnValue(peers)
@defer.inlineCallbacks
def jsonrpc_ping(self, node_id):
contact_host = yield self.jsonrpc_peer_find(node_id=node_id)
if not contact_host:
defer.returnValue("failed to find node")
contact_ip, contact_port = contact_host
contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol)
try:
result = yield contact.ping()
except TimeoutError:
self.node.removeContact(contact.id)
self.node._dataStore.removePeer(contact.id)
result = {'error': 'timeout'}
defer.returnValue(result)
def get_routing_table(self):
result = {}
data_store = deepcopy(self.node._dataStore._dict)
datastore_len = len(data_store)
hosts = {}
missing_contacts = []
if datastore_len:
for k, v in data_store.iteritems():
for value, lastPublished, originallyPublished, originalPublisherID in v:
try:
contact = self.node._routingTable.getContact(originalPublisherID)
except ValueError:
if originalPublisherID.encode('hex') not in missing_contacts:
missing_contacts.append(originalPublisherID.encode('hex'))
continue
if contact in hosts:
blobs = hosts[contact]
else:
blobs = []
blobs.append(k.encode('hex'))
hosts[contact] = blobs
contact_set = []
blob_hashes = []
result['buckets'] = {}
for i in range(len(self.node._routingTable._buckets)):
for contact in self.node._routingTable._buckets[i]._contacts:
contacts = result['buckets'].get(i, [])
if contact in hosts:
blobs = hosts[contact]
del hosts[contact]
else:
blobs = []
host = {
"address": contact.address,
"id": contact.id.encode("hex"),
"blobs": blobs,
}
for blob_hash in blobs:
if blob_hash not in blob_hashes:
blob_hashes.append(blob_hash)
contacts.append(host)
result['buckets'][i] = contacts
contact_set.append(contact.id.encode("hex"))
if hosts:
result['datastore extra'] = [
{
"id": host.id.encode('hex'),
"blobs": hosts[host],
}
for host in hosts]
result['missing contacts'] = missing_contacts
result['contacts'] = contact_set
result['blob hashes'] = blob_hashes
result['node id'] = self.node_id.encode('hex')
return result
def jsonrpc_routing_table_get(self):
return self._render_response(self.get_routing_table())
def main():
parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands")
parser.add_argument("node_port",
parser.add_argument("--node_port",
help=("The UDP port on which the node will listen for connections "
"from other dht nodes"),
type=int)
parser.add_argument("rpc_port",
type=int, default=4444)
parser.add_argument("--rpc_port",
help="The TCP port on which the node will listen for rpc commands",
type=int)
parser.add_argument("dht_bootstrap_host",
type=int, default=5280)
parser.add_argument("--bootstrap_host",
help="The IP of a DHT node to be used to bootstrap into the network",
nargs='?')
parser.add_argument("dht_bootstrap_port",
default='lbrynet1.lbry.io')
parser.add_argument("--node_id",
help="The IP of a DHT node to be used to bootstrap into the network",
default=None)
parser.add_argument("--bootstrap_port",
help="The port of a DHT node to be used to bootstrap into the network",
nargs='?', default=4000, type=int)
parser.add_argument("--rpc_ip_address",
help="The network interface on which to listen for rpc connections",
default="127.0.0.1")
default=4444, type=int)
args = parser.parse_args()
def start_rpc():
rpc_node = RPCNode(node, shut_down)
reactor.listenTCP(args.rpc_port, server.Site(rpc_node), interface=args.rpc_ip_address)
def shut_down():
d = defer.maybeDeferred(node.stop)
d.addBoth(lambda _: reactor.stop())
return d
known_nodes = []
if args.dht_bootstrap_host:
known_nodes.append((args.dht_bootstrap_host, args.dht_bootstrap_port))
node = Node(udpPort=args.node_port)
node.joinNetwork(known_nodes)
d = node._joinDeferred
d.addCallback(lambda _: start_rpc())
seeds = [(args.bootstrap_host, args.bootstrap_port)]
server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port)
reactor.addSystemEventTrigger('after', 'startup', server.setup)
reactor.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -53,7 +53,7 @@ def main(args=None):
session = Session.Session(
blob_data_payment_rate=0,
db_dir=db_dir,
lbryid=utils.generate_id(),
node_id=utils.generate_id(),
blob_dir=blob_dir,
dht_node_port=4444,
known_dht_nodes=conf.settings['known_dht_nodes'],