lbry-sdk/lbry/blob/blob_file.py

355 lines
13 KiB
Python
Raw Normal View History

2015-08-20 17:27:15 +02:00
import os
2019-02-09 01:54:59 +01:00
import re
2019-01-22 18:47:46 +01:00
import asyncio
import binascii
2018-11-07 21:15:05 +01:00
import logging
2019-01-22 18:47:46 +01:00
import typing
import contextlib
from io import BytesIO
2019-01-22 18:47:46 +01:00
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7
2020-01-03 05:44:41 +01:00
from cryptography.hazmat.backends import default_backend
2015-08-20 17:27:15 +02:00
2020-01-03 05:44:41 +01:00
from lbry.utils import get_lbry_hash_obj
2019-06-21 02:55:47 +02:00
from lbry.error import DownloadCancelledError, InvalidBlobHashError, InvalidDataError
2020-01-03 06:57:36 +01:00
from lbry.blob import MAX_BLOB_SIZE, BLOBHASH_LENGTH
2019-06-21 02:55:47 +02:00
from lbry.blob.blob_info import BlobInfo
from lbry.blob.writer import HashBlobWriter
2017-09-13 21:27:43 +02:00
2019-01-22 18:47:46 +01:00
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
2020-01-03 06:57:36 +01:00
# Matches a non-empty string made up solely of lowercase hexadecimal digits.
# The character class has no comma: inside ``[...]`` a comma is a literal, so
# "a-f,0-9" would also accept ",".  ``\Z`` is used instead of ``$`` because
# ``$`` also matches just before a trailing newline.
HEXMATCH = re.compile(r"^[a-f0-9]+\Z")
# Cryptography backend instance, created once at import time and passed to
# every Cipher constructed in this module.
BACKEND = default_backend()
2019-01-22 18:47:46 +01:00
def is_valid_blobhash(blobhash: str) -> bool:
    """Check that a blob hash has the expected length and character set.

    @param blobhash: string, the blobhash to check
    @return: True if ``blobhash`` is exactly BLOBHASH_LENGTH characters long
             and HEXMATCH matches the whole string, False otherwise
    """
    # fullmatch() rather than match(): the entire string has to satisfy the
    # pattern, so a trailing newline cannot slip past a ``$`` anchor.  bool()
    # makes the result the plain True/False promised by the annotation instead
    # of leaking a Match object or None to callers.
    return len(blobhash) == BLOBHASH_LENGTH and bool(HEXMATCH.fullmatch(blobhash))
2018-02-12 20:16:43 +01:00
2019-02-09 01:57:26 +01:00
2019-01-22 18:47:46 +01:00
def encrypt_blob_bytes(key: bytes, iv: bytes, unencrypted: bytes) -> typing.Tuple[bytes, str]:
    """Encrypt plaintext with AES in CBC mode and hash the resulting ciphertext.

    The plaintext is PKCS7-padded to the AES block size before it is encrypted.

    @param key: AES key
    @param iv: CBC initialization vector
    @param unencrypted: plaintext bytes to encrypt
    @return: tuple of (ciphertext bytes, hex digest of the ciphertext computed
             with the hash object returned by get_lbry_hash_obj())
    """
    aes_cbc = Cipher(AES(key), modes.CBC(iv), backend=BACKEND).encryptor()

    # pad first, then push the padded plaintext through the cipher
    pkcs7 = PKCS7(AES.block_size).padder()
    padded = pkcs7.update(unencrypted) + pkcs7.finalize()
    ciphertext = aes_cbc.update(padded) + aes_cbc.finalize()

    hasher = get_lbry_hash_obj()
    hasher.update(ciphertext)
    return ciphertext, hasher.hexdigest()
def decrypt_blob_bytes(data: bytes, length: int, key: bytes, iv: bytes) -> bytes:
    """Decrypt AES-CBC ciphertext and strip its PKCS7 padding.

    @param data: ciphertext bytes
    @param length: number of bytes ``data`` is expected to contain
    @param key: AES key
    @param iv: CBC initialization vector
    @return: the unpadded plaintext bytes
    @raise ValueError: if ``len(data)`` differs from ``length``
    """
    if length != len(data):
        raise ValueError("unexpected length")

    aes_cbc = Cipher(AES(key), modes.CBC(iv), backend=BACKEND).decryptor()
    pkcs7 = PKCS7(AES.block_size).unpadder()

    # decrypt everything, then remove the padding from the result
    padded = aes_cbc.update(data) + aes_cbc.finalize()
    return pkcs7.update(padded) + pkcs7.finalize()
2015-08-20 17:27:15 +02:00
class AbstractBlob:
    """
    A chunk of data (up to 2MB) available on the network which is specified by a sha384 hash

    This class is non-io specific
    """

    __slots__ = [
        'loop',
        'blob_hash',
        'length',
        'blob_completed_callback',
        'blob_directory',
        'writers',
        'verified',
        'writing',
        'readers'
    ]

    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
                 blob_directory: typing.Optional[str] = None):
        """
        @param loop: event loop used for futures, tasks and sendfile
        @param blob_hash: hash identifying the blob, checked with is_valid_blobhash()
        @param length: blob length in bytes if already known
        @param blob_completed_callback: called with this blob once verified bytes have been written
        @param blob_directory: directory associated with the blob (stored, not used by this base class)
        @raise InvalidBlobHashError: if ``blob_hash`` fails validation
        """
        self.loop = loop
        self.blob_hash = blob_hash
        self.length = length
        self.blob_completed_callback = blob_completed_callback
        self.blob_directory = blob_directory
        # one writer per (peer address, peer port) pair
        self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
        self.verified: asyncio.Event = self._new_event(self.loop)
        self.writing: asyncio.Event = self._new_event(self.loop)
        self.readers: typing.List[typing.BinaryIO] = []
        # Validation comes last so that every attribute __del__ touches already
        # exists when the constructor raises.
        if not is_valid_blobhash(blob_hash):
            raise InvalidBlobHashError(blob_hash)

    @staticmethod
    def _new_event(loop: asyncio.AbstractEventLoop) -> asyncio.Event:
        """
        Create an asyncio.Event, binding it to ``loop`` where the interpreter still allows that

        The ``loop`` argument of asyncio.Event was removed in Python 3.10 and passing it raises
        TypeError there; in that case the event is created without it.
        """
        try:
            return asyncio.Event(loop=loop)
        except TypeError:
            return asyncio.Event()

    def __del__(self):
        if self.writers or self.readers:
            log.warning("%s not closed before being garbage collected", self.blob_hash)
            self.close()

    @contextlib.contextmanager
    def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
        """
        Subclass hook: context manager yielding a readable handle on the blob's bytes
        """
        raise NotImplementedError()

    @contextlib.contextmanager
    def reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
        """
        Context manager yielding a reader for the blob, tracked in ``self.readers`` while it is open

        @raise OSError: if the blob is not readable
        """
        if not self.is_readable():
            raise OSError(f"{str(type(self))} not readable, {len(self.readers)} readers {len(self.writers)} writers")
        with self._reader_context() as reader:
            try:
                self.readers.append(reader)
                yield reader
            finally:
                # close() may already have popped the reader from the list
                if reader in self.readers:
                    self.readers.remove(reader)

    def _write_blob(self, blob_bytes: bytes) -> asyncio.Task:
        """
        Subclass hook: store ``blob_bytes`` and return the task doing so
        """
        raise NotImplementedError()

    def set_length(self, length) -> None:
        """
        Set the blob length if it is not known yet

        A length equal to the current one is accepted silently. An unset length is assigned when the
        new value is within 0..MAX_BLOB_SIZE. Anything else only logs a warning and leaves the
        length unchanged.
        """
        if self.length is not None and length == self.length:
            return
        if self.length is None and 0 <= length <= MAX_BLOB_SIZE:
            self.length = length
            return
        log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", self.length, length)

    def get_length(self) -> typing.Optional[int]:
        """
        @return: the blob length, or None if it is not known
        """
        return self.length

    def get_is_verified(self) -> bool:
        """
        @return: True if the ``verified`` event is set
        """
        return self.verified.is_set()

    def is_readable(self) -> bool:
        """
        @return: True if the blob has been verified
        """
        return self.verified.is_set()

    def is_writeable(self) -> bool:
        """
        @return: True if the ``writing`` event is not set
        """
        return not self.writing.is_set()

    def write_blob(self, blob_bytes: bytes):
        """
        Hand ``blob_bytes`` to _write_blob() with the ``writing`` event set for the duration of the call

        @raise OSError: if the blob is not writeable
        """
        if not self.is_writeable():
            raise OSError("cannot open blob for writing")
        try:
            self.writing.set()
            # NOTE(review): _write_blob() is annotated as returning a task, so the ``finally`` below
            # clears ``writing`` once the task has been created rather than once it has finished;
            # confirm this is intended.
            self._write_blob(blob_bytes)
        finally:
            self.writing.clear()

    def close(self):
        """
        Drop all writers, cancelling their unfinished futures, and close and drop all readers
        """
        while self.writers:
            _, writer = self.writers.popitem()
            # a future cannot usefully be cancelled once its loop is closed
            if writer and writer.finished and not writer.finished.done() and not self.loop.is_closed():
                writer.finished.cancel()
        while self.readers:
            reader = self.readers.pop()
            if reader:
                reader.close()

    def delete(self):
        """
        Close the blob, mark it unverified and forget its length
        """
        self.close()
        self.verified.clear()
        self.length = None

    async def sendfile(self, writer: asyncio.StreamWriter) -> int:
        """
        Read and send the file to the writer and return the number of bytes sent

        @return: the result of loop.sendfile(), or -1 if sending failed
        @raise OSError: if the blob is not readable
        """
        if not self.is_readable():
            raise OSError('blob files cannot be read')
        with self.reader_context() as handle:
            try:
                return await self.loop.sendfile(writer.transport, handle, count=self.get_length())
            # ConnectionError and BrokenPipeError are subclasses of OSError, so OSError covers them
            except (RuntimeError, OSError, AttributeError):
                return -1

    def decrypt(self, key: bytes, iv: bytes) -> bytes:
        """
        Decrypt a BlobFile to plaintext bytes
        """
        with self.reader_context() as reader:
            return decrypt_blob_bytes(reader.read(), self.length, key, iv)

    @classmethod
    async def create_from_unencrypted(
            cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
            unencrypted: bytes, blob_num: int,
            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
                                                                     asyncio.Task]] = None) -> BlobInfo:
        """
        Create an encrypted BlobFile from plaintext bytes

        The plaintext is encrypted, written through a blob writer, and the call waits until the blob
        is verified.

        @return: BlobInfo built from ``blob_num``, the encrypted length, the hex encoded iv and the blob hash
        """
        blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
        length = len(blob_bytes)
        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir)
        writer = blob.get_blob_writer()
        writer.write(blob_bytes)
        await blob.verified.wait()
        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash)

    def save_verified_blob(self, verified_bytes: bytes):
        """
        Store bytes that have already been verified, then flag the blob as verified

        Does nothing if the blob is already verified or is not writeable. Otherwise ``writing`` is set,
        the write task is started, and when it is done ``verified`` is set, ``writing`` is cleared and
        the completion callback (if any) is called with this blob.
        """
        if self.verified.is_set():
            return

        def update_events(_):
            self.verified.set()
            self.writing.clear()

        if self.is_writeable():
            self.writing.set()
            task = self._write_blob(verified_bytes)
            task.add_done_callback(update_events)
            if self.blob_completed_callback:
                task.add_done_callback(lambda _: self.blob_completed_callback(self))

    def get_blob_writer(self, peer_address: typing.Optional[str] = None,
                        peer_port: typing.Optional[int] = None) -> HashBlobWriter:
        """
        Create and register a writer for the given peer

        When the writer's future completes the writer is unregistered. On success the handles of all
        other registered writers are closed and the resulting bytes are passed to save_verified_blob().

        @raise OSError: if an unclosed writer is already registered for ``(peer_address, peer_port)``
        """
        if (peer_address, peer_port) in self.writers and not self.writers[(peer_address, peer_port)].closed():
            raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}")
        fut = asyncio.Future(loop=self.loop)
        writer = HashBlobWriter(self.blob_hash, self.get_length, fut)
        self.writers[(peer_address, peer_port)] = writer

        def remove_writer(_):
            if (peer_address, peer_port) in self.writers:
                del self.writers[(peer_address, peer_port)]

        fut.add_done_callback(remove_writer)

        def writer_finished_callback(finished: asyncio.Future):
            try:
                # exception() raises CancelledError itself when the future was cancelled
                err = finished.exception()
                if err:
                    raise err
                verified_bytes = finished.result()
                while self.writers:
                    _, other = self.writers.popitem()
                    if other is not writer:
                        other.close_handle()
                self.save_verified_blob(verified_bytes)
            except (InvalidBlobHashError, InvalidDataError) as error:
                log.warning("writer error downloading %s: %s", self.blob_hash[:8], str(error))
            except (DownloadCancelledError, asyncio.CancelledError, asyncio.TimeoutError):
                # cancelled or timed out downloads are deliberately ignored here
                pass

        fut.add_done_callback(writer_finished_callback)
        return writer
