Merge pull request #650 from lbryio/dht-bandwidth-tracking

add dht bandwidth tracking
This commit is contained in:
Jack Robison 2017-05-30 11:42:55 -04:00 committed by GitHub
commit 17b692d258
5 changed files with 171 additions and 41 deletions

View file

@ -10,6 +10,7 @@ at anytime.
## [Unreleased]
### Added
* Prevent publish of files with size 0
* Add `dht_status` parameter to `status` to include bandwidth and peer info
*
### Changed

View file

@ -20,7 +20,8 @@ class HashWatcher(object):
self.next_tick.cancel()
self.next_tick = None
def add_requested_hash(self, hashsum, from_ip):
def add_requested_hash(self, hashsum, contact):
    """Record that `contact` asked for `hashsum`, once per (hash, IP) pair.

    @param hashsum: the hash that was requested
    @param contact: the requesting contact; must expose a `compact_ip()`
                    method returning its compact IP representation
    """
    # compact_ip is a method: it has to be called. Storing the bound method
    # itself made every request look like it came from a new address, so
    # the duplicate check below never matched.
    from_ip = contact.compact_ip()
    already_recorded = any(
        entry[0] == hashsum and entry[2] == from_ip for entry in self.hashes
    )
    if not already_recorded:
        self.hashes.append((hashsum, datetime.datetime.now(), from_ip))

View file

