diff --git a/CHANGELOG.md b/CHANGELOG.md index c5bd31b1e..5d0a0de31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ at anytime. * Fixed https://github.com/lbryio/lbry/issues/923 * Fixed concurrent reflects opening too many files * Fixed cases when reflecting would fail on error conditions + * Fixed deadlocks from occuring during blob writes ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index e61016ba5..219b3aa82 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.17.0rc11" +__version__ = "0.17.0rc12" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 993a77400..2516ce44a 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -1,6 +1,5 @@ import logging import os -import threading from twisted.internet import defer, threads from twisted.protocols.basic import FileSender from twisted.web.client import FileBodyProducer @@ -40,8 +39,8 @@ class BlobFile(object): self.readers = 0 self.blob_dir = blob_dir self.file_path = os.path.join(blob_dir, self.blob_hash) - self.setting_verified_blob_lock = threading.Lock() - self.moved_verified_blob = False + self.blob_write_lock = defer.DeferredLock() + self.saved_verified_blob = False if os.path.isfile(self.file_path): self.set_length(os.path.getsize(self.file_path)) # This assumes that the hash of the blob has already been @@ -93,7 +92,7 @@ class BlobFile(object): """ if not self.writers and not self.readers: self._verified = False - self.moved_verified_blob = False + self.saved_verified_blob = False def delete_from_file_system(): if os.path.isfile(self.file_path): @@ -201,7 +200,7 @@ class BlobFile(object): if err is None: if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: if self._verified is False: - d = self._save_verified_blob(writer) + d = self.save_verified_blob(writer) d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) d.addCallback(lambda _: cancel_other_downloads()) else: @@ -219,15 +218,19 @@ class BlobFile(object): d.addBoth(lambda _: writer.close_handle()) return d + def save_verified_blob(self, writer): + # we cannot have multiple _save_verified_blob interrupting + # each other, can happen since startProducing is a deferred + return self.blob_write_lock.run(self._save_verified_blob, writer) + @defer.inlineCallbacks def _save_verified_blob(self, writer): - with self.setting_verified_blob_lock: - if self.moved_verified_blob is False: - writer.write_handle.seek(0) - out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(writer.write_handle) - yield producer.startProducing(open(out_path, 'wb')) - self.moved_verified_blob = True - defer.returnValue(True) - else: - raise DownloadCanceledError() + if self.saved_verified_blob is False: + writer.write_handle.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(writer.write_handle) + yield producer.startProducing(open(out_path, 'wb')) + self.saved_verified_blob = True + defer.returnValue(True) + else: + raise DownloadCanceledError() diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py index 292516a02..dd248c8fd 100644 --- a/lbrynet/blob/reader.py +++ b/lbrynet/blob/reader.py @@ -33,6 +33,7 @@ class HashBlobReader_v0(object): if self.streaming is False: reactor.callLater(0, self.producer.resumeProducing) + class HashBlobReader(object): """ This is a file like reader class that supports diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index a22b93add..d7ddc4018 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -331,8 +331,9 @@ class ReflectorServer(Protocol): return d def determine_missing_blobs(self, sd_blob): - with sd_blob.open_for_reading() as sd_file: - sd_blob_data = sd_file.read() + reader = sd_blob.open_for_reading() + sd_blob_data = reader.read() + reader.close() decoded_sd_blob = json.loads(sd_blob_data) return self.get_unvalidated_blobs_in_stream(decoded_sd_blob) diff --git a/lbrynet/tests/unit/core/test_HashBlob.py b/lbrynet/tests/unit/core/test_HashBlob.py index 925414067..66cc1758e 100644 --- a/lbrynet/tests/unit/core/test_HashBlob.py +++ b/lbrynet/tests/unit/core/test_HashBlob.py @@ -6,7 +6,6 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lb from twisted.trial import unittest from twisted.internet import defer - class BlobFileTest(unittest.TestCase): def setUp(self): self.db_dir, self.blob_dir = mk_db_and_blob_dir() @@ -144,4 +143,14 @@ class BlobFileTest(unittest.TestCase): self.assertEqual(self.fake_content_len, len(c)) self.assertEqual(bytearray(c), self.fake_content) + @defer.inlineCallbacks + def test_multiple_writers_save_at_same_time(self): + blob_hash = self.fake_content_hash + blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) + writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) + writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) + + blob_file.save_verified_blob(writer_1) + # second write should fail to save + yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError)