remove bandwidth stats

This commit is contained in:
Jack Robison 2018-03-27 20:56:34 -04:00
parent a41bbd5e27
commit 5bab6f7d39
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 2 additions and 153 deletions

View file

@ -993,7 +993,7 @@ class Daemon(AuthJSONRPCServer):
############################################################################ ############################################################################
@defer.inlineCallbacks @defer.inlineCallbacks
def jsonrpc_status(self, session_status=False, dht_status=False): def jsonrpc_status(self, session_status=False):
""" """
Get daemon status Get daemon status
@ -1002,7 +1002,6 @@ class Daemon(AuthJSONRPCServer):
Options: Options:
--session_status : (bool) include session status in results --session_status : (bool) include session status in results
--dht_status : (bool) include dht network and peer status
Returns: Returns:
(dict) lbrynet-daemon status (dict) lbrynet-daemon status
@ -1033,19 +1032,6 @@ class Daemon(AuthJSONRPCServer):
'announce_queue_size': number of blobs currently queued to be announced 'announce_queue_size': number of blobs currently queued to be announced
'should_announce_blobs': number of blobs that should be announced 'should_announce_blobs': number of blobs that should be announced
} }
If given the dht status option:
'dht_status': {
'kbps_received': current kbps receiving,
'kbps_sent': current kdps being sent,
'total_bytes_sent': total bytes sent
'total_bytes_received': total bytes received
'queries_received': number of queries received per second
'queries_sent': number of queries sent per second
'recent_contacts': count of recently contacted peers
'single_hash_announce_duration': avg. seconds it takes to announce a blob
'unique_contacts': count of unique peers
},
} }
""" """
@ -1092,10 +1078,6 @@ class Daemon(AuthJSONRPCServer):
'announce_queue_size': announce_queue_size, 'announce_queue_size': announce_queue_size,
'should_announce_blobs': should_announce_blobs, 'should_announce_blobs': should_announce_blobs,
} }
if dht_status:
response['dht_status'] = self.session.dht_node.get_bandwidth_stats()
response['dht_status'].update({'single_hash_announce_duration':
self.session.blob_manager.single_hash_announce_duration})
defer.returnValue(response) defer.returnValue(response)
def jsonrpc_version(self): def jsonrpc_version(self):

View file

@ -264,9 +264,6 @@ class Node(object):
def get_most_popular_hashes(self, num_to_return): def get_most_popular_hashes(self, num_to_return):
return self.hash_watcher.most_popular_hashes(num_to_return) return self.hash_watcher.most_popular_hashes(num_to_return)
def get_bandwidth_stats(self):
return self._protocol.bandwidth_stats
@defer.inlineCallbacks @defer.inlineCallbacks
def iterativeAnnounceHaveBlob(self, blob_hash, value): def iterativeAnnounceHaveBlob(self, blob_hash, value):
known_nodes = {} known_nodes = {}

View file

