lbry-sdk/lbry/stream/descriptor.py

305 lines
12 KiB
Python
Raw Permalink Normal View History

2019-01-22 12:54:17 -05:00
import os
import json
import binascii
import logging
import typing
import asyncio
2021-09-10 10:53:52 -04:00
import time
2019-10-08 20:03:27 +02:00
import re
from collections import OrderedDict
2019-01-22 12:54:17 -05:00
from cryptography.hazmat.primitives.ciphers.algorithms import AES
2019-06-20 20:55:47 -04:00
from lbry.blob import MAX_BLOB_SIZE
from lbry.blob.blob_info import BlobInfo
from lbry.blob.blob_file import AbstractBlob, BlobFile
2020-01-03 01:44:41 -03:00
from lbry.utils import get_lbry_hash_obj
2019-06-20 20:55:47 -04:00
from lbry.error import InvalidStreamDescriptorError
2019-01-22 12:54:17 -05:00
log = logging.getLogger(__name__)

# Everything that has to be removed from a file name, compiled into one
# capturing group. The alternatives are tried left to right at each position.
RE_ILLEGAL_FILENAME_CHARS = re.compile(
    '(' + '|'.join((
        r'[<>:"/\\|?*]+',                # runs of characters not allowed in file names
        r'[\x00-\x1F]+',                 # runs of control characters (code points 0-31)
        r'[ \t]*(\.)+[ \t]*$',           # trailing dots, with any blanks around them
        r'(^[ \t]+|[ \t]+$)',            # blanks at the very start or the very end
        r'^CON$|^PRN$|^AUX$',            # names that are illegal as a whole
        r'^NUL$|^COM[1-9]$|^LPT[1-9]$',  # ... continued
    )) + ')'
)
2019-01-22 12:54:17 -05:00
def format_sd_info(stream_name: str, key: str, suggested_file_name: str, stream_hash: str,
                   blobs: typing.List[typing.Dict]) -> typing.Dict:
    """
    Assemble the dictionary form of a stream descriptor.

    The arguments are stored unchanged under keys of the same name, next to a
    constant "stream_type" of "lbryfile".
    """
    return dict(
        stream_type="lbryfile",
        stream_name=stream_name,
        key=key,
        suggested_file_name=suggested_file_name,
        stream_hash=stream_hash,
        blobs=blobs,
    )
def random_iv_generator() -> typing.Generator[bytes, None, None]:
    """Endlessly yield fresh random byte strings of `AES.block_size // 8` bytes each."""
    iv_size = AES.block_size // 8
    while True:
        yield os.urandom(iv_size)
def read_bytes(file_path: str, offset: int, to_read: int):
    """
    Read up to `to_read` bytes from `file_path`, starting `offset` bytes into the file.

    The file is opened in binary mode and closed again before returning.
    """
    with open(file_path, 'rb') as source:
        source.seek(offset)
        chunk = source.read(to_read)
    return chunk
async def file_reader(file_path: str):
    """
    Asynchronously yield the contents of `file_path` in consecutive chunks.

    The file size is taken once, up front, from `os.stat`. Each chunk holds at most
    `MAX_BLOB_SIZE - 1` bytes and is read with `read_bytes` in the event loop's
    default executor, so the read itself does not run on the loop thread.
    """
    total_size = int(os.stat(file_path).st_size)
    position = 0
    while position < total_size:
        chunk_size = min(total_size - position, MAX_BLOB_SIZE - 1)
        if chunk_size == 0:
            break
        loop = asyncio.get_event_loop()
        chunk = await loop.run_in_executor(None, read_bytes, file_path, position, chunk_size)
        yield chunk
        position += chunk_size
