use multi_get for sending history notifications and for batched subscriptions

This commit is contained in:
Jack Robison 2022-05-27 14:06:34 -04:00
parent f1d51eae7b
commit 237b78ee63
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 32 additions and 25 deletions

View file

@ -242,7 +242,7 @@ class HubMemPool:
(self.session_manager.hsub_results[session.subscribe_headers_raw],)) (self.session_manager.hsub_results[session.subscribe_headers_raw],))
) )
if hashXes: if hashXes:
asyncio.create_task(session.send_history_notifications(*hashXes)) asyncio.create_task(session.send_history_notifications(hashXes))
async def _notify_sessions(self, height, touched, new_touched): async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses.""" """Notify sessions about height changes and touched addresses."""

View file

@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from hub.herald.search import SearchIndex from hub.herald.search import SearchIndex
from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, asyncify_for_loop
from hub.common import LRUCacheWithMetrics from hub.common import LRUCacheWithMetrics
from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
@ -1181,33 +1181,39 @@ class LBRYElectrumX(asyncio.Protocol):
status = sha256(history.encode()) status = sha256(history.encode())
return status.hex() return status.hex()
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): async def get_hashX_statuses(self, hashXes: typing.List[bytes]):
if self.env.index_address_status:
return await self.db.get_hashX_statuses(hashXes)
return [await self.get_hashX_status(hashX) for hashX in hashXes]
async def send_history_notifications(self, hashXes: typing.List[bytes]):
notifications = [] notifications = []
for hashX in hashXes: start = time.perf_counter()
statuses = await self.get_hashX_statuses(hashXes)
duration = time.perf_counter() - start
self.session_manager.address_history_metric.observe(duration)
start = time.perf_counter()
scripthash_notifications = 0
address_notifications = 0
for hashX, status in zip(hashXes, statuses):
alias = self.hashX_subs[hashX] alias = self.hashX_subs[hashX]
if len(alias) == 64: if len(alias) == 64:
method = 'blockchain.scripthash.subscribe' method = 'blockchain.scripthash.subscribe'
scripthash_notifications += 1
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
start = time.perf_counter() address_notifications += 1
status = await self.get_hashX_status(hashX) notifications.append(Notification(method, (alias, status)))
duration = time.perf_counter() - start if scripthash_notifications:
self.session_manager.address_history_metric.observe(duration) self.NOTIFICATION_COUNT.labels(method='blockchain.scripthash.subscribe',).inc(scripthash_notifications)
notifications.append((method, (alias, status))) if address_notifications:
if duration > 30: self.NOTIFICATION_COUNT.labels(method='blockchain.address.subscribe', ).inc(address_notifications)
self.logger.warning("slow history notification (%s) for '%s'", duration, alias) self.session_manager.notifications_in_flight_metric.inc(len(notifications))
start = time.perf_counter()
self.session_manager.notifications_in_flight_metric.inc()
for method, args in notifications:
self.NOTIFICATION_COUNT.labels(method=method,).inc()
try: try:
await self.send_notifications( await self.send_notifications(Batch(notifications))
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
)
self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start) self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start)
finally: finally:
self.session_manager.notifications_in_flight_metric.dec() self.session_manager.notifications_in_flight_metric.dec(len(notifications))
# def get_metrics_or_placeholder_for_api(self, query_name): # def get_metrics_or_placeholder_for_api(self, query_name):
# """ Do not hold on to a reference to the metrics # """ Do not hold on to a reference to the metrics
@ -1470,12 +1476,13 @@ class LBRYElectrumX(asyncio.Protocol):
address: the address to subscribe to""" address: the address to subscribe to"""
if len(addresses) > 1000: if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
results = [] hashXes = [item async for item in asyncify_for_loop((self.address_to_hashX(address) for address in addresses), 100)]
statuses = await self.get_hashX_statuses(hashXes)
for hashX, alias in zip(hashXes, addresses):
self.hashX_subs[hashX] = alias
self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self))
self.session_manager.address_subscription_metric.inc(len(addresses)) self.session_manager.address_subscription_metric.inc(len(addresses))
for address in addresses: return statuses
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0)
return results
async def address_unsubscribe(self, address): async def address_unsubscribe(self, address):
"""Unsubscribe an address. """Unsubscribe an address.