From 6c571b5227e417f5519f959a8d17dc9cc6b48f61 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Job=20Evers=E2=80=90Meltzer?=
Date: Fri, 20 Jan 2017 10:54:36 -0600
Subject: [PATCH] Blob verification fixes (#428)

* Move the blob verification to the actual Blob object

* remove the check on verification time

* remove get_blob_length from BlobManager

Removed because I'm not sure what checking verification time against
ctime gets us, except some protection against an accidental
modification of the blob.
---
 lbrynet/core/BlobManager.py      | 149 ++++++-------------------
 lbrynet/core/HashBlob.py         |  53 +++++++----
 lbrynet/lbrynet_daemon/Daemon.py |   9 +-
 3 files changed, 68 insertions(+), 143 deletions(-)

diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py
index 011ffc8a4..f2701c444 100644
--- a/lbrynet/core/BlobManager.py
+++ b/lbrynet/core/BlobManager.py
@@ -8,7 +8,6 @@ from twisted.python.failure import Failure
 from twisted.enterprise import adbapi
 from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator
 from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
-from lbrynet.core.utils import is_valid_blobhash
 from lbrynet.core.Error import NoSuchBlobError
 from lbrynet.core.sqlite_helpers import rerun_if_locked
 
@@ -37,7 +36,7 @@ class BlobManager(DHTHashSupplier):
     def blob_completed(self, blob, next_announce_time=None):
         pass
 
-    def completed_blobs(self, blobs_to_check):
+    def completed_blobs(self, blobhashes_to_check):
         pass
 
     def hashes_to_announce(self):
@@ -49,9 +48,6 @@ class BlobManager(DHTHashSupplier):
     def delete_blob(self, blob_hash):
         pass
 
-    def get_blob_length(self, blob_hash):
-        pass
-
     def blob_requested(self, blob_hash):
         pass
 
@@ -78,7 +74,9 @@ class BlobManager(DHTHashSupplier):
             return self.hash_announcer.immediate_announce(blob_hashes)
 
 
-
+# TODO: Having different managers for different blobs breaks the
+# abstraction of a HashBlob. Why should the management of blobs
+# care what kind of Blob it has?
 class DiskBlobManager(BlobManager):
     """This class stores blobs on the hard disk"""
     def __init__(self, hash_announcer, blob_dir, db_dir):
@@ -126,35 +124,17 @@ class DiskBlobManager(BlobManager):
             log.debug('Making a new blob for %s', blob_hash)
             blob = self.blob_type(self.blob_dir, blob_hash, upload_allowed, length)
             self.blobs[blob_hash] = blob
-            d = self._completed_blobs([blob_hash])
-
-            def check_completed(completed_blobs):
-
-                def set_length(length):
-                    blob.length = length
-
-                if len(completed_blobs) == 1 and completed_blobs[0] == blob_hash:
-                    blob.verified = True
-                    inner_d = self._get_blob_length(blob_hash)
-                    inner_d.addCallback(set_length)
-                    inner_d.addCallback(lambda _: blob)
-                else:
-                    inner_d = defer.succeed(blob)
-                return inner_d
-
-            d.addCallback(check_completed)
-            return d
+            return defer.succeed(blob)
 
     def blob_completed(self, blob, next_announce_time=None):
         if next_announce_time is None:
             next_announce_time = time.time() + self.hash_reannounce_time
-        d = self._add_completed_blob(blob.blob_hash, blob.length,
-                                     time.time(), next_announce_time)
+        d = self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time)
         d.addCallback(lambda _: self._immediate_announce([blob.blob_hash]))
         return d
 
-    def completed_blobs(self, blobs_to_check):
-        return self._completed_blobs(blobs_to_check)
+    def completed_blobs(self, blobhashes_to_check):
+        return self._completed_blobs(blobhashes_to_check)
 
     def hashes_to_announce(self):
         next_announce_time = time.time() + self.hash_reannounce_time
@@ -166,7 +146,6 @@ class DiskBlobManager(BlobManager):
         assert blob_creator.blob_hash not in self.blobs
         assert blob_creator.length is not None
         new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, True, blob_creator.length)
-        new_blob.verified = True
         self.blobs[blob_creator.blob_hash] = new_blob
         self._immediate_announce([blob_creator.blob_hash])
         next_announce_time = time.time() + self.hash_reannounce_time
@@ -178,17 +157,11 @@ class DiskBlobManager(BlobManager):
         if not blob_hash in self.blob_hashes_to_delete:
             self.blob_hashes_to_delete[blob_hash] = False
 
-    def update_all_last_verified_dates(self, timestamp):
-        return self._update_all_last_verified_dates(timestamp)
-
     def immediate_announce_all_blobs(self):
         d = self._get_all_verified_blob_hashes()
         d.addCallback(self._immediate_announce)
         return d
 
-    def get_blob_length(self, blob_hash):
-        return self._get_blob_length(blob_hash)
-
     def get_all_verified_blobs(self):
         d = self._get_all_verified_blob_hashes()
         d.addCallback(self.completed_blobs)
@@ -246,7 +219,7 @@ class DiskBlobManager(BlobManager):
         for blob_hash, being_deleted in self.blob_hashes_to_delete.items():
             if being_deleted is False:
                 self.blob_hashes_to_delete[blob_hash] = True
-                d = self.get_blob(blob_hash, True)
+                d = self.get_blob(blob_hash)
                 d.addCallbacks(
                     delete, set_not_deleting,
                     callbackArgs=(blob_hash,), errbackArgs=(blob_hash,))
@@ -288,80 +261,21 @@ class DiskBlobManager(BlobManager):
         return self.db_conn.runInteraction(create_tables)
 
     @rerun_if_locked
-    def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None):
+    def _add_completed_blob(self, blob_hash, length, next_announce_time):
         log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
-        if next_announce_time is None:
-            next_announce_time = timestamp
-        d = self.db_conn.runQuery("insert into blobs values (?, ?, ?, ?)",
-                                  (blob_hash, length, timestamp, next_announce_time))
+        d = self.db_conn.runQuery(
+            "insert into blobs (blob_hash, blob_length, next_announce_time) values (?, ?, ?)",
+            (blob_hash, length, next_announce_time)
+        )
         d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
         return d
 
-    @rerun_if_locked
-    def _completed_blobs(self, blobs_to_check):
-        """Returns of the blobs_to_check, which are valid"""
-        blobs_to_check = filter(is_valid_blobhash, blobs_to_check)
-
-        def _get_last_verified_time(db_transaction, blob_hash):
-            result = db_transaction.execute(
-                "select last_verified_time from blobs where blob_hash = ?", (blob_hash,))
-            row = result.fetchone()
-            if row:
-                return row[0]
-            else:
-                return None
-
-        def _filter_blobs_in_db(db_transaction, blobs_to_check):
-            for b in blobs_to_check:
-                verified_time = _get_last_verified_time(db_transaction, b)
-                if verified_time:
-                    yield (b, verified_time)
-
-        def get_blobs_in_db(db_transaction, blob_to_check):
-            # [(blob_hash, last_verified_time)]
-            return list(_filter_blobs_in_db(db_transaction, blobs_to_check))
-
-        def get_valid_blobs(blobs_in_db):
-
-            def check_blob_verified_date(b, verified_time):
-                file_path = os.path.join(self.blob_dir, b)
-                if os.path.isfile(file_path):
-                    if verified_time > os.path.getctime(file_path):
-                        return True
-                    else:
-                        log.debug('Verification time for %s is too old (%s < %s)',
-                                  file_path, verified_time, os.path.getctime(file_path))
-                else:
-                    log.debug('file %s does not exist', file_path)
-                return False
-
-            def filter_valid_blobs(results):
-                assert len(blobs_in_db) == len(results)
-                valid_blobs = [
-                    b for (b, verified_date), (success, result) in zip(blobs_in_db, results)
-                    if success is True and result is True
-                ]
-                log.debug('Of %s blobs, %s were valid', len(results), len(valid_blobs))
-                return valid_blobs
-
-            ds = [
-                threads.deferToThread(check_blob_verified_date, b, verified_date)
-                for b, verified_date in blobs_in_db
-            ]
-            dl = defer.DeferredList(ds)
-            dl.addCallback(filter_valid_blobs)
-            return dl
-
-        d = self.db_conn.runInteraction(get_blobs_in_db, blobs_to_check)
-        d.addCallback(get_valid_blobs)
-        return d
-
-    @rerun_if_locked
-    def _get_blob_length(self, blob):
-        d = self.db_conn.runQuery("select blob_length from blobs where blob_hash = ?", (blob,))
-        d.addCallback(lambda r: r[0][0] if len(r) else Failure(NoSuchBlobError(blob)))
-        return d
-
+    @defer.inlineCallbacks
+    def _completed_blobs(self, blobhashes_to_check):
+        """Return the subset of blobhashes_to_check that are verified"""
+        blobs = yield defer.DeferredList([self.get_blob(b, True) for b in blobhashes_to_check])
+        blob_hashes = [b.blob_hash for success, b in blobs if success and b.verified]
+        defer.returnValue(blob_hashes)
 
     @rerun_if_locked
     def _update_blob_verified_timestamp(self, blob, timestamp):
@@ -384,10 +298,6 @@ class DiskBlobManager(BlobManager):
 
         return self.db_conn.runInteraction(get_and_update)
 
-    @rerun_if_locked
-    def _update_all_last_verified_dates(self, timestamp):
-        return self.db_conn.runQuery("update blobs set last_verified_date = ?", (timestamp,))
-
     @rerun_if_locked
     def _delete_blobs_from_db(self, blob_hashes):
@@ -430,6 +340,9 @@ class DiskBlobManager(BlobManager):
         return d
 
 
+# TODO: Having different managers for different blobs breaks the
+# abstraction of a HashBlob. Why should the management of blobs
+# care what kind of Blob it has?
 class TempBlobManager(BlobManager):
     """This class stores blobs in memory"""
     def __init__(self, hash_announcer):
@@ -469,10 +382,10 @@ class TempBlobManager(BlobManager):
             self.blob_next_announces[blob.blob_hash] = next_announce_time
         return defer.succeed(True)
 
-    def completed_blobs(self, blobs_to_check):
+    def completed_blobs(self, blobhashes_to_check):
         blobs = [
             b.blob_hash for b in self.blobs.itervalues()
-            if b.blob_hash in blobs_to_check and b.is_validated()
+            if b.blob_hash in blobhashes_to_check and b.is_validated()
         ]
         return defer.succeed(blobs)
 
@@ -496,9 +409,11 @@ class TempBlobManager(BlobManager):
         assert blob_creator.blob_hash not in self.blobs
         assert blob_creator.length is not None
         new_blob = self.blob_type(blob_creator.blob_hash, True, blob_creator.length)
-        new_blob.verified = True
+        # TODO: change this; it breaks the encapsulation of the
+        # blob. Maybe better would be to have the blob_creator
+        # produce a blob.
         new_blob.data_buffer = blob_creator.data_buffer
-        new_blob.length = blob_creator.length
+        new_blob._verified = True
         self.blobs[blob_creator.blob_hash] = new_blob
         self._immediate_announce([blob_creator.blob_hash])
         next_announce_time = time.time() + self.hash_reannounce_time
@@ -511,12 +426,6 @@ class TempBlobManager(BlobManager):
         if not blob_hash in self.blob_hashes_to_delete:
             self.blob_hashes_to_delete[blob_hash] = False
 
-    def get_blob_length(self, blob_hash):
-        if blob_hash in self.blobs:
-            if self.blobs[blob_hash].length is not None:
-                return defer.succeed(self.blobs[blob_hash].length)
-        return defer.fail(NoSuchBlobError(blob_hash))
-
     def immediate_announce_all_blobs(self):
         if self.hash_announcer:
             return self.hash_announcer.immediate_announce(self.blobs.iterkeys())
diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py
index f32d374c4..af7dca563 100644
--- a/lbrynet/core/HashBlob.py
+++ b/lbrynet/core/HashBlob.py
@@ -11,6 +11,7 @@ from zope.interface import implements
 from lbrynet import conf
 from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
 from lbrynet.core.cryptoutils import get_lbry_hash_obj
+from lbrynet.core.utils import is_valid_blobhash
 
 log = logging.getLogger(__name__)
 
@@ -48,11 +49,15 @@ class HashBlobWriter(object):
         self.write_handle = write_handle
         self.length_getter = length_getter
         self.finished_cb = finished_cb
-        self.hashsum = get_lbry_hash_obj()
+        self._hashsum = get_lbry_hash_obj()
         self.len_so_far = 0
 
+    @property
+    def blob_hash(self):
+        return self._hashsum.hexdigest()
+
     def write(self, data):
-        self.hashsum.update(data)
+        self._hashsum.update(data)
         self.len_so_far += len(data)
         if self.len_so_far > self.length_getter():
             self.finished_cb(
@@ -78,14 +83,20 @@ class HashBlob(object):
     """A chunk of data available on the network which is specified by a hashsum"""
 
     def __init__(self, blob_hash, upload_allowed, length=None):
+        assert is_valid_blobhash(blob_hash)
         self.blob_hash = blob_hash
         self.length = length
         self.writers = {}  # {Peer: writer, finished_deferred}
         self.finished_deferred = None
-        self.verified = False
+        self._verified = False
         self.upload_allowed = upload_allowed
         self.readers = 0
 
+    @property
+    def verified(self):
+        # protect verified from being modified by other classes
+        return self._verified
+
     def set_length(self, length):
         if self.length is not None and length == self.length:
             return True
@@ -100,10 +111,7 @@ class HashBlob(object):
         return self.length
 
     def is_validated(self):
-        if self.verified:
-            return True
-        else:
-            return False
+        return bool(self._verified)
 
     def is_downloading(self):
         if self.writers:
@@ -129,7 +137,7 @@ class HashBlob(object):
 
     def writer_finished(self, writer, err=None):
         def fire_finished_deferred():
-            self.verified = True
+            self._verified = True
             for p, (w, finished_deferred) in self.writers.items():
                 if w == writer:
                     finished_deferred.callback(self)
@@ -151,8 +159,8 @@ class HashBlob(object):
                 w.cancel()
 
         if err is None:
-            if writer.len_so_far == self.length and writer.hashsum.hexdigest() == self.blob_hash:
-                if self.verified is False:
+            if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
+                if self._verified is False:
                     d = self._save_verified_blob(writer)
                     d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
                     d.addCallback(lambda _: cancel_other_downloads())
@@ -162,7 +170,7 @@ class HashBlob(object):
             else:
                 err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
                 err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
-                                               writer.hashsum.hexdigest())
+                                               writer.blob_hash)
                 errback_finished_deferred(Failure(InvalidDataError(err_string)))
             d = defer.succeed(True)
         else:
@@ -206,6 +214,14 @@ class BlobFile(HashBlob):
         self.file_path = os.path.join(blob_dir, self.blob_hash)
         self.setting_verified_blob_lock = threading.Lock()
         self.moved_verified_blob = False
+        if os.path.isfile(self.file_path):
+            self.set_length(os.path.getsize(self.file_path))
+            # This assumes that the hash of the blob has already been
+            # checked as part of the blob creation process. It might
+            # be worth having a function that checks the actual hash;
+            # it's probably too expensive to have that check be part of
+            # this call.
+            self._verified = True
 
     def open_for_writing(self, peer):
         if not peer in self.writers:
@@ -220,7 +236,7 @@ class BlobFile(HashBlob):
         return None, None, None
 
     def open_for_reading(self):
-        if self.verified is True:
+        if self._verified is True:
             file_handle = None
             try:
                 file_handle = open(self.file_path, 'rb')
@@ -232,7 +248,7 @@ class BlobFile(HashBlob):
 
     def delete(self):
         if not self.writers and not self.readers:
-            self.verified = False
+            self._verified = False
             self.moved_verified_blob = False
 
             def delete_from_file_system():
@@ -299,13 +315,13 @@ class TempBlob(HashBlob):
         return None, None, None
 
     def open_for_reading(self):
-        if self.verified is True:
+        if self._verified is True:
             return StringIO(self.data_buffer)
         return None
 
     def delete(self):
         if not self.writers and not self.readers:
-            self.verified = False
+            self._verified = False
             self.data_buffer = ''
             return defer.succeed(True)
         else:
@@ -333,7 +349,7 @@ class TempBlob(HashBlob):
 class HashBlobCreator(object):
     def __init__(self, blob_manager):
         self.blob_manager = blob_manager
-        self.hashsum = get_lbry_hash_obj()
+        self._hashsum = get_lbry_hash_obj()
         self.len_so_far = 0
         self.blob_hash = None
         self.length = None
@@ -346,7 +362,7 @@ class HashBlobCreator(object):
         if self.length == 0:
             self.blob_hash = None
         else:
-            self.blob_hash = self.hashsum.hexdigest()
+            self.blob_hash = self._hashsum.hexdigest()
         d = self._close()
         if self.blob_hash is not None:
             d.addCallback(lambda _: self.blob_manager.creator_finished(self))
@@ -356,7 +372,7 @@ class HashBlobCreator(object):
         return d
 
     def write(self, data):
-        self.hashsum.update(data)
+        self._hashsum.update(data)
         self.len_so_far += len(data)
         self._write(data)
 
@@ -394,6 +410,7 @@ class BlobFileCreator(HashBlobCreator):
 class TempBlobCreator(HashBlobCreator):
     def __init__(self, blob_manager):
         HashBlobCreator.__init__(self, blob_manager)
+        # TODO: use StringIO
        self.data_buffer = ''
 
     def _close(self):
diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py
index d57d392ea..3aa9924a7 100644
--- a/lbrynet/lbrynet_daemon/Daemon.py
+++ b/lbrynet/lbrynet_daemon/Daemon.py
@@ -184,12 +184,11 @@ class AlwaysSend(object):
         return d
 
 
+@defer.inlineCallbacks
 def calculate_available_blob_size(blob_manager):
-    d = blob_manager.get_all_verified_blobs()
-    d.addCallback(
-        lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs]))
-    d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success))
-    return d
+    blob_hashes = yield blob_manager.get_all_verified_blobs()
+    blobs = yield defer.DeferredList([blob_manager.get_blob(b) for b in blob_hashes])
+    defer.returnValue(sum(b.length for success, b in blobs if success and b.length))
 
 
 class Daemon(AuthJSONRPCServer):
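
Note on the pattern: after this patch, verification state lives on the blob
itself behind a read-only `verified` property, and the managers reduce to
filtering on it. Below is a minimal, self-contained sketch of that pattern,
illustrative only: these are not the actual lbrynet classes, the Deferred
machinery is omitted, and the names merely mirror the ones in the diff.

import os


class HashBlob(object):
    """A chunk of data identified by the hash of its contents."""

    def __init__(self, blob_hash, length=None):
        self.blob_hash = blob_hash
        self.length = length
        self._verified = False

    @property
    def verified(self):
        # Read-only from the outside; only blob code flips _verified,
        # which is the encapsulation the patch moves toward.
        return self._verified


class BlobFile(HashBlob):
    """A blob stored on disk in a file named after its hash."""

    def __init__(self, blob_dir, blob_hash, length=None):
        HashBlob.__init__(self, blob_hash, length)
        self.file_path = os.path.join(blob_dir, blob_hash)
        if os.path.isfile(self.file_path):
            # As in the patch, an existing file is assumed to have
            # been hash-checked when it was first written to disk.
            self.length = os.path.getsize(self.file_path)
            self._verified = True


def completed_blobs(blobs):
    # The manager-side check becomes a filter on blob.verified,
    # which is what the new DiskBlobManager._completed_blobs does
    # (modulo the DeferredList plumbing).
    return [b.blob_hash for b in blobs if b.verified]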