Merge pull request #704 from lbryio/fix_blobmanager_manage

Fix blobmanager manage
This commit is contained in:
Jack Robison 2017-06-23 13:26:21 -04:00 committed by GitHub
commit 5e0a82da54
4 changed files with 135 additions and 67 deletions

View file

@ -20,6 +20,7 @@ at anytime.
* Fixed some log messages throwing exceptions * Fixed some log messages throwing exceptions
* Fix shutdown of the blob tracker by Session * Fix shutdown of the blob tracker by Session
* Fixed claim_new_support docstrings * Fixed claim_new_support docstrings
* Fixed BlobManager causing functional tests to fail, removed its unneeded manage() loop
### Deprecated ### Deprecated
* *

View file

@ -11,7 +11,6 @@ from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.Error import NoSuchBlobError
from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -90,20 +89,16 @@ class DiskBlobManager(BlobManager):
# be thousands of blobs loaded up, many stale # be thousands of blobs loaded up, many stale
self.blobs = {} self.blobs = {}
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
self._next_manage_call = None
@defer.inlineCallbacks
def setup(self): def setup(self):
log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir), log.info("Setting up the DiskBlobManager. blob_dir: %s, db_file: %s", str(self.blob_dir),
str(self.db_file)) str(self.db_file))
d = self._open_db() yield self._open_db()
d.addCallback(lambda _: self._manage())
return d
def stop(self): def stop(self):
log.info("Stopping the DiskBlobManager") log.info("Stopping the DiskBlobManager")
if self._next_manage_call is not None and self._next_manage_call.active():
self._next_manage_call.cancel()
self._next_manage_call = None
self.db_conn.close() self.db_conn.close()
return defer.succeed(True) return defer.succeed(True)
@ -150,11 +145,6 @@ class DiskBlobManager(BlobManager):
d = self.blob_completed(new_blob, next_announce_time) d = self.blob_completed(new_blob, next_announce_time)
return d return d
def delete_blobs(self, blob_hashes):
for blob_hash in blob_hashes:
if not blob_hash in self.blob_hashes_to_delete:
self.blob_hashes_to_delete[blob_hash] = False
def immediate_announce_all_blobs(self): def immediate_announce_all_blobs(self):
d = self._get_all_verified_blob_hashes() d = self._get_all_verified_blob_hashes()
d.addCallback(self._immediate_announce) d.addCallback(self._immediate_announce)
@ -173,58 +163,17 @@ class DiskBlobManager(BlobManager):
d = self._add_blob_to_upload_history(blob_hash, host, rate) d = self._add_blob_to_upload_history(blob_hash, host, rate)
return d return d
def _manage(self): @defer.inlineCallbacks
from twisted.internet import reactor def delete_blobs(self, blob_hashes):
bh_to_delete_from_db = []
d = self._delete_blobs_marked_for_deletion() for blob_hash in blob_hashes:
try:
def set_next_manage_call(): blob = yield self.get_blob(blob_hash)
self._next_manage_call = reactor.callLater(1, self._manage) yield blob.delete()
bh_to_delete_from_db.append(blob_hash)
d.addCallback(lambda _: set_next_manage_call()) except Exception as e:
log.warning("Failed to delete blob file. Reason: %s", e)
def _delete_blobs_marked_for_deletion(self): yield self._delete_blobs_from_db(bh_to_delete_from_db)
def remove_from_list(b_h):
del self.blob_hashes_to_delete[b_h]
return b_h
def set_not_deleting(err, b_h):
log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
self.blob_hashes_to_delete[b_h] = False
return err
def delete_from_db(result):
b_hs = [r[1] for r in result if r[0] is True]
if b_hs:
d = self._delete_blobs_from_db(b_hs)
else:
d = defer.succeed(True)
def log_error(err):
log.warning(
"Failed to delete completed blobs from the db: %s", err.getErrorMessage())
d.addErrback(log_error)
return d
def delete(blob, b_h):
d = blob.delete()
d.addCallbacks(lambda _: remove_from_list(b_h), set_not_deleting, errbackArgs=(b_h,))
return d
ds = []
for blob_hash, being_deleted in self.blob_hashes_to_delete.items():
if being_deleted is False:
self.blob_hashes_to_delete[blob_hash] = True
d = self.get_blob(blob_hash)
d.addCallbacks(
delete, set_not_deleting,
callbackArgs=(blob_hash,), errbackArgs=(blob_hash,))
ds.append(d)
dl = defer.DeferredList(ds, consumeErrors=True)
dl.addCallback(delete_from_db)
return defer.DeferredList(ds)
######### database calls ######### ######### database calls #########
@ -308,8 +257,13 @@ class DiskBlobManager(BlobManager):
return self.db_conn.runInteraction(delete_blobs) return self.db_conn.runInteraction(delete_blobs)
@rerun_if_locked @rerun_if_locked
def _get_all_verified_blob_hashes(self): def _get_all_blob_hashes(self):
d = self.db_conn.runQuery("select blob_hash from blobs") d = self.db_conn.runQuery("select blob_hash from blobs")
return d
@rerun_if_locked
def _get_all_verified_blob_hashes(self):
d = self._get_all_blob_hashes()
def get_verified_blobs(blobs): def get_verified_blobs(blobs):
verified_blobs = [] verified_blobs = []

