diff --git a/hub/common.py b/hub/common.py index 4376663..6579d6f 100644 --- a/hub/common.py +++ b/hub/common.py @@ -1,4 +1,5 @@ import struct +import asyncio import hashlib import hmac import ipaddress @@ -766,3 +767,11 @@ def expand_result(results): if inner_hits: return expand_result(inner_hits) return expanded + + +async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000): + async_sleep = asyncio.sleep + for cnt, item in enumerate(gen): + yield item + if cnt % ticks_per_sleep == 0: + await async_sleep(0) diff --git a/hub/herald/session.py b/hub/herald/session.py index 6c5159e..ef0ed51 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,8 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import LRUCache, LRUCacheWithMetrics, asyncify_for_loop from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -204,8 +205,8 @@ class SessionManager: elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.running = False - self.hashX_history_cache = LRUCache(2 ** 14) - self.hashX_full_cache = LRUCache(2 ** 12) + self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='raw_history', namespace=NAMESPACE) + self.hashX_full_cache = LRUCacheWithMetrics(2 ** 12, metric_name='full_history', namespace=NAMESPACE) self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE) def clear_caches(self): @@ -1463,8 +1464,11 @@ class LBRYElectrumX(asyncio.Protocol): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s history = await self.session_manager.limited_history(hashX) - conf = [{'tx_hash': txid, 'height': height} - for txid, height in history] + conf = [ + item async for item in asyncify_for_loop( + ({'tx_hash': txid, 'height': height} for txid, height in history), 1000 + ) + ] return conf + self.unconfirmed_history(hashX) async def scripthash_get_history(self, scripthash):