formatting

This commit is contained in:
Alex Grintsvayg 2017-03-31 13:32:43 -04:00
parent 5ec891c9ac
commit 850f51140e
13 changed files with 105 additions and 65 deletions

1
.gitignore vendored
View file

@ -6,6 +6,7 @@
*.log *.log
*.pem *.pem
*.decTest *.decTest
*.prof
.#* .#*
/build /build

View file

@ -43,7 +43,7 @@ peer_request_timeout = 10
#: The interval in which the node should check its whether any buckets need refreshing, #: The interval in which the node should check its whether any buckets need refreshing,
#: or whether any data needs to be republished (in seconds) #: or whether any data needs to be republished (in seconds)
checkRefreshInterval = refreshTimeout/5 checkRefreshInterval = refreshTimeout / 5
#: Max size of a single UDP datagram, in bytes. If a message is larger than this, it will #: Max size of a single UDP datagram, in bytes. If a message is larger than this, it will
#: be spread accross several UDP packets. #: be spread accross several UDP packets.

View file

@ -14,6 +14,7 @@ class Contact(object):
This class contains information on a single remote contact, and also This class contains information on a single remote contact, and also
provides a direct RPC API to the remote node which it represents provides a direct RPC API to the remote node which it represents
""" """
def __init__(self, id, ipAddress, udpPort, networkProtocol, firstComm=0): def __init__(self, id, ipAddress, udpPort, networkProtocol, firstComm=0):
self.id = id self.id = id
self.address = ipAddress self.address = ipAddress
@ -60,6 +61,8 @@ class Contact(object):
This happens via this contact's C{_networkProtocol} object (i.e. the This happens via this contact's C{_networkProtocol} object (i.e. the
host Node's C{_protocol} object). host Node's C{_protocol} object).
""" """
def _sendRPC(*args, **kwargs): def _sendRPC(*args, **kwargs):
return self._networkProtocol.sendRPC(self, name, args, **kwargs) return self._networkProtocol.sendRPC(self, name, args, **kwargs)
return _sendRPC return _sendRPC

View file

@ -12,21 +12,23 @@ import time
import constants import constants
class DataStore(UserDict.DictMixin): class DataStore(UserDict.DictMixin):
""" Interface for classes implementing physical storage (for data """ Interface for classes implementing physical storage (for data
published via the "STORE" RPC) for the Kademlia DHT published via the "STORE" RPC) for the Kademlia DHT
@note: This provides an interface for a dict-like object @note: This provides an interface for a dict-like object
""" """
def keys(self): def keys(self):
""" Return a list of the keys in this data store """ """ Return a list of the keys in this data store """
def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID): def addPeerToBlob(self, key, value, lastPublished, originallyPublished, originalPublisherID):
pass pass
class DictDataStore(DataStore): class DictDataStore(DataStore):
""" A datastore using an in-memory Python dictionary """ """ A datastore using an in-memory Python dictionary """
def __init__(self): def __init__(self):
# Dictionary format: # Dictionary format:
# { <key>: (<value>, <lastPublished>, <originallyPublished> <originalPublisherID>) } # { <key>: (<value>, <lastPublished>, <originallyPublished> <originalPublisherID>) }
@ -38,10 +40,12 @@ class DictDataStore(DataStore):
def removeExpiredPeers(self): def removeExpiredPeers(self):
now = int(time.time()) now = int(time.time())
def notExpired(peer): def notExpired(peer):
if (now - peer[2]) > constants.dataExpireTimeout: if (now - peer[2]) > constants.dataExpireTimeout:
return False return False
return True return True
for key in self._dict.keys(): for key in self._dict.keys():
unexpired_peers = filter(notExpired, self._dict[key]) unexpired_peers = filter(notExpired, self._dict[key])
self._dict[key] = unexpired_peers self._dict[key] = unexpired_peers

View file

