forked from LBRYCommunity/lbry-sdk
add dht bandwidth tracking
This commit is contained in:
parent
73a5828865
commit
02475ff985
5 changed files with 171 additions and 41 deletions
|
@ -10,6 +10,7 @@ at anytime.
|
|||
## [Unreleased]
|
||||
### Added
|
||||
* Prevent publish of files with size 0
|
||||
* Add `dht_status` parameter to `status` to include bandwidth and peer info
|
||||
*
|
||||
|
||||
### Changed
|
||||
|
|
|
@ -20,7 +20,8 @@ class HashWatcher(object):
|
|||
self.next_tick.cancel()
|
||||
self.next_tick = None
|
||||
|
||||
def add_requested_hash(self, hashsum, from_ip):
|
||||
def add_requested_hash(self, hashsum, contact):
|
||||
from_ip = contact.compact_ip
|
||||
matching_hashes = [h for h in self.hashes if h[0] == hashsum and h[2] == from_ip]
|
||||
if len(matching_hashes) == 0:
|
||||
self.hashes.append((hashsum, datetime.datetime.now(), from_ip))
|
||||
|
|
|
@ -12,15 +12,12 @@ import operator
|
|||
import struct
|
||||
import time
|
||||
|
||||
from twisted.internet import defer, error
|
||||
from twisted.internet import defer, error, reactor, threads
|
||||
|
||||
import constants
|
||||
import routingtable
|
||||
import datastore
|
||||
import protocol
|
||||
import twisted.internet.reactor
|
||||
import twisted.internet.threads
|
||||
import twisted.python.log
|
||||
|
||||
from contact import Contact
|
||||
from hashwatcher import HashWatcher
|
||||
|
@ -138,6 +135,7 @@ class Node(object):
|
|||
self._listeningPort.stopListening()
|
||||
self.hash_watcher.stop()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def joinNetwork(self, knownNodeAddresses=None):
|
||||
""" Causes the Node to join the Kademlia network; normally, this
|
||||
should be called before any other DHT operations.
|
||||
|
@ -151,7 +149,7 @@ class Node(object):
|
|||
# Prepare the underlying Kademlia protocol
|
||||
if self.port is not None:
|
||||
try:
|
||||
self._listeningPort = twisted.internet.reactor.listenUDP(self.port, self._protocol)
|
||||
self._listeningPort = reactor.listenUDP(self.port, self._protocol)
|
||||
except error.CannotListenError as e:
|
||||
import traceback
|
||||
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
|
||||
|
@ -169,10 +167,11 @@ class Node(object):
|
|||
self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
|
||||
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
|
||||
# Start refreshing k-buckets periodically, if necessary
|
||||
self.next_refresh_call = twisted.internet.reactor.callLater(
|
||||
constants.checkRefreshInterval, self._refreshNode) # IGNORE:E1101
|
||||
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
|
||||
self._refreshNode)
|
||||
|
||||
self.hash_watcher.tick()
|
||||
return self._joinDeferred
|
||||
yield self._joinDeferred
|
||||
|
||||
def printContacts(self, *args):
|
||||
print '\n\nNODE CONTACTS\n==============='
|
||||
|
@ -204,7 +203,6 @@ class Node(object):
|
|||
return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid})
|
||||
|
||||
def getPeersForBlob(self, blob_hash):
|
||||
|
||||
def expand_and_filter(result):
|
||||
expanded_peers = []
|
||||
if isinstance(result, dict):
|
||||
|
@ -230,8 +228,10 @@ class Node(object):
|
|||
def get_most_popular_hashes(self, num_to_return):
|
||||
return self.hash_watcher.most_popular_hashes(num_to_return)
|
||||
|
||||
def iterativeAnnounceHaveBlob(self, blob_hash, value):
|
||||
def get_bandwidth_stats(self):
|
||||
return self._protocol.bandwidth_stats
|
||||
|
||||
def iterativeAnnounceHaveBlob(self, blob_hash, value):
|
||||
known_nodes = {}
|
||||
|
||||
def log_error(err, n):
|
||||
|
@ -295,8 +295,8 @@ class Node(object):
|
|||
def change_token(self):
|
||||
self.old_token_secret = self.token_secret
|
||||
self.token_secret = self._generateID()
|
||||
self.next_change_token_call = twisted.internet.reactor.callLater(
|
||||
constants.tokenSecretChangeInterval, self.change_token)
|
||||
self.next_change_token_call = reactor.callLater(constants.tokenSecretChangeInterval,
|
||||
self.change_token)
|
||||
|
||||
def make_token(self, compact_ip):
|
||||
h = hashlib.new('sha384')
|
||||
|
@ -329,6 +329,7 @@ class Node(object):
|
|||
"""
|
||||
return self._iterativeFind(key)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def iterativeFindValue(self, key):
|
||||
""" The Kademlia search operation (deterministic)
|
||||
|
||||
|
@ -372,9 +373,10 @@ class Node(object):
|
|||
outerDf.callback(result)
|
||||
|
||||
# Execute the search
|
||||
df = self._iterativeFind(key, rpc='findValue')
|
||||
df.addCallback(checkResult)
|
||||
return outerDf
|
||||
iterative_find_result = yield self._iterativeFind(key, rpc='findValue')
|
||||
checkResult(iterative_find_result)
|
||||
result = yield outerDf
|
||||
defer.returnValue(result)
|
||||
|
||||
def addContact(self, contact):
|
||||
""" Add/update the given contact; simple wrapper for the same method
|
||||
|
@ -493,8 +495,8 @@ class Node(object):
|
|||
|
||||
now = int(time.time())
|
||||
originallyPublished = now # - age
|
||||
self._dataStore.addPeerToBlob(
|
||||
key, compact_address, now, originallyPublished, originalPublisherID)
|
||||
self._dataStore.addPeerToBlob(key, compact_address, now, originallyPublished,
|
||||
originalPublisherID)
|
||||
return 'OK'
|
||||
|
||||
@rpcmethod
|
||||
|
@ -534,6 +536,7 @@ class Node(object):
|
|||
or a list of contact triples closest to the requested key.
|
||||
@rtype: dict or list
|
||||
"""
|
||||
|
||||
if self._dataStore.hasPeersForBlob(key):
|
||||
rval = {key: self._dataStore.getPeersForBlob(key)}
|
||||
else:
|
||||
|
@ -543,7 +546,7 @@ class Node(object):
|
|||
contact = kwargs['_rpcNodeContact']
|
||||
compact_ip = contact.compact_ip()
|
||||
rval['token'] = self.make_token(compact_ip)
|
||||
self.hash_watcher.add_requested_hash(key, compact_ip)
|
||||
self.hash_watcher.add_requested_hash(key, contact)
|
||||
return rval
|
||||
|
||||
def _generateID(self):
|
||||
|
@ -554,6 +557,7 @@ class Node(object):
|
|||
"""
|
||||
return generate_id()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'):
|
||||
""" The basic Kademlia iterative lookup operation (for nodes/values)
|
||||
|
||||
|
@ -593,7 +597,8 @@ class Node(object):
|
|||
# This node doesn't know of any other nodes
|
||||
fakeDf = defer.Deferred()
|
||||
fakeDf.callback([])
|
||||
return fakeDf
|
||||
result = yield fakeDf
|
||||
defer.returnValue(result)
|
||||
else:
|
||||
# This is used during the bootstrap process; node ID's are most probably fake
|
||||
shortlist = startupShortlist
|
||||
|
@ -603,11 +608,13 @@ class Node(object):
|
|||
helper = _IterativeFindHelper(self, outerDf, shortlist, key, findValue, rpc)
|
||||
# Start the iterations
|
||||
helper.searchIteration()
|
||||
return outerDf
|
||||
result = yield outerDf
|
||||
defer.returnValue(result)
|
||||
|
||||
def _refreshNode(self):
|
||||
""" Periodically called to perform k-bucket refreshes and data
|
||||
replication/republishing as necessary """
|
||||
|
||||
df = self._refreshRoutingTable()
|
||||
df.addCallback(self._removeExpiredPeers)
|
||||
df.addCallback(self._scheduleNextNodeRefresh)
|
||||
|
@ -630,12 +637,12 @@ class Node(object):
|
|||
return outerDf
|
||||
|
||||
def _scheduleNextNodeRefresh(self, *args):
|
||||
self.next_refresh_call = twisted.internet.reactor.callLater(
|
||||
constants.checkRefreshInterval, self._refreshNode)
|
||||
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
|
||||
self._refreshNode)
|
||||
|
||||
# args put here because _refreshRoutingTable does outerDF.callback(None)
|
||||
def _removeExpiredPeers(self, *args):
|
||||
df = twisted.internet.threads.deferToThread(self._dataStore.removeExpiredPeers)
|
||||
df = threads.deferToThread(self._dataStore.removeExpiredPeers)
|
||||
return df
|
||||
|
||||
|
||||
|
@ -689,6 +696,7 @@ class _IterativeFindHelper(object):
|
|||
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
|
||||
if responseMsg.nodeID not in self.already_contacted:
|
||||
self.already_contacted.append(responseMsg.nodeID)
|
||||
|
||||
# Now grow extend the (unverified) shortlist with the returned contacts
|
||||
result = responseMsg.response
|
||||
# TODO: some validation on the result (for guarding against attacks)
|
||||
|
@ -785,6 +793,7 @@ class _IterativeFindHelper(object):
|
|||
# noted
|
||||
self.outer_d.callback(self.active_contacts)
|
||||
return
|
||||
|
||||
# The search continues...
|
||||
if len(self.active_contacts):
|
||||
self.prev_closest_node[0] = self.active_contacts[0]
|
||||
|
@ -801,8 +810,7 @@ class _IterativeFindHelper(object):
|
|||
if self._should_lookup_active_calls():
|
||||
# Schedule the next iteration if there are any active
|
||||
# calls (Kademlia uses loose parallelism)
|
||||
call = twisted.internet.reactor.callLater(
|
||||
constants.iterativeLookupDelay, self.searchIteration) # IGNORE:E1101
|
||||
call = reactor.callLater(constants.iterativeLookupDelay, self.searchIteration)
|
||||
self.pending_iteration_calls.append(call)
|
||||
# Check for a quick contact response that made an update to the shortList
|
||||
elif prevShortlistLength < len(self.shortlist):
|
||||
|
@ -819,7 +827,7 @@ class _IterativeFindHelper(object):
|
|||
df.addCallback(self.extendShortlist)
|
||||
df.addErrback(self.removeFromShortlist)
|
||||
df.addCallback(self.cancelActiveProbe)
|
||||
df.addErrback(log.fail(), 'Failed to contact %s', contact)
|
||||
df.addErrback(lambda _: log.exception('Failed to contact %s', contact))
|
||||
self.already_contacted.append(contact.id)
|
||||
|
||||
def _should_lookup_active_calls(self):
|
||||
|
|
|
@ -13,10 +13,8 @@ import time
|
|||
import socket
|
||||
import errno
|
||||
|
||||
from twisted.internet import protocol, defer
|
||||
from twisted.internet import protocol, defer, error, reactor, task
|
||||
from twisted.python import failure
|
||||
from twisted.internet import error
|
||||
import twisted.internet.reactor
|
||||
|
||||
import constants
|
||||
import encoding
|
||||
|
@ -24,7 +22,6 @@ import msgtypes
|
|||
import msgformat
|
||||
from contact import Contact
|
||||
|
||||
reactor = twisted.internet.reactor
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -62,13 +59,13 @@ class Delay(object):
|
|||
|
||||
class KademliaProtocol(protocol.DatagramProtocol):
|
||||
""" Implements all low-level network-related functions of a Kademlia node """
|
||||
|
||||
msgSizeLimit = constants.udpDatagramMaxSize - 26
|
||||
|
||||
def __init__(self, node, msgEncoder=encoding.Bencode(),
|
||||
msgTranslator=msgformat.DefaultFormat()):
|
||||
def __init__(self, node):
|
||||
self._node = node
|
||||
self._encoder = msgEncoder
|
||||
self._translator = msgTranslator
|
||||
self._encoder = encoding.Bencode()
|
||||
self._translator = msgformat.DefaultFormat()
|
||||
self._sentMessages = {}
|
||||
self._partialMessages = {}
|
||||
self._partialMessagesProgress = {}
|
||||
|
@ -77,6 +74,94 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
# can be cancelled on shutdown
|
||||
self._call_later_list = {}
|
||||
|
||||
# keep track of bandwidth usage by peer
|
||||
self._history_rx = {}
|
||||
self._history_tx = {}
|
||||
self._bytes_rx = {}
|
||||
self._bytes_tx = {}
|
||||
self._queries_rx_per_second = 0
|
||||
self._queries_tx_per_second = 0
|
||||
self._kbps_tx = 0
|
||||
self._kbps_rx = 0
|
||||
self._recent_contact_count = 0
|
||||
self._total_bytes_tx = 0
|
||||
self._total_bytes_rx = 0
|
||||
self._bandwidth_stats_update_lc = task.LoopingCall(self._update_bandwidth_stats)
|
||||
|
||||
def _update_bandwidth_stats(self):
|
||||
recent_rx_history = {}
|
||||
now = time.time()
|
||||
for address, history in self._history_rx.iteritems():
|
||||
recent_rx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0]
|
||||
qps_rx = sum(len(v) for (k, v) in recent_rx_history.iteritems())
|
||||
bps_rx = sum(sum([x[0] for x in v]) for (k, v) in recent_rx_history.iteritems())
|
||||
kbps_rx = round(float(bps_rx) / 1024.0, 2)
|
||||
|
||||
recent_tx_history = {}
|
||||
now = time.time()
|
||||
for address, history in self._history_tx.iteritems():
|
||||
recent_tx_history[address] = [(s, t) for (s, t) in history if now - t < 1.0]
|
||||
qps_tx = sum(len(v) for (k, v) in recent_tx_history.iteritems())
|
||||
bps_tx = sum(sum([x[0] for x in v]) for (k, v) in recent_tx_history.iteritems())
|
||||
kbps_tx = round(float(bps_tx) / 1024.0, 2)
|
||||
|
||||
recent_contacts = []
|
||||
for k, v in recent_rx_history.iteritems():
|
||||
if v:
|
||||
recent_contacts.append(k)
|
||||
for k, v in recent_tx_history.iteritems():
|
||||
if v and k not in recent_contacts:
|
||||
recent_contacts.append(k)
|
||||
|
||||
self._queries_rx_per_second = qps_rx
|
||||
self._queries_tx_per_second = qps_tx
|
||||
self._kbps_tx = kbps_tx
|
||||
self._kbps_rx = kbps_rx
|
||||
self._recent_contact_count = len(recent_contacts)
|
||||
self._total_bytes_tx = sum(v for (k, v) in self._bytes_tx.iteritems())
|
||||
self._total_bytes_rx = sum(v for (k, v) in self._bytes_rx.iteritems())
|
||||
|
||||
@property
|
||||
def queries_rx_per_second(self):
|
||||
return self._queries_rx_per_second
|
||||
|
||||
@property
|
||||
def queries_tx_per_second(self):
|
||||
return self._queries_tx_per_second
|
||||
|
||||
@property
|
||||
def kbps_tx(self):
|
||||
return self._kbps_tx
|
||||
|
||||
@property
|
||||
def kbps_rx(self):
|
||||
return self._kbps_rx
|
||||
|
||||
@property
|
||||
def recent_contact_count(self):
|
||||
return self._recent_contact_count
|
||||
|
||||
@property
|
||||
def total_bytes_tx(self):
|
||||
return self._total_bytes_tx
|
||||
|
||||
@property
|
||||
def total_bytes_rx(self):
|
||||
return self._total_bytes_rx
|
||||
|
||||
@property
|
||||
def bandwidth_stats(self):
|
||||
response = {
|
||||
"kbps_received": self.kbps_rx,
|
||||
"kbps_sent": self.kbps_tx,
|
||||
"total_bytes_sent": self.total_bytes_tx,
|
||||
"total_bytes_received": self.total_bytes_rx,
|
||||
"queries_received": self.queries_rx_per_second,
|
||||
"queries_sent": self.queries_tx_per_second,
|
||||
"recent_contacts": self.recent_contact_count,
|
||||
}
|
||||
return response
|
||||
|
||||
def sendRPC(self, contact, method, args, rawResponse=False):
|
||||
""" Sends an RPC to the specified contact
|
||||
|
||||
|
@ -108,26 +193,31 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
msgPrimitive = self._translator.toPrimitive(msg)
|
||||
encodedMsg = self._encoder.encode(msgPrimitive)
|
||||
|
||||
log.debug("DHT SEND: %s(%s)", method, args)
|
||||
log.debug("DHT SEND CALL %s(%s)", method, args[0].encode('hex'))
|
||||
|
||||
df = defer.Deferred()
|
||||
if rawResponse:
|
||||
df._rpcRawResponse = True
|
||||
|
||||
# Set the RPC timeout timer
|
||||
timeoutCall = reactor.callLater(
|
||||
constants.rpcTimeout, self._msgTimeout, msg.id) # IGNORE:E1101
|
||||
timeoutCall = reactor.callLater(constants.rpcTimeout, self._msgTimeout, msg.id)
|
||||
# Transmit the data
|
||||
self._send(encodedMsg, msg.id, (contact.address, contact.port))
|
||||
self._sentMessages[msg.id] = (contact.id, df, timeoutCall)
|
||||
return df
|
||||
|
||||
def startProtocol(self):
|
||||
log.info("DHT listening on UDP %i", self._node.port)
|
||||
if not self._bandwidth_stats_update_lc.running:
|
||||
self._bandwidth_stats_update_lc.start(1)
|
||||
|
||||
def datagramReceived(self, datagram, address):
|
||||
""" Handles and parses incoming RPC messages (and responses)
|
||||
|
||||
@note: This is automatically called by Twisted when the protocol
|
||||
receives a UDP datagram
|
||||
"""
|
||||
|
||||
if datagram[0] == '\x00' and datagram[25] == '\x00':
|
||||
totalPackets = (ord(datagram[1]) << 8) | ord(datagram[2])
|
||||
msgID = datagram[5:25]
|
||||
|
@ -157,12 +247,22 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
message = self._translator.fromPrimitive(msgPrimitive)
|
||||
remoteContact = Contact(message.nodeID, address[0], address[1], self)
|
||||
|
||||
now = time.time()
|
||||
contact_history = self._history_rx.get(address, [])
|
||||
if len(contact_history) > 1000:
|
||||
contact_history = [x for x in contact_history if now - x[1] < 1.0]
|
||||
contact_history.append((len(datagram), time.time()))
|
||||
self._history_rx[address] = contact_history
|
||||
bytes_rx = self._bytes_rx.get(address, 0)
|
||||
bytes_rx += len(datagram)
|
||||
self._bytes_rx[address] = bytes_rx
|
||||
|
||||
# Refresh the remote node's details in the local node's k-buckets
|
||||
self._node.addContact(remoteContact)
|
||||
|
||||
if isinstance(message, msgtypes.RequestMessage):
|
||||
# This is an RPC method request
|
||||
self._handleRPC(remoteContact, message.id, message.request, message.args)
|
||||
|
||||
elif isinstance(message, msgtypes.ResponseMessage):
|
||||
# Find the message that triggered this response
|
||||
if message.id in self._sentMessages:
|
||||
|
@ -205,6 +305,17 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
future, into something similar to a message translator/encoder
|
||||
class (see C{kademlia.msgformat} and C{kademlia.encoding}).
|
||||
"""
|
||||
|
||||
now = time.time()
|
||||
contact_history = self._history_tx.get(address, [])
|
||||
if len(contact_history) > 1000:
|
||||
contact_history = [x for x in contact_history if now - x[1] < 1.0]
|
||||
contact_history.append((len(data), time.time()))
|
||||
self._history_tx[address] = contact_history
|
||||
bytes_tx = self._bytes_tx.get(address, 0)
|
||||
bytes_tx += len(data)
|
||||
self._bytes_tx[address] = bytes_tx
|
||||
|
||||
if len(data) > self.msgSizeLimit:
|
||||
# We have to spread the data over multiple UDP datagrams,
|
||||
# and provide sequencing information
|
||||
|
@ -283,7 +394,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
func = getattr(self._node, method, None)
|
||||
if callable(func) and hasattr(func, 'rpcmethod'):
|
||||
# Call the exposed Node method and return the result to the deferred callback chain
|
||||
log.debug("DHT RECV CALL %s with args %s", method, args)
|
||||
log.debug("DHT RECV CALL %s(%s) %s:%i", method, args[0].encode('hex'),
|
||||
senderContact.address, senderContact.port)
|
||||
try:
|
||||
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
|
||||
result = func(*args, **kwargs)
|
||||
|
@ -340,6 +452,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
Will only be called once, after all ports are disconnected.
|
||||
"""
|
||||
log.info('Stopping DHT')
|
||||
|
||||
if self._bandwidth_stats_update_lc.running:
|
||||
self._bandwidth_stats_update_lc.stop()
|
||||
|
||||
for delayed_call in self._call_later_list.values():
|
||||
try:
|
||||
delayed_call.cancel()
|
||||
|
|
|
@ -990,13 +990,15 @@ class Daemon(AuthJSONRPCServer):
|
|||
############################################################################
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def jsonrpc_status(self, session_status=False):
|
||||
def jsonrpc_status(self, session_status=False, dht_status=False):
|
||||
"""
|
||||
Return daemon status
|
||||
|
||||
Args:
|
||||
'session_status' (optional): (bool) true to return session status,
|
||||
default is false
|
||||
'dht_status' (optional): (bool) true to return dht status,
|
||||
default is false
|
||||
Returns:
|
||||
(dict) Daemon status dictionary
|
||||
"""
|
||||
|
@ -1036,6 +1038,8 @@ class Daemon(AuthJSONRPCServer):
|
|||
'managed_blobs': len(blobs),
|
||||
'managed_streams': len(self.lbry_file_manager.lbry_files),
|
||||
}
|
||||
if dht_status:
|
||||
response['dht_status'] = self.session.dht_node.get_bandwidth_stats()
|
||||
defer.returnValue(response)
|
||||
|
||||
def jsonrpc_get_best_blockhash(self):
|
||||
|
|
Loading…
Add table
Reference in a new issue