Update lbrynet-daemon

-Get rid of separate db for daemon, instead use already existing files
-Simplify the get function
-Add start/stop functions for lbry files
This commit is contained in:
Jack 2016-01-19 21:07:16 -05:00
parent e2ae1ca866
commit 888d77c96c
2 changed files with 178 additions and 272 deletions

View file

@ -1,5 +1,3 @@
import binascii
import webbrowser
from lbrynet.core.Error import UnknownNameError
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory
@ -20,29 +18,24 @@ from datetime import datetime
import logging
import os
import sys
import sqlite3
import json
import binascii
import webbrowser
log = logging.getLogger(__name__)
#TODO add login credentials in a conf file
#issues with delete:
#TODO when stream is complete the generated file doesn't delete, but blobs do
#TODO when stream is stopped the generated file is deleted
class DummyDownloader(object):
def __init__(self, directory, file_name):
self.download_directory = directory
self.file_name = file_name
class DummyStream(object):
def __init__(self, row):
download_directory = os.path.join(*row[2].split('/')[:-1])
file_name = row[2].split('/')[len(row[2].split('/')) - 1]
self.stream_hash = row[0]
self.downloader = DummyDownloader(download_directory, file_name)
self.is_dummy = True
#functions to add:
#TODO publish
#TODO send credits to address
#TODO get new address
#TODO alert if your copy of a lbry file is out of date with the name record
class LBRYDaemon(xmlrpc.XMLRPC):
@ -62,7 +55,6 @@ class LBRYDaemon(xmlrpc.XMLRPC):
self.peer_port = 3333
self.dht_node_port = 4444
self.first_run = False
self.current_db_revision = 1
if os.name == "nt":
from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle
self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current)
@ -106,8 +98,6 @@ class LBRYDaemon(xmlrpc.XMLRPC):
self.session_settings = None
self.data_rate = 0.5
self.max_key_fee = 100.0
self.db = None
self.cur = None
return defer.succeed(None)
def _disp_startup():
@ -119,14 +109,13 @@ class LBRYDaemon(xmlrpc.XMLRPC):
d.addCallback(lambda _: threads.deferToThread(self._setup_data_directory))
d.addCallback(lambda _: self._check_db_migration())
d.addCallback(lambda _: self._get_settings())
d.addCallback(lambda _: self.get_lbrycrdd_path())
d.addCallback(lambda _: self._get_lbrycrdd_path())
d.addCallback(lambda _: self._get_session())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self._setup_stream_identifier())
d.addCallback(lambda _: self._setup_lbry_file_manager())
d.addCallback(lambda _: self._setup_lbry_file_opener())
d.addCallback(lambda _: self._setup_fetcher())
d.addCallback(lambda _: self._setup_daemon_db())
d.addCallback(lambda _: _disp_startup())
d.callback(None)
@ -184,38 +173,11 @@ class LBRYDaemon(xmlrpc.XMLRPC):
return d
return defer.succeed(True)
def _setup_daemon_db(self):
self.db = sqlite3.connect(os.path.join(self.db_dir, 'daemon.sqlite'))
self.cur = self.db.cursor()
query = "create table if not exists history \
(stream_hash char(96) primary key not null,\
uri text not null, \
path text not null);"
self.cur.execute(query)
self.db.commit()
r = self.cur.execute("select * from history")
files = r.fetchall()
print "Checking files in download history still exist, pruning records of those that don't"
for file in files:
if not os.path.isfile(file[2]):
print "Couldn't find", file[2], ", removing record"
self.cur.execute("delete from history where stream_hash='" + file[0] + "'")
self.db.commit()
print "Done checking records"
return defer.succeed(None)
def _get_settings(self):
d = self.settings.start()
d.addCallback(lambda _: self.settings.get_lbryid())
d.addCallback(self.set_lbryid)
d.addCallback(lambda _: self.get_lbrycrdd_path())
d.addCallback(lambda _: self._get_lbrycrdd_path())
return d
def set_lbryid(self, lbryid):
@ -285,7 +247,7 @@ class LBRYDaemon(xmlrpc.XMLRPC):
dl.addCallback(lambda _: self.session.setup())
return dl
def get_lbrycrdd_path(self):
def _get_lbrycrdd_path(self):
def get_lbrycrdd_path_conf_file():
lbrycrdd_path_conf_path = os.path.join(os.path.expanduser("~"), ".lbrycrddpath.conf")
if not os.path.exists(lbrycrdd_path_conf_path):
@ -336,91 +298,107 @@ class LBRYDaemon(xmlrpc.XMLRPC):
self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory)
return defer.succeed(True)
def _download_name(self, history, name):
def _disp(stream):
print '[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash)
log.debug('[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash))
return defer.succeed(None)
if history == 'UnknownNameError':
return 'UnknownNameError'
if not history:
stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager,
max_key_fee=self.max_key_fee, data_rate=self.data_rate)
self.downloads.append(stream)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(lambda stream_info: stream.start(stream_info))
d.addCallback(lambda _: _disp(stream))
d.addCallback(lambda _: {'ts': datetime.now(),'name': name})
d.addErrback(lambda err: str(err.getTraceback()))
def _download_name(self, name):
def _disp_file(file):
print '[' + str(datetime.now()) + ']' + ' Already downloaded: ' + str(file.stream_hash)
d = self._path_from_lbry_file(file)
return d
else:
self.downloads.append(DummyStream(history[0]))
return defer.succeed(None)
def _get_stream(name):
def _disp(stream):
print '[' + str(datetime.now()) + ']' + ' Start stream: ' + stream['stream_hash']
return stream
d = self.session.wallet.get_stream_info_for_name(name)
stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager,
max_key_fee=self.max_key_fee, data_rate=self.data_rate)
d.addCallback(_disp)
d.addCallback(lambda stream_info: stream.start(stream_info))
d.addCallback(lambda _: self._path_from_name(name))
return d
d = self._check_history(name)
d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file))
d.addCallback(lambda _: self._check_history(name))
d.addCallback(lambda lbry_file: self._path_from_lbry_file(lbry_file) if lbry_file else 'Not found')
d.addErrback(lambda err: str(err))
def _path_from_name(self, name):
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(lambda stream_info: stream_info['stream_hash'])
d.addCallback(lambda stream_hash: [{'stream_hash': stream.stream_hash,
'path': os.path.join(stream.downloader.download_directory,
stream.downloader.file_name)}
for stream in self.downloads if stream.stream_hash == stream_hash][0])
d.addErrback(lambda _: 'UnknownNameError')
return d
def _get_downloads(self):
downloads = []
for stream in self.downloads:
try:
downloads.append({'stream_hash': stream.stream_hash,
'path': os.path.join(stream.downloader.download_directory, stream.downloader.file_name)})
except:
pass
return downloads
def _resolve_name(self, name):
d = defer.Deferred()
d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name))
d.addErrback(lambda _: 'UnknownNameError')
d.callback(None)
d.addErrback(lambda _: defer.fail(UnknownNameError))
return d
def _check_history(self, name, metadata):
if metadata == 'UnknownNameError':
return 'UnknownNameError'
r = self.cur.execute("select * from history where stream_hash='" + metadata['stream_hash'] + "'")
files = r.fetchall()
def _resolve_name_wc(self, name):
d = defer.Deferred()
d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name))
d.addErrback(lambda _: defer.fail(UnknownNameError))
d.callback(None)
if files:
if not os.path.isfile(files[0][2]):
print "[" + str(datetime.now()) + "] Couldn't find", files[0][2], ", trying to redownload it"
self.cur.execute("delete from history where stream_hash='" + files[0][0] + "'")
self.db.commit()
return []
return d
def _check_history(self, name):
def _get_lbry_file(path):
f = open(path, 'r')
l = json.loads(f.read())
f.close()
file_name = l['stream_name'].decode('hex')
lbry_file = [file for file in self.lbry_file_manager.lbry_files if file.stream_name == file_name][0]
return lbry_file
def _check(info):
stream_hash = info['stream_hash']
path = os.path.join(self.blobfile_dir, stream_hash)
if os.path.isfile(path):
print "[" + str(datetime.now()) + "] Search for lbry_file, returning: " + stream_hash
return defer.succeed(_get_lbry_file(path))
else:
return files
print "[" + str(datetime.now()) + "] Search for lbry_file didn't return anything"
return defer.succeed(False)
d = self._resolve_name(name)
d.addCallbacks(_check, lambda _: False)
d.callback(None)
return d
def _delete_lbry_file(self, lbry_file):
d = self.lbry_file_manager.delete_lbry_file(lbry_file)
def finish_deletion(lbry_file):
d = lbry_file.delete_data()
d.addCallback(lambda _: _delete_stream_data(lbry_file))
return d
def _delete_stream_data(lbry_file):
s_h = lbry_file.stream_hash
d = self.lbry_file_manager.get_count_for_stream_hash(s_h)
# TODO: could possibly be a timing issue here
d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
return d
d.addCallback(lambda _: finish_deletion(lbry_file))
return d
def _path_from_name(self, name):
d = self._check_history(name)
d.addCallback(lambda lbry_file: {'stream_hash': lbry_file.stream_hash,
'path': os.path.join(self.download_directory, lbry_file.file_name)}
if lbry_file else defer.fail(UnknownNameError))
return d
def _path_from_lbry_file(self, lbry_file):
if lbry_file:
r = {'stream_hash': lbry_file.stream_hash,
'path': os.path.join(self.download_directory, lbry_file.file_name)}
return defer.succeed(r)
else:
return files
def _add_to_history(self, name, path):
if path == 'UnknownNameError':
return 'UnknownNameError'
r = self.cur.execute("select * from history where stream_hash='" + path['stream_hash'] + "'")
files = r.fetchall()
if not files:
vals = path['stream_hash'], name, path['path']
self.cur.execute("insert into history values (?, ?, ?)", vals)
self.db.commit()
else:
print '[' + str(datetime.now()) + '] Already downloaded', path['stream_hash'], '-->', path['path']
return path
return defer.fail(UnknownNameError)
def xmlrpc_get_settings(self):
"""
@ -483,7 +461,6 @@ class LBRYDaemon(xmlrpc.XMLRPC):
print 'Shutting down lbrynet daemon'
d = self._shutdown()
d.addCallback(lambda _: self.db.close())
d.addCallback(lambda _: _disp_shutdown())
d.addCallback(lambda _: reactor.stop())
d.callback(None)
@ -523,95 +500,53 @@ class LBRYDaemon(xmlrpc.XMLRPC):
"""
def _disp(info):
log.debug('[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info))
print '[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info)
log.debug('[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info['stream_hash']))
print '[' + str(datetime.now()) + ']' + ' Resolved info: ' + str(info['stream_hash'])
return info
d = defer.Deferred()
d.addCallback(lambda _: self.session.wallet.get_stream_info_for_name(name))
d = self._resolve_name(name)
d.addCallbacks(_disp, lambda _: str('UnknownNameError'))
d.callback(None)
return d
def xmlrpc_get_downloads(self):
"""
Get files downloaded in this session
@return: [{stream_hash, path}]
"""
downloads = []
for stream in self.downloads:
try:
downloads.append({'stream_hash': stream.stream_hash,
'path': os.path.join(stream.downloader.download_directory, stream.downloader.file_name)})
except:
pass
print '[' + str(datetime.now()) + '] Get downloads'
return downloads
def xmlrpc_download_name(self, name):
def xmlrpc_get(self, name):
"""
Download stream from a LBRY uri
@param: name
"""
def _disp(stream):
print '[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash)
log.debug('[' + str(datetime.now()) + ']' + ' Downloading: ' + str(stream.stream_hash))
return defer.succeed(None)
d = self._download_name(name)
stream = GetStream(self.sd_identifier, self.session, self.session.wallet, self.lbry_file_manager,
max_key_fee=self.max_key_fee, data_rate=self.data_rate)
self.downloads.append(stream)
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(lambda stream_info: stream.start(stream_info))
d.addCallback(lambda _: _disp(stream))
d.addCallback(lambda _: {'ts': datetime.now(),'name': name})
d.addErrback(lambda err: str(err.getTraceback()))
return d
def xmlrpc_path_from_name(self, name):
"""
Get file path for a downloaded name
def xmlrpc_stop_lbry_file(self, stream_hash):
try:
lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
except IndexError:
return defer.fail(UnknownNameError)
@param: name
@return: {stream_hash, path}:
"""
if not lbry_file.stopped:
d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file)
d.addCallback(lambda _: 'Stream has been stopped')
d.addErrback(lambda err: str(err))
return d
else:
return defer.succeed('Stream was already stopped')
d = self.session.wallet.get_stream_info_for_name(name)
d.addCallback(lambda stream_info: stream_info['stream_hash'])
d.addCallback(lambda stream_hash: [{'stream_hash': stream.stream_hash,
'path': os.path.join(stream.downloader.download_directory,
stream.downloader.file_name)}
for stream in self.downloads if stream.stream_hash == stream_hash][0])
d.addErrback(lambda _: 'UnknownNameError')
return d
def xmlrpc_start_lbry_file(self, stream_hash):
try:
lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
except IndexError:
return defer.fail(UnknownNameError)
def xmlrpc_get(self, name):
"""
Download a name and return the path of the resulting file
if lbry_file.stopped:
d = self.lbry_file_manager.toggle_lbry_file_running(lbry_file)
d.callback(None)
return defer.succeed('Stream started')
else:
return defer.succeed('Stream was already running')
@param: name:
@return: {stream_hash, path}:
"""
d = self._resolve_name(name)
d.addCallback(lambda metadata: self._check_history(name, metadata))
d.addCallback(lambda hist: self._download_name(hist, name))
d.addCallback(lambda _: self._path_from_name(name))
d.addCallback(lambda path: self._add_to_history(name, path))
return d
def xmlrpc_toggle_lbry_file_status(self, stream_hash):
d = self.lbry_file_manager.toggle_lbry_file_running(stream_hash)
d.addErrback(lambda err: str(err))
return d
def xmlrpc_render_html(self, html):
def _make_file(html, path):
@ -690,7 +625,7 @@ class LBRYDaemon(xmlrpc.XMLRPC):
filtered_results = [n for n in self.rpc_conn.getnametrie() if n['name'].startswith(search)]
filtered_results = [n for n in filtered_results if 'txid' in n.keys()]
resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name(n['name'])]) for n in filtered_results]
resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name_wc(n['name'])]) for n in filtered_results]
d = defer.DeferredList(resolved_results)
d.addCallback(_clean)
@ -699,6 +634,23 @@ class LBRYDaemon(xmlrpc.XMLRPC):
return d
def xmlrpc_delete_lbry_file(self, file_name):
def _disp(file_name):
print '[' + str(datetime.now()) + '] Deleted: ' + file_name
return defer.succeed(str('Deleted: ' + file_name))
lbry_files = [self._delete_lbry_file(f) for f in self.lbry_file_manager.lbry_files if file_name == f.file_name]
d = defer.DeferredList(lbry_files)
d.addCallback(lambda _: _disp(file_name))
return d
def xmlrpc_check(self, name):
d = self._check_history(name)
d.addCallback(lambda lbry_file: self._path_from_lbry_file(lbry_file) if lbry_file else 'Not found')
d.addErrback(lambda err: str(err))
return d
def main():
daemon = LBRYDaemon()

