forked from LBRYCommunity/lbry-sdk
non-blocking history lookup in notify
This commit is contained in:
parent
e6cae9bcc3
commit
fc9023386c
1 changed files with 15 additions and 24 deletions
|
@ -922,36 +922,27 @@ class LBRYElectrumX(SessionBase):
|
||||||
args = (await self.subscribe_headers_result(), )
|
args = (await self.subscribe_headers_result(), )
|
||||||
if not (await self.send_notification('blockchain.headers.subscribe', args)):
|
if not (await self.send_notification('blockchain.headers.subscribe', args)):
|
||||||
return
|
return
|
||||||
touched = touched.intersection(self.hashX_subs)
|
|
||||||
if touched or (height_changed and self.mempool_statuses):
|
|
||||||
changed = {}
|
|
||||||
|
|
||||||
for hashX in touched:
|
async def send_history_notification(alias, hashX):
|
||||||
alias = self.hashX_subs[hashX]
|
|
||||||
status = await self.address_status(hashX)
|
|
||||||
changed[alias] = status
|
|
||||||
|
|
||||||
# Check mempool hashXs - the status is a function of the
|
|
||||||
# confirmed state of other transactions. Note: we cannot
|
|
||||||
# iterate over mempool_statuses as it changes size.
|
|
||||||
for hashX in tuple(self.mempool_statuses):
|
|
||||||
# Items can be evicted whilst await-ing status; False
|
|
||||||
# ensures such hashXs are notified
|
|
||||||
old_status = self.mempool_statuses.get(hashX, False)
|
|
||||||
status = await self.address_status(hashX)
|
|
||||||
if status != old_status:
|
|
||||||
alias = self.hashX_subs[hashX]
|
|
||||||
changed[alias] = status
|
|
||||||
|
|
||||||
for alias, status in changed.items():
|
|
||||||
if len(alias) == 64:
|
if len(alias) == 64:
|
||||||
method = 'blockchain.scripthash.subscribe'
|
method = 'blockchain.scripthash.subscribe'
|
||||||
else:
|
else:
|
||||||
method = 'blockchain.address.subscribe'
|
method = 'blockchain.address.subscribe'
|
||||||
asyncio.create_task(self.send_notification(method, (alias, status)))
|
status = await self.address_status(hashX)
|
||||||
if changed:
|
await self.send_notification(method, (alias, status))
|
||||||
es = '' if len(changed) == 1 else 'es'
|
|
||||||
self.logger.info(f'notified of {len(changed):,d} address{es}')
|
touched = touched.intersection(self.hashX_subs)
|
||||||
|
if touched or (height_changed and self.mempool_statuses):
|
||||||
|
for hashX in touched:
|
||||||
|
alias = self.hashX_subs[hashX]
|
||||||
|
asyncio.create_task(send_history_notification(alias, hashX))
|
||||||
|
|
||||||
|
if touched:
|
||||||
|
es = '' if len(touched) == 1 else 'es'
|
||||||
|
self.logger.info(f'notified of {len(touched):,d} address{es}')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_metrics_or_placeholder_for_api(self, query_name):
|
def get_metrics_or_placeholder_for_api(self, query_name):
|
||||||
""" Do not hold on to a reference to the metrics
|
""" Do not hold on to a reference to the metrics
|
||||||
|
|
Loading…
Reference in a new issue