forked from LBRYCommunity/lbry-sdk
add file download statuses for get_lbry_file
also clean up remaining fetcher stuff
This commit is contained in:
parent
c1d0f9cf1b
commit
461c2f9055
2 changed files with 59 additions and 14 deletions
|
@ -31,7 +31,7 @@ from lbrynet.core.Error import UnknownNameError, InsufficientFundsError
|
||||||
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
|
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
|
||||||
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory
|
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory
|
||||||
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
|
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
|
||||||
from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream, FetcherDaemon
|
from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream
|
||||||
from lbrynet.lbrynet_daemon.LBRYPublisher import Publisher
|
from lbrynet.lbrynet_daemon.LBRYPublisher import Publisher
|
||||||
from lbrynet.core.utils import generate_id
|
from lbrynet.core.utils import generate_id
|
||||||
from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings
|
from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings
|
||||||
|
@ -147,6 +147,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
self.run_server = True
|
self.run_server = True
|
||||||
self.session = None
|
self.session = None
|
||||||
self.waiting_on = {}
|
self.waiting_on = {}
|
||||||
|
self.streams = {}
|
||||||
self.known_dht_nodes = KNOWN_DHT_NODES
|
self.known_dht_nodes = KNOWN_DHT_NODES
|
||||||
self.platform_info = {
|
self.platform_info = {
|
||||||
"processor": platform.processor(),
|
"processor": platform.processor(),
|
||||||
|
@ -408,7 +409,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
d.addCallback(lambda _: self._setup_lbry_file_opener())
|
d.addCallback(lambda _: self._setup_lbry_file_opener())
|
||||||
d.addCallback(lambda _: self._setup_query_handlers())
|
d.addCallback(lambda _: self._setup_query_handlers())
|
||||||
d.addCallback(lambda _: self._setup_server())
|
d.addCallback(lambda _: self._setup_server())
|
||||||
d.addCallback(lambda _: self._setup_fetcher())
|
|
||||||
d.addCallback(lambda _: _log_starting_vals())
|
d.addCallback(lambda _: _log_starting_vals())
|
||||||
d.addCallback(lambda _: _announce_startup())
|
d.addCallback(lambda _: _announce_startup())
|
||||||
d.callback(None)
|
d.callback(None)
|
||||||
|
@ -891,9 +891,10 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
|
|
||||||
def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None):
|
def _download_name(self, name, timeout=DEFAULT_TIMEOUT, download_directory=None):
|
||||||
"""
|
"""
|
||||||
Add a lbry file to the file manager, start the download, and return the new lbry file
|
Add a lbry file to the file manager, start the download, and return the new lbry file.
|
||||||
if it already exists in the file manager, return the existing lbry file
|
If it already exists in the file manager, return the existing lbry file
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not download_directory:
|
if not download_directory:
|
||||||
download_directory = self.download_directory
|
download_directory = self.download_directory
|
||||||
elif not os.path.isdir(download_directory):
|
elif not os.path.isdir(download_directory):
|
||||||
|
@ -919,12 +920,12 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
return f
|
return f
|
||||||
|
|
||||||
def _get_stream(stream_info):
|
def _get_stream(stream_info):
|
||||||
stream = GetStream(self.sd_identifier, self.session, self.session.wallet,
|
self.streams[name] = GetStream(self.sd_identifier, self.session, self.session.wallet,
|
||||||
self.lbry_file_manager,
|
self.lbry_file_manager, max_key_fee=self.max_key_fee,
|
||||||
max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout,
|
data_rate=self.data_rate, timeout=timeout,
|
||||||
download_directory=download_directory)
|
download_directory=download_directory)
|
||||||
d = stream.start(stream_info, name)
|
d = self.streams[name].start(stream_info, name)
|
||||||
d.addCallback(lambda _: stream.downloader)
|
d.addCallback(lambda _: self.streams[name].downloader)
|
||||||
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -1054,16 +1055,39 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
return f
|
return f
|
||||||
|
|
||||||
def _get_json_for_return(f):
|
def _get_json_for_return(f):
|
||||||
if f:
|
def _generate_reply(size):
|
||||||
if f.key:
|
if f.key:
|
||||||
key = binascii.b2a_hex(f.key)
|
key = binascii.b2a_hex(f.key)
|
||||||
else:
|
else:
|
||||||
key = None
|
key = None
|
||||||
|
|
||||||
|
if os.path.isfile(os.path.join(self.download_directory, f.file_name)):
|
||||||
|
written_file = file(os.path.join(self.download_directory, f.file_name))
|
||||||
|
written_file.seek(0, os.SEEK_END)
|
||||||
|
written_bytes = written_file.tell()
|
||||||
|
written_file.close()
|
||||||
|
else:
|
||||||
|
written_bytes = False
|
||||||
|
|
||||||
|
if search_by == "name":
|
||||||
|
if val in self.streams.keys():
|
||||||
|
status = self.streams[val].code
|
||||||
|
else:
|
||||||
|
status = [False, False]
|
||||||
|
else:
|
||||||
|
status = [False, False]
|
||||||
|
|
||||||
t = {'completed': f.completed, 'file_name': f.file_name, 'key': key,
|
t = {'completed': f.completed, 'file_name': f.file_name, 'key': key,
|
||||||
'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash,
|
'points_paid': f.points_paid, 'stopped': f.stopped, 'stream_hash': f.stream_hash,
|
||||||
'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name,
|
'stream_name': f.stream_name, 'suggested_file_name': f.suggested_file_name,
|
||||||
'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash}
|
'upload_allowed': f.upload_allowed, 'sd_hash': f.sd_hash, 'total_bytes': size,
|
||||||
|
'written_bytes': written_bytes, 'code': status[0], 'message': status[1]}
|
||||||
return t
|
return t
|
||||||
|
|
||||||
|
if f:
|
||||||
|
d = f.get_total_bytes()
|
||||||
|
d.addCallback(_generate_reply)
|
||||||
|
return d
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,19 @@ from lbrynet.core.StreamDescriptor import download_sd_blob
|
||||||
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory
|
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory
|
||||||
from lbrynet.conf import DEFAULT_TIMEOUT
|
from lbrynet.conf import DEFAULT_TIMEOUT
|
||||||
|
|
||||||
|
INITIALIZING_CODE = 'initializing'
|
||||||
|
DOWNLOAD_METADATA_CODE = 'downloading_metadata'
|
||||||
|
DOWNLOAD_TIMEOUT_CODE = 'timeout'
|
||||||
|
DOWNLOAD_RUNNING_CODE = 'running'
|
||||||
|
DOWNLOAD_STOPPED_CODE = 'stopped'
|
||||||
|
STREAM_STAGES = [
|
||||||
|
(INITIALIZING_CODE, 'Initializing...'),
|
||||||
|
(DOWNLOAD_METADATA_CODE, 'Downloading metadata'),
|
||||||
|
(DOWNLOAD_RUNNING_CODE, 'Started stream'),
|
||||||
|
(DOWNLOAD_STOPPED_CODE, 'Paused stream'),
|
||||||
|
(DOWNLOAD_TIMEOUT_CODE, 'Stream timed out')
|
||||||
|
]
|
||||||
|
|
||||||
if sys.platform != "darwin":
|
if sys.platform != "darwin":
|
||||||
log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
|
log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet")
|
||||||
else:
|
else:
|
||||||
|
@ -56,6 +69,7 @@ class GetStream(object):
|
||||||
self.downloader = None
|
self.downloader = None
|
||||||
self.finished = defer.Deferred()
|
self.finished = defer.Deferred()
|
||||||
self.checker = LoopingCall(self.check_status)
|
self.checker = LoopingCall(self.check_status)
|
||||||
|
self.code = STREAM_STAGES[0]
|
||||||
|
|
||||||
def check_status(self):
|
def check_status(self):
|
||||||
self.timeout_counter += 1
|
self.timeout_counter += 1
|
||||||
|
@ -68,6 +82,7 @@ class GetStream(object):
|
||||||
log.info("Timeout downloading lbry://" + self.resolved_name + ", " + str(self.stream_info))
|
log.info("Timeout downloading lbry://" + self.resolved_name + ", " + str(self.stream_info))
|
||||||
self.checker.stop()
|
self.checker.stop()
|
||||||
self.d.cancel()
|
self.d.cancel()
|
||||||
|
self.code = STREAM_STAGES[4]
|
||||||
self.finished.callback(False)
|
self.finished.callback(False)
|
||||||
|
|
||||||
def start(self, stream_info, name):
|
def start(self, stream_info, name):
|
||||||
|
@ -104,10 +119,16 @@ class GetStream(object):
|
||||||
def _cause_timeout():
|
def _cause_timeout():
|
||||||
self.timeout_counter = self.timeout * 2
|
self.timeout_counter = self.timeout * 2
|
||||||
|
|
||||||
|
def _set_status(x, status):
|
||||||
|
self.code = next(s for s in STREAM_STAGES if s[0] == status)
|
||||||
|
return x
|
||||||
|
|
||||||
self.checker.start(1)
|
self.checker.start(1)
|
||||||
|
|
||||||
|
self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
|
||||||
self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
|
self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
|
||||||
self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
|
self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
|
||||||
|
self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
|
||||||
self.d.addCallback(lambda metadata: (
|
self.d.addCallback(lambda metadata: (
|
||||||
next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)),
|
next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)),
|
||||||
metadata))
|
metadata))
|
||||||
|
|
Loading…
Reference in a new issue