From 1c4c18dec9e6c4f29b967429fa467735a176c77b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 26 May 2020 11:52:12 -0400 Subject: [PATCH] debug --- lbry/wallet/ledger.py | 26 +++++++++++++++++++++----- lbry/wallet/network.py | 24 ++++++++++++++++-------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 194f1baa7..0a49ce71a 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_transaction_controller = StreamController() self.on_transaction = self._on_transaction_controller.stream self.on_transaction.listen( - lambda e: log.info( + lambda e: log.debug( '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id ) @@ -497,12 +497,16 @@ class Ledger(metaclass=LedgerRegistry): local_status, local_history = await self.get_local_status_and_history(address) if local_status == remote_status: + log.info("no new txs needed for %s, remote matches local", address) return True remote_history = await self.network.retriable_call(self.network.get_history, address) remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) we_need = set(remote_history) - set(local_history) if not we_need: + log.info( + "no new txs needed for %s, local status %s, remote status %s", address, local_status, remote_status + ) return True cache_tasks: List[asyncio.Task[Transaction]] = [] @@ -568,16 +572,28 @@ class Ledger(metaclass=LedgerRegistry): await self.get_local_status_and_history(address, synced_history.getvalue()) if local_status != remote_status: if local_history == remote_history: + log.warning( + "%s has a synced history but a mismatched status", address + ) return True + remote_set = set(remote_history) + local_set = set(local_history) log.warning( - "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", - remote_status, len(remote_history), local_status, len(local_history) + "%s is out of sync after syncing. Remote: %s with %d items (%i unique), local: %s with %d items (%i unique). " + "Histories are mismatched on %i items.\nLocal is missing\n%s\nRemote is missing\n%s\n******", + address, remote_status, len(remote_history), len(remote_set), + local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)), + "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)]), + "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]) ) - log.warning("local: %s", local_history) - log.warning("remote: %s", remote_history) + # log.warning("local: %s", local_history) + # log.warning("remote: %s", remote_history) self._known_addresses_out_of_sync.add(address) return False else: + log.warning( + "%s is synced", address + ) return True async def cache_transaction(self, txid, remote_height, check_local=True): diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 07b2cb59f..e853d054e 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -1,6 +1,7 @@ import logging import asyncio import json +import time from time import perf_counter from operator import itemgetter from typing import Dict, Optional, Tuple @@ -54,8 +55,8 @@ class ClientSession(BaseClientSession): return result async def send_request(self, method, args=()): + log.info("%i in flight, send %s%s to %s:%i ", self.pending_amount, method, tuple(args), *self.server) self.pending_amount += 1 - log.debug("send %s%s to %s:%i", method, tuple(args), *self.server) try: if method == 'server.version': return await self.send_timed_server_version_request(args, self.timeout) @@ -63,11 +64,11 @@ class ClientSession(BaseClientSession): while not request.done(): done, pending = await asyncio.wait([request], timeout=self.timeout) if pending: - log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received) + log.info("Time since last packet: %s", perf_counter() - self.last_packet_received) if (perf_counter() - self.last_packet_received) < self.timeout: continue - log.info("timeout sending %s to %s:%i", method, *self.server) - raise asyncio.TimeoutError + log.warning("timeout sending %s(%s) to %s:%i", method, str(args), *self.server) + raise asyncio.TimeoutError() if done: try: return request.result() @@ -91,6 +92,7 @@ class ClientSession(BaseClientSession): raise finally: self.pending_amount -= 1 + log.info("%i in flight, finished %s%s ", self.pending_amount, method, tuple(args)) async def ensure_session(self): # Handles reconnecting and maintaining a session alive @@ -144,12 +146,12 @@ class ClientSession(BaseClientSession): controller.add(request.args) def connection_lost(self, exc): - log.debug("Connection lost: %s:%d", *self.server) + log.warning("Connection lost: %s:%d", *self.server) super().connection_lost(exc) self.response_time = None self.connection_latency = None self._response_samples = 0 - self.pending_amount = 0 + # self.pending_amount = 0 self._on_disconnect_controller.add(True) @@ -274,8 +276,13 @@ class Network: return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted) # --- Subscribes, history and broadcasts are always aimed towards the master client directly - def get_history(self, address): - return self.rpc('blockchain.address.get_history', [address], True) + async def get_history(self, address): + log.info("get history %s", address) + start = time.perf_counter() + try: + return await self.rpc('blockchain.address.get_history', [address], True) + finally: + log.info("%s history took %s", address, time.perf_counter() - start) def broadcast(self, raw_transaction): return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) @@ -296,6 +303,7 @@ class Network: # abort and cancel, we can't lose a subscription, it will happen again on reconnect if self.client: self.client.abort() + log.warning("raise cancelled") raise asyncio.CancelledError() def unsubscribe_address(self, address):