add CACHE_ALL_CLAIM_TXOS hub setting

This commit is contained in:
Jack Robison 2021-10-19 13:29:13 -04:00 committed by Victor Shyba
parent dafd62104b
commit 64509ca95d
6 changed files with 90 additions and 34 deletions

View file

@ -404,6 +404,7 @@ class BlockProcessor:
await self.backup_block() await self.backup_block()
self.logger.info(f'backed up to height {self.height:,d}') self.logger.info(f'backed up to height {self.height:,d}')
if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this await self.db._read_claim_txos() # TODO: don't do this
for touched in self.touched_claims_to_send_es: for touched in self.touched_claims_to_send_es:
if not self.db.get_claim_txo(touched): if not self.db.get_claim_txo(touched):
@ -545,6 +546,7 @@ class BlockProcessor:
previous_amount = previous_claim.amount previous_amount = previous_claim.amount
self.updated_claims.add(claim_hash) self.updated_claims.add(claim_hash)
if self.env.cache_all_claim_txos:
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue( self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
) )
@ -693,7 +695,7 @@ class BlockProcessor:
if (txin_num, txin.prev_idx) in self.txo_to_claim: if (txin_num, txin.prev_idx) in self.txo_to_claim:
spent = self.txo_to_claim[(txin_num, txin.prev_idx)] spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
else: else:
if txin_num not in self.db.txo_to_claim or txin.prev_idx not in self.db.txo_to_claim[txin_num]: if not self.db.get_cached_claim_exists(txin_num, txin.prev_idx):
# txo is not a claim # txo is not a claim
return False return False
spent_claim_hash_and_name = self.db.get_claim_from_txo( spent_claim_hash_and_name = self.db.get_claim_from_txo(
@ -701,6 +703,8 @@ class BlockProcessor:
) )
assert spent_claim_hash_and_name is not None assert spent_claim_hash_and_name is not None
spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash)
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx)
if not self.db.txo_to_claim[txin_num]: if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num) self.db.txo_to_claim.pop(txin_num)
@ -1022,8 +1026,8 @@ class BlockProcessor:
# prepare to activate or delay activation of the pending claims being added this block # prepare to activate or delay activation of the pending claims being added this block
for (tx_num, nout), staged in self.txo_to_claim.items(): for (tx_num, nout), staged in self.txo_to_claim.items():
is_delayed = not staged.is_update is_delayed = not staged.is_update
if staged.claim_hash in self.db.claim_to_txo: prev_txo = self.db.get_cached_claim_txo(staged.claim_hash)
prev_txo = self.db.claim_to_txo[staged.claim_hash] if prev_txo:
prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position) prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position)
if height < prev_activation or prev_activation < 0: if height < prev_activation or prev_activation < 0:
is_delayed = True is_delayed = True

View file

@ -85,6 +85,18 @@ class PrefixRow(metaclass=PrefixRowType):
if v: if v:
return v if not deserialize_value else self.unpack_value(v) return v if not deserialize_value else self.unpack_value(v)
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
packed_key = self.pack_key(*key_args)
last_op = self._op_stack.get_last_op_for_key(packed_key)
if last_op:
if last_op.is_put:
return last_op.value if not deserialize_value else self.unpack_value(last_op.value)
else: # it's a delete
return
v = self._db.get(packed_key, fill_cache=fill_cache)
if v:
return v if not deserialize_value else self.unpack_value(v)
def stage_put(self, key_args=(), value_args=()): def stage_put(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args))) self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))

View file

@ -169,3 +169,7 @@ class RevertableOpStack:
while packed: while packed:
op, packed = RevertableOp.unpack(packed) op, packed = RevertableOp.unpack(packed)
self.append_op(op) self.append_op(op)
def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]:
if key in self._items and self._items[key]:
return self._items[key][-1]

View file

@ -79,6 +79,7 @@ class Env:
self.log_sessions = self.integer('LOG_SESSIONS', 3600) self.log_sessions = self.integer('LOG_SESSIONS', 3600)
self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False)
self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False) self.cache_all_tx_hashes = self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = self.boolean('CACHE_ALL_CLAIM_TXOS', False)
self.country = self.default('COUNTRY', 'US') self.country = self.default('COUNTRY', 'US')
# Peer discovery # Peer discovery
self.peer_discovery = self.peer_discovery_enum() self.peer_discovery = self.peer_discovery_enum()

View file

