From 129d2687b95f5c7cba0383210d9aa3855809dc1d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 30 Apr 2018 13:31:09 -0400 Subject: [PATCH 1/2] download headers from s3 when more than 10 chunks behind --- CHANGELOG.md | 3 +- lbrynet/conf.py | 3 +- lbrynet/core/Wallet.py | 105 ++++++++++++++++++++++++++++++++---- lbrynet/txlbryum/factory.py | 3 ++ 4 files changed, 102 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a261709e0..49125aa34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,11 +44,12 @@ at anytime. * regenerate api keys on startup if the using authentication * support both positional and keyword args for api calls * `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results + * download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind ### Added * virtual kademlia network and mock udp transport for dht integration tests * integration tests for bootstrapping the dht - * configurable `concurrent_announcers` setting + * configurable `concurrent_announcers` and `s3_headers_depth` settings * `peer_ping` command ### Removed diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 3edee1437..0be7a423e 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -294,7 +294,8 @@ ADJUSTABLE_SETTINGS = { 'use_keyring': (bool, False), 'wallet': (str, LBRYUM_WALLET), 'blockchain_name': (str, 'lbrycrd_main'), - 'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list) + 'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list), + 's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind } diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index a4fa5f4d8..889e7514f 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -1,3 +1,4 @@ +import os from collections import defaultdict, deque import datetime import logging @@ -5,7 +6,11 @@ from decimal import Decimal from zope.interface import implements from twisted.internet import threads, reactor, defer, task from twisted.python.failure import Failure +from twisted.python.threadpool import ThreadPool +from twisted._threads._ithreads import AlreadyQuit from twisted.internet.error import ConnectionAborted +from txrequests import Session as _TxRequestsSession +from requests import Session as requestsSession from lbryum import wallet as lbryum_wallet from lbryum.network import Network @@ -13,12 +18,14 @@ from lbryum.simple_config import SimpleConfig from lbryum.constants import COIN from lbryum.commands import Commands from lbryum.errors import InvalidPassword +from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbryschema.uri import parse_lbry_uri from lbryschema.claim import ClaimDict from lbryschema.error import DecodeError from lbryschema.decode import smart_decode +from lbrynet.txlbryum.factory import StratumClient from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.core.utils import DeferredDict from lbrynet.core.client.ClientRequest import ClientRequest @@ -29,6 +36,29 @@ from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError log = logging.getLogger(__name__) +class TxRequestsSession(_TxRequestsSession): + # Session from txrequests would throw AlreadyQuit errors, this catches them + def __init__(self, pool=None, minthreads=1, maxthreads=4, **kwargs): + requestsSession.__init__(self, **kwargs) # pylint: disable=non-parent-init-called + self.ownPool = False + if pool is None: + self.ownPool = True + pool = ThreadPool(minthreads=minthreads, maxthreads=maxthreads) + # unclosed ThreadPool leads to reactor hangs at shutdown + # this is a problem in many situation, so better enforce pool stop here + + def stop_pool(): + try: + pool.stop() + except AlreadyQuit: + pass + + reactor.addSystemEventTrigger("after", "shutdown", stop_pool) + self.pool = pool + if self.ownPool: + pool.start() + + class ReservedPoints(object): def __init__(self, identifier, amount): self.identifier = identifier @@ -86,18 +116,73 @@ class Wallet(object): self._batch_count = 20 self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) + @defer.inlineCallbacks + def fetch_headers_from_s3(self): + with TxRequestsSession() as s: + r = yield s.get(HEADERS_URL) + raw_headers = r.content + if not len(raw_headers) % HEADER_SIZE: # should be divisible by the header size + s3_height = (len(raw_headers) / HEADER_SIZE) - 1 + local_height = self.local_header_file_height() + if s3_height > local_height: + with open(os.path.join(self.config.path, "blockchain_headers"), "w") as headers_file: + headers_file.write(raw_headers) + log.info("updated headers from s3") + else: + log.warning("s3 is more out of date than we are") + else: + log.error("invalid size for headers from s3") + + def local_header_file_height(self): + headers_path = os.path.join(self.config.path, "blockchain_headers") + if os.path.isfile(headers_path): + return max((os.stat(headers_path).st_size / 112) - 1, 0) + return 0 + + @defer.inlineCallbacks + def get_remote_height(self, server, port): + connected = defer.Deferred() + client = StratumClient(connected) + reactor.connectTCP(server, port, client) + yield connected + remote_height = yield client.blockchain_block_get_server_height() + client.client.transport.loseConnection() + defer.returnValue(remote_height) + + @defer.inlineCallbacks + def should_download_headers_from_s3(self): + from lbrynet import conf + if conf.settings['blockchain_name'] != "lbrycrd_main": + defer.returnValue(False) + s3_headers_depth = conf.settings['s3_headers_depth'] + if not s3_headers_depth: + defer.returnValue(False) + local_height = self.local_header_file_height() + for server_url in self.config.get('default_servers'): + port = int(self.config.get('default_servers')[server_url]['t']) + try: + remote_height = yield self.get_remote_height(server_url, port) + log.debug("%s:%i remote height: %i, local height: %s", server_url, port, remote_height, local_height) + if remote_height > local_height + s3_headers_depth: + defer.returnValue(True) + except Exception as err: + log.warning("error requesting remote height from %s:%i - %s", server_url, port, err) + defer.returnValue(False) + + @defer.inlineCallbacks def start(self): + should_download_headers = yield self.should_download_headers_from_s3() + if should_download_headers: + try: + yield self.fetch_headers_from_s3() + except Exception as err: + log.error("failed to fetch headers from s3: %s", err) log.info("Starting wallet.") - - def start_manage(): - self.stopped = False - self.manage() - self._pending_claim_checker.start(30) - return True - - d = self._start() - d.addCallback(lambda _: start_manage()) - return d + yield self._start() + self.stopped = False + self.manage() + self._pending_claim_checker.start(30) + defer.returnValue(True) @staticmethod def log_stop_error(err): diff --git a/lbrynet/txlbryum/factory.py b/lbrynet/txlbryum/factory.py index 72af607d1..6c59d83a3 100644 --- a/lbrynet/txlbryum/factory.py +++ b/lbrynet/txlbryum/factory.py @@ -105,3 +105,6 @@ class StratumClient(ClientFactory): def blockchain_address_get_history(self, address): return self._rpc('blockchain.address.get_history', [address]) + + def blockchain_block_get_server_height(self): + return self._rpc('blockchain.block.get_server_height', []) From f42733ecba38cb5d0b9d23a0f4869b125ed0e8b3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 2 May 2018 14:45:01 -0400 Subject: [PATCH 2/2] fix tests --- lbrynet/tests/functional/test_misc.py | 15 +++++++-------- lbrynet/tests/mocks.py | 14 +++++--------- .../unit/core/client/test_ConnectionManager.py | 2 +- .../unit/core/server/test_DHTHashAnnouncer.py | 2 +- lbrynet/tests/unit/core/test_BlobManager.py | 2 +- lbrynet/tests/unit/database/test_SQLiteStorage.py | 2 +- .../tests/unit/lbrynet_daemon/auth/test_server.py | 3 ++- 7 files changed, 18 insertions(+), 22 deletions(-) diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index 355fa1d14..e806da5c2 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -22,7 +22,6 @@ from twisted.internet import defer, threads, task from twisted.trial.unittest import TestCase from twisted.python.failure import Failure -from lbrynet.dht.node import Node from lbrynet.core.PeerManager import PeerManager from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory @@ -115,7 +114,7 @@ class LbryUploader(object): node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") + dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) if self.ul_rate_limit is not None: self.session.rate_limiter.set_ul_limit(self.ul_rate_limit) @@ -207,7 +206,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd" + str(n), dht_node_port=4446, + node_id="abcd" + str(n), dht_node_port=4446, dht_node_class=FakeNode, peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, @@ -315,7 +314,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", - peer_finder=peer_finder, hash_announcer=hash_announcer, + peer_finder=peer_finder, hash_announcer=hash_announcer, dht_node_class=FakeNode, blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, @@ -497,7 +496,7 @@ class TestTransfer(TestCase): blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") + dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager( self.session, sd_identifier) @@ -582,7 +581,7 @@ class TestTransfer(TestCase): self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, + blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, dht_node_class=FakeNode, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") @@ -662,7 +661,7 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, + node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, dht_node_class=FakeNode, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, @@ -769,7 +768,7 @@ class TestTransfer(TestCase): sd_identifier = StreamDescriptorIdentifier() db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, + self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, dht_node_class=FakeNode, node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index d4c52a2cd..d2bce3730 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -9,6 +9,7 @@ from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import RequestCanceledError from lbrynet.core import BlobAvailability from lbrynet.core.utils import generate_id +from lbrynet.dht.node import Node as RealNode from lbrynet.daemon import ExchangeRateManager as ERM from lbrynet import conf from util import debug_kademlia_packet @@ -24,15 +25,9 @@ class FakeLBRYFile(object): self.file_name = 'fake_lbry_file' -class Node(object): - def __init__(self, peer_finder=None, peer_manager=None, dht_node_port=None, peer_port=3333, **kwargs): - self.peer_finder = peer_finder - self.peer_manager = peer_manager - self.peerPort = peer_port - self.udpPort = dht_node_port - - def joinNetwork(self, *args): - return defer.succeed(True) +class Node(RealNode): + def joinNetwork(self, known_node_addresses=None): + return defer.succeed(None) def stop(self): return defer.succeed(None) @@ -392,6 +387,7 @@ create_stream_sd_file = { def mock_conf_settings(obj, settings={}): + conf.initialize_settings(False) original_settings = conf.settings conf.settings = conf.Config(conf.FIXED_SETTINGS, conf.ADJUSTABLE_SETTINGS) conf.settings.installation_id = conf.settings.get_installation_id() diff --git a/lbrynet/tests/unit/core/client/test_ConnectionManager.py b/lbrynet/tests/unit/core/client/test_ConnectionManager.py index 107afa997..61f177127 100644 --- a/lbrynet/tests/unit/core/client/test_ConnectionManager.py +++ b/lbrynet/tests/unit/core/client/test_ConnectionManager.py @@ -116,7 +116,7 @@ class MocServerProtocolFactory(ServerFactory): class TestIntegrationConnectionManager(unittest.TestCase): def setUp(self): - conf.initialize_settings() + conf.initialize_settings(False) self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT) self.downloader = MocDownloader() diff --git a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py index 0d3999c0b..2f67d5567 100644 --- a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py +++ b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -34,7 +34,7 @@ class DHTHashAnnouncerTest(unittest.TestCase): @defer.inlineCallbacks def setUp(self): from lbrynet.conf import initialize_settings - initialize_settings() + initialize_settings(False) self.num_blobs = 10 self.blobs_to_announce = [] for i in range(0, self.num_blobs): diff --git a/lbrynet/tests/unit/core/test_BlobManager.py b/lbrynet/tests/unit/core/test_BlobManager.py index 48b6df982..7526ee2fc 100644 --- a/lbrynet/tests/unit/core/test_BlobManager.py +++ b/lbrynet/tests/unit/core/test_BlobManager.py @@ -17,7 +17,7 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj class BlobManagerTest(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - conf.initialize_settings() + conf.initialize_settings(False) self.blob_dir = tempfile.mkdtemp() self.db_dir = tempfile.mkdtemp() self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(self.db_dir)) diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py index 7cd69c3ff..5bfe72988 100644 --- a/lbrynet/tests/unit/database/test_SQLiteStorage.py +++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py @@ -85,7 +85,7 @@ class StorageTest(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - conf.initialize_settings() + conf.initialize_settings(False) self.db_dir = tempfile.mkdtemp() self.storage = SQLiteStorage(self.db_dir) yield self.storage.setup() diff --git a/lbrynet/tests/unit/lbrynet_daemon/auth/test_server.py b/lbrynet/tests/unit/lbrynet_daemon/auth/test_server.py index ea1cefb55..80fa4aa7c 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/auth/test_server.py +++ b/lbrynet/tests/unit/lbrynet_daemon/auth/test_server.py @@ -1,6 +1,6 @@ import mock from twisted.trial import unittest - +from lbrynet import conf from lbrynet.tests.mocks import mock_conf_settings from lbrynet.daemon.auth import server @@ -10,6 +10,7 @@ class AuthJSONRPCServerTest(unittest.TestCase): # and add useful general utilities like this # onto it. def setUp(self): + conf.initialize_settings(False) self.server = server.AuthJSONRPCServer(use_authentication=False) def test_get_server_port(self):