From 02475ff98580427bdf99248d49b0f0aa72a29c14 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 25 May 2017 14:01:39 -0400 Subject: [PATCH] add dht bandwidth tracking --- CHANGELOG.md | 1 + lbrynet/dht/hashwatcher.py | 3 +- lbrynet/dht/node.py | 60 +++++++------ lbrynet/dht/protocol.py | 142 ++++++++++++++++++++++++++++--- lbrynet/lbrynet_daemon/Daemon.py | 6 +- 5 files changed, 171 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d46803c2f..361eb1003 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ at anytime. ## [Unreleased] ### Added * Prevent publish of files with size 0 + * Add `dht_status` parameter to `status` to include bandwidth and peer info * ### Changed diff --git a/lbrynet/dht/hashwatcher.py b/lbrynet/dht/hashwatcher.py index ea1651cbc..3f9699de2 100644 --- a/lbrynet/dht/hashwatcher.py +++ b/lbrynet/dht/hashwatcher.py @@ -20,7 +20,8 @@ class HashWatcher(object): self.next_tick.cancel() self.next_tick = None - def add_requested_hash(self, hashsum, from_ip): + def add_requested_hash(self, hashsum, contact): + from_ip = contact.compact_ip matching_hashes = [h for h in self.hashes if h[0] == hashsum and h[2] == from_ip] if len(matching_hashes) == 0: self.hashes.append((hashsum, datetime.datetime.now(), from_ip)) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 25e209db4..858b03a88 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -12,15 +12,12 @@ import operator import struct import time -from twisted.internet import defer, error +from twisted.internet import defer, error, reactor, threads import constants import routingtable import datastore import protocol -import twisted.internet.reactor -import twisted.internet.threads -import twisted.python.log from contact import Contact from hashwatcher import HashWatcher @@ -138,6 +135,7 @@ class Node(object): self._listeningPort.stopListening() self.hash_watcher.stop() + @defer.inlineCallbacks def joinNetwork(self, knownNodeAddresses=None): """ Causes the Node to join the Kademlia network; normally, this should be called before any other DHT operations. @@ -151,7 +149,7 @@ class Node(object): # Prepare the underlying Kademlia protocol if self.port is not None: try: - self._listeningPort = twisted.internet.reactor.listenUDP(self.port, self._protocol) + self._listeningPort = reactor.listenUDP(self.port, self._protocol) except error.CannotListenError as e: import traceback log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) @@ -169,10 +167,11 @@ class Node(object): self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts) # #TODO: Refresh all k-buckets further away than this node's closest neighbour # Start refreshing k-buckets periodically, if necessary - self.next_refresh_call = twisted.internet.reactor.callLater( - constants.checkRefreshInterval, self._refreshNode) # IGNORE:E1101 + self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, + self._refreshNode) + self.hash_watcher.tick() - return self._joinDeferred + yield self._joinDeferred def printContacts(self, *args): print '\n\nNODE CONTACTS\n===============' @@ -204,7 +203,6 @@ class Node(object): return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid}) def getPeersForBlob(self, blob_hash): - def expand_and_filter(result): expanded_peers = [] if isinstance(result, dict): @@ -230,8 +228,10 @@ class Node(object): def get_most_popular_hashes(self, num_to_return): return self.hash_watcher.most_popular_hashes(num_to_return) - def iterativeAnnounceHaveBlob(self, blob_hash, value): + def get_bandwidth_stats(self): + return self._protocol.bandwidth_stats + def iterativeAnnounceHaveBlob(self, blob_hash, value): known_nodes = {} def log_error(err, n): @@ -295,8 +295,8 @@ class Node(object): def change_token(self): self.old_token_secret = self.token_secret self.token_secret = self._generateID() - self.next_change_token_call = twisted.internet.reactor.callLater( - constants.tokenSecretChangeInterval, self.change_token) + self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval, + self.change_token) def make_token(self, compact_ip): h = hashlib.new('sha384') @@ -329,6 +329,7 @@ class Node(object): """ return self._iterativeFind(key) + @defer.inlineCallbacks def iterativeFindValue(self, key): """ The Kademlia search operation (deterministic) @@ -372,9 +373,10 @@ class Node(object): outerDf.callback(result) # Execute the search - df = self._iterativeFind(key, rpc='findValue') - df.addCallback(checkResult) - return outerDf + iterative_find_result = yield self._iterativeFind(key, rpc='findValue') + checkResult(iterative_find_result) + result = yield outerDf + defer.returnValue(result) def addContact(self, contact): """ Add/update the given contact; simple wrapper for the same method @@ -493,8 +495,8 @@ class Node(object): now = int(time.time()) originallyPublished = now # - age - self._dataStore.addPeerToBlob( - key, compact_address, now, originallyPublished, originalPublisherID) + self._dataStore.addPeerToBlob(key, compact_address, now, originallyPublished, + originalPublisherID) return 'OK' @rpcmethod @@ -534,6 +536,7 @@ class Node(object): or a list of contact triples closest to the requested key. @rtype: dict or list """ + if self._dataStore.hasPeersForBlob(key): rval = {key: self._dataStore.getPeersForBlob(key)} else: @@ -543,7 +546,7 @@ class Node(object): contact = kwargs['_rpcNodeContact'] compact_ip = contact.compact_ip() rval['token'] = self.make_token(compact_ip) - self.hash_watcher.add_requested_hash(key, compact_ip) + self.hash_watcher.add_requested_hash(key, contact) return rval def _generateID(self): @@ -554,6 +557,7 @@ class Node(object): """ return generate_id() + @defer.inlineCallbacks def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'): """ The basic Kademlia iterative lookup operation (for nodes/values) @@ -593,7 +597,8 @@ class Node(object): # This node doesn't know of any other nodes fakeDf = defer.Deferred() fakeDf.callback([]) - return fakeDf + result = yield fakeDf + defer.returnValue(result) else: # This is used during the bootstrap process; node ID's are most probably fake shortlist = startupShortlist @@ -603,11 +608,13 @@ class Node(object): helper = _IterativeFindHelper(self, outerDf, shortlist, key, findValue, rpc) # Start the iterations helper.searchIteration() - return outerDf + result = yield outerDf + defer.returnValue(result) def _refreshNode(self): """ Periodically called to perform k-bucket refreshes and data replication/republishing as necessary """ + df = self._refreshRoutingTable() df.addCallback(self._removeExpiredPeers) df.addCallback(self._scheduleNextNodeRefresh) @@ -630,12 +637,12 @@ class Node(object): return outerDf def _scheduleNextNodeRefresh(self, *args): - self.next_refresh_call = twisted.internet.reactor.callLater( - constants.checkRefreshInterval, self._refreshNode) + self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, + self._refreshNode) # args put here because _refreshRoutingTable does outerDF.callback(None) def _removeExpiredPeers(self, *args): - df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers) + df = threads.deferToThread(self._dataStore.removeExpiredPeers) return df @@ -689,6 +696,7 @@ class _IterativeFindHelper(object): # This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice if responseMsg.nodeID not in self.already_contacted: self.already_contacted.append(responseMsg.nodeID) + # Now grow extend the (unverified) shortlist with the returned contacts result = responseMsg.response # TODO: some validation on the result (for guarding against attacks) @@ -785,6 +793,7 @@ class _IterativeFindHelper(object): # noted self.outer_d.callback(self.active_contacts) return + # The search continues... if len(self.active_contacts): self.prev_closest_node[0] = self.active_contacts[0] @@ -801,8 +810,7 @@ class _IterativeFindHelper(object): if self._should_lookup_active_calls(): # Schedule the next iteration if there are any active # calls (Kademlia uses loose parallelism) - call = twisted.internet.reactor.callLater( - constants.iterativeLookupDelay, self.searchIteration) # IGNORE:E1101 + call = reactor.callLater(constants.iterativeLookupDelay, self.searchIteration) self.pending_iteration_calls.append(call) # Check for a quick contact response that made an update to the shortList elif prevShortlistLength < len(self.shortlist): @@ -819,7 +827,7 @@ class _IterativeFindHelper(object): df.addCallback(self.extendShortlist) df.addErrback(self.removeFromShortlist) df.addCallback(self.cancelActiveProbe) - df.addErrback(log.fail(), 'Failed to contact %s', contact) + df.addErrback(lambda _: log.exception('Failed to contact %s', contact)) self.already_contacted.append(contact.id) def _should_lookup_active_calls(self): diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index f7b9f1304..6cf24733f 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -13,10 +13,8 @@ import time import socket import errno -from twisted.internet import protocol, defer +from twisted.internet import protocol, defer, error, reactor, task from twisted.python import failure -from twisted.internet import error -import twisted.internet.reactor import constants import encoding @@ -24,7 +22,6 @@ import msgtypes import msgformat from contact import Contact -reactor = twisted.internet.reactor log = logging.getLogger(__name__) @@ -62,13 +59,13 @@ class Delay(object): class KademliaProtocol(protocol.DatagramProtocol): """ Implements all low-level network-related functions of a Kademlia node """ + msgSizeLimit = constants.udpDatagramMaxSize - 26 - def __init__(self, node, msgEncoder=encoding.Bencode(), - msgTranslator=msgformat.DefaultFormat()): + def __init__(self, node): self._node = node - self._encoder = msgEncoder - self._translator = msgTranslator + self._encoder = encoding.Bencode() + self._translator = msgformat.DefaultFormat() self._sentMessages = {} self._partialMessages = {} self._partialMessagesProgress = {} @@ -77,6 +74,94 @@ class KademliaProtocol(protocol.DatagramProtocol): # can be cancelled on shutdown self._call_later_list = {} + # keep track of bandwidth usage by peer + self._history_rx = {} + self._history_tx = {} + self._bytes_rx = {} + self._bytes_tx = {} + self._queries_rx_per_second = 0 + self._queries_tx_per_second = 0 + self._kbps_tx = 0 + self._kbps_rx = 0 + self._recent_contact_count = 0 + self._total_bytes_tx = 0 + self._total_bytes_rx = 0 + self._bandwidth_stats_update_lc = task.LoopingCall(self._update_bandwidth_stats) + + def _update_bandwidth_stats(self): + recent_rx_history = {} + now = time.time() + for address, history in self._history_rx.iteritems(): + recent_rx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0] + qps_rx = sum(len(v) for (k, v) in recent_rx_history.iteritems()) + bps_rx = sum(sum([x[0] for x in v]) for (k, v) in recent_rx_history.iteritems()) + kbps_rx = round(float(bps_rx) / 1024.0, 2) + + recent_tx_history = {} + now = time.time() + for address, history in self._history_tx.iteritems(): + recent_tx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0] + qps_tx = sum(len(v) for (k, v) in recent_tx_history.iteritems()) + bps_tx = sum(sum([x[0] for x in v]) for (k, v) in recent_tx_history.iteritems()) + kbps_tx = round(float(bps_tx) / 1024.0, 2) + + recent_contacts = [] + for k, v in recent_rx_history.iteritems(): + if v: + recent_contacts.append(k) + for k, v in recent_tx_history.iteritems(): + if v and k not in recent_contacts: + recent_contacts.append(k) + + self._queries_rx_per_second = qps_rx + self._queries_tx_per_second = qps_tx + self._kbps_tx = kbps_tx + self._kbps_rx = kbps_rx + self._recent_contact_count = len(recent_contacts) + self._total_bytes_tx = sum(v for (k, v) in self._bytes_tx.iteritems()) + self._total_bytes_rx = sum(v for (k, v) in self._bytes_rx.iteritems()) + + @property + def queries_rx_per_second(self): + return self._queries_rx_per_second + + @property + def queries_tx_per_second(self): + return self._queries_tx_per_second + + @property + def kbps_tx(self): + return self._kbps_tx + + @property + def kbps_rx(self): + return self._kbps_rx + + @property + def recent_contact_count(self): + return self._recent_contact_count + + @property + def total_bytes_tx(self): + return self._total_bytes_tx + + @property + def total_bytes_rx(self): + return self._total_bytes_rx + + @property + def bandwidth_stats(self): + response = { + "kbps_received": self.kbps_rx, + "kbps_sent": self.kbps_tx, + "total_bytes_sent": self.total_bytes_tx, + "total_bytes_received": self.total_bytes_rx, + "queries_received": self.queries_rx_per_second, + "queries_sent": self.queries_tx_per_second, + "recent_contacts": self.recent_contact_count, + } + return response + def sendRPC(self, contact, method, args, rawResponse=False): """ Sends an RPC to the specified contact @@ -108,26 +193,31 @@ class KademliaProtocol(protocol.DatagramProtocol): msgPrimitive = self._translator.toPrimitive(msg) encodedMsg = self._encoder.encode(msgPrimitive) - log.debug("DHT SEND: %s(%s)", method, args) + log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex')) df = defer.Deferred() if rawResponse: df._rpcRawResponse = True # Set the RPC timeout timer - timeoutCall = reactor.callLater( - constants.rpcTimeout, self._msgTimeout, msg.id) # IGNORE:E1101 + timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id) # Transmit the data self._send(encodedMsg, msg.id, (contact.address, contact.port)) self._sentMessages[msg.id] = (contact.id, df, timeoutCall) return df + def startProtocol(self): + log.info("DHT listening on UDP %i", self._node.port) + if not self._bandwidth_stats_update_lc.running: + self._bandwidth_stats_update_lc.start(1) + def datagramReceived(self, datagram, address): """ Handles and parses incoming RPC messages (and responses) @note: This is automatically called by Twisted when the protocol receives a UDP datagram """ + if datagram[0] == '\x00' and datagram[25] == '\x00': totalPackets = (ord(datagram[1]) << 8) | ord(datagram[2]) msgID = datagram[5:25] @@ -157,12 +247,22 @@ class KademliaProtocol(protocol.DatagramProtocol): message = self._translator.fromPrimitive(msgPrimitive) remoteContact = Contact(message.nodeID, address[0], address[1], self) + now = time.time() + contact_history = self._history_rx.get(address, []) + if len(contact_history) > 1000: + contact_history = [x for x in contact_history if now - x[1] < 1.0] + contact_history.append((len(datagram), time.time())) + self._history_rx[address] = contact_history + bytes_rx = self._bytes_rx.get(address, 0) + bytes_rx += len(datagram) + self._bytes_rx[address] = bytes_rx + # Refresh the remote node's details in the local node's k-buckets self._node.addContact(remoteContact) - if isinstance(message, msgtypes.RequestMessage): # This is an RPC method request self._handleRPC(remoteContact, message.id, message.request, message.args) + elif isinstance(message, msgtypes.ResponseMessage): # Find the message that triggered this response if message.id in self._sentMessages: @@ -205,6 +305,17 @@ class KademliaProtocol(protocol.DatagramProtocol): future, into something similar to a message translator/encoder class (see C{kademlia.msgformat} and C{kademlia.encoding}). """ + + now = time.time() + contact_history = self._history_tx.get(address, []) + if len(contact_history) > 1000: + contact_history = [x for x in contact_history if now - x[1] < 1.0] + contact_history.append((len(data), time.time())) + self._history_tx[address] = contact_history + bytes_tx = self._bytes_tx.get(address, 0) + bytes_tx += len(data) + self._bytes_tx[address] = bytes_tx + if len(data) > self.msgSizeLimit: # We have to spread the data over multiple UDP datagrams, # and provide sequencing information @@ -283,7 +394,8 @@ class KademliaProtocol(protocol.DatagramProtocol): func = getattr(self._node, method, None) if callable(func) and hasattr(func, 'rpcmethod'): # Call the exposed Node method and return the result to the deferred callback chain - log.debug("DHT RECV CALL %s with args %s", method, args) + log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'), + senderContact.address, senderContact.port) try: kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact} result = func(*args, **kwargs) @@ -340,6 +452,10 @@ class KademliaProtocol(protocol.DatagramProtocol): Will only be called once, after all ports are disconnected. """ log.info('Stopping DHT') + + if self._bandwidth_stats_update_lc.running: + self._bandwidth_stats_update_lc.stop() + for delayed_call in self._call_later_list.values(): try: delayed_call.cancel() diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 6daf95981..dae108213 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -990,13 +990,15 @@ class Daemon(AuthJSONRPCServer): ############################################################################ @defer.inlineCallbacks - def jsonrpc_status(self, session_status=False): + def jsonrpc_status(self, session_status=False, dht_status=False): """ Return daemon status Args: 'session_status' (optional): (bool) true to return session status, default is false + 'dht_status' (optional): (bool) true to return dht status, + default is false Returns: (dict) Daemon status dictionary """ @@ -1036,6 +1038,8 @@ class Daemon(AuthJSONRPCServer): 'managed_blobs': len(blobs), 'managed_streams': len(self.lbry_file_manager.lbry_files), } + if dht_status: + response['dht_status'] = self.session.dht_node.get_bandwidth_stats() defer.returnValue(response) def jsonrpc_get_best_blockhash(self):