fix bug caused by downloading file twice and deleting one

This commit is contained in:
Jimmy Kiselak 2016-01-16 01:16:37 -05:00
parent e906564f70
commit 2ddeca2976
9 changed files with 240 additions and 144 deletions

View file

@ -118,6 +118,8 @@ class FullStreamProgressManager(StreamProgressManager):
def finished_outputting_blob():
self.last_blob_outputted += 1
def check_if_finished():
final_blob_num = self.download_manager.final_blob_num()
if final_blob_num is not None and final_blob_num == self.last_blob_outputted:
self._finished_outputting()
@ -134,9 +136,13 @@ class FullStreamProgressManager(StreamProgressManager):
d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
d.addCallback(lambda _: finished_outputting_blob())
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
d.addCallback(lambda _: check_if_finished())
def log_error(err):
log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage())
if self.outputting_d is not None and not self.outputting_d.called:
self.outputting_d.callback(True)
self.outputting_d = None
d.addErrback(log_error)
else:

View file

@ -1,3 +1,4 @@
import logging
from zope.interface import implements
from lbrynet.interfaces import IStreamDownloader
from lbrynet.core.client.BlobRequester import BlobRequester
@ -9,6 +10,9 @@ from twisted.internet import defer
from twisted.python.failure import Failure
log = logging.getLogger(__name__)
class StartFailedError(Exception):
pass
@ -79,10 +83,6 @@ class CryptStreamDownloader(object):
def start(self):
def set_finished_deferred():
self.finished_deferred = defer.Deferred()
return self.finished_deferred
if self.starting is True:
raise CurrentlyStartingError()
if self.stopping is True:
@ -92,8 +92,9 @@ class CryptStreamDownloader(object):
assert self.download_manager is None
self.starting = True
self.completed = False
self.finished_deferred = defer.Deferred()
d = self._start()
d.addCallback(lambda _: set_finished_deferred())
d.addCallback(lambda _: self.finished_deferred)
return d
def stop(self, err=None):
@ -112,8 +113,8 @@ class CryptStreamDownloader(object):
assert self.download_manager is not None
self.stopping = True
d = self.download_manager.stop_downloading()
self._fire_completed_deferred(err)
d.addCallback(check_if_stop_succeeded)
d.addCallback(lambda _: self._fire_completed_deferred(err))
return d
def _start_failed(self):
@ -203,6 +204,8 @@ class CryptStreamDownloader(object):
d.errback(err)
else:
d.callback(self._get_finished_deferred_callback_value())
else:
log.debug("Not firing the completed deferred because d is None")
def _get_finished_deferred_callback_value(self):
return None

View file

@ -46,6 +46,31 @@ class LBRYFileDownloader(CryptStreamDownloader):
else:
return defer.succeed(True)
def delete_data(self):
d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
def get_blob_hashes(blob_infos):
return [b[0] for b in blob_infos if b[0] is not None]
d1.addCallback(get_blob_hashes)
d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
def combine_blob_hashes(results):
blob_hashes = []
for success, result in results:
if success is True:
blob_hashes.extend(result)
return blob_hashes
def delete_blobs(blob_hashes):
self.blob_manager.delete_blobs(blob_hashes)
return True
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
dl.addCallback(combine_blob_hashes)
dl.addCallback(delete_blobs)
return dl
def stop(self, err=None):
d = self._close_output()
d.addCallback(lambda _: CryptStreamDownloader.stop(self, err=err))

View file

@ -36,7 +36,7 @@ class LBRYFileStreamCreator(CryptStreamCreator):
log.debug("length: %s", str(blob_info.length))
self.blob_infos.append(blob_info)
def _save_lbry_file_info(self):
def _save_stream_info(self):
stream_info_manager = self.lbry_file_manager.stream_info_manager
d = stream_info_manager.save_stream(self.stream_hash, binascii.hexlify(self.name),
binascii.hexlify(self.key),
@ -46,8 +46,6 @@ class LBRYFileStreamCreator(CryptStreamCreator):
def setup(self):
d = CryptStreamCreator.setup(self)
d.addCallback(lambda _: self.stream_hash)
return d
def _get_blobs_hashsum(self):
@ -79,10 +77,7 @@ class LBRYFileStreamCreator(CryptStreamCreator):
def _finished(self):
self._make_stream_hash()
d = self._save_lbry_file_info()
d.addCallback(lambda _: self.lbry_file_manager.change_lbry_file_status(
self.stream_hash, ManagedLBRYFileDownloader.STATUS_FINISHED
))
d = self._save_stream_info()
return d

View file

@ -18,17 +18,18 @@ class ManagedLBRYFileDownloader(LBRYFileSaver):
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
lbry_file_manager, payment_rate_manager, wallet, download_directory, upload_allowed,
file_name=None):
LBRYFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, payment_rate_manager, wallet, download_directory,
upload_allowed, file_name)
self.rowid = rowid
self.lbry_file_manager = lbry_file_manager
self.saving_status = False
def restore(self):
d = self.lbry_file_manager.get_lbry_file_status(self.stream_hash)
d = self.lbry_file_manager.get_lbry_file_status(self)
def restore_status(status):
if status == ManagedLBRYFileDownloader.STATUS_RUNNING:
@ -103,7 +104,7 @@ class ManagedLBRYFileDownloader(LBRYFileSaver):
s = ManagedLBRYFileDownloader.STATUS_STOPPED
else:
s = ManagedLBRYFileDownloader.STATUS_RUNNING
return self.lbry_file_manager.change_lbry_file_status(self.stream_hash, s)
return self.lbry_file_manager.change_lbry_file_status(self, s)
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager)

