Merge remote-tracking branch 'origin/fix_blob_reader_closing'

This commit is contained in:
Jack Robison 2017-09-29 12:37:15 -04:00
commit 838436d641
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
9 changed files with 85 additions and 26 deletions

View file

@ -19,6 +19,7 @@ at anytime.
### Fixed ### Fixed
* Fixed handling cancelled blob and availability requests * Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer * Fixed redundant blob requests to a peer
* Fixed https://github.com/lbryio/lbry/issues/923
### Deprecated ### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.

View file

@ -9,7 +9,7 @@ from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.utils import is_valid_blobhash
from lbrynet.blob.writer import HashBlobWriter from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader from lbrynet.blob.reader import HashBlobReader, HashBlobReader_v0
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -75,19 +75,13 @@ class BlobFile(object):
""" """
open blob for reading open blob for reading
returns a file handle that can be read() from. returns a file like object that can be read() from, and closed() when
once finished with the file handle, user must call close_read_handle() finished
otherwise blob cannot be deleted.
""" """
if self._verified is True: if self._verified is True:
file_handle = None reader = HashBlobReader(self.file_path, self.reader_finished)
try: self.readers += 1
file_handle = open(self.file_path, 'rb') return reader
self.readers += 1
return file_handle
except IOError:
log.exception('Failed to open %s', self.file_path)
self.close_read_handle(file_handle)
return None return None
def delete(self): def delete(self):
@ -150,12 +144,16 @@ class BlobFile(object):
return False return False
def read(self, write_func): def read(self, write_func):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
def close_self(*args): def close_self(*args):
self.close_read_handle(file_handle) self.close_read_handle(file_handle)
return args[0] return args[0]
file_sender = FileSender() file_sender = FileSender()
reader = HashBlobReader(write_func) reader = HashBlobReader_v0(write_func)
file_handle = self.open_for_reading() file_handle = self.open_for_reading()
if file_handle is not None: if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader) d = file_sender.beginFileTransfer(file_handle, reader)
@ -164,6 +162,19 @@ class BlobFile(object):
d = defer.fail(IOError("Could not read the blob")) d = defer.fail(IOError("Could not read the blob"))
return d return d
def close_read_handle(self, file_handle):
"""
This function is only used in StreamBlobDecryptor
and should be deprecated in favor of open_for_reading()
"""
if file_handle is not None:
file_handle.close()
self.readers -= 1
def reader_finished(self, reader):
self.readers -= 1
return defer.succeed(True)
def writer_finished(self, writer, err=None): def writer_finished(self, writer, err=None):
def fire_finished_deferred(): def fire_finished_deferred():
self._verified = True self._verified = True
@ -208,11 +219,6 @@ class BlobFile(object):
d.addBoth(lambda _: writer.close_handle()) d.addBoth(lambda _: writer.close_handle())
return d return d
def close_read_handle(self, file_handle):
if file_handle is not None:
file_handle.close()
self.readers -= 1
@defer.inlineCallbacks @defer.inlineCallbacks
def _save_verified_blob(self, writer): def _save_verified_blob(self, writer):
with self.setting_verified_blob_lock: with self.setting_verified_blob_lock:

View file

@ -5,7 +5,11 @@ from zope.interface import implements
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class HashBlobReader(object): class HashBlobReader_v0(object):
"""
This is a class that is only used in StreamBlobDecryptor
and should be deprecated
"""
implements(interfaces.IConsumer) implements(interfaces.IConsumer)
def __init__(self, write_func): def __init__(self, write_func):
@ -28,3 +32,28 @@ class HashBlobReader(object):
self.write_func(data) self.write_func(data)
if self.streaming is False: if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing) reactor.callLater(0, self.producer.resumeProducing)
class HashBlobReader(object):
"""
This is a file like reader class that supports
read(size) and close()
"""
def __init__(self, file_path, finished_cb):
self.finished_cb = finished_cb
self.finished_cb_d = None
self.read_handle = open(file_path, 'rb')
def __del__(self):
self.close()
def read(self, size=-1):
return self.read_handle.read(size)
def close(self):
# if we've already closed and called finished_cb, do nothing
if self.finished_cb_d is not None:
return
self.read_handle.close()
self.finished_cb_d = self.finished_cb(self)

View file

@ -55,7 +55,7 @@ class BlobStreamDescriptorReader(StreamDescriptorReader):
f = self.blob.open_for_reading() f = self.blob.open_for_reading()
if f is not None: if f is not None:
raw_data = f.read() raw_data = f.read()
self.blob.close_read_handle(f) f.close()
return raw_data return raw_data
else: else:
raise ValueError("Could not open the blob for reading") raise ValueError("Could not open the blob for reading")

View file

