refactor DHTHashAnnouncer and iterativeAnnounceHaveBlob

-use looping call for running manage function rather than a scheduled
callLater

-track announce speed

-retry store requests that failed up to 3 times

-return a dict of {blob_hash: [storing_node_id]} results from
_announce_hashes

_refreshRoutingTable inline cb refactor

-add and use DeferredLockContextManager

-don't trap errback from iterativeFindNode in iterativeAnnounceHaveBlob
This commit is contained in:
Jack Robison 2017-10-23 15:36:50 -04:00
parent 67ef8be7b7
commit 446c3a88dc
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 190 additions and 97 deletions

View file

@ -2,8 +2,9 @@ import binascii
import collections import collections
import logging import logging
import time import time
import datetime
from twisted.internet import defer from twisted.internet import defer, task
from lbrynet.core import utils from lbrynet.core import utils
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -14,6 +15,8 @@ class DHTHashAnnouncer(object):
CONCURRENT_ANNOUNCERS = 5 CONCURRENT_ANNOUNCERS = 5
"""This class announces to the DHT that this peer has certain blobs""" """This class announces to the DHT that this peer has certain blobs"""
STORE_RETRIES = 3
def __init__(self, dht_node, peer_port): def __init__(self, dht_node, peer_port):
self.dht_node = dht_node self.dht_node = dht_node
self.peer_port = peer_port self.peer_port = peer_port
@ -21,17 +24,42 @@ class DHTHashAnnouncer(object):
self.next_manage_call = None self.next_manage_call = None
self.hash_queue = collections.deque() self.hash_queue = collections.deque()
self._concurrent_announcers = 0 self._concurrent_announcers = 0
self._manage_call_lc = task.LoopingCall(self.manage_lc)
self._lock = utils.DeferredLockContextManager(defer.DeferredLock())
self._last_checked = time.time(), self.CONCURRENT_ANNOUNCERS
self._retries = {}
self._total = None
def run_manage_loop(self): def run_manage_loop(self):
log.info("Starting hash announcer")
if not self._manage_call_lc.running:
self._manage_call_lc.start(self.ANNOUNCE_CHECK_INTERVAL)
def manage_lc(self):
last_time, last_hashes = self._last_checked
hashes = len(self.hash_queue)
if hashes:
t, h = time.time() - last_time, last_hashes - hashes
blobs_per_second = float(h) / float(t)
if blobs_per_second > 0:
estimated_time_remaining = int(float(hashes) / blobs_per_second)
remaining = str(datetime.timedelta(seconds=estimated_time_remaining))
else:
remaining = "unknown"
log.info("Announcing blobs: %i blobs left to announce, %i%s complete, "
"est time remaining: %s", hashes + self._concurrent_announcers,
100 - int(100.0 * float(hashes + self._concurrent_announcers) /
float(self._total)), "%", remaining)
self._last_checked = t + last_time, hashes
else:
self._total = 0
if self.peer_port is not None: if self.peer_port is not None:
self._announce_available_hashes() return self._announce_available_hashes()
self.next_manage_call = utils.call_later(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop)
def stop(self): def stop(self):
log.info("Stopping DHT hash announcer.") log.info("Stopping DHT hash announcer.")
if self.next_manage_call is not None: if self._manage_call_lc.running:
self.next_manage_call.cancel() self._manage_call_lc.stop()
self.next_manage_call = None
def add_supplier(self, supplier): def add_supplier(self, supplier):
self.suppliers.append(supplier) self.suppliers.append(supplier)
@ -45,60 +73,101 @@ class DHTHashAnnouncer(object):
def hash_queue_size(self): def hash_queue_size(self):
return len(self.hash_queue) return len(self.hash_queue)
@defer.inlineCallbacks
def _announce_available_hashes(self): def _announce_available_hashes(self):
log.debug('Announcing available hashes') log.debug('Announcing available hashes')
ds = []
for supplier in self.suppliers: for supplier in self.suppliers:
d = supplier.hashes_to_announce() hashes = yield supplier.hashes_to_announce()
d.addCallback(self._announce_hashes) yield self._announce_hashes(hashes)
ds.append(d)
dl = defer.DeferredList(ds)
return dl
@defer.inlineCallbacks
def _announce_hashes(self, hashes, immediate=False): def _announce_hashes(self, hashes, immediate=False):
if not hashes: if not hashes:
return defer.returnValue(None)
log.debug('Announcing %s hashes', len(hashes)) if not self.dht_node.can_store:
log.warning("Client only DHT node cannot store, skipping announce")
defer.returnValue(None)
log.info('Announcing %s hashes', len(hashes))
# TODO: add a timeit decorator # TODO: add a timeit decorator
start = time.time() start = time.time()
ds = []
for h in hashes: ds = []
announce_deferred = defer.Deferred() with self._lock:
ds.append(announce_deferred) for h in hashes:
if immediate: announce_deferred = defer.Deferred()
self.hash_queue.appendleft((h, announce_deferred)) if immediate:
else: self.hash_queue.appendleft((h, announce_deferred))
self.hash_queue.append((h, announce_deferred)) else:
self.hash_queue.append((h, announce_deferred))
if not self._total:
self._total = len(hashes)
log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size()) log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size())
def announce(): @defer.inlineCallbacks
def do_store(blob_hash, announce_d):
if announce_d.called:
defer.returnValue(announce_deferred.result)
try:
store_nodes = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash))
if not store_nodes:
retries = self._retries.get(blob_hash, 0)
retries += 1
self._retries[blob_hash] = retries
if retries <= self.STORE_RETRIES:
log.debug("No nodes stored %s, retrying", blob_hash)
result = yield do_store(blob_hash, announce_d)
else:
log.warning("No nodes stored %s", blob_hash)
else:
result = store_nodes
if not announce_d.called:
announce_d.callback(result)
defer.returnValue(result)
except Exception as err:
if not announce_d.called:
announce_d.errback(err)
raise err
@defer.inlineCallbacks
def announce(progress=None):
progress = progress or {}
if len(self.hash_queue): if len(self.hash_queue):
h, announce_deferred = self.hash_queue.popleft() with self._lock:
log.debug('Announcing blob %s to dht', h) h, announce_deferred = self.hash_queue.popleft()
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h)) log.debug('Announcing blob %s to dht', h[:16])
d.chainDeferred(announce_deferred) stored_to_nodes = yield do_store(h, announce_deferred)
d.addBoth(lambda _: utils.call_later(0, announce)) progress[h] = stored_to_nodes
log.debug("Stored %s to %i peers (hashes announced by this announcer: %i)",
h.encode('hex')[:16],
len(stored_to_nodes), len(progress))
yield announce(progress)
else: else:
self._concurrent_announcers -= 1 with self._lock:
self._concurrent_announcers -= 1
defer.returnValue(progress)
for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS): for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS):
self._concurrent_announcers += 1 self._concurrent_announcers += 1
announce() ds.append(announce())
d = defer.DeferredList(ds) announcer_results = yield defer.DeferredList(ds)
d.addCallback(lambda _: log.debug('Took %s seconds to announce %s hashes', stored_to = {}
time.time() - start, len(hashes))) for _, announced_to in announcer_results:
return d stored_to.update(announced_to)
log.info('Took %s seconds to announce %s hashes', time.time() - start, len(hashes))
defer.returnValue(stored_to)
class DHTHashSupplier(object): class DHTHashSupplier(object):
# 1 hour is the min time hash will be reannounced # 1 hour is the min time hash will be reannounced
MIN_HASH_REANNOUNCE_TIME = 60*60 MIN_HASH_REANNOUNCE_TIME = 60 * 60
# conservative assumption of the time it takes to announce # conservative assumption of the time it takes to announce
# a single hash # a single hash
SINGLE_HASH_ANNOUNCE_DURATION = 5 SINGLE_HASH_ANNOUNCE_DURATION = 5
"""Classes derived from this class give hashes to a hash announcer""" """Classes derived from this class give hashes to a hash announcer"""
def __init__(self, announcer): def __init__(self, announcer):
if announcer is not None: if announcer is not None:
announcer.add_supplier(self) announcer.add_supplier(self)
@ -107,7 +176,6 @@ class DHTHashSupplier(object):
def hashes_to_announce(self): def hashes_to_announce(self):
pass pass
def get_next_announce_time(self, num_hashes_to_announce=1): def get_next_announce_time(self, num_hashes_to_announce=1):
""" """
Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME, Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
@ -121,9 +189,7 @@ class DHTHashSupplier(object):
Returns: Returns:
timestamp for next announce time timestamp for next announce time
""" """
queue_size = self.hash_announcer.hash_queue_size()+num_hashes_to_announce queue_size = self.hash_announcer.hash_queue_size() + num_hashes_to_announce
reannounce = max(self.MIN_HASH_REANNOUNCE_TIME, reannounce = max(self.MIN_HASH_REANNOUNCE_TIME,
queue_size*self.SINGLE_HASH_ANNOUNCE_DURATION) queue_size * self.SINGLE_HASH_ANNOUNCE_DURATION)
return time.time() + reannounce return time.time() + reannounce

