use network.subscribe_address

This commit is contained in:
Jack Robison 2020-01-10 13:57:52 -05:00
parent 08f6520557
commit 0bb4cdadd9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 14 additions and 21 deletions

View file

@ -448,29 +448,18 @@ class Ledger(metaclass=LedgerRegistry):
async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000): async def subscribe_addresses(self, address_manager: AddressManager, addresses: List[str], batch_size: int = 1000):
if self.network.is_connected and addresses: if self.network.is_connected and addresses:
addresses_remaining = list(addresses) addresses_remaining = list(addresses)
retries = 0
while addresses_remaining: while addresses_remaining:
batch = addresses_remaining[:batch_size] batch = addresses_remaining[:batch_size]
try: results = await self.network.subscribe_address(*batch)
results = await self.network.rpc('blockchain.address.subscribe', batch, True)
for address, remote_status in zip(batch, results): for address, remote_status in zip(batch, results):
self._update_tasks.add(self.update_history(address, remote_status, address_manager)) self._update_tasks.add(self.update_history(address, remote_status, address_manager))
retries = 0
addresses_remaining = addresses_remaining[batch_size:] addresses_remaining = addresses_remaining[batch_size:]
log.info("subscribed to %i/%i addresses", len(addresses) - len(addresses_remaining), len(addresses)) log.info("subscribed to %i/%i addresses on %s:%i", len(addresses) - len(addresses_remaining),
except asyncio.TimeoutError: len(addresses), *self.network.client.server_address_and_port)
if retries >= 3: log.info(
log.warning( "finished subscribing to %i addresses on %s:%i", len(addresses),
"timed out subscribing to addresses from %s:%i",
*self.network.client.server_address_and_port *self.network.client.server_address_and_port
) )
# abort and cancel, we can't lose a subscription, it will happen again on reconnect
if self.network.client:
self.network.client.abort()
raise asyncio.CancelledError()
await asyncio.sleep(1)
retries += 1
log.info("finished subscribing to %i addresses", len(addresses))
def process_status_update(self, update): def process_status_update(self, update):
address, remote_status = update address, remote_status = update

View file

@ -261,6 +261,10 @@ class Network:
try: try:
return await self.rpc('blockchain.address.subscribe', addresses, True) return await self.rpc('blockchain.address.subscribe', addresses, True)
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning(
"timed out subscribing to addresses from %s:%i",
*self.client.server_address_and_port
)
# abort and cancel, we can't lose a subscription, it will happen again on reconnect # abort and cancel, we can't lose a subscription, it will happen again on reconnect
if self.client: if self.client:
self.client.abort() self.client.abort()