@ -89,7 +89,7 @@ class BlobRequestHandler(object):
def cancel_send(self, err): def cancel_send(self, err):
if self.currently_uploading is not None: if self.currently_uploading is not None:
self.currently_uploading.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.currently_uploading = None self.currently_uploading = None
return err return err
@ -225,7 +225,7 @@ class BlobRequestHandler(object):
def set_not_uploading(reason=None): def set_not_uploading(reason=None):
if self.currently_uploading is not None: if self.currently_uploading is not None:
self.currently_uploading.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.currently_uploading = None self.currently_uploading = None
self.file_sender = None self.file_sender = None

View file

@ -2433,7 +2433,7 @@ class Daemon(AuthJSONRPCServer):
if encoding and encoding in decoders: if encoding and encoding in decoders:
blob_file = blob.open_for_reading() blob_file = blob.open_for_reading()
result = decoders[encoding](blob_file.read()) result = decoders[encoding](blob_file.read())
blob.close_read_handle(blob_file) blob_file.close()
else: else:
result = "Downloaded blob %s" % blob_hash result = "Downloaded blob %s" % blob_hash
@ -2682,7 +2682,7 @@ class Daemon(AuthJSONRPCServer):
def read_sd_blob(sd_blob): def read_sd_blob(sd_blob):
sd_blob_file = sd_blob.open_for_reading() sd_blob_file = sd_blob.open_for_reading()
decoded_sd_blob = json.loads(sd_blob_file.read()) decoded_sd_blob = json.loads(sd_blob_file.read())
sd_blob.close_read_handle(sd_blob_file) sd_blob_file.close()
return decoded_sd_blob return decoded_sd_blob
resolved_result = yield self.session.wallet.resolve(uri) resolved_result = yield self.session.wallet.resolve(uri)

View file

@ -94,7 +94,7 @@ class BlobReflectorClient(Protocol):
def set_not_uploading(self): def set_not_uploading(self):
if self.next_blob_to_send is not None: if self.next_blob_to_send is not None:
self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.next_blob_to_send = None self.next_blob_to_send = None
self.file_sender = None self.file_sender = None
@ -105,6 +105,7 @@ class BlobReflectorClient(Protocol):
assert self.read_handle is not None, \ assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer" "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self) d = self.file_sender.beginFileTransfer(self.read_handle, self)
d.addCallback(lambda _: self.read_handle.close())
return d return d
def handle_handshake_response(self, response_dict): def handle_handshake_response(self, response_dict):

View file

@ -179,7 +179,7 @@ class EncryptedFileReflectorClient(Protocol):
def set_not_uploading(self): def set_not_uploading(self):
if self.next_blob_to_send is not None: if self.next_blob_to_send is not None:
log.debug("Close %s", self.next_blob_to_send) log.debug("Close %s", self.next_blob_to_send)
self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle.close()
self.read_handle = None self.read_handle = None
self.next_blob_to_send = None self.next_blob_to_send = None
if self.file_sender is not None: if self.file_sender is not None:
@ -191,6 +191,7 @@ class EncryptedFileReflectorClient(Protocol):
assert self.read_handle is not None, \ assert self.read_handle is not None, \
"self.read_handle was None when trying to start the transfer" "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self) d = self.file_sender.beginFileTransfer(self.read_handle, self)
d.addCallback(lambda _: self.read_handle.close())
return d return d
def handle_handshake_response(self, response_dict): def handle_handshake_response(self, response_dict):

View file

@ -44,9 +44,15 @@ class BlobFileTest(unittest.TestCase):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
self.assertTrue(blob_file.verified) self.assertTrue(blob_file.verified)
f = blob_file.open_for_reading() f = blob_file.open_for_reading()
self.assertEqual(1, blob_file.readers)
c = f.read() c = f.read()
self.assertEqual(c, self.fake_content) self.assertEqual(c, self.fake_content)
# close reader
f.close()
self.assertEqual(0, blob_file.readers)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_delete(self): def test_delete(self):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
@ -58,6 +64,21 @@ class BlobFileTest(unittest.TestCase):
blob_file = BlobFile(self.blob_dir, self.fake_content_hash) blob_file = BlobFile(self.blob_dir, self.fake_content_hash)
self.assertFalse(blob_file.verified) self.assertFalse(blob_file.verified)
@defer.inlineCallbacks
def test_delete_fail(self):
# deletes should fail if being written to
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
writer, finished_d = blob_file.open_for_writing(peer=1)
yield self.assertFailure(blob_file.delete(), ValueError)
writer.write(self.fake_content)
writer.close()
# deletes should fail if being read and not closed
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
self.assertTrue(blob_file.verified)
f = blob_file.open_for_reading()
yield self.assertFailure(blob_file.delete(), ValueError)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_too_much_write(self): def test_too_much_write(self):
# writing too much data should result in failure # writing too much data should result in failure