diff --git a/hub/common.py b/hub/common.py
index f077c01..6579d6f 100644
--- a/hub/common.py
+++ b/hub/common.py
@@ -1,4 +1,5 @@
 import struct
+import asyncio
 import hashlib
 import hmac
 import ipaddress
@@ -28,6 +29,9 @@
 CLAIM_HASH_LEN = 20
 HISTOGRAM_BUCKETS = (
     .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
 )
+SIZE_BUCKETS = (
+    1, 10, 100, 500, 1000, 2000, 4000, 7500, 10000, 15000, 25000, 50000, 75000, 100000, 150000, 250000, float('inf')
+)
 
 CLAIM_TYPES = {
     'stream': 1,
@@ -763,3 +767,11 @@
     if inner_hits:
         return expand_result(inner_hits)
     return expanded
+
+
+async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
+    async_sleep = asyncio.sleep
+    for cnt, item in enumerate(gen):
+        yield item
+        if cnt % ticks_per_sleep == 0:
+            await async_sleep(0)
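For reference, a minimal standalone sketch of how `asyncify_for_loop` is meant to be consumed: the `await asyncio.sleep(0)` hands control back to the event loop every `ticks_per_sleep` items, so iterating a large synchronous generator cannot starve other tasks. The consumer below is hypothetical and not part of the diff:

```python
import asyncio

# asyncify_for_loop as added in hub/common.py above
async def asyncify_for_loop(gen, ticks_per_sleep: int = 1000):
    async_sleep = asyncio.sleep
    for cnt, item in enumerate(gen):
        yield item
        if cnt % ticks_per_sleep == 0:
            await async_sleep(0)  # periodically yield to the event loop

async def main():
    # hypothetical consumer: a large synchronous iterable that would
    # otherwise block the loop for its entire duration
    total = 0
    async for n in asyncify_for_loop(range(1_000_000), ticks_per_sleep=10_000):
        total += n
    print(total)

asyncio.run(main())
```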
""" run_in_executor = asyncio.get_event_loop().run_in_executor - tx_nums = await run_in_executor(self._executor, self.read_history, hashX, limit) + tx_nums = await run_in_executor(self._executor, self._read_history, hashX, limit) history = [] append_history = history.append while tx_nums: batch, tx_nums = tx_nums[:100], tx_nums[100:] - batch_result = self.get_tx_hashes(batch) if self._cache_all_tx_hashes else await run_in_executor(self._executor, self.get_tx_hashes, batch) - for tx_num, tx_hash in zip(batch, batch_result): + for tx_num, tx_hash in zip(batch, await self.get_tx_hashes(batch)): append_history((tx_hash, bisect_right(self.tx_counts, tx_num))) await asyncio.sleep(0) return history diff --git a/hub/herald/service.py b/hub/herald/service.py index 64f9415..df7bedb 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -46,8 +46,29 @@ class HubServerService(BlockchainReaderService): def advance(self, height: int): super().advance(height) touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs + self.session_manager.update_history_caches(touched_hashXs) self.notifications_to_send.append((set(touched_hashXs), height)) + def unwind(self): + self.session_manager.hashX_raw_history_cache.clear() + self.session_manager.hashX_history_cache.clear() + prev_count = self.db.tx_counts.pop() + tx_count = self.db.tx_counts[-1] + self.db.headers.pop() + self.db.block_hashes.pop() + current_count = prev_count + for _ in range(prev_count - tx_count): + if current_count in self.session_manager.history_tx_info_cache: + self.session_manager.history_tx_info_cache.pop(current_count) + current_count -= 1 + if self.db._cache_all_tx_hashes: + for _ in range(prev_count - tx_count): + tx_hash = self.db.tx_num_mapping.pop(self.db.total_transactions.pop()) + if tx_hash in self.db.tx_cache: + self.db.tx_cache.pop(tx_hash) + assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" + self.db.merkle_cache.clear() + def _detect_changes(self): super()._detect_changes() start = time.perf_counter() diff --git a/hub/herald/session.py b/hub/herald/session.py index 4907c87..83745a0 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -1,5 +1,5 @@ import os -import ssl +import sys import math import time import codecs @@ -21,8 +21,9 @@ from hub import __version__, PROMETHEUS_NAMESPACE from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from hub.herald.search import SearchIndex -from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time +from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS +from hub.common import LRUCacheWithMetrics from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer @@ -32,6 +33,8 @@ if typing.TYPE_CHECKING: from hub.scribe.daemon import LBCDaemon from hub.herald.mempool import HubMemPool +PYTHON_VERSION = sys.version_info.major, sys.version_info.minor +TypedDict = dict if PYTHON_VERSION < (3, 8) else typing.TypedDict BAD_REQUEST = 1 DAEMON_ERROR = 2 @@ -42,6 +45,11 @@ SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' 'required_names other_names') +class 
diff --git a/hub/herald/service.py b/hub/herald/service.py
index 64f9415..df7bedb 100644
--- a/hub/herald/service.py
+++ b/hub/herald/service.py
@@ -46,8 +46,31 @@ class HubServerService(BlockchainReaderService):
     def advance(self, height: int):
         super().advance(height)
         touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs
+        self.session_manager.update_history_caches(touched_hashXs)
         self.notifications_to_send.append((set(touched_hashXs), height))
 
+    def unwind(self):
+        self.session_manager.hashX_raw_history_cache.clear()
+        self.session_manager.hashX_history_cache.clear()
+        prev_count = self.db.tx_counts.pop()
+        tx_count = self.db.tx_counts[-1]
+        self.db.headers.pop()
+        self.db.block_hashes.pop()
+        # drop cached tx infos for the unwound block (tx_nums tx_count..prev_count - 1)
+        current_count = prev_count - 1
+        for _ in range(prev_count - tx_count):
+            if current_count in self.session_manager.history_tx_info_cache:
+                self.session_manager.history_tx_info_cache.pop(current_count)
+            current_count -= 1
+        if self.db._cache_all_tx_hashes:
+            for _ in range(prev_count - tx_count):
+                tx_hash = self.db.total_transactions.pop()
+                self.db.tx_num_mapping.pop(tx_hash)
+                if tx_hash in self.db.tx_cache:
+                    self.db.tx_cache.pop(tx_hash)
+            assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
+        self.db.merkle_cache.clear()
+
     def _detect_changes(self):
         super()._detect_changes()
         start = time.perf_counter()
diff --git a/hub/herald/session.py b/hub/herald/session.py
index 4907c87..83745a0 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -1,5 +1,5 @@
 import os
-import ssl
+import sys
 import math
 import time
 import codecs
@@ -21,8 +21,9 @@
 from hub import __version__, PROMETHEUS_NAMESPACE
 from hub.herald import PROTOCOL_MIN, PROTOCOL_MAX, HUB_PROTOCOL_VERSION
 from hub.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
 from hub.herald.search import SearchIndex
-from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time
+from hub.common import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, version_string, formatted_time, SIZE_BUCKETS
 from hub.common import protocol_version, RPCError, DaemonError, TaskGroup, HISTOGRAM_BUCKETS
+from hub.common import LRUCacheWithMetrics
 from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, JSONRPC
 from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
 from hub.herald.framer import NewlineFramer
@@ -32,6 +33,8 @@
 if typing.TYPE_CHECKING:
     from hub.scribe.daemon import LBCDaemon
     from hub.herald.mempool import HubMemPool
 
+PYTHON_VERSION = sys.version_info.major, sys.version_info.minor
+TypedDict = dict if PYTHON_VERSION < (3, 8) else typing.TypedDict
 BAD_REQUEST = 1
 DAEMON_ERROR = 2
@@ -42,6 +45,11 @@
 SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args '
                                             'required_names other_names')
 
+class CachedAddressHistoryItem(TypedDict):
+    tx_hash: str
+    height: int
+
+
 def scripthash_to_hashX(scripthash: str) -> bytes:
     try:
         bin_hash = hex_str_to_hash(scripthash)
@@ -146,7 +154,6 @@ class SessionManager:
     pending_query_metric = Gauge(
         "pending_queries_count", "Number of pending and running sqlite queries", namespace=NAMESPACE
     )
-
     client_version_metric = Counter(
         "clients", "Number of connections received per client version", namespace=NAMESPACE,
         labelnames=("version",)
     )
@@ -155,6 +162,14 @@ class SessionManager:
     address_history_metric = Histogram(
         "address_history", "Time to fetch an address history", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
     )
+    address_subscription_metric = Gauge(
+        "address_subscriptions", "Number of subscribed addresses",
+        namespace=NAMESPACE
+    )
+    address_history_size_metric = Histogram(
+        "history_size", "Sizes of histories for subscribed addresses",
+        namespace=NAMESPACE, buckets=SIZE_BUCKETS
+    )
     notifications_in_flight_metric = Gauge(
         "notifications_in_flight", "Count of notifications in flight",
         namespace=NAMESPACE
     )
@@ -183,7 +198,6 @@ class SessionManager:
         self.cur_group = SessionGroup(0)
         self.txs_sent = 0
         self.start_time = time.time()
-        self.history_cache = {}
         self.resolve_outputs_cache = {}
         self.resolve_cache = {}
         self.notified_height: typing.Optional[int] = None
@@ -198,12 +212,50 @@ class SessionManager:
             elastic_host=env.elastic_host, elastic_port=env.elastic_port
         )
         self.running = False
+        # hashX: List[int]
+        self.hashX_raw_history_cache = LRUCacheWithMetrics(2 ** 16, metric_name='raw_history', namespace=NAMESPACE)
+        # hashX: List[CachedAddressHistoryItem]
+        self.hashX_history_cache = LRUCacheWithMetrics(2 ** 14, metric_name='full_history', namespace=NAMESPACE)
+        # tx_num: CachedAddressHistoryItem
+        self.history_tx_info_cache = LRUCacheWithMetrics(2 ** 19, metric_name='history_tx', namespace=NAMESPACE)
 
     def clear_caches(self):
-        self.history_cache.clear()
         self.resolve_outputs_cache.clear()
         self.resolve_cache.clear()
 
+    def update_history_caches(self, touched_hashXs: typing.List[bytes]):
+        update_history_cache = {}
+        for hashX in set(touched_hashXs):
+            history_tx_nums = None
+            # if the history is in the raw history cache, update it
+            # TODO: use a reversed iterator for this instead of rescanning it all
+            if hashX in self.hashX_raw_history_cache:
+                self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None)
+            # if it's in hashX_history_cache, prepare to update it in a batch
+            if hashX in self.hashX_history_cache:
+                full_cached = self.hashX_history_cache[hashX]
+                if history_tx_nums is None:
+                    history_tx_nums = self.db._read_history(hashX, None)
+                new_txs = history_tx_nums[len(full_cached):]
+                update_history_cache[hashX] = full_cached, new_txs
+        if update_history_cache:
+            # get the set of new tx nums that were touched in all of the new histories to be cached
+            total_tx_nums = set()
+            for _, new_txs in update_history_cache.values():
+                total_tx_nums.update(new_txs)
+            total_tx_nums = list(total_tx_nums)
+            # collect all of the new tx infos in one batch
+            referenced_new_txs = {
+                tx_num: (CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)))
+                for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums))
+            }
+            # update the cached history lists
+            get_referenced = referenced_new_txs.__getitem__
+            for hashX, (full, new_txs) in update_history_cache.items():
+                append_to_full = full.append
+                for tx_num in new_txs:
+                    append_to_full(get_referenced(tx_num))
+
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
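To make the bookkeeping in `update_history_caches` concrete, here is a toy rendering of the incremental step, with hypothetical data: a confirmed history only grows at the tail when a block advances, so the freshly re-read tx_num list is longer than the cached one, and the suffix past `len(full_cached)` is exactly what needs to be resolved and appended.

```python
from bisect import bisect_right

# hypothetical data; the shapes mirror the caches above
tx_counts = [2, 5, 9]  # cumulative tx count per block height (0, 1, 2)
cached_full = [                 # hashX_history_cache entry built earlier
    {'tx_hash': 'aa' * 32, 'height': 0},
    {'tx_hash': 'bb' * 32, 'height': 1},
]
history_tx_nums = [0, 3, 6, 8]  # raw history re-read after a new block

# the cached list is a strict prefix, so only the suffix is new
new_tx_nums = history_tx_nums[len(cached_full):]  # -> [6, 8]
for tx_num in new_tx_nums:
    height = bisect_right(tx_counts, tx_num)      # 6 -> 2, 8 -> 2
    cached_full.append({'tx_hash': f'{tx_num:064x}', 'height': height})

print([(item['tx_hash'][:8], item['height']) for item in cached_full])
```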
@@ -592,16 +644,48 @@ class SessionManager:
         self.txs_sent += 1
         return hex_hash
 
-    async def limited_history(self, hashX):
-        """A caching layer."""
-        if hashX not in self.history_cache:
-            # History DoS limit.  Each element of history is about 99
-            # bytes when encoded as JSON.  This limits resource usage
-            # on bloated history requests, and uses a smaller divisor
-            # so large requests are logged before refusing them.
-            limit = self.env.max_send // 97
-            self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
-        return self.history_cache[hashX]
+    async def _cached_raw_history(self, hashX: bytes, limit: typing.Optional[int] = None):
+        tx_nums = self.hashX_raw_history_cache.get(hashX)
+        if tx_nums is None:
+            self.hashX_raw_history_cache[hashX] = tx_nums = await self.db.read_history(hashX, limit)
+        return tx_nums
+
+    async def cached_confirmed_history(self, hashX: bytes,
+                                       limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]:
+        cached_full_history = self.hashX_history_cache.get(hashX)
+        # return the cached history
+        if cached_full_history is not None:
+            self.address_history_size_metric.observe(len(cached_full_history))
+            return cached_full_history
+        # return the history and update the caches
+        tx_nums = await self._cached_raw_history(hashX, limit)
+        needed_tx_infos = []
+        append_needed_tx_info = needed_tx_infos.append
+        tx_infos = {}
+        for cnt, tx_num in enumerate(tx_nums):  # determine which tx_hashes are cached and which we need to look up
+            cached = self.history_tx_info_cache.get(tx_num)
+            if cached is not None:
+                tx_infos[tx_num] = cached
+            else:
+                append_needed_tx_info(tx_num)
+            if cnt % 1000 == 0:
+                await asyncio.sleep(0)
+        if needed_tx_infos:  # request all the needed tx hashes in one batch, cache the txids and heights
+            for cnt, (tx_num, tx_hash) in enumerate(zip(needed_tx_infos, await self.db.get_tx_hashes(needed_tx_infos))):
+                hist = CachedAddressHistoryItem(tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num))
+                tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = hist
+                if cnt % 1000 == 0:
+                    await asyncio.sleep(0)
+        # ensure the ordering of the txs
+        history = []
+        history_append = history.append
+        for cnt, tx_num in enumerate(tx_nums):
+            history_append(tx_infos[tx_num])
+            if cnt % 1000 == 0:
+                await asyncio.sleep(0)
+        self.hashX_history_cache[hashX] = history
+        self.address_history_size_metric.observe(len(history))
+        return history
 
     def _notify_peer(self, peer):
         notify_tasks = [
@@ -623,6 +707,7 @@ class SessionManager:
     def remove_session(self, session):
         """Remove a session from our sessions list if there."""
         session_id = id(session)
+        self.address_subscription_metric.dec(len(session.hashX_subs))
         for hashX in session.hashX_subs:
             sessions = self.hashx_subscriptions_by_session[hashX]
             sessions.remove(session_id)
@@ -1348,6 +1433,8 @@ class LBRYElectrumX(asyncio.Protocol):
                 sessions.remove(id(self))
             except KeyError:
                 pass
+            else:
+                self.session_manager.address_subscription_metric.dec()
             if not sessions:
                 self.hashX_subs.pop(hashX, None)
@@ -1385,6 +1472,7 @@ class LBRYElectrumX(asyncio.Protocol):
         if len(addresses) > 1000:
             raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
         results = []
+        self.session_manager.address_subscription_metric.inc(len(addresses))
         for address in addresses:
             results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
             await asyncio.sleep(0)
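A compact sketch of the miss-batching done by `cached_confirmed_history`, with hypothetical stand-ins for the caches and for `db.get_tx_hashes`: cache hits are taken as-is, the misses are resolved in one batched call, and the result is rebuilt by walking `tx_nums` in its original order so hits and misses interleave correctly.

```python
import asyncio

tx_num_cache = {1: 'info-1', 4: 'info-4'}  # stand-in for history_tx_info_cache
tx_nums = [1, 2, 4, 7]                     # raw history, already in block order

async def get_tx_hashes(nums):             # stand-in for db.get_tx_hashes
    return [f'info-{n}' for n in nums]

async def cached_history():
    tx_infos, needed = {}, []
    for tx_num in tx_nums:
        cached = tx_num_cache.get(tx_num)
        if cached is not None:
            tx_infos[tx_num] = cached
        else:
            needed.append(tx_num)
    # one batched lookup for all misses, which also repopulates the cache
    for tx_num, info in zip(needed, await get_tx_hashes(needed)):
        tx_infos[tx_num] = tx_num_cache[tx_num] = info
    # rebuild in the original order so hits and misses interleave correctly
    return [tx_infos[tx_num] for tx_num in tx_nums]

print(asyncio.run(cached_history()))  # ['info-1', 'info-2', 'info-4', 'info-7']
```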
@@ -1418,10 +1506,8 @@ class LBRYElectrumX(asyncio.Protocol):
     async def confirmed_and_unconfirmed_history(self, hashX):
         # Note history is ordered but unconfirmed is unordered in e-s
-        history = await self.session_manager.limited_history(hashX)
-        conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
-                for tx_hash, height in history]
-        return conf + self.unconfirmed_history(hashX)
+        history = await self.session_manager.cached_confirmed_history(hashX)
+        return history + self.unconfirmed_history(hashX)
 
     async def scripthash_get_history(self, scripthash):
         """Return the confirmed and unconfirmed history of a scripthash."""
@@ -1443,6 +1529,7 @@ class LBRYElectrumX(asyncio.Protocol):
         scripthash: the SHA256 hash of the script to subscribe to"""
         hashX = scripthash_to_hashX(scripthash)
+        self.session_manager.address_subscription_metric.inc()
         return await self.hashX_subscribe(hashX, scripthash)
 
     async def scripthash_unsubscribe(self, scripthash: str):
diff --git a/hub/scribe/service.py b/hub/scribe/service.py
index b22451c..74bd73c 100644
--- a/hub/scribe/service.py
+++ b/hub/scribe/service.py
@@ -1245,7 +1245,7 @@ class BlockchainProcessorService(BlockchainService):
         if hashX in self.hashX_full_cache:
             return self.hashX_full_cache[hashX]
         if hashX not in self.hashX_history_cache:
-            self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None)
+            self.hashX_history_cache[hashX] = tx_nums = self.db._read_history(hashX, limit=None)
         else:
             tx_nums = self.hashX_history_cache[hashX]
         needed_tx_infos = []
@@ -1257,7 +1257,7 @@ class BlockchainProcessorService(BlockchainService):
             else:
                 append_needed_tx_info(tx_num)
         if needed_tx_infos:
-            for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
+            for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
                 tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
         history = ''
@@ -1487,7 +1487,7 @@ class BlockchainProcessorService(BlockchainService):
             else:
                 append_needed_tx_info(tx_num)
         if needed_tx_infos:
-            for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
+            for tx_num, tx_hash in zip(needed_tx_infos, self.db._get_tx_hashes(needed_tx_infos)):
                 tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
                 tx_infos[tx_num] = tx_info
                 self.history_tx_info_cache[tx_num] = tx_info
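For clarity, the flat per-tx encoding cached on the scribe side (where the herald's cache stores `CachedAddressHistoryItem` dicts instead) is simply `'<txid>:<height>:'`: the txid is the byte-reversed hex of the stored hash, and the height comes from bisecting the cumulative `tx_counts` list. A standalone illustration with hypothetical values:

```python
from bisect import bisect_right

# hypothetical values; tx_counts mirrors db.tx_counts (cumulative txs per height)
tx_counts = [2, 5, 9]
tx_hash = bytes.fromhex('ab' * 32)  # stored little-endian, as in the db
tx_num = 6

# same expression as the scribe hunks above
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(tx_counts, tx_num):d}:'
print(tx_info)  # 'abab...ab:2:' -- txid hex, then the tx's block height
```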