download headers from s3 when more than 10 chunks behind

This commit is contained in:
Jack Robison 2018-04-30 13:31:09 -04:00
parent f32861923e
commit 129d2687b9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 102 additions and 12 deletions

View file

@ -44,11 +44,12 @@ at anytime.
* regenerate api keys on startup if the using authentication * regenerate api keys on startup if the using authentication
* support both positional and keyword args for api calls * support both positional and keyword args for api calls
* `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results * `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results
* download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind
### Added ### Added
* virtual kademlia network and mock udp transport for dht integration tests * virtual kademlia network and mock udp transport for dht integration tests
* integration tests for bootstrapping the dht * integration tests for bootstrapping the dht
* configurable `concurrent_announcers` setting * configurable `concurrent_announcers` and `s3_headers_depth` settings
* `peer_ping` command * `peer_ping` command
### Removed ### Removed

View file

@ -294,7 +294,8 @@ ADJUSTABLE_SETTINGS = {
'use_keyring': (bool, False), 'use_keyring': (bool, False),
'wallet': (str, LBRYUM_WALLET), 'wallet': (str, LBRYUM_WALLET),
'blockchain_name': (str, 'lbrycrd_main'), 'blockchain_name': (str, 'lbrycrd_main'),
'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list) 'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list),
's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind
} }

View file

@ -1,3 +1,4 @@
import os
from collections import defaultdict, deque from collections import defaultdict, deque
import datetime import datetime
import logging import logging
@ -5,7 +6,11 @@ from decimal import Decimal
from zope.interface import implements from zope.interface import implements
from twisted.internet import threads, reactor, defer, task from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.python.threadpool import ThreadPool
from twisted._threads._ithreads import AlreadyQuit
from twisted.internet.error import ConnectionAborted from twisted.internet.error import ConnectionAborted
from txrequests import Session as _TxRequestsSession
from requests import Session as requestsSession
from lbryum import wallet as lbryum_wallet from lbryum import wallet as lbryum_wallet
from lbryum.network import Network from lbryum.network import Network
@ -13,12 +18,14 @@ from lbryum.simple_config import SimpleConfig
from lbryum.constants import COIN from lbryum.constants import COIN
from lbryum.commands import Commands from lbryum.commands import Commands
from lbryum.errors import InvalidPassword from lbryum.errors import InvalidPassword
from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbryschema.uri import parse_lbry_uri from lbryschema.uri import parse_lbry_uri
from lbryschema.claim import ClaimDict from lbryschema.claim import ClaimDict
from lbryschema.error import DecodeError from lbryschema.error import DecodeError
from lbryschema.decode import smart_decode from lbryschema.decode import smart_decode
from lbrynet.txlbryum.factory import StratumClient
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.utils import DeferredDict from lbrynet.core.utils import DeferredDict
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
@ -29,6 +36,29 @@ from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class TxRequestsSession(_TxRequestsSession):
# Session from txrequests would throw AlreadyQuit errors, this catches them
def __init__(self, pool=None, minthreads=1, maxthreads=4, **kwargs):
requestsSession.__init__(self, **kwargs) # pylint: disable=non-parent-init-called
self.ownPool = False
if pool is None:
self.ownPool = True
pool = ThreadPool(minthreads=minthreads, maxthreads=maxthreads)
# unclosed ThreadPool leads to reactor hangs at shutdown
# this is a problem in many situation, so better enforce pool stop here
def stop_pool():
try:
pool.stop()
except AlreadyQuit:
pass
reactor.addSystemEventTrigger("after", "shutdown", stop_pool)
self.pool = pool
if self.ownPool:
pool.start()
class ReservedPoints(object): class ReservedPoints(object):
def __init__(self, identifier, amount): def __init__(self, identifier, amount):
self.identifier = identifier self.identifier = identifier
@ -86,18 +116,73 @@ class Wallet(object):
self._batch_count = 20 self._batch_count = 20
self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)
@defer.inlineCallbacks
def fetch_headers_from_s3(self):
with TxRequestsSession() as s:
r = yield s.get(HEADERS_URL)
raw_headers = r.content
if not len(raw_headers) % HEADER_SIZE: # should be divisible by the header size
s3_height = (len(raw_headers) / HEADER_SIZE) - 1
local_height = self.local_header_file_height()
if s3_height > local_height:
with open(os.path.join(self.config.path, "blockchain_headers"), "w") as headers_file:
headers_file.write(raw_headers)
log.info("updated headers from s3")
else:
log.warning("s3 is more out of date than we are")
else:
log.error("invalid size for headers from s3")
def local_header_file_height(self):
headers_path = os.path.join(self.config.path, "blockchain_headers")
if os.path.isfile(headers_path):
return max((os.stat(headers_path).st_size / 112) - 1, 0)
return 0
@defer.inlineCallbacks
def get_remote_height(self, server, port):
connected = defer.Deferred()
client = StratumClient(connected)
reactor.connectTCP(server, port, client)
yield connected
remote_height = yield client.blockchain_block_get_server_height()
client.client.transport.loseConnection()
defer.returnValue(remote_height)
@defer.inlineCallbacks
def should_download_headers_from_s3(self):
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
defer.returnValue(False)
s3_headers_depth = conf.settings['s3_headers_depth']
if not s3_headers_depth:
defer.returnValue(False)
local_height = self.local_header_file_height()
for server_url in self.config.get('default_servers'):
port = int(self.config.get('default_servers')[server_url]['t'])
try:
remote_height = yield self.get_remote_height(server_url, port)
log.debug("%s:%i remote height: %i, local height: %s", server_url, port, remote_height, local_height)
if remote_height > local_height + s3_headers_depth:
defer.returnValue(True)
except Exception as err:
log.warning("error requesting remote height from %s:%i - %s", server_url, port, err)
defer.returnValue(False)
@defer.inlineCallbacks
def start(self): def start(self):
should_download_headers = yield self.should_download_headers_from_s3()
if should_download_headers:
try:
yield self.fetch_headers_from_s3()
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
log.info("Starting wallet.") log.info("Starting wallet.")
yield self._start()
def start_manage(): self.stopped = False
self.stopped = False self.manage()
self.manage() self._pending_claim_checker.start(30)
self._pending_claim_checker.start(30) defer.returnValue(True)
return True
d = self._start()
d.addCallback(lambda _: start_manage())
return d
@staticmethod @staticmethod
def log_stop_error(err): def log_stop_error(err):

View file

@ -105,3 +105,6 @@ class StratumClient(ClientFactory):
def blockchain_address_get_history(self, address): def blockchain_address_get_history(self, address):
return self._rpc('blockchain.address.get_history', [address]) return self._rpc('blockchain.address.get_history', [address])
def blockchain_block_get_server_height(self):
return self._rpc('blockchain.block.get_server_height', [])