@ -1,9 +1,8 @@
import logging import logging
import time
import socket import socket
import errno import errno
from twisted.internet import protocol, defer, task from twisted.internet import protocol, defer
from lbrynet.core.call_later_manager import CallLaterManager from lbrynet.core.call_later_manager import CallLaterManager
import constants import constants
@ -30,104 +29,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
self._partialMessages = {} self._partialMessages = {}
self._partialMessagesProgress = {} self._partialMessagesProgress = {}
self._delay = Delay(0, self._node.clock.seconds) self._delay = Delay(0, self._node.clock.seconds)
# keep track of outstanding writes so that they
# can be cancelled on shutdown
self._call_later_list = {}
# keep track of bandwidth usage by peer
self._history_rx = {}
self._history_tx = {}
self._bytes_rx = {}
self._bytes_tx = {}
self._unique_contacts = []
self._queries_rx_per_second = 0
self._queries_tx_per_second = 0
self._kbps_tx = 0
self._kbps_rx = 0
self._recent_contact_count = 0
self._total_bytes_tx = 0
self._total_bytes_rx = 0
self._bandwidth_stats_update_lc = task.LoopingCall(self._update_bandwidth_stats)
self._bandwidth_stats_update_lc.clock = self._node.clock
def _update_bandwidth_stats(self):
recent_rx_history = {}
now = time.time()
for address, history in self._history_rx.iteritems():
recent_rx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0]
qps_rx = sum(len(v) for (k, v) in recent_rx_history.iteritems())
bps_rx = sum(sum([x[0] for x in v]) for (k, v) in recent_rx_history.iteritems())
kbps_rx = round(float(bps_rx) / 1024.0, 2)
recent_tx_history = {}
now = time.time()
for address, history in self._history_tx.iteritems():
recent_tx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0]
qps_tx = sum(len(v) for (k, v) in recent_tx_history.iteritems())
bps_tx = sum(sum([x[0] for x in v]) for (k, v) in recent_tx_history.iteritems())
kbps_tx = round(float(bps_tx) / 1024.0, 2)
recent_contacts = []
for k, v in recent_rx_history.iteritems():
if v:
recent_contacts.append(k)
for k, v in recent_tx_history.iteritems():
if v and k not in recent_contacts:
recent_contacts.append(k)
self._queries_rx_per_second = qps_rx
self._queries_tx_per_second = qps_tx
self._kbps_tx = kbps_tx
self._kbps_rx = kbps_rx
self._recent_contact_count = len(recent_contacts)
self._total_bytes_tx = sum(v for (k, v) in self._bytes_tx.iteritems())
self._total_bytes_rx = sum(v for (k, v) in self._bytes_rx.iteritems())
@property
def unique_contacts(self):
return self._unique_contacts
@property
def queries_rx_per_second(self):
return self._queries_rx_per_second
@property
def queries_tx_per_second(self):
return self._queries_tx_per_second
@property
def kbps_tx(self):
return self._kbps_tx
@property
def kbps_rx(self):
return self._kbps_rx
@property
def recent_contact_count(self):
return self._recent_contact_count
@property
def total_bytes_tx(self):
return self._total_bytes_tx
@property
def total_bytes_rx(self):
return self._total_bytes_rx
@property
def bandwidth_stats(self):
response = {
"kbps_received": self.kbps_rx,
"kbps_sent": self.kbps_tx,
"total_bytes_sent": self.total_bytes_tx,
"total_bytes_received": self.total_bytes_rx,
"queries_received": self.queries_rx_per_second,
"queries_sent": self.queries_tx_per_second,
"recent_contacts": self.recent_contact_count,
"unique_contacts": len(self.unique_contacts)
}
return response
def sendRPC(self, contact, method, args, rawResponse=False): def sendRPC(self, contact, method, args, rawResponse=False):
""" Sends an RPC to the specified contact """ Sends an RPC to the specified contact
@ -179,8 +80,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
def startProtocol(self): def startProtocol(self):
log.info("DHT listening on UDP %i", self._node.port) log.info("DHT listening on UDP %i", self._node.port)
if not self._bandwidth_stats_update_lc.running:
self._bandwidth_stats_update_lc.start(1)
def datagramReceived(self, datagram, address): def datagramReceived(self, datagram, address):
""" Handles and parses incoming RPC messages (and responses) """ Handles and parses incoming RPC messages (and responses)
@ -219,18 +118,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
remoteContact = Contact(message.nodeID, address[0], address[1], self) remoteContact = Contact(message.nodeID, address[0], address[1], self)
now = time.time()
contact_history = self._history_rx.get(address, [])
if len(contact_history) > 1000:
contact_history = [x for x in contact_history if now - x[1] < 1.0]
contact_history.append((len(datagram), time.time()))
self._history_rx[address] = contact_history
bytes_rx = self._bytes_rx.get(address, 0)
bytes_rx += len(datagram)
self._bytes_rx[address] = bytes_rx
if address not in self.unique_contacts:
self._unique_contacts.append(address)
# Refresh the remote node's details in the local node's k-buckets # Refresh the remote node's details in the local node's k-buckets
self._node.addContact(remoteContact) self._node.addContact(remoteContact)
if isinstance(message, msgtypes.RequestMessage): if isinstance(message, msgtypes.RequestMessage):
@ -295,18 +182,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
class (see C{kademlia.msgformat} and C{kademlia.encoding}). class (see C{kademlia.msgformat} and C{kademlia.encoding}).
""" """
now = time.time()
contact_history = self._history_tx.get(address, [])
if len(contact_history) > 1000:
contact_history = [x for x in contact_history if now - x[1] < 1.0]
contact_history.append((len(data), time.time()))
self._history_tx[address] = contact_history
bytes_tx = self._bytes_tx.get(address, 0)
bytes_tx += len(data)
self._bytes_tx[address] = bytes_tx
if address not in self.unique_contacts:
self._unique_contacts.append(address)
if len(data) > self.msgSizeLimit: if len(data) > self.msgSizeLimit:
# We have to spread the data over multiple UDP datagrams, # We have to spread the data over multiple UDP datagrams,
# and provide sequencing information # and provide sequencing information
@ -457,10 +332,5 @@ class KademliaProtocol(protocol.DatagramProtocol):
Will only be called once, after all ports are disconnected. Will only be called once, after all ports are disconnected.
""" """
log.info('Stopping DHT') log.info('Stopping DHT')
if self._bandwidth_stats_update_lc.running:
self._bandwidth_stats_update_lc.stop()
CallLaterManager.stop() CallLaterManager.stop()
log.info('DHT stopped') log.info('DHT stopped')