Merge pull request #2967 from lbryio/transaction-cache-size
add `transaction_cache_size` to config
This commit is contained in:
commit
2ee572e68f
8 changed files with 29 additions and 8 deletions
|
@ -636,6 +636,7 @@ class Config(CLIConfig):
|
||||||
"Strategy to use when selecting UTXOs for a transaction",
|
"Strategy to use when selecting UTXOs for a transaction",
|
||||||
STRATEGIES, "standard")
|
STRATEGIES, "standard")
|
||||||
|
|
||||||
|
transaction_cache_size = Integer("Transaction cache size", 100_000)
|
||||||
save_resolved_claims = Toggle(
|
save_resolved_claims = Toggle(
|
||||||
"Save content claims to the database when they are resolved to keep file_list up to date, "
|
"Save content claims to the database when they are resolved to keep file_list up to date, "
|
||||||
"only disable this if file_x commands are not needed", True
|
"only disable this if file_x commands are not needed", True
|
||||||
|
|
|
@ -10,6 +10,7 @@ from asyncio.protocols import DatagramProtocol
|
||||||
from asyncio.transports import DatagramTransport
|
from asyncio.transports import DatagramTransport
|
||||||
|
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
|
from lbry.dht.serialization.bencoding import DecodeError
|
||||||
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
|
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
|
||||||
from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY
|
from lbry.dht.serialization.datagram import RESPONSE_TYPE, ERROR_TYPE, PAGE_KEY
|
||||||
from lbry.dht.error import RemoteException, TransportNotConnected
|
from lbry.dht.error import RemoteException, TransportNotConnected
|
||||||
|
@ -554,7 +555,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-differ
|
def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: # pylint: disable=arguments-differ
|
||||||
try:
|
try:
|
||||||
message = decode_datagram(datagram)
|
message = decode_datagram(datagram)
|
||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError, DecodeError):
|
||||||
self.peer_manager.report_failure(address[0], address[1])
|
self.peer_manager.report_failure(address[0], address[1])
|
||||||
log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode())
|
log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode())
|
||||||
return
|
return
|
||||||
|
|
|
@ -158,7 +158,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._on_ready_controller = StreamController()
|
self._on_ready_controller = StreamController()
|
||||||
self.on_ready = self._on_ready_controller.stream
|
self.on_ready = self._on_ready_controller.stream
|
||||||
|
|
||||||
self._tx_cache = pylru.lrucache(100000)
|
self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 100_000))
|
||||||
self._update_tasks = TaskGroup()
|
self._update_tasks = TaskGroup()
|
||||||
self._other_tasks = TaskGroup() # that we dont need to start
|
self._other_tasks = TaskGroup() # that we dont need to start
|
||||||
self._utxo_reservation_lock = asyncio.Lock()
|
self._utxo_reservation_lock = asyncio.Lock()
|
||||||
|
|
|
@ -184,6 +184,7 @@ class WalletManager:
|
||||||
'auto_connect': True,
|
'auto_connect': True,
|
||||||
'default_servers': config.lbryum_servers,
|
'default_servers': config.lbryum_servers,
|
||||||
'data_path': config.wallet_dir,
|
'data_path': config.wallet_dir,
|
||||||
|
'tx_cache_size': config.transaction_cache_size
|
||||||
}
|
}
|
||||||
|
|
||||||
wallets_directory = os.path.join(config.wallet_dir, 'wallets')
|
wallets_directory = os.path.join(config.wallet_dir, 'wallets')
|
||||||
|
|
|
@ -40,6 +40,10 @@ from .jsonrpc import Request, JSONRPCConnection, JSONRPCv2, JSONRPC, Batch, Noti
|
||||||
from .jsonrpc import RPCError, ProtocolError
|
from .jsonrpc import RPCError, ProtocolError
|
||||||
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
|
from .framing import BadMagicError, BadChecksumError, OversizedPayloadError, BitcoinFramer, NewlineFramer
|
||||||
|
|
||||||
|
HISTOGRAM_BUCKETS = (
|
||||||
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Connector:
|
class Connector:
|
||||||
|
|
||||||
|
@ -379,7 +383,7 @@ class RPCSession(SessionBase):
|
||||||
for example JSON RPC."""
|
for example JSON RPC."""
|
||||||
|
|
||||||
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE,
|
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE,
|
||||||
labelnames=("method", "version"))
|
labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS)
|
||||||
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
|
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
|
||||||
namespace=NAMESPACE, labelnames=("method", "version"))
|
namespace=NAMESPACE, labelnames=("method", "version"))
|
||||||
REQUEST_ERRORS_COUNT = Counter(
|
REQUEST_ERRORS_COUNT = Counter(
|
||||||
|
|
|
@ -130,6 +130,9 @@ class ChainError(Exception):
|
||||||
|
|
||||||
|
|
||||||
NAMESPACE = "wallet_server"
|
NAMESPACE = "wallet_server"
|
||||||
|
HISTOGRAM_BUCKETS = (
|
||||||
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class BlockProcessor:
|
class BlockProcessor:
|
||||||
|
@ -142,7 +145,9 @@ class BlockProcessor:
|
||||||
block_count_metric = Gauge(
|
block_count_metric = Gauge(
|
||||||
"block_count", "Number of processed blocks", namespace=NAMESPACE
|
"block_count", "Number of processed blocks", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
block_update_time_metric = Histogram("block_time", "Block update times", namespace=NAMESPACE)
|
block_update_time_metric = Histogram(
|
||||||
|
"block_time", "Block update times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||||
|
)
|
||||||
reorg_count_metric = Gauge(
|
reorg_count_metric = Gauge(
|
||||||
"reorg_count", "Number of reorgs", namespace=NAMESPACE
|
"reorg_count", "Number of reorgs", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
|
|
|
@ -119,7 +119,9 @@ class SessionGroup:
|
||||||
|
|
||||||
|
|
||||||
NAMESPACE = "wallet_server"
|
NAMESPACE = "wallet_server"
|
||||||
|
HISTOGRAM_BUCKETS = (
|
||||||
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
||||||
|
)
|
||||||
|
|
||||||
class SessionManager:
|
class SessionManager:
|
||||||
"""Holds global state about all sessions."""
|
"""Holds global state about all sessions."""
|
||||||
|
@ -147,7 +149,9 @@ class SessionManager:
|
||||||
db_error_metric = Counter(
|
db_error_metric = Counter(
|
||||||
"internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
|
"internal_error", "Number of queries raising unexpected errors", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
executor_time_metric = Histogram("executor_time", "SQLite executor times", namespace=NAMESPACE)
|
executor_time_metric = Histogram(
|
||||||
|
"executor_time", "SQLite executor times", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||||
|
)
|
||||||
pending_query_metric = Gauge(
|
pending_query_metric = Gauge(
|
||||||
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
|
"pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
|
||||||
)
|
)
|
||||||
|
@ -990,7 +994,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
except reader.SQLiteInterruptedError as error:
|
except reader.SQLiteInterruptedError as error:
|
||||||
metrics = self.get_metrics_or_placeholder_for_api(query_name)
|
metrics = self.get_metrics_or_placeholder_for_api(query_name)
|
||||||
metrics.query_interrupt(start, error.metrics)
|
metrics.query_interrupt(start, error.metrics)
|
||||||
self.session_mgr.self.session_mgr.SQLITE_INTERRUPT_COUNT.inc()
|
self.session_mgr.interrupt_count_metric.inc()
|
||||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
|
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out')
|
||||||
except reader.SQLiteOperationalError as error:
|
except reader.SQLiteOperationalError as error:
|
||||||
metrics = self.get_metrics_or_placeholder_for_api(query_name)
|
metrics = self.get_metrics_or_placeholder_for_api(query_name)
|
||||||
|
@ -1216,7 +1220,10 @@ class LBRYElectrumX(SessionBase):
|
||||||
return await self.address_status(hashX)
|
return await self.address_status(hashX)
|
||||||
|
|
||||||
async def hashX_unsubscribe(self, hashX, alias):
|
async def hashX_unsubscribe(self, hashX, alias):
|
||||||
del self.hashX_subs[hashX]
|
try:
|
||||||
|
del self.hashX_subs[hashX]
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
def address_to_hashX(self, address):
|
def address_to_hashX(self, address):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -120,6 +120,8 @@ class FakeComponent:
|
||||||
class FakeDelayedWallet(FakeComponent):
|
class FakeDelayedWallet(FakeComponent):
|
||||||
component_name = "wallet"
|
component_name = "wallet"
|
||||||
depends_on = []
|
depends_on = []
|
||||||
|
ledger = None
|
||||||
|
default_wallet = None
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
Loading…
Reference in a new issue