diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index ceae4b125..762bb21cd 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -496,6 +496,17 @@ class RPCSession(SessionBase): self.abort() return False + async def send_notifications(self, notifications) -> bool: + """Send an RPC notification over the network.""" + message, _ = self.connection.send_batch(notifications) + try: + await self._send_message(message) + return True + except asyncio.TimeoutError: + self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) + self.abort() + return False + def send_batch(self, raise_errors=False): """Return a BatchRequest. Intended to be used like so: diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6f97d7231..5e3e94662 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -29,11 +29,12 @@ from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics from lbry.wallet.rpc.framing import NewlineFramer + import lbry.wallet.server.version as VERSION from lbry.wallet.rpc import ( RPCSession, JSONRPCAutoDetect, JSONRPCConnection, - handler_invocation, RPCError, Request, JSONRPC + handler_invocation, RPCError, Request, JSONRPC, Notification, Batch ) from lbry.wallet.server import text from lbry.wallet.server import util @@ -637,15 +638,17 @@ class SessionManager: if touched or (height_changed and self.mempool_statuses): notified_hashxs = 0 - notified_sessions = set() + session_hashxes_to_notify = defaultdict(list) to_notify = touched if height_changed else new_touched + for hashX in to_notify: for session_id in self.hashx_subscriptions_by_session[hashX]: - asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) - notified_sessions.add(session_id) - notified_hashxs += 1 - if notified_sessions: - self.logger.info(f'notified {len(notified_sessions)} sessions/{notified_hashxs:,d} touched addresses') + session_hashxes_to_notify[session_id].append(hashX) + notified_hashxs += 1 + for session_id, hashXes in session_hashxes_to_notify.items(): + asyncio.create_task(self.sessions[session_id].send_history_notifications(*hashXes)) + if session_hashxes_to_notify: + self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') def add_session(self, session): self.sessions[id(session)] = session @@ -914,19 +917,44 @@ class LBRYElectrumX(SessionBase): def sub_count(self): return len(self.hashX_subs) - async def send_history_notification(self, hashX): - start = time.perf_counter() - alias = self.hashX_subs[hashX] - if len(alias) == 64: - method = 'blockchain.scripthash.subscribe' - else: - method = 'blockchain.address.subscribe' - try: - self.session_mgr.notifications_in_flight_metric.inc() - status = await self.address_status(hashX) - self.session_mgr.address_history_metric.observe(time.perf_counter() - start) + async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): + notifications = [] + for hashX in hashXes: + alias = self.hashX_subs[hashX] + if len(alias) == 64: + method = 'blockchain.scripthash.subscribe' + else: + method = 'blockchain.address.subscribe' start = time.perf_counter() - await self.send_notification(method, (alias, status)) + db_history = await self.session_mgr.limited_history(hashX) + mempool = self.mempool.transaction_summaries(hashX) + + status = ''.join(f'{hash_to_hex_str(tx_hash)}:' + f'{height:d}:' + for tx_hash, height in db_history) + status += ''.join(f'{hash_to_hex_str(tx.hash)}:' + f'{-tx.has_unconfirmed_inputs:d}:' + for tx in mempool) + if status: + status = sha256(status.encode()).hex() + else: + status = None + if mempool: + self.session_mgr.mempool_statuses[hashX] = status + else: + self.session_mgr.mempool_statuses.pop(hashX, None) + + self.session_mgr.address_history_metric.observe(time.perf_counter() - start) + notifications.append((method, (alias, status))) + + start = time.perf_counter() + self.session_mgr.notifications_in_flight_metric.inc() + for method, args in notifications: + self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() + try: + await self.send_notifications( + Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) + ) self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start) finally: self.session_mgr.notifications_in_flight_metric.dec()