From 02cf478d9121ca0b2ba093c5b219d994e29839c6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 22 Sep 2021 12:13:43 -0400 Subject: [PATCH] improve leveldb caching --- lbry/wallet/server/block_processor.py | 10 +++++++--- lbry/wallet/server/db/prefixes.py | 16 ++++++++++------ lbry/wallet/server/leveldb.py | 20 +++++++++++--------- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 9bdb236b2..ed02612d4 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -517,7 +517,7 @@ class BlockProcessor: self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name ) - self.db.txo_to_claim[(tx_num, nout)] = claim_hash + self.db.txo_to_claim[tx_num][nout] = claim_hash pending = StagedClaimtrieItem( claim_name, normalized_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, @@ -577,14 +577,18 @@ class BlockProcessor: if (txin_num, txin.prev_idx) in self.txo_to_claim: spent = self.txo_to_claim[(txin_num, txin.prev_idx)] else: - if (txin_num, txin.prev_idx) not in self.db.txo_to_claim: # txo is not a claim + if txin_num not in self.db.txo_to_claim or txin.prev_idx not in self.db.txo_to_claim[txin_num]: + # txo is not a claim return False spent_claim_hash_and_name = self.db.get_claim_from_txo( txin_num, txin.prev_idx ) assert spent_claim_hash_and_name is not None spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) - self.db.claim_to_txo.pop(self.db.txo_to_claim.pop((txin_num, txin.prev_idx))) + claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) + if not self.db.txo_to_claim[txin_num]: + self.db.txo_to_claim.pop(txin_num) + self.db.claim_to_txo.pop(claim_hash) if spent.reposted_claim_hash: self.pending_reposted.add(spent.reposted_claim_hash) if spent.signing_hash and spent.channel_signature_is_valid: diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index 875900678..a6f8a8dbf 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -43,7 +43,8 @@ class PrefixRow(metaclass=PrefixRowType): self._op_stack = op_stack def iterate(self, prefix=None, start=None, stop=None, - reverse: bool = False, include_key: bool = True, include_value: bool = True): + reverse: bool = False, include_key: bool = True, include_value: bool = True, + fill_cache: bool = True): if not prefix and not start and not stop: prefix = () if prefix is not None: @@ -54,19 +55,22 @@ class PrefixRow(metaclass=PrefixRowType): stop = self.pack_partial_key(*stop) if include_key and include_value: - for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse): + for k, v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, + fill_cache=fill_cache): yield self.unpack_key(k), self.unpack_value(v) elif include_key: - for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False): + for k in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_value=False, + fill_cache=fill_cache): yield self.unpack_key(k) elif include_value: - for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False): + for v in self._db.iterator(prefix=prefix, start=start, stop=stop, reverse=reverse, include_key=False, + fill_cache=fill_cache): yield self.unpack_value(v) else: raise RuntimeError - def get(self, *key_args): - v = self._db.get(self.pack_key(*key_args)) + def get(self, *key_args, fill_cache=True): + v = self._db.get(self.pack_key(*key_args), fill_cache=fill_cache) if v: return self.unpack_value(v) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index df34a30b2..ee226cac2 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -153,7 +153,7 @@ class LevelDB: self.transaction_num_mapping = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} - self.txo_to_claim: Dict[Tuple[int, int], bytes] = {} + self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) # Search index self.search_index = SearchIndex( @@ -318,7 +318,7 @@ class LevelDB: for k, v in self.db.iterator(prefix=prefix): key = Prefixes.claim_short_id.unpack_key(k) claim_txo = Prefixes.claim_short_id.unpack_value(v) - claim_hash = self.txo_to_claim[(claim_txo.tx_num, claim_txo.position)] + claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position] non_normalized_name = self.claim_to_txo.get(claim_hash).name signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid return self._prepare_resolve_result( @@ -820,7 +820,8 @@ class LevelDB: def get_counts(): return tuple( Prefixes.tx_count.unpack_value(packed_tx_count).tx_count - for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False) + for packed_tx_count in self.db.iterator(prefix=Prefixes.tx_count.prefix, include_key=False, + fill_cache=False) ) tx_counts = await asyncio.get_event_loop().run_in_executor(None, get_counts) @@ -835,7 +836,7 @@ class LevelDB: async def _read_txids(self): def get_txids(): - return list(self.db.iterator(prefix=Prefixes.tx_hash.prefix, include_key=False)) + return list(self.db.iterator(prefix=Prefixes.tx_hash.prefix, include_key=False, fill_cache=False)) start = time.perf_counter() self.logger.info("loading txids") @@ -850,11 +851,10 @@ class LevelDB: async def _read_claim_txos(self): def read_claim_txos(): - set_txo_to_claim = self.txo_to_claim.__setitem__ set_claim_to_txo = self.claim_to_txo.__setitem__ - for k, v in self.prefix_db.claim_to_txo.iterate(): + for k, v in self.prefix_db.claim_to_txo.iterate(fill_cache=False): set_claim_to_txo(k.claim_hash, v) - set_txo_to_claim((v.tx_num, v.position), k.claim_hash) + self.txo_to_claim[v.tx_num][v.position] = k.claim_hash self.claim_to_txo.clear() self.txo_to_claim.clear() @@ -870,7 +870,8 @@ class LevelDB: def get_headers(): return [ - header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False) + header for header in self.db.iterator(prefix=Prefixes.header.prefix, include_key=False, + fill_cache=False) ] headers = await asyncio.get_event_loop().run_in_executor(None, get_headers) @@ -1126,8 +1127,9 @@ class LevelDB: tx = None tx_height = -1 if tx_num is not None: + fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 tx_height = bisect_right(tx_counts, tx_num) - tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes)) + tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes), fill_cache=fill_cache) if tx_height == -1: merkle = { 'block_height': -1