diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 637a4a0f7..c33bd19f5 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -249,8 +249,12 @@ class BlockProcessor: self.possible_future_claim_amount_by_name_and_hash: Dict[Tuple[str, bytes], int] = {} self.possible_future_support_txos_by_claim_hash: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list) - self.removed_claims_to_send_es = set() + self.removed_claims_to_send_es = set() # cumulative changes across blocks to send ES self.touched_claims_to_send_es = set() + + self.removed_claim_hashes: Set[bytes] = set() # per block changes + self.touched_claim_hashes: Set[bytes] = set() + self.signatures_changed = set() self.pending_reposted = set() @@ -268,16 +272,9 @@ class BlockProcessor: if self.db.db_height <= 1: return - to_send_es = set(self.touched_claims_to_send_es) - to_send_es.update(self.pending_reposted.difference(self.removed_claims_to_send_es)) - to_send_es.update( - {k for k, v in self.pending_channel_counts.items() if v != 0}.difference( - self.removed_claims_to_send_es) - ) - for claim_hash in self.removed_claims_to_send_es: yield 'delete', claim_hash.hex() - for claim in self.db.claims_producer(to_send_es): + for claim in self.db.claims_producer(self.touched_claims_to_send_es): yield 'update', claim async def run_in_thread_with_lock(self, func, *args): @@ -313,14 +310,12 @@ class BlockProcessor: await self.run_in_thread_with_lock(self.advance_block, block) self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) # TODO: we shouldnt wait on the search index updating before advancing to the next block - if not self.db.first_sync: - await self.db.search_index.claim_consumer(self.claim_producer()) - self.db.search_index.clear_caches() - self.touched_claims_to_send_es.clear() - self.removed_claims_to_send_es.clear() - self.pending_reposted.clear() - self.pending_channel_counts.clear() - # print("******************\n") + if not self.db.first_sync: + await self.db.search_index.claim_consumer(self.claim_producer()) + self.db.search_index.clear_caches() + self.touched_claims_to_send_es.clear() + self.removed_claims_to_send_es.clear() + # print("******************\n") except: self.logger.exception("advance blocks failed") raise @@ -351,6 +346,14 @@ class BlockProcessor: assert count > 0, count for _ in range(count): await self.run_in_thread_with_lock(self.backup_block) + for touched in self.touched_claims_to_send_es: + if not self.db.get_claim_txo(touched): + self.removed_claims_to_send_es.add(touched) + self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es) + await self.db.search_index.claim_consumer(self.claim_producer()) + self.db.search_index.clear_caches() + self.touched_claims_to_send_es.clear() + self.removed_claims_to_send_es.clear() await self.prefetcher.reset_height(self.height) self.reorg_count_metric.inc() except: @@ -995,6 +998,9 @@ class BlockProcessor: ).get_activate_ops() ) self.db_op_stack.extend(get_takeover_name_ops(name, winning_including_future_activations, height, controlling)) + self.touched_claim_hashes.add(winning_including_future_activations) + if controlling and controlling.claim_hash not in self.abandoned_claims: + self.touched_claim_hashes.add(controlling.claim_hash) elif not controlling or (winning_claim_hash != controlling.claim_hash and name in names_with_abandoned_controlling_claims) or \ ((winning_claim_hash != controlling.claim_hash) and (amounts[winning_claim_hash] > amounts[controlling.claim_hash])): @@ -1025,6 +1031,9 @@ class BlockProcessor: ).get_activate_ops() ) self.db_op_stack.extend(get_takeover_name_ops(name, winning_claim_hash, height, controlling)) + if controlling and controlling.claim_hash not in self.abandoned_claims: + self.touched_claim_hashes.add(controlling.claim_hash) + self.touched_claim_hashes.add(winning_claim_hash) elif winning_claim_hash == controlling.claim_hash: # print("\tstill winning") pass @@ -1048,19 +1057,23 @@ class BlockProcessor: if (controlling and winning != controlling.claim_hash) or (not controlling and winning): # print(f"\ttakeover from abandoned support {controlling.claim_hash.hex()} -> {winning.hex()}") self.db_op_stack.extend(get_takeover_name_ops(name, winning, height, controlling)) + if controlling: + self.touched_claim_hashes.add(controlling.claim_hash) + self.touched_claim_hashes.add(winning) + def _get_cumulative_update_ops(self): # gather cumulative removed/touched sets to update the search index - self.removed_claims_to_send_es.update(set(self.abandoned_claims.keys())) - self.touched_claims_to_send_es.update( + self.removed_claim_hashes.update(set(self.abandoned_claims.keys())) + self.touched_claim_hashes.update( set(self.activated_support_amount_by_claim.keys()).union( set(claim_hash for (_, claim_hash) in self.activated_claim_amount_by_name_and_hash.keys()) ).union(self.signatures_changed).union( set(self.removed_active_support_amount_by_claim.keys()) - ).difference(self.removed_claims_to_send_es) + ).difference(self.removed_claim_hashes) ) # use the cumulative changes to update bid ordered resolve - for removed in self.removed_claims_to_send_es: + for removed in self.removed_claim_hashes: removed_claim = self.db.get_claim_txo(removed) if removed_claim: amt = self.db.get_url_effective_amount( @@ -1071,7 +1084,7 @@ class BlockProcessor: removed_claim.name, amt.effective_amount, amt.tx_num, amt.position, removed )) - for touched in self.touched_claims_to_send_es: + for touched in self.touched_claim_hashes: if touched in self.claim_hash_to_txo: pending = self.txo_to_claim[self.claim_hash_to_txo[touched]] name, tx_num, position = pending.name, pending.tx_num, pending.position @@ -1098,6 +1111,16 @@ class BlockProcessor: tx_num, position, touched) ) + self.touched_claim_hashes.update( + {k for k in self.pending_reposted if k not in self.removed_claim_hashes} + ) + self.touched_claim_hashes.update( + {k for k, v in self.pending_channel_counts.items() if v != 0 and k not in self.removed_claim_hashes} + ) + self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) + self.touched_claims_to_send_es.update(self.touched_claim_hashes) + self.removed_claims_to_send_es.update(self.removed_claim_hashes) + def advance_block(self, block): height = self.height + 1 # print("advance ", height) @@ -1168,6 +1191,9 @@ class BlockProcessor: # activate claims and process takeovers self._get_takeover_ops(height) + # update effective amount and update sets of touched and deleted claims + self._get_cumulative_update_ops() + self.db_op_stack.append(RevertablePut(*Prefixes.header.pack_item(height, block.header))) self.db_op_stack.append(RevertablePut(*Prefixes.tx_count.pack_item(height, tx_count))) @@ -1185,8 +1211,20 @@ class BlockProcessor: self.tx_count = tx_count self.db.tx_counts.append(self.tx_count) - if height >= self.daemon.cached_height() - self.env.reorg_limit: - self.db_op_stack.append(RevertablePut(*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops()))) + cached_max_reorg_depth = self.daemon.cached_height() - self.env.reorg_limit + if height >= cached_max_reorg_depth: + self.db_op_stack.append( + RevertablePut( + *Prefixes.touched_or_deleted.pack_item( + height, self.touched_claim_hashes, self.removed_claim_hashes + ) + ) + ) + self.db_op_stack.append( + RevertablePut( + *Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops()) + ) + ) self.height = height self.db.headers.append(block.header) @@ -1220,16 +1258,26 @@ class BlockProcessor: self.hashXs_by_tx.clear() self.history_cache.clear() self.notifications.notified_mempool_txs.clear() + self.removed_claim_hashes.clear() + self.touched_claim_hashes.clear() + self.pending_reposted.clear() + self.pending_channel_counts.clear() def backup_block(self): self.db.assert_flushed(self.flush_data()) self.logger.info("backup block %i", self.height) # Check and update self.tip - undo_ops = self.db.read_undo_info(self.height) + undo_ops, touched_and_deleted_bytes = self.db.read_undo_info(self.height) if undo_ops is None: raise ChainError(f'no undo information found for height {self.height:,d}') - self.db_op_stack.apply_packed_undo_ops(undo_ops) self.db_op_stack.append(RevertableDelete(Prefixes.undo.pack_key(self.height), undo_ops)) + self.db_op_stack.apply_packed_undo_ops(undo_ops) + + touched_and_deleted = Prefixes.touched_or_deleted.unpack_value(touched_and_deleted_bytes) + self.touched_claims_to_send_es.update(touched_and_deleted.touched_claims) + self.removed_claims_to_send_es.difference_update(touched_and_deleted.touched_claims) + self.removed_claims_to_send_es.update(touched_and_deleted.deleted_claims) + self.db.headers.pop() self.block_hashes.pop() self.db.tx_counts.pop() diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index c6723afbf..ec33b6ead 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -25,6 +25,7 @@ class DB_PREFIXES(enum.Enum): reposted_claim = b'W' undo = b'M' + claim_diff = b'Y' tx = b'B' block_hash = b'C' diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index d1cc21b99..86254f394 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -387,6 +387,20 @@ class RepostedValue(typing.NamedTuple): return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})" +class TouchedOrDeletedClaimKey(typing.NamedTuple): + height: int + + +class TouchedOrDeletedClaimValue(typing.NamedTuple): + touched_claims: typing.Set[bytes] + deleted_claims: typing.Set[bytes] + + def __str__(self): + return f"{self.__class__.__name__}(" \ + f"touched_claims={','.join(map(lambda x: x.hex(), self.touched_claims))}," \ + f"deleted_claims={','.join(map(lambda x: x.hex(), self.deleted_claims))})" + + class ActiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.active_amount.value key_struct = struct.Struct(b'>20sBLLH') @@ -1219,6 +1233,38 @@ class HashXHistoryPrefixRow(PrefixRow): return cls.pack_key(hashX, height), cls.pack_value(history) +class TouchedOrDeletedPrefixRow(PrefixRow): + prefix = DB_PREFIXES.claim_diff.value + key_struct = struct.Struct(b'>L') + value_struct = struct.Struct(b'>LL') + + @classmethod + def pack_key(cls, height: int): + return super().pack_key(height) + + @classmethod + def unpack_key(cls, key: bytes) -> TouchedOrDeletedClaimKey: + return TouchedOrDeletedClaimKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, touched, deleted) -> bytes: + return cls.value_struct.pack(len(touched), len(deleted)) + b''.join(touched) + b''.join(deleted) + + @classmethod + def unpack_value(cls, data: bytes) -> TouchedOrDeletedClaimValue: + touched_len, deleted_len = cls.value_struct.unpack(data[:8]) + assert len(data) == 20 * (touched_len + deleted_len) + 8 + touched_bytes, deleted_bytes = data[8:touched_len*20+8], data[touched_len*20+8:touched_len*20+deleted_len*20+8] + return TouchedOrDeletedClaimValue( + {touched_bytes[8+20*i:8+20*(i+1)] for i in range(touched_len)}, + {deleted_bytes[8+20*i:8+20*(i+1)] for i in range(deleted_len)} + ) + + @classmethod + def pack_item(cls, height, touched, deleted): + return cls.pack_key(height), cls.pack_value(touched, deleted) + + class Prefixes: claim_to_support = ClaimToSupportPrefixRow support_to_claim = SupportToClaimPrefixRow @@ -1252,6 +1298,8 @@ class Prefixes: tx_num = TXNumPrefixRow tx = TXPrefixRow header = BlockHeaderPrefixRow + touched_or_deleted = TouchedOrDeletedPrefixRow + ROW_TYPES = { diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 4132e8c33..61dec5697 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -784,12 +784,18 @@ class LevelDB: min_height = self.min_undo_height(self.db_height) delete_undo_keys = [] - if min_height > 0: + if min_height > 0: # delete undos for blocks deep enough they can't be reorged delete_undo_keys.extend( self.db.iterator( start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False ) ) + delete_undo_keys.extend( + self.db.iterator( + start=Prefixes.touched_or_deleted.pack_key(0), + stop=Prefixes.touched_or_deleted.pack_key(min_height), include_value=False + ) + ) with self.db.write_batch() as batch: batch_put = batch.put @@ -1006,7 +1012,7 @@ class LevelDB: return Prefixes.undo.pack_key(height) def read_undo_info(self, height: int): - return self.db.get(Prefixes.undo.pack_key(height)) + return self.db.get(Prefixes.undo.pack_key(height)), self.db.get(Prefixes.touched_or_deleted.pack_key(height)) # -- UTXO database diff --git a/tests/integration/blockchain/test_claim_commands.py b/tests/integration/blockchain/test_claim_commands.py index 541b6e72c..2e7b36cea 100644 --- a/tests/integration/blockchain/test_claim_commands.py +++ b/tests/integration/blockchain/test_claim_commands.py @@ -813,10 +813,15 @@ class TransactionOutputCommands(ClaimTestCase): stream_id = self.get_claim_id(await self.stream_create()) await self.support_create(stream_id, '0.3') await self.support_create(stream_id, '0.2') - await self.generate(day_blocks) + await self.generate(day_blocks // 2) + await self.stream_update(stream_id) + await self.generate(day_blocks // 2) await self.support_create(stream_id, '0.4') await self.support_create(stream_id, '0.5') - await self.generate(day_blocks) + await self.stream_update(stream_id) + await self.generate(day_blocks // 2) + await self.stream_update(stream_id) + await self.generate(day_blocks // 2) await self.support_create(stream_id, '0.6') plot = await self.txo_plot(type='support') diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index 5f230f53f..55b710323 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -17,8 +17,13 @@ class BaseResolveTestCase(CommandTestCase): if claim_id is None: self.assertIn('error', other) self.assertEqual(other['error']['name'], 'NOT_FOUND') + claims_from_es = (await self.conductor.spv_node.server.bp.db.search_index.search(name=name))[0] + claims_from_es = [c['claim_hash'][::-1].hex() for c in claims_from_es] + self.assertNotIn(claim_id, claims_from_es) else: + claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(claim_id=claim_id) self.assertEqual(claim_id, other['claim_id']) + self.assertEqual(claim_id, claim_from_es[0][0]['claim_hash'][::-1].hex()) async def assertNoClaimForName(self, name: str): lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) @@ -28,11 +33,18 @@ class BaseResolveTestCase(CommandTestCase): self.assertIsInstance(stream, LookupError) else: self.assertIsInstance(channel, LookupError) + claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(name=name) + self.assertListEqual([], claim_from_es[0]) async def assertMatchWinningClaim(self, name): expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name)) stream, channel = await self.conductor.spv_node.server.bp.db.fs_resolve(name) claim = stream if stream else channel + claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search( + claim_id=claim.claim_hash.hex() + ) + self.assertEqual(len(claim_from_es[0]), 1) + self.assertEqual(claim_from_es[0][0]['claim_hash'][::-1].hex(), claim.claim_hash.hex()) self.assertEqual(expected['claimId'], claim.claim_hash.hex()) self.assertEqual(expected['validAtHeight'], claim.activation_height) self.assertEqual(expected['lastTakeoverHeight'], claim.last_takeover_height)