Cleanup code in BlobManager
This commit is contained in:
parent
84baa5e065
commit
f181af8547
1 changed files with 38 additions and 19 deletions
|
@ -83,6 +83,8 @@ class DiskBlobManager(BlobManager):
|
|||
self.db_conn = None
|
||||
self.blob_type = BlobFile
|
||||
self.blob_creator_type = BlobFileCreator
|
||||
# TODO: consider using an LRU for blobs as there could potentially
|
||||
# be thousands of blobs loaded up, many stale
|
||||
self.blobs = {}
|
||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
||||
self._next_manage_call = None
|
||||
|
@ -295,18 +297,27 @@ class DiskBlobManager(BlobManager):
|
|||
|
||||
@rerun_if_locked
|
||||
def _completed_blobs(self, blobs_to_check):
|
||||
"""Returns of the blobs_to_check, which are valid"""
|
||||
blobs_to_check = filter(is_valid_blobhash, blobs_to_check)
|
||||
|
||||
def get_blobs_in_db(db_transaction):
|
||||
blobs_in_db = [] # [(blob_hash, last_verified_time)]
|
||||
def _get_last_verified_time(db_transaction, blob_hash):
|
||||
result = db_transaction.execute(
|
||||
"select last_verified_time from blobs where blob_hash = ?", (blob_hash,))
|
||||
row = result.fetchone()
|
||||
if row:
|
||||
return row[0]
|
||||
else:
|
||||
return None
|
||||
|
||||
def _filter_blobs_in_db(db_transaction, blobs_to_check):
|
||||
for b in blobs_to_check:
|
||||
result = db_transaction.execute(
|
||||
"select last_verified_time from blobs where blob_hash = ?",
|
||||
(b,))
|
||||
row = result.fetchone()
|
||||
if row is not None:
|
||||
blobs_in_db.append((b, row[0]))
|
||||
return blobs_in_db
|
||||
verified_time = _get_last_verified_time(db_transaction, b)
|
||||
if verified_time:
|
||||
yield (b, verified_time)
|
||||
|
||||
def get_blobs_in_db(db_transaction, blob_to_check):
|
||||
# [(blob_hash, last_verified_time)]
|
||||
return list(_filter_blobs_in_db(db_transaction, blobs_to_check))
|
||||
|
||||
def get_valid_blobs(blobs_in_db):
|
||||
|
||||
|
@ -315,23 +326,31 @@ class DiskBlobManager(BlobManager):
|
|||
if os.path.isfile(file_path):
|
||||
if verified_time > os.path.getctime(file_path):
|
||||
return True
|
||||
else:
|
||||
log.debug('Verification time for %s is too old (%s < %s)',
|
||||
file_path, verified_time, os.path.getctime(file_path))
|
||||
else:
|
||||
log.debug('file %s does not exist', file_path)
|
||||
return False
|
||||
|
||||
def return_valid_blobs(results):
|
||||
valid_blobs = []
|
||||
for (b, verified_date), (success, result) in zip(blobs_in_db, results):
|
||||
if success is True and result is True:
|
||||
valid_blobs.append(b)
|
||||
def filter_valid_blobs(results):
|
||||
assert len(blobs_in_db) == len(results)
|
||||
valid_blobs = [
|
||||
b for (b, verified_date), (success, result) in zip(blobs_in_db, results)
|
||||
if success is True and result is True
|
||||
]
|
||||
log.debug('Of %s blobs, %s were valid', len(results), len(valid_blobs))
|
||||
return valid_blobs
|
||||
|
||||
ds = []
|
||||
for b, verified_date in blobs_in_db:
|
||||
ds.append(threads.deferToThread(check_blob_verified_date, b, verified_date))
|
||||
ds = [
|
||||
threads.deferToThread(check_blob_verified_date, b, verified_date)
|
||||
for b, verified_date in blobs_in_db
|
||||
]
|
||||
dl = defer.DeferredList(ds)
|
||||
dl.addCallback(return_valid_blobs)
|
||||
dl.addCallback(filter_valid_blobs)
|
||||
return dl
|
||||
|
||||
d = self.db_conn.runInteraction(get_blobs_in_db)
|
||||
d = self.db_conn.runInteraction(get_blobs_in_db, blobs_to_check)
|
||||
d.addCallback(get_valid_blobs)
|
||||
return d
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue