only notify hashxs touched since last notification

This commit is contained in:
Jack Robison 2020-12-14 13:42:20 -05:00
parent 751cc4c44d
commit 20dad7f07f
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 27 additions and 14 deletions

View file

@ -216,6 +216,7 @@ class BlockProcessor:
for cache in self.search_cache.values(): for cache in self.search_cache.values():
cache.clear() cache.clear()
self.history_cache.clear() self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
await self._maybe_flush() await self._maybe_flush()
processed_time = time.perf_counter() - start processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height) self.block_count_metric.set(self.height)

View file

@ -72,7 +72,7 @@ class MemPoolAPI(ABC):
""" """
@abstractmethod @abstractmethod
async def on_mempool(self, touched, height): async def on_mempool(self, touched, new_touched, height):
"""Called each time the mempool is synchronized. touched is a set of """Called each time the mempool is synchronized. touched is a set of
hashXs touched since the previous call. height is the hashXs touched since the previous call. height is the
daemon's height at the time the mempool was obtained.""" daemon's height at the time the mempool was obtained."""
@ -116,6 +116,7 @@ class MemPool:
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.wakeup = asyncio.Event() self.wakeup = asyncio.Event()
self.mempool_process_time_metric = mempool_process_time_metric self.mempool_process_time_metric = mempool_process_time_metric
self.notified_mempool_txs = set()
async def _logging(self, synchronized_event): async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats.""" """Print regular logs of mempool stats."""
@ -219,10 +220,15 @@ class MemPool:
continue continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes} hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock: async with self.lock:
new_hashes = hashes.difference(self.notified_mempool_txs)
touched = await self._process_mempool(hashes) touched = await self._process_mempool(hashes)
self.notified_mempool_txs.update(new_hashes)
new_touched = {
touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes)
}
synchronized_event.set() synchronized_event.set()
synchronized_event.clear() synchronized_event.clear()
await self.api.on_mempool(touched, height) await self.api.on_mempool(touched, new_touched, height)
duration = time.perf_counter() - start duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration) self.mempool_process_time_metric.observe(duration)
try: try:
@ -236,7 +242,8 @@ class MemPool:
async def _process_mempool(self, all_hashes): async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes # Re-sync with the new set of hashes
txs = self.txs txs = self.txs
hashXs = self.hashXs
hashXs = self.hashXs # hashX: [tx_hash, ...]
touched = set() touched = set()
# First handle txs that have disappeared # First handle txs that have disappeared
@ -267,8 +274,8 @@ class MemPool:
# FIXME: this is not particularly efficient # FIXME: this is not particularly efficient
while tx_map and len(tx_map) != prior_count: while tx_map and len(tx_map) != prior_count:
prior_count = len(tx_map) prior_count = len(tx_map)
tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched)
touched)
if tx_map: if tx_map:
self.logger.info(f'{len(tx_map)} txs dropped') self.logger.info(f'{len(tx_map)} txs dropped')

View file

@ -25,9 +25,10 @@ class Notifications:
def __init__(self): def __init__(self):
self._touched_mp = {} self._touched_mp = {}
self._touched_bp = {} self._touched_bp = {}
self.notified_mempool_txs = set()
self._highest_block = -1 self._highest_block = -1
async def _maybe_notify(self): async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp) common = set(tmp).intersection(tbp)
if common: if common:
@ -44,24 +45,24 @@ class Notifications:
del tmp[old] del tmp[old]
for old in [h for h in tbp if h <= height]: for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old)) touched.update(tbp.pop(old))
await self.notify(height, touched) await self.notify(height, touched, new_touched)
async def notify(self, height, touched): async def notify(self, height, touched, new_touched):
pass pass
async def start(self, height, notify_func): async def start(self, height, notify_func):
self._highest_block = height self._highest_block = height
self.notify = notify_func self.notify = notify_func
await self.notify(height, set()) await self.notify(height, set(), set())
async def on_mempool(self, touched, height): async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched self._touched_mp[height] = touched
await self._maybe_notify() await self._maybe_notify(new_touched)
async def on_block(self, touched, height): async def on_block(self, touched, height):
self._touched_bp[height] = touched self._touched_bp[height] = touched
self._highest_block = height self._highest_block = height
await self._maybe_notify() await self._maybe_notify(set())
class Server: class Server:
@ -84,9 +85,12 @@ class Server:
notifications.mempool_hashes = daemon.mempool_hashes notifications.mempool_hashes = daemon.mempool_hashes
notifications.raw_transactions = daemon.getrawtransactions notifications.raw_transactions = daemon.getrawtransactions
notifications.lookup_utxos = db.lookup_utxos notifications.lookup_utxos = db.lookup_utxos
MemPoolAPI.register(Notifications) MemPoolAPI.register(Notifications)
self.mempool = mempool = MemPool(env.coin, notifications) self.mempool = mempool = MemPool(env.coin, notifications)
notifications.notified_mempool_txs = self.mempool.notified_mempool_txs
self.session_mgr = env.coin.SESSION_MANAGER( self.session_mgr = env.coin.SESSION_MANAGER(
env, db, bp, daemon, mempool, self.shutdown_event env, db, bp, daemon, mempool, self.shutdown_event
) )

View file

@ -635,7 +635,7 @@ class SessionManager:
self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
return self.history_cache[hashX] return self.history_cache[hashX]
async def _notify_sessions(self, height, touched): async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses.""" """Notify sessions about height changes and touched addresses."""
height_changed = height != self.notified_height height_changed = height != self.notified_height
if height_changed: if height_changed:
@ -660,7 +660,8 @@ class SessionManager:
if touched or (height_changed and self.mempool_statuses): if touched or (height_changed and self.mempool_statuses):
notified_hashxs = 0 notified_hashxs = 0
notified_sessions = 0 notified_sessions = 0
for hashX in touched.union(self.mempool_statuses.keys()): to_notify = touched.union(self.mempool_statuses.keys() if height_changed else new_touched.intersection(self.mempool_statuses.keys()))
for hashX in to_notify:
for session_id in self.hashx_subscriptions_by_session[hashX]: for session_id in self.hashx_subscriptions_by_session[hashX]:
asyncio.create_task(self.sessions[session_id].send_history_notification(hashX)) asyncio.create_task(self.sessions[session_id].send_history_notification(hashX))
notified_sessions += 1 notified_sessions += 1