forked from LBRYCommunity/lbry-sdk
in memory claim_to_txo and txo_to_claim dictionaries
This commit is contained in:
parent
68596be1b9
commit
ef6ec03161
2 changed files with 31 additions and 4 deletions
|
@ -345,6 +345,8 @@ class BlockProcessor:
|
||||||
await self.flush()
|
await self.flush()
|
||||||
self.logger.info(f'backed up to height {self.height:,d}')
|
self.logger.info(f'backed up to height {self.height:,d}')
|
||||||
|
|
||||||
|
await self.db._read_claim_txos()
|
||||||
|
|
||||||
for touched in self.touched_claims_to_send_es:
|
for touched in self.touched_claims_to_send_es:
|
||||||
if not self.db.get_claim_txo(touched):
|
if not self.db.get_claim_txo(touched):
|
||||||
self.removed_claims_to_send_es.add(touched)
|
self.removed_claims_to_send_es.add(touched)
|
||||||
|
@ -478,6 +480,11 @@ class BlockProcessor:
|
||||||
).get_remove_activate_ops()
|
).get_remove_activate_ops()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
|
||||||
|
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
|
||||||
|
)
|
||||||
|
self.db.txo_to_claim[(tx_num, nout)] = claim_hash
|
||||||
|
|
||||||
pending = StagedClaimtrieItem(
|
pending = StagedClaimtrieItem(
|
||||||
claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num,
|
claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num,
|
||||||
root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
|
root_idx, channel_signature_is_valid, signing_channel_hash, reposted_claim_hash
|
||||||
|
@ -536,12 +543,14 @@ class BlockProcessor:
|
||||||
if (txin_num, txin.prev_idx) in self.txo_to_claim:
|
if (txin_num, txin.prev_idx) in self.txo_to_claim:
|
||||||
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
|
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
|
||||||
else:
|
else:
|
||||||
|
if (txin_num, txin.prev_idx) not in self.db.txo_to_claim: # txo is not a claim
|
||||||
|
return False
|
||||||
spent_claim_hash_and_name = self.db.get_claim_from_txo(
|
spent_claim_hash_and_name = self.db.get_claim_from_txo(
|
||||||
txin_num, txin.prev_idx
|
txin_num, txin.prev_idx
|
||||||
)
|
)
|
||||||
if not spent_claim_hash_and_name: # txo is not a claim
|
assert spent_claim_hash_and_name is not None
|
||||||
return False
|
|
||||||
spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash)
|
spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash)
|
||||||
|
self.db.claim_to_txo.pop(self.db.txo_to_claim.pop((txin_num, txin.prev_idx)))
|
||||||
if spent.reposted_claim_hash:
|
if spent.reposted_claim_hash:
|
||||||
self.pending_reposted.add(spent.reposted_claim_hash)
|
self.pending_reposted.add(spent.reposted_claim_hash)
|
||||||
if spent.signing_hash and spent.channel_signature_is_valid:
|
if spent.signing_hash and spent.channel_signature_is_valid:
|
||||||
|
|
|
@ -21,7 +21,7 @@ from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from asyncio import sleep
|
from asyncio import sleep
|
||||||
from bisect import bisect_right
|
from bisect import bisect_right
|
||||||
from collections import defaultdict
|
from collections import defaultdict, OrderedDict
|
||||||
from lbry.utils import LRUCacheWithMetrics
|
from lbry.utils import LRUCacheWithMetrics
|
||||||
from lbry.schema.url import URL
|
from lbry.schema.url import URL
|
||||||
from lbry.wallet.server import util
|
from lbry.wallet.server import util
|
||||||
|
@ -131,6 +131,9 @@ class LevelDB:
|
||||||
self.total_transactions = None
|
self.total_transactions = None
|
||||||
self.transaction_num_mapping = {}
|
self.transaction_num_mapping = {}
|
||||||
|
|
||||||
|
self.claim_to_txo: typing.OrderedDict[bytes, ClaimToTXOValue] = OrderedDict()
|
||||||
|
self.txo_to_claim: typing.OrderedDict[Tuple[int, int], bytes] = OrderedDict()
|
||||||
|
|
||||||
# Search index
|
# Search index
|
||||||
self.search_index = SearchIndex(
|
self.search_index = SearchIndex(
|
||||||
self.env.es_index_prefix, self.env.database_query_timeout,
|
self.env.es_index_prefix, self.env.database_query_timeout,
|
||||||
|
@ -281,7 +284,7 @@ class LevelDB:
|
||||||
for k, v in self.db.iterator(prefix=prefix):
|
for k, v in self.db.iterator(prefix=prefix):
|
||||||
key = Prefixes.claim_short_id.unpack_key(k)
|
key = Prefixes.claim_short_id.unpack_key(k)
|
||||||
claim_txo = Prefixes.claim_short_id.unpack_value(v)
|
claim_txo = Prefixes.claim_short_id.unpack_value(v)
|
||||||
claim_hash = self.get_claim_from_txo(claim_txo.tx_num, claim_txo.position).claim_hash
|
claim_hash = self.txo_to_claim[(claim_txo.tx_num, claim_txo.position)]
|
||||||
signature_is_valid = self.get_claim_txo(claim_hash).channel_signature_is_valid
|
signature_is_valid = self.get_claim_txo(claim_hash).channel_signature_is_valid
|
||||||
return self._prepare_resolve_result(
|
return self._prepare_resolve_result(
|
||||||
claim_txo.tx_num, claim_txo.position, claim_hash, key.name, key.root_tx_num,
|
claim_txo.tx_num, claim_txo.position, claim_hash, key.name, key.root_tx_num,
|
||||||
|
@ -701,6 +704,20 @@ class LevelDB:
|
||||||
ts = time.perf_counter() - start
|
ts = time.perf_counter() - start
|
||||||
self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4))
|
self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4))
|
||||||
|
|
||||||
|
async def _read_claim_txos(self):
|
||||||
|
def read_claim_txos():
|
||||||
|
for _k, _v in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix):
|
||||||
|
k = Prefixes.claim_to_txo.unpack_key(_k)
|
||||||
|
v = Prefixes.claim_to_txo.unpack_value(_v)
|
||||||
|
self.claim_to_txo[k.claim_hash] = v
|
||||||
|
self.txo_to_claim[(v.tx_num, v.position)] = k.claim_hash
|
||||||
|
|
||||||
|
start = time.perf_counter()
|
||||||
|
self.logger.info("loading claims")
|
||||||
|
await asyncio.get_event_loop().run_in_executor(None, read_claim_txos)
|
||||||
|
ts = time.perf_counter() - start
|
||||||
|
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
|
||||||
|
|
||||||
async def _read_headers(self):
|
async def _read_headers(self):
|
||||||
if self.headers is not None:
|
if self.headers is not None:
|
||||||
return
|
return
|
||||||
|
@ -756,6 +773,7 @@ class LevelDB:
|
||||||
if self.total_transactions is None:
|
if self.total_transactions is None:
|
||||||
await self._read_txids()
|
await self._read_txids()
|
||||||
await self._read_headers()
|
await self._read_headers()
|
||||||
|
await self._read_claim_txos()
|
||||||
|
|
||||||
# start search index
|
# start search index
|
||||||
await self.search_index.start()
|
await self.search_index.start()
|
||||||
|
|
Loading…
Reference in a new issue