diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py index dfcf1f3aa..65e0d4a43 100644 --- a/lbry/blob/blob_file.py +++ b/lbry/blob/blob_file.py @@ -167,7 +167,7 @@ class AbstractBlob: with self.reader_context() as handle: try: return await self.loop.sendfile(writer.transport, handle, count=self.get_length()) - except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, AttributeError): + except (ConnectionError, BrokenPipeError, RuntimeError, OSError, AttributeError): return -1 def decrypt(self, key: bytes, iv: bytes) -> bytes: diff --git a/lbry/blob_exchange/server.py b/lbry/blob_exchange/server.py index 3a2547ba0..a2ff8b698 100644 --- a/lbry/blob_exchange/server.py +++ b/lbry/blob_exchange/server.py @@ -105,18 +105,21 @@ class BlobServerProtocol(asyncio.Protocol): self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port) else: + self.close() log.debug("stopped sending %s to %s:%i", blob_hash, peer_address, peer_port) - except (OSError, asyncio.TimeoutError) as err: + return + except (OSError, ValueError, asyncio.TimeoutError) as err: if isinstance(err, asyncio.TimeoutError): log.debug("timed out sending blob %s to %s", blob_hash, peer_address) else: log.warning("could not read blob %s to send %s:%i", blob_hash, peer_address, peer_port) self.close() + return finally: self.transfer_finished.set() else: log.info("don't have %s to send %s:%i", blob.blob_hash[:8], peer_address, peer_port) - if responses: + if responses and not self.transport.is_closing(): self.send_response(responses) def data_received(self, data): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index d7333e967..3af5ac95f 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1402,7 +1402,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.ledger.broadcast(tx) - await self.analytics_manager.send_credits_sent() + self.component_manager.loop.create_task(self.analytics_manager.send_credits_sent()) else: await self.ledger.release_tx(tx) @@ -2458,7 +2458,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.storage.save_claims([self._old_get_temp_claim_info( tx, txo, claim_address, claim, name, dewies_to_lbc(amount) )]) - await self.analytics_manager.send_new_channel() + self.component_manager.loop.create_task(self.analytics_manager.send_new_channel()) else: await account.ledger.release_tx(tx) @@ -2614,7 +2614,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.storage.save_claims([self._old_get_temp_claim_info( tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount) )]) - await self.analytics_manager.send_new_channel() + self.component_manager.loop.create_task(self.analytics_manager.send_new_channel()) else: await account.ledger.release_tx(tx) @@ -2673,7 +2673,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('abandon') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon')) else: await account.ledger.release_tx(tx) @@ -2993,7 +2993,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3148,7 +3148,7 @@ class Daemon(metaclass=JSONRPCServerType): tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount) )]) await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3333,17 +3333,16 @@ class Daemon(metaclass=JSONRPCServerType): stream_hash = None if not preview: - old_stream_hash = await self.storage.get_stream_hash_for_sd_hash(old_txo.claim.stream.source.sd_hash) + old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None) if file_path is not None: - if old_stream_hash: - stream_to_delete = self.stream_manager.get_stream_by_stream_hash(old_stream_hash) - await self.stream_manager.delete_stream(stream_to_delete, delete_file=False) + if old_stream: + await self.stream_manager.delete_stream(old_stream, delete_file=False) file_stream = await self.stream_manager.create_stream(file_path) new_txo.claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() stream_hash = file_stream.stream_hash - else: - stream_hash = old_stream_hash + elif old_stream: + stream_hash = old_stream.stream_hash if channel: new_txo.sign(channel) @@ -3356,7 +3355,7 @@ class Daemon(metaclass=JSONRPCServerType): )]) if stream_hash: await self.storage.save_content_claim(stream_hash, new_txo.id) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3415,7 +3414,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('abandon') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon')) else: await self.ledger.release_tx(tx) @@ -3580,7 +3579,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3734,7 +3733,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3886,7 +3885,7 @@ class Daemon(metaclass=JSONRPCServerType): 'claim_id': claim_id, 'amount': dewies_to_lbc(amount) }]}) - await self.analytics_manager.send_claim_action('new_support') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('new_support')) else: await self.ledger.release_tx(tx) @@ -3986,7 +3985,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('abandon') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon')) else: await self.ledger.release_tx(tx) @@ -4390,7 +4389,7 @@ class Daemon(metaclass=JSONRPCServerType): else: server, port = random.choice(self.conf.reflector_servers) reflected = await asyncio.gather(*[ - stream.upload_to_reflector(server, port) + self.stream_manager.reflect_stream(stream, server, port) for stream in self.stream_manager.get_filtered_streams(**kwargs) ]) total = [] diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 40c42b0d9..119f7f01e 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -77,7 +77,7 @@ class StreamManager: self.resume_saving_task: Optional[asyncio.Task] = None self.re_reflect_task: Optional[asyncio.Task] = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] - self.running_reflector_uploads: typing.List[asyncio.Task] = [] + self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {} self.started = asyncio.Event(loop=self.loop) async def _update_content_claim(self, stream: ManagedStream): @@ -185,10 +185,10 @@ class StreamManager: batch = [] while sd_hashes: stream = self.streams[sd_hashes.pop()] - if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed: - if not stream.fully_reflected.is_set(): - host, port = random.choice(self.config.reflector_servers) - batch.append(stream.upload_to_reflector(host, port)) + if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed and \ + stream.sd_hash not in self.running_reflector_uploads and not \ + stream.fully_reflected.is_set(): + batch.append(self.reflect_stream(stream)) if len(batch) >= self.config.concurrent_reflector_uploads: await asyncio.gather(*batch, loop=self.loop) batch = [] @@ -212,26 +212,37 @@ class StreamManager: while self.update_stream_finished_futs: self.update_stream_finished_futs.pop().cancel() while self.running_reflector_uploads: - self.running_reflector_uploads.pop().cancel() + _, t = self.running_reflector_uploads.popitem() + t.cancel() self.started.clear() log.info("finished stopping the stream manager") + def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None, + port: Optional[int] = None) -> asyncio.Task: + if not server or not port: + server, port = random.choice(self.config.reflector_servers) + if stream.sd_hash in self.running_reflector_uploads: + return self.running_reflector_uploads[stream.sd_hash] + task = self.loop.create_task(stream.upload_to_reflector(server, port)) + self.running_reflector_uploads[stream.sd_hash] = task + task.add_done_callback( + lambda _: None if stream.sd_hash not in self.running_reflector_uploads else + self.running_reflector_uploads.pop(stream.sd_hash) + ) + return task + async def create_stream(self, file_path: str, key: Optional[bytes] = None, iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator) self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: - host, port = random.choice(self.config.reflector_servers) - task = self.loop.create_task(stream.upload_to_reflector(host, port)) - self.running_reflector_uploads.append(task) - task.add_done_callback( - lambda _: None - if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task) - ) + self.reflect_stream(stream) return stream async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False): + if stream.sd_hash in self.running_reflector_uploads: + self.running_reflector_uploads[stream.sd_hash].cancel() stream.stop_tasks() if stream.sd_hash in self.streams: del self.streams[stream.sd_hash] diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 28a414996..f7c011e3b 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -219,6 +219,7 @@ class TestBlobExchange(BlobExchangeTestBase): server_protocol = BlobServerProtocol(self.loop, self.server_blob_manager, self.server.lbrycrd_address) transport = asyncio.Transport(extra={'peername': ('ip', 90)}) received_data = BytesIO() + transport.is_closing = lambda: received_data.closed transport.write = received_data.write server_protocol.connection_made(transport) blob_request = BlobRequest.make_request_for_blob_hash(blob_hash).serialize()