Merge pull request #1830 from lbryio/dht_fixes

Dht fixes
This commit is contained in:
Jack Robison 2019-01-31 13:52:00 -05:00 committed by GitHub
commit 087b65522e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 117 deletions

View file

@ -82,7 +82,7 @@ class Node:
log.info("Store to %i peers", len(peers)) log.info("Store to %i peers", len(peers))
log.info(peers) log.info(peers)
for peer in peers: for peer in peers:
log.info("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port) log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
stored_to_tup = await asyncio.gather( stored_to_tup = await asyncio.gather(
*(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop *(self.protocol.store_to_peer(hash_value, peer) for peer in peers), loop=self.loop
) )
@ -229,7 +229,7 @@ class Node:
async def peer_search(self, node_id: bytes, count=constants.k, max_results=constants.k*2, async def peer_search(self, node_id: bytes, count=constants.k, max_results=constants.k*2,
bottom_out_limit=20) -> typing.List['KademliaPeer']: bottom_out_limit=20) -> typing.List['KademliaPeer']:
accumulated: typing.List['KademliaPeer'] = [] accumulated: typing.List['KademliaPeer'] = []
async with self.peer_search_junction(self.protocol.node_id, max_results=max_results, async with self.peer_search_junction(node_id, max_results=max_results,
bottom_out_limit=bottom_out_limit) as junction: bottom_out_limit=bottom_out_limit) as junction:
async for peers in junction: async for peers in junction:
accumulated.extend(peers) accumulated.extend(peers)

View file

@ -1,7 +1,9 @@
import asyncio import asyncio
from binascii import hexlify
from itertools import chain
import typing import typing
import logging import logging
from lbrynet.utils import drain_tasks
from lbrynet.dht import constants from lbrynet.dht import constants
from lbrynet.dht.error import RemoteException from lbrynet.dht.error import RemoteException
from lbrynet.dht.protocol.distance import Distance from lbrynet.dht.protocol.distance import Distance
@ -90,8 +92,8 @@ class IterativeFinder:
self.exclude = exclude or [] self.exclude = exclude or []
self.shortlist: typing.List['KademliaPeer'] = get_shortlist(routing_table, key, shortlist) self.shortlist: typing.List['KademliaPeer'] = get_shortlist(routing_table, key, shortlist)
self.active: typing.List['KademliaPeer'] = [] self.active: typing.Set['KademliaPeer'] = set()
self.contacted: typing.List[typing.Tuple[str, int]] = [] self.contacted: typing.Set[typing.Tuple[str, int]] = set()
self.distance = Distance(key) self.distance = Distance(key)
self.closest_peer: typing.Optional['KademliaPeer'] = None if not self.shortlist else self.shortlist[0] self.closest_peer: typing.Optional['KademliaPeer'] = None if not self.shortlist else self.shortlist[0]
@ -99,14 +101,12 @@ class IterativeFinder:
self.iteration_queue = asyncio.Queue(loop=self.loop) self.iteration_queue = asyncio.Queue(loop=self.loop)
self.running_probes: typing.List[asyncio.Task] = [] self.running_probes: typing.Set[asyncio.Task] = set()
self.lock = asyncio.Lock(loop=self.loop)
self.iteration_count = 0 self.iteration_count = 0
self.bottom_out_count = 0 self.bottom_out_count = 0
self.running = False self.running = False
self.tasks: typing.List[asyncio.Task] = [] self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = [] self.delayed_calls: typing.List[asyncio.Handle] = []
self.finished = asyncio.Event(loop=self.loop)
async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
""" """
@ -114,9 +114,16 @@ class IterativeFinder:
""" """
raise NotImplementedError() raise NotImplementedError()
def search_exhausted(self):
"""
This method ends the iterator due no more peers to contact.
Override to provide last time results.
"""
self.iteration_queue.put_nowait(None)
def check_result_ready(self, response: FindResponse): def check_result_ready(self, response: FindResponse):
""" """
Called with a lock after adding peers from an rpc result to the shortlist. Called after adding peers from an rpc result to the shortlist.
This method is responsible for putting a result for the generator into the Queue This method is responsible for putting a result for the generator into the Queue
""" """
raise NotImplementedError() raise NotImplementedError()
@ -129,9 +136,7 @@ class IterativeFinder:
return [] return []
def _is_closer(self, peer: 'KademliaPeer') -> bool: def _is_closer(self, peer: 'KademliaPeer') -> bool:
if not self.closest_peer: return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
return True
return self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
def _update_closest(self): def _update_closest(self):
self.shortlist.sort(key=lambda peer: self.distance(peer.node_id), reverse=True) self.shortlist.sort(key=lambda peer: self.distance(peer.node_id), reverse=True)
@ -141,21 +146,18 @@ class IterativeFinder:
self.closest_peer = self.shortlist[-1] self.closest_peer = self.shortlist[-1]
async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse): async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
async with self.lock: if peer not in self.shortlist:
if peer not in self.shortlist: self.shortlist.append(peer)
self.shortlist.append(peer) if peer not in self.active:
if peer not in self.active: self.active.add(peer)
self.active.append(peer) for contact_triple in response.get_close_triples():
for contact_triple in response.get_close_triples(): node_id, address, udp_port = contact_triple
addr_tuple = (contact_triple[1], contact_triple[2]) if (address, udp_port) not in self.contacted: # and not self.peer_manager.is_ignored(addr_tuple)
if addr_tuple not in self.contacted: # and not self.peer_manager.is_ignored(addr_tuple) found_peer = self.peer_manager.get_kademlia_peer(node_id, address, udp_port)
found_peer = self.peer_manager.get_kademlia_peer( if found_peer not in self.shortlist and self.peer_manager.peer_is_good(peer) is not False:
contact_triple[0], contact_triple[1], contact_triple[2] self.shortlist.append(found_peer)
) self._update_closest()
if found_peer not in self.shortlist and self.peer_manager.peer_is_good(peer) is not False: self.check_result_ready(response)
self.shortlist.append(found_peer)
self._update_closest()
self.check_result_ready(response)
async def _send_probe(self, peer: 'KademliaPeer'): async def _send_probe(self, peer: 'KademliaPeer'):
try: try:
@ -163,13 +165,11 @@ class IterativeFinder:
except asyncio.CancelledError: except asyncio.CancelledError:
return return
except asyncio.TimeoutError: except asyncio.TimeoutError:
if peer in self.active: self.active.discard(peer)
self.active.remove(peer)
return return
except ValueError as err: except ValueError as err:
log.warning(str(err)) log.warning(str(err))
if peer in self.active: self.active.discard(peer)
self.active.remove(peer)
return return
except RemoteException: except RemoteException:
return return
@ -181,31 +181,35 @@ class IterativeFinder:
""" """
added = 0 added = 0
async with self.lock: self.shortlist.sort(key=lambda p: self.distance(p.node_id), reverse=True)
self.shortlist.sort(key=lambda p: self.distance(p.node_id), reverse=True) while self.running and len(self.shortlist) and added < constants.alpha:
while self.running and len(self.shortlist) and added < constants.alpha: peer = self.shortlist.pop()
peer = self.shortlist.pop() origin_address = (peer.address, peer.udp_port)
origin_address = (peer.address, peer.udp_port) if origin_address in self.exclude or self.peer_manager.peer_is_good(peer) is False:
if origin_address in self.exclude or self.peer_manager.peer_is_good(peer) is False: continue
continue if peer.node_id == self.protocol.node_id:
if peer.node_id == self.protocol.node_id: continue
continue if (peer.address, peer.udp_port) == (self.protocol.external_ip, self.protocol.udp_port):
if (peer.address, peer.udp_port) == (self.protocol.external_ip, self.protocol.udp_port): continue
continue if (peer.address, peer.udp_port) not in self.contacted:
if (peer.address, peer.udp_port) not in self.contacted: self.contacted.add((peer.address, peer.udp_port))
self.contacted.append((peer.address, peer.udp_port))
t = self.loop.create_task(self._send_probe(peer)) t = self.loop.create_task(self._send_probe(peer))
def callback(_): def callback(_):
if t and t in self.running_probes: self.running_probes.difference_update({
self.running_probes.remove(t) probe for probe in self.running_probes if probe.done() or probe == t
if not self.running_probes and self.shortlist: })
self.tasks.append(self.loop.create_task(self._search_task(0.0))) if not self.running_probes and self.shortlist:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
t.add_done_callback(callback) t.add_done_callback(callback)
self.running_probes.append(t) self.running_probes.add(t)
added += 1 added += 1
log.debug("running %d probes", len(self.running_probes))
if not added and not self.running_probes:
log.debug("search for %s exhausted", hexlify(self.key)[:8])
self.search_exhausted()
async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay): async def _search_task(self, delay: typing.Optional[float] = constants.iterative_lookup_delay):
try: try:
@ -215,70 +219,41 @@ class IterativeFinder:
self.delayed_calls.append(self.loop.call_later(delay, self._search)) self.delayed_calls.append(self.loop.call_later(delay, self._search))
except (asyncio.CancelledError, StopAsyncIteration): except (asyncio.CancelledError, StopAsyncIteration):
if self.running: if self.running:
drain_tasks(self.running_probes) self.loop.call_soon(self.aclose)
self.running = False
def _search(self): def _search(self):
self.tasks.append(self.loop.create_task(self._search_task())) self.tasks.append(self.loop.create_task(self._search_task()))
def search(self): def __aiter__(self):
if self.running: if self.running:
raise Exception("already running") raise Exception("already running")
self.running = True self.running = True
self._search() self._search()
async def next_queue_or_finished(self) -> typing.List['KademliaPeer']:
peers = self.loop.create_task(self.iteration_queue.get())
finished = self.loop.create_task(self.finished.wait())
err = None
try:
await asyncio.wait([peers, finished], loop=self.loop, return_when='FIRST_COMPLETED')
if peers.done():
return peers.result()
raise StopAsyncIteration()
except asyncio.CancelledError as error:
err = error
finally:
if not finished.done() and not finished.cancelled():
finished.cancel()
if not peers.done() and not peers.cancelled():
peers.cancel()
if err:
raise err
def __aiter__(self):
self.search()
return self return self
async def __anext__(self) -> typing.List['KademliaPeer']: async def __anext__(self) -> typing.List['KademliaPeer']:
try: try:
if self.iteration_count == 0: if self.iteration_count == 0:
initial_results = self.get_initial_result() result = self.get_initial_result() or await self.iteration_queue.get()
if initial_results: else:
self.iteration_queue.put_nowait(initial_results) result = await self.iteration_queue.get()
result = await self.next_queue_or_finished() if not result:
raise StopAsyncIteration
self.iteration_count += 1 self.iteration_count += 1
return result return result
except (asyncio.CancelledError, StopAsyncIteration): except (asyncio.CancelledError, StopAsyncIteration):
await self.aclose() self.loop.call_soon(self.aclose)
raise raise
def aclose(self): def aclose(self):
self.running = False self.running = False
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()
async def _aclose():
async with self.lock:
self.running = False
if not self.finished.is_set():
self.finished.set()
drain_tasks(self.tasks)
drain_tasks(self.running_probes)
while self.delayed_calls:
timer = self.delayed_calls.pop()
if timer:
timer.cancel()
return asyncio.ensure_future(_aclose(), loop=self.loop)
class IterativeNodeFinder(IterativeFinder): class IterativeNodeFinder(IterativeFinder):
@ -295,24 +270,26 @@ class IterativeNodeFinder(IterativeFinder):
response = await self.protocol.get_rpc_peer(peer).find_node(self.key) response = await self.protocol.get_rpc_peer(peer).find_node(self.key)
return FindNodeResponse(self.key, response) return FindNodeResponse(self.key, response)
def put_result(self, from_list: typing.List['KademliaPeer']): def search_exhausted(self):
not_yet_yielded = [peer for peer in from_list if peer not in self.yielded_peers] self.put_result(self.active, finish=True)
def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [peer for peer in from_iter if peer not in self.yielded_peers]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))] to_yield = not_yet_yielded[:min(constants.k, len(not_yet_yielded))]
if to_yield: if to_yield:
for peer in to_yield: for peer in to_yield:
self.yielded_peers.add(peer) self.yielded_peers.add(peer)
self.iteration_queue.put_nowait(to_yield) self.iteration_queue.put_nowait(to_yield)
if finish:
self.iteration_queue.put_nowait(None)
def check_result_ready(self, response: FindNodeResponse): def check_result_ready(self, response: FindNodeResponse):
found = response.found and self.key != self.protocol.node_id found = response.found and self.key != self.protocol.node_id
if found: if found:
log.info("found") log.info("found")
self.put_result(self.shortlist) return self.put_result(self.shortlist, finish=True)
if not self.finished.is_set():
self.finished.set()
return
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer): if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted), # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count) # self.bottom_out_count, self.iteration_count)
@ -323,16 +300,10 @@ class IterativeNodeFinder(IterativeFinder):
self.bottom_out_count) self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit") log.info("limit hit")
self.put_result(self.active) self.put_result(self.active, True)
if not self.finished.is_set(): elif self.max_results and len(self.active) - len(self.yielded_peers) >= self.max_results:
self.finished.set()
return
if self.max_results and len(self.active) - len(self.yielded_peers) >= self.max_results:
log.info("max results") log.info("max results")
self.put_result(self.active) self.put_result(self.active, True)
if not self.finished.is_set():
self.finished.set()
return
class IterativeValueFinder(IterativeFinder): class IterativeValueFinder(IterativeFinder):
@ -366,14 +337,11 @@ class IterativeValueFinder(IterativeFinder):
# log.info("enough blob peers found") # log.info("enough blob peers found")
# if not self.finished.is_set(): # if not self.finished.is_set():
# self.finished.set() # self.finished.set()
return elif self.prev_closest_peer and self.closest_peer:
if self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1 self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit: if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out") log.info("blob peer search bottomed out")
if not self.finished.is_set(): self.iteration_queue.put_nowait(None)
self.finished.set()
return
def get_initial_result(self) -> typing.List['KademliaPeer']: def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key): if self.protocol.data_store.has_peers_for_blob(self.key):

View file

@ -274,7 +274,7 @@ class SQLiteStorage(SQLiteMixin):
def get_blobs_to_announce(self): def get_blobs_to_announce(self):
def get_and_update(transaction): def get_and_update(transaction):
timestamp = self.loop.time() timestamp = int(self.loop.time())
if self.conf.announce_head_and_sd_only: if self.conf.announce_head_and_sd_only:
r = transaction.execute( r = transaction.execute(
"select blob_hash from blob " "select blob_hash from blob "