forked from LBRYCommunity/lbry-sdk
work on removing manage from BlobManager and simplify blob deletion
This commit is contained in:
parent
fa9b9233d1
commit
f9b4d46599
1 changed files with 14 additions and 65 deletions
|
@ -11,7 +11,6 @@ from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
||||||
from lbrynet.core.Error import NoSuchBlobError
|
from lbrynet.core.Error import NoSuchBlobError
|
||||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -90,20 +89,16 @@ class DiskBlobManager(BlobManager):
|
||||||
# be thousands of blobs loaded up, many stale
|
# be thousands of blobs loaded up, many stale
|
||||||
self.blobs = {}
|
self.blobs = {}
|
||||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
||||||
self._next_manage_call = None
|
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def setup(self):
|
def setup(self):
|
||||||
log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir),
|
log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir),
|
||||||
str(self.db_file))
|
str(self.db_file))
|
||||||
d = self._open_db()
|
yield self._open_db()
|
||||||
d.addCallback(lambda _: self._manage())
|
|
||||||
return d
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
log.info("Stopping the DiskBlobManager")
|
log.info("Stopping the DiskBlobManager")
|
||||||
if self._next_manage_call is not None and self._next_manage_call.active():
|
|
||||||
self._next_manage_call.cancel()
|
|
||||||
self._next_manage_call = None
|
|
||||||
self.db_conn.close()
|
self.db_conn.close()
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
|
@ -150,11 +145,6 @@ class DiskBlobManager(BlobManager):
|
||||||
d = self.blob_completed(new_blob, next_announce_time)
|
d = self.blob_completed(new_blob, next_announce_time)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def delete_blobs(self, blob_hashes):
|
|
||||||
for blob_hash in blob_hashes:
|
|
||||||
if not blob_hash in self.blob_hashes_to_delete:
|
|
||||||
self.blob_hashes_to_delete[blob_hash] = False
|
|
||||||
|
|
||||||
def immediate_announce_all_blobs(self):
|
def immediate_announce_all_blobs(self):
|
||||||
d = self._get_all_verified_blob_hashes()
|
d = self._get_all_verified_blob_hashes()
|
||||||
d.addCallback(self._immediate_announce)
|
d.addCallback(self._immediate_announce)
|
||||||
|
@ -173,58 +163,17 @@ class DiskBlobManager(BlobManager):
|
||||||
d = self._add_blob_to_upload_history(blob_hash, host, rate)
|
d = self._add_blob_to_upload_history(blob_hash, host, rate)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _manage(self):
|
@defer.inlineCallbacks
|
||||||
from twisted.internet import reactor
|
def delete_blobs(self, blob_hashes):
|
||||||
|
bh_to_delete_from_db = []
|
||||||
d = self._delete_blobs_marked_for_deletion()
|
for blob_hash in blob_hashes:
|
||||||
|
try:
|
||||||
def set_next_manage_call():
|
blob = yield self.get_blob(blob_hash)
|
||||||
self._next_manage_call = reactor.callLater(1, self._manage)
|
yield blob.delete()
|
||||||
|
bh_to_delete_from_db.append(blob_hash)
|
||||||
d.addCallback(lambda _: set_next_manage_call())
|
except Exception as e:
|
||||||
|
log.warning("Failed to delete blob file. Reason: %s", e)
|
||||||
def _delete_blobs_marked_for_deletion(self):
|
yield self._delete_blobs_from_db(bh_to_delete_from_db)
|
||||||
|
|
||||||
def remove_from_list(b_h):
|
|
||||||
del self.blob_hashes_to_delete[b_h]
|
|
||||||
return b_h
|
|
||||||
|
|
||||||
def set_not_deleting(err, b_h):
|
|
||||||
log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
|
|
||||||
self.blob_hashes_to_delete[b_h] = False
|
|
||||||
return err
|
|
||||||
|
|
||||||
def delete_from_db(result):
|
|
||||||
b_hs = [r[1] for r in result if r[0] is True]
|
|
||||||
if b_hs:
|
|
||||||
d = self._delete_blobs_from_db(b_hs)
|
|
||||||
else:
|
|
||||||
d = defer.succeed(True)
|
|
||||||
|
|
||||||
def log_error(err):
|
|
||||||
log.warning(
|
|
||||||
"Failed to delete completed blobs from the db: %s", err.getErrorMessage())
|
|
||||||
|
|
||||||
d.addErrback(log_error)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def delete(blob, b_h):
|
|
||||||
d = blob.delete()
|
|
||||||
d.addCallbacks(lambda _: remove_from_list(b_h), set_not_deleting, errbackArgs=(b_h,))
|
|
||||||
return d
|
|
||||||
|
|
||||||
ds = []
|
|
||||||
for blob_hash, being_deleted in self.blob_hashes_to_delete.items():
|
|
||||||
if being_deleted is False:
|
|
||||||
self.blob_hashes_to_delete[blob_hash] = True
|
|
||||||
d = self.get_blob(blob_hash)
|
|
||||||
d.addCallbacks(
|
|
||||||
delete, set_not_deleting,
|
|
||||||
callbackArgs=(blob_hash,), errbackArgs=(blob_hash,))
|
|
||||||
ds.append(d)
|
|
||||||
dl = defer.DeferredList(ds, consumeErrors=True)
|
|
||||||
dl.addCallback(delete_from_db)
|
|
||||||
return defer.DeferredList(ds)
|
|
||||||
|
|
||||||
######### database calls #########
|
######### database calls #########
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue