From 4a567f7ab12527d89999f47f47e411f107e5cc00 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:08:22 -0400 Subject: [PATCH 01/21] organize dht errors and interfaces --- lbrynet/dht/contact.py | 10 ---- lbrynet/dht/datastore.py | 28 ++------- lbrynet/dht/delay.py | 22 +++++++ lbrynet/dht/encoding.py | 15 +---- lbrynet/dht/error.py | 31 ++++++++++ lbrynet/dht/interface.py | 114 ++++++++++++++++++++++++++++++++++++ lbrynet/dht/kbucket.py | 14 +---- lbrynet/dht/protocol.py | 44 +------------- lbrynet/dht/routingtable.py | 98 +++---------------------------- 9 files changed, 182 insertions(+), 194 deletions(-) create mode 100644 lbrynet/dht/delay.py create mode 100644 lbrynet/dht/error.py create mode 100644 lbrynet/dht/interface.py diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index 6109d9f9a..cba054e0d 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -1,13 +1,3 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - - class Contact(object): """ Encapsulation for remote contact diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index bdaf47644..d67401240 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -1,33 +1,13 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - import UserDict import time import constants +from interface import IDataStore +from zope.interface import implements -class DataStore(UserDict.DictMixin): - """ Interface for classes implementing physical storage (for data - published via the "STORE" RPC) for the Kademlia DHT - - @note: This provides an interface for a dict-like object - """ - - def keys(self): - """ Return a list of the keys in this data store """ - - def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID): - pass - - -class DictDataStore(DataStore): +class DictDataStore(UserDict.DictMixin): """ A datastore using an in-memory Python dictionary """ + implements(IDataStore) def __init__(self): # Dictionary format: diff --git a/lbrynet/dht/delay.py b/lbrynet/dht/delay.py new file mode 100644 index 000000000..9610a73f8 --- /dev/null +++ b/lbrynet/dht/delay.py @@ -0,0 +1,22 @@ +import time + + +class Delay(object): + maxToSendDelay = 10 ** -3 # 0.05 + minToSendDelay = 10 ** -5 # 0.01 + + def __init__(self, start=0): + self._next = start + + # TODO: explain why this logic is like it is. And add tests that + # show that it actually does what it needs to do. + def __call__(self): + ts = time.time() + delay = 0 + if ts >= self._next: + delay = self.minToSendDelay + self._next = ts + self.minToSendDelay + else: + delay = (self._next - ts) + self.maxToSendDelay + self._next += self.maxToSendDelay + return delay diff --git a/lbrynet/dht/encoding.py b/lbrynet/dht/encoding.py index bc7e88ca0..45aeb3496 100644 --- a/lbrynet/dht/encoding.py +++ b/lbrynet/dht/encoding.py @@ -1,17 +1,4 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - - -class DecodeError(Exception): - """ Should be raised by an C{Encoding} implementation if decode operation - fails - """ +from error import DecodeError class Encoding(object): diff --git a/lbrynet/dht/error.py b/lbrynet/dht/error.py new file mode 100644 index 000000000..78daee46b --- /dev/null +++ b/lbrynet/dht/error.py @@ -0,0 +1,31 @@ +import binascii + + +class DecodeError(Exception): + """ + Should be raised by an C{Encoding} implementation if decode operation + fails + """ + pass + + +class BucketFull(Exception): + """ + Raised when the bucket is full + """ + pass + + +class UnknownRemoteException(Exception): + pass + + +class TimeoutError(Exception): + """ Raised when a RPC times out """ + + def __init__(self, remote_contact_id): + # remote_contact_id is a binary blob so we need to convert it + # into something more readable + msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id)) + Exception.__init__(self, msg) + self.remote_contact_id = remote_contact_id diff --git a/lbrynet/dht/interface.py b/lbrynet/dht/interface.py new file mode 100644 index 000000000..0648d6d21 --- /dev/null +++ b/lbrynet/dht/interface.py @@ -0,0 +1,114 @@ +from zope.interface import Interface + + +class IDataStore(Interface): + """ Interface for classes implementing physical storage (for data + published via the "STORE" RPC) for the Kademlia DHT + + @note: This provides an interface for a dict-like object + """ + + def keys(self): + """ Return a list of the keys in this data store """ + pass + + def removeExpiredPeers(self): + pass + + def hasPeersForBlob(self, key): + pass + + def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID): + pass + + def getPeersForBlob(self, key): + pass + + +class IRoutingTable(Interface): + """ Interface for RPC message translators/formatters + + Classes inheriting from this should provide a suitable routing table for + a parent Node object (i.e. the local entity in the Kademlia network) + """ + + def __init__(self, parentNodeID): + """ + @param parentNodeID: The n-bit node ID of the node to which this + routing table belongs + @type parentNodeID: str + """ + + def addContact(self, contact): + """ Add the given contact to the correct k-bucket; if it already + exists, its status will be updated + + @param contact: The contact to add to this node's k-buckets + @type contact: kademlia.contact.Contact + """ + + def findCloseNodes(self, key, count, _rpcNodeID=None): + """ Finds a number of known nodes closest to the node/value with the + specified key. + + @param key: the n-bit key (i.e. the node or value ID) to search for + @type key: str + @param count: the amount of contacts to return + @type count: int + @param _rpcNodeID: Used during RPC, this is be the sender's Node ID + Whatever ID is passed in the paramater will get + excluded from the list of returned contacts. + @type _rpcNodeID: str + + @return: A list of node contacts (C{kademlia.contact.Contact instances}) + closest to the specified key. + This method will return C{k} (or C{count}, if specified) + contacts if at all possible; it will only return fewer if the + node is returning all of the contacts that it knows of. + @rtype: list + """ + + def getContact(self, contactID): + """ Returns the (known) contact with the specified node ID + + @raise ValueError: No contact with the specified contact ID is known + by this node + """ + + def getRefreshList(self, startIndex=0, force=False): + """ Finds all k-buckets that need refreshing, starting at the + k-bucket with the specified index, and returns IDs to be searched for + in order to refresh those k-buckets + + @param startIndex: The index of the bucket to start refreshing at; + this bucket and those further away from it will + be refreshed. For example, when joining the + network, this node will set this to the index of + the bucket after the one containing it's closest + neighbour. + @type startIndex: index + @param force: If this is C{True}, all buckets (in the specified range) + will be refreshed, regardless of the time they were last + accessed. + @type force: bool + + @return: A list of node ID's that the parent node should search for + in order to refresh the routing Table + @rtype: list + """ + + def removeContact(self, contactID): + """ Remove the contact with the specified node ID from the routing + table + + @param contactID: The node ID of the contact to remove + @type contactID: str + """ + + def touchKBucket(self, key): + """ Update the "last accessed" timestamp of the k-bucket which covers + the range containing the specified key in the key/ID space + + @param key: A key in the range of the target k-bucket + @type key: str + """ diff --git a/lbrynet/dht/kbucket.py b/lbrynet/dht/kbucket.py index 227fec409..ead763895 100644 --- a/lbrynet/dht/kbucket.py +++ b/lbrynet/dht/kbucket.py @@ -1,17 +1,5 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - import constants - - -class BucketFull(Exception): - """ Raised when the bucket is full """ +from error import BucketFull class KBucket(object): diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index ba7aba586..17c1f2dd1 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -1,14 +1,4 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - import logging -import binascii import time import socket import errno @@ -21,42 +11,12 @@ import encoding import msgtypes import msgformat from contact import Contact +from error import UnknownRemoteException, TimeoutError +from delay import Delay log = logging.getLogger(__name__) -class TimeoutError(Exception): - """ Raised when a RPC times out """ - - def __init__(self, remote_contact_id): - # remote_contact_id is a binary blob so we need to convert it - # into something more readable - msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id)) - Exception.__init__(self, msg) - self.remote_contact_id = remote_contact_id - - -class Delay(object): - maxToSendDelay = 10 ** -3 # 0.05 - minToSendDelay = 10 ** -5 # 0.01 - - def __init__(self, start=0): - self._next = start - - # TODO: explain why this logic is like it is. And add tests that - # show that it actually does what it needs to do. - def __call__(self): - ts = time.time() - delay = 0 - if ts >= self._next: - delay = self.minToSendDelay - self._next = ts + self.minToSendDelay - else: - delay = (self._next - ts) + self.maxToSendDelay - self._next += self.maxToSendDelay - return delay - - class KademliaProtocol(protocol.DatagramProtocol): """ Implements all low-level network-related functions of a Kademlia node """ diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 1f6cca926..c03dd0fd0 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -7,102 +7,17 @@ import time import random +from zope.interface import implements import constants import kbucket +from interface import IRoutingTable +from error import TimeoutError +import logging -from protocol import TimeoutError +log = logging.getLogger(__name__) -class RoutingTable(object): - """ Interface for RPC message translators/formatters - - Classes inheriting from this should provide a suitable routing table for - a parent Node object (i.e. the local entity in the Kademlia network) - """ - - def __init__(self, parentNodeID): - """ - @param parentNodeID: The n-bit node ID of the node to which this - routing table belongs - @type parentNodeID: str - """ - - def addContact(self, contact): - """ Add the given contact to the correct k-bucket; if it already - exists, its status will be updated - - @param contact: The contact to add to this node's k-buckets - @type contact: kademlia.contact.Contact - """ - - def findCloseNodes(self, key, count, _rpcNodeID=None): - """ Finds a number of known nodes closest to the node/value with the - specified key. - - @param key: the n-bit key (i.e. the node or value ID) to search for - @type key: str - @param count: the amount of contacts to return - @type count: int - @param _rpcNodeID: Used during RPC, this is be the sender's Node ID - Whatever ID is passed in the paramater will get - excluded from the list of returned contacts. - @type _rpcNodeID: str - - @return: A list of node contacts (C{kademlia.contact.Contact instances}) - closest to the specified key. - This method will return C{k} (or C{count}, if specified) - contacts if at all possible; it will only return fewer if the - node is returning all of the contacts that it knows of. - @rtype: list - """ - - def getContact(self, contactID): - """ Returns the (known) contact with the specified node ID - - @raise ValueError: No contact with the specified contact ID is known - by this node - """ - - def getRefreshList(self, startIndex=0, force=False): - """ Finds all k-buckets that need refreshing, starting at the - k-bucket with the specified index, and returns IDs to be searched for - in order to refresh those k-buckets - - @param startIndex: The index of the bucket to start refreshing at; - this bucket and those further away from it will - be refreshed. For example, when joining the - network, this node will set this to the index of - the bucket after the one containing it's closest - neighbour. - @type startIndex: index - @param force: If this is C{True}, all buckets (in the specified range) - will be refreshed, regardless of the time they were last - accessed. - @type force: bool - - @return: A list of node ID's that the parent node should search for - in order to refresh the routing Table - @rtype: list - """ - - def removeContact(self, contactID): - """ Remove the contact with the specified node ID from the routing - table - - @param contactID: The node ID of the contact to remove - @type contactID: str - """ - - def touchKBucket(self, key): - """ Update the "last accessed" timestamp of the k-bucket which covers - the range containing the specified key in the key/ID space - - @param key: A key in the range of the target k-bucket - @type key: str - """ - - -class TreeRoutingTable(RoutingTable): +class TreeRoutingTable(object): """ This class implements a routing table used by a Node class. The Kademlia routing table is a binary tree whose leaves are k-buckets, @@ -117,6 +32,7 @@ class TreeRoutingTable(RoutingTable): C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. """ + implements(IRoutingTable) def __init__(self, parentNodeID): """ From f1980f524eac03eec6ca46c69f2b32101e48344e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:09:25 -0400 Subject: [PATCH 02/21] fix raising remote exceptions --- lbrynet/dht/error.py | 7 +++++++ lbrynet/dht/protocol.py | 9 +++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/lbrynet/dht/error.py b/lbrynet/dht/error.py index 78daee46b..3111adf8f 100644 --- a/lbrynet/dht/error.py +++ b/lbrynet/dht/error.py @@ -1,4 +1,11 @@ import binascii +import exceptions + +# this is a dict of {"exceptions.": exception class} items used to raise +# remote built-in exceptions locally +BUILTIN_EXCEPTIONS = { + "exceptions.%s" % e: getattr(exceptions, e) for e in dir(exceptions) if not e.startswith("_") +} class DecodeError(Exception): diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 17c1f2dd1..6ae7b98a3 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -11,7 +11,7 @@ import encoding import msgtypes import msgformat from contact import Contact -from error import UnknownRemoteException, TimeoutError +from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError from delay import Delay log = logging.getLogger(__name__) @@ -246,7 +246,12 @@ class KademliaProtocol(protocol.DatagramProtocol): df.callback((message, address)) elif isinstance(message, msgtypes.ErrorMessage): # The RPC request raised a remote exception; raise it locally - remoteException = Exception(message.response) + if message.exceptionType in BUILTIN_EXCEPTIONS: + exception_type = BUILTIN_EXCEPTIONS[message.exceptionType] + else: + exception_type = UnknownRemoteException + remoteException = exception_type(message.response) + log.error("Remote exception (%s): %s", address, remoteException) df.errback(remoteException) else: # We got a result from the RPC From 8e9f3c90a16d8538a0c2489539a532532b2c03ec Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:10:47 -0400 Subject: [PATCH 03/21] use looping call for Node.change_token to avoid hanging delayedCalls --- lbrynet/dht/node.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 858b03a88..cb1044641 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -12,7 +12,7 @@ import operator import struct import time -from twisted.internet import defer, error, reactor, threads +from twisted.internet import defer, error, reactor, threads, task import constants import routingtable @@ -88,7 +88,7 @@ class Node(object): # operations before the node has finished joining the network) self._joinDeferred = None self.next_refresh_call = None - self.next_change_token_call = None + self.change_token_lc = task.LoopingCall(self.change_token) # Create k-buckets (for storing contacts) if routingTableClass is None: self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id) @@ -103,7 +103,6 @@ class Node(object): # Initialize the data storage mechanism used by this node self.token_secret = self._generateID() self.old_token_secret = None - self.change_token() if dataStore is None: self._dataStore = datastore.DictDataStore() else: @@ -128,9 +127,8 @@ class Node(object): if self.next_refresh_call is not None: self.next_refresh_call.cancel() self.next_refresh_call = None - if self.next_change_token_call is not None: - self.next_change_token_call.cancel() - self.next_change_token_call = None + if self.change_token_lc.running: + self.change_token_lc.stop() if self._listeningPort is not None: self._listeningPort.stopListening() self.hash_watcher.stop() @@ -163,6 +161,10 @@ class Node(object): bootstrapContacts.append(contact) else: bootstrapContacts = None + + # Start the token looping call + self.change_token_lc.start(constants.tokenSecretChangeInterval) + # Initiate the Kademlia joining sequence - perform a search for this node's own ID self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts) # #TODO: Refresh all k-buckets further away than this node's closest neighbour @@ -295,8 +297,6 @@ class Node(object): def change_token(self): self.old_token_secret = self.token_secret self.token_secret = self._generateID() - self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval, - self.change_token) def make_token(self, compact_ip): h = hashlib.new('sha384') From 46e31d5b455560f94ae72969bf7684aad4df27a3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:12:47 -0400 Subject: [PATCH 04/21] getPeersForBlob inlinecallbacks refactor --- lbrynet/dht/node.py | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index cb1044641..b86c5ba63 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -204,28 +204,21 @@ class Node(object): def announceHaveBlob(self, key, port): return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid}) + @defer.inlineCallbacks def getPeersForBlob(self, blob_hash): - def expand_and_filter(result): - expanded_peers = [] - if isinstance(result, dict): - if blob_hash in result: - for peer in result[blob_hash]: - if self.lbryid != peer[6:]: - host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1": - if "from_peer" in result: - if result["from_peer"] != "self": - host = result["from_peer"] - port, = struct.unpack('>H', peer[4:6]) + result = yield self.iterativeFindValue(blob_hash) + expanded_peers = [] + if result: + if blob_hash in result: + for peer in result[blob_hash]: + if self.lbryid != peer[6:]: + host = ".".join([str(ord(d)) for d in peer[:4]]) + if host == "127.0.0.1" and "from_peer" in result and result["from_peer"] != "self": + host = result["from_peer"] + port, = struct.unpack('>H', peer[4:6]) + if (host, port) not in expanded_peers: expanded_peers.append((host, port)) - return expanded_peers - - def find_failed(err): - return [] - - d = self.iterativeFindValue(blob_hash) - d.addCallbacks(expand_and_filter, find_failed) - return d + defer.returnValue(expanded_peers) def get_most_popular_hashes(self, num_to_return): return self.hash_watcher.most_popular_hashes(num_to_return) From fcaca05a812567f4840176893cc0c81f1b494f96 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:15:25 -0400 Subject: [PATCH 05/21] fix redundant lbryid --- lbrynet/core/Session.py | 12 ++++++------ lbrynet/daemon/Daemon.py | 6 +++--- lbrynet/dht/node.py | 38 +++++++++++++++++--------------------- lbrynet/dht/protocol.py | 6 +++--- 4 files changed, 29 insertions(+), 33 deletions(-) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 04dbe491c..c50ec28ff 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -37,7 +37,7 @@ class Session(object): """ def __init__(self, blob_data_payment_rate, db_dir=None, - lbryid=None, peer_manager=None, dht_node_port=None, + node_id=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, @@ -48,7 +48,7 @@ class Session(object): @param db_dir: The directory in which levelDB files should be stored - @param lbryid: The unique ID of this node + @param node_id: The unique ID of this node @param peer_manager: An object which keeps track of all known peers. If None, a PeerManager will be created @@ -101,7 +101,7 @@ class Session(object): """ self.db_dir = db_dir - self.lbryid = lbryid + self.node_id = node_id self.peer_manager = peer_manager @@ -142,8 +142,8 @@ class Session(object): log.debug("Starting session.") - if self.lbryid is None: - self.lbryid = generate_id() + if self.node_id is None: + self.node_id = generate_id() if self.wallet is None: from lbrynet.core.PTCWallet import PTCWallet @@ -274,7 +274,7 @@ class Session(object): self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, - lbryid=self.lbryid, + node_id=self.node_id, externalIP=self.external_ip ) self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 59e344b25..e2e9df32e 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -206,7 +206,7 @@ class Daemon(AuthJSONRPCServer): # of the daemon, but I don't want to deal with that now self.analytics_manager = analytics_manager - self.lbryid = conf.settings.node_id + self.node_id = conf.settings.node_id self.wallet_user = None self.wallet_password = None @@ -562,7 +562,7 @@ class Daemon(AuthJSONRPCServer): self.session = Session( conf.settings['data_rate'], db_dir=self.db_dir, - lbryid=self.lbryid, + node_id=self.node_id, blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port, known_dht_nodes=conf.settings['known_dht_nodes'], @@ -1054,7 +1054,7 @@ class Daemon(AuthJSONRPCServer): best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None response = { - 'lbry_id': base58.b58encode(self.lbryid), + 'lbry_id': base58.b58encode(self.node_id), 'installation_id': conf.settings.installation_id, 'is_running': self.announced_startup, 'is_first_run': self.session.wallet.is_first_run if has_wallet else None, diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index b86c5ba63..c261a6595 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -49,8 +49,8 @@ class Node(object): application is performed via this class (or a subclass). """ - def __init__(self, id=None, udpPort=4000, dataStore=None, - routingTableClass=None, networkProtocol=None, lbryid=None, + def __init__(self, node_id=None, udpPort=4000, dataStore=None, + routingTableClass=None, networkProtocol=None, externalIP=None): """ @param dataStore: The data store to use. This must be class inheriting @@ -74,11 +74,7 @@ class Node(object): being transmitted. @type networkProtocol: entangled.kademlia.protocol.KademliaProtocol """ - if id != None: - self.id = id - else: - self.id = self._generateID() - self.lbryid = lbryid + self.node_id = node_id or self._generateID() self.port = udpPort self._listeningPort = None # object implementing Twisted # IListeningPort This will contain a deferred created when @@ -91,9 +87,9 @@ class Node(object): self.change_token_lc = task.LoopingCall(self.change_token) # Create k-buckets (for storing contacts) if routingTableClass is None: - self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id) + self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id) else: - self._routingTable = routingTableClass(self.id) + self._routingTable = routingTableClass(self.node_id) # Initialize this node's network access mechanisms if networkProtocol is None: @@ -110,7 +106,7 @@ class Node(object): # Try to restore the node's state... if 'nodeState' in self._dataStore: state = self._dataStore['nodeState'] - self.id = state['id'] + self.node_id = state['id'] for contactTriple in state['closestNodes']: contact = Contact( contactTriple[0], contactTriple[1], contactTriple[2], self._protocol) @@ -166,7 +162,7 @@ class Node(object): self.change_token_lc.start(constants.tokenSecretChangeInterval) # Initiate the Kademlia joining sequence - perform a search for this node's own ID - self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts) + self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) # #TODO: Refresh all k-buckets further away than this node's closest neighbour # Start refreshing k-buckets periodically, if necessary self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, @@ -186,7 +182,7 @@ class Node(object): # get the deepest bucket and the number of contacts in that bucket and multiply it # by the number of equivalently deep buckets in the whole DHT to get a really bad # estimate! - bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)] + bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.node_id)] num_in_bucket = len(bucket._contacts) factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin) return num_in_bucket * factor @@ -202,7 +198,7 @@ class Node(object): return num_in_data_store * self.getApproximateTotalDHTNodes() / 8 def announceHaveBlob(self, key, port): - return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid}) + return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id}) @defer.inlineCallbacks def getPeersForBlob(self, blob_hash): @@ -211,7 +207,7 @@ class Node(object): if result: if blob_hash in result: for peer in result[blob_hash]: - if self.lbryid != peer[6:]: + if self.node_id != peer[6:]: host = ".".join([str(ord(d)) for d in peer[:4]]) if host == "127.0.0.1" and "from_peer" in result and result["from_peer"] != "self": host = result["from_peer"] @@ -258,7 +254,7 @@ class Node(object): result = responseMsg.response if 'token' in result: value['token'] = result['token'] - d = n.store(blob_hash, value, self.id, 0) + d = n.store(blob_hash, value, self.node_id, 0) d.addCallback(log_success) d.addErrback(log_error, n) else: @@ -267,12 +263,12 @@ class Node(object): def requestPeers(contacts): if self.externalIP is not None and len(contacts) >= constants.k: - is_closer = Distance(blob_hash).is_closer(self.id, contacts[-1].id) + is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) if is_closer: contacts.pop() - self.store(blob_hash, value, self_store=True, originalPublisherID=self.id) + self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id) elif self.externalIP is not None: - self.store(blob_hash, value, self_store=True, originalPublisherID=self.id) + self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id) ds = [] for contact in contacts: known_nodes[contact.id] = contact @@ -456,7 +452,7 @@ class Node(object): raise TypeError, 'No NodeID given. Therefore we can\'t store this node' if self_store is True and self.externalIP: - contact = Contact(self.id, self.externalIP, self.port, None, None) + contact = Contact(self.node_id, self.externalIP, self.port, None, None) compact_ip = contact.compact_ip() elif '_rpcNodeContact' in kwargs: contact = kwargs['_rpcNodeContact'] @@ -583,7 +579,7 @@ class Node(object): if startupShortlist is None: shortlist = self._routingTable.findCloseNodes(key, constants.alpha) - if key != self.id: + if key != self.node_id: # Update the "last accessed" timestamp for the appropriate k-bucket self._routingTable.touchKBucket(key) if len(shortlist) == 0: @@ -679,7 +675,7 @@ class _IterativeFindHelper(object): responseMsg = responseTuple[0] originAddress = responseTuple[1] # tuple: (ip adress, udp port) # Make sure the responding node is valid, and abort the operation if it isn't - if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id: + if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.node_id: return responseMsg.nodeID # Mark this node as active diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 6ae7b98a3..70062e39b 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -155,7 +155,7 @@ class KademliaProtocol(protocol.DatagramProtocol): C{ErrorMessage}). @rtype: twisted.internet.defer.Deferred """ - msg = msgtypes.RequestMessage(self._node.id, method, args) + msg = msgtypes.RequestMessage(self._node.node_id, method, args) msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) @@ -342,7 +342,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _sendResponse(self, contact, rpcID, response): """ Send a RPC response to the specified contact """ - msg = msgtypes.ResponseMessage(rpcID, self._node.id, response) + msg = msgtypes.ResponseMessage(rpcID, self._node.node_id, response) msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) self._send(encodedMsg, rpcID, (contact.address, contact.port)) @@ -350,7 +350,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _sendError(self, contact, rpcID, exceptionType, exceptionMessage): """ Send an RPC error message to the specified contact """ - msg = msgtypes.ErrorMessage(rpcID, self._node.id, exceptionType, exceptionMessage) + msg = msgtypes.ErrorMessage(rpcID, self._node.node_id, exceptionType, exceptionMessage) msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) self._send(encodedMsg, rpcID, (contact.address, contact.port)) From 2a47385b62efa4c519f67c7094f8d3f56bc556b5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:16:26 -0400 Subject: [PATCH 06/21] add rpcAttempts and replacementCacheSize constants --- lbrynet/dht/constants.py | 6 ++++++ lbrynet/dht/routingtable.py | 7 ++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/lbrynet/dht/constants.py b/lbrynet/dht/constants.py index 1cee46311..b3d19022b 100644 --- a/lbrynet/dht/constants.py +++ b/lbrynet/dht/constants.py @@ -21,9 +21,15 @@ alpha = 3 #: Maximum number of contacts stored in a bucket; this should be an even number k = 8 +#: Maximum number of contacts stored in the replacement cache +replacementCacheSize = 8 + #: Timeout for network operations (in seconds) rpcTimeout = 5 +# number of rpc attempts to make before a timeout results in the node being removed as a contact +rpcAttempts = 5 + # Delay between iterations of iterative node lookups (for loose parallelism) (in seconds) iterativeLookupDelay = rpcTimeout / 2 diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index c03dd0fd0..636899f32 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -320,10 +320,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable): self._replacementCache[bucketIndex] = [] if contact in self._replacementCache[bucketIndex]: self._replacementCache[bucketIndex].remove(contact) - # TODO: Using k to limit the size of the contact - # replacement cache - maybe define a separate value for - # this in constants.py? - elif len(self._replacementCache[bucketIndex]) >= constants.k: + elif len(self._replacementCache[bucketIndex]) >= constants.replacementCacheSize: self._replacementCache[bucketIndex].pop(0) self._replacementCache[bucketIndex].append(contact) @@ -340,7 +337,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable): except ValueError: return contact.failedRPCs += 1 - if contact.failedRPCs >= 5: + if contact.failedRPCs >= constants.rpcAttempts: self._buckets[bucketIndex].removeContact(contactID) # Replace this stale contact with one from our replacement cache, if we have any if bucketIndex in self._replacementCache: From 3096c89e37330f435471fa4edb090aa030f2a432 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:17:36 -0400 Subject: [PATCH 07/21] don't wrap exceptions with Failure --- lbrynet/dht/protocol.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 70062e39b..c4a3782e8 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -4,7 +4,6 @@ import socket import errno from twisted.internet import protocol, defer, error, reactor, task -from twisted.python import failure import constants import encoding @@ -379,12 +378,12 @@ class KademliaProtocol(protocol.DatagramProtocol): kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} result = func(*args, **kwargs) except Exception, e: - df.errback(failure.Failure(e)) + df.errback(e) else: df.callback(result) else: # No such exposed method - df.errback(failure.Failure(AttributeError('Invalid method: %s' % method))) + df.errback(AttributeError('Invalid method: %s' % method)) def _msgTimeout(self, messageID): """ Called when an RPC request message times out """ @@ -402,7 +401,7 @@ class KademliaProtocol(protocol.DatagramProtocol): # The message's destination node is now considered to be dead; # raise an (asynchronous) TimeoutError exception and update the host node self._node.removeContact(remoteContactID) - df.errback(failure.Failure(TimeoutError(remoteContactID))) + df.errback(TimeoutError(remoteContactID)) def _msgTimeoutInProgress(self, messageID, remoteContactID, df): # See if any progress has been made; if not, kill the message @@ -414,7 +413,7 @@ class KademliaProtocol(protocol.DatagramProtocol): # No progress has been made del self._partialMessagesProgress[messageID] del self._partialMessages[messageID] - df.errback(failure.Failure(TimeoutError(remoteContactID))) + df.errback(TimeoutError(remoteContactID)) def _hasProgressBeenMade(self, messageID): return ( From fe2d6bad1b2fe7ae7dbc67555fa3ebbe9da3be30 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:18:00 -0400 Subject: [PATCH 08/21] fix logging error for dht rpc methods with no args (ping) --- lbrynet/dht/protocol.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index c4a3782e8..974c45dfe 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -158,7 +158,10 @@ class KademliaProtocol(protocol.DatagramProtocol): msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) - log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex')) + if args: + log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex')) + else: + log.debug("DHT SEND CALL %s", method) df = defer.Deferred() if rawResponse: @@ -372,8 +375,12 @@ class KademliaProtocol(protocol.DatagramProtocol): func = getattr(self._node, method, None) if callable(func) and hasattr(func, 'rpcmethod'): # Call the exposed Node method and return the result to the deferred callback chain - log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'), - senderContact.address, senderContact.port) + if args: + log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'), + senderContact.address, senderContact.port) + else: + log.debug("DHT RECV CALL %s %s:%i", method, senderContact.address, + senderContact.port) try: kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} result = func(*args, **kwargs) From a942e6f3eb758a1ab97f83e81521dda933b65b7e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:18:38 -0400 Subject: [PATCH 09/21] fix args for ping() --- lbrynet/dht/protocol.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 974c45dfe..96ffd3140 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -382,9 +382,13 @@ class KademliaProtocol(protocol.DatagramProtocol): log.debug("DHT RECV CALL %s %s:%i", method, senderContact.address, senderContact.port) try: - kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} - result = func(*args, **kwargs) + if method != 'ping': + kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} + result = func(*args, **kwargs) + else: + result = func() except Exception, e: + log.exception("error handling request for %s: %s", senderContact.address, method) df.errback(e) else: df.callback(result) From 7c50e26bd7acf69dd03ea86f724f7c6c7a045e83 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:19:03 -0400 Subject: [PATCH 10/21] include method and args in _sentMessages --- lbrynet/dht/protocol.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 96ffd3140..1608ae6eb 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -171,7 +171,7 @@ class KademliaProtocol(protocol.DatagramProtocol): timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id) # Transmit the data self._send(encodedMsg, msg.id, (contact.address, contact.port)) - self._sentMessages[msg.id] = (contact.id, df, timeoutCall) + self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args) return df def startProtocol(self): @@ -403,10 +403,10 @@ class KademliaProtocol(protocol.DatagramProtocol): # This should never be reached log.error("deferred timed out, but is not present in sent messages list!") return - remoteContactID, df = self._sentMessages[messageID][0:2] + remoteContactID, df, timeout_call, method, args = self._sentMessages[messageID] if self._partialMessages.has_key(messageID): # We are still receiving this message - self._msgTimeoutInProgress(messageID, remoteContactID, df) + self._msgTimeoutInProgress(messageID, remoteContactID, df, method, args) return del self._sentMessages[messageID] # The message's destination node is now considered to be dead; @@ -414,12 +414,12 @@ class KademliaProtocol(protocol.DatagramProtocol): self._node.removeContact(remoteContactID) df.errback(TimeoutError(remoteContactID)) - def _msgTimeoutInProgress(self, messageID, remoteContactID, df): + def _msgTimeoutInProgress(self, messageID, remoteContactID, df, method, args): # See if any progress has been made; if not, kill the message if self._hasProgressBeenMade(messageID): # Reset the RPC timeout timer timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, messageID) - self._sentMessages[messageID] = (remoteContactID, df, timeoutCall) + self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args) else: # No progress has been made del self._partialMessagesProgress[messageID] From ab956d4a8ec238de492866627e6b83d16f12910f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:20:19 -0400 Subject: [PATCH 11/21] use count parameter in findCloseNodes --- lbrynet/dht/node.py | 3 ++- lbrynet/dht/routingtable.py | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c261a6595..7ea3d3947 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -578,11 +578,12 @@ class Node(object): findValue = rpc != 'findNode' if startupShortlist is None: - shortlist = self._routingTable.findCloseNodes(key, constants.alpha) + shortlist = self._routingTable.findCloseNodes(key, constants.k) if key != self.node_id: # Update the "last accessed" timestamp for the appropriate k-bucket self._routingTable.touchKBucket(key) if len(shortlist) == 0: + log.warning("This node doesnt know any other nodes") # This node doesn't know of any other nodes fakeDf = defer.Deferred() fakeDf.callback([]) diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 636899f32..5ac8cd129 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -131,7 +131,7 @@ class TreeRoutingTable(object): canGoLower = bucketIndex - i >= 0 canGoHigher = bucketIndex + i < len(self._buckets) # Fill up the node list to k nodes, starting with the closest neighbouring nodes known - while len(closestNodes) < constants.k and (canGoLower or canGoHigher): + while len(closestNodes) < min(count, constants.k) and (canGoLower or canGoHigher): # TODO: this may need to be optimized if canGoLower: closestNodes.extend( @@ -140,8 +140,8 @@ class TreeRoutingTable(object): canGoLower = bucketIndex - (i + 1) >= 0 if canGoHigher: closestNodes.extend( - self._buckets[bucketIndex + i].getContacts( - constants.k - len(closestNodes), _rpcNodeID)) + self._buckets[bucketIndex + i].getContacts(constants.k - len(closestNodes), + _rpcNodeID)) canGoHigher = bucketIndex + (i + 1) < len(self._buckets) i += 1 return closestNodes From de1dc507ac8564ec8ff318536194d88e192aee55 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:21:06 -0400 Subject: [PATCH 12/21] fix findCloseNodes when buckets aren't fully populated --- lbrynet/dht/routingtable.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 5ac8cd129..6564b3647 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -123,7 +123,11 @@ class TreeRoutingTable(object): @rtype: list """ bucketIndex = self._kbucketIndex(key) - closestNodes = self._buckets[bucketIndex].getContacts(constants.k, _rpcNodeID) + + if bucketIndex < len(self._buckets): + closestNodes = self._buckets[bucketIndex].getContacts(count, _rpcNodeID) + else: + closestNodes = [] # This method must return k contacts (even if we have the node # with the specified key as node ID), unless there is less # than k remote nodes in the routing table From 23ce278f8aa18ec6aa62029b55bc701f7c0d6fe5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:27:44 -0400 Subject: [PATCH 13/21] add removePeer to interface --- lbrynet/dht/datastore.py | 6 ++++++ lbrynet/dht/interface.py | 3 +++ 2 files changed, 9 insertions(+) diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index d67401240..a53942455 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -44,3 +44,9 @@ class DictDataStore(UserDict.DictMixin): def getPeersForBlob(self, key): if key in self._dict: return [val[0] for val in self._dict[key]] + + def removePeer(self, value): + for key in self._dict: + self._dict[key] = [val for val in self._dict[key] if val[0] != value] + if not self._dict[key]: + del self._dict[key] diff --git a/lbrynet/dht/interface.py b/lbrynet/dht/interface.py index 0648d6d21..67b4984a5 100644 --- a/lbrynet/dht/interface.py +++ b/lbrynet/dht/interface.py @@ -24,6 +24,9 @@ class IDataStore(Interface): def getPeersForBlob(self, key): pass + def removePeer(self, key): + pass + class IRoutingTable(Interface): """ Interface for RPC message translators/formatters From 5937ead17c069d15328015d99e732b1f6ad3d71e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:28:57 -0400 Subject: [PATCH 14/21] add Node.contacts helper property --- lbrynet/dht/node.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 7ea3d3947..fdc476a83 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -171,6 +171,14 @@ class Node(object): self.hash_watcher.tick() yield self._joinDeferred + @property + def contacts(self): + def _inner(): + for i in range(len(self._routingTable._buckets)): + for contact in self._routingTable._buckets[i]._contacts: + yield contact + return list(_inner()) + def printContacts(self, *args): print '\n\nNODE CONTACTS\n===============' for i in range(len(self._routingTable._buckets)): From c9515f8fb6d663a133ec52031a68a56579eac425 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:29:29 -0400 Subject: [PATCH 15/21] logging and whitespace --- lbrynet/dht/node.py | 4 +++- lbrynet/dht/protocol.py | 2 +- lbrynet/dht/routingtable.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index fdc476a83..eca24d3c6 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -182,8 +182,9 @@ class Node(object): def printContacts(self, *args): print '\n\nNODE CONTACTS\n===============' for i in range(len(self._routingTable._buckets)): + print "bucket %i" % i for contact in self._routingTable._buckets[i]._contacts: - print contact + print " %s:%i" % (contact.address, contact.port) print '==================================' def getApproximateTotalDHTNodes(self): @@ -510,6 +511,7 @@ class Node(object): node is returning all of the contacts that it knows of. @rtype: list """ + # Get the sender's ID (if any) if '_rpcNodeID' in kwargs: rpc_sender_id = kwargs['_rpcNodeID'] diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 1608ae6eb..fa6670c66 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -399,7 +399,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _msgTimeout(self, messageID): """ Called when an RPC request message times out """ # Find the message that timed out - if not self._sentMessages.has_key(messageID): + if messageID not in self._sentMessages: # This should never be reached log.error("deferred timed out, but is not present in sent messages list!") return diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 6564b3647..86af043dc 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -84,7 +84,7 @@ class TreeRoutingTable(object): @type failure: twisted.python.failure.Failure """ failure.trap(TimeoutError) - print '==replacing contact==' + log.warning('==replacing contact==') # Remove the old contact... deadContactID = failure.getErrorMessage() try: From 9919fd06c6bda12179b6370517ba7b2abdafd6bb Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:29:54 -0400 Subject: [PATCH 16/21] uncomment error for store request where contact is not known --- lbrynet/dht/node.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index eca24d3c6..d82dc79fa 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -467,8 +467,7 @@ class Node(object): contact = kwargs['_rpcNodeContact'] compact_ip = contact.compact_ip() else: - return 'Not OK' - # raise TypeError, 'No contact info available' + raise TypeError, 'No contact info available' if ((self_store is False) and ('token' not in value or not self.verify_token(value['token'], compact_ip))): From e9fd8eb0969a6deadf181f266355eb7a85bf0d81 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:31:07 -0400 Subject: [PATCH 17/21] update tests and scripts --- lbrynet/dht/node.py | 3 +- lbrynet/tests/functional/test_misc.py | 33 ++- lbrynet/tests/functional/test_reflector.py | 6 +- lbrynet/tests/functional/test_streamify.py | 8 +- lbrynet/tests/integration/test_integration.py | 7 +- .../tests/unit/lbrynet_daemon/test_Daemon.py | 5 +- scripts/dht_monitor.py | 103 +++++++ scripts/dht_scripts.py | 2 +- scripts/dhttest.py | 2 +- scripts/query_available_blobs.py | 2 +- scripts/rpc_node.py | 252 +++++++++++++----- scripts/send_sd_blobs_to_lighthouse.py | 2 +- 12 files changed, 332 insertions(+), 93 deletions(-) create mode 100644 scripts/dht_monitor.py diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index d82dc79fa..bf1436f16 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -218,7 +218,8 @@ class Node(object): for peer in result[blob_hash]: if self.node_id != peer[6:]: host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1" and "from_peer" in result and result["from_peer"] != "self": + if host == "127.0.0.1" and "from_peer" in result \ + and result["from_peer"] != "self": host = result["from_peer"] port, = struct.unpack('>H', peer[4:6]) if (host, port) not in expanded_peers: diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index 1fa2b2c26..02630821b 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -46,6 +46,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" logging.basicConfig(level=logging.CRITICAL, format=log_format) + def require_system(system): def wrapper(fn): return fn @@ -115,10 +116,10 @@ class LbryUploader(object): self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir, - lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") stream_info_manager = TempEncryptedFileMetadataManager() self.lbry_file_manager = EncryptedFileManager( self.session, stream_info_manager, self.sd_identifier) @@ -218,12 +219,13 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd" + str(n), + node_id="abcd" + str(n), peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") stream_info_manager = TempEncryptedFileMetadataManager() @@ -330,12 +332,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero db_dir, blob_dir = mk_db_and_blob_dir() - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="efgh", + session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") if slow is True: session.rate_limiter.set_ul_limit(2 ** 11) @@ -508,11 +511,11 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -599,12 +602,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) @@ -678,11 +681,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, @@ -800,11 +804,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") self.stream_info_manager = TempEncryptedFileMetadataManager() diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 8e32d451d..d252986a2 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -9,7 +9,6 @@ from lbrynet.core import PeerManager from lbrynet.core import RateLimiter from lbrynet.core import Session from lbrynet.core import StreamDescriptor -from lbrynet.dht.node import Node from lbrynet.lbry_file import EncryptedFileMetadataManager from lbrynet.lbry_file.client import EncryptedFileOptions from lbrynet.file_manager import EncryptedFileCreator @@ -18,6 +17,7 @@ from lbrynet.file_manager import EncryptedFileManager from lbrynet.tests import mocks from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir + class TestReflector(unittest.TestCase): def setUp(self): mocks.mock_conf_settings(self) @@ -57,7 +57,7 @@ class TestReflector(unittest.TestCase): self.session = Session.Session( conf.settings['data_rate'], db_dir=self.db_dir, - lbryid="abcd", + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=self.blob_dir, @@ -66,7 +66,7 @@ class TestReflector(unittest.TestCase): rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, - dht_node_class=Node + external_ip="127.0.0.1" ) self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager( diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index 54b69fc1d..afd3b029c 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -72,12 +72,12 @@ class TestStreamify(TestCase): os.mkdir(blob_dir) self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=self.is_generous + is_generous=self.is_generous, external_ip="127.0.0.1" ) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -128,11 +128,11 @@ class TestStreamify(TestCase): os.mkdir(blob_dir) self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker + blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1" ) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) diff --git a/lbrynet/tests/integration/test_integration.py b/lbrynet/tests/integration/test_integration.py index 521d93844..2036a9730 100644 --- a/lbrynet/tests/integration/test_integration.py +++ b/lbrynet/tests/integration/test_integration.py @@ -18,6 +18,7 @@ def shell_command(command): FNULL = open(os.devnull, 'w') p = subprocess.Popen(command,shell=False,stdout=FNULL,stderr=subprocess.STDOUT) + def lbrynet_cli(commands): cli_cmd=['lbrynet-cli'] for cmd in commands: @@ -65,7 +66,6 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertTrue(out['is_running']) - def test_cli_docopts(self): out,err = lbrynet_cli(['cli_test_command']) self.assertEqual('',out) @@ -83,7 +83,6 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertEqual([1,[],1,None,False,False], out) - out,err = lbrynet_cli(['cli_test_command','1', '--pos_arg2=2','--pos_arg3=3']) out = json.loads(out) self.assertEqual([1,[],2,3,False,False], out) @@ -93,7 +92,6 @@ class TestIntegration(unittest.TestCase): # TODO: variable length arguments don't have guess_type() on them self.assertEqual([1,['2','3'],None,None,False,False], out) - out,err = lbrynet_cli(['cli_test_command','1','-a']) out = json.loads(out) self.assertEqual([1,[],None,None,True,False], out) @@ -102,13 +100,10 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertEqual([1,[],None,None,True,False], out) - out,err = lbrynet_cli(['cli_test_command','1','-a','-b']) out = json.loads(out) self.assertEqual([1,[],None,None,True,True], out) - - def test_status(self): out = lbrynet.status() self.assertTrue(out['is_running']) diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py index 0fffb7b4a..a285cbfa8 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py @@ -15,6 +15,10 @@ from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailability from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed +import logging +logging.getLogger("lbryum").setLevel(logging.WARNING) + + def get_test_daemon(data_rate=None, generous=True, with_fee=False): if data_rate is None: data_rate = conf.ADJUSTABLE_SETTINGS['data_rate'][1] @@ -68,7 +72,6 @@ class TestCostEst(unittest.TestCase): size = 10000000 correct_result = 4.5 daemon = get_test_daemon(generous=True, with_fee=True) - print daemon.get_est_cost("test", size) self.assertEquals(daemon.get_est_cost("test", size).result, correct_result) def test_fee_and_ungenerous_data(self): diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py new file mode 100644 index 000000000..70a93fea7 --- /dev/null +++ b/scripts/dht_monitor.py @@ -0,0 +1,103 @@ +import curses +import time +from jsonrpc.proxy import JSONRPCProxy +import logging + +log = logging.getLogger(__name__) +log.addHandler(logging.FileHandler("dht contacts.log")) +# log.addHandler(logging.StreamHandler()) +log.setLevel(logging.INFO) +stdscr = curses.initscr() + +api = JSONRPCProxy.from_url("http://localhost:5280") + + +def init_curses(): + curses.noecho() + curses.cbreak() + stdscr.nodelay(1) + stdscr.keypad(1) + + +def teardown_curses(): + curses.nocbreak() + stdscr.keypad(0) + curses.echo() + curses.endwin() + + +def refresh(last_contacts, last_blobs): + height, width = stdscr.getmaxyx() + + try: + routing_table_info = api.routing_table_get() + node_id = routing_table_info['node id'] + except: + node_id = "UNKNOWN" + routing_table_info = { + 'buckets': {}, + 'contacts': [], + 'blob hashes': [] + } + for y in range(height): + stdscr.addstr(y, 0, " " * (width - 1)) + + buckets = routing_table_info['buckets'] + stdscr.addstr(0, 0, "node id: %s" % node_id) + stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" % + (len(buckets), len(routing_table_info['contacts']), + len(routing_table_info['blob hashes']))) + + y = 3 + for i in sorted(buckets.keys()): + stdscr.addstr(y, 0, "bucket %s" % i) + y += 1 + for h in sorted(buckets[i], key=lambda x: x['id'].decode('hex')): + stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['id'], h['address'], len(h['blobs']))) + y += 1 + y += 1 + + new_contacts = set(routing_table_info['contacts']) - last_contacts + lost_contacts = last_contacts - set(routing_table_info['contacts']) + + if new_contacts: + for c in new_contacts: + log.debug("added contact %s", c) + if lost_contacts: + for c in lost_contacts: + log.info("lost contact %s", c) + + new_blobs = set(routing_table_info['blob hashes']) - last_blobs + lost_blobs = last_blobs - set(routing_table_info['blob hashes']) + + if new_blobs: + for c in new_blobs: + log.debug("added blob %s", c) + if lost_blobs: + for c in lost_blobs: + log.info("lost blob %s", c) + + stdscr.addstr(y + 1, 0, str(time.time())) + stdscr.refresh() + return set(routing_table_info['contacts']), set(routing_table_info['blob hashes']) + + +def do_main(): + c = None + last_contacts, last_blobs = set(), set() + while c not in [ord('q'), ord('Q')]: + last_contacts, last_blobs = refresh(last_contacts, last_blobs) + c = stdscr.getch() + time.sleep(0.1) + + +def main(): + try: + init_curses() + do_main() + finally: + teardown_curses() + + +if __name__ == "__main__": + main() diff --git a/scripts/dht_scripts.py b/scripts/dht_scripts.py index 657a5d7e0..b3a5cafe0 100644 --- a/scripts/dht_scripts.py +++ b/scripts/dht_scripts.py @@ -22,7 +22,7 @@ def join_network(udp_port, known_nodes): lbryid = generate_id() log.info('Creating node') - node = Node(udpPort=udp_port, lbryid=lbryid) + node = Node(udpPort=udp_port, node_id=lbryid) log.info('Joining network') yield node.joinNetwork(known_nodes) diff --git a/scripts/dhttest.py b/scripts/dhttest.py index a188030dc..fe0a0af7f 100644 --- a/scripts/dhttest.py +++ b/scripts/dhttest.py @@ -150,7 +150,7 @@ if __name__ == '__main__': # If you wish to have a pure Kademlia network, use the # entangled.kademlia.node.Node class instead print 'Creating Node' - node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid) + node = Node(udpPort=int(sys.argv[1]), node_id=lbryid) # Schedule the node to join the Kademlia/Entangled DHT node.joinNetwork(knownNodes) diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py index 39e1f406f..c2b08f944 100644 --- a/scripts/query_available_blobs.py +++ b/scripts/query_available_blobs.py @@ -51,7 +51,7 @@ def main(args=None): session = Session.Session( 0, db_dir=db_dir, - lbryid=utils.generate_id(), + node_id=utils.generate_id(), blob_dir=blob_dir, dht_node_port=4444, known_dht_nodes=conf.settings['known_dht_nodes'], diff --git a/scripts/rpc_node.py b/scripts/rpc_node.py index ced4fc6e8..40d69b8e7 100644 --- a/scripts/rpc_node.py +++ b/scripts/rpc_node.py @@ -1,82 +1,214 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# - -# Thanks to Paul Cannon for IP-address resolution functions (taken from aspn.activestate.com) - - -""" -Launch a DHT node which can respond to RPC commands. -""" - +import logging +import requests +import miniupnpc import argparse -from lbrynet.dht.node import Node -from txjsonrpc.web import jsonrpc -from twisted.web import server +from copy import deepcopy from twisted.internet import reactor, defer +from twisted.web import resource +from twisted.web.server import Site + +from lbrynet import conf +from lbrynet.core.log_support import configure_console +from lbrynet.dht.error import TimeoutError +conf.initialize_settings() + +log = logging.getLogger("dht tool") +configure_console() +log.setLevel(logging.INFO) + +from lbrynet.dht.node import Node +from lbrynet.dht.contact import Contact +from lbrynet.daemon.auth.server import AuthJSONRPCServer +from lbrynet.core.utils import generate_id + +def get_external_ip_and_setup_upnp(): + try: + u = miniupnpc.UPnP() + u.discoverdelay = 200 + u.discover() + u.selectigd() + + if u.getspecificportmapping(4444, "UDP"): + u.deleteportmapping(4444, "UDP") + log.info("Removed UPnP redirect for UDP 4444.") + u.addportmapping(4444, 'UDP', u.lanaddr, 4444, 'LBRY DHT port', '') + log.info("got external ip from upnp") + return u.externalipaddress() + except Exception: + log.exception("derp") + r = requests.get('https://api.ipify.org', {'format': 'json'}) + log.info("got external ip from ipify.org") + return r.json()['ip'] -class RPCNode(jsonrpc.JSONRPC): - def __init__(self, node, shut_down_cb): - jsonrpc.JSONRPC.__init__(self) - self.node = node - self.shut_down_cb = shut_down_cb +class NodeRPC(AuthJSONRPCServer): + def __init__(self, lbryid, seeds, node_port, rpc_port): + AuthJSONRPCServer.__init__(self, False) + self.root = None + self.port = None + self.seeds = seeds + self.node_port = node_port + self.rpc_port = rpc_port + if lbryid: + lbryid = lbryid.decode('hex') + else: + lbryid = generate_id() + self.node_id = lbryid + self.external_ip = get_external_ip_and_setup_upnp() + self.node_port = node_port - def jsonrpc_total_dht_nodes(self): - return self.node.getApproximateTotalDHTNodes() + @defer.inlineCallbacks + def setup(self): + self.node = Node(node_id=self.node_id, udpPort=self.node_port, + externalIP=self.external_ip) + hosts = [] + for hostname, hostport in self.seeds: + host_ip = yield reactor.resolve(hostname) + hosts.append((host_ip, hostport)) + log.info("connecting to dht") + yield self.node.joinNetwork(tuple(hosts)) + log.info("connected to dht") + if not self.announced_startup: + self.announced_startup = True + self.start_api() + log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'), len(self.node.node_id)) - def jsonrpc_total_dht_hashes(self): - return self.node.getApproximateTotalHashes() + def start_api(self): + root = resource.Resource() + root.putChild('', self) + self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost') + log.info("started jsonrpc server") - def jsonrpc_stop(self): - self.shut_down_cb() - return "fine" + @defer.inlineCallbacks + def jsonrpc_node_id_set(self, node_id): + old_id = self.node.node_id + self.node.stop() + del self.node + self.node_id = node_id.decode('hex') + yield self.setup() + msg = "changed dht id from %s to %s" % (old_id.encode('hex'), + self.node.node_id.encode('hex')) + defer.returnValue(msg) + + def jsonrpc_node_id_get(self): + return self._render_response(self.node.node_id.encode('hex')) + + @defer.inlineCallbacks + def jsonrpc_peer_find(self, node_id): + node_id = node_id.decode('hex') + contact = yield self.node.findContact(node_id) + result = None + if contact: + result = (contact.address, contact.port) + defer.returnValue(result) + + @defer.inlineCallbacks + def jsonrpc_peer_list_for_blob(self, blob_hash): + peers = yield self.node.getPeersForBlob(blob_hash.decode('hex')) + defer.returnValue(peers) + + @defer.inlineCallbacks + def jsonrpc_ping(self, node_id): + contact_host = yield self.jsonrpc_peer_find(node_id=node_id) + if not contact_host: + defer.returnValue("failed to find node") + contact_ip, contact_port = contact_host + contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol) + try: + result = yield contact.ping() + except TimeoutError: + self.node.removeContact(contact.id) + self.node._dataStore.removePeer(contact.id) + result = {'error': 'timeout'} + defer.returnValue(result) + + def get_routing_table(self): + result = {} + data_store = deepcopy(self.node._dataStore._dict) + datastore_len = len(data_store) + hosts = {} + missing_contacts = [] + if datastore_len: + for k, v in data_store.iteritems(): + for value, lastPublished, originallyPublished, originalPublisherID in v: + try: + contact = self.node._routingTable.getContact(originalPublisherID) + except ValueError: + if originalPublisherID.encode('hex') not in missing_contacts: + missing_contacts.append(originalPublisherID.encode('hex')) + continue + if contact in hosts: + blobs = hosts[contact] + else: + blobs = [] + blobs.append(k.encode('hex')) + hosts[contact] = blobs + + contact_set = [] + blob_hashes = [] + result['buckets'] = {} + + for i in range(len(self.node._routingTable._buckets)): + for contact in self.node._routingTable._buckets[i]._contacts: + contacts = result['buckets'].get(i, []) + if contact in hosts: + blobs = hosts[contact] + del hosts[contact] + else: + blobs = [] + host = { + "address": contact.address, + "id": contact.id.encode("hex"), + "blobs": blobs, + } + for blob_hash in blobs: + if blob_hash not in blob_hashes: + blob_hashes.append(blob_hash) + contacts.append(host) + result['buckets'][i] = contacts + contact_set.append(contact.id.encode("hex")) + if hosts: + result['datastore extra'] = [ + { + "id": host.id.encode('hex'), + "blobs": hosts[host], + } + for host in hosts] + result['missing contacts'] = missing_contacts + result['contacts'] = contact_set + result['blob hashes'] = blob_hashes + result['node id'] = self.node_id.encode('hex') + return result + + def jsonrpc_routing_table_get(self): + return self._render_response(self.get_routing_table()) def main(): parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands") - - parser.add_argument("node_port", + parser.add_argument("--node_port", help=("The UDP port on which the node will listen for connections " "from other dht nodes"), - type=int) - parser.add_argument("rpc_port", + type=int, default=4444) + parser.add_argument("--rpc_port", help="The TCP port on which the node will listen for rpc commands", - type=int) - parser.add_argument("dht_bootstrap_host", + type=int, default=5280) + parser.add_argument("--bootstrap_host", help="The IP of a DHT node to be used to bootstrap into the network", - nargs='?') - parser.add_argument("dht_bootstrap_port", + default='lbrynet1.lbry.io') + parser.add_argument("--node_id", + help="The IP of a DHT node to be used to bootstrap into the network", + default=None) + parser.add_argument("--bootstrap_port", help="The port of a DHT node to be used to bootstrap into the network", - nargs='?', default=4000, type=int) - parser.add_argument("--rpc_ip_address", - help="The network interface on which to listen for rpc connections", - default="127.0.0.1") + default=4444, type=int) args = parser.parse_args() - - def start_rpc(): - rpc_node = RPCNode(node, shut_down) - reactor.listenTCP(args.rpc_port, server.Site(rpc_node), interface=args.rpc_ip_address) - - def shut_down(): - d = defer.maybeDeferred(node.stop) - d.addBoth(lambda _: reactor.stop()) - return d - - known_nodes = [] - if args.dht_bootstrap_host: - known_nodes.append((args.dht_bootstrap_host, args.dht_bootstrap_port)) - - node = Node(udpPort=args.node_port) - node.joinNetwork(known_nodes) - d = node._joinDeferred - d.addCallback(lambda _: start_rpc()) + seeds = [(args.bootstrap_host, args.bootstrap_port)] + server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port) + reactor.addSystemEventTrigger('after', 'startup', server.setup) reactor.run() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/send_sd_blobs_to_lighthouse.py b/scripts/send_sd_blobs_to_lighthouse.py index de40356cd..aad1f21f9 100644 --- a/scripts/send_sd_blobs_to_lighthouse.py +++ b/scripts/send_sd_blobs_to_lighthouse.py @@ -53,7 +53,7 @@ def main(args=None): session = Session.Session( blob_data_payment_rate=0, db_dir=db_dir, - lbryid=utils.generate_id(), + node_id=utils.generate_id(), blob_dir=blob_dir, dht_node_port=4444, known_dht_nodes=conf.settings['known_dht_nodes'], From c7acb31614dd3c512f22eb199fa558b1ee3a6e02 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:31:18 -0400 Subject: [PATCH 18/21] add dht tests --- lbrynet/tests/dht/__init__.py | 0 .../tests/dht/{testNode.py => test_node.py} | 203 ++++++++---------- .../dht/{testProtocol.py => test_protocol.py} | 181 +++++++--------- ...tRoutingTable.py => test_routing_table.py} | 119 ++++++---- 4 files changed, 233 insertions(+), 270 deletions(-) create mode 100644 lbrynet/tests/dht/__init__.py rename lbrynet/tests/dht/{testNode.py => test_node.py} (67%) rename lbrynet/tests/dht/{testProtocol.py => test_protocol.py} (55%) rename lbrynet/tests/dht/{testRoutingTable.py => test_routing_table.py} (62%) diff --git a/lbrynet/tests/dht/__init__.py b/lbrynet/tests/dht/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/tests/dht/testNode.py b/lbrynet/tests/dht/test_node.py similarity index 67% rename from lbrynet/tests/dht/testNode.py rename to lbrynet/tests/dht/test_node.py index a4e751d51..c86f98fa1 100644 --- a/lbrynet/tests/dht/testNode.py +++ b/lbrynet/tests/dht/test_node.py @@ -8,10 +8,13 @@ import hashlib import unittest import struct +from twisted.internet import protocol, defer, selectreactor +from lbrynet.dht.msgtypes import ResponseMessage import lbrynet.dht.node import lbrynet.dht.constants import lbrynet.dht.datastore + class NodeIDTest(unittest.TestCase): """ Test case for the Node class's ID """ def setUp(self): @@ -19,66 +22,70 @@ class NodeIDTest(unittest.TestCase): def testAutoCreatedID(self): """ Tests if a new node has a valid node ID """ - self.failUnlessEqual(type(self.node.id), str, 'Node does not have a valid ID') - self.failUnlessEqual(len(self.node.id), 20, 'Node ID length is incorrect! Expected 160 bits, got %d bits.' % (len(self.node.id)*8)) + self.failUnlessEqual(type(self.node.node_id), str, 'Node does not have a valid ID') + self.failUnlessEqual(len(self.node.node_id), 48, 'Node ID length is incorrect! ' + 'Expected 384 bits, got %d bits.' % + (len(self.node.node_id) * 8)) def testUniqueness(self): - """ Tests the uniqueness of the values created by the NodeID generator - """ + """ Tests the uniqueness of the values created by the NodeID generator """ generatedIDs = [] for i in range(100): newID = self.node._generateID() # ugly uniqueness test self.failIf(newID in generatedIDs, 'Generated ID #%d not unique!' % (i+1)) generatedIDs.append(newID) - + def testKeyLength(self): """ Tests the key Node ID key length """ for i in range(20): id = self.node._generateID() # Key length: 20 bytes == 160 bits - self.failUnlessEqual(len(id), 20, 'Length of generated ID is incorrect! Expected 160 bits, got %d bits.' % (len(id)*8)) + self.failUnlessEqual(len(id), 48, + 'Length of generated ID is incorrect! Expected 384 bits, ' + 'got %d bits.' % (len(id)*8)) class NodeDataTest(unittest.TestCase): """ Test case for the Node class's data-related functions """ def setUp(self): import lbrynet.dht.contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('test') self.node = lbrynet.dht.node.Node() - self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, self.node._protocol) + self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, + self.node._protocol) self.token = self.node.make_token(self.contact.compact_ip()) self.cases = [] for i in xrange(5): h.update(str(i)) self.cases.append((h.digest(), 5000+2*i)) self.cases.append((h.digest(), 5001+2*i)) -<<<<<<< Updated upstream - #(('a', 'hello there\nthis is a test'), - # ('aMuchLongerKeyThanAnyOfThePreviousOnes', 'some data')) - -======= ->>>>>>> Stashed changes def testStore(self): - - def check_val_in_result(r, peer_info): - self.failUnless - """ Tests if the node can store (and privately retrieve) some data """ for key, value in self.cases: - self.node.store(key, {'port': value, 'bbid': self.contact.id, 'token': self.token}, self.contact.id, _rpcNodeContact=self.contact) + request = { + 'port': value, + 'lbryid': self.contact.id, + 'token': self.token + } + self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact) for key, value in self.cases: - expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + self.contact.id - self.failUnless(self.node._dataStore.hasPeersForBlob(key), 'Stored key not found in node\'s DataStore: "%s"' % key) - self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key), 'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s' % (key, value, self.node._dataStore.getPeersForBlob(key))) + expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \ + self.contact.id + self.failUnless(self.node._dataStore.hasPeersForBlob(key), + 'Stored key not found in node\'s DataStore: "%s"' % key) + self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key), + 'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s' + % (key, value, self.node._dataStore.getPeersForBlob(key))) + class NodeContactTest(unittest.TestCase): """ Test case for the Node class's contact management-related functions """ def setUp(self): self.node = lbrynet.dht.node.Node() - + def testAddContact(self): """ Tests if a contact can be added and retrieved correctly """ import lbrynet.dht.contact @@ -91,67 +98,55 @@ class NodeContactTest(unittest.TestCase): self.node.addContact(contact) # ...and request the closest nodes to it using FIND_NODE closestNodes = self.node._routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k) - self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes)) - self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()') - + self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; ' + 'expected 1, got %d' % len(closestNodes)) + self.failUnless(contact in closestNodes, 'Added contact not found by issueing ' + '_findCloseNodes()') + def testAddSelfAsContact(self): """ Tests the node's behaviour when attempting to add itself as a contact """ import lbrynet.dht.contact # Create a contact with the same ID as the local node's ID - contact = lbrynet.dht.contact.Contact(self.node.id, '127.0.0.1', 91824, None) + contact = lbrynet.dht.contact.Contact(self.node.node_id, '127.0.0.1', 91824, None) # Now try to add it self.node.addContact(contact) # ...and request the closest nodes to it using FIND_NODE - closestNodes = self.node._routingTable.findCloseNodes(self.node.id, lbrynet.dht.constants.k) + closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id, + lbrynet.dht.constants.k) self.failIf(contact in closestNodes, 'Node added itself as a contact') -<<<<<<< Updated upstream -# """ Test case for the Node class's iterative node lookup algorithm """ - - -# """ Ugly brute-force test to see if the iterative node lookup algorithm runs without failing """ - -======= ->>>>>>> Stashed changes - -"""Some scaffolding for the NodeLookupTest class. Allows isolated -node testing by simulating remote node responses""" -from twisted.internet import protocol, defer, selectreactor -from lbrynet.dht.msgtypes import ResponseMessage - - class FakeRPCProtocol(protocol.DatagramProtocol): def __init__(self): - self.reactor = selectreactor.SelectReactor() + self.reactor = selectreactor.SelectReactor() self.testResponse = None self.network = None def createNetwork(self, contactNetwork): - """ set up a list of contacts together with their closest contacts - @param contactNetwork: a sequence of tuples, each containing a contact together with its closest - contacts: C{(, )} - """ - self.network = contactNetwork - - """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs """ + """ + set up a list of contacts together with their closest contacts + @param contactNetwork: a sequence of tuples, each containing a contact together with its + closest contacts: C{(, )} + """ + self.network = contactNetwork + def sendRPC(self, contact, method, args, rawResponse=False): - - if method == "findNode": + """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs""" + + if method == "findNode": # get the specific contacts closest contacts closestContacts = [] + closestContactsList = [] for contactTuple in self.network: if contact == contactTuple[0]: # get the list of closest contacts for this contact closestContactsList = contactTuple[1] - - # Pack the closest contacts into a ResponseMessage + # Pack the closest contacts into a ResponseMessage for closeContact in closestContactsList: closestContacts.append((closeContact.id, closeContact.address, closeContact.port)) message = ResponseMessage("rpcId", contact.id, closestContacts) - df = defer.Deferred() - df.callback((message,(contact.address, contact.port))) + df.callback((message, (contact.address, contact.port))) return df elif method == "findValue": for contactTuple in self.network: @@ -160,12 +155,10 @@ class FakeRPCProtocol(protocol.DatagramProtocol): dataDict = contactTuple[2] dataKey = dataDict.keys()[0] data = dataDict.get(dataKey) - # Check if this contact has the requested value if dataKey == args[0]: # Return the data value response = dataDict - print "data found at contact: " + contact.id else: # Return the closest contact to the requested data key @@ -173,62 +166,52 @@ class FakeRPCProtocol(protocol.DatagramProtocol): closeContacts = contactTuple[1] closestContacts = [] for closeContact in closeContacts: - closestContacts.append((closeContact.id, closeContact.address, closeContact.port)) + closestContacts.append((closeContact.id, closeContact.address, + closeContact.port)) response = closestContacts - + # Create the response message message = ResponseMessage("rpcId", contact.id, response) df = defer.Deferred() - df.callback((message,(contact.address, contact.port))) + df.callback((message, (contact.address, contact.port))) return df def _send(self, data, rpcID, address): """ fake sending data """ - - + class NodeLookupTest(unittest.TestCase): """ Test case for the Node class's iterativeFind node lookup algorithm """ - + def setUp(self): - # create a fake protocol to imitate communication with other nodes self._protocol = FakeRPCProtocol() - - # Note: The reactor is never started for this test. All deferred calls run sequentially, + # Note: The reactor is never started for this test. All deferred calls run sequentially, # since there is no asynchronous network communication - # create the node to be tested in isolation - self.node = lbrynet.dht.node.Node(None, 4000, None, None, self._protocol) - + self.node = lbrynet.dht.node.Node('12345678901234567800', 4000, None, None, self._protocol) self.updPort = 81173 - -<<<<<<< Updated upstream - # create a dummy reactor - -======= ->>>>>>> Stashed changes self.contactsAmount = 80 - # set the node ID manually for testing - self.node.id = '12345678901234567800' - # Reinitialise the routing table - self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(self.node.id) - + self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable( + self.node.node_id) + # create 160 bit node ID's for test purposes self.testNodeIDs = [] - idNum = int(self.node.id) + idNum = int(self.node.node_id) for i in range(self.contactsAmount): - # create the testNodeIDs in ascending order, away from the actual node ID, with regards to the distance metric + # create the testNodeIDs in ascending order, away from the actual node ID, + # with regards to the distance metric self.testNodeIDs.append(idNum + i + 1) # generate contacts self.contacts = [] for i in range(self.contactsAmount): - contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1", self.updPort + i + 1, self._protocol) + contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1", + self.updPort + i + 1, self._protocol) self.contacts.append(contact) - - # create the network of contacts in format: (contact, closest contacts) + + # create the network of contacts in format: (contact, closest contacts) contactNetwork = ((self.contacts[0], self.contacts[8:15]), (self.contacts[1], self.contacts[16:23]), (self.contacts[2], self.contacts[24:31]), @@ -254,43 +237,27 @@ class NodeLookupTest(unittest.TestCase): contacts_with_datastores = [] for contact_tuple in contactNetwork: - contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], lbrynet.dht.datastore.DictDataStore())) - + contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], + lbrynet.dht.datastore.DictDataStore())) self._protocol.createNetwork(contacts_with_datastores) - + def testNodeBootStrap(self): """ Test bootstrap with the closest possible contacts """ - - df = self.node._iterativeFind(self.node.id, self.contacts[0:8]) + + df = self.node._iterativeFind(self.node.node_id, self.contacts[0:8]) # Set the expected result - expectedResult = [] - + expectedResult = [] for item in self.contacts[0:6]: - expectedResult.append(item.id) - + expectedResult.append(item.id) # Get the result from the deferred activeContacts = df.result - - + # Check the length of the active contacts - self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), \ - "More active contacts should exist, there should be %d contacts" %expectedResult.__len__()) - - + self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), + "More active contacts should exist, there should be %d " + "contacts" % expectedResult.__len__()) + # Check that the received active contacts are the same as the input contacts - self.failUnlessEqual(activeContacts, expectedResult, \ - "Active should only contain the closest possible contacts which were used as input for the boostrap") - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(NodeIDTest)) - suite.addTest(unittest.makeSuite(NodeDataTest)) - suite.addTest(unittest.makeSuite(NodeContactTest)) - suite.addTest(unittest.makeSuite(NodeLookupTest)) - return suite - - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) + self.failUnlessEqual(activeContacts, expectedResult, + "Active should only contain the closest possible contacts" + " which were used as input for the boostrap") diff --git a/lbrynet/tests/dht/testProtocol.py b/lbrynet/tests/dht/test_protocol.py similarity index 55% rename from lbrynet/tests/dht/testProtocol.py rename to lbrynet/tests/dht/test_protocol.py index 7215eaa27..70c1e0456 100644 --- a/lbrynet/tests/dht/testProtocol.py +++ b/lbrynet/tests/dht/test_protocol.py @@ -1,88 +1,22 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive - import time import unittest - -from twisted.internet import defer -from twisted.python import failure import twisted.internet.selectreactor -from twisted.internet.protocol import DatagramProtocol import lbrynet.dht.protocol import lbrynet.dht.contact import lbrynet.dht.constants import lbrynet.dht.msgtypes -from lbrynet.dht.node import rpcmethod +from lbrynet.dht.error import TimeoutError +from lbrynet.dht.node import Node, rpcmethod -class FakeNode(object): - """ A fake node object implementing some RPC and non-RPC methods to - test the Kademlia protocol's behaviour - """ - def __init__(self, id): - self.id = id - self.contacts = [] - - @rpcmethod - def ping(self): - return 'pong' - - def pingNoRPC(self): - return 'pong' - - @rpcmethod - def echo(self, value): - return value - - def addContact(self, contact): - self.contacts.append(contact) - - def removeContact(self, contact): - self.contacts.remove(contact) - - def indirectPingContact(self, protocol, contact): - """ Pings the given contact (using the specified KademliaProtocol - object, not the direct Contact API), and removes the contact - on a timeout """ - df = protocol.sendRPC(contact, 'ping', {}) - def handleError(f): - if f.check(lbrynet.dht.protocol.TimeoutError): - self.removeContact(contact) - return f - else: - # This is some other error - return f - df.addErrback(handleError) - return df - -class ClientDatagramProtocol(lbrynet.dht.protocol.KademliaProtocol): - data = '' - msgID = '' - destination = ('127.0.0.1', 9182) - - def __init__(self): - lbrynet.dht.protocol.KademliaProtocol.__init__(self, None) - - def startProtocol(self): - self.sendDatagram() - - def sendDatagram(self): - if len(self.data): - self._send(self.data, self.msgID, self.destination) - - - - class KademliaProtocolTest(unittest.TestCase): """ Test case for the Protocol class """ + def setUp(self): del lbrynet.dht.protocol.reactor lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor() - self.node = FakeNode('node1') + self.node = Node(node_id='node1', udpPort=9182, externalIP="127.0.0.1") self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node) def testReactor(self): @@ -93,36 +27,66 @@ class KademliaProtocolTest(unittest.TestCase): def testRPCTimeout(self): """ Tests if a RPC message sent to a dead remote node times out correctly """ + + @rpcmethod + def fake_ping(*args, **kwargs): + time.sleep(lbrynet.dht.constants.rpcTimeout + 1) + return 'pong' + + real_ping = self.node.ping + real_timeout = lbrynet.dht.constants.rpcTimeout + real_attempts = lbrynet.dht.constants.rpcAttempts + lbrynet.dht.constants.rpcAttempts = 1 + lbrynet.dht.constants.rpcTimeout = 1 + self.node.ping = fake_ping deadContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) self.node.addContact(deadContact) # Make sure the contact was added - self.failIf(deadContact not in self.node.contacts, 'Contact not added to fake node (error in test code)') - # Set the timeout to 0 for testing - tempTimeout = lbrynet.dht.constants.rpcTimeout - lbrynet.dht.constants.rpcTimeout = 0 - lbrynet.dht.protocol.reactor.listenUDP(0, self.protocol) - # Run the PING RPC (which should timeout) - df = self.node.indirectPingContact(self.protocol, deadContact) + self.failIf(deadContact not in self.node.contacts, + 'Contact not added to fake node (error in test code)') + lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) + + # Run the PING RPC (which should raise a timeout error) + df = self.protocol.sendRPC(deadContact, 'ping', {}) + + def check_timeout(err): + self.assertEqual(type(err), TimeoutError) + + df.addErrback(check_timeout) + + def reset_values(): + self.node.ping = real_ping + lbrynet.dht.constants.rpcTimeout = real_timeout + lbrynet.dht.constants.rpcAttempts = real_attempts + + # See if the contact was removed due to the timeout + def check_removed_contact(): + self.failIf(deadContact in self.node.contacts, + 'Contact was not removed after RPC timeout; check exception types.') + + df.addCallback(lambda _: reset_values()) + # Stop the reactor if a result arrives (timeout or not) df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) + df.addCallback(lambda _: check_removed_contact()) lbrynet.dht.protocol.reactor.run() - # See if the contact was removed due to the timeout - self.failIf(deadContact in self.node.contacts, 'Contact was not removed after RPC timeout; check exception types.') - # Restore the global timeout - lbrynet.dht.constants.rpcTimeout = tempTimeout - + def testRPCRequest(self): """ Tests if a valid RPC request is executed and responded to correctly """ remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) self.node.addContact(remoteContact) self.error = None + def handleError(f): self.error = 'An RPC error occurred: %s' % f.getErrorMessage() + def handleResult(result): expectedResult = 'pong' if result != expectedResult: - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result) - # Publish the "local" node on the network + self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \ + % (expectedResult, result) + + # Publish the "local" node on the network lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) # Simulate the RPC df = remoteContact.ping() @@ -132,17 +96,19 @@ class KademliaProtocolTest(unittest.TestCase): lbrynet.dht.protocol.reactor.run() self.failIf(self.error, self.error) # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!') + self.failUnlessEqual(len(self.protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') def testRPCAccess(self): """ Tests invalid RPC requests - Verifies that a RPC request for an existing but unpublished method is denied, and that the associated (remote) exception gets raised locally """ remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) self.node.addContact(remoteContact) self.error = None + def handleError(f): try: f.raiseException() @@ -150,11 +116,14 @@ class KademliaProtocolTest(unittest.TestCase): # This is the expected outcome since the remote node did not publish the method self.error = None except Exception, e: - self.error = 'The remote method failed, but the wrong exception was raised; expected AttributeError, got %s' % type(e) - + self.error = 'The remote method failed, but the wrong exception was raised; ' \ + 'expected AttributeError, got %s' % type(e) + def handleResult(result): - self.error = 'The remote method executed successfully, returning: "%s"; this RPC should not have been allowed.' % result - # Publish the "local" node on the network + self.error = 'The remote method executed successfully, returning: "%s"; ' \ + 'this RPC should not have been allowed.' % result + + # Publish the "local" node on the network lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) # Simulate the RPC df = remoteContact.pingNoRPC() @@ -164,37 +133,35 @@ class KademliaProtocolTest(unittest.TestCase): lbrynet.dht.protocol.reactor.run() self.failIf(self.error, self.error) # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!') + self.failUnlessEqual(len(self.protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') def testRPCRequestArgs(self): """ Tests if an RPC requiring arguments is executed correctly """ remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) self.node.addContact(remoteContact) self.error = None + def handleError(f): self.error = 'An RPC error occurred: %s' % f.getErrorMessage() + def handleResult(result): - expectedResult = 'This should be returned.' - if result != 'This should be returned.': - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result) - # Publish the "local" node on the network + expectedResult = 'pong' + if result != expectedResult: + self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \ + (expectedResult, result) + + # Publish the "local" node on the network lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) # Simulate the RPC - df = remoteContact.echo('This should be returned.') + df = remoteContact.ping() df.addCallback(handleResult) df.addErrback(handleError) df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) lbrynet.dht.protocol.reactor.run() self.failIf(self.error, self.error) # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!') - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(KademliaProtocolTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) + self.failUnlessEqual(len(self.protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') diff --git a/lbrynet/tests/dht/testRoutingTable.py b/lbrynet/tests/dht/test_routing_table.py similarity index 62% rename from lbrynet/tests/dht/testRoutingTable.py rename to lbrynet/tests/dht/test_routing_table.py index 8a1ad9c54..436b531ec 100644 --- a/lbrynet/tests/dht/testRoutingTable.py +++ b/lbrynet/tests/dht/test_routing_table.py @@ -10,6 +10,8 @@ import unittest import lbrynet.dht.constants import lbrynet.dht.routingtable import lbrynet.dht.contact +import lbrynet.dht.node + class FakeRPCProtocol(object): """ Fake RPC protocol; allows lbrynet.dht.contact.Contact objects to "send" RPCs """ @@ -21,6 +23,7 @@ class FakeDeferred(object): """ Fake Twisted Deferred object; allows the routing table to add callbacks that do nothing """ def addCallback(self, *args, **kwargs): return + def addErrback(self, *args, **kwargs): return @@ -28,34 +31,36 @@ class FakeDeferred(object): class TreeRoutingTableTest(unittest.TestCase): """ Test case for the RoutingTable class """ def setUp(self): - h = hashlib.sha1() + h = hashlib.sha384() h.update('node1') self.nodeID = h.digest() self.protocol = FakeRPCProtocol() self.routingTable = lbrynet.dht.routingtable.TreeRoutingTable(self.nodeID) - + def testDistance(self): """ Test to see if distance method returns correct result""" - + # testList holds a couple 3-tuple (variable1, variable2, result) - basicTestList = [('123456789','123456789', 0L), ('12345', '98765', 34527773184L)] + basicTestList = [('123456789', '123456789', 0L), ('12345', '98765', 34527773184L)] for test in basicTestList: - result = self.routingTable.distance(test[0], test[1]) - self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % (test[2], result)) + result = lbrynet.dht.node.Distance(test[0])(test[1]) + self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % + (test[2], result)) baseIp = '146.64.19.111' ipTestList = ['146.64.29.222', '192.68.19.333'] - distanceOne = self.routingTable.distance(baseIp, ipTestList[0]) - distanceTwo = self.routingTable.distance(baseIp, ipTestList[1]) + distanceOne = lbrynet.dht.node.Distance(baseIp)(ipTestList[0]) + distanceTwo = lbrynet.dht.node.Distance(baseIp)(ipTestList[1]) + + self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % + (ipTestList[0], baseIp, ipTestList[1])) - self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % (ipTestList[0], baseIp, ipTestList[1])) - def testAddContact(self): """ Tests if a contact can be added and retrieved correctly """ # Create the contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('node2') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) @@ -63,12 +68,14 @@ class TreeRoutingTableTest(unittest.TestCase): self.routingTable.addContact(contact) # ...and request the closest nodes to it (will retrieve it) closestNodes = self.routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k) - self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes)) - self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()') - + self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,' + ' got %d' % len(closestNodes)) + self.failUnless(contact in closestNodes, 'Added contact not found by issueing ' + '_findCloseNodes()') + def testGetContact(self): """ Tests if a specific existing contact can be retrieved correctly """ - h = hashlib.sha1() + h = hashlib.sha384() h.update('node2') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) @@ -77,9 +84,12 @@ class TreeRoutingTableTest(unittest.TestCase): # ...and get it again sameContact = self.routingTable.getContact(contactID) self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact') - + def testAddParentNodeAsContact(self): - """ Tests the routing table's behaviour when attempting to add its parent node as a contact """ + """ + Tests the routing table's behaviour when attempting to add its parent node as a contact + """ + # Create a contact with the same ID as the local node's ID contact = lbrynet.dht.contact.Contact(self.nodeID, '127.0.0.1', 91824, self.protocol) # Now try to add it @@ -87,11 +97,11 @@ class TreeRoutingTableTest(unittest.TestCase): # ...and request the closest nodes to it using FIND_NODE closestNodes = self.routingTable.findCloseNodes(self.nodeID, lbrynet.dht.constants.k) self.failIf(contact in closestNodes, 'Node added itself as a contact') - + def testRemoveContact(self): """ Tests contact removal """ # Create the contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('node2') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) @@ -105,54 +115,73 @@ class TreeRoutingTableTest(unittest.TestCase): def testSplitBucket(self): """ Tests if the the routing table correctly dynamically splits k-buckets """ - self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'Initial k-bucket range should be 0 <= range < 2**160') + self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384, + 'Initial k-bucket range should be 0 <= range < 2**384') # Add k contacts for i in range(lbrynet.dht.constants.k): - h = hashlib.sha1() + h = hashlib.sha384() h.update('remote node %d' % i) nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, but should not yet be split') + self.failUnlessEqual(len(self.routingTable._buckets), 1, + 'Only k nodes have been added; the first k-bucket should now ' + 'be full, but should not yet be split') # Now add 1 more contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('yet another remote node') nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 2, 'k+1 nodes have been added; the first k-bucket should have been split into two new buckets') - self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'K-bucket was split, but its range was not properly adjusted') - self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**160, 'K-bucket was split, but the second (new) bucket\'s max range was not set properly') - self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, self.routingTable._buckets[1].rangeMin, 'K-bucket was split, but the min/max ranges were not divided properly') - + self.failUnlessEqual(len(self.routingTable._buckets), 2, + 'k+1 nodes have been added; the first k-bucket should have been ' + 'split into two new buckets') + self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**384, + 'K-bucket was split, but its range was not properly adjusted') + self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**384, + 'K-bucket was split, but the second (new) bucket\'s ' + 'max range was not set properly') + self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, + self.routingTable._buckets[1].rangeMin, + 'K-bucket was split, but the min/max ranges were ' + 'not divided properly') def testFullBucketNoSplit(self): - """ Test that a bucket is not split if it full, but does not cover the range containing the parent node's ID """ - self.routingTable._parentNodeID = 21*'a' # more than 160 bits; this will not be in the range of _any_ k-bucket + """ + Test that a bucket is not split if it full, but does not cover the range + containing the parent node's ID + """ + self.routingTable._parentNodeID = 49 * 'a' + # more than 384 bits; this will not be in the range of _any_ k-bucket # Add k contacts for i in range(lbrynet.dht.constants.k): - h = hashlib.sha1() + h = hashlib.sha384() h.update('remote node %d' % i) nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, and there should not be more than 1 bucket') - self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts))) + self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; ' + 'the first k-bucket should now be ' + 'full, and there should not be ' + 'more than 1 bucket') + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, + 'Bucket should have k contacts; expected %d got %d' % + (lbrynet.dht.constants.k, + len(self.routingTable._buckets[0]._contacts))) # Now add 1 more contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('yet another remote node') nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'There should not be more than 1 bucket, since the bucket should not have been split (parent node ID not in range)') - self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts))) - self.failIf(contact in self.routingTable._buckets[0]._contacts, 'New contact should have been discarded (since RPC is faked in this test)') + self.failUnlessEqual(len(self.routingTable._buckets), 1, + 'There should not be more than 1 bucket, since the bucket ' + 'should not have been split (parent node ID not in range)') + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), + lbrynet.dht.constants.k, 'Bucket should have k contacts; ' + 'expected %d got %d' % + (lbrynet.dht.constants.k, + len(self.routingTable._buckets[0]._contacts))) + self.failIf(contact in self.routingTable._buckets[0]._contacts, + 'New contact should have been discarded (since RPC is faked in this test)') -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(TreeRoutingTableTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) From 323c3e6cb39efb298a10e99405c802188129133a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:37:56 -0400 Subject: [PATCH 19/21] add external_ip argument to Session, default to None --- lbrynet/core/Session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index c50ec28ff..f390c6d26 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -43,7 +43,7 @@ class Session(object): blob_manager=None, peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker_class=None, - payment_rate_manager_class=None, is_generous=True): + payment_rate_manager_class=None, is_generous=True, external_ip=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored @@ -124,7 +124,7 @@ class Session(object): self.rate_limiter = rate_limiter - self.external_ip = '127.0.0.1' + self.external_ip = external_ip self.upnp_redirects = [] From e7e987b07e071bc4833cce10a1a6d9bd6a0839e0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 13:44:57 -0400 Subject: [PATCH 20/21] changelog --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d0a0de31..609400329 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,13 @@ at anytime. * Fixed concurrent reflects opening too many files * Fixed cases when reflecting would fail on error conditions * Fixed deadlocks from occuring during blob writes + * Fixed and updated`lbrynet.tests.dht` + * Fixed redundant dht id + * Fixed dht `ping` method + * Fixed raising remote exceptions in dht + * Fixed hanging delayedCall in dht node class + * Fixed logging error in dht when calling or receiving methods with no arguments + * Fixed IndexError in routingTable.findCloseNodes which would cause an empty list to be returned ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. From a0a7d5f5698607dabc6073d430ef5d74ec398664 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 10 Oct 2017 15:04:48 -0400 Subject: [PATCH 21/21] add routing_table_get --- lbrynet/daemon/Daemon.py | 74 +++++++++++++++++++++++++++++++++++++++- scripts/dht_monitor.py | 19 ++++++----- 2 files changed, 83 insertions(+), 10 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index e2e9df32e..65e0207a9 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -9,7 +9,7 @@ import json import textwrap import random import signal - +from copy import deepcopy from twisted.web import server from twisted.internet import defer, threads, error, reactor from twisted.internet.task import LoopingCall @@ -2658,6 +2658,78 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + def jsonrpc_routing_table_get(self): + """ + Get DHT routing information + + Usage: + routing_table_get + + Returns: + (dict) dictionary containing routing and contact information + { + "buckets": { + : [ + { + "address": (str) peer address, + "node_id": (str) peer node id, + "blobs": (list) blob hashes announced by peer + } + "contacts": (list) contact node ids, + "blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets + "node_id": (str) the local dht node id + """ + + result = {} + data_store = deepcopy(self.session.dht_node._dataStore._dict) + datastore_len = len(data_store) + hosts = {} + + if datastore_len: + for k, v in data_store.iteritems(): + for value, lastPublished, originallyPublished, originalPublisherID in v: + try: + contact = self.session.dht_node._routingTable.getContact( + originalPublisherID) + except ValueError: + continue + if contact in hosts: + blobs = hosts[contact] + else: + blobs = [] + blobs.append(k.encode('hex')) + hosts[contact] = blobs + + contact_set = [] + blob_hashes = [] + result['buckets'] = {} + + for i in range(len(self.session.dht_node._routingTable._buckets)): + for contact in self.session.dht_node._routingTable._buckets[i]._contacts: + contacts = result['buckets'].get(i, []) + if contact in hosts: + blobs = hosts[contact] + del hosts[contact] + else: + blobs = [] + host = { + "address": contact.address, + "node_id": contact.id.encode("hex"), + "blobs": blobs, + } + for blob_hash in blobs: + if blob_hash not in blob_hashes: + blob_hashes.append(blob_hash) + contacts.append(host) + result['buckets'][i] = contacts + if contact.id.encode('hex') not in contact_set: + contact_set.append(contact.id.encode("hex")) + + result['contacts'] = contact_set + result['blob_hashes'] = blob_hashes + result['node_id'] = self.session.dht_node.node_id.encode('hex') + return self._render_response(result) + @defer.inlineCallbacks def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): """ diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py index 70a93fea7..60a07f799 100644 --- a/scripts/dht_monitor.py +++ b/scripts/dht_monitor.py @@ -9,7 +9,7 @@ log.addHandler(logging.FileHandler("dht contacts.log")) log.setLevel(logging.INFO) stdscr = curses.initscr() -api = JSONRPCProxy.from_url("http://localhost:5280") +api = JSONRPCProxy.from_url("http://localhost:5279") def init_curses(): @@ -31,13 +31,13 @@ def refresh(last_contacts, last_blobs): try: routing_table_info = api.routing_table_get() - node_id = routing_table_info['node id'] + node_id = routing_table_info['node_id'] except: node_id = "UNKNOWN" routing_table_info = { 'buckets': {}, 'contacts': [], - 'blob hashes': [] + 'blob_hashes': [] } for y in range(height): stdscr.addstr(y, 0, " " * (width - 1)) @@ -46,14 +46,15 @@ def refresh(last_contacts, last_blobs): stdscr.addstr(0, 0, "node id: %s" % node_id) stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" % (len(buckets), len(routing_table_info['contacts']), - len(routing_table_info['blob hashes']))) + len(routing_table_info['blob_hashes']))) y = 3 for i in sorted(buckets.keys()): stdscr.addstr(y, 0, "bucket %s" % i) y += 1 - for h in sorted(buckets[i], key=lambda x: x['id'].decode('hex')): - stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['id'], h['address'], len(h['blobs']))) + for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')): + stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['node_id'], h['address'], + len(h['blobs']))) y += 1 y += 1 @@ -67,8 +68,8 @@ def refresh(last_contacts, last_blobs): for c in lost_contacts: log.info("lost contact %s", c) - new_blobs = set(routing_table_info['blob hashes']) - last_blobs - lost_blobs = last_blobs - set(routing_table_info['blob hashes']) + new_blobs = set(routing_table_info['blob_hashes']) - last_blobs + lost_blobs = last_blobs - set(routing_table_info['blob_hashes']) if new_blobs: for c in new_blobs: @@ -79,7 +80,7 @@ def refresh(last_contacts, last_blobs): stdscr.addstr(y + 1, 0, str(time.time())) stdscr.refresh() - return set(routing_table_info['contacts']), set(routing_table_info['blob hashes']) + return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes']) def do_main():