Update lbrynet-daemon

Add auto fetcher to fetch new name claims
This commit is contained in:
Jack 2015-12-14 12:59:44 -05:00
parent 0eecfb52ed
commit a1861a2f84
2 changed files with 161 additions and 11 deletions

View file

@ -1,7 +1,7 @@
import json import json
import logging import logging
import os import os
from datetime import datetime
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.task import LoopingCall from twisted.internet.task import LoopingCall
from lbrynet.core.Error import InvalidStreamInfoError, InsufficientFundsError from lbrynet.core.Error import InvalidStreamInfoError, InsufficientFundsError
@ -145,11 +145,27 @@ class GetStream(object):
self.description = self.stream_info['description'] self.description = self.stream_info['description']
if 'key_fee' in self.stream_info.keys(): if 'key_fee' in self.stream_info.keys():
self.key_fee = float(self.stream_info['key_fee']) self.key_fee = float(self.stream_info['key_fee'])
self.key_fee_address = self.stream_info['key_fee_address'] if 'key_fee_address' in self.stream_info.keys():
self.key_fee_address = self.stream_info['key_fee_address']
else:
self.key_fee_address = None
else: else:
self.key_fee = None self.key_fee = None
self.key_fee_address = None
self.stream_hash = self.stream_info['stream_hash'] self.stream_hash = self.stream_info['stream_hash']
elif 'stream_hash' in json.loads(self.stream_info['value']):
self.resolved_name = self.stream_info.get('name', None)
self.description = json.loads(self.stream_info['value']).get('description', None)
try:
if 'key_fee' in json.loads(self.stream_info['value']):
self.key_fee = float(json.loads(self.stream_info['value'])['key_fee'])
except ValueError:
self.key_fee = None
self.key_fee_address = json.loads(self.stream_info['value']).get('key_fee_address', None)
self.stream_hash = json.loads(self.stream_info['value'])['stream_hash']
else: else:
print 'InvalidStreamInfoError'
raise InvalidStreamInfoError(self.stream_info) raise InvalidStreamInfoError(self.stream_info)
if self.key_fee > self.max_key_fee: if self.key_fee > self.max_key_fee:
@ -212,7 +228,7 @@ class GetStream(object):
def _set_downloader(downloader): def _set_downloader(downloader):
self.downloader = downloader self.downloader = downloader
print os.path.join(self.downloader.download_directory, self.downloader.file_name) print "Downloading", self.stream_hash, " -->", os.path.join(self.downloader.download_directory, self.downloader.file_name)
return self.downloader return self.downloader
self.downloader = self.factory.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager) self.downloader = self.factory.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager)
@ -278,9 +294,9 @@ class AutoFetcher(object):
transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids] transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids]
for t in transactions: for t in transactions:
claims = self.rpc_conn.getclaimsfortx(t['txid']) claims = self.rpc_conn.getclaimsfortx(t['txid'])
# uncomment to make it download lbry://yyyy on startup
# if self.first_run: # if self.first_run:
# claims = self.rpc_conn.getclaimsfortx("43a784085949f7bebe5c2a2b74f4e2c6abec36219a5d04d285206b4056ea218b") # #claims = self.rpc_conn.getclaimsfortx("96aca2c60efded5806b7336430c5987b9092ffbea9c6ed444e3bf8e008993e11")
# claims = self.rpc_conn.getclaimsfortx("cc9c7f5225ecb38877e6ca7574d110b23214ac3556b9d65784065ad3a85b4f74")
# self.first_run = False # self.first_run = False
if claims: if claims:
for claim in claims: for claim in claims:
@ -330,3 +346,107 @@ class AutoFetcher(object):
print "maxkey=1.0" print "maxkey=1.0"
self.max_key_fee = settings["maxkey"] self.max_key_fee = settings["maxkey"]
class FetcherDaemon(object):
def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf):
self.autofetcher_conf = autofetcher_conf
self.max_key_fee = 0.0
self.sd_identifier = sd_identifier
self.wallet = wallet
self.session = session
self.lbry_file_manager = lbry_file_manager
self.lbry_metadata_manager = lbry_file_metadata_manager
self.seen = []
self.lastbestblock = None
self.rpc_conn = self.wallet.get_rpc_conn_x()
self.search = None
self.first_run = True
self.is_running = False
self._get_autofetcher_conf()
def start(self):
if not self.is_running:
self.is_running = True
self.search = LoopingCall(self._looped_search)
self.search.start(1)
else:
print "Autofetcher is already running"
def stop(self):
if self.is_running:
self.search.stop()
self.is_running = False
else:
print "Autofetcher isn't running, there's nothing to stop"
def check_if_running(self):
if self.is_running:
msg = "Autofetcher is running\n"
msg += "Last block hash: " + str(self.lastbestblock['bestblockhash'])
else:
msg = "Autofetcher is not running"
return msg
def _get_names(self):
c = self.rpc_conn.getblockchaininfo()
rtn = []
if self.lastbestblock != c:
block = self.rpc_conn.getblock(c['bestblockhash'])
txids = block['tx']
transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids]
for t in transactions:
claims = self.rpc_conn.getclaimsfortx(t['txid'])
# if self.first_run:
# # claims = self.rpc_conn.getclaimsfortx("96aca2c60efded5806b7336430c5987b9092ffbea9c6ed444e3bf8e008993e11")
# # claims = self.rpc_conn.getclaimsfortx("cc9c7f5225ecb38877e6ca7574d110b23214ac3556b9d65784065ad3a85b4f74")
# self.first_run = False
if claims:
for claim in claims:
if claim not in self.seen:
msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \
" | stream hash: " + str(json.loads(claim['value'])['stream_hash'])
print msg
log.debug(msg)
rtn.append(claim)
self.seen.append(claim)
else:
# self.console.sendLine("No new claims in block #" + str(block['height']))
pass
self.lastbestblock = c
if len(rtn):
return defer.succeed(rtn)
def _download_claims(self, claims):
if claims:
for claim in claims:
download = defer.Deferred()
stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager, self.max_key_fee)
download.addCallback(lambda _: stream.start(claim))
download.callback(None)
return defer.succeed(None)
def _looped_search(self):
d = defer.Deferred(None)
d.addCallback(lambda _: self._get_names())
d.addCallback(self._download_claims)
d.callback(None)
def _get_autofetcher_conf(self):
settings = {"maxkey": "0.0"}
if os.path.exists(self.autofetcher_conf):
conf = open(self.autofetcher_conf)
for l in conf:
if l.startswith("maxkey="):
settings["maxkey"] = float(l[7:].rstrip('\n'))
else:
print "Autofetcher using default max key price of 0.0"
print "To change this create the file:"
print str(self.autofetcher_conf)
print "Example contents of conf file:"
print "maxkey=1.0"
self.max_key_fee = settings["maxkey"]

