diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 523386951..f541ed0d1 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -236,6 +236,8 @@ class BlockProcessor: # self.search_cache = {} self.resolve_cache = LRUCache(2**16) + self.resolve_outputs_cache = LRUCache(2 ** 16) + self.history_cache = {} self.status_server = StatusServer() @@ -1590,6 +1592,7 @@ class BlockProcessor: self.pending_transactions.clear() self.pending_support_amount_change.clear() self.resolve_cache.clear() + self.resolve_outputs_cache.clear() async def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index dfd81f761..9a9346880 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1016,35 +1016,53 @@ class LBRYElectrumX(SessionBase): return self.bp.resolve_cache[url] async def claimtrie_resolve(self, *urls): - rows, extra = [], [] - for url in urls: - self.session_mgr.urls_to_resolve_count_metric.inc() - stream, channel, repost, reposted_channel = await self._cached_resolve_url(url) - if isinstance(channel, ResolveCensoredError): - rows.append(channel) - extra.append(channel.censor_row) - elif isinstance(stream, ResolveCensoredError): - rows.append(stream) - extra.append(stream.censor_row) - elif channel and not stream: - rows.append(channel) - # print("resolved channel", channel.name.decode()) - if repost: - extra.append(repost) - if reposted_channel: - extra.append(reposted_channel) - elif stream: - # print("resolved stream", stream.name.decode()) - rows.append(stream) - if channel: - # print("and channel", channel.name.decode()) - extra.append(channel) - if repost: - extra.append(repost) - if reposted_channel: - extra.append(reposted_channel) - # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) - return Outputs.to_base64(rows, extra, 0, None, None) + sorted_urls = tuple(sorted(urls)) + self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) + + def _cached_resolve(): + rows, extra = [], [] + + for url in urls: + if url not in self.bp.resolve_cache: + self.bp.resolve_cache[url] = self.db._resolve(url) + stream, channel, repost, reposted_channel = self.bp.resolve_cache[url] + if isinstance(channel, ResolveCensoredError): + rows.append(channel) + extra.append(channel.censor_row) + elif isinstance(stream, ResolveCensoredError): + rows.append(stream) + extra.append(stream.censor_row) + elif channel and not stream: + rows.append(channel) + # print("resolved channel", channel.name.decode()) + if repost: + extra.append(repost) + if reposted_channel: + extra.append(reposted_channel) + elif stream: + # print("resolved stream", stream.name.decode()) + rows.append(stream) + if channel: + # print("and channel", channel.name.decode()) + extra.append(channel) + if repost: + extra.append(repost) + if reposted_channel: + extra.append(reposted_channel) + # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) + self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64( + rows, extra, 0, None, None + ) + return serialized_outputs + + try: + if sorted_urls in self.bp.resolve_outputs_cache: + return self.bp.resolve_outputs_cache[sorted_urls] + else: + + return await self.loop.run_in_executor(None, _cached_resolve) + finally: + self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) async def get_server_height(self): return self.bp.height