From 34eb856d093218c586142f863294f465ebda42cd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 3 Feb 2020 19:38:36 -0500 Subject: [PATCH 1/4] cancel reflector uploads upon file delete -remove unnecessary db call in stream_update --- lbry/extras/daemon/daemon.py | 13 ++++++------ lbry/stream/stream_manager.py | 37 +++++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index d7333e967..6068d2878 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -3333,17 +3333,16 @@ class Daemon(metaclass=JSONRPCServerType): stream_hash = None if not preview: - old_stream_hash = await self.storage.get_stream_hash_for_sd_hash(old_txo.claim.stream.source.sd_hash) + old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None) if file_path is not None: - if old_stream_hash: - stream_to_delete = self.stream_manager.get_stream_by_stream_hash(old_stream_hash) - await self.stream_manager.delete_stream(stream_to_delete, delete_file=False) + if old_stream: + await self.stream_manager.delete_stream(old_stream, delete_file=False) file_stream = await self.stream_manager.create_stream(file_path) new_txo.claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() stream_hash = file_stream.stream_hash - else: - stream_hash = old_stream_hash + elif old_stream: + stream_hash = old_stream.stream_hash if channel: new_txo.sign(channel) @@ -4390,7 +4389,7 @@ class Daemon(metaclass=JSONRPCServerType): else: server, port = random.choice(self.conf.reflector_servers) reflected = await asyncio.gather(*[ - stream.upload_to_reflector(server, port) + self.stream_manager.reflect_stream(stream, server, port) for stream in self.stream_manager.get_filtered_streams(**kwargs) ]) total = [] diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 40c42b0d9..119f7f01e 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -77,7 +77,7 @@ class StreamManager: self.resume_saving_task: Optional[asyncio.Task] = None self.re_reflect_task: Optional[asyncio.Task] = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] - self.running_reflector_uploads: typing.List[asyncio.Task] = [] + self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {} self.started = asyncio.Event(loop=self.loop) async def _update_content_claim(self, stream: ManagedStream): @@ -185,10 +185,10 @@ class StreamManager: batch = [] while sd_hashes: stream = self.streams[sd_hashes.pop()] - if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed: - if not stream.fully_reflected.is_set(): - host, port = random.choice(self.config.reflector_servers) - batch.append(stream.upload_to_reflector(host, port)) + if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed and \ + stream.sd_hash not in self.running_reflector_uploads and not \ + stream.fully_reflected.is_set(): + batch.append(self.reflect_stream(stream)) if len(batch) >= self.config.concurrent_reflector_uploads: await asyncio.gather(*batch, loop=self.loop) batch = [] @@ -212,26 +212,37 @@ class StreamManager: while self.update_stream_finished_futs: self.update_stream_finished_futs.pop().cancel() while self.running_reflector_uploads: - self.running_reflector_uploads.pop().cancel() + _, t = self.running_reflector_uploads.popitem() + t.cancel() self.started.clear() log.info("finished stopping the stream manager") + def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None, + port: Optional[int] = None) -> asyncio.Task: + if not server or not port: + server, port = random.choice(self.config.reflector_servers) + if stream.sd_hash in self.running_reflector_uploads: + return self.running_reflector_uploads[stream.sd_hash] + task = self.loop.create_task(stream.upload_to_reflector(server, port)) + self.running_reflector_uploads[stream.sd_hash] = task + task.add_done_callback( + lambda _: None if stream.sd_hash not in self.running_reflector_uploads else + self.running_reflector_uploads.pop(stream.sd_hash) + ) + return task + async def create_stream(self, file_path: str, key: Optional[bytes] = None, iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator) self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: - host, port = random.choice(self.config.reflector_servers) - task = self.loop.create_task(stream.upload_to_reflector(host, port)) - self.running_reflector_uploads.append(task) - task.add_done_callback( - lambda _: None - if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task) - ) + self.reflect_stream(stream) return stream async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False): + if stream.sd_hash in self.running_reflector_uploads: + self.running_reflector_uploads[stream.sd_hash].cancel() stream.stop_tasks() if stream.sd_hash in self.streams: del self.streams[stream.sd_hash] From 2ed8ebff09b468e689abd00381bbec15b8219d0a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 3 Feb 2020 20:01:17 -0500 Subject: [PATCH 2/4] handle ConnectionError and ValueError in blob sendfile --- lbry/blob/blob_file.py | 2 +- lbry/blob_exchange/server.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py index dfcf1f3aa..65e0d4a43 100644 --- a/lbry/blob/blob_file.py +++ b/lbry/blob/blob_file.py @@ -167,7 +167,7 @@ class AbstractBlob: with self.reader_context() as handle: try: return await self.loop.sendfile(writer.transport, handle, count=self.get_length()) - except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, AttributeError): + except (ConnectionError, BrokenPipeError, RuntimeError, OSError, AttributeError): return -1 def decrypt(self, key: bytes, iv: bytes) -> bytes: diff --git a/lbry/blob_exchange/server.py b/lbry/blob_exchange/server.py index 3a2547ba0..93f685233 100644 --- a/lbry/blob_exchange/server.py +++ b/lbry/blob_exchange/server.py @@ -105,8 +105,9 @@ class BlobServerProtocol(asyncio.Protocol): self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port) else: + self.close() log.debug("stopped sending %s to %s:%i", blob_hash, peer_address, peer_port) - except (OSError, asyncio.TimeoutError) as err: + except (OSError, ValueError, asyncio.TimeoutError) as err: if isinstance(err, asyncio.TimeoutError): log.debug("timed out sending blob %s to %s", blob_hash, peer_address) else: @@ -116,7 +117,7 @@ class BlobServerProtocol(asyncio.Protocol): self.transfer_finished.set() else: log.info("don't have %s to send %s:%i", blob.blob_hash[:8], peer_address, peer_port) - if responses: + if responses and not self.transport.is_closing(): self.send_response(responses) def data_received(self, data): From bf5b5f43e313904c32d4bf31dc329c19bb7962e7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 3 Feb 2020 20:15:10 -0500 Subject: [PATCH 3/4] non blocking analytics --- lbry/extras/daemon/daemon.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 6068d2878..3af5ac95f 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1402,7 +1402,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.ledger.broadcast(tx) - await self.analytics_manager.send_credits_sent() + self.component_manager.loop.create_task(self.analytics_manager.send_credits_sent()) else: await self.ledger.release_tx(tx) @@ -2458,7 +2458,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.storage.save_claims([self._old_get_temp_claim_info( tx, txo, claim_address, claim, name, dewies_to_lbc(amount) )]) - await self.analytics_manager.send_new_channel() + self.component_manager.loop.create_task(self.analytics_manager.send_new_channel()) else: await account.ledger.release_tx(tx) @@ -2614,7 +2614,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.storage.save_claims([self._old_get_temp_claim_info( tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount) )]) - await self.analytics_manager.send_new_channel() + self.component_manager.loop.create_task(self.analytics_manager.send_new_channel()) else: await account.ledger.release_tx(tx) @@ -2673,7 +2673,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('abandon') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon')) else: await account.ledger.release_tx(tx) @@ -2993,7 +2993,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3148,7 +3148,7 @@ class Daemon(metaclass=JSONRPCServerType): tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount) )]) await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3355,7 +3355,7 @@ class Daemon(metaclass=JSONRPCServerType): )]) if stream_hash: await self.storage.save_content_claim(stream_hash, new_txo.id) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3414,7 +3414,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('abandon') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon')) else: await self.ledger.release_tx(tx) @@ -3579,7 +3579,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3733,7 +3733,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('publish') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) else: await account.ledger.release_tx(tx) @@ -3885,7 +3885,7 @@ class Daemon(metaclass=JSONRPCServerType): 'claim_id': claim_id, 'amount': dewies_to_lbc(amount) }]}) - await self.analytics_manager.send_claim_action('new_support') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('new_support')) else: await self.ledger.release_tx(tx) @@ -3985,7 +3985,7 @@ class Daemon(metaclass=JSONRPCServerType): if not preview: await self.broadcast_or_release(tx, blocking) - await self.analytics_manager.send_claim_action('abandon') + self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon')) else: await self.ledger.release_tx(tx) From 111871bb286c2188614d27b1dff4d453df71a4e4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 3 Feb 2020 22:24:02 -0500 Subject: [PATCH 4/4] update test --- lbry/blob_exchange/server.py | 2 ++ tests/unit/blob_exchange/test_transfer_blob.py | 1 + 2 files changed, 3 insertions(+) diff --git a/lbry/blob_exchange/server.py b/lbry/blob_exchange/server.py index 93f685233..a2ff8b698 100644 --- a/lbry/blob_exchange/server.py +++ b/lbry/blob_exchange/server.py @@ -107,12 +107,14 @@ class BlobServerProtocol(asyncio.Protocol): else: self.close() log.debug("stopped sending %s to %s:%i", blob_hash, peer_address, peer_port) + return except (OSError, ValueError, asyncio.TimeoutError) as err: if isinstance(err, asyncio.TimeoutError): log.debug("timed out sending blob %s to %s", blob_hash, peer_address) else: log.warning("could not read blob %s to send %s:%i", blob_hash, peer_address, peer_port) self.close() + return finally: self.transfer_finished.set() else: diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 28a414996..f7c011e3b 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -219,6 +219,7 @@ class TestBlobExchange(BlobExchangeTestBase): server_protocol = BlobServerProtocol(self.loop, self.server_blob_manager, self.server.lbrycrd_address) transport = asyncio.Transport(extra={'peername': ('ip', 90)}) received_data = BytesIO() + transport.is_closing = lambda: received_data.closed transport.write = received_data.write server_protocol.connection_made(transport) blob_request = BlobRequest.make_request_for_blob_hash(blob_hash).serialize()