This commit is contained in:
Jack Robison 2020-05-26 11:52:12 -04:00
parent 597a101030
commit 1c4c18dec9
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 37 additions and 13 deletions

View file

@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
self._on_transaction_controller = StreamController() self._on_transaction_controller = StreamController()
self.on_transaction = self._on_transaction_controller.stream self.on_transaction = self._on_transaction_controller.stream
self.on_transaction.listen( self.on_transaction.listen(
lambda e: log.info( lambda e: log.debug(
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
) )
@ -497,12 +497,16 @@ class Ledger(metaclass=LedgerRegistry):
local_status, local_history = await self.get_local_status_and_history(address) local_status, local_history = await self.get_local_status_and_history(address)
if local_status == remote_status: if local_status == remote_status:
log.info("no new txs needed for %s, remote matches local", address)
return True return True
remote_history = await self.network.retriable_call(self.network.get_history, address) remote_history = await self.network.retriable_call(self.network.get_history, address)
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
we_need = set(remote_history) - set(local_history) we_need = set(remote_history) - set(local_history)
if not we_need: if not we_need:
log.info(
"no new txs needed for %s, local status %s, remote status %s", address, local_status, remote_status
)
return True return True
cache_tasks: List[asyncio.Task[Transaction]] = [] cache_tasks: List[asyncio.Task[Transaction]] = []
@ -568,16 +572,28 @@ class Ledger(metaclass=LedgerRegistry):
await self.get_local_status_and_history(address, synced_history.getvalue()) await self.get_local_status_and_history(address, synced_history.getvalue())
if local_status != remote_status: if local_status != remote_status:
if local_history == remote_history: if local_history == remote_history:
return True
log.warning( log.warning(
"Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", "%s has a synced history but a mismatched status", address
remote_status, len(remote_history), local_status, len(local_history)
) )
log.warning("local: %s", local_history) return True
log.warning("remote: %s", remote_history) remote_set = set(remote_history)
local_set = set(local_history)
log.warning(
"%s is out of sync after syncing. Remote: %s with %d items (%i unique), local: %s with %d items (%i unique). "
"Histories are mismatched on %i items.\nLocal is missing\n%s\nRemote is missing\n%s\n******",
address, remote_status, len(remote_history), len(remote_set),
local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
"\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]),
"\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)])
)
# log.warning("local: %s", local_history)
# log.warning("remote: %s", remote_history)
self._known_addresses_out_of_sync.add(address) self._known_addresses_out_of_sync.add(address)
return False return False
else: else:
log.warning(
"%s is synced", address
)
return True return True
async def cache_transaction(self, txid, remote_height, check_local=True): async def cache_transaction(self, txid, remote_height, check_local=True):

View file

@ -1,6 +1,7 @@
import logging import logging
import asyncio import asyncio
import json import json
import time
from time import perf_counter from time import perf_counter
from operator import itemgetter from operator import itemgetter
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
@ -54,8 +55,8 @@ class ClientSession(BaseClientSession):
return result return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
log.info("%i in flight, send %s%s to %s:%i ", self.pending_amount, method, tuple(args), *self.server)
self.pending_amount += 1 self.pending_amount += 1
log.debug("send %s%s to %s:%i", method, tuple(args), *self.server)
try: try:
if method == 'server.version': if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout) return await self.send_timed_server_version_request(args, self.timeout)
@ -63,11 +64,11 @@ class ClientSession(BaseClientSession):
while not request.done(): while not request.done():
done, pending = await asyncio.wait([request], timeout=self.timeout) done, pending = await asyncio.wait([request], timeout=self.timeout)
if pending: if pending:
log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received) log.info("Time since last packet: %s", perf_counter() - self.last_packet_received)
if (perf_counter() - self.last_packet_received) < self.timeout: if (perf_counter() - self.last_packet_received) < self.timeout:
continue continue
log.info("timeout sending %s to %s:%i", method, *self.server) log.warning("timeout sending %s(%s) to %s:%i", method, str(args), *self.server)
raise asyncio.TimeoutError raise asyncio.TimeoutError()
if done: if done:
try: try:
return request.result() return request.result()
@ -91,6 +92,7 @@ class ClientSession(BaseClientSession):
raise raise
finally: finally:
self.pending_amount -= 1 self.pending_amount -= 1
log.info("%i in flight, finished %s%s ", self.pending_amount, method, tuple(args))
async def ensure_session(self): async def ensure_session(self):
# Handles reconnecting and maintaining a session alive # Handles reconnecting and maintaining a session alive
@ -144,12 +146,12 @@ class ClientSession(BaseClientSession):
controller.add(request.args) controller.add(request.args)
def connection_lost(self, exc): def connection_lost(self, exc):
log.debug("Connection lost: %s:%d", *self.server) log.warning("Connection lost: %s:%d", *self.server)
super().connection_lost(exc) super().connection_lost(exc)
self.response_time = None self.response_time = None
self.connection_latency = None self.connection_latency = None
self._response_samples = 0 self._response_samples = 0
self.pending_amount = 0 # self.pending_amount = 0
self._on_disconnect_controller.add(True) self._on_disconnect_controller.add(True)
@ -274,8 +276,13 @@ class Network:
return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted) return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted)
# --- Subscribes, history and broadcasts are always aimed towards the master client directly # --- Subscribes, history and broadcasts are always aimed towards the master client directly
def get_history(self, address): async def get_history(self, address):
return self.rpc('blockchain.address.get_history', [address], True) log.info("get history %s", address)
start = time.perf_counter()
try:
return await self.rpc('blockchain.address.get_history', [address], True)
finally:
log.info("%s history took %s", address, time.perf_counter() - start)
def broadcast(self, raw_transaction): def broadcast(self, raw_transaction):
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
@ -296,6 +303,7 @@ class Network:
# abort and cancel, we can't lose a subscription, it will happen again on reconnect # abort and cancel, we can't lose a subscription, it will happen again on reconnect
if self.client: if self.client:
self.client.abort() self.client.abort()
log.warning("raise cancelled")
raise asyncio.CancelledError() raise asyncio.CancelledError()
def unsubscribe_address(self, address): def unsubscribe_address(self, address):