From f9b4d465990d6f1c39e580a5f29b458f5c968222 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 16 Jun 2017 13:16:19 -0400 Subject: [PATCH] work on removing manage from BlobManager and simplify blob deletion --- lbrynet/core/BlobManager.py | 79 +++++++------------------------------ 1 file changed, 14 insertions(+), 65 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 548f28b6b..dc572be64 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -11,7 +11,6 @@ from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked - log = logging.getLogger(__name__) @@ -90,20 +89,16 @@ class DiskBlobManager(BlobManager): # be thousands of blobs loaded up, many stale self.blobs = {} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} - self._next_manage_call = None + + @defer.inlineCallbacks def setup(self): log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir), str(self.db_file)) - d = self._open_db() - d.addCallback(lambda _: self._manage()) - return d + yield self._open_db() def stop(self): log.info("Stopping the DiskBlobManager") - if self._next_manage_call is not None and self._next_manage_call.active(): - self._next_manage_call.cancel() - self._next_manage_call = None self.db_conn.close() return defer.succeed(True) @@ -150,11 +145,6 @@ class DiskBlobManager(BlobManager): d = self.blob_completed(new_blob, next_announce_time) return d - def delete_blobs(self, blob_hashes): - for blob_hash in blob_hashes: - if not blob_hash in self.blob_hashes_to_delete: - self.blob_hashes_to_delete[blob_hash] = False - def immediate_announce_all_blobs(self): d = self._get_all_verified_blob_hashes() d.addCallback(self._immediate_announce) @@ -173,58 +163,17 @@ class DiskBlobManager(BlobManager): d = self._add_blob_to_upload_history(blob_hash, host, rate) return d - def _manage(self): - from twisted.internet import reactor - - d = self._delete_blobs_marked_for_deletion() - - def set_next_manage_call(): - self._next_manage_call = reactor.callLater(1, self._manage) - - d.addCallback(lambda _: set_next_manage_call()) - - def _delete_blobs_marked_for_deletion(self): - - def remove_from_list(b_h): - del self.blob_hashes_to_delete[b_h] - return b_h - - def set_not_deleting(err, b_h): - log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage()) - self.blob_hashes_to_delete[b_h] = False - return err - - def delete_from_db(result): - b_hs = [r[1] for r in result if r[0] is True] - if b_hs: - d = self._delete_blobs_from_db(b_hs) - else: - d = defer.succeed(True) - - def log_error(err): - log.warning( - "Failed to delete completed blobs from the db: %s", err.getErrorMessage()) - - d.addErrback(log_error) - return d - - def delete(blob, b_h): - d = blob.delete() - d.addCallbacks(lambda _: remove_from_list(b_h), set_not_deleting, errbackArgs=(b_h,)) - return d - - ds = [] - for blob_hash, being_deleted in self.blob_hashes_to_delete.items(): - if being_deleted is False: - self.blob_hashes_to_delete[blob_hash] = True - d = self.get_blob(blob_hash) - d.addCallbacks( - delete, set_not_deleting, - callbackArgs=(blob_hash,), errbackArgs=(blob_hash,)) - ds.append(d) - dl = defer.DeferredList(ds, consumeErrors=True) - dl.addCallback(delete_from_db) - return defer.DeferredList(ds) + @defer.inlineCallbacks + def delete_blobs(self, blob_hashes): + bh_to_delete_from_db = [] + for blob_hash in blob_hashes: + try: + blob = yield self.get_blob(blob_hash) + yield blob.delete() + bh_to_delete_from_db.append(blob_hash) + except Exception as e: + log.warning("Failed to delete blob file. Reason: %s", e) + yield self._delete_blobs_from_db(bh_to_delete_from_db) ######### database calls #########