diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 7b5c2fad2..4d174a8e7 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -9,10 +9,10 @@ from twisted.enterprise import adbapi from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.utils import is_valid_blobhash -from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked + log = logging.getLogger(__name__) @@ -52,9 +52,6 @@ class BlobManager(DHTHashSupplier): def get_blob_length(self, blob_hash): pass - def check_consistency(self): - pass - def blob_requested(self, blob_hash): pass @@ -188,9 +185,6 @@ class DiskBlobManager(BlobManager): def get_blob_length(self, blob_hash): return self._get_blob_length(blob_hash) - def check_consistency(self): - return self._check_consistency() - def get_all_verified_blobs(self): d = self._get_all_verified_blob_hashes() d.addCallback(self.completed_blobs) @@ -382,73 +376,6 @@ class DiskBlobManager(BlobManager): return self.db_conn.runInteraction(delete_blobs) - @rerun_if_locked - def _check_consistency(self): - - ALREADY_VERIFIED = 1 - NEWLY_VERIFIED = 2 - INVALID = 3 - - current_time = time.time() - d = self.db_conn.runQuery("select blob_hash, blob_length, last_verified_time from blobs") - - def check_blob(blob_hash, blob_length, verified_time): - file_path = os.path.join(self.blob_dir, blob_hash) - if os.path.isfile(file_path): - if verified_time >= os.path.getctime(file_path): - return ALREADY_VERIFIED - else: - h = get_lbry_hash_obj() - len_so_far = 0 - f = open(file_path) - while True: - data = f.read(2**12) - if not data: - break - h.update(data) - len_so_far += len(data) - if len_so_far == blob_length and h.hexdigest() == blob_hash: - return NEWLY_VERIFIED - return INVALID - - def do_check(blobs): - already_verified = [] - newly_verified = [] - invalid = [] - for blob_hash, blob_length, verified_time in blobs: - status = check_blob(blob_hash, blob_length, verified_time) - if status == ALREADY_VERIFIED: - already_verified.append(blob_hash) - elif status == NEWLY_VERIFIED: - newly_verified.append(blob_hash) - else: - invalid.append(blob_hash) - return already_verified, newly_verified, invalid - - def update_newly_verified(transaction, blobs): - for b in blobs: - transaction.execute("update blobs set last_verified_time = ? where blob_hash = ?", - (current_time, b)) - - def check_blobs(blobs): - - @rerun_if_locked - def update_and_return(status_lists): - - already_verified, newly_verified, invalid = status_lists - - d = self.db_conn.runInteraction(update_newly_verified, newly_verified) - d.addCallback(lambda _: status_lists) - return d - - d = threads.deferToThread(do_check, blobs) - - d.addCallback(update_and_return) - return d - - d.addCallback(check_blobs) - return d - @rerun_if_locked def _get_all_verified_blob_hashes(self): d = self.db_conn.runQuery("select blob_hash, last_verified_time from blobs")