forked from LBRYCommunity/lbry-sdk
update remote height on demand
This commit is contained in:
parent
f109aa1f73
commit
a4d8508b40
5 changed files with 27 additions and 21 deletions
|
@ -5,7 +5,6 @@ import math
|
||||||
import binascii
|
import binascii
|
||||||
import typing
|
import typing
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
from types import SimpleNamespace
|
|
||||||
import base58
|
import base58
|
||||||
|
|
||||||
from aioupnp import __version__ as aioupnp_version
|
from aioupnp import __version__ as aioupnp_version
|
||||||
|
@ -23,7 +22,6 @@ from lbry.extras.daemon.Component import Component
|
||||||
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
from lbry.wallet import LbryWalletManager
|
from lbry.wallet import LbryWalletManager
|
||||||
from lbry.wallet import Network
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -160,19 +158,10 @@ class HeadersComponent(Component):
|
||||||
return os.stat(self.headers_file).st_size
|
return os.stat(self.headers_file).st_size
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
async def get_remote_height(self):
|
async def get_download_height(self):
|
||||||
ledger = SimpleNamespace()
|
async with utils.aiohttp_request('HEAD', HEADERS_URL) as response:
|
||||||
ledger.config = {
|
log.warning(response)
|
||||||
'default_servers': self.conf.lbryum_servers,
|
return response.content_length // HEADER_SIZE
|
||||||
'data_path': self.conf.wallet_dir
|
|
||||||
}
|
|
||||||
net = Network(ledger)
|
|
||||||
first_connection = net.on_connected.first
|
|
||||||
asyncio.ensure_future(net.start()) # TODO: SKETCHY! it might be trapping a CancelledError and not raising it
|
|
||||||
await first_connection
|
|
||||||
remote_height = await net.get_server_height()
|
|
||||||
await net.stop()
|
|
||||||
return remote_height
|
|
||||||
|
|
||||||
async def should_download_headers_from_s3(self):
|
async def should_download_headers_from_s3(self):
|
||||||
if self.conf.blockchain_name != "lbrycrd_main":
|
if self.conf.blockchain_name != "lbrycrd_main":
|
||||||
|
@ -181,8 +170,9 @@ class HeadersComponent(Component):
|
||||||
s3_headers_depth = self.conf.s3_headers_depth
|
s3_headers_depth = self.conf.s3_headers_depth
|
||||||
if not s3_headers_depth:
|
if not s3_headers_depth:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
local_height = self.local_header_file_height()
|
local_height = self.local_header_file_height()
|
||||||
remote_height = await self.get_remote_height()
|
remote_height = await self.get_download_height()
|
||||||
log.info("remote height: %i, local height: %i", remote_height, local_height)
|
log.info("remote height: %i, local height: %i", remote_height, local_height)
|
||||||
if remote_height > (local_height + s3_headers_depth):
|
if remote_height > (local_height + s3_headers_depth):
|
||||||
return True
|
return True
|
||||||
|
@ -242,9 +232,9 @@ class WalletComponent(Component):
|
||||||
return self.wallet_manager
|
return self.wallet_manager
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
if self.wallet_manager and self.running:
|
if self.wallet_manager and self.wallet_manager.ledger.network.remote_height:
|
||||||
local_height = self.wallet_manager.ledger.headers.height
|
local_height = self.wallet_manager.ledger.headers.height
|
||||||
remote_height = await self.wallet_manager.ledger.network.get_server_height()
|
remote_height = self.wallet_manager.ledger.network.remote_height
|
||||||
best_hash = self.wallet_manager.get_best_blockhash()
|
best_hash = self.wallet_manager.get_best_blockhash()
|
||||||
return {
|
return {
|
||||||
'blocks': max(local_height, 0),
|
'blocks': max(local_height, 0),
|
||||||
|
|
|
@ -225,7 +225,7 @@ class ClaimSearchCommand(ClaimTestCase):
|
||||||
await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tags=['def', 'ghi'])
|
await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tags=['def', 'ghi'])
|
||||||
|
|
||||||
async def test_order_by(self):
|
async def test_order_by(self):
|
||||||
height = await self.ledger.network.get_server_height()
|
height = self.ledger.network.remote_height
|
||||||
claims = [await self.stream_create(f'claim{i}') for i in range(5)]
|
claims = [await self.stream_create(f'claim{i}') for i in range(5)]
|
||||||
|
|
||||||
await self.assertFindsClaims(claims, order_by=["^height"])
|
await self.assertFindsClaims(claims, order_by=["^height"])
|
||||||
|
|
|
@ -7,6 +7,15 @@ from torba.rpc import RPCSession
|
||||||
from torba.testcase import IntegrationTestCase, AsyncioTestCase
|
from torba.testcase import IntegrationTestCase, AsyncioTestCase
|
||||||
|
|
||||||
|
|
||||||
|
class NetworkTests(IntegrationTestCase):
|
||||||
|
|
||||||
|
async def test_remote_height_updated_automagically(self):
|
||||||
|
initial_height = self.ledger.network.remote_height
|
||||||
|
await self.blockchain.generate(1)
|
||||||
|
await self.ledger.network.on_header.first
|
||||||
|
self.assertEqual(self.ledger.network.remote_height, initial_height + 1)
|
||||||
|
|
||||||
|
|
||||||
class ReconnectTests(IntegrationTestCase):
|
class ReconnectTests(IntegrationTestCase):
|
||||||
|
|
||||||
VERBOSITY = logging.WARN
|
VERBOSITY = logging.WARN
|
|
@ -263,8 +263,8 @@ class BaseLedger(metaclass=LedgerRegistry):
|
||||||
|
|
||||||
async def join_network(self, *args):
|
async def join_network(self, *args):
|
||||||
log.info("Subscribing and updating accounts.")
|
log.info("Subscribing and updating accounts.")
|
||||||
await self.update_headers()
|
async with self._header_processing_lock:
|
||||||
await self.network.subscribe_headers()
|
await self.update_headers()
|
||||||
await self.subscribe_accounts()
|
await self.subscribe_accounts()
|
||||||
await self._update_tasks.done.wait()
|
await self._update_tasks.done.wait()
|
||||||
|
|
||||||
|
|
|
@ -62,6 +62,7 @@ class BaseNetwork:
|
||||||
self.client: ClientSession = None
|
self.client: ClientSession = None
|
||||||
self.session_pool: SessionPool = None
|
self.session_pool: SessionPool = None
|
||||||
self.running = False
|
self.running = False
|
||||||
|
self.remote_height: int = 0
|
||||||
|
|
||||||
self._on_connected_controller = StreamController()
|
self._on_connected_controller = StreamController()
|
||||||
self.on_connected = self._on_connected_controller.stream
|
self.on_connected = self._on_connected_controller.stream
|
||||||
|
@ -82,11 +83,13 @@ class BaseNetwork:
|
||||||
connect_timeout = self.config.get('connect_timeout', 6)
|
connect_timeout = self.config.get('connect_timeout', 6)
|
||||||
self.session_pool = SessionPool(network=self, timeout=connect_timeout)
|
self.session_pool = SessionPool(network=self, timeout=connect_timeout)
|
||||||
self.session_pool.start(self.config['default_servers'])
|
self.session_pool.start(self.config['default_servers'])
|
||||||
|
self.on_header.listen(self._update_remote_height)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
self.client = await self.pick_fastest_session()
|
self.client = await self.pick_fastest_session()
|
||||||
if self.is_connected:
|
if self.is_connected:
|
||||||
await self.ensure_server_version()
|
await self.ensure_server_version()
|
||||||
|
self._update_remote_height((await self.subscribe_headers(),))
|
||||||
log.info("Successfully connected to SPV wallet server: %s:%d", *self.client.server)
|
log.info("Successfully connected to SPV wallet server: %s:%d", *self.client.server)
|
||||||
self._on_connected_controller.add(True)
|
self._on_connected_controller.add(True)
|
||||||
await self.client.on_disconnected.first
|
await self.client.on_disconnected.first
|
||||||
|
@ -136,6 +139,10 @@ class BaseNetwork:
|
||||||
await session.send_request('server.banner')
|
await session.send_request('server.banner')
|
||||||
return session
|
return session
|
||||||
|
|
||||||
|
def _update_remote_height(self, header_args):
|
||||||
|
if header_args and header_args[0]:
|
||||||
|
self.remote_height = header_args[0]["height"]
|
||||||
|
|
||||||
def ensure_server_version(self, required='1.2'):
|
def ensure_server_version(self, required='1.2'):
|
||||||
return self.rpc('server.version', [__version__, required])
|
return self.rpc('server.version', [__version__, required])
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue