Merge pull request #2348 from lbryio/probemon

Probe whether peers are connectable during blob downloads, so that bad peers are skipped more quickly
This commit is contained in:
Jack Robison 2019-07-25 13:21:42 -04:00 committed by GitHub
commit 1374d337d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 12 deletions

View file

@ -187,7 +187,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
return await self._download_blob() return await self._download_blob()
except OSError as e: except OSError as e:
# i'm not sure how to fix this race condition - jack # i'm not sure how to fix this race condition - jack
log.warning("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port) log.warning("race happened downloading %s from %s:%s", blob_hash, self.peer_address, self.peer_port)
# return self._blob_bytes_received, self.transport # return self._blob_bytes_received, self.transport
raise raise
except asyncio.TimeoutError: except asyncio.TimeoutError:
@ -220,7 +220,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
@cache_concurrent @cache_concurrent
async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int, async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['AbstractBlob'], address: str, tcp_port: int,
peer_connect_timeout: float, blob_download_timeout: float, peer_connect_timeout: float, blob_download_timeout: float,
connected_transport: asyncio.Transport = None, connection_id: int = 0, connected_transport: asyncio.Transport = None, connection_id: int = 0,
connection_manager: typing.Optional['ConnectionManager'] = None)\ connection_manager: typing.Optional['ConnectionManager'] = None)\
@ -242,7 +242,9 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', addres
if not connected_transport: if not connected_transport:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop) peer_connect_timeout, loop=loop)
if blob.get_is_verified() or not blob.is_writeable(): connected_transport = protocol.transport
if blob is None or blob.get_is_verified() or not blob.is_writeable():
# blob is None happens when we are just opening a connection
# file exists but not verified means someone is writing right now, give it time, come back later # file exists but not verified means someone is writing right now, give it time, come back later
return 0, connected_transport return 0, connected_transport
return await protocol.download_blob(blob) return await protocol.download_blob(blob)

View file

@ -30,17 +30,19 @@ class BlobDownloader:
self.is_running = asyncio.Event(loop=self.loop) self.is_running = asyncio.Event(loop=self.loop)
def should_race_continue(self, blob: 'AbstractBlob'): def should_race_continue(self, blob: 'AbstractBlob'):
if len(self.active_connections) >= self.config.max_connections_per_download: max_probes = self.config.max_connections_per_download * (1 if self.connections else 10)
if len(self.active_connections) >= max_probes:
return False return False
return not (blob.get_is_verified() or not blob.is_writeable()) return not (blob.get_is_verified() or not blob.is_writeable())
async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0): async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0,
just_probe: bool = False):
if blob.get_is_verified(): if blob.get_is_verified():
return return
transport = self.connections.get(peer) transport = self.connections.get(peer)
start = self.loop.time() start = self.loop.time()
bytes_received, transport = await request_blob( bytes_received, transport = await request_blob(
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id,
connection_manager=self.blob_manager.connection_manager connection_manager=self.blob_manager.connection_manager
@ -91,16 +93,18 @@ class BlobDownloader:
if batch: if batch:
self.peer_queue.put_nowait(list(batch)) self.peer_queue.put_nowait(list(batch))
log.debug( log.debug(
"running, %d peers, %d ignored, %d active", "running, %d peers, %d ignored, %d active, %s connections",
len(batch), len(self.ignored), len(self.active_connections) len(batch), len(self.ignored), len(self.active_connections), len(self.connections)
) )
for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True):
if peer in self.ignored or peer in self.active_connections:
continue
if not self.should_race_continue(blob): if not self.should_race_continue(blob):
break break
if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) just_probe = len(self.connections) == 0
t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id)) t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe))
self.active_connections[peer] = t self.active_connections[peer] = t
await self.new_peer_or_finished() await self.new_peer_or_finished()
self.cleanup_active() self.cleanup_active()
log.debug("downloaded %s", blob_hash[:8]) log.debug("downloaded %s", blob_hash[:8])