Improved autofetcher

Added commands:

start-autofetcher
stop-autofetcher
autofetcher-status
This commit is contained in:
Jack 2015-11-19 14:43:36 -05:00
parent a05f2fe3fb
commit 45eb4f7201
3 changed files with 237 additions and 195 deletions

View file

@ -0,0 +1,196 @@
import json
import logging
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet.core.Error import InvalidStreamInfoError, InsufficientFundsError
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.StreamDescriptor import download_sd_blob
log = logging.getLogger(__name__)
class AutoAddStreamFromLBRYcrdName(object):
def __init__(self, console, sd_identifier, session, wallet, lbry_file_manager):
self.finished_deferred = defer.Deferred(None)
self.console = console
self.wallet = wallet
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.name = None
self.session = session
self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
self.loading_metadata_deferred = defer.Deferred()
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.metadata = None
self.loading_failed = False
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.stream_hash = None
def start(self, stream_info):
self.stream_info = stream_info
if 'stream_hash' not in json.loads(self.stream_info['value']):
print 'InvalidStreamInfoError'
raise InvalidStreamInfoError(self.stream_info)
self.resolved_name = self.stream_info.get('name', None)
self.description = json.loads(self.stream_info['value']).get('description', None)
try:
if 'key_fee' in json.loads(self.stream_info['value']):
self.key_fee = float(json.loads(self.stream_info['value'])['key_fee'])
except ValueError:
self.key_fee = None
self.key_fee_address = json.loads(self.stream_info['value']).get('key_fee_address', None)
self.stream_hash = json.loads(self.stream_info['value'])['stream_hash']
self.loading_metadata_deferred = defer.Deferred(None)
self.loading_metadata_deferred.addCallback(lambda _: download_sd_blob(self.session,
self.stream_hash, self.payment_rate_manager))
self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.loading_metadata_deferred.addCallback(self._handle_metadata)
self.loading_metadata_deferred.addErrback(self._handle_load_canceled)
self.loading_metadata_deferred.addErrback(self._handle_load_failed)
self.finished_deferred.addCallback(lambda _: self.loading_metadata_deferred.callback(None))
return self.finished_deferred.callback(None)
def _start_download(self):
d = self._pay_key_fee()
d.addCallback(lambda _: self._make_downloader())
d.addCallback(lambda stream_downloader: stream_downloader.start())
d.addErrback(self._handle_download_error)
return d
def _pay_key_fee(self):
if self.key_fee is not None and self.key_fee_address is not None:
reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, self.key_fee)
self.console.sendLine("Sent key fee" + str(self.key_fee_address) + " | " + str(self.key_fee))
return defer.succeed(None)
def _handle_load_canceled(self, err):
err.trap(defer.CancelledError)
self.finished_deferred.callback(None)
def _handle_load_failed(self, err):
self.loading_failed = True
self.console.sendLine("handle load failed: " + str(err.getTraceback()))
log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())
self.finished_deferred.callback(None)
def _handle_metadata(self, metadata):
self.metadata = metadata
self.factory = self.metadata.factories[0]
self.finished_deferred.addCallback(lambda _: self._start_download())
def _handle_download_error(self, err):
if err.check(InsufficientFundsError):
self.console.sendLine("Download stopped due to insufficient funds.")
else:
self.console.sendLine("Autoaddstream: An unexpected error has caused the download to stop: %s" % err.getTraceback())
def _make_downloader(self):
self.downloader = self.factory.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager)
return self.downloader
class AutoFetcher(object):
def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier):
self.console = None
self.sd_identifier = sd_identifier
self.wallet = wallet
self.session = session
self.lbry_file_manager = lbry_file_manager
self.lbry_metadata_manager = lbry_file_metadata_manager
self.seen = []
self.lastbestblock = None
self.rpc_conn = self.wallet.get_rpc_conn_x()
self.search = None
self.first_run = True
self.is_running = False
def start(self, console):
#TODO first search through the nametrie before monitoring live updates
#TODO load previously downloaded streams
self.console = console
if not self.is_running:
self.is_running = True
self.search = LoopingCall(self._looped_search)
self.search.start(1)
else:
self.console.sendLine("Autofetcher is already running")
def stop(self, console):
self.console = console
if self.is_running:
self.search.stop()
self.is_running = False
else:
self.console.sendLine("Autofetcher isn't running, there's nothing to stop")
def check_if_running(self, console):
self.console = console
if self.is_running:
self.console.sendLine("Autofetcher is running")
self.console.sendLine("Last block hash: " + str(self.lastbestblock['bestblockhash']))
else:
self.console.sendLine("Autofetcher is not running")
def _get_names(self):
c = self.rpc_conn.getblockchaininfo()
rtn = []
if self.lastbestblock != c:
block = self.rpc_conn.getblock(c['bestblockhash'])
txids = block['tx']
transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids]
for t in transactions:
claims = self.rpc_conn.getclaimsfortx(t['txid'])
# uncomment to make it download lbry://y on startup
# if self.first_run:
# claims = self.rpc_conn.getclaimsfortx("38da801a2c3620a79252e5b0b619d1d3f4d53aa9edd9cdc76d9ba65660fb9f06")
# self.first_run = False
if claims:
for claim in claims:
if claim not in self.seen:
self.console.sendLine("lbry://" + str(claim['name']) + " | stream hash: " +
str(json.loads(claim['value'])['stream_hash']))
rtn.append(claim)
self.seen.append(claim)
else:
#self.console.sendLine("No new claims in block #" + str(block['height']))
pass
self.lastbestblock = c
if len(rtn):
return defer.succeed(rtn)
def _download_claims(self, claims):
if claims:
for claim in claims:
download = defer.Deferred()
stream = AutoAddStreamFromLBRYcrdName(self.console, self.sd_identifier, self.session,
self.wallet, self.lbry_file_manager)
download.addCallback(lambda _: stream.start(claim))
download.callback(None)
return defer.succeed(None)
def _looped_search(self):
d = defer.Deferred(None)
d.addCallback(lambda _: self._get_names())
d.addCallback(self._download_claims)
d.callback(None)

