Use multi_put/delete to update TX and UTXO records in advance_block().

This commit is contained in:
Jonathan Moody 2022-08-29 15:46:03 -05:00
parent f7eca425eb
commit af0c666b69
3 changed files with 51 additions and 21 deletions

View file

@ -1369,13 +1369,14 @@ class SecondaryDB:
return state
def assert_db_state(self):
state = self.prefix_db.db_state.get()
assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}"
assert self.db_height == state.height, f"{self.db_height} != {state.height}"
assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}"
assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}"
assert self.catching_up == state.catching_up, f"{self.catching_up} != {state.catching_up}"
assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}"
if __debug__:
state = self.prefix_db.db_state.get()
assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}"
assert self.db_height == state.height, f"{self.db_height} != {state.height}"
assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}"
assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}"
assert self.catching_up == state.catching_up, f"{self.catching_up} != {state.catching_up}"
assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}"
async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""

View file

@ -118,7 +118,7 @@ class PrefixRow(metaclass=PrefixRowType):
if idx % step == 0:
await asyncio.sleep(0)
def stage_multi_put(self, items):
def stage_multi_put(self, items: typing.List[typing.Tuple[typing.Tuple,typing.Tuple]]):
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
@ -139,6 +139,9 @@ class PrefixRow(metaclass=PrefixRowType):
def stage_delete(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertableDelete(self.pack_key(*key_args), self.pack_value(*value_args)))
def stage_multi_delete(self, items: typing.List[typing.Tuple[typing.Tuple,typing.Tuple]]):
self._op_stack.multi_delete([RevertableDelete(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
@classmethod
def pack_partial_key(cls, *args) -> bytes:
return cls.prefix + cls.key_part_lambdas[len(args)](*args)
@ -280,6 +283,8 @@ class BasePrefixDB:
return self._db.get((cf, key), fill_cache=fill_cache)
def multi_get(self, keys: typing.List[bytes], fill_cache=True):
if len(keys) == 0:
return []
first_key = keys[0]
if not all(first_key[0] == key[0] for key in keys):
raise ValueError('cannot multi-delete across column families')

View file

@ -1314,6 +1314,11 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
# Accumulate the tx, tx_num, tx_hash put operations for this block.
tx_puts: List[Tuple[Tuple,Tuple]] = []
tx_num_puts: List[Tuple[Tuple,Tuple]] = []
tx_hash_puts: List[Tuple[Tuple,Tuple]] = []
for tx, tx_hash in txs:
spent_claims = {}
# clean up mempool, delete txs that were already in mempool/staged to be added
@ -1322,16 +1327,22 @@ class BlockchainProcessorService(BlockchainService):
if mempool_tx:
self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), mempool_tx)
self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,))
self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,))
self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,))
tx_puts.append(((tx_hash,), (tx.raw,)))
tx_num_puts.append(((tx_hash,), (tx_count,)))
tx_hash_puts.append(((tx_count,),(tx_hash,)))
# Accumulate the utxo, hashX_utxo operations for this transaction.
utxo_deletes: List[Tuple[Tuple,Tuple]] = []
hashX_utxo_deletes: List[Tuple[Tuple,Tuple]] = []
utxo_puts: List[Tuple[Tuple,Tuple]] = []
hashX_utxo_puts: List[Tuple[Tuple,Tuple]] = []
# Spend the inputs
for txin in tx.inputs:
if txin.is_generation():
continue
# spend utxo for address histories
hashX = spend_utxo(txin.prev_hash, txin.prev_idx)
hashX = spend_utxo(txin.prev_hash, txin.prev_idx, utxo_deletes, hashX_utxo_deletes)
if hashX:
if tx_count not in self.hashXs_by_tx[hashX]:
self.hashXs_by_tx[hashX].append(tx_count)
@ -1342,7 +1353,7 @@ class BlockchainProcessorService(BlockchainService):
for nout, txout in enumerate(tx.outputs):
txo_count += 1
# Get the hashX. Ignore unspendable outputs
hashX = add_utxo(tx_hash, tx_count, nout, txout)
hashX = add_utxo(tx_hash, tx_count, nout, txout, utxo_puts, hashX_utxo_puts)
if hashX:
# self._set_hashX_cache(hashX)
if tx_count not in self.hashXs_by_tx[hashX]:
@ -1352,6 +1363,12 @@ class BlockchainProcessorService(BlockchainService):
height, tx_hash, tx_count, nout, txout, spent_claims, tx.inputs[0]
)
# Apply the utxo, hashX_utxo put/delete operations as multi-put/multi-delete.
self.db.prefix_db.utxo.stage_multi_delete(utxo_deletes)
self.db.prefix_db.hashX_utxo.stage_multi_delete(hashX_utxo_deletes)
self.db.prefix_db.utxo.stage_multi_put(utxo_puts)
self.db.prefix_db.hashX_utxo.stage_multi_put(hashX_utxo_puts)
# Handle abandoned claims
abandoned_channels = {}
# abandon the channels last to handle abandoned signed claims in the same tx,
@ -1373,6 +1390,11 @@ class BlockchainProcessorService(BlockchainService):
self.db.tx_num_mapping[tx_hash] = tx_count
tx_count += 1
# Apply the tx, tx_num, tx_hash put operations as a multi-put.
self.db.prefix_db.tx.stage_multi_put(tx_puts)
self.db.prefix_db.tx_num.stage_multi_put(tx_num_puts)
self.db.prefix_db.tx_hash.stage_multi_put(tx_hash_puts)
# handle expired claims
self._expire_claims(height)
@ -1628,13 +1650,14 @@ class BlockchainProcessorService(BlockchainService):
self.log.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
f'Height {self.height:,d} txs: {self.tx_count:,d} ({tx_delta:+,d})')
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]:
def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput',
utxo_puts: List[Tuple[Tuple,Tuple]], hashX_utxo_puts: List[Tuple[Tuple,Tuple]]) -> Optional[bytes]:
hashX = self.coin.hashX_from_txo(txout)
if hashX:
self.touched_hashXs.add(hashX)
self.utxo_cache[(tx_hash, nout)] = (hashX, txout.value)
self.db.prefix_db.utxo.stage_put((hashX, tx_num, nout), (txout.value,))
self.db.prefix_db.hashX_utxo.stage_put((tx_hash[:4], tx_num, nout), (hashX,))
utxo_puts.append(((hashX, tx_num, nout), (txout.value,)))
hashX_utxo_puts.append(((tx_hash[:4], tx_num, nout), (hashX,)))
return hashX
def get_pending_tx_num(self, tx_hash: bytes) -> int:
@ -1643,7 +1666,8 @@ class BlockchainProcessorService(BlockchainService):
else:
return self.db.get_tx_num(tx_hash)
def spend_utxo(self, tx_hash: bytes, nout: int):
def spend_utxo(self, tx_hash: bytes, nout: int,
utxo_deletes: List[Tuple[Tuple,Tuple]], hashX_utxo_deletes: List[Tuple[Tuple,Tuple]]):
hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))
txin_num = self.get_pending_tx_num(tx_hash)
if not hashX:
@ -1660,12 +1684,12 @@ class BlockchainProcessorService(BlockchainService):
f"{hash_to_hex_str(tx_hash)}:{nout} is not found in UTXO db for {hash_to_hex_str(hashX)}"
)
self.touched_hashXs.add(hashX)
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), hashX_value)
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), utxo_value)
hashX_utxo_deletes.append(((tx_hash[:4], txin_num, nout), hashX_value))
utxo_deletes.append(((hashX, txin_num, nout), utxo_value))
return hashX
elif amount is not None:
self.db.prefix_db.hashX_utxo.stage_delete((tx_hash[:4], txin_num, nout), (hashX,))
self.db.prefix_db.utxo.stage_delete((hashX, txin_num, nout), (amount,))
hashX_utxo_deletes.append(((tx_hash[:4], txin_num, nout), (hashX,)))
utxo_deletes.append(((hashX, txin_num, nout), (amount,)))
self.touched_hashXs.add(hashX)
return hashX