From e5713dc63c35ef8fc2622b4a1ba0fe709fa4406e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 19 May 2022 12:53:49 -0400 Subject: [PATCH 1/6] improve caching for `blockchain.address.get_history` --- hub/db/db.py | 26 +++++++++++++++++----- hub/herald/service.py | 18 +++++++++++++++ hub/herald/session.py | 51 +++++++++++++++++++++++++++++++------------ hub/scribe/service.py | 6 ++--- 4 files changed, 79 insertions(+), 22 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index 8a05cf7..669af9a 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -959,11 +959,25 @@ class HubDB: return self.total_transactions[tx_num] return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False) - def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]: + def _get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]: if self._cache_all_tx_hashes: return [None if tx_num > self.db_tx_count else self.total_transactions[tx_num] for tx_num in tx_nums] return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False) + async def get_tx_hashes(self, tx_nums: List[int]) -> List[Optional[bytes]]: + if self._cache_all_tx_hashes: + result = [] + append_result = result.append + for tx_num in tx_nums: + append_result(None if tx_num > self.db_tx_count else self.total_transactions[tx_num]) + await asyncio.sleep(0) + return result + + def _get_tx_hashes(): + return self.prefix_db.tx_hash.multi_get([(tx_num,) for tx_num in tx_nums], deserialize_value=False) + + return await asyncio.get_event_loop().run_in_executor(self._executor, _get_tx_hashes) + def get_raw_mempool_tx(self, tx_hash: bytes) -> Optional[bytes]: return self.prefix_db.mempool_tx.get(tx_hash, deserialize_value=False) @@ -1159,7 +1173,7 @@ class HubDB: raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') return [self.coin.header_hash(header) for header in self.headers[height:height + count]] - def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + def _read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: txs = [] txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): @@ -1168,6 +1182,9 @@ class HubDB: break return txs + async def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + return await asyncio.get_event_loop().run_in_executor(self._executor, self._read_history, hashX, limit) + async def limited_history(self, hashX, *, limit=1000): """Return an unpruned, sorted list of (tx_hash, height) tuples of confirmed transactions that touched the address, earliest in @@ -1176,13 +1193,12 @@ class HubDB: limit to None to get them all. """ run_in_executor = asyncio.get_event_loop().run_in_executor - tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit) + tx_nums = await run_in_executor(self._executor, self._read_history, hashX, limit) history = [] append_history = history.append while tx_nums: batch, tx_nums = tx_nums[:100], tx_nums[100:] - batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch) - for tx_num, tx_hash in zip(batch, batch_result): + for tx_num, tx_hash in zip(batch, await self.get_tx_hashes(batch)): append_history((tx_hash, bisect_right(self.tx_counts, tx_num))) await asyncio.sleep(0) return history diff --git a/hub/herald/service.py b/hub/herald/service.py index 64f9415..fefa3ee 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -48,6 +48,24 @@ class HubServerService(BlockchainReaderService): touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs self.notifications_to_send.append((set(touched_hashXs), height)) + def unwind(self): + prev_count = self.db.tx_counts.pop() + tx_count = self.db.tx_counts[-1] + self.db.headers.pop() + self.db.block_hashes.pop() + current_count = prev_count + for _ in range(prev_count - tx_count): + if current_count in self.session_manager.history_tx_info_cache: + self.session_manager.history_tx_info_cache.pop(current_count) + current_count -= 1 + if self.db._cache_all_tx_hashes: + for _ in range(prev_count - tx_count): + tx_hash = self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + if tx_hash in self.db.tx_cache: + self.db.tx_cache.pop(tx_hash) + assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" + self.db.merkle_cache.clear() + def _detect_changes(self): super()._detect_changes() start = time.perf_counter() diff --git a/hub/herald/session.py b/hub/herald/session.py index 4907c87..d69a12a 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -183,7 +183,6 @@ class SessionManager: self.cur_group = SessionGroup(0) self.txs_sent = 0 self.start_time = time.time() - self.history_cache = {} self.resolve_outputs_cache = {} self.resolve_cache = {} self.notified_height: typing.Optional[int] = None @@ -198,9 +197,13 @@ class SessionManager: elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.running = False + self.hashX_history_cache = LRUCache(2 ** 14) + self.hashX_full_cache = LRUCache(2 ** 12) + self.history_tx_info_cache = LRUCache(2 ** 17) def clear_caches(self): - self.history_cache.clear() + self.hashX_history_cache.clear() + self.hashX_full_cache.clear() self.resolve_outputs_cache.clear() self.resolve_cache.clear() @@ -592,16 +595,36 @@ class SessionManager: self.txs_sent += 1 return hex_hash - async def limited_history(self, hashX): - """A caching layer.""" - if hashX not in self.history_cache: - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage - # on bloated history requests, and uses a smaller divisor - # so large requests are logged before refusing them. + async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: + if hashX in self.hashX_full_cache: + return self.hashX_full_cache[hashX] + if hashX not in self.hashX_history_cache: limit = self.env.max_send // 97 - self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) - return self.history_cache[hashX] + self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) + else: + tx_nums = self.hashX_history_cache[hashX] + needed_tx_infos = [] + append_needed_tx_info = needed_tx_infos.append + tx_infos = {} + for tx_num in tx_nums: + if tx_num in self.history_tx_info_cache: + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + else: + append_needed_tx_info(tx_num) + await asyncio.sleep(0) + if needed_tx_infos: + + for tx_num, tx_hash in zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos)): + hist = tx_hash[::-1].hex(), bisect_right(self.db.tx_counts, tx_num) + tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist + await asyncio.sleep(0) + history = [] + history_append = history.append + for tx_num in tx_nums: + history_append(tx_infos[tx_num]) + await asyncio.sleep(0) + self.hashX_full_cache[hashX] = history + return history def _notify_peer(self, peer): notify_tasks = [ @@ -1419,8 +1442,8 @@ class LBRYElectrumX(asyncio.Protocol): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s history = await self.session_manager.limited_history(hashX) - conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} - for tx_hash, height in history] + conf = [{'tx_hash': txid, 'height': height} + for txid, height in history] return conf + self.unconfirmed_history(hashX) async def scripthash_get_history(self, scripthash): diff --git a/hub/scribe/service.py b/hub/scribe/service.py index b22451c..74bd73c 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -1245,7 +1245,7 @@ class BlockchainProcessorService(BlockchainService): if hashX in self.hashX_full_cache: return self.hashX_full_cache[hashX] if hashX not in self.hashX_history_cache: - self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None) + self.hashX_history_cache[hashX] = tx_nums = self.db._read_history(hashX, limit=None) else: tx_nums = self.hashX_history_cache[hashX] needed_tx_infos = [] @@ -1257,7 +1257,7 @@ class BlockchainProcessorService(BlockchainService): else: append_needed_tx_info(tx_num) if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)): tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' history = '' @@ -1487,7 +1487,7 @@ class BlockchainProcessorService(BlockchainService): else: append_needed_tx_info(tx_num) if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)): + for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)): tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:' tx_infos[tx_num] = tx_info self.history_tx_info_cache[tx_num] = tx_info -- 2.45.3 From cb1ed3beb1946fbafee6364470683dbb8a0849ed Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 19 May 2022 16:25:23 -0400 Subject: [PATCH 2/6] history_tx_info_cache metrics --- hub/herald/session.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hub/herald/session.py b/hub/herald/session.py index d69a12a..962906b 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,7 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -199,7 +199,7 @@ class SessionManager: self.running = False self.hashX_history_cache = LRUCache(2 ** 14) self.hashX_full_cache = LRUCache(2 ** 12) - self.history_tx_info_cache = LRUCache(2 ** 17) + self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE) def clear_caches(self): self.hashX_history_cache.clear() @@ -607,8 +607,9 @@ class SessionManager: append_needed_tx_info = needed_tx_infos.append tx_infos = {} for tx_num in tx_nums: - if tx_num in self.history_tx_info_cache: - tx_infos[tx_num] = self.history_tx_info_cache[tx_num] + cached = self.history_tx_info_cache.get(tx_num) + if cached is not None: + tx_infos[tx_num] = cached else: append_needed_tx_info(tx_num) await asyncio.sleep(0) -- 2.45.3 From e9f2b1efea591409bf604238fc1a7a16521583c1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 20 May 2022 11:32:39 -0400 Subject: [PATCH 3/6] asyncio.sleep less often --- hub/herald/session.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/hub/herald/session.py b/hub/herald/session.py index 962906b..7060684 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -606,25 +606,31 @@ class SessionManager: needed_tx_infos = [] append_needed_tx_info = needed_tx_infos.append tx_infos = {} + cnt = 0 for tx_num in tx_nums: cached = self.history_tx_info_cache.get(tx_num) if cached is not None: tx_infos[tx_num] = cached else: append_needed_tx_info(tx_num) - await asyncio.sleep(0) + cnt += 1 + if cnt % 1000 == 0: + await asyncio.sleep(0) if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos)): hist = tx_hash[::-1].hex(), bisect_right(self.db.tx_counts, tx_num) tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist - await asyncio.sleep(0) + cnt += 1 + if cnt % 1000 == 0: + await asyncio.sleep(0) history = [] history_append = history.append for tx_num in tx_nums: history_append(tx_infos[tx_num]) - await asyncio.sleep(0) - self.hashX_full_cache[hashX] = history + self.hashX_full_cache[hashX] = history + cnt += 1 + if cnt % 1000 == 0: + await asyncio.sleep(0) return history def _notify_peer(self, peer): -- 2.45.3 From 4466bb14518497b6ddda0100862ec9b5562b168a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 20 May 2022 11:58:18 -0400 Subject: [PATCH 4/6] address_subscriptions gauge and history_size histogram metrics --- hub/common.py | 3 +++ hub/herald/session.py | 23 +++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/hub/common.py b/hub/common.py index f077c01..4376663 100644 --- a/hub/common.py +++ b/hub/common.py @@ -28,6 +28,9 @@ CLAIM_HASH_LEN = 20 HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) +SIZE_BUCKETS = ( + 1, 10, 100, 500, 1000, 2000, 4000, 7500, 10000, 15000, 25000, 50000, 75000, 100000, 150000, 250000, float('inf') +) CLAIM_TYPES = { 'stream': 1, diff --git a/hub/herald/session.py b/hub/herald/session.py index 7060684..6c5159e 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -21,7 +21,7 @@ from hub import __version__, PROMETHEUS_NAMESPACE from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex -from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time +from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification @@ -146,7 +146,6 @@ class SessionManager: pending_query_metric = Gauge( "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE ) - client_version_metric = Counter( "clients", "Number of connections received per client version", namespace=NAMESPACE, labelnames=("version",) @@ -155,6 +154,14 @@ class SessionManager: "address_history", "Time to fetch an address history", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) + address_subscription_metric = Gauge( + "address_subscriptions", "Number of subscribed addresses", + namespace=NAMESPACE + ) + address_history_size_metric = Histogram( + "history_size", "Sizes of histories for subscribed addresses", + namespace=NAMESPACE, buckets=SIZE_BUCKETS + ) notifications_in_flight_metric = Gauge( "notifications_in_flight", "Count of notifications in flight", namespace=NAMESPACE @@ -596,13 +603,16 @@ class SessionManager: return hex_hash async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: - if hashX in self.hashX_full_cache: - return self.hashX_full_cache[hashX] + cached_full_history = self.hashX_full_cache.get(hashX) + if cached_full_history is not None: + self.address_history_size_metric.observe(len(cached_full_history)) + return cached_full_history if hashX not in self.hashX_history_cache: limit = self.env.max_send // 97 self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) else: tx_nums = self.hashX_history_cache[hashX] + self.address_history_size_metric.observe(len(tx_nums)) needed_tx_infos = [] append_needed_tx_info = needed_tx_infos.append tx_infos = {} @@ -653,6 +663,7 @@ class SessionManager: def remove_session(self, session): """Remove a session from our sessions list if there.""" session_id = id(session) + self.address_subscription_metric.dec(len(session.hashX_subs)) for hashX in session.hashX_subs: sessions = self.hashx_subscriptions_by_session[hashX] sessions.remove(session_id) @@ -1378,6 +1389,8 @@ class LBRYElectrumX(asyncio.Protocol): sessions.remove(id(self)) except KeyError: pass + else: + self.session_manager.address_subscription_metric.dec() if not sessions: self.hashX_subs.pop(hashX, None) @@ -1415,6 +1428,7 @@ class LBRYElectrumX(asyncio.Protocol): if len(addresses) > 1000: raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') results = [] + self.session_manager.address_subscription_metric.inc(len(addresses)) for address in addresses: results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) await asyncio.sleep(0) @@ -1473,6 +1487,7 @@ class LBRYElectrumX(asyncio.Protocol): scripthash: the SHA256 hash of the script to subscribe to""" hashX = scripthash_to_hashX(scripthash) + self.session_manager.address_subscription_metric.inc() return await self.hashX_subscribe(hashX, scripthash) async def scripthash_unsubscribe(self, scripthash: str): -- 2.45.3 From 7263ec553e8e4e2739016b0cfba8f99f87a4867e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 20 May 2022 14:40:26 -0400 Subject: [PATCH 5/6] asyncify for loop --- hub/common.py | 9 +++++++++ hub/herald/session.py | 14 +++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/hub/common.py b/hub/common.py index 4376663..6579d6f 100644 --- a/hub/common.py +++ b/hub/common.py @@ -1,4 +1,5 @@ import struct +import asyncio import hashlib import hmac import ipaddress @@ -766,3 +767,11 @@ def expand_result(results): if inner_hits: return expand_result(inner_hits) return expanded + + +async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000): + async_sleep = asyncio.sleep + for cnt, item in enumerate(gen): + yield item + if cnt % ticks_per_sleep == 0: + await async_sleep(0) diff --git a/hub/herald/session.py b/hub/herald/session.py index 6c5159e..ef0ed51 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -22,7 +22,8 @@ from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS -from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS, LRUCache, LRUCacheWithMetrics +from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import LRUCache, LRUCacheWithMetrics, asyncify_for_loop from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -204,8 +205,8 @@ class SessionManager: elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.running = False - self.hashX_history_cache = LRUCache(2 ** 14) - self.hashX_full_cache = LRUCache(2 ** 12) + self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='raw_history', namespace=NAMESPACE) + self.hashX_full_cache = LRUCacheWithMetrics(2 ** 12, metric_name='full_history', namespace=NAMESPACE) self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE) def clear_caches(self): @@ -1463,8 +1464,11 @@ class LBRYElectrumX(asyncio.Protocol): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s history = await self.session_manager.limited_history(hashX) - conf = [{'tx_hash': txid, 'height': height} - for txid, height in history] + conf = [ + item async for item in asyncify_for_loop( + ({'tx_hash': txid, 'height': height} for txid, height in history), 1000 + ) + ] return conf + self.unconfirmed_history(hashX) async def scripthash_get_history(self, scripthash): -- 2.45.3 From 75e9123eaf338a1eaa5491c081a8522471106734 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 21 May 2022 15:25:07 -0400 Subject: [PATCH 6/6] update the history cache in place instead of clearing/rebuilding --- hub/db/db.py | 4 +- hub/herald/service.py | 3 ++ hub/herald/session.py | 102 +++++++++++++++++++++++++++++------------- 3 files changed, 75 insertions(+), 34 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index 669af9a..117501f 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -1173,7 +1173,7 @@ class HubDB: raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') return [self.coin.header_hash(header) for header in self.headers[height:height + count]] - def _read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]: txs = [] txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): @@ -1182,7 +1182,7 @@ class HubDB: break return txs - async def read_history(self, hashX: bytes, limit: int = 1000) -> List[int]: + async def read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]: return await asyncio.get_event_loop().run_in_executor(self._executor, self._read_history, hashX, limit) async def limited_history(self, hashX, *, limit=1000): diff --git a/hub/herald/service.py b/hub/herald/service.py index fefa3ee..df7bedb 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -46,9 +46,12 @@ class HubServerService(BlockchainReaderService): def advance(self, height: int): super().advance(height) touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs + self.session_manager.update_history_caches(touched_hashXs) self.notifications_to_send.append((set(touched_hashXs), height)) def unwind(self): + self.session_manager.hashX_raw_history_cache.clear() + self.session_manager.hashX_history_cache.clear() prev_count = self.db.tx_counts.pop() tx_count = self.db.tx_counts[-1] self.db.headers.pop() diff --git a/hub/herald/session.py b/hub/herald/session.py index ef0ed51..83745a0 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -1,5 +1,5 @@ import os -import ssl +import sys import math import time import codecs @@ -23,7 +23,7 @@ from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS -from hub.common import LRUCache, LRUCacheWithMetrics, asyncify_for_loop +from hub.common import LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -33,6 +33,8 @@ if typing.TYPE_CHECKING: from hub.scribe.daemon import LBCDaemon from hub.herald.mempool import HubMemPool +PYTHON_VERSION = sys.version_info.major, sys.version_info.minor +TypedDict = dict if PYTHON_VERSION < (3, 8) else typing.TypedDict BAD_REQUEST = 1 DAEMON_ERROR = 2 @@ -43,6 +45,11 @@ SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' 'required_names other_names') +class CachedAddressHistoryItem(TypedDict): + tx_hash: str + height: int + + def scripthash_to_hashX(scripthash: str) -> bytes: try: bin_hash = hex_str_to_hash(scripthash) @@ -205,16 +212,50 @@ class SessionManager: elastic_host=env.elastic_host, elastic_port=env.elastic_port ) self.running = False - self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='raw_history', namespace=NAMESPACE) - self.hashX_full_cache = LRUCacheWithMetrics(2 ** 12, metric_name='full_history', namespace=NAMESPACE) - self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 18, metric_name='history_tx', namespace=NAMESPACE) + # hashX: List[int] + self.hashX_raw_history_cache = LRUCacheWithMetrics(2 ** 16, metric_name='raw_history', namespace=NAMESPACE) + # hashX: List[CachedAddressHistoryItem] + self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='full_history', namespace=NAMESPACE) + # tx_num: Tuple[txid, height] + self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 19, metric_name='history_tx', namespace=NAMESPACE) def clear_caches(self): - self.hashX_history_cache.clear() - self.hashX_full_cache.clear() self.resolve_outputs_cache.clear() self.resolve_cache.clear() + def update_history_caches(self, touched_hashXs: typing.List[bytes]): + update_history_cache = {} + for hashX in set(touched_hashXs): + history_tx_nums = None + # if the history is the raw_history_cache, update it + # TODO: use a reversed iterator for this instead of rescanning it all + if hashX in self.hashX_raw_history_cache: + self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None) + # if it's in hashX_history_cache, prepare to update it in a batch + if hashX in self.hashX_history_cache: + full_cached = self.hashX_history_cache[hashX] + if history_tx_nums is None: + history_tx_nums = self.db._read_history(hashX, None) + new_txs = history_tx_nums[len(full_cached):] + update_history_cache[hashX] = full_cached, new_txs + if update_history_cache: + # get the set of new tx nums that were touched in all of the new histories to be cached + total_tx_nums = set() + for _, new_txs in update_history_cache.values(): + total_tx_nums.update(new_txs) + total_tx_nums = list(total_tx_nums) + # collect the total new tx infos + referenced_new_txs = { + tx_num: (CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num))) + for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums)) + } + # update the cached history lists + get_referenced = referenced_new_txs.__getitem__ + for hashX, (full, new_txs) in update_history_cache.items(): + append_to_full = full.append + for tx_num in new_txs: + append_to_full(get_referenced(tx_num)) + async def _start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() @@ -603,45 +644,47 @@ class SessionManager: self.txs_sent += 1 return hex_hash - async def limited_history(self, hashX: bytes) -> typing.List[typing.Tuple[str, int]]: - cached_full_history = self.hashX_full_cache.get(hashX) + async def _cached_raw_history(self, hashX: bytes, limit: typing.Optional[int] = None): + tx_nums = self.hashX_raw_history_cache.get(hashX) + if tx_nums is None: + self.hashX_raw_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) + return tx_nums + + async def cached_confirmed_history(self, hashX: bytes, + limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]: + cached_full_history = self.hashX_history_cache.get(hashX) + # return the cached history if cached_full_history is not None: self.address_history_size_metric.observe(len(cached_full_history)) return cached_full_history - if hashX not in self.hashX_history_cache: - limit = self.env.max_send // 97 - self.hashX_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit) - else: - tx_nums = self.hashX_history_cache[hashX] - self.address_history_size_metric.observe(len(tx_nums)) + # return the history and update the caches + tx_nums = await self._cached_raw_history(hashX, limit) needed_tx_infos = [] append_needed_tx_info = needed_tx_infos.append tx_infos = {} - cnt = 0 - for tx_num in tx_nums: + for cnt, tx_num in enumerate(tx_nums): # determine which tx_hashes are cached and which we need to look up cached = self.history_tx_info_cache.get(tx_num) if cached is not None: tx_infos[tx_num] = cached else: append_needed_tx_info(tx_num) - cnt += 1 if cnt % 1000 == 0: await asyncio.sleep(0) - if needed_tx_infos: - for tx_num, tx_hash in zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos)): - hist = tx_hash[::-1].hex(), bisect_right(self.db.tx_counts, tx_num) + if needed_tx_infos: # request all the needed tx hashes in one batch, cache the txids and heights + for cnt, (tx_num, tx_hash) in enumerate(zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos))): + hist = CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)) tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist - cnt += 1 if cnt % 1000 == 0: await asyncio.sleep(0) + # ensure the ordering of the txs history = [] history_append = history.append - for tx_num in tx_nums: + for cnt, tx_num in enumerate(tx_nums): history_append(tx_infos[tx_num]) - self.hashX_full_cache[hashX] = history - cnt += 1 if cnt % 1000 == 0: await asyncio.sleep(0) + self.hashX_history_cache[hashX] = history + self.address_history_size_metric.observe(len(history)) return history def _notify_peer(self, peer): @@ -1463,13 +1506,8 @@ class LBRYElectrumX(asyncio.Protocol): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.session_manager.limited_history(hashX) - conf = [ - item async for item in asyncify_for_loop( - ({'tx_hash': txid, 'height': height} for txid, height in history), 1000 - ) - ] - return conf + self.unconfirmed_history(hashX) + history = await self.session_manager.cached_confirmed_history(hashX) + return history + self.unconfirmed_history(hashX) async def scripthash_get_history(self, scripthash): """Return the confirmed and unconfirmed history of a scripthash.""" -- 2.45.3