View file

@ -44,44 +44,15 @@ class LBRYFileManager(object):
d.addCallback(lambda _: self._start_lbry_files())
return d
def get_all_lbry_file_stream_hashes_and_options(self):
d = self._get_all_lbry_file_stream_hashes()
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
def get_options(stream_hashes):
ds = []
def set_lbry_file_data_payment_rate(self, lbry_file, new_rate):
return self._set_lbry_file_payment_rate(lbry_file.rowid, new_rate)
def get_options_for_stream_hash(stream_hash):
d = self.get_lbry_file_options(stream_hash)
d.addCallback(lambda options: (stream_hash, options))
return d
for stream_hash in stream_hashes:
ds.append(get_options_for_stream_hash(stream_hash))
dl = defer.DeferredList(ds)
dl.addCallback(lambda results: [r[1] for r in results if r[0]])
return dl
d.addCallback(get_options)
return d
def save_lbry_file(self, stream_hash, data_payment_rate):
return self._save_lbry_file(stream_hash, data_payment_rate)
def get_lbry_file_status(self, stream_hash):
return self._get_lbry_file_status(stream_hash)
def get_lbry_file_options(self, stream_hash):
return self._get_lbry_file_options(stream_hash)
def delete_lbry_file_options(self, stream_hash):
return self._delete_lbry_file_options(stream_hash)
def set_lbry_file_data_payment_rate(self, stream_hash, new_rate):
return self._set_lbry_file_payment_rate(stream_hash, new_rate)
def change_lbry_file_status(self, stream_hash, status):
log.debug("Changing status of %s to %s", stream_hash, status)
return self._change_file_status(stream_hash, status)
def change_lbry_file_status(self, lbry_file, status):
log.debug("Changing status of %s to %s", lbry_file.stream_hash, status)
return self._change_file_status(lbry_file.rowid, status)
def get_lbry_file_status_reports(self):
ds = []
@ -103,29 +74,32 @@ class LBRYFileManager(object):
def _start_lbry_files(self):
def set_options_and_restore(stream_hash, options):
def set_options_and_restore(rowid, stream_hash, options):
payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
d = self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0])
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
d.addCallback(lambda downloader: downloader.restore())
return d
def log_error(err):
log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage())
def start_lbry_files(stream_hashes_and_options):
for stream_hash, options in stream_hashes_and_options:
d = set_options_and_restore(stream_hash, options)
def start_lbry_files(lbry_files_and_options):
for rowid, stream_hash, options in lbry_files_and_options:
d = set_options_and_restore(rowid, stream_hash, options)
d.addErrback(log_error)
return True
d = self.get_all_lbry_file_stream_hashes_and_options()
d = self._get_all_lbry_files()
d.addCallback(start_lbry_files)
return d
def start_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder,
self.session.rate_limiter, self.session.blob_manager,
lbry_file_downloader = ManagedLBRYFileDownloader(rowid, stream_hash,
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager, self,
payment_rate_manager, self.session.wallet,
self.download_directory,
@ -137,17 +111,17 @@ class LBRYFileManager(object):
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
d = self._save_lbry_file(stream_hash, blob_data_rate)
d.addCallback(lambda _: self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate, upload_allowed))
d.addCallback(lambda rowid: self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate, upload_allowed))
return d
def delete_lbry_file(self, stream_hash):
def delete_lbry_file(self, lbry_file):
for l in self.lbry_files:
if l.stream_hash == stream_hash:
if l == lbry_file:
lbry_file = l
break
else:
return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " +
stream_hash)))
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def wait_for_finished(count=2):
if count <= 0 or lbry_file.saving_status is False:
@ -166,23 +140,16 @@ class LBRYFileManager(object):
self.lbry_files.remove(lbry_file)
d.addCallback(lambda _: remove_from_list())
d.addCallback(lambda _: self.delete_lbry_file_options(stream_hash))
d.addCallback(lambda _: self._delete_lbry_file_options(lbry_file.rowid))
return d
def toggle_lbry_file_running(self, stream_hash):
def toggle_lbry_file_running(self, lbry_file):
"""Toggle whether a stream reader is currently running"""
for l in self.lbry_files:
if l.stream_hash == stream_hash:
if l == lbry_file:
return l.toggle_running()
else:
return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " +
stream_hash)))
def get_stream_hash_from_name(self, lbry_file_name):
for l in self.lbry_files:
if l.file_name == lbry_file_name:
return l.stream_hash
return None
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def stop(self):
ds = []
@ -209,6 +176,9 @@ class LBRYFileManager(object):
dl.addCallback(lambda _: close_db())
return dl
def get_count_for_stream_hash(self, stream_hash):
return self._get_count_for_stream_hash(stream_hash)
######### database calls #########
def _open_db(self):
@ -227,41 +197,41 @@ class LBRYFileManager(object):
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
return self.sql_db.runQuery("insert into lbry_file_options values (?, ?, ?)",
(data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED,
stream_hash))
def do_save(db_transaction):
db_transaction.execute("insert into lbry_file_options values (?, ?, ?)",
(data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED,
stream_hash))
return db_transaction.lastrowid
return self.sql_db.runInteraction(do_save)
@rerun_if_locked
def _get_lbry_file_options(self, stream_hash):
d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda result: result[0] if len(result) else (None, ))
def _delete_lbry_file_options(self, rowid):
return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?",
(rowid,))
@rerun_if_locked
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
@rerun_if_locked
def _get_all_lbry_files(self):
d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options")
return d
@rerun_if_locked
def _delete_lbry_file_options(self, stream_hash):
return self.sql_db.runQuery("delete from lbry_file_options where stream_hash = ?",
(stream_hash,))
def _change_file_status(self, rowid, new_status):
return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
(new_status, rowid))
@rerun_if_locked
def _set_lbry_file_payment_rate(self, stream_hash, new_rate):
return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where stream_hash = ?",
(new_rate, stream_hash))
@rerun_if_locked
def _get_all_lbry_file_stream_hashes(self):
d = self.sql_db.runQuery("select stream_hash from lbry_file_options")
d.addCallback(lambda results: [r[0] for r in results])
return d
@rerun_if_locked
def _change_file_status(self, stream_hash, new_status):
return self.sql_db.runQuery("update lbry_file_options set status = ? where stream_hash = ?",
(new_status, stream_hash))
@rerun_if_locked
def _get_lbry_file_status(self, stream_hash):
d = self.sql_db.runQuery("select status from lbry_file_options where stream_hash = ?",
(stream_hash,))
def _get_lbry_file_status(self, rowid):
d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,))
d.addCallback(lambda r: r[0][0] if len(r) else ManagedLBRYFileDownloader.STATUS_STOPPED)
return d
return d
@rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash):
return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
(stream_hash,))