@ -12,12 +12,14 @@ class DecodeError(Exception):
fails fails
""" """
class Encoding(object): class Encoding(object):
""" Interface for RPC message encoders/decoders """ Interface for RPC message encoders/decoders
All encoding implementations used with this library should inherit and All encoding implementations used with this library should inherit and
implement this. implement this.
""" """
def encode(self, data): def encode(self, data):
""" Encode the specified data """ Encode the specified data
@ -31,6 +33,7 @@ class Encoding(object):
@return: The encoded data @return: The encoded data
@rtype: str @rtype: str
""" """
def decode(self, data): def decode(self, data):
""" Decode the specified data string """ Decode the specified data string
@ -40,6 +43,7 @@ class Encoding(object):
@return: The decoded data (in its correct type) @return: The decoded data (in its correct type)
""" """
class Bencode(Encoding): class Bencode(Encoding):
""" Implementation of a Bencode-based algorithm (Bencode is the encoding """ Implementation of a Bencode-based algorithm (Bencode is the encoding
algorithm used by Bittorrent). algorithm used by Bittorrent).
@ -112,15 +116,15 @@ class Bencode(Encoding):
Do not call this; use C{decode()} instead Do not call this; use C{decode()} instead
""" """
if data[startIndex] == 'i': if data[startIndex] == 'i':
endPos = data[startIndex:].find('e')+startIndex endPos = data[startIndex:].find('e') + startIndex
return (int(data[startIndex+1:endPos]), endPos+1) return (int(data[startIndex + 1:endPos]), endPos + 1)
elif data[startIndex] == 'l': elif data[startIndex] == 'l':
startIndex += 1 startIndex += 1
decodedList = [] decodedList = []
while data[startIndex] != 'e': while data[startIndex] != 'e':
listData, startIndex = Bencode._decodeRecursive(data, startIndex) listData, startIndex = Bencode._decodeRecursive(data, startIndex)
decodedList.append(listData) decodedList.append(listData)
return (decodedList, startIndex+1) return (decodedList, startIndex + 1)
elif data[startIndex] == 'd': elif data[startIndex] == 'd':
startIndex += 1 startIndex += 1
decodedDict = {} decodedDict = {}
@ -131,19 +135,19 @@ class Bencode(Encoding):
return (decodedDict, startIndex) return (decodedDict, startIndex)
elif data[startIndex] == 'f': elif data[startIndex] == 'f':
# This (float data type) is a non-standard extension to the original Bencode algorithm # This (float data type) is a non-standard extension to the original Bencode algorithm
endPos = data[startIndex:].find('e')+startIndex endPos = data[startIndex:].find('e') + startIndex
return (float(data[startIndex+1:endPos]), endPos+1) return (float(data[startIndex + 1:endPos]), endPos + 1)
elif data[startIndex] == 'n': elif data[startIndex] == 'n':
# This (None/NULL data type) is a non-standard extension # This (None/NULL data type) is a non-standard extension
# to the original Bencode algorithm # to the original Bencode algorithm
return (None, startIndex+1) return (None, startIndex + 1)
else: else:
splitPos = data[startIndex:].find(':')+startIndex splitPos = data[startIndex:].find(':') + startIndex
try: try:
length = int(data[startIndex:splitPos]) length = int(data[startIndex:splitPos])
except ValueError, e: except ValueError, e:
raise DecodeError, e raise DecodeError, e
startIndex = splitPos+1 startIndex = splitPos + 1
endPos = startIndex+length endPos = startIndex + length
bytes = data[startIndex:endPos] bytes = data[startIndex:endPos]
return (bytes, endPos) return (bytes, endPos)

View file

@ -1,4 +1,3 @@
from collections import Counter from collections import Counter
import datetime import datetime

View file

@ -9,6 +9,7 @@
import constants import constants
class BucketFull(Exception): class BucketFull(Exception):
""" Raised when the bucket is full """ """ Raised when the bucket is full """
@ -16,6 +17,7 @@ class BucketFull(Exception):
class KBucket(object): class KBucket(object):
""" Description - later """ Description - later
""" """
def __init__(self, rangeMin, rangeMax): def __init__(self, rangeMin, rangeMax):
""" """
@param rangeMin: The lower boundary for the range in the n-bit ID @param rangeMin: The lower boundary for the range in the n-bit ID

View file

