logging
This commit is contained in:
parent
704ec9e553
commit
32d2208fd9
5 changed files with 14 additions and 18 deletions
|
@ -122,7 +122,7 @@ class ClientSession(BaseClientSession):
|
||||||
await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
|
await asyncio.sleep(max(0, max_idle - (now - self.last_send)))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if isinstance(err, asyncio.CancelledError):
|
if isinstance(err, asyncio.CancelledError):
|
||||||
log.warning("closing connection to %s:%i", *self.server)
|
log.info("closing connection to %s:%i", *self.server)
|
||||||
else:
|
else:
|
||||||
log.exception("lost connection to spv")
|
log.exception("lost connection to spv")
|
||||||
finally:
|
finally:
|
||||||
|
@ -140,7 +140,7 @@ class ClientSession(BaseClientSession):
|
||||||
controller.add(request.args)
|
controller.add(request.args)
|
||||||
|
|
||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
log.warning("Connection lost: %s:%d", *self.server)
|
log.debug("Connection lost: %s:%d", *self.server)
|
||||||
super().connection_lost(exc)
|
super().connection_lost(exc)
|
||||||
self.response_time = None
|
self.response_time = None
|
||||||
self.connection_latency = None
|
self.connection_latency = None
|
||||||
|
@ -303,7 +303,7 @@ class Network:
|
||||||
concurrency=self.config.get('concurrent_hub_requests', 30))
|
concurrency=self.config.get('concurrent_hub_requests', 30))
|
||||||
try:
|
try:
|
||||||
await client.create_connection()
|
await client.create_connection()
|
||||||
log.warning("Connected to spv server %s:%i", host, port)
|
log.info("Connected to spv server %s:%i", host, port)
|
||||||
await client.ensure_server_version()
|
await client.ensure_server_version()
|
||||||
return client
|
return client
|
||||||
except (asyncio.TimeoutError, ConnectionError, OSError, IncompatibleWalletServerError, RPCError):
|
except (asyncio.TimeoutError, ConnectionError, OSError, IncompatibleWalletServerError, RPCError):
|
||||||
|
@ -357,7 +357,7 @@ class Network:
|
||||||
self._keepalive_task = None
|
self._keepalive_task = None
|
||||||
self.client = None
|
self.client = None
|
||||||
self.server_features = None
|
self.server_features = None
|
||||||
log.warning("connection lost to %s", server_str)
|
log.info("connection lost to %s", server_str)
|
||||||
log.info("network loop finished")
|
log.info("network loop finished")
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
|
|
|
@ -13,7 +13,6 @@ from collections import defaultdict
|
||||||
import lbry
|
import lbry
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
|
||||||
from lbry.utils import LRUCache
|
|
||||||
from lbry.wallet.rpc.jsonrpc import RPCError
|
from lbry.wallet.rpc.jsonrpc import RPCError
|
||||||
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
|
||||||
from lbry.wallet.server.hash import hash_to_hex_str
|
from lbry.wallet.server.hash import hash_to_hex_str
|
||||||
|
@ -26,6 +25,7 @@ from lbry.wallet.server.db.db import HubDB
|
||||||
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
from lbry.wallet.transaction import OutputScript, Output, Transaction
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
|
from lbry.wallet.server.env import Env
|
||||||
from lbry.wallet.server.db.revertable import RevertableOpStack
|
from lbry.wallet.server.db.revertable import RevertableOpStack
|
||||||
|
|
||||||
|
|
||||||
|
@ -263,16 +263,12 @@ class BlockProcessor:
|
||||||
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
||||||
)
|
)
|
||||||
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
|
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
|
||||||
# print("******************\n")
|
|
||||||
except:
|
except:
|
||||||
self.logger.exception("advance blocks failed")
|
self.logger.exception("advance blocks failed")
|
||||||
raise
|
raise
|
||||||
processed_time = time.perf_counter() - total_start
|
processed_time = time.perf_counter() - total_start
|
||||||
self.block_count_metric.set(self.height)
|
self.block_count_metric.set(self.height)
|
||||||
self.block_update_time_metric.observe(processed_time)
|
self.block_update_time_metric.observe(processed_time)
|
||||||
if not self.db.first_sync:
|
|
||||||
s = '' if len(blocks) == 1 else 's'
|
|
||||||
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
|
|
||||||
self.touched_hashXs.clear()
|
self.touched_hashXs.clear()
|
||||||
elif hprevs[0] != chain[0]:
|
elif hprevs[0] != chain[0]:
|
||||||
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
|
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
|
||||||
|
|
|
@ -180,9 +180,10 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
self.clear_search_cache()
|
self.clear_search_cache()
|
||||||
if self.last_state and self._es_block_hash == self.last_state.tip:
|
if self.last_state and self._es_block_hash == self.last_state.tip:
|
||||||
self.synchronized.set()
|
self.synchronized.set()
|
||||||
self.log.info("es and reader are in sync")
|
self.log.info("es and reader are in sync at block %i", self.last_state.height)
|
||||||
else:
|
else:
|
||||||
self.log.info("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
|
self.log.info("es and reader are not yet in sync (block %s vs %s)", self._es_height,
|
||||||
|
self.db.db_height)
|
||||||
finally:
|
finally:
|
||||||
self.es_notification_client.close()
|
self.es_notification_client.close()
|
||||||
|
|
||||||
|
@ -208,8 +209,6 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
|
|
||||||
await self.start_prometheus()
|
await self.start_prometheus()
|
||||||
if self.env.udp_port and int(self.env.udp_port):
|
if self.env.udp_port and int(self.env.udp_port):
|
||||||
self.log.warning("country=%s interface=%s:%s allow_lan=%s", self.env.country,
|
|
||||||
self.env.host, self.env.udp_port, self.env.allow_lan_udp)
|
|
||||||
await self.status_server.start(
|
await self.status_server.start(
|
||||||
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
|
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
|
||||||
self.env.host, self.env.udp_port, self.env.allow_lan_udp
|
self.env.host, self.env.udp_port, self.env.allow_lan_udp
|
||||||
|
|
|
@ -234,8 +234,6 @@ class ElasticWriter(BlockchainReader):
|
||||||
for to_del in touched_or_deleted.deleted_claims:
|
for to_del in touched_or_deleted.deleted_claims:
|
||||||
if to_del in self._trending:
|
if to_del in self._trending:
|
||||||
self._trending.pop(to_del)
|
self._trending.pop(to_del)
|
||||||
self.log.info("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims),
|
|
||||||
len(self._touched_claims), len(self._deleted_claims))
|
|
||||||
self._advanced = True
|
self._advanced = True
|
||||||
|
|
||||||
def unwind(self):
|
def unwind(self):
|
||||||
|
@ -314,7 +312,10 @@ class ElasticWriter(BlockchainReader):
|
||||||
await _start_cancellable(self.run_es_notifier)
|
await _start_cancellable(self.run_es_notifier)
|
||||||
|
|
||||||
if reindex or self._last_wrote_height == 0 and self.db.db_height > 0:
|
if reindex or self._last_wrote_height == 0 and self.db.db_height > 0:
|
||||||
self.log.warning("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height)
|
if self._last_wrote_height == 0:
|
||||||
|
self.log.info("running initial ES indexing of rocksdb at block height %i", self.db.db_height)
|
||||||
|
else:
|
||||||
|
self.log.info("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height)
|
||||||
await self.reindex()
|
await self.reindex()
|
||||||
await _start_cancellable(self.refresh_blocks_forever)
|
await _start_cancellable(self.refresh_blocks_forever)
|
||||||
|
|
||||||
|
@ -360,7 +361,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
await self.sync_client.indices.refresh(self.index)
|
await self.sync_client.indices.refresh(self.index)
|
||||||
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
|
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
|
||||||
self.notify_es_notification_listeners(self.db.db_height, self.db.db_tip)
|
self.notify_es_notification_listeners(self.db.db_height, self.db.db_tip)
|
||||||
self.log.warning("finished reindexing")
|
self.log.info("finished reindexing")
|
||||||
|
|
||||||
async def _sync_all_claims(self, batch_size=100000):
|
async def _sync_all_claims(self, batch_size=100000):
|
||||||
def load_historic_trending():
|
def load_historic_trending():
|
||||||
|
|
|
@ -560,7 +560,7 @@ class SessionManager:
|
||||||
raise err
|
raise err
|
||||||
finally:
|
finally:
|
||||||
await self._close_servers(list(self.servers.keys()))
|
await self._close_servers(list(self.servers.keys()))
|
||||||
log.warning("disconnect %i sessions", len(self.sessions))
|
log.info("disconnect %i sessions", len(self.sessions))
|
||||||
if self.sessions:
|
if self.sessions:
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
session.close(force_after=1) for session in self.sessions.values()
|
session.close(force_after=1) for session in self.sessions.values()
|
||||||
|
|
Loading…
Add table
Reference in a new issue