2019-01-22 12:47:46 -05:00
|
|
|
import typing
|
2017-09-13 15:46:39 -04:00
|
|
|
import logging
|
2019-01-22 12:47:46 -05:00
|
|
|
import asyncio
|
2017-09-13 15:46:39 -04:00
|
|
|
from io import BytesIO
|
2019-06-20 20:55:47 -04:00
|
|
|
from lbry.error import InvalidBlobHashError, InvalidDataError
|
2020-01-03 01:44:41 -03:00
|
|
|
from lbry.utils import get_lbry_hash_obj
|
2017-09-13 15:46:39 -04:00
|
|
|
|
|
|
|
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
|
|
|
2018-07-21 18:34:59 -04:00
|
|
|
class HashBlobWriter:
    """Buffer incoming blob bytes in memory while checking their length and hash.

    Every chunk passed to :meth:`write` is fed to the hash object from
    ``get_lbry_hash_obj()`` and appended to an in-memory buffer. Once exactly
    ``get_length()`` bytes have arrived, the hex digest is compared with
    ``expected_blob_hash``. The ``finished`` future is then resolved with the
    buffered bytes, or with ``InvalidBlobHashError`` on a mismatch. Receiving
    more bytes than expected resolves it with ``InvalidDataError`` instead.
    Whenever ``finished`` completes, for any reason, the buffer is released.
    """

    def __init__(self, expected_blob_hash: str, get_length: typing.Callable[[], int],
                 finished: asyncio.Future):
        """
        :param expected_blob_hash: hex digest the complete data must hash to
        :param get_length: callable returning the expected total length in
            bytes; a falsy result makes :meth:`write` raise ``OSError``
        :param finished: future resolved with the blob bytes on success, with
            an exception on a length/hash mismatch, or cancelled if the writer
            is closed before the blob completes
        """
        self.expected_blob_hash = expected_blob_hash
        self.get_length = get_length
        self.buffer = BytesIO()
        self.finished = finished
        # Release the buffer as soon as the future completes, however that happens.
        self.finished.add_done_callback(lambda *_: self.close_handle())
        self._hashsum = get_lbry_hash_obj()
        self.len_so_far = 0

    def __del__(self):
        # getattr: if __init__ never ran (e.g. the constructor was called with
        # bad arguments) there is no buffer and nothing to release.
        if getattr(self, "buffer", None) is not None:
            log.warning("Garbage collection was called, but writer was not closed yet")
            self.close_handle()

    def calculate_blob_hash(self) -> str:
        """Return the hex digest of all bytes hashed so far."""
        return self._hashsum.hexdigest()

    def closed(self) -> bool:
        """Return True once the buffer has been released or closed."""
        return self.buffer is None or self.buffer.closed

    def write(self, data: bytes) -> None:
        """Hash and buffer ``data``; resolve ``finished`` once the blob is complete.

        If the running total exceeds the expected length, ``finished`` gets an
        ``InvalidDataError`` and the writer closes. When the total reaches the
        expected length exactly, ``finished`` gets either the buffered bytes or
        an ``InvalidBlobHashError``, and the writer closes. ``finished`` is only
        resolved while it is still pending.

        :raises OSError: if the expected length is unknown (falsy), or if the
            writer is already closed and ``finished`` is already done.
        """
        expected_length = self.get_length()
        if not expected_length:
            raise OSError("unknown blob length")
        if self.buffer is None:
            log.warning("writer has already been closed")
            if not self.finished.done():
                self.finished.cancel()
                return
            raise OSError('I/O operation on closed file')

        self._hashsum.update(data)
        self.len_so_far += len(data)
        if self.len_so_far > expected_length:
            # ``finished`` may already be done (e.g. cancelled) while its
            # close_handle callback is still waiting to run on the loop;
            # setting an exception then would raise InvalidStateError.
            if not self.finished.done():
                self.finished.set_exception(InvalidDataError(
                    f'Length so far is greater than the expected length. {self.len_so_far} to {expected_length}'
                ))
            self.close_handle()
            return
        self.buffer.write(data)
        if self.len_so_far == expected_length:
            # Same guard as above: only resolve a future that is still pending.
            if not self.finished.done():
                blob_hash = self.calculate_blob_hash()
                if blob_hash != self.expected_blob_hash:
                    self.finished.set_exception(InvalidBlobHashError(
                        f"blob hash is {blob_hash} vs expected {self.expected_blob_hash}"
                    ))
                else:
                    self.finished.set_result(self.buffer.getvalue())
            self.close_handle()

    def close_handle(self) -> None:
        """Cancel ``finished`` if still pending and release the buffer.

        Safe to call repeatedly: later calls find ``finished`` done and the
        buffer already ``None``.
        """
        if not self.finished.done():
            self.finished.cancel()
        if self.buffer is not None:
            self.buffer.close()
            self.buffer = None
|