Merge branch 'master' into android_tests_refactoring

This commit is contained in:
Jack Robison 2017-10-25 12:01:37 -04:00 committed by GitHub
commit e4d2a3d2ec
42 changed files with 1187 additions and 795 deletions

View file

@ -13,39 +13,77 @@ at anytime.
*
### Fixed
* Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer
* Fixed https://github.com/lbryio/lbry/issues/923
* Fixed concurrent reflects opening too many files
* Fixed cases when reflecting would fail on error conditions
* Fixed deadlocks from occuring during blob writes
* Fixed slow startup for nodes with many lbry files
* Fixed setting the external ip on startup
* Fixed session startup not blocking on joining the dht
* Fixed several parsing bugs that prevented replacing dead dht contacts
* Fixed lbryid length validation
* Fixed an old print statement that polluted logs
* Fixed rpc id length for dht requests
### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
*
*
### Changed
* Announcing by head blob is turned on by default
* Updated reflector server dns
* Use the first port available for the peer and dht ports, starting with the provided values (defaults of 3333 and 4444). This allows multiple lbrynet instances in a LAN with UPnP.
* Detect a UPnP redirect that didn't get cleaned up on a previous run and use it
* Bumped jsonschema requirement to 2.6.0
* Moved tests into the lbrynet package.
* Refactor some assert statements to accommodate the PYTHONOPTIMIZE flag set for Android.
### Added
* Added WAL pragma to sqlite3
* Added unit tests for `BlobFile`
* Updated exchange rate tests for the lbry.io api
* Use `hashlib` for sha384 instead of `pycrypto`
* Use `cryptography` instead of `pycrypto` for blob encryption and decryption
* Use `cryptography` for PKCS7 instead of doing it manually
* Use `BytesIO` buffers instead of temp files when processing blobs
* Refactored and pruned blob related classes into `lbrynet.blobs`
* Changed several `assert`s to raise more useful errors
* Added ability for reflector to store stream information for head blob announce
* Added blob announcement information to API call status with session flag
*
*
### Removed
* Removed `TempBlobFile`
* Removed unused `EncryptedFileOpener`
*
*
## [0.17.0] - 2017-10-12
### Fixed
* Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer
* Fixed https://github.com/lbryio/lbry/issues/923
* Fixed concurrent reflects opening too many files
* Fixed cases when reflecting would fail on error conditions
* Fixed deadlocks from occuring during blob writes
* Fixed and updated`lbrynet.tests.dht`
* Fixed redundant dht id
* Fixed dht `ping` method
* Fixed raising remote exceptions in dht
* Fixed hanging delayedCall in dht node class
* Fixed logging error in dht when calling or receiving methods with no arguments
* Fixed IndexError in routingTable.findCloseNodes which would cause an empty list to be returned
* Fixed bug where last blob in a stream was not saved to blob manager
### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
### Changed
* Bumped `lbryschema` requirement to 0.0.12 [see changelog](https://github.com/lbryio/lbryschema/blob/master/CHANGELOG.md#0012---2017-10-12)
* Bumped `lbryum` requirement to 3.1.9 [see changelog](https://github.com/lbryio/lbryum/blob/master/CHANGELOG.md#319---2017-10-12)
* Announcing by head blob is turned on by default
* Updated reflector server dns
* Moved tests into the lbrynet package.
### Added
* Added WAL pragma to sqlite3
* Added unit tests for `BlobFile`
* Updated exchange rate tests for the lbry.io api
* Use `hashlib` for sha384 instead of `pycrypto`
* Use `cryptography` instead of `pycrypto` for blob encryption and decryption
* Use `cryptography` for PKCS7 instead of doing it manually
* Use `BytesIO` buffers instead of temp files when processing blobs
* Refactored and pruned blob related classes into `lbrynet.blobs`
* Changed several `assert`s to raise more useful errors
* Added ability for reflector to store stream information for head blob announce
* Added blob announcement information to API call status with session flag
### Removed
* Removed `TempBlobFile`
* Removed unused `EncryptedFileOpener`
## [0.16.3] - 2017-09-28

View file

@ -1,6 +1,6 @@
The MIT License (MIT)
Copyright (c) 2015-2016 LBRY Inc
Copyright (c) 2015-2017 LBRY Inc
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish,

View file

@ -759,6 +759,32 @@ Returns:
resolvable
```
## routing_table_get
```text
Get DHT routing information
Usage:
routing_table_get
Returns:
(dict) dictionary containing routing and contact information
{
"buckets": {
<bucket index>: [
{
"address": (str) peer address,
"node_id": (str) peer node id,
"blobs": (list) blob hashes announced by peer
}
]
},
"contacts": (list) contact node ids,
"blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets,
"node_id": (str) the local dht node id
}
```
## settings_get
```text
@ -857,6 +883,8 @@ Returns:
'session_status': {
'managed_blobs': count of blobs in the blob manager,
'managed_streams': count of streams in the file manager
'announce_queue_size': number of blobs currently queued to be announced
'should_announce_blobs': number of blobs that should be announced
}
If given the dht status option:

View file

@ -1,6 +1,6 @@
import logging
__version__ = "0.17.0rc13"
__version__ = "0.17.1rc4"
version = tuple(__version__.split('.'))
logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -168,7 +168,6 @@ class BlobFile(object):
"""
if file_handle is not None:
file_handle.close()
self.readers -= 1
def reader_finished(self, reader):
self.readers -= 1

View file

@ -37,18 +37,18 @@ class Session(object):
"""
def __init__(self, blob_data_payment_rate, db_dir=None,
lbryid=None, peer_manager=None, dht_node_port=None,
node_id=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None,
hash_announcer=None, blob_dir=None,
blob_manager=None, peer_port=None, use_upnp=True,
rate_limiter=None, wallet=None,
dht_node_class=node.Node, blob_tracker_class=None,
payment_rate_manager_class=None, is_generous=True):
payment_rate_manager_class=None, is_generous=True, external_ip=None):
"""@param blob_data_payment_rate: The default payment rate for blob data
@param db_dir: The directory in which levelDB files should be stored
@param lbryid: The unique ID of this node
@param node_id: The unique ID of this node
@param peer_manager: An object which keeps track of all known
peers. If None, a PeerManager will be created
@ -101,7 +101,7 @@ class Session(object):
"""
self.db_dir = db_dir
self.lbryid = lbryid
self.node_id = node_id
self.peer_manager = peer_manager
@ -124,7 +124,7 @@ class Session(object):
self.rate_limiter = rate_limiter
self.external_ip = '127.0.0.1'
self.external_ip = external_ip
self.upnp_redirects = []
@ -142,8 +142,8 @@ class Session(object):
log.debug("Starting session.")
if self.lbryid is None:
self.lbryid = generate_id()
if self.node_id is None:
self.node_id = generate_id()
if self.wallet is None:
from lbrynet.core.PTCWallet import PTCWallet
@ -193,6 +193,31 @@ class Session(object):
log.debug("In _try_upnp")
def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping
return get_free_port(upnp, port + 1, protocol)
def get_port_mapping(upnp, internal_port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
external_port = get_free_port(upnp, internal_port, protocol)
if isinstance(external_port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, external_port[1], protocol, upnp.lanaddr, internal_port)
return external_port[1], protocol
upnp.addportmapping(external_port, protocol, upnp.lanaddr, internal_port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, external_port,
protocol, upnp.lanaddr, internal_port)
return external_port, protocol
def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
@ -202,40 +227,15 @@ class Session(object):
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0':
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port is not None:
if u.getspecificportmapping(self.peer_port, 'TCP') is None:
u.addportmapping(
self.peer_port, 'TCP', u.lanaddr, self.peer_port,
'LBRY peer port', '')
self.upnp_redirects.append((self.peer_port, 'TCP'))
log.info("Set UPnP redirect for TCP port %d", self.peer_port)
else:
# see comment below
log.warning("UPnP redirect already set for TCP port %d", self.peer_port)
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port is not None:
if u.getspecificportmapping(self.dht_node_port, 'UDP') is None:
u.addportmapping(
self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port,
'LBRY DHT port', '')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
log.info("Set UPnP redirect for UDP port %d", self.dht_node_port)
else:
# TODO: check that the existing redirect was
# put up by an old lbrynet session before
# grabbing it if such a disconnected redirect
# exists, then upnp won't work unless the
# redirect is appended or is torn down and set
# back up. a bad shutdown of lbrynet could
# leave such a redirect up and cause problems
# on the next start. this could be
# problematic if a previous lbrynet session
# didn't make the redirect, and it was made by
# another application
log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port)
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
if self.peer_port:
self.upnp_redirects.append(get_port_mapping(u, self.peer_port, 'TCP',
'LBRY peer port'))
if self.dht_node_port:
self.upnp_redirects.append(get_port_mapping(u, self.dht_node_port, 'UDP',
'LBRY DHT port'))
return True
return False
@ -260,8 +260,7 @@ class Session(object):
addresses.append(value)
return addresses
def start_dht(addresses):
self.dht_node.joinNetwork(addresses)
def start_dht(join_network_result):
self.peer_finder.run_manage_loop()
self.hash_announcer.run_manage_loop()
return True
@ -274,7 +273,7 @@ class Session(object):
self.dht_node = self.dht_node_class(
udpPort=self.dht_node_port,
lbryid=self.lbryid,
node_id=self.node_id,
externalIP=self.external_ip
)
self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager)
@ -283,6 +282,7 @@ class Session(object):
dl = defer.DeferredList(ds)
dl.addCallback(join_resolved_addresses)
dl.addCallback(self.dht_node.joinNetwork)
dl.addCallback(start_dht)
return dl

View file

@ -140,9 +140,8 @@ class ClientProtocol(Protocol, TimeoutMixin):
self._send_request_message(request_msg)
else:
# The connection manager has indicated that this connection should be terminated
log.info(
"Closing the connection to %s due to having no further requests to send",
self.peer)
log.debug("Closing the connection to %s due to having no further requests to send",
self.peer)
self.peer.report_success()
self.transport.loseConnection()
d = self._connection_manager.get_next_request(self.peer, self)

View file

@ -46,7 +46,10 @@ class DHTPeerFinder(object):
if timeout is not None:
reactor.callLater(timeout, _trigger_timeout)
peer_list = yield finished_deferred
try:
peer_list = yield finished_deferred
except defer.CancelledError:
peer_list = []
peers = set(peer_list)
good_peers = []

View file

@ -69,21 +69,26 @@ class CryptStreamCreator(object):
self.stopped = True
self.producer = None
def _close_current_blob(self):
# close the blob that was being written to
# and save it to blob manager
should_announce = self.blob_count == 0
d = self.current_blob.close()
d.addCallback(self._blob_finished)
d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
should_announce))
self.finished_deferreds.append(d)
self.current_blob = None
def stop(self):
"""Stop creating the stream. Create the terminating zero-length blob."""
log.debug("stop has been called for StreamCreator")
self.stopped = True
if self.current_blob is not None:
current_blob = self.current_blob
d = current_blob.close()
d.addCallback(self._blob_finished)
d.addErrback(self._error)
self.finished_deferreds.append(d)
self.current_blob = None
self._close_current_blob()
self._finalize()
dl = defer.DeferredList(self.finished_deferreds)
dl.addCallback(lambda _: self._finished())
dl.addErrback(self._error)
return dl
# TODO: move the stream creation process to its own thread and
@ -123,6 +128,7 @@ class CryptStreamCreator(object):
d.addCallback(self._blob_finished)
self.finished_deferreds.append(d)
def _write(self, data):
while len(data) > 0:
if self.current_blob is None:
@ -133,20 +139,11 @@ class CryptStreamCreator(object):
done, num_bytes_written = self.current_blob.write(data)
data = data[num_bytes_written:]
if done is True:
should_announce = self.blob_count == 0
d = self.current_blob.close()
d.addCallback(self._blob_finished)
d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
should_announce))
self.finished_deferreds.append(d)
self.current_blob = None
self._close_current_blob()
def _get_blob_maker(self, iv, blob_creator):
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
def _error(self, error):
log.error(error)
def _finished(self):
raise NotImplementedError()

