diff --git a/CHANGELOG.md b/CHANGELOG.md index 57d5a3662..51a1e089a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ at anytime. ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer + * Fixed https://github.com/lbryio/lbry/issues/923 ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index f1a2010ee..993a77400 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -9,7 +9,7 @@ from lbrynet import conf from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.utils import is_valid_blobhash from lbrynet.blob.writer import HashBlobWriter -from lbrynet.blob.reader import HashBlobReader +from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0 log = logging.getLogger(__name__) @@ -75,19 +75,13 @@ class BlobFile(object): """ open blob for reading - returns a file handle that can be read() from. - once finished with the file handle, user must call close_read_handle() - otherwise blob cannot be deleted. + returns a file like object that can be read() from, and closed() when + finished """ if self._verified is True: - file_handle = None - try: - file_handle = open(self.file_path, 'rb') - self.readers += 1 - return file_handle - except IOError: - log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) + reader = HashBlobReader(self.file_path, self.reader_finished) + self.readers += 1 + return reader return None def delete(self): @@ -150,12 +144,16 @@ class BlobFile(object): return False def read(self, write_func): + """ + This function is only used in StreamBlobDecryptor + and should be deprecated in favor of open_for_reading() + """ def close_self(*args): self.close_read_handle(file_handle) return args[0] file_sender = FileSender() - reader = HashBlobReader(write_func) + reader = HashBlobReader_v0(write_func) file_handle = self.open_for_reading() if file_handle is not None: d = file_sender.beginFileTransfer(file_handle, reader) @@ -164,6 +162,19 @@ class BlobFile(object): d = defer.fail(IOError("Could not read the blob")) return d + def close_read_handle(self, file_handle): + """ + This function is only used in StreamBlobDecryptor + and should be deprecated in favor of open_for_reading() + """ + if file_handle is not None: + file_handle.close() + self.readers -= 1 + + def reader_finished(self, reader): + self.readers -= 1 + return defer.succeed(True) + def writer_finished(self, writer, err=None): def fire_finished_deferred(): self._verified = True @@ -208,11 +219,6 @@ class BlobFile(object): d.addBoth(lambda _: writer.close_handle()) return d - def close_read_handle(self, file_handle): - if file_handle is not None: - file_handle.close() - self.readers -= 1 - @defer.inlineCallbacks def _save_verified_blob(self, writer): with self.setting_verified_blob_lock: diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index c85cc38f3..292516a02 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -5,7 +5,11 @@ from zope.interface import implements log = logging.getLogger(__name__) -class HashBlobReader(object): +class HashBlobReader_v0(object): + """ + This is a class that is only used in StreamBlobDecryptor + and should be deprecated + """ implements(interfaces.IConsumer) def __init__(self, write_func): @@ -28,3 +32,28 @@ class HashBlobReader(object): self.write_func(data) if self.streaming is False: reactor.callLater(0, self.producer.resumeProducing) + +class HashBlobReader(object): + """ + This is a file like reader class that supports + read(size) and close() + """ + def __init__(self, file_path, finished_cb): + self.finished_cb = finished_cb + self.finished_cb_d = None + self.read_handle = open(file_path, 'rb') + + def __del__(self): + self.close() + + def read(self, size=-1): + return self.read_handle.read(size) + + def close(self): + # if we've already closed and called finished_cb, do nothing + if self.finished_cb_d is not None: + return + self.read_handle.close() + self.finished_cb_d = self.finished_cb(self) + + diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index b3fb714cb..d0bd28310 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -55,7 +55,7 @@ class BlobStreamDescriptorReader(StreamDescriptorReader): f = self.blob.open_for_reading() if f is not None: raw_data = f.read() - self.blob.close_read_handle(f) + f.close() return raw_data else: raise ValueError("Could not open the blob for reading") diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index de98cf898..cc8c800bd 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -89,7 +89,7 @@ class BlobRequestHandler(object): def cancel_send(self, err): if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.currently_uploading = None return err @@ -225,7 +225,7 @@ class BlobRequestHandler(object): def set_not_uploading(reason=None): if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.currently_uploading = None self.file_sender = None diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 63d7d1461..48124050d 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2433,7 +2433,7 @@ class Daemon(AuthJSONRPCServer): if encoding and encoding in decoders: blob_file = blob.open_for_reading() result = decoders[encoding](blob_file.read()) - blob.close_read_handle(blob_file) + blob_file.close() else: result = "Downloaded blob %s" % blob_hash @@ -2682,7 +2682,7 @@ class Daemon(AuthJSONRPCServer): def read_sd_blob(sd_blob): sd_blob_file = sd_blob.open_for_reading() decoded_sd_blob = json.loads(sd_blob_file.read()) - sd_blob.close_read_handle(sd_blob_file) + sd_blob_file.close() return decoded_sd_blob resolved_result = yield self.session.wallet.resolve(uri) diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 1f1c540a2..d2c41ce30 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -94,7 +94,7 @@ class BlobReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: - self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.next_blob_to_send = None self.file_sender = None @@ -105,6 +105,7 @@ class BlobReflectorClient(Protocol): assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) + d.addCallback(lambda _: self.read_handle.close()) return d def handle_handshake_response(self, response_dict): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index ebf605b02..7e0060f93 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -179,7 +179,7 @@ class EncryptedFileReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: log.debug("Close %s", self.next_blob_to_send) - self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle.close() self.read_handle = None self.next_blob_to_send = None if self.file_sender is not None: @@ -191,6 +191,7 @@ class EncryptedFileReflectorClient(Protocol): assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) + d.addCallback(lambda _: self.read_handle.close()) return d def handle_handshake_response(self, response_dict): diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index d1c282478..93a19ef19 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -44,9 +44,15 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) self.assertTrue(blob_file.verified) f = blob_file.open_for_reading() + self.assertEqual(1, blob_file.readers) c = f.read() self.assertEqual(c, self.fake_content) + # close reader + f.close() + self.assertEqual(0, blob_file.readers) + + @defer.inlineCallbacks def test_delete(self): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) @@ -58,6 +64,21 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash) self.assertFalse(blob_file.verified) + @defer.inlineCallbacks + def test_delete_fail(self): + # deletes should fail if being written to + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + yield self.assertFailure(blob_file.delete(), ValueError) + writer.write(self.fake_content) + writer.close() + + # deletes should fail if being read and not closed + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertTrue(blob_file.verified) + f = blob_file.open_for_reading() + yield self.assertFailure(blob_file.delete(), ValueError) + @defer.inlineCallbacks def test_too_much_write(self): # writing too much data should result in failure