@ -12,15 +12,12 @@ import operator
import struct
import time
from twisted.internet import defer, error
from twisted.internet import defer, error, reactor, threads
import constants
import routingtable
import datastore
import protocol
import twisted.internet.reactor
import twisted.internet.threads
import twisted.python.log
from contact import Contact
from hashwatcher import HashWatcher
@ -138,6 +135,7 @@ class Node(object):
self._listeningPort.stopListening()
self.hash_watcher.stop()
@defer.inlineCallbacks
def joinNetwork(self, knownNodeAddresses=None):
""" Causes the Node to join the Kademlia network; normally, this
should be called before any other DHT operations.
@ -151,7 +149,7 @@ class Node(object):
# Prepare the underlying Kademlia protocol
if self.port is not None:
try:
self._listeningPort = twisted.internet.reactor.listenUDP(self.port, self._protocol)
self._listeningPort = reactor.listenUDP(self.port, self._protocol)
except error.CannotListenError as e:
import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
@ -169,10 +167,11 @@ class Node(object):
self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode) # IGNORE:E1101
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
self.hash_watcher.tick()
return self._joinDeferred
yield self._joinDeferred
def printContacts(self, *args):
print '\n\nNODE CONTACTS\n==============='
@ -204,7 +203,6 @@ class Node(object):
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid})
def getPeersForBlob(self, blob_hash):
def expand_and_filter(result):
expanded_peers = []
if isinstance(result, dict):
@ -230,8 +228,10 @@ class Node(object):
def get_most_popular_hashes(self, num_to_return):
return self.hash_watcher.most_popular_hashes(num_to_return)
def iterativeAnnounceHaveBlob(self, blob_hash, value):
def get_bandwidth_stats(self):
    """Return the bandwidth statistics exposed by this node's protocol.

    @return: whatever the protocol's `bandwidth_stats` attribute yields
    """
    protocol_stats = self._protocol.bandwidth_stats
    return protocol_stats
def iterativeAnnounceHaveBlob(self, blob_hash, value):
known_nodes = {}
def log_error(err, n):
@ -295,8 +295,8 @@ class Node(object):
def change_token(self):
self.old_token_secret = self.token_secret
self.token_secret = self._generateID()
self.next_change_token_call = twisted.internet.reactor.callLater(
constants.tokenSecretChangeInterval, self.change_token)
self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval,
self.change_token)
def make_token(self, compact_ip):
h = hashlib.new('sha384')
@ -329,6 +329,7 @@ class Node(object):
"""
return self._iterativeFind(key)
@defer.inlineCallbacks
def iterativeFindValue(self, key):
""" The Kademlia search operation (deterministic)
@ -372,9 +373,10 @@ class Node(object):
outerDf.callback(result)
# Execute the search
df = self._iterativeFind(key, rpc='findValue')
df.addCallback(checkResult)
return outerDf
iterative_find_result = yield self._iterativeFind(key, rpc='findValue')
checkResult(iterative_find_result)
result = yield outerDf
defer.returnValue(result)
def addContact(self, contact):
""" Add/update the given contact; simple wrapper for the same method
@ -493,8 +495,8 @@ class Node(object):
now = int(time.time())
originallyPublished = now # - age
self._dataStore.addPeerToBlob(
key, compact_address, now, originallyPublished, originalPublisherID)
self._dataStore.addPeerToBlob(key, compact_address, now, originallyPublished,
originalPublisherID)
return 'OK'
@rpcmethod
@ -534,6 +536,7 @@ class Node(object):
or a list of contact triples closest to the requested key.
@rtype: dict or list
"""
if self._dataStore.hasPeersForBlob(key):
rval = {key: self._dataStore.getPeersForBlob(key)}
else:
@ -543,7 +546,7 @@ class Node(object):
contact = kwargs['_rpcNodeContact']
compact_ip = contact.compact_ip()
rval['token'] = self.make_token(compact_ip)
self.hash_watcher.add_requested_hash(key, compact_ip)
self.hash_watcher.add_requested_hash(key, contact)
return rval
def _generateID(self):
@ -554,6 +557,7 @@ class Node(object):
"""
return generate_id()
@defer.inlineCallbacks
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'):
""" The basic Kademlia iterative lookup operation (for nodes/values)
@ -593,7 +597,8 @@ class Node(object):
# This node doesn't know of any other nodes
fakeDf = defer.Deferred()
fakeDf.callback([])
return fakeDf
result = yield fakeDf
defer.returnValue(result)
else:
# This is used during the bootstrap process; node ID's are most probably fake
shortlist = startupShortlist
@ -603,11 +608,13 @@ class Node(object):
helper = _IterativeFindHelper(self, outerDf, shortlist, key, findValue, rpc)
# Start the iterations
helper.searchIteration()
return outerDf
result = yield outerDf
defer.returnValue(result)
def _refreshNode(self):
""" Periodically called to perform k-bucket refreshes and data
replication/republishing as necessary """
df = self._refreshRoutingTable()
df.addCallback(self._removeExpiredPeers)
df.addCallback(self._scheduleNextNodeRefresh)
@ -630,12 +637,12 @@ class Node(object):
return outerDf
def _scheduleNextNodeRefresh(self, *args):
self.next_refresh_call = twisted.internet.reactor.callLater(
constants.checkRefreshInterval, self._refreshNode)
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
# args put here because _refreshRoutingTable does outerDF.callback(None)
def _removeExpiredPeers(self, *args):
df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers)
df = threads.deferToThread(self._dataStore.removeExpiredPeers)
return df
@ -689,6 +696,7 @@ class _IterativeFindHelper(object):
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
if responseMsg.nodeID not in self.already_contacted:
self.already_contacted.append(responseMsg.nodeID)
# Now extend the (unverified) shortlist with the returned contacts
result = responseMsg.response
# TODO: some validation on the result (for guarding against attacks)
@ -785,6 +793,7 @@ class _IterativeFindHelper(object):
# noted
self.outer_d.callback(self.active_contacts)
return
# The search continues...
if len(self.active_contacts):
self.prev_closest_node[0] = self.active_contacts[0]
@ -801,8 +810,7 @@ class _IterativeFindHelper(object):
if self._should_lookup_active_calls():
# Schedule the next iteration if there are any active
# calls (Kademlia uses loose parallelism)
call = twisted.internet.reactor.callLater(
constants.iterativeLookupDelay, self.searchIteration) # IGNORE:E1101
call = reactor.callLater(constants.iterativeLookupDelay, self.searchIteration)
self.pending_iteration_calls.append(call)
# Check for a quick contact response that made an update to the shortList
elif prevShortlistLength < len(self.shortlist):
@ -819,7 +827,7 @@ class _IterativeFindHelper(object):
df.addCallback(self.extendShortlist)
df.addErrback(self.removeFromShortlist)
df.addCallback(self.cancelActiveProbe)
df.addErrback(log.fail(), 'Failed to contact %s', contact)
df.addErrback(lambda _: log.exception('Failed to contact %s', contact))
self.already_contacted.append(contact.id)
def _should_lookup_active_calls(self):

View file

@ -13,10 +13,8 @@ import time
import socket
import errno
from twisted.internet import protocol, defer
from twisted.internet import protocol, defer, error, reactor, task
from twisted.python import failure
from twisted.internet import error
import twisted.internet.reactor
import constants
import encoding
@ -24,7 +22,6 @@ import msgtypes
import msgformat
from contact import Contact
reactor = twisted.internet.reactor
log = logging.getLogger(__name__)
@ -62,13 +59,13 @@ class Delay(object):
class KademliaProtocol(protocol.DatagramProtocol):
""" Implements all low-level network-related functions of a Kademlia node """
msgSizeLimit = constants.udpDatagramMaxSize - 26
def __init__(self, node, msgEncoder=encoding.Bencode(),
msgTranslator=msgformat.DefaultFormat()):
def __init__(self, node):
self._node = node
self._encoder = msgEncoder
self._translator = msgTranslator
self._encoder = encoding.Bencode()
self._translator = msgformat.DefaultFormat()
self._sentMessages = {}
self._partialMessages = {}
self._partialMessagesProgress = {}
@ -77,6 +74,94 @@ class KademliaProtocol(protocol.DatagramProtocol):
# can be cancelled on shutdown
self._call_later_list = {}
# keep track of bandwidth usage by peer
self._history_rx = {}
self._history_tx = {}
self._bytes_rx = {}
self._bytes_tx = {}
self._queries_rx_per_second = 0
self._queries_tx_per_second = 0
self._kbps_tx = 0
self._kbps_rx = 0
self._recent_contact_count = 0
self._total_bytes_tx = 0
self._total_bytes_rx = 0
self._bandwidth_stats_update_lc = task.LoopingCall(self._update_bandwidth_stats)
def _update_bandwidth_stats(self):
    """Recompute the cached bandwidth figures from the per-address histories.

    Each history entry is a (size_in_bytes, timestamp) tuple. Only entries
    younger than one second count towards the per-second figures. The
    "kbps" values are the windowed byte count divided by 1024 and rounded
    to two decimals. Totals are summed over the per-address byte counters.
    """
    # One timestamp for both directions so rx and tx use the same window.
    now = time.time()

    def summarize(history_by_address):
        # Returns (message count, byte count, set of active addresses)
        # for the entries that fall inside the one-second window.
        message_count = 0
        byte_count = 0
        active_addresses = set()
        for address, history in history_by_address.items():
            recent_sizes = [size for (size, stamp) in history if now - stamp < 1.0]
            if recent_sizes:
                active_addresses.add(address)
                message_count += len(recent_sizes)
                byte_count += sum(recent_sizes)
        return message_count, byte_count, active_addresses

    qps_rx, bytes_rx, rx_contacts = summarize(self._history_rx)
    qps_tx, bytes_tx, tx_contacts = summarize(self._history_tx)

    self._queries_rx_per_second = qps_rx
    self._queries_tx_per_second = qps_tx
    self._kbps_tx = round(float(bytes_tx) / 1024.0, 2)
    self._kbps_rx = round(float(bytes_rx) / 1024.0, 2)
    # Set union counts an address seen in both directions only once,
    # without the linear list scan per address.
    self._recent_contact_count = len(rx_contacts | tx_contacts)
    self._total_bytes_tx = sum(self._bytes_tx.values())
    self._total_bytes_rx = sum(self._bytes_rx.values())
@property
def queries_rx_per_second(self):
    """Number of received messages counted at the last stats update."""
    return self._queries_rx_per_second
@property
def queries_tx_per_second(self):
    """Number of sent messages counted at the last stats update."""
    return self._queries_tx_per_second
@property
def kbps_tx(self):
    """Sent-bytes rate (bytes / 1024) cached by the last stats update."""
    return self._kbps_tx
@property
def kbps_rx(self):
    """Received-bytes rate (bytes / 1024) cached by the last stats update."""
    return self._kbps_rx
@property
def recent_contact_count(self):
    """Count of addresses with traffic in the last stats window."""
    return self._recent_contact_count
@property
def total_bytes_tx(self):
    """Total bytes sent, as summed at the last stats update."""
    return self._total_bytes_tx
@property
def total_bytes_rx(self):
    """Total bytes received, as summed at the last stats update."""
    return self._total_bytes_rx
@property
def bandwidth_stats(self):
    """Bundle the cached bandwidth figures into a single dict.

    @return: dict with the keys kbps_received, kbps_sent, total_bytes_sent,
             total_bytes_received, queries_received, queries_sent and
             recent_contacts
    @rtype: dict
    """
    return dict(
        kbps_received=self.kbps_rx,
        kbps_sent=self.kbps_tx,
        total_bytes_sent=self.total_bytes_tx,
        total_bytes_received=self.total_bytes_rx,
        queries_received=self.queries_rx_per_second,
        queries_sent=self.queries_tx_per_second,
        recent_contacts=self.recent_contact_count,
    )
def sendRPC(self, contact, method, args, rawResponse=False):
""" Sends an RPC to the specified contact
@ -108,26 +193,31 @@ class KademliaProtocol(protocol.DatagramProtocol):
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
log.debug("DHT SEND: %s(%s)", method, args)
log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex'))
df = defer.Deferred()
if rawResponse:
df._rpcRawResponse = True
# Set the RPC timeout timer
timeoutCall = reactor.callLater(
constants.rpcTimeout, self._msgTimeout, msg.id) # IGNORE:E1101
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id)
# Transmit the data
self._send(encodedMsg, msg.id, (contact.address, contact.port))
self._sentMessages[msg.id] = (contact.id, df, timeoutCall)
return df
def startProtocol(self):
    """Log the listening port and make sure the stats loop is running.

    The bandwidth-stats looping call is started with a one second interval
    unless it is already running.
    """
    log.info("DHT listening on UDP %i", self._node.port)
    stats_loop = self._bandwidth_stats_update_lc
    if stats_loop.running:
        return
    stats_loop.start(1)