View file

@ -9,7 +9,7 @@ import json
import textwrap
import random
import signal
from copy import deepcopy
from twisted.web import server
from twisted.internet import defer, threads, error, reactor
from twisted.internet.task import LoopingCall
@ -206,7 +206,7 @@ class Daemon(AuthJSONRPCServer):
# of the daemon, but I don't want to deal with that now
self.analytics_manager = analytics_manager
self.lbryid = conf.settings.node_id
self.node_id = conf.settings.node_id
self.wallet_user = None
self.wallet_password = None
@ -562,14 +562,15 @@ class Daemon(AuthJSONRPCServer):
self.session = Session(
conf.settings['data_rate'],
db_dir=self.db_dir,
lbryid=self.lbryid,
node_id=self.node_id,
blob_dir=self.blobfile_dir,
dht_node_port=self.dht_node_port,
known_dht_nodes=conf.settings['known_dht_nodes'],
peer_port=self.peer_port,
use_upnp=self.use_upnp,
wallet=wallet,
is_generous=conf.settings['is_generous_host']
is_generous=conf.settings['is_generous_host'],
external_ip=self.platform['ip']
)
self.startup_status = STARTUP_STAGES[2]
@ -1054,7 +1055,7 @@ class Daemon(AuthJSONRPCServer):
best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None
response = {
'lbry_id': base58.b58encode(self.lbryid),
'lbry_id': base58.b58encode(self.node_id),
'installation_id': conf.settings.installation_id,
'is_running': self.announced_startup,
'is_first_run': self.session.wallet.is_first_run if has_wallet else None,
@ -2658,6 +2659,81 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(lambda r: self._render_response(r))
return d
def jsonrpc_routing_table_get(self):
"""
Get DHT routing information
Usage:
routing_table_get
Returns:
(dict) dictionary containing routing and contact information
{
"buckets": {
<bucket index>: [
{
"address": (str) peer address,
"node_id": (str) peer node id,
"blobs": (list) blob hashes announced by peer
}
]
},
"contacts": (list) contact node ids,
"blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets,
"node_id": (str) the local dht node id
}
"""
result = {}
data_store = deepcopy(self.session.dht_node._dataStore._dict)
datastore_len = len(data_store)
hosts = {}
if datastore_len:
for k, v in data_store.iteritems():
for value, lastPublished, originallyPublished, originalPublisherID in v:
try:
contact = self.session.dht_node._routingTable.getContact(
originalPublisherID)
except ValueError:
continue
if contact in hosts:
blobs = hosts[contact]
else:
blobs = []
blobs.append(k.encode('hex'))
hosts[contact] = blobs
contact_set = []
blob_hashes = []
result['buckets'] = {}
for i in range(len(self.session.dht_node._routingTable._buckets)):
for contact in self.session.dht_node._routingTable._buckets[i]._contacts:
contacts = result['buckets'].get(i, [])
if contact in hosts:
blobs = hosts[contact]
del hosts[contact]
else:
blobs = []
host = {
"address": contact.address,
"node_id": contact.id.encode("hex"),
"blobs": blobs,
}
for blob_hash in blobs:
if blob_hash not in blob_hashes:
blob_hashes.append(blob_hash)
contacts.append(host)
result['buckets'][i] = contacts
if contact.id.encode('hex') not in contact_set:
contact_set.append(contact.id.encode("hex"))
result['contacts'] = contact_set
result['blob_hashes'] = blob_hashes
result['node_id'] = self.session.dht_node.node_id.encode('hex')
return self._render_response(result)
@defer.inlineCallbacks
def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None):
"""

View file

@ -21,9 +21,15 @@ alpha = 3
#: Maximum number of contacts stored in a bucket; this should be an even number
k = 8
#: Maximum number of contacts stored in the replacement cache
replacementCacheSize = 8
#: Timeout for network operations (in seconds)
rpcTimeout = 5
# number of rpc attempts to make before a timeout results in the node being removed as a contact
rpcAttempts = 5
# Delay between iterations of iterative node lookups (for loose parallelism) (in seconds)
iterativeLookupDelay = rpcTimeout / 2
@ -53,3 +59,5 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj
h = get_lbry_hash_obj()
key_bits = h.digest_size * 8 # 384 bits
rpc_id_length = 20

View file

@ -1,13 +1,3 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
class Contact(object):
""" Encapsulation for remote contact

View file

@ -1,33 +1,13 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import UserDict
import time
import constants
from interface import IDataStore
from zope.interface import implements
class DataStore(UserDict.DictMixin):
""" Interface for classes implementing physical storage (for data
published via the "STORE" RPC) for the Kademlia DHT
@note: This provides an interface for a dict-like object
"""
def keys(self):
""" Return a list of the keys in this data store """
def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
pass
class DictDataStore(DataStore):
class DictDataStore(UserDict.DictMixin):
""" A datastore using an in-memory Python dictionary """
implements(IDataStore)
def __init__(self):
# Dictionary format:
@ -64,3 +44,9 @@ class DictDataStore(DataStore):
def getPeersForBlob(self, key):
if key in self._dict:
return [val[0] for val in self._dict[key]]
def removePeer(self, value):
for key in self._dict:
self._dict[key] = [val for val in self._dict[key] if val[0] != value]
if not self._dict[key]:
del self._dict[key]

22
lbrynet/dht/delay.py Normal file
View file

@ -0,0 +1,22 @@
import time
class Delay(object):
maxToSendDelay = 10 ** -3 # 0.05
minToSendDelay = 10 ** -5 # 0.01
def __init__(self, start=0):
self._next = start
# TODO: explain why this logic is like it is. And add tests that
# show that it actually does what it needs to do.
def __call__(self):
ts = time.time()
delay = 0
if ts >= self._next:
delay = self.minToSendDelay
self._next = ts + self.minToSendDelay
else:
delay = (self._next - ts) + self.maxToSendDelay
self._next += self.maxToSendDelay
return delay

View file

@ -1,17 +1,4 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
class DecodeError(Exception):
""" Should be raised by an C{Encoding} implementation if decode operation
fails
"""
from error import DecodeError
class Encoding(object):

38
lbrynet/dht/error.py Normal file
View file

@ -0,0 +1,38 @@
import binascii
import exceptions
# this is a dict of {"exceptions.<exception class name>": exception class} items used to raise
# remote built-in exceptions locally
BUILTIN_EXCEPTIONS = {
"exceptions.%s" % e: getattr(exceptions, e) for e in dir(exceptions) if not e.startswith("_")
}
class DecodeError(Exception):
"""
Should be raised by an C{Encoding} implementation if decode operation
fails
"""
pass
class BucketFull(Exception):
"""
Raised when the bucket is full
"""
pass
class UnknownRemoteException(Exception):
pass
class TimeoutError(Exception):
""" Raised when a RPC times out """
def __init__(self, remote_contact_id):
# remote_contact_id is a binary blob so we need to convert it
# into something more readable
msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id))
Exception.__init__(self, msg)
self.remote_contact_id = remote_contact_id

117
lbrynet/dht/interface.py Normal file
View file