View file

@ -1,7 +1,7 @@
from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory from lbrynet.lbryfile.client.LBRYFileDownloader import LBRYFileSaverFactory, LBRYFileOpenerFactory
from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.core.client.AutoDownloader import GetStream from lbrynet.core.client.AutoDownloader import GetStream, FetcherDaemon
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE
@ -27,6 +27,7 @@ class LBRYDaemon(xmlrpc.XMLRPC):
def setup(self): def setup(self):
def _set_vars(): def _set_vars():
self.fetcher = None
self.current_db_revision = 1 self.current_db_revision = 1
self.run_server = True self.run_server = True
self.session = None self.session = None
@ -67,6 +68,7 @@ class LBRYDaemon(xmlrpc.XMLRPC):
self.wallet_type = "lbrycrd" self.wallet_type = "lbrycrd"
self.lbrycrd_dir = os.path.join(os.path.expanduser("~"), ".lbrycrd") self.lbrycrd_dir = os.path.join(os.path.expanduser("~"), ".lbrycrd")
self.lbrycrd_conf = os.path.join(self.lbrycrd_dir, "lbrycrd.conf") self.lbrycrd_conf = os.path.join(self.lbrycrd_dir, "lbrycrd.conf")
self.autofetcher_conf = os.path.join(self.lbrycrd_dir, "autofetcher.conf")
self.rpc_conn = None self.rpc_conn = None
self.files = [] self.files = []
self.created_data_dir = False self.created_data_dir = False
@ -86,10 +88,16 @@ class LBRYDaemon(xmlrpc.XMLRPC):
d.addCallback(lambda _: self._setup_stream_identifier()) d.addCallback(lambda _: self._setup_stream_identifier())
d.addCallback(lambda _: self._setup_lbry_file_manager()) d.addCallback(lambda _: self._setup_lbry_file_manager())
d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_lbry_file_opener())
d.addCallback(lambda _: self._setup_fetcher())
d.callback(None) d.callback(None)
return defer.succeed(None) return defer.succeed(None)
def _setup_fetcher(self):
self.fetcher = FetcherDaemon(self.session, self.lbry_file_manager, self.lbry_file_metadata_manager,
self.session.wallet, self.sd_identifier, self.autofetcher_conf)
return defer.succeed(None)
def _setup_data_directory(self): def _setup_data_directory(self):
print "Loading databases..." print "Loading databases..."
if self.created_data_dir: if self.created_data_dir:
@ -194,13 +202,9 @@ class LBRYDaemon(xmlrpc.XMLRPC):
self.rpc_conn = self.session.wallet.get_rpc_conn_x() self.rpc_conn = self.session.wallet.get_rpc_conn_x()
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True) dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
dl.addCallback(combine_results) dl.addCallback(combine_results)
dl.addCallback(create_session) dl.addCallback(create_session)
dl.addCallback(lambda _: self.session.setup()) dl.addCallback(lambda _: self.session.setup())
return dl return dl
def get_lbrycrdd_path(self): def get_lbrycrdd_path(self):
@ -254,10 +258,36 @@ class LBRYDaemon(xmlrpc.XMLRPC):
self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory) self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory)
return defer.succeed(True) return defer.succeed(True)
def xmlrpc_start_fetcher(self):
"""
Start autofetcher
"""
self.fetcher.start()
return str('Started autofetching')
def xmlrpc_stop_fetcher(self):
"""
Start autofetcher
"""
self.fetcher.stop()
return str('Started autofetching')
def xmlrpc_fetcher_status(self):
"""
Start autofetcher
"""
return str(self.fetcher.check_if_running())
def xmlrpc_get_balance(self): def xmlrpc_get_balance(self):
""" """
Get LBC balance Get LBC balance
""" """
return str(self.session.wallet.wallet_balance) return str(self.session.wallet.wallet_balance)
def xmlrpc_stop(self): def xmlrpc_stop(self):
@ -324,7 +354,7 @@ class LBRYDaemon(xmlrpc.XMLRPC):
# d.addCallback(lambda _: self.files.append({'name': name, 'stream_hash': stream.stream_hash, # d.addCallback(lambda _: self.files.append({'name': name, 'stream_hash': stream.stream_hash,
# 'path': os.path.join(stream.downloader.download_directory, stream.downloader.file_name)})) # 'path': os.path.join(stream.downloader.download_directory, stream.downloader.file_name)}))
d.addCallback(lambda _: {'ts': datetime.now(),'name': name}) d.addCallback(lambda _: {'ts': datetime.now(),'name': name})
d.addErrback(lambda _: 'UnknownNameError') d.addErrback(lambda err: str(err.getTraceback()))
return d return d