Merge branch 'master' into update_jsonschema

This commit is contained in:
Jack Robison 2017-10-25 10:50:06 -04:00 committed by GitHub
commit e799ad17b4
14 changed files with 129 additions and 103 deletions

View file

@ -13,16 +13,22 @@ at anytime.
*
### Fixed
*
*
* Fixed slow startup for nodes with many lbry files
* Fixed setting the external ip on startup
* Fixed session startup not blocking on joining the dht
* Fixed several parsing bugs that prevented replacing dead dht contacts
* Fixed lbryid length validation
* Fixed an old print statement that polluted logs
* Fixed rpc id length for dht requests
### Deprecated
*
*
### Changed
* Use the first port available for the peer and dht ports, starting with the provided values (defaults of 3333 and 4444). This allows multiple lbrynet instances in a LAN with UPnP.
* Detect a UPnP redirect that didn't get cleaned up on a previous run and use it
* Bumped jsonschema requirement to 2.6.0
*
### Added
*

View file

@ -1,6 +1,6 @@
import logging
__version__ = "0.17.0"
__version__ = "0.17.1rc3"
version = tuple(__version__.split('.'))
logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -193,6 +193,31 @@ class Session(object):
log.debug("In _try_upnp")
def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping
return get_free_port(upnp, port + 1, protocol)
def get_port_mapping(upnp, internal_port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
external_port = get_free_port(upnp, internal_port, protocol)
if isinstance(external_port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, external_port[1], protocol, upnp.lanaddr, internal_port)
return external_port[1], protocol
upnp.addportmapping(external_port, protocol, upnp.lanaddr, internal_port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, external_port,
protocol, upnp.lanaddr, internal_port)
return external_port, protocol
def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
@ -202,40 +227,15 @@ class Session(object):
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0':
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port is not None:
if u.getspecificportmapping(self.peer_port, 'TCP') is None:
u.addportmapping(
self.peer_port, 'TCP', u.lanaddr, self.peer_port,
'LBRY peer port', '')
self.upnp_redirects.append((self.peer_port, 'TCP'))
log.info("Set UPnP redirect for TCP port %d", self.peer_port)
else:
# see comment below
log.warning("UPnP redirect already set for TCP port %d", self.peer_port)
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port is not None:
if u.getspecificportmapping(self.dht_node_port, 'UDP') is None:
u.addportmapping(
self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port,
'LBRY DHT port', '')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
log.info("Set UPnP redirect for UDP port %d", self.dht_node_port)
else:
# TODO: check that the existing redirect was
# put up by an old lbrynet session before
# grabbing it if such a disconnected redirect
# exists, then upnp won't work unless the
# redirect is appended or is torn down and set
# back up. a bad shutdown of lbrynet could
# leave such a redirect up and cause problems
# on the next start. this could be
# problematic if a previous lbrynet session
# didn't make the redirect, and it was made by
# another application
log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port)
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
if self.peer_port:
self.upnp_redirects.append(get_port_mapping(u, self.peer_port, 'TCP',
'LBRY peer port'))
if self.dht_node_port:
self.upnp_redirects.append(get_port_mapping(u, self.dht_node_port, 'UDP',
'LBRY DHT port'))
return True
return False
@ -260,8 +260,7 @@ class Session(object):
addresses.append(value)
return addresses
def start_dht(addresses):
self.dht_node.joinNetwork(addresses)
def start_dht(join_network_result):
self.peer_finder.run_manage_loop()
self.hash_announcer.run_manage_loop()
return True
@ -283,6 +282,7 @@ class Session(object):
dl = defer.DeferredList(ds)
dl.addCallback(join_resolved_addresses)
dl.addCallback(self.dht_node.joinNetwork)
dl.addCallback(start_dht)
return dl

View file

@ -569,7 +569,8 @@ class Daemon(AuthJSONRPCServer):
peer_port=self.peer_port,
use_upnp=self.use_upnp,
wallet=wallet,
is_generous=conf.settings['is_generous_host']
is_generous=conf.settings['is_generous_host'],
external_ip=self.platform['ip']
)
self.startup_status = STARTUP_STAGES[2]

View file

@ -59,3 +59,5 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj
h = get_lbry_hash_obj()
key_bits = h.digest_size * 8 # 384 bits
rpc_id_length = 20