@ -0,0 +1,117 @@
from zope.interface import Interface
class IDataStore(Interface):
""" Interface for classes implementing physical storage (for data
published via the "STORE" RPC) for the Kademlia DHT
@note: This provides an interface for a dict-like object
"""
def keys(self):
""" Return a list of the keys in this data store """
pass
def removeExpiredPeers(self):
pass
def hasPeersForBlob(self, key):
pass
def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
pass
def getPeersForBlob(self, key):
pass
def removePeer(self, key):
pass
class IRoutingTable(Interface):
""" Interface for RPC message translators/formatters
Classes inheriting from this should provide a suitable routing table for
a parent Node object (i.e. the local entity in the Kademlia network)
"""
def __init__(self, parentNodeID):
"""
@param parentNodeID: The n-bit node ID of the node to which this
routing table belongs
@type parentNodeID: str
"""
def addContact(self, contact):
""" Add the given contact to the correct k-bucket; if it already
exists, its status will be updated
@param contact: The contact to add to this node's k-buckets
@type contact: kademlia.contact.Contact
"""
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the
specified key.
@param key: the n-bit key (i.e. the node or value ID) to search for
@type key: str
@param count: the amount of contacts to return
@type count: int
@param _rpcNodeID: Used during RPC, this is be the sender's Node ID
Whatever ID is passed in the paramater will get
excluded from the list of returned contacts.
@type _rpcNodeID: str
@return: A list of node contacts (C{kademlia.contact.Contact instances})
closest to the specified key.
This method will return C{k} (or C{count}, if specified)
contacts if at all possible; it will only return fewer if the
node is returning all of the contacts that it knows of.
@rtype: list
"""
def getContact(self, contactID):
""" Returns the (known) contact with the specified node ID
@raise ValueError: No contact with the specified contact ID is known
by this node
"""
def getRefreshList(self, startIndex=0, force=False):
""" Finds all k-buckets that need refreshing, starting at the
k-bucket with the specified index, and returns IDs to be searched for
in order to refresh those k-buckets
@param startIndex: The index of the bucket to start refreshing at;
this bucket and those further away from it will
be refreshed. For example, when joining the
network, this node will set this to the index of
the bucket after the one containing it's closest
neighbour.
@type startIndex: index
@param force: If this is C{True}, all buckets (in the specified range)
will be refreshed, regardless of the time they were last
accessed.
@type force: bool
@return: A list of node ID's that the parent node should search for
in order to refresh the routing Table
@rtype: list
"""
def removeContact(self, contactID):
""" Remove the contact with the specified node ID from the routing
table
@param contactID: The node ID of the contact to remove
@type contactID: str
"""
def touchKBucket(self, key):
""" Update the "last accessed" timestamp of the k-bucket which covers
the range containing the specified key in the key/ID space
@param key: A key in the range of the target k-bucket
@type key: str
"""

View file

@ -1,17 +1,5 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import constants
class BucketFull(Exception):
""" Raised when the bucket is full """
from error import BucketFull
class KBucket(object):

View file

@ -8,12 +8,17 @@
# may be created by processing this file with epydoc: http://epydoc.sf.net
from lbrynet.core.utils import generate_id
import constants
class Message(object):
""" Base class for messages - all "unknown" messages use this class """
def __init__(self, rpcID, nodeID):
if len(rpcID) != constants.rpc_id_length:
raise ValueError("invalid rpc id: %i bytes (expected 20)" % len(rpcID))
if len(nodeID) != constants.key_bits / 8:
raise ValueError("invalid node id: %i bytes (expected 48)" % len(nodeID))
self.id = rpcID
self.nodeID = nodeID
@ -23,7 +28,7 @@ class RequestMessage(Message):
def __init__(self, nodeID, method, methodArgs, rpcID=None):
if rpcID is None:
rpcID = generate_id()
rpcID = generate_id()[:constants.rpc_id_length]
Message.__init__(self, rpcID, nodeID)
self.request = method
self.args = methodArgs

View file

@ -12,7 +12,7 @@ import operator
import struct
import time
from twisted.internet import defer, error, reactor, threads
from twisted.internet import defer, error, reactor, threads, task
import constants
import routingtable
@ -49,8 +49,8 @@ class Node(object):
application is performed via this class (or a subclass).
"""
def __init__(self, id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None, lbryid=None,
def __init__(self, node_id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None,
externalIP=None):
"""
@param dataStore: The data store to use. This must be class inheriting
@ -74,11 +74,7 @@ class Node(object):
being transmitted.
@type networkProtocol: entangled.kademlia.protocol.KademliaProtocol
"""
if id != None:
self.id = id
else:
self.id = self._generateID()
self.lbryid = lbryid
self.node_id = node_id or self._generateID()
self.port = udpPort
self._listeningPort = None # object implementing Twisted
# IListeningPort This will contain a deferred created when
@ -88,12 +84,12 @@ class Node(object):
# operations before the node has finished joining the network)
self._joinDeferred = None
self.next_refresh_call = None
self.next_change_token_call = None
self.change_token_lc = task.LoopingCall(self.change_token)
# Create k-buckets (for storing contacts)
if routingTableClass is None:
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id)
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id)
else:
self._routingTable = routingTableClass(self.id)
self._routingTable = routingTableClass(self.node_id)
# Initialize this node's network access mechanisms
if networkProtocol is None:
@ -103,7 +99,6 @@ class Node(object):
# Initialize the data storage mechanism used by this node
self.token_secret = self._generateID()
self.old_token_secret = None
self.change_token()
if dataStore is None:
self._dataStore = datastore.DictDataStore()
else:
@ -111,7 +106,7 @@ class Node(object):
# Try to restore the node's state...
if 'nodeState' in self._dataStore:
state = self._dataStore['nodeState']
self.id = state['id']
self.node_id = state['id']
for contactTriple in state['closestNodes']:
contact = Contact(
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
@ -128,9 +123,8 @@ class Node(object):
if self.next_refresh_call is not None:
self.next_refresh_call.cancel()
self.next_refresh_call = None
if self.next_change_token_call is not None:
self.next_change_token_call.cancel()
self.next_change_token_call = None
if self.change_token_lc.running:
self.change_token_lc.stop()
if self._listeningPort is not None:
self._listeningPort.stopListening()
self.hash_watcher.stop()
@ -163,8 +157,12 @@ class Node(object):
bootstrapContacts.append(contact)
else:
bootstrapContacts = None
# Start the token looping call
self.change_token_lc.start(constants.tokenSecretChangeInterval)
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
@ -173,18 +171,27 @@ class Node(object):
self.hash_watcher.tick()
yield self._joinDeferred
@property
def contacts(self):
def _inner():
for i in range(len(self._routingTable._buckets)):
for contact in self._routingTable._buckets[i]._contacts:
yield contact
return list(_inner())
def printContacts(self, *args):
print '\n\nNODE CONTACTS\n==============='
for i in range(len(self._routingTable._buckets)):
print "bucket %i" % i
for contact in self._routingTable._buckets[i]._contacts:
print contact
print " %s:%i" % (contact.address, contact.port)
print '=================================='
def getApproximateTotalDHTNodes(self):
# get the deepest bucket and the number of contacts in that bucket and multiply it
# by the number of equivalently deep buckets in the whole DHT to get a really bad
# estimate!
bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)]
bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.node_id)]
num_in_bucket = len(bucket._contacts)
factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin)
return num_in_bucket * factor
@ -200,30 +207,24 @@ class Node(object):
return num_in_data_store * self.getApproximateTotalDHTNodes() / 8
def announceHaveBlob(self, key, port):
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid})
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id})
@defer.inlineCallbacks
def getPeersForBlob(self, blob_hash):
def expand_and_filter(result):
expanded_peers = []
if isinstance(result, dict):
if blob_hash in result:
for peer in result[blob_hash]:
if self.lbryid != peer[6:]:
host = ".".join([str(ord(d)) for d in peer[:4]])
if host == "127.0.0.1":
if "from_peer" in result:
if result["from_peer"] != "self":
host = result["from_peer"]
port, = struct.unpack('>H', peer[4:6])
result = yield self.iterativeFindValue(blob_hash)
expanded_peers = []
if result:
if blob_hash in result:
for peer in result[blob_hash]:
if self.node_id != peer[6:]:
host = ".".join([str(ord(d)) for d in peer[:4]])
if host == "127.0.0.1" and "from_peer" in result \
and result["from_peer"] != "self":
host = result["from_peer"]
port, = struct.unpack('>H', peer[4:6])
if (host, port) not in expanded_peers:
expanded_peers.append((host, port))
return expanded_peers
def find_failed(err):
return []
d = self.iterativeFindValue(blob_hash)
d.addCallbacks(expand_and_filter, find_failed)
return d
defer.returnValue(expanded_peers)
def get_most_popular_hashes(self, num_to_return):
return self.hash_watcher.most_popular_hashes(num_to_return)
@ -263,7 +264,7 @@ class Node(object):
result = responseMsg.response
if 'token' in result:
value['token'] = result['token']
d = n.store(blob_hash, value, self.id, 0)
d = n.store(blob_hash, value, self.node_id, 0)
d.addCallback(log_success)
d.addErrback(log_error, n)
else:
@ -272,12 +273,12 @@ class Node(object):
def requestPeers(contacts):
if self.externalIP is not None and len(contacts) >= constants.k:
is_closer = Distance(blob_hash).is_closer(self.id, contacts[-1].id)
is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
if is_closer:
contacts.pop()
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
elif self.externalIP is not None:
self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
ds = []
for contact in contacts:
known_nodes[contact.id] = contact
@ -295,8 +296,6 @@ class Node(object):
def change_token(self):
self.old_token_secret = self.token_secret
self.token_secret = self._generateID()
self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval,
self.change_token)
def make_token(self, compact_ip):
h = hashlib.new('sha384')
@ -463,14 +462,13 @@ class Node(object):
raise TypeError, 'No NodeID given. Therefore we can\'t store this node'
if self_store is True and self.externalIP:
contact = Contact(self.id, self.externalIP, self.port, None, None)
contact = Contact(self.node_id, self.externalIP, self.port, None, None)
compact_ip = contact.compact_ip()
elif '_rpcNodeContact' in kwargs:
contact = kwargs['_rpcNodeContact']
compact_ip = contact.compact_ip()
else:
return 'Not OK'
# raise TypeError, 'No contact info available'
raise TypeError, 'No contact info available'
if ((self_store is False) and
('token' not in value or not self.verify_token(value['token'], compact_ip))):
@ -486,8 +484,9 @@ class Node(object):
raise TypeError, 'No port available'
if 'lbryid' in value:
if len(value['lbryid']) > constants.key_bits:
raise ValueError, 'Invalid lbryid'
if len(value['lbryid']) != constants.key_bits / 8:
raise ValueError('Invalid lbryid (%i bytes): %s' % (len(value['lbryid']),
value['lbryid'].encode('hex')))
else:
compact_address = compact_ip + compact_port + value['lbryid']
else:
@ -513,6 +512,7 @@ class Node(object):
node is returning all of the contacts that it knows of.
@rtype: list
"""
# Get the sender's ID (if any)
if '_rpcNodeID' in kwargs:
rpc_sender_id = kwargs['_rpcNodeID']
@ -589,11 +589,12 @@ class Node(object):
findValue = rpc != 'findNode'
if startupShortlist is None:
shortlist = self._routingTable.findCloseNodes(key, constants.alpha)
if key != self.id:
shortlist = self._routingTable.findCloseNodes(key, constants.k)
if key != self.node_id:
# Update the "last accessed" timestamp for the appropriate k-bucket
self._routingTable.touchKBucket(key)
if len(shortlist) == 0:
log.warning("This node doesnt know any other nodes")
# This node doesn't know of any other nodes
fakeDf = defer.Deferred()
fakeDf.callback([])
@ -686,7 +687,7 @@ class _IterativeFindHelper(object):
responseMsg = responseTuple[0]
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't
if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id:
if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.node_id:
return responseMsg.nodeID
# Mark this node as active
@ -752,10 +753,11 @@ class _IterativeFindHelper(object):
if testContact not in self.shortlist:
self.shortlist.append(testContact)
def removeFromShortlist(self, failure):
def removeFromShortlist(self, failure, deadContactID):
""" @type failure: twisted.python.failure.Failure """
failure.trap(protocol.TimeoutError)
deadContactID = failure.getErrorMessage()
if len(deadContactID) != constants.key_bits / 8:
raise ValueError("invalid lbry id")
if deadContactID in self.shortlist:
self.shortlist.remove(deadContactID)
return deadContactID
@ -825,7 +827,7 @@ class _IterativeFindHelper(object):
rpcMethod = getattr(contact, self.rpc)
df = rpcMethod(self.key, rawResponse=True)
df.addCallback(self.extendShortlist)
df.addErrback(self.removeFromShortlist)
df.addErrback(self.removeFromShortlist, contact.id)
df.addCallback(self.cancelActiveProbe)
df.addErrback(lambda _: log.exception('Failed to contact %s', contact))
self.already_contacted.append(contact.id)

View file

@ -1,62 +1,21 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import logging
import binascii
import time
import socket
import errno
from twisted.internet import protocol, defer, error, reactor, task
from twisted.python import failure
import constants
import encoding
import msgtypes
import msgformat
from contact import Contact
from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError
from delay import Delay
log = logging.getLogger(__name__)
class TimeoutError(Exception):
""" Raised when a RPC times out """
def __init__(self, remote_contact_id):
# remote_contact_id is a binary blob so we need to convert it
# into something more readable
msg = 'Timeout connecting to {}'.format(binascii.hexlify(remote_contact_id))
Exception.__init__(self, msg)
self.remote_contact_id = remote_contact_id
class Delay(object):
maxToSendDelay = 10 ** -3 # 0.05
minToSendDelay = 10 ** -5 # 0.01
def __init__(self, start=0):
self._next = start
# TODO: explain why this logic is like it is. And add tests that
# show that it actually does what it needs to do.
def __call__(self):
ts = time.time()
delay = 0
if ts >= self._next:
delay = self.minToSendDelay
self._next = ts + self.minToSendDelay
else:
delay = (self._next - ts) + self.maxToSendDelay
self._next += self.maxToSendDelay
return delay
class KademliaProtocol(protocol.DatagramProtocol):
""" Implements all low-level network-related functions of a Kademlia node """
@ -195,11 +154,14 @@ class KademliaProtocol(protocol.DatagramProtocol):
C{ErrorMessage}).
@rtype: twisted.internet.defer.Deferred
"""
msg = msgtypes.RequestMessage(self._node.id, method, args)
msg = msgtypes.RequestMessage(self._node.node_id, method, args)
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex'))
if args:
log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex'))
else:
log.debug("DHT SEND CALL %s", method)
df = defer.Deferred()
if rawResponse:
@ -209,7 +171,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id)
# Transmit the data
self._send(encodedMsg, msg.id, (contact.address, contact.port))
self._sentMessages[msg.id] = (contact.id, df, timeoutCall)
self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args)
return df
def startProtocol(self):
@ -243,14 +205,14 @@ class KademliaProtocol(protocol.DatagramProtocol):
return
try:
msgPrimitive = self._encoder.decode(datagram)
except encoding.DecodeError:
message = self._translator.fromPrimitive(msgPrimitive)
except (encoding.DecodeError, ValueError):
# We received some rubbish here
return
except IndexError:
log.warning("Couldn't decode dht datagram from %s", address)
return
message = self._translator.fromPrimitive(msgPrimitive)
remoteContact = Contact(message.nodeID, address[0], address[1], self)
now = time.time()
@ -286,7 +248,12 @@ class KademliaProtocol(protocol.DatagramProtocol):
df.callback((message, address))
elif isinstance(message, msgtypes.ErrorMessage):
# The RPC request raised a remote exception; raise it locally
remoteException = Exception(message.response)
if message.exceptionType in BUILTIN_EXCEPTIONS:
exception_type = BUILTIN_EXCEPTIONS[message.exceptionType]
else:
exception_type = UnknownRemoteException
remoteException = exception_type(message.response)
log.error("Remote exception (%s): %s", address, remoteException)
df.errback(remoteException)
else:
# We got a result from the RPC
@ -377,7 +344,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _sendResponse(self, contact, rpcID, response):
""" Send a RPC response to the specified contact
"""
msg = msgtypes.ResponseMessage(rpcID, self._node.id, response)
msg = msgtypes.ResponseMessage(rpcID, self._node.node_id, response)
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
self._send(encodedMsg, rpcID, (contact.address, contact.port))
@ -385,7 +352,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _sendError(self, contact, rpcID, exceptionType, exceptionMessage):
""" Send an RPC error message to the specified contact
"""
msg = msgtypes.ErrorMessage(rpcID, self._node.id, exceptionType, exceptionMessage)
msg = msgtypes.ErrorMessage(rpcID, self._node.node_id, exceptionType, exceptionMessage)
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
self._send(encodedMsg, rpcID, (contact.address, contact.port))
@ -408,48 +375,58 @@ class KademliaProtocol(protocol.DatagramProtocol):
func = getattr(self._node, method, None)
if callable(func) and hasattr(func, 'rpcmethod'):
# Call the exposed Node method and return the result to the deferred callback chain
log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'),
senderContact.address, senderContact.port)
if args:
log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'),
senderContact.address, senderContact.port)
else:
log.debug("DHT RECV CALL %s %s:%i", method, senderContact.address,
senderContact.port)
try:
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
result = func(*args, **kwargs)
if method != 'ping':
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
result = func(*args, **kwargs)
else:
result = func()
except Exception, e:
df.errback(failure.Failure(e))
log.exception("error handling request for %s: %s", senderContact.address, method)
df.errback(e)
else:
df.callback(result)
else:
# No such exposed method
df.errback(failure.Failure(AttributeError('Invalid method: %s' % method)))
df.errback(AttributeError('Invalid method: %s' % method))
def _msgTimeout(self, messageID):
""" Called when an RPC request message times out """
# Find the message that timed out
if not self._sentMessages.has_key(messageID):
if messageID not in self._sentMessages:
# This should never be reached
log.error("deferred timed out, but is not present in sent messages list!")
return
remoteContactID, df = self._sentMessages[messageID][0:2]
remoteContactID, df, timeout_call, method, args = self._sentMessages[messageID]
if self._partialMessages.has_key(messageID):
# We are still receiving this message
self._msgTimeoutInProgress(messageID, remoteContactID, df)
self._msgTimeoutInProgress(messageID, remoteContactID, df, method, args)
return
del self._sentMessages[messageID]
# The message's destination node is now considered to be dead;
# raise an (asynchronous) TimeoutError exception and update the host node
self._node.removeContact(remoteContactID)
df.errback(failure.Failure(TimeoutError(remoteContactID)))
df.errback(TimeoutError(remoteContactID))
def _msgTimeoutInProgress(self, messageID, remoteContactID, df):
def _msgTimeoutInProgress(self, messageID, remoteContactID, df, method, args):
# See if any progress has been made; if not, kill the message
if self._hasProgressBeenMade(messageID):
# Reset the RPC timeout timer
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, messageID)
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall)
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args)
else:
# No progress has been made
del self._partialMessagesProgress[messageID]
del self._partialMessages[messageID]
df.errback(failure.Failure(TimeoutError(remoteContactID)))
if messageID in self._partialMessagesProgress:
del self._partialMessagesProgress[messageID]
if messageID in self._partialMessages:
del self._partialMessages[messageID]
df.errback(TimeoutError(remoteContactID))
def _hasProgressBeenMade(self, messageID):
return (

View file

@ -7,102 +7,17 @@
import time
import random
from zope.interface import implements
import constants
import kbucket
import protocol
from interface import IRoutingTable
import logging
from protocol import TimeoutError
log = logging.getLogger(__name__)
class RoutingTable(object):
""" Interface for RPC message translators/formatters
Classes inheriting from this should provide a suitable routing table for
a parent Node object (i.e. the local entity in the Kademlia network)
"""
def __init__(self, parentNodeID):
"""
@param parentNodeID: The n-bit node ID of the node to which this
routing table belongs
@type parentNodeID: str
"""
def addContact(self, contact):
""" Add the given contact to the correct k-bucket; if it already
exists, its status will be updated
@param contact: The contact to add to this node's k-buckets
@type contact: kademlia.contact.Contact
"""
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the
specified key.
@param key: the n-bit key (i.e. the node or value ID) to search for
@type key: str
@param count: the amount of contacts to return
@type count: int
@param _rpcNodeID: Used during RPC, this is be the sender's Node ID
Whatever ID is passed in the paramater will get
excluded from the list of returned contacts.
@type _rpcNodeID: str
@return: A list of node contacts (C{kademlia.contact.Contact instances})
closest to the specified key.
This method will return C{k} (or C{count}, if specified)
contacts if at all possible; it will only return fewer if the
node is returning all of the contacts that it knows of.
@rtype: list
"""
def getContact(self, contactID):
""" Returns the (known) contact with the specified node ID
@raise ValueError: No contact with the specified contact ID is known
by this node
"""
def getRefreshList(self, startIndex=0, force=False):
""" Finds all k-buckets that need refreshing, starting at the
k-bucket with the specified index, and returns IDs to be searched for
in order to refresh those k-buckets
@param startIndex: The index of the bucket to start refreshing at;
this bucket and those further away from it will
be refreshed. For example, when joining the
network, this node will set this to the index of
the bucket after the one containing it's closest
neighbour.
@type startIndex: index
@param force: If this is C{True}, all buckets (in the specified range)
will be refreshed, regardless of the time they were last
accessed.
@type force: bool
@return: A list of node ID's that the parent node should search for
in order to refresh the routing Table
@rtype: list
"""
def removeContact(self, contactID):
""" Remove the contact with the specified node ID from the routing
table
@param contactID: The node ID of the contact to remove
@type contactID: str
"""
def touchKBucket(self, key):
""" Update the "last accessed" timestamp of the k-bucket which covers
the range containing the specified key in the key/ID space
@param key: A key in the range of the target k-bucket
@type key: str
"""
class TreeRoutingTable(RoutingTable):
class TreeRoutingTable(object):
""" This class implements a routing table used by a Node class.
The Kademlia routing table is a binary tree whose leaves are k-buckets,
@ -117,6 +32,7 @@ class TreeRoutingTable(RoutingTable):
C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of
that paper.
"""
implements(IRoutingTable)
def __init__(self, parentNodeID):
"""
@ -161,17 +77,18 @@ class TreeRoutingTable(RoutingTable):
# the k-bucket. This implementation follows section
# 2.2 regarding this point.
def replaceContact(failure):
def replaceContact(failure, deadContactID):
""" Callback for the deferred PING RPC to see if the head
node in the k-bucket is still responding
@type failure: twisted.python.failure.Failure
"""
failure.trap(TimeoutError)
print '==replacing contact=='
# Remove the old contact...
deadContactID = failure.getErrorMessage()
failure.trap(protocol.TimeoutError)
if len(deadContactID) != constants.key_bits / 8:
raise ValueError("invalid contact id")
log.debug("Replacing dead contact: %s", deadContactID.encode('hex'))
try:
# Remove the old contact...
self._buckets[bucketIndex].removeContact(deadContactID)
except ValueError:
# The contact has already been removed (probably due to a timeout)
@ -184,7 +101,7 @@ class TreeRoutingTable(RoutingTable):
df = head_contact.ping()
# If there's an error (i.e. timeout), remove the head
# contact, and append the new one
df.addErrback(replaceContact)
df.addErrback(replaceContact, head_contact.id)
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the
@ -207,7 +124,11 @@ class TreeRoutingTable(RoutingTable):
@rtype: list
"""
bucketIndex = self._kbucketIndex(key)
closestNodes = self._buckets[bucketIndex].getContacts(constants.k, _rpcNodeID)
if bucketIndex < len(self._buckets):
closestNodes = self._buckets[bucketIndex].getContacts(count, _rpcNodeID)
else:
closestNodes = []
# This method must return k contacts (even if we have the node
# with the specified key as node ID), unless there is less
# than k remote nodes in the routing table
@ -215,7 +136,7 @@ class TreeRoutingTable(RoutingTable):
canGoLower = bucketIndex - i >= 0
canGoHigher = bucketIndex + i < len(self._buckets)
# Fill up the node list to k nodes, starting with the closest neighbouring nodes known
while len(closestNodes) < constants.k and (canGoLower or canGoHigher):
while len(closestNodes) < min(count, constants.k) and (canGoLower or canGoHigher):
# TODO: this may need to be optimized
if canGoLower:
closestNodes.extend(
@ -224,8 +145,8 @@ class TreeRoutingTable(RoutingTable):
canGoLower = bucketIndex - (i + 1) >= 0
if canGoHigher:
closestNodes.extend(
self._buckets[bucketIndex + i].getContacts(
constants.k - len(closestNodes), _rpcNodeID))
self._buckets[bucketIndex + i].getContacts(constants.k - len(closestNodes),
_rpcNodeID))
canGoHigher = bucketIndex + (i + 1) < len(self._buckets)
i += 1
return closestNodes
@ -404,10 +325,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
self._replacementCache[bucketIndex] = []
if contact in self._replacementCache[bucketIndex]:
self._replacementCache[bucketIndex].remove(contact)
# TODO: Using k to limit the size of the contact
# replacement cache - maybe define a separate value for
# this in constants.py?
elif len(self._replacementCache[bucketIndex]) >= constants.k:
elif len(self._replacementCache[bucketIndex]) >= constants.replacementCacheSize:
self._replacementCache[bucketIndex].pop(0)
self._replacementCache[bucketIndex].append(contact)
@ -424,7 +342,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
except ValueError:
return
contact.failedRPCs += 1
if contact.failedRPCs >= 5:
if contact.failedRPCs >= constants.rpcAttempts:
self._buckets[bucketIndex].removeContact(contactID)
# Replace this stale contact with one from our replacement cache, if we have any
if bucketIndex in self._replacementCache:

View file

@ -53,9 +53,9 @@ class EncryptedFileManager(object):
def setup(self):
yield self._open_db()
yield self._add_to_sd_identifier()
yield self._start_lbry_files()
if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
# don't block on starting the lbry files
self._start_lbry_files()
log.info("Started file manager")
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
@ -119,6 +119,9 @@ class EncryptedFileManager(object):
self._set_options_and_restore(rowid, stream_hash, options)
for rowid, stream_hash, options in files_and_options
])
if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
log.info("Started %i lbry files", len(self.lbry_files))
@defer.inlineCallbacks

View file

View file

@ -8,10 +8,13 @@ import hashlib
import unittest
import struct
from twisted.internet import protocol, defer, selectreactor
from lbrynet.dht.msgtypes import ResponseMessage
import lbrynet.dht.node
import lbrynet.dht.constants
import lbrynet.dht.datastore
class NodeIDTest(unittest.TestCase):
""" Test case for the Node class's ID """
def setUp(self):
@ -19,71 +22,76 @@ class NodeIDTest(unittest.TestCase):
def testAutoCreatedID(self):
""" Tests if a new node has a valid node ID """
self.failUnlessEqual(type(self.node.id), str, 'Node does not have a valid ID')
self.failUnlessEqual(len(self.node.id), 20, 'Node ID length is incorrect! Expected 160 bits, got %d bits.' % (len(self.node.id)*8))
self.failUnlessEqual(type(self.node.node_id), str, 'Node does not have a valid ID')
self.failUnlessEqual(len(self.node.node_id), 48, 'Node ID length is incorrect! '
'Expected 384 bits, got %d bits.' %
(len(self.node.node_id) * 8))
def testUniqueness(self):
""" Tests the uniqueness of the values created by the NodeID generator
"""
""" Tests the uniqueness of the values created by the NodeID generator """
generatedIDs = []
for i in range(100):
newID = self.node._generateID()
# ugly uniqueness test
self.failIf(newID in generatedIDs, 'Generated ID #%d not unique!' % (i+1))
generatedIDs.append(newID)
def testKeyLength(self):
""" Tests the key Node ID key length """
for i in range(20):
id = self.node._generateID()
# Key length: 20 bytes == 160 bits
self.failUnlessEqual(len(id), 20, 'Length of generated ID is incorrect! Expected 160 bits, got %d bits.' % (len(id)*8))
self.failUnlessEqual(len(id), 48,
'Length of generated ID is incorrect! Expected 384 bits, '
'got %d bits.' % (len(id)*8))
class NodeDataTest(unittest.TestCase):
""" Test case for the Node class's data-related functions """
def setUp(self):
import lbrynet.dht.contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('test')
self.node = lbrynet.dht.node.Node()
self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345, self.node._protocol)
self.contact = lbrynet.dht.contact.Contact(h.digest(), '127.0.0.1', 12345,
self.node._protocol)
self.token = self.node.make_token(self.contact.compact_ip())
self.cases = []
for i in xrange(5):
h.update(str(i))
self.cases.append((h.digest(), 5000+2*i))
self.cases.append((h.digest(), 5001+2*i))
<<<<<<< Updated upstream
#(('a', 'hello there\nthis is a test'),
# ('aMuchLongerKeyThanAnyOfThePreviousOnes', 'some data'))
=======
>>>>>>> Stashed changes
@defer.inlineCallbacks
def testStore(self):
def check_val_in_result(r, peer_info):
self.failUnless
""" Tests if the node can store (and privately retrieve) some data """
for key, value in self.cases:
self.node.store(key, {'port': value, 'bbid': self.contact.id, 'token': self.token}, self.contact.id, _rpcNodeContact=self.contact)
request = {
'port': value,
'lbryid': self.contact.id,
'token': self.token
}
yield self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact)
for key, value in self.cases:
expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + self.contact.id
self.failUnless(self.node._dataStore.hasPeersForBlob(key), 'Stored key not found in node\'s DataStore: "%s"' % key)
self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key), 'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s' % (key, value, self.node._dataStore.getPeersForBlob(key)))
expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \
self.contact.id
self.failUnless(self.node._dataStore.hasPeersForBlob(key),
'Stored key not found in node\'s DataStore: "%s"' % key)
self.failUnless(expected_result in self.node._dataStore.getPeersForBlob(key),
'Stored val not found in node\'s DataStore: key:"%s" port:"%s" %s'
% (key, value, self.node._dataStore.getPeersForBlob(key)))
class NodeContactTest(unittest.TestCase):
""" Test case for the Node class's contact management-related functions """
def setUp(self):
self.node = lbrynet.dht.node.Node()
def testAddContact(self):
""" Tests if a contact can be added and retrieved correctly """
import lbrynet.dht.contact
# Create the contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node1')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.node._protocol)
@ -91,67 +99,60 @@ class NodeContactTest(unittest.TestCase):
self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()')
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; '
'expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing '
'_findCloseNodes()')
def testAddSelfAsContact(self):
""" Tests the node's behaviour when attempting to add itself as a contact """
import lbrynet.dht.contact
# Create a contact with the same ID as the local node's ID
contact = lbrynet.dht.contact.Contact(self.node.id, '127.0.0.1', 91824, None)
contact = lbrynet.dht.contact.Contact(self.node.node_id, '127.0.0.1', 91824, None)
# Now try to add it
self.node.addContact(contact)
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.node._routingTable.findCloseNodes(self.node.id, lbrynet.dht.constants.k)
closestNodes = self.node._routingTable.findCloseNodes(self.node.node_id,
lbrynet.dht.constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
<<<<<<< Updated upstream
# """ Test case for the Node class's iterative node lookup algorithm """
# """ Ugly brute-force test to see if the iterative node lookup algorithm runs without failing """
=======
>>>>>>> Stashed changes
"""Some scaffolding for the NodeLookupTest class. Allows isolated
node testing by simulating remote node responses"""
from twisted.internet import protocol, defer, selectreactor
from lbrynet.dht.msgtypes import ResponseMessage
class FakeRPCProtocol(protocol.DatagramProtocol):
def __init__(self):
self.reactor = selectreactor.SelectReactor()
self.reactor = selectreactor.SelectReactor()
self.testResponse = None
self.network = None
def createNetwork(self, contactNetwork):
""" set up a list of contacts together with their closest contacts
@param contactNetwork: a sequence of tuples, each containing a contact together with its closest
contacts: C{(<contact>, <closest contact 1, ...,closest contact n>)}
"""
self.network = contactNetwork
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs """
"""
set up a list of contacts together with their closest contacts
@param contactNetwork: a sequence of tuples, each containing a contact together with its
closest contacts: C{(<contact>, <closest contact 1, ...,closest contact n>)}
"""
self.network = contactNetwork
def sendRPC(self, contact, method, args, rawResponse=False):
if method == "findNode":
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs"""
h = hashlib.sha384()
h.update('rpcId')
rpc_id = h.digest()[:20]
if method == "findNode":
# get the specific contacts closest contacts
closestContacts = []
closestContactsList = []
for contactTuple in self.network:
if contact == contactTuple[0]:
# get the list of closest contacts for this contact
closestContactsList = contactTuple[1]
# Pack the closest contacts into a ResponseMessage
# Pack the closest contacts into a ResponseMessage
for closeContact in closestContactsList:
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
message = ResponseMessage("rpcId", contact.id, closestContacts)
message = ResponseMessage(rpc_id, contact.id, closestContacts)
df = defer.Deferred()
df.callback((message,(contact.address, contact.port)))
df.callback((message, (contact.address, contact.port)))
return df
elif method == "findValue":
for contactTuple in self.network:
@ -160,12 +161,10 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
dataDict = contactTuple[2]
dataKey = dataDict.keys()[0]
data = dataDict.get(dataKey)
# Check if this contact has the requested value
if dataKey == args[0]:
# Return the data value
response = dataDict
print "data found at contact: " + contact.id
else:
# Return the closest contact to the requested data key
@ -173,62 +172,55 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
closeContacts = contactTuple[1]
closestContacts = []
for closeContact in closeContacts:
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
closestContacts.append((closeContact.id, closeContact.address,
closeContact.port))
response = closestContacts
# Create the response message
message = ResponseMessage("rpcId", contact.id, response)
message = ResponseMessage(rpc_id, contact.id, response)
df = defer.Deferred()
df.callback((message,(contact.address, contact.port)))
df.callback((message, (contact.address, contact.port)))
return df
def _send(self, data, rpcID, address):
""" fake sending data """
class NodeLookupTest(unittest.TestCase):
""" Test case for the Node class's iterativeFind node lookup algorithm """
def setUp(self):
# create a fake protocol to imitate communication with other nodes
self._protocol = FakeRPCProtocol()
# Note: The reactor is never started for this test. All deferred calls run sequentially,
# Note: The reactor is never started for this test. All deferred calls run sequentially,
# since there is no asynchronous network communication
# create the node to be tested in isolation
self.node = lbrynet.dht.node.Node(None, 4000, None, None, self._protocol)
h = hashlib.sha384()
h.update('node1')
node_id = str(h.digest())
self.node = lbrynet.dht.node.Node(node_id, 4000, None, None, self._protocol)
self.updPort = 81173
<<<<<<< Updated upstream
# create a dummy reactor
=======
>>>>>>> Stashed changes
self.contactsAmount = 80
# set the node ID manually for testing
self.node.id = '12345678901234567800'
# Reinitialise the routing table
self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(self.node.id)
self.node._routingTable = lbrynet.dht.routingtable.OptimizedTreeRoutingTable(
self.node.node_id)
# create 160 bit node ID's for test purposes
self.testNodeIDs = []
idNum = int(self.node.id)
idNum = int(self.node.node_id.encode('hex'), 16)
for i in range(self.contactsAmount):
# create the testNodeIDs in ascending order, away from the actual node ID, with regards to the distance metric
self.testNodeIDs.append(idNum + i + 1)
# create the testNodeIDs in ascending order, away from the actual node ID,
# with regards to the distance metric
self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex'))
# generate contacts
self.contacts = []
for i in range(self.contactsAmount):
contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1", self.updPort + i + 1, self._protocol)
contact = lbrynet.dht.contact.Contact(self.testNodeIDs[i], "127.0.0.1",
self.updPort + i + 1, self._protocol)
self.contacts.append(contact)
# create the network of contacts in format: (contact, closest contacts)
# create the network of contacts in format: (contact, closest contacts)
contactNetwork = ((self.contacts[0], self.contacts[8:15]),
(self.contacts[1], self.contacts[16:23]),
(self.contacts[2], self.contacts[24:31]),
@ -254,43 +246,28 @@ class NodeLookupTest(unittest.TestCase):
contacts_with_datastores = []
for contact_tuple in contactNetwork:
contacts_with_datastores.append((contact_tuple[0], contact_tuple[1], lbrynet.dht.datastore.DictDataStore()))
contacts_with_datastores.append((contact_tuple[0], contact_tuple[1],
lbrynet.dht.datastore.DictDataStore()))
self._protocol.createNetwork(contacts_with_datastores)
@defer.inlineCallbacks
def testNodeBootStrap(self):
""" Test bootstrap with the closest possible contacts """
df = self.node._iterativeFind(self.node.id, self.contacts[0:8])
activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8])
# Set the expected result
expectedResult = []
expectedResult = set()
for item in self.contacts[0:6]:
expectedResult.append(item.id)
expectedResult.add(item.id)
# Get the result from the deferred
activeContacts = df.result
# Check the length of the active contacts
self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(), \
"More active contacts should exist, there should be %d contacts" %expectedResult.__len__())
self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(),
"More active contacts should exist, there should be %d "
"contacts but there are %d" % (len(expectedResult),
len(activeContacts)))
# Check that the received active contacts are the same as the input contacts
self.failUnlessEqual(activeContacts, expectedResult, \
"Active should only contain the closest possible contacts which were used as input for the boostrap")
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(NodeIDTest))
suite.addTest(unittest.makeSuite(NodeDataTest))
suite.addTest(unittest.makeSuite(NodeContactTest))
suite.addTest(unittest.makeSuite(NodeLookupTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())
self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult,
"Active should only contain the closest possible contacts"
" which were used as input for the boostrap")

