Merge pull request #1850 from lbryio/resume

Properly resume streams and fixup with statement
This commit is contained in:
Jack Robison 2019-02-04 10:18:31 -05:00 committed by GitHub
commit 43799168ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 11 deletions

View file

@ -82,11 +82,11 @@ class StreamAssembler:
if not self.got_descriptor.is_set():
self.got_descriptor.set()
await self.after_got_descriptor()
self.stream_handle = open(self.output_path, 'wb')
await self.blob_manager.storage.store_stream(
self.sd_blob, self.descriptor
)
try:
with open(self.output_path, 'wb') as stream_handle:
self.stream_handle = stream_handle
for blob_info in self.descriptor.blobs[:-1]:
while True:
try:
@ -101,10 +101,8 @@ class StreamAssembler:
self.descriptor.sd_hash)
continue
self.stream_finished_event.set()
await self.after_finished()
finally:
self.stream_handle.close()
self.stream_finished_event.set()
await self.after_finished()
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
return self.blob_manager.get_blob(blob_hash, length)

View file

@ -142,11 +142,12 @@ class StreamManager:
log.warning("no DHT node given, cannot resume downloads")
return
await self.node.joined.wait()
resumed = 0
t = [self.start_stream(stream) for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING]
if resumed:
log.info("resuming %i downloads", t)
await asyncio.gather(*t, loop=self.loop)
t = [
stream.start_download(self.node)
for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING
]
if t:
log.info("resuming %i downloads", len(t))
async def reflect_streams(self):
while True: