diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 97c08dc18..bb13f3416 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -993,7 +993,7 @@ class Daemon(AuthJSONRPCServer): ############################################################################ @defer.inlineCallbacks - def jsonrpc_status(self, session_status=False, dht_status=False): + def jsonrpc_status(self, session_status=False): """ Get daemon status @@ -1002,7 +1002,6 @@ class Daemon(AuthJSONRPCServer): Options: --session_status : (bool) include session status in results - --dht_status : (bool) include dht network and peer status Returns: (dict) lbrynet-daemon status @@ -1033,19 +1032,6 @@ class Daemon(AuthJSONRPCServer): 'announce_queue_size': number of blobs currently queued to be announced 'should_announce_blobs': number of blobs that should be announced } - - If given the dht status option: - 'dht_status': { - 'kbps_received': current kbps receiving, - 'kbps_sent': current kdps being sent, - 'total_bytes_sent': total bytes sent - 'total_bytes_received': total bytes received - 'queries_received': number of queries received per second - 'queries_sent': number of queries sent per second - 'recent_contacts': count of recently contacted peers - 'single_hash_announce_duration': avg. seconds it takes to announce a blob - 'unique_contacts': count of unique peers - }, } """ @@ -1092,10 +1078,6 @@ class Daemon(AuthJSONRPCServer): 'announce_queue_size': announce_queue_size, 'should_announce_blobs': should_announce_blobs, } - if dht_status: - response['dht_status'] = self.session.dht_node.get_bandwidth_stats() - response['dht_status'].update({'single_hash_announce_duration': - self.session.blob_manager.single_hash_announce_duration}) defer.returnValue(response) def jsonrpc_version(self): diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 8aedb6ebe..0e5c980ab 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -264,9 +264,6 @@ class Node(object): def get_most_popular_hashes(self, num_to_return): return self.hash_watcher.most_popular_hashes(num_to_return) - def get_bandwidth_stats(self): - return self._protocol.bandwidth_stats - @defer.inlineCallbacks def iterativeAnnounceHaveBlob(self, blob_hash, value): known_nodes = {} diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 7e018c68a..629bb070c 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -1,9 +1,8 @@ import logging -import time import socket import errno -from twisted.internet import protocol, defer, task +from twisted.internet import protocol, defer from lbrynet.core.call_later_manager import CallLaterManager import constants @@ -30,104 +29,6 @@ class KademliaProtocol(protocol.DatagramProtocol): self._partialMessages = {} self._partialMessagesProgress = {} self._delay = Delay(0, self._node.clock.seconds) - # keep track of outstanding writes so that they - # can be cancelled on shutdown - self._call_later_list = {} - - # keep track of bandwidth usage by peer - self._history_rx = {} - self._history_tx = {} - self._bytes_rx = {} - self._bytes_tx = {} - self._unique_contacts = [] - self._queries_rx_per_second = 0 - self._queries_tx_per_second = 0 - self._kbps_tx = 0 - self._kbps_rx = 0 - self._recent_contact_count = 0 - self._total_bytes_tx = 0 - self._total_bytes_rx = 0 - self._bandwidth_stats_update_lc = task.LoopingCall(self._update_bandwidth_stats) - self._bandwidth_stats_update_lc.clock = self._node.clock - - def _update_bandwidth_stats(self): - recent_rx_history = {} - now = time.time() - for address, history in self._history_rx.iteritems(): - recent_rx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0] - qps_rx = sum(len(v) for (k, v) in recent_rx_history.iteritems()) - bps_rx = sum(sum([x[0] for x in v]) for (k, v) in recent_rx_history.iteritems()) - kbps_rx = round(float(bps_rx) / 1024.0, 2) - - recent_tx_history = {} - now = time.time() - for address, history in self._history_tx.iteritems(): - recent_tx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0] - qps_tx = sum(len(v) for (k, v) in recent_tx_history.iteritems()) - bps_tx = sum(sum([x[0] for x in v]) for (k, v) in recent_tx_history.iteritems()) - kbps_tx = round(float(bps_tx) / 1024.0, 2) - - recent_contacts = [] - for k, v in recent_rx_history.iteritems(): - if v: - recent_contacts.append(k) - for k, v in recent_tx_history.iteritems(): - if v and k not in recent_contacts: - recent_contacts.append(k) - - self._queries_rx_per_second = qps_rx - self._queries_tx_per_second = qps_tx - self._kbps_tx = kbps_tx - self._kbps_rx = kbps_rx - self._recent_contact_count = len(recent_contacts) - self._total_bytes_tx = sum(v for (k, v) in self._bytes_tx.iteritems()) - self._total_bytes_rx = sum(v for (k, v) in self._bytes_rx.iteritems()) - - @property - def unique_contacts(self): - return self._unique_contacts - - @property - def queries_rx_per_second(self): - return self._queries_rx_per_second - - @property - def queries_tx_per_second(self): - return self._queries_tx_per_second - - @property - def kbps_tx(self): - return self._kbps_tx - - @property - def kbps_rx(self): - return self._kbps_rx - - @property - def recent_contact_count(self): - return self._recent_contact_count - - @property - def total_bytes_tx(self): - return self._total_bytes_tx - - @property - def total_bytes_rx(self): - return self._total_bytes_rx - - @property - def bandwidth_stats(self): - response = { - "kbps_received": self.kbps_rx, - "kbps_sent": self.kbps_tx, - "total_bytes_sent": self.total_bytes_tx, - "total_bytes_received": self.total_bytes_rx, - "queries_received": self.queries_rx_per_second, - "queries_sent": self.queries_tx_per_second, - "recent_contacts": self.recent_contact_count, - "unique_contacts": len(self.unique_contacts) - } - return response def sendRPC(self, contact, method, args, rawResponse=False): """ Sends an RPC to the specified contact @@ -179,8 +80,6 @@ class KademliaProtocol(protocol.DatagramProtocol): def startProtocol(self): log.info("DHT listening on UDP %i", self._node.port) - if not self._bandwidth_stats_update_lc.running: - self._bandwidth_stats_update_lc.start(1) def datagramReceived(self, datagram, address): """ Handles and parses incoming RPC messages (and responses) @@ -219,18 +118,6 @@ class KademliaProtocol(protocol.DatagramProtocol): remoteContact = Contact(message.nodeID, address[0], address[1], self) - now = time.time() - contact_history = self._history_rx.get(address, []) - if len(contact_history) > 1000: - contact_history = [x for x in contact_history if now - x[1] < 1.0] - contact_history.append((len(datagram), time.time())) - self._history_rx[address] = contact_history - bytes_rx = self._bytes_rx.get(address, 0) - bytes_rx += len(datagram) - self._bytes_rx[address] = bytes_rx - if address not in self.unique_contacts: - self._unique_contacts.append(address) - # Refresh the remote node's details in the local node's k-buckets self._node.addContact(remoteContact) if isinstance(message, msgtypes.RequestMessage): @@ -295,18 +182,6 @@ class KademliaProtocol(protocol.DatagramProtocol): class (see C{kademlia.msgformat} and C{kademlia.encoding}). """ - now = time.time() - contact_history = self._history_tx.get(address, []) - if len(contact_history) > 1000: - contact_history = [x for x in contact_history if now - x[1] < 1.0] - contact_history.append((len(data), time.time())) - self._history_tx[address] = contact_history - bytes_tx = self._bytes_tx.get(address, 0) - bytes_tx += len(data) - self._bytes_tx[address] = bytes_tx - if address not in self.unique_contacts: - self._unique_contacts.append(address) - if len(data) > self.msgSizeLimit: # We have to spread the data over multiple UDP datagrams, # and provide sequencing information @@ -457,10 +332,5 @@ class KademliaProtocol(protocol.DatagramProtocol): Will only be called once, after all ports are disconnected. """ log.info('Stopping DHT') - - if self._bandwidth_stats_update_lc.running: - self._bandwidth_stats_update_lc.stop() - CallLaterManager.stop() - log.info('DHT stopped')