From 76b3bfe975a8dd1dd9921d60ec7f29adf9be081d Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor.shyba@gmail.com> Date: Mon, 19 Oct 2020 10:47:26 -0300 Subject: [PATCH] add simple trending --- lbry/blockchain/sync/synchronizer.py | 5 +-- lbry/db/queries/search.py | 11 ++++-- lbry/db/queries/txio.py | 3 +- lbry/db/tables.py | 14 +++++--- lbry/db/trending.py | 36 +++++++++++++++++++ .../integration/blockchain/test_blockchain.py | 25 +++++++++++++ 6 files changed, 83 insertions(+), 11 deletions(-) create mode 100644 lbry/db/trending.py diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 8bb390235..f6fc5f7b7 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -4,7 +4,7 @@ import logging from typing import Optional, Tuple, Set, List, Coroutine from concurrent.futures import ThreadPoolExecutor -from lbry.db import Database +from lbry.db import Database, trending from lbry.db import queries as q from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.db.query_context import Event, Progress @@ -348,7 +348,8 @@ class BlockchainSync(Sync): await self.db.run(claim_phase.update_channel_stats, blocks, initial_sync) async def sync_trends(self): - pass + ending_height = await self.chain.db.get_best_height() + await self.db.run(trending.calculate_trending, ending_height) async def advance(self): blocks_added = await self.sync_blocks() diff --git a/lbry/db/queries/search.py b/lbry/db/queries/search.py index 37f216c3e..4a8b136b8 100644 --- a/lbry/db/queries/search.py +++ b/lbry/db/queries/search.py @@ -14,7 +14,7 @@ from lbry.blockchain.transaction import Output from ..utils import query from ..query_context import context -from ..tables import TX, TXO, Claim, Support +from ..tables import TX, TXO, Claim, Support, Trending from ..constants import ( TXO_TYPES, STREAM_TYPES, ATTRIBUTE_ARRAY_MAX_LENGTH, SEARCH_INTEGER_PARAMS, SEARCH_ORDER_FIELDS @@ -123,6 +123,10 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ channel_claim.c.short_url.isnot(None), channel_claim.c.short_url + '/' + Claim.c.short_url )]).label('canonical_url'), + func.coalesce(Trending.c.trending_local, 0).label('trending_local'), + func.coalesce(Trending.c.trending_mixed, 0).label('trending_mixed'), + func.coalesce(Trending.c.trending_global, 0).label('trending_global'), + func.coalesce(Trending.c.trending_group, 0).label('trending_group') ] @@ -145,8 +149,9 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: nulls_last = '' if column == 'release_time': nulls_last = ' NULLs LAST' + table = "trend" if column.startswith('trend') else "claim" sql_order_by.append( - f"claim.{column} ASC{nulls_last}" if is_asc else f"claim.{column} DESC{nulls_last}" + f"{table}.{column} ASC{nulls_last}" if is_asc else f"{table}.{column} DESC{nulls_last}" ) constraints['order_by'] = sql_order_by @@ -269,7 +274,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: [Claim, TXO], select(*cols) .select_from( - Claim.join(TXO).join(TX) + Claim.join(TXO).join(TX).join(Trending, Trending.c.claim_hash == Claim.c.claim_hash, isouter=True) .join(channel_claim, Claim.c.channel_hash == channel_claim.c.claim_hash, isouter=True) ), **constraints ) diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index 2b7d422c6..7e5e7f911 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -388,7 +388,8 @@ META_ATTRS = ( 'activation_height', 'takeover_height', 'creation_height', 'staked_amount', 'short_url', 'canonical_url', 'staked_support_amount', 'staked_support_count', 'signed_claim_count', 'signed_support_count', 'is_signature_valid', - 'reposted_count', 'expiration_height' + 'reposted_count', 'expiration_height', 'trending_group', 'trending_mixed', + 'trending_local', 'trending_global' ) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 9569b9b1e..5d7860740 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -258,11 +258,6 @@ Claim = Table( # claims which are inside channels Column('channel_hash', LargeBinary, nullable=True), Column('is_signature_valid', Boolean, nullable=True), - - Column('trending_group', BigInteger, server_default='0'), - Column('trending_mixed', BigInteger, server_default='0'), - Column('trending_local', BigInteger, server_default='0'), - Column('trending_global', BigInteger, server_default='0'), ) Tag = Table( @@ -330,3 +325,12 @@ Stake = Table( Column('stake_count', Integer), Column('stake_unique', Integer), ) + +Trending = Table( + 'trend', metadata, + Column('claim_hash', LargeBinary, primary_key=True), + Column('trending_group', BigInteger, server_default='0'), + Column('trending_mixed', BigInteger, server_default='0'), + Column('trending_local', BigInteger, server_default='0'), + Column('trending_global', BigInteger, server_default='0'), +) diff --git a/lbry/db/trending.py b/lbry/db/trending.py new file mode 100644 index 000000000..9e524eb65 --- /dev/null +++ b/lbry/db/trending.py @@ -0,0 +1,36 @@ +from sqlalchemy import select +from sqlalchemy.sql import func + +from lbry.db.query_context import event_emitter, ProgressContext +from lbry.db.tables import Trending, Support, Claim +WINDOW = 576 # a day + + +@event_emitter("blockchain.sync.trending.update", "steps") +def calculate_trending(height, p: ProgressContext): + # zero all as decay + with p.ctx.engine.begin() as ctx: + _trending(height, ctx) + + +def _trending(height, ctx): + ctx.execute(Trending.delete()) + start = height - WINDOW + trending = func.sum(Support.c.amount * (WINDOW - (height - Support.c.height))) + sql = select([Claim.c.claim_hash, trending, trending, trending, 4]).where( + (Support.c.claim_hash == Claim.c.claim_hash) + & (Support.c.height <= height) + & (Support.c.height >= start)).group_by(Claim.c.claim_hash) + ctx.execute(Trending.insert().from_select( + ['claim_hash', 'trending_global', 'trending_local', 'trending_mixed', 'trending_group'], sql)) + + +if __name__ == "__main__": + from sqlalchemy import create_engine + import time + start = time.time() + engine = create_engine("postgresql:///lbry") + for height in range(830000, 840000, 1000): + start = time.time() + _trending(height, engine) + print(f"{height} took {time.time() - start} seconds") diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 019ed3c1a..d99799914 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -1138,6 +1138,31 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): results = await self.db.search_claims(channel="@404") self.assertEqual([], results.rows) + async def test_simple_support_trending(self): + claim1 = await self.get_claim(await self.create_claim(name="one")) + claim2 = await self.get_claim(await self.create_claim(name="two")) + await self.generate(1) + results = await self.db.search_claims(order_by=["trending_group", "trending_mixed"]) + self.assertEqual(0, results.rows[0].meta['trending_mixed']) + self.assertEqual(0, results.rows[1].meta['trending_mixed']) + self.assertEqual(0, results.rows[0].meta['trending_group']) + self.assertEqual(0, results.rows[1].meta['trending_group']) + await self.support_claim(claim1, '1.0') + await self.generate(1) + results = await self.db.search_claims(order_by=["trending_group", "trending_mixed"]) + self.assertEqual(57600000000, results.rows[0].meta['trending_mixed']) + self.assertEqual(0, results.rows[1].meta['trending_mixed']) + self.assertEqual(4, results.rows[0].meta['trending_group']) + self.assertEqual(0, results.rows[1].meta['trending_group']) + await self.support_claim(claim2, '1.0') + await self.generate(1) + results = await self.db.search_claims(order_by=["trending_group", "trending_mixed"]) + self.assertEqual(57600000000, results.rows[0].meta['trending_mixed']) + self.assertEqual(57500000000, results.rows[1].meta['trending_mixed']) + self.assertEqual(4, results.rows[0].meta['trending_group']) + self.assertEqual(4, results.rows[1].meta['trending_group']) + + class TestClaimtrieSync(SyncingBlockchainTestCase):