forked from LBRYCommunity/lbry-sdk
reorg claims in the search index
This commit is contained in:
parent
4e77fa100b
commit
ea1285cd9f
6 changed files with 150 additions and 30 deletions
|
@ -249,8 +249,12 @@ class BlockProcessor:
|
|||
self.possible_future_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {}
|
||||
self.possible_future_support_txos_by_claim_hash: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
|
||||
|
||||
self.removed_claims_to_send_es = set()
|
||||
self.removed_claims_to_send_es = set() # cumulative changes across blocks to send ES
|
||||
self.touched_claims_to_send_es = set()
|
||||
|
||||
self.removed_claim_hashes: Set[bytes] = set() # per block changes
|
||||
self.touched_claim_hashes: Set[bytes] = set()
|
||||
|
||||
self.signatures_changed = set()
|
||||
|
||||
self.pending_reposted = set()
|
||||
|
@ -268,16 +272,9 @@ class BlockProcessor:
|
|||
if self.db.db_height <= 1:
|
||||
return
|
||||
|
||||
to_send_es = set(self.touched_claims_to_send_es)
|
||||
to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es))
|
||||
to_send_es.update(
|
||||
{k for k, v in self.pending_channel_counts.items() if v != 0}.difference(
|
||||
self.removed_claims_to_send_es)
|
||||
)
|
||||
|
||||
for claim_hash in self.removed_claims_to_send_es:
|
||||
yield 'delete', claim_hash.hex()
|
||||
for claim in self.db.claims_producer(to_send_es):
|
||||
for claim in self.db.claims_producer(self.touched_claims_to_send_es):
|
||||
yield 'update', claim
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
|
@ -318,8 +315,6 @@ class BlockProcessor:
|
|||
self.db.search_index.clear_caches()
|
||||
self.touched_claims_to_send_es.clear()
|
||||
self.removed_claims_to_send_es.clear()
|
||||
self.pending_reposted.clear()
|
||||
self.pending_channel_counts.clear()
|
||||
# print("******************\n")
|
||||
except:
|
||||
self.logger.exception("advance blocks failed")
|
||||
|
@ -351,6 +346,14 @@ class BlockProcessor:
|
|||
assert count > 0, count
|
||||
for _ in range(count):
|
||||
await self.run_in_thread_with_lock(self.backup_block)
|
||||
for touched in self.touched_claims_to_send_es:
|
||||
if not self.db.get_claim_txo(touched):
|
||||
self.removed_claims_to_send_es.add(touched)
|
||||
self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
|
||||
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||
self.db.search_index.clear_caches()
|
||||
self.touched_claims_to_send_es.clear()
|
||||
self.removed_claims_to_send_es.clear()
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
self.reorg_count_metric.inc()
|
||||
except:
|
||||
|
@ -995,6 +998,9 @@ class BlockProcessor:
|
|||
).get_activate_ops()
|
||||
)
|
||||
self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling))
|
||||
self.touched_claim_hashes.add(winning_including_future_activations)
|
||||
if controlling and controlling.claim_hash not in self.abandoned_claims:
|
||||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
elif not controlling or (winning_claim_hash != controlling.claim_hash and
|
||||
name in names_with_abandoned_controlling_claims) or \
|
||||
((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])):
|
||||
|
@ -1025,6 +1031,9 @@ class BlockProcessor:
|
|||
).get_activate_ops()
|
||||
)
|
||||
self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling))
|
||||
if controlling and controlling.claim_hash not in self.abandoned_claims:
|
||||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
self.touched_claim_hashes.add(winning_claim_hash)
|
||||
elif winning_claim_hash == controlling.claim_hash:
|
||||
# print("\tstill winning")
|
||||
pass
|
||||
|
@ -1048,19 +1057,23 @@ class BlockProcessor:
|
|||
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
|
||||
# print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}")
|
||||
self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling))
|
||||
if controlling:
|
||||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
self.touched_claim_hashes.add(winning)
|
||||
|
||||
def _get_cumulative_update_ops(self):
|
||||
# gather cumulative removed/touched sets to update the search index
|
||||
self.removed_claims_to_send_es.update(set(self.abandoned_claims.keys()))
|
||||
self.touched_claims_to_send_es.update(
|
||||
self.removed_claim_hashes.update(set(self.abandoned_claims.keys()))
|
||||
self.touched_claim_hashes.update(
|
||||
set(self.activated_support_amount_by_claim.keys()).union(
|
||||
set(claim_hash for (_, claim_hash) in self.activated_claim_amount_by_name_and_hash.keys())
|
||||
).union(self.signatures_changed).union(
|
||||
set(self.removed_active_support_amount_by_claim.keys())
|
||||
).difference(self.removed_claims_to_send_es)
|
||||
).difference(self.removed_claim_hashes)
|
||||
)
|
||||
|
||||
# use the cumulative changes to update bid ordered resolve
|
||||
for removed in self.removed_claims_to_send_es:
|
||||
for removed in self.removed_claim_hashes:
|
||||
removed_claim = self.db.get_claim_txo(removed)
|
||||
if removed_claim:
|
||||
amt = self.db.get_url_effective_amount(
|
||||
|
@ -1071,7 +1084,7 @@ class BlockProcessor:
|
|||
removed_claim.name, amt.effective_amount, amt.tx_num,
|
||||
amt.position, removed
|
||||
))
|
||||
for touched in self.touched_claims_to_send_es:
|
||||
for touched in self.touched_claim_hashes:
|
||||
if touched in self.claim_hash_to_txo:
|
||||
pending = self.txo_to_claim[self.claim_hash_to_txo[touched]]
|
||||
name, tx_num, position = pending.name, pending.tx_num, pending.position
|
||||
|
@ -1098,6 +1111,16 @@ class BlockProcessor:
|
|||
tx_num, position, touched)
|
||||
)
|
||||
|
||||
self.touched_claim_hashes.update(
|
||||
{k for k in self.pending_reposted if k not in self.removed_claim_hashes}
|
||||
)
|
||||
self.touched_claim_hashes.update(
|
||||
{k for k, v in self.pending_channel_counts.items() if v != 0 and k not in self.removed_claim_hashes}
|
||||
)
|
||||
self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
|
||||
self.touched_claims_to_send_es.update(self.touched_claim_hashes)
|
||||
self.removed_claims_to_send_es.update(self.removed_claim_hashes)
|
||||
|
||||
def advance_block(self, block):
|
||||
height = self.height + 1
|
||||
# print("advance ", height)
|
||||
|
@ -1168,6 +1191,9 @@ class BlockProcessor:
|
|||
# activate claims and process takeovers
|
||||
self._get_takeover_ops(height)
|
||||
|
||||
# update effective amount and update sets of touched and deleted claims
|
||||
self._get_cumulative_update_ops()
|
||||
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header)))
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count)))
|
||||
|
||||
|
@ -1185,8 +1211,20 @@ class BlockProcessor:
|
|||
self.tx_count = tx_count
|
||||
self.db.tx_counts.append(self.tx_count)
|
||||
|
||||
if height >= self.daemon.cached_height() - self.env.reorg_limit:
|
||||
self.db_op_stack.append(RevertablePut(*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())))
|
||||
cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit
|
||||
if height >= cached_max_reorg_depth:
|
||||
self.db_op_stack.append(
|
||||
RevertablePut(
|
||||
*Prefixes.touched_or_deleted.pack_item(
|
||||
height, self.touched_claim_hashes, self.removed_claim_hashes
|
||||
)
|
||||
)
|
||||
)
|
||||
self.db_op_stack.append(
|
||||
RevertablePut(
|
||||
*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops())
|
||||
)
|
||||
)
|
||||
|
||||
self.height = height
|
||||
self.db.headers.append(block.header)
|
||||
|
@ -1220,16 +1258,26 @@ class BlockProcessor:
|
|||
self.hashXs_by_tx.clear()
|
||||
self.history_cache.clear()
|
||||
self.notifications.notified_mempool_txs.clear()
|
||||
self.removed_claim_hashes.clear()
|
||||
self.touched_claim_hashes.clear()
|
||||
self.pending_reposted.clear()
|
||||
self.pending_channel_counts.clear()
|
||||
|
||||
def backup_block(self):
|
||||
self.db.assert_flushed(self.flush_data())
|
||||
self.logger.info("backup block %i", self.height)
|
||||
# Check and update self.tip
|
||||
undo_ops = self.db.read_undo_info(self.height)
|
||||
undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height)
|
||||
if undo_ops is None:
|
||||
raise ChainError(f'no undo information found for height {self.height:,d}')
|
||||
self.db_op_stack.apply_packed_undo_ops(undo_ops)
|
||||
self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops))
|
||||
self.db_op_stack.apply_packed_undo_ops(undo_ops)
|
||||
|
||||
touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes)
|
||||
self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims)
|
||||
self.removed_claims_to_send_es.difference_update(touched_and_deleted.touched_claims)
|
||||
self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims)
|
||||
|
||||
self.db.headers.pop()
|
||||
self.block_hashes.pop()
|
||||
self.db.tx_counts.pop()
|
||||
|
|
|
@ -25,6 +25,7 @@ class DB_PREFIXES(enum.Enum):
|
|||
reposted_claim = b'W'
|
||||
|
||||
undo = b'M'
|
||||
claim_diff = b'Y'
|
||||
|
||||
tx = b'B'
|
||||
block_hash = b'C'
|
||||
|
|
|
@ -387,6 +387,20 @@ class RepostedValue(typing.NamedTuple):
|
|||
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
|
||||
|
||||
|
||||
class TouchedOrDeletedClaimKey(typing.NamedTuple):
|
||||
height: int
|
||||
|
||||
|
||||
class TouchedOrDeletedClaimValue(typing.NamedTuple):
|
||||
touched_claims: typing.Set[bytes]
|
||||
deleted_claims: typing.Set[bytes]
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.__class__.__name__}(" \
|
||||
f"touched_claims={','.join(map(lambda x: x.hex(), self.touched_claims))}," \
|
||||
f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})"
|
||||
|
||||
|
||||
class ActiveAmountPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.active_amount.value
|
||||
key_struct = struct.Struct(b'>20sBLLH')
|
||||
|
@ -1219,6 +1233,38 @@ class HashXHistoryPrefixRow(PrefixRow):
|
|||
return cls.pack_key(hashX, height), cls.pack_value(history)
|
||||
|
||||
|
||||
class TouchedOrDeletedPrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.claim_diff.value
|
||||
key_struct = struct.Struct(b'>L')
|
||||
value_struct = struct.Struct(b'>LL')
|
||||
|
||||
@classmethod
|
||||
def pack_key(cls, height: int):
|
||||
return super().pack_key(height)
|
||||
|
||||
@classmethod
|
||||
def unpack_key(cls, key: bytes) -> TouchedOrDeletedClaimKey:
|
||||
return TouchedOrDeletedClaimKey(*super().unpack_key(key))
|
||||
|
||||
@classmethod
|
||||
def pack_value(cls, touched, deleted) -> bytes:
|
||||
return cls.value_struct.pack(len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted)
|
||||
|
||||
@classmethod
|
||||
def unpack_value(cls, data: bytes) -> TouchedOrDeletedClaimValue:
|
||||
touched_len, deleted_len = cls.value_struct.unpack(data[:8])
|
||||
assert len(data) == 20 * (touched_len + deleted_len) + 8
|
||||
touched_bytes, deleted_bytes = data[8:touched_len*20+8], data[touched_len*20+8:touched_len*20+deleted_len*20+8]
|
||||
return TouchedOrDeletedClaimValue(
|
||||
{touched_bytes[8+20*i:8+20*(i+1)] for i in range(touched_len)},
|
||||
{deleted_bytes[8+20*i:8+20*(i+1)] for i in range(deleted_len)}
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, height, touched, deleted):
|
||||
return cls.pack_key(height), cls.pack_value(touched, deleted)
|
||||
|
||||
|
||||
class Prefixes:
|
||||
claim_to_support = ClaimToSupportPrefixRow
|
||||
support_to_claim = SupportToClaimPrefixRow
|
||||
|
@ -1252,6 +1298,8 @@ class Prefixes:
|
|||
tx_num = TXNumPrefixRow
|
||||
tx = TXPrefixRow
|
||||
header = BlockHeaderPrefixRow
|
||||
touched_or_deleted = TouchedOrDeletedPrefixRow
|
||||
|
||||
|
||||
|
||||
ROW_TYPES = {
|
||||
|
|
|
@ -784,12 +784,18 @@ class LevelDB:
|
|||
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
delete_undo_keys = []
|
||||
if min_height > 0:
|
||||
if min_height > 0: # delete undos for blocks deep enough they can't be reorged
|
||||
delete_undo_keys.extend(
|
||||
self.db.iterator(
|
||||
start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False
|
||||
)
|
||||
)
|
||||
delete_undo_keys.extend(
|
||||
self.db.iterator(
|
||||
start=Prefixes.touched_or_deleted.pack_key(0),
|
||||
stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False
|
||||
)
|
||||
)
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
|
@ -1006,7 +1012,7 @@ class LevelDB:
|
|||
return Prefixes.undo.pack_key(height)
|
||||
|
||||
def read_undo_info(self, height: int):
|
||||
return self.db.get(Prefixes.undo.pack_key(height))
|
||||
return self.db.get(Prefixes.undo.pack_key(height)), self.db.get(Prefixes.touched_or_deleted.pack_key(height))
|
||||
|
||||
# -- UTXO database
|
||||
|
||||
|
|
|
@ -813,10 +813,15 @@ class TransactionOutputCommands(ClaimTestCase):
|
|||
stream_id = self.get_claim_id(await self.stream_create())
|
||||
await self.support_create(stream_id, '0.3')
|
||||
await self.support_create(stream_id, '0.2')
|
||||
await self.generate(day_blocks)
|
||||
await self.generate(day_blocks // 2)
|
||||
await self.stream_update(stream_id)
|
||||
await self.generate(day_blocks // 2)
|
||||
await self.support_create(stream_id, '0.4')
|
||||
await self.support_create(stream_id, '0.5')
|
||||
await self.generate(day_blocks)
|
||||
await self.stream_update(stream_id)
|
||||
await self.generate(day_blocks // 2)
|
||||
await self.stream_update(stream_id)
|
||||
await self.generate(day_blocks // 2)
|
||||
await self.support_create(stream_id, '0.6')
|
||||
|
||||
plot = await self.txo_plot(type='support')
|
||||
|
|
|
@ -17,8 +17,13 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
if claim_id is None:
|
||||
self.assertIn('error', other)
|
||||
self.assertEqual(other['error']['name'], 'NOT_FOUND')
|
||||
claims_from_es = (await self.conductor.spv_node.server.bp.db.search_index.search(name=name))[0]
|
||||
claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es]
|
||||
self.assertNotIn(claim_id, claims_from_es)
|
||||
else:
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id)
|
||||
self.assertEqual(claim_id, other['claim_id'])
|
||||
self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex())
|
||||
|
||||
async def assertNoClaimForName(self, name: str):
|
||||
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
|
||||
|
@ -28,11 +33,18 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
self.assertIsInstance(stream, LookupError)
|
||||
else:
|
||||
self.assertIsInstance(channel, LookupError)
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name)
|
||||
self.assertListEqual([], claim_from_es[0])
|
||||
|
||||
async def assertMatchWinningClaim(self, name):
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
|
||||
stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name)
|
||||
claim = stream if stream else channel
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
|
||||
claim_id=claim.claim_hash.hex()
|
||||
)
|
||||
self.assertEqual(len(claim_from_es[0]), 1)
|
||||
self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex())
|
||||
self.assertEqual(expected['claimId'], claim.claim_hash.hex())
|
||||
self.assertEqual(expected['validAtHeight'], claim.activation_height)
|
||||
self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height)
|
||||
|
|
Loading…
Reference in a new issue