From d1511cba5498e91104dc1fa21a96a1e9d8adf277 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 5 Oct 2017 15:59:53 -0400 Subject: [PATCH 1/6] fix reading sd blob TODO: add __enter__ and __exit__ methods to HashBlobReader to let it be used as a contextmanager --- lbrynet/blob/reader.py | 1 + lbrynet/reflector/server/server.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index 292516a02..dd248c8fd 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -33,6 +33,7 @@ class HashBlobReader_v0(object): if self.streaming is False: reactor.callLater(0, self.producer.resumeProducing) + class HashBlobReader(object): """ This is a file like reader class that supports diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index a22b93add..d7ddc4018 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -331,8 +331,9 @@ class ReflectorServer(Protocol): return d def determine_missing_blobs(self, sd_blob): - with sd_blob.open_for_reading() as sd_file: - sd_blob_data = sd_file.read() + reader = sd_blob.open_for_reading() + sd_blob_data = reader.read() + reader.close() decoded_sd_blob = json.loads(sd_blob_data) return self.get_unvalidated_blobs_in_stream(decoded_sd_blob) From 2115919c551e073b1f0781eacc0b0f01258eb2d7 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 6 Oct 2017 16:34:45 -0400 Subject: [PATCH 2/6] Add test when mutliple writers call _save_verified_blob at once, this will cause deadlock --- lbrynet/tests/unit/core/test_HashBlob.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lbrynet/tests/unit/core/test_HashBlob.py b/lbrynet/tests/unit/core/test_HashBlob.py index 925414067..888887bb8 100644 --- a/lbrynet/tests/unit/core/test_HashBlob.py +++ b/lbrynet/tests/unit/core/test_HashBlob.py @@ -6,7 +6,6 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lb from twisted.trial import unittest from twisted.internet import defer - class BlobFileTest(unittest.TestCase): def setUp(self): self.db_dir, self.blob_dir = mk_db_and_blob_dir() @@ -144,4 +143,14 @@ class BlobFileTest(unittest.TestCase): self.assertEqual(self.fake_content_len, len(c)) self.assertEqual(bytearray(c), self.fake_content) + @defer.inlineCallbacks + def test_multiple_writers_save_at_same_time(self): + blob_hash = self.fake_content_hash + blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) + writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) + writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) + + blob_file._save_verified_blob(writer_1) + # second write should fail to save + yield self.assertFailure(blob_file._save_verified_blob(writer_2), DownloadCanceledError) From d9cc81766d6a917a50fac6804ea8267197980f91 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 6 Oct 2017 17:05:10 -0400 Subject: [PATCH 3/6] use DeferredLock instead of threading.Lock() as it can cause deadlocks --- lbrynet/blob/blob_file.py | 29 +++++++++++++----------- lbrynet/tests/unit/core/test_HashBlob.py | 4 ++-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 993a77400..2abc10f3a 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -1,6 +1,5 @@ import logging import os -import threading from twisted.internet import defer, threads from twisted.protocols.basic import FileSender from twisted.web.client import FileBodyProducer @@ -40,7 +39,7 @@ class BlobFile(object): self.readers = 0 self.blob_dir = blob_dir self.file_path = os.path.join(blob_dir, self.blob_hash) - self.setting_verified_blob_lock = threading.Lock() + self.blob_write_lock = defer.DeferredLock() self.moved_verified_blob = False if os.path.isfile(self.file_path): self.set_length(os.path.getsize(self.file_path)) @@ -201,7 +200,7 @@ class BlobFile(object): if err is None: if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: if self._verified is False: - d = self._save_verified_blob(writer) + d = self.save_verified_blob(writer) d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) d.addCallback(lambda _: cancel_other_downloads()) else: @@ -219,15 +218,19 @@ class BlobFile(object): d.addBoth(lambda _: writer.close_handle()) return d + def save_verified_blob(self, writer): + # we cannot have multiple _save_verified_blob interrupting + # each other, can happen since startProducing is a deferred + return self.blob_write_lock.run(self._save_verified_blob, writer) + @defer.inlineCallbacks def _save_verified_blob(self, writer): - with self.setting_verified_blob_lock: - if self.moved_verified_blob is False: - writer.write_handle.seek(0) - out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(writer.write_handle) - yield producer.startProducing(open(out_path, 'wb')) - self.moved_verified_blob = True - defer.returnValue(True) - else: - raise DownloadCanceledError() + if self.moved_verified_blob is False: + writer.write_handle.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(writer.write_handle) + yield producer.startProducing(open(out_path, 'wb')) + self.moved_verified_blob = True + defer.returnValue(True) + else: + raise DownloadCanceledError() diff --git a/lbrynet/tests/unit/core/test_HashBlob.py b/lbrynet/tests/unit/core/test_HashBlob.py index 888887bb8..66cc1758e 100644 --- a/lbrynet/tests/unit/core/test_HashBlob.py +++ b/lbrynet/tests/unit/core/test_HashBlob.py @@ -150,7 +150,7 @@ class BlobFileTest(unittest.TestCase): writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) - blob_file._save_verified_blob(writer_1) + blob_file.save_verified_blob(writer_1) # second write should fail to save - yield self.assertFailure(blob_file._save_verified_blob(writer_2), DownloadCanceledError) + yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError) From 043758c11f32a5469146986eadfde1167faa86ad Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 6 Oct 2017 17:04:27 -0400 Subject: [PATCH 4/6] rename moved_verified_blob as saved_verified_blob --- lbrynet/blob/blob_file.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 2abc10f3a..2516ce44a 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -40,7 +40,7 @@ class BlobFile(object): self.blob_dir = blob_dir self.file_path = os.path.join(blob_dir, self.blob_hash) self.blob_write_lock = defer.DeferredLock() - self.moved_verified_blob = False + self.saved_verified_blob = False if os.path.isfile(self.file_path): self.set_length(os.path.getsize(self.file_path)) # This assumes that the hash of the blob has already been @@ -92,7 +92,7 @@ class BlobFile(object): """ if not self.writers and not self.readers: self._verified = False - self.moved_verified_blob = False + self.saved_verified_blob = False def delete_from_file_system(): if os.path.isfile(self.file_path): @@ -225,12 +225,12 @@ class BlobFile(object): @defer.inlineCallbacks def _save_verified_blob(self, writer): - if self.moved_verified_blob is False: + if self.saved_verified_blob is False: writer.write_handle.seek(0) out_path = os.path.join(self.blob_dir, self.blob_hash) producer = FileBodyProducer(writer.write_handle) yield producer.startProducing(open(out_path, 'wb')) - self.moved_verified_blob = True + self.saved_verified_blob = True defer.returnValue(True) else: raise DownloadCanceledError() From b168c13c4026aa9552f83cdbbe9d54a224e465d9 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 6 Oct 2017 17:02:37 -0400 Subject: [PATCH 5/6] add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95a2dc473..af6f44c32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ at anytime. * Fixed https://github.com/lbryio/lbry/issues/923 * Fixed concurrent reflects opening too many files * Fixed cases when reflecting would fail on error conditions + * Fixed deadlocks from occuring during blob writes ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. From e4cc87b8f07a306f6fcc6b31b3d962bc76ec56df Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 9 Oct 2017 10:30:36 -0400 Subject: [PATCH 6/6] Bump version 0.17.0rc11 --> 0.17.0rc12 Signed-off-by: Jack Robison --- lbrynet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index e61016ba5..219b3aa82 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc11" +__version__ = "0.17.0rc12" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler())