forked from LBRYCommunity/lbry-sdk
cleanup debug logging
This commit is contained in:
parent
49802b39cb
commit
ba6b985d71
9 changed files with 25 additions and 38 deletions
|
@ -287,14 +287,11 @@ class IntegrationTestCase(AsyncioTestCase):
|
||||||
|
|
||||||
async def generate(self, blocks):
|
async def generate(self, blocks):
|
||||||
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
|
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
|
||||||
print("generate", blocks)
|
|
||||||
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
|
prepare = self.ledger.on_header.where(self.blockchain.is_expected_block)
|
||||||
self.conductor.spv_node.server.synchronized.clear()
|
self.conductor.spv_node.server.synchronized.clear()
|
||||||
await self.blockchain.generate(blocks)
|
await self.blockchain.generate(blocks)
|
||||||
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
|
await prepare # no guarantee that it didn't happen already, so start waiting from before calling generate
|
||||||
print("wait for synchronized")
|
|
||||||
await self.conductor.spv_node.server.synchronized.wait()
|
await self.conductor.spv_node.server.synchronized.wait()
|
||||||
print("finished waiting for synchronized")
|
|
||||||
|
|
||||||
|
|
||||||
class FakeExchangeRateManager(ExchangeRateManager):
|
class FakeExchangeRateManager(ExchangeRateManager):
|
||||||
|
|
|
@ -234,7 +234,7 @@ class BlockProcessor:
|
||||||
await self.run_in_thread(self.advance_block, block)
|
await self.run_in_thread(self.advance_block, block)
|
||||||
await self.flush()
|
await self.flush()
|
||||||
|
|
||||||
self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
self.logger.info("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
||||||
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
|
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
||||||
|
|
|
@ -57,7 +57,7 @@ class BlockchainReader:
|
||||||
self.db.read_db_state()
|
self.db.read_db_state()
|
||||||
if not self.last_state or self.last_state.height < state.height:
|
if not self.last_state or self.last_state.height < state.height:
|
||||||
for height in range(last_height + 1, state.height + 1):
|
for height in range(last_height + 1, state.height + 1):
|
||||||
self.log.warning("advancing to %i", height)
|
self.log.info("advancing to %i", height)
|
||||||
self.advance(height)
|
self.advance(height)
|
||||||
self.clear_caches()
|
self.clear_caches()
|
||||||
self.last_state = state
|
self.last_state = state
|
||||||
|
@ -77,7 +77,7 @@ class BlockchainReader:
|
||||||
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
|
await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes)
|
||||||
|
|
||||||
async def refresh_blocks_forever(self, synchronized: asyncio.Event):
|
async def refresh_blocks_forever(self, synchronized: asyncio.Event):
|
||||||
self.log.warning("start refresh blocks forever")
|
self.log.info("start refresh blocks forever")
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
|
@ -158,7 +158,7 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
if self.notifications_to_send:
|
if self.notifications_to_send:
|
||||||
for (touched, height) in self.notifications_to_send:
|
for (touched, height) in self.notifications_to_send:
|
||||||
await self.mempool.on_block(touched, height)
|
await self.mempool.on_block(touched, height)
|
||||||
self.log.warning("reader advanced to %i", height)
|
self.log.info("reader advanced to %i", height)
|
||||||
if self._es_height == self.db.db_height:
|
if self._es_height == self.db.db_height:
|
||||||
self.synchronized.set()
|
self.synchronized.set()
|
||||||
# print("reader notified")
|
# print("reader notified")
|
||||||
|
@ -176,9 +176,9 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
self.clear_search_cache()
|
self.clear_search_cache()
|
||||||
if self.last_state and self._es_block_hash == self.last_state.tip:
|
if self.last_state and self._es_block_hash == self.last_state.tip:
|
||||||
self.synchronized.set()
|
self.synchronized.set()
|
||||||
self.log.warning("es and reader are in sync")
|
self.log.info("es and reader are in sync")
|
||||||
else:
|
else:
|
||||||
self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
|
self.log.info("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
|
||||||
finally:
|
finally:
|
||||||
self.es_notification_client.close()
|
self.es_notification_client.close()
|
||||||
|
|
||||||
|
|
|
@ -731,7 +731,6 @@ class HubDB:
|
||||||
return results
|
return results
|
||||||
|
|
||||||
async def _read_tx_counts(self):
|
async def _read_tx_counts(self):
|
||||||
print("read tx counts")
|
|
||||||
if self.tx_counts is not None:
|
if self.tx_counts is not None:
|
||||||
return
|
return
|
||||||
# tx_counts[N] has the cumulative number of txs at the end of
|
# tx_counts[N] has the cumulative number of txs at the end of
|
||||||
|
@ -957,11 +956,7 @@ class HubDB:
|
||||||
return None, tx_height
|
return None, tx_height
|
||||||
|
|
||||||
def get_block_txs(self, height: int) -> List[bytes]:
|
def get_block_txs(self, height: int) -> List[bytes]:
|
||||||
try:
|
return self.prefix_db.block_txs.get(height).tx_hashes
|
||||||
return self.prefix_db.block_txs.get(height).tx_hashes
|
|
||||||
except:
|
|
||||||
print("failed", height)
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
|
||||||
tx_infos = {}
|
tx_infos = {}
|
||||||
|
@ -988,8 +983,6 @@ class HubDB:
|
||||||
else:
|
else:
|
||||||
fill_cache = False
|
fill_cache = False
|
||||||
tx_height = bisect_right(self.tx_counts, tx_num)
|
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||||
if tx_height == 737:
|
|
||||||
print("wat")
|
|
||||||
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
|
||||||
if tx_height == -1:
|
if tx_height == -1:
|
||||||
merkle = {
|
merkle = {
|
||||||
|
|
|
@ -17,15 +17,15 @@ class ElasticNotifierProtocol(asyncio.Protocol):
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
self._listeners.append(self)
|
self._listeners.append(self)
|
||||||
log.warning("got es notifier connection")
|
log.info("got es notifier connection")
|
||||||
|
|
||||||
def connection_lost(self, exc) -> None:
|
def connection_lost(self, exc) -> None:
|
||||||
self._listeners.remove(self)
|
self._listeners.remove(self)
|
||||||
self.transport = None
|
self.transport = None
|
||||||
|
|
||||||
def send_height(self, height: int, block_hash: bytes):
|
def send_height(self, height: int, block_hash: bytes):
|
||||||
log.warning("notify es update '%s'", height)
|
log.info("notify es update '%s'", height)
|
||||||
self.transport.write(struct.pack(b'>Q32s', height, block_hash) + b'\n')
|
self.transport.write(struct.pack(b'>Q32s', height, block_hash))
|
||||||
|
|
||||||
|
|
||||||
class ElasticNotifierClientProtocol(asyncio.Protocol):
|
class ElasticNotifierClientProtocol(asyncio.Protocol):
|
||||||
|
@ -41,11 +41,15 @@ class ElasticNotifierClientProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
log.warning("connected to es notifier")
|
log.info("connected to es notifier")
|
||||||
|
|
||||||
def connection_lost(self, exc) -> None:
|
def connection_lost(self, exc) -> None:
|
||||||
self.transport = None
|
self.transport = None
|
||||||
|
|
||||||
def data_received(self, data: bytes) -> None:
|
def data_received(self, data: bytes) -> None:
|
||||||
height, block_hash = struct.unpack(b'>Q32s', data.rstrip(b'\n'))
|
try:
|
||||||
|
height, block_hash = struct.unpack(b'>Q32s', data)
|
||||||
|
except:
|
||||||
|
log.exception("failed to decode %s", (data or b'').hex())
|
||||||
|
raise
|
||||||
self.notifications.put_nowait((height, block_hash))
|
self.notifications.put_nowait((height, block_hash))
|
||||||
|
|
|
@ -55,7 +55,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
server = await asyncio.get_event_loop().create_server(
|
server = await asyncio.get_event_loop().create_server(
|
||||||
lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port
|
lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port
|
||||||
)
|
)
|
||||||
self.log.warning("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port)
|
self.log.info("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port)
|
||||||
synchronized.set()
|
synchronized.set()
|
||||||
async with server:
|
async with server:
|
||||||
await server.serve_forever()
|
await server.serve_forever()
|
||||||
|
@ -63,7 +63,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
def notify_es_notification_listeners(self, height: int, block_hash: bytes):
|
def notify_es_notification_listeners(self, height: int, block_hash: bytes):
|
||||||
for p in self._listeners:
|
for p in self._listeners:
|
||||||
p.send_height(height, block_hash)
|
p.send_height(height, block_hash)
|
||||||
self.log.warning("notify listener %i", height)
|
self.log.info("notify listener %i", height)
|
||||||
|
|
||||||
def _read_es_height(self):
|
def _read_es_height(self):
|
||||||
with open(self._es_info_path, 'r') as f:
|
with open(self._es_info_path, 'r') as f:
|
||||||
|
@ -174,7 +174,6 @@ class ElasticWriter(BlockchainReader):
|
||||||
for touched in self._touched_claims:
|
for touched in self._touched_claims:
|
||||||
claim = self.db.claim_producer(touched)
|
claim = self.db.claim_producer(touched)
|
||||||
if claim:
|
if claim:
|
||||||
self.log.warning("send es %s %i", claim['claim_id'], claim['activation_height'])
|
|
||||||
yield {
|
yield {
|
||||||
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
|
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
|
||||||
'_id': claim['claim_id'],
|
'_id': claim['claim_id'],
|
||||||
|
@ -183,7 +182,6 @@ class ElasticWriter(BlockchainReader):
|
||||||
'doc_as_upsert': True
|
'doc_as_upsert': True
|
||||||
}
|
}
|
||||||
for claim_hash, notifications in self._trending.items():
|
for claim_hash, notifications in self._trending.items():
|
||||||
self.log.warning("send es trending for %s", claim_hash.hex())
|
|
||||||
yield {
|
yield {
|
||||||
'_id': claim_hash.hex(),
|
'_id': claim_hash.hex(),
|
||||||
'_index': self.index,
|
'_index': self.index,
|
||||||
|
@ -218,7 +216,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
for to_del in touched_or_deleted.deleted_claims:
|
for to_del in touched_or_deleted.deleted_claims:
|
||||||
if to_del in self._trending:
|
if to_del in self._trending:
|
||||||
self._trending.pop(to_del)
|
self._trending.pop(to_del)
|
||||||
self.log.warning("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims),
|
self.log.info("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims),
|
||||||
len(self._touched_claims), len(self._deleted_claims))
|
len(self._touched_claims), len(self._deleted_claims))
|
||||||
self._advanced = True
|
self._advanced = True
|
||||||
|
|
||||||
|
@ -263,7 +261,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
success += 1
|
success += 1
|
||||||
await self.sync_client.indices.refresh(self.index)
|
await self.sync_client.indices.refresh(self.index)
|
||||||
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
|
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
|
||||||
self.log.warning("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt)
|
self.log.info("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt)
|
||||||
self._touched_claims.clear()
|
self._touched_claims.clear()
|
||||||
self._deleted_claims.clear()
|
self._deleted_claims.clear()
|
||||||
self._removed_during_undo.clear()
|
self._removed_during_undo.clear()
|
||||||
|
|
|
@ -207,7 +207,7 @@ class MemPool:
|
||||||
for session in self.session_manager.sessions.values() if session.subscribe_headers
|
for session in self.session_manager.sessions.values() if session.subscribe_headers
|
||||||
]
|
]
|
||||||
if header_tasks:
|
if header_tasks:
|
||||||
self.logger.warning(f'notify {len(header_tasks)} sessions of new header')
|
self.logger.info(f'notify {len(header_tasks)} sessions of new header')
|
||||||
asyncio.create_task(asyncio.wait(header_tasks))
|
asyncio.create_task(asyncio.wait(header_tasks))
|
||||||
for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
|
for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
|
||||||
self.session_manager.mempool_statuses.pop(hashX, None)
|
self.session_manager.mempool_statuses.pop(hashX, None)
|
||||||
|
@ -230,4 +230,4 @@ class MemPool:
|
||||||
for session_id, hashXes in session_hashxes_to_notify.items():
|
for session_id, hashXes in session_hashxes_to_notify.items():
|
||||||
asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
|
asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
|
||||||
if session_hashxes_to_notify:
|
if session_hashxes_to_notify:
|
||||||
self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
|
self.logger.info(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
|
||||||
|
|
|
@ -950,7 +950,6 @@ class LBRYElectrumX(SessionBase):
|
||||||
|
|
||||||
self.session_manager.address_history_metric.observe(time.perf_counter() - start)
|
self.session_manager.address_history_metric.observe(time.perf_counter() - start)
|
||||||
notifications.append((method, (alias, status)))
|
notifications.append((method, (alias, status)))
|
||||||
print(f"notify {alias} {method}")
|
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
self.session_manager.notifications_in_flight_metric.inc()
|
self.session_manager.notifications_in_flight_metric.inc()
|
||||||
|
@ -1506,10 +1505,6 @@ class LBRYElectrumX(SessionBase):
|
||||||
'block_height': block_height
|
'block_height': block_height
|
||||||
}
|
}
|
||||||
await asyncio.sleep(0) # heavy call, give other tasks a chance
|
await asyncio.sleep(0) # heavy call, give other tasks a chance
|
||||||
print("return tx batch")
|
|
||||||
for tx_hash, (_, info) in batch_result.items():
|
|
||||||
print(tx_hash, info['block_height'])
|
|
||||||
|
|
||||||
self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
|
self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
|
||||||
return batch_result
|
return batch_result
|
||||||
|
|
||||||
|
|
|
@ -1604,7 +1604,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
||||||
await self.blockchain.generate(2)
|
await self.blockchain.generate(2)
|
||||||
|
|
||||||
# wait for the client to catch up and verify the reorg
|
# wait for the client to catch up and verify the reorg
|
||||||
await asyncio.wait_for(self.on_header(209), 30.0)
|
await asyncio.wait_for(self.on_header(209), 3.0)
|
||||||
await self.assertBlockHash(207)
|
await self.assertBlockHash(207)
|
||||||
await self.assertBlockHash(208)
|
await self.assertBlockHash(208)
|
||||||
await self.assertBlockHash(209)
|
await self.assertBlockHash(209)
|
||||||
|
@ -1633,7 +1633,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
||||||
await self.blockchain.generate(1)
|
await self.blockchain.generate(1)
|
||||||
|
|
||||||
# wait for the client to catch up
|
# wait for the client to catch up
|
||||||
await asyncio.wait_for(self.on_header(210), 30.0)
|
await asyncio.wait_for(self.on_header(210), 3.0)
|
||||||
|
|
||||||
# verify the claim is in the new block and that it is returned by claim_search
|
# verify the claim is in the new block and that it is returned by claim_search
|
||||||
republished = await self.resolve('hovercraft')
|
republished = await self.resolve('hovercraft')
|
||||||
|
@ -1712,7 +1712,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
|
||||||
await self.blockchain.generate(1)
|
await self.blockchain.generate(1)
|
||||||
|
|
||||||
# wait for the client to catch up
|
# wait for the client to catch up
|
||||||
await asyncio.wait_for(self.on_header(210), 10.0)
|
await asyncio.wait_for(self.on_header(210), 1.0)
|
||||||
|
|
||||||
# verify the claim is in the new block and that it is returned by claim_search
|
# verify the claim is in the new block and that it is returned by claim_search
|
||||||
republished = await self.resolve('hovercraft')
|
republished = await self.resolve('hovercraft')
|
||||||
|
|
Loading…
Add table
Reference in a new issue