diff --git a/lbry/lbry/blob/blob_file.py b/lbry/lbry/blob/blob_file.py index 45ac2b26f..0492a3844 100644 --- a/lbry/lbry/blob/blob_file.py +++ b/lbry/lbry/blob/blob_file.py @@ -163,7 +163,10 @@ class AbstractBlob: if not self.is_readable(): raise OSError('blob files cannot be read') with self.reader_context() as handle: - return await self.loop.sendfile(writer.transport, handle, count=self.get_length()) + try: + return await self.loop.sendfile(writer.transport, handle, count=self.get_length()) + except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, AttributeError): + return -1 def decrypt(self, key: bytes, iv: bytes) -> bytes: """ diff --git a/lbry/lbry/blob_exchange/downloader.py b/lbry/lbry/blob_exchange/downloader.py index 55b9a305c..8584bb464 100644 --- a/lbry/lbry/blob_exchange/downloader.py +++ b/lbry/lbry/blob_exchange/downloader.py @@ -27,7 +27,7 @@ class BlobDownloader: self.ignored: typing.Dict['KademliaPeer', int] = {} self.scores: typing.Dict['KademliaPeer', int] = {} self.failures: typing.Dict['KademliaPeer', int] = {} - self.connection_failures: typing.List['KademliaPeer'] = [] + self.connection_failures: typing.Set['KademliaPeer'] = set() self.connections: typing.Dict['KademliaPeer', 'BlobExchangeClientProtocol'] = {} self.is_running = asyncio.Event(loop=self.loop) @@ -48,7 +48,7 @@ class BlobDownloader: connection_id=connection_id, connection_manager=self.blob_manager.connection_manager ) if not bytes_received and not protocol and peer not in self.connection_failures: - self.connection_failures.append(peer) + self.connection_failures.add(peer) if not protocol and peer not in self.ignored: self.ignored[peer] = self.loop.time() log.debug("drop peer %s:%i", peer.address, peer.tcp_port) @@ -87,32 +87,42 @@ class BlobDownloader: if blob.get_is_verified(): return blob self.is_running.set() + tried_for_this_blob: typing.Set['KademliaPeer'] = set() try: while not blob.get_is_verified() and self.is_running.is_set(): - batch: typing.Set['KademliaPeer'] = set() + batch: typing.Set['KademliaPeer'] = set(self.connections.keys()) while not self.peer_queue.empty(): batch.update(self.peer_queue.get_nowait()) - if batch: - self.peer_queue.put_nowait(list(batch)) log.debug( - "running, %d peers, %d ignored, %d active, %s connections", + "%s running, %d peers, %d ignored, %d active, %s connections", blob_hash[:6], len(batch), len(self.ignored), len(self.active_connections), len(self.connections) ) + re_add: typing.Set['KademliaPeer'] = set() for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True): - if peer in self.ignored or peer in self.active_connections: + if peer in self.ignored: + continue + if peer in tried_for_this_blob: + continue + if peer in self.active_connections: + if peer not in re_add: + re_add.add(peer) continue if not self.should_race_continue(blob): break log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) - just_probe = len(self.connections) == 0 - t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id, just_probe)) + t = self.loop.create_task(self.request_blob_from_peer(blob, peer, connection_id)) self.active_connections[peer] = t + tried_for_this_blob.add(peer) + if not re_add: + self.peer_queue.put_nowait(list(batch)) await self.new_peer_or_finished() self.cleanup_active() log.debug("downloaded %s", blob_hash[:8]) return blob finally: blob.close() + if self.loop.is_running(): + self.loop.call_soon(self.cleanup_active) def close(self): self.connection_failures.clear() diff --git a/lbry/lbry/blob_exchange/server.py b/lbry/lbry/blob_exchange/server.py index 123351888..15ac046ee 100644 --- a/lbry/lbry/blob_exchange/server.py +++ b/lbry/lbry/blob_exchange/server.py @@ -101,16 +101,21 @@ class BlobServerProtocol(asyncio.Protocol): self.started_transfer.set() try: sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop) - self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) - log.info("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) - except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, asyncio.TimeoutError) as err: + if sent and sent > 0: + self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) + log.info("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) + else: + log.debug("stopped sending %s to %s:%i", bh, peer_address, peer_port) + except (OSError, asyncio.TimeoutError) as err: if isinstance(err, asyncio.TimeoutError): log.debug("timed out sending blob %s to %s", bh, peer_address) else: - log.debug("stopped sending %s to %s:%i", bh, peer_address, peer_port) + log.warning("could not read blob %s to send %s:%i", bh, peer_address, peer_port) self.close() finally: self.transfer_finished.set() + else: + log.info("don't have %s to send %s:%i", blob.blob_hash[:8], peer_address, peer_port) if responses: self.send_response(responses) diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index 5558d3370..7c5f4e5d9 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -404,7 +404,12 @@ class ManagedStream: ) await self.update_status(ManagedStream.STATUS_RUNNING) self.file_output_task = self.loop.create_task(self._save_file(self.full_path)) - await self.started_writing.wait() + try: + await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout, loop=self.loop) + except asyncio.TimeoutError: + log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id) + self.stop_tasks() + await self.update_status(ManagedStream.STATUS_STOPPED) def stop_tasks(self): if self.file_output_task and not self.file_output_task.done(): diff --git a/lbry/tests/integration/test_file_commands.py b/lbry/tests/integration/test_file_commands.py index f324e803f..e558e57fe 100644 --- a/lbry/tests/integration/test_file_commands.py +++ b/lbry/tests/integration/test_file_commands.py @@ -294,26 +294,6 @@ class FileCommands(CommandTestCase): self.assertEqual(file_info['blobs_completed'], file_info['blobs_in_stream']) self.assertEqual('finished', file_info['status']) - async def test_unban_recovers_stream(self): - BlobDownloader.BAN_FACTOR = .5 # fixme: temporary field, will move to connection manager or a conf - tx = await self.stream_create('foo', '0.01', data=bytes([0] * (1 << 23))) - sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] - missing_blob_hash = (await self.daemon.jsonrpc_blob_list(sd_hash=sd_hash))['items'][-2] - await self.daemon.jsonrpc_file_delete(claim_name='foo') - # backup blob - missing_blob = self.server_blob_manager.get_blob(missing_blob_hash) - os.rename(missing_blob.file_path, missing_blob.file_path + '__') - self.server_blob_manager.delete_blob(missing_blob_hash) - await self.daemon.jsonrpc_get('lbry://foo') - with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for(self.wait_files_to_complete(), timeout=1) - # restore blob - os.rename(missing_blob.file_path + '__', missing_blob.file_path) - self.server_blob_manager.blobs.clear() - missing_blob = self.server_blob_manager.get_blob(missing_blob_hash) - self.server_blob_manager.blob_completed(missing_blob) - await asyncio.wait_for(self.wait_files_to_complete(), timeout=1) - async def test_paid_download(self): target_address = await self.blockchain.get_raw_change_address()