announces naturally, removes redundant calls
This commit is contained in:
parent
e9b58577dd
commit
6ead932ccb
2 changed files with 4 additions and 4 deletions
|
@ -59,6 +59,7 @@ class StreamAssembler:
|
|||
self.wrote_bytes_event.set()
|
||||
|
||||
await self.loop.run_in_executor(None, _decrypt_and_write)
|
||||
return True
|
||||
|
||||
async def setup(self):
|
||||
pass
|
||||
|
@ -76,6 +77,7 @@ class StreamAssembler:
|
|||
self.sd_blob = await self.get_blob(self.sd_hash)
|
||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
|
||||
self.sd_blob)
|
||||
await self.after_got_descriptor()
|
||||
self.output_path = await get_next_available_file_name(self.loop, output_dir,
|
||||
output_file_name or self.descriptor.suggested_file_name)
|
||||
if not self.got_descriptor.is_set():
|
||||
|
@ -84,14 +86,14 @@ class StreamAssembler:
|
|||
self.sd_blob, self.descriptor
|
||||
)
|
||||
await self.blob_manager.blob_completed(self.sd_blob)
|
||||
await self.after_got_descriptor()
|
||||
with open(self.output_path, 'wb') as stream_handle:
|
||||
self.stream_handle = stream_handle
|
||||
for blob_info in self.descriptor.blobs[:-1]:
|
||||
while True:
|
||||
try:
|
||||
blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
|
||||
await self._decrypt_blob(blob, blob_info, self.descriptor.key)
|
||||
if await self._decrypt_blob(blob, blob_info, self.descriptor.key):
|
||||
await self.blob_manager.blob_completed(blob)
|
||||
break
|
||||
except FileNotFoundError:
|
||||
log.debug("stream assembler stopped")
|
||||
|
|
|
@ -47,11 +47,9 @@ class StreamDownloader(StreamAssembler):
|
|||
async def after_got_descriptor(self):
|
||||
self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
|
||||
log.info("added head blob to search")
|
||||
await self.blob_manager.set_should_announce(self.sd_hash, 1)
|
||||
|
||||
async def after_finished(self):
|
||||
log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
|
||||
await self.blob_manager.set_should_announce(self.descriptor.blobs[0].blob_hash, 1)
|
||||
await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
|
||||
|
||||
def stop(self):
|
||||
|
|
Loading…
Add table
Reference in a new issue