@ -9,6 +9,7 @@
import msgtypes import msgtypes
class MessageTranslator(object): class MessageTranslator(object):
""" Interface for RPC message translators/formatters """ Interface for RPC message translators/formatters
@ -16,6 +17,7 @@ class MessageTranslator(object):
the classes used internally by this Kademlia implementation and the actual the classes used internally by this Kademlia implementation and the actual
data that is transmitted between nodes. data that is transmitted between nodes.
""" """
def fromPrimitive(self, msgPrimitive): def fromPrimitive(self, msgPrimitive):
""" Create an RPC Message from a message's string representation """ Create an RPC Message from a message's string representation
@ -37,6 +39,7 @@ class MessageTranslator(object):
@rtype: str, int, list or dict @rtype: str, int, list or dict
""" """
class DefaultFormat(MessageTranslator): class DefaultFormat(MessageTranslator):
""" The default on-the-wire message format for this library """ """ The default on-the-wire message format for this library """
typeRequest, typeResponse, typeError = range(3) typeRequest, typeResponse, typeError = range(3)

View file

@ -10,8 +10,10 @@
import hashlib import hashlib
import random import random
class Message(object): class Message(object):
""" Base class for messages - all "unknown" messages use this class """ """ Base class for messages - all "unknown" messages use this class """
def __init__(self, rpcID, nodeID): def __init__(self, rpcID, nodeID):
self.id = rpcID self.id = rpcID
self.nodeID = nodeID self.nodeID = nodeID
@ -19,6 +21,7 @@ class Message(object):
class RequestMessage(Message): class RequestMessage(Message):
""" Message containing an RPC request """ """ Message containing an RPC request """
def __init__(self, nodeID, method, methodArgs, rpcID=None): def __init__(self, nodeID, method, methodArgs, rpcID=None):
if rpcID == None: if rpcID == None:
hash = hashlib.sha384() hash = hashlib.sha384()
@ -31,6 +34,7 @@ class RequestMessage(Message):
class ResponseMessage(Message): class ResponseMessage(Message):
""" Message containing the result from a successful RPC request """ """ Message containing the result from a successful RPC request """
def __init__(self, rpcID, nodeID, response): def __init__(self, rpcID, nodeID, response):
Message.__init__(self, rpcID, nodeID) Message.__init__(self, rpcID, nodeID)
self.response = response self.response = response
@ -38,6 +42,7 @@ class ResponseMessage(Message):
class ErrorMessage(ResponseMessage): class ErrorMessage(ResponseMessage):
""" Message containing the error from an unsuccessful RPC request """ """ Message containing the error from an unsuccessful RPC request """
def __init__(self, rpcID, nodeID, exceptionType, errorMessage): def __init__(self, rpcID, nodeID, exceptionType, errorMessage):
ResponseMessage.__init__(self, rpcID, nodeID, errorMessage) ResponseMessage.__init__(self, rpcID, nodeID, errorMessage)
if isinstance(exceptionType, type): if isinstance(exceptionType, type):

View file