View file

@ -148,6 +148,17 @@ def json_dumps_pretty(obj, **kwargs):
return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs) return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
class DeferredLockContextManager(object):
def __init__(self, lock):
self._lock = lock
def __enter__(self):
yield self._lock.aquire()
def __exit__(self, exc_type, exc_val, exc_tb):
yield self._lock.release()
@defer.inlineCallbacks @defer.inlineCallbacks
def DeferredDict(d, consumeErrors=False): def DeferredDict(d, consumeErrors=False):
keys = [] keys = []

View file

@ -117,10 +117,17 @@ class Node(object):
self.peerPort = peerPort self.peerPort = peerPort
self.hash_watcher = HashWatcher() self.hash_watcher = HashWatcher()
# will be used later
self._can_store = True
def __del__(self): def __del__(self):
if self._listeningPort is not None: if self._listeningPort is not None:
self._listeningPort.stopListening() self._listeningPort.stopListening()
@property
def can_store(self):
return self._can_store is True
def stop(self): def stop(self):
# stop LoopingCalls: # stop LoopingCalls:
if self.refresh_node_lc.running: if self.refresh_node_lc.running:
@ -252,20 +259,7 @@ class Node(object):
def iterativeAnnounceHaveBlob(self, blob_hash, value): def iterativeAnnounceHaveBlob(self, blob_hash, value):
known_nodes = {} known_nodes = {}
def log_error(err, n): @defer.inlineCallbacks
if err.check(protocol.TimeoutError):
log.debug(
"Timeout while storing blob_hash %s at %s",
binascii.hexlify(blob_hash), n)
else:
log.error(
"Unexpected error while storing blob_hash %s at %s: %s",
binascii.hexlify(blob_hash), n, err.getErrorMessage())
def log_success(res):
log.debug("Response to store request: %s", str(res))
return res
def announce_to_peer(responseTuple): def announce_to_peer(responseTuple):
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """ """ @type responseMsg: kademlia.msgtypes.ResponseMessage """
# The "raw response" tuple contains the response message, # The "raw response" tuple contains the response message,
@ -274,40 +268,65 @@ class Node(object):
originAddress = responseTuple[1] # tuple: (ip adress, udp port) originAddress = responseTuple[1] # tuple: (ip adress, udp port)
# Make sure the responding node is valid, and abort the operation if it isn't # Make sure the responding node is valid, and abort the operation if it isn't
if not responseMsg.nodeID in known_nodes: if not responseMsg.nodeID in known_nodes:
return responseMsg.nodeID log.warning("Responding node was not expected")
defer.returnValue(responseMsg.nodeID)
n = known_nodes[responseMsg.nodeID] n = known_nodes[responseMsg.nodeID]
result = responseMsg.response result = responseMsg.response
announced = False
if 'token' in result: if 'token' in result:
value['token'] = result['token'] value['token'] = result['token']
d = n.store(blob_hash, value, self.node_id, 0) try:
d.addCallback(log_success) res = yield n.store(blob_hash, value, self.node_id)
d.addErrback(log_error, n) log.debug("Response to store request: %s", str(res))
announced = True
except protocol.TimeoutError:
log.debug("Timeout while storing blob_hash %s at %s",
blob_hash.encode('hex')[:16], n.id.encode('hex'))
except Exception as err:
log.error("Unexpected error while storing blob_hash %s at %s: %s",
blob_hash.encode('hex')[:16], n.id.encode('hex'), err)
else: else:
d = defer.succeed(False) log.warning("missing token")
return d defer.returnValue(announced)
@defer.inlineCallbacks
def requestPeers(contacts): def requestPeers(contacts):
if self.externalIP is not None and len(contacts) >= constants.k: if self.externalIP is not None and len(contacts) >= constants.k:
is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id) is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
if is_closer: if is_closer:
contacts.pop() contacts.pop()
self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id) yield self.store(blob_hash, value, originalPublisherID=self.node_id,
self_store=True)
elif self.externalIP is not None: elif self.externalIP is not None:
self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id) yield self.store(blob_hash, value, originalPublisherID=self.node_id,
ds = [] self_store=True)
else:
raise Exception("Cannot determine external IP: %s" % self.externalIP)
contacted = []
for contact in contacts: for contact in contacts:
known_nodes[contact.id] = contact known_nodes[contact.id] = contact
rpcMethod = getattr(contact, "findValue") rpcMethod = getattr(contact, "findValue")
df = rpcMethod(blob_hash, rawResponse=True) try:
df.addCallback(announce_to_peer) response = yield rpcMethod(blob_hash, rawResponse=True)
df.addErrback(log_error, contact) stored = yield announce_to_peer(response)
ds.append(df) if stored:
return defer.DeferredList(ds) contacted.append(contact)
except protocol.TimeoutError:
log.debug("Timeout while storing blob_hash %s at %s",
binascii.hexlify(blob_hash), contact)
except Exception as err:
log.error("Unexpected error while storing blob_hash %s at %s: %s",
binascii.hexlify(blob_hash), contact, err)
log.debug("Stored %s to %i of %i attempted peers", blob_hash.encode('hex')[:16],
len(contacted), len(contacts))
contacted_node_ids = [c.id.encode('hex') for c in contacts]
defer.returnValue(contacted_node_ids)
d = self.iterativeFindNode(blob_hash) d = self.iterativeFindNode(blob_hash)
d.addCallbacks(requestPeers) d.addCallback(requestPeers)
return d return d
def change_token(self): def change_token(self):
@ -638,28 +657,13 @@ class Node(object):
self._dataStore.removeExpiredPeers() self._dataStore.removeExpiredPeers()
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks
def _refreshRoutingTable(self): def _refreshRoutingTable(self):
nodeIDs = self._routingTable.getRefreshList(0, False) nodeIDs = self._routingTable.getRefreshList(0, False)
outerDf = defer.Deferred() while nodeIDs:
searchID = nodeIDs.pop()
def searchForNextNodeID(dfResult=None): yield self.iterativeFindNode(searchID)
if len(nodeIDs) > 0: defer.returnValue(None)
searchID = nodeIDs.pop()
df = self.iterativeFindNode(searchID)
df.addCallback(searchForNextNodeID)
else:
# If this is reached, we have finished refreshing the routing table
outerDf.callback(None)
# Start the refreshing cycle
searchForNextNodeID()
return outerDf
# args put here because _refreshRoutingTable does outerDF.callback(None)
def _removeExpiredPeers(self, *args):
df = threads.deferToThread(self._dataStore.removeExpiredPeers)
return df
# This was originally a set of nested methods in _iterativeFind # This was originally a set of nested methods in _iterativeFind

View file

@ -1,21 +1,30 @@
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer, task from twisted.internet import defer, task, reactor
from lbrynet.core import utils from lbrynet.core import utils
from lbrynet.tests.util import random_lbry_hash from lbrynet.tests.util import random_lbry_hash
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
class MocDHTNode(object): class MocDHTNode(object):
def __init__(self): def __init__(self):
self.can_store = True
self.blobs_announced = 0 self.blobs_announced = 0
@defer.inlineCallbacks
def announceHaveBlob(self, blob): def announceHaveBlob(self, blob):
self.blobs_announced += 1 self.blobs_announced += 1
return defer.succeed(True) d = defer.Deferred(None)
reactor.callLater(1, d.callback, {blob: ["ab" * 48]})
result = yield d
defer.returnValue(result)
class MocSupplier(object): class MocSupplier(object):
def __init__(self, blobs_to_announce): def __init__(self, blobs_to_announce):
self.blobs_to_announce = blobs_to_announce self.blobs_to_announce = blobs_to_announce
self.announced = False self.announced = False
def hashes_to_announce(self): def hashes_to_announce(self):
if not self.announced: if not self.announced:
self.announced = True self.announced = True
@ -23,8 +32,8 @@ class MocSupplier(object):
else: else:
return defer.succeed([]) return defer.succeed([])
class DHTHashAnnouncerTest(unittest.TestCase):
class DHTHashAnnouncerTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.num_blobs = 10 self.num_blobs = 10
self.blobs_to_announce = [] self.blobs_to_announce = []
@ -33,23 +42,26 @@ class DHTHashAnnouncerTest(unittest.TestCase):
self.clock = task.Clock() self.clock = task.Clock()
self.dht_node = MocDHTNode() self.dht_node = MocDHTNode()
utils.call_later = self.clock.callLater utils.call_later = self.clock.callLater
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333) self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333)
self.supplier = MocSupplier(self.blobs_to_announce) self.supplier = MocSupplier(self.blobs_to_announce)
self.announcer.add_supplier(self.supplier) self.announcer.add_supplier(self.supplier)
@defer.inlineCallbacks
def test_basic(self): def test_basic(self):
self.announcer._announce_available_hashes() d = self.announcer._announce_available_hashes()
self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS) self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS)
self.clock.advance(1) self.clock.advance(1)
yield d
self.assertEqual(self.dht_node.blobs_announced, self.num_blobs) self.assertEqual(self.dht_node.blobs_announced, self.num_blobs)
self.assertEqual(self.announcer.hash_queue_size(), 0) self.assertEqual(self.announcer.hash_queue_size(), 0)
@defer.inlineCallbacks
def test_immediate_announce(self): def test_immediate_announce(self):
# Test that immediate announce puts a hash at the front of the queue # Test that immediate announce puts a hash at the front of the queue
self.announcer._announce_available_hashes() d = self.announcer._announce_available_hashes()
self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS)
blob_hash = random_lbry_hash() blob_hash = random_lbry_hash()
self.announcer.immediate_announce([blob_hash]) self.announcer.immediate_announce([blob_hash])
self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS+1) self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS+1)
self.assertEqual(blob_hash, self.announcer.hash_queue[0][0]) self.assertEqual(blob_hash, self.announcer.hash_queue[0][0])
yield d