diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index fa25006a4..0e06aebd7 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -30,7 +30,6 @@ if typing.TYPE_CHECKING: class TrendingNotification(NamedTuple): height: int - added: bool prev_amount: int new_amount: int @@ -1322,9 +1321,9 @@ class BlockProcessor: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning) - def _add_claim_activation_change_notification(self, claim_id: str, height: int, added: bool, prev_amount: int, + def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int, new_amount: int): - self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, added, prev_amount, new_amount)) + self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount)) def _get_cumulative_update_ops(self, height: int): # update the last takeover height for names with takeovers @@ -1402,25 +1401,13 @@ class BlockProcessor: (name, prev_effective_amount, amt.tx_num, amt.position), (touched,) ) - if (name, touched) in self.activated_claim_amount_by_name_and_hash: - self._add_claim_activation_change_notification( - touched.hex(), height, True, prev_effective_amount, - self.activated_claim_amount_by_name_and_hash[(name, touched)] - ) - if touched in self.activated_support_amount_by_claim: - for support_amount in self.activated_support_amount_by_claim[touched]: - self._add_claim_activation_change_notification( - touched.hex(), height, True, prev_effective_amount, support_amount - ) - if touched in self.removed_active_support_amount_by_claim: - for support_amount in self.removed_active_support_amount_by_claim[touched]: - self._add_claim_activation_change_notification( - touched.hex(), height, False, prev_effective_amount, support_amount - ) new_effective_amount = self._get_pending_effective_amount(name, touched) self.db.prefix_db.effective_amount.stage_put( (name, new_effective_amount, tx_num, position), (touched,) ) + self._add_claim_activation_change_notification( + touched.hex(), height, prev_effective_amount, new_effective_amount + ) for channel_hash, count in self.pending_channel_counts.items(): if count != 0: diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index c762920ef..68383959a 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -154,21 +154,102 @@ class SearchIndex: async def update_trending_score(self, params): update_trending_score_script = """ - double softenLBC(double lbc) { Math.pow(lbc, 1.0f / 3.0f) } - double inflateUnits(int height) { - int renormalizationPeriod = 100000; - double doublingRate = 400.0f; - Math.pow(2.0, (height % renormalizationPeriod) / doublingRate) + double softenLBC(double lbc) { return (Math.pow(lbc, 1.0 / 3.0)); } + + double logsumexp(double x, double y) + { + double top; + if(x > y) + top = x; + else + top = y; + double result = top + Math.log(Math.exp(x-top) + Math.exp(y-top)); + return(result); } + + double logdiffexp(double big, double small) + { + return big + Math.log(1.0 - Math.exp(small - big)); + } + + double squash(double x) + { + if(x < 0.0) + return -Math.log(1.0 - x); + else + return Math.log(x + 1.0); + } + + double unsquash(double x) + { + if(x < 0.0) + return 1.0 - Math.exp(-x); + else + return Math.exp(x) - 1.0; + } + + double log_to_squash(double x) + { + return logsumexp(x, 0.0); + } + + double squash_to_log(double x) + { + //assert x > 0.0; + return logdiffexp(x, 0.0); + } + + double squashed_add(double x, double y) + { + // squash(unsquash(x) + unsquash(y)) but avoiding overflow. + // Cases where the signs are the same + if (x < 0.0 && y < 0.0) + return -logsumexp(-x, logdiffexp(-y, 0.0)); + if (x >= 0.0 && y >= 0.0) + return logsumexp(x, logdiffexp(y, 0.0)); + // Where the signs differ + if (x >= 0.0 && y < 0.0) + if (Math.abs(x) >= Math.abs(y)) + return logsumexp(0.0, logdiffexp(x, -y)); + else + return -logsumexp(0.0, logdiffexp(-y, x)); + if (x < 0.0 && y >= 0.0) + { + // Addition is commutative, hooray for new math + return squashed_add(y, x); + } + return 0.0; + } + + double squashed_multiply(double x, double y) + { + // squash(unsquash(x)*unsquash(y)) but avoiding overflow. + int sign; + if(x*y >= 0.0) + sign = 1; + else + sign = -1; + return sign*logsumexp(squash_to_log(Math.abs(x)) + + squash_to_log(Math.abs(y)), 0.0); + } + + // Squashed inflated units + double inflateUnits(int height) { + double timescale = 576.0; // Half life of 400 = e-folding time of a day + // by coincidence, so may as well go with it + return log_to_squash(height / timescale); + } + double spikePower(double newAmount) { if (newAmount < 50.0) { - 0.5 + return(0.5); } else if (newAmount < 85.0) { - newAmount / 100.0 + return(newAmount / 100.0); } else { - 0.85 + return(0.85); } } + double spikeMass(double oldAmount, double newAmount) { double softenedChange = softenLBC(Math.abs(newAmount - oldAmount)); double changeInSoftened = Math.abs(softenLBC(newAmount) - softenLBC(oldAmount)); @@ -181,19 +262,11 @@ class SearchIndex: } for (i in params.src.changes) { double units = inflateUnits(i.height); - if (i.added) { - if (ctx._source.trending_score == null) { - ctx._source.trending_score = (units * spikeMass(i.prev_amount, i.prev_amount + i.new_amount)); - } else { - ctx._source.trending_score += (units * spikeMass(i.prev_amount, i.prev_amount + i.new_amount)); - } - } else { - if (ctx._source.trending_score == null) { - ctx._source.trending_score = (units * spikeMass(i.prev_amount, i.prev_amount - i.new_amount)); - } else { - ctx._source.trending_score += (units * spikeMass(i.prev_amount, i.prev_amount - i.new_amount)); - } + if (ctx._source.trending_score == null) { + ctx._source.trending_score = 0.0; } + double bigSpike = squashed_multiply(units, squash(spikeMass(i.prev_amount, i.new_amount))); + ctx._source.trending_score = squashed_add(ctx._source.trending_score, bigSpike); } """ start = time.perf_counter() @@ -211,9 +284,8 @@ class SearchIndex: 'changes': [ { 'height': p.height, - 'added': p.added, - 'prev_amount': p.prev_amount * 1E-9, - 'new_amount': p.new_amount * 1E-9, + 'prev_amount': p.prev_amount / 1E8, + 'new_amount': p.new_amount / 1E8, } for p in claim_updates ] }} diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index b5ec87fc8..45d8dc6b1 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -1397,47 +1397,32 @@ class ResolveClaimTakeovers(BaseResolveTestCase): ))[0][0]['trending_score'] claim_id1 = (await self.stream_create('derp', '1.0'))['outputs'][0]['claim_id'] - claim_id2 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - claim_id3 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - claim_id4 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - claim_id5 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id'] - - COIN = 1E9 + COIN = 1E8 height = 99000 - self.conductor.spv_node.server.bp._add_claim_activation_change_notification( - claim_id1, height, True, 1 * COIN, 1_000_000 * COIN - ) - self.conductor.spv_node.server.bp._add_claim_activation_change_notification( - claim_id2, height, True, 1 * COIN, 100_000 * COIN - ) - self.conductor.spv_node.server.bp._add_claim_activation_change_notification( - claim_id2, height + 1, False, 100_001 * COIN, 100_000 * COIN - ) - self.conductor.spv_node.server.bp._add_claim_activation_change_notification( - claim_id3, height, True, 1 * COIN, 1_000 * COIN - ) - self.conductor.spv_node.server.bp._add_claim_activation_change_notification( - claim_id4, height, True, 1 * COIN, 10 * COIN + claim_id1, height, 0, 10 * COIN ) await self.generate(1) - - self.assertEqual(3.1711298570548195e+76, await get_trending_score(claim_id1)) - self.assertEqual(-1.369652719234026e+74, await get_trending_score(claim_id2)) - self.assertEqual(2.925275298842502e+75, await get_trending_score(claim_id3)) - self.assertEqual(5.193711055804491e+74, await get_trending_score(claim_id4)) - self.assertEqual(0.6690521635580086, await get_trending_score(claim_id5)) - + self.assertEqual(172.64252836433135, await get_trending_score(claim_id1)) self.conductor.spv_node.server.bp._add_claim_activation_change_notification( - claim_id5, height + 100, True, 2 * COIN, 10 * COIN + claim_id1, height + 1, 10 * COIN, 100 * COIN ) await self.generate(1) - self.assertEqual(5.664516565750028e+74, await get_trending_score(claim_id5)) - + self.assertEqual(173.45931832928875, await get_trending_score(claim_id1)) + self.conductor.spv_node.server.bp._add_claim_activation_change_notification( + claim_id1, height + 100, 100 * COIN, 1000000 * COIN + ) + await self.generate(1) + self.assertEqual(176.65517070393514, await get_trending_score(claim_id1)) + self.conductor.spv_node.server.bp._add_claim_activation_change_notification( + claim_id1, height + 200, 1000000 * COIN, 1 * COIN + ) + await self.generate(1) + self.assertEqual(-174.951347102643, await get_trending_score(claim_id1)) search_results = (await self.conductor.spv_node.server.bp.db.search_index.search(claim_name="derp"))[0] - self.assertEqual(5, len(search_results)) - self.assertListEqual([claim_id1, claim_id3, claim_id4, claim_id2, claim_id5], [c['claim_id'] for c in search_results]) + self.assertEqual(1, len(search_results)) + self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results]) class ResolveAfterReorg(BaseResolveTestCase):