From 3a997277aa350cc940ecb0fee76ca804383ef1e5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 18 Apr 2019 15:19:06 -0400 Subject: [PATCH] reduce the amount of Blob objects created --- lbrynet/blob/blob_manager.py | 12 ++++++++++-- lbrynet/extras/daemon/Daemon.py | 4 ++-- lbrynet/stream/managed_stream.py | 2 +- lbrynet/stream/reflector/client.py | 6 +++--- lbrynet/stream/stream_manager.py | 4 ++-- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 24900a4d9..9ae0a0e0e 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -61,6 +61,15 @@ class BlobManager: self.blobs[blob_hash] = self._get_blob(blob_hash, length) return self.blobs[blob_hash] + def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool: + if not is_valid_blobhash(blob_hash): + raise ValueError(blob_hash) + if blob_hash in self.blobs: + return self.blobs[blob_hash].get_is_verified() + if not os.path.isfile(os.path.join(self.blob_dir, blob_hash)): + return False + return self._get_blob(blob_hash, length).get_is_verified() + async def setup(self) -> bool: def get_files_in_blob_dir() -> typing.Set[str]: if not self.blob_dir: @@ -97,8 +106,7 @@ class BlobManager: def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]: """Returns of the blobhashes_to_check, which are valid""" - blobs = [self.get_blob(b) for b in blob_hashes] - return [blob.blob_hash for blob in blobs if blob.get_is_verified()] + return [blob_hash for blob_hash in blob_hashes if self.is_blob_verified(blob_hash)] def delete_blob(self, blob_hash: str): if not is_valid_blobhash(blob_hash): diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index a32ad6519..4727e116c 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -3054,9 +3054,9 @@ class Daemon(metaclass=JSONRPCServerType): else: blobs = list(self.blob_manager.completed_blob_hashes) if needed: - blobs = [blob_hash for blob_hash in blobs if not self.blob_manager.get_blob(blob_hash).get_is_verified()] + blobs = [blob_hash for blob_hash in blobs if not self.blob_manager.is_blob_verified(blob_hash)] if finished: - blobs = [blob_hash for blob_hash in blobs if self.blob_manager.get_blob(blob_hash).get_is_verified()] + blobs = [blob_hash for blob_hash in blobs if self.blob_manager.is_blob_verified(blob_hash)] page_size = page_size or len(blobs) page = page or 0 start_index = page * page_size diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 1584786d4..b722ac69b 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -139,7 +139,7 @@ class ManagedStream: @property def blobs_completed(self) -> int: - return sum([1 if self.blob_manager.get_blob(b.blob_hash).get_is_verified() else 0 + return sum([1 if self.blob_manager.is_blob_verified(b.blob_hash) else 0 for b in self.descriptor.blobs[:-1]]) @property diff --git a/lbrynet/stream/reflector/client.py b/lbrynet/stream/reflector/client.py index 88997051e..bd00ed412 100644 --- a/lbrynet/stream/reflector/client.py +++ b/lbrynet/stream/reflector/client.py @@ -64,7 +64,7 @@ class StreamReflectorClient(asyncio.Protocol): async def send_descriptor(self) -> typing.Tuple[bool, typing.List[str]]: # returns a list of needed blob hashes sd_blob = self.blob_manager.get_blob(self.descriptor.sd_hash) - assert sd_blob.get_is_verified(), "need to have a sd blob to send at this point" + assert self.blob_manager.is_blob_verified(self.descriptor.sd_hash), "need to have sd blob to send at this point" response = await self.send_request({ 'sd_blob_hash': sd_blob.blob_hash, 'sd_blob_size': sd_blob.length @@ -80,7 +80,7 @@ class StreamReflectorClient(asyncio.Protocol): sent_sd = True if not needed: for blob in self.descriptor.blobs[:-1]: - if self.blob_manager.get_blob(blob.blob_hash, blob.length).get_is_verified(): + if self.blob_manager.is_blob_verified(blob.blob_hash, blob.length): needed.append(blob.blob_hash) log.info("Sent reflector descriptor %s", sd_blob.blob_hash[:8]) self.reflected_blobs.append(sd_blob.blob_hash) @@ -91,8 +91,8 @@ class StreamReflectorClient(asyncio.Protocol): return sent_sd, needed async def send_blob(self, blob_hash: str): + assert self.blob_manager.is_blob_verified(blob_hash), "need to have a blob to send at this point" blob = self.blob_manager.get_blob(blob_hash) - assert blob.get_is_verified(), "need to have a blob to send at this point" response = await self.send_request({ 'blob_hash': blob.blob_hash, 'blob_size': blob.length diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index f84dfe6a3..2b896d16e 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -147,7 +147,7 @@ class StreamManager: to_start = [] await self.storage.sync_files_to_blobs() for file_info in await self.storage.get_all_lbry_files(): - if not self.blob_manager.get_blob(file_info['sd_hash']).get_is_verified(): + if not self.blob_manager.is_blob_verified(file_info['sd_hash']): to_recover.append(file_info) to_start.append(file_info) if to_recover: @@ -185,7 +185,7 @@ class StreamManager: batch = [] while sd_hashes: stream = self.streams[sd_hashes.pop()] - if self.blob_manager.get_blob(stream.sd_hash).get_is_verified() and stream.blobs_completed: + if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed: if not stream.fully_reflected.is_set(): host, port = random.choice(self.config.reflector_servers) batch.append(stream.upload_to_reflector(host, port))