From e92321a9c12545fd11ad6a44081c27118579522e Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 23:52:42 -0400 Subject: [PATCH] have BlobFile.open_for_writing() return the writer instead of write and close functions --- lbrynet/core/HashBlob.py | 9 ++++----- lbrynet/core/client/BlobRequester.py | 4 ++-- lbrynet/reflector/server/server.py | 18 +++++++++--------- tests/unit/core/test_BlobManager.py | 7 +++---- tests/unit/core/test_HashBlob.py | 24 ++++++++++++------------ 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 686242a01..c87f1bab9 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -238,21 +238,20 @@ class BlobFile(HashBlob): open a blob file to be written by peer, supports concurrent writers, as long as they are from differnt peers. - returns tuple of (finished_deferred, writer.writer, writer.close) + returns tuple of (writer, finished_deferred) + writer - a file like object with a write() function, close() when finished finished_deferred - deferred that is fired when write is finished and returns a instance of itself as HashBlob - writer.write - function used to write to file, argument is data to be written - writer.close - function used to cancel the write, takes no argument """ if not peer in self.writers: log.debug("Opening %s to be written by %s", str(self), str(peer)) finished_deferred = defer.Deferred() writer = HashBlobWriter(self.get_length, self.writer_finished) self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.close + return (writer, finished_deferred) log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None, None + return None, None def open_for_reading(self): """ diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 95ffaa327..a887b24c6 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -466,9 +466,9 @@ class DownloadRequest(RequestHelper): if blob.is_validated(): log.debug('Skipping blob %s as its already validated', blob) continue - d, write_func, cancel_func = blob.open_for_writing(self.peer) + writer, d = blob.open_for_writing(self.peer) if d is not None: - return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer) + return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer) log.debug('Skipping blob %s as there was an issue opening it for writing', blob) return None diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 8467e5321..1c41c9eea 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -35,11 +35,11 @@ class ReflectorServer(Protocol): self.peer_version = None self.receiving_blob = False self.incoming_blob = None - self.blob_write = None self.blob_finished_d = None - self.cancel_write = None self.request_buff = "" + self.blob_writer = None + def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): log.info("Reflector upload from %s finished" % self.peer.host) @@ -82,14 +82,14 @@ class ReflectorServer(Protocol): """ blob = self.incoming_blob - self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer) self.blob_finished_d.addCallback(self._on_completed_blob, response_key) self.blob_finished_d.addErrback(self._on_failed_blob, response_key) def close_blob(self): + self.blob_writer.close() + self.blob_writer = None self.blob_finished_d = None - self.blob_write = None - self.cancel_write = None self.incoming_blob = None self.receiving_blob = False @@ -99,7 +99,7 @@ class ReflectorServer(Protocol): def dataReceived(self, data): if self.receiving_blob: - self.blob_write(data) + self.blob_writer.write(data) else: log.debug('Not yet recieving blob, data needs further processing') self.request_buff += data @@ -110,7 +110,7 @@ class ReflectorServer(Protocol): d.addErrback(self.handle_error) if self.receiving_blob and extra_data: log.debug('Writing extra data to blob') - self.blob_write(extra_data) + self.blob_writer.write(extra_data) def _get_valid_response(self, response_msg): extra_data = None @@ -221,7 +221,7 @@ class ReflectorServer(Protocol): sd_blob_hash = request_dict[SD_BLOB_HASH] sd_blob_size = request_dict[SD_BLOB_SIZE] - if self.blob_write is None: + if self.blob_writer is None: d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size) d.addCallback(self.get_descriptor_response) d.addCallback(self.send_response) @@ -293,7 +293,7 @@ class ReflectorServer(Protocol): blob_hash = request_dict[BLOB_HASH] blob_size = request_dict[BLOB_SIZE] - if self.blob_write is None: + if self.blob_writer is None: log.debug('Received info for blob: %s', blob_hash[:16]) d = self.blob_manager.get_blob(blob_hash, blob_size) d.addCallback(self.get_blob_response) diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index 1b7271dc2..f6b4a1f04 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -47,8 +47,8 @@ class BlobManagerTest(unittest.TestCase): yield self.bm.setup() blob = yield self.bm.get_blob(blob_hash,len(data)) - finished_d, write, cancel =yield blob.open_for_writing(self.peer) - yield write(data) + writer, finished_d = yield blob.open_for_writing(self.peer) + yield writer.write(data) yield self.bm.blob_completed(blob) yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) @@ -105,7 +105,7 @@ class BlobManagerTest(unittest.TestCase): # open the last blob blob = yield self.bm.get_blob(blob_hashes[-1]) - finished_d, write, cancel = yield blob.open_for_writing(self.peer) + writer, finished_d = yield blob.open_for_writing(self.peer) # delete the last blob and check if it still exists out = yield self.bm.delete_blobs([blob_hash]) @@ -114,4 +114,3 @@ class BlobManagerTest(unittest.TestCase): self.assertTrue(blob_hashes[-1] in blobs) self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1]))) - blob._close_writer(blob.writers[self.peer][0]) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index d4a7a4d57..d8843f390 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -28,8 +28,8 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) self.assertFalse(blob_file.verified) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(self.fake_content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) out = yield finished_d self.assertTrue(isinstance(out,HashBlob)) self.assertTrue(out.verified) @@ -52,8 +52,8 @@ class BlobFileTest(unittest.TestCase): @defer.inlineCallbacks def test_delete(self): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(self.fake_content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) out = yield finished_d out = yield blob_file.delete() @@ -67,8 +67,8 @@ class BlobFileTest(unittest.TestCase): content = bytearray('0'*32) blob_hash = random_lbry_hash() blob_file = BlobFile(self.blob_dir, blob_hash, expected_length) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(content) out = yield self.assertFailure(finished_d, InvalidDataError) @defer.inlineCallbacks @@ -79,8 +79,8 @@ class BlobFileTest(unittest.TestCase): content = bytearray('0'*length) blob_hash = random_lbry_hash() blob_file = BlobFile(self.blob_dir, blob_hash, length) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(content) yield self.assertFailure(finished_d, InvalidDataError) @@ -89,11 +89,11 @@ class BlobFileTest(unittest.TestCase): # start first writer and write half way, and then start second writer and write everything blob_hash = self.fake_content_hash blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) - finished_d_1, write_func_1, cancel_func_1 = blob_file.open_for_writing(peer=1) - write_func_1(self.fake_content[:self.fake_content_len/2]) + writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) + writer_1.write(self.fake_content[:self.fake_content_len/2]) - finished_d_2, write_func_2, cancel_func_2 = blob_file.open_for_writing(peer=2) - write_func_2(self.fake_content) + writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) + writer_2.write(self.fake_content) out_2 = yield finished_d_2 out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError)