diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 7ab36fd96..4162ef593 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -185,9 +185,6 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s Returns [, ] """ - if blob.get_is_verified() or blob.file_exists: - # file exists but not verified means someone is writing right now, give it time, come back later - return 0, connected_transport protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) if connected_transport and not connected_transport.is_closing(): connected_transport.set_protocol(protocol) @@ -199,6 +196,9 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s if not connected_transport: await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) + if blob.get_is_verified() or blob.file_exists: + # file exists but not verified means someone is writing right now, give it time, come back later + return 0, connected_transport return await protocol.download_blob(blob) except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): return 0, None diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 59a5b0ec5..b602225c5 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -26,7 +26,7 @@ class BlobDownloader: self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} self.rounds_won: typing.Dict['KademliaPeer', int] = {} - def should_race_continue(self): + def should_race_continue(self, blob: 'BlobFile'): if len(self.active_connections) >= self.config.max_connections_per_download: return False # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves @@ -35,7 +35,7 @@ class BlobDownloader: # for peer, task in self.active_connections.items(): # if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done(): # return False - return True + return not (blob.get_is_verified() or blob.file_exists) async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): if blob.get_is_verified(): @@ -91,7 +91,7 @@ class BlobDownloader: len(batch), len(self.ignored), len(self.active_connections) ) for peer in batch: - if not self.should_race_continue(): + if not self.should_race_continue(blob): break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 45c82d838..fda968234 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -74,19 +74,21 @@ class StreamDownloader(StreamAssembler): def add_fixed_peers(self): async def _add_fixed_peers(): - self.peer_queue.put_nowait([ - KademliaPeer(self.loop, address=(await resolve_host(url, port + 1, proto='tcp')), tcp_port=port + 1) + addresses = [ + (await resolve_host(url, port + 1, proto='tcp'), port) for url, port in self.config.reflector_servers - ]) + ] + delay = self.config.fixed_peer_delay if ( + 'dht' not in self.config.components_to_skip + and self.node and len(self.node.protocol.routing_table.get_peers()) + ) else 0.0 + self.loop.call_later(delay, lambda: + self.peer_queue.put_nowait([ + KademliaPeer(self.loop, address=address, tcp_port=port + 1) + for address, port in addresses + ])) if self.config.reflector_servers: - self.fixed_peers_handle = self.loop.call_later( - self.config.fixed_peer_delay if ( - 'dht' not in self.config.components_to_skip - and self.node - and len(self.node.protocol.routing_table.get_peers()) - ) else 0.0, - lambda: self.loop.create_task(_add_fixed_peers()) - ) + self.loop.create_task(_add_fixed_peers()) def download(self, node: typing.Optional['Node'] = None): self.node = node