cleanup debug logging

This commit is contained in:
Jack Robison 2022-01-16 13:59:38 -05:00
parent a7d64de361
commit d7707d0053
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
9 changed files with 25 additions and 38 deletions

View file

@ -287,14 +287,11 @@ class IntegrationTestCase(AsyncioTestCase):
async def generate(self, blocks): async def generate(self, blocks):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """ """ Ask lbrycrd to generate some blocks and wait until ledger has them. """
print("generate", blocks)
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block) prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
self.conductor.spv_node.server.synchronized.clear() self.conductor.spv_node.server.synchronized.clear()
await self.blockchain.generate(blocks) await self.blockchain.generate(blocks)
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
print("wait for synchronized")
await self.conductor.spv_node.server.synchronized.wait() await self.conductor.spv_node.server.synchronized.wait()
print("finished waiting for synchronized")
class FakeExchangeRateManager(ExchangeRateManager): class FakeExchangeRateManager(ExchangeRateManager):

View file

@ -234,7 +234,7 @@ class BlockProcessor:
await self.run_in_thread(self.advance_block, block) await self.run_in_thread(self.advance_block, block)
await self.flush() await self.flush()
self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start) self.logger.info("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
if self.height == self.coin.nExtendedClaimExpirationForkHeight: if self.height == self.coin.nExtendedClaimExpirationForkHeight:
self.logger.warning( self.logger.warning(
"applying extended claim expiration fork on claims accepted by, %i", self.height "applying extended claim expiration fork on claims accepted by, %i", self.height

View file

@ -57,7 +57,7 @@ class BlockchainReader:
self.db.read_db_state() self.db.read_db_state()
if not self.last_state or self.last_state.height < state.height: if not self.last_state or self.last_state.height < state.height:
for height in range(last_height + 1, state.height + 1): for height in range(last_height + 1, state.height + 1):
self.log.warning("advancing to %i", height) self.log.info("advancing to %i", height)
self.advance(height) self.advance(height)
self.clear_caches() self.clear_caches()
self.last_state = state self.last_state = state
@ -77,7 +77,7 @@ class BlockchainReader:
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
async def refresh_blocks_forever(self, synchronized: asyncio.Event): async def refresh_blocks_forever(self, synchronized: asyncio.Event):
self.log.warning("start refresh blocks forever") self.log.info("start refresh blocks forever")
while True: while True:
try: try:
async with self._lock: async with self._lock:
@ -158,7 +158,7 @@ class BlockchainReaderServer(BlockchainReader):
if self.notifications_to_send: if self.notifications_to_send:
for (touched, height) in self.notifications_to_send: for (touched, height) in self.notifications_to_send:
await self.mempool.on_block(touched, height) await self.mempool.on_block(touched, height)
self.log.warning("reader advanced to %i", height) self.log.info("reader advanced to %i", height)
if self._es_height == self.db.db_height: if self._es_height == self.db.db_height:
self.synchronized.set() self.synchronized.set()
# print("reader notified") # print("reader notified")
@ -176,9 +176,9 @@ class BlockchainReaderServer(BlockchainReader):
self.clear_search_cache() self.clear_search_cache()
if self.last_state and self._es_block_hash == self.last_state.tip: if self.last_state and self._es_block_hash == self.last_state.tip:
self.synchronized.set() self.synchronized.set()
self.log.warning("es and reader are in sync") self.log.info("es and reader are in sync")
else: else:
self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height) self.log.info("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
finally: finally:
self.es_notification_client.close() self.es_notification_client.close()

View file

@ -731,7 +731,6 @@ class HubDB:
return results return results
async def _read_tx_counts(self): async def _read_tx_counts(self):
print("read tx counts")
if self.tx_counts is not None: if self.tx_counts is not None:
return return
# tx_counts[N] has the cumulative number of txs at the end of # tx_counts[N] has the cumulative number of txs at the end of
@ -957,11 +956,7 @@ class HubDB:
return None, tx_height return None, tx_height
def get_block_txs(self, height: int) -> List[bytes]: def get_block_txs(self, height: int) -> List[bytes]:
try: return self.prefix_db.block_txs.get(height).tx_hashes
return self.prefix_db.block_txs.get(height).tx_hashes
except:
print("failed", height)
raise
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]): async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
tx_infos = {} tx_infos = {}
@ -988,8 +983,6 @@ class HubDB:
else: else:
fill_cache = False fill_cache = False
tx_height = bisect_right(self.tx_counts, tx_num) tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height == 737:
print("wat")
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1: if tx_height == -1:
merkle = { merkle = {

View file

@ -17,15 +17,15 @@ class ElasticNotifierProtocol(asyncio.Protocol):
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
self._listeners.append(self) self._listeners.append(self)
log.warning("got es notifier connection") log.info("got es notifier connection")
def connection_lost(self, exc) -> None: def connection_lost(self, exc) -> None:
self._listeners.remove(self) self._listeners.remove(self)
self.transport = None self.transport = None
def send_height(self, height: int, block_hash: bytes): def send_height(self, height: int, block_hash: bytes):
log.warning("notify es update '%s'", height) log.info("notify es update '%s'", height)
self.transport.write(struct.pack(b'>Q32s', height, block_hash) + b'\n') self.transport.write(struct.pack(b'>Q32s', height, block_hash))
class ElasticNotifierClientProtocol(asyncio.Protocol): class ElasticNotifierClientProtocol(asyncio.Protocol):
@ -41,11 +41,15 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
log.warning("connected to es notifier") log.info("connected to es notifier")
def connection_lost(self, exc) -> None: def connection_lost(self, exc) -> None:
self.transport = None self.transport = None
def data_received(self, data: bytes) -> None: def data_received(self, data: bytes) -> None:
height, block_hash = struct.unpack(b'>Q32s', data.rstrip(b'\n')) try:
height, block_hash = struct.unpack(b'>Q32s', data)
except:
log.exception("failed to decode %s", (data or b'').hex())
raise
self.notifications.put_nowait((height, block_hash)) self.notifications.put_nowait((height, block_hash))

View file

@ -55,7 +55,7 @@ class ElasticWriter(BlockchainReader):
server = await asyncio.get_event_loop().create_server( server = await asyncio.get_event_loop().create_server(
lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port
) )
self.log.warning("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port) self.log.info("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port)
synchronized.set() synchronized.set()
async with server: async with server:
await server.serve_forever() await server.serve_forever()
@ -63,7 +63,7 @@ class ElasticWriter(BlockchainReader):
def notify_es_notification_listeners(self, height: int, block_hash: bytes): def notify_es_notification_listeners(self, height: int, block_hash: bytes):
for p in self._listeners: for p in self._listeners:
p.send_height(height, block_hash) p.send_height(height, block_hash)
self.log.warning("notify listener %i", height) self.log.info("notify listener %i", height)
def _read_es_height(self): def _read_es_height(self):
with open(self._es_info_path, 'r') as f: with open(self._es_info_path, 'r') as f:
@ -174,7 +174,6 @@ class ElasticWriter(BlockchainReader):
for touched in self._touched_claims: for touched in self._touched_claims:
claim = self.db.claim_producer(touched) claim = self.db.claim_producer(touched)
if claim: if claim:
self.log.warning("send es %s %i", claim['claim_id'], claim['activation_height'])
yield { yield {
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
'_id': claim['claim_id'], '_id': claim['claim_id'],
@ -183,7 +182,6 @@ class ElasticWriter(BlockchainReader):
'doc_as_upsert': True 'doc_as_upsert': True
} }
for claim_hash, notifications in self._trending.items(): for claim_hash, notifications in self._trending.items():
self.log.warning("send es trending for %s", claim_hash.hex())
yield { yield {
'_id': claim_hash.hex(), '_id': claim_hash.hex(),
'_index': self.index, '_index': self.index,
@ -218,7 +216,7 @@ class ElasticWriter(BlockchainReader):
for to_del in touched_or_deleted.deleted_claims: for to_del in touched_or_deleted.deleted_claims:
if to_del in self._trending: if to_del in self._trending:
self._trending.pop(to_del) self._trending.pop(to_del)
self.log.warning("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims), self.log.info("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims),
len(self._touched_claims), len(self._deleted_claims)) len(self._touched_claims), len(self._deleted_claims))
self._advanced = True self._advanced = True
@ -263,7 +261,7 @@ class ElasticWriter(BlockchainReader):
success += 1 success += 1
await self.sync_client.indices.refresh(self.index) await self.sync_client.indices.refresh(self.index)
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex()) self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
self.log.warning("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt)
self._touched_claims.clear() self._touched_claims.clear()
self._deleted_claims.clear() self._deleted_claims.clear()
self._removed_during_undo.clear() self._removed_during_undo.clear()

View file

@ -207,7 +207,7 @@ class MemPool:
for session in self.session_manager.sessions.values() if session.subscribe_headers for session in self.session_manager.sessions.values() if session.subscribe_headers
] ]
if header_tasks: if header_tasks:
self.logger.warning(f'notify {len(header_tasks)} sessions of new header') self.logger.info(f'notify {len(header_tasks)} sessions of new header')
asyncio.create_task(asyncio.wait(header_tasks)) asyncio.create_task(asyncio.wait(header_tasks))
for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()): for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
self.session_manager.mempool_statuses.pop(hashX, None) self.session_manager.mempool_statuses.pop(hashX, None)
@ -230,4 +230,4 @@ class MemPool:
for session_id, hashXes in session_hashxes_to_notify.items(): for session_id, hashXes in session_hashxes_to_notify.items():
asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes)) asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
if session_hashxes_to_notify: if session_hashxes_to_notify:
self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')

View file

@ -950,7 +950,6 @@ class LBRYElectrumX(SessionBase):
self.session_manager.address_history_metric.observe(time.perf_counter() - start) self.session_manager.address_history_metric.observe(time.perf_counter() - start)
notifications.append((method, (alias, status))) notifications.append((method, (alias, status)))
print(f"notify {alias} {method}")
start = time.perf_counter() start = time.perf_counter()
self.session_manager.notifications_in_flight_metric.inc() self.session_manager.notifications_in_flight_metric.inc()
@ -1506,10 +1505,6 @@ class LBRYElectrumX(SessionBase):
'block_height': block_height 'block_height': block_height
} }
await asyncio.sleep(0) # heavy call, give other tasks a chance await asyncio.sleep(0) # heavy call, give other tasks a chance
print("return tx batch")
for tx_hash, (_, info) in batch_result.items():
print(tx_hash, info['block_height'])
self.session_manager.tx_replied_count_metric.inc(len(tx_hashes)) self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
return batch_result return batch_result

View file

@ -1604,7 +1604,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(2) await self.blockchain.generate(2)
# wait for the client to catch up and verify the reorg # wait for the client to catch up and verify the reorg
await asyncio.wait_for(self.on_header(209), 30.0) await asyncio.wait_for(self.on_header(209), 3.0)
await self.assertBlockHash(207) await self.assertBlockHash(207)
await self.assertBlockHash(208) await self.assertBlockHash(208)
await self.assertBlockHash(209) await self.assertBlockHash(209)
@ -1633,7 +1633,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(1) await self.blockchain.generate(1)
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 30.0) await asyncio.wait_for(self.on_header(210), 3.0)
# verify the claim is in the new block and that it is returned by claim_search # verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft') republished = await self.resolve('hovercraft')
@ -1712,7 +1712,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
await self.blockchain.generate(1) await self.blockchain.generate(1)
# wait for the client to catch up # wait for the client to catch up
await asyncio.wait_for(self.on_header(210), 10.0) await asyncio.wait_for(self.on_header(210), 1.0)
# verify the claim is in the new block and that it is returned by claim_search # verify the claim is in the new block and that it is returned by claim_search
republished = await self.resolve('hovercraft') republished = await self.resolve('hovercraft')