Run AutoFetcher in the background

This commit is contained in:
Jack 2015-11-14 21:53:16 -05:00
parent f16eec2ad2
commit 6e93c68a77

View file

@ -3,6 +3,7 @@ import logging
from time import sleep from time import sleep
from bitcoinrpc.authproxy import AuthServiceProxy from bitcoinrpc.authproxy import AuthServiceProxy
from twisted.internet.task import LoopingCall
from zope.interface import implements from zope.interface import implements
#from lbrynet.core.StreamDescriptor import PlainStreamDescriptorWriter, BlobStreamDescriptorWriter #from lbrynet.core.StreamDescriptor import PlainStreamDescriptorWriter, BlobStreamDescriptorWriter
from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.PaymentRateManager import PaymentRateManager
@ -16,6 +17,7 @@ from lbrynet.core.Error import UnknownNameError, InvalidBlobHashError, Insuffici
from lbrynet.core.Error import InvalidStreamInfoError from lbrynet.core.Error import InvalidStreamInfoError
from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.utils import is_valid_blobhash
from twisted.internet import defer, threads from twisted.internet import defer, threads
import datetime
import os import os
@ -2386,15 +2388,13 @@ class AutoAddStream(CommandHandler):
self.payment_rate_manager = None self.payment_rate_manager = None
def start(self): def start(self):
self.console.sendLine("Started autoaddstream") pass
def _load_metadata(self, sd_file): def _load_metadata(self, sd_file):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _start_download(self): def _start_download(self):
self.console.sendLine("Autoaddstream _start_download, making downloader")
d = self._make_downloader() d = self._make_downloader()
self.console.sendLine("Autoaddstream starting download")
d.addCallback(lambda stream_downloader: stream_downloader.start()) d.addCallback(lambda stream_downloader: stream_downloader.start())
d.addErrback(self._handle_download_error) d.addErrback(self._handle_download_error)
return d return d
@ -2406,7 +2406,6 @@ class AutoAddStream(CommandHandler):
self.console.sendLine("Autoaddstream: An unexpected error has caused the download to stop: %s" % err.getTraceback()) self.console.sendLine("Autoaddstream: An unexpected error has caused the download to stop: %s" % err.getTraceback())
def _make_downloader(self): def _make_downloader(self):
self.console.sendLine("making downloader, factory: " + str(self.factory))
self.downloader = self.factory.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager) self.downloader = self.factory.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager)
return self.downloader return self.downloader
@ -2427,7 +2426,6 @@ class AutoAddStreamFromLBRYcrdName(AutoAddStream):
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
self.metadata = None self.metadata = None
self.loading_failed = False self.loading_failed = False
self.downloading_metadata_deferred = defer.Deferred()
self.resolved_name = None self.resolved_name = None
self.description = None self.description = None
self.key_fee = None self.key_fee = None
@ -2435,14 +2433,12 @@ class AutoAddStreamFromLBRYcrdName(AutoAddStream):
self.stream_hash = None self.stream_hash = None
def start(self, name): def start(self, name):
self.console.sendLine("Started AutoAddStreamFromLBRYcrdName, file: " + name)
self.name = name self.name = name
self.loading_metadata_deferred = defer.Deferred(None) self.loading_metadata_deferred = defer.Deferred(None)
self.loading_metadata_deferred.addCallback(lambda _: self._resolve_name(str(name))) self.loading_metadata_deferred.addCallback(lambda _: self._resolve_name(str(name)))
self.loading_metadata_deferred.addCallback(lambda stream_hash: download_sd_blob(self.session,
#self.downloading_metadata_deferred.addCallback(lambda _: self.loading_metadata_deferred.callback(None)) stream_hash, self.payment_rate_manager))
self.loading_metadata_deferred.addCallback(lambda stream_hash: download_sd_blob(self.session, stream_hash, self.payment_rate_manager))
self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob) self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.loading_metadata_deferred.addCallback(self._handle_metadata) self.loading_metadata_deferred.addCallback(self._handle_metadata)
#self.loading_metadata_deferred.addErrback(self._handle_load_canceled) #self.loading_metadata_deferred.addErrback(self._handle_load_canceled)
@ -2466,14 +2462,12 @@ class AutoAddStreamFromLBRYcrdName(AutoAddStream):
self.key_fee = None self.key_fee = None
self.key_fee_address = stream_info.get('key_fee_address', None) self.key_fee_address = stream_info.get('key_fee_address', None)
self.stream_hash = stream_info['stream_hash'] self.stream_hash = stream_info['stream_hash']
print "Stream info ", stream_info
return stream_info['stream_hash'] return stream_info['stream_hash']
d = self.wallet.get_stream_info_for_name(name) d = self.wallet.get_stream_info_for_name(name)
d.addCallback(get_name_from_info) d.addCallback(get_name_from_info)
return d return d
def _start_download(self): def _start_download(self):
self.console.sendLine("crd name _start_download")
d = self._pay_key_fee() d = self._pay_key_fee()
d.addCallback(lambda _: AutoAddStream._start_download(self)) d.addCallback(lambda _: AutoAddStream._start_download(self))
return d return d
@ -2484,7 +2478,7 @@ class AutoAddStreamFromLBRYcrdName(AutoAddStream):
if reserved_points is None: if reserved_points is None:
return defer.fail(InsufficientFundsError()) return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, self.key_fee) return self.wallet.send_points_to_address(reserved_points, self.key_fee)
self.console.sendLine("Sent key fee" + str(self.key_fee_address) + " | " + str(self.key_fee)) #self.console.sendLine("Sent key fee" + str(self.key_fee_address) + " | " + str(self.key_fee))
return defer.succeed(None) return defer.succeed(None)
def _handle_load_canceled(self, err): def _handle_load_canceled(self, err):
@ -2498,15 +2492,12 @@ class AutoAddStreamFromLBRYcrdName(AutoAddStream):
self.finished_deferred.callback(None) self.finished_deferred.callback(None)
def _handle_metadata(self, metadata): def _handle_metadata(self, metadata):
self.console.sendLine("Metadata: " + str(metadata))
self.metadata = metadata self.metadata = metadata
self.factory = self.metadata.factories[0] self.factory = self.metadata.factories[0]
self.console.sendLine("Factory: " + str(self.factory)) #self.console.sendLine("Factory: " + str(self.factory))
self.finished_deferred.addCallback(lambda _: self.console.sendLine("Factoryx " + str(self.factory)))
self.finished_deferred.addCallback(lambda _: AutoAddStream.start(self)) self.finished_deferred.addCallback(lambda _: AutoAddStream.start(self))
self.finished_deferred.addCallback(lambda _: self._start_download()) self.finished_deferred.addCallback(lambda _: self._start_download())
self.finished_deferred.addCallback(lambda _: AutoAddStream._start_download(self))
class AutoFetcher(CommandHandler): class AutoFetcher(CommandHandler):
@ -2524,49 +2515,106 @@ class AutoFetcher(CommandHandler):
self.rpc_conn = self.wallet.get_rpc_conn_x() self.rpc_conn = self.wallet.get_rpc_conn_x()
def start(self): def start(self):
self.d = self._get_names() #TODO add a stop function
self.d.addCallback(self._download_claims) #TODO detect if it's already running
self.d.addErrback(self.finished_deferred.callback(None)) #TODO first search through the nametrie before monitoring live updates
#TODO load previously downloaded streams
search = LoopingCall(self._looped_search)
search.start(10)
self.finished_deferred.callback(None)
def _get_names(self): def _get_names(self):
c = self.rpc_conn.getblockchaininfo() c = self.rpc_conn.getblockchaininfo()
rtn = []
if self.lastbestblock != c: if self.lastbestblock != c:
block = self.rpc_conn.getblock(c['bestblockhash']) block = self.rpc_conn.getblock(c['bestblockhash'])
txids = block['tx'] txids = block['tx']
transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids] transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids]
rtn = []
for t in transactions: for t in transactions:
#claims = self.rpc_conn.getclaimsfortx(t['txid']) claims = self.rpc_conn.getclaimsfortx(t['txid'])
claims = self.rpc_conn.getclaimsfortx("c3684bd587856ba5cc38c4afdbcd2c6efc60cb2d1ed21188485ea58048b419a8") #claims = self.rpc_conn.getclaimsfortx("c3684bd587856ba5cc38c4afdbcd2c6efc60cb2d1ed21188485ea58048b419a8")
if claims: if claims:
for claim in claims: for claim in claims:
if claim not in self.seen: if claim not in self.seen:
#self.console.sendLine(str(claim))
self.console.sendLine("lbry://" + str(claim['name']) + " | stream hash: " + self.console.sendLine("lbry://" + str(claim['name']) + " | stream hash: " +
str(json.loads(claim['value'])['stream_hash'])) str(json.loads(claim['value'])['stream_hash']))
rtn.append(claim['name']) rtn.append(claim['name'])
self.seen.append(claim) self.seen.append(claim)
if not len(rtn): else:
self.console.sendLine("No claims in block " + str(block['hash']) + ", height: " + str(block['height'])) #self.console.sendLine("No new claims in block #" + str(block['height']))
pass
self.lastbestblock = c self.lastbestblock = c
if len(rtn): if len(rtn):
return defer.succeed(rtn) return defer.succeed(rtn)
else:
return defer.failure(None)
def _download_claims(self, claims): def _download_claims(self, claims):
for claim in claims: if claims:
download = defer.Deferred() for claim in claims:
stream = AutoAddStreamFromLBRYcrdName(self.console, self.sd_identifier, self.session, download = defer.Deferred()
self.wallet, self.lbry_file_manager) stream = AutoAddStreamFromLBRYcrdName(self.console, self.sd_identifier, self.session,
download.addCallback(lambda _: stream.start(str(claim))) self.wallet, self.lbry_file_manager)
download.callback(None) download.addCallback(lambda _: stream.start(str(claim)))
self.console.sendLine("Download complete") download.callback(None)
return defer.succeed(None) return defer.succeed(None)
def _looped_search(self):
d = defer.Deferred(None)
d.addCallback(lambda _: self._get_names())
d.addCallback(self._download_claims)
d.callback(None)
class AutoFetcherFactory(CommandHandlerFactory): class AutoFetcherFactory(CommandHandlerFactory):
control_handler_class = AutoFetcher control_handler_class = AutoFetcher
command = "start-autofetching" command = "start-autofetching"
short_help = "Download all lbry files as they are published" short_help = "Download all lbry files as they are published"
class BlockchainStatus(CommandHandler):
def __init__(self, console, wallet=None):
CommandHandler.__init__(self, console)
self.wallet = wallet
def start(self):
d = self.wallet.get_most_recent_blocktime()
d.addCallbacks(self._show_time_behind_blockchain, self._show_error)
d.chainDeferred(self.finished_deferred)
return d
def _show_time_behind_blockchain(self, best_block_time):
best_time = datetime.datetime.utcfromtimestamp(best_block_time)
diff = datetime.datetime.utcnow() - best_time
unit = None
val = None
if diff.days > 0:
if diff.days >= 7:
val = diff.days // 7
unit = "week"
else:
val = diff.days
unit = "day"
elif diff.seconds >= 60 * 90:
if diff.seconds >= 60 * 60:
val = diff.seconds // (60 * 60)
unit = "hour"
if unit is not None:
if val != 1:
unit += "s"
self.console.sendLine("This application is %d %s behind the LBC blockchain." % (val, unit))
else:
self.console.sendLine("This application is up to date with the LBC blockchain.")
def _show_error(self, err):
logging.error(err.getTraceback())
self.console.sendLine("Unable to determine the status of the blockchain.")
class BlockchainStatusFactory(CommandHandlerFactory):
control_handler_class = BlockchainStatus
command = "get-blockchain-status"
short_help = "Show whether this application has caught up with the LBC blockchain"
full_help = "Show whether this applications has caught up with the LBC blockchain"