View file

@ -2362,216 +2362,52 @@ class StatusFactory(CommandHandlerFactory):
"to remove the file." "to remove the file."
class AutoAddStream(CommandHandler): class AutoFetcherStart(CommandHandler):
def __init__(self, console): def __init__(self, console, autofetcher):
CommandHandler.__init__(self, console) CommandHandler.__init__(self, console)
self.session = None self.autofetcher = autofetcher
self.loading_metadata_deferred = None
self.lbry_file_manager = None
self.sd_identifier = None
self.metadata = None
self.downloader = None
self.loading_failed = False
self.factory = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.payment_rate_manager = None
def start(self): def start(self):
pass self.autofetcher.start(self.console)
def _load_metadata(self, sd_file):
return defer.fail(NotImplementedError())
def _start_download(self):
d = self._make_downloader()
d.addCallback(lambda stream_downloader: stream_downloader.start())
d.addErrback(self._handle_download_error)
return d
def _handle_download_error(self, err):
if err.check(InsufficientFundsError):
self.console.sendLine("Download stopped due to insufficient funds.")
else:
self.console.sendLine("Autoaddstream: An unexpected error has caused the download to stop: %s" % err.getTraceback())
def _make_downloader(self):
self.downloader = self.factory.make_downloader(self.metadata, [0.5, True], self.payment_rate_manager)
return self.downloader
class AutoAddStreamFromLBRYcrdName(AutoAddStream):
def __init__(self, console, sd_identifier, session, wallet, lbry_file_manager):
AutoAddStream.__init__(self, console)
self.wallet = wallet
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.name = None
self.session = session
self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
self.loading_metadata_deferred = defer.Deferred()
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.metadata = None
self.loading_failed = False
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.stream_hash = None
def start(self, name):
self.name = name
self.loading_metadata_deferred = defer.Deferred(None)
self.loading_metadata_deferred.addCallback(lambda _: self._resolve_name(str(name)))
self.loading_metadata_deferred.addCallback(lambda stream_hash: download_sd_blob(self.session,
stream_hash, self.payment_rate_manager))
self.loading_metadata_deferred.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.loading_metadata_deferred.addCallback(self._handle_metadata)
#self.loading_metadata_deferred.addErrback(self._handle_load_canceled)
#self.loading_metadata_deferred.addErrback(self._handle_load_failed)
self.finished_deferred.addCallback(lambda _: self.loading_metadata_deferred.callback(None))
return self.finished_deferred.callback(None)
def _resolve_name(self, name):
def get_name_from_info(stream_info):
if 'stream_hash' not in stream_info:
print 'InvalidStreamInfoError'
raise InvalidStreamInfoError(name)
self.resolved_name = stream_info.get('name', None)
self.description = stream_info.get('description', None)
try:
if 'key_fee' in stream_info:
self.key_fee = float(stream_info['key_fee'])
except ValueError:
self.key_fee = None
self.key_fee_address = stream_info.get('key_fee_address', None)
self.stream_hash = stream_info['stream_hash']
return stream_info['stream_hash']
d = self.wallet.get_stream_info_for_name(name)
d.addCallback(get_name_from_info)
return d
def _start_download(self):
d = self._pay_key_fee()
d.addCallback(lambda _: AutoAddStream._start_download(self))
return d
def _pay_key_fee(self):
if self.key_fee is not None and self.key_fee_address is not None:
reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, self.key_fee)
#self.console.sendLine("Sent key fee" + str(self.key_fee_address) + " | " + str(self.key_fee))
return defer.succeed(None)
def _handle_load_canceled(self, err):
err.trap(defer.CancelledError)
self.finished_deferred.callback(None) self.finished_deferred.callback(None)
def _handle_load_failed(self, err):
self.loading_failed = True
self.console.sendLine("handle load failed: " + str(err.getTraceback()))
log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback())
self.finished_deferred.callback(None)
def _handle_metadata(self, metadata): class AutoFetcherStop(CommandHandler):
self.metadata = metadata def __init__(self, console, autofetcher):
self.factory = self.metadata.factories[0]
#self.console.sendLine("Factory: " + str(self.factory))
self.finished_deferred.addCallback(lambda _: AutoAddStream.start(self))
self.finished_deferred.addCallback(lambda _: self._start_download())
class AutoFetcher(CommandHandler):
def __init__(self, console, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier):
CommandHandler.__init__(self, console) CommandHandler.__init__(self, console)
self.d = None self.autofetcher = autofetcher
self.console = console
self.sd_identifier = sd_identifier
self.wallet = wallet
self.session = session
self.lbry_file_manager = lbry_file_manager
self.lbry_metadata_manager = lbry_file_metadata_manager
self.seen = []
self.lastbestblock = None
self.rpc_conn = self.wallet.get_rpc_conn_x()
self.search = None
self.first_run = True
def start(self): def start(self):
#TODO add a stop function self.autofetcher.stop(self.console)
#TODO detect if it's already running
#TODO first search through the nametrie before monitoring live updates
#TODO load previously downloaded streams
self.search = LoopingCall(self._looped_search)
self.search.start(1)
self.finished_deferred.callback(None) self.finished_deferred.callback(None)
def stop(self):
self.search.stop() class AutoFetcherStatus(CommandHandler):
def __init__(self, console, autofetcher):
CommandHandler.__init__(self, console)
self.autofetcher = autofetcher
def start(self):
self.autofetcher.check_if_running(self.console)
self.finished_deferred.callback(None) self.finished_deferred.callback(None)
def _get_names(self):
c = self.rpc_conn.getblockchaininfo()
rtn = []
if self.lastbestblock != c:
block = self.rpc_conn.getblock(c['bestblockhash'])
txids = block['tx']
transactions = [self.rpc_conn.decoderawtransaction(self.rpc_conn.getrawtransaction(t)) for t in txids]
for t in transactions:
claims = self.rpc_conn.getclaimsfortx(t['txid'])
# uncomment to make it download wonderfullife on startup
# if self.first_run:
# claims = self.rpc_conn.getclaimsfortx("c3684bd587856ba5cc38c4afdbcd2c6efc60cb2d1ed21188485ea58048b419a8")
# self.first_run = False
if claims: class AutoFetcherStartFactory(CommandHandlerFactory):
for claim in claims: control_handler_class = AutoFetcherStart
if claim not in self.seen: command = "start-autofetcher"
self.console.sendLine("lbry://" + str(claim['name']) + " | stream hash: " + short_help = "Start downloading all lbry files as they are published"
str(json.loads(claim['value'])['stream_hash']))
rtn.append(claim['name'])
self.seen.append(claim)
else:
self.console.sendLine("No new claims in block #" + str(block['height']))
#pass
self.lastbestblock = c
if len(rtn): class AutoFetcherStopFactory(CommandHandlerFactory):
return defer.succeed(rtn) control_handler_class = AutoFetcherStop
command = "stop-autofetcher"
short_help = "Stop downloading all lbry files as they are published"
def _download_claims(self, claims):
if claims:
for claim in claims:
download = defer.Deferred()
stream = AutoAddStreamFromLBRYcrdName(self.console, self.sd_identifier, self.session,
self.wallet, self.lbry_file_manager)
download.addCallback(lambda _: stream.start(str(claim)))
download.callback(None)
return defer.succeed(None) class AutoFetcherStatusFactory(CommandHandlerFactory):
control_handler_class = AutoFetcherStatus
def _looped_search(self): command = "autofetcher-status"
d = defer.Deferred(None) short_help = "Check autofetcher status"
d.addCallback(lambda _: self._get_names())
d.addCallback(self._download_claims)
d.callback(None)
class AutoFetcherFactory(CommandHandlerFactory):
control_handler_class = AutoFetcher
command = "start-autofetching"
short_help = "Download all lbry files as they are published"
class BlockchainStatus(CommandHandler): class BlockchainStatus(CommandHandler):

