Add a CACHE_ALL_TX_HASHES setting to optionally trade more memory use for reduced i/o

This commit is contained in:
Jack Robison 2021-10-15 20:32:11 -04:00
parent 7ea1a2b361
commit bfac02ccab
3 changed files with 51 additions and 15 deletions

View file

@ -490,9 +490,7 @@ class BlockProcessor:
if signing_channel: if signing_channel:
raw_channel_tx = self.db.prefix_db.tx.get( raw_channel_tx = self.db.prefix_db.tx.get(
self.db.prefix_db.tx_hash.get( self.db.get_tx_hash(signing_channel.tx_num), deserialize_value=False
signing_channel.tx_num, deserialize_value=False
), deserialize_value=False
) )
channel_pub_key_bytes = None channel_pub_key_bytes = None
try: try:
@ -1501,6 +1499,9 @@ class BlockProcessor:
self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name)
self.pending_transactions[tx_count] = tx_hash self.pending_transactions[tx_count] = tx_hash
self.pending_transaction_num_mapping[tx_hash] = tx_count self.pending_transaction_num_mapping[tx_hash] = tx_count
if self.env.cache_all_tx_hashes:
self.db.total_transactions.append(tx_hash)
self.db.tx_num_mapping[tx_hash] = tx_count
tx_count += 1 tx_count += 1
# handle expired claims # handle expired claims
@ -1608,7 +1609,12 @@ class BlockProcessor:
self.db.headers.pop() self.db.headers.pop()
self.db.tx_counts.pop() self.db.tx_counts.pop()
self.tip = self.coin.header_hash(self.db.headers[-1]) self.tip = self.coin.header_hash(self.db.headers[-1])
self.tx_count = self.db.tx_counts[-1] if self.env.cache_all_tx_hashes:
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
self.tx_count -= 1
else:
self.tx_count = self.db.tx_counts[-1]
self.height -= 1 self.height -= 1
# self.touched can include other addresses which is # self.touched can include other addresses which is
# harmless, but remove None. # harmless, but remove None.
@ -1659,7 +1665,7 @@ class BlockProcessor:
if tx_hash in self.pending_transaction_num_mapping: if tx_hash in self.pending_transaction_num_mapping:
return self.pending_transaction_num_mapping[tx_hash] return self.pending_transaction_num_mapping[tx_hash]
else: else:
return self.db.prefix_db.tx_num.get(tx_hash).tx_num return self.db.get_tx_num(tx_hash)
def spend_utxo(self, tx_hash: bytes, nout: int): def spend_utxo(self, tx_hash: bytes, nout: int):
hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None)) hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))

View file

@ -78,6 +78,7 @@ class Env:
self.anon_logs = self.boolean('ANON_LOGS', False) self.anon_logs = self.boolean('ANON_LOGS', False)
self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.log_sessions = self.integer('LOG_SESSIONS', 3600)
self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False)
self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False)
self.country = self.default('COUNTRY', 'US') self.country = self.default('COUNTRY', 'US')
# Peer discovery # Peer discovery
self.peer_discovery = self.peer_discovery_enum() self.peer_discovery = self.peer_discovery_enum()

View file

