Merge pull request #1839 from lbryio/faster-stream-manager-startup

faster stream manager startup
This commit is contained in:
Jack Robison 2019-02-01 14:00:19 -05:00 committed by GitHub
commit df0635103e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -12,7 +12,6 @@ from lbrynet.extras.daemon.storage import lbc_to_dewies
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbrynet.conf import Config from lbrynet.conf import Config
from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.dht.peer import KademliaPeer
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.wallet import LbryWalletManager from lbrynet.extras.wallet import LbryWalletManager
@ -64,26 +63,33 @@ class StreamManager:
claim_info = await self.storage.get_content_claim(stream.stream_hash) claim_info = await self.storage.get_content_claim(stream.stream_hash)
stream.set_claim(claim_info, smart_decode(claim_info['value'])) stream.set_claim(claim_info, smart_decode(claim_info['value']))
async def load_streams_from_database(self): async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
infos = await self.storage.get_all_lbry_files() sd_blob = self.blob_manager.get_blob(sd_hash)
for file_info in infos:
sd_blob = self.blob_manager.get_blob(file_info['sd_hash'])
if sd_blob.get_is_verified(): if sd_blob.get_is_verified():
descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash) descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
downloader = StreamDownloader( downloader = StreamDownloader(
self.loop, self.config, self.blob_manager, descriptor.sd_hash, self.loop, self.config, self.blob_manager, descriptor.sd_hash,
binascii.unhexlify(file_info['download_directory']).decode(), download_directory,
binascii.unhexlify(file_info['file_name']).decode() file_name
) )
stream = ManagedStream( stream = ManagedStream(
self.loop, self.blob_manager, descriptor, self.loop, self.blob_manager, descriptor,
binascii.unhexlify(file_info['download_directory']).decode(), download_directory,
binascii.unhexlify(file_info['file_name']).decode(), file_name,
downloader, file_info['status'], file_info['claim'] downloader, status, claim
) )
self.streams.add(stream) self.streams.add(stream)
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
async def load_streams_from_database(self):
file_infos = await self.storage.get_all_lbry_files()
await asyncio.gather(*[
self.add_stream(
file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(),
binascii.unhexlify(file_info['download_directory']).decode(), file_info['status'], file_info['claim']
) for file_info in file_infos
])
async def resume(self): async def resume(self):
if not self.node: if not self.node:
log.warning("no DHT node given, cannot resume downloads") log.warning("no DHT node given, cannot resume downloads")