make added_on a required parameter on BlobInfo, fix callers

This commit is contained in:
Victor Shyba 2022-03-23 13:41:02 -03:00
parent cb78e95e3d
commit 200761ff13
6 changed files with 15 additions and 13 deletions

View file

@ -201,7 +201,7 @@ class AbstractBlob:
writer = blob.get_blob_writer() writer = blob.get_blob_writer()
writer.write(blob_bytes) writer.write(blob_bytes)
await blob.verified.wait() await blob.verified.wait()
return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash, added_on, is_mine) return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), added_on, blob_hash, is_mine)
def save_verified_blob(self, verified_bytes: bytes): def save_verified_blob(self, verified_bytes: bytes):
if self.verified.is_set(): if self.verified.is_set():

View file

@ -1,4 +1,3 @@
import time
import typing import typing
@ -13,13 +12,13 @@ class BlobInfo:
] ]
def __init__( def __init__(
self, blob_num: int, length: int, iv: str, self, blob_num: int, length: int, iv: str, added_on,
blob_hash: typing.Optional[str] = None, added_on=0, is_mine=False): blob_hash: typing.Optional[str] = None, is_mine=False):
self.blob_hash = blob_hash self.blob_hash = blob_hash
self.blob_num = blob_num self.blob_num = blob_num
self.length = length self.length = length
self.iv = iv self.iv = iv
self.added_on = added_on or time.time() self.added_on = added_on
self.is_mine = is_mine self.is_mine = is_mine
def as_dict(self) -> typing.Dict: def as_dict(self) -> typing.Dict:

View file

@ -20,7 +20,7 @@ def do_migration(conf):
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall() "left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
blobs_by_stream = {} blobs_by_stream = {}
for stream_hash, position, iv, blob_hash, blob_length in blobs: for stream_hash, position, iv, blob_hash, blob_length in blobs:
blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, blob_hash)) blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, 0, blob_hash))
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams: for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
sd = StreamDescriptor(None, blob_dir, stream_name, stream_key, suggested_filename, sd = StreamDescriptor(None, blob_dir, stream_name, stream_key, suggested_filename,

View file

@ -532,7 +532,8 @@ class SQLiteStorage(SQLiteMixin):
def _get_blobs_for_stream(transaction): def _get_blobs_for_stream(transaction):
crypt_blob_infos = [] crypt_blob_infos = []
stream_blobs = transaction.execute( stream_blobs = transaction.execute(
"select blob_hash, position, iv from stream_blob where stream_hash=? " "select s.blob_hash, s.position, s.iv, b.added_on "
"from stream_blob s left outer join blob b on b.blob_hash=s.blob_hash where stream_hash=? "
"order by position asc", (stream_hash, ) "order by position asc", (stream_hash, )
).fetchall() ).fetchall()
if only_completed: if only_completed:
@ -552,9 +553,10 @@ class SQLiteStorage(SQLiteMixin):
for blob_hash, length in lengths: for blob_hash, length in lengths:
blob_length_dict[blob_hash] = length blob_length_dict[blob_hash] = length
for blob_hash, position, iv in stream_blobs: current_time = time.time()
for blob_hash, position, iv, added_on in stream_blobs:
blob_length = blob_length_dict.get(blob_hash, 0) blob_length = blob_length_dict.get(blob_hash, 0)
crypt_blob_infos.append(BlobInfo(position, blob_length, iv, blob_hash)) crypt_blob_infos.append(BlobInfo(position, blob_length, iv, added_on or current_time, blob_hash))
if not blob_hash: if not blob_hash:
break break
return crypt_blob_infos return crypt_blob_infos

View file

@ -194,12 +194,13 @@ class StreamDescriptor:
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
if any(i != blob_info['blob_num'] for i, blob_info in enumerate(decoded['blobs'])): if any(i != blob_info['blob_num'] for i, blob_info in enumerate(decoded['blobs'])):
raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs") raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
added_on = time.time()
descriptor = cls( descriptor = cls(
loop, blob_dir, loop, blob_dir,
binascii.unhexlify(decoded['stream_name']).decode(), binascii.unhexlify(decoded['stream_name']).decode(),
decoded['key'], decoded['key'],
binascii.unhexlify(decoded['suggested_file_name']).decode(), binascii.unhexlify(decoded['suggested_file_name']).decode(),
[BlobInfo(info['blob_num'], info['length'], info['iv'], info.get('blob_hash')) [BlobInfo(info['blob_num'], info['length'], info['iv'], added_on, info.get('blob_hash'))
for info in decoded['blobs']], for info in decoded['blobs']],
decoded['stream_hash'], decoded['stream_hash'],
blob.blob_hash blob.blob_hash
@ -266,7 +267,7 @@ class StreamDescriptor:
blobs.append(blob_info) blobs.append(blob_info)
blobs.append( blobs.append(
# add the stream terminator # add the stream terminator
BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), None, added_on, True) BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), added_on, None, True)
) )
file_name = os.path.basename(file_path) file_name = os.path.basename(file_path)
suggested_file_name = sanitize_file_name(file_name) suggested_file_name = sanitize_file_name(file_name)

View file

@ -84,7 +84,7 @@ class StorageTest(AsyncioTestCase):
await self.storage.add_blobs((blob_hash, length, 0, 0), finished=True) await self.storage.add_blobs((blob_hash, length, 0, 0), finished=True)
async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"): async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", random_lbry_hash())] blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", 0, random_lbry_hash())]
descriptor = StreamDescriptor( descriptor = StreamDescriptor(
asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash
) )
@ -95,7 +95,7 @@ class StorageTest(AsyncioTestCase):
async def make_and_store_fake_stream(self, blob_count=2, stream_hash=None): async def make_and_store_fake_stream(self, blob_count=2, stream_hash=None):
stream_hash = stream_hash or random_lbry_hash() stream_hash = stream_hash or random_lbry_hash()
blobs = [ blobs = [
BlobInfo(i + 1, 100, "DEADBEEF", random_lbry_hash()) BlobInfo(i + 1, 100, "DEADBEEF", 0, random_lbry_hash())
for i in range(blob_count) for i in range(blob_count)
] ]
await self.store_fake_stream(stream_hash, blobs) await self.store_fake_stream(stream_hash, blobs)