From ba6b985d714e975175165824ab740838f96e021a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 16 Jan 2022 13:59:38 -0500 Subject: [PATCH] cleanup debug logging --- lbry/testcase.py | 3 --- lbry/wallet/server/block_processor.py | 2 +- lbry/wallet/server/chain_reader.py | 10 +++++----- lbry/wallet/server/db/db.py | 9 +-------- lbry/wallet/server/db/elasticsearch/notifier.py | 14 +++++++++----- lbry/wallet/server/db/elasticsearch/sync.py | 10 ++++------ lbry/wallet/server/mempool.py | 4 ++-- lbry/wallet/server/session.py | 5 ----- .../integration/takeovers/test_resolve_command.py | 6 +++--- 9 files changed, 25 insertions(+), 38 deletions(-) diff --git a/lbry/testcase.py b/lbry/testcase.py index 31e7add4c..e761a2602 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -287,14 +287,11 @@ class IntegrationTestCase(AsyncioTestCase): async def generate(self, blocks): """ Ask lbrycrd to generate some blocks and wait until ledger has them. """ - print("generate", blocks) prepare = self.ledger.on_header.where(self.blockchain.is_expected_block) self.conductor.spv_node.server.synchronized.clear() await self.blockchain.generate(blocks) await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate - print("wait for synchronized") await self.conductor.spv_node.server.synchronized.wait() - print("finished waiting for synchronized") class FakeExchangeRateManager(ExchangeRateManager): diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 837ce6d67..643c71245 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -234,7 +234,7 @@ class BlockProcessor: await self.run_in_thread(self.advance_block, block) await self.flush() - self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start) + self.logger.info("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start) if self.height == self.coin.nExtendedClaimExpirationForkHeight: self.logger.warning( "applying extended claim expiration fork on claims accepted by, %i", self.height diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 729357b43..cc7ff1b4c 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -57,7 +57,7 @@ class BlockchainReader: self.db.read_db_state() if not self.last_state or self.last_state.height < state.height: for height in range(last_height + 1, state.height + 1): - self.log.warning("advancing to %i", height) + self.log.info("advancing to %i", height) self.advance(height) self.clear_caches() self.last_state = state @@ -77,7 +77,7 @@ class BlockchainReader: await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) async def refresh_blocks_forever(self, synchronized: asyncio.Event): - self.log.warning("start refresh blocks forever") + self.log.info("start refresh blocks forever") while True: try: async with self._lock: @@ -158,7 +158,7 @@ class BlockchainReaderServer(BlockchainReader): if self.notifications_to_send: for (touched, height) in self.notifications_to_send: await self.mempool.on_block(touched, height) - self.log.warning("reader advanced to %i", height) + self.log.info("reader advanced to %i", height) if self._es_height == self.db.db_height: self.synchronized.set() # print("reader notified") @@ -176,9 +176,9 @@ class BlockchainReaderServer(BlockchainReader): self.clear_search_cache() if self.last_state and self._es_block_hash == self.last_state.tip: self.synchronized.set() - self.log.warning("es and reader are in sync") + self.log.info("es and reader are in sync") else: - self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height) + self.log.info("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height) finally: self.es_notification_client.close() diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py index 097f91a4b..679f9763e 100644 --- a/lbry/wallet/server/db/db.py +++ b/lbry/wallet/server/db/db.py @@ -731,7 +731,6 @@ class HubDB: return results async def _read_tx_counts(self): - print("read tx counts") if self.tx_counts is not None: return # tx_counts[N] has the cumulative number of txs at the end of @@ -957,11 +956,7 @@ class HubDB: return None, tx_height def get_block_txs(self, height: int) -> List[bytes]: - try: - return self.prefix_db.block_txs.get(height).tx_hashes - except: - print("failed", height) - raise + return self.prefix_db.block_txs.get(height).tx_hashes async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]): tx_infos = {} @@ -988,8 +983,6 @@ class HubDB: else: fill_cache = False tx_height = bisect_right(self.tx_counts, tx_num) - if tx_height == 737: - print("wat") tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) if tx_height == -1: merkle = { diff --git a/lbry/wallet/server/db/elasticsearch/notifier.py b/lbry/wallet/server/db/elasticsearch/notifier.py index 702bc23ae..8964f0671 100644 --- a/lbry/wallet/server/db/elasticsearch/notifier.py +++ b/lbry/wallet/server/db/elasticsearch/notifier.py @@ -17,15 +17,15 @@ class ElasticNotifierProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport self._listeners.append(self) - log.warning("got es notifier connection") + log.info("got es notifier connection") def connection_lost(self, exc) -> None: self._listeners.remove(self) self.transport = None def send_height(self, height: int, block_hash: bytes): - log.warning("notify es update '%s'", height) - self.transport.write(struct.pack(b'>Q32s', height, block_hash) + b'\n') + log.info("notify es update '%s'", height) + self.transport.write(struct.pack(b'>Q32s', height, block_hash)) class ElasticNotifierClientProtocol(asyncio.Protocol): @@ -41,11 +41,15 @@ class ElasticNotifierClientProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport - log.warning("connected to es notifier") + log.info("connected to es notifier") def connection_lost(self, exc) -> None: self.transport = None def data_received(self, data: bytes) -> None: - height, block_hash = struct.unpack(b'>Q32s', data.rstrip(b'\n')) + try: + height, block_hash = struct.unpack(b'>Q32s', data) + except: + log.exception("failed to decode %s", (data or b'').hex()) + raise self.notifications.put_nowait((height, block_hash)) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 5f1076240..d0740ea16 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -55,7 +55,7 @@ class ElasticWriter(BlockchainReader): server = await asyncio.get_event_loop().create_server( lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port ) - self.log.warning("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port) + self.log.info("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port) synchronized.set() async with server: await server.serve_forever() @@ -63,7 +63,7 @@ class ElasticWriter(BlockchainReader): def notify_es_notification_listeners(self, height: int, block_hash: bytes): for p in self._listeners: p.send_height(height, block_hash) - self.log.warning("notify listener %i", height) + self.log.info("notify listener %i", height) def _read_es_height(self): with open(self._es_info_path, 'r') as f: @@ -174,7 +174,6 @@ class ElasticWriter(BlockchainReader): for touched in self._touched_claims: claim = self.db.claim_producer(touched) if claim: - self.log.warning("send es %s %i", claim['claim_id'], claim['activation_height']) yield { 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, '_id': claim['claim_id'], @@ -183,7 +182,6 @@ class ElasticWriter(BlockchainReader): 'doc_as_upsert': True } for claim_hash, notifications in self._trending.items(): - self.log.warning("send es trending for %s", claim_hash.hex()) yield { '_id': claim_hash.hex(), '_index': self.index, @@ -218,7 +216,7 @@ class ElasticWriter(BlockchainReader): for to_del in touched_or_deleted.deleted_claims: if to_del in self._trending: self._trending.pop(to_del) - self.log.warning("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims), + self.log.info("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims), len(self._touched_claims), len(self._deleted_claims)) self._advanced = True @@ -263,7 +261,7 @@ class ElasticWriter(BlockchainReader): success += 1 await self.sync_client.indices.refresh(self.index) self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex()) - self.log.warning("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) + self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) self._touched_claims.clear() self._deleted_claims.clear() self._removed_during_undo.clear() diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index cecb9f299..3304dbda0 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -207,7 +207,7 @@ class MemPool: for session in self.session_manager.sessions.values() if session.subscribe_headers ] if header_tasks: - self.logger.warning(f'notify {len(header_tasks)} sessions of new header') + self.logger.info(f'notify {len(header_tasks)} sessions of new header') asyncio.create_task(asyncio.wait(header_tasks)) for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()): self.session_manager.mempool_statuses.pop(hashX, None) @@ -230,4 +230,4 @@ class MemPool: for session_id, hashXes in session_hashxes_to_notify.items(): asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes)) if session_hashxes_to_notify: - self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') + self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0318e932c..5e276e367 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -950,7 +950,6 @@ class LBRYElectrumX(SessionBase): self.session_manager.address_history_metric.observe(time.perf_counter() - start) notifications.append((method, (alias, status))) - print(f"notify {alias} {method}") start = time.perf_counter() self.session_manager.notifications_in_flight_metric.inc() @@ -1506,10 +1505,6 @@ class LBRYElectrumX(SessionBase): 'block_height': block_height } await asyncio.sleep(0) # heavy call, give other tasks a chance - print("return tx batch") - for tx_hash, (_, info) in batch_result.items(): - print(tx_hash, info['block_height']) - self.session_manager.tx_replied_count_metric.inc(len(tx_hashes)) return batch_result diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index 563adb7e7..cc14cbae6 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -1604,7 +1604,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(2) # wait for the client to catch up and verify the reorg - await asyncio.wait_for(self.on_header(209), 30.0) + await asyncio.wait_for(self.on_header(209), 3.0) await self.assertBlockHash(207) await self.assertBlockHash(208) await self.assertBlockHash(209) @@ -1633,7 +1633,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(1) # wait for the client to catch up - await asyncio.wait_for(self.on_header(210), 30.0) + await asyncio.wait_for(self.on_header(210), 3.0) # verify the claim is in the new block and that it is returned by claim_search republished = await self.resolve('hovercraft') @@ -1712,7 +1712,7 @@ class ResolveAfterReorg(BaseResolveTestCase): await self.blockchain.generate(1) # wait for the client to catch up - await asyncio.wait_for(self.on_header(210), 10.0) + await asyncio.wait_for(self.on_header(210), 1.0) # verify the claim is in the new block and that it is returned by claim_search republished = await self.resolve('hovercraft')