diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index f541ed0d1..c24efe6f6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -490,9 +490,7 @@ class BlockProcessor: if signing_channel: raw_channel_tx = self.db.prefix_db.tx.get( - self.db.prefix_db.tx_hash.get( - signing_channel.tx_num, deserialize_value=False - ), deserialize_value=False + self.db.get_tx_hash(signing_channel.tx_num), deserialize_value=False ) channel_pub_key_bytes = None try: @@ -1501,6 +1499,9 @@ class BlockProcessor: self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name) self.pending_transactions[tx_count] = tx_hash self.pending_transaction_num_mapping[tx_hash] = tx_count + if self.env.cache_all_tx_hashes: + self.db.total_transactions.append(tx_hash) + self.db.tx_num_mapping[tx_hash] = tx_count tx_count += 1 # handle expired claims @@ -1608,7 +1609,12 @@ class BlockProcessor: self.db.headers.pop() self.db.tx_counts.pop() self.tip = self.coin.header_hash(self.db.headers[-1]) - self.tx_count = self.db.tx_counts[-1] + if self.env.cache_all_tx_hashes: + while len(self.db.total_transactions) > self.db.tx_counts[-1]: + self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + self.tx_count -= 1 + else: + self.tx_count = self.db.tx_counts[-1] self.height -= 1 # self.touched can include other addresses which is # harmless, but remove None. @@ -1659,7 +1665,7 @@ class BlockProcessor: if tx_hash in self.pending_transaction_num_mapping: return self.pending_transaction_num_mapping[tx_hash] else: - return self.db.prefix_db.tx_num.get(tx_hash).tx_num + return self.db.get_tx_num(tx_hash) def spend_utxo(self, tx_hash: bytes, nout: int): hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None)) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 2b4c489b3..82ce3d7fc 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -78,6 +78,7 @@ class Env: self.anon_logs = self.boolean('ANON_LOGS', False) self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) + self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False) self.country = self.default('COUNTRY', 'US') # Peer discovery self.peer_discovery = self.peer_discovery_enum() diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index b5fbdc313..7348dc086 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -113,6 +113,8 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") + self.total_transactions: List[bytes] = [] + self.tx_num_mapping: Dict[bytes, int] = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) @@ -201,7 +203,7 @@ class LevelDB: normalized_name = name controlling_claim = self.get_controlling_claim(normalized_name) - tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) + tx_hash = self.get_tx_hash(tx_num) height = bisect_right(self.tx_counts, tx_num) created_height = bisect_right(self.tx_counts, root_tx_num) last_take_over_height = controlling_claim.height @@ -462,7 +464,7 @@ class LevelDB: def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]: expired = {} for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)): - tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False) + tx_hash = self.get_tx_hash(k.tx_num) tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False)) # treat it like a claim spend so it will delete/abandon properly # the _spend_claim function this result is fed to expects a txi, so make a mock one @@ -527,7 +529,7 @@ class LevelDB: if not reposted_claim: return reposted_metadata = self.get_claim_metadata( - self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position + self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position ) if not reposted_metadata: return @@ -541,7 +543,7 @@ class LevelDB: reposted_fee_currency = None reposted_duration = None if reposted_claim: - reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False) + reposted_tx_hash = self.get_tx_hash(reposted_claim.tx_num) raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False) try: reposted_claim_txo = self.coin.transaction( @@ -793,6 +795,21 @@ class LevelDB: assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" self.headers = headers + async def _read_tx_hashes(self): + def _read_tx_hashes(): + return list(self.prefix_db.tx_hash.iterate(include_key=False, fill_cache=False, deserialize_value=False)) + + self.logger.info("loading tx hashes") + self.total_transactions.clear() + self.tx_num_mapping.clear() + start = time.perf_counter() + self.total_transactions.extend(await asyncio.get_event_loop().run_in_executor(None, _read_tx_hashes)) + self.tx_num_mapping = { + tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions) + } + ts = time.perf_counter() - start + self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4)) + def estimate_timestamp(self, height: int) -> int: if height < len(self.headers): return struct.unpack(' bytes: + if self.env.cache_all_tx_hashes: + return self.total_transactions[tx_num] + return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) + + def get_tx_num(self, tx_hash: bytes) -> int: + if self.env.cache_all_tx_hashes: + return self.tx_num_mapping[tx_hash] + return self.prefix_db.tx_num.get(tx_hash).tx_num + # Header merkle cache async def populate_header_merkle_cache(self): @@ -900,7 +929,7 @@ class LevelDB: if tx_height > self.db_height: return None, tx_height try: - return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height + return self.get_tx_hash(tx_num), tx_height except IndexError: self.logger.exception( "Failed to access a cached transaction, known bug #3142 " @@ -964,13 +993,13 @@ class LevelDB: txs = [] txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): - txs_extend([ - (self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num)) - for tx_num in hist - ]) + txs_extend(hist) if len(txs) >= limit: break - return txs + return [ + (self.get_tx_hash(tx_num), bisect_right(self.tx_counts, tx_num)) + for tx_num in txs + ] async def limited_history(self, hashX, *, limit=1000): """Return an unpruned, sorted list of (tx_hash, height) tuples of