View file

@ -1,88 +1,22 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
import time
import unittest
from twisted.internet import defer
from twisted.python import failure
import twisted.internet.selectreactor
from twisted.internet.protocol import DatagramProtocol
import lbrynet.dht.protocol
import lbrynet.dht.contact
import lbrynet.dht.constants
import lbrynet.dht.msgtypes
from lbrynet.dht.node import rpcmethod
from lbrynet.dht.error import TimeoutError
from lbrynet.dht.node import Node, rpcmethod
class FakeNode(object):
""" A fake node object implementing some RPC and non-RPC methods to
test the Kademlia protocol's behaviour
"""
def __init__(self, id):
self.id = id
self.contacts = []
@rpcmethod
def ping(self):
return 'pong'
def pingNoRPC(self):
return 'pong'
@rpcmethod
def echo(self, value):
return value
def addContact(self, contact):
self.contacts.append(contact)
def removeContact(self, contact):
self.contacts.remove(contact)
def indirectPingContact(self, protocol, contact):
""" Pings the given contact (using the specified KademliaProtocol
object, not the direct Contact API), and removes the contact
on a timeout """
df = protocol.sendRPC(contact, 'ping', {})
def handleError(f):
if f.check(lbrynet.dht.protocol.TimeoutError):
self.removeContact(contact)
return f
else:
# This is some other error
return f
df.addErrback(handleError)
return df
class ClientDatagramProtocol(lbrynet.dht.protocol.KademliaProtocol):
data = ''
msgID = ''
destination = ('127.0.0.1', 9182)
def __init__(self):
lbrynet.dht.protocol.KademliaProtocol.__init__(self, None)
def startProtocol(self):
self.sendDatagram()
def sendDatagram(self):
if len(self.data):
self._send(self.data, self.msgID, self.destination)
class KademliaProtocolTest(unittest.TestCase):
""" Test case for the Protocol class """
def setUp(self):
del lbrynet.dht.protocol.reactor
lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor()
self.node = FakeNode('node1')
self.node = Node(node_id='1' * 48, udpPort=9182, externalIP="127.0.0.1")
self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node)
def testReactor(self):
@ -93,36 +27,66 @@ class KademliaProtocolTest(unittest.TestCase):
def testRPCTimeout(self):
""" Tests if a RPC message sent to a dead remote node times out correctly """
deadContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
@rpcmethod
def fake_ping(*args, **kwargs):
time.sleep(lbrynet.dht.constants.rpcTimeout + 1)
return 'pong'
real_ping = self.node.ping
real_timeout = lbrynet.dht.constants.rpcTimeout
real_attempts = lbrynet.dht.constants.rpcAttempts
lbrynet.dht.constants.rpcAttempts = 1
lbrynet.dht.constants.rpcTimeout = 1
self.node.ping = fake_ping
deadContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(deadContact)
# Make sure the contact was added
self.failIf(deadContact not in self.node.contacts, 'Contact not added to fake node (error in test code)')
# Set the timeout to 0 for testing
tempTimeout = lbrynet.dht.constants.rpcTimeout
lbrynet.dht.constants.rpcTimeout = 0
lbrynet.dht.protocol.reactor.listenUDP(0, self.protocol)
# Run the PING RPC (which should timeout)
df = self.node.indirectPingContact(self.protocol, deadContact)
self.failIf(deadContact not in self.node.contacts,
'Contact not added to fake node (error in test code)')
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Run the PING RPC (which should raise a timeout error)
df = self.protocol.sendRPC(deadContact, 'ping', {})
def check_timeout(err):
self.assertEqual(type(err), TimeoutError)
df.addErrback(check_timeout)
def reset_values():
self.node.ping = real_ping
lbrynet.dht.constants.rpcTimeout = real_timeout
lbrynet.dht.constants.rpcAttempts = real_attempts
# See if the contact was removed due to the timeout
def check_removed_contact():
self.failIf(deadContact in self.node.contacts,
'Contact was not removed after RPC timeout; check exception types.')
df.addCallback(lambda _: reset_values())
# Stop the reactor if a result arrives (timeout or not)
df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
df.addCallback(lambda _: check_removed_contact())
lbrynet.dht.protocol.reactor.run()
# See if the contact was removed due to the timeout
self.failIf(deadContact in self.node.contacts, 'Contact was not removed after RPC timeout; check exception types.')
# Restore the global timeout
lbrynet.dht.constants.rpcTimeout = tempTimeout
def testRPCRequest(self):
""" Tests if a valid RPC request is executed and responded to correctly """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result)
# Publish the "local" node on the network
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' \
% (expectedResult, result)
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
df = remoteContact.ping()
@ -132,17 +96,19 @@ class KademliaProtocolTest(unittest.TestCase):
lbrynet.dht.protocol.reactor.run()
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
self.failUnlessEqual(len(self.protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
def testRPCAccess(self):
""" Tests invalid RPC requests
Verifies that a RPC request for an existing but unpublished
method is denied, and that the associated (remote) exception gets
raised locally """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
try:
f.raiseException()
@ -150,51 +116,52 @@ class KademliaProtocolTest(unittest.TestCase):
# This is the expected outcome since the remote node did not publish the method
self.error = None
except Exception, e:
self.error = 'The remote method failed, but the wrong exception was raised; expected AttributeError, got %s' % type(e)
self.error = 'The remote method failed, but the wrong exception was raised; ' \
'expected AttributeError, got %s' % type(e)
def handleResult(result):
self.error = 'The remote method executed successfully, returning: "%s"; this RPC should not have been allowed.' % result
# Publish the "local" node on the network
self.error = 'The remote method executed successfully, returning: "%s"; ' \
'this RPC should not have been allowed.' % result
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
df = remoteContact.pingNoRPC()
df = remoteContact.not_a_rpc_function()
df.addCallback(handleResult)
df.addErrback(handleError)
df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
lbrynet.dht.protocol.reactor.run()
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
self.failUnlessEqual(len(self.protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')
def testRPCRequestArgs(self):
""" Tests if an RPC requiring arguments is executed correctly """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
def handleError(f):
self.error = 'An RPC error occurred: %s' % f.getErrorMessage()
def handleResult(result):
expectedResult = 'This should be returned.'
if result != 'This should be returned.':
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % (expectedResult, result)
# Publish the "local" node on the network
expectedResult = 'pong'
if result != expectedResult:
self.error = 'Result from RPC is incorrect; expected "%s", got "%s"' % \
(expectedResult, result)
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
df = remoteContact.echo('This should be returned.')
df = remoteContact.ping()
df.addCallback(handleResult)
df.addErrback(handleError)
df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
lbrynet.dht.protocol.reactor.run()
self.failIf(self.error, self.error)
# The list of sent RPC messages should be empty at this stage
self.failUnlessEqual(len(self.protocol._sentMessages), 0, 'The protocol is still waiting for a RPC result, but the transaction is already done!')
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(KademliaProtocolTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())
self.failUnlessEqual(len(self.protocol._sentMessages), 0,
'The protocol is still waiting for a RPC result, '
'but the transaction is already done!')

View file

@ -10,6 +10,8 @@ import unittest
import lbrynet.dht.constants
import lbrynet.dht.routingtable
import lbrynet.dht.contact
import lbrynet.dht.node
class FakeRPCProtocol(object):
""" Fake RPC protocol; allows lbrynet.dht.contact.Contact objects to "send" RPCs """
@ -21,41 +23,47 @@ class FakeDeferred(object):
""" Fake Twisted Deferred object; allows the routing table to add callbacks that do nothing """
def addCallback(self, *args, **kwargs):
return
def addErrback(self, *args, **kwargs):
return
def addCallbacks(self, *args, **kwargs):
return
class TreeRoutingTableTest(unittest.TestCase):
""" Test case for the RoutingTable class """
def setUp(self):
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node1')
self.nodeID = h.digest()
self.protocol = FakeRPCProtocol()
self.routingTable = lbrynet.dht.routingtable.TreeRoutingTable(self.nodeID)
def testDistance(self):
""" Test to see if distance method returns correct result"""
# testList holds a couple 3-tuple (variable1, variable2, result)
basicTestList = [('123456789','123456789', 0L), ('12345', '98765', 34527773184L)]
basicTestList = [('123456789', '123456789', 0L), ('12345', '98765', 34527773184L)]
for test in basicTestList:
result = self.routingTable.distance(test[0], test[1])
self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' % (test[2], result))
result = lbrynet.dht.node.Distance(test[0])(test[1])
self.failIf(result != test[2], 'Result of _distance() should be %s but %s returned' %
(test[2], result))
baseIp = '146.64.19.111'
ipTestList = ['146.64.29.222', '192.68.19.333']
distanceOne = self.routingTable.distance(baseIp, ipTestList[0])
distanceTwo = self.routingTable.distance(baseIp, ipTestList[1])
distanceOne = lbrynet.dht.node.Distance(baseIp)(ipTestList[0])
distanceTwo = lbrynet.dht.node.Distance(baseIp)(ipTestList[1])
self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' %
(ipTestList[0], baseIp, ipTestList[1]))
self.failIf(distanceOne > distanceTwo, '%s should be closer to the base ip %s than %s' % (ipTestList[0], baseIp, ipTestList[1]))
def testAddContact(self):
""" Tests if a contact can be added and retrieved correctly """
# Create the contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
@ -63,12 +71,14 @@ class TreeRoutingTableTest(unittest.TestCase):
self.routingTable.addContact(contact)
# ...and request the closest nodes to it (will retrieve it)
closestNodes = self.routingTable.findCloseNodes(contactID, lbrynet.dht.constants.k)
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1, got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing _findCloseNodes()')
self.failUnlessEqual(len(closestNodes), 1, 'Wrong amount of contacts returned; expected 1,'
' got %d' % len(closestNodes))
self.failUnless(contact in closestNodes, 'Added contact not found by issueing '
'_findCloseNodes()')
def testGetContact(self):
""" Tests if a specific existing contact can be retrieved correctly """
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
@ -77,9 +87,12 @@ class TreeRoutingTableTest(unittest.TestCase):
# ...and get it again
sameContact = self.routingTable.getContact(contactID)
self.failUnlessEqual(contact, sameContact, 'getContact() should return the same contact')
def testAddParentNodeAsContact(self):
""" Tests the routing table's behaviour when attempting to add its parent node as a contact """
"""
Tests the routing table's behaviour when attempting to add its parent node as a contact
"""
# Create a contact with the same ID as the local node's ID
contact = lbrynet.dht.contact.Contact(self.nodeID, '127.0.0.1', 91824, self.protocol)
# Now try to add it
@ -87,11 +100,11 @@ class TreeRoutingTableTest(unittest.TestCase):
# ...and request the closest nodes to it using FIND_NODE
closestNodes = self.routingTable.findCloseNodes(self.nodeID, lbrynet.dht.constants.k)
self.failIf(contact in closestNodes, 'Node added itself as a contact')
def testRemoveContact(self):
""" Tests contact removal """
# Create the contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node2')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.protocol)
@ -105,54 +118,73 @@ class TreeRoutingTableTest(unittest.TestCase):
def testSplitBucket(self):
""" Tests if the the routing table correctly dynamically splits k-buckets """
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'Initial k-bucket range should be 0 <= range < 2**160')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'Initial k-bucket range should be 0 <= range < 2**384')
# Add k contacts
for i in range(lbrynet.dht.constants.k):
h = hashlib.sha1()
h = hashlib.sha384()
h.update('remote node %d' % i)
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, but should not yet be split')
self.failUnlessEqual(len(self.routingTable._buckets), 1,
'Only k nodes have been added; the first k-bucket should now '
'be full, but should not yet be split')
# Now add 1 more contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('yet another remote node')
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 2, 'k+1 nodes have been added; the first k-bucket should have been split into two new buckets')
self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**160, 'K-bucket was split, but its range was not properly adjusted')
self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**160, 'K-bucket was split, but the second (new) bucket\'s max range was not set properly')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax, self.routingTable._buckets[1].rangeMin, 'K-bucket was split, but the min/max ranges were not divided properly')
self.failUnlessEqual(len(self.routingTable._buckets), 2,
'k+1 nodes have been added; the first k-bucket should have been '
'split into two new buckets')
self.failIfEqual(self.routingTable._buckets[0].rangeMax, 2**384,
'K-bucket was split, but its range was not properly adjusted')
self.failUnlessEqual(self.routingTable._buckets[1].rangeMax, 2**384,
'K-bucket was split, but the second (new) bucket\'s '
'max range was not set properly')
self.failUnlessEqual(self.routingTable._buckets[0].rangeMax,
self.routingTable._buckets[1].rangeMin,
'K-bucket was split, but the min/max ranges were '
'not divided properly')
def testFullBucketNoSplit(self):
""" Test that a bucket is not split if it full, but does not cover the range containing the parent node's ID """
self.routingTable._parentNodeID = 21*'a' # more than 160 bits; this will not be in the range of _any_ k-bucket
"""
Test that a bucket is not split if it full, but does not cover the range
containing the parent node's ID
"""
self.routingTable._parentNodeID = 49 * 'a'
# more than 384 bits; this will not be in the range of _any_ k-bucket
# Add k contacts
for i in range(lbrynet.dht.constants.k):
h = hashlib.sha1()
h = hashlib.sha384()
h.update('remote node %d' % i)
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; the first k-bucket should now be full, and there should not be more than 1 bucket')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts)))
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'Only k nodes have been added; '
'the first k-bucket should now be '
'full, and there should not be '
'more than 1 bucket')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k,
'Bucket should have k contacts; expected %d got %d' %
(lbrynet.dht.constants.k,
len(self.routingTable._buckets[0]._contacts)))
# Now add 1 more contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('yet another remote node')
nodeID = h.digest()
contact = lbrynet.dht.contact.Contact(nodeID, '127.0.0.1', 91824, self.protocol)
self.routingTable.addContact(contact)
self.failUnlessEqual(len(self.routingTable._buckets), 1, 'There should not be more than 1 bucket, since the bucket should not have been split (parent node ID not in range)')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts), lbrynet.dht.constants.k, 'Bucket should have k contacts; expected %d got %d' % (lbrynet.dht.constants.k, len(self.routingTable._buckets[0]._contacts)))
self.failIf(contact in self.routingTable._buckets[0]._contacts, 'New contact should have been discarded (since RPC is faked in this test)')
self.failUnlessEqual(len(self.routingTable._buckets), 1,
'There should not be more than 1 bucket, since the bucket '
'should not have been split (parent node ID not in range)')
self.failUnlessEqual(len(self.routingTable._buckets[0]._contacts),
lbrynet.dht.constants.k, 'Bucket should have k contacts; '
'expected %d got %d' %
(lbrynet.dht.constants.k,
len(self.routingTable._buckets[0]._contacts)))
self.failIf(contact in self.routingTable._buckets[0]._contacts,
'New contact should have been discarded (since RPC is faked in this test)')
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TreeRoutingTableTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())

