diff --git a/lbry/lbry/blob_exchange/client.py b/lbry/lbry/blob_exchange/client.py index cbf376946..ff3516d32 100644 --- a/lbry/lbry/blob_exchange/client.py +++ b/lbry/lbry/blob_exchange/client.py @@ -187,7 +187,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): return await self._download_blob() except OSError as e: # i'm not sure how to fix this race condition - jack - log.warning("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port) + log.warning("race happened downloading %s from %s:%s", blob_hash, self.peer_address, self.peer_port) # return self._blob_bytes_received, self.transport raise except asyncio.TimeoutError: @@ -220,7 +220,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): @cache_concurrent -async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', address: str, tcp_port: int, +async def request_blob(loop: asyncio.BaseEventLoop, blob: typing.Optional['AbstractBlob'], address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, connected_transport: asyncio.Transport = None, connection_id: int = 0, connection_manager: typing.Optional['ConnectionManager'] = None)\ @@ -242,7 +242,9 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'AbstractBlob', addres if not connected_transport: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) - if blob.get_is_verified() or not blob.is_writeable(): + connected_transport = protocol.transport + if blob is None or blob.get_is_verified() or not blob.is_writeable(): + # blob is None happens when we are just opening a connection # file exists but not verified means someone is writing right now, give it time, come back later return 0, connected_transport return await protocol.download_blob(blob) diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 02943749b..3c60e5db1 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -30,17 +30,19 @@ class BlobDownloader: self.is_running = asyncio.Event(loop=self.loop) def should_race_continue(self, blob: 'AbstractBlob'): - if len(self.active_connections) >= self.config.max_connections_per_download: + max_probes = self.config.max_connections_per_download * (1 if self.connections else 10) + if len(self.active_connections) >= max_probes: return False return not (blob.get_is_verified() or not blob.is_writeable()) - async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0): + async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0, + just_probe: bool = False): if blob.get_is_verified(): return transport = self.connections.get(peer) start = self.loop.time() bytes_received, transport = await request_blob( - self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, + self.loop, blob if not just_probe else None, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.config.blob_download_timeout, connected_transport=transport, connection_id=connection_id, connection_manager=self.blob_manager.connection_manager @@ -91,16 +93,18 @@ class BlobDownloader: if batch: self.peer_queue.put_nowait(list(batch)) log.debug( - "running, %d peers, %d ignored, %d active", - len(batch), len(self.ignored), len(self.active_connections) + "running, %d peers, %d ignored, %d active, %s connections", + len(batch), len(self.ignored), len(self.active_connections), len(self.connections) ) for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): + if peer in self.ignored or peer in self.active_connections: + continue if not self.should_race_continue(blob): break - if peer not in self.active_connections and peer not in self.ignored: - log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id)) - self.active_connections[peer] = t + log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) + just_probe = len(self.connections) == 0 + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) + self.active_connections[peer] = t await self.new_peer_or_finished() self.cleanup_active() log.debug("downloaded %s", blob_hash[:8])