forked from LBRYCommunity/lbry-sdk
faster header download
This commit is contained in:
parent
817869b915
commit
c30db15efa
2 changed files with 17 additions and 2 deletions
|
@ -285,6 +285,8 @@ class BaseLedger(metaclass=LedgerRegistry):
|
||||||
first_connection = self.network.on_connected.first
|
first_connection = self.network.on_connected.first
|
||||||
asyncio.ensure_future(self.network.start())
|
asyncio.ensure_future(self.network.start())
|
||||||
await first_connection
|
await first_connection
|
||||||
|
async with self._header_processing_lock:
|
||||||
|
await self.initial_headers_sync()
|
||||||
await self.join_network()
|
await self.join_network()
|
||||||
self.network.on_connected.listen(self.join_network)
|
self.network.on_connected.listen(self.join_network)
|
||||||
|
|
||||||
|
@ -303,6 +305,18 @@ class BaseLedger(metaclass=LedgerRegistry):
|
||||||
await self.db.close()
|
await self.db.close()
|
||||||
await self.headers.close()
|
await self.headers.close()
|
||||||
|
|
||||||
|
async def initial_headers_sync(self):
|
||||||
|
target = self.network.remote_height
|
||||||
|
current = len(self.headers)
|
||||||
|
get_chunk = partial(self.network.retriable_call, self.network.get_headers, count=2000)
|
||||||
|
chunks = [asyncio.ensure_future(get_chunk(height)) for height in range(current, target, 2000)]
|
||||||
|
for chunk in chunks:
|
||||||
|
headers = await chunk
|
||||||
|
if not headers:
|
||||||
|
continue
|
||||||
|
headers = headers['hex']
|
||||||
|
await self.update_headers(height=len(self.headers), headers=headers, subscription_update=True)
|
||||||
|
|
||||||
async def update_headers(self, height=None, headers=None, subscription_update=False):
|
async def update_headers(self, height=None, headers=None, subscription_update=False):
|
||||||
rewound = 0
|
rewound = 0
|
||||||
while True:
|
while True:
|
||||||
|
|
|
@ -173,9 +173,9 @@ class BaseNetwork:
|
||||||
continue
|
continue
|
||||||
self.client = await self.session_pool.wait_for_fastest_session()
|
self.client = await self.session_pool.wait_for_fastest_session()
|
||||||
log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
|
log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
|
||||||
self._on_connected_controller.add(True)
|
|
||||||
try:
|
try:
|
||||||
self._update_remote_height((await self.subscribe_headers(),))
|
self._update_remote_height((await self.subscribe_headers(),))
|
||||||
|
self._on_connected_controller.add(True)
|
||||||
log.info("Subscribed to headers: %s:%d", *self.client.server)
|
log.info("Subscribed to headers: %s:%d", *self.client.server)
|
||||||
except (asyncio.TimeoutError, ConnectionError):
|
except (asyncio.TimeoutError, ConnectionError):
|
||||||
log.info("Switching to %s:%d timed out, closing and retrying.", *self.client.server)
|
log.info("Switching to %s:%d timed out, closing and retrying.", *self.client.server)
|
||||||
|
@ -240,7 +240,8 @@ class BaseNetwork:
|
||||||
return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height], restricted)
|
return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height], restricted)
|
||||||
|
|
||||||
def get_headers(self, height, count=10000):
|
def get_headers(self, height, count=10000):
|
||||||
return self.rpc('blockchain.block.headers', [height, count])
|
restricted = height >= self.remote_height - 100
|
||||||
|
return self.rpc('blockchain.block.headers', [height, count], restricted)
|
||||||
|
|
||||||
# --- Subscribes, history and broadcasts are always aimed towards the master client directly
|
# --- Subscribes, history and broadcasts are always aimed towards the master client directly
|
||||||
def get_history(self, address):
|
def get_history(self, address):
|
||||||
|
|
Loading…
Reference in a new issue