diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index c24efe6f6..66cb6af02 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -404,7 +404,8 @@ class BlockProcessor: await self.backup_block() self.logger.info(f'backed up to height {self.height:,d}') - await self.db._read_claim_txos() # TODO: don't do this + if self.env.cache_all_claim_txos: + await self.db._read_claim_txos() # TODO: don't do this for touched in self.touched_claims_to_send_es: if not self.db.get_claim_txo(touched): self.removed_claims_to_send_es.add(touched) @@ -545,10 +546,11 @@ class BlockProcessor: previous_amount = previous_claim.amount self.updated_claims.add(claim_hash) - self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( - tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name - ) - self.db.txo_to_claim[tx_num][nout] = claim_hash + if self.env.cache_all_claim_txos: + self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( + tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name + ) + self.db.txo_to_claim[tx_num][nout] = claim_hash pending = StagedClaimtrieItem( claim_name, normalized_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, @@ -693,7 +695,7 @@ class BlockProcessor: if (txin_num, txin.prev_idx) in self.txo_to_claim: spent = self.txo_to_claim[(txin_num, txin.prev_idx)] else: - if txin_num not in self.db.txo_to_claim or txin.prev_idx not in self.db.txo_to_claim[txin_num]: + if not self.db.get_cached_claim_exists(txin_num, txin.prev_idx): # txo is not a claim return False spent_claim_hash_and_name = self.db.get_claim_from_txo( @@ -701,10 +703,12 @@ class BlockProcessor: ) assert spent_claim_hash_and_name is not None spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) - claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) - if not self.db.txo_to_claim[txin_num]: - self.db.txo_to_claim.pop(txin_num) - self.db.claim_to_txo.pop(claim_hash) + + if self.env.cache_all_claim_txos: + claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) + if not self.db.txo_to_claim[txin_num]: + self.db.txo_to_claim.pop(txin_num) + self.db.claim_to_txo.pop(claim_hash) if spent.reposted_claim_hash: self.pending_reposted.add(spent.reposted_claim_hash) if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims: @@ -1022,8 +1026,8 @@ class BlockProcessor: # prepare to activate or delay activation of the pending claims being added this block for (tx_num, nout), staged in self.txo_to_claim.items(): is_delayed = not staged.is_update - if staged.claim_hash in self.db.claim_to_txo: - prev_txo = self.db.claim_to_txo[staged.claim_hash] + prev_txo = self.db.get_cached_claim_txo(staged.claim_hash) + if prev_txo: prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position) if height < prev_activation or prev_activation < 0: is_delayed = True diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 8b1603312..d71b941b4 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -85,6 +85,18 @@ class PrefixRow(metaclass=PrefixRowType): if v: return v if not deserialize_value else self.unpack_value(v) + def get_pending(self, *key_args, fill_cache=True, deserialize_value=True): + packed_key = self.pack_key(*key_args) + last_op = self._op_stack.get_last_op_for_key(packed_key) + if last_op: + if last_op.is_put: + return last_op.value if not deserialize_value else self.unpack_value(last_op.value) + else: # it's a delete + return + v = self._db.get(packed_key, fill_cache=fill_cache) + if v: + return v if not deserialize_value else self.unpack_value(v) + def stage_put(self, key_args=(), value_args=()): self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))) diff --git a/lbry/wallet/server/db/revertable.py b/lbry/wallet/server/db/revertable.py index 2a05f2a7d..099c2b48e 100644 --- a/lbry/wallet/server/db/revertable.py +++ b/lbry/wallet/server/db/revertable.py @@ -169,3 +169,7 @@ class RevertableOpStack: while packed: op, packed = RevertableOp.unpack(packed) self.append_op(op) + + def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]: + if key in self._items and self._items[key]: + return self._items[key][-1] diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 82ce3d7fc..ff9e2a316 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -79,6 +79,7 @@ class Env: self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False) + self.cache_all_claim_txos = self.boolean('CACHE_ALL_CLAIM_TXOS', False) self.country = self.default('COUNTRY', 'US') # Peer discovery self.peer_discovery = self.peer_discovery_enum() diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 7348dc086..3839c1ee3 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -113,9 +113,12 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") + + # these are only used if the cache_all_tx_hashes setting is on self.total_transactions: List[bytes] = [] self.tx_num_mapping: Dict[bytes, int] = {} + # these are only used if the cache_all_claim_txos setting is on self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) @@ -210,7 +213,7 @@ class LevelDB: expiration_height = self.coin.get_expiration_height(height) support_amount = self.get_support_amount(claim_hash) - claim_amount = self.claim_to_txo[claim_hash].amount + claim_amount = self.get_cached_claim_txo(claim_hash).amount effective_amount = support_amount + claim_amount channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) @@ -219,7 +222,7 @@ class LevelDB: canonical_url = short_url claims_in_channel = self.get_claims_in_channel_count(claim_hash) if channel_hash: - channel_vals = self.claim_to_txo.get(channel_hash) + channel_vals = self.get_cached_claim_txo(channel_hash) if channel_vals: channel_short_url = self.get_short_claim_id_url( channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num, @@ -271,11 +274,13 @@ class LevelDB: ) # resolve by partial/complete claim id for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])): - claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position] - non_normalized_name = self.claim_to_txo.get(claim_hash).name - signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid + full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position) + c = self.get_cached_claim_txo(full_claim_hash) + + non_normalized_name = c.name + signature_is_valid = c.channel_signature_is_valid return self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, non_normalized_name, key.root_tx_num, + claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num, key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position), signature_is_valid ) @@ -285,7 +290,7 @@ class LevelDB: for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))): if amount_order > idx + 1: continue - claim_txo = self.claim_to_txo.get(claim_val.claim_hash) + claim_txo = self.get_cached_claim_txo(claim_val.claim_hash) activation = self.get_activation(key.tx_num, key.position) return self._prepare_resolve_result( key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num, @@ -360,7 +365,7 @@ class LevelDB: return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url) def _fs_get_claim_by_hash(self, claim_hash): - claim = self.claim_to_txo.get(claim_hash) + claim = self.get_cached_claim_txo(claim_hash) if claim: activation = self.get_activation(claim.tx_num, claim.position) return self._prepare_resolve_result( @@ -525,7 +530,7 @@ class LevelDB: reposted_claim = None reposted_metadata = None if reposted_claim_hash: - reposted_claim = self.claim_to_txo.get(reposted_claim_hash) + reposted_claim = self.get_cached_claim_txo(reposted_claim_hash) if not reposted_claim: return reposted_metadata = self.get_claim_metadata( @@ -677,11 +682,21 @@ class LevelDB: async def all_claims_producer(self, batch_size=500_000): batch = [] - for claim_hash, claim_txo in self.claim_to_txo.items(): + if self.env.cache_all_claim_txos: + claim_iterator = self.claim_to_txo.items() + else: + claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate()) + + for claim_hash, claim_txo in claim_iterator: # TODO: fix the couple of claim txos that dont have controlling names if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): continue - claim = self._fs_get_claim_by_hash(claim_hash) + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: batch.append(claim) if len(batch) == batch_size: @@ -703,16 +718,14 @@ class LevelDB: results = [] for claim_hash in claim_hashes: - if claim_hash not in self.claim_to_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - name = self.claim_to_txo[claim_hash].normalized_name - if not self.prefix_db.claim_takeover.get(name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - claim_txo = self.claim_to_txo.get(claim_hash) + claim_txo = self.get_cached_claim_txo(claim_hash) if not claim_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) claim = self._prepare_resolve_result( claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, @@ -764,7 +777,6 @@ class LevelDB: else: assert self.db_tx_count == 0 - async def _read_claim_txos(self): def read_claim_txos(): set_claim_to_txo = self.claim_to_txo.__setitem__ @@ -853,7 +865,8 @@ class LevelDB: # Read TX counts (requires meta directory) await self._read_tx_counts() await self._read_headers() - await self._read_claim_txos() + if self.env.cache_all_claim_txos: + await self._read_claim_txos() if self.env.cache_all_tx_hashes: await self._read_tx_hashes() @@ -873,6 +886,22 @@ class LevelDB: return self.tx_num_mapping[tx_hash] return self.prefix_db.tx_num.get(tx_hash).tx_num + def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]: + if self.env.cache_all_claim_txos: + return self.claim_to_txo.get(claim_hash) + return self.prefix_db.claim_to_txo.get_pending(claim_hash) + + def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]: + if self.env.cache_all_claim_txos: + if tx_num not in self.txo_to_claim: + return + return self.txo_to_claim[tx_num].get(position, None) + v = self.prefix_db.txo_to_claim.get_pending(tx_num, position) + return None if not v else v.claim_hash + + def get_cached_claim_exists(self, tx_num: int, position: int) -> bool: + return self.get_cached_claim_hash(tx_num, position) is not None + # Header merkle cache async def populate_header_merkle_cache(self): @@ -960,7 +989,10 @@ class LevelDB: tx_height = -1 tx_num = None if not tx_num else tx_num.tx_num if tx_num is not None: - fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 + if self.env.cache_all_claim_txos: + fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 + else: + fill_cache = False tx_height = bisect_right(self.tx_counts, tx_num) tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) if tx_height == -1: diff --git a/tests/unit/wallet/server/test_revertable.py b/tests/unit/wallet/server/test_revertable.py index f5729689a..79b4cdb0c 100644 --- a/tests/unit/wallet/server/test_revertable.py +++ b/tests/unit/wallet/server/test_revertable.py @@ -123,6 +123,9 @@ class TestRevertablePrefixDB(unittest.TestCase): self.assertIsNone(self.db.claim_takeover.get(name)) self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height)) + self.assertIsNone(self.db.claim_takeover.get(name)) + self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height) + self.db.commit(10000000) self.assertEqual(10000000, self.db.claim_takeover.get(name).height)