diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e8f21aec0..52a5bc4f6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -345,6 +345,8 @@ class BlockProcessor: await self.flush() self.logger.info(f'backed up to height {self.height:,d}') + await self.db._read_claim_txos() + for touched in self.touched_claims_to_send_es: if not self.db.get_claim_txo(touched): self.removed_claims_to_send_es.add(touched) @@ -478,6 +480,11 @@ class BlockProcessor: ).get_remove_activate_ops() ) + self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( + tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name + ) + self.db.txo_to_claim[(tx_num, nout)] = claim_hash + pending = StagedClaimtrieItem( claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num, root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash @@ -536,12 +543,14 @@ class BlockProcessor: if (txin_num, txin.prev_idx) in self.txo_to_claim: spent = self.txo_to_claim[(txin_num, txin.prev_idx)] else: + if (txin_num, txin.prev_idx) not in self.db.txo_to_claim: # txo is not a claim + return False spent_claim_hash_and_name = self.db.get_claim_from_txo( txin_num, txin.prev_idx ) - if not spent_claim_hash_and_name: # txo is not a claim - return False + assert spent_claim_hash_and_name is not None spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) + self.db.claim_to_txo.pop(self.db.txo_to_claim.pop((txin_num, txin.prev_idx))) if spent.reposted_claim_hash: self.pending_reposted.add(spent.reposted_claim_hash) if spent.signing_hash and spent.channel_signature_is_valid: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 4fa44af6a..d834094c6 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -21,7 +21,7 @@ from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from functools import partial from asyncio import sleep from bisect import bisect_right -from collections import defaultdict +from collections import defaultdict, OrderedDict from lbry.utils import LRUCacheWithMetrics from lbry.schema.url import URL from lbry.wallet.server import util @@ -131,6 +131,9 @@ class LevelDB: self.total_transactions = None self.transaction_num_mapping = {} + self.claim_to_txo: typing.OrderedDict[bytes, ClaimToTXOValue] = OrderedDict() + self.txo_to_claim: typing.OrderedDict[Tuple[int, int], bytes] = OrderedDict() + # Search index self.search_index = SearchIndex( self.env.es_index_prefix, self.env.database_query_timeout, @@ -281,7 +284,7 @@ class LevelDB: for k, v in self.db.iterator(prefix=prefix): key = Prefixes.claim_short_id.unpack_key(k) claim_txo = Prefixes.claim_short_id.unpack_value(v) - claim_hash = self.get_claim_from_txo(claim_txo.tx_num, claim_txo.position).claim_hash + claim_hash = self.txo_to_claim[(claim_txo.tx_num, claim_txo.position)] signature_is_valid = self.get_claim_txo(claim_hash).channel_signature_is_valid return self._prepare_resolve_result( claim_txo.tx_num, claim_txo.position, claim_hash, key.name, key.root_tx_num, @@ -701,6 +704,20 @@ class LevelDB: ts = time.perf_counter() - start self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4)) + async def _read_claim_txos(self): + def read_claim_txos(): + for _k, _v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix): + k = Prefixes.claim_to_txo.unpack_key(_k) + v = Prefixes.claim_to_txo.unpack_value(_v) + self.claim_to_txo[k.claim_hash] = v + self.txo_to_claim[(v.tx_num, v.position)] = k.claim_hash + + start = time.perf_counter() + self.logger.info("loading claims") + await asyncio.get_event_loop().run_in_executor(None, read_claim_txos) + ts = time.perf_counter() - start + self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4)) + async def _read_headers(self): if self.headers is not None: return @@ -756,6 +773,7 @@ class LevelDB: if self.total_transactions is None: await self._read_txids() await self._read_headers() + await self._read_claim_txos() # start search index await self.search_index.start()