forked from LBRYCommunity/lbry-sdk
reduce the amount of Blob objects created
This commit is contained in:
parent
a7d94f4514
commit
3a997277aa
5 changed files with 18 additions and 10 deletions
|
@ -61,6 +61,15 @@ class BlobManager:
|
||||||
self.blobs[blob_hash] = self._get_blob(blob_hash, length)
|
self.blobs[blob_hash] = self._get_blob(blob_hash, length)
|
||||||
return self.blobs[blob_hash]
|
return self.blobs[blob_hash]
|
||||||
|
|
||||||
|
def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
|
||||||
|
if not is_valid_blobhash(blob_hash):
|
||||||
|
raise ValueError(blob_hash)
|
||||||
|
if blob_hash in self.blobs:
|
||||||
|
return self.blobs[blob_hash].get_is_verified()
|
||||||
|
if not os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
|
||||||
|
return False
|
||||||
|
return self._get_blob(blob_hash, length).get_is_verified()
|
||||||
|
|
||||||
async def setup(self) -> bool:
|
async def setup(self) -> bool:
|
||||||
def get_files_in_blob_dir() -> typing.Set[str]:
|
def get_files_in_blob_dir() -> typing.Set[str]:
|
||||||
if not self.blob_dir:
|
if not self.blob_dir:
|
||||||
|
@ -97,8 +106,7 @@ class BlobManager:
|
||||||
|
|
||||||
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
|
||||||
"""Returns of the blobhashes_to_check, which are valid"""
|
"""Returns of the blobhashes_to_check, which are valid"""
|
||||||
blobs = [self.get_blob(b) for b in blob_hashes]
|
return [blob_hash for blob_hash in blob_hashes if self.is_blob_verified(blob_hash)]
|
||||||
return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
|
||||||
|
|
||||||
def delete_blob(self, blob_hash: str):
|
def delete_blob(self, blob_hash: str):
|
||||||
if not is_valid_blobhash(blob_hash):
|
if not is_valid_blobhash(blob_hash):
|
||||||
|
|
|
@ -3054,9 +3054,9 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
else:
|
else:
|
||||||
blobs = list(self.blob_manager.completed_blob_hashes)
|
blobs = list(self.blob_manager.completed_blob_hashes)
|
||||||
if needed:
|
if needed:
|
||||||
blobs = [blob_hash for blob_hash in blobs if not self.blob_manager.get_blob(blob_hash).get_is_verified()]
|
blobs = [blob_hash for blob_hash in blobs if not self.blob_manager.is_blob_verified(blob_hash)]
|
||||||
if finished:
|
if finished:
|
||||||
blobs = [blob_hash for blob_hash in blobs if self.blob_manager.get_blob(blob_hash).get_is_verified()]
|
blobs = [blob_hash for blob_hash in blobs if self.blob_manager.is_blob_verified(blob_hash)]
|
||||||
page_size = page_size or len(blobs)
|
page_size = page_size or len(blobs)
|
||||||
page = page or 0
|
page = page or 0
|
||||||
start_index = page * page_size
|
start_index = page * page_size
|
||||||
|
|
|
@ -139,7 +139,7 @@ class ManagedStream:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def blobs_completed(self) -> int:
|
def blobs_completed(self) -> int:
|
||||||
return sum([1 if self.blob_manager.get_blob(b.blob_hash).get_is_verified() else 0
|
return sum([1 if self.blob_manager.is_blob_verified(b.blob_hash) else 0
|
||||||
for b in self.descriptor.blobs[:-1]])
|
for b in self.descriptor.blobs[:-1]])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -64,7 +64,7 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
|
|
||||||
async def send_descriptor(self) -> typing.Tuple[bool, typing.List[str]]: # returns a list of needed blob hashes
|
async def send_descriptor(self) -> typing.Tuple[bool, typing.List[str]]: # returns a list of needed blob hashes
|
||||||
sd_blob = self.blob_manager.get_blob(self.descriptor.sd_hash)
|
sd_blob = self.blob_manager.get_blob(self.descriptor.sd_hash)
|
||||||
assert sd_blob.get_is_verified(), "need to have a sd blob to send at this point"
|
assert self.blob_manager.is_blob_verified(self.descriptor.sd_hash), "need to have sd blob to send at this point"
|
||||||
response = await self.send_request({
|
response = await self.send_request({
|
||||||
'sd_blob_hash': sd_blob.blob_hash,
|
'sd_blob_hash': sd_blob.blob_hash,
|
||||||
'sd_blob_size': sd_blob.length
|
'sd_blob_size': sd_blob.length
|
||||||
|
@ -80,7 +80,7 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
sent_sd = True
|
sent_sd = True
|
||||||
if not needed:
|
if not needed:
|
||||||
for blob in self.descriptor.blobs[:-1]:
|
for blob in self.descriptor.blobs[:-1]:
|
||||||
if self.blob_manager.get_blob(blob.blob_hash, blob.length).get_is_verified():
|
if self.blob_manager.is_blob_verified(blob.blob_hash, blob.length):
|
||||||
needed.append(blob.blob_hash)
|
needed.append(blob.blob_hash)
|
||||||
log.info("Sent reflector descriptor %s", sd_blob.blob_hash[:8])
|
log.info("Sent reflector descriptor %s", sd_blob.blob_hash[:8])
|
||||||
self.reflected_blobs.append(sd_blob.blob_hash)
|
self.reflected_blobs.append(sd_blob.blob_hash)
|
||||||
|
@ -91,8 +91,8 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
return sent_sd, needed
|
return sent_sd, needed
|
||||||
|
|
||||||
async def send_blob(self, blob_hash: str):
|
async def send_blob(self, blob_hash: str):
|
||||||
|
assert self.blob_manager.is_blob_verified(blob_hash), "need to have a blob to send at this point"
|
||||||
blob = self.blob_manager.get_blob(blob_hash)
|
blob = self.blob_manager.get_blob(blob_hash)
|
||||||
assert blob.get_is_verified(), "need to have a blob to send at this point"
|
|
||||||
response = await self.send_request({
|
response = await self.send_request({
|
||||||
'blob_hash': blob.blob_hash,
|
'blob_hash': blob.blob_hash,
|
||||||
'blob_size': blob.length
|
'blob_size': blob.length
|
||||||
|
|
|
@ -147,7 +147,7 @@ class StreamManager:
|
||||||
to_start = []
|
to_start = []
|
||||||
await self.storage.sync_files_to_blobs()
|
await self.storage.sync_files_to_blobs()
|
||||||
for file_info in await self.storage.get_all_lbry_files():
|
for file_info in await self.storage.get_all_lbry_files():
|
||||||
if not self.blob_manager.get_blob(file_info['sd_hash']).get_is_verified():
|
if not self.blob_manager.is_blob_verified(file_info['sd_hash']):
|
||||||
to_recover.append(file_info)
|
to_recover.append(file_info)
|
||||||
to_start.append(file_info)
|
to_start.append(file_info)
|
||||||
if to_recover:
|
if to_recover:
|
||||||
|
@ -185,7 +185,7 @@ class StreamManager:
|
||||||
batch = []
|
batch = []
|
||||||
while sd_hashes:
|
while sd_hashes:
|
||||||
stream = self.streams[sd_hashes.pop()]
|
stream = self.streams[sd_hashes.pop()]
|
||||||
if self.blob_manager.get_blob(stream.sd_hash).get_is_verified() and stream.blobs_completed:
|
if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed:
|
||||||
if not stream.fully_reflected.is_set():
|
if not stream.fully_reflected.is_set():
|
||||||
host, port = random.choice(self.config.reflector_servers)
|
host, port = random.choice(self.config.reflector_servers)
|
||||||
batch.append(stream.upload_to_reflector(host, port))
|
batch.append(stream.upload_to_reflector(host, port))
|
||||||
|
|
Loading…
Reference in a new issue