View file

@ -8,12 +8,17 @@
# may be created by processing this file with epydoc: http://epydoc.sf.net
from lbrynet.core.utils import generate_id
import constants
class Message(object):
""" Base class for messages - all "unknown" messages use this class """
def __init__(self, rpcID, nodeID):
if len(rpcID) != constants.rpc_id_length:
raise ValueError("invalid rpc id: %i bytes (expected 20)" % len(rpcID))
if len(nodeID) != constants.key_bits / 8:
raise ValueError("invalid node id: %i bytes (expected 48)" % len(nodeID))
self.id = rpcID
self.nodeID = nodeID
@ -23,7 +28,7 @@ class RequestMessage(Message):
def __init__(self, nodeID, method, methodArgs, rpcID=None):
if rpcID is None:
rpcID = generate_id()
rpcID = generate_id()[:constants.rpc_id_length]
Message.__init__(self, rpcID, nodeID)
self.request = method
self.args = methodArgs

View file

@ -484,8 +484,9 @@ class Node(object):
raise TypeError, 'No port available'
if 'lbryid' in value:
if len(value['lbryid']) > constants.key_bits:
raise ValueError, 'Invalid lbryid'
if len(value['lbryid']) != constants.key_bits / 8:
raise ValueError('Invalid lbryid (%i bytes): %s' % (len(value['lbryid']),
value['lbryid'].encode('hex')))
else:
compact_address = compact_ip + compact_port + value['lbryid']
else:
@ -752,10 +753,11 @@ class _IterativeFindHelper(object):
if testContact not in self.shortlist:
self.shortlist.append(testContact)
def removeFromShortlist(self, failure):
def removeFromShortlist(self, failure, deadContactID):
""" @type failure: twisted.python.failure.Failure """
failure.trap(protocol.TimeoutError)
deadContactID = failure.getErrorMessage()
if len(deadContactID) != constants.key_bits / 8:
raise ValueError("invalid lbry id")
if deadContactID in self.shortlist:
self.shortlist.remove(deadContactID)
return deadContactID
@ -825,7 +827,7 @@ class _IterativeFindHelper(object):
rpcMethod = getattr(contact, self.rpc)
df = rpcMethod(self.key, rawResponse=True)
df.addCallback(self.extendShortlist)
df.addErrback(self.removeFromShortlist)
df.addErrback(self.removeFromShortlist, contact.id)
df.addCallback(self.cancelActiveProbe)
df.addErrback(lambda _: log.exception('Failed to contact %s', contact))
self.already_contacted.append(contact.id)

View file

@ -205,14 +205,14 @@ class KademliaProtocol(protocol.DatagramProtocol):
return
try:
msgPrimitive = self._encoder.decode(datagram)
except encoding.DecodeError:
message = self._translator.fromPrimitive(msgPrimitive)
except (encoding.DecodeError, ValueError):
# We received some rubbish here
return
except IndexError:
log.warning("Couldn't decode dht datagram from %s", address)
return
message = self._translator.fromPrimitive(msgPrimitive)
remoteContact = Contact(message.nodeID, address[0], address[1], self)
now = time.time()
@ -422,8 +422,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args)
else:
# No progress has been made
del self._partialMessagesProgress[messageID]
del self._partialMessages[messageID]
if messageID in self._partialMessagesProgress:
del self._partialMessagesProgress[messageID]
if messageID in self._partialMessages:
del self._partialMessages[messageID]
df.errback(TimeoutError(remoteContactID))
def _hasProgressBeenMade(self, messageID):

View file

@ -10,8 +10,8 @@ import random
from zope.interface import implements
import constants
import kbucket
import protocol
from interface import IRoutingTable
from error import TimeoutError
import logging
log = logging.getLogger(__name__)
@ -77,17 +77,18 @@ class TreeRoutingTable(object):
# the k-bucket. This implementation follows section
# 2.2 regarding this point.
def replaceContact(failure):
def replaceContact(failure, deadContactID):
""" Callback for the deferred PING RPC to see if the head
node in the k-bucket is still responding
@type failure: twisted.python.failure.Failure
"""
failure.trap(TimeoutError)
log.warning('==replacing contact==')
# Remove the old contact...
deadContactID = failure.getErrorMessage()
failure.trap(protocol.TimeoutError)
if len(deadContactID) != constants.key_bits / 8:
raise ValueError("invalid contact id")
log.debug("Replacing dead contact: %s", deadContactID.encode('hex'))
try:
# Remove the old contact...
self._buckets[bucketIndex].removeContact(deadContactID)
except ValueError:
# The contact has already been removed (probably due to a timeout)
@ -100,7 +101,7 @@ class TreeRoutingTable(object):
df = head_contact.ping()
# If there's an error (i.e. timeout), remove the head
# contact, and append the new one
df.addErrback(replaceContact)
df.addErrback(replaceContact, head_contact.id)
def findCloseNodes(self, key, count, _rpcNodeID=None):
""" Finds a number of known nodes closest to the node/value with the

View file

@ -53,9 +53,9 @@ class EncryptedFileManager(object):
def setup(self):
yield self._open_db()
yield self._add_to_sd_identifier()
yield self._start_lbry_files()
if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
# don't block on starting the lbry files
self._start_lbry_files()
log.info("Started file manager")
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
@ -119,6 +119,9 @@ class EncryptedFileManager(object):
self._set_options_and_restore(rowid, stream_hash, options)
for rowid, stream_hash, options in files_and_options
])
if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
log.info("Started %i lbry files", len(self.lbry_files))
@defer.inlineCallbacks

View file

@ -62,6 +62,7 @@ class NodeDataTest(unittest.TestCase):
self.cases.append((h.digest(), 5000+2*i))
self.cases.append((h.digest(), 5001+2*i))
@defer.inlineCallbacks
def testStore(self):
""" Tests if the node can store (and privately retrieve) some data """
for key, value in self.cases:
@ -70,7 +71,7 @@ class NodeDataTest(unittest.TestCase):
'lbryid': self.contact.id,
'token': self.token
}
self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact)
yield self.node.store(key, request, self.contact.id, _rpcNodeContact=self.contact)
for key, value in self.cases:
expected_result = self.contact.compact_ip() + str(struct.pack('>H', value)) + \
self.contact.id
@ -90,7 +91,7 @@ class NodeContactTest(unittest.TestCase):
""" Tests if a contact can be added and retrieved correctly """
import lbrynet.dht.contact
# Create the contact
h = hashlib.sha1()
h = hashlib.sha384()
h.update('node1')
contactID = h.digest()
contact = lbrynet.dht.contact.Contact(contactID, '127.0.0.1', 91824, self.node._protocol)
@ -133,6 +134,10 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
def sendRPC(self, contact, method, args, rawResponse=False):
""" Fake RPC protocol; allows entangled.kademlia.contact.Contact objects to "send" RPCs"""
h = hashlib.sha384()
h.update('rpcId')
rpc_id = h.digest()[:20]
if method == "findNode":
# get the specific contacts closest contacts
closestContacts = []
@ -144,7 +149,8 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
# Pack the closest contacts into a ResponseMessage
for closeContact in closestContactsList:
closestContacts.append((closeContact.id, closeContact.address, closeContact.port))
message = ResponseMessage("rpcId", contact.id, closestContacts)
message = ResponseMessage(rpc_id, contact.id, closestContacts)
df = defer.Deferred()
df.callback((message, (contact.address, contact.port)))
return df
@ -171,7 +177,7 @@ class FakeRPCProtocol(protocol.DatagramProtocol):
response = closestContacts
# Create the response message
message = ResponseMessage("rpcId", contact.id, response)
message = ResponseMessage(rpc_id, contact.id, response)
df = defer.Deferred()
df.callback((message, (contact.address, contact.port)))
return df
@ -189,7 +195,10 @@ class NodeLookupTest(unittest.TestCase):
# Note: The reactor is never started for this test. All deferred calls run sequentially,
# since there is no asynchronous network communication
# create the node to be tested in isolation
self.node = lbrynet.dht.node.Node('12345678901234567800', 4000, None, None, self._protocol)
h = hashlib.sha384()
h.update('node1')
node_id = str(h.digest())
self.node = lbrynet.dht.node.Node(node_id, 4000, None, None, self._protocol)
self.updPort = 81173
self.contactsAmount = 80
# Reinitialise the routing table
@ -198,16 +207,16 @@ class NodeLookupTest(unittest.TestCase):
# create 160 bit node ID's for test purposes
self.testNodeIDs = []
idNum = int(self.node.node_id)
idNum = int(self.node.node_id.encode('hex'), 16)
for i in range(self.contactsAmount):
# create the testNodeIDs in ascending order, away from the actual node ID,
# with regards to the distance metric
self.testNodeIDs.append(idNum + i + 1)
self.testNodeIDs.append(str("%X" % (idNum + i + 1)).decode('hex'))
# generate contacts
self.contacts = []
for i in range(self.contactsAmount):
contact = lbrynet.dht.contact.Contact(str(self.testNodeIDs[i]), "127.0.0.1",
contact = lbrynet.dht.contact.Contact(self.testNodeIDs[i], "127.0.0.1",
self.updPort + i + 1, self._protocol)
self.contacts.append(contact)
@ -241,23 +250,24 @@ class NodeLookupTest(unittest.TestCase):
lbrynet.dht.datastore.DictDataStore()))
self._protocol.createNetwork(contacts_with_datastores)
@defer.inlineCallbacks
def testNodeBootStrap(self):
""" Test bootstrap with the closest possible contacts """
df = self.node._iterativeFind(self.node.node_id, self.contacts[0:8])
activeContacts = yield self.node._iterativeFind(self.node.node_id, self.contacts[0:8])
# Set the expected result
expectedResult = []
expectedResult = set()
for item in self.contacts[0:6]:
expectedResult.append(item.id)
expectedResult.add(item.id)
# Get the result from the deferred
activeContacts = df.result
# Check the length of the active contacts
self.failUnlessEqual(activeContacts.__len__(), expectedResult.__len__(),
"More active contacts should exist, there should be %d "
"contacts" % expectedResult.__len__())
"contacts but there are %d" % (len(expectedResult),
len(activeContacts)))
# Check that the received active contacts are the same as the input contacts
self.failUnlessEqual(activeContacts, expectedResult,
self.failUnlessEqual({contact.id for contact in activeContacts}, expectedResult,
"Active should only contain the closest possible contacts"
" which were used as input for the boostrap")

View file

@ -16,7 +16,7 @@ class KademliaProtocolTest(unittest.TestCase):
def setUp(self):
del lbrynet.dht.protocol.reactor
lbrynet.dht.protocol.reactor = twisted.internet.selectreactor.SelectReactor()
self.node = Node(node_id='node1', udpPort=9182, externalIP="127.0.0.1")
self.node = Node(node_id='1' * 48, udpPort=9182, externalIP="127.0.0.1")
self.protocol = lbrynet.dht.protocol.KademliaProtocol(self.node)
def testReactor(self):
@ -39,7 +39,7 @@ class KademliaProtocolTest(unittest.TestCase):
lbrynet.dht.constants.rpcAttempts = 1
lbrynet.dht.constants.rpcTimeout = 1
self.node.ping = fake_ping
deadContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
deadContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(deadContact)
# Make sure the contact was added
self.failIf(deadContact not in self.node.contacts,
@ -73,7 +73,7 @@ class KademliaProtocolTest(unittest.TestCase):
def testRPCRequest(self):
""" Tests if a valid RPC request is executed and responded to correctly """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
@ -105,7 +105,7 @@ class KademliaProtocolTest(unittest.TestCase):
Verifies that a RPC request for an existing but unpublished
method is denied, and that the associated (remote) exception gets
raised locally """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None
@ -126,7 +126,7 @@ class KademliaProtocolTest(unittest.TestCase):
# Publish the "local" node on the network
lbrynet.dht.protocol.reactor.listenUDP(9182, self.protocol)
# Simulate the RPC
df = remoteContact.pingNoRPC()
df = remoteContact.not_a_rpc_function()
df.addCallback(handleResult)
df.addErrback(handleError)
df.addBoth(lambda _: lbrynet.dht.protocol.reactor.stop())
@ -139,7 +139,7 @@ class KademliaProtocolTest(unittest.TestCase):
def testRPCRequestArgs(self):
""" Tests if an RPC requiring arguments is executed correctly """
remoteContact = lbrynet.dht.contact.Contact('node2', '127.0.0.1', 9182, self.protocol)
remoteContact = lbrynet.dht.contact.Contact('2' * 48, '127.0.0.1', 9182, self.protocol)
self.node.addContact(remoteContact)
self.error = None

View file

@ -27,6 +27,9 @@ class FakeDeferred(object):
def addErrback(self, *args, **kwargs):
return
def addCallbacks(self, *args, **kwargs):
return
class TreeRoutingTableTest(unittest.TestCase):
""" Test case for the RoutingTable class """

View file

@ -9,40 +9,41 @@ import unittest
from lbrynet.dht.msgtypes import RequestMessage, ResponseMessage, ErrorMessage
from lbrynet.dht.msgformat import MessageTranslator, DefaultFormat
class DefaultFormatTranslatorTest(unittest.TestCase):
""" Test case for the default message translator """
def setUp(self):
self.cases = ((RequestMessage('node1', 'rpcMethod',
{'arg1': 'a string', 'arg2': 123}, 'rpc1'),
self.cases = ((RequestMessage('1' * 48, 'rpcMethod',
{'arg1': 'a string', 'arg2': 123}, '1' * 20),
{DefaultFormat.headerType: DefaultFormat.typeRequest,
DefaultFormat.headerNodeID: 'node1',
DefaultFormat.headerMsgID: 'rpc1',
DefaultFormat.headerNodeID: '1' * 48,
DefaultFormat.headerMsgID: '1' * 20,
DefaultFormat.headerPayload: 'rpcMethod',
DefaultFormat.headerArgs: {'arg1': 'a string', 'arg2': 123}}),
(ResponseMessage('rpc2', 'node2', 'response'),
(ResponseMessage('2' * 20, '2' * 48, 'response'),
{DefaultFormat.headerType: DefaultFormat.typeResponse,
DefaultFormat.headerNodeID: 'node2',
DefaultFormat.headerMsgID: 'rpc2',
DefaultFormat.headerNodeID: '2' * 48,
DefaultFormat.headerMsgID: '2' * 20,
DefaultFormat.headerPayload: 'response'}),
(ErrorMessage('rpc3', 'node3',
(ErrorMessage('3' * 20, '3' * 48,
"<type 'exceptions.ValueError'>", 'this is a test exception'),
{DefaultFormat.headerType: DefaultFormat.typeError,
DefaultFormat.headerNodeID: 'node3',
DefaultFormat.headerMsgID: 'rpc3',
DefaultFormat.headerNodeID: '3' * 48,
DefaultFormat.headerMsgID: '3' * 20,
DefaultFormat.headerPayload: "<type 'exceptions.ValueError'>",
DefaultFormat.headerArgs: 'this is a test exception'}),
(ResponseMessage(
'rpc4', 'node4',
'4' * 20, '4' * 48,
[('H\x89\xb0\xf4\xc9\xe6\xc5`H>\xd5\xc2\xc5\xe8Od\xf1\xca\xfa\x82',
'127.0.0.1', 1919),
('\xae\x9ey\x93\xdd\xeb\xf1^\xff\xc5\x0f\xf8\xac!\x0e\x03\x9fY@{',
'127.0.0.1', 1921)]),
{DefaultFormat.headerType: DefaultFormat.typeResponse,
DefaultFormat.headerNodeID: 'node4',
DefaultFormat.headerMsgID: 'rpc4',
DefaultFormat.headerNodeID: '4' * 48,
DefaultFormat.headerMsgID: '4' * 20,
DefaultFormat.headerPayload:
[('H\x89\xb0\xf4\xc9\xe6\xc5`H>\xd5\xc2\xc5\xe8Od\xf1\xca\xfa\x82',
'127.0.0.1', 1919),
@ -81,13 +82,3 @@ class DefaultFormatTranslatorTest(unittest.TestCase):
'Message instance variable "%s" not translated correctly; '
'expected "%s", got "%s"' %
(key, msg.__dict__[key], translatedObj.__dict__[key]))
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(DefaultFormatTranslatorTest))
return suite
if __name__ == '__main__':
# If this module is executed from the commandline, run all its tests
unittest.TextTestRunner().run(suite())