From 38b108752e49661222573accfba3d1328fc4e182 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Jan 2020 23:05:49 -0500 Subject: [PATCH] batched blockchain.address.subscribe --- lbry/wallet/ledger.py | 34 ++++++++++++++++++++++++++-------- lbry/wallet/network.py | 5 +++-- lbry/wallet/server/session.py | 10 +++++++--- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 314903eea..a3fbdf257 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -425,6 +425,7 @@ class Ledger(metaclass=LedgerRegistry): async def subscribe_accounts(self): if self.network.is_connected and self.accounts: + log.info("Subscribe to %i accounts", len(self.accounts)) await asyncio.wait([ self.subscribe_account(a) for a in self.accounts ]) @@ -444,15 +445,32 @@ class Ledger(metaclass=LedgerRegistry): AddressesGeneratedEvent(address_manager, addresses) ) - async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str]): + async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000): if self.network.is_connected and addresses: - await asyncio.wait([ - self.subscribe_address(address_manager, address) for address in addresses - ]) - - async def subscribe_address(self, address_manager: AddressManager, address: str): - remote_status = await self.network.subscribe_address(address) - self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + addresses_remaining = list(addresses) + retries = 0 + while addresses_remaining: + batch = addresses_remaining[:batch_size] + try: + results = await self.network.rpc('blockchain.address.subscribe', batch, True) + for address, remote_status in zip(batch, results): + self._update_tasks.add(self.update_history(address, remote_status, address_manager)) + retries = 0 + addresses_remaining = addresses_remaining[batch_size:] + log.info("subscribed to %i/%i addresses", len(addresses) - len(addresses_remaining), len(addresses)) + except asyncio.TimeoutError: + if retries >= 3: + log.warning( + "timed out subscribing to addresses from %s:%i", + *self.network.client.server_address_and_port + ) + # abort and cancel, we can't lose a subscription, it will happen again on reconnect + if self.network.client: + self.network.client.abort() + raise asyncio.CancelledError() + await asyncio.sleep(1) + retries += 1 + log.info("finished subscribing to %i addresses", len(addresses)) def process_status_update(self, update): address, remote_status = update diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 542408d87..b3ae4b1b7 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -256,9 +256,10 @@ class Network: def subscribe_headers(self): return self.rpc('blockchain.headers.subscribe', [True], True) - async def subscribe_address(self, address): + async def subscribe_address(self, address, *addresses): + addresses = list((address, ) + addresses) try: - return await self.rpc('blockchain.address.subscribe', [address], True) + return await self.rpc('blockchain.address.subscribe', addresses, True) except asyncio.TimeoutError: # abort and cancel, we can't lose a subscription, it will happen again on reconnect if self.client: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9cca31d2b..ee4686284 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1141,12 +1141,16 @@ class LBRYElectrumX(SessionBase): hashX = self.address_to_hashX(address) return await self.hashX_listunspent(hashX) - async def address_subscribe(self, address): + async def address_subscribe(self, *addresses): """Subscribe to an address. address: the address to subscribe to""" - hashX = self.address_to_hashX(address) - return await self.hashX_subscribe(hashX, address) + if len(addresses) > 1000: + raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') + hashXes = [ + (self.address_to_hashX(address), address) for address in addresses + ] + return await asyncio.gather(*(self.hashX_subscribe(*args) for args in hashXes)) async def address_unsubscribe(self, address): """Unsubscribe an address.