View file

@ -746,8 +746,9 @@ class TestTransfer(TestCase):
yield download_file(sd_hash) yield download_file(sd_hash)
yield check_md5_sum() yield check_md5_sum()
yield download_file(sd_hash) yield download_file(sd_hash)
yield delete_lbry_file()
yield check_lbry_file() yield check_lbry_file()
yield delete_lbry_file()
def stop(arg): def stop(arg):
if isinstance(arg, Failure): if isinstance(arg, Failure):

View file

@ -0,0 +1,112 @@
import tempfile
import shutil
import mock
import os
import random
import string
from tests.util import random_lbry_hash
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
from lbrynet.core.Peer import Peer
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from twisted.trial import unittest
from twisted.internet import defer
class BlobManagerTest(unittest.TestCase):
def setUp(self):
self.blob_dir = tempfile.mkdtemp()
self.db_dir = tempfile.mkdtemp()
hash_announcer = DummyHashAnnouncer()
self.bm = DiskBlobManager(hash_announcer, self.blob_dir, self.db_dir)
self.peer = Peer('somehost',22)
def tearDown(self):
self.bm.stop()
shutil.rmtree(self.blob_dir)
shutil.rmtree(self.db_dir)
@defer.inlineCallbacks
def _create_and_add_blob(self):
# create and add blob to blob manager
data_len = random.randint(1,1000)
data = ''.join(random.choice(string.lowercase) for data_len in range(data_len))
hashobj = get_lbry_hash_obj()
hashobj.update(data)
out=hashobj.hexdigest()
blob_hash=out
# create new blob
yield self.bm.setup()
blob = yield self.bm.get_blob(blob_hash,len(data))
finished_d, write, cancel =yield blob.open_for_writing(self.peer)
yield write(data)
yield self.bm.blob_completed(blob)
yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data))
# check to see if blob is there
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hash)))
blobs = yield self.bm.get_all_verified_blobs()
self.assertTrue(blob_hash in blobs)
defer.returnValue(blob_hash)
@defer.inlineCallbacks
def test_create_blob(self):
blob_hashes = []
# create a bunch of blobs
for i in range(0,10):
blob_hash = yield self._create_and_add_blob()
blob_hashes.append(blob_hash)
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(10,len(blobs))
@defer.inlineCallbacks
def test_delete_blob(self):
# create blob
blob_hash = yield self._create_and_add_blob()
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(len(blobs),1)
# delete blob
yield self.bm.delete_blobs([blob_hash])
self.assertFalse(os.path.isfile(os.path.join(self.blob_dir,blob_hash)))
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(len(blobs),0)
blobs = yield self.bm._get_all_blob_hashes()
self.assertEqual(len(blobs),0)
# delete blob that does not exist, nothing will
# happen
blob_hash= random_lbry_hash()
out = yield self.bm.delete_blobs([blob_hash])
@defer.inlineCallbacks
def test_delete_open_blob(self):
# Test that a blob that is opened for writing will not be deleted
# create blobs
blob_hashes =[]
for i in range(0,10):
blob_hash = yield self._create_and_add_blob()
blob_hashes.append(blob_hash)
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(len(blobs),10)
# open the last blob
blob = yield self.bm.get_blob(blob_hashes[-1])
yield blob.open_for_writing(self.peer)
# delete the last blob and check if it still exists
out = yield self.bm.delete_blobs([blob_hash])
blobs = yield self.bm.get_all_verified_blobs()
self.assertEqual(len(blobs),10)
self.assertTrue(blob_hashes[-1] in blobs)
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1])))