lbry-sdk/lbry/blob/writer.py

69 lines
2.4 KiB
Python
Raw Permalink Normal View History

2019-01-22 18:47:46 +01:00
import typing
2017-09-13 21:46:39 +02:00
import logging
2019-01-22 18:47:46 +01:00
import asyncio
2017-09-13 21:46:39 +02:00
from io import BytesIO
2019-06-21 02:55:47 +02:00
from lbry.error import InvalidBlobHashError, InvalidDataError
2020-01-03 05:44:41 +01:00
from lbry.utils import get_lbry_hash_obj
2017-09-13 21:46:39 +02:00
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class HashBlobWriter:
    """In-memory, hash-verifying writer for a single blob.

    Written data is fed to a running hash and appended to a ``BytesIO``
    buffer. Once exactly ``get_length()`` bytes have been written, the hex
    digest is compared with ``expected_blob_hash``: on a match ``finished`` is
    resolved with the buffered bytes, otherwise it receives an
    ``InvalidBlobHashError``. Writing more than the expected length sets an
    ``InvalidDataError`` on ``finished``. In every terminal case the buffer is
    closed and released.
    """

    def __init__(self, expected_blob_hash: str, get_length: typing.Callable[[], int],
                 finished: asyncio.Future):
        """Set up the buffer and hash state.

        :param expected_blob_hash: hex digest the completed data must match.
        :param get_length: callable returning the expected total length; it is
            called on every ``write``.
        :param finished: future resolved with the blob bytes on success, or
            given an exception / cancelled on failure.
        """
        self.expected_blob_hash = expected_blob_hash
        self.get_length = get_length
        self.buffer = BytesIO()
        self.finished = finished
        # Whenever the future completes (for any reason) release the buffer.
        self.finished.add_done_callback(lambda *_: self.close_handle())
        self._hashsum = get_lbry_hash_obj()
        self.len_so_far = 0

    def __del__(self):
        # Use getattr: if __init__ did not run to completion the attribute may
        # be missing, and __del__ must not raise because of that.
        if getattr(self, "buffer", None) is not None:
            log.warning("Garbage collection was called, but writer was not closed yet")
            self.close_handle()

    def calculate_blob_hash(self) -> str:
        """Return the hex digest of all data hashed so far."""
        return self._hashsum.hexdigest()

    def closed(self) -> bool:
        """Return True when the buffer has been released or closed."""
        return self.buffer is None or self.buffer.closed

    def write(self, data: bytes) -> None:
        """Hash and buffer ``data``, completing ``finished`` when appropriate.

        :raises OSError: if ``get_length()`` returns a falsy value, or if the
            writer is already closed and ``finished`` is already done.
        """
        expected_length = self.get_length()
        if not expected_length:
            raise OSError("unknown blob length")
        if self.buffer is None:
            log.warning("writer has already been closed")
            if not self.finished.done():
                self.finished.cancel()
                return
            raise OSError('I/O operation on closed file')

        self._hashsum.update(data)
        self.len_so_far += len(data)

        if self.len_so_far > expected_length:
            # The future may already be done (e.g. cancelled elsewhere) while
            # its done-callback has not run yet; calling set_exception on a
            # done future raises InvalidStateError, so only report the error
            # while the future is still pending.
            if not self.finished.done():
                self.finished.set_exception(InvalidDataError(
                    f'Length so far is greater than the expected length. {self.len_so_far} to {expected_length}'
                ))
            self.close_handle()
            return

        self.buffer.write(data)

        if self.len_so_far == expected_length:
            # Same guard as above: never touch an already-completed future.
            if not self.finished.done():
                blob_hash = self.calculate_blob_hash()
                if blob_hash != self.expected_blob_hash:
                    self.finished.set_exception(InvalidBlobHashError(
                        f"blob hash is {blob_hash} vs expected {self.expected_blob_hash}"
                    ))
                else:
                    self.finished.set_result(self.buffer.getvalue())
            self.close_handle()

    def close_handle(self) -> None:
        """Cancel ``finished`` if it is still pending and release the buffer.

        Calling this again after the buffer has been released is a no-op for
        the buffer.
        """
        if not self.finished.done():
            self.finished.cancel()
        if self.buffer is not None:
            self.buffer.close()
            self.buffer = None