don't block the notification loop on sending the notifications

This commit is contained in:
Jack Robison 2020-06-04 09:25:41 -04:00
parent 8d93dd5adc
commit 9d44bbdb48
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 12 additions and 17 deletions

View file

@ -206,7 +206,7 @@ class SessionBase(asyncio.Protocol):
""" """
return self._address return self._address
def peer_address_str(self): def peer_address_str(self, for_log=True):
"""Returns the peer's IP address and port as a human-readable """Returns the peer's IP address and port as a human-readable
string.""" string."""
if not self._address: if not self._address:
@ -483,11 +483,17 @@ class RPCSession(SessionBase):
raise result raise result
return result return result
async def send_notification(self, method, args=()): async def send_notification(self, method, args=()) -> bool:
"""Send an RPC notification over the network.""" """Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args)) message = self.connection.send_notification(Notification(method, args))
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
await self._send_message(message) try:
await self._send_message(message)
return True
except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
self.abort()
return False
def send_batch(self, raise_errors=False): def send_batch(self, raise_errors=False):
"""Return a BatchRequest. Intended to be used like so: """Return a BatchRequest. Intended to be used like so:

View file

@ -921,13 +921,8 @@ class LBRYElectrumX(SessionBase):
""" """
if height_changed and self.subscribe_headers: if height_changed and self.subscribe_headers:
args = (await self.subscribe_headers_result(), ) args = (await self.subscribe_headers_result(), )
try: if not (await self.send_notification('blockchain.headers.subscribe', args)):
await self.send_notification('blockchain.headers.subscribe', args)
except asyncio.TimeoutError:
self.logger.info("timeout sending headers notification to %s", self.peer_address_str(for_log=True))
self.abort()
return return
touched = touched.intersection(self.hashX_subs) touched = touched.intersection(self.hashX_subs)
if touched or (height_changed and self.mempool_statuses): if touched or (height_changed and self.mempool_statuses):
changed = {} changed = {}
@ -954,14 +949,7 @@ class LBRYElectrumX(SessionBase):
method = 'blockchain.scripthash.subscribe' method = 'blockchain.scripthash.subscribe'
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
asyncio.create_task(self.send_notification(method, (alias, status)))
try:
await self.send_notification(method, (alias, status))
except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
self.abort()
return
if changed: if changed:
es = '' if len(changed) == 1 else 'es' es = '' if len(changed) == 1 else 'es'
self.logger.info(f'notified of {len(changed):,d} address{es}') self.logger.info(f'notified of {len(changed):,d} address{es}')
@ -1174,6 +1162,7 @@ class LBRYElectrumX(SessionBase):
""" """
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
db_history = await self.session_mgr.limited_history(hashX) db_history = await self.session_mgr.limited_history(hashX)
mempool = await self.mempool.transaction_summaries(hashX) mempool = await self.mempool.transaction_summaries(hashX)