diff --git a/CHANGELOG.md b/CHANGELOG.md index 293096e51..2d1086763 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,39 +13,77 @@ at anytime. * ### Fixed - * Fixed handling cancelled blob and availability requests - * Fixed redundant blob requests to a peer - * Fixed https://github.com/lbryio/lbry/issues/923 - * Fixed concurrent reflects opening too many files - * Fixed cases when reflecting would fail on error conditions - * Fixed deadlocks from occuring during blob writes + * Fixed slow startup for nodes with many lbry files + * Fixed setting the external ip on startup + * Fixed session startup not blocking on joining the dht + * Fixed several parsing bugs that prevented replacing dead dht contacts + * Fixed lbryid length validation + * Fixed an old print statement that polluted logs + * Fixed rpc id length for dht requests ### Deprecated - * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. + * * ### Changed - * Announcing by head blob is turned on by default - * Updated reflector server dns + * Use the first port available for the peer and dht ports, starting with the provided values (defaults of 3333 and 4444). This allows multiple lbrynet instances in a LAN with UPnP. + * Detect a UPnP redirect that didn't get cleaned up on a previous run and use it + * Bumped jsonschema requirement to 2.6.0 * Moved tests into the lbrynet package. * Refactor some assert statements to accommodate the PYTHONOPTIMIZE flag set for Android. - + ### Added - * Added WAL pragma to sqlite3 - * Added unit tests for `BlobFile` - * Updated exchange rate tests for the lbry.io api - * Use `hashlib` for sha384 instead of `pycrypto` - * Use `cryptography` instead of `pycrypto` for blob encryption and decryption - * Use `cryptography` for PKCS7 instead of doing it manually - * Use `BytesIO` buffers instead of temp files when processing blobs - * Refactored and pruned blob related classes into `lbrynet.blobs` - * Changed several `assert`s to raise more useful errors - * Added ability for reflector to store stream information for head blob announce - * Added blob announcement information to API call status with session flag + * + * ### Removed - * Removed `TempBlobFile` - * Removed unused `EncryptedFileOpener` + * + * + + +## [0.17.0] - 2017-10-12 +### Fixed + * Fixed handling cancelled blob and availability requests + * Fixed redundant blob requests to a peer + * Fixed https://github.com/lbryio/lbry/issues/923 + * Fixed concurrent reflects opening too many files + * Fixed cases when reflecting would fail on error conditions + * Fixed deadlocks from occuring during blob writes + * Fixed and updated`lbrynet.tests.dht` + * Fixed redundant dht id + * Fixed dht `ping` method + * Fixed raising remote exceptions in dht + * Fixed hanging delayedCall in dht node class + * Fixed logging error in dht when calling or receiving methods with no arguments + * Fixed IndexError in routingTable.findCloseNodes which would cause an empty list to be returned + * Fixed bug where last blob in a stream was not saved to blob manager + +### Deprecated + * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. + +### Changed + * Bumped `lbryschema` requirement to 0.0.12 [see changelog](https://github.com/lbryio/lbryschema/blob/master/CHANGELOG.md#0012---2017-10-12) + * Bumped `lbryum` requirement to 3.1.9 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#319---2017-10-12) + * Announcing by head blob is turned on by default + * Updated reflector server dns + * Moved tests into the lbrynet package. + +### Added + * Added WAL pragma to sqlite3 + * Added unit tests for `BlobFile` + * Updated exchange rate tests for the lbry.io api + * Use `hashlib` for sha384 instead of `pycrypto` + * Use `cryptography` instead of `pycrypto` for blob encryption and decryption + * Use `cryptography` for PKCS7 instead of doing it manually + * Use `BytesIO` buffers instead of temp files when processing blobs + * Refactored and pruned blob related classes into `lbrynet.blobs` + * Changed several `assert`s to raise more useful errors + * Added ability for reflector to store stream information for head blob announce + * Added blob announcement information to API call status with session flag + +### Removed + * Removed `TempBlobFile` + * Removed unused `EncryptedFileOpener` ## [0.16.3] - 2017-09-28 diff --git a/LICENSE b/LICENSE index c6ccaeb72..25f871f80 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2015-2016 LBRY Inc +Copyright (c) 2015-2017 LBRY Inc Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, diff --git a/docs/cli.md b/docs/cli.md index 017865ff1..c0f84a0a4 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -759,6 +759,32 @@ Returns: resolvable ``` +## routing_table_get + +```text +Get DHT routing information + +Usage: + routing_table_get + +Returns: + (dict) dictionary containing routing and contact information + { + "buckets": { + : [ + { + "address": (str) peer address, + "node_id": (str) peer node id, + "blobs": (list) blob hashes announced by peer + } + ] + }, + "contacts": (list) contact node ids, + "blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets, + "node_id": (str) the local dht node id + } +``` + ## settings_get ```text @@ -857,6 +883,8 @@ Returns: 'session_status': { 'managed_blobs': count of blobs in the blob manager, 'managed_streams': count of streams in the file manager + 'announce_queue_size': number of blobs currently queued to be announced + 'should_announce_blobs': number of blobs that should be announced } If given the dht status option: diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index 36da834ae..271f46286 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc13" +__version__ = "0.17.1rc4" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 2516ce44a..78cf974ad 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -168,7 +168,6 @@ class BlobFile(object): """ if file_handle is not None: file_handle.close() - self.readers -= 1 def reader_finished(self, reader): self.readers -= 1 diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 04dbe491c..edf4e425f 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -37,18 +37,18 @@ class Session(object): """ def __init__(self, blob_data_payment_rate, db_dir=None, - lbryid=None, peer_manager=None, dht_node_port=None, + node_id=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, blob_tracker_class=None, - payment_rate_manager_class=None, is_generous=True): + payment_rate_manager_class=None, is_generous=True, external_ip=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored - @param lbryid: The unique ID of this node + @param node_id: The unique ID of this node @param peer_manager: An object which keeps track of all known peers. If None, a PeerManager will be created @@ -101,7 +101,7 @@ class Session(object): """ self.db_dir = db_dir - self.lbryid = lbryid + self.node_id = node_id self.peer_manager = peer_manager @@ -124,7 +124,7 @@ class Session(object): self.rate_limiter = rate_limiter - self.external_ip = '127.0.0.1' + self.external_ip = external_ip self.upnp_redirects = [] @@ -142,8 +142,8 @@ class Session(object): log.debug("Starting session.") - if self.lbryid is None: - self.lbryid = generate_id() + if self.node_id is None: + self.node_id = generate_id() if self.wallet is None: from lbrynet.core.PTCWallet import PTCWallet @@ -193,6 +193,31 @@ class Session(object): log.debug("In _try_upnp") + def get_free_port(upnp, port, protocol): + # returns an existing mapping if it exists + mapping = upnp.getspecificportmapping(port, protocol) + if not mapping: + return port + if upnp.lanaddr == mapping[0]: + return mapping + return get_free_port(upnp, port + 1, protocol) + + def get_port_mapping(upnp, internal_port, protocol, description): + # try to map to the requested port, if there is already a mapping use the next external + # port available + if protocol not in ['UDP', 'TCP']: + raise Exception("invalid protocol") + external_port = get_free_port(upnp, internal_port, protocol) + if isinstance(external_port, tuple): + log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", + self.external_ip, external_port[1], protocol, upnp.lanaddr, internal_port) + return external_port[1], protocol + upnp.addportmapping(external_port, protocol, upnp.lanaddr, internal_port, + description, '') + log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, external_port, + protocol, upnp.lanaddr, internal_port) + return external_port, protocol + def threaded_try_upnp(): if self.use_upnp is False: log.debug("Not using upnp") @@ -202,40 +227,15 @@ class Session(object): if num_devices_found > 0: u.selectigd() external_ip = u.externalipaddress() - if external_ip != '0.0.0.0': + if external_ip != '0.0.0.0' and not self.external_ip: + # best not to rely on this external ip, the router can be behind layers of NATs self.external_ip = external_ip - if self.peer_port is not None: - if u.getspecificportmapping(self.peer_port, 'TCP') is None: - u.addportmapping( - self.peer_port, 'TCP', u.lanaddr, self.peer_port, - 'LBRY peer port', '') - self.upnp_redirects.append((self.peer_port, 'TCP')) - log.info("Set UPnP redirect for TCP port %d", self.peer_port) - else: - # see comment below - log.warning("UPnP redirect already set for TCP port %d", self.peer_port) - self.upnp_redirects.append((self.peer_port, 'TCP')) - if self.dht_node_port is not None: - if u.getspecificportmapping(self.dht_node_port, 'UDP') is None: - u.addportmapping( - self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port, - 'LBRY DHT port', '') - self.upnp_redirects.append((self.dht_node_port, 'UDP')) - log.info("Set UPnP redirect for UDP port %d", self.dht_node_port) - else: - # TODO: check that the existing redirect was - # put up by an old lbrynet session before - # grabbing it if such a disconnected redirect - # exists, then upnp won't work unless the - # redirect is appended or is torn down and set - # back up. a bad shutdown of lbrynet could - # leave such a redirect up and cause problems - # on the next start. this could be - # problematic if a previous lbrynet session - # didn't make the redirect, and it was made by - # another application - log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port) - self.upnp_redirects.append((self.dht_node_port, 'UDP')) + if self.peer_port: + self.upnp_redirects.append(get_port_mapping(u, self.peer_port, 'TCP', + 'LBRY peer port')) + if self.dht_node_port: + self.upnp_redirects.append(get_port_mapping(u, self.dht_node_port, 'UDP', + 'LBRY DHT port')) return True return False @@ -260,8 +260,7 @@ class Session(object): addresses.append(value) return addresses - def start_dht(addresses): - self.dht_node.joinNetwork(addresses) + def start_dht(join_network_result): self.peer_finder.run_manage_loop() self.hash_announcer.run_manage_loop() return True @@ -274,7 +273,7 @@ class Session(object): self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, - lbryid=self.lbryid, + node_id=self.node_id, externalIP=self.external_ip ) self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager) @@ -283,6 +282,7 @@ class Session(object): dl = defer.DeferredList(ds) dl.addCallback(join_resolved_addresses) + dl.addCallback(self.dht_node.joinNetwork) dl.addCallback(start_dht) return dl diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index b8861a6ab..061e87ad7 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -140,9 +140,8 @@ class ClientProtocol(Protocol, TimeoutMixin): self._send_request_message(request_msg) else: # The connection manager has indicated that this connection should be terminated - log.info( - "Closing the connection to %s due to having no further requests to send", - self.peer) + log.debug("Closing the connection to %s due to having no further requests to send", + self.peer) self.peer.report_success() self.transport.loseConnection() d = self._connection_manager.get_next_request(self.peer, self) diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py index c24a8e112..488dce1ed 100644 --- a/lbrynet/core/client/DHTPeerFinder.py +++ b/lbrynet/core/client/DHTPeerFinder.py @@ -46,7 +46,10 @@ class DHTPeerFinder(object): if timeout is not None: reactor.callLater(timeout, _trigger_timeout) - peer_list = yield finished_deferred + try: + peer_list = yield finished_deferred + except defer.CancelledError: + peer_list = [] peers = set(peer_list) good_peers = [] diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index 9c94ad476..b0a2db2d2 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -69,21 +69,26 @@ class CryptStreamCreator(object): self.stopped = True self.producer = None + def _close_current_blob(self): + # close the blob that was being written to + # and save it to blob manager + should_announce = self.blob_count == 0 + d = self.current_blob.close() + d.addCallback(self._blob_finished) + d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info, + should_announce)) + self.finished_deferreds.append(d) + self.current_blob = None + def stop(self): """Stop creating the stream. Create the terminating zero-length blob.""" log.debug("stop has been called for StreamCreator") self.stopped = True if self.current_blob is not None: - current_blob = self.current_blob - d = current_blob.close() - d.addCallback(self._blob_finished) - d.addErrback(self._error) - self.finished_deferreds.append(d) - self.current_blob = None + self._close_current_blob() self._finalize() dl = defer.DeferredList(self.finished_deferreds) dl.addCallback(lambda _: self._finished()) - dl.addErrback(self._error) return dl # TODO: move the stream creation process to its own thread and @@ -123,6 +128,7 @@ class CryptStreamCreator(object): d.addCallback(self._blob_finished) self.finished_deferreds.append(d) + def _write(self, data): while len(data) > 0: if self.current_blob is None: @@ -133,20 +139,11 @@ class CryptStreamCreator(object): done, num_bytes_written = self.current_blob.write(data) data = data[num_bytes_written:] if done is True: - should_announce = self.blob_count == 0 - d = self.current_blob.close() - d.addCallback(self._blob_finished) - d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info, - should_announce)) - self.finished_deferreds.append(d) - self.current_blob = None + self._close_current_blob() def _get_blob_maker(self, iv, blob_creator): return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) - def _error(self, error): - log.error(error) - def _finished(self): raise NotImplementedError() diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 3aaa2722f..87d6a54aa 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -9,7 +9,7 @@ import json import textwrap import random import signal - +from copy import deepcopy from twisted.web import server from twisted.internet import defer, threads, error, reactor from twisted.internet.task import LoopingCall @@ -206,7 +206,7 @@ class Daemon(AuthJSONRPCServer): # of the daemon, but I don't want to deal with that now self.analytics_manager = analytics_manager - self.lbryid = conf.settings.node_id + self.node_id = conf.settings.node_id self.wallet_user = None self.wallet_password = None @@ -562,14 +562,15 @@ class Daemon(AuthJSONRPCServer): self.session = Session( conf.settings['data_rate'], db_dir=self.db_dir, - lbryid=self.lbryid, + node_id=self.node_id, blob_dir=self.blobfile_dir, dht_node_port=self.dht_node_port, known_dht_nodes=conf.settings['known_dht_nodes'], peer_port=self.peer_port, use_upnp=self.use_upnp, wallet=wallet, - is_generous=conf.settings['is_generous_host'] + is_generous=conf.settings['is_generous_host'], + external_ip=self.platform['ip'] ) self.startup_status = STARTUP_STAGES[2] @@ -1054,7 +1055,7 @@ class Daemon(AuthJSONRPCServer): best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None response = { - 'lbry_id': base58.b58encode(self.lbryid), + 'lbry_id': base58.b58encode(self.node_id), 'installation_id': conf.settings.installation_id, 'is_running': self.announced_startup, 'is_first_run': self.session.wallet.is_first_run if has_wallet else None, @@ -2658,6 +2659,81 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + def jsonrpc_routing_table_get(self): + """ + Get DHT routing information + + Usage: + routing_table_get + + Returns: + (dict) dictionary containing routing and contact information + { + "buckets": { + : [ + { + "address": (str) peer address, + "node_id": (str) peer node id, + "blobs": (list) blob hashes announced by peer + } + ] + }, + "contacts": (list) contact node ids, + "blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets, + "node_id": (str) the local dht node id + } + """ + + result = {} + data_store = deepcopy(self.session.dht_node._dataStore._dict) + datastore_len = len(data_store) + hosts = {} + + if datastore_len: + for k, v in data_store.iteritems(): + for value, lastPublished, originallyPublished, originalPublisherID in v: + try: + contact = self.session.dht_node._routingTable.getContact( + originalPublisherID) + except ValueError: + continue + if contact in hosts: + blobs = hosts[contact] + else: + blobs = [] + blobs.append(k.encode('hex')) + hosts[contact] = blobs + + contact_set = [] + blob_hashes = [] + result['buckets'] = {} + + for i in range(len(self.session.dht_node._routingTable._buckets)): + for contact in self.session.dht_node._routingTable._buckets[i]._contacts: + contacts = result['buckets'].get(i, []) + if contact in hosts: + blobs = hosts[contact] + del hosts[contact] + else: + blobs = [] + host = { + "address": contact.address, + "node_id": contact.id.encode("hex"), + "blobs": blobs, + } + for blob_hash in blobs: + if blob_hash not in blob_hashes: + blob_hashes.append(blob_hash) + contacts.append(host) + result['buckets'][i] = contacts + if contact.id.encode('hex') not in contact_set: + contact_set.append(contact.id.encode("hex")) + + result['contacts'] = contact_set + result['blob_hashes'] = blob_hashes + result['node_id'] = self.session.dht_node.node_id.encode('hex') + return self._render_response(result) + @defer.inlineCallbacks def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): """ diff --git a/lbrynet/dht/constants.py b/lbrynet/dht/constants.py index 1cee46311..f84c89d2e 100644 --- a/lbrynet/dht/constants.py +++ b/lbrynet/dht/constants.py @@ -21,9 +21,15 @@ alpha = 3 #: Maximum number of contacts stored in a bucket; this should be an even number k = 8 +#: Maximum number of contacts stored in the replacement cache +replacementCacheSize = 8 + #: Timeout for network operations (in seconds) rpcTimeout = 5 +# number of rpc attempts to make before a timeout results in the node being removed as a contact +rpcAttempts = 5 + # Delay between iterations of iterative node lookups (for loose parallelism) (in seconds) iterativeLookupDelay = rpcTimeout / 2 @@ -53,3 +59,5 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj h = get_lbry_hash_obj() key_bits = h.digest_size * 8 # 384 bits + +rpc_id_length = 20 diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index 6109d9f9a..cba054e0d 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -1,13 +1,3 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - - class Contact(object): """ Encapsulation for remote contact diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index bdaf47644..a53942455 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -1,33 +1,13 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - import UserDict import time import constants +from interface import IDataStore +from zope.interface import implements -class DataStore(UserDict.DictMixin): - """ Interface for classes implementing physical storage (for data - published via the "STORE" RPC) for the Kademlia DHT - - @note: This provides an interface for a dict-like object - """ - - def keys(self): - """ Return a list of the keys in this data store """ - - def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID): - pass - - -class DictDataStore(DataStore): +class DictDataStore(UserDict.DictMixin): """ A datastore using an in-memory Python dictionary """ + implements(IDataStore) def __init__(self): # Dictionary format: @@ -64,3 +44,9 @@ class DictDataStore(DataStore): def getPeersForBlob(self, key): if key in self._dict: return [val[0] for val in self._dict[key]] + + def removePeer(self, value): + for key in self._dict: + self._dict[key] = [val for val in self._dict[key] if val[0] != value] + if not self._dict[key]: + del self._dict[key] diff --git a/lbrynet/dht/delay.py b/lbrynet/dht/delay.py new file mode 100644 index 000000000..9610a73f8 --- /dev/null +++ b/lbrynet/dht/delay.py @@ -0,0 +1,22 @@ +import time + + +class Delay(object): + maxToSendDelay = 10 ** -3 # 0.05 + minToSendDelay = 10 ** -5 # 0.01 + + def __init__(self, start=0): + self._next = start + + # TODO: explain why this logic is like it is. And add tests that + # show that it actually does what it needs to do. + def __call__(self): + ts = time.time() + delay = 0 + if ts >= self._next: + delay = self.minToSendDelay + self._next = ts + self.minToSendDelay + else: + delay = (self._next - ts) + self.maxToSendDelay + self._next += self.maxToSendDelay + return delay diff --git a/lbrynet/dht/encoding.py b/lbrynet/dht/encoding.py index bc7e88ca0..45aeb3496 100644 --- a/lbrynet/dht/encoding.py +++ b/lbrynet/dht/encoding.py @@ -1,17 +1,4 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - - -class DecodeError(Exception): - """ Should be raised by an C{Encoding} implementation if decode operation - fails - """ +from error import DecodeError class Encoding(object): diff --git a/lbrynet/dht/error.py b/lbrynet/dht/error.py new file mode 100644 index 000000000..3111adf8f --- /dev/null +++ b/lbrynet/dht/error.py @@ -0,0 +1,38 @@ +import binascii +import exceptions + +# this is a dict of {"exceptions.": exception class} items used to raise +# remote built-in exceptions locally +BUILTIN_EXCEPTIONS = { + "exceptions.%s" % e: getattr(exceptions, e) for e in dir(exceptions) if not e.startswith("_") +} + + +class DecodeError(Exception): + """ + Should be raised by an C{Encoding} implementation if decode operation + fails + """ + pass + + +class BucketFull(Exception): + """ + Raised when the bucket is full + """ + pass + + +class UnknownRemoteException(Exception): + pass + + +class TimeoutError(Exception): + """ Raised when a RPC times out """ + + def __init__(self, remote_contact_id): + # remote_contact_id is a binary blob so we need to convert it + # into something more readable + msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id)) + Exception.__init__(self, msg) + self.remote_contact_id = remote_contact_id diff --git a/lbrynet/dht/interface.py b/lbrynet/dht/interface.py new file mode 100644 index 000000000..67b4984a5 --- /dev/null +++ b/lbrynet/dht/interface.py @@ -0,0 +1,117 @@ +from zope.interface import Interface + + +class IDataStore(Interface): + """ Interface for classes implementing physical storage (for data + published via the "STORE" RPC) for the Kademlia DHT + + @note: This provides an interface for a dict-like object + """ + + def keys(self): + """ Return a list of the keys in this data store """ + pass + + def removeExpiredPeers(self): + pass + + def hasPeersForBlob(self, key): + pass + + def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID): + pass + + def getPeersForBlob(self, key): + pass + + def removePeer(self, key): + pass + + +class IRoutingTable(Interface): + """ Interface for RPC message translators/formatters + + Classes inheriting from this should provide a suitable routing table for + a parent Node object (i.e. the local entity in the Kademlia network) + """ + + def __init__(self, parentNodeID): + """ + @param parentNodeID: The n-bit node ID of the node to which this + routing table belongs + @type parentNodeID: str + """ + + def addContact(self, contact): + """ Add the given contact to the correct k-bucket; if it already + exists, its status will be updated + + @param contact: The contact to add to this node's k-buckets + @type contact: kademlia.contact.Contact + """ + + def findCloseNodes(self, key, count, _rpcNodeID=None): + """ Finds a number of known nodes closest to the node/value with the + specified key. + + @param key: the n-bit key (i.e. the node or value ID) to search for + @type key: str + @param count: the amount of contacts to return + @type count: int + @param _rpcNodeID: Used during RPC, this is be the sender's Node ID + Whatever ID is passed in the paramater will get + excluded from the list of returned contacts. + @type _rpcNodeID: str + + @return: A list of node contacts (C{kademlia.contact.Contact instances}) + closest to the specified key. + This method will return C{k} (or C{count}, if specified) + contacts if at all possible; it will only return fewer if the + node is returning all of the contacts that it knows of. + @rtype: list + """ + + def getContact(self, contactID): + """ Returns the (known) contact with the specified node ID + + @raise ValueError: No contact with the specified contact ID is known + by this node + """ + + def getRefreshList(self, startIndex=0, force=False): + """ Finds all k-buckets that need refreshing, starting at the + k-bucket with the specified index, and returns IDs to be searched for + in order to refresh those k-buckets + + @param startIndex: The index of the bucket to start refreshing at; + this bucket and those further away from it will + be refreshed. For example, when joining the + network, this node will set this to the index of + the bucket after the one containing it's closest + neighbour. + @type startIndex: index + @param force: If this is C{True}, all buckets (in the specified range) + will be refreshed, regardless of the time they were last + accessed. + @type force: bool + + @return: A list of node ID's that the parent node should search for + in order to refresh the routing Table + @rtype: list + """ + + def removeContact(self, contactID): + """ Remove the contact with the specified node ID from the routing + table + + @param contactID: The node ID of the contact to remove + @type contactID: str + """ + + def touchKBucket(self, key): + """ Update the "last accessed" timestamp of the k-bucket which covers + the range containing the specified key in the key/ID space + + @param key: A key in the range of the target k-bucket + @type key: str + """ diff --git a/lbrynet/dht/kbucket.py b/lbrynet/dht/kbucket.py index 227fec409..ead763895 100644 --- a/lbrynet/dht/kbucket.py +++ b/lbrynet/dht/kbucket.py @@ -1,17 +1,5 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - import constants - - -class BucketFull(Exception): - """ Raised when the bucket is full """ +from error import BucketFull class KBucket(object): diff --git a/lbrynet/dht/msgtypes.py b/lbrynet/dht/msgtypes.py index 3c1aeabe2..6eb2d3e74 100644 --- a/lbrynet/dht/msgtypes.py +++ b/lbrynet/dht/msgtypes.py @@ -8,12 +8,17 @@ # may be created by processing this file with epydoc: http://epydoc.sf.net from lbrynet.core.utils import generate_id +import constants class Message(object): """ Base class for messages - all "unknown" messages use this class """ def __init__(self, rpcID, nodeID): + if len(rpcID) != constants.rpc_id_length: + raise ValueError("invalid rpc id: %i bytes (expected 20)" % len(rpcID)) + if len(nodeID) != constants.key_bits / 8: + raise ValueError("invalid node id: %i bytes (expected 48)" % len(nodeID)) self.id = rpcID self.nodeID = nodeID @@ -23,7 +28,7 @@ class RequestMessage(Message): def __init__(self, nodeID, method, methodArgs, rpcID=None): if rpcID is None: - rpcID = generate_id() + rpcID = generate_id()[:constants.rpc_id_length] Message.__init__(self, rpcID, nodeID) self.request = method self.args = methodArgs diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 858b03a88..c77dc27c8 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -12,7 +12,7 @@ import operator import struct import time -from twisted.internet import defer, error, reactor, threads +from twisted.internet import defer, error, reactor, threads, task import constants import routingtable @@ -49,8 +49,8 @@ class Node(object): application is performed via this class (or a subclass). """ - def __init__(self, id=None, udpPort=4000, dataStore=None, - routingTableClass=None, networkProtocol=None, lbryid=None, + def __init__(self, node_id=None, udpPort=4000, dataStore=None, + routingTableClass=None, networkProtocol=None, externalIP=None): """ @param dataStore: The data store to use. This must be class inheriting @@ -74,11 +74,7 @@ class Node(object): being transmitted. @type networkProtocol: entangled.kademlia.protocol.KademliaProtocol """ - if id != None: - self.id = id - else: - self.id = self._generateID() - self.lbryid = lbryid + self.node_id = node_id or self._generateID() self.port = udpPort self._listeningPort = None # object implementing Twisted # IListeningPort This will contain a deferred created when @@ -88,12 +84,12 @@ class Node(object): # operations before the node has finished joining the network) self._joinDeferred = None self.next_refresh_call = None - self.next_change_token_call = None + self.change_token_lc = task.LoopingCall(self.change_token) # Create k-buckets (for storing contacts) if routingTableClass is None: - self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id) + self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id) else: - self._routingTable = routingTableClass(self.id) + self._routingTable = routingTableClass(self.node_id) # Initialize this node's network access mechanisms if networkProtocol is None: @@ -103,7 +99,6 @@ class Node(object): # Initialize the data storage mechanism used by this node self.token_secret = self._generateID() self.old_token_secret = None - self.change_token() if dataStore is None: self._dataStore = datastore.DictDataStore() else: @@ -111,7 +106,7 @@ class Node(object): # Try to restore the node's state... if 'nodeState' in self._dataStore: state = self._dataStore['nodeState'] - self.id = state['id'] + self.node_id = state['id'] for contactTriple in state['closestNodes']: contact = Contact( contactTriple[0], contactTriple[1], contactTriple[2], self._protocol) @@ -128,9 +123,8 @@ class Node(object): if self.next_refresh_call is not None: self.next_refresh_call.cancel() self.next_refresh_call = None - if self.next_change_token_call is not None: - self.next_change_token_call.cancel() - self.next_change_token_call = None + if self.change_token_lc.running: + self.change_token_lc.stop() if self._listeningPort is not None: self._listeningPort.stopListening() self.hash_watcher.stop() @@ -163,8 +157,12 @@ class Node(object): bootstrapContacts.append(contact) else: bootstrapContacts = None + + # Start the token looping call + self.change_token_lc.start(constants.tokenSecretChangeInterval) + # Initiate the Kademlia joining sequence - perform a search for this node's own ID - self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts) + self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) # #TODO: Refresh all k-buckets further away than this node's closest neighbour # Start refreshing k-buckets periodically, if necessary self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, @@ -173,18 +171,27 @@ class Node(object): self.hash_watcher.tick() yield self._joinDeferred + @property + def contacts(self): + def _inner(): + for i in range(len(self._routingTable._buckets)): + for contact in self._routingTable._buckets[i]._contacts: + yield contact + return list(_inner()) + def printContacts(self, *args): print '\n\nNODE CONTACTS\n===============' for i in range(len(self._routingTable._buckets)): + print "bucket %i" % i for contact in self._routingTable._buckets[i]._contacts: - print contact + print " %s:%i" % (contact.address, contact.port) print '==================================' def getApproximateTotalDHTNodes(self): # get the deepest bucket and the number of contacts in that bucket and multiply it # by the number of equivalently deep buckets in the whole DHT to get a really bad # estimate! - bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)] + bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.node_id)] num_in_bucket = len(bucket._contacts) factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin) return num_in_bucket * factor @@ -200,30 +207,24 @@ class Node(object): return num_in_data_store * self.getApproximateTotalDHTNodes() / 8 def announceHaveBlob(self, key, port): - return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid}) + return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id}) + @defer.inlineCallbacks def getPeersForBlob(self, blob_hash): - def expand_and_filter(result): - expanded_peers = [] - if isinstance(result, dict): - if blob_hash in result: - for peer in result[blob_hash]: - if self.lbryid != peer[6:]: - host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1": - if "from_peer" in result: - if result["from_peer"] != "self": - host = result["from_peer"] - port, = struct.unpack('>H', peer[4:6]) + result = yield self.iterativeFindValue(blob_hash) + expanded_peers = [] + if result: + if blob_hash in result: + for peer in result[blob_hash]: + if self.node_id != peer[6:]: + host = ".".join([str(ord(d)) for d in peer[:4]]) + if host == "127.0.0.1" and "from_peer" in result \ + and result["from_peer"] != "self": + host = result["from_peer"] + port, = struct.unpack('>H', peer[4:6]) + if (host, port) not in expanded_peers: expanded_peers.append((host, port)) - return expanded_peers - - def find_failed(err): - return [] - - d = self.iterativeFindValue(blob_hash) - d.addCallbacks(expand_and_filter, find_failed) - return d + defer.returnValue(expanded_peers) def get_most_popular_hashes(self, num_to_return): return self.hash_watcher.most_popular_hashes(num_to_return) @@ -263,7 +264,7 @@ class Node(object): result = responseMsg.response if 'token' in result: value['token'] = result['token'] - d = n.store(blob_hash, value, self.id, 0) + d = n.store(blob_hash, value, self.node_id, 0) d.addCallback(log_success) d.addErrback(log_error, n) else: @@ -272,12 +273,12 @@ class Node(object): def requestPeers(contacts): if self.externalIP is not None and len(contacts) >= constants.k: - is_closer = Distance(blob_hash).is_closer(self.id, contacts[-1].id) + is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) if is_closer: contacts.pop() - self.store(blob_hash, value, self_store=True, originalPublisherID=self.id) + self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id) elif self.externalIP is not None: - self.store(blob_hash, value, self_store=True, originalPublisherID=self.id) + self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id) ds = [] for contact in contacts: known_nodes[contact.id] = contact @@ -295,8 +296,6 @@ class Node(object): def change_token(self): self.old_token_secret = self.token_secret self.token_secret = self._generateID() - self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval, - self.change_token) def make_token(self, compact_ip): h = hashlib.new('sha384') @@ -463,14 +462,13 @@ class Node(object): raise TypeError, 'No NodeID given. Therefore we can\'t store this node' if self_store is True and self.externalIP: - contact = Contact(self.id, self.externalIP, self.port, None, None) + contact = Contact(self.node_id, self.externalIP, self.port, None, None) compact_ip = contact.compact_ip() elif '_rpcNodeContact' in kwargs: contact = kwargs['_rpcNodeContact'] compact_ip = contact.compact_ip() else: - return 'Not OK' - # raise TypeError, 'No contact info available' + raise TypeError, 'No contact info available' if ((self_store is False) and ('token' not in value or not self.verify_token(value['token'], compact_ip))): @@ -486,8 +484,9 @@ class Node(object): raise TypeError, 'No port available' if 'lbryid' in value: - if len(value['lbryid']) > constants.key_bits: - raise ValueError, 'Invalid lbryid' + if len(value['lbryid']) != constants.key_bits / 8: + raise ValueError('Invalid lbryid (%i bytes): %s' % (len(value['lbryid']), + value['lbryid'].encode('hex'))) else: compact_address = compact_ip + compact_port + value['lbryid'] else: @@ -513,6 +512,7 @@ class Node(object): node is returning all of the contacts that it knows of. @rtype: list """ + # Get the sender's ID (if any) if '_rpcNodeID' in kwargs: rpc_sender_id = kwargs['_rpcNodeID'] @@ -589,11 +589,12 @@ class Node(object): findValue = rpc != 'findNode' if startupShortlist is None: - shortlist = self._routingTable.findCloseNodes(key, constants.alpha) - if key != self.id: + shortlist = self._routingTable.findCloseNodes(key, constants.k) + if key != self.node_id: # Update the "last accessed" timestamp for the appropriate k-bucket self._routingTable.touchKBucket(key) if len(shortlist) == 0: + log.warning("This node doesnt know any other nodes") # This node doesn't know of any other nodes fakeDf = defer.Deferred() fakeDf.callback([]) @@ -686,7 +687,7 @@ class _IterativeFindHelper(object): responseMsg = responseTuple[0] originAddress = responseTuple[1] # tuple: (ip adress, udp port) # Make sure the responding node is valid, and abort the operation if it isn't - if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id: + if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.node_id: return responseMsg.nodeID # Mark this node as active @@ -752,10 +753,11 @@ class _IterativeFindHelper(object): if testContact not in self.shortlist: self.shortlist.append(testContact) - def removeFromShortlist(self, failure): + def removeFromShortlist(self, failure, deadContactID): """ @type failure: twisted.python.failure.Failure """ failure.trap(protocol.TimeoutError) - deadContactID = failure.getErrorMessage() + if len(deadContactID) != constants.key_bits / 8: + raise ValueError("invalid lbry id") if deadContactID in self.shortlist: self.shortlist.remove(deadContactID) return deadContactID @@ -825,7 +827,7 @@ class _IterativeFindHelper(object): rpcMethod = getattr(contact, self.rpc) df = rpcMethod(self.key, rawResponse=True) df.addCallback(self.extendShortlist) - df.addErrback(self.removeFromShortlist) + df.addErrback(self.removeFromShortlist, contact.id) df.addCallback(self.cancelActiveProbe) df.addErrback(lambda _: log.exception('Failed to contact %s', contact)) self.already_contacted.append(contact.id) diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index ba7aba586..9e535138e 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -1,62 +1,21 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# -# The docstrings in this module contain epytext markup; API documentation -# may be created by processing this file with epydoc: http://epydoc.sf.net - import logging -import binascii import time import socket import errno from twisted.internet import protocol, defer, error, reactor, task -from twisted.python import failure import constants import encoding import msgtypes import msgformat from contact import Contact +from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError +from delay import Delay log = logging.getLogger(__name__) -class TimeoutError(Exception): - """ Raised when a RPC times out """ - - def __init__(self, remote_contact_id): - # remote_contact_id is a binary blob so we need to convert it - # into something more readable - msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id)) - Exception.__init__(self, msg) - self.remote_contact_id = remote_contact_id - - -class Delay(object): - maxToSendDelay = 10 ** -3 # 0.05 - minToSendDelay = 10 ** -5 # 0.01 - - def __init__(self, start=0): - self._next = start - - # TODO: explain why this logic is like it is. And add tests that - # show that it actually does what it needs to do. - def __call__(self): - ts = time.time() - delay = 0 - if ts >= self._next: - delay = self.minToSendDelay - self._next = ts + self.minToSendDelay - else: - delay = (self._next - ts) + self.maxToSendDelay - self._next += self.maxToSendDelay - return delay - - class KademliaProtocol(protocol.DatagramProtocol): """ Implements all low-level network-related functions of a Kademlia node """ @@ -195,11 +154,14 @@ class KademliaProtocol(protocol.DatagramProtocol): C{ErrorMessage}). @rtype: twisted.internet.defer.Deferred """ - msg = msgtypes.RequestMessage(self._node.id, method, args) + msg = msgtypes.RequestMessage(self._node.node_id, method, args) msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) - log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex')) + if args: + log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex')) + else: + log.debug("DHT SEND CALL %s", method) df = defer.Deferred() if rawResponse: @@ -209,7 +171,7 @@ class KademliaProtocol(protocol.DatagramProtocol): timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id) # Transmit the data self._send(encodedMsg, msg.id, (contact.address, contact.port)) - self._sentMessages[msg.id] = (contact.id, df, timeoutCall) + self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args) return df def startProtocol(self): @@ -243,14 +205,14 @@ class KademliaProtocol(protocol.DatagramProtocol): return try: msgPrimitive = self._encoder.decode(datagram) - except encoding.DecodeError: + message = self._translator.fromPrimitive(msgPrimitive) + except (encoding.DecodeError, ValueError): # We received some rubbish here return except IndexError: log.warning("Couldn't decode dht datagram from %s", address) return - message = self._translator.fromPrimitive(msgPrimitive) remoteContact = Contact(message.nodeID, address[0], address[1], self) now = time.time() @@ -286,7 +248,12 @@ class KademliaProtocol(protocol.DatagramProtocol): df.callback((message, address)) elif isinstance(message, msgtypes.ErrorMessage): # The RPC request raised a remote exception; raise it locally - remoteException = Exception(message.response) + if message.exceptionType in BUILTIN_EXCEPTIONS: + exception_type = BUILTIN_EXCEPTIONS[message.exceptionType] + else: + exception_type = UnknownRemoteException + remoteException = exception_type(message.response) + log.error("Remote exception (%s): %s", address, remoteException) df.errback(remoteException) else: # We got a result from the RPC @@ -377,7 +344,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _sendResponse(self, contact, rpcID, response): """ Send a RPC response to the specified contact """ - msg = msgtypes.ResponseMessage(rpcID, self._node.id, response) + msg = msgtypes.ResponseMessage(rpcID, self._node.node_id, response) msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) self._send(encodedMsg, rpcID, (contact.address, contact.port)) @@ -385,7 +352,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _sendError(self, contact, rpcID, exceptionType, exceptionMessage): """ Send an RPC error message to the specified contact """ - msg = msgtypes.ErrorMessage(rpcID, self._node.id, exceptionType, exceptionMessage) + msg = msgtypes.ErrorMessage(rpcID, self._node.node_id, exceptionType, exceptionMessage) msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) self._send(encodedMsg, rpcID, (contact.address, contact.port)) @@ -408,48 +375,58 @@ class KademliaProtocol(protocol.DatagramProtocol): func = getattr(self._node, method, None) if callable(func) and hasattr(func, 'rpcmethod'): # Call the exposed Node method and return the result to the deferred callback chain - log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'), - senderContact.address, senderContact.port) + if args: + log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'), + senderContact.address, senderContact.port) + else: + log.debug("DHT RECV CALL %s %s:%i", method, senderContact.address, + senderContact.port) try: - kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} - result = func(*args, **kwargs) + if method != 'ping': + kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} + result = func(*args, **kwargs) + else: + result = func() except Exception, e: - df.errback(failure.Failure(e)) + log.exception("error handling request for %s: %s", senderContact.address, method) + df.errback(e) else: df.callback(result) else: # No such exposed method - df.errback(failure.Failure(AttributeError('Invalid method: %s' % method))) + df.errback(AttributeError('Invalid method: %s' % method)) def _msgTimeout(self, messageID): """ Called when an RPC request message times out """ # Find the message that timed out - if not self._sentMessages.has_key(messageID): + if messageID not in self._sentMessages: # This should never be reached log.error("deferred timed out, but is not present in sent messages list!") return - remoteContactID, df = self._sentMessages[messageID][0:2] + remoteContactID, df, timeout_call, method, args = self._sentMessages[messageID] if self._partialMessages.has_key(messageID): # We are still receiving this message - self._msgTimeoutInProgress(messageID, remoteContactID, df) + self._msgTimeoutInProgress(messageID, remoteContactID, df, method, args) return del self._sentMessages[messageID] # The message's destination node is now considered to be dead; # raise an (asynchronous) TimeoutError exception and update the host node self._node.removeContact(remoteContactID) - df.errback(failure.Failure(TimeoutError(remoteContactID))) + df.errback(TimeoutError(remoteContactID)) - def _msgTimeoutInProgress(self, messageID, remoteContactID, df): + def _msgTimeoutInProgress(self, messageID, remoteContactID, df, method, args): # See if any progress has been made; if not, kill the message if self._hasProgressBeenMade(messageID): # Reset the RPC timeout timer timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, messageID) - self._sentMessages[messageID] = (remoteContactID, df, timeoutCall) + self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args) else: # No progress has been made - del self._partialMessagesProgress[messageID] - del self._partialMessages[messageID] - df.errback(failure.Failure(TimeoutError(remoteContactID))) + if messageID in self._partialMessagesProgress: + del self._partialMessagesProgress[messageID] + if messageID in self._partialMessages: + del self._partialMessages[messageID] + df.errback(TimeoutError(remoteContactID)) def _hasProgressBeenMade(self, messageID): return ( diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 1f6cca926..8f3661f49 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -7,102 +7,17 @@ import time import random +from zope.interface import implements import constants import kbucket +import protocol +from interface import IRoutingTable +import logging -from protocol import TimeoutError +log = logging.getLogger(__name__) -class RoutingTable(object): - """ Interface for RPC message translators/formatters - - Classes inheriting from this should provide a suitable routing table for - a parent Node object (i.e. the local entity in the Kademlia network) - """ - - def __init__(self, parentNodeID): - """ - @param parentNodeID: The n-bit node ID of the node to which this - routing table belongs - @type parentNodeID: str - """ - - def addContact(self, contact): - """ Add the given contact to the correct k-bucket; if it already - exists, its status will be updated - - @param contact: The contact to add to this node's k-buckets - @type contact: kademlia.contact.Contact - """ - - def findCloseNodes(self, key, count, _rpcNodeID=None): - """ Finds a number of known nodes closest to the node/value with the - specified key. - - @param key: the n-bit key (i.e. the node or value ID) to search for - @type key: str - @param count: the amount of contacts to return - @type count: int - @param _rpcNodeID: Used during RPC, this is be the sender's Node ID - Whatever ID is passed in the paramater will get - excluded from the list of returned contacts. - @type _rpcNodeID: str - - @return: A list of node contacts (C{kademlia.contact.Contact instances}) - closest to the specified key. - This method will return C{k} (or C{count}, if specified) - contacts if at all possible; it will only return fewer if the - node is returning all of the contacts that it knows of. - @rtype: list - """ - - def getContact(self, contactID): - """ Returns the (known) contact with the specified node ID - - @raise ValueError: No contact with the specified contact ID is known - by this node - """ - - def getRefreshList(self, startIndex=0, force=False): - """ Finds all k-buckets that need refreshing, starting at the - k-bucket with the specified index, and returns IDs to be searched for - in order to refresh those k-buckets - - @param startIndex: The index of the bucket to start refreshing at; - this bucket and those further away from it will - be refreshed. For example, when joining the - network, this node will set this to the index of - the bucket after the one containing it's closest - neighbour. - @type startIndex: index - @param force: If this is C{True}, all buckets (in the specified range) - will be refreshed, regardless of the time they were last - accessed. - @type force: bool - - @return: A list of node ID's that the parent node should search for - in order to refresh the routing Table - @rtype: list - """ - - def removeContact(self, contactID): - """ Remove the contact with the specified node ID from the routing - table - - @param contactID: The node ID of the contact to remove - @type contactID: str - """ - - def touchKBucket(self, key): - """ Update the "last accessed" timestamp of the k-bucket which covers - the range containing the specified key in the key/ID space - - @param key: A key in the range of the target k-bucket - @type key: str - """ - - -class TreeRoutingTable(RoutingTable): +class TreeRoutingTable(object): """ This class implements a routing table used by a Node class. The Kademlia routing table is a binary tree whose leaves are k-buckets, @@ -117,6 +32,7 @@ class TreeRoutingTable(RoutingTable): C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. """ + implements(IRoutingTable) def __init__(self, parentNodeID): """ @@ -161,17 +77,18 @@ class TreeRoutingTable(RoutingTable): # the k-bucket. This implementation follows section # 2.2 regarding this point. - def replaceContact(failure): + def replaceContact(failure, deadContactID): """ Callback for the deferred PING RPC to see if the head node in the k-bucket is still responding @type failure: twisted.python.failure.Failure """ - failure.trap(TimeoutError) - print '==replacing contact==' - # Remove the old contact... - deadContactID = failure.getErrorMessage() + failure.trap(protocol.TimeoutError) + if len(deadContactID) != constants.key_bits / 8: + raise ValueError("invalid contact id") + log.debug("Replacing dead contact: %s", deadContactID.encode('hex')) try: + # Remove the old contact... self._buckets[bucketIndex].removeContact(deadContactID) except ValueError: # The contact has already been removed (probably due to a timeout) @@ -184,7 +101,7 @@ class TreeRoutingTable(RoutingTable): df = head_contact.ping() # If there's an error (i.e. timeout), remove the head # contact, and append the new one - df.addErrback(replaceContact) + df.addErrback(replaceContact, head_contact.id) def findCloseNodes(self, key, count, _rpcNodeID=None): """ Finds a number of known nodes closest to the node/value with the @@ -207,7 +124,11 @@ class TreeRoutingTable(RoutingTable): @rtype: list """ bucketIndex = self._kbucketIndex(key) - closestNodes = self._buckets[bucketIndex].getContacts(constants.k, _rpcNodeID) + + if bucketIndex < len(self._buckets): + closestNodes = self._buckets[bucketIndex].getContacts(count, _rpcNodeID) + else: + closestNodes = [] # This method must return k contacts (even if we have the node # with the specified key as node ID), unless there is less # than k remote nodes in the routing table @@ -215,7 +136,7 @@ class TreeRoutingTable(RoutingTable): canGoLower = bucketIndex - i >= 0 canGoHigher = bucketIndex + i < len(self._buckets) # Fill up the node list to k nodes, starting with the closest neighbouring nodes known - while len(closestNodes) < constants.k and (canGoLower or canGoHigher): + while len(closestNodes) < min(count, constants.k) and (canGoLower or canGoHigher): # TODO: this may need to be optimized if canGoLower: closestNodes.extend( @@ -224,8 +145,8 @@ class TreeRoutingTable(RoutingTable): canGoLower = bucketIndex - (i + 1) >= 0 if canGoHigher: closestNodes.extend( - self._buckets[bucketIndex + i].getContacts( - constants.k - len(closestNodes), _rpcNodeID)) + self._buckets[bucketIndex + i].getContacts(constants.k - len(closestNodes), + _rpcNodeID)) canGoHigher = bucketIndex + (i + 1) < len(self._buckets) i += 1 return closestNodes @@ -404,10 +325,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable): self._replacementCache[bucketIndex] = [] if contact in self._replacementCache[bucketIndex]: self._replacementCache[bucketIndex].remove(contact) - # TODO: Using k to limit the size of the contact - # replacement cache - maybe define a separate value for - # this in constants.py? - elif len(self._replacementCache[bucketIndex]) >= constants.k: + elif len(self._replacementCache[bucketIndex]) >= constants.replacementCacheSize: self._replacementCache[bucketIndex].pop(0) self._replacementCache[bucketIndex].append(contact) @@ -424,7 +342,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable): except ValueError: return contact.failedRPCs += 1 - if contact.failedRPCs >= 5: + if contact.failedRPCs >= constants.rpcAttempts: self._buckets[bucketIndex].removeContact(contactID) # Replace this stale contact with one from our replacement cache, if we have any if bucketIndex in self._replacementCache: diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index b0f9966a1..fd96aa8f0 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -53,9 +53,9 @@ class EncryptedFileManager(object): def setup(self): yield self._open_db() yield self._add_to_sd_identifier() - yield self._start_lbry_files() - if self.auto_re_reflect is True: - safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) + # don't block on starting the lbry files + self._start_lbry_files() + log.info("Started file manager") def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -119,6 +119,9 @@ class EncryptedFileManager(object): self._set_options_and_restore(rowid, stream_hash, options) for rowid, stream_hash, options in files_and_options ]) + + if self.auto_re_reflect is True: + safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) log.info("Started %i lbry files", len(self.lbry_files)) @defer.inlineCallbacks diff --git a/lbrynet/tests/dht/__init__.py b/lbrynet/tests/dht/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/tests/dht/testNode.py b/lbrynet/tests/dht/test_node.py similarity index 64% rename from lbrynet/tests/dht/testNode.py rename to lbrynet/tests/dht/test_node.py index a4e751d51..9ae3eb3a9 100644 --- a/lbrynet/tests/dht/testNode.py +++ b/lbrynet/tests/dht/test_node.py @@ -8,10 +8,13 @@ import hashlib import unittest import struct +from twisted.internet import protocol, defer, selectreactor +from lbrynet.dht.msgtypes import ResponseMessage import lbrynet.dht.node import lbrynet.dht.constants import lbrynet.dht.datastore + class NodeIDTest(unittest.TestCase): """ Test case for the Node class's ID """ def setUp(self): @@ -19,71 +22,76 @@ class NodeIDTest(unittest.TestCase): def testAutoCreatedID(self): """ Tests if a new node has a valid node ID """ - self.failUnlessEqual(type(self.node.id), str, 'Node does not have a valid ID') - self.failUnlessEqual(len(self.node.id), 20, 'Node ID length is incorrect! Expected 160 bits, got %d bits.' % (len(self.node.id)*8)) + self.failUnlessEqual(type(self.node.node_id), str, 'Node does not have a valid ID') + self.failUnlessEqual(len(self.node.node_id), 48, 'Node ID length is incorrect! ' + 'Expected 384 bits, got %d bits.' % + (len(self.node.node_id) * 8)) def testUniqueness(self): - """ Tests the uniqueness of the values created by the NodeID generator - """ + """ Tests the uniqueness of the values created by the NodeID generator """ generatedIDs = [] for i in range(100): newID = self.node._generateID() # ugly uniqueness test self.failIf(newID in generatedIDs, 'Generated ID #%d not unique!' % (i+1)) generatedIDs.append(newID) - + def testKeyLength(self): """ Tests the key Node ID key length """ for i in range(20): id = self.node._generateID() # Key length: 20 bytes == 160 bits - self.failUnlessEqual(len(id), 20, 'Length of generated ID is incorrect! Expected 160 bits, got %d bits.' % (len(id)*8)) + self.failUnlessEqual(len(id), 48, + 'Length of generated ID is incorrect! Expected 384 bits, ' + 'got %d bits.' % (len(id)*8)) class NodeDataTest(unittest.TestCase): """ Test case for the Node class's data-related functions """ def setUp(self): import lbrynet.dht.contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('test') self.node = lbrynet.dht.node.Node() - self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, self.node._protocol) + self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, + self.node._protocol) self.token = self.node.make_token(self.contact.compact_ip()) self.cases = [] for i in xrange(5): h.update(str(i)) self.cases.append((h.digest(), 5000+2*i)) self.cases.append((h.digest(), 5001+2*i)) -<<<<<<< Updated upstream - #(('a', 'hello there\nthis is a test'), - # ('aMuchLongerKeyThanAnyOfThePreviousOnes', 'some data')) - -======= ->>>>>>> Stashed changes + @defer.inlineCallbacks def testStore(self): - - def check_val_in_result(r, peer_info): - self.failUnless - """ Tests if the node can store (and privately retrieve) some data """ for key, value in self.cases: - self.node.store(key, {'port': value, 'bbid': self.contact.id, 'token': self.token}, self.contact.id, _rpcNodeContact=self.contact) + request = { + 'port': value, + 'lbryid': self.contact.id, + 'token': self.token + } + yield self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact) for key, value in self.cases: - expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + self.contact.id - self.failUnless(self.node._dataStore.hasPeersForBlob(key), 'Stored key not found in node\'s DataStore: "%s"' % key) - self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key), 'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s' % (key, value, self.node._dataStore.getPeersForBlob(key))) + expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \ + self.contact.id + self.failUnless(self.node._dataStore.hasPeersForBlob(key), + 'Stored key not found in node\'s DataStore: "%s"' % key) + self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key), + 'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s' + % (key, value, self.node._dataStore.getPeersForBlob(key))) + class NodeContactTest(unittest.TestCase): """ Test case for the Node class's contact management-related functions """ def setUp(self): self.node = lbrynet.dht.node.Node() - + def testAddContact(self): """ Tests if a contact can be added and retrieved correctly """ import lbrynet.dht.contact # Create the contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('node1') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.node._protocol) @@ -91,67 +99,60 @@ class NodeContactTest(unittest.TestCase): self.node.addContact(contact) # ...and request the closest nodes to it using FIND_NODE closestNodes = self.node._routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k) - self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes)) - self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()') - + self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; ' + 'expected 1, got %d' % len(closestNodes)) + self.failUnless(contact in closestNodes, 'Added contact not found by issueing ' + '_findCloseNodes()') + def testAddSelfAsContact(self): """ Tests the node's behaviour when attempting to add itself as a contact """ import lbrynet.dht.contact # Create a contact with the same ID as the local node's ID - contact = lbrynet.dht.contact.Contact(self.node.id, '127.0.0.1', 91824, None) + contact = lbrynet.dht.contact.Contact(self.node.node_id, '127.0.0.1', 91824, None) # Now try to add it self.node.addContact(contact) # ...and request the closest nodes to it using FIND_NODE - closestNodes = self.node._routingTable.findCloseNodes(self.node.id, lbrynet.dht.constants.k) + closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id, + lbrynet.dht.constants.k) self.failIf(contact in closestNodes, 'Node added itself as a contact') -<<<<<<< Updated upstream -# """ Test case for the Node class's iterative node lookup algorithm """ - - -# """ Ugly brute-force test to see if the iterative node lookup algorithm runs without failing """ - -======= ->>>>>>> Stashed changes - -"""Some scaffolding for the NodeLookupTest class. Allows isolated -node testing by simulating remote node responses""" -from twisted.internet import protocol, defer, selectreactor -from lbrynet.dht.msgtypes import ResponseMessage - - class FakeRPCProtocol(protocol.DatagramProtocol): def __init__(self): - self.reactor = selectreactor.SelectReactor() + self.reactor = selectreactor.SelectReactor() self.testResponse = None self.network = None def createNetwork(self, contactNetwork): - """ set up a list of contacts together with their closest contacts - @param contactNetwork: a sequence of tuples, each containing a contact together with its closest - contacts: C{(, )} - """ - self.network = contactNetwork - - """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs """ + """ + set up a list of contacts together with their closest contacts + @param contactNetwork: a sequence of tuples, each containing a contact together with its + closest contacts: C{(, )} + """ + self.network = contactNetwork + def sendRPC(self, contact, method, args, rawResponse=False): - - if method == "findNode": + """ Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs""" + + h = hashlib.sha384() + h.update('rpcId') + rpc_id = h.digest()[:20] + + if method == "findNode": # get the specific contacts closest contacts closestContacts = [] + closestContactsList = [] for contactTuple in self.network: if contact == contactTuple[0]: # get the list of closest contacts for this contact closestContactsList = contactTuple[1] - - # Pack the closest contacts into a ResponseMessage + # Pack the closest contacts into a ResponseMessage for closeContact in closestContactsList: closestContacts.append((closeContact.id, closeContact.address, closeContact.port)) - message = ResponseMessage("rpcId", contact.id, closestContacts) - + + message = ResponseMessage(rpc_id, contact.id, closestContacts) df = defer.Deferred() - df.callback((message,(contact.address, contact.port))) + df.callback((message, (contact.address, contact.port))) return df elif method == "findValue": for contactTuple in self.network: @@ -160,12 +161,10 @@ class FakeRPCProtocol(protocol.DatagramProtocol): dataDict = contactTuple[2] dataKey = dataDict.keys()[0] data = dataDict.get(dataKey) - # Check if this contact has the requested value if dataKey == args[0]: # Return the data value response = dataDict - print "data found at contact: " + contact.id else: # Return the closest contact to the requested data key @@ -173,62 +172,55 @@ class FakeRPCProtocol(protocol.DatagramProtocol): closeContacts = contactTuple[1] closestContacts = [] for closeContact in closeContacts: - closestContacts.append((closeContact.id, closeContact.address, closeContact.port)) + closestContacts.append((closeContact.id, closeContact.address, + closeContact.port)) response = closestContacts - + # Create the response message - message = ResponseMessage("rpcId", contact.id, response) + message = ResponseMessage(rpc_id, contact.id, response) df = defer.Deferred() - df.callback((message,(contact.address, contact.port))) + df.callback((message, (contact.address, contact.port))) return df def _send(self, data, rpcID, address): """ fake sending data """ - - + class NodeLookupTest(unittest.TestCase): """ Test case for the Node class's iterativeFind node lookup algorithm """ - + def setUp(self): - # create a fake protocol to imitate communication with other nodes self._protocol = FakeRPCProtocol() - - # Note: The reactor is never started for this test. All deferred calls run sequentially, + # Note: The reactor is never started for this test. All deferred calls run sequentially, # since there is no asynchronous network communication - # create the node to be tested in isolation - self.node = lbrynet.dht.node.Node(None, 4000, None, None, self._protocol) - + h = hashlib.sha384() + h.update('node1') + node_id = str(h.digest()) + self.node = lbrynet.dht.node.Node(node_id, 4000, None, None, self._protocol) self.updPort = 81173 - -<<<<<<< Updated upstream - # create a dummy reactor - -======= ->>>>>>> Stashed changes self.contactsAmount = 80 - # set the node ID manually for testing - self.node.id = '12345678901234567800' - # Reinitialise the routing table - self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(self.node.id) - + self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable( + self.node.node_id) + # create 160 bit node ID's for test purposes self.testNodeIDs = [] - idNum = int(self.node.id) + idNum = int(self.node.node_id.encode('hex'), 16) for i in range(self.contactsAmount): - # create the testNodeIDs in ascending order, away from the actual node ID, with regards to the distance metric - self.testNodeIDs.append(idNum + i + 1) + # create the testNodeIDs in ascending order, away from the actual node ID, + # with regards to the distance metric + self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex')) # generate contacts self.contacts = [] for i in range(self.contactsAmount): - contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1", self.updPort + i + 1, self._protocol) + contact = lbrynet.dht.contact.Contact(self.testNodeIDs[i], "127.0.0.1", + self.updPort + i + 1, self._protocol) self.contacts.append(contact) - - # create the network of contacts in format: (contact, closest contacts) + + # create the network of contacts in format: (contact, closest contacts) contactNetwork = ((self.contacts[0], self.contacts[8:15]), (self.contacts[1], self.contacts[16:23]), (self.contacts[2], self.contacts[24:31]), @@ -254,43 +246,28 @@ class NodeLookupTest(unittest.TestCase): contacts_with_datastores = [] for contact_tuple in contactNetwork: - contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], lbrynet.dht.datastore.DictDataStore())) - + contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], + lbrynet.dht.datastore.DictDataStore())) self._protocol.createNetwork(contacts_with_datastores) - + + @defer.inlineCallbacks def testNodeBootStrap(self): """ Test bootstrap with the closest possible contacts """ - - df = self.node._iterativeFind(self.node.id, self.contacts[0:8]) + + activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8]) # Set the expected result - expectedResult = [] - + expectedResult = set() for item in self.contacts[0:6]: - expectedResult.append(item.id) - + expectedResult.add(item.id) # Get the result from the deferred - activeContacts = df.result - - + # Check the length of the active contacts - self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), \ - "More active contacts should exist, there should be %d contacts" %expectedResult.__len__()) - - + self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), + "More active contacts should exist, there should be %d " + "contacts but there are %d" % (len(expectedResult), + len(activeContacts))) + # Check that the received active contacts are the same as the input contacts - self.failUnlessEqual(activeContacts, expectedResult, \ - "Active should only contain the closest possible contacts which were used as input for the boostrap") - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(NodeIDTest)) - suite.addTest(unittest.makeSuite(NodeDataTest)) - suite.addTest(unittest.makeSuite(NodeContactTest)) - suite.addTest(unittest.makeSuite(NodeLookupTest)) - return suite - - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) + self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult, + "Active should only contain the closest possible contacts" + " which were used as input for the boostrap") diff --git a/lbrynet/tests/dht/testProtocol.py b/lbrynet/tests/dht/test_protocol.py similarity index 50% rename from lbrynet/tests/dht/testProtocol.py rename to lbrynet/tests/dht/test_protocol.py index 7215eaa27..d616451d9 100644 --- a/lbrynet/tests/dht/testProtocol.py +++ b/lbrynet/tests/dht/test_protocol.py @@ -1,88 +1,22 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive - import time import unittest - -from twisted.internet import defer -from twisted.python import failure import twisted.internet.selectreactor -from twisted.internet.protocol import DatagramProtocol import lbrynet.dht.protocol import lbrynet.dht.contact import lbrynet.dht.constants import lbrynet.dht.msgtypes -from lbrynet.dht.node import rpcmethod +from lbrynet.dht.error import TimeoutError +from lbrynet.dht.node import Node, rpcmethod -class FakeNode(object): - """ A fake node object implementing some RPC and non-RPC methods to - test the Kademlia protocol's behaviour - """ - def __init__(self, id): - self.id = id - self.contacts = [] - - @rpcmethod - def ping(self): - return 'pong' - - def pingNoRPC(self): - return 'pong' - - @rpcmethod - def echo(self, value): - return value - - def addContact(self, contact): - self.contacts.append(contact) - - def removeContact(self, contact): - self.contacts.remove(contact) - - def indirectPingContact(self, protocol, contact): - """ Pings the given contact (using the specified KademliaProtocol - object, not the direct Contact API), and removes the contact - on a timeout """ - df = protocol.sendRPC(contact, 'ping', {}) - def handleError(f): - if f.check(lbrynet.dht.protocol.TimeoutError): - self.removeContact(contact) - return f - else: - # This is some other error - return f - df.addErrback(handleError) - return df - -class ClientDatagramProtocol(lbrynet.dht.protocol.KademliaProtocol): - data = '' - msgID = '' - destination = ('127.0.0.1', 9182) - - def __init__(self): - lbrynet.dht.protocol.KademliaProtocol.__init__(self, None) - - def startProtocol(self): - self.sendDatagram() - - def sendDatagram(self): - if len(self.data): - self._send(self.data, self.msgID, self.destination) - - - - class KademliaProtocolTest(unittest.TestCase): """ Test case for the Protocol class """ + def setUp(self): del lbrynet.dht.protocol.reactor lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor() - self.node = FakeNode('node1') + self.node = Node(node_id='1' * 48, udpPort=9182, externalIP="127.0.0.1") self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node) def testReactor(self): @@ -93,36 +27,66 @@ class KademliaProtocolTest(unittest.TestCase): def testRPCTimeout(self): """ Tests if a RPC message sent to a dead remote node times out correctly """ - deadContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) + + @rpcmethod + def fake_ping(*args, **kwargs): + time.sleep(lbrynet.dht.constants.rpcTimeout + 1) + return 'pong' + + real_ping = self.node.ping + real_timeout = lbrynet.dht.constants.rpcTimeout + real_attempts = lbrynet.dht.constants.rpcAttempts + lbrynet.dht.constants.rpcAttempts = 1 + lbrynet.dht.constants.rpcTimeout = 1 + self.node.ping = fake_ping + deadContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) self.node.addContact(deadContact) # Make sure the contact was added - self.failIf(deadContact not in self.node.contacts, 'Contact not added to fake node (error in test code)') - # Set the timeout to 0 for testing - tempTimeout = lbrynet.dht.constants.rpcTimeout - lbrynet.dht.constants.rpcTimeout = 0 - lbrynet.dht.protocol.reactor.listenUDP(0, self.protocol) - # Run the PING RPC (which should timeout) - df = self.node.indirectPingContact(self.protocol, deadContact) + self.failIf(deadContact not in self.node.contacts, + 'Contact not added to fake node (error in test code)') + lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) + + # Run the PING RPC (which should raise a timeout error) + df = self.protocol.sendRPC(deadContact, 'ping', {}) + + def check_timeout(err): + self.assertEqual(type(err), TimeoutError) + + df.addErrback(check_timeout) + + def reset_values(): + self.node.ping = real_ping + lbrynet.dht.constants.rpcTimeout = real_timeout + lbrynet.dht.constants.rpcAttempts = real_attempts + + # See if the contact was removed due to the timeout + def check_removed_contact(): + self.failIf(deadContact in self.node.contacts, + 'Contact was not removed after RPC timeout; check exception types.') + + df.addCallback(lambda _: reset_values()) + # Stop the reactor if a result arrives (timeout or not) df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) + df.addCallback(lambda _: check_removed_contact()) lbrynet.dht.protocol.reactor.run() - # See if the contact was removed due to the timeout - self.failIf(deadContact in self.node.contacts, 'Contact was not removed after RPC timeout; check exception types.') - # Restore the global timeout - lbrynet.dht.constants.rpcTimeout = tempTimeout - + def testRPCRequest(self): """ Tests if a valid RPC request is executed and responded to correctly """ - remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) + remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) self.node.addContact(remoteContact) self.error = None + def handleError(f): self.error = 'An RPC error occurred: %s' % f.getErrorMessage() + def handleResult(result): expectedResult = 'pong' if result != expectedResult: - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result) - # Publish the "local" node on the network + self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \ + % (expectedResult, result) + + # Publish the "local" node on the network lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) # Simulate the RPC df = remoteContact.ping() @@ -132,17 +96,19 @@ class KademliaProtocolTest(unittest.TestCase): lbrynet.dht.protocol.reactor.run() self.failIf(self.error, self.error) # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!') + self.failUnlessEqual(len(self.protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') def testRPCAccess(self): """ Tests invalid RPC requests - Verifies that a RPC request for an existing but unpublished method is denied, and that the associated (remote) exception gets raised locally """ - remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) + remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) self.node.addContact(remoteContact) self.error = None + def handleError(f): try: f.raiseException() @@ -150,51 +116,52 @@ class KademliaProtocolTest(unittest.TestCase): # This is the expected outcome since the remote node did not publish the method self.error = None except Exception, e: - self.error = 'The remote method failed, but the wrong exception was raised; expected AttributeError, got %s' % type(e) - + self.error = 'The remote method failed, but the wrong exception was raised; ' \ + 'expected AttributeError, got %s' % type(e) + def handleResult(result): - self.error = 'The remote method executed successfully, returning: "%s"; this RPC should not have been allowed.' % result - # Publish the "local" node on the network + self.error = 'The remote method executed successfully, returning: "%s"; ' \ + 'this RPC should not have been allowed.' % result + + # Publish the "local" node on the network lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) # Simulate the RPC - df = remoteContact.pingNoRPC() + df = remoteContact.not_a_rpc_function() df.addCallback(handleResult) df.addErrback(handleError) df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) lbrynet.dht.protocol.reactor.run() self.failIf(self.error, self.error) # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!') + self.failUnlessEqual(len(self.protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') def testRPCRequestArgs(self): """ Tests if an RPC requiring arguments is executed correctly """ - remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol) + remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol) self.node.addContact(remoteContact) self.error = None + def handleError(f): self.error = 'An RPC error occurred: %s' % f.getErrorMessage() + def handleResult(result): - expectedResult = 'This should be returned.' - if result != 'This should be returned.': - self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result) - # Publish the "local" node on the network + expectedResult = 'pong' + if result != expectedResult: + self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \ + (expectedResult, result) + + # Publish the "local" node on the network lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol) # Simulate the RPC - df = remoteContact.echo('This should be returned.') + df = remoteContact.ping() df.addCallback(handleResult) df.addErrback(handleError) df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop()) lbrynet.dht.protocol.reactor.run() self.failIf(self.error, self.error) # The list of sent RPC messages should be empty at this stage - self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!') - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(KademliaProtocolTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) + self.failUnlessEqual(len(self.protocol._sentMessages), 0, + 'The protocol is still waiting for a RPC result, ' + 'but the transaction is already done!') diff --git a/lbrynet/tests/dht/testRoutingTable.py b/lbrynet/tests/dht/test_routing_table.py similarity index 62% rename from lbrynet/tests/dht/testRoutingTable.py rename to lbrynet/tests/dht/test_routing_table.py index 8a1ad9c54..fa0b0fd6e 100644 --- a/lbrynet/tests/dht/testRoutingTable.py +++ b/lbrynet/tests/dht/test_routing_table.py @@ -10,6 +10,8 @@ import unittest import lbrynet.dht.constants import lbrynet.dht.routingtable import lbrynet.dht.contact +import lbrynet.dht.node + class FakeRPCProtocol(object): """ Fake RPC protocol; allows lbrynet.dht.contact.Contact objects to "send" RPCs """ @@ -21,41 +23,47 @@ class FakeDeferred(object): """ Fake Twisted Deferred object; allows the routing table to add callbacks that do nothing """ def addCallback(self, *args, **kwargs): return + def addErrback(self, *args, **kwargs): return + def addCallbacks(self, *args, **kwargs): + return + class TreeRoutingTableTest(unittest.TestCase): """ Test case for the RoutingTable class """ def setUp(self): - h = hashlib.sha1() + h = hashlib.sha384() h.update('node1') self.nodeID = h.digest() self.protocol = FakeRPCProtocol() self.routingTable = lbrynet.dht.routingtable.TreeRoutingTable(self.nodeID) - + def testDistance(self): """ Test to see if distance method returns correct result""" - + # testList holds a couple 3-tuple (variable1, variable2, result) - basicTestList = [('123456789','123456789', 0L), ('12345', '98765', 34527773184L)] + basicTestList = [('123456789', '123456789', 0L), ('12345', '98765', 34527773184L)] for test in basicTestList: - result = self.routingTable.distance(test[0], test[1]) - self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % (test[2], result)) + result = lbrynet.dht.node.Distance(test[0])(test[1]) + self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % + (test[2], result)) baseIp = '146.64.19.111' ipTestList = ['146.64.29.222', '192.68.19.333'] - distanceOne = self.routingTable.distance(baseIp, ipTestList[0]) - distanceTwo = self.routingTable.distance(baseIp, ipTestList[1]) + distanceOne = lbrynet.dht.node.Distance(baseIp)(ipTestList[0]) + distanceTwo = lbrynet.dht.node.Distance(baseIp)(ipTestList[1]) + + self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % + (ipTestList[0], baseIp, ipTestList[1])) - self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % (ipTestList[0], baseIp, ipTestList[1])) - def testAddContact(self): """ Tests if a contact can be added and retrieved correctly """ # Create the contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('node2') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) @@ -63,12 +71,14 @@ class TreeRoutingTableTest(unittest.TestCase): self.routingTable.addContact(contact) # ...and request the closest nodes to it (will retrieve it) closestNodes = self.routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k) - self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes)) - self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()') - + self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,' + ' got %d' % len(closestNodes)) + self.failUnless(contact in closestNodes, 'Added contact not found by issueing ' + '_findCloseNodes()') + def testGetContact(self): """ Tests if a specific existing contact can be retrieved correctly """ - h = hashlib.sha1() + h = hashlib.sha384() h.update('node2') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) @@ -77,9 +87,12 @@ class TreeRoutingTableTest(unittest.TestCase): # ...and get it again sameContact = self.routingTable.getContact(contactID) self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact') - + def testAddParentNodeAsContact(self): - """ Tests the routing table's behaviour when attempting to add its parent node as a contact """ + """ + Tests the routing table's behaviour when attempting to add its parent node as a contact + """ + # Create a contact with the same ID as the local node's ID contact = lbrynet.dht.contact.Contact(self.nodeID, '127.0.0.1', 91824, self.protocol) # Now try to add it @@ -87,11 +100,11 @@ class TreeRoutingTableTest(unittest.TestCase): # ...and request the closest nodes to it using FIND_NODE closestNodes = self.routingTable.findCloseNodes(self.nodeID, lbrynet.dht.constants.k) self.failIf(contact in closestNodes, 'Node added itself as a contact') - + def testRemoveContact(self): """ Tests contact removal """ # Create the contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('node2') contactID = h.digest() contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol) @@ -105,54 +118,73 @@ class TreeRoutingTableTest(unittest.TestCase): def testSplitBucket(self): """ Tests if the the routing table correctly dynamically splits k-buckets """ - self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'Initial k-bucket range should be 0 <= range < 2**160') + self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384, + 'Initial k-bucket range should be 0 <= range < 2**384') # Add k contacts for i in range(lbrynet.dht.constants.k): - h = hashlib.sha1() + h = hashlib.sha384() h.update('remote node %d' % i) nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, but should not yet be split') + self.failUnlessEqual(len(self.routingTable._buckets), 1, + 'Only k nodes have been added; the first k-bucket should now ' + 'be full, but should not yet be split') # Now add 1 more contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('yet another remote node') nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 2, 'k+1 nodes have been added; the first k-bucket should have been split into two new buckets') - self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'K-bucket was split, but its range was not properly adjusted') - self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**160, 'K-bucket was split, but the second (new) bucket\'s max range was not set properly') - self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, self.routingTable._buckets[1].rangeMin, 'K-bucket was split, but the min/max ranges were not divided properly') - + self.failUnlessEqual(len(self.routingTable._buckets), 2, + 'k+1 nodes have been added; the first k-bucket should have been ' + 'split into two new buckets') + self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**384, + 'K-bucket was split, but its range was not properly adjusted') + self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**384, + 'K-bucket was split, but the second (new) bucket\'s ' + 'max range was not set properly') + self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, + self.routingTable._buckets[1].rangeMin, + 'K-bucket was split, but the min/max ranges were ' + 'not divided properly') def testFullBucketNoSplit(self): - """ Test that a bucket is not split if it full, but does not cover the range containing the parent node's ID """ - self.routingTable._parentNodeID = 21*'a' # more than 160 bits; this will not be in the range of _any_ k-bucket + """ + Test that a bucket is not split if it full, but does not cover the range + containing the parent node's ID + """ + self.routingTable._parentNodeID = 49 * 'a' + # more than 384 bits; this will not be in the range of _any_ k-bucket # Add k contacts for i in range(lbrynet.dht.constants.k): - h = hashlib.sha1() + h = hashlib.sha384() h.update('remote node %d' % i) nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, and there should not be more than 1 bucket') - self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts))) + self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; ' + 'the first k-bucket should now be ' + 'full, and there should not be ' + 'more than 1 bucket') + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, + 'Bucket should have k contacts; expected %d got %d' % + (lbrynet.dht.constants.k, + len(self.routingTable._buckets[0]._contacts))) # Now add 1 more contact - h = hashlib.sha1() + h = hashlib.sha384() h.update('yet another remote node') nodeID = h.digest() contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol) self.routingTable.addContact(contact) - self.failUnlessEqual(len(self.routingTable._buckets), 1, 'There should not be more than 1 bucket, since the bucket should not have been split (parent node ID not in range)') - self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts))) - self.failIf(contact in self.routingTable._buckets[0]._contacts, 'New contact should have been discarded (since RPC is faked in this test)') + self.failUnlessEqual(len(self.routingTable._buckets), 1, + 'There should not be more than 1 bucket, since the bucket ' + 'should not have been split (parent node ID not in range)') + self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), + lbrynet.dht.constants.k, 'Bucket should have k contacts; ' + 'expected %d got %d' % + (lbrynet.dht.constants.k, + len(self.routingTable._buckets[0]._contacts))) + self.failIf(contact in self.routingTable._buckets[0]._contacts, + 'New contact should have been discarded (since RPC is faked in this test)') -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(TreeRoutingTableTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index d84d693cb..52e53be74 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -46,6 +46,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" logging.basicConfig(level=logging.CRITICAL, format=log_format) + def require_system(system): def wrapper(fn): return fn @@ -115,10 +116,10 @@ class LbryUploader(object): self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir, - lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") stream_info_manager = TempEncryptedFileMetadataManager() self.lbry_file_manager = EncryptedFileManager( self.session, stream_info_manager, self.sd_identifier) @@ -218,12 +219,13 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd" + str(n), + node_id="abcd" + str(n), peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") stream_info_manager = TempEncryptedFileMetadataManager() @@ -330,12 +332,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero db_dir, blob_dir = mk_db_and_blob_dir() - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="efgh", + session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") if slow is True: session.rate_limiter.set_ul_limit(2 ** 11) @@ -511,11 +514,11 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -605,12 +608,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) @@ -687,11 +690,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, @@ -809,11 +813,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") self.stream_info_manager = TempEncryptedFileMetadataManager() diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 8e32d451d..d252986a2 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -9,7 +9,6 @@ from lbrynet.core import PeerManager from lbrynet.core import RateLimiter from lbrynet.core import Session from lbrynet.core import StreamDescriptor -from lbrynet.dht.node import Node from lbrynet.lbry_file import EncryptedFileMetadataManager from lbrynet.lbry_file.client import EncryptedFileOptions from lbrynet.file_manager import EncryptedFileCreator @@ -18,6 +17,7 @@ from lbrynet.file_manager import EncryptedFileManager from lbrynet.tests import mocks from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir + class TestReflector(unittest.TestCase): def setUp(self): mocks.mock_conf_settings(self) @@ -57,7 +57,7 @@ class TestReflector(unittest.TestCase): self.session = Session.Session( conf.settings['data_rate'], db_dir=self.db_dir, - lbryid="abcd", + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=self.blob_dir, @@ -66,7 +66,7 @@ class TestReflector(unittest.TestCase): rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, - dht_node_class=Node + external_ip="127.0.0.1" ) self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager( diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index 54b69fc1d..afd3b029c 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -72,12 +72,12 @@ class TestStreamify(TestCase): os.mkdir(blob_dir) self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=self.is_generous + is_generous=self.is_generous, external_ip="127.0.0.1" ) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -128,11 +128,11 @@ class TestStreamify(TestCase): os.mkdir(blob_dir) self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker + blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1" ) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) diff --git a/lbrynet/tests/integration/test_integration.py b/lbrynet/tests/integration/test_integration.py index 521d93844..2036a9730 100644 --- a/lbrynet/tests/integration/test_integration.py +++ b/lbrynet/tests/integration/test_integration.py @@ -18,6 +18,7 @@ def shell_command(command): FNULL = open(os.devnull, 'w') p = subprocess.Popen(command,shell=False,stdout=FNULL,stderr=subprocess.STDOUT) + def lbrynet_cli(commands): cli_cmd=['lbrynet-cli'] for cmd in commands: @@ -65,7 +66,6 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertTrue(out['is_running']) - def test_cli_docopts(self): out,err = lbrynet_cli(['cli_test_command']) self.assertEqual('',out) @@ -83,7 +83,6 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertEqual([1,[],1,None,False,False], out) - out,err = lbrynet_cli(['cli_test_command','1', '--pos_arg2=2','--pos_arg3=3']) out = json.loads(out) self.assertEqual([1,[],2,3,False,False], out) @@ -93,7 +92,6 @@ class TestIntegration(unittest.TestCase): # TODO: variable length arguments don't have guess_type() on them self.assertEqual([1,['2','3'],None,None,False,False], out) - out,err = lbrynet_cli(['cli_test_command','1','-a']) out = json.loads(out) self.assertEqual([1,[],None,None,True,False], out) @@ -102,13 +100,10 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertEqual([1,[],None,None,True,False], out) - out,err = lbrynet_cli(['cli_test_command','1','-a','-b']) out = json.loads(out) self.assertEqual([1,[],None,None,True,True], out) - - def test_status(self): out = lbrynet.status() self.assertTrue(out['is_running']) diff --git a/lbrynet/tests/unit/dht/test_messages.py b/lbrynet/tests/unit/dht/test_messages.py index dfa948059..36c2295b9 100644 --- a/lbrynet/tests/unit/dht/test_messages.py +++ b/lbrynet/tests/unit/dht/test_messages.py @@ -9,40 +9,41 @@ import unittest from lbrynet.dht.msgtypes import RequestMessage, ResponseMessage, ErrorMessage from lbrynet.dht.msgformat import MessageTranslator, DefaultFormat + class DefaultFormatTranslatorTest(unittest.TestCase): """ Test case for the default message translator """ def setUp(self): - self.cases = ((RequestMessage('node1', 'rpcMethod', - {'arg1': 'a string', 'arg2': 123}, 'rpc1'), + self.cases = ((RequestMessage('1' * 48, 'rpcMethod', + {'arg1': 'a string', 'arg2': 123}, '1' * 20), {DefaultFormat.headerType: DefaultFormat.typeRequest, - DefaultFormat.headerNodeID: 'node1', - DefaultFormat.headerMsgID: 'rpc1', + DefaultFormat.headerNodeID: '1' * 48, + DefaultFormat.headerMsgID: '1' * 20, DefaultFormat.headerPayload: 'rpcMethod', DefaultFormat.headerArgs: {'arg1': 'a string', 'arg2': 123}}), - (ResponseMessage('rpc2', 'node2', 'response'), + (ResponseMessage('2' * 20, '2' * 48, 'response'), {DefaultFormat.headerType: DefaultFormat.typeResponse, - DefaultFormat.headerNodeID: 'node2', - DefaultFormat.headerMsgID: 'rpc2', + DefaultFormat.headerNodeID: '2' * 48, + DefaultFormat.headerMsgID: '2' * 20, DefaultFormat.headerPayload: 'response'}), - (ErrorMessage('rpc3', 'node3', + (ErrorMessage('3' * 20, '3' * 48, "", 'this is a test exception'), {DefaultFormat.headerType: DefaultFormat.typeError, - DefaultFormat.headerNodeID: 'node3', - DefaultFormat.headerMsgID: 'rpc3', + DefaultFormat.headerNodeID: '3' * 48, + DefaultFormat.headerMsgID: '3' * 20, DefaultFormat.headerPayload: "", DefaultFormat.headerArgs: 'this is a test exception'}), (ResponseMessage( - 'rpc4', 'node4', + '4' * 20, '4' * 48, [('H\x89\xb0\xf4\xc9\xe6\xc5`H>\xd5\xc2\xc5\xe8Od\xf1\xca\xfa\x82', '127.0.0.1', 1919), ('\xae\x9ey\x93\xdd\xeb\xf1^\xff\xc5\x0f\xf8\xac!\x0e\x03\x9fY@{', '127.0.0.1', 1921)]), {DefaultFormat.headerType: DefaultFormat.typeResponse, - DefaultFormat.headerNodeID: 'node4', - DefaultFormat.headerMsgID: 'rpc4', + DefaultFormat.headerNodeID: '4' * 48, + DefaultFormat.headerMsgID: '4' * 20, DefaultFormat.headerPayload: [('H\x89\xb0\xf4\xc9\xe6\xc5`H>\xd5\xc2\xc5\xe8Od\xf1\xca\xfa\x82', '127.0.0.1', 1919), @@ -81,13 +82,3 @@ class DefaultFormatTranslatorTest(unittest.TestCase): 'Message instance variable "%s" not translated correctly; ' 'expected "%s", got "%s"' % (key, msg.__dict__[key], translatedObj.__dict__[key])) - - -def suite(): - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(DefaultFormatTranslatorTest)) - return suite - -if __name__ == '__main__': - # If this module is executed from the commandline, run all its tests - unittest.TextTestRunner().run(suite()) diff --git a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py index 1e0601455..3070b93e6 100644 --- a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py @@ -46,18 +46,23 @@ class CreateEncryptedFileTest(unittest.TestCase): session, manager, filename, handle, key, iv_generator()) defer.returnValue(out) + @defer.inlineCallbacks def test_can_create_file(self): expected_stream_hash = ('41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c357b1acb7' '42dd83151fb66393a7709e9f346260a4f4db6de10c25') filename = 'test.file' - d = self.create_file(filename) - d.addCallback(self.assertEqual, expected_stream_hash) - return d + stream_hash = yield self.create_file(filename) + self.assertEqual(expected_stream_hash, stream_hash) + blobs = yield self.blob_manager.get_all_verified_blobs() + self.assertEqual(2, len(blobs)) + num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs() + self.assertEqual(1, num_should_announce_blobs) + + @defer.inlineCallbacks def test_can_create_file_with_unicode_filename(self): expected_stream_hash = ('d1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0' '2fd5adb86fe144e93e110075b5865fff8617776c6c0') filename = u'☃.file' - d = self.create_file(filename) - d.addCallback(self.assertEqual, expected_stream_hash) - return d + stream_hash = yield self.create_file(filename) + self.assertEqual(expected_stream_hash, stream_hash) diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py index 3a91c54b0..688925a15 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py @@ -18,6 +18,10 @@ from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed from lbrynet.tests.util import is_android +import logging +logging.getLogger("lbryum").setLevel(logging.WARNING) + + def get_test_daemon(data_rate=None, generous=True, with_fee=False): if data_rate is None: data_rate = conf.ADJUSTABLE_SETTINGS['data_rate'][1] @@ -71,7 +75,6 @@ class TestCostEst(trial.unittest.TestCase): size = 10000000 correct_result = 4.5 daemon = get_test_daemon(generous=True, with_fee=True) - print daemon.get_est_cost("test", size) self.assertEquals(daemon.get_est_cost("test", size).result, correct_result) def test_fee_and_ungenerous_data(self): diff --git a/requirements.txt b/requirements.txt index 2d5a0dcf2..994fcfc08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,9 +13,9 @@ GitPython==2.1.3 gmpy==1.17 jsonrpc==1.2 jsonrpclib==0.1.7 -jsonschema==2.5.1 -git+https://github.com/lbryio/lbryschema.git@v0.0.12rc1#egg=lbryschema -git+https://github.com/lbryio/lbryum.git@v3.1.9rc2#egg=lbryum +jsonschema==2.6.0 +git+https://github.com/lbryio/lbryum.git@v3.1.10rc1#egg=lbryum +git+https://github.com/lbryio/lbryschema.git@v0.0.13rc1#egg=lbryschema miniupnpc==1.9 pbkdf2==1.3 pycrypto==2.6.1 diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py new file mode 100644 index 000000000..60a07f799 --- /dev/null +++ b/scripts/dht_monitor.py @@ -0,0 +1,104 @@ +import curses +import time +from jsonrpc.proxy import JSONRPCProxy +import logging + +log = logging.getLogger(__name__) +log.addHandler(logging.FileHandler("dht contacts.log")) +# log.addHandler(logging.StreamHandler()) +log.setLevel(logging.INFO) +stdscr = curses.initscr() + +api = JSONRPCProxy.from_url("http://localhost:5279") + + +def init_curses(): + curses.noecho() + curses.cbreak() + stdscr.nodelay(1) + stdscr.keypad(1) + + +def teardown_curses(): + curses.nocbreak() + stdscr.keypad(0) + curses.echo() + curses.endwin() + + +def refresh(last_contacts, last_blobs): + height, width = stdscr.getmaxyx() + + try: + routing_table_info = api.routing_table_get() + node_id = routing_table_info['node_id'] + except: + node_id = "UNKNOWN" + routing_table_info = { + 'buckets': {}, + 'contacts': [], + 'blob_hashes': [] + } + for y in range(height): + stdscr.addstr(y, 0, " " * (width - 1)) + + buckets = routing_table_info['buckets'] + stdscr.addstr(0, 0, "node id: %s" % node_id) + stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" % + (len(buckets), len(routing_table_info['contacts']), + len(routing_table_info['blob_hashes']))) + + y = 3 + for i in sorted(buckets.keys()): + stdscr.addstr(y, 0, "bucket %s" % i) + y += 1 + for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')): + stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['node_id'], h['address'], + len(h['blobs']))) + y += 1 + y += 1 + + new_contacts = set(routing_table_info['contacts']) - last_contacts + lost_contacts = last_contacts - set(routing_table_info['contacts']) + + if new_contacts: + for c in new_contacts: + log.debug("added contact %s", c) + if lost_contacts: + for c in lost_contacts: + log.info("lost contact %s", c) + + new_blobs = set(routing_table_info['blob_hashes']) - last_blobs + lost_blobs = last_blobs - set(routing_table_info['blob_hashes']) + + if new_blobs: + for c in new_blobs: + log.debug("added blob %s", c) + if lost_blobs: + for c in lost_blobs: + log.info("lost blob %s", c) + + stdscr.addstr(y + 1, 0, str(time.time())) + stdscr.refresh() + return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes']) + + +def do_main(): + c = None + last_contacts, last_blobs = set(), set() + while c not in [ord('q'), ord('Q')]: + last_contacts, last_blobs = refresh(last_contacts, last_blobs) + c = stdscr.getch() + time.sleep(0.1) + + +def main(): + try: + init_curses() + do_main() + finally: + teardown_curses() + + +if __name__ == "__main__": + main() diff --git a/scripts/dht_scripts.py b/scripts/dht_scripts.py index 657a5d7e0..b3a5cafe0 100644 --- a/scripts/dht_scripts.py +++ b/scripts/dht_scripts.py @@ -22,7 +22,7 @@ def join_network(udp_port, known_nodes): lbryid = generate_id() log.info('Creating node') - node = Node(udpPort=udp_port, lbryid=lbryid) + node = Node(udpPort=udp_port, node_id=lbryid) log.info('Joining network') yield node.joinNetwork(known_nodes) diff --git a/scripts/dhttest.py b/scripts/dhttest.py index a188030dc..fe0a0af7f 100644 --- a/scripts/dhttest.py +++ b/scripts/dhttest.py @@ -150,7 +150,7 @@ if __name__ == '__main__': # If you wish to have a pure Kademlia network, use the # entangled.kademlia.node.Node class instead print 'Creating Node' - node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid) + node = Node(udpPort=int(sys.argv[1]), node_id=lbryid) # Schedule the node to join the Kademlia/Entangled DHT node.joinNetwork(knownNodes) diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py index 39e1f406f..c2b08f944 100644 --- a/scripts/query_available_blobs.py +++ b/scripts/query_available_blobs.py @@ -51,7 +51,7 @@ def main(args=None): session = Session.Session( 0, db_dir=db_dir, - lbryid=utils.generate_id(), + node_id=utils.generate_id(), blob_dir=blob_dir, dht_node_port=4444, known_dht_nodes=conf.settings['known_dht_nodes'], diff --git a/scripts/rpc_node.py b/scripts/rpc_node.py index ced4fc6e8..40d69b8e7 100644 --- a/scripts/rpc_node.py +++ b/scripts/rpc_node.py @@ -1,82 +1,214 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# - -# Thanks to Paul Cannon for IP-address resolution functions (taken from aspn.activestate.com) - - -""" -Launch a DHT node which can respond to RPC commands. -""" - +import logging +import requests +import miniupnpc import argparse -from lbrynet.dht.node import Node -from txjsonrpc.web import jsonrpc -from twisted.web import server +from copy import deepcopy from twisted.internet import reactor, defer +from twisted.web import resource +from twisted.web.server import Site + +from lbrynet import conf +from lbrynet.core.log_support import configure_console +from lbrynet.dht.error import TimeoutError +conf.initialize_settings() + +log = logging.getLogger("dht tool") +configure_console() +log.setLevel(logging.INFO) + +from lbrynet.dht.node import Node +from lbrynet.dht.contact import Contact +from lbrynet.daemon.auth.server import AuthJSONRPCServer +from lbrynet.core.utils import generate_id + +def get_external_ip_and_setup_upnp(): + try: + u = miniupnpc.UPnP() + u.discoverdelay = 200 + u.discover() + u.selectigd() + + if u.getspecificportmapping(4444, "UDP"): + u.deleteportmapping(4444, "UDP") + log.info("Removed UPnP redirect for UDP 4444.") + u.addportmapping(4444, 'UDP', u.lanaddr, 4444, 'LBRY DHT port', '') + log.info("got external ip from upnp") + return u.externalipaddress() + except Exception: + log.exception("derp") + r = requests.get('https://api.ipify.org', {'format': 'json'}) + log.info("got external ip from ipify.org") + return r.json()['ip'] -class RPCNode(jsonrpc.JSONRPC): - def __init__(self, node, shut_down_cb): - jsonrpc.JSONRPC.__init__(self) - self.node = node - self.shut_down_cb = shut_down_cb +class NodeRPC(AuthJSONRPCServer): + def __init__(self, lbryid, seeds, node_port, rpc_port): + AuthJSONRPCServer.__init__(self, False) + self.root = None + self.port = None + self.seeds = seeds + self.node_port = node_port + self.rpc_port = rpc_port + if lbryid: + lbryid = lbryid.decode('hex') + else: + lbryid = generate_id() + self.node_id = lbryid + self.external_ip = get_external_ip_and_setup_upnp() + self.node_port = node_port - def jsonrpc_total_dht_nodes(self): - return self.node.getApproximateTotalDHTNodes() + @defer.inlineCallbacks + def setup(self): + self.node = Node(node_id=self.node_id, udpPort=self.node_port, + externalIP=self.external_ip) + hosts = [] + for hostname, hostport in self.seeds: + host_ip = yield reactor.resolve(hostname) + hosts.append((host_ip, hostport)) + log.info("connecting to dht") + yield self.node.joinNetwork(tuple(hosts)) + log.info("connected to dht") + if not self.announced_startup: + self.announced_startup = True + self.start_api() + log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'), len(self.node.node_id)) - def jsonrpc_total_dht_hashes(self): - return self.node.getApproximateTotalHashes() + def start_api(self): + root = resource.Resource() + root.putChild('', self) + self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost') + log.info("started jsonrpc server") - def jsonrpc_stop(self): - self.shut_down_cb() - return "fine" + @defer.inlineCallbacks + def jsonrpc_node_id_set(self, node_id): + old_id = self.node.node_id + self.node.stop() + del self.node + self.node_id = node_id.decode('hex') + yield self.setup() + msg = "changed dht id from %s to %s" % (old_id.encode('hex'), + self.node.node_id.encode('hex')) + defer.returnValue(msg) + + def jsonrpc_node_id_get(self): + return self._render_response(self.node.node_id.encode('hex')) + + @defer.inlineCallbacks + def jsonrpc_peer_find(self, node_id): + node_id = node_id.decode('hex') + contact = yield self.node.findContact(node_id) + result = None + if contact: + result = (contact.address, contact.port) + defer.returnValue(result) + + @defer.inlineCallbacks + def jsonrpc_peer_list_for_blob(self, blob_hash): + peers = yield self.node.getPeersForBlob(blob_hash.decode('hex')) + defer.returnValue(peers) + + @defer.inlineCallbacks + def jsonrpc_ping(self, node_id): + contact_host = yield self.jsonrpc_peer_find(node_id=node_id) + if not contact_host: + defer.returnValue("failed to find node") + contact_ip, contact_port = contact_host + contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol) + try: + result = yield contact.ping() + except TimeoutError: + self.node.removeContact(contact.id) + self.node._dataStore.removePeer(contact.id) + result = {'error': 'timeout'} + defer.returnValue(result) + + def get_routing_table(self): + result = {} + data_store = deepcopy(self.node._dataStore._dict) + datastore_len = len(data_store) + hosts = {} + missing_contacts = [] + if datastore_len: + for k, v in data_store.iteritems(): + for value, lastPublished, originallyPublished, originalPublisherID in v: + try: + contact = self.node._routingTable.getContact(originalPublisherID) + except ValueError: + if originalPublisherID.encode('hex') not in missing_contacts: + missing_contacts.append(originalPublisherID.encode('hex')) + continue + if contact in hosts: + blobs = hosts[contact] + else: + blobs = [] + blobs.append(k.encode('hex')) + hosts[contact] = blobs + + contact_set = [] + blob_hashes = [] + result['buckets'] = {} + + for i in range(len(self.node._routingTable._buckets)): + for contact in self.node._routingTable._buckets[i]._contacts: + contacts = result['buckets'].get(i, []) + if contact in hosts: + blobs = hosts[contact] + del hosts[contact] + else: + blobs = [] + host = { + "address": contact.address, + "id": contact.id.encode("hex"), + "blobs": blobs, + } + for blob_hash in blobs: + if blob_hash not in blob_hashes: + blob_hashes.append(blob_hash) + contacts.append(host) + result['buckets'][i] = contacts + contact_set.append(contact.id.encode("hex")) + if hosts: + result['datastore extra'] = [ + { + "id": host.id.encode('hex'), + "blobs": hosts[host], + } + for host in hosts] + result['missing contacts'] = missing_contacts + result['contacts'] = contact_set + result['blob hashes'] = blob_hashes + result['node id'] = self.node_id.encode('hex') + return result + + def jsonrpc_routing_table_get(self): + return self._render_response(self.get_routing_table()) def main(): parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands") - - parser.add_argument("node_port", + parser.add_argument("--node_port", help=("The UDP port on which the node will listen for connections " "from other dht nodes"), - type=int) - parser.add_argument("rpc_port", + type=int, default=4444) + parser.add_argument("--rpc_port", help="The TCP port on which the node will listen for rpc commands", - type=int) - parser.add_argument("dht_bootstrap_host", + type=int, default=5280) + parser.add_argument("--bootstrap_host", help="The IP of a DHT node to be used to bootstrap into the network", - nargs='?') - parser.add_argument("dht_bootstrap_port", + default='lbrynet1.lbry.io') + parser.add_argument("--node_id", + help="The IP of a DHT node to be used to bootstrap into the network", + default=None) + parser.add_argument("--bootstrap_port", help="The port of a DHT node to be used to bootstrap into the network", - nargs='?', default=4000, type=int) - parser.add_argument("--rpc_ip_address", - help="The network interface on which to listen for rpc connections", - default="127.0.0.1") + default=4444, type=int) args = parser.parse_args() - - def start_rpc(): - rpc_node = RPCNode(node, shut_down) - reactor.listenTCP(args.rpc_port, server.Site(rpc_node), interface=args.rpc_ip_address) - - def shut_down(): - d = defer.maybeDeferred(node.stop) - d.addBoth(lambda _: reactor.stop()) - return d - - known_nodes = [] - if args.dht_bootstrap_host: - known_nodes.append((args.dht_bootstrap_host, args.dht_bootstrap_port)) - - node = Node(udpPort=args.node_port) - node.joinNetwork(known_nodes) - d = node._joinDeferred - d.addCallback(lambda _: start_rpc()) + seeds = [(args.bootstrap_host, args.bootstrap_port)] + server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port) + reactor.addSystemEventTrigger('after', 'startup', server.setup) reactor.run() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/send_sd_blobs_to_lighthouse.py b/scripts/send_sd_blobs_to_lighthouse.py index de40356cd..aad1f21f9 100644 --- a/scripts/send_sd_blobs_to_lighthouse.py +++ b/scripts/send_sd_blobs_to_lighthouse.py @@ -53,7 +53,7 @@ def main(args=None): session = Session.Session( blob_data_payment_rate=0, db_dir=db_dir, - lbryid=utils.generate_id(), + node_id=utils.generate_id(), blob_dir=blob_dir, dht_node_port=4444, known_dht_nodes=conf.settings['known_dht_nodes'], diff --git a/setup.py b/setup.py index 9ddd3a687..d9858bb5d 100644 --- a/setup.py +++ b/setup.py @@ -21,8 +21,8 @@ requires = [ 'envparse', 'jsonrpc', 'jsonschema', - 'lbryum==3.1.9rc2', - 'lbryschema==0.0.12rc1', + 'lbryum==3.1.10rc1', + 'lbryschema==0.0.13rc1', 'miniupnpc', 'pycrypto', 'pyyaml',