From fc9023386c66d62184b4b6e10e88bdfb43afca42 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 15:56:44 -0400 Subject: [PATCH] non-blocking history lookup in notify --- lbry/wallet/server/session.py | 39 ++++++++++++++--------------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6eb514335..98f7b6fd5 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -922,36 +922,27 @@ class LBRYElectrumX(SessionBase): args = (await self.subscribe_headers_result(), ) if not (await self.send_notification('blockchain.headers.subscribe', args)): return + + async def send_history_notification(alias, hashX): + if len(alias) == 64: + method = 'blockchain.scripthash.subscribe' + else: + method = 'blockchain.address.subscribe' + status = await self.address_status(hashX) + await self.send_notification(method, (alias, status)) + touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): - changed = {} - for hashX in touched: alias = self.hashX_subs[hashX] - status = await self.address_status(hashX) - changed[alias] = status + asyncio.create_task(send_history_notification(alias, hashX)) + + if touched: + es = '' if len(touched) == 1 else 'es' + self.logger.info(f'notified of {len(touched):,d} address{es}') + - # Check mempool hashXs - the status is a function of the - # confirmed state of other transactions. Note: we cannot - # iterate over mempool_statuses as it changes size. - for hashX in tuple(self.mempool_statuses): - # Items can be evicted whilst await-ing status; False - # ensures such hashXs are notified - old_status = self.mempool_statuses.get(hashX, False) - status = await self.address_status(hashX) - if status != old_status: - alias = self.hashX_subs[hashX] - changed[alias] = status - for alias, status in changed.items(): - if len(alias) == 64: - method = 'blockchain.scripthash.subscribe' - else: - method = 'blockchain.address.subscribe' - asyncio.create_task(self.send_notification(method, (alias, status))) - if changed: - es = '' if len(changed) == 1 else 'es' - self.logger.info(f'notified of {len(changed):,d} address{es}') def get_metrics_or_placeholder_for_api(self, query_name): """ Do not hold on to a reference to the metrics