From 129d2687b95f5c7cba0383210d9aa3855809dc1d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 30 Apr 2018 13:31:09 -0400 Subject: [PATCH] download headers from s3 when more than 10 chunks behind --- CHANGELOG.md | 3 +- lbrynet/conf.py | 3 +- lbrynet/core/Wallet.py | 105 ++++++++++++++++++++++++++++++++---- lbrynet/txlbryum/factory.py | 3 ++ 4 files changed, 102 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a261709e0..49125aa34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,11 +44,12 @@ at anytime. * regenerate api keys on startup if the using authentication * support both positional and keyword args for api calls * `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results + * download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind ### Added * virtual kademlia network and mock udp transport for dht integration tests * integration tests for bootstrapping the dht - * configurable `concurrent_announcers` setting + * configurable `concurrent_announcers` and `s3_headers_depth` settings * `peer_ping` command ### Removed diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 3edee1437..0be7a423e 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -294,7 +294,8 @@ ADJUSTABLE_SETTINGS = { 'use_keyring': (bool, False), 'wallet': (str, LBRYUM_WALLET), 'blockchain_name': (str, 'lbrycrd_main'), - 'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list) + 'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list), + 's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind } diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index a4fa5f4d8..889e7514f 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -1,3 +1,4 @@ +import os from collections import defaultdict, deque import datetime import logging @@ -5,7 +6,11 @@ from decimal import Decimal from zope.interface import implements from twisted.internet import threads, reactor, defer, task from twisted.python.failure import Failure +from twisted.python.threadpool import ThreadPool +from twisted._threads._ithreads import AlreadyQuit from twisted.internet.error import ConnectionAborted +from txrequests import Session as _TxRequestsSession +from requests import Session as requestsSession from lbryum import wallet as lbryum_wallet from lbryum.network import Network @@ -13,12 +18,14 @@ from lbryum.simple_config import SimpleConfig from lbryum.constants import COIN from lbryum.commands import Commands from lbryum.errors import InvalidPassword +from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbryschema.uri import parse_lbry_uri from lbryschema.claim import ClaimDict from lbryschema.error import DecodeError from lbryschema.decode import smart_decode +from lbrynet.txlbryum.factory import StratumClient from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.core.utils import DeferredDict from lbrynet.core.client.ClientRequest import ClientRequest @@ -29,6 +36,29 @@ from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError log = logging.getLogger(__name__) +class TxRequestsSession(_TxRequestsSession): + # Session from txrequests would throw AlreadyQuit errors, this catches them + def __init__(self, pool=None, minthreads=1, maxthreads=4, **kwargs): + requestsSession.__init__(self, **kwargs) # pylint: disable=non-parent-init-called + self.ownPool = False + if pool is None: + self.ownPool = True + pool = ThreadPool(minthreads=minthreads, maxthreads=maxthreads) + # unclosed ThreadPool leads to reactor hangs at shutdown + # this is a problem in many situation, so better enforce pool stop here + + def stop_pool(): + try: + pool.stop() + except AlreadyQuit: + pass + + reactor.addSystemEventTrigger("after", "shutdown", stop_pool) + self.pool = pool + if self.ownPool: + pool.start() + + class ReservedPoints(object): def __init__(self, identifier, amount): self.identifier = identifier @@ -86,18 +116,73 @@ class Wallet(object): self._batch_count = 20 self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) + @defer.inlineCallbacks + def fetch_headers_from_s3(self): + with TxRequestsSession() as s: + r = yield s.get(HEADERS_URL) + raw_headers = r.content + if not len(raw_headers) % HEADER_SIZE: # should be divisible by the header size + s3_height = (len(raw_headers) / HEADER_SIZE) - 1 + local_height = self.local_header_file_height() + if s3_height > local_height: + with open(os.path.join(self.config.path, "blockchain_headers"), "w") as headers_file: + headers_file.write(raw_headers) + log.info("updated headers from s3") + else: + log.warning("s3 is more out of date than we are") + else: + log.error("invalid size for headers from s3") + + def local_header_file_height(self): + headers_path = os.path.join(self.config.path, "blockchain_headers") + if os.path.isfile(headers_path): + return max((os.stat(headers_path).st_size / 112) - 1, 0) + return 0 + + @defer.inlineCallbacks + def get_remote_height(self, server, port): + connected = defer.Deferred() + client = StratumClient(connected) + reactor.connectTCP(server, port, client) + yield connected + remote_height = yield client.blockchain_block_get_server_height() + client.client.transport.loseConnection() + defer.returnValue(remote_height) + + @defer.inlineCallbacks + def should_download_headers_from_s3(self): + from lbrynet import conf + if conf.settings['blockchain_name'] != "lbrycrd_main": + defer.returnValue(False) + s3_headers_depth = conf.settings['s3_headers_depth'] + if not s3_headers_depth: + defer.returnValue(False) + local_height = self.local_header_file_height() + for server_url in self.config.get('default_servers'): + port = int(self.config.get('default_servers')[server_url]['t']) + try: + remote_height = yield self.get_remote_height(server_url, port) + log.debug("%s:%i remote height: %i, local height: %s", server_url, port, remote_height, local_height) + if remote_height > local_height + s3_headers_depth: + defer.returnValue(True) + except Exception as err: + log.warning("error requesting remote height from %s:%i - %s", server_url, port, err) + defer.returnValue(False) + + @defer.inlineCallbacks def start(self): + should_download_headers = yield self.should_download_headers_from_s3() + if should_download_headers: + try: + yield self.fetch_headers_from_s3() + except Exception as err: + log.error("failed to fetch headers from s3: %s", err) log.info("Starting wallet.") - - def start_manage(): - self.stopped = False - self.manage() - self._pending_claim_checker.start(30) - return True - - d = self._start() - d.addCallback(lambda _: start_manage()) - return d + yield self._start() + self.stopped = False + self.manage() + self._pending_claim_checker.start(30) + defer.returnValue(True) @staticmethod def log_stop_error(err): diff --git a/lbrynet/txlbryum/factory.py b/lbrynet/txlbryum/factory.py index 72af607d1..6c59d83a3 100644 --- a/lbrynet/txlbryum/factory.py +++ b/lbrynet/txlbryum/factory.py @@ -105,3 +105,6 @@ class StratumClient(ClientFactory): def blockchain_address_get_history(self, address): return self._rpc('blockchain.address.get_history', [address]) + + def blockchain_block_get_server_height(self): + return self._rpc('blockchain.block.get_server_height', [])