diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index d1d2d8b08..f0453e5de 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1674,6 +1674,10 @@ class Daemon(metaclass=JSONRPCServerType): Usage: claim_search [ | --name=] [--claim_id=] [--txid= --nout=] [--channel_id=] [--channel_name=] [--is_controlling] + [--order_by=...] + [--published_since=] [--released_since=] + [--block_height=] [--after_block_height=] + [--before_block_height=] [--any_tags=...] [--all_tags=...] [--not_tags=...] [--any_languages=...] [--all_languages=...] [--not_languages=...] @@ -1689,6 +1693,20 @@ class Daemon(metaclass=JSONRPCServerType): --channel_id= : (str) limit search to specific channel claim id (returns stream claims) --channel_name= : (str) limit search to specific channel name (returns stream claims) --is_controlling : (bool) limit to controlling claims for their respective name + --order_by= : (str) field to order by, default is descending order, to do an + ascending order prepend ^ to the field name, eg. '^amount' + available fields: 'name', 'block_height', 'release_time', + 'publish_time', 'amount', 'effective_amount', 'support_amount', + 'trending_amount' + --published_since= : (int) limit to claims confirmed into blocks on or after + this UTC timestamp + --released_since= : (int) limit to claims self-described as having been + released to the public on or after this UTC + timestamp, when claim does not provide + a release time the block time is used instead + --block_height= : (int) limit to claims at specific block height + --after_block_height= : (int) limit to claims after specific block height + --before_block_height= : (int) limit to claims before specific block height --any_tags= : (list) find claims containing any of the tags --all_tags= : (list) find claims containing every tag --not_tags= : (list) find claims not containing any of these tags diff --git a/lbrynet/extras/daemon/json_response_encoder.py b/lbrynet/extras/daemon/json_response_encoder.py index 1289d63af..2089eaf2d 100644 --- a/lbrynet/extras/daemon/json_response_encoder.py +++ b/lbrynet/extras/daemon/json_response_encoder.py @@ -138,7 +138,7 @@ class JSONResponseEncoder(JSONEncoder): 'hex': hexlify(tx.raw).decode(), } - def encode_output(self, txo, check_signature=True): + def encode_output(self, txo, check_signature=True, include_meta=True): tx_height = txo.tx_ref.height best_height = self.ledger.headers.height output = { @@ -171,9 +171,10 @@ class JSONResponseEncoder(JSONEncoder): 'name': txo.claim_name, 'normalized': txo.normalized_name, 'claim_id': txo.claim_id, - 'permanent_url': txo.permanent_url, - 'meta': self.encode_claim_meta(txo.meta) + 'permanent_url': txo.permanent_url }) + if include_meta: + output['meta'] = self.encode_claim_meta(txo.meta) if txo.script.is_claim_name or txo.script.is_update_claim: try: output['value'] = txo.claim @@ -181,7 +182,7 @@ class JSONResponseEncoder(JSONEncoder): if self.include_protobuf: output['protobuf'] = hexlify(txo.claim.to_bytes()) if txo.channel is not None: - output['signing_channel'] = txo.channel + output['signing_channel'] = self.encode_output(txo.channel, include_meta=False) if check_signature and txo.claim.is_signed: output['is_channel_signature_valid'] = False if txo.channel: @@ -191,10 +192,10 @@ class JSONResponseEncoder(JSONEncoder): return output def encode_claim_meta(self, meta): - if isinstance(meta.get('effective_amount'), int): - meta['effective_amount'] = dewies_to_lbc(meta['effective_amount']) - if isinstance(meta.get('trending_amount'), int): - meta['trending_amount'] = dewies_to_lbc(meta['trending_amount']) + for key, value in meta.items(): + if key.endswith('_amount') or key.startswith('trending_'): + if isinstance(value, int): + meta[key] = dewies_to_lbc(value) return meta def encode_input(self, txi): diff --git a/lbrynet/schema/result.py b/lbrynet/schema/result.py index fa2faa805..41c2460c2 100644 --- a/lbrynet/schema/result.py +++ b/lbrynet/schema/result.py @@ -23,7 +23,14 @@ class Outputs: 'is_controlling': message.is_controlling, 'activation_height': message.activation_height, 'effective_amount': message.effective_amount, - 'trending_amount': message.trending_amount, + 'support_amount': message.support_amount, + 'claims_in_channel': message.claims_in_channel, + 'trending_daily': message.trending_daily, + 'trending_day_one': message.trending_day_one, + 'trending_day_two': message.trending_day_two, + 'trending_weekly': message.trending_weekly, + 'trending_week_one': message.trending_week_one, + 'trending_week_two': message.trending_week_two, } try: if txo.claim.is_channel: @@ -89,14 +96,17 @@ class Outputs: txo_message.claim.is_controlling = bool(txo['is_controlling']) txo_message.claim.activation_height = txo['activation_height'] txo_message.claim.effective_amount = txo['effective_amount'] - txo_message.claim.trending_amount = txo['trending_amount'] + txo_message.claim.support_amount = txo['support_amount'] txo_message.claim.claims_in_channel = txo['claims_in_channel'] + txo_message.claim.trending_daily = txo['trending_daily'] + txo_message.claim.trending_day_one = txo['trending_day_one'] + txo_message.claim.trending_day_two = txo['trending_day_two'] + txo_message.claim.trending_weekly = txo['trending_weekly'] + txo_message.claim.trending_week_one = txo['trending_week_one'] + txo_message.claim.trending_week_two = txo['trending_week_two'] if txo['channel_txo_hash']: channel = txo_message.claim.channel channel.height = txo['channel_height'] channel.tx_hash = txo['channel_txo_hash'][:32] channel.nout, = struct.unpack(' {height-self.TRENDING_BLOCKS}), 0 + trending_day_one = COALESCE( + (SELECT SUM(amount) FROM support WHERE claim_hash=claim.claim_hash + AND height >= {day_ago}), 0 + ), + trending_day_two = COALESCE( + (SELECT SUM(amount) FROM support WHERE claim_hash=claim.claim_hash + AND {day_ago} > height and height >= {two_day_ago} + ), 0 + ), + trending_week_one = COALESCE( + (SELECT SUM(amount) FROM support WHERE claim_hash=claim.claim_hash + AND height >= {week_ago} + ), 0 + ), + trending_week_two = COALESCE( + (SELECT SUM(amount) FROM support WHERE claim_hash=claim.claim_hash + AND {week_ago} > height and height >= {two_week_ago} + ), 0 ) """) + self.execute(f""" + UPDATE claim SET + trending_daily = trending_day_one - trending_day_two, + trending_weekly = trending_week_one - trending_week_two + """) def _update_support_amount(self, claim_hashes): if claim_hashes: @@ -320,106 +362,92 @@ class SQLDB: WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)}) """, claim_hashes) - def _make_claims_without_competition_become_controlling(self, height, changed, timer): - if not changed: - return + def _update_effective_amount(self, height, claim_hashes=None): + self.execute( + f"UPDATE claim SET effective_amount = amount + support_amount " + f"WHERE activation_height = {height}" + ) + if claim_hashes: + self.execute( + f"UPDATE claim SET effective_amount = amount + support_amount " + f"WHERE activation_height < {height} " + f" AND claim_hash IN ({','.join('?' for _ in claim_hashes)})", + claim_hashes + ) - t = timer.add_timer('insert into claimtrie') - t.start() - self.execute(f""" - INSERT INTO claimtrie (normalized, claim_hash, last_take_over_height) - SELECT claim.normalized, claim.claim_hash, {height} FROM claim - WHERE normalized IN ({','.join('?' for _ in changed)}) AND - normalized NOT IN (SELECT normalized FROM claimtrie) - GROUP BY normalized HAVING COUNT(*) = 1 - """, list(changed)) - t.stop() - - t = timer.add_timer('set activation_height to current height for default winner') - t.start() - self.execute(f""" - UPDATE claim SET activation_height = {height} - WHERE (activation_height IS NULL OR activation_height > {height}) - AND EXISTS(SELECT * FROM claimtrie WHERE claimtrie.claim_hash=claim.claim_hash) - """) - t.stop() - - t = timer.add_timer('calculate activation_height for contentious claim name') - t.start() + def _calculate_activation_height(self, height): + last_take_over_height = f"""COALESCE( + (SELECT last_take_over_height FROM claimtrie + WHERE claimtrie.normalized=claim.normalized), + {height} + ) + """ self.execute(f""" UPDATE claim SET activation_height = - {height} + - min(4032, cast(({height} - (SELECT last_take_over_height FROM claimtrie - WHERE claimtrie.normalized=claim.normalized)) / 32 AS INT)) + {height} + min(4032, cast(({height} - {last_take_over_height}) / 32 AS INT)) WHERE activation_height IS NULL """) - t.stop() - def _update_effective_amount(self, constraints): - where, values = constraints_to_sql(constraints) - self.execute("UPDATE claim SET effective_amount = amount + support_amount WHERE "+where, values) - - def _perform_overtake(self, height, constraints): - where, values = constraints_to_sql(constraints) + def _perform_overtake(self, height, changed): + constraint = f"normalized IN ({','.join('?' for _ in changed)}) OR " if changed else "" overtakes = self.execute(f""" - SELECT winner.normalized, winner.claim_hash FROM claimtrie JOIN ( - SELECT normalized, claim_hash, MAX(effective_amount) - FROM claim WHERE normalized IN (SELECT normalized FROM claim WHERE {where}) - GROUP BY normalized - ) AS winner USING (normalized) WHERE claimtrie.claim_hash <> winner.claim_hash - """, values) + SELECT winner.normalized, winner.claim_hash, claimtrie.claim_hash AS current_winner FROM ( + SELECT normalized, claim_hash FROM claim + WHERE {constraint} + normalized IN (SELECT normalized FROM claim WHERE activation_height={height}) + ORDER BY effective_amount, height, tx_position DESC + ) AS winner LEFT JOIN claimtrie USING (normalized) + GROUP BY winner.normalized + HAVING current_winner IS NULL + OR current_winner <> winner.claim_hash + """, changed) for overtake in overtakes: + if overtake['current_winner']: + self.execute( + f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} " + f"WHERE normalized = ?", + (sqlite3.Binary(overtake['claim_hash']), overtake['normalized']) + ) + else: + self.execute( + f"INSERT INTO claimtrie (claim_hash, normalized, last_take_over_height) " + f"VALUES (?, ?, {height})", + (sqlite3.Binary(overtake['claim_hash']), overtake['normalized']) + ) self.execute( f"UPDATE claim SET activation_height = {height} WHERE normalized = ? " f"AND (activation_height IS NULL OR activation_height > {height})", (overtake['normalized'],) ) - self.execute( - f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} " - f"WHERE normalized = ?", - (sqlite3.Binary(overtake['claim_hash']), overtake['normalized']) - ) def _copy(self, height): if height > 50: self.execute(f"DROP TABLE claimtrie{height-50}") self.execute(f"CREATE TABLE claimtrie{height} AS SELECT * FROM claimtrie") - def update_claimtrie(self, height, changed_names, recalc_claims, timer): - binary_recalc_claims = [sqlite3.Binary(claim_hash) for claim_hash in recalc_claims] + def update_claimtrie(self, height, changed_names, amount_affected_claim_hashes, timer): + binary_claim_hashes = [ + sqlite3.Binary(claim_hash) for claim_hash in amount_affected_claim_hashes + ] r = timer.run - r(self._make_claims_without_competition_become_controlling, height, changed_names, forward_timer=True) - r(self._update_support_amount, binary_recalc_claims) + r(self._calculate_activation_height, height) + r(self._update_support_amount, binary_claim_hashes) + + r(self._update_effective_amount, height, binary_claim_hashes) + r(self._perform_overtake, height, list(changed_names)) + + r(self._update_effective_amount, height) + r(self._perform_overtake, height, []) + if not self.main.first_sync: r(self._update_trending_amount, height) - if binary_recalc_claims: - claims_filter = { - '__or': { - 'activation_height': height, - '__and': { - 'claim_hash__in': binary_recalc_claims, - 'activation_height__lte': height - } - } - } - else: - claims_filter = { - 'activation_height': height - } - - r(self._update_effective_amount, claims_filter) - r(self._perform_overtake, height, claims_filter) - r(self._update_effective_amount, {'activation_height': height}) - r(self._perform_overtake, height, {'activation_height': height}) - #r(self._copy, height) - - def advance_txs(self, height, all_txs, timer): + def advance_txs(self, height, all_txs, header, timer): insert_claims = set() update_claims = set() - delete_claims = set() - changed_names = set() - recalc_claims = set() + delete_claim_hashes = set() + deleted_and_inserted_names = set() + amount_affected_claim_hashes = set() insert_supports = set() delete_supports = set() body_timer = timer.add_timer('body') @@ -431,34 +459,69 @@ class SQLDB: self.split_inputs_into_claims_supports_and_other, tx.inputs ) body_timer.start() - delete_claims.update(spent_claims.keys()) - changed_names.update(spent_claims.values()) - recalc_claims.update(spent_supports.values()) + delete_claim_hashes.update(spent_claims.keys()) + deleted_and_inserted_names.update(spent_claims.values()) + amount_affected_claim_hashes.update(spent_supports.values()) delete_supports.update(spent_supports) for output in tx.outputs: if output.is_support: insert_supports.add(output) - recalc_claims.add(output.claim_hash) + amount_affected_claim_hashes.add(output.claim_hash) elif output.script.is_claim_name: insert_claims.add(output) - recalc_claims.add(output.claim_hash) - changed_names.add(output.normalized_name) + try: + deleted_and_inserted_names.add(output.normalized_name) + except: + self.logger.exception( + f"Could not decode claim name for claim_id: {output.claim_id}, " + f"txid: {output.tx_ref.id}, nout: {output.position}.") + print(output.script.values['claim_name']) + continue elif output.script.is_update_claim: claim_hash = output.claim_hash - if claim_hash in delete_claims: - delete_claims.remove(claim_hash) + if claim_hash in delete_claim_hashes: + delete_claim_hashes.remove(claim_hash) update_claims.add(output) - recalc_claims.add(claim_hash) + amount_affected_claim_hashes.add(claim_hash) body_timer.stop() r = timer.run - r(self.delete_claims, delete_claims) + r(self.delete_claims, delete_claim_hashes) r(self.delete_supports, delete_supports) - r(self.insert_claims, insert_claims) - r(self.update_claims, update_claims) + r(self.insert_claims, insert_claims, header) + r(self.update_claims, update_claims, header) r(self.insert_supports, insert_supports) - r(self.update_claimtrie, height, changed_names, recalc_claims, forward_timer=True) + r(self.update_claimtrie, height, + deleted_and_inserted_names, + amount_affected_claim_hashes, + forward_timer=True) def get_claims(self, cols, **constraints): + if 'order_by' in constraints: + sql_order_by = [] + for order_by in constraints['order_by']: + is_asc = order_by.startswith('^') + column = order_by[1:] if is_asc else order_by + if column not in self.ORDER_FIELDS: + raise NameError(f'{column} is not a valid order_by field') + elif column == 'name': + column = 'normalized' + sql_order_by.append( + f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" + ) + constraints['order_by'] = sql_order_by + + ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'} + for constraint in self.INTEGER_PARAMS: + if constraint in constraints: + value = constraints.pop(constraint) + postfix = '' + if isinstance(value, str): + if len(value) >= 2 and value[:2] in ops: + postfix, value = ops[value[:2]], int(value[2:]) + elif len(value) >= 1 and value[0] in ops: + postfix, value = ops[value[0]], int(value[1:]) + constraints[f'claim.{constraint}{postfix}'] = value + if 'is_controlling' in constraints: if {'sequence', 'amount_order'}.isdisjoint(constraints): constraints['claimtrie.claim_hash__is_not_null'] = '' @@ -503,13 +566,24 @@ class SQLDB: _apply_constraints_for_array_attributes(constraints, 'language') _apply_constraints_for_array_attributes(constraints, 'location') - return self.db.execute(*query( - f""" + try: + return self.db.execute(*query( + f""" + SELECT {cols} FROM claim + LEFT JOIN claimtrie USING (claim_hash) + LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash) + """, **constraints + )).fetchall() + except: + self.logger.exception('Failed to execute claim search query:') + print(query( + f""" SELECT {cols} FROM claim LEFT JOIN claimtrie USING (claim_hash) LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash) """, **constraints - )).fetchall() + )) + raise def get_claims_count(self, **constraints): constraints.pop('offset', None) @@ -523,26 +597,36 @@ class SQLDB: """ claimtrie.claim_hash as is_controlling, claim.claim_hash, claim.txo_hash, claim.height, - claim.activation_height, claim.effective_amount, claim.trending_amount, - channel.txo_hash as channel_txo_hash, channel.height as channel_height, - channel.activation_height as channel_activation_height, - channel.effective_amount as channel_effective_amount, - channel.trending_amount as channel_trending_amount, + claim.activation_height, claim.effective_amount, claim.support_amount, + claim.trending_daily, claim.trending_day_one, claim.trending_day_two, + claim.trending_weekly, claim.trending_week_one, claim.trending_week_two, CASE WHEN claim.is_channel=1 THEN ( SELECT COUNT(*) FROM claim as claim_in_channel WHERE claim_in_channel.channel_hash=claim.claim_hash - ) ELSE 0 END AS claims_in_channel + ) ELSE 0 END AS claims_in_channel, + channel.txo_hash as channel_txo_hash, channel.height as channel_height """, **constraints ) + INTEGER_PARAMS = { + 'height', 'release_time', 'publish_time', + 'amount', 'effective_amount', 'support_amount', + 'trending_daily', 'trending_day_one', 'trending_day_two', + 'trending_weekly', 'trending_week_one', 'trending_week_two' + } + SEARCH_PARAMS = { 'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_id', 'channel_name', 'any_tags', 'all_tags', 'not_tags', 'any_locations', 'all_locations', 'not_locations', 'any_languages', 'all_languages', 'not_languages', - 'is_controlling', 'limit', 'offset' - } + 'is_controlling', 'limit', 'offset', 'order_by' + } | INTEGER_PARAMS + + ORDER_FIELDS = { + 'name', + } | INTEGER_PARAMS def search(self, constraints) -> Tuple[List, int, int]: assert set(constraints).issubset(self.SEARCH_PARAMS), \ @@ -550,7 +634,8 @@ class SQLDB: total = self.get_claims_count(**constraints) constraints['offset'] = abs(constraints.get('offset', 0)) constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) - constraints['order_by'] = ["claim.height DESC", "claim.normalized ASC"] + if 'order_by' not in constraints: + constraints['order_by'] = ["height", "^name"] txo_rows = self._search(**constraints) return txo_rows, constraints['offset'], total diff --git a/tests/integration/test_claim_commands.py b/tests/integration/test_claim_commands.py index 424c056b8..2f74024d0 100644 --- a/tests/integration/test_claim_commands.py +++ b/tests/integration/test_claim_commands.py @@ -148,6 +148,22 @@ class ClaimSearchCommand(CommandTestCase): await self.assertFindsClaims([claim3], all_tags=['abc', 'ghi'], any_tags=['jkl'], not_tags=['mno']) await self.assertFindsClaims([claim4, claim3, claim2], all_tags=['abc'], any_tags=['def', 'ghi']) + async def test_order_by(self): + height = await self.ledger.network.get_server_height() + claims = [await self.stream_create(f'claim{i}') for i in range(5)] + + await self.assertFindsClaims(claims, order_by=["^height"]) + await self.assertFindsClaims(list(reversed(claims)), order_by=["height"]) + + await self.assertFindsClaims([claims[0]], height=height+1) + await self.assertFindsClaims([claims[4]], height=height+5) + await self.assertFindsClaims(claims[:1], height=f'<{height+2}', order_by=["^height"]) + await self.assertFindsClaims(claims[:2], height=f'<={height+2}', order_by=["^height"]) + await self.assertFindsClaims(claims[2:], height=f'>{height+2}', order_by=["^height"]) + await self.assertFindsClaims(claims[1:], height=f'>={height+2}', order_by=["^height"]) + + await self.assertFindsClaims(claims, order_by=["^name"]) + class ChannelCommands(CommandTestCase): diff --git a/tests/unit/wallet/server/test_sqldb.py b/tests/unit/wallet/server/test_sqldb.py index 2620c7d47..5d15d743a 100644 --- a/tests/unit/wallet/server/test_sqldb.py +++ b/tests/unit/wallet/server/test_sqldb.py @@ -67,6 +67,13 @@ class TestSQLDB(unittest.TestCase): Input.spend(claim) ) + def get_stream_abandon(self, tx): + claim = Transaction(tx[0].serialize()).outputs[0] + return self._make_tx( + Output.pay_pubkey_hash(claim.amount, b'abc'), + Input.spend(claim) + ) + def get_support(self, tx, amount): claim = Transaction(tx[0].serialize()).outputs[0] return self._make_tx( @@ -101,10 +108,8 @@ class TestSQLDB(unittest.TestCase): return accepted def advance(self, height, txs): - #for skipped_height in range(self._current_height+1, height): - # self.sql.advance_txs(skipped_height, [], self.timer) self._current_height = height - self.sql.advance_txs(height, txs, self.timer) + self.sql.advance_txs(height, txs, {'timestamp': 1}, self.timer) def state(self, controlling=None, active=None, accepted=None): self.assertEqual(controlling or [], self.get_controlling()) @@ -175,3 +180,59 @@ class TestSQLDB(unittest.TestCase): ], accepted=[] ) + + def test_competing_claims_in_single_block_height_wins(self): + advance, state = self.advance, self.state + stream = self.get_stream('Claim A', 10*COIN) + stream2 = self.get_stream('Claim B', 10*COIN) + advance(13, [stream, stream2]) + state( + controlling=('Claim A', 10*COIN, 10*COIN, 13), + active=[('Claim B', 10*COIN, 10*COIN, 13)], + accepted=[] + ) + + def test_competing_claims_in_single_block_effective_amount_wins(self): + advance, state = self.advance, self.state + stream = self.get_stream('Claim A', 10*COIN) + stream2 = self.get_stream('Claim B', 11*COIN) + advance(13, [stream, stream2]) + state( + controlling=('Claim B', 11*COIN, 11*COIN, 13), + active=[('Claim A', 10*COIN, 10*COIN, 13)], + accepted=[] + ) + + def test_winning_claim_deleted(self): + advance, state = self.advance, self.state + stream = self.get_stream('Claim A', 10*COIN) + stream2 = self.get_stream('Claim B', 11*COIN) + advance(13, [stream, stream2]) + state( + controlling=('Claim B', 11*COIN, 11*COIN, 13), + active=[('Claim A', 10*COIN, 10*COIN, 13)], + accepted=[] + ) + advance(14, [self.get_stream_abandon(stream2)]) + state( + controlling=('Claim A', 10*COIN, 10*COIN, 13), + active=[], + accepted=[] + ) + + def test_winning_claim_deleted_and_new_claim_becomes_winner(self): + advance, state = self.advance, self.state + stream = self.get_stream('Claim A', 10*COIN) + stream2 = self.get_stream('Claim B', 11*COIN) + advance(13, [stream, stream2]) + state( + controlling=('Claim B', 11*COIN, 11*COIN, 13), + active=[('Claim A', 10*COIN, 10*COIN, 13)], + accepted=[] + ) + advance(15, [self.get_stream_abandon(stream2), self.get_stream('Claim C', 12*COIN)]) + state( + controlling=('Claim C', 12*COIN, 12*COIN, 15), + active=[('Claim A', 10*COIN, 10*COIN, 13)], + accepted=[] + )