View file

@ -46,6 +46,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.CRITICAL, format=log_format)
def require_system(system):
def wrapper(fn):
return fn
@ -115,10 +116,10 @@ class LbryUploader(object):
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir,
lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous)
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
stream_info_manager = TempEncryptedFileMetadataManager()
self.lbry_file_manager = EncryptedFileManager(
self.session, stream_info_manager, self.sd_identifier)
@ -218,12 +219,13 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd" + str(n),
node_id="abcd" + str(n),
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
stream_info_manager = TempEncryptedFileMetadataManager()
@ -330,12 +332,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="efgh",
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
if slow is True:
session.rate_limiter.set_ul_limit(2 ** 11)
@ -511,11 +514,11 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous)
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -605,12 +608,12 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1")
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -687,11 +690,12 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd", peer_finder=peer_finder,
node_id="abcd", peer_finder=peer_finder,
hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager,
@ -809,11 +813,12 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
lbryid="abcd", peer_finder=peer_finder,
node_id="abcd", peer_finder=peer_finder,
hash_announcer=hash_announcer, blob_dir=blob_dir,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter,
wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
self.stream_info_manager = TempEncryptedFileMetadataManager()

View file

@ -9,7 +9,6 @@ from lbrynet.core import PeerManager
from lbrynet.core import RateLimiter
from lbrynet.core import Session
from lbrynet.core import StreamDescriptor
from lbrynet.dht.node import Node
from lbrynet.lbry_file import EncryptedFileMetadataManager
from lbrynet.lbry_file.client import EncryptedFileOptions
from lbrynet.file_manager import EncryptedFileCreator
@ -18,6 +17,7 @@ from lbrynet.file_manager import EncryptedFileManager
from lbrynet.tests import mocks
from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir
class TestReflector(unittest.TestCase):
def setUp(self):
mocks.mock_conf_settings(self)
@ -57,7 +57,7 @@ class TestReflector(unittest.TestCase):
self.session = Session.Session(
conf.settings['data_rate'],
db_dir=self.db_dir,
lbryid="abcd",
node_id="abcd",
peer_finder=peer_finder,
hash_announcer=hash_announcer,
blob_dir=self.blob_dir,
@ -66,7 +66,7 @@ class TestReflector(unittest.TestCase):
rate_limiter=rate_limiter,
wallet=wallet,
blob_tracker_class=mocks.BlobAvailabilityTracker,
dht_node_class=Node
external_ip="127.0.0.1"
)
self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(

View file

@ -72,12 +72,12 @@ class TestStreamify(TestCase):
os.mkdir(blob_dir)
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=self.is_generous
is_generous=self.is_generous, external_ip="127.0.0.1"
)
self.stream_info_manager = TempEncryptedFileMetadataManager()
@ -128,11 +128,11 @@ class TestStreamify(TestCase):
os.mkdir(blob_dir)
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker
blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1"
)
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)

