forked from LBRYCommunity/lbry-sdk
Compare commits
8 commits
master
...
fix-conn-l
Author | SHA1 | Date | |
---|---|---|---|
|
b0802b3473 | ||
|
e265fc28f0 | ||
|
497fe3a376 | ||
|
8fd8ebad1b | ||
|
51a9e0307d | ||
|
99556d1ce4 | ||
|
a667278c99 | ||
|
a432c47e62 |
6 changed files with 96 additions and 50 deletions
|
@ -151,7 +151,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
|||
return self._blob_bytes_received, self.close()
|
||||
msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \
|
||||
f" timeout in {self.peer_timeout}"
|
||||
log.info(msg)
|
||||
log.debug(msg)
|
||||
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
|
||||
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
|
||||
log.info("%s at %fMB/s", msg,
|
||||
|
|
|
@ -22,13 +22,12 @@ class BlobAnnouncer:
|
|||
if peers > 4:
|
||||
return blob_hash
|
||||
else:
|
||||
log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||
except Exception as err:
|
||||
if isinstance(err, asyncio.CancelledError):
|
||||
raise err
|
||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||
|
||||
|
||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||
while batch_size:
|
||||
if not self.node.joined.is_set():
|
||||
|
|
|
@ -88,10 +88,10 @@ class Node:
|
|||
)
|
||||
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
|
||||
if stored_to:
|
||||
log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
|
||||
log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
|
||||
len(stored_to), len(peers))
|
||||
else:
|
||||
log.warning("Failed announcing %s, stored to 0 peers", blob_hash[:8])
|
||||
log.debug("Failed announcing %s, stored to 0 peers", blob_hash[:8])
|
||||
return stored_to
|
||||
|
||||
def stop(self) -> None:
|
||||
|
|
|
@ -395,9 +395,13 @@ class BaseLedger(metaclass=LedgerRegistry):
|
|||
|
||||
async def subscribe_addresses(self, address_manager: baseaccount.AddressManager, addresses: List[str]):
|
||||
if self.network.is_connected and addresses:
|
||||
async for address, remote_status in self.network.subscribe_address(*addresses):
|
||||
# subscribe isnt a retriable call as it happens right after a connection is made
|
||||
self._update_tasks.add(self.update_history(address, remote_status, address_manager))
|
||||
await asyncio.wait([
|
||||
self.subscribe_address(address_manager, address) for address in addresses
|
||||
])
|
||||
|
||||
async def subscribe_address(self, address_manager: baseaccount.AddressManager, address: str):
|
||||
remote_status = await self.network.subscribe_address(address)
|
||||
self._update_tasks.add(self.update_history(address, remote_status, address_manager))
|
||||
|
||||
def process_status_update(self, update):
|
||||
address, remote_status = update
|
||||
|
@ -465,6 +469,17 @@ class BaseLedger(metaclass=LedgerRegistry):
|
|||
|
||||
if address_manager is not None:
|
||||
await address_manager.ensure_address_gap()
|
||||
local_status, local_history = await self.get_local_status_and_history(address)
|
||||
|
||||
if local_status != remote_status:
|
||||
log.warning(
|
||||
"Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items",
|
||||
remote_status, len(remote_history), local_status, len(local_history)
|
||||
)
|
||||
log.warning("local: %s", local_history)
|
||||
log.warning("remote: %s", remote_history)
|
||||
else:
|
||||
log.info("Sync completed for: %s", address)
|
||||
|
||||
async def cache_transaction(self, txid, remote_height):
|
||||
cache_item = self._tx_cache.get(txid)
|
||||
|
|
|
@ -40,11 +40,12 @@ class ClientSession(BaseClientSession):
|
|||
return None
|
||||
return self.transport.get_extra_info('peername')
|
||||
|
||||
async def send_timed_server_version_request(self, args=()):
|
||||
async def send_timed_server_version_request(self, args=(), timeout=None):
|
||||
timeout = timeout or self.timeout
|
||||
log.debug("send version request to %s:%i", *self.server)
|
||||
start = perf_counter()
|
||||
result = await asyncio.wait_for(
|
||||
super().send_request('server.version', args), timeout=self.timeout
|
||||
super().send_request('server.version', args), timeout=timeout
|
||||
)
|
||||
current_response_time = perf_counter() - start
|
||||
response_sum = (self.response_time or 0) * self._response_samples + current_response_time
|
||||
|
@ -53,24 +54,39 @@ class ClientSession(BaseClientSession):
|
|||
return result
|
||||
|
||||
async def send_request(self, method, args=()):
|
||||
log.debug("send %s to %s:%i", method, *self.server)
|
||||
self.pending_amount += 1
|
||||
try:
|
||||
if method == 'server.version':
|
||||
return await self.send_timed_server_version_request(args)
|
||||
return await asyncio.wait_for(
|
||||
super().send_request(method, args), timeout=self.timeout
|
||||
)
|
||||
reply = await self.send_timed_server_version_request(args, self.timeout)
|
||||
else:
|
||||
reply = await asyncio.wait_for(
|
||||
super().send_request(method, args), timeout=self.timeout
|
||||
)
|
||||
log.debug("got reply for %s from %s:%i", method, *self.server)
|
||||
return reply
|
||||
except RPCError as e:
|
||||
log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s",
|
||||
*self.server, *e.args)
|
||||
raise e
|
||||
except ConnectionError:
|
||||
log.warning("connection to %s:%i lost", *self.server)
|
||||
self.synchronous_close()
|
||||
raise asyncio.CancelledError(f"connection to {self.server[0]}:{self.server[1]} lost")
|
||||
except asyncio.TimeoutError:
|
||||
log.info("timeout sending %s to %s:%i", method, *self.server)
|
||||
raise
|
||||
except asyncio.CancelledError:
|
||||
log.info("cancelled sending %s to %s:%i", method, *self.server)
|
||||
self.synchronous_close()
|
||||
raise
|
||||
finally:
|
||||
self.pending_amount -= 1
|
||||
|
||||
async def ensure_session(self):
|
||||
# Handles reconnecting and maintaining a session alive
|
||||
# TODO: change to 'ping' on newer protocol (above 1.2)
|
||||
retry_delay = default_delay = 0.1
|
||||
retry_delay = default_delay = 1.0
|
||||
while True:
|
||||
try:
|
||||
if self.is_closing():
|
||||
|
@ -83,19 +99,18 @@ class ClientSession(BaseClientSession):
|
|||
except (asyncio.TimeoutError, OSError):
|
||||
await self.close()
|
||||
retry_delay = min(60, retry_delay * 2)
|
||||
log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server)
|
||||
log.debug("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server)
|
||||
try:
|
||||
await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
except asyncio.CancelledError:
|
||||
self.synchronous_close()
|
||||
raise
|
||||
finally:
|
||||
self.trigger_urgent_reconnect.clear()
|
||||
|
||||
def ensure_server_version(self, required='1.2'):
|
||||
return self.send_request('server.version', [__version__, required])
|
||||
async def ensure_server_version(self, required='1.2', timeout=3):
|
||||
return await asyncio.wait_for(
|
||||
self.send_request('server.version', [__version__, required]), timeout=timeout
|
||||
)
|
||||
|
||||
async def create_connection(self, timeout=6):
|
||||
connector = Connector(lambda: self, *self.server)
|
||||
|
@ -120,7 +135,6 @@ class ClientSession(BaseClientSession):
|
|||
class BaseNetwork:
|
||||
|
||||
def __init__(self, ledger):
|
||||
self.switch_event = asyncio.Event()
|
||||
self.config = ledger.config
|
||||
self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6))
|
||||
self.client: Optional[ClientSession] = None
|
||||
|
@ -141,24 +155,38 @@ class BaseNetwork:
|
|||
'blockchain.address.subscribe': self._on_status_controller,
|
||||
}
|
||||
|
||||
async def switch_to_fastest(self):
|
||||
try:
|
||||
client = await asyncio.wait_for(self.session_pool.wait_for_fastest_session(), 30)
|
||||
except asyncio.TimeoutError:
|
||||
if self.client:
|
||||
await self.client.close()
|
||||
self.client = None
|
||||
for session in self.session_pool.sessions:
|
||||
session.synchronous_close()
|
||||
log.warning("not connected to any wallet servers")
|
||||
return
|
||||
current_client = self.client
|
||||
self.client = client
|
||||
log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
|
||||
self._on_connected_controller.add(True)
|
||||
try:
|
||||
self._update_remote_height((await self.subscribe_headers(),))
|
||||
log.info("Subscribed to headers: %s:%d", *self.client.server)
|
||||
except asyncio.TimeoutError:
|
||||
if self.client:
|
||||
await self.client.close()
|
||||
self.client = current_client
|
||||
return
|
||||
self.session_pool.new_connection_event.clear()
|
||||
return await self.session_pool.new_connection_event.wait()
|
||||
|
||||
async def start(self):
|
||||
self.running = True
|
||||
self.session_pool.start(self.config['default_servers'])
|
||||
self.on_header.listen(self._update_remote_height)
|
||||
while self.running:
|
||||
try:
|
||||
self.client = await self.session_pool.wait_for_fastest_session()
|
||||
self._update_remote_height((await self.subscribe_headers(),))
|
||||
log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
|
||||
self._on_connected_controller.add(True)
|
||||
self.client.on_disconnected.listen(lambda _: self.switch_event.set())
|
||||
await self.switch_event.wait()
|
||||
self.switch_event.clear()
|
||||
except asyncio.CancelledError:
|
||||
await self.stop()
|
||||
raise
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
await self.switch_to_fastest()
|
||||
|
||||
async def stop(self):
|
||||
self.running = False
|
||||
|
@ -178,21 +206,21 @@ class BaseNetwork:
|
|||
|
||||
async def retriable_call(self, function, *args, **kwargs):
|
||||
while self.running:
|
||||
if not self.is_connected:
|
||||
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
|
||||
await self.on_connected.first
|
||||
await self.session_pool.wait_for_fastest_session()
|
||||
try:
|
||||
return await function(*args, **kwargs)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("Wallet server call timed out, retrying.")
|
||||
except ConnectionError:
|
||||
if not self.is_connected and self.running:
|
||||
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
|
||||
await self.on_connected.first
|
||||
pass
|
||||
raise asyncio.CancelledError() # if we got here, we are shutting down
|
||||
|
||||
def _update_remote_height(self, header_args):
|
||||
self.remote_height = header_args[0]["height"]
|
||||
|
||||
def get_history(self, address):
|
||||
return self.rpc('blockchain.address.get_history', [address])
|
||||
|
||||
def get_transaction(self, tx_hash):
|
||||
return self.rpc('blockchain.transaction.get', [tx_hash])
|
||||
|
||||
|
@ -205,19 +233,23 @@ class BaseNetwork:
|
|||
def get_headers(self, height, count=10000):
|
||||
return self.rpc('blockchain.block.headers', [height, count])
|
||||
|
||||
# --- Subscribes and broadcasts are always aimed towards the master client directly
|
||||
# --- Subscribes, history and broadcasts are always aimed towards the master client directly
|
||||
def get_history(self, address):
|
||||
return self.rpc('blockchain.address.get_history', [address], session=self.client)
|
||||
|
||||
def broadcast(self, raw_transaction):
|
||||
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], session=self.client)
|
||||
|
||||
def subscribe_headers(self):
|
||||
return self.rpc('blockchain.headers.subscribe', [True], session=self.client)
|
||||
|
||||
async def subscribe_address(self, *addresses):
|
||||
async with self.client.send_batch() as batch:
|
||||
for address in addresses:
|
||||
batch.add_request('blockchain.address.subscribe', [address])
|
||||
for address, status in zip(addresses, batch.results):
|
||||
yield address, status
|
||||
async def subscribe_address(self, address):
|
||||
try:
|
||||
return await self.rpc('blockchain.address.subscribe', [address], session=self.client)
|
||||
except asyncio.TimeoutError:
|
||||
# abort and cancel, we cant lose a subscription, it will happen again on reconnect
|
||||
self.client.abort()
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
|
||||
class SessionPool:
|
||||
|
|
|
@ -145,8 +145,8 @@ class Stream:
|
|||
def first(self):
|
||||
future = asyncio.get_event_loop().create_future()
|
||||
subscription = self.listen(
|
||||
lambda value: self._cancel_and_callback(subscription, future, value),
|
||||
lambda exception: self._cancel_and_error(subscription, future, exception)
|
||||
lambda value: not future.done() and self._cancel_and_callback(subscription, future, value),
|
||||
lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception)
|
||||
)
|
||||
return future
|
||||
|
||||
|
|
Loading…
Reference in a new issue