This commit is contained in:
Victor Shyba 2020-11-05 22:12:41 -03:00 committed by Lex Berezhny
parent d3ec200e46
commit 4e0728572d
6 changed files with 25 additions and 36 deletions

View file

@ -5,7 +5,7 @@ from sqlalchemy import case, func, desc, text
from sqlalchemy.future import select from sqlalchemy.future import select
from lbry.db.queries.txio import ( from lbry.db.queries.txio import (
minimum_txo_columns, row_to_txo, MINIMIUM_TXO_COLUMNS, row_to_txo,
where_unspent_txos, where_claims_with_changed_supports, where_unspent_txos, where_claims_with_changed_supports,
count_unspent_txos, where_channels_with_changed_content, count_unspent_txos, where_channels_with_changed_content,
where_abandoned_claims, count_channels_with_changed_content, where_abandoned_claims, count_channels_with_changed_content,
@ -31,29 +31,29 @@ def channel_content_count_calc(signable):
) )
support = TXO.alias('support') SUPPORT = TXO.alias('support')
def staked_support_aggregation(aggregate): def staked_support_aggregation(aggregate):
return ( return (
select(aggregate).where( select(aggregate).where(
(support.c.txo_type == TXO_TYPES['support']) & (SUPPORT.c.txo_type == TXO_TYPES['support']) &
(support.c.spent_height == 0) (SUPPORT.c.spent_height == 0)
).scalar_subquery() ).scalar_subquery()
) )
def staked_support_amount_calc(other): def staked_support_amount_calc(other):
return ( return (
staked_support_aggregation(func.coalesce(func.sum(support.c.amount), 0)) staked_support_aggregation(func.coalesce(func.sum(SUPPORT.c.amount), 0))
.where(support.c.claim_hash == other.c.claim_hash) .where(SUPPORT.c.claim_hash == other.c.claim_hash)
) )
def staked_support_count_calc(other): def staked_support_count_calc(other):
return ( return (
staked_support_aggregation(func.coalesce(func.count('*'), 0)) staked_support_aggregation(func.coalesce(func.count('*'), 0))
.where(support.c.claim_hash == other.c.claim_hash) .where(SUPPORT.c.claim_hash == other.c.claim_hash)
) )
@ -82,7 +82,7 @@ def select_claims_for_saving(
): ):
channel_txo = TXO.alias('channel_txo') channel_txo = TXO.alias('channel_txo')
return select( return select(
*minimum_txo_columns, TXO.c.claim_hash, *MINIMIUM_TXO_COLUMNS, TXO.c.claim_hash,
staked_support_amount_calc(TXO).label('staked_support_amount'), staked_support_amount_calc(TXO).label('staked_support_amount'),
staked_support_count_calc(TXO).label('staked_support_count'), staked_support_count_calc(TXO).label('staked_support_count'),
reposted_claim_count_calc(TXO).label('reposted_count'), reposted_claim_count_calc(TXO).label('reposted_count'),

View file

@ -9,7 +9,7 @@ from lbry.db.query_context import ProgressContext, event_emitter
from lbry.db.queries import row_to_txo from lbry.db.queries import row_to_txo
from lbry.db.constants import TXO_TYPES from lbry.db.constants import TXO_TYPES
from lbry.db.queries.txio import ( from lbry.db.queries.txio import (
minimum_txo_columns, MINIMIUM_TXO_COLUMNS,
where_unspent_txos, where_abandoned_supports, where_unspent_txos, where_abandoned_supports,
count_unspent_txos, count_unspent_txos,
) )
@ -35,7 +35,7 @@ def supports_insert(
) )
channel_txo = TXO.alias('channel_txo') channel_txo = TXO.alias('channel_txo')
select_supports = select( select_supports = select(
*minimum_txo_columns, TXO.c.claim_hash, *MINIMIUM_TXO_COLUMNS, TXO.c.claim_hash,
TXO.c.signature, TXO.c.signature_digest, TXO.c.signature, TXO.c.signature_digest,
case([( case([(
TXO.c.channel_hash.isnot(None), TXO.c.channel_hash.isnot(None),

View file

@ -102,7 +102,7 @@ def search_support_count(**constraints) -> int:
return count[0]['total'] or 0 return count[0]['total'] or 0
channel_claim = Claim.alias('channel') CHANNEL_CLAIM = Claim.alias('channel')
BASE_SELECT_CLAIM_COLUMNS = [Claim.c.claim_hash.distinct()] + BASE_SELECT_TXO_COLUMNS + [ BASE_SELECT_CLAIM_COLUMNS = [Claim.c.claim_hash.distinct()] + BASE_SELECT_TXO_COLUMNS + [
Claim.c.activation_height, Claim.c.activation_height,
Claim.c.takeover_height, Claim.c.takeover_height,
@ -120,8 +120,8 @@ BASE_SELECT_CLAIM_COLUMNS = [Claim.c.claim_hash.distinct()] + BASE_SELECT_TXO_CO
Claim.c.staked_support_count, Claim.c.staked_support_count,
Claim.c.is_signature_valid, Claim.c.is_signature_valid,
case([( case([(
channel_claim.c.short_url.isnot(None), CHANNEL_CLAIM.c.short_url.isnot(None),
channel_claim.c.short_url + '/' + Claim.c.short_url CHANNEL_CLAIM.c.short_url + '/' + Claim.c.short_url
)]).label('canonical_url'), )]).label('canonical_url'),
func.coalesce(Trending.c.trending_local, 0).label('trending_local'), func.coalesce(Trending.c.trending_local, 0).label('trending_local'),
func.coalesce(Trending.c.trending_mixed, 0).label('trending_mixed'), func.coalesce(Trending.c.trending_mixed, 0).label('trending_mixed'),
@ -277,7 +277,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
select(*cols) select(*cols)
.select_from( .select_from(
Claim.join(TXO).join(TX).join(Trending, Trending.c.claim_hash == Claim.c.claim_hash, isouter=True) Claim.join(TXO).join(TX).join(Trending, Trending.c.claim_hash == Claim.c.claim_hash, isouter=True)
.join(channel_claim, Claim.c.channel_hash == channel_claim.c.claim_hash, isouter=True) .join(CHANNEL_CLAIM, Claim.c.channel_hash == CHANNEL_CLAIM.c.claim_hash, isouter=True)
.join(ClaimFilter, .join(ClaimFilter,
(ClaimFilter.c.claim_hash == Claim.c.claim_hash) | (ClaimFilter.c.claim_hash == Claim.c.claim_hash) |
(ClaimFilter.c.claim_hash == Claim.c.reposted_claim_hash) | (ClaimFilter.c.claim_hash == Claim.c.reposted_claim_hash) |

View file

@ -20,7 +20,7 @@ from ..constants import TXO_TYPES, CLAIM_TYPE_CODES, MAX_QUERY_VARIABLES
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
minimum_txo_columns = ( MINIMIUM_TXO_COLUMNS = (
TXO.c.amount, TXO.c.position.label('txo_position'), TXO.c.amount, TXO.c.position.label('txo_position'),
TX.c.tx_hash, TX.c.height, TX.c.timestamp, TX.c.tx_hash, TX.c.height, TX.c.timestamp,
func.substr(TX.c.raw, TXO.c.script_offset + 1, TXO.c.script_length).label('src'), func.substr(TX.c.raw, TXO.c.script_offset + 1, TXO.c.script_length).label('src'),

View file

@ -14,23 +14,12 @@ def calculate_trending(height, p: ProgressContext):
def _trending(height, ctx): def _trending(height, ctx):
ctx.execute(Trending.delete()) ctx.execute(Trending.delete())
start = height - WINDOW start = height - WINDOW
trending = func.sum(Support.c.amount * (WINDOW - (height - Support.c.height))) trending = func.sum(Support.c.amount * (WINDOW - (height - Support.c.height)))
sql = select([Claim.c.claim_hash, trending, trending, trending, 4]).where( sql = select([Claim.c.claim_hash, trending, trending, trending, 4]).where(
(Support.c.claim_hash == Claim.c.claim_hash) (Support.c.claim_hash == Claim.c.claim_hash)
& (Support.c.height <= height) & (Support.c.height <= height)
& (Support.c.height >= start)).group_by(Claim.c.claim_hash) & (Support.c.height >= start)).group_by(Claim.c.claim_hash)
ctx.execute(Trending.insert().from_select( ctx.execute(Trending.insert().from_select(
['claim_hash', 'trending_global', 'trending_local', 'trending_mixed', 'trending_group'], sql)) ['claim_hash', 'trending_global', 'trending_local', 'trending_mixed', 'trending_group'], sql))
if __name__ == "__main__":
from sqlalchemy import create_engine
import time
start = time.time()
engine = create_engine("postgresql:///lbry")
for height in range(830000, 840000, 1000):
start = time.time()
_trending(height, engine)
print(f"{height} took {time.time() - start} seconds")

View file

@ -832,7 +832,7 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
if support_channel is not None: if support_channel is not None:
r, = await self.db.search_supports() r, = await self.db.search_supports()
self.assertEqual(r.meta['is_signature_valid'], support_valid) self.assertEqual(r.meta['is_signature_valid'], support_valid)
self.assertEqual(r.support.signing_channel_id, support_channel.claim_id) self.assertEqual(r.SUPPORT.signing_channel_id, support_channel.claim_id)
async def test_claim_and_support_signing(self): async def test_claim_and_support_signing(self):
search = self.db.search_claims search = self.db.search_claims