View file

@ -13,31 +13,23 @@ log = logging.getLogger(__name__)
class GetStream(object):
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, max_key_fee, pay_key=True, data_rate=0.5):
self.finished_deferred = defer.Deferred(None)
self.wallet = wallet
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.data_rate = data_rate
self.pay_key = pay_key
self.name = None
self.session = session
self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
self.loading_metadata_deferred = defer.Deferred()
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.metadata = None
self.loading_failed = False
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.stream_hash = None
self.max_key_fee = max_key_fee
self.stream_info = None
self.stream_info_manager = None
self.downloader = None
self.data_rate = data_rate
self.pay_key = pay_key
def start(self, stream_info):
self.stream_info = stream_info
@ -52,18 +44,9 @@ class GetStream(object):
else:
self.key_fee = None
self.key_fee_address = None
self.stream_hash = self.stream_info['stream_hash']
elif 'stream_hash' in json.loads(self.stream_info['value']):
self.resolved_name = self.stream_info.get('name', None)
self.description = json.loads(self.stream_info['value']).get('description', None)
try:
if 'key_fee' in json.loads(self.stream_info['value']):
self.key_fee = float(json.loads(self.stream_info['value'])['key_fee'])
except ValueError:
self.key_fee = None
self.key_fee_address = json.loads(self.stream_info['value']).get('key_fee_address', None)
self.stream_hash = json.loads(self.stream_info['value'])['stream_hash']
self.stream_hash = self.stream_info['stream_hash']
else:
print 'InvalidStreamInfoError'
raise InvalidStreamInfoError(self.stream_info)
@ -72,71 +55,42 @@ class GetStream(object):
if self.pay_key:
print "Key fee (" + str(self.key_fee) + ") above limit of " + str(
self.max_key_fee) + ", didn't download lbry://" + str(self.resolved_name)
return self.finished_deferred.callback(None)
return defer.fail(None)
else:
pass
def _get_downloader_for_return():
return defer.succeed(self.downloader)
d = defer.Deferred(None)
d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
d.addCallback(lambda metadata:
metadata.factories[1].make_downloader(metadata, [self.data_rate, True], self.payment_rate_manager))
d.addErrback(lambda err: err.trap(defer.CancelledError))
d.addErrback(lambda err: log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()))
d.addCallback(self._start_download)
d.callback(None)
return d
def _start_download(self, downloader):
def _pay_key_fee():
if self.key_fee is not None and self.key_fee_address is not None:
reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
print 'Key fee: ' + str(self.key_fee) + ' | ' + str(self.key_fee_address)
return self.wallet.send_points_to_address(reserved_points, self.key_fee)
return defer.succeed(None)
self.loading_metadata_deferred = defer.Deferred(None)
self.loading_metadata_deferred.addCallback(
lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.loading_metadata_deferred.addCallback(self._handle_metadata)
self.loading_metadata_deferred.addErrback(self._handle_load_canceled)
self.loading_metadata_deferred.addErrback(self._handle_load_failed)
if self.pay_key:
self.loading_metadata_deferred.addCallback(lambda _: self._pay_key_fee())
self.loading_metadata_deferred.addCallback(lambda _: self._make_downloader())
self.loading_metadata_deferred.addCallback(lambda _: self.downloader.start())
self.loading_metadata_deferred.addErrback(self._handle_download_error)
self.loading_metadata_deferred.addCallback(lambda _: _get_downloader_for_return())
self.loading_metadata_deferred.callback(None)
return defer.succeed(None)
def _pay_key_fee(self):
if self.key_fee is not None and self.key_fee_address is not None:
reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
print 'Key fee: ' + str(self.key_fee) + ' | ' + str(self.key_fee_address)
return self.wallet.send_points_to_address(reserved_points, self.key_fee)
return defer.succeed(None)
def _handle_load_canceled(self, err):
err.trap(defer.CancelledError)
self.finished_deferred.callback(None)
def _handle_load_failed(self, err):
self.loading_failed = True
log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())
print 'Load Failed: ', err.getTraceback()
self.finished_deferred.callback(None)
def _handle_metadata(self, metadata):
self.metadata = metadata
self.factory = self.metadata.factories[1]
return defer.succeed(None)
def _handle_download_error(self, err):
if err.check(InsufficientFundsError):
print "Download stopped due to insufficient funds."
d = _pay_key_fee()
else:
print "Autoaddstream: An unexpected error has caused the download to stop: ", err.getTraceback()
d = defer.Deferred()
def _make_downloader(self):
downloader.start()
def _set_downloader(downloader):
self.downloader = downloader
print "Downloading", self.stream_hash, "-->", os.path.join(self.downloader.download_directory,
self.downloader.file_name)
return self.downloader
print "Downloading", self.stream_hash, "-->", os.path.join(downloader.download_directory, downloader.file_name)
downloader = self.factory.make_downloader(self.metadata, [self.data_rate, True], self.payment_rate_manager)
downloader.addCallback(_set_downloader)
return downloader
return d
class FetcherDaemon(object):