Merge branch 'master' into add_more_session_info

This commit is contained in:
Jack Robison 2017-10-09 10:39:48 -04:00
commit 7898b9c2d8
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
6 changed files with 34 additions and 19 deletions

View file

@ -18,6 +18,7 @@ at anytime.
* Fixed https://github.com/lbryio/lbry/issues/923 * Fixed https://github.com/lbryio/lbry/issues/923
* Fixed concurrent reflects opening too many files * Fixed concurrent reflects opening too many files
* Fixed cases when reflecting would fail on error conditions * Fixed cases when reflecting would fail on error conditions
* Fixed deadlocks from occurring during blob writes
### Deprecated ### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.

View file

@ -1,6 +1,6 @@
import logging import logging
__version__ = "0.17.0rc11" __version__ = "0.17.0rc12"
version = tuple(__version__.split('.')) version = tuple(__version__.split('.'))
logging.getLogger(__name__).addHandler(logging.NullHandler()) logging.getLogger(__name__).addHandler(logging.NullHandler())

View file

@ -1,6 +1,5 @@
import logging import logging
import os import os
import threading
from twisted.internet import defer, threads from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer from twisted.web.client import FileBodyProducer
@ -40,8 +39,8 @@ class BlobFile(object):
self.readers = 0 self.readers = 0
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.file_path = os.path.join(blob_dir, self.blob_hash) self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock() self.blob_write_lock = defer.DeferredLock()
self.moved_verified_blob = False self.saved_verified_blob = False
if os.path.isfile(self.file_path): if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path)) self.set_length(os.path.getsize(self.file_path))
# This assumes that the hash of the blob has already been # This assumes that the hash of the blob has already been
@ -93,7 +92,7 @@ class BlobFile(object):
""" """
if not self.writers and not self.readers: if not self.writers and not self.readers:
self._verified = False self._verified = False
self.moved_verified_blob = False self.saved_verified_blob = False
def delete_from_file_system(): def delete_from_file_system():
if os.path.isfile(self.file_path): if os.path.isfile(self.file_path):
@ -201,7 +200,7 @@ class BlobFile(object):
if err is None: if err is None:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False: if self._verified is False:
d = self._save_verified_blob(writer) d = self.save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred) d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads()) d.addCallback(lambda _: cancel_other_downloads())
else: else:
@ -219,15 +218,19 @@ class BlobFile(object):
d.addBoth(lambda _: writer.close_handle()) d.addBoth(lambda _: writer.close_handle())
return d return d
def save_verified_blob(self, writer):
# we cannot have multiple _save_verified_blob calls interrupting
# each other, which can happen since startProducing is a deferred
return self.blob_write_lock.run(self._save_verified_blob, writer)
@defer.inlineCallbacks @defer.inlineCallbacks
def _save_verified_blob(self, writer): def _save_verified_blob(self, writer):
with self.setting_verified_blob_lock: if self.saved_verified_blob is False:
if self.moved_verified_blob is False: writer.write_handle.seek(0)
writer.write_handle.seek(0) out_path = os.path.join(self.blob_dir, self.blob_hash)
out_path = os.path.join(self.blob_dir, self.blob_hash) producer = FileBodyProducer(writer.write_handle)
producer = FileBodyProducer(writer.write_handle) yield producer.startProducing(open(out_path, 'wb'))
yield producer.startProducing(open(out_path, 'wb')) self.saved_verified_blob = True
self.moved_verified_blob = True defer.returnValue(True)
defer.returnValue(True) else:
else: raise DownloadCanceledError()
raise DownloadCanceledError()

View file

@ -33,6 +33,7 @@ class HashBlobReader_v0(object):
if self.streaming is False: if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing) reactor.callLater(0, self.producer.resumeProducing)
class HashBlobReader(object): class HashBlobReader(object):
""" """
This is a file like reader class that supports This is a file like reader class that supports

View file

@ -331,8 +331,9 @@ class ReflectorServer(Protocol):
return d return d
def determine_missing_blobs(self, sd_blob): def determine_missing_blobs(self, sd_blob):
with sd_blob.open_for_reading() as sd_file: reader = sd_blob.open_for_reading()
sd_blob_data = sd_file.read() sd_blob_data = reader.read()
reader.close()
decoded_sd_blob = json.loads(sd_blob_data) decoded_sd_blob = json.loads(sd_blob_data)
return self.get_unvalidated_blobs_in_stream(decoded_sd_blob) return self.get_unvalidated_blobs_in_stream(decoded_sd_blob)

View file

@ -6,7 +6,6 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lb
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
class BlobFileTest(unittest.TestCase): class BlobFileTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.db_dir, self.blob_dir = mk_db_and_blob_dir() self.db_dir, self.blob_dir = mk_db_and_blob_dir()
@ -144,4 +143,14 @@ class BlobFileTest(unittest.TestCase):
self.assertEqual(self.fake_content_len, len(c)) self.assertEqual(self.fake_content_len, len(c))
self.assertEqual(bytearray(c), self.fake_content) self.assertEqual(bytearray(c), self.fake_content)
@defer.inlineCallbacks
def test_multiple_writers_save_at_same_time(self):
blob_hash = self.fake_content_hash
blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
blob_file.save_verified_blob(writer_1)
# second write should fail to save
yield self.assertFailure(blob_file.save_verified_blob(writer_2), DownloadCanceledError)