From 850f51140e894c712c1aa52083181affa2165f1c Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Fri, 31 Mar 2017 13:32:43 -0400 Subject: [PATCH] formatting --- .gitignore | 1 + lbrynet/dht/constants.py | 10 +++--- lbrynet/dht/contact.py | 3 ++ lbrynet/dht/datastore.py | 6 +++- lbrynet/dht/encoding.py | 22 +++++++------ lbrynet/dht/hashwatcher.py | 1 - lbrynet/dht/kbucket.py | 2 ++ lbrynet/dht/msgformat.py | 5 ++- lbrynet/dht/msgtypes.py | 5 +++ lbrynet/dht/node.py | 43 ++++++++++++++----------- lbrynet/dht/protocol.py | 26 +++++++-------- lbrynet/dht/routingtable.py | 43 ++++++++++++++++--------- lbrynet/lbrynet_daemon/DaemonControl.py | 3 ++ 13 files changed, 105 insertions(+), 65 deletions(-) diff --git a/.gitignore b/.gitignore index 2940c971f..049067011 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ *.log *.pem *.decTest +*.prof .#* /build diff --git a/lbrynet/dht/constants.py b/lbrynet/dht/constants.py index 9c1119546..079fc9787 100644 --- a/lbrynet/dht/constants.py +++ b/lbrynet/dht/constants.py @@ -28,14 +28,14 @@ rpcTimeout = 5 iterativeLookupDelay = rpcTimeout / 2 #: If a k-bucket has not been used for this amount of time, refresh it (in seconds) -refreshTimeout = 3600 # 1 hour +refreshTimeout = 3600 # 1 hour #: The interval at which nodes replicate (republish/refresh) data they are holding replicateInterval = refreshTimeout # The time it takes for data to expire in the network; the original publisher of the data # will also republish the data at this time if it is still valid -dataExpireTimeout = 86400 # 24 hours +dataExpireTimeout = 86400 # 24 hours -tokenSecretChangeInterval = 300 # 5 minutes +tokenSecretChangeInterval = 300 # 5 minutes peer_request_timeout = 10 @@ -43,10 +43,10 @@ peer_request_timeout = 10 #: The interval in which the node should check its whether any buckets need refreshing, #: or whether any data needs to be republished (in seconds) -checkRefreshInterval = refreshTimeout/5 +checkRefreshInterval = refreshTimeout / 5 #: Max size of a single UDP datagram, in bytes. If a message is larger than this, it will #: be spread accross several UDP packets. -udpDatagramMaxSize = 8192 # 8 KB +udpDatagramMaxSize = 8192 # 8 KB key_bits = 384 diff --git a/lbrynet/dht/contact.py b/lbrynet/dht/contact.py index ae83f311b..6109d9f9a 100644 --- a/lbrynet/dht/contact.py +++ b/lbrynet/dht/contact.py @@ -14,6 +14,7 @@ class Contact(object): This class contains information on a single remote contact, and also provides a direct RPC API to the remote node which it represents """ + def __init__(self, id, ipAddress, udpPort, networkProtocol, firstComm=0): self.id = id self.address = ipAddress @@ -60,6 +61,8 @@ class Contact(object): This happens via this contact's C{_networkProtocol} object (i.e. the host Node's C{_protocol} object). """ + def _sendRPC(*args, **kwargs): return self._networkProtocol.sendRPC(self, name, args, **kwargs) + return _sendRPC diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index 8983f2bb6..bdaf47644 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -12,21 +12,23 @@ import time import constants - class DataStore(UserDict.DictMixin): """ Interface for classes implementing physical storage (for data published via the "STORE" RPC) for the Kademlia DHT @note: This provides an interface for a dict-like object """ + def keys(self): """ Return a list of the keys in this data store """ def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID): pass + class DictDataStore(DataStore): """ A datastore using an in-memory Python dictionary """ + def __init__(self): # Dictionary format: # { : (, , ) } @@ -38,10 +40,12 @@ class DictDataStore(DataStore): def removeExpiredPeers(self): now = int(time.time()) + def notExpired(peer): if (now - peer[2]) > constants.dataExpireTimeout: return False return True + for key in self._dict.keys(): unexpired_peers = filter(notExpired, self._dict[key]) self._dict[key] = unexpired_peers diff --git a/lbrynet/dht/encoding.py b/lbrynet/dht/encoding.py index 2ef8d2e12..55bb68fb7 100644 --- a/lbrynet/dht/encoding.py +++ b/lbrynet/dht/encoding.py @@ -12,12 +12,14 @@ class DecodeError(Exception): fails """ + class Encoding(object): """ Interface for RPC message encoders/decoders All encoding implementations used with this library should inherit and implement this. """ + def encode(self, data): """ Encode the specified data @@ -31,6 +33,7 @@ class Encoding(object): @return: The encoded data @rtype: str """ + def decode(self, data): """ Decode the specified data string @@ -40,6 +43,7 @@ class Encoding(object): @return: The decoded data (in its correct type) """ + class Bencode(Encoding): """ Implementation of a Bencode-based algorithm (Bencode is the encoding algorithm used by Bittorrent). @@ -112,15 +116,15 @@ class Bencode(Encoding): Do not call this; use C{decode()} instead """ if data[startIndex] == 'i': - endPos = data[startIndex:].find('e')+startIndex - return (int(data[startIndex+1:endPos]), endPos+1) + endPos = data[startIndex:].find('e') + startIndex + return (int(data[startIndex + 1:endPos]), endPos + 1) elif data[startIndex] == 'l': startIndex += 1 decodedList = [] while data[startIndex] != 'e': listData, startIndex = Bencode._decodeRecursive(data, startIndex) decodedList.append(listData) - return (decodedList, startIndex+1) + return (decodedList, startIndex + 1) elif data[startIndex] == 'd': startIndex += 1 decodedDict = {} @@ -131,19 +135,19 @@ class Bencode(Encoding): return (decodedDict, startIndex) elif data[startIndex] == 'f': # This (float data type) is a non-standard extension to the original Bencode algorithm - endPos = data[startIndex:].find('e')+startIndex - return (float(data[startIndex+1:endPos]), endPos+1) + endPos = data[startIndex:].find('e') + startIndex + return (float(data[startIndex + 1:endPos]), endPos + 1) elif data[startIndex] == 'n': # This (None/NULL data type) is a non-standard extension # to the original Bencode algorithm - return (None, startIndex+1) + return (None, startIndex + 1) else: - splitPos = data[startIndex:].find(':')+startIndex + splitPos = data[startIndex:].find(':') + startIndex try: length = int(data[startIndex:splitPos]) except ValueError, e: raise DecodeError, e - startIndex = splitPos+1 - endPos = startIndex+length + startIndex = splitPos + 1 + endPos = startIndex + length bytes = data[startIndex:endPos] return (bytes, endPos) diff --git a/lbrynet/dht/hashwatcher.py b/lbrynet/dht/hashwatcher.py index f7270eea2..f55185f82 100644 --- a/lbrynet/dht/hashwatcher.py +++ b/lbrynet/dht/hashwatcher.py @@ -1,4 +1,3 @@ - from collections import Counter import datetime diff --git a/lbrynet/dht/kbucket.py b/lbrynet/dht/kbucket.py index 2ccb088d2..227fec409 100644 --- a/lbrynet/dht/kbucket.py +++ b/lbrynet/dht/kbucket.py @@ -9,6 +9,7 @@ import constants + class BucketFull(Exception): """ Raised when the bucket is full """ @@ -16,6 +17,7 @@ class BucketFull(Exception): class KBucket(object): """ Description - later """ + def __init__(self, rangeMin, rangeMax): """ @param rangeMin: The lower boundary for the range in the n-bit ID diff --git a/lbrynet/dht/msgformat.py b/lbrynet/dht/msgformat.py index 027a91d5f..ae9cab435 100644 --- a/lbrynet/dht/msgformat.py +++ b/lbrynet/dht/msgformat.py @@ -9,6 +9,7 @@ import msgtypes + class MessageTranslator(object): """ Interface for RPC message translators/formatters @@ -16,6 +17,7 @@ class MessageTranslator(object): the classes used internally by this Kademlia implementation and the actual data that is transmitted between nodes. """ + def fromPrimitive(self, msgPrimitive): """ Create an RPC Message from a message's string representation @@ -37,6 +39,7 @@ class MessageTranslator(object): @rtype: str, int, list or dict """ + class DefaultFormat(MessageTranslator): """ The default on-the-wire message format for this library """ typeRequest, typeResponse, typeError = range(3) @@ -64,7 +67,7 @@ class DefaultFormat(MessageTranslator): return msg def toPrimitive(self, message): - msg = {self.headerMsgID: message.id, + msg = {self.headerMsgID: message.id, self.headerNodeID: message.nodeID} if isinstance(message, msgtypes.RequestMessage): msg[self.headerType] = self.typeRequest diff --git a/lbrynet/dht/msgtypes.py b/lbrynet/dht/msgtypes.py index a2365b501..b24f5ced4 100644 --- a/lbrynet/dht/msgtypes.py +++ b/lbrynet/dht/msgtypes.py @@ -10,8 +10,10 @@ import hashlib import random + class Message(object): """ Base class for messages - all "unknown" messages use this class """ + def __init__(self, rpcID, nodeID): self.id = rpcID self.nodeID = nodeID @@ -19,6 +21,7 @@ class Message(object): class RequestMessage(Message): """ Message containing an RPC request """ + def __init__(self, nodeID, method, methodArgs, rpcID=None): if rpcID == None: hash = hashlib.sha384() @@ -31,6 +34,7 @@ class RequestMessage(Message): class ResponseMessage(Message): """ Message containing the result from a successful RPC request """ + def __init__(self, rpcID, nodeID, response): Message.__init__(self, rpcID, nodeID) self.response = response @@ -38,6 +42,7 @@ class ResponseMessage(Message): class ErrorMessage(ResponseMessage): """ Message containing the error from an unsuccessful RPC request """ + def __init__(self, rpcID, nodeID, exceptionType, errorMessage): ResponseMessage.__init__(self, rpcID, nodeID, errorMessage) if isinstance(exceptionType, type): diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 6dd6cadd3..0befa359b 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -27,7 +27,6 @@ from contact import Contact from hashwatcher import HashWatcher import logging - log = logging.getLogger(__name__) @@ -51,6 +50,7 @@ class Node(object): In Entangled, all interactions with the Kademlia network by a client application is performed via this class (or a subclass). """ + def __init__(self, id=None, udpPort=4000, dataStore=None, routingTableClass=None, networkProtocol=None, lbryid=None, externalIP=None): @@ -82,7 +82,7 @@ class Node(object): self.id = self._generateID() self.lbryid = lbryid self.port = udpPort - self._listeningPort = None # object implementing Twisted + self._listeningPort = None # object implementing Twisted # IListeningPort This will contain a deferred created when # joining the network, to enable publishing/retrieving # information from the DHT as soon as the node is part of the @@ -126,7 +126,7 @@ class Node(object): self._listeningPort.stopListening() def stop(self): - #cancel callLaters: + # cancel callLaters: if self.next_refresh_call is not None: self.next_refresh_call.cancel() self.next_refresh_call = None @@ -137,7 +137,6 @@ class Node(object): self._listeningPort.stopListening() self.hash_watcher.stop() - def joinNetwork(self, knownNodeAddresses=None): """ Causes the Node to join the Kademlia network; normally, this should be called before any other DHT operations. @@ -156,7 +155,7 @@ class Node(object): import traceback log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) raise ValueError("%s lbrynet may already be running." % str(e)) - #IGNORE:E1101 + # IGNORE:E1101 # Create temporary contact information for the list of addresses of known nodes if knownNodeAddresses != None: bootstrapContacts = [] @@ -167,10 +166,10 @@ class Node(object): bootstrapContacts = None # Initiate the Kademlia joining sequence - perform a search for this node's own ID self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts) -# #TODO: Refresh all k-buckets further away than this node's closest neighbour + # #TODO: Refresh all k-buckets further away than this node's closest neighbour # Start refreshing k-buckets periodically, if necessary self.next_refresh_call = twisted.internet.reactor.callLater( - constants.checkRefreshInterval, self._refreshNode) #IGNORE:E1101 + constants.checkRefreshInterval, self._refreshNode) # IGNORE:E1101 self.hash_watcher.tick() return self._joinDeferred @@ -187,7 +186,7 @@ class Node(object): # estimate! bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)] num_in_bucket = len(bucket._contacts) - factor = (2**constants.key_bits) / (bucket.rangeMax - bucket.rangeMin) + factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin) return num_in_bucket * factor def getApproximateTotalHashes(self): @@ -253,7 +252,7 @@ class Node(object): # The "raw response" tuple contains the response message, # and the originating address info responseMsg = responseTuple[0] - originAddress = responseTuple[1] # tuple: (ip adress, udp port) + originAddress = responseTuple[1] # tuple: (ip adress, udp port) # Make sure the responding node is valid, and abort the operation if it isn't if not responseMsg.nodeID in known_nodes: return responseMsg.nodeID @@ -351,10 +350,11 @@ class Node(object): """ # Prepare a callback for this operation outerDf = defer.Deferred() + def checkResult(result): if type(result) == dict: # We have found the value; now see who was the closest contact without it... - # ...and store the key/value pair + # ...and store the key/value pair outerDf.callback(result) else: # The value wasn't found, but a list of contacts was returned @@ -416,6 +416,7 @@ class Node(object): return contact else: return None + df = self.iterativeFindNode(contactID) df.addCallback(parseResults) return df @@ -466,10 +467,10 @@ class Node(object): compact_ip = contact.compact_ip() else: return 'Not OK' - #raise TypeError, 'No contact info available' + # raise TypeError, 'No contact info available' if ((self_store is False) and - (not 'token' in value or not self.verify_token(value['token'], compact_ip))): + (not 'token' in value or not self.verify_token(value['token'], compact_ip))): raise ValueError('Invalid or missing token') if 'port' in value: @@ -490,7 +491,7 @@ class Node(object): raise TypeError, 'No lbryid given' now = int(time.time()) - originallyPublished = now# - age + originallyPublished = now # - age self._dataStore.addPeerToBlob( key, compact_address, now, originallyPublished, originalPublisherID) return 'OK' @@ -618,6 +619,7 @@ class Node(object): def _refreshRoutingTable(self): nodeIDs = self._routingTable.getRefreshList(0, False) outerDf = defer.Deferred() + def searchForNextNodeID(dfResult=None): if len(nodeIDs) > 0: searchID = nodeIDs.pop() @@ -626,16 +628,16 @@ class Node(object): else: # If this is reached, we have finished refreshing the routing table outerDf.callback(None) + # Start the refreshing cycle searchForNextNodeID() return outerDf - def _scheduleNextNodeRefresh(self, *args): self.next_refresh_call = twisted.internet.reactor.callLater( constants.checkRefreshInterval, self._refreshNode) - #args put here because _refreshRoutingTable does outerDF.callback(None) + # args put here because _refreshRoutingTable does outerDF.callback(None) def _removeExpiredPeers(self, *args): df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers) return df @@ -679,7 +681,7 @@ class _IterativeFindHelper(object): # The "raw response" tuple contains the response message, # and the originating address info responseMsg = responseTuple[0] - originAddress = responseTuple[1] # tuple: (ip adress, udp port) + originAddress = responseTuple[1] # tuple: (ip adress, udp port) # Make sure the responding node is valid, and abort the operation if it isn't if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id: return responseMsg.nodeID @@ -693,7 +695,7 @@ class _IterativeFindHelper(object): self.already_contacted.append(responseMsg.nodeID) # Now grow extend the (unverified) shortlist with the returned contacts result = responseMsg.response - #TODO: some validation on the result (for guarding against attacks) + # TODO: some validation on the result (for guarding against attacks) # If we are looking for a value, first see if this result is the value # we are looking for before treating it as a list of contact triples if self.find_value is True and self.key in result and not 'contacts' in result: @@ -756,7 +758,7 @@ class _IterativeFindHelper(object): def cancelActiveProbe(self, contactID): self.active_probes.pop() - if len(self.active_probes) <= constants.alpha/2 and len(self.pending_iteration_calls): + if len(self.active_probes) <= constants.alpha / 2 and len(self.pending_iteration_calls): # Force the iteration self.pending_iteration_calls[0].cancel() del self.pending_iteration_calls[0] @@ -804,7 +806,7 @@ class _IterativeFindHelper(object): # Schedule the next iteration if there are any active # calls (Kademlia uses loose parallelism) call = twisted.internet.reactor.callLater( - constants.iterativeLookupDelay, self.searchIteration) #IGNORE:E1101 + constants.iterativeLookupDelay, self.searchIteration) # IGNORE:E1101 self.pending_iteration_calls.append(call) # Check for a quick contact response that made an update to the shortList elif prevShortlistLength < len(self.shortlist): @@ -850,6 +852,7 @@ class Distance(object): Frequently we re-use one of the points so as an optimization we pre-calculate the long value of that point. """ + def __init__(self, key): self.key = key self.val_key_one = long(key.encode('hex'), 16) @@ -879,6 +882,7 @@ class ExpensiveSort(object): key: callable, like `key` in normal python sort attr: the attribute name used to cache the value on each item. """ + def __init__(self, to_sort, key, attr='__value'): self.to_sort = to_sort self.key = key @@ -923,5 +927,6 @@ def main(): node.joinNetwork(known_nodes) twisted.internet.reactor.run() + if __name__ == '__main__': main() diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index a58d7ba8b..66ceb5de4 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -24,13 +24,13 @@ import msgtypes import msgformat from contact import Contact - reactor = twisted.internet.reactor log = logging.getLogger(__name__) class TimeoutError(Exception): """ Raised when a RPC times out """ + def __init__(self, remote_contact_id): # remote_contact_id is a binary blob so we need to convert it # into something more readable @@ -40,8 +40,8 @@ class TimeoutError(Exception): class Delay(object): - maxToSendDelay = 10**-3 #0.05 - minToSendDelay = 10**-5 #0.01 + maxToSendDelay = 10 ** -3 # 0.05 + minToSendDelay = 10 ** -5 # 0.01 def __init__(self, start=0): self._next = start @@ -62,8 +62,7 @@ class Delay(object): class KademliaProtocol(protocol.DatagramProtocol): """ Implements all low-level network-related functions of a Kademlia node """ - msgSizeLimit = constants.udpDatagramMaxSize-26 - + msgSizeLimit = constants.udpDatagramMaxSize - 26 def __init__(self, node, msgEncoder=encoding.Bencode(), msgTranslator=msgformat.DefaultFormat()): @@ -115,7 +114,7 @@ class KademliaProtocol(protocol.DatagramProtocol): # Set the RPC timeout timer timeoutCall = reactor.callLater( - constants.rpcTimeout, self._msgTimeout, msg.id) #IGNORE:E1101 + constants.rpcTimeout, self._msgTimeout, msg.id) # IGNORE:E1101 # Transmit the data self._send(encodedMsg, msg.id, (contact.address, contact.port)) self._sentMessages[msg.id] = (contact.id, df, timeoutCall) @@ -182,7 +181,7 @@ class KademliaProtocol(protocol.DatagramProtocol): else: localModuleHierarchy = self.__module__.split('.') remoteHierarchy = message.exceptionType.split('.') - #strip the remote hierarchy + # strip the remote hierarchy while remoteHierarchy[0] == localModuleHierarchy[0]: remoteHierarchy.pop(0) localModuleHierarchy.pop(0) @@ -199,7 +198,7 @@ class KademliaProtocol(protocol.DatagramProtocol): df.callback(message.response) else: # If the original message isn't found, it must have timed out - #TODO: we should probably do something with this... + # TODO: we should probably do something with this... pass def _send(self, data, rpcID, address): @@ -233,7 +232,7 @@ class KademliaProtocol(protocol.DatagramProtocol): seqNumber = 0 startPos = 0 while seqNumber < totalPackets: - packetData = data[startPos:startPos+self.msgSizeLimit] + packetData = data[startPos:startPos + self.msgSizeLimit] encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff) txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData) self._scheduleSendNext(txData, address) @@ -281,6 +280,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _handleRPC(self, senderContact, rpcID, method, args): """ Executes a local function in response to an RPC request """ + # Set up the deferred callchain def handleError(f): self._sendError(senderContact, rpcID, f.type, f.getErrorMessage()) @@ -359,7 +359,7 @@ class KademliaProtocol(protocol.DatagramProtocol): log.debug('Attempted to cancel a DelayedCall that was not active') except Exception: log.exception('Failed to cancel a DelayedCall') - # not sure why this is needed, but taking this out sometimes causes - # exceptions.AttributeError: 'Port' object has no attribute 'socket' - # to happen on shutdown - # reactor.iterate() + # not sure why this is needed, but taking this out sometimes causes + # exceptions.AttributeError: 'Port' object has no attribute 'socket' + # to happen on shutdown + # reactor.iterate() diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index b9dff61f2..ed09fb40d 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -5,24 +5,28 @@ # The docstrings in this module contain epytext markup; API documentation # may be created by processing this file with epydoc: http://epydoc.sf.net -import time, random - +import time +import random import constants import kbucket + from protocol import TimeoutError + class RoutingTable(object): """ Interface for RPC message translators/formatters Classes inheriting from this should provide a suitable routing table for a parent Node object (i.e. the local entity in the Kademlia network) """ + def __init__(self, parentNodeID): """ @param parentNodeID: The n-bit node ID of the node to which this routing table belongs @type parentNodeID: str """ + def addContact(self, contact): """ Add the given contact to the correct k-bucket; if it already exists, its status will be updated @@ -51,12 +55,14 @@ class RoutingTable(object): node is returning all of the contacts that it knows of. @rtype: list """ + def getContact(self, contactID): """ Returns the (known) contact with the specified node ID @raise ValueError: No contact with the specified contact ID is known by this node """ + def getRefreshList(self, startIndex=0, force=False): """ Finds all k-buckets that need refreshing, starting at the k-bucket with the specified index, and returns IDs to be searched for @@ -78,6 +84,7 @@ class RoutingTable(object): in order to refresh the routing Table @rtype: list """ + def removeContact(self, contactID): """ Remove the contact with the specified node ID from the routing table @@ -85,6 +92,7 @@ class RoutingTable(object): @param contactID: The node ID of the contact to remove @type contactID: str """ + def touchKBucket(self, key): """ Update the "last accessed" timestamp of the k-bucket which covers the range containing the specified key in the key/ID space @@ -109,6 +117,7 @@ class TreeRoutingTable(RoutingTable): C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. """ + def __init__(self, parentNodeID): """ @param parentNodeID: The n-bit node ID of the node to which this @@ -116,7 +125,7 @@ class TreeRoutingTable(RoutingTable): @type parentNodeID: str """ # Create the initial (single) k-bucket covering the range of the entire n-bit ID space - self._buckets = [kbucket.KBucket(rangeMin=0, rangeMax=2**constants.key_bits)] + self._buckets = [kbucket.KBucket(rangeMin=0, rangeMax=2 ** constants.key_bits)] self._parentNodeID = parentNodeID def addContact(self, contact): @@ -204,21 +213,21 @@ class TreeRoutingTable(RoutingTable): # with the specified key as node ID), unless there is less # than k remote nodes in the routing table i = 1 - canGoLower = bucketIndex-i >= 0 - canGoHigher = bucketIndex+i < len(self._buckets) + canGoLower = bucketIndex - i >= 0 + canGoHigher = bucketIndex + i < len(self._buckets) # Fill up the node list to k nodes, starting with the closest neighbouring nodes known while len(closestNodes) < constants.k and (canGoLower or canGoHigher): - #TODO: this may need to be optimized + # TODO: this may need to be optimized if canGoLower: closestNodes.extend( - self._buckets[bucketIndex-i].getContacts( + self._buckets[bucketIndex - i].getContacts( constants.k - len(closestNodes), _rpcNodeID)) - canGoLower = bucketIndex-(i+1) >= 0 + canGoLower = bucketIndex - (i + 1) >= 0 if canGoHigher: closestNodes.extend( - self._buckets[bucketIndex+i].getContacts( + self._buckets[bucketIndex + i].getContacts( constants.k - len(closestNodes), _rpcNodeID)) - canGoHigher = bucketIndex+(i+1) < len(self._buckets) + canGoHigher = bucketIndex + (i + 1) < len(self._buckets) i += 1 return closestNodes @@ -322,7 +331,7 @@ class TreeRoutingTable(RoutingTable): if len(randomID) % 2 != 0: randomID = '0' + randomID randomID = randomID.decode('hex') - randomID = (constants.key_bits/8 - len(randomID))*'\x00' + randomID + randomID = (constants.key_bits / 8 - len(randomID)) * '\x00' + randomID return randomID def _splitBucket(self, oldBucketIndex): @@ -335,7 +344,7 @@ class TreeRoutingTable(RoutingTable): """ # Resize the range of the current (old) k-bucket oldBucket = self._buckets[oldBucketIndex] - splitPoint = oldBucket.rangeMax - (oldBucket.rangeMax - oldBucket.rangeMin)/2 + splitPoint = oldBucket.rangeMax - (oldBucket.rangeMax - oldBucket.rangeMin) / 2 # Create a new k-bucket to cover the range split off from the old bucket newBucket = kbucket.KBucket(splitPoint, oldBucket.rangeMax) oldBucket.rangeMax = splitPoint @@ -349,11 +358,13 @@ class TreeRoutingTable(RoutingTable): for contact in newBucket._contacts: oldBucket.removeContact(contact) + class OptimizedTreeRoutingTable(TreeRoutingTable): """ A version of the "tree"-type routing table specified by Kademlia, along with contact accounting optimizations specified in section 4.1 of of the 13-page version of the Kademlia paper. """ + def __init__(self, parentNodeID): TreeRoutingTable.__init__(self, parentNodeID) # Cache containing nodes eligible to replace stale k-bucket entries @@ -386,7 +397,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable): # We can't split the k-bucket # NOTE: This implementation follows section 4.1 of the 13 page version # of the Kademlia paper (optimized contact accounting without PINGs - #- results in much less network traffic, at the expense of some memory) + # - results in much less network traffic, at the expense of some memory) # Put the new contact in our replacement cache for the # corresponding k-bucket (or update it's position if @@ -395,9 +406,9 @@ class OptimizedTreeRoutingTable(TreeRoutingTable): self._replacementCache[bucketIndex] = [] if contact in self._replacementCache[bucketIndex]: self._replacementCache[bucketIndex].remove(contact) - #TODO: Using k to limit the size of the contact - #replacement cache - maybe define a seperate value for - #this in constants.py? + # TODO: Using k to limit the size of the contact + # replacement cache - maybe define a seperate value for + # this in constants.py? elif len(self._replacementCache) >= constants.k: self._replacementCache.pop(0) self._replacementCache[bucketIndex].append(contact) diff --git a/lbrynet/lbrynet_daemon/DaemonControl.py b/lbrynet/lbrynet_daemon/DaemonControl.py index 4dbd24411..da551647b 100644 --- a/lbrynet/lbrynet_daemon/DaemonControl.py +++ b/lbrynet/lbrynet_daemon/DaemonControl.py @@ -140,4 +140,7 @@ def start_server_and_listen(launchui, use_auth, analytics_manager, max_tries=5): if __name__ == "__main__": + # import cProfile + # import time + # cProfile.run('start()', '/home/grin/code/lbry/lbry/daemonstats_' + time.strftime("%Y%m%d_%H%M%S")) start()