Merge remote-tracking branch 'lbryio/master'
This commit is contained in:
commit
7ef9a0f0bf
11 changed files with 312 additions and 156 deletions
|
@ -402,7 +402,9 @@ class LBRYcrdWallet(object):
|
|||
|
||||
def _stop_daemon(self):
|
||||
if self.lbrycrdd is not None and self.started_lbrycrdd is True:
|
||||
alert.info("Stopping lbrycrdd...")
|
||||
d = threads.deferToThread(self._rpc_stop)
|
||||
d.addCallback(lambda _: alert.info("Stopped lbrycrdd."))
|
||||
return d
|
||||
return defer.succeed(True)
|
||||
|
||||
|
|
|
@ -118,6 +118,8 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
|
||||
def finished_outputting_blob():
|
||||
self.last_blob_outputted += 1
|
||||
|
||||
def check_if_finished():
|
||||
final_blob_num = self.download_manager.final_blob_num()
|
||||
if final_blob_num is not None and final_blob_num == self.last_blob_outputted:
|
||||
self._finished_outputting()
|
||||
|
@ -134,9 +136,13 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
|
||||
d.addCallback(lambda _: finished_outputting_blob())
|
||||
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
|
||||
d.addCallback(lambda _: check_if_finished())
|
||||
|
||||
def log_error(err):
|
||||
log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage())
|
||||
if self.outputting_d is not None and not self.outputting_d.called:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
|
||||
d.addErrback(log_error)
|
||||
else:
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import logging
|
||||
from zope.interface import implements
|
||||
from lbrynet.interfaces import IStreamDownloader
|
||||
from lbrynet.core.client.BlobRequester import BlobRequester
|
||||
|
@ -9,6 +10,9 @@ from twisted.internet import defer
|
|||
from twisted.python.failure import Failure
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StartFailedError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -79,10 +83,6 @@ class CryptStreamDownloader(object):
|
|||
|
||||
def start(self):
|
||||
|
||||
def set_finished_deferred():
|
||||
self.finished_deferred = defer.Deferred()
|
||||
return self.finished_deferred
|
||||
|
||||
if self.starting is True:
|
||||
raise CurrentlyStartingError()
|
||||
if self.stopping is True:
|
||||
|
@ -92,8 +92,9 @@ class CryptStreamDownloader(object):
|
|||
assert self.download_manager is None
|
||||
self.starting = True
|
||||
self.completed = False
|
||||
self.finished_deferred = defer.Deferred()
|
||||
d = self._start()
|
||||
d.addCallback(lambda _: set_finished_deferred())
|
||||
d.addCallback(lambda _: self.finished_deferred)
|
||||
return d
|
||||
|
||||
def stop(self, err=None):
|
||||
|
@ -112,8 +113,8 @@ class CryptStreamDownloader(object):
|
|||
assert self.download_manager is not None
|
||||
self.stopping = True
|
||||
d = self.download_manager.stop_downloading()
|
||||
self._fire_completed_deferred(err)
|
||||
d.addCallback(check_if_stop_succeeded)
|
||||
d.addCallback(lambda _: self._fire_completed_deferred(err))
|
||||
return d
|
||||
|
||||
def _start_failed(self):
|
||||
|
@ -203,6 +204,8 @@ class CryptStreamDownloader(object):
|
|||
d.errback(err)
|
||||
else:
|
||||
d.callback(self._get_finished_deferred_callback_value())
|
||||
else:
|
||||
log.debug("Not firing the completed deferred because d is None")
|
||||
|
||||
def _get_finished_deferred_callback_value(self):
|
||||
return None
|
||||
|
|
|
@ -12,6 +12,11 @@ from lbrynet.lbryfile.client.LBRYFileMetadataHandler import LBRYFileMetadataHand
|
|||
import os
|
||||
from twisted.internet import defer, threads, reactor
|
||||
from twisted.python.procutils import which
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LBRYFileDownloader(CryptStreamDownloader):
|
||||
|
@ -41,6 +46,31 @@ class LBRYFileDownloader(CryptStreamDownloader):
|
|||
else:
|
||||
return defer.succeed(True)
|
||||
|
||||
def delete_data(self):
|
||||
d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
|
||||
def get_blob_hashes(blob_infos):
|
||||
return [b[0] for b in blob_infos if b[0] is not None]
|
||||
|
||||
d1.addCallback(get_blob_hashes)
|
||||
d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
|
||||
|
||||
def combine_blob_hashes(results):
|
||||
blob_hashes = []
|
||||
for success, result in results:
|
||||
if success is True:
|
||||
blob_hashes.extend(result)
|
||||
return blob_hashes
|
||||
|
||||
def delete_blobs(blob_hashes):
|
||||
self.blob_manager.delete_blobs(blob_hashes)
|
||||
return True
|
||||
|
||||
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
|
||||
dl.addCallback(combine_blob_hashes)
|
||||
dl.addCallback(delete_blobs)
|
||||
return dl
|
||||
|
||||
def stop(self, err=None):
|
||||
d = self._close_output()
|
||||
d.addCallback(lambda _: CryptStreamDownloader.stop(self, err=err))
|
||||
|
@ -178,7 +208,13 @@ class LBRYFileSaver(LBRYFileDownloader):
|
|||
file_name + "_" + str(ext_num))):
|
||||
ext_num += 1
|
||||
file_name = file_name + "_" + str(ext_num)
|
||||
self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb')
|
||||
try:
|
||||
self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb')
|
||||
except IOError:
|
||||
log.error(traceback.format_exc())
|
||||
raise ValueError("Failed to open %s. Make sure you have permission to save files to that"
|
||||
" location." % str(os.path.join(self.download_directory,
|
||||
file_name)))
|
||||
return threads.deferToThread(open_file)
|
||||
|
||||
def _close_output(self):
|
||||
|
|
|
@ -36,7 +36,7 @@ class LBRYFileStreamCreator(CryptStreamCreator):
|
|||
log.debug("length: %s", str(blob_info.length))
|
||||
self.blob_infos.append(blob_info)
|
||||
|
||||
def _save_lbry_file_info(self):
|
||||
def _save_stream_info(self):
|
||||
stream_info_manager = self.lbry_file_manager.stream_info_manager
|
||||
d = stream_info_manager.save_stream(self.stream_hash, binascii.hexlify(self.name),
|
||||
binascii.hexlify(self.key),
|
||||
|
@ -46,8 +46,6 @@ class LBRYFileStreamCreator(CryptStreamCreator):
|
|||
|
||||
def setup(self):
|
||||
d = CryptStreamCreator.setup(self)
|
||||
d.addCallback(lambda _: self.stream_hash)
|
||||
|
||||
return d
|
||||
|
||||
def _get_blobs_hashsum(self):
|
||||
|
@ -79,10 +77,7 @@ class LBRYFileStreamCreator(CryptStreamCreator):
|
|||
|
||||
def _finished(self):
|
||||
self._make_stream_hash()
|
||||
d = self._save_lbry_file_info()
|
||||
d.addCallback(lambda _: self.lbry_file_manager.change_lbry_file_status(
|
||||
self.stream_hash, ManagedLBRYFileDownloader.STATUS_FINISHED
|
||||
))
|
||||
d = self._save_stream_info()
|
||||
return d
|
||||
|
||||
|
||||
|
|
|
@ -18,17 +18,18 @@ class ManagedLBRYFileDownloader(LBRYFileSaver):
|
|||
STATUS_STOPPED = "stopped"
|
||||
STATUS_FINISHED = "finished"
|
||||
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
lbry_file_manager, payment_rate_manager, wallet, download_directory, upload_allowed,
|
||||
file_name=None):
|
||||
LBRYFileSaver.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
|
||||
stream_info_manager, payment_rate_manager, wallet, download_directory,
|
||||
upload_allowed, file_name)
|
||||
self.rowid = rowid
|
||||
self.lbry_file_manager = lbry_file_manager
|
||||
self.saving_status = False
|
||||
|
||||
def restore(self):
|
||||
d = self.lbry_file_manager.get_lbry_file_status(self.stream_hash)
|
||||
d = self.lbry_file_manager.get_lbry_file_status(self)
|
||||
|
||||
def restore_status(status):
|
||||
if status == ManagedLBRYFileDownloader.STATUS_RUNNING:
|
||||
|
@ -103,7 +104,7 @@ class ManagedLBRYFileDownloader(LBRYFileSaver):
|
|||
s = ManagedLBRYFileDownloader.STATUS_STOPPED
|
||||
else:
|
||||
s = ManagedLBRYFileDownloader.STATUS_RUNNING
|
||||
return self.lbry_file_manager.change_lbry_file_status(self.stream_hash, s)
|
||||
return self.lbry_file_manager.change_lbry_file_status(self, s)
|
||||
|
||||
def _get_progress_manager(self, download_manager):
|
||||
return FullStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager)
|
||||
|
|
|
@ -36,6 +36,7 @@ class LBRYFileManager(object):
|
|||
self.download_directory = os.path.join(os.path.expanduser("~"), 'Downloads')
|
||||
else:
|
||||
self.download_directory = os.getcwd()
|
||||
log.debug("Download directory for LBRYFileManager: %s", str(self.download_directory))
|
||||
|
||||
def setup(self):
|
||||
d = self._open_db()
|
||||
|
@ -43,44 +44,15 @@ class LBRYFileManager(object):
|
|||
d.addCallback(lambda _: self._start_lbry_files())
|
||||
return d
|
||||
|
||||
def get_all_lbry_file_stream_hashes_and_options(self):
|
||||
d = self._get_all_lbry_file_stream_hashes()
|
||||
def get_lbry_file_status(self, lbry_file):
|
||||
return self._get_lbry_file_status(lbry_file.rowid)
|
||||
|
||||
def get_options(stream_hashes):
|
||||
ds = []
|
||||
def set_lbry_file_data_payment_rate(self, lbry_file, new_rate):
|
||||
return self._set_lbry_file_payment_rate(lbry_file.rowid, new_rate)
|
||||
|
||||
def get_options_for_stream_hash(stream_hash):
|
||||
d = self.get_lbry_file_options(stream_hash)
|
||||
d.addCallback(lambda options: (stream_hash, options))
|
||||
return d
|
||||
|
||||
for stream_hash in stream_hashes:
|
||||
ds.append(get_options_for_stream_hash(stream_hash))
|
||||
dl = defer.DeferredList(ds)
|
||||
dl.addCallback(lambda results: [r[1] for r in results if r[0]])
|
||||
return dl
|
||||
|
||||
d.addCallback(get_options)
|
||||
return d
|
||||
|
||||
def save_lbry_file(self, stream_hash, data_payment_rate):
|
||||
return self._save_lbry_file(stream_hash, data_payment_rate)
|
||||
|
||||
def get_lbry_file_status(self, stream_hash):
|
||||
return self._get_lbry_file_status(stream_hash)
|
||||
|
||||
def get_lbry_file_options(self, stream_hash):
|
||||
return self._get_lbry_file_options(stream_hash)
|
||||
|
||||
def delete_lbry_file_options(self, stream_hash):
|
||||
return self._delete_lbry_file_options(stream_hash)
|
||||
|
||||
def set_lbry_file_data_payment_rate(self, stream_hash, new_rate):
|
||||
return self._set_lbry_file_payment_rate(stream_hash, new_rate)
|
||||
|
||||
def change_lbry_file_status(self, stream_hash, status):
|
||||
log.debug("Changing status of %s to %s", stream_hash, status)
|
||||
return self._change_file_status(stream_hash, status)
|
||||
def change_lbry_file_status(self, lbry_file, status):
|
||||
log.debug("Changing status of %s to %s", lbry_file.stream_hash, status)
|
||||
return self._change_file_status(lbry_file.rowid, status)
|
||||
|
||||
def get_lbry_file_status_reports(self):
|
||||
ds = []
|
||||
|
@ -102,29 +74,32 @@ class LBRYFileManager(object):
|
|||
|
||||
def _start_lbry_files(self):
|
||||
|
||||
def set_options_and_restore(stream_hash, options):
|
||||
def set_options_and_restore(rowid, stream_hash, options):
|
||||
payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
|
||||
d = self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate=options[0])
|
||||
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
|
||||
blob_data_rate=options)
|
||||
d.addCallback(lambda downloader: downloader.restore())
|
||||
return d
|
||||
|
||||
def log_error(err):
|
||||
log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage())
|
||||
|
||||
def start_lbry_files(stream_hashes_and_options):
|
||||
for stream_hash, options in stream_hashes_and_options:
|
||||
d = set_options_and_restore(stream_hash, options)
|
||||
def start_lbry_files(lbry_files_and_options):
|
||||
for rowid, stream_hash, options in lbry_files_and_options:
|
||||
d = set_options_and_restore(rowid, stream_hash, options)
|
||||
d.addErrback(log_error)
|
||||
return True
|
||||
|
||||
d = self.get_all_lbry_file_stream_hashes_and_options()
|
||||
d = self._get_all_lbry_files()
|
||||
d.addCallback(start_lbry_files)
|
||||
return d
|
||||
|
||||
def start_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
|
||||
def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
|
||||
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
|
||||
lbry_file_downloader = ManagedLBRYFileDownloader(stream_hash, self.session.peer_finder,
|
||||
self.session.rate_limiter, self.session.blob_manager,
|
||||
lbry_file_downloader = ManagedLBRYFileDownloader(rowid, stream_hash,
|
||||
self.session.peer_finder,
|
||||
self.session.rate_limiter,
|
||||
self.session.blob_manager,
|
||||
self.stream_info_manager, self,
|
||||
payment_rate_manager, self.session.wallet,
|
||||
self.download_directory,
|
||||
|
@ -136,17 +111,17 @@ class LBRYFileManager(object):
|
|||
|
||||
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True):
|
||||
d = self._save_lbry_file(stream_hash, blob_data_rate)
|
||||
d.addCallback(lambda _: self.start_lbry_file(stream_hash, payment_rate_manager, blob_data_rate, upload_allowed))
|
||||
d.addCallback(lambda rowid: self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
|
||||
blob_data_rate, upload_allowed))
|
||||
return d
|
||||
|
||||
def delete_lbry_file(self, stream_hash):
|
||||
def delete_lbry_file(self, lbry_file):
|
||||
for l in self.lbry_files:
|
||||
if l.stream_hash == stream_hash:
|
||||
if l == lbry_file:
|
||||
lbry_file = l
|
||||
break
|
||||
else:
|
||||
return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " +
|
||||
stream_hash)))
|
||||
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
|
||||
|
||||
def wait_for_finished(count=2):
|
||||
if count <= 0 or lbry_file.saving_status is False:
|
||||
|
@ -165,23 +140,16 @@ class LBRYFileManager(object):
|
|||
self.lbry_files.remove(lbry_file)
|
||||
|
||||
d.addCallback(lambda _: remove_from_list())
|
||||
d.addCallback(lambda _: self.delete_lbry_file_options(stream_hash))
|
||||
d.addCallback(lambda _: self._delete_lbry_file_options(lbry_file.rowid))
|
||||
return d
|
||||
|
||||
def toggle_lbry_file_running(self, stream_hash):
|
||||
def toggle_lbry_file_running(self, lbry_file):
|
||||
"""Toggle whether a stream reader is currently running"""
|
||||
for l in self.lbry_files:
|
||||
if l.stream_hash == stream_hash:
|
||||
if l == lbry_file:
|
||||
return l.toggle_running()
|
||||
else:
|
||||
return defer.fail(Failure(ValueError("Could not find an LBRY file with the given stream hash, " +
|
||||
stream_hash)))
|
||||
|
||||
def get_stream_hash_from_name(self, lbry_file_name):
|
||||
for l in self.lbry_files:
|
||||
if l.file_name == lbry_file_name:
|
||||
return l.stream_hash
|
||||
return None
|
||||
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
|
||||
|
||||
def stop(self):
|
||||
ds = []
|
||||
|
@ -208,6 +176,9 @@ class LBRYFileManager(object):
|
|||
dl.addCallback(lambda _: close_db())
|
||||
return dl
|
||||
|
||||
def get_count_for_stream_hash(self, stream_hash):
|
||||
return self._get_count_for_stream_hash(stream_hash)
|
||||
|
||||
######### database calls #########
|
||||
|
||||
def _open_db(self):
|
||||
|
@ -226,41 +197,41 @@ class LBRYFileManager(object):
|
|||
|
||||
@rerun_if_locked
|
||||
def _save_lbry_file(self, stream_hash, data_payment_rate):
|
||||
return self.sql_db.runQuery("insert into lbry_file_options values (?, ?, ?)",
|
||||
(data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED,
|
||||
stream_hash))
|
||||
def do_save(db_transaction):
|
||||
db_transaction.execute("insert into lbry_file_options values (?, ?, ?)",
|
||||
(data_payment_rate, ManagedLBRYFileDownloader.STATUS_STOPPED,
|
||||
stream_hash))
|
||||
return db_transaction.lastrowid
|
||||
return self.sql_db.runInteraction(do_save)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_lbry_file_options(self, stream_hash):
|
||||
d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda result: result[0] if len(result) else (None, ))
|
||||
def _delete_lbry_file_options(self, rowid):
|
||||
return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?",
|
||||
(rowid,))
|
||||
|
||||
@rerun_if_locked
|
||||
def _set_lbry_file_payment_rate(self, rowid, new_rate):
|
||||
return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where rowid = ?",
|
||||
(new_rate, rowid))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_lbry_files(self):
|
||||
d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options")
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _delete_lbry_file_options(self, stream_hash):
|
||||
return self.sql_db.runQuery("delete from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
def _change_file_status(self, rowid, new_status):
|
||||
return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
|
||||
(new_status, rowid))
|
||||
|
||||
@rerun_if_locked
|
||||
def _set_lbry_file_payment_rate(self, stream_hash, new_rate):
|
||||
return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where stream_hash = ?",
|
||||
(new_rate, stream_hash))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_lbry_file_stream_hashes(self):
|
||||
d = self.sql_db.runQuery("select stream_hash from lbry_file_options")
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _change_file_status(self, stream_hash, new_status):
|
||||
return self.sql_db.runQuery("update lbry_file_options set status = ? where stream_hash = ?",
|
||||
(new_status, stream_hash))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_lbry_file_status(self, stream_hash):
|
||||
d = self.sql_db.runQuery("select status from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
def _get_lbry_file_status(self, rowid):
|
||||
d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
|
||||
(rowid,))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else ManagedLBRYFileDownloader.STATUS_STOPPED)
|
||||
return d
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_count_for_stream_hash(self, stream_hash):
|
||||
return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
|
@ -37,7 +37,8 @@ class ConsoleControl(basic.LineReceiver):
|
|||
"your balance is showing 0 when you know it shouldn't be, it\n"
|
||||
"is likely that the culprit is the blockchain.\n\n"
|
||||
"You should have received 1000 LBC the first time you ran\n"
|
||||
"this program. If you did not, let us know!\n\n"
|
||||
"this program. If you did not, let us know! But first give\n"
|
||||
"them a couple of minutes to show up.\n\n"
|
||||
"Welcome to lbrynet-console!")
|
||||
self.sendLine("")
|
||||
self.sendLine("Enter a command. Try 'get wonderfullife' or 'help' to see more options.")
|
||||
|
|
|
@ -54,6 +54,14 @@ class InvalidValueError(Exception):
|
|||
# prompt_description = None
|
||||
|
||||
|
||||
def get_log_file():
|
||||
log_file = "console.log"
|
||||
logging_handlers = logging.getLogger().handlers
|
||||
if len(logging_handlers):
|
||||
log_file = logging_handlers[0].baseFilename
|
||||
return log_file
|
||||
|
||||
|
||||
class RoundedTime(object):
|
||||
SECOND = 0
|
||||
MINUTE = 1
|
||||
|
@ -554,7 +562,8 @@ class AddStream(CommandHandler):
|
|||
if command in self.factory_choices:
|
||||
self.factory = self.factory_choices[command]
|
||||
self._start_download()
|
||||
self.console.sendLine("Downloading in the background")
|
||||
self.console.sendLine("Downloading in the background. Use the command 'status'\n"
|
||||
"to check the status of the download.")
|
||||
self.finished_deferred.callback(None)
|
||||
else:
|
||||
self._show_factory_choices()
|
||||
|
@ -610,9 +619,7 @@ class AddStream(CommandHandler):
|
|||
def _handle_load_failed(self, err):
|
||||
self.loading_failed = True
|
||||
log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())
|
||||
log_file = "console.log"
|
||||
if len(log.handlers):
|
||||
log_file = log.handlers[0].baseFilename
|
||||
log_file = get_log_file()
|
||||
self.console.sendLine("An unexpected error occurred attempting to load the stream's metadata.\n"
|
||||
"See %s for further details.\n\n" % log_file)
|
||||
self.finished_deferred.callback(None)
|
||||
|
@ -639,8 +646,26 @@ class AddStream(CommandHandler):
|
|||
self._show_info_and_options()
|
||||
return self._show_factory_choices()
|
||||
|
||||
def _get_estimated_cost_string(self):
|
||||
estimated_cost_string = "unknown LBC"
|
||||
for option, option_value in zip(self.download_options, self.options_chosen):
|
||||
if option.short_description == "data payment rate":
|
||||
if option_value == None:
|
||||
rate = self.payment_rate_manager.get_effective_min_blob_data_payment_rate()
|
||||
else:
|
||||
rate = option_value
|
||||
stream_size = None
|
||||
for field, val in self.metadata.validator.info_to_show():
|
||||
if field == "stream_size":
|
||||
stream_size = int(val)
|
||||
if stream_size is not None and rate is not None:
|
||||
estimated_cost_string = str(stream_size * 1.0 / 2**20 * rate) + " LBC"
|
||||
return estimated_cost_string
|
||||
|
||||
def _show_factory_choices(self):
|
||||
prompt = "\n"
|
||||
prompt += "Estimated cost: " + self._get_estimated_cost_string()
|
||||
prompt += "\n\n"
|
||||
for factory_choice_string in self.factory_choice_strings:
|
||||
prompt += factory_choice_string[1] + '\n'
|
||||
self.console.sendLine(str(prompt))
|
||||
|
@ -649,13 +674,35 @@ class AddStream(CommandHandler):
|
|||
#self.download_options = self.metadata.options.get_downloader_options(self.metadata.validator,
|
||||
# self.payment_rate_manager)
|
||||
prompt = "Stream info:\n"
|
||||
for info_line in self._get_info_to_show():
|
||||
prompt += info_line[0] + ": " + info_line[1] + "\n"
|
||||
for field_name, value in self._get_info_to_show():
|
||||
if field_name == "stream_size":
|
||||
value = str(self._get_formatted_stream_size(int(value)))
|
||||
prompt += field_name + ": " + value + "\n"
|
||||
prompt += "\nOptions:\n"
|
||||
for option in self.download_options:
|
||||
prompt += option.long_description + ": " + str(option.default_value_description) + "\n"
|
||||
self.console.sendLine(str(prompt))
|
||||
|
||||
@staticmethod
|
||||
def _get_formatted_stream_size(stream_size):
|
||||
if isinstance(stream_size, (int, long)):
|
||||
if stream_size >= 2**40:
|
||||
units = "TB"
|
||||
factor = 2**40
|
||||
elif stream_size >= 2**30:
|
||||
units = "GB"
|
||||
factor = 2**30
|
||||
elif stream_size >= 2**20:
|
||||
units = "MB"
|
||||
factor = 2**20
|
||||
elif stream_size >= 2**10:
|
||||
units = "KB"
|
||||
factor = 2**10
|
||||
else:
|
||||
return str(stream_size) + " B"
|
||||
return "%.1f %s" % (round((stream_size * 1.0 / factor), 1), units)
|
||||
return stream_size
|
||||
|
||||
def _get_info_to_show(self):
|
||||
return self.metadata.validator.info_to_show()
|
||||
|
||||
|
@ -730,10 +777,8 @@ class AddStream(CommandHandler):
|
|||
d.addErrback(self._log_recent_blockchain_time_error_download)
|
||||
else:
|
||||
log.error("An unexpected error has caused the download to stop: %s" % err.getTraceback())
|
||||
log_file = "console.log"
|
||||
if len(log.handlers):
|
||||
log_file = log.handlers[0].baseFilename
|
||||
self.console.sendLine("An unexpected error has caused the download to stop. See %s for details." % log_file)
|
||||
log_file = get_log_file()
|
||||
self.console.sendLine("An unexpected error has caused the download to stop:\n%s\n\nSee %s for further details." % (err.getErrorMessage(), log_file))
|
||||
|
||||
def _make_downloader(self):
|
||||
return self.factory.make_downloader(self.metadata, self.options_chosen,
|
||||
|
@ -1022,11 +1067,11 @@ class DeleteLBRYFile(CommandHandler):
|
|||
self.finished_deferred.callback(None)
|
||||
|
||||
def _delete_lbry_file(self):
|
||||
d = self.lbry_file_manager.delete_lbry_file(self.lbry_file.stream_hash)
|
||||
d = self.lbry_file_manager.delete_lbry_file(self.lbry_file)
|
||||
|
||||
def finish_deletion():
|
||||
if self.delete_data is True:
|
||||
d = self._delete_data()
|
||||
d = self.lbry_file.delete_data()
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
d.addCallback(lambda _: self._delete_stream_data())
|
||||
|
@ -1035,33 +1080,12 @@ class DeleteLBRYFile(CommandHandler):
|
|||
d.addCallback(lambda _: finish_deletion())
|
||||
return d
|
||||
|
||||
def _delete_data(self):
|
||||
d1 = self.stream_info_manager.get_blobs_for_stream(self.lbry_file.stream_hash)
|
||||
|
||||
def get_blob_hashes(blob_infos):
|
||||
return [b[0] for b in blob_infos if b[0] is not None]
|
||||
|
||||
d1.addCallback(get_blob_hashes)
|
||||
d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.lbry_file.stream_hash)
|
||||
|
||||
def combine_blob_hashes(results):
|
||||
blob_hashes = []
|
||||
for success, result in results:
|
||||
if success is True:
|
||||
blob_hashes.extend(result)
|
||||
return blob_hashes
|
||||
|
||||
def delete_blobs(blob_hashes):
|
||||
self.blob_manager.delete_blobs(blob_hashes)
|
||||
return True
|
||||
|
||||
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
|
||||
dl.addCallback(combine_blob_hashes)
|
||||
dl.addCallback(delete_blobs)
|
||||
return dl
|
||||
|
||||
def _delete_stream_data(self):
|
||||
return self.stream_info_manager.delete_stream(self.lbry_file.stream_hash)
|
||||
s_h = self.lbry_file.stream_hash
|
||||
d = self.lbry_file_manager.get_count_for_stream_hash(s_h)
|
||||
# TODO: could possibly be a timing issue here
|
||||
d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
|
||||
return d
|
||||
|
||||
|
||||
class DeleteLBRYFileFactory(LBRYFileChooserFactory):
|
||||
|
@ -1093,7 +1117,7 @@ class ToggleLBRYFileRunning(CommandHandler):
|
|||
self.lbry_file_manager = lbry_file_manager
|
||||
|
||||
def start(self):
|
||||
d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file.stream_hash)
|
||||
d = self.lbry_file_manager.toggle_lbry_file_running(self.lbry_file)
|
||||
d.addErrback(self._handle_download_error)
|
||||
self.finished_deferred.callback(None)
|
||||
|
||||
|
@ -1132,11 +1156,11 @@ class CreateLBRYFile(CommandHandler):
|
|||
def add_to_lbry_files(self, stream_hash):
|
||||
prm = PaymentRateManager(self.session.base_payment_rate_manager)
|
||||
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
|
||||
d.addCallback(self.set_status, stream_hash)
|
||||
d.addCallback(self.set_status)
|
||||
return d
|
||||
|
||||
def set_status(self, lbry_file_downloader, stream_hash):
|
||||
d = self.lbry_file_manager.change_lbry_file_status(stream_hash,
|
||||
def set_status(self, lbry_file_downloader):
|
||||
d = self.lbry_file_manager.change_lbry_file_status(lbry_file_downloader,
|
||||
ManagedLBRYFileDownloader.STATUS_FINISHED)
|
||||
d.addCallback(lambda _: lbry_file_downloader.restore())
|
||||
return d
|
||||
|
@ -1362,7 +1386,7 @@ class ModifyLBRYFileDataPaymentRate(ModifyPaymentRate):
|
|||
|
||||
def _set_rate(self, rate):
|
||||
self.payment_rate_manager.min_blob_data_payment_rate = rate
|
||||
return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file.stream_hash, rate)
|
||||
return self.lbry_file_manager.set_lbry_file_data_payment_rate(self.lbry_file, rate)
|
||||
|
||||
def _get_current_status(self):
|
||||
status = "The LBRY file's current data payment rate is "
|
||||
|
@ -1776,9 +1800,9 @@ class Publish(CommandHandler):
|
|||
v_string += "Is this correct? (y/n): "
|
||||
return v_string
|
||||
|
||||
def set_status(self, lbry_file_downloader, stream_hash):
|
||||
def set_status(self, lbry_file_downloader):
|
||||
self.lbry_file = lbry_file_downloader
|
||||
d = self.lbry_file_manager.change_lbry_file_status(stream_hash,
|
||||
d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file,
|
||||
ManagedLBRYFileDownloader.STATUS_FINISHED)
|
||||
d.addCallback(lambda _: lbry_file_downloader.restore())
|
||||
return d
|
||||
|
@ -1786,7 +1810,7 @@ class Publish(CommandHandler):
|
|||
def add_to_lbry_files(self, stream_hash):
|
||||
prm = PaymentRateManager(self.session.base_payment_rate_manager)
|
||||
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
|
||||
d.addCallback(self.set_status, stream_hash)
|
||||
d.addCallback(self.set_status)
|
||||
return d
|
||||
|
||||
def _create_sd_blob(self):
|
||||
|
|
|
@ -125,7 +125,7 @@ class LBRYConsole():
|
|||
# self.session.wallet, self.sd_identifier, self.autofetcher_conf)
|
||||
|
||||
def _show_start_error(self, error):
|
||||
print error.getErrorMessage()
|
||||
print error.getTraceback()
|
||||
log.error("An error occurred during start up: %s", error.getTraceback())
|
||||
return error
|
||||
|
||||
|
|
|
@ -911,6 +911,121 @@ class TestTransfer(TestCase):
|
|||
|
||||
return d
|
||||
|
||||
def test_double_download(self):
|
||||
|
||||
sd_hash_queue = Queue()
|
||||
kill_event = Event()
|
||||
dead_event = Event()
|
||||
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event))
|
||||
uploader.start()
|
||||
self.server_processes.append(uploader)
|
||||
|
||||
logging.debug("Testing double download")
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
downloaders = []
|
||||
|
||||
db_dir = "client"
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
os.mkdir(db_dir)
|
||||
os.mkdir(blob_dir)
|
||||
|
||||
self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer,
|
||||
blob_dir=blob_dir, peer_port=5553, use_upnp=False,
|
||||
rate_limiter=rate_limiter, wallet=wallet)
|
||||
|
||||
self.stream_info_manager = DBLBRYFileMetadataManager(self.session.db_dir)
|
||||
|
||||
self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier)
|
||||
|
||||
def make_downloader(metadata, prm):
|
||||
info_validator = metadata.validator
|
||||
options = metadata.options
|
||||
factories = metadata.factories
|
||||
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
|
||||
return factories[0].make_downloader(metadata, chosen_options, prm)
|
||||
|
||||
def append_downloader(downloader):
|
||||
downloaders.append(downloader)
|
||||
return downloader
|
||||
|
||||
def download_file(sd_hash):
|
||||
prm = PaymentRateManager(self.session.base_payment_rate_manager)
|
||||
d = download_sd_blob(self.session, sd_hash, prm)
|
||||
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
|
||||
d.addCallback(make_downloader, prm)
|
||||
d.addCallback(append_downloader)
|
||||
d.addCallback(lambda downloader: downloader.start())
|
||||
return d
|
||||
|
||||
def check_md5_sum():
|
||||
f = open('test_file')
|
||||
hashsum = MD5.new()
|
||||
hashsum.update(f.read())
|
||||
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
|
||||
|
||||
def delete_lbry_file():
|
||||
logging.debug("deleting the file...")
|
||||
d = self.lbry_file_manager.delete_lbry_file(downloaders[0])
|
||||
d.addCallback(lambda _: self.lbry_file_manager.get_count_for_stream_hash(downloaders[0].stream_hash))
|
||||
d.addCallback(lambda c: self.stream_info_manager.delete_stream(downloaders[1].stream_hash) if c == 0 else True)
|
||||
return d
|
||||
|
||||
def check_lbry_file():
|
||||
d = downloaders[1].status()
|
||||
d.addCallback(lambda _: downloaders[1].status())
|
||||
|
||||
def check_status_report(status_report):
|
||||
self.assertEqual(status_report.num_known, status_report.num_completed)
|
||||
self.assertEqual(status_report.num_known, 3)
|
||||
|
||||
d.addCallback(check_status_report)
|
||||
return d
|
||||
|
||||
def start_transfer(sd_hash):
|
||||
|
||||
logging.debug("Starting the transfer")
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
d.addCallback(lambda _: download_file(sd_hash))
|
||||
d.addCallback(lambda _: check_md5_sum())
|
||||
d.addCallback(lambda _: download_file(sd_hash))
|
||||
d.addCallback(lambda _: delete_lbry_file())
|
||||
d.addCallback(lambda _: check_lbry_file())
|
||||
|
||||
return d
|
||||
|
||||
def stop(arg):
|
||||
if isinstance(arg, Failure):
|
||||
logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback())
|
||||
else:
|
||||
logging.debug("Client is stopping normally.")
|
||||
kill_event.set()
|
||||
logging.debug("Set the kill event")
|
||||
d = self.wait_for_dead_event(dead_event)
|
||||
|
||||
def print_shutting_down():
|
||||
logging.info("Client is shutting down")
|
||||
|
||||
d.addCallback(lambda _: print_shutting_down())
|
||||
d.addCallback(lambda _: arg)
|
||||
return d
|
||||
|
||||
d = self.wait_for_hash_from_queue(sd_hash_queue)
|
||||
d.addCallback(start_transfer)
|
||||
d.addBoth(stop)
|
||||
return d
|
||||
|
||||
|
||||
class TestStreamify(TestCase):
|
||||
|
||||
|
@ -932,6 +1047,8 @@ class TestStreamify(TestCase):
|
|||
|
||||
def delete_test_env():
|
||||
shutil.rmtree('client')
|
||||
if os.path.exists("test_file"):
|
||||
os.remove("test_file")
|
||||
|
||||
d.addCallback(lambda _: threads.deferToThread(delete_test_env))
|
||||
return d
|
||||
|
|
Loading…
Reference in a new issue