diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index cbf376946..78de6fa1f 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -220,7 +220,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): @cache_concurrent -async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int, +async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['AbstractBlob'], address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, connected_transport: asyncio.Transport = None, connection_id: int = 0, connection_manager: typing.Optional['ConnectionManager'] = None)\ @@ -242,7 +242,8 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', addres if not connected_transport: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) - if blob.get_is_verified() or not blob.is_writeable(): + if blob is None or blob.get_is_verified() or not blob.is_writeable(): + # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later return 0, connected_transport return await protocol.download_blob(blob) diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 02943749b..c6fca2113 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -34,13 +34,14 @@ class BlobDownloader: return False return not (blob.get_is_verified() or not blob.is_writeable()) - async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0): + async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0, + just_probe: bool = False): if blob.get_is_verified(): return transport = self.connections.get(peer) start = self.loop.time() bytes_received, transport = await request_blob( - self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, + self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, connection_manager=self.blob_manager.connection_manager @@ -95,11 +96,17 @@ class BlobDownloader: len(batch), len(self.ignored), len(self.active_connections) ) for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): + just_probe = False if not self.should_race_continue(blob): - break + if len(self.connections) >= self.config.max_connections_per_download * 10: + if peer in self.connections: + continue + just_probe = True + else: + break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id)) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) self.active_connections[peer] = t await self.new_peer_or_finished() self.cleanup_active()