View file

@ -1067,11 +1067,11 @@ class DeleteLBRYFile(CommandHandler):
self.finished_deferred.callback(None)
def _delete_lbry_file(self):
d = self.lbry_file_manager.delete_lbry_file(self.lbry_file.stream_hash)
d = self.lbry_file_manager.delete_lbry_file(self.lbry_file)
def finish_deletion():
if self.delete_data is True:
d = self._delete_data()
d = self.lbry_file.delete_data()
else:
d = defer.succeed(True)
d.addCallback(lambda _: self._delete_stream_data())
@ -1080,33 +1080,12 @@ class DeleteLBRYFile(CommandHandler):
d.addCallback(lambda _: finish_deletion())
return d
def _delete_data(self):
d1 = self.stream_info_manager.get_blobs_for_stream(self.lbry_file.stream_hash)
def get_blob_hashes(blob_infos):
return [b[0] for b in blob_infos if b[0] is not None]
d1.addCallback(get_blob_hashes)
d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.lbry_file.stream_hash)
def combine_blob_hashes(results):
blob_hashes = []
for success, result in results:
if success is True:
blob_hashes.extend(result)
return blob_hashes
def delete_blobs(blob_hashes):
self.blob_manager.delete_blobs(blob_hashes)
return True
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
dl.addCallback(combine_blob_hashes)
dl.addCallback(delete_blobs)
return dl
def _delete_stream_data(self):
return self.stream_info_manager.delete_stream(self.lbry_file.stream_hash)
s_h = self.lbry_file.stream_hash
d = self.lbry_file_manager.get_count_for_stream_hash(s_h)
# TODO: could possibly be a timing issue here
d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
return d
class DeleteLBRYFileFactory(LBRYFileChooserFactory):
@ -1138,7 +1117,7 @@ class ToggleLBRYFileRunning(CommandHandler):
self.lbry_file_manager = lbry_file_manager
def start(self):
d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file.stream_hash)
d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file)
d.addErrback(self._handle_download_error)
self.finished_deferred.callback(None)
@ -1177,11 +1156,11 @@ class CreateLBRYFile(CommandHandler):
def add_to_lbry_files(self, stream_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(self.set_status, stream_hash)
d.addCallback(self.set_status)
return d
def set_status(self, lbry_file_downloader, stream_hash):
d = self.lbry_file_manager.change_lbry_file_status(stream_hash,
def set_status(self, lbry_file_downloader):
d = self.lbry_file_manager.change_lbry_file_status(lbry_file_downloader,
ManagedLBRYFileDownloader.STATUS_FINISHED)
d.addCallback(lambda _: lbry_file_downloader.restore())
return d
@ -1407,7 +1386,7 @@ class ModifyLBRYFileDataPaymentRate(ModifyPaymentRate):
def _set_rate(self, rate):
self.payment_rate_manager.min_blob_data_payment_rate = rate
return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file.stream_hash, rate)
return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file, rate)
def _get_current_status(self):
status = "The LBRY file's current data payment rate is "
@ -1821,9 +1800,9 @@ class Publish(CommandHandler):
v_string += "Is this correct? (y/n): "
return v_string
def set_status(self, lbry_file_downloader, stream_hash):
def set_status(self, lbry_file_downloader):
self.lbry_file = lbry_file_downloader
d = self.lbry_file_manager.change_lbry_file_status(stream_hash,
d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file,
ManagedLBRYFileDownloader.STATUS_FINISHED)
d.addCallback(lambda _: lbry_file_downloader.restore())
return d
@ -1831,7 +1810,7 @@ class Publish(CommandHandler):
def add_to_lbry_files(self, stream_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(self.set_status, stream_hash)
d.addCallback(self.set_status)
return d
def _create_sd_blob(self):

View file

@ -125,7 +125,7 @@ class LBRYConsole():
# self.session.wallet, self.sd_identifier, self.autofetcher_conf)
def _show_start_error(self, error):
print error.getErrorMessage()
print error.getTraceback()
log.error("An error occurred during start up: %s", error.getTraceback())
return error

View file

@ -911,6 +911,121 @@ class TestTransfer(TestCase):
return d
def test_double_download(self):
sd_hash_queue = Queue()
kill_event = Event()
dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event))
uploader.start()
self.server_processes.append(uploader)
logging.debug("Testing double download")
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
downloaders = []
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, use_upnp=False,
rate_limiter=rate_limiter, wallet=wallet)
self.stream_info_manager = DBLBRYFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier)
def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm)
def append_downloader(downloader):
downloaders.append(downloader)
return downloader
def download_file(sd_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
d = download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
d.addCallback(append_downloader)
d.addCallback(lambda downloader: downloader.start())
return d
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
def delete_lbry_file():
logging.debug("deleting the file...")
d = self.lbry_file_manager.delete_lbry_file(downloaders[0])
d.addCallback(lambda _: self.lbry_file_manager.get_count_for_stream_hash(downloaders[0].stream_hash))
d.addCallback(lambda c: self.stream_info_manager.delete_stream(downloaders[1].stream_hash) if c == 0 else True)
return d
def check_lbry_file():
d = downloaders[1].status()
d.addCallback(lambda _: downloaders[1].status())
def check_status_report(status_report):
self.assertEqual(status_report.num_known, status_report.num_completed)
self.assertEqual(status_report.num_known, 3)
d.addCallback(check_status_report)
return d
def start_transfer(sd_hash):
logging.debug("Starting the transfer")
d = self.session.setup()
d.addCallback(lambda _: self.stream_info_manager.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: download_file(sd_hash))
d.addCallback(lambda _: check_md5_sum())
d.addCallback(lambda _: download_file(sd_hash))
d.addCallback(lambda _: delete_lbry_file())
d.addCallback(lambda _: check_lbry_file())
return d
def stop(arg):
if isinstance(arg, Failure):
logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback())
else:
logging.debug("Client is stopping normally.")
kill_event.set()
logging.debug("Set the kill event")
d = self.wait_for_dead_event(dead_event)
def print_shutting_down():
logging.info("Client is shutting down")
d.addCallback(lambda _: print_shutting_down())
d.addCallback(lambda _: arg)
return d
d = self.wait_for_hash_from_queue(sd_hash_queue)
d.addCallback(start_transfer)
d.addBoth(stop)
return d
class TestStreamify(TestCase):
@ -932,6 +1047,8 @@ class TestStreamify(TestCase):
def delete_test_env():
shutil.rmtree('client')
if os.path.exists("test_file"):
os.remove("test_file")
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
return d