@ -113,9 +113,12 @@ class LevelDB:
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server") self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
# these are only used if the cache_all_tx_hashes setting is on
self.total_transactions: List[bytes] = [] self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {} self.tx_num_mapping: Dict[bytes, int] = {}
# these are only used if the cache_all_claim_txos setting is on
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {} self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict) self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
@ -210,7 +213,7 @@ class LevelDB:
expiration_height = self.coin.get_expiration_height(height) expiration_height = self.coin.get_expiration_height(height)
support_amount = self.get_support_amount(claim_hash) support_amount = self.get_support_amount(claim_hash)
claim_amount = self.claim_to_txo[claim_hash].amount claim_amount = self.get_cached_claim_txo(claim_hash).amount
effective_amount = support_amount + claim_amount effective_amount = support_amount + claim_amount
channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position) channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position)
@ -219,7 +222,7 @@ class LevelDB:
canonical_url = short_url canonical_url = short_url
claims_in_channel = self.get_claims_in_channel_count(claim_hash) claims_in_channel = self.get_claims_in_channel_count(claim_hash)
if channel_hash: if channel_hash:
channel_vals = self.claim_to_txo.get(channel_hash) channel_vals = self.get_cached_claim_txo(channel_hash)
if channel_vals: if channel_vals:
channel_short_url = self.get_short_claim_id_url( channel_short_url = self.get_short_claim_id_url(
channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num, channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num,
@ -271,11 +274,13 @@ class LevelDB:
) )
# resolve by partial/complete claim id # resolve by partial/complete claim id
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])): for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])):
claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position] full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position)
non_normalized_name = self.claim_to_txo.get(claim_hash).name c = self.get_cached_claim_txo(full_claim_hash)
signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid
non_normalized_name = c.name
signature_is_valid = c.channel_signature_is_valid
return self._prepare_resolve_result( return self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, non_normalized_name, key.root_tx_num, claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num,
key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position), key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position),
signature_is_valid signature_is_valid
) )
@ -285,7 +290,7 @@ class LevelDB:
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))): for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
if amount_order > idx + 1: if amount_order > idx + 1:
continue continue
claim_txo = self.claim_to_txo.get(claim_val.claim_hash) claim_txo = self.get_cached_claim_txo(claim_val.claim_hash)
activation = self.get_activation(key.tx_num, key.position) activation = self.get_activation(key.tx_num, key.position)
return self._prepare_resolve_result( return self._prepare_resolve_result(
key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num, key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num,
@ -360,7 +365,7 @@ class LevelDB:
return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url) return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url)
def _fs_get_claim_by_hash(self, claim_hash): def _fs_get_claim_by_hash(self, claim_hash):
claim = self.claim_to_txo.get(claim_hash) claim = self.get_cached_claim_txo(claim_hash)
if claim: if claim:
activation = self.get_activation(claim.tx_num, claim.position) activation = self.get_activation(claim.tx_num, claim.position)
return self._prepare_resolve_result( return self._prepare_resolve_result(
@ -525,7 +530,7 @@ class LevelDB:
reposted_claim = None reposted_claim = None
reposted_metadata = None reposted_metadata = None
if reposted_claim_hash: if reposted_claim_hash:
reposted_claim = self.claim_to_txo.get(reposted_claim_hash) reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
if not reposted_claim: if not reposted_claim:
return return
reposted_metadata = self.get_claim_metadata( reposted_metadata = self.get_claim_metadata(
@ -677,11 +682,21 @@ class LevelDB:
async def all_claims_producer(self, batch_size=500_000): async def all_claims_producer(self, batch_size=500_000):
batch = [] batch = []
for claim_hash, claim_txo in self.claim_to_txo.items(): if self.env.cache_all_claim_txos:
claim_iterator = self.claim_to_txo.items()
else:
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
for claim_hash, claim_txo in claim_iterator:
# TODO: fix the couple of claim txos that dont have controlling names # TODO: fix the couple of claim txos that dont have controlling names
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
continue continue
claim = self._fs_get_claim_by_hash(claim_hash) activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim: if claim:
batch.append(claim) batch.append(claim)
if len(batch) == batch_size: if len(batch) == batch_size:
@ -703,16 +718,14 @@ class LevelDB:
results = [] results = []
for claim_hash in claim_hashes: for claim_hash in claim_hashes:
if claim_hash not in self.claim_to_txo: claim_txo = self.get_cached_claim_txo(claim_hash)
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo: if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue continue
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position) activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result( claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
@ -764,7 +777,6 @@ class LevelDB:
else: else:
assert self.db_tx_count == 0 assert self.db_tx_count == 0
async def _read_claim_txos(self): async def _read_claim_txos(self):
def read_claim_txos(): def read_claim_txos():
set_claim_to_txo = self.claim_to_txo.__setitem__ set_claim_to_txo = self.claim_to_txo.__setitem__
@ -853,6 +865,7 @@ class LevelDB:
# Read TX counts (requires meta directory) # Read TX counts (requires meta directory)
await self._read_tx_counts() await self._read_tx_counts()
await self._read_headers() await self._read_headers()
if self.env.cache_all_claim_txos:
await self._read_claim_txos() await self._read_claim_txos()
if self.env.cache_all_tx_hashes: if self.env.cache_all_tx_hashes:
await self._read_tx_hashes() await self._read_tx_hashes()
@ -873,6 +886,22 @@ class LevelDB:
return self.tx_num_mapping[tx_hash] return self.tx_num_mapping[tx_hash]
return self.prefix_db.tx_num.get(tx_hash).tx_num return self.prefix_db.tx_num.get(tx_hash).tx_num
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self.env.cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
if self.env.cache_all_claim_txos:
if tx_num not in self.txo_to_claim:
return
return self.txo_to_claim[tx_num].get(position, None)
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
return None if not v else v.claim_hash
def get_cached_claim_exists(self, tx_num: int, position: int) -> bool:
return self.get_cached_claim_hash(tx_num, position) is not None
# Header merkle cache # Header merkle cache
async def populate_header_merkle_cache(self): async def populate_header_merkle_cache(self):
@ -960,7 +989,10 @@ class LevelDB:
tx_height = -1 tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None: if tx_num is not None:
if self.env.cache_all_claim_txos:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
else:
fill_cache = False
tx_height = bisect_right(self.tx_counts, tx_num) tx_height = bisect_right(self.tx_counts, tx_num)
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1: if tx_height == -1:

View file

@ -123,6 +123,9 @@ class TestRevertablePrefixDB(unittest.TestCase):
self.assertIsNone(self.db.claim_takeover.get(name)) self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height)) self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height))
self.assertIsNone(self.db.claim_takeover.get(name))
self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height)
self.db.commit(10000000) self.db.commit(10000000)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height) self.assertEqual(10000000, self.db.claim_takeover.get(name).height)