Compare commits

...

4 commits

Author SHA1 Message Date
Victor Shyba
51a9e0307d simple address subs instead of batch 2019-08-22 16:32:14 -03:00
Jack Robison
99556d1ce4
debug 2019-08-22 14:35:22 -04:00
Jack Robison
a667278c99
switch_to_fastest 2019-08-22 13:51:08 -04:00
Jack Robison
a432c47e62
logging 2019-08-22 13:51:07 -04:00
5 changed files with 70 additions and 41 deletions

View file

@ -151,7 +151,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
return self._blob_bytes_received, self.close()
msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \
f" timeout in {self.peer_timeout}"
log.info(msg)
log.debug(msg)
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
log.info("%s at %fMB/s", msg,

View file

@ -22,13 +22,12 @@ class BlobAnnouncer:
if peers > 4:
return blob_hash
else:
log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
if not self.node.joined.is_set():

View file

@ -88,10 +88,10 @@ class Node:
)
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
if stored_to:
log.info("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
log.debug("Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
len(stored_to), len(peers))
else:
log.warning("Failed announcing %s, stored to 0 peers", blob_hash[:8])
log.debug("Failed announcing %s, stored to 0 peers", blob_hash[:8])
return stored_to
def stop(self) -> None:

View file

@ -395,9 +395,13 @@ class BaseLedger(metaclass=LedgerRegistry):
async def subscribe_addresses(self, address_manager: baseaccount.AddressManager, addresses: List[str]):
if self.network.is_connected and addresses:
async for address, remote_status in self.network.subscribe_address(*addresses):
# subscribe isnt a retriable call as it happens right after a connection is made
self._update_tasks.add(self.update_history(address, remote_status, address_manager))
await asyncio.wait([
self.subscribe_address(address_manager, address) for address in addresses
])
async def subscribe_address(self, address_manager: baseaccount.AddressManager, address: str):
remote_status = await self.network.subscribe_address(address)
self._update_tasks.add(self.update_history(address, remote_status, address_manager))
def process_status_update(self, update):
address, remote_status = update

View file

@ -40,11 +40,12 @@ class ClientSession(BaseClientSession):
return None
return self.transport.get_extra_info('peername')
async def send_timed_server_version_request(self, args=()):
async def send_timed_server_version_request(self, args=(), timeout=None):
timeout = timeout or self.timeout
log.debug("send version request to %s:%i", *self.server)
start = perf_counter()
result = await asyncio.wait_for(
super().send_request('server.version', args), timeout=self.timeout
super().send_request('server.version', args), timeout=timeout
)
current_response_time = perf_counter() - start
response_sum = (self.response_time or 0) * self._response_samples + current_response_time
@ -52,18 +53,34 @@ class ClientSession(BaseClientSession):
self._response_samples += 1
return result
async def send_request(self, method, args=()):
async def send_request(self, method, args=(), timeout=None):
log.info("send %s to %s:%i", method, *self.server)
timeout = timeout or self.timeout
self.pending_amount += 1
try:
if method == 'server.version':
return await self.send_timed_server_version_request(args)
return await asyncio.wait_for(
super().send_request(method, args), timeout=self.timeout
)
reply = await self.send_timed_server_version_request(args, timeout)
else:
reply = await asyncio.wait_for(
super().send_request(method, args), timeout=timeout
)
log.info("got reply for %s from %s:%i", method, *self.server)
return reply
except RPCError as e:
log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s",
*self.server, *e.args)
raise e
except ConnectionError:
log.warning("connection to %s:%i lost", *self.server)
self.synchronous_close()
raise asyncio.CancelledError(f"connection to {self.server[0]}:{self.server[1]} lost")
except asyncio.TimeoutError:
log.info("timeout sending %s to %s:%i", method, *self.server)
raise
except asyncio.CancelledError:
log.info("cancelled sending %s to %s:%i", method, *self.server)
self.synchronous_close()
raise
finally:
self.pending_amount -= 1
@ -83,19 +100,16 @@ class ClientSession(BaseClientSession):
except (asyncio.TimeoutError, OSError):
await self.close()
retry_delay = min(60, retry_delay * 2)
log.warning("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server)
log.debug("Wallet server timeout (retry in %s seconds): %s:%d", retry_delay, *self.server)
try:
await asyncio.wait_for(self.trigger_urgent_reconnect.wait(), timeout=retry_delay)
except asyncio.TimeoutError:
pass
except asyncio.CancelledError:
self.synchronous_close()
raise
finally:
self.trigger_urgent_reconnect.clear()
def ensure_server_version(self, required='1.2'):
return self.send_request('server.version', [__version__, required])
async def ensure_server_version(self, required='1.2', timeout=3):
return await self.send_request('server.version', [__version__, required], timeout)
async def create_connection(self, timeout=6):
connector = Connector(lambda: self, *self.server)
@ -120,7 +134,6 @@ class ClientSession(BaseClientSession):
class BaseNetwork:
def __init__(self, ledger):
self.switch_event = asyncio.Event()
self.config = ledger.config
self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6))
self.client: Optional[ClientSession] = None
@ -141,24 +154,41 @@ class BaseNetwork:
'blockchain.address.subscribe': self._on_status_controller,
}
async def switch_to_fastest(self):
try:
client = await asyncio.wait_for(self.session_pool.wait_for_fastest_session(), 30)
except asyncio.TimeoutError:
if self.client:
await self.client.close()
self.client = None
for session in self.session_pool.sessions:
session.synchronous_close()
log.warning("not connected to any wallet servers")
return
if not self.client or client.server_address_and_port != self.client.server_address_and_port:
current_client = self.client
self.client = client
log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
try:
self._update_remote_height((await self.subscribe_headers(),))
log.info("Subscribed to headers: %s:%d", *self.client.server)
if current_client:
await current_client.close()
log.info("Closed connection to %s:%i", *current_client.server)
except asyncio.TimeoutError:
if self.client:
await self.client.close()
self.client = current_client
return
self._on_connected_controller.add(True)
await asyncio.sleep(30)
async def start(self):
self.running = True
self.session_pool.start(self.config['default_servers'])
self.on_header.listen(self._update_remote_height)
while self.running:
try:
self.client = await self.session_pool.wait_for_fastest_session()
self._update_remote_height((await self.subscribe_headers(),))
log.info("Switching to SPV wallet server: %s:%d", *self.client.server)
self._on_connected_controller.add(True)
self.client.on_disconnected.listen(lambda _: self.switch_event.set())
await self.switch_event.wait()
self.switch_event.clear()
except asyncio.CancelledError:
await self.stop()
raise
except asyncio.TimeoutError:
pass
await self.switch_to_fastest()
async def stop(self):
self.running = False
@ -212,12 +242,8 @@ class BaseNetwork:
def subscribe_headers(self):
return self.rpc('blockchain.headers.subscribe', [True], session=self.client)
async def subscribe_address(self, *addresses):
async with self.client.send_batch() as batch:
for address in addresses:
batch.add_request('blockchain.address.subscribe', [address])
for address, status in zip(addresses, batch.results):
yield address, status
def subscribe_address(self, address):
return self.rpc('blockchain.address.subscribe', [address], session=self.client)
class SessionPool: