From 570bda9c8b1b132c1c3ab077e48e7db7d49f80b4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 18 Aug 2022 18:06:12 -0400 Subject: [PATCH] remove redundant hashX_history_cache -remove redundant cache that used a lot of memory --- hub/herald/service.py | 1 - hub/herald/session.py | 41 ++++------------------------------------- 2 files changed, 4 insertions(+), 38 deletions(-) diff --git a/hub/herald/service.py b/hub/herald/service.py index 3a2a71c..bffe7c9 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -62,7 +62,6 @@ class HubServerService(BlockchainReaderService): def unwind(self): self.session_manager.hashX_raw_history_cache.clear() - self.session_manager.hashX_history_cache.clear() prev_count = self.db.tx_counts.pop() tx_count = self.db.tx_counts[-1] self.db.block_hashes.pop() diff --git a/hub/herald/session.py b/hub/herald/session.py index 99e29e5..fb702c2 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -215,47 +215,20 @@ class SessionManager: self.running = False # hashX: List[int] self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE) - # hashX: List[CachedAddressHistoryItem] - self.hashX_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='full_history', namespace=NAMESPACE) - # tx_num: Tuple[txid, height] + # tx_num: CachedAddressHistoryItem self.history_tx_info_cache = LFUCacheWithMetrics(env.history_tx_cache_size, metric_name='history_tx', namespace=NAMESPACE) def clear_caches(self): self.resolve_cache.clear() def update_history_caches(self, touched_hashXs: typing.List[bytes]): - update_history_cache = {} + # update_history_cache = {} for hashX in set(touched_hashXs): - history_tx_nums = None + # history_tx_nums = None # if the history is the raw_history_cache, update it # TODO: use a reversed iterator for this instead of rescanning it all if hashX in self.hashX_raw_history_cache: - self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None) - # if it's in hashX_history_cache, prepare to update it in a batch - if hashX in self.hashX_history_cache: - full_cached = self.hashX_history_cache[hashX] - if history_tx_nums is None: - history_tx_nums = self.db._read_history(hashX, None) - new_txs = history_tx_nums[len(full_cached):] - update_history_cache[hashX] = full_cached, new_txs - if update_history_cache: - # get the set of new tx nums that were touched in all of the new histories to be cached - total_tx_nums = set() - for _, new_txs in update_history_cache.values(): - total_tx_nums.update(new_txs) - total_tx_nums = list(total_tx_nums) - # collect the total new tx infos - referenced_new_txs = { - tx_num: (CachedAddressHistoryItem( - tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num) - )) for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums)) - } - # update the cached history lists - get_referenced = referenced_new_txs.__getitem__ - for hashX, (full, new_txs) in update_history_cache.items(): - append_to_full = full.append - for tx_num in new_txs: - append_to_full(get_referenced(tx_num)) + self.hashX_raw_history_cache[hashX] = self.db._read_history(hashX, None) async def _start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() @@ -653,11 +626,6 @@ class SessionManager: async def cached_confirmed_history(self, hashX: bytes, limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]: - cached_full_history = self.hashX_history_cache.get(hashX) - # return the cached history - if cached_full_history is not None: - self.address_history_size_metric.observe(len(cached_full_history)) - return cached_full_history # return the history and update the caches tx_nums = await self._cached_raw_history(hashX, limit) needed_tx_infos = [] @@ -684,7 +652,6 @@ class SessionManager: history_append(tx_infos[tx_num]) if cnt % 1000 == 0: await asyncio.sleep(0) - self.hashX_history_cache[hashX] = history self.address_history_size_metric.observe(len(history)) return history