diff --git a/CHANGELOG.md b/CHANGELOG.md index b75a4e18b..11e0ca8bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ at anytime. * Fixed some log messages throwing exceptions * Fix shutdown of the blob tracker by Session * Fixed claim_new_support docstrings + * Fixed BlobManager causing functional tests to fail, removed its unneeded manage() loop ### Deprecated * diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 548f28b6b..e641fda55 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -11,7 +11,6 @@ from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked - log = logging.getLogger(__name__) @@ -90,20 +89,16 @@ class DiskBlobManager(BlobManager): # be thousands of blobs loaded up, many stale self.blobs = {} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} - self._next_manage_call = None + + @defer.inlineCallbacks def setup(self): log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir), str(self.db_file)) - d = self._open_db() - d.addCallback(lambda _: self._manage()) - return d + yield self._open_db() def stop(self): log.info("Stopping the DiskBlobManager") - if self._next_manage_call is not None and self._next_manage_call.active(): - self._next_manage_call.cancel() - self._next_manage_call = None self.db_conn.close() return defer.succeed(True) @@ -150,11 +145,6 @@ class DiskBlobManager(BlobManager): d = self.blob_completed(new_blob, next_announce_time) return d - def delete_blobs(self, blob_hashes): - for blob_hash in blob_hashes: - if not blob_hash in self.blob_hashes_to_delete: - self.blob_hashes_to_delete[blob_hash] = False - def immediate_announce_all_blobs(self): d = self._get_all_verified_blob_hashes() d.addCallback(self._immediate_announce) @@ -173,58 +163,17 @@ class DiskBlobManager(BlobManager): d = self._add_blob_to_upload_history(blob_hash, host, rate) return d - def _manage(self): - from twisted.internet import reactor - - d = self._delete_blobs_marked_for_deletion() - - def set_next_manage_call(): - self._next_manage_call = reactor.callLater(1, self._manage) - - d.addCallback(lambda _: set_next_manage_call()) - - def _delete_blobs_marked_for_deletion(self): - - def remove_from_list(b_h): - del self.blob_hashes_to_delete[b_h] - return b_h - - def set_not_deleting(err, b_h): - log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage()) - self.blob_hashes_to_delete[b_h] = False - return err - - def delete_from_db(result): - b_hs = [r[1] for r in result if r[0] is True] - if b_hs: - d = self._delete_blobs_from_db(b_hs) - else: - d = defer.succeed(True) - - def log_error(err): - log.warning( - "Failed to delete completed blobs from the db: %s", err.getErrorMessage()) - - d.addErrback(log_error) - return d - - def delete(blob, b_h): - d = blob.delete() - d.addCallbacks(lambda _: remove_from_list(b_h), set_not_deleting, errbackArgs=(b_h,)) - return d - - ds = [] - for blob_hash, being_deleted in self.blob_hashes_to_delete.items(): - if being_deleted is False: - self.blob_hashes_to_delete[blob_hash] = True - d = self.get_blob(blob_hash) - d.addCallbacks( - delete, set_not_deleting, - callbackArgs=(blob_hash,), errbackArgs=(blob_hash,)) - ds.append(d) - dl = defer.DeferredList(ds, consumeErrors=True) - dl.addCallback(delete_from_db) - return defer.DeferredList(ds) + @defer.inlineCallbacks + def delete_blobs(self, blob_hashes): + bh_to_delete_from_db = [] + for blob_hash in blob_hashes: + try: + blob = yield self.get_blob(blob_hash) + yield blob.delete() + bh_to_delete_from_db.append(blob_hash) + except Exception as e: + log.warning("Failed to delete blob file. Reason: %s", e) + yield self._delete_blobs_from_db(bh_to_delete_from_db) ######### database calls ######### @@ -308,8 +257,13 @@ class DiskBlobManager(BlobManager): return self.db_conn.runInteraction(delete_blobs) @rerun_if_locked - def _get_all_verified_blob_hashes(self): + def _get_all_blob_hashes(self): d = self.db_conn.runQuery("select blob_hash from blobs") + return d + + @rerun_if_locked + def _get_all_verified_blob_hashes(self): + d = self._get_all_blob_hashes() def get_verified_blobs(blobs): verified_blobs = [] diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index f38d9a951..bd1919aa5 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -746,8 +746,9 @@ class TestTransfer(TestCase): yield download_file(sd_hash) yield check_md5_sum() yield download_file(sd_hash) - yield delete_lbry_file() + yield check_lbry_file() + yield delete_lbry_file() def stop(arg): if isinstance(arg, Failure): diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py new file mode 100644 index 000000000..1ef13a2d2 --- /dev/null +++ b/tests/unit/core/test_BlobManager.py @@ -0,0 +1,112 @@ +import tempfile +import shutil +import mock +import os +import random +import string + +from tests.util import random_lbry_hash +from lbrynet.core.BlobManager import DiskBlobManager +from lbrynet.core.HashAnnouncer import DummyHashAnnouncer +from lbrynet.core.Peer import Peer +from lbrynet.core.cryptoutils import get_lbry_hash_obj +from twisted.trial import unittest + +from twisted.internet import defer + +class BlobManagerTest(unittest.TestCase): + def setUp(self): + self.blob_dir = tempfile.mkdtemp() + self.db_dir = tempfile.mkdtemp() + hash_announcer = DummyHashAnnouncer() + self.bm = DiskBlobManager(hash_announcer, self.blob_dir, self.db_dir) + self.peer = Peer('somehost',22) + + def tearDown(self): + self.bm.stop() + shutil.rmtree(self.blob_dir) + shutil.rmtree(self.db_dir) + + @defer.inlineCallbacks + def _create_and_add_blob(self): + # create and add blob to blob manager + data_len = random.randint(1,1000) + data = ''.join(random.choice(string.lowercase) for data_len in range(data_len)) + + hashobj = get_lbry_hash_obj() + hashobj.update(data) + out=hashobj.hexdigest() + blob_hash=out + + # create new blob + yield self.bm.setup() + blob = yield self.bm.get_blob(blob_hash,len(data)) + + finished_d, write, cancel =yield blob.open_for_writing(self.peer) + yield write(data) + yield self.bm.blob_completed(blob) + yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) + + # check to see if blob is there + self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash))) + blobs = yield self.bm.get_all_verified_blobs() + self.assertTrue(blob_hash in blobs) + defer.returnValue(blob_hash) + + @defer.inlineCallbacks + def test_create_blob(self): + blob_hashes = [] + + # create a bunch of blobs + for i in range(0,10): + blob_hash = yield self._create_and_add_blob() + blob_hashes.append(blob_hash) + blobs = yield self.bm.get_all_verified_blobs() + self.assertEqual(10,len(blobs)) + + + @defer.inlineCallbacks + def test_delete_blob(self): + # create blob + blob_hash = yield self._create_and_add_blob() + blobs = yield self.bm.get_all_verified_blobs() + self.assertEqual(len(blobs),1) + + # delete blob + yield self.bm.delete_blobs([blob_hash]) + self.assertFalse(os.path.isfile(os.path.join(self.blob_dir,blob_hash))) + blobs = yield self.bm.get_all_verified_blobs() + self.assertEqual(len(blobs),0) + blobs = yield self.bm._get_all_blob_hashes() + self.assertEqual(len(blobs),0) + + # delete blob that does not exist, nothing will + # happen + blob_hash= random_lbry_hash() + out = yield self.bm.delete_blobs([blob_hash]) + + + @defer.inlineCallbacks + def test_delete_open_blob(self): + # Test that a blob that is opened for writing will not be deleted + + # create blobs + blob_hashes =[] + for i in range(0,10): + blob_hash = yield self._create_and_add_blob() + blob_hashes.append(blob_hash) + blobs = yield self.bm.get_all_verified_blobs() + self.assertEqual(len(blobs),10) + + # open the last blob + blob = yield self.bm.get_blob(blob_hashes[-1]) + yield blob.open_for_writing(self.peer) + + # delete the last blob and check if it still exists + out = yield self.bm.delete_blobs([blob_hash]) + blobs = yield self.bm.get_all_verified_blobs() + self.assertEqual(len(blobs),10) + self.assertTrue(blob_hashes[-1] in blobs) + self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1]))) + +