Merge branch 'faster-headers'

This commit is contained in:
Jack Robison 2018-05-02 15:08:38 -04:00
commit 28386e8935
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
11 changed files with 120 additions and 34 deletions

View file

@ -44,11 +44,12 @@ at anytime.
* regenerate api keys on startup if the using authentication
* support both positional and keyword args for api calls
* `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results
* download blockchain headers from s3 before starting the wallet when the local height is more than `s3_headers_depth` (a config setting) blocks behind
### Added
* virtual kademlia network and mock udp transport for dht integration tests
* integration tests for bootstrapping the dht
* configurable `concurrent_announcers` setting
* configurable `concurrent_announcers` and `s3_headers_depth` settings
* `peer_ping` command
### Removed

View file

@ -294,7 +294,8 @@ ADJUSTABLE_SETTINGS = {
'use_keyring': (bool, False),
'wallet': (str, LBRYUM_WALLET),
'blockchain_name': (str, 'lbrycrd_main'),
'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list)
'lbryum_servers': (list, [('lbryum8.lbry.io', 50001), ('lbryum9.lbry.io', 50001)], server_list),
's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind
}

View file

@ -1,3 +1,4 @@
import os
from collections import defaultdict, deque
import datetime
import logging
@ -5,7 +6,11 @@ from decimal import Decimal
from zope.interface import implements
from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure
from twisted.python.threadpool import ThreadPool
from twisted._threads._ithreads import AlreadyQuit
from twisted.internet.error import ConnectionAborted
from txrequests import Session as _TxRequestsSession
from requests import Session as requestsSession
from lbryum import wallet as lbryum_wallet
from lbryum.network import Network
@ -13,12 +18,14 @@ from lbryum.simple_config import SimpleConfig
from lbryum.constants import COIN
from lbryum.commands import Commands
from lbryum.errors import InvalidPassword
from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbryschema.uri import parse_lbry_uri
from lbryschema.claim import ClaimDict
from lbryschema.error import DecodeError
from lbryschema.decode import smart_decode
from lbrynet.txlbryum.factory import StratumClient
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.utils import DeferredDict
from lbrynet.core.client.ClientRequest import ClientRequest
@ -29,6 +36,29 @@ from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
log = logging.getLogger(__name__)
class TxRequestsSession(_TxRequestsSession):
# Session from txrequests would throw AlreadyQuit errors, this catches them
def __init__(self, pool=None, minthreads=1, maxthreads=4, **kwargs):
requestsSession.__init__(self, **kwargs) # pylint: disable=non-parent-init-called
self.ownPool = False
if pool is None:
self.ownPool = True
pool = ThreadPool(minthreads=minthreads, maxthreads=maxthreads)
# unclosed ThreadPool leads to reactor hangs at shutdown
# this is a problem in many situation, so better enforce pool stop here
def stop_pool():
try:
pool.stop()
except AlreadyQuit:
pass
reactor.addSystemEventTrigger("after", "shutdown", stop_pool)
self.pool = pool
if self.ownPool:
pool.start()
class ReservedPoints(object):
def __init__(self, identifier, amount):
self.identifier = identifier
@ -86,18 +116,73 @@ class Wallet(object):
self._batch_count = 20
self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)
@defer.inlineCallbacks
def fetch_headers_from_s3(self):
with TxRequestsSession() as s:
r = yield s.get(HEADERS_URL)
raw_headers = r.content
if not len(raw_headers) % HEADER_SIZE: # should be divisible by the header size
s3_height = (len(raw_headers) / HEADER_SIZE) - 1
local_height = self.local_header_file_height()
if s3_height > local_height:
with open(os.path.join(self.config.path, "blockchain_headers"), "w") as headers_file:
headers_file.write(raw_headers)
log.info("updated headers from s3")
else:
log.warning("s3 is more out of date than we are")
else:
log.error("invalid size for headers from s3")
def local_header_file_height(self):
headers_path = os.path.join(self.config.path, "blockchain_headers")
if os.path.isfile(headers_path):
return max((os.stat(headers_path).st_size / 112) - 1, 0)
return 0
@defer.inlineCallbacks
def get_remote_height(self, server, port):
connected = defer.Deferred()
client = StratumClient(connected)
reactor.connectTCP(server, port, client)
yield connected
remote_height = yield client.blockchain_block_get_server_height()
client.client.transport.loseConnection()
defer.returnValue(remote_height)
@defer.inlineCallbacks
def should_download_headers_from_s3(self):
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
defer.returnValue(False)
s3_headers_depth = conf.settings['s3_headers_depth']
if not s3_headers_depth:
defer.returnValue(False)
local_height = self.local_header_file_height()
for server_url in self.config.get('default_servers'):
port = int(self.config.get('default_servers')[server_url]['t'])
try:
remote_height = yield self.get_remote_height(server_url, port)
log.debug("%s:%i remote height: %i, local height: %s", server_url, port, remote_height, local_height)
if remote_height > local_height + s3_headers_depth:
defer.returnValue(True)
except Exception as err:
log.warning("error requesting remote height from %s:%i - %s", server_url, port, err)
defer.returnValue(False)
@defer.inlineCallbacks
def start(self):
should_download_headers = yield self.should_download_headers_from_s3()
if should_download_headers:
try:
yield self.fetch_headers_from_s3()
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
log.info("Starting wallet.")
def start_manage():
self.stopped = False
self.manage()
self._pending_claim_checker.start(30)
return True
d = self._start()
d.addCallback(lambda _: start_manage())
return d
yield self._start()
self.stopped = False
self.manage()
self._pending_claim_checker.start(30)
defer.returnValue(True)
@staticmethod
def log_stop_error(err):

View file

@ -22,7 +22,6 @@ from twisted.internet import defer, threads, task
from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure
from lbrynet.dht.node import Node
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
@ -115,7 +114,7 @@ class LbryUploader(object):
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
if self.ul_rate_limit is not None:
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
@ -207,7 +206,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd" + str(n), dht_node_port=4446,
node_id="abcd" + str(n), dht_node_port=4446, dht_node_class=FakeNode,
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
@ -315,7 +314,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_finder=peer_finder, hash_announcer=hash_announcer, dht_node_class=FakeNode,
blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
@ -497,7 +496,7 @@ class TestTransfer(TestCase):
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(
self.session, sd_identifier)
@ -582,7 +581,7 @@ class TestTransfer(TestCase):
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, dht_node_class=FakeNode,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1")
@ -662,7 +661,7 @@ class TestTransfer(TestCase):
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, dht_node_class=FakeNode,
hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
@ -769,7 +768,7 @@ class TestTransfer(TestCase):
sd_identifier = StreamDescriptorIdentifier()
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, dht_node_class=FakeNode,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
hash_announcer=hash_announcer, blob_dir=blob_dir,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter,

View file

@ -9,6 +9,7 @@ from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import RequestCanceledError
from lbrynet.core import BlobAvailability
from lbrynet.core.utils import generate_id
from lbrynet.dht.node import Node as RealNode
from lbrynet.daemon import ExchangeRateManager as ERM
from lbrynet import conf
from util import debug_kademlia_packet
@ -24,15 +25,9 @@ class FakeLBRYFile(object):
self.file_name = 'fake_lbry_file'
class Node(object):
def __init__(self, peer_finder=None, peer_manager=None, dht_node_port=None, peer_port=3333, **kwargs):
self.peer_finder = peer_finder
self.peer_manager = peer_manager
self.peerPort = peer_port
self.udpPort = dht_node_port
def joinNetwork(self, *args):
return defer.succeed(True)
class Node(RealNode):
def joinNetwork(self, known_node_addresses=None):
return defer.succeed(None)
def stop(self):
return defer.succeed(None)
@ -392,6 +387,7 @@ create_stream_sd_file = {
def mock_conf_settings(obj, settings={}):
conf.initialize_settings(False)
original_settings = conf.settings
conf.settings = conf.Config(conf.FIXED_SETTINGS, conf.ADJUSTABLE_SETTINGS)
conf.settings.installation_id = conf.settings.get_installation_id()

View file

@ -116,7 +116,7 @@ class MocServerProtocolFactory(ServerFactory):
class TestIntegrationConnectionManager(unittest.TestCase):
def setUp(self):
conf.initialize_settings()
conf.initialize_settings(False)
self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT)
self.downloader = MocDownloader()

View file

@ -34,7 +34,7 @@ class DHTHashAnnouncerTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
from lbrynet.conf import initialize_settings
initialize_settings()
initialize_settings(False)
self.num_blobs = 10
self.blobs_to_announce = []
for i in range(0, self.num_blobs):

View file

@ -17,7 +17,7 @@ from lbrynet.core.cryptoutils import get_lbry_hash_obj
class BlobManagerTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
conf.initialize_settings()
conf.initialize_settings(False)
self.blob_dir = tempfile.mkdtemp()
self.db_dir = tempfile.mkdtemp()
self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(self.db_dir))

View file

@ -85,7 +85,7 @@ class StorageTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
conf.initialize_settings()
conf.initialize_settings(False)
self.db_dir = tempfile.mkdtemp()
self.storage = SQLiteStorage(self.db_dir)
yield self.storage.setup()

View file

@ -1,6 +1,6 @@
import mock
from twisted.trial import unittest
from lbrynet import conf
from lbrynet.tests.mocks import mock_conf_settings
from lbrynet.daemon.auth import server
@ -10,6 +10,7 @@ class AuthJSONRPCServerTest(unittest.TestCase):
# and add useful general utilities like this
# onto it.
def setUp(self):
conf.initialize_settings(False)
self.server = server.AuthJSONRPCServer(use_authentication=False)
def test_get_server_port(self):

View file

@ -105,3 +105,6 @@ class StratumClient(ClientFactory):
def blockchain_address_get_history(self, address):
return self._rpc('blockchain.address.get_history', [address])
def blockchain_block_get_server_height(self):
return self._rpc('blockchain.block.get_server_height', [])