View file

@ -4,6 +4,8 @@ import os.path
import argparse import argparse
from yapsy.PluginManager import PluginManager from yapsy.PluginManager import PluginManager
from twisted.internet import defer, threads, stdio, task, error from twisted.internet import defer, threads, stdio, task, error
from lbrynet.core.client.AutoDownloader import AutoFetcher
from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl from lbrynet.lbrynet_console.ConsoleControl import ConsoleControl
from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings
from lbrynet.lbryfilemanager.LBRYFileManager import LBRYFileManager from lbrynet.lbryfilemanager.LBRYFileManager import LBRYFileManager
@ -21,7 +23,8 @@ from lbrynet.lbryfile.StreamDescriptor import LBRYFileStreamType
from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager
#from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager #from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager
from lbrynet.lbrynet_console.ControlHandlers import ApplicationStatusFactory, GetWalletBalancesFactory, ShutDownFactory from lbrynet.lbrynet_console.ControlHandlers import ApplicationStatusFactory, GetWalletBalancesFactory, ShutDownFactory
from lbrynet.lbrynet_console.ControlHandlers import AutoFetcherFactory, ImmediateAnnounceAllBlobsFactory from lbrynet.lbrynet_console.ControlHandlers import AutoFetcherStartFactory, AutoFetcherStopFactory
from lbrynet.lbrynet_console.ControlHandlers import ImmediateAnnounceAllBlobsFactory, AutoFetcherStatusFactory
from lbrynet.lbrynet_console.ControlHandlers import LBRYFileStatusFactory, DeleteLBRYFileChooserFactory from lbrynet.lbrynet_console.ControlHandlers import LBRYFileStatusFactory, DeleteLBRYFileChooserFactory
from lbrynet.lbrynet_console.ControlHandlers import ToggleLBRYFileRunningChooserFactory from lbrynet.lbrynet_console.ControlHandlers import ToggleLBRYFileRunningChooserFactory
from lbrynet.lbrynet_console.ControlHandlers import ModifyApplicationDefaultsFactory from lbrynet.lbrynet_console.ControlHandlers import ModifyApplicationDefaultsFactory
@ -89,6 +92,7 @@ class LBRYConsole():
self.sd_identifier = StreamDescriptorIdentifier() self.sd_identifier = StreamDescriptorIdentifier()
self.plugin_objects = [] self.plugin_objects = []
self.db_migration_revisions = None self.db_migration_revisions = None
self.autofetcher = None
def start(self): def start(self):
"""Initialize the session and restore everything to its saved state""" """Initialize the session and restore everything to its saved state"""
@ -100,6 +104,7 @@ class LBRYConsole():
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self._setup_lbry_file_manager()) d.addCallback(lambda _: self._setup_lbry_file_manager())
d.addCallback(lambda _: self._setup_lbry_file_opener()) d.addCallback(lambda _: self._setup_lbry_file_opener())
d.addCallback(lambda _: self._get_autofetcher()),
d.addCallback(lambda _: self._setup_control_handlers()) d.addCallback(lambda _: self._setup_control_handlers())
d.addCallback(lambda _: self._setup_query_handlers()) d.addCallback(lambda _: self._setup_query_handlers())
d.addCallback(lambda _: self._load_plugins()) d.addCallback(lambda _: self._load_plugins())
@ -108,6 +113,10 @@ class LBRYConsole():
d.addErrback(self._show_start_error) d.addErrback(self._show_start_error)
return d return d
def _get_autofetcher(self):
self.autofetcher = AutoFetcher(self.session, self.lbry_file_manager, self.lbry_file_metadata_manager,
self.session.wallet, self.sd_identifier)
def _show_start_error(self, error): def _show_start_error(self, error):
print error.getErrorMessage() print error.getErrorMessage()
log.error("An error occurred during start up: %s", error.getTraceback()) log.error("An error occurred during start up: %s", error.getTraceback())
@ -313,8 +322,9 @@ class LBRYConsole():
AddStreamFromHashFactory(self.sd_identifier, self.session), AddStreamFromHashFactory(self.sd_identifier, self.session),
StatusFactory(self, self.session.rate_limiter, self.lbry_file_manager, StatusFactory(self, self.session.rate_limiter, self.lbry_file_manager,
self.session.blob_manager, self.session.wallet if self.wallet_type == 'lbrycrd' else None), self.session.blob_manager, self.session.wallet if self.wallet_type == 'lbrycrd' else None),
AutoFetcherFactory(self.session, self.lbry_file_manager, self.lbry_file_metadata_manager, AutoFetcherStartFactory(self.autofetcher),
self.session.wallet, self.sd_identifier), AutoFetcherStopFactory(self.autofetcher),
AutoFetcherStatusFactory(self.autofetcher),
ImmediateAnnounceAllBlobsFactory(self.session.blob_manager) ImmediateAnnounceAllBlobsFactory(self.session.blob_manager)
] ]
self.add_control_handlers(handlers) self.add_control_handlers(handlers)