@ -27,7 +27,6 @@ from contact import Contact
from hashwatcher import HashWatcher from hashwatcher import HashWatcher
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -51,6 +50,7 @@ class Node(object):
In Entangled, all interactions with the Kademlia network by a client In Entangled, all interactions with the Kademlia network by a client
application is performed via this class (or a subclass). application is performed via this class (or a subclass).
""" """
def __init__(self, id=None, udpPort=4000, dataStore=None, def __init__(self, id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None, lbryid=None, routingTableClass=None, networkProtocol=None, lbryid=None,
externalIP=None): externalIP=None):
@ -126,7 +126,7 @@ class Node(object):
self._listeningPort.stopListening() self._listeningPort.stopListening()
def stop(self): def stop(self):
#cancel callLaters: # cancel callLaters:
if self.next_refresh_call is not None: if self.next_refresh_call is not None:
self.next_refresh_call.cancel() self.next_refresh_call.cancel()
self.next_refresh_call = None self.next_refresh_call = None
@ -137,7 +137,6 @@ class Node(object):
self._listeningPort.stopListening() self._listeningPort.stopListening()
self.hash_watcher.stop() self.hash_watcher.stop()
def joinNetwork(self, knownNodeAddresses=None): def joinNetwork(self, knownNodeAddresses=None):
""" Causes the Node to join the Kademlia network; normally, this """ Causes the Node to join the Kademlia network; normally, this
should be called before any other DHT operations. should be called before any other DHT operations.
@ -156,7 +155,7 @@ class Node(object):
import traceback import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
raise ValueError("%s lbrynet may already be running." % str(e)) raise ValueError("%s lbrynet may already be running." % str(e))
#IGNORE:E1101 # IGNORE:E1101
# Create temporary contact information for the list of addresses of known nodes # Create temporary contact information for the list of addresses of known nodes
if knownNodeAddresses != None: if knownNodeAddresses != None:
bootstrapContacts = [] bootstrapContacts = []
@ -167,10 +166,10 @@ class Node(object):
bootstrapContacts = None bootstrapContacts = None
# Initiate the Kademlia joining sequence - perform a search for this node's own ID # Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts) self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour # #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary # Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = twisted.internet.reactor.callLater( self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode) #IGNORE:E1101 constants.checkRefreshInterval, self._refreshNode) # IGNORE:E1101
self.hash_watcher.tick() self.hash_watcher.tick()
return self._joinDeferred return self._joinDeferred
@ -187,7 +186,7 @@ class Node(object):
# estimate! # estimate!
bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)] bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)]
num_in_bucket = len(bucket._contacts) num_in_bucket = len(bucket._contacts)
factor = (2**constants.key_bits) / (bucket.rangeMax - bucket.rangeMin) factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin)
return num_in_bucket * factor return num_in_bucket * factor
def getApproximateTotalHashes(self): def getApproximateTotalHashes(self):
@ -351,6 +350,7 @@ class Node(object):
""" """
# Prepare a callback for this operation # Prepare a callback for this operation
outerDf = defer.Deferred() outerDf = defer.Deferred()
def checkResult(result): def checkResult(result):
if type(result) == dict: if type(result) == dict:
# We have found the value; now see who was the closest contact without it... # We have found the value; now see who was the closest contact without it...
@ -416,6 +416,7 @@ class Node(object):
return contact return contact
else: else:
return None return None
df = self.iterativeFindNode(contactID) df = self.iterativeFindNode(contactID)
df.addCallback(parseResults) df.addCallback(parseResults)
return df return df
@ -466,7 +467,7 @@ class Node(object):
compact_ip = contact.compact_ip() compact_ip = contact.compact_ip()
else: else:
return 'Not OK' return 'Not OK'
#raise TypeError, 'No contact info available' # raise TypeError, 'No contact info available'
if ((self_store is False) and if ((self_store is False) and
(not 'token' in value or not self.verify_token(value['token'], compact_ip))): (not 'token' in value or not self.verify_token(value['token'], compact_ip))):
@ -490,7 +491,7 @@ class Node(object):
raise TypeError, 'No lbryid given' raise TypeError, 'No lbryid given'
now = int(time.time()) now = int(time.time())
originallyPublished = now# - age originallyPublished = now # - age
self._dataStore.addPeerToBlob( self._dataStore.addPeerToBlob(
key, compact_address, now, originallyPublished, originalPublisherID) key, compact_address, now, originallyPublished, originalPublisherID)
return 'OK' return 'OK'
@ -618,6 +619,7 @@ class Node(object):
def _refreshRoutingTable(self): def _refreshRoutingTable(self):
nodeIDs = self._routingTable.getRefreshList(0, False) nodeIDs = self._routingTable.getRefreshList(0, False)
outerDf = defer.Deferred() outerDf = defer.Deferred()
def searchForNextNodeID(dfResult=None): def searchForNextNodeID(dfResult=None):
if len(nodeIDs) > 0: if len(nodeIDs) > 0:
searchID = nodeIDs.pop() searchID = nodeIDs.pop()
@ -626,16 +628,16 @@ class Node(object):
else: else:
# If this is reached, we have finished refreshing the routing table # If this is reached, we have finished refreshing the routing table
outerDf.callback(None) outerDf.callback(None)
# Start the refreshing cycle # Start the refreshing cycle
searchForNextNodeID() searchForNextNodeID()
return outerDf return outerDf
def _scheduleNextNodeRefresh(self, *args): def _scheduleNextNodeRefresh(self, *args):
self.next_refresh_call = twisted.internet.reactor.callLater( self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode) constants.checkRefreshInterval, self._refreshNode)
#args put here because _refreshRoutingTable does outerDF.callback(None) # args put here because _refreshRoutingTable does outerDF.callback(None)
def _removeExpiredPeers(self, *args): def _removeExpiredPeers(self, *args):
df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers) df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers)
return df return df
@ -693,7 +695,7 @@ class _IterativeFindHelper(object):
self.already_contacted.append(responseMsg.nodeID) self.already_contacted.append(responseMsg.nodeID)
# Now grow extend the (unverified) shortlist with the returned contacts # Now grow extend the (unverified) shortlist with the returned contacts
result = responseMsg.response result = responseMsg.response
#TODO: some validation on the result (for guarding against attacks) # TODO: some validation on the result (for guarding against attacks)
# If we are looking for a value, first see if this result is the value # If we are looking for a value, first see if this result is the value
# we are looking for before treating it as a list of contact triples # we are looking for before treating it as a list of contact triples
if self.find_value is True and self.key in result and not 'contacts' in result: if self.find_value is True and self.key in result and not 'contacts' in result:
@ -756,7 +758,7 @@ class _IterativeFindHelper(object):
def cancelActiveProbe(self, contactID): def cancelActiveProbe(self, contactID):
self.active_probes.pop() self.active_probes.pop()
if len(self.active_probes) <= constants.alpha/2 and len(self.pending_iteration_calls): if len(self.active_probes) <= constants.alpha / 2 and len(self.pending_iteration_calls):
# Force the iteration # Force the iteration
self.pending_iteration_calls[0].cancel() self.pending_iteration_calls[0].cancel()
del self.pending_iteration_calls[0] del self.pending_iteration_calls[0]
@ -804,7 +806,7 @@ class _IterativeFindHelper(object):
# Schedule the next iteration if there are any active # Schedule the next iteration if there are any active
# calls (Kademlia uses loose parallelism) # calls (Kademlia uses loose parallelism)
call = twisted.internet.reactor.callLater( call = twisted.internet.reactor.callLater(
constants.iterativeLookupDelay, self.searchIteration) #IGNORE:E1101 constants.iterativeLookupDelay, self.searchIteration) # IGNORE:E1101
self.pending_iteration_calls.append(call) self.pending_iteration_calls.append(call)
# Check for a quick contact response that made an update to the shortList # Check for a quick contact response that made an update to the shortList
elif prevShortlistLength < len(self.shortlist): elif prevShortlistLength < len(self.shortlist):
@ -850,6 +852,7 @@ class Distance(object):
Frequently we re-use one of the points so as an optimization Frequently we re-use one of the points so as an optimization
we pre-calculate the long value of that point. we pre-calculate the long value of that point.
""" """
def __init__(self, key): def __init__(self, key):
self.key = key self.key = key
self.val_key_one = long(key.encode('hex'), 16) self.val_key_one = long(key.encode('hex'), 16)
@ -879,6 +882,7 @@ class ExpensiveSort(object):
key: callable, like `key` in normal python sort key: callable, like `key` in normal python sort
attr: the attribute name used to cache the value on each item. attr: the attribute name used to cache the value on each item.
""" """
def __init__(self, to_sort, key, attr='__value'): def __init__(self, to_sort, key, attr='__value'):
self.to_sort = to_sort self.to_sort = to_sort
self.key = key self.key = key
@ -923,5 +927,6 @@ def main():
node.joinNetwork(known_nodes) node.joinNetwork(known_nodes)
twisted.internet.reactor.run() twisted.internet.reactor.run()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View file

@ -24,13 +24,13 @@ import msgtypes
import msgformat import msgformat
from contact import Contact from contact import Contact
reactor = twisted.internet.reactor reactor = twisted.internet.reactor
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class TimeoutError(Exception): class TimeoutError(Exception):
""" Raised when a RPC times out """ """ Raised when a RPC times out """
def __init__(self, remote_contact_id): def __init__(self, remote_contact_id):
# remote_contact_id is a binary blob so we need to convert it # remote_contact_id is a binary blob so we need to convert it
# into something more readable # into something more readable
@ -40,8 +40,8 @@ class TimeoutError(Exception):
class Delay(object): class Delay(object):
maxToSendDelay = 10**-3 #0.05 maxToSendDelay = 10 ** -3 # 0.05
minToSendDelay = 10**-5 #0.01 minToSendDelay = 10 ** -5 # 0.01
def __init__(self, start=0): def __init__(self, start=0):
self._next = start self._next = start
@ -62,8 +62,7 @@ class Delay(object):
class KademliaProtocol(protocol.DatagramProtocol): class KademliaProtocol(protocol.DatagramProtocol):
""" Implements all low-level network-related functions of a Kademlia node """ """ Implements all low-level network-related functions of a Kademlia node """
msgSizeLimit = constants.udpDatagramMaxSize-26 msgSizeLimit = constants.udpDatagramMaxSize - 26
def __init__(self, node, msgEncoder=encoding.Bencode(), def __init__(self, node, msgEncoder=encoding.Bencode(),
msgTranslator=msgformat.DefaultFormat()): msgTranslator=msgformat.DefaultFormat()):
@ -115,7 +114,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
# Set the RPC timeout timer # Set the RPC timeout timer
timeoutCall = reactor.callLater( timeoutCall = reactor.callLater(
constants.rpcTimeout, self._msgTimeout, msg.id) #IGNORE:E1101 constants.rpcTimeout, self._msgTimeout, msg.id) # IGNORE:E1101
# Transmit the data # Transmit the data
self._send(encodedMsg, msg.id, (contact.address, contact.port)) self._send(encodedMsg, msg.id, (contact.address, contact.port))
self._sentMessages[msg.id] = (contact.id, df, timeoutCall) self._sentMessages[msg.id] = (contact.id, df, timeoutCall)
@ -182,7 +181,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
else: else:
localModuleHierarchy = self.__module__.split('.') localModuleHierarchy = self.__module__.split('.')
remoteHierarchy = message.exceptionType.split('.') remoteHierarchy = message.exceptionType.split('.')
#strip the remote hierarchy # strip the remote hierarchy
while remoteHierarchy[0] == localModuleHierarchy[0]: while remoteHierarchy[0] == localModuleHierarchy[0]:
remoteHierarchy.pop(0) remoteHierarchy.pop(0)
localModuleHierarchy.pop(0) localModuleHierarchy.pop(0)
@ -199,7 +198,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
df.callback(message.response) df.callback(message.response)
else: else:
# If the original message isn't found, it must have timed out # If the original message isn't found, it must have timed out
#TODO: we should probably do something with this... # TODO: we should probably do something with this...
pass pass
def _send(self, data, rpcID, address): def _send(self, data, rpcID, address):
@ -233,7 +232,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
seqNumber = 0 seqNumber = 0
startPos = 0 startPos = 0
while seqNumber < totalPackets: while seqNumber < totalPackets:
packetData = data[startPos:startPos+self.msgSizeLimit] packetData = data[startPos:startPos + self.msgSizeLimit]
encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff) encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData) txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
self._scheduleSendNext(txData, address) self._scheduleSendNext(txData, address)
@ -281,6 +280,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _handleRPC(self, senderContact, rpcID, method, args): def _handleRPC(self, senderContact, rpcID, method, args):
""" Executes a local function in response to an RPC request """ """ Executes a local function in response to an RPC request """
# Set up the deferred callchain # Set up the deferred callchain
def handleError(f): def handleError(f):
self._sendError(senderContact, rpcID, f.type, f.getErrorMessage()) self._sendError(senderContact, rpcID, f.type, f.getErrorMessage())