2019-01-22 12:54:17 -05:00
2019-10-19 19:44:32 +02:00
def sanitize_file_name(dirty_name: str, default_file_name: str = 'lbry_download'):
    """
    Return `dirty_name` with everything matched by RE_ILLEGAL_FILENAME_CHARS removed.

    The name is split with `os.path.splitext` and the stem and the extension are cleaned
    separately. If nothing is left of the stem, a warning is logged and `default_file_name`
    is used in its place. The cleaned extension is appended only when it is longer than one
    character, i.e. more than a bare dot.

    :param dirty_name: file name to clean.
    :param default_file_name: stem to fall back on when the cleaned stem is empty.
    :return: the cleaned file name.
    """

    def scrub(text: str) -> str:
        # One substitution pass can expose a new violation: 'CON<' turns into the illegal
        # name 'CON', 'COM8 ' into 'COM8'. Repeat until a pass changes nothing; every
        # changing pass shortens the text, so this always terminates.
        while True:
            cleaned = RE_ILLEGAL_FILENAME_CHARS.sub('', text)
            if cleaned == text:
                return cleaned
            text = cleaned

    file_name, ext = os.path.splitext(dirty_name)
    file_name = scrub(file_name)
    ext = scrub(ext)

    if not file_name:
        log.warning('Unable to sanitize file name for %s, returning default value %s', dirty_name, default_file_name)
        file_name = default_file_name
    if len(ext) > 1:
        file_name += ext
    return file_name
