small clarifying changes

Alex Grintsvayg 2019-10-04 09:18:54 -04:00
parent 44b27bbfc8
commit 284fdb2fcb
GPG key ID: AEB3F089F86A22B5
5 changed files with 19 additions and 18 deletions

@@ -37,12 +37,8 @@ class BlobManager:
         self.connection_manager = ConnectionManager(loop)
 
     def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
-        if self.config.save_blobs:
-            return BlobFile(
-                self.loop, blob_hash, length, self.blob_completed, self.blob_dir
-            )
-        else:
-            if is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
+        if self.config.save_blobs or (
+                is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))):
             return BlobFile(
                 self.loop, blob_hash, length, self.blob_completed, self.blob_dir
             )
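
The refactor above folds the nested if/else into one short-circuit condition. A minimal sketch (not part of the commit) checking that the old and new shapes choose a BlobFile under exactly the same inputs:

    import itertools

    def old_rule(save_blobs: bool, valid_and_on_disk: bool) -> bool:
        # old shape: nested if/else
        if save_blobs:
            return True
        return valid_and_on_disk

    def new_rule(save_blobs: bool, valid_and_on_disk: bool) -> bool:
        # new shape: a single short-circuit `or`
        return save_blobs or valid_and_on_disk

    for save, on_disk in itertools.product((True, False), repeat=2):
        assert old_rule(save, on_disk) == new_rule(save, on_disk)
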
@@ -82,6 +78,7 @@ class BlobManager:
             return {
                 item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
            }
+
         in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir)
         to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
         if to_add:
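
The scan runs via run_in_executor because os.scandir blocks the event loop. A standalone sketch of the same directory scan, assuming is_valid_blobhash accepts 96 lowercase hex characters (the real check lives in lbry.blob and may differ):

    import os
    import re

    # assumed stand-in for lbry's is_valid_blobhash
    _HEX96 = re.compile(r'^[0-9a-f]{96}$')

    def get_files_in_blob_dir(blob_dir: str) -> set:
        # file names in the blob dir that look like blob hashes
        return {item.name for item in os.scandir(blob_dir) if _HEX96.match(item.name)}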

@@ -112,10 +112,10 @@ class PeerManager:
         delay = self._loop.time() - constants.check_refresh_interval
 
         # fixme: find a way to re-enable that without breaking other parts
-        #if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
-        #    return
-        #addr_tup = (address, udp_port)
-        #if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id:
-        #    return
+        # if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
+        #     return
+        # addr_tup = (address, udp_port)
+        # if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id:
+        #     return
         previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None))
         last_requested = self._last_requested.get((address, udp_port))
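
The two context lines at the bottom rely on dict.get with a tuple default so an unknown peer still unpacks into two names. A self-contained sketch of that pattern (the peer data is made up):

    # _rpc_failures maps (address, udp_port) -> (previous_failure, most_recent_failure)
    rpc_failures = {("203.0.113.5", 4444): (100.0, 250.0)}

    prev, recent = rpc_failures.get(("203.0.113.5", 4444), (None, None))
    assert (prev, recent) == (100.0, 250.0)

    # unknown peer: the (None, None) default keeps the unpacking from raising
    prev, recent = rpc_failures.get(("198.51.100.7", 4444), (None, None))
    assert prev is None and recent is None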

@@ -71,7 +71,7 @@ class StreamDescriptor:
         self.sd_hash = sd_hash
 
     @property
-    def length(self):
+    def length(self) -> int:
         return len(self.as_json())
 
     def get_stream_hash(self) -> str:
@@ -114,7 +114,7 @@ class StreamDescriptor:
             ])
         ).encode()
 
-    def calculate_old_sort_sd_hash(self):
+    def calculate_old_sort_sd_hash(self) -> str:
         h = get_lbry_hash_obj()
         h.update(self.old_sort_json())
         return h.hexdigest()
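
calculate_old_sort_sd_hash is a plain hash-then-hexdigest over the old-sort JSON bytes, which is why the -> str annotation fits. A sketch of the same shape, assuming get_lbry_hash_obj wraps sha384 (lbry's blob-hash algorithm; an assumption, not shown in this diff):

    import hashlib

    def calculate_old_sort_sd_hash(old_sort_json: bytes) -> str:
        h = hashlib.sha384()  # assumed equivalent of get_lbry_hash_obj()
        h.update(old_sort_json)
        return h.hexdigest()

    # sha384 hex digests are 96 characters, the length of lbry blob-hash strings
    assert len(calculate_old_sort_sd_hash(b'{"stream_name": ""}')) == 96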

@@ -11,6 +11,8 @@ from lbry.stream.downloader import StreamDownloader
 from lbry.stream.descriptor import StreamDescriptor
 from lbry.stream.reflector.client import StreamReflectorClient
 from lbry.extras.daemon.storage import StoredStreamClaim
+from lbry.blob import MAX_BLOB_SIZE
+
 if typing.TYPE_CHECKING:
     from lbry.conf import Config
     from lbry.schema.claim import Claim
@@ -215,6 +217,9 @@ class ManagedStream:
     async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
                      file_path: str, key: typing.Optional[bytes] = None,
                      iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
+        """
+        Generate a stream from a file and save it to the db
+        """
         descriptor = await StreamDescriptor.create_stream(
             loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
             blob_completed_callback=blob_manager.blob_completed
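
A hedged usage sketch for the classmethod above; the import path and the surrounding setup of loop, config, and blob_manager are assumed context, not part of the commit:

    import asyncio
    from lbry.stream.managed_stream import ManagedStream  # assumed module path

    async def publish(loop: asyncio.AbstractEventLoop, config, blob_manager, path: str) -> str:
        # create() generates the stream from the file and saves it to the db
        stream = await ManagedStream.create(loop, config, blob_manager, path)
        return stream.descriptor.sd_hash  # hash identifying the new stream
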
@@ -295,7 +300,7 @@ class ManagedStream:
             if not wrote:
                 decrypted = decrypted[first_blob_start_offset:]
             if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
-                decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
+                decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * (MAX_BLOB_SIZE - 1))))
                 log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
                           len(self.descriptor.blobs) - 1)
                 await response.write_eof(decrypted)
@@ -486,8 +491,8 @@ class ManagedStream:
         if end >= size:
             raise HTTPRequestRangeNotSatisfiable()
 
-        skip_blobs = start // 2097150
-        skip = skip_blobs * 2097151
+        skip_blobs = start // (MAX_BLOB_SIZE - 2)  # -2 because ... dont remember
+        skip = skip_blobs * (MAX_BLOB_SIZE - 1)  # -1 because
         skip_first_blob = start - skip
         start = skip_first_blob + skip
         final_size = end - start + 1
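
Both hunks above replace magic numbers with expressions on MAX_BLOB_SIZE; the old constant 2097151 pins MAX_BLOB_SIZE at 2097152 (2 MiB). A worked sketch of the seek math with a made-up Range start:

    MAX_BLOB_SIZE = 2097152  # 2 MiB, implied by 2097151 == MAX_BLOB_SIZE - 1

    start = 5_000_000                          # example first byte of an HTTP Range request
    skip_blobs = start // (MAX_BLOB_SIZE - 2)  # whole blobs skipped before the range
    skip = skip_blobs * (MAX_BLOB_SIZE - 1)    # decrypted bytes those blobs cover
    skip_first_blob = start - skip             # offset into the first blob actually read

    assert (skip_blobs, skip, skip_first_blob) == (2, 4194302, 805698)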

@@ -19,7 +19,6 @@ import collections
 from lbry.schema.claim import Claim
 from lbry.cryptoutils import get_lbry_hash_obj
 
-
 log = logging.getLogger(__name__)