View file

@ -5,24 +5,28 @@
# The docstrings in this module contain epytext markup; API documentation # The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net # may be created by processing this file with epydoc: http://epydoc.sf.net
import time, random import time
import random
import constants import constants
import kbucket import kbucket
from protocol import TimeoutError from protocol import TimeoutError
class RoutingTable(object): class RoutingTable(object):
""" Interface for RPC message translators/formatters """ Interface for RPC message translators/formatters
Classes inheriting from this should provide a suitable routing table for Classes inheriting from this should provide a suitable routing table for
a parent Node object (i.e. the local entity in the Kademlia network) a parent Node object (i.e. the local entity in the Kademlia network)
""" """
def __init__(self, parentNodeID): def __init__(self, parentNodeID):
""" """
@param parentNodeID: The n-bit node ID of the node to which this @param parentNodeID: The n-bit node ID of the node to which this
routing table belongs routing table belongs
@type parentNodeID: str @type parentNodeID: str
""" """
def addContact(self, contact): def addContact(self, contact):
""" Add the given contact to the correct k-bucket; if it already """ Add the given contact to the correct k-bucket; if it already
exists, its status will be updated exists, its status will be updated
@ -51,12 +55,14 @@ class RoutingTable(object):
node is returning all of the contacts that it knows of. node is returning all of the contacts that it knows of.
@rtype: list @rtype: list
""" """
def getContact(self, contactID): def getContact(self, contactID):
""" Returns the (known) contact with the specified node ID """ Returns the (known) contact with the specified node ID
@raise ValueError: No contact with the specified contact ID is known @raise ValueError: No contact with the specified contact ID is known
by this node by this node
""" """
def getRefreshList(self, startIndex=0, force=False): def getRefreshList(self, startIndex=0, force=False):
""" Finds all k-buckets that need refreshing, starting at the """ Finds all k-buckets that need refreshing, starting at the
k-bucket with the specified index, and returns IDs to be searched for k-bucket with the specified index, and returns IDs to be searched for
@ -78,6 +84,7 @@ class RoutingTable(object):
in order to refresh the routing Table in order to refresh the routing Table
@rtype: list @rtype: list
""" """
def removeContact(self, contactID): def removeContact(self, contactID):
""" Remove the contact with the specified node ID from the routing """ Remove the contact with the specified node ID from the routing
table table
@ -85,6 +92,7 @@ class RoutingTable(object):
@param contactID: The node ID of the contact to remove @param contactID: The node ID of the contact to remove
@type contactID: str @type contactID: str
""" """
def touchKBucket(self, key): def touchKBucket(self, key):
""" Update the "last accessed" timestamp of the k-bucket which covers """ Update the "last accessed" timestamp of the k-bucket which covers
the range containing the specified key in the key/ID space the range containing the specified key in the key/ID space
@ -109,6 +117,7 @@ class TreeRoutingTable(RoutingTable):
C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of
that paper. that paper.
""" """
def __init__(self, parentNodeID): def __init__(self, parentNodeID):
""" """
@param parentNodeID: The n-bit node ID of the node to which this @param parentNodeID: The n-bit node ID of the node to which this
@ -116,7 +125,7 @@ class TreeRoutingTable(RoutingTable):
@type parentNodeID: str @type parentNodeID: str
""" """
# Create the initial (single) k-bucket covering the range of the entire n-bit ID space # Create the initial (single) k-bucket covering the range of the entire n-bit ID space
self._buckets = [kbucket.KBucket(rangeMin=0, rangeMax=2**constants.key_bits)] self._buckets = [kbucket.KBucket(rangeMin=0, rangeMax=2 ** constants.key_bits)]
self._parentNodeID = parentNodeID self._parentNodeID = parentNodeID
def addContact(self, contact): def addContact(self, contact):
@ -204,21 +213,21 @@ class TreeRoutingTable(RoutingTable):
# with the specified key as node ID), unless there is less # with the specified key as node ID), unless there is less
# than k remote nodes in the routing table # than k remote nodes in the routing table
i = 1 i = 1
canGoLower = bucketIndex-i >= 0 canGoLower = bucketIndex - i >= 0
canGoHigher = bucketIndex+i < len(self._buckets) canGoHigher = bucketIndex + i < len(self._buckets)
# Fill up the node list to k nodes, starting with the closest neighbouring nodes known # Fill up the node list to k nodes, starting with the closest neighbouring nodes known
while len(closestNodes) < constants.k and (canGoLower or canGoHigher): while len(closestNodes) < constants.k and (canGoLower or canGoHigher):
#TODO: this may need to be optimized # TODO: this may need to be optimized
if canGoLower: if canGoLower:
closestNodes.extend( closestNodes.extend(
self._buckets[bucketIndex-i].getContacts( self._buckets[bucketIndex - i].getContacts(
constants.k - len(closestNodes), _rpcNodeID)) constants.k - len(closestNodes), _rpcNodeID))
canGoLower = bucketIndex-(i+1) >= 0 canGoLower = bucketIndex - (i + 1) >= 0
if canGoHigher: if canGoHigher:
closestNodes.extend( closestNodes.extend(
self._buckets[bucketIndex+i].getContacts( self._buckets[bucketIndex + i].getContacts(
constants.k - len(closestNodes), _rpcNodeID)) constants.k - len(closestNodes), _rpcNodeID))
canGoHigher = bucketIndex+(i+1) < len(self._buckets) canGoHigher = bucketIndex + (i + 1) < len(self._buckets)
i += 1 i += 1
return closestNodes return closestNodes
@ -322,7 +331,7 @@ class TreeRoutingTable(RoutingTable):
if len(randomID) % 2 != 0: if len(randomID) % 2 != 0:
randomID = '0' + randomID randomID = '0' + randomID
randomID = randomID.decode('hex') randomID = randomID.decode('hex')
randomID = (constants.key_bits/8 - len(randomID))*'\x00' + randomID randomID = (constants.key_bits / 8 - len(randomID)) * '\x00' + randomID
return randomID return randomID
def _splitBucket(self, oldBucketIndex): def _splitBucket(self, oldBucketIndex):
@ -335,7 +344,7 @@ class TreeRoutingTable(RoutingTable):
""" """
# Resize the range of the current (old) k-bucket # Resize the range of the current (old) k-bucket
oldBucket = self._buckets[oldBucketIndex] oldBucket = self._buckets[oldBucketIndex]
splitPoint = oldBucket.rangeMax - (oldBucket.rangeMax - oldBucket.rangeMin)/2 splitPoint = oldBucket.rangeMax - (oldBucket.rangeMax - oldBucket.rangeMin) / 2
# Create a new k-bucket to cover the range split off from the old bucket # Create a new k-bucket to cover the range split off from the old bucket
newBucket = kbucket.KBucket(splitPoint, oldBucket.rangeMax) newBucket = kbucket.KBucket(splitPoint, oldBucket.rangeMax)
oldBucket.rangeMax = splitPoint oldBucket.rangeMax = splitPoint
@ -349,11 +358,13 @@ class TreeRoutingTable(RoutingTable):
for contact in newBucket._contacts: for contact in newBucket._contacts:
oldBucket.removeContact(contact) oldBucket.removeContact(contact)
class OptimizedTreeRoutingTable(TreeRoutingTable): class OptimizedTreeRoutingTable(TreeRoutingTable):
""" A version of the "tree"-type routing table specified by Kademlia, """ A version of the "tree"-type routing table specified by Kademlia,
along with contact accounting optimizations specified in section 4.1 of along with contact accounting optimizations specified in section 4.1 of
of the 13-page version of the Kademlia paper. of the 13-page version of the Kademlia paper.
""" """
def __init__(self, parentNodeID): def __init__(self, parentNodeID):
TreeRoutingTable.__init__(self, parentNodeID) TreeRoutingTable.__init__(self, parentNodeID)
# Cache containing nodes eligible to replace stale k-bucket entries # Cache containing nodes eligible to replace stale k-bucket entries
@ -386,7 +397,7 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
# We can't split the k-bucket # We can't split the k-bucket
# NOTE: This implementation follows section 4.1 of the 13 page version # NOTE: This implementation follows section 4.1 of the 13 page version
# of the Kademlia paper (optimized contact accounting without PINGs # of the Kademlia paper (optimized contact accounting without PINGs
#- results in much less network traffic, at the expense of some memory) # - results in much less network traffic, at the expense of some memory)
# Put the new contact in our replacement cache for the # Put the new contact in our replacement cache for the
# corresponding k-bucket (or update it's position if # corresponding k-bucket (or update it's position if
@ -395,9 +406,9 @@ class OptimizedTreeRoutingTable(TreeRoutingTable):
self._replacementCache[bucketIndex] = [] self._replacementCache[bucketIndex] = []
if contact in self._replacementCache[bucketIndex]: if contact in self._replacementCache[bucketIndex]:
self._replacementCache[bucketIndex].remove(contact) self._replacementCache[bucketIndex].remove(contact)
#TODO: Using k to limit the size of the contact # TODO: Using k to limit the size of the contact
#replacement cache - maybe define a seperate value for # replacement cache - maybe define a seperate value for
#this in constants.py? # this in constants.py?
elif len(self._replacementCache) >= constants.k: elif len(self._replacementCache) >= constants.k:
self._replacementCache.pop(0) self._replacementCache.pop(0)
self._replacementCache[bucketIndex].append(contact) self._replacementCache[bucketIndex].append(contact)

View file

@ -140,4 +140,7 @@ def start_server_and_listen(launchui, use_auth, analytics_manager, max_tries=5):
if __name__ == "__main__": if __name__ == "__main__":
# import cProfile
# import time
# cProfile.run('start()', '/home/grin/code/lbry/lbry/daemonstats_' + time.strftime("%Y%m%d_%H%M%S"))
start() start()