diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 7fd5a09e3..41b4191e7 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -285,6 +285,8 @@ class BaseLedger(metaclass=LedgerRegistry): first_connection = self.network.on_connected.first asyncio.ensure_future(self.network.start()) await first_connection + async with self._header_processing_lock: + await self.initial_headers_sync() await self.join_network() self.network.on_connected.listen(self.join_network) @@ -303,6 +305,18 @@ class BaseLedger(metaclass=LedgerRegistry): await self.db.close() await self.headers.close() + async def initial_headers_sync(self): + target = self.network.remote_height + current = len(self.headers) + get_chunk = partial(self.network.retriable_call, self.network.get_headers, count=2000) + chunks = [asyncio.ensure_future(get_chunk(height)) for height in range(current, target, 2000)] + for chunk in chunks: + headers = await chunk + if not headers: + continue + headers = headers['hex'] + await self.update_headers(height=len(self.headers), headers=headers, subscription_update=True) + async def update_headers(self, height=None, headers=None, subscription_update=False): rewound = 0 while True: diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index dfed4d800..cdf33dab2 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -173,9 +173,9 @@ class BaseNetwork: continue self.client = await self.session_pool.wait_for_fastest_session() log.info("Switching to SPV wallet server: %s:%d", *self.client.server) - self._on_connected_controller.add(True) try: self._update_remote_height((await self.subscribe_headers(),)) + self._on_connected_controller.add(True) log.info("Subscribed to headers: %s:%d", *self.client.server) except (asyncio.TimeoutError, ConnectionError): log.info("Switching to %s:%d timed out, closing and retrying.", *self.client.server) @@ -240,7 +240,8 @@ class BaseNetwork: return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height], restricted) def get_headers(self, height, count=10000): - return self.rpc('blockchain.block.headers', [height, count]) + restricted = height >= self.remote_height - 100 + return self.rpc('blockchain.block.headers', [height, count], restricted) # --- Subscribes, history and broadcasts are always aimed towards the master client directly def get_history(self, address):