View file

@ -18,6 +18,7 @@ def shell_command(command):
FNULL = open(os.devnull, 'w')
p = subprocess.Popen(command,shell=False,stdout=FNULL,stderr=subprocess.STDOUT)
def lbrynet_cli(commands):
cli_cmd=['lbrynet-cli']
for cmd in commands:
@ -65,7 +66,6 @@ class TestIntegration(unittest.TestCase):
out = json.loads(out)
self.assertTrue(out['is_running'])
def test_cli_docopts(self):
out,err = lbrynet_cli(['cli_test_command'])
self.assertEqual('',out)
@ -83,7 +83,6 @@ class TestIntegration(unittest.TestCase):
out = json.loads(out)
self.assertEqual([1,[],1,None,False,False], out)
out,err = lbrynet_cli(['cli_test_command','1', '--pos_arg2=2','--pos_arg3=3'])
out = json.loads(out)
self.assertEqual([1,[],2,3,False,False], out)
@ -93,7 +92,6 @@ class TestIntegration(unittest.TestCase):
# TODO: variable length arguments don't have guess_type() on them
self.assertEqual([1,['2','3'],None,None,False,False], out)
out,err = lbrynet_cli(['cli_test_command','1','-a'])
out = json.loads(out)
self.assertEqual([1,[],None,None,True,False], out)
@ -102,13 +100,10 @@ class TestIntegration(unittest.TestCase):
out = json.loads(out)
self.assertEqual([1,[],None,None,True,False], out)
out,err = lbrynet_cli(['cli_test_command','1','-a','-b'])
out = json.loads(out)
self.assertEqual([1,[],None,None,True,True], out)
def test_status(self):
out = lbrynet.status()
self.assertTrue(out['is_running'])