@ -113,6 +113,8 @@ class LevelDB:
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {}
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
@ -201,7 +203,7 @@ class LevelDB:
normalized_name = name normalized_name = name
controlling_claim = self.get_controlling_claim(normalized_name) controlling_claim = self.get_controlling_claim(normalized_name)
tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) tx_hash = self.get_tx_hash(tx_num)
height = bisect_right(self.tx_counts, tx_num) height = bisect_right(self.tx_counts, tx_num)
created_height = bisect_right(self.tx_counts, root_tx_num) created_height = bisect_right(self.tx_counts, root_tx_num)
last_take_over_height = controlling_claim.height last_take_over_height = controlling_claim.height
@ -462,7 +464,7 @@ class LevelDB:
def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]: def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
expired = {} expired = {}
for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)): for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)):
tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False) tx_hash = self.get_tx_hash(k.tx_num)
tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False)) tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False))
# treat it like a claim spend so it will delete/abandon properly # treat it like a claim spend so it will delete/abandon properly
# the _spend_claim function this result is fed to expects a txi, so make a mock one # the _spend_claim function this result is fed to expects a txi, so make a mock one
@ -527,7 +529,7 @@ class LevelDB:
if not reposted_claim: if not reposted_claim:
return return
reposted_metadata = self.get_claim_metadata( reposted_metadata = self.get_claim_metadata(
self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
) )
if not reposted_metadata: if not reposted_metadata:
return return
@ -541,7 +543,7 @@ class LevelDB:
reposted_fee_currency = None reposted_fee_currency = None
reposted_duration = None reposted_duration = None
if reposted_claim: if reposted_claim:
reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False) reposted_tx_hash = self.get_tx_hash(reposted_claim.tx_num)
raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False) raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False)
try: try:
reposted_claim_txo = self.coin.transaction( reposted_claim_txo = self.coin.transaction(
@ -793,6 +795,21 @@ class LevelDB:
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers self.headers = headers
async def _read_tx_hashes(self):
    """Load every tx hash into memory and build the hash -> tx_num map.

    Runs the full tx_hash column iteration in a thread-pool executor so the
    event loop is not blocked; logs how long the load took.
    """
    def _iterate_tx_hashes():
        # fill_cache=False: a one-shot full scan would only pollute the block cache
        return list(self.prefix_db.tx_hash.iterate(
            include_key=False, fill_cache=False, deserialize_value=False
        ))

    self.logger.info("loading tx hashes")
    self.total_transactions.clear()
    self.tx_num_mapping.clear()
    started = time.perf_counter()
    hashes = await asyncio.get_event_loop().run_in_executor(None, _iterate_tx_hashes)
    self.total_transactions.extend(hashes)
    # position in the list is the tx number, so enumerate gives the reverse map
    self.tx_num_mapping = {tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions)}
    elapsed = time.perf_counter() - started
    self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(elapsed, 4))
def estimate_timestamp(self, height: int) -> int: def estimate_timestamp(self, height: int) -> int:
if height < len(self.headers): if height < len(self.headers):
return struct.unpack('<I', self.headers[height][100:104])[0] return struct.unpack('<I', self.headers[height][100:104])[0]
@ -837,6 +854,8 @@ class LevelDB:
await self._read_tx_counts() await self._read_tx_counts()
await self._read_headers() await self._read_headers()
await self._read_claim_txos() await self._read_claim_txos()
if self.env.cache_all_tx_hashes:
await self._read_tx_hashes()
# start search index # start search index
await self.search_index.start() await self.search_index.start()
@ -844,6 +863,16 @@ class LevelDB:
def close(self): def close(self):
self.prefix_db.close() self.prefix_db.close()
def get_tx_hash(self, tx_num: int) -> bytes:
    """Return the raw tx hash for *tx_num*.

    Served from the in-memory list when CACHE_ALL_TX_HASHES is enabled,
    otherwise read from the tx_hash prefix database.
    """
    if not self.env.cache_all_tx_hashes:
        return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
    return self.total_transactions[tx_num]
def get_tx_num(self, tx_hash: bytes) -> int:
    """Return the tx number for *tx_hash*.

    Served from the in-memory mapping when CACHE_ALL_TX_HASHES is enabled,
    otherwise looked up in the tx_num prefix database.
    """
    if not self.env.cache_all_tx_hashes:
        return self.prefix_db.tx_num.get(tx_hash).tx_num
    return self.tx_num_mapping[tx_hash]
# Header merkle cache # Header merkle cache
async def populate_header_merkle_cache(self): async def populate_header_merkle_cache(self):
@ -900,7 +929,7 @@ class LevelDB:
if tx_height > self.db_height: if tx_height > self.db_height:
return None, tx_height return None, tx_height
try: try:
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height return self.get_tx_hash(tx_num), tx_height
except IndexError: except IndexError:
self.logger.exception( self.logger.exception(
"Failed to access a cached transaction, known bug #3142 " "Failed to access a cached transaction, known bug #3142 "
@ -964,13 +993,13 @@ class LevelDB:
txs = [] txs = []
txs_extend = txs.extend txs_extend = txs.extend
for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
txs_extend([ txs_extend(hist)
(self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num))
for tx_num in hist
])
if len(txs) >= limit: if len(txs) >= limit:
break break
return txs return [
(self.get_tx_hash(tx_num), bisect_right(self.tx_counts, tx_num))
for tx_num in txs
]
async def limited_history(self, hashX, *, limit=1000): async def limited_history(self, hashX, *, limit=1000):
"""Return an unpruned, sorted list of (tx_hash, height) tuples of """Return an unpruned, sorted list of (tx_hash, height) tuples of