From 14636a5d3822d3a98716cd30d1c51af687dd7474 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Sep 2017 15:46:39 -0400 Subject: [PATCH] split up HashBlob.py into lbrynet.blob --- lbrynet/blob/__init__.py | 4 + .../{core/HashBlob.py => blob/blob_file.py} | 114 +----------------- lbrynet/blob/creator.py | 43 +++++++ lbrynet/blob/reader.py | 30 +++++ lbrynet/blob/writer.py | 54 +++++++++ lbrynet/core/BlobManager.py | 3 +- scripts/decrypt_blob.py | 4 +- scripts/download_blob_from_peer.py | 4 +- tests/unit/core/test_HashBlob.py | 2 +- 9 files changed, 141 insertions(+), 117 deletions(-) create mode 100644 lbrynet/blob/__init__.py rename lbrynet/{core/HashBlob.py => blob/blob_file.py} (69%) create mode 100644 lbrynet/blob/creator.py create mode 100644 lbrynet/blob/reader.py create mode 100644 lbrynet/blob/writer.py diff --git a/lbrynet/blob/__init__.py b/lbrynet/blob/__init__.py new file mode 100644 index 000000000..e605ea317 --- /dev/null +++ b/lbrynet/blob/__init__.py @@ -0,0 +1,4 @@ +from blob_file import BlobFile +from creator import BlobFileCreator +from writer import HashBlobWriter +from reader import HashBlobReader diff --git a/lbrynet/core/HashBlob.py b/lbrynet/blob/blob_file.py similarity index 69% rename from lbrynet/core/HashBlob.py rename to lbrynet/blob/blob_file.py index 8aa822db2..9eaad184f 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/blob/blob_file.py @@ -1,93 +1,20 @@ -from io import BytesIO import logging import os import threading -from twisted.internet import interfaces, defer, threads +from twisted.internet import defer, threads from twisted.protocols.basic import FileSender from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure -from zope.interface import implements from lbrynet import conf from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError -from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.utils import is_valid_blobhash +from lbrynet.blob.writer import HashBlobWriter +from lbrynet.blob.reader import HashBlobReader log = logging.getLogger(__name__) -class HashBlobReader(object): - implements(interfaces.IConsumer) - - def __init__(self, write_func): - self.write_func = write_func - - def registerProducer(self, producer, streaming): - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - pass - - def write(self, data): - from twisted.internet import reactor - - self.write_func(data) - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - -class HashBlobWriter(object): - def __init__(self, length_getter, finished_cb): - self.write_handle = BytesIO() - self.length_getter = length_getter - self.finished_cb = finished_cb - self.finished_cb_d = None - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - - @property - def blob_hash(self): - return self._hashsum.hexdigest() - - def write(self, data): - if self.write_handle is None: - log.info("writer has already been closed") - # can this be changed to IOError? - raise ValueError('I/O operation on closed file') - - self._hashsum.update(data) - self.len_so_far += len(data) - if self.len_so_far > self.length_getter(): - self.finished_cb_d = self.finished_cb( - self, - Failure(InvalidDataError("Length so far is greater than the expected length." - " %s to %s" % (self.len_so_far, - self.length_getter())))) - else: - self.write_handle.write(data) - if self.len_so_far == self.length_getter(): - self.finished_cb_d = self.finished_cb(self) - - def close_handle(self): - if self.write_handle is not None: - self.write_handle.close() - self.write_handle = None - - def close(self, reason=None): - # if we've already called finished_cb because we either finished writing - # or closed already, do nothing - if self.finished_cb_d is not None: - return - if reason is None: - reason = Failure(DownloadCanceledError()) - self.finished_cb_d = self.finished_cb(self, reason) - - class BlobFile(object): """ A chunk of data available on the network which is specified by a hashsum @@ -299,38 +226,3 @@ class BlobFile(object): defer.returnValue(True) else: raise DownloadCanceledError() - - -class BlobFileCreator(object): - """ - This class is used to create blobs on the local filesystem - when we do not know the blob hash beforehand (i.e, when creating - a new stream) - """ - def __init__(self, blob_dir): - self.blob_dir = blob_dir - self.buffer = BytesIO() - self._is_open = True - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - self.blob_hash = None - self.length = None - - @defer.inlineCallbacks - def close(self): - self.length = self.len_so_far - self.blob_hash = self._hashsum.hexdigest() - if self.blob_hash and self._is_open: - self.buffer.seek(0) - out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(self.buffer) - yield producer.startProducing(open(out_path, 'wb')) - self._is_open = False - defer.returnValue(self.blob_hash) - - def write(self, data): - if not self._is_open: - raise IOError - self._hashsum.update(data) - self.len_so_far += len(data) - self.buffer.write(data) diff --git a/lbrynet/blob/creator.py b/lbrynet/blob/creator.py new file mode 100644 index 000000000..417d08d85 --- /dev/null +++ b/lbrynet/blob/creator.py @@ -0,0 +1,43 @@ +import os +import logging +from io import BytesIO +from twisted.internet import defer +from twisted.web.client import FileBodyProducer +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +log = logging.getLogger(__name__) + + +class BlobFileCreator(object): + """ + This class is used to create blobs on the local filesystem + when we do not know the blob hash beforehand (i.e, when creating + a new stream) + """ + def __init__(self, blob_dir): + self.blob_dir = blob_dir + self.buffer = BytesIO() + self._is_open = True + self._hashsum = get_lbry_hash_obj() + self.len_so_far = 0 + self.blob_hash = None + self.length = None + + @defer.inlineCallbacks + def close(self): + self.length = self.len_so_far + self.blob_hash = self._hashsum.hexdigest() + if self.blob_hash and self._is_open: + self.buffer.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(self.buffer) + yield producer.startProducing(open(out_path, 'wb')) + self._is_open = False + defer.returnValue(self.blob_hash) + + def write(self, data): + if not self._is_open: + raise IOError + self._hashsum.update(data) + self.len_so_far += len(data) + self.buffer.write(data) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py new file mode 100644 index 000000000..c85cc38f3 --- /dev/null +++ b/lbrynet/blob/reader.py @@ -0,0 +1,30 @@ +import logging +from twisted.internet import interfaces +from zope.interface import implements + +log = logging.getLogger(__name__) + + +class HashBlobReader(object): + implements(interfaces.IConsumer) + + def __init__(self, write_func): + self.write_func = write_func + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + if self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + pass + + def write(self, data): + from twisted.internet import reactor + + self.write_func(data) + if self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py new file mode 100644 index 000000000..7e6f4d13f --- /dev/null +++ b/lbrynet/blob/writer.py @@ -0,0 +1,54 @@ +import logging +from io import BytesIO +from twisted.python.failure import Failure +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +log = logging.getLogger(__name__) + + +class HashBlobWriter(object): + def __init__(self, length_getter, finished_cb): + self.write_handle = BytesIO() + self.length_getter = length_getter + self.finished_cb = finished_cb + self.finished_cb_d = None + self._hashsum = get_lbry_hash_obj() + self.len_so_far = 0 + + @property + def blob_hash(self): + return self._hashsum.hexdigest() + + def write(self, data): + if self.write_handle is None: + log.info("writer has already been closed") + # can this be changed to IOError? + raise ValueError('I/O operation on closed file') + + self._hashsum.update(data) + self.len_so_far += len(data) + if self.len_so_far > self.length_getter(): + self.finished_cb_d = self.finished_cb( + self, + Failure(InvalidDataError("Length so far is greater than the expected length." + " %s to %s" % (self.len_so_far, + self.length_getter())))) + else: + self.write_handle.write(data) + if self.len_so_far == self.length_getter(): + self.finished_cb_d = self.finished_cb(self) + + def close_handle(self): + if self.write_handle is not None: + self.write_handle.close() + self.write_handle = None + + def close(self, reason=None): + # if we've already called finished_cb because we either finished writing + # or closed already, do nothing + if self.finished_cb_d is not None: + return + if reason is None: + reason = Failure(DownloadCanceledError()) + self.finished_cb_d = self.finished_cb(self, reason) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index cb7d738f9..48ee53e24 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -6,7 +6,8 @@ import sqlite3 from twisted.internet import threads, defer, reactor from twisted.enterprise import adbapi from lbrynet import conf -from lbrynet.core.HashBlob import BlobFile, BlobFileCreator +from lbrynet.blob.blob_file import BlobFile +from lbrynet.blob.creator import BlobFileCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.sqlite_helpers import rerun_if_locked diff --git a/scripts/decrypt_blob.py b/scripts/decrypt_blob.py index bc905bf2e..4f5c8b8e9 100644 --- a/scripts/decrypt_blob.py +++ b/scripts/decrypt_blob.py @@ -10,7 +10,7 @@ from twisted.internet import reactor from lbrynet import conf from lbrynet.cryptstream import CryptBlob -from lbrynet.core import HashBlob +from lbrynet.blob import BlobFile from lbrynet.core import log_support @@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output): filename = os.path.abspath(blob_file) length = os.path.getsize(filename) directory, blob_hash = os.path.split(filename) - blob = HashBlob.BlobFile(directory, blob_hash, True, length) + blob = BlobFile(directory, blob_hash, length) decryptor = CryptBlob.StreamBlobDecryptor( blob, binascii.unhexlify(key), binascii.unhexlify(iv), length) with open(output, 'w') as f: diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py index 02d9a1693..43a510328 100644 --- a/scripts/download_blob_from_peer.py +++ b/scripts/download_blob_from_peer.py @@ -13,7 +13,7 @@ from lbrynet import conf from lbrynet.core import log_support from lbrynet.core import BlobManager from lbrynet.core import HashAnnouncer -from lbrynet.core import HashBlob +from lbrynet.blob import BlobFile from lbrynet.core import RateLimiter from lbrynet.core import Peer from lbrynet.core import Wallet @@ -38,7 +38,7 @@ def main(args=None): announcer = HashAnnouncer.DummyHashAnnouncer() blob_manager = MyBlobManager(announcer) - blob = HashBlob.BlobFile(args.directory, args.blob_hash) + blob = BlobFile(args.directory, args.blob_hash) download_manager = SingleBlobDownloadManager(blob) peer = Peer.Peer(*conf.server_port(args.peer)) payment_rate_manager = DumbPaymentRateManager() diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index 076e201bb..179313720 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -1,4 +1,4 @@ -from lbrynet.core.HashBlob import BlobFile +from lbrynet.blob import BlobFile from lbrynet.core.Error import DownloadCanceledError, InvalidDataError