Clear self._urgent_reconnect_needed once more after connection established.
Tweak sleep_delay logic so failures during initial RPC sequence also back off.
This commit is contained in:
parent
93f85d00fd
commit
154921a0ce
1 changed files with 17 additions and 11 deletions
|
@ -309,14 +309,14 @@ class Network:
|
||||||
return
|
return
|
||||||
|
|
||||||
async def network_loop(self):
|
async def network_loop(self):
|
||||||
sleep_delay = 15
|
def reset_sleep():
|
||||||
|
return 10 + random.uniform(0, 5)
|
||||||
|
sleep_delay = reset_sleep()
|
||||||
while self.running:
|
while self.running:
|
||||||
await asyncio.wait(
|
await asyncio.wait(
|
||||||
[asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()],
|
[asyncio.sleep(sleep_delay), self._urgent_need_reconnect.wait()],
|
||||||
return_when=asyncio.FIRST_COMPLETED
|
return_when=asyncio.FIRST_COMPLETED
|
||||||
)
|
)
|
||||||
if self._urgent_need_reconnect.is_set():
|
|
||||||
sleep_delay = 10 + random.uniform(0, 5)
|
|
||||||
self._urgent_need_reconnect.clear()
|
self._urgent_need_reconnect.clear()
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
client = await self.connect_to_fastest()
|
client = await self.connect_to_fastest()
|
||||||
|
@ -325,17 +325,24 @@ class Network:
|
||||||
sleep_delay = min(sleep_delay, 120)
|
sleep_delay = min(sleep_delay, 120)
|
||||||
log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay)
|
log.warning("failed to connect to any spv servers, retrying after %.2fs", sleep_delay)
|
||||||
continue
|
continue
|
||||||
|
sleep_delay = reset_sleep()
|
||||||
|
server_str = "%s:%i" % client.server
|
||||||
try:
|
try:
|
||||||
|
# Perform initial sequence of RPCs.
|
||||||
log.debug("get spv server features %s:%i", *client.server)
|
log.debug("get spv server features %s:%i", *client.server)
|
||||||
features = await client.send_request('server.features', [])
|
features = await client.send_request('server.features', [])
|
||||||
self.client, self.server_features = client, features
|
|
||||||
log.debug("discover other hubs %s:%i", *client.server)
|
log.debug("discover other hubs %s:%i", *client.server)
|
||||||
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
|
await self._update_hubs(await client.send_request('server.peers.subscribe', []))
|
||||||
log.info("subscribe to headers %s:%i", *client.server)
|
log.info("subscribe to headers %s:%i", *client.server)
|
||||||
self._update_remote_height((await self.subscribe_headers(),))
|
self._update_remote_height((await client.send_request('blockchain.headers.subscribe', [True]),))
|
||||||
|
|
||||||
|
# All initial RPCs were successful. We're now connected.
|
||||||
|
self.client, self.server_features = client, features
|
||||||
|
self._urgent_need_reconnect.clear()
|
||||||
|
sleep_delay = reset_sleep()
|
||||||
|
# Release any waiters.
|
||||||
self._on_connected_controller.add(True)
|
self._on_connected_controller.add(True)
|
||||||
sleep_delay = 15
|
|
||||||
server_str = "%s:%i" % client.server
|
|
||||||
log.info("maintaining connection to spv server %s", server_str)
|
log.info("maintaining connection to spv server %s", server_str)
|
||||||
self._keepalive_task = asyncio.create_task(self.client.keepalive_loop())
|
self._keepalive_task = asyncio.create_task(self.client.keepalive_loop())
|
||||||
await asyncio.wait(
|
await asyncio.wait(
|
||||||
|
@ -345,7 +352,9 @@ class Network:
|
||||||
if self._urgent_need_reconnect.is_set():
|
if self._urgent_need_reconnect.is_set():
|
||||||
log.warning("urgent reconnect needed")
|
log.warning("urgent reconnect needed")
|
||||||
except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError):
|
except (asyncio.TimeoutError, ConnectionResetError, ConnectionError, RPCError, ProtocolError):
|
||||||
pass
|
sleep_delay *= 2
|
||||||
|
sleep_delay = min(sleep_delay, 120)
|
||||||
|
log.warning("failed to connect to spv server %s, retrying after %.2fs", server_str, sleep_delay)
|
||||||
finally:
|
finally:
|
||||||
if self._keepalive_task and not self._keepalive_task.done():
|
if self._keepalive_task and not self._keepalive_task.done():
|
||||||
self._keepalive_task.cancel()
|
self._keepalive_task.cancel()
|
||||||
|
@ -436,9 +445,6 @@ class Network:
|
||||||
def broadcast(self, raw_transaction):
|
def broadcast(self, raw_transaction):
|
||||||
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
|
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
|
||||||
|
|
||||||
def subscribe_headers(self):
|
|
||||||
return self.rpc('blockchain.headers.subscribe', [True], True)
|
|
||||||
|
|
||||||
async def subscribe_address(self, address, *addresses):
|
async def subscribe_address(self, address, *addresses):
|
||||||
addresses = list((address, ) + addresses)
|
addresses = list((address, ) + addresses)
|
||||||
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
|
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
|
||||||
|
|
Loading…
Add table
Reference in a new issue