improve resolve caching

This commit is contained in:
Jack Robison 2021-10-14 16:08:28 -04:00
parent 6416d8ce9c
commit 99df418f1d
2 changed files with 50 additions and 29 deletions

View file

@ -236,6 +236,8 @@ class BlockProcessor:
# self.search_cache = {}
self.resolve_cache = LRUCache(2**16)
self.resolve_outputs_cache = LRUCache(2 ** 16)
self.history_cache = {}
self.status_server = StatusServer()
@ -1590,6 +1592,7 @@ class BlockProcessor:
self.pending_transactions.clear()
self.pending_support_amount_change.clear()
self.resolve_cache.clear()
self.resolve_outputs_cache.clear()
async def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0

View file

@ -1016,35 +1016,53 @@ class LBRYElectrumX(SessionBase):
return self.bp.resolve_cache[url]
async def claimtrie_resolve(self, *urls):
rows, extra = [], []
for url in urls:
self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel, repost, reposted_channel = await self._cached_resolve_url(url)
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None)
sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
def _cached_resolve():
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = self.db._resolve(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64(
rows, extra, 0, None, None
)
return serialized_outputs
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
else:
return await self.loop.run_in_executor(None, _cached_resolve)
finally:
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self):
return self.bp.height