2015-08-20 17:27:15 +02:00
class BlobBuffer(AbstractBlob):
    """
    An in-memory only blob
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
                 blob_directory: typing.Optional[str] = None):
        # Assigned before the base constructor runs so the attribute exists
        # for __del__ even when the base constructor raises.
        self._verified_bytes: typing.Optional[BytesIO] = None
        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)

    @contextlib.contextmanager
    def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
        """
        Yield the in-memory buffer; on exit the buffer is closed and dropped and the blob is
        marked unverified again
        """
        if not self.is_readable():
            raise OSError("cannot open blob for reading")
        try:
            yield self._verified_bytes
        finally:
            buffered = self._verified_bytes
            if buffered:
                buffered.close()
            self._verified_bytes = None
            self.verified.clear()

    def _write_blob(self, blob_bytes: bytes):
        """
        Schedule a task that stores ``blob_bytes`` in a new in-memory buffer

        The task fails with OSError if a buffer is already held.
        """
        async def write():
            if self._verified_bytes:
                raise OSError("already have bytes for blob")
            self._verified_bytes = BytesIO(blob_bytes)

        pending = write()
        return self.loop.create_task(pending)

    def delete(self):
        """
        Release the in-memory buffer, if any, then run the base class delete
        """
        buffered = self._verified_bytes
        if buffered:
            buffered.close()
            self._verified_bytes = None
        return super().delete()

    def __del__(self):
        super().__del__()
        if not self._verified_bytes:
            return
        self.delete()
class BlobFile(AbstractBlob):
    """
    A blob existing on the local file system
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
                 blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
                 blob_directory: typing.Optional[str] = None):
        """
        If a file named after the blob hash already exists in ``blob_directory`` it is either adopted
        (its size becomes the blob length and the blob is marked verified) or, when ``length`` was
        given and disagrees with the file size, deleted.

        @raise OSError: if ``blob_directory`` is missing or not a directory
        """
        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory)
        if not blob_directory or not os.path.isdir(blob_directory):
            raise OSError(f"invalid blob directory '{blob_directory}'")
        self.file_path = os.path.join(self.blob_directory, self.blob_hash)
        if self.file_exists:
            file_size = int(os.stat(self.file_path).st_size)
            if length and length != file_size:
                log.warning("expected %s to be %s bytes, file has %s", self.blob_hash, length, file_size)
                self.delete()
            else:
                self.length = file_size
                self.verified.set()

    @property
    def file_exists(self):
        """
        True if the blob's file is present on disk
        """
        return os.path.isfile(self.file_path)

    def is_writeable(self) -> bool:
        """
        @return: True if no write is in progress and the blob's file does not exist yet
        """
        return super().is_writeable() and not self.file_exists

    def get_blob_writer(self, peer_address: typing.Optional[str] = None,
                        peer_port: typing.Optional[int] = None) -> HashBlobWriter:
        """
        Create a writer for the given peer, refusing when the blob's file already exists

        @raise OSError: if the file already exists, or as raised by the base class
        """
        if self.file_exists:
            raise OSError(f"File already exists '{self.file_path}'")
        return super().get_blob_writer(peer_address, peer_port)

    @contextlib.contextmanager
    def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
        """
        Yield the blob's file opened for binary reading; the handle is closed on exit
        """
        with open(self.file_path, 'rb') as handle:
            yield handle

    def _write_blob(self, blob_bytes: bytes):
        """
        Schedule a task that writes ``blob_bytes`` to the blob's file in the loop's default executor
        """
        def _write_blob():
            with open(self.file_path, 'wb') as f:
                f.write(blob_bytes)

        async def write_blob():
            await self.loop.run_in_executor(None, _write_blob)

        return self.loop.create_task(write_blob())

    def delete(self):
        """
        Remove the blob's file if present, then run the base class delete
        """
        if os.path.isfile(self.file_path):
            # the file may vanish between the check and the removal; it being gone is the goal
            with contextlib.suppress(FileNotFoundError):
                os.remove(self.file_path)
        return super().delete()

    @classmethod
    async def create_from_unencrypted(
            cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
            unencrypted: bytes, blob_num: int,
            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
                                                                     asyncio.Task]] = None) -> BlobInfo:
        """
        Create an encrypted blob file in ``blob_dir`` from plaintext bytes

        @raise OSError: if ``blob_dir`` is missing or not a directory
        """
        if not blob_dir or not os.path.isdir(blob_dir):
            raise OSError(f"cannot create blob in directory: '{blob_dir}'")
        return await super().create_from_unencrypted(
            loop, blob_dir, key, iv, unencrypted, blob_num, blob_completed_callback
        )