Merge branch 'fix_freezing_2'

This commit is contained in:
Jack Robison 2017-10-09 10:29:51 -04:00
commit 7a351ffff0
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
3 changed files with 29 additions and 16 deletions

View file

@ -18,6 +18,7 @@ at anytime.
* Fixed https://github.com/lbryio/lbry/issues/923 * Fixed https://github.com/lbryio/lbry/issues/923
* Fixed concurrent reflects opening too many files * Fixed concurrent reflects opening too many files
* Fixed cases when reflecting would fail on error conditions * Fixed cases when reflecting would fail on error conditions
* Fixed deadlocks from occuring during blob writes
### Deprecated ### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.

View file

@ -1,6 +1,5 @@
import logging import logging
import os import os
import threading
from twisted.internet import defer, threads from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer from twisted.web.client import FileBodyProducer
@ -40,8 +39,8 @@ class BlobFile(object):
self.readers = 0 self.readers = 0
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.file_path = os.path.join(blob_dir, self.blob_hash) self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock() self.blob_write_lock = defer.DeferredLock()
self.moved_verified_blob = False self.saved_verified_blob = False
if os.path.isfile(self.file_path): if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path)) self.set_length(os.path.getsize(self.file_path))
# This assumes that the hash of the blob has already been # This assumes that the hash of the blob has already been
@ -93,7 +92,7 @@ class BlobFile(object):
""" """
if not self.writers and not self.readers: if not self.writers and not self.readers:
self._verified = False self._verified = False
self.moved_verified_blob = False self.saved_verified_blob = False
def delete_from_file_system(): def delete_from_file_system():
if os.path.isfile(self.file_path): if os.path.isfile(self.file_path):
@ -201,7 +200,7 @@ class BlobFile(object):
if err is None: if err is None:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False: if self._verified is False:
d = self._save_verified_blob(writer) d = self.save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads()) d.addCallback(lambda _: cancel_other_downloads())
else: else:
@ -219,15 +218,19 @@ class BlobFile(object):
d.addBoth(lambda _: writer.close_handle()) d.addBoth(lambda _: writer.close_handle())
return d return d
def save_verified_blob(self, writer):
# we cannot have multiple _save_verified_blob interrupting
# each other, can happen since startProducing is a deferred
return self.blob_write_lock.run(self._save_verified_blob, writer)
@defer.inlineCallbacks @defer.inlineCallbacks
def _save_verified_blob(self, writer): def _save_verified_blob(self, writer):
with self.setting_verified_blob_lock: if self.saved_verified_blob is False:
if self.moved_verified_blob is False:
writer.write_handle.seek(0) writer.write_handle.seek(0)
out_path = os.path.join(self.blob_dir, self.blob_hash) out_path = os.path.join(self.blob_dir, self.blob_hash)
producer = FileBodyProducer(writer.write_handle) producer = FileBodyProducer(writer.write_handle)
yield producer.startProducing(open(out_path, 'wb')) yield producer.startProducing(open(out_path, 'wb'))
self.moved_verified_blob = True self.saved_verified_blob = True
defer.returnValue(True) defer.returnValue(True)
else: else:
raise DownloadCanceledError() raise DownloadCanceledError()

View file

@ -6,7 +6,6 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lb
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
class BlobFileTest(unittest.TestCase): class BlobFileTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.db_dir, self.blob_dir = mk_db_and_blob_dir() self.db_dir, self.blob_dir = mk_db_and_blob_dir()
@ -144,4 +143,14 @@ class BlobFileTest(unittest.TestCase):
self.assertEqual(self.fake_content_len, len(c)) self.assertEqual(self.fake_content_len, len(c))
self.assertEqual(bytearray(c), self.fake_content) self.assertEqual(bytearray(c), self.fake_content)
@defer.inlineCallbacks
def test_multiple_writers_save_at_same_time(self):
blob_hash = self.fake_content_hash
blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
blob_file.save_verified_blob(writer_1)
# second write should fail to save
yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError)