asyncify for loop

This commit is contained in:
Jack Robison 2022-05-20 14:40:26 -04:00
parent 4466bb1451
commit 7263ec553e
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 18 additions and 5 deletions

View file

@ -1,4 +1,5 @@
import struct import struct
import asyncio
import hashlib import hashlib
import hmac import hmac
import ipaddress import ipaddress
@ -766,3 +767,11 @@ def expand_result(results):
if inner_hits: if inner_hits:
return expand_result(inner_hits) return expand_result(inner_hits)
return expanded return expanded
async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
async_sleep = asyncio.sleep
for cnt, item in enumerate(gen):
yield item
if cnt % ticks_per_sleep == 0:
await async_sleep(0)

View file

@ -22,7 +22,8 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from hub.herald.search import SearchIndex from hub.herald.search import SearchIndex
from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
from hub.common import LRUCache, LRUCacheWithMetrics, asyncify_for_loop
from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
from hub.herald.framer import NewlineFramer from hub.herald.framer import NewlineFramer
@ -204,8 +205,8 @@ class SessionManager:
elastic_host=env.elastic_host, elastic_port=env.elastic_port elastic_host=env.elastic_host, elastic_port=env.elastic_port
) )
self.running = False self.running = False
self.hashX_history_cache = LRUCache(2 ** 14) self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='raw_history', namespace=NAMESPACE)
self.hashX_full_cache = LRUCache(2 ** 12) self.hashX_full_cache = LRUCacheWithMetrics(2 ** 12, metric_name='full_history', namespace=NAMESPACE)
self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE) self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE)
def clear_caches(self): def clear_caches(self):
@ -1463,8 +1464,11 @@ class LBRYElectrumX(asyncio.Protocol):
async def confirmed_and_unconfirmed_history(self, hashX): async def confirmed_and_unconfirmed_history(self, hashX):
# Note history is ordered but unconfirmed is unordered in e-s # Note history is ordered but unconfirmed is unordered in e-s
history = await self.session_manager.limited_history(hashX) history = await self.session_manager.limited_history(hashX)
conf = [{'tx_hash': txid, 'height': height} conf = [
for txid, height in history] item async for item in asyncify_for_loop(
({'tx_hash': txid, 'height': height} for txid, height in history), 1000
)
]
return conf + self.unconfirmed_history(hashX) return conf + self.unconfirmed_history(hashX)
async def scripthash_get_history(self, scripthash): async def scripthash_get_history(self, scripthash):