load hashes into claim filter table

This commit is contained in:
Victor Shyba 2020-11-04 01:11:20 -03:00 committed by Lex Berezhny
parent 76b3bfe975
commit 6bb8a69efc
4 changed files with 50 additions and 4 deletions

View file

@ -12,7 +12,7 @@ from lbry.db.queries.txio import (
where_claims_with_changed_reposts, where_claims_with_changed_reposts,
) )
from lbry.db.query_context import ProgressContext, event_emitter from lbry.db.query_context import ProgressContext, event_emitter
from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes, ClaimFilter
from lbry.db.utils import least from lbry.db.utils import least
from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES
from lbry.blockchain.transaction import Output from lbry.blockchain.transaction import Output
@ -281,3 +281,17 @@ def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: Progress
if result.rowcount and p.ctx.is_postgres: if result.rowcount and p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM claim;")) p.ctx.execute_notx(text("VACUUM claim;"))
p.step(result.rowcount) p.step(result.rowcount)
@event_emitter("blockchain.sync.claims.filters", "claim_filters")
def update_claim_filters(blocking_channel_hashes, filtering_channel_hashes, p: ProgressContext):
def select_reposts(channel_hashes, filter_type=0):
return select(
Claim.c.reposted_claim_hash, filter_type).where(
(Claim.c.channel_hash.in_(filtering_channel_hashes)) & (Claim.c.reposted_claim_hash.isnot(None)))
p.ctx.execute(ClaimFilter.delete())
p.ctx.execute(ClaimFilter.insert().from_select(
['claim_hash', 'filter_type'], select_reposts(blocking_channel_hashes, 1)))
p.ctx.execute(p.ctx.insert_or_ignore(ClaimFilter).from_select(
['claim_hash', 'filter_type'], select_reposts(filtering_channel_hashes, 0)))

View file

@ -53,6 +53,8 @@ class BlockchainSync(Sync):
self.block_hash_event = asyncio.Event() self.block_hash_event = asyncio.Event()
self.tx_hash_event = asyncio.Event() self.tx_hash_event = asyncio.Event()
self.mempool = [] self.mempool = []
self.filtering_channel_hashes = set()
self.blocking_channel_hashes = set()
async def wait_for_chain_ready(self): async def wait_for_chain_ready(self):
while True: while True:
@ -351,6 +353,10 @@ class BlockchainSync(Sync):
ending_height = await self.chain.db.get_best_height() ending_height = await self.chain.db.get_best_height()
await self.db.run(trending.calculate_trending, ending_height) await self.db.run(trending.calculate_trending, ending_height)
async def sync_claim_filtering(self):
await self.db.run(
claim_phase.update_claim_filters, self.blocking_channel_hashes, self.filtering_channel_hashes)
async def advance(self): async def advance(self):
blocks_added = await self.sync_blocks() blocks_added = await self.sync_blocks()
await self.sync_spends(blocks_added) await self.sync_spends(blocks_added)
@ -359,6 +365,7 @@ class BlockchainSync(Sync):
await self.sync_supports(blocks_added) await self.sync_supports(blocks_added)
await self.sync_channel_stats(blocks_added, initial_claim_sync) await self.sync_channel_stats(blocks_added, initial_claim_sync)
await self.sync_trends() await self.sync_trends()
await self.sync_claim_filtering()
if blocks_added: if blocks_added:
await self._on_block_controller.add(BlockEvent(blocks_added[-1])) await self._on_block_controller.add(BlockEvent(blocks_added[-1]))

View file

@ -334,3 +334,9 @@ Trending = Table(
Column('trending_local', BigInteger, server_default='0'), Column('trending_local', BigInteger, server_default='0'),
Column('trending_global', BigInteger, server_default='0'), Column('trending_global', BigInteger, server_default='0'),
) )
ClaimFilter = Table(
'claim_filter', metadata,
Column('claim_hash', LargeBinary, primary_key=True),
Column('filter_type', SmallInteger),
)

View file

@ -127,10 +127,15 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase):
name, hexlify(claim.to_bytes()).decode(), amount name, hexlify(claim.to_bytes()).decode(), amount
) )
meta_class = Channel if is_channel else Stream meta_class = Channel if is_channel else Stream
if repost:
claim = Claim()
claim.repost.reference.claim_id = repost
else:
claim = meta_class().update(title='claim #001').claim
tx = Transaction().add_outputs([ tx = Transaction().add_outputs([
Output.pay_claim_name_pubkey_hash( Output.pay_claim_name_pubkey_hash(
lbc_to_dewies(amount), name, lbc_to_dewies(amount), name,
meta_class().update(title='claim #001').claim, claim,
self.chain.ledger.address_to_hash160(self.address) self.chain.ledger.address_to_hash160(self.address)
) )
]) ])
@ -152,11 +157,14 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase):
signed = await self.chain.sign_raw_transaction_with_wallet(hexlify(tx.raw).decode()) signed = await self.chain.sign_raw_transaction_with_wallet(hexlify(tx.raw).decode())
tx = Transaction(unhexlify(signed['hex'])) tx = Transaction(unhexlify(signed['hex']))
txo = self.find_claim_txo(tx) txo = self.find_claim_txo(tx)
claim = txo.claim.channel if is_channel else txo.claim.stream
if txo.claim_id.startswith(claim_id_startswith): if txo.claim_id.startswith(claim_id_startswith):
if txo.claim_id[len(claim_id_startswith)] not in not_after_startswith: if txo.claim_id[len(claim_id_startswith)] not in not_after_startswith:
break break
if repost:
assert not claim_id_startswith, "not supported with repost"
break
i += 1 i += 1
claim = txo.claim.channel if is_channel else txo.claim.stream
claim.update(title=f'claim #{i:03}') claim.update(title=f'claim #{i:03}')
txo.script.generate() txo.script.generate()
if private_key: if private_key:
@ -1163,7 +1171,6 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
self.assertEqual(4, results.rows[1].meta['trending_group']) self.assertEqual(4, results.rows[1].meta['trending_group'])
class TestClaimtrieSync(SyncingBlockchainTestCase): class TestClaimtrieSync(SyncingBlockchainTestCase):
async def test_claimtrie_name_normalization_query_bug(self): async def test_claimtrie_name_normalization_query_bug(self):
@ -1374,6 +1381,18 @@ class TestClaimtrieSync(SyncingBlockchainTestCase):
accepted=[] accepted=[]
) )
async def test_content_filtering(self):
user_chan = await self.get_claim(
await self.create_claim(claim_id_startswith='ab', is_channel=True, name="@some_channel"))
await self.create_claim(claim_id_startswith='cd', sign=user_chan, name="good_content")
bad_content = await self.get_claim(
await self.create_claim(claim_id_startswith='ef', sign=user_chan, name="bad_content"))
moderator_chan = await self.get_claim(
await self.create_claim(claim_id_startswith='ab', is_channel=True, name="@filters"))
await self.create_claim(sign=moderator_chan, name="blocking_bad", repost=bad_content.claim_id)
self.sync.filtering_channel_hashes.add(moderator_chan.claim_hash)
await self.generate(1)
@skip @skip
class TestTrending(SyncingBlockchainTestCase): class TestTrending(SyncingBlockchainTestCase):