diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 86ead1cce..1a6add799 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -12,7 +12,7 @@ from lbry.db.queries.txio import ( where_claims_with_changed_reposts, ) from lbry.db.query_context import ProgressContext, event_emitter -from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes +from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes, ClaimFilter from lbry.db.utils import least from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.blockchain.transaction import Output @@ -281,3 +281,17 @@ def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: Progress if result.rowcount and p.ctx.is_postgres: p.ctx.execute_notx(text("VACUUM claim;")) p.step(result.rowcount) + + +@event_emitter("blockchain.sync.claims.filters", "claim_filters") +def update_claim_filters(blocking_channel_hashes, filtering_channel_hashes, p: ProgressContext): + def select_reposts(channel_hashes, filter_type=0): + return select( + Claim.c.reposted_claim_hash, filter_type).where( + (Claim.c.channel_hash.in_(filtering_channel_hashes)) & (Claim.c.reposted_claim_hash.isnot(None))) + + p.ctx.execute(ClaimFilter.delete()) + p.ctx.execute(ClaimFilter.insert().from_select( + ['claim_hash', 'filter_type'], select_reposts(blocking_channel_hashes, 1))) + p.ctx.execute(p.ctx.insert_or_ignore(ClaimFilter).from_select( + ['claim_hash', 'filter_type'], select_reposts(filtering_channel_hashes, 0))) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index f6fc5f7b7..5daeb248d 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -53,6 +53,8 @@ class BlockchainSync(Sync): self.block_hash_event = asyncio.Event() self.tx_hash_event = asyncio.Event() self.mempool = [] + self.filtering_channel_hashes = set() + self.blocking_channel_hashes = set() async def wait_for_chain_ready(self): while True: @@ -351,6 +353,10 @@ class BlockchainSync(Sync): ending_height = await self.chain.db.get_best_height() await self.db.run(trending.calculate_trending, ending_height) + async def sync_claim_filtering(self): + await self.db.run( + claim_phase.update_claim_filters, self.blocking_channel_hashes, self.filtering_channel_hashes) + async def advance(self): blocks_added = await self.sync_blocks() await self.sync_spends(blocks_added) @@ -359,6 +365,7 @@ class BlockchainSync(Sync): await self.sync_supports(blocks_added) await self.sync_channel_stats(blocks_added, initial_claim_sync) await self.sync_trends() + await self.sync_claim_filtering() if blocks_added: await self._on_block_controller.add(BlockEvent(blocks_added[-1])) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 5d7860740..f764d85bb 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -334,3 +334,9 @@ Trending = Table( Column('trending_local', BigInteger, server_default='0'), Column('trending_global', BigInteger, server_default='0'), ) + +ClaimFilter = Table( + 'claim_filter', metadata, + Column('claim_hash', LargeBinary, primary_key=True), + Column('filter_type', SmallInteger), +) diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index d99799914..45d851dab 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -127,10 +127,15 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): name, hexlify(claim.to_bytes()).decode(), amount ) meta_class = Channel if is_channel else Stream + if repost: + claim = Claim() + claim.repost.reference.claim_id = repost + else: + claim = meta_class().update(title='claim #001').claim tx = Transaction().add_outputs([ Output.pay_claim_name_pubkey_hash( lbc_to_dewies(amount), name, - meta_class().update(title='claim #001').claim, + claim, self.chain.ledger.address_to_hash160(self.address) ) ]) @@ -152,11 +157,14 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): signed = await self.chain.sign_raw_transaction_with_wallet(hexlify(tx.raw).decode()) tx = Transaction(unhexlify(signed['hex'])) txo = self.find_claim_txo(tx) - claim = txo.claim.channel if is_channel else txo.claim.stream if txo.claim_id.startswith(claim_id_startswith): if txo.claim_id[len(claim_id_startswith)] not in not_after_startswith: break + if repost: + assert not claim_id_startswith, "not supported with repost" + break i += 1 + claim = txo.claim.channel if is_channel else txo.claim.stream claim.update(title=f'claim #{i:03}') txo.script.generate() if private_key: @@ -1163,7 +1171,6 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): self.assertEqual(4, results.rows[1].meta['trending_group']) - class TestClaimtrieSync(SyncingBlockchainTestCase): async def test_claimtrie_name_normalization_query_bug(self): @@ -1374,6 +1381,18 @@ class TestClaimtrieSync(SyncingBlockchainTestCase): accepted=[] ) + async def test_content_filtering(self): + user_chan = await self.get_claim( + await self.create_claim(claim_id_startswith='ab', is_channel=True, name="@some_channel")) + await self.create_claim(claim_id_startswith='cd', sign=user_chan, name="good_content") + bad_content = await self.get_claim( + await self.create_claim(claim_id_startswith='ef', sign=user_chan, name="bad_content")) + moderator_chan = await self.get_claim( + await self.create_claim(claim_id_startswith='ab', is_channel=True, name="@filters")) + await self.create_claim(sign=moderator_chan, name="blocking_bad", repost=bad_content.claim_id) + self.sync.filtering_channel_hashes.add(moderator_chan.claim_hash) + await self.generate(1) + @skip class TestTrending(SyncingBlockchainTestCase):