improve resolve caching

Jack Robison 2021-10-14 16:08:28 -04:00 committed by Victor Shyba
parent eeaf9a72e2
commit e6d470f110
2 changed files with 50 additions and 29 deletions


@@ -236,6 +236,8 @@ class BlockProcessor:
         # self.search_cache = {}
         self.resolve_cache = LRUCache(2**16)
+        self.resolve_outputs_cache = LRUCache(2 ** 16)
         self.history_cache = {}
         self.status_server = StatusServer()
@@ -1590,6 +1592,7 @@ class BlockProcessor:
         self.pending_transactions.clear()
         self.pending_support_amount_change.clear()
         self.resolve_cache.clear()
+        self.resolve_outputs_cache.clear()

     async def backup_block(self):
         assert len(self.db.prefix_db._op_stack) == 0
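The new resolve_outputs_cache mirrors the existing per-URL resolve_cache: both are bounded LRU maps, and both are cleared together whenever chain state changes, since cached results are only valid at a given height. Below is a minimal stand-alone sketch of that pairing; this LRUCache is a hypothetical dict-backed stand-in for the one the server imports, and on_chain_state_change is an invented name for the clearing step shown in the second hunk.

from collections import OrderedDict

class LRUCache:
    # hypothetical stand-in for the server's LRUCache: a bounded mapping
    # that evicts the least recently used entry when over capacity
    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data = OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __getitem__(self, key):
        self._data.move_to_end(key)  # touching a key marks it most recently used
        return self._data[key]

    def __setitem__(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict the oldest entry

    def clear(self):
        self._data.clear()

resolve_cache = LRUCache(2 ** 16)          # url -> (stream, channel, repost, reposted_channel)
resolve_outputs_cache = LRUCache(2 ** 16)  # tuple(sorted(urls)) -> serialized Outputs

def on_chain_state_change():
    # invented name: both caches depend on chain state, so both are dropped
    # together, matching the paired clear() calls in the hunk above
    resolve_cache.clear()
    resolve_outputs_cache.clear()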


@@ -1016,35 +1016,53 @@ class LBRYElectrumX(SessionBase):
         return self.bp.resolve_cache[url]

     async def claimtrie_resolve(self, *urls):
-        rows, extra = [], []
-        for url in urls:
-            self.session_mgr.urls_to_resolve_count_metric.inc()
-            stream, channel, repost, reposted_channel = await self._cached_resolve_url(url)
-            if isinstance(channel, ResolveCensoredError):
-                rows.append(channel)
-                extra.append(channel.censor_row)
-            elif isinstance(stream, ResolveCensoredError):
-                rows.append(stream)
-                extra.append(stream.censor_row)
-            elif channel and not stream:
-                rows.append(channel)
-                # print("resolved channel", channel.name.decode())
-                if repost:
-                    extra.append(repost)
-                if reposted_channel:
-                    extra.append(reposted_channel)
-            elif stream:
-                # print("resolved stream", stream.name.decode())
-                rows.append(stream)
-                if channel:
-                    # print("and channel", channel.name.decode())
-                    extra.append(channel)
-                if repost:
-                    extra.append(repost)
-                if reposted_channel:
-                    extra.append(reposted_channel)
-        # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
-        return Outputs.to_base64(rows, extra, 0, None, None)
+        sorted_urls = tuple(sorted(urls))
+        self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
+
+        def _cached_resolve():
+            rows, extra = [], []
+
+            for url in urls:
+                if url not in self.bp.resolve_cache:
+                    self.bp.resolve_cache[url] = self.db._resolve(url)
+                stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
+                if isinstance(channel, ResolveCensoredError):
+                    rows.append(channel)
+                    extra.append(channel.censor_row)
+                elif isinstance(stream, ResolveCensoredError):
+                    rows.append(stream)
+                    extra.append(stream.censor_row)
+                elif channel and not stream:
+                    rows.append(channel)
+                    # print("resolved channel", channel.name.decode())
+                    if repost:
+                        extra.append(repost)
+                    if reposted_channel:
+                        extra.append(reposted_channel)
+                elif stream:
+                    # print("resolved stream", stream.name.decode())
+                    rows.append(stream)
+                    if channel:
+                        # print("and channel", channel.name.decode())
+                        extra.append(channel)
+                    if repost:
+                        extra.append(repost)
+                    if reposted_channel:
+                        extra.append(reposted_channel)
+            # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
+            self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64(
+                rows, extra, 0, None, None
+            )
+            return serialized_outputs
+
+        try:
+            if sorted_urls in self.bp.resolve_outputs_cache:
+                return self.bp.resolve_outputs_cache[sorted_urls]
+            else:
+                return await self.loop.run_in_executor(None, _cached_resolve)
+        finally:
+            self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))

     async def get_server_height(self):
         return self.bp.height
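The session-side change keys the whole-request cache on tuple(sorted(urls)), so permutations of the same URL set share a single entry, and it pushes the uncached path into the default thread-pool executor so that the blocking database reads in db._resolve and the Outputs serialization happen off the event loop; the finally block records the metric whether the request was a hit or a miss. A runnable sketch of that lookup pattern, with a plain dict and a toy resolver standing in for the server's cache and database:

import asyncio

outputs_cache = {}  # stand-in for bp.resolve_outputs_cache (an LRUCache in the diff)

def _resolve_all(urls):
    # stand-in for the blocking per-URL resolve and Outputs serialization
    return "|".join(f"resolved({u})" for u in urls)

async def claimtrie_resolve_sketch(*urls):
    key = tuple(sorted(urls))  # permutations of the same URL set share one entry
    if key in outputs_cache:
        return outputs_cache[key]
    loop = asyncio.get_running_loop()
    # blocking work runs in the default thread-pool executor, off the event loop
    result = await loop.run_in_executor(None, _resolve_all, urls)
    outputs_cache[key] = result
    return result

async def main():
    a = await claimtrie_resolve_sketch("lbry://one", "lbry://two")
    b = await claimtrie_resolve_sketch("lbry://two", "lbry://one")
    assert a is b  # second call is a cache hit despite the different argument order

asyncio.run(main())

Note that sorting only normalizes the cache key; as in the diff, the resolver itself still iterates the urls in their original order.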