batch address history notifications
This commit is contained in:
parent
68d397a269
commit
f77f2f6e80
2 changed files with 58 additions and 19 deletions
|
@ -496,6 +496,17 @@ class RPCSession(SessionBase):
|
|||
self.abort()
|
||||
return False
|
||||
|
||||
async def send_notifications(self, notifications) -> bool:
|
||||
"""Send an RPC notification over the network."""
|
||||
message, _ = self.connection.send_batch(notifications)
|
||||
try:
|
||||
await self._send_message(message)
|
||||
return True
|
||||
except asyncio.TimeoutError:
|
||||
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
|
||||
self.abort()
|
||||
return False
|
||||
|
||||
def send_batch(self, raise_errors=False):
|
||||
"""Return a BatchRequest. Intended to be used like so:
|
||||
|
||||
|
|
|
@ -29,11 +29,12 @@ from lbry.wallet.server.leveldb import LevelDB
|
|||
from lbry.wallet.server.websocket import AdminWebSocket
|
||||
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
|
||||
from lbry.wallet.rpc.framing import NewlineFramer
|
||||
|
||||
import lbry.wallet.server.version as VERSION
|
||||
|
||||
from lbry.wallet.rpc import (
|
||||
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
|
||||
handler_invocation, RPCError, Request, JSONRPC
|
||||
handler_invocation, RPCError, Request, JSONRPC, Notification, Batch
|
||||
)
|
||||
from lbry.wallet.server import text
|
||||
from lbry.wallet.server import util
|
||||
|
@ -637,15 +638,17 @@ class SessionManager:
|
|||
|
||||
if touched or (height_changed and self.mempool_statuses):
|
||||
notified_hashxs = 0
|
||||
notified_sessions = set()
|
||||
session_hashxes_to_notify = defaultdict(list)
|
||||
to_notify = touched if height_changed else new_touched
|
||||
|
||||
for hashX in to_notify:
|
||||
for session_id in self.hashx_subscriptions_by_session[hashX]:
|
||||
asyncio.create_task(self.sessions[session_id].send_history_notification(hashX))
|
||||
notified_sessions.add(session_id)
|
||||
notified_hashxs += 1
|
||||
if notified_sessions:
|
||||
self.logger.info(f'notified {len(notified_sessions)} sessions/{notified_hashxs:,d} touched addresses')
|
||||
session_hashxes_to_notify[session_id].append(hashX)
|
||||
notified_hashxs += 1
|
||||
for session_id, hashXes in session_hashxes_to_notify.items():
|
||||
asyncio.create_task(self.sessions[session_id].send_history_notifications(*hashXes))
|
||||
if session_hashxes_to_notify:
|
||||
self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
|
||||
|
||||
def add_session(self, session):
|
||||
self.sessions[id(session)] = session
|
||||
|
@ -914,19 +917,44 @@ class LBRYElectrumX(SessionBase):
|
|||
def sub_count(self):
|
||||
return len(self.hashX_subs)
|
||||
|
||||
async def send_history_notification(self, hashX):
|
||||
start = time.perf_counter()
|
||||
alias = self.hashX_subs[hashX]
|
||||
if len(alias) == 64:
|
||||
method = 'blockchain.scripthash.subscribe'
|
||||
else:
|
||||
method = 'blockchain.address.subscribe'
|
||||
try:
|
||||
self.session_mgr.notifications_in_flight_metric.inc()
|
||||
status = await self.address_status(hashX)
|
||||
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
|
||||
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
|
||||
notifications = []
|
||||
for hashX in hashXes:
|
||||
alias = self.hashX_subs[hashX]
|
||||
if len(alias) == 64:
|
||||
method = 'blockchain.scripthash.subscribe'
|
||||
else:
|
||||
method = 'blockchain.address.subscribe'
|
||||
start = time.perf_counter()
|
||||
await self.send_notification(method, (alias, status))
|
||||
db_history = await self.session_mgr.limited_history(hashX)
|
||||
mempool = self.mempool.transaction_summaries(hashX)
|
||||
|
||||
status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
|
||||
f'{height:d}:'
|
||||
for tx_hash, height in db_history)
|
||||
status += ''.join(f'{hash_to_hex_str(tx.hash)}:'
|
||||
f'{-tx.has_unconfirmed_inputs:d}:'
|
||||
for tx in mempool)
|
||||
if status:
|
||||
status = sha256(status.encode()).hex()
|
||||
else:
|
||||
status = None
|
||||
if mempool:
|
||||
self.session_mgr.mempool_statuses[hashX] = status
|
||||
else:
|
||||
self.session_mgr.mempool_statuses.pop(hashX, None)
|
||||
|
||||
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
|
||||
notifications.append((method, (alias, status)))
|
||||
|
||||
start = time.perf_counter()
|
||||
self.session_mgr.notifications_in_flight_metric.inc()
|
||||
for method, args in notifications:
|
||||
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
|
||||
try:
|
||||
await self.send_notifications(
|
||||
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
|
||||
)
|
||||
self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
|
||||
finally:
|
||||
self.session_mgr.notifications_in_flight_metric.dec()
|
||||
|
|
Loading…
Reference in a new issue