From 4592e21424e16d9cd840dea1fb2d05f9c6feb790 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Mon, 7 Dec 2020 07:39:27 -0500 Subject: [PATCH] minor refactoring --- lbry/blockchain/sync/claims.py | 30 +++++++++------- lbry/blockchain/sync/synchronizer.py | 15 ++++---- lbry/conf.py | 9 ++--- lbry/db/constants.py | 3 +- lbry/db/queries/search.py | 30 +++++++++------- lbry/db/queries/txio.py | 4 +-- lbry/db/query_context.py | 4 +-- lbry/db/tables.py | 20 ++++++----- lbry/db/trending.py | 30 ++++++++-------- lbry/schema/result.py | 21 ++++++----- .../integration/blockchain/test_blockchain.py | 35 +++++++++---------- 11 files changed, 105 insertions(+), 96 deletions(-) diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index f796ec3d9..7b07951ea 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -12,9 +12,13 @@ from lbry.db.queries.txio import ( where_claims_with_changed_reposts, ) from lbry.db.query_context import ProgressContext, event_emitter -from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes, ClaimFilter +from lbry.db.tables import ( + TX, TXO, Claim, Support, CensoredClaim, + pg_add_claim_and_tag_constraints_and_indexes +) from lbry.db.utils import least from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES +from lbry.schema.result import Censor from lbry.blockchain.transaction import Output from .context import get_or_initialize_lbrycrd @@ -283,7 +287,7 @@ def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: Progress p.step(result.rowcount) -def select_reposts(channel_hashes, filter_type=0): +def select_reposts(channel_hashes, filter_type): return ( select(Claim.c.reposted_claim_hash, filter_type, Claim.c.channel_hash).where( (Claim.c.channel_hash.in_(channel_hashes)) & @@ -293,13 +297,15 @@ def select_reposts(channel_hashes, filter_type=0): @event_emitter("blockchain.sync.claims.filters", "claim_filters") -def update_claim_filters(blocking_channel_hashes, filtering_channel_hashes, p: ProgressContext): - p.ctx.execute(ClaimFilter.delete()) - # order matters: first we insert the blocked ones. Then the filtered ones. - # If there is already a block in place, that takes priority because a block is just a harder filter - p.ctx.execute(ClaimFilter.insert().from_select( - ['claim_hash', 'filter_type', 'owner_channel_hash'], select_reposts(blocking_channel_hashes, 2)) - ) - p.ctx.execute(p.ctx.insert_or_ignore(ClaimFilter).from_select( - ['claim_hash', 'filter_type', 'owner_channel_hash'], select_reposts(filtering_channel_hashes, 1)) - ) +def update_claim_filters(resolve_censor_channel_hashes, search_censor_channel_hashes, p: ProgressContext): + p.ctx.execute(CensoredClaim.delete()) + # order matters: first we insert the resolve filters; then the search ones. + # a claim that's censored in resolve is automatically also censored in search results. + p.ctx.execute(CensoredClaim.insert().from_select( + ['claim_hash', 'censor_type', 'censoring_channel_hash'], + select_reposts(resolve_censor_channel_hashes, Censor.RESOLVE) + )) + p.ctx.execute(p.ctx.insert_or_ignore(CensoredClaim).from_select( + ['claim_hash', 'censor_type', 'censoring_channel_hash'], + select_reposts(search_censor_channel_hashes, Censor.SEARCH) + )) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index eb95f42b0..fbda6b3a7 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -54,11 +54,11 @@ class BlockchainSync(Sync): self.block_hash_event = asyncio.Event() self.tx_hash_event = asyncio.Event() self.mempool = [] - self.filtering_channel_hashes = { - unhexlify(channel_id)[::-1] for channel_id in self.conf.spv_filtering_channel_ids + self.search_censor_channel_hashes = { + unhexlify(channel_id)[::-1] for channel_id in self.conf.search_censor_channel_ids } - self.blocking_channel_hashes = { - unhexlify(channel_id)[::-1] for channel_id in self.conf.spv_blocking_channel_ids + self.resolve_censor_channel_hashes = { + unhexlify(channel_id)[::-1] for channel_id in self.conf.resolve_censor_channel_ids } async def wait_for_chain_ready(self): @@ -178,8 +178,6 @@ class BlockchainSync(Sync): return starting_height, best_height_processed async def sync_filters(self): - if not self.conf.spv_address_filters: - return with Progress(self.db.message_queue, FILTER_INIT_EVENT) as p: p.start(2) initial_sync = not await self.db.has_filters() @@ -361,7 +359,10 @@ class BlockchainSync(Sync): async def sync_claim_filtering(self): await self.db.run( - claim_phase.update_claim_filters, self.blocking_channel_hashes, self.filtering_channel_hashes) + claim_phase.update_claim_filters, + self.resolve_censor_channel_hashes, + self.search_censor_channel_hashes + ) async def advance(self): blocks_added = await self.sync_blocks() diff --git a/lbry/conf.py b/lbry/conf.py index 1c3b362df..b033c8623 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -626,13 +626,8 @@ class Config(CLIConfig): lbrycrd_peer_port = Integer("Peer port for lbrycrd.", 9246) lbrycrd_zmq = String("ZMQ events address.") lbrycrd_dir = Path("Directory containing lbrycrd data.", metavar='DIR') - spv_address_filters = Toggle( - "Generate Golomb-Rice coding filters for blocks and transactions. Enables " - "light client to synchronize with a full node.", - True - ) - spv_filtering_channel_ids = Strings("List of channel claim ids for filtering claim search results out.", []) - spv_blocking_channel_ids = Strings("List of channel claim ids for blocking resolve results.", []) + search_censor_channel_ids = Strings("List of channel ids for filtering out search results.", []) + resolve_censor_channel_ids = Strings("List of channel ids for filtering out resolve results.", []) # daemon save_files = Toggle("Save downloaded files when calling `get` by default", True) diff --git a/lbry/db/constants.py b/lbry/db/constants.py index 88ba55ab4..0e28df765 100644 --- a/lbry/db/constants.py +++ b/lbry/db/constants.py @@ -54,8 +54,7 @@ SEARCH_INTEGER_PARAMS = { 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', 'tx_position', 'channel_join', 'reposted', 'amount', 'staked_amount', 'support_amount', - 'trending_group', 'trending_mixed', - 'trending_local', 'trending_global', + 'trend_group', 'trend_mixed', 'trend_local', 'trend_global', } SEARCH_PARAMS = { diff --git a/lbry/db/queries/search.py b/lbry/db/queries/search.py index 0c8205b92..25cde4a73 100644 --- a/lbry/db/queries/search.py +++ b/lbry/db/queries/search.py @@ -14,7 +14,7 @@ from lbry.blockchain.transaction import Output from ..utils import query from ..query_context import context -from ..tables import TX, TXO, Claim, Support, Trending, ClaimFilter +from ..tables import TX, TXO, Claim, Support, Trend, CensoredClaim from ..constants import ( TXO_TYPES, STREAM_TYPES, ATTRIBUTE_ARRAY_MAX_LENGTH, SEARCH_INTEGER_PARAMS, SEARCH_ORDER_FIELDS @@ -123,12 +123,12 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ channel_claim.c.short_url.isnot(None), channel_claim.c.short_url + '/' + Claim.c.short_url )]).label('canonical_url'), - func.coalesce(Trending.c.trending_local, 0).label('trending_local'), - func.coalesce(Trending.c.trending_mixed, 0).label('trending_mixed'), - func.coalesce(Trending.c.trending_global, 0).label('trending_global'), - func.coalesce(Trending.c.trending_group, 0).label('trending_group'), - func.coalesce(ClaimFilter.c.filter_type, 0).label('censor_type'), - ClaimFilter.c.owner_channel_hash.label('censor_owner_hash') + func.coalesce(Trend.c.trend_local, 0).label('trend_local'), + func.coalesce(Trend.c.trend_mixed, 0).label('trend_mixed'), + func.coalesce(Trend.c.trend_global, 0).label('trend_global'), + func.coalesce(Trend.c.trend_group, 0).label('trend_group'), + CensoredClaim.c.censor_type, + CensoredClaim.c.censoring_channel_hash ] @@ -150,7 +150,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: column = 'claim_name' table = "trend" if column.startswith('trend') else "claim" column = f"{table}.{column}" - if column in ('trending_group', 'trending_mixed', 'release_time'): + if column in ('trend_group', 'trend_mixed', 'release_time'): column = f"COALESCE({column}, {1<<32})" sql_order_by.append( f"{column} {'ASC' if is_asc else 'DESC'}" @@ -276,12 +276,16 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: [Claim, TXO], select(*cols) .select_from( - Claim.join(TXO).join(TX).join(Trending, Trending.c.claim_hash == Claim.c.claim_hash, isouter=True) + Claim.join(TXO).join(TX) + .join(Trend, Trend.c.claim_hash == Claim.c.claim_hash, isouter=True) .join(channel_claim, Claim.c.channel_hash == channel_claim.c.claim_hash, isouter=True) - .join(ClaimFilter, - (ClaimFilter.c.claim_hash == Claim.c.claim_hash) | - (ClaimFilter.c.claim_hash == Claim.c.reposted_claim_hash) | - (ClaimFilter.c.claim_hash == Claim.c.channel_hash), isouter=True) + .join( + CensoredClaim, + (CensoredClaim.c.claim_hash == Claim.c.claim_hash) | + (CensoredClaim.c.claim_hash == Claim.c.reposted_claim_hash) | + (CensoredClaim.c.claim_hash == Claim.c.channel_hash), + isouter=True + ) ), **constraints ) diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index 7e5e7f911..99db5c38b 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -388,8 +388,8 @@ META_ATTRS = ( 'activation_height', 'takeover_height', 'creation_height', 'staked_amount', 'short_url', 'canonical_url', 'staked_support_amount', 'staked_support_count', 'signed_claim_count', 'signed_support_count', 'is_signature_valid', - 'reposted_count', 'expiration_height', 'trending_group', 'trending_mixed', - 'trending_local', 'trending_global' + 'trend_group', 'trend_mixed', 'trend_local', 'trend_global', + 'reposted_count', 'expiration_height', ) diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index bb3f3be52..68e192500 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -87,11 +87,11 @@ class QueryContext: @classmethod def get_resolve_censor(cls) -> Censor: - return Censor(level=2) + return Censor(Censor.RESOLVE) @classmethod def get_search_censor(cls) -> Censor: - return Censor(level=1) + return Censor(Censor.SEARCH) def pg_copy(self, table, rows): with self.engine.begin() as c: diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 54557b91b..acb3206e4 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -326,18 +326,20 @@ Stake = Table( Column('stake_unique', Integer), ) -Trending = Table( + +Trend = Table( 'trend', metadata, Column('claim_hash', LargeBinary, primary_key=True), - Column('trending_group', BigInteger, server_default='0'), - Column('trending_mixed', BigInteger, server_default='0'), - Column('trending_local', BigInteger, server_default='0'), - Column('trending_global', BigInteger, server_default='0'), + Column('trend_group', BigInteger, server_default='0'), + Column('trend_mixed', BigInteger, server_default='0'), + Column('trend_local', BigInteger, server_default='0'), + Column('trend_global', BigInteger, server_default='0'), ) -ClaimFilter = Table( - 'claim_filter', metadata, + +CensoredClaim = Table( + 'censored_claim', metadata, Column('claim_hash', LargeBinary, primary_key=True), - Column('owner_channel_hash', LargeBinary), - Column('filter_type', SmallInteger), + Column('censor_type', SmallInteger), + Column('censoring_channel_hash', LargeBinary), ) diff --git a/lbry/db/trending.py b/lbry/db/trending.py index 1bc49ad38..e62e0dc49 100644 --- a/lbry/db/trending.py +++ b/lbry/db/trending.py @@ -2,24 +2,24 @@ from sqlalchemy import select from sqlalchemy.sql import func from lbry.db.query_context import event_emitter, ProgressContext -from lbry.db.tables import Trending, Support, Claim +from lbry.db.tables import Trend, Support, Claim WINDOW = 576 # a day @event_emitter("blockchain.sync.trending.update", "steps") def calculate_trending(height, p: ProgressContext): - # zero all as decay with p.ctx.engine.begin() as ctx: - _trending(height, ctx) - - -def _trending(height, ctx): - ctx.execute(Trending.delete()) - start = height - WINDOW - trending = func.sum(Support.c.amount * (WINDOW - (height - Support.c.height))) - sql = select([Claim.c.claim_hash, trending, trending, trending, 4]).where( - (Support.c.claim_hash == Claim.c.claim_hash) - & (Support.c.height <= height) - & (Support.c.height >= start)).group_by(Claim.c.claim_hash) - ctx.execute(Trending.insert().from_select( - ['claim_hash', 'trending_global', 'trending_local', 'trending_mixed', 'trending_group'], sql)) + ctx.execute(Trend.delete()) + start = height - WINDOW + trending = func.sum(Support.c.amount * (WINDOW - (height - Support.c.height))) + sql = ( + select([Claim.c.claim_hash, trending, trending, trending, 4]) + .where( + (Support.c.claim_hash == Claim.c.claim_hash) & + (Support.c.height <= height) & + (Support.c.height >= start) + ).group_by(Claim.c.claim_hash) + ) + ctx.execute(Trend.insert().from_select( + ['claim_hash', 'trend_global', 'trend_local', 'trend_mixed', 'trend_group'], sql + )) diff --git a/lbry/schema/result.py b/lbry/schema/result.py index 00e69daea..742bc44d6 100644 --- a/lbry/schema/result.py +++ b/lbry/schema/result.py @@ -24,19 +24,22 @@ def set_reference(reference, claim_hash, rows): class Censor: - __slots__ = 'level', 'censored' + SEARCH = 1 + RESOLVE = 2 - def __init__(self, level=1): - self.level = level + __slots__ = 'censor_type', 'censored' + + def __init__(self, censor_type): + self.censor_type = censor_type self.censored = {} def apply(self, rows): return [row for row in rows if not self.censor(row)] def censor(self, row) -> bool: - was_censored = row['censor_type'] >= self.level + was_censored = (row['censor_type'] or 0) >= self.censor_type if was_censored: - censoring_channel_hash = row['censor_owner_hash'] + censoring_channel_hash = row['censoring_channel_hash'] self.censored.setdefault(censoring_channel_hash, set()) self.censored[censoring_channel_hash].add(row['tx_hash']) return was_censored @@ -110,10 +113,10 @@ class Outputs: 'expiration_height': claim.expiration_height, 'effective_amount': claim.effective_amount, 'support_amount': claim.support_amount, - 'trending_group': claim.trending_group, - 'trending_mixed': claim.trending_mixed, - 'trending_local': claim.trending_local, - 'trending_global': claim.trending_global, + 'trend_group': claim.trend_group, + 'trend_mixed': claim.trend_mixed, + 'trend_local': claim.trend_local, + 'trend_global': claim.trend_global, } if claim.HasField('channel'): txo.channel = tx_map[claim.channel.tx_hash].outputs[claim.channel.nout] diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 9e6d3f257..1fd1e65a5 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -80,7 +80,6 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): await self.generate(101, wait=False) self.db = await self.make_db(self.chain) - self.chain.ledger.conf.spv_address_filters = False self.sync = BlockchainSync(self.chain, self.db) await self.sync.start() self.addCleanup(self.sync.stop) @@ -1150,25 +1149,25 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): claim1 = await self.get_claim(await self.create_claim(name="one")) claim2 = await self.get_claim(await self.create_claim(name="two")) await self.generate(1) - results = await self.db.search_claims(order_by=["trending_group", "trending_mixed"]) - self.assertEqual(0, results.rows[0].meta['trending_mixed']) - self.assertEqual(0, results.rows[1].meta['trending_mixed']) - self.assertEqual(0, results.rows[0].meta['trending_group']) - self.assertEqual(0, results.rows[1].meta['trending_group']) + results = await self.db.search_claims(order_by=["trend_group", "trend_mixed"]) + self.assertEqual(0, results.rows[0].meta['trend_mixed']) + self.assertEqual(0, results.rows[1].meta['trend_mixed']) + self.assertEqual(0, results.rows[0].meta['trend_group']) + self.assertEqual(0, results.rows[1].meta['trend_group']) await self.support_claim(claim1, '1.0') await self.generate(1) - results = await self.db.search_claims(order_by=["trending_group", "trending_mixed"]) - self.assertEqual(57600000000, results.rows[0].meta['trending_mixed']) - self.assertEqual(0, results.rows[1].meta['trending_mixed']) - self.assertEqual(4, results.rows[0].meta['trending_group']) - self.assertEqual(0, results.rows[1].meta['trending_group']) + results = await self.db.search_claims(order_by=["trend_group", "trend_mixed"]) + self.assertEqual(57600000000, results.rows[0].meta['trend_mixed']) + self.assertEqual(0, results.rows[1].meta['trend_mixed']) + self.assertEqual(4, results.rows[0].meta['trend_group']) + self.assertEqual(0, results.rows[1].meta['trend_group']) await self.support_claim(claim2, '1.0') await self.generate(1) - results = await self.db.search_claims(order_by=["trending_group", "trending_mixed"]) - self.assertEqual(57600000000, results.rows[0].meta['trending_mixed']) - self.assertEqual(57500000000, results.rows[1].meta['trending_mixed']) - self.assertEqual(4, results.rows[0].meta['trending_group']) - self.assertEqual(4, results.rows[1].meta['trending_group']) + results = await self.db.search_claims(order_by=["trend_group", "trend_mixed"]) + self.assertEqual(57600000000, results.rows[0].meta['trend_mixed']) + self.assertEqual(57500000000, results.rows[1].meta['trend_mixed']) + self.assertEqual(4, results.rows[0].meta['trend_group']) + self.assertEqual(4, results.rows[1].meta['trend_group']) class TestClaimtrieSync(SyncingBlockchainTestCase): @@ -1390,7 +1389,7 @@ class TestClaimtrieSync(SyncingBlockchainTestCase): moderator_chan = await self.get_claim( await self.create_claim(is_channel=True, name="@filters")) await self.create_claim(sign=moderator_chan, name="blocking_bad", repost=bad_content.claim_id) - self.sync.filtering_channel_hashes.add(moderator_chan.claim_hash) + self.sync.search_censor_channel_hashes.add(moderator_chan.claim_hash) await self.generate(1) results = await self.db.search_claims(channel="@some_channel") self.assertEqual(len(results.rows), 1) @@ -1409,7 +1408,7 @@ class TestClaimtrieSync(SyncingBlockchainTestCase): result = await self.db.resolve([bad_content.permanent_url]) self.assertEqual(bad_content.claim_id, result[bad_content.permanent_url].claim_id) # blocklist, now applied to resolve as well - self.sync.blocking_channel_hashes.add(moderator_chan.claim_hash) + self.sync.resolve_censor_channel_hashes.add(moderator_chan.claim_hash) await self.generate(1) result = await self.db.resolve([bad_content.permanent_url]) self.assertIsInstance(result[bad_content.permanent_url], ResolveCensoredError)