reorg claims in the search index

Jack Robison 2021-07-16 14:46:46 -04:00
parent 7a56eff1ac
commit 496f89f184
6 changed files with 150 additions and 30 deletions
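
In outline: block processing now keeps per-block touched/removed claim sets alongside the cumulative sets that feed ElasticSearch, and persists the per-block sets by height (under the new touched_or_deleted prefix) so a reorg can replay exactly which claims to re-sync or delete. A minimal sketch of the bookkeeping pattern, with simplified stand-ins for the BlockProcessor fields in the diff below (not the commit's code):

touched_claim_hashes = set()          # per-block changes
removed_claim_hashes = set()          # per-block changes
touched_claims_to_send_es = set()     # cumulative, drained after each ES sync
removed_claims_to_send_es = set()     # cumulative

def fold_per_block_into_cumulative():
    # a removal outranks a touch for the same claim hash
    touched_claims_to_send_es.difference_update(removed_claim_hashes)
    touched_claims_to_send_es.update(touched_claim_hashes)
    removed_claims_to_send_es.update(removed_claim_hashes)

def finish_block():
    fold_per_block_into_cumulative()
    # the per-block sets are also written to the DB keyed by height here,
    # so backup_block() can read them back during a reorg
    touched_claim_hashes.clear()
    removed_claim_hashes.clear()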


@@ -249,8 +249,12 @@ class BlockProcessor:
self.possible_future_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {}
self.possible_future_support_txos_by_claim_hash: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
self.removed_claims_to_send_es = set()
self.removed_claims_to_send_es = set() # cumulative changes across blocks to send to ES
self.touched_claims_to_send_es = set()
self.removed_claim_hashes: Set[bytes] = set() # per block changes
self.touched_claim_hashes: Set[bytes] = set()
self.signatures_changed = set()
self.pending_reposted = set()
@@ -268,16 +272,9 @@ class BlockProcessor:
if self.db.db_height <= 1:
return
to_send_es = set(self.touched_claims_to_send_es)
to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es))
to_send_es.update(
{k for k, v in self.pending_channel_counts.items() if v != 0}.difference(
self.removed_claims_to_send_es)
)
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
for claim in self.db.claims_producer(to_send_es):
for claim in self.db.claims_producer(self.touched_claims_to_send_es):
yield 'update', claim
async def run_in_thread_with_lock(self, func, *args):
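
For context, claim_producer yields ('delete', <claim hash hex>) and ('update', <claim document>) pairs. A hedged sketch of a consumer for that stream (the real consumer is search_index.claim_consumer, whose body is not part of this diff):

def drain_claim_stream(producer):
    # split the mixed stream into ES delete ids and update documents
    to_delete, to_update = [], []
    for op, value in producer:
        if op == 'delete':
            to_delete.append(value)   # claim hash as a hex string
        else:                         # 'update'
            to_update.append(value)   # document from db.claims_producer
    return to_delete, to_update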
@@ -313,14 +310,12 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.advance_block, block)
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
# TODO: we shouldn't wait on the search index updating before advancing to the next block
if not self.db.first_sync:
await self.db.search_index.claim_consumer(self.claim_producer())
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
self.pending_reposted.clear()
self.pending_channel_counts.clear()
# print("******************\n")
if not self.db.first_sync:
await self.db.search_index.claim_consumer(self.claim_producer())
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
# print("******************\n")
except:
self.logger.exception("advance blocks failed")
raise
@@ -351,6 +346,14 @@ class BlockProcessor:
assert count > 0, count
for _ in range(count):
await self.run_in_thread_with_lock(self.backup_block)
for touched in self.touched_claims_to_send_es:
if not self.db.get_claim_txo(touched):
self.removed_claims_to_send_es.add(touched)
self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
await self.db.search_index.claim_consumer(self.claim_producer())
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
except:
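
The rule applied above after rewinding: a touched claim whose claim TXO no longer exists must be deleted from the search index rather than updated. Restated as a standalone sketch (assuming get_claim_txo returns None for claims that no longer exist at the rewound height):

def split_touched_after_rewind(touched: set, get_claim_txo):
    removed = {claim_hash for claim_hash in touched if not get_claim_txo(claim_hash)}
    return touched - removed, removed  # (still update, now delete)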
@@ -995,6 +998,9 @@ class BlockProcessor:
).get_activate_ops()
)
self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
self.touched_claim_hashes.add(winning_including_future_activations)
if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash)
elif not controlling or (winning_claim_hash != controlling.claim_hash and
name in names_with_abandoned_controlling_claims) or \
((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])):
@@ -1025,6 +1031,9 @@ class BlockProcessor:
).get_activate_ops()
)
self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
if controlling and controlling.claim_hash not in self.abandoned_claims:
self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning_claim_hash)
elif winning_claim_hash == controlling.claim_hash:
# print("\tstill winning")
pass
@@ -1048,19 +1057,23 @@ class BlockProcessor:
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling))
if controlling:
self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning)
def _get_cumulative_update_ops(self):
# gather cumulative removed/touched sets to update the search index
self.removed_claims_to_send_es.update(set(self.abandoned_claims.keys()))
self.touched_claims_to_send_es.update(
self.removed_claim_hashes.update(set(self.abandoned_claims.keys()))
self.touched_claim_hashes.update(
set(self.activated_support_amount_by_claim.keys()).union(
set(claim_hash for (_, claim_hash) in self.activated_claim_amount_by_name_and_hash.keys())
).union(self.signatures_changed).union(
set(self.removed_active_support_amount_by_claim.keys())
).difference(self.removed_claims_to_send_es)
).difference(self.removed_claim_hashes)
)
# use the cumulative changes to update bid ordered resolve
for removed in self.removed_claims_to_send_es:
for removed in self.removed_claim_hashes:
removed_claim = self.db.get_claim_txo(removed)
if removed_claim:
amt = self.db.get_url_effective_amount(
@@ -1071,7 +1084,7 @@ class BlockProcessor:
removed_claim.name, amt.effective_amount, amt.tx_num,
amt.position, removed
))
for touched in self.touched_claims_to_send_es:
for touched in self.touched_claim_hashes:
if touched in self.claim_hash_to_txo:
pending = self.txo_to_claim[self.claim_hash_to_txo[touched]]
name, tx_num, position = pending.name, pending.tx_num, pending.position
@@ -1098,6 +1111,16 @@ class BlockProcessor:
tx_num, position, touched)
)
self.touched_claim_hashes.update(
{k for k in self.pending_reposted if k not in self.removed_claim_hashes}
)
self.touched_claim_hashes.update(
{k for k, v in self.pending_channel_counts.items() if v != 0 and k not in self.removed_claim_hashes}
)
self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
self.touched_claims_to_send_es.update(self.touched_claim_hashes)
self.removed_claims_to_send_es.update(self.removed_claim_hashes)
def advance_block(self, block):
height = self.height + 1
# print("advance ", height)
@@ -1168,6 +1191,9 @@ class BlockProcessor:
# activate claims and process takeovers
self._get_takeover_ops(height)
# update effective amount and update sets of touched and deleted claims
self._get_cumulative_update_ops()
self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header)))
self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
@@ -1185,8 +1211,20 @@ class BlockProcessor:
self.tx_count = tx_count
self.db.tx_counts.append(self.tx_count)
if height >= self.daemon.cached_height() - self.env.reorg_limit:
self.db_op_stack.append(RevertablePut(*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())))
cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit
if height >= cached_max_reorg_depth:
self.db_op_stack.append(
RevertablePut(
*Prefixes.touched_or_deleted.pack_item(
height, self.touched_claim_hashes, self.removed_claim_hashes
)
)
)
self.db_op_stack.append(
RevertablePut(
*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())
)
)
self.height = height
self.db.headers.append(block.header)
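
Both the undo ops and the new touched/deleted snapshot are written only for heights still within reorg range of the daemon's tip. A one-function restatement of the gate used above:

def within_reorg_range(height: int, cached_height: int, reorg_limit: int) -> bool:
    # heights more than reorg_limit below the tip can no longer be reorged,
    # so no undo or touched_or_deleted rows need to be kept for them
    return height >= cached_height - reorg_limit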
@@ -1220,16 +1258,26 @@ class BlockProcessor:
self.hashXs_by_tx.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
self.removed_claim_hashes.clear()
self.touched_claim_hashes.clear()
self.pending_reposted.clear()
self.pending_channel_counts.clear()
def backup_block(self):
self.db.assert_flushed(self.flush_data())
self.logger.info("backup block %i", self.height)
# Check and update self.tip
undo_ops = self.db.read_undo_info(self.height)
undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
if undo_ops is None:
raise ChainError(f'no undo information found for height {self.height:,d}')
self.db_op_stack.apply_packed_undo_ops(undo_ops)
self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
self.db_op_stack.apply_packed_undo_ops(undo_ops)
touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes)
self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims)
self.removed_claims_to_send_es.difference_update(touched_and_deleted.touched_claims)
self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims)
self.db.headers.pop()
self.block_hashes.pop()
self.db.tx_counts.pop()


@@ -25,6 +25,7 @@ class DB_PREFIXES(enum.Enum):
reposted_claim = b'W'
undo = b'M'
claim_diff = b'Y'
tx = b'B'
block_hash = b'C'


@@ -387,6 +387,20 @@ class RepostedValue(typing.NamedTuple):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class TouchedOrDeletedClaimKey(typing.NamedTuple):
height: int
class TouchedOrDeletedClaimValue(typing.NamedTuple):
touched_claims: typing.Set[bytes]
deleted_claims: typing.Set[bytes]
def __str__(self):
return f"{self.__class__.__name__}(" \
f"touched_claims={','.join(map(lambda x: x.hex(), self.touched_claims))}," \
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
class ActiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.active_amount.value
key_struct = struct.Struct(b'>20sBLLH')
@@ -1219,6 +1233,38 @@ class HashXHistoryPrefixRow(PrefixRow):
return cls.pack_key(hashX, height), cls.pack_value(history)
class TouchedOrDeletedPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_diff.value
key_struct = struct.Struct(b'>L')
value_struct = struct.Struct(b'>LL')
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> TouchedOrDeletedClaimKey:
return TouchedOrDeletedClaimKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, touched, deleted) -> bytes:
return cls.value_struct.pack(len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted)
@classmethod
def unpack_value(cls, data: bytes) -> TouchedOrDeletedClaimValue:
touched_len, deleted_len = cls.value_struct.unpack(data[:8])
assert len(data) == 20 * (touched_len + deleted_len) + 8
touched_bytes, deleted_bytes = data[8:touched_len*20+8], data[touched_len*20+8:touched_len*20+deleted_len*20+8]
return TouchedOrDeletedClaimValue(
{touched_bytes[20*i:20*(i+1)] for i in range(touched_len)},
{deleted_bytes[20*i:20*(i+1)] for i in range(deleted_len)}
)
@classmethod
def pack_item(cls, height, touched, deleted):
return cls.pack_key(height), cls.pack_value(touched, deleted)
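
A worked example of the claim_diff row layout defined above, using the b'Y' prefix from DB_PREFIXES: the key is the prefix plus a big-endian uint32 height, and the value is two big-endian uint32 counts followed by the fixed-width 20-byte claim hashes:

import struct

height = 1000000
touched = {b'\x01' * 20, b'\x02' * 20}
deleted = {b'\x03' * 20}

key = b'Y' + struct.pack(b'>L', height)
value = struct.pack(b'>LL', len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted)
assert len(value) == 8 + 20 * (len(touched) + len(deleted))

# decoding reverses the layout: counts first, then fixed-width hashes
t_len, d_len = struct.unpack(b'>LL', value[:8])
body = value[8:]
touched_out = {body[20 * i:20 * (i + 1)] for i in range(t_len)}
deleted_out = {body[20 * t_len + 20 * i:20 * t_len + 20 * (i + 1)] for i in range(d_len)}
assert touched_out == touched and deleted_out == deleted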
class Prefixes:
claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow
@@ -1252,6 +1298,8 @@ class Prefixes:
tx_num = TXNumPrefixRow
tx = TXPrefixRow
header = BlockHeaderPrefixRow
touched_or_deleted = TouchedOrDeletedPrefixRow
ROW_TYPES = {


@@ -784,12 +784,18 @@ class LevelDB:
min_height = self.min_undo_height(self.db_height)
delete_undo_keys = []
if min_height > 0:
if min_height > 0: # delete undos for blocks deep enough they can't be reorged
delete_undo_keys.extend(
self.db.iterator(
start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False
)
)
delete_undo_keys.extend(
self.db.iterator(
start=Prefixes.touched_or_deleted.pack_key(0),
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
)
)
with self.db.write_batch() as batch:
batch_put = batch.put
@@ -1006,7 +1012,7 @@ class LevelDB:
return Prefixes.undo.pack_key(height)
def read_undo_info(self, height: int):
return self.db.get(Prefixes.undo.pack_key(height))
return self.db.get(Prefixes.undo.pack_key(height)), self.db.get(Prefixes.touched_or_deleted.pack_key(height))
# -- UTXO database
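
The flush path now prunes both prefixes below min_height in one pass, so stale claim_diff rows cannot outlive their undo data. A hedged sketch of the pattern, using the plyvel-style iterator/write_batch calls visible in the hunk above:

def prune_unreorgable_rows(db, min_height, pack_undo_key, pack_diff_key):
    stale_keys = []
    for pack_key in (pack_undo_key, pack_diff_key):
        stale_keys.extend(
            db.iterator(start=pack_key(0), stop=pack_key(min_height), include_value=False)
        )
    with db.write_batch() as batch:
        for key in stale_keys:
            batch.delete(key)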


@@ -813,10 +813,15 @@ class TransactionOutputCommands(ClaimTestCase):
stream_id = self.get_claim_id(await self.stream_create())
await self.support_create(stream_id, '0.3')
await self.support_create(stream_id, '0.2')
await self.generate(day_blocks)
await self.generate(day_blocks // 2)
await self.stream_update(stream_id)
await self.generate(day_blocks // 2)
await self.support_create(stream_id, '0.4')
await self.support_create(stream_id, '0.5')
await self.generate(day_blocks)
await self.stream_update(stream_id)
await self.generate(day_blocks // 2)
await self.stream_update(stream_id)
await self.generate(day_blocks // 2)
await self.support_create(stream_id, '0.6')
plot = await self.txo_plot(type='support')


@@ -17,8 +17,13 @@ class BaseResolveTestCase(CommandTestCase):
if claim_id is None:
self.assertIn('error', other)
self.assertEqual(other['error']['name'], 'NOT_FOUND')
claims_from_es = (await self.conductor.spv_node.server.bp.db.search_index.search(name=name))[0]
claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es]
self.assertNotIn(claim_id, claims_from_es)
else:
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
self.assertEqual(claim_id, other['claim_id'])
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
async def assertNoClaimForName(self, name: str):
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
@@ -28,11 +33,18 @@ class BaseResolveTestCase(CommandTestCase):
self.assertIsInstance(stream, LookupError)
else:
self.assertIsInstance(channel, LookupError)
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
self.assertListEqual([], claim_from_es[0])
async def assertMatchWinningClaim(self, name):
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name)
claim = stream if stream else channel
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
claim_id=claim.claim_hash.hex()
)
self.assertEqual(len(claim_from_es[0]), 1)
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex())
self.assertEqual(expected['claimId'], claim.claim_hash.hex())
self.assertEqual(expected['validAtHeight'], claim.activation_height)
self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height)