View file

@ -9,40 +9,41 @@ import unittest
from lbrynet.dht.msgtypes import RequestMessage, ResponseMessage, ErrorMessage
from lbrynet.dht.msgformat import MessageTranslator, DefaultFormat
class DefaultFormatTranslatorTest(unittest.TestCase):
""" Test case for the default message translator """
def setUp(self):
self.cases = ((RequestMessage('node1', 'rpcMethod',
{'arg1': 'a string', 'arg2': 123}, 'rpc1'),
self.cases = ((RequestMessage('1' * 48, 'rpcMethod',
{'arg1': 'a string', 'arg2': 123}, '1' * 20),
{DefaultFormat.headerType: DefaultFormat.typeRequest,
DefaultFormat.headerNodeID: 'node1',
DefaultFormat.headerMsgID: 'rpc1',
DefaultFormat.headerNodeID: '1' * 48,
DefaultFormat.headerMsgID: '1' * 20,
DefaultFormat.headerPayload: 'rpcMethod',
DefaultFormat.headerArgs: {'arg1': 'a string', 'arg2': 123}}),
(ResponseMessage('rpc2', 'node2', 'response'),
(ResponseMessage('2' * 20, '2' * 48, 'response'),
{DefaultFormat.headerType: DefaultFormat.typeResponse,
DefaultFormat.headerNodeID: 'node2',
DefaultFormat.headerMsgID: 'rpc2',
DefaultFormat.headerNodeID: '2' * 48,
DefaultFormat.headerMsgID: '2' * 20,
DefaultFormat.headerPayload: 'response'}),
(ErrorMessage('rpc3', 'node3',
(ErrorMessage('3' * 20, '3' * 48,
"<type 'exceptions.ValueError'>", 'this is a test exception'),
{DefaultFormat.headerType: DefaultFormat.typeError,
DefaultFormat.headerNodeID: 'node3',
DefaultFormat.headerMsgID: 'rpc3',
DefaultFormat.headerNodeID: '3' * 48,
DefaultFormat.headerMsgID: '3' * 20,
DefaultFormat.headerPayload: "<type 'exceptions.ValueError'>",
DefaultFormat.headerArgs: 'this is a test exception'}),
(ResponseMessage(
'rpc4', 'node4',
'4' * 20, '4' * 48,
[('H\x89\xb0\xf4\xc9\xe6\xc5`H>\xd5\xc2\xc5\xe8Od\xf1\xca\xfa\x82',
'127.0.0.1', 1919),
('\xae\x9ey\x93\xdd\xeb\xf1^\xff\xc5\x0f\xf8\xac!\x0e\x03\x9fY@{',
'127.0.0.1', 1921)]),
{DefaultFormat.headerType: DefaultFormat.typeResponse,
DefaultFormat.headerNodeID: 'node4',
DefaultFormat.headerMsgID: 'rpc4',
DefaultFormat.headerNodeID: '4' * 48,
DefaultFormat.headerMsgID: '4' * 20,
DefaultFormat.headerPayload:
[('H\x89\xb0\xf4\xc9\xe6\xc5`H>\xd5\xc2\xc5\xe8Od\xf1\xca\xfa\x82',
'127.0.0.1', 1919),
@ -81,13 +82,3 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
'Message instance variable "%s" not translated correctly; '
'expected "%s", got "%s"' %
(key, msg.__dict__[key], translatedObj.__dict__[key]))
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(DefaultFormatTranslatorTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())

View file

@ -46,18 +46,23 @@ class CreateEncryptedFileTest(unittest.TestCase):
session, manager, filename, handle, key, iv_generator())
defer.returnValue(out)
@defer.inlineCallbacks
def test_can_create_file(self):
expected_stream_hash = ('41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c357b1acb7'
'42dd83151fb66393a7709e9f346260a4f4db6de10c25')
filename = 'test.file'
d = self.create_file(filename)
d.addCallback(self.assertEqual, expected_stream_hash)
return d
stream_hash = yield self.create_file(filename)
self.assertEqual(expected_stream_hash, stream_hash)
blobs = yield self.blob_manager.get_all_verified_blobs()
self.assertEqual(2, len(blobs))
num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs()
self.assertEqual(1, num_should_announce_blobs)
@defer.inlineCallbacks
def test_can_create_file_with_unicode_filename(self):
expected_stream_hash = ('d1da4258f3ce12edb91d7e8e160d091d3ab1432c2e55a6352dce0'
'2fd5adb86fe144e93e110075b5865fff8617776c6c0')
filename = u'☃.file'
d = self.create_file(filename)
d.addCallback(self.assertEqual, expected_stream_hash)
return d
stream_hash = yield self.create_file(filename)
self.assertEqual(expected_stream_hash, stream_hash)

View file

@ -18,6 +18,10 @@ from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed
from lbrynet.tests.util import is_android
import logging
logging.getLogger("lbryum").setLevel(logging.WARNING)
def get_test_daemon(data_rate=None, generous=True, with_fee=False):
if data_rate is None:
data_rate = conf.ADJUSTABLE_SETTINGS['data_rate'][1]
@ -71,7 +75,6 @@ class TestCostEst(trial.unittest.TestCase):
size = 10000000
correct_result = 4.5
daemon = get_test_daemon(generous=True, with_fee=True)
print daemon.get_est_cost("test", size)
self.assertEquals(daemon.get_est_cost("test", size).result, correct_result)
def test_fee_and_ungenerous_data(self):

View file

@ -13,9 +13,9 @@ GitPython==2.1.3
gmpy==1.17
jsonrpc==1.2
jsonrpclib==0.1.7
jsonschema==2.5.1
git+https://github.com/lbryio/lbryschema.git@v0.0.12rc1#egg=lbryschema
git+https://github.com/lbryio/lbryum.git@v3.1.9rc2#egg=lbryum
jsonschema==2.6.0
git+https://github.com/lbryio/lbryum.git@v3.1.10rc1#egg=lbryum
git+https://github.com/lbryio/lbryschema.git@v0.0.13rc1#egg=lbryschema
miniupnpc==1.9
pbkdf2==1.3
pycrypto==2.6.1

104
scripts/dht_monitor.py Normal file
View file

@ -0,0 +1,104 @@
import curses
import time
from jsonrpc.proxy import JSONRPCProxy
import logging
log = logging.getLogger(__name__)
log.addHandler(logging.FileHandler("dht contacts.log"))
# log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
stdscr = curses.initscr()
api = JSONRPCProxy.from_url("http://localhost:5279")
def init_curses():
curses.noecho()
curses.cbreak()
stdscr.nodelay(1)
stdscr.keypad(1)
def teardown_curses():
curses.nocbreak()
stdscr.keypad(0)
curses.echo()
curses.endwin()
def refresh(last_contacts, last_blobs):
height, width = stdscr.getmaxyx()
try:
routing_table_info = api.routing_table_get()
node_id = routing_table_info['node_id']
except:
node_id = "UNKNOWN"
routing_table_info = {
'buckets': {},
'contacts': [],
'blob_hashes': []
}
for y in range(height):
stdscr.addstr(y, 0, " " * (width - 1))
buckets = routing_table_info['buckets']
stdscr.addstr(0, 0, "node id: %s" % node_id)
stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" %
(len(buckets), len(routing_table_info['contacts']),
len(routing_table_info['blob_hashes'])))
y = 3
for i in sorted(buckets.keys()):
stdscr.addstr(y, 0, "bucket %s" % i)
y += 1
for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')):
stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['node_id'], h['address'],
len(h['blobs'])))
y += 1
y += 1
new_contacts = set(routing_table_info['contacts']) - last_contacts
lost_contacts = last_contacts - set(routing_table_info['contacts'])
if new_contacts:
for c in new_contacts:
log.debug("added contact %s", c)
if lost_contacts:
for c in lost_contacts:
log.info("lost contact %s", c)
new_blobs = set(routing_table_info['blob_hashes']) - last_blobs
lost_blobs = last_blobs - set(routing_table_info['blob_hashes'])
if new_blobs:
for c in new_blobs:
log.debug("added blob %s", c)
if lost_blobs:
for c in lost_blobs:
log.info("lost blob %s", c)
stdscr.addstr(y + 1, 0, str(time.time()))
stdscr.refresh()
return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes'])
def do_main():
c = None
last_contacts, last_blobs = set(), set()
while c not in [ord('q'), ord('Q')]:
last_contacts, last_blobs = refresh(last_contacts, last_blobs)
c = stdscr.getch()
time.sleep(0.1)
def main():
try:
init_curses()
do_main()
finally:
teardown_curses()
if __name__ == "__main__":
main()

View file

@ -22,7 +22,7 @@ def join_network(udp_port, known_nodes):
lbryid = generate_id()
log.info('Creating node')
node = Node(udpPort=udp_port, lbryid=lbryid)
node = Node(udpPort=udp_port, node_id=lbryid)
log.info('Joining network')
yield node.joinNetwork(known_nodes)

View file

@ -150,7 +150,7 @@ if __name__ == '__main__':
# If you wish to have a pure Kademlia network, use the
# entangled.kademlia.node.Node class instead
print 'Creating Node'
node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid)
node = Node(udpPort=int(sys.argv[1]), node_id=lbryid)
# Schedule the node to join the Kademlia/Entangled DHT
node.joinNetwork(knownNodes)

View file

@ -51,7 +51,7 @@ def main(args=None):
session = Session.Session(
0,
db_dir=db_dir,
lbryid=utils.generate_id(),
node_id=utils.generate_id(),
blob_dir=blob_dir,
dht_node_port=4444,
known_dht_nodes=conf.settings['known_dht_nodes'],

View file

@ -1,82 +1,214 @@
#!/usr/bin/env python
#
# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# Thanks to Paul Cannon for IP-address resolution functions (taken from aspn.activestate.com)
"""
Launch a DHT node which can respond to RPC commands.
"""
import logging
import requests
import miniupnpc
import argparse
from lbrynet.dht.node import Node
from txjsonrpc.web import jsonrpc
from twisted.web import server
from copy import deepcopy
from twisted.internet import reactor, defer
from twisted.web import resource
from twisted.web.server import Site
from lbrynet import conf
from lbrynet.core.log_support import configure_console
from lbrynet.dht.error import TimeoutError
conf.initialize_settings()
log = logging.getLogger("dht tool")
configure_console()
log.setLevel(logging.INFO)
from lbrynet.dht.node import Node
from lbrynet.dht.contact import Contact
from lbrynet.daemon.auth.server import AuthJSONRPCServer
from lbrynet.core.utils import generate_id
def get_external_ip_and_setup_upnp():
try:
u = miniupnpc.UPnP()
u.discoverdelay = 200
u.discover()
u.selectigd()
if u.getspecificportmapping(4444, "UDP"):
u.deleteportmapping(4444, "UDP")
log.info("Removed UPnP redirect for UDP 4444.")
u.addportmapping(4444, 'UDP', u.lanaddr, 4444, 'LBRY DHT port', '')
log.info("got external ip from upnp")
return u.externalipaddress()
except Exception:
log.exception("derp")
r = requests.get('https://api.ipify.org', {'format': 'json'})
log.info("got external ip from ipify.org")
return r.json()['ip']
class RPCNode(jsonrpc.JSONRPC):
def __init__(self, node, shut_down_cb):
jsonrpc.JSONRPC.__init__(self)
self.node = node
self.shut_down_cb = shut_down_cb
class NodeRPC(AuthJSONRPCServer):
def __init__(self, lbryid, seeds, node_port, rpc_port):
AuthJSONRPCServer.__init__(self, False)
self.root = None
self.port = None
self.seeds = seeds
self.node_port = node_port
self.rpc_port = rpc_port
if lbryid:
lbryid = lbryid.decode('hex')
else:
lbryid = generate_id()
self.node_id = lbryid
self.external_ip = get_external_ip_and_setup_upnp()
self.node_port = node_port
def jsonrpc_total_dht_nodes(self):
return self.node.getApproximateTotalDHTNodes()
@defer.inlineCallbacks
def setup(self):
self.node = Node(node_id=self.node_id, udpPort=self.node_port,
externalIP=self.external_ip)
hosts = []
for hostname, hostport in self.seeds:
host_ip = yield reactor.resolve(hostname)
hosts.append((host_ip, hostport))
log.info("connecting to dht")
yield self.node.joinNetwork(tuple(hosts))
log.info("connected to dht")
if not self.announced_startup:
self.announced_startup = True
self.start_api()
log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'), len(self.node.node_id))
def jsonrpc_total_dht_hashes(self):
return self.node.getApproximateTotalHashes()
def start_api(self):
root = resource.Resource()
root.putChild('', self)
self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost')
log.info("started jsonrpc server")
def jsonrpc_stop(self):
self.shut_down_cb()
return "fine"
@defer.inlineCallbacks
def jsonrpc_node_id_set(self, node_id):
old_id = self.node.node_id
self.node.stop()
del self.node
self.node_id = node_id.decode('hex')
yield self.setup()
msg = "changed dht id from %s to %s" % (old_id.encode('hex'),
self.node.node_id.encode('hex'))
defer.returnValue(msg)
def jsonrpc_node_id_get(self):
return self._render_response(self.node.node_id.encode('hex'))
@defer.inlineCallbacks
def jsonrpc_peer_find(self, node_id):
node_id = node_id.decode('hex')
contact = yield self.node.findContact(node_id)
result = None
if contact:
result = (contact.address, contact.port)
defer.returnValue(result)
@defer.inlineCallbacks
def jsonrpc_peer_list_for_blob(self, blob_hash):
peers = yield self.node.getPeersForBlob(blob_hash.decode('hex'))
defer.returnValue(peers)
@defer.inlineCallbacks
def jsonrpc_ping(self, node_id):
contact_host = yield self.jsonrpc_peer_find(node_id=node_id)
if not contact_host:
defer.returnValue("failed to find node")
contact_ip, contact_port = contact_host
contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol)
try:
result = yield contact.ping()
except TimeoutError:
self.node.removeContact(contact.id)
self.node._dataStore.removePeer(contact.id)
result = {'error': 'timeout'}
defer.returnValue(result)
def get_routing_table(self):
result = {}
data_store = deepcopy(self.node._dataStore._dict)
datastore_len = len(data_store)
hosts = {}
missing_contacts = []
if datastore_len:
for k, v in data_store.iteritems():
for value, lastPublished, originallyPublished, originalPublisherID in v:
try:
contact = self.node._routingTable.getContact(originalPublisherID)
except ValueError:
if originalPublisherID.encode('hex') not in missing_contacts:
missing_contacts.append(originalPublisherID.encode('hex'))
continue
if contact in hosts:
blobs = hosts[contact]
else:
blobs = []
blobs.append(k.encode('hex'))
hosts[contact] = blobs
contact_set = []
blob_hashes = []
result['buckets'] = {}
for i in range(len(self.node._routingTable._buckets)):
for contact in self.node._routingTable._buckets[i]._contacts:
contacts = result['buckets'].get(i, [])
if contact in hosts:
blobs = hosts[contact]
del hosts[contact]
else:
blobs = []
host = {
"address": contact.address,
"id": contact.id.encode("hex"),
"blobs": blobs,
}
for blob_hash in blobs:
if blob_hash not in blob_hashes:
blob_hashes.append(blob_hash)
contacts.append(host)
result['buckets'][i] = contacts
contact_set.append(contact.id.encode("hex"))
if hosts:
result['datastore extra'] = [
{
"id": host.id.encode('hex'),
"blobs": hosts[host],
}
for host in hosts]
result['missing contacts'] = missing_contacts
result['contacts'] = contact_set
result['blob hashes'] = blob_hashes
result['node id'] = self.node_id.encode('hex')
return result
def jsonrpc_routing_table_get(self):
return self._render_response(self.get_routing_table())
def main():
parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands")
parser.add_argument("node_port",
parser.add_argument("--node_port",
help=("The UDP port on which the node will listen for connections "
"from other dht nodes"),
type=int)
parser.add_argument("rpc_port",
type=int, default=4444)
parser.add_argument("--rpc_port",
help="The TCP port on which the node will listen for rpc commands",
type=int)
parser.add_argument("dht_bootstrap_host",
type=int, default=5280)
parser.add_argument("--bootstrap_host",
help="The IP of a DHT node to be used to bootstrap into the network",
nargs='?')
parser.add_argument("dht_bootstrap_port",
default='lbrynet1.lbry.io')
parser.add_argument("--node_id",
help="The IP of a DHT node to be used to bootstrap into the network",
default=None)
parser.add_argument("--bootstrap_port",
help="The port of a DHT node to be used to bootstrap into the network",
nargs='?', default=4000, type=int)
parser.add_argument("--rpc_ip_address",
help="The network interface on which to listen for rpc connections",
default="127.0.0.1")
default=4444, type=int)
args = parser.parse_args()
def start_rpc():
rpc_node = RPCNode(node, shut_down)
reactor.listenTCP(args.rpc_port, server.Site(rpc_node), interface=args.rpc_ip_address)
def shut_down():
d = defer.maybeDeferred(node.stop)
d.addBoth(lambda _: reactor.stop())
return d
known_nodes = []
if args.dht_bootstrap_host:
known_nodes.append((args.dht_bootstrap_host, args.dht_bootstrap_port))
node = Node(udpPort=args.node_port)
node.joinNetwork(known_nodes)
d = node._joinDeferred
d.addCallback(lambda _: start_rpc())
seeds = [(args.bootstrap_host, args.bootstrap_port)]
server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port)
reactor.addSystemEventTrigger('after', 'startup', server.setup)
reactor.run()
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -53,7 +53,7 @@ def main(args=None):
session = Session.Session(
blob_data_payment_rate=0,
db_dir=db_dir,
lbryid=utils.generate_id(),
node_id=utils.generate_id(),
blob_dir=blob_dir,
dht_node_port=4444,
known_dht_nodes=conf.settings['known_dht_nodes'],

View file

@ -21,8 +21,8 @@ requires = [
'envparse',
'jsonrpc',
'jsonschema',
'lbryum==3.1.9rc2',
'lbryschema==0.0.12rc1',
'lbryum==3.1.10rc1',
'lbryschema==0.0.13rc1',
'miniupnpc',
'pycrypto',
'pyyaml',