forked from LBRYCommunity/lbry-sdk
Merge pull request #2762 from lbryio/fix-2368
cancel reflector uploads upon file delete
This commit is contained in:
commit
c3233e03ef
5 changed files with 49 additions and 35 deletions
|
@ -167,7 +167,7 @@ class AbstractBlob:
|
|||
with self.reader_context() as handle:
|
||||
try:
|
||||
return await self.loop.sendfile(writer.transport, handle, count=self.get_length())
|
||||
except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError, AttributeError):
|
||||
except (ConnectionError, BrokenPipeError, RuntimeError, OSError, AttributeError):
|
||||
return -1
|
||||
|
||||
def decrypt(self, key: bytes, iv: bytes) -> bytes:
|
||||
|
|
|
@ -105,18 +105,21 @@ class BlobServerProtocol(asyncio.Protocol):
|
|||
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
|
||||
log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port)
|
||||
else:
|
||||
self.close()
|
||||
log.debug("stopped sending %s to %s:%i", blob_hash, peer_address, peer_port)
|
||||
except (OSError, asyncio.TimeoutError) as err:
|
||||
return
|
||||
except (OSError, ValueError, asyncio.TimeoutError) as err:
|
||||
if isinstance(err, asyncio.TimeoutError):
|
||||
log.debug("timed out sending blob %s to %s", blob_hash, peer_address)
|
||||
else:
|
||||
log.warning("could not read blob %s to send %s:%i", blob_hash, peer_address, peer_port)
|
||||
self.close()
|
||||
return
|
||||
finally:
|
||||
self.transfer_finished.set()
|
||||
else:
|
||||
log.info("don't have %s to send %s:%i", blob.blob_hash[:8], peer_address, peer_port)
|
||||
if responses:
|
||||
if responses and not self.transport.is_closing():
|
||||
self.send_response(responses)
|
||||
|
||||
def data_received(self, data):
|
||||
|
|
|
@ -1402,7 +1402,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.ledger.broadcast(tx)
|
||||
await self.analytics_manager.send_credits_sent()
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_credits_sent())
|
||||
else:
|
||||
await self.ledger.release_tx(tx)
|
||||
|
||||
|
@ -2458,7 +2458,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, txo, claim_address, claim, name, dewies_to_lbc(amount)
|
||||
)])
|
||||
await self.analytics_manager.send_new_channel()
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -2614,7 +2614,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
|
||||
)])
|
||||
await self.analytics_manager.send_new_channel()
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -2673,7 +2673,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.analytics_manager.send_claim_action('abandon')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -2993,7 +2993,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.analytics_manager.send_claim_action('publish')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3148,7 +3148,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
|
||||
)])
|
||||
await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
|
||||
await self.analytics_manager.send_claim_action('publish')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3333,17 +3333,16 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
stream_hash = None
|
||||
if not preview:
|
||||
old_stream_hash = await self.storage.get_stream_hash_for_sd_hash(old_txo.claim.stream.source.sd_hash)
|
||||
old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None)
|
||||
if file_path is not None:
|
||||
if old_stream_hash:
|
||||
stream_to_delete = self.stream_manager.get_stream_by_stream_hash(old_stream_hash)
|
||||
await self.stream_manager.delete_stream(stream_to_delete, delete_file=False)
|
||||
if old_stream:
|
||||
await self.stream_manager.delete_stream(old_stream, delete_file=False)
|
||||
file_stream = await self.stream_manager.create_stream(file_path)
|
||||
new_txo.claim.stream.source.sd_hash = file_stream.sd_hash
|
||||
new_txo.script.generate()
|
||||
stream_hash = file_stream.stream_hash
|
||||
else:
|
||||
stream_hash = old_stream_hash
|
||||
elif old_stream:
|
||||
stream_hash = old_stream.stream_hash
|
||||
|
||||
if channel:
|
||||
new_txo.sign(channel)
|
||||
|
@ -3356,7 +3355,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
)])
|
||||
if stream_hash:
|
||||
await self.storage.save_content_claim(stream_hash, new_txo.id)
|
||||
await self.analytics_manager.send_claim_action('publish')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3415,7 +3414,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.analytics_manager.send_claim_action('abandon')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon'))
|
||||
else:
|
||||
await self.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3580,7 +3579,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.analytics_manager.send_claim_action('publish')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3734,7 +3733,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.analytics_manager.send_claim_action('publish')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3886,7 +3885,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
'claim_id': claim_id,
|
||||
'amount': dewies_to_lbc(amount)
|
||||
}]})
|
||||
await self.analytics_manager.send_claim_action('new_support')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('new_support'))
|
||||
else:
|
||||
await self.ledger.release_tx(tx)
|
||||
|
||||
|
@ -3986,7 +3985,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.analytics_manager.send_claim_action('abandon')
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('abandon'))
|
||||
else:
|
||||
await self.ledger.release_tx(tx)
|
||||
|
||||
|
@ -4390,7 +4389,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
else:
|
||||
server, port = random.choice(self.conf.reflector_servers)
|
||||
reflected = await asyncio.gather(*[
|
||||
stream.upload_to_reflector(server, port)
|
||||
self.stream_manager.reflect_stream(stream, server, port)
|
||||
for stream in self.stream_manager.get_filtered_streams(**kwargs)
|
||||
])
|
||||
total = []
|
||||
|
|
|
@ -77,7 +77,7 @@ class StreamManager:
|
|||
self.resume_saving_task: Optional[asyncio.Task] = None
|
||||
self.re_reflect_task: Optional[asyncio.Task] = None
|
||||
self.update_stream_finished_futs: typing.List[asyncio.Future] = []
|
||||
self.running_reflector_uploads: typing.List[asyncio.Task] = []
|
||||
self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
|
||||
self.started = asyncio.Event(loop=self.loop)
|
||||
|
||||
async def _update_content_claim(self, stream: ManagedStream):
|
||||
|
@ -185,10 +185,10 @@ class StreamManager:
|
|||
batch = []
|
||||
while sd_hashes:
|
||||
stream = self.streams[sd_hashes.pop()]
|
||||
if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed:
|
||||
if not stream.fully_reflected.is_set():
|
||||
host, port = random.choice(self.config.reflector_servers)
|
||||
batch.append(stream.upload_to_reflector(host, port))
|
||||
if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed and \
|
||||
stream.sd_hash not in self.running_reflector_uploads and not \
|
||||
stream.fully_reflected.is_set():
|
||||
batch.append(self.reflect_stream(stream))
|
||||
if len(batch) >= self.config.concurrent_reflector_uploads:
|
||||
await asyncio.gather(*batch, loop=self.loop)
|
||||
batch = []
|
||||
|
@ -212,26 +212,37 @@ class StreamManager:
|
|||
while self.update_stream_finished_futs:
|
||||
self.update_stream_finished_futs.pop().cancel()
|
||||
while self.running_reflector_uploads:
|
||||
self.running_reflector_uploads.pop().cancel()
|
||||
_, t = self.running_reflector_uploads.popitem()
|
||||
t.cancel()
|
||||
self.started.clear()
|
||||
log.info("finished stopping the stream manager")
|
||||
|
||||
def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None,
|
||||
port: Optional[int] = None) -> asyncio.Task:
|
||||
if not server or not port:
|
||||
server, port = random.choice(self.config.reflector_servers)
|
||||
if stream.sd_hash in self.running_reflector_uploads:
|
||||
return self.running_reflector_uploads[stream.sd_hash]
|
||||
task = self.loop.create_task(stream.upload_to_reflector(server, port))
|
||||
self.running_reflector_uploads[stream.sd_hash] = task
|
||||
task.add_done_callback(
|
||||
lambda _: None if stream.sd_hash not in self.running_reflector_uploads else
|
||||
self.running_reflector_uploads.pop(stream.sd_hash)
|
||||
)
|
||||
return task
|
||||
|
||||
async def create_stream(self, file_path: str, key: Optional[bytes] = None,
|
||||
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
|
||||
stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
|
||||
self.streams[stream.sd_hash] = stream
|
||||
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
|
||||
if self.config.reflect_streams and self.config.reflector_servers:
|
||||
host, port = random.choice(self.config.reflector_servers)
|
||||
task = self.loop.create_task(stream.upload_to_reflector(host, port))
|
||||
self.running_reflector_uploads.append(task)
|
||||
task.add_done_callback(
|
||||
lambda _: None
|
||||
if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task)
|
||||
)
|
||||
self.reflect_stream(stream)
|
||||
return stream
|
||||
|
||||
async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False):
|
||||
if stream.sd_hash in self.running_reflector_uploads:
|
||||
self.running_reflector_uploads[stream.sd_hash].cancel()
|
||||
stream.stop_tasks()
|
||||
if stream.sd_hash in self.streams:
|
||||
del self.streams[stream.sd_hash]
|
||||
|
|
|
@ -219,6 +219,7 @@ class TestBlobExchange(BlobExchangeTestBase):
|
|||
server_protocol = BlobServerProtocol(self.loop, self.server_blob_manager, self.server.lbrycrd_address)
|
||||
transport = asyncio.Transport(extra={'peername': ('ip', 90)})
|
||||
received_data = BytesIO()
|
||||
transport.is_closing = lambda: received_data.closed
|
||||
transport.write = received_data.write
|
||||
server_protocol.connection_made(transport)
|
||||
blob_request = BlobRequest.make_request_for_blob_hash(blob_hash).serialize()
|
||||
|
|
Loading…
Add table
Reference in a new issue