2019-01-22 12:54:17 -05:00
class StreamDescriptor:
    """
    Description of a stream: its name, key, suggested file name and ordered list of blobs.

    A descriptor can be serialized to JSON (`as_json`, `old_sort_json`), hashed
    (`calculate_sd_hash`, `calculate_old_sort_sd_hash`), written to a blob (`make_sd_blob`)
    and parsed back from such a blob (`from_stream_descriptor_blob`).
    """

    __slots__ = [
        'loop',
        'blob_dir',
        'stream_name',
        'key',
        'suggested_file_name',
        'blobs',
        'stream_hash',
        'sd_hash'
    ]

    def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, stream_name: str, key: str,
                 suggested_file_name: str, blobs: typing.List['BlobInfo'],
                 stream_hash: typing.Optional[str] = None, sd_hash: typing.Optional[str] = None):
        """
        Store the given fields; when `stream_hash` is missing or empty it is computed
        from the other fields with `get_stream_hash`.
        """
        self.loop = loop
        self.blob_dir = blob_dir
        self.stream_name = stream_name
        self.key = key
        self.suggested_file_name = suggested_file_name
        self.blobs = blobs
        self.stream_hash = stream_hash or self.get_stream_hash()
        self.sd_hash = sd_hash

    @property
    def length(self) -> int:
        """Number of bytes in the `as_json` serialization of this descriptor."""
        return len(self.as_json())

    def get_stream_hash(self) -> str:
        """
        Compute the stream hash of this descriptor with `calculate_stream_hash`.

        The stream name and suggested file name are passed hex-encoded, the key is
        passed encoded to bytes, and each blob is passed in its `as_dict()` form.
        """
        return self.calculate_stream_hash(
            binascii.hexlify(self.stream_name.encode()), self.key.encode(),
            binascii.hexlify(self.suggested_file_name.encode()),
            [blob_info.as_dict() for blob_info in self.blobs]
        )

    def calculate_sd_hash(self) -> str:
        """Return the hex digest of `get_lbry_hash_obj()` over the `as_json` serialization."""
        h = get_lbry_hash_obj()
        h.update(self.as_json())
        return h.hexdigest()

    def as_json(self) -> bytes:
        """
        Serialize the descriptor to JSON bytes with the keys sorted.

        The stream name and suggested file name are written hex-encoded.
        """
        return json.dumps(
            format_sd_info(binascii.hexlify(self.stream_name.encode()).decode(), self.key,
                           binascii.hexlify(self.suggested_file_name.encode()).decode(),
                           self.stream_hash,
                           [blob_info.as_dict() for blob_info in self.blobs]), sort_keys=True
        ).encode()

    def old_sort_json(self) -> bytes:
        """
        Serialize the descriptor to JSON bytes using a fixed, non-alphabetical key order.

        Blobs are written in order up to and including the first one without a
        `blob_hash`; that entry is written without a 'blob_hash' key and nothing after it
        is included.
        """
        blobs = []
        for blob in self.blobs:
            if blob.blob_hash:
                fields = [('length', blob.length), ('blob_num', blob.blob_num),
                          ('blob_hash', blob.blob_hash), ('iv', blob.iv)]
            else:
                fields = [('length', blob.length), ('blob_num', blob.blob_num), ('iv', blob.iv)]
            blobs.append(OrderedDict(fields))
            if not blob.blob_hash:
                break
        return json.dumps(
            OrderedDict([
                ('stream_name', binascii.hexlify(self.stream_name.encode()).decode()),
                ('blobs', blobs),
                ('stream_type', 'lbryfile'),
                ('key', self.key),
                ('suggested_file_name', binascii.hexlify(self.suggested_file_name.encode()).decode()),
                ('stream_hash', self.stream_hash),
            ])
        ).encode()

    def calculate_old_sort_sd_hash(self) -> str:
        """Return the hex digest of `get_lbry_hash_obj()` over the `old_sort_json` serialization."""
        h = get_lbry_hash_obj()
        h.update(self.old_sort_json())
        return h.hexdigest()

    async def make_sd_blob(
            self, blob_file_obj: typing.Optional['AbstractBlob'] = None, old_sort: typing.Optional[bool] = False,
            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
            added_on: typing.Optional[float] = None, is_mine: bool = False
    ):
        """
        Write this descriptor into a blob and return that blob.

        :param blob_file_obj: blob to write into; when not given, a new `BlobFile` named
            after the descriptor hash is created in `self.blob_dir`.
        :param old_sort: serialize and hash with the `old_sort_json` layout instead of `as_json`.
        :param blob_completed_callback: passed to the newly created `BlobFile`.
        :param added_on: passed to the newly created `BlobFile`.
        :param is_mine: passed to the newly created `BlobFile`.
        :return: the blob, after its `verified` event has been set and it has been closed.
        """
        if old_sort:
            sd_hash = self.calculate_old_sort_sd_hash()
            sd_data = self.old_sort_json()
        else:
            sd_hash = self.calculate_sd_hash()
            sd_data = self.as_json()
        sd_blob = blob_file_obj or BlobFile(
            self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir, added_on, is_mine
        )
        if blob_file_obj:
            blob_file_obj.set_length(len(sd_data))
        # only write when the blob is not already verified
        if not sd_blob.get_is_verified():
            writer = sd_blob.get_blob_writer()
            writer.write(sd_data)
        await sd_blob.verified.wait()
        sd_blob.close()
        return sd_blob

    @staticmethod
    def _check_decoded_structure(decoded: typing.Any) -> None:
        """Raise InvalidStreamDescriptorError unless `decoded` has the fields and types the parser reads."""
        if not isinstance(decoded, dict):
            raise InvalidStreamDescriptorError("Does not decode as a JSON object")
        for field in ('stream_name', 'key', 'suggested_file_name', 'stream_hash'):
            if not isinstance(decoded.get(field), str):
                raise InvalidStreamDescriptorError(f"Missing or invalid field: {field}")
        blob_dicts = decoded.get('blobs')
        if not isinstance(blob_dicts, list) or not blob_dicts:
            raise InvalidStreamDescriptorError("Does not contain any blobs")
        for info in blob_dicts:
            if not isinstance(info, dict) or not isinstance(info.get('iv'), str) \
                    or 'blob_num' not in info or 'length' not in info:
                raise InvalidStreamDescriptorError("Contains a malformed blob entry")

    @classmethod
    def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
                                     blob: 'AbstractBlob') -> 'StreamDescriptor':
        """
        Parse and validate the descriptor stored in `blob` (blocking; reads the blob).

        The blob is deleted when its content cannot be decoded as JSON text.

        :raises InvalidStreamDescriptorError: if the content is not valid JSON, lacks the
            expected fields, does not end with a hash-less zero-length blob, contains a
            zero-length or hash-less data blob, has blobs out of order, or carries a
            stream hash that does not match its metadata.
        """
        with blob.reader_context() as blob_reader:
            json_bytes = blob_reader.read()
        try:
            decoded = json.loads(json_bytes.decode())
        except (UnicodeDecodeError, json.JSONDecodeError) as err:
            blob.delete()
            raise InvalidStreamDescriptorError("Does not decode as valid JSON") from err
        # the content is untrusted: make sure every field read below exists before reading it
        cls._check_decoded_structure(decoded)
        blob_dicts = decoded['blobs']
        if blob_dicts[-1]['length'] != 0:
            raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
        if any(blob_info['length'] == 0 for blob_info in blob_dicts[:-1]):
            raise InvalidStreamDescriptorError("Contains zero-length data blob")
        if 'blob_hash' in blob_dicts[-1]:
            raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
        if any(i != blob_info['blob_num'] for i, blob_info in enumerate(blob_dicts)):
            raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
        # hashing a non-empty blob needs its blob hash (see get_blob_hashsum)
        if any(not isinstance(blob_info.get('blob_hash'), str) for blob_info in blob_dicts[:-1]):
            raise InvalidStreamDescriptorError("Data blob is missing its hash")
        try:
            stream_name = binascii.unhexlify(decoded['stream_name']).decode()
            suggested_file_name = binascii.unhexlify(decoded['suggested_file_name']).decode()
        except ValueError as err:
            # binascii.Error (bad hex) and UnicodeDecodeError are both ValueError subclasses
            raise InvalidStreamDescriptorError("Stream name fields are not hex encoded text") from err
        added_on = time.time()
        descriptor = cls(
            loop, blob_dir,
            stream_name,
            decoded['key'],
            suggested_file_name,
            [BlobInfo(info['blob_num'], info['length'], info['iv'], added_on, info.get('blob_hash'))
             for info in blob_dicts],
            decoded['stream_hash'],
            blob.blob_hash
        )
        if descriptor.get_stream_hash() != decoded['stream_hash']:
            raise InvalidStreamDescriptorError("Stream hash does not match stream metadata")
        return descriptor

    @classmethod
    async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
                                          blob: 'AbstractBlob') -> 'StreamDescriptor':
        """
        Parse the descriptor stored in `blob`, doing the work in `loop`'s default executor.

        :raises InvalidStreamDescriptorError: if the blob is not readable, or if parsing
            or validation fails.
        """
        if not blob.is_readable():
            raise InvalidStreamDescriptorError(f"unreadable/missing blob: {blob.blob_hash}")
        return await loop.run_in_executor(None, cls._from_stream_descriptor_blob, loop, blob_dir, blob)

    @staticmethod
    def get_blob_hashsum(blob_dict: typing.Dict):
        """
        Return the digest of one blob entry.

        The hash covers, in this order: the blob hash (only when the length is not zero),
        the blob number, the iv and the length, each as encoded text.
        """
        length = blob_dict['length']
        blob_hashsum = get_lbry_hash_obj()
        if length != 0:
            blob_hashsum.update(blob_dict['blob_hash'].encode())
        blob_hashsum.update(str(blob_dict['blob_num']).encode())
        blob_hashsum.update(blob_dict['iv'].encode())
        blob_hashsum.update(str(length).encode())
        return blob_hashsum.digest()

    @staticmethod
    def calculate_stream_hash(hex_stream_name: bytes, key: bytes, hex_suggested_file_name: bytes,
                              blob_infos: typing.List[typing.Dict]) -> str:
        """
        Return the hex digest identifying a stream.

        The hash covers the hex stream name, the key, the hex suggested file name and
        finally the digest of all `get_blob_hashsum` digests of `blob_infos`, in order.
        """
        h = get_lbry_hash_obj()
        h.update(hex_stream_name)
        h.update(key)
        h.update(hex_suggested_file_name)
        blobs_hashsum = get_lbry_hash_obj()
        for blob in blob_infos:
            blobs_hashsum.update(StreamDescriptor.get_blob_hashsum(blob))
        h.update(blobs_hashsum.digest())
        return h.hexdigest()

    @classmethod
    async def create_stream(
            cls, loop: asyncio.AbstractEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None,
            iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None,
            old_sort: bool = False,
            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
                                                                     asyncio.Task]] = None) -> 'StreamDescriptor':
        """
        Create the blobs and the descriptor blob for the file at `file_path`.

        :param key: key handed to `BlobFile.create_from_unencrypted`; `AES.block_size // 8`
            random bytes are used when it is not given.
        :param iv_generator: source of one iv per blob; defaults to `random_iv_generator()`.
        :param old_sort: write the descriptor blob with the `old_sort_json` layout.
        :param blob_completed_callback: passed on for every blob that gets created.
        :return: the descriptor, with `sd_hash` set to the hash of the written descriptor blob.
        """
        blobs: typing.List['BlobInfo'] = []
        iv_generator = iv_generator or random_iv_generator()
        key = key or os.urandom(AES.block_size // 8)
        blob_num = -1
        added_on = time.time()
        async for blob_bytes in file_reader(file_path):
            blob_num += 1
            blob_info = await BlobFile.create_from_unencrypted(
                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, added_on, True, blob_completed_callback
            )
            blobs.append(blob_info)
        blobs.append(
            # add the stream terminator
            BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode(), added_on, None, True)
        )
        file_name = os.path.basename(file_path)
        suggested_file_name = sanitize_file_name(file_name)
        descriptor = cls(
            loop, blob_dir, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
        )
        sd_blob = await descriptor.make_sd_blob(
            old_sort=old_sort, blob_completed_callback=blob_completed_callback, added_on=added_on, is_mine=True
        )
        descriptor.sd_hash = sd_blob.blob_hash
        return descriptor

    def lower_bound_decrypted_length(self) -> int:
        """
        Return a lower bound for the size of the decrypted stream.

        The last entry of `self.blobs` (the stream terminator) is not counted. The last
        data blob counts as its length minus `AES.block_size // 8`, every earlier blob as
        its length minus one. A stream without data blobs has a bound of 0.
        NOTE(review): the subtractions presumably account for cipher padding -- confirm.
        """
        if len(self.blobs) < 2:
            # terminator only (e.g. a stream created from an empty file): nothing to count
            return 0
        length = sum(blob.length - 1 for blob in self.blobs[:-2])
        return length + self.blobs[-2].length - (AES.block_size // 8)

    def upper_bound_decrypted_length(self) -> int:
        """
        Return an upper bound for the size of the decrypted stream.

        This is the lower bound plus `AES.block_size // 8`, or 0 for a stream without
        data blobs.
        """
        if len(self.blobs) < 2:
            return 0
        return self.lower_bound_decrypted_length() + (AES.block_size // 8)

    @classmethod
    async def recover(cls, blob_dir: str, sd_blob: 'AbstractBlob', stream_hash: str, stream_name: str,
                      suggested_file_name: str, key: str,
                      blobs: typing.List['BlobInfo']) -> typing.Optional['StreamDescriptor']:
        """
        Rebuild a descriptor from its fields and write it back into `sd_blob`.

        The rebuilt descriptor is accepted when its hash, in either the current or the
        old serialization layout, equals `sd_blob.blob_hash`.

        :return: the descriptor, or None when neither layout reproduces the blob hash.
        """
        descriptor = cls(asyncio.get_event_loop(), blob_dir, stream_name, key, suggested_file_name,
                         blobs, stream_hash, sd_blob.blob_hash)

        if descriptor.calculate_sd_hash() == sd_blob.blob_hash:  # first check for a normal valid sd
            old_sort = False
        elif descriptor.calculate_old_sort_sd_hash() == sd_blob.blob_hash:  # check if old field sorting works
            old_sort = True
        else:
            return None

        await descriptor.make_sd_blob(sd_blob, old_sort)
        return descriptor