remove redundant hashX_history_cache
-remove redundant cache that used a lot of memory
This commit is contained in:
parent
34c5ab2e56
commit
570bda9c8b
2 changed files with 4 additions and 38 deletions
|
@ -62,7 +62,6 @@ class HubServerService(BlockchainReaderService):
|
||||||
|
|
||||||
def unwind(self):
|
def unwind(self):
|
||||||
self.session_manager.hashX_raw_history_cache.clear()
|
self.session_manager.hashX_raw_history_cache.clear()
|
||||||
self.session_manager.hashX_history_cache.clear()
|
|
||||||
prev_count = self.db.tx_counts.pop()
|
prev_count = self.db.tx_counts.pop()
|
||||||
tx_count = self.db.tx_counts[-1]
|
tx_count = self.db.tx_counts[-1]
|
||||||
self.db.block_hashes.pop()
|
self.db.block_hashes.pop()
|
||||||
|
|
|
@ -215,47 +215,20 @@ class SessionManager:
|
||||||
self.running = False
|
self.running = False
|
||||||
# hashX: List[int]
|
# hashX: List[int]
|
||||||
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
self.hashX_raw_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='raw_history', namespace=NAMESPACE)
|
||||||
# hashX: List[CachedAddressHistoryItem]
|
# tx_num: CachedAddressHistoryItem
|
||||||
self.hashX_history_cache = LFUCacheWithMetrics(env.hashX_history_cache_size, metric_name='full_history', namespace=NAMESPACE)
|
|
||||||
# tx_num: Tuple[txid, height]
|
|
||||||
self.history_tx_info_cache = LFUCacheWithMetrics(env.history_tx_cache_size, metric_name='history_tx', namespace=NAMESPACE)
|
self.history_tx_info_cache = LFUCacheWithMetrics(env.history_tx_cache_size, metric_name='history_tx', namespace=NAMESPACE)
|
||||||
|
|
||||||
def clear_caches(self):
|
def clear_caches(self):
|
||||||
self.resolve_cache.clear()
|
self.resolve_cache.clear()
|
||||||
|
|
||||||
def update_history_caches(self, touched_hashXs: typing.List[bytes]):
|
def update_history_caches(self, touched_hashXs: typing.List[bytes]):
|
||||||
update_history_cache = {}
|
# update_history_cache = {}
|
||||||
for hashX in set(touched_hashXs):
|
for hashX in set(touched_hashXs):
|
||||||
history_tx_nums = None
|
# history_tx_nums = None
|
||||||
# if the history is the raw_history_cache, update it
|
# if the history is the raw_history_cache, update it
|
||||||
# TODO: use a reversed iterator for this instead of rescanning it all
|
# TODO: use a reversed iterator for this instead of rescanning it all
|
||||||
if hashX in self.hashX_raw_history_cache:
|
if hashX in self.hashX_raw_history_cache:
|
||||||
self.hashX_raw_history_cache[hashX] = history_tx_nums = self.db._read_history(hashX, None)
|
self.hashX_raw_history_cache[hashX] = self.db._read_history(hashX, None)
|
||||||
# if it's in hashX_history_cache, prepare to update it in a batch
|
|
||||||
if hashX in self.hashX_history_cache:
|
|
||||||
full_cached = self.hashX_history_cache[hashX]
|
|
||||||
if history_tx_nums is None:
|
|
||||||
history_tx_nums = self.db._read_history(hashX, None)
|
|
||||||
new_txs = history_tx_nums[len(full_cached):]
|
|
||||||
update_history_cache[hashX] = full_cached, new_txs
|
|
||||||
if update_history_cache:
|
|
||||||
# get the set of new tx nums that were touched in all of the new histories to be cached
|
|
||||||
total_tx_nums = set()
|
|
||||||
for _, new_txs in update_history_cache.values():
|
|
||||||
total_tx_nums.update(new_txs)
|
|
||||||
total_tx_nums = list(total_tx_nums)
|
|
||||||
# collect the total new tx infos
|
|
||||||
referenced_new_txs = {
|
|
||||||
tx_num: (CachedAddressHistoryItem(
|
|
||||||
tx_hash=tx_hash[::-1].hex(), height=bisect_right(self.db.tx_counts, tx_num)
|
|
||||||
)) for tx_num, tx_hash in zip(total_tx_nums, self.db._get_tx_hashes(total_tx_nums))
|
|
||||||
}
|
|
||||||
# update the cached history lists
|
|
||||||
get_referenced = referenced_new_txs.__getitem__
|
|
||||||
for hashX, (full, new_txs) in update_history_cache.items():
|
|
||||||
append_to_full = full.append
|
|
||||||
for tx_num in new_txs:
|
|
||||||
append_to_full(get_referenced(tx_num))
|
|
||||||
|
|
||||||
async def _start_server(self, kind, *args, **kw_args):
|
async def _start_server(self, kind, *args, **kw_args):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
@ -653,11 +626,6 @@ class SessionManager:
|
||||||
|
|
||||||
async def cached_confirmed_history(self, hashX: bytes,
|
async def cached_confirmed_history(self, hashX: bytes,
|
||||||
limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]:
|
limit: typing.Optional[int] = None) -> typing.List[CachedAddressHistoryItem]:
|
||||||
cached_full_history = self.hashX_history_cache.get(hashX)
|
|
||||||
# return the cached history
|
|
||||||
if cached_full_history is not None:
|
|
||||||
self.address_history_size_metric.observe(len(cached_full_history))
|
|
||||||
return cached_full_history
|
|
||||||
# return the history and update the caches
|
# return the history and update the caches
|
||||||
tx_nums = await self._cached_raw_history(hashX, limit)
|
tx_nums = await self._cached_raw_history(hashX, limit)
|
||||||
needed_tx_infos = []
|
needed_tx_infos = []
|
||||||
|
@ -684,7 +652,6 @@ class SessionManager:
|
||||||
history_append(tx_infos[tx_num])
|
history_append(tx_infos[tx_num])
|
||||||
if cnt % 1000 == 0:
|
if cnt % 1000 == 0:
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
self.hashX_history_cache[hashX] = history
|
|
||||||
self.address_history_size_metric.observe(len(history))
|
self.address_history_size_metric.observe(len(history))
|
||||||
return history
|
return history
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue