From 7d6e62eb776810ca766893911f5c5264b16152d8 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Sep 2017 15:27:43 -0400 Subject: [PATCH] consolidate HashBlob and BlobFile --- lbrynet/core/HashBlob.py | 217 ++++++++++++++----------------- tests/unit/core/test_HashBlob.py | 9 +- 2 files changed, 103 insertions(+), 123 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index c87f1bab9..8aa822db2 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -8,13 +8,14 @@ from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure from zope.interface import implements from lbrynet import conf -from lbrynet.core.Error import DownloadCanceledError, InvalidDataError +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.utils import is_valid_blobhash log = logging.getLogger(__name__) + class HashBlobReader(object): implements(interfaces.IConsumer) @@ -22,7 +23,6 @@ class HashBlobReader(object): self.write_func = write_func def registerProducer(self, producer, streaming): - from twisted.internet import reactor self.producer = producer @@ -34,7 +34,6 @@ class HashBlobReader(object): pass def write(self, data): - from twisted.internet import reactor self.write_func(data) @@ -58,6 +57,7 @@ class HashBlobWriter(object): def write(self, data): if self.write_handle is None: log.info("writer has already been closed") + # can this be changed to IOError? raise ValueError('I/O operation on closed file') self._hashsum.update(data) @@ -88,17 +88,109 @@ class HashBlobWriter(object): self.finished_cb_d = self.finished_cb(self, reason) -class HashBlob(object): - """A chunk of data available on the network which is specified by a hashsum""" +class BlobFile(object): + """ + A chunk of data available on the network which is specified by a hashsum - def __init__(self, blob_hash, length=None): - assert is_valid_blobhash(blob_hash) + This class is used to create blobs on the local filesystem + when we already know the blob hash before hand (i.e., when downloading blobs) + Also can be used for reading from blobs on the local filesystem + """ + + def __str__(self): + return self.blob_hash[:16] + + def __repr__(self): + return '<{}({})>'.format(self.__class__.__name__, str(self)) + + def __init__(self, blob_dir, blob_hash, length=None): + if not is_valid_blobhash(blob_hash): + raise InvalidBlobHashError(blob_hash) self.blob_hash = blob_hash self.length = length self.writers = {} # {Peer: writer, finished_deferred} self.finished_deferred = None self._verified = False self.readers = 0 + self.blob_dir = blob_dir + self.file_path = os.path.join(blob_dir, self.blob_hash) + self.setting_verified_blob_lock = threading.Lock() + self.moved_verified_blob = False + if os.path.isfile(self.file_path): + self.set_length(os.path.getsize(self.file_path)) + # This assumes that the hash of the blob has already been + # checked as part of the blob creation process. It might + # be worth having a function that checks the actual hash; + # its probably too expensive to have that check be part of + # this call. + self._verified = True + + def open_for_writing(self, peer): + """ + open a blob file to be written by peer, supports concurrent + writers, as long as they are from differnt peers. + + returns tuple of (writer, finished_deferred) + + writer - a file like object with a write() function, close() when finished + finished_deferred - deferred that is fired when write is finished and returns + a instance of itself as HashBlob + """ + if not peer in self.writers: + log.debug("Opening %s to be written by %s", str(self), str(peer)) + finished_deferred = defer.Deferred() + writer = HashBlobWriter(self.get_length, self.writer_finished) + self.writers[peer] = (writer, finished_deferred) + return (writer, finished_deferred) + log.warning("Tried to download the same file twice simultaneously from the same peer") + return None, None + + def open_for_reading(self): + """ + open blob for reading + + returns a file handle that can be read() from. + once finished with the file handle, user must call close_read_handle() + otherwise blob cannot be deleted. + """ + if self._verified is True: + file_handle = None + try: + file_handle = open(self.file_path, 'rb') + self.readers += 1 + return file_handle + except IOError: + log.exception('Failed to open %s', self.file_path) + self.close_read_handle(file_handle) + return None + + def delete(self): + """ + delete blob file from file system, prevent deletion + if a blob is being read from or written to + + returns a deferred that firesback when delete is completed + """ + if not self.writers and not self.readers: + self._verified = False + self.moved_verified_blob = False + + def delete_from_file_system(): + if os.path.isfile(self.file_path): + os.remove(self.file_path) + + d = threads.deferToThread(delete_from_file_system) + + def log_error(err): + log.warning("An error occurred deleting %s: %s", + str(self.file_path), err.getErrorMessage()) + return err + + d.addErrback(log_error) + return d + else: + return defer.fail(Failure( + ValueError("File is currently being read or written and cannot be deleted"))) @property def verified(self): @@ -190,116 +282,6 @@ class HashBlob(object): d.addBoth(lambda _: writer.close_handle()) return d - def open_for_writing(self, peer): - raise NotImplementedError() - - def open_for_reading(self): - raise NotImplementedError() - - def delete(self): - raise NotImplementedError() - - def close_read_handle(self, file_handle): - raise NotImplementedError() - - def _save_verified_blob(self, writer): - raise NotImplementedError() - - def __str__(self): - return self.blob_hash[:16] - - def __repr__(self): - return '<{}({})>'.format(self.__class__.__name__, str(self)) - - -class BlobFile(HashBlob): - """ - This class is used to create blobs on the local filesystem - when we already know the blob hash before hand (i.e., when downloading blobs) - Also can be used for reading from blobs on the local filesystem - """ - def __init__(self, blob_dir, blob_hash, length=None): - HashBlob.__init__(self, blob_hash, length) - self.blob_dir = blob_dir - self.file_path = os.path.join(blob_dir, self.blob_hash) - self.setting_verified_blob_lock = threading.Lock() - self.moved_verified_blob = False - if os.path.isfile(self.file_path): - self.set_length(os.path.getsize(self.file_path)) - # This assumes that the hash of the blob has already been - # checked as part of the blob creation process. It might - # be worth having a function that checks the actual hash; - # its probably too expensive to have that check be part of - # this call. - self._verified = True - - def open_for_writing(self, peer): - """ - open a blob file to be written by peer, supports concurrent - writers, as long as they are from differnt peers. - - returns tuple of (writer, finished_deferred) - - writer - a file like object with a write() function, close() when finished - finished_deferred - deferred that is fired when write is finished and returns - a instance of itself as HashBlob - """ - if not peer in self.writers: - log.debug("Opening %s to be written by %s", str(self), str(peer)) - finished_deferred = defer.Deferred() - writer = HashBlobWriter(self.get_length, self.writer_finished) - self.writers[peer] = (writer, finished_deferred) - return (writer, finished_deferred) - log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None - - def open_for_reading(self): - """ - open blob for reading - - returns a file handle that can be read() from. - once finished with the file handle, user must call close_read_handle() - otherwise blob cannot be deleted. - """ - if self._verified is True: - file_handle = None - try: - file_handle = open(self.file_path, 'rb') - self.readers += 1 - return file_handle - except IOError: - log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) - return None - - def delete(self): - """ - delete blob file from file system, prevent deletion - if a blob is being read from or written to - - returns a deferred that firesback when delete is completed - """ - if not self.writers and not self.readers: - self._verified = False - self.moved_verified_blob = False - - def delete_from_file_system(): - if os.path.isfile(self.file_path): - os.remove(self.file_path) - - d = threads.deferToThread(delete_from_file_system) - - def log_error(err): - log.warning("An error occurred deleting %s: %s", - str(self.file_path), err.getErrorMessage()) - return err - - d.addErrback(log_error) - return d - else: - return defer.fail(Failure( - ValueError("File is currently being read or written and cannot be deleted"))) - def close_read_handle(self, file_handle): if file_handle is not None: file_handle.close() @@ -318,6 +300,7 @@ class BlobFile(HashBlob): else: raise DownloadCanceledError() + class BlobFileCreator(object): """ This class is used to create blobs on the local filesystem diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index 3bea14c4e..076e201bb 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -1,4 +1,4 @@ -from lbrynet.core.HashBlob import HashBlob, BlobFile +from lbrynet.core.HashBlob import BlobFile from lbrynet.core.Error import DownloadCanceledError, InvalidDataError @@ -9,7 +9,6 @@ import os import time - class BlobFileTest(unittest.TestCase): def setUp(self): self.db_dir, self.blob_dir = mk_db_and_blob_dir() @@ -17,11 +16,9 @@ class BlobFileTest(unittest.TestCase): self.fake_content = bytearray('0'*self.fake_content_len) self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105' - def tearDown(self): rm_db_and_blob_dir(self.db_dir, self.blob_dir) - @defer.inlineCallbacks def test_good_write_and_read(self): # test a write that should succeed @@ -32,7 +29,7 @@ class BlobFileTest(unittest.TestCase): writer.write(self.fake_content) writer.close() out = yield finished_d - self.assertTrue(isinstance(out,HashBlob)) + self.assertTrue(isinstance(out, BlobFile)) self.assertTrue(out.verified) self.assertEqual(self.fake_content_len, out.get_length()) @@ -118,7 +115,7 @@ class BlobFileTest(unittest.TestCase): out_2 = yield finished_d_2 out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError) - self.assertTrue(isinstance(out_2,HashBlob)) + self.assertTrue(isinstance(out_2, BlobFile)) self.assertTrue(out_2.verified) self.assertEqual(self.fake_content_len, out_2.get_length())