Have autofetcher use managed downloader

This commit is contained in:
Jack 2016-01-27 11:02:57 -05:00
parent 2bfbb824bf
commit cb5c772233
3 changed files with 33 additions and 11 deletions

View file

@ -267,7 +267,7 @@ class LBRYcrdWallet(object):
d.addCallback(set_address_for_peer) d.addCallback(set_address_for_peer)
return d return d
def get_stream_info_for_name(self, name): def get_stream_info_for_name(self, name, txid=None):
def get_stream_info_from_value(result): def get_stream_info_from_value(result):
r_dict = {} r_dict = {}
@ -277,7 +277,8 @@ class LBRYcrdWallet(object):
value_dict = json.loads(value) value_dict = json.loads(value)
except ValueError: except ValueError:
return Failure(InvalidStreamInfoError(name)) return Failure(InvalidStreamInfoError(name))
known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail'] known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail',
'content_license']
for field in known_fields: for field in known_fields:
if field in value_dict: if field in value_dict:
r_dict[field] = value_dict[field] r_dict[field] = value_dict[field]
@ -289,7 +290,7 @@ class LBRYcrdWallet(object):
return d return d
return Failure(UnknownNameError(name)) return Failure(UnknownNameError(name))
d = threads.deferToThread(self._get_value_for_name, name) d = threads.deferToThread(self._get_value_for_name, name, txid)
d.addCallback(get_stream_info_from_value) d.addCallback(get_stream_info_from_value)
return d return d
@ -594,9 +595,17 @@ class LBRYcrdWallet(object):
return rpc_conn.getnewaddress() return rpc_conn.getnewaddress()
@_catch_connection_error @_catch_connection_error
def _get_value_for_name(self, name): def _get_value_for_name(self, name, txid=None):
rpc_conn = self._get_rpc_conn() rpc_conn = self._get_rpc_conn()
return rpc_conn.getvalueforname(name) if not txid:
return rpc_conn.getvalueforname(name)
else:
claim = rpc_conn.getclaimsfortx(txid)[0]
if claim['name'] == name:
claim['txid'] = txid
return claim
else:
raise ValueError
@_catch_connection_error @_catch_connection_error
def _claim_name(self, name, value, amount): def _claim_name(self, name, value, amount):

View file

@ -803,7 +803,7 @@ class LBRYDaemon(xmlrpc.XMLRPC):
# if not os.path.isfile(metadata['file_path']): # if not os.path.isfile(metadata['file_path']):
# return defer.fail() # return defer.fail()
if not type(metadata['bid']) is float and metadata['bid'] > 0.0: if not isinstance(metadata['bid'], float) and metadata['bid'] > 0.0:
return defer.fail() return defer.fail()
name = metadata['name'] name = metadata['name']
@ -880,6 +880,14 @@ class LBRYDaemon(xmlrpc.XMLRPC):
return d return d
def xmlrpc_toggle_fetcher_verbose(self):
if self.fetcher.verbose:
self.fetcher.verbose = False
else:
self.fetcher.verbose = True
return self.fetcher.verbose
def main(): def main():
daemon = LBRYDaemon() daemon = LBRYDaemon()

View file

@ -94,7 +94,8 @@ class GetStream(object):
class FetcherDaemon(object): class FetcherDaemon(object):
def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf): def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf,
verbose=False):
self.autofetcher_conf = autofetcher_conf self.autofetcher_conf = autofetcher_conf
self.max_key_fee = 0.0 self.max_key_fee = 0.0
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
@ -107,6 +108,7 @@ class FetcherDaemon(object):
self.search = None self.search = None
self.first_run = True self.first_run = True
self.is_running = False self.is_running = False
self.verbose = verbose
self._get_autofetcher_conf() self._get_autofetcher_conf()
def start(self): def start(self):
@ -152,13 +154,16 @@ class FetcherDaemon(object):
" | stream hash: " + str(json.loads(claim['value'])['stream_hash']) " | stream hash: " + str(json.loads(claim['value'])['stream_hash'])
print msg print msg
log.debug(msg) log.debug(msg)
rtn.append(claim) rtn.append([claim['name'], t['txid']])
self.seen.append(claim) self.seen.append(claim)
else:
if self.verbose:
print "[" + str(datetime.now()) + "] No claims in block", c['bestblockhash']
self.lastbestblock = c self.lastbestblock = c
if len(rtn): if len(rtn):
return defer.succeed(rtn) return defer.DeferredList([self.wallet.get_stream_info_for_name(name, txid=t) for name, t in rtn])
def _download_claims(self, claims): def _download_claims(self, claims):
if claims: if claims:
@ -166,13 +171,13 @@ class FetcherDaemon(object):
download = defer.Deferred() download = defer.Deferred()
stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager, stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager,
self.max_key_fee, pay_key=False) self.max_key_fee, pay_key=False)
download.addCallback(lambda _: stream.start(claim)) download.addCallback(lambda _: stream.start(claim[1]))
download.callback(None) download.callback(None)
return defer.succeed(None) return defer.succeed(None)
def _looped_search(self): def _looped_search(self):
d = defer.Deferred(None) d = defer.Deferred()
d.addCallback(lambda _: self._get_names()) d.addCallback(lambda _: self._get_names())
d.addCallback(self._download_claims) d.addCallback(self._download_claims)
d.callback(None) d.callback(None)