forked from LBRYCommunity/lbry-sdk
batch address history notifications
This commit is contained in:
parent
02cf478d91
commit
18e1256037
2 changed files with 58 additions and 19 deletions
|
@ -496,6 +496,17 @@ class RPCSession(SessionBase):
|
||||||
self.abort()
|
self.abort()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
async def send_notifications(self, notifications) -> bool:
|
||||||
|
"""Send an RPC notification over the network."""
|
||||||
|
message, _ = self.connection.send_batch(notifications)
|
||||||
|
try:
|
||||||
|
await self._send_message(message)
|
||||||
|
return True
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
|
||||||
|
self.abort()
|
||||||
|
return False
|
||||||
|
|
||||||
def send_batch(self, raise_errors=False):
|
def send_batch(self, raise_errors=False):
|
||||||
"""Return a BatchRequest. Intended to be used like so:
|
"""Return a BatchRequest. Intended to be used like so:
|
||||||
|
|
||||||
|
|
|
@ -29,11 +29,12 @@ from lbry.wallet.server.leveldb import LevelDB
|
||||||
from lbry.wallet.server.websocket import AdminWebSocket
|
from lbry.wallet.server.websocket import AdminWebSocket
|
||||||
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
|
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
|
||||||
from lbry.wallet.rpc.framing import NewlineFramer
|
from lbry.wallet.rpc.framing import NewlineFramer
|
||||||
|
|
||||||
import lbry.wallet.server.version as VERSION
|
import lbry.wallet.server.version as VERSION
|
||||||
|
|
||||||
from lbry.wallet.rpc import (
|
from lbry.wallet.rpc import (
|
||||||
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
|
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
|
||||||
handler_invocation, RPCError, Request, JSONRPC
|
handler_invocation, RPCError, Request, JSONRPC, Notification, Batch
|
||||||
)
|
)
|
||||||
from lbry.wallet.server import text
|
from lbry.wallet.server import text
|
||||||
from lbry.wallet.server import util
|
from lbry.wallet.server import util
|
||||||
|
@ -637,15 +638,17 @@ class SessionManager:
|
||||||
|
|
||||||
if touched or (height_changed and self.mempool_statuses):
|
if touched or (height_changed and self.mempool_statuses):
|
||||||
notified_hashxs = 0
|
notified_hashxs = 0
|
||||||
notified_sessions = set()
|
session_hashxes_to_notify = defaultdict(list)
|
||||||
to_notify = touched if height_changed else new_touched
|
to_notify = touched if height_changed else new_touched
|
||||||
|
|
||||||
for hashX in to_notify:
|
for hashX in to_notify:
|
||||||
for session_id in self.hashx_subscriptions_by_session[hashX]:
|
for session_id in self.hashx_subscriptions_by_session[hashX]:
|
||||||
asyncio.create_task(self.sessions[session_id].send_history_notification(hashX))
|
session_hashxes_to_notify[session_id].append(hashX)
|
||||||
notified_sessions.add(session_id)
|
notified_hashxs += 1
|
||||||
notified_hashxs += 1
|
for session_id, hashXes in session_hashxes_to_notify.items():
|
||||||
if notified_sessions:
|
asyncio.create_task(self.sessions[session_id].send_history_notifications(*hashXes))
|
||||||
self.logger.info(f'notified {len(notified_sessions)} sessions/{notified_hashxs:,d} touched addresses')
|
if session_hashxes_to_notify:
|
||||||
|
self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
|
||||||
|
|
||||||
def add_session(self, session):
|
def add_session(self, session):
|
||||||
self.sessions[id(session)] = session
|
self.sessions[id(session)] = session
|
||||||
|
@ -914,19 +917,44 @@ class LBRYElectrumX(SessionBase):
|
||||||
def sub_count(self):
|
def sub_count(self):
|
||||||
return len(self.hashX_subs)
|
return len(self.hashX_subs)
|
||||||
|
|
||||||
async def send_history_notification(self, hashX):
|
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
|
||||||
start = time.perf_counter()
|
notifications = []
|
||||||
alias = self.hashX_subs[hashX]
|
for hashX in hashXes:
|
||||||
if len(alias) == 64:
|
alias = self.hashX_subs[hashX]
|
||||||
method = 'blockchain.scripthash.subscribe'
|
if len(alias) == 64:
|
||||||
else:
|
method = 'blockchain.scripthash.subscribe'
|
||||||
method = 'blockchain.address.subscribe'
|
else:
|
||||||
try:
|
method = 'blockchain.address.subscribe'
|
||||||
self.session_mgr.notifications_in_flight_metric.inc()
|
|
||||||
status = await self.address_status(hashX)
|
|
||||||
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
await self.send_notification(method, (alias, status))
|
db_history = await self.session_mgr.limited_history(hashX)
|
||||||
|
mempool = self.mempool.transaction_summaries(hashX)
|
||||||
|
|
||||||
|
status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
|
||||||
|
f'{height:d}:'
|
||||||
|
for tx_hash, height in db_history)
|
||||||
|
status += ''.join(f'{hash_to_hex_str(tx.hash)}:'
|
||||||
|
f'{-tx.has_unconfirmed_inputs:d}:'
|
||||||
|
for tx in mempool)
|
||||||
|
if status:
|
||||||
|
status = sha256(status.encode()).hex()
|
||||||
|
else:
|
||||||
|
status = None
|
||||||
|
if mempool:
|
||||||
|
self.session_mgr.mempool_statuses[hashX] = status
|
||||||
|
else:
|
||||||
|
self.session_mgr.mempool_statuses.pop(hashX, None)
|
||||||
|
|
||||||
|
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
|
||||||
|
notifications.append((method, (alias, status)))
|
||||||
|
|
||||||
|
start = time.perf_counter()
|
||||||
|
self.session_mgr.notifications_in_flight_metric.inc()
|
||||||
|
for method, args in notifications:
|
||||||
|
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
|
||||||
|
try:
|
||||||
|
await self.send_notifications(
|
||||||
|
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
|
||||||
|
)
|
||||||
self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
|
self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
|
||||||
finally:
|
finally:
|
||||||
self.session_mgr.notifications_in_flight_metric.dec()
|
self.session_mgr.notifications_in_flight_metric.dec()
|
||||||
|
|
Loading…
Reference in a new issue