From f181af85479fae7f0c7fec4855f0f92968d1cbcd Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 14 Dec 2016 16:37:17 -0600 Subject: [PATCH] Cleanup code in BlobManager --- lbrynet/core/BlobManager.py | 57 ++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 4d174a8e7..ee688d8ae 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -83,6 +83,8 @@ class DiskBlobManager(BlobManager): self.db_conn = None self.blob_type = BlobFile self.blob_creator_type = BlobFileCreator + # TODO: consider using an LRU for blobs as there could potentially + # be thousands of blobs loaded up, many stale self.blobs = {} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} self._next_manage_call = None @@ -295,18 +297,27 @@ class DiskBlobManager(BlobManager): @rerun_if_locked def _completed_blobs(self, blobs_to_check): + """Returns of the blobs_to_check, which are valid""" blobs_to_check = filter(is_valid_blobhash, blobs_to_check) - def get_blobs_in_db(db_transaction): - blobs_in_db = [] # [(blob_hash, last_verified_time)] + def _get_last_verified_time(db_transaction, blob_hash): + result = db_transaction.execute( + "select last_verified_time from blobs where blob_hash = ?", (blob_hash,)) + row = result.fetchone() + if row: + return row[0] + else: + return None + + def _filter_blobs_in_db(db_transaction, blobs_to_check): for b in blobs_to_check: - result = db_transaction.execute( - "select last_verified_time from blobs where blob_hash = ?", - (b,)) - row = result.fetchone() - if row is not None: - blobs_in_db.append((b, row[0])) - return blobs_in_db + verified_time = _get_last_verified_time(db_transaction, b) + if verified_time: + yield (b, verified_time) + + def get_blobs_in_db(db_transaction, blob_to_check): + # [(blob_hash, last_verified_time)] + return list(_filter_blobs_in_db(db_transaction, blobs_to_check)) def get_valid_blobs(blobs_in_db): @@ -315,23 +326,31 @@ class DiskBlobManager(BlobManager): if os.path.isfile(file_path): if verified_time > os.path.getctime(file_path): return True + else: + log.debug('Verification time for %s is too old (%s < %s)', + file_path, verified_time, os.path.getctime(file_path)) + else: + log.debug('file %s does not exist', file_path) return False - def return_valid_blobs(results): - valid_blobs = [] - for (b, verified_date), (success, result) in zip(blobs_in_db, results): - if success is True and result is True: - valid_blobs.append(b) + def filter_valid_blobs(results): + assert len(blobs_in_db) == len(results) + valid_blobs = [ + b for (b, verified_date), (success, result) in zip(blobs_in_db, results) + if success is True and result is True + ] + log.debug('Of %s blobs, %s were valid', len(results), len(valid_blobs)) return valid_blobs - ds = [] - for b, verified_date in blobs_in_db: - ds.append(threads.deferToThread(check_blob_verified_date, b, verified_date)) + ds = [ + threads.deferToThread(check_blob_verified_date, b, verified_date) + for b, verified_date in blobs_in_db + ] dl = defer.DeferredList(ds) - dl.addCallback(return_valid_blobs) + dl.addCallback(filter_valid_blobs) return dl - d = self.db_conn.runInteraction(get_blobs_in_db) + d = self.db_conn.runInteraction(get_blobs_in_db, blobs_to_check) d.addCallback(get_valid_blobs) return d