def datagramReceived(self, datagram, address):
""" Handles and parses incoming RPC messages (and responses)
@note: This is automatically called by Twisted when the protocol
receives a UDP datagram
"""
if datagram[0] == '\x00' and datagram[25] == '\x00':
totalPackets = (ord(datagram[1]) << 8) | ord(datagram[2])
msgID = datagram[5:25]
@ -157,12 +247,22 @@ class KademliaProtocol(protocol.DatagramProtocol):
message = self._translator.fromPrimitive(msgPrimitive)
remoteContact = Contact(message.nodeID, address[0], address[1], self)
now = time.time()
contact_history = self._history_rx.get(address, [])
if len(contact_history) > 1000:
contact_history = [x for x in contact_history if now - x[1] < 1.0]
contact_history.append((len(datagram), time.time()))
self._history_rx[address] = contact_history
bytes_rx = self._bytes_rx.get(address, 0)
bytes_rx += len(datagram)
self._bytes_rx[address] = bytes_rx
# Refresh the remote node's details in the local node's k-buckets
self._node.addContact(remoteContact)
if isinstance(message, msgtypes.RequestMessage):
# This is an RPC method request
self._handleRPC(remoteContact, message.id, message.request, message.args)
elif isinstance(message, msgtypes.ResponseMessage):
# Find the message that triggered this response
if message.id in self._sentMessages:
@ -205,6 +305,17 @@ class KademliaProtocol(protocol.DatagramProtocol):
future, into something similar to a message translator/encoder
class (see C{kademlia.msgformat} and C{kademlia.encoding}).
"""
now = time.time()
contact_history = self._history_tx.get(address, [])
if len(contact_history) > 1000:
contact_history = [x for x in contact_history if now - x[1] < 1.0]
contact_history.append((len(data), time.time()))
self._history_tx[address] = contact_history
bytes_tx = self._bytes_tx.get(address, 0)
bytes_tx += len(data)
self._bytes_tx[address] = bytes_tx
if len(data) > self.msgSizeLimit:
# We have to spread the data over multiple UDP datagrams,
# and provide sequencing information
@ -283,7 +394,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
func = getattr(self._node, method, None)
if callable(func) and hasattr(func, 'rpcmethod'):
# Call the exposed Node method and return the result to the deferred callback chain
log.debug("DHT RECV CALL %s with args %s", method, args)
log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'),
senderContact.address, senderContact.port)
try:
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
result = func(*args, **kwargs)
@ -340,6 +452,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
Will only be called once, after all ports are disconnected.
"""
log.info('Stopping DHT')
if self._bandwidth_stats_update_lc.running:
self._bandwidth_stats_update_lc.stop()
for delayed_call in self._call_later_list.values():
try:
delayed_call.cancel()

View file

@ -990,13 +990,15 @@ class Daemon(AuthJSONRPCServer):
############################################################################
@defer.inlineCallbacks
def jsonrpc_status(self, session_status=False):
def jsonrpc_status(self, session_status=False, dht_status=False):
"""
Return daemon status
Args:
'session_status' (optional): (bool) true to return session status,
default is false
'dht_status' (optional): (bool) true to return dht status,
default is false
Returns:
(dict) Daemon status dictionary
"""
@ -1036,6 +1038,8 @@ class Daemon(AuthJSONRPCServer):
'managed_blobs': len(blobs),
'managed_streams': len(self.lbry_file_manager.lbry_files),
}
if dht_status:
response['dht_status'] = self.session.dht_node.get_bandwidth_stats()
defer.returnValue(response)
def jsonrpc_get_best_blockhash(self):