From 499ee74dfc22b803c597f0755a5515fb6db29e07 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Thu, 6 Jan 2022 12:39:24 -0500
Subject: [PATCH] add chain reader and reader server, new mempool, update block
 processor

---
 lbry/wallet/server/block_processor.py | 126 ++++----
 lbry/wallet/server/chain_reader.py    | 243 ++++++++++++++++
 lbry/wallet/server/coin.py            |   2 -
 lbry/wallet/server/mempool.py         | 398 +++++++++-----------------
 lbry/wallet/server/server.py          |  94 ------
 lbry/wallet/server/session.py         | 220 +++++++-------
 6 files changed, 563 insertions(+), 520 deletions(-)
 create mode 100644 lbry/wallet/server/chain_reader.py
 delete mode 100644 lbry/wallet/server/server.py

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 827f654db..f988b15aa 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -92,7 +92,6 @@ class BlockProcessor:
         )
         self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
         self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
-        self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
         self.shutdown_event = asyncio.Event()
         self.coin = env.coin
         if env.coin.NET == 'mainnet':
@@ -118,14 +117,13 @@ class BlockProcessor:
         self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {}
 
         # Claimtrie cache
-        self.db_op_stack: Optional[RevertableOpStack] = None
+        self.db_op_stack: Optional['RevertableOpStack'] = None
 
         # self.search_cache = {}
         self.resolve_cache = LRUCache(2**16)
         self.resolve_outputs_cache = LRUCache(2 ** 16)
 
         self.history_cache = {}
-        self.status_server = StatusServer()
 
         #################################
         # attributes used for calculating stake activations and takeovers per block
@@ -162,7 +160,6 @@ class BlockProcessor:
 
         self.removed_claims_to_send_es = set()  # cumulative changes across blocks to send ES
         self.touched_claims_to_send_es = set()
-        self.activation_info_to_send_es: DefaultDict[str, List[TrendingNotification]] = defaultdict(list)
 
         self.removed_claim_hashes: Set[bytes] = set()  # per block changes
         self.touched_claim_hashes: Set[bytes] = set()
@@ -251,29 +248,12 @@ class BlockProcessor:
                     await self.run_in_thread(self.advance_block, block)
                     await self.flush()
 
-                    self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
+                    self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
                     if self.height == self.coin.nExtendedClaimExpirationForkHeight:
                         self.logger.warning(
                             "applying extended claim expiration fork on claims accepted by, %i", self.height
                         )
                         await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
-                    if self.db.first_sync:
-                        self.db.search_index.clear_caches()
-                        self.touched_claims_to_send_es.clear()
-                        self.removed_claims_to_send_es.clear()
-                        self.activation_info_to_send_es.clear()
-                # TODO: we shouldnt wait on the search index updating before advancing to the next block
-                if not self.db.first_sync:
-                    await self.db.reload_blocking_filtering_streams()
-                    await self.db.search_index.claim_consumer(self.claim_producer())
-                    await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
-                                                                 self.db.filtered_streams, self.db.filtered_channels)
-                    await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
-                    await self._es_caught_up()
-                self.db.search_index.clear_caches()
-                self.touched_claims_to_send_es.clear()
-                self.removed_claims_to_send_es.clear()
-                self.activation_info_to_send_es.clear()
                 # print("******************\n")
             except:
                 self.logger.exception("advance blocks failed")
@@ -281,12 +261,9 @@ class BlockProcessor:
             processed_time = time.perf_counter() - total_start
             self.block_count_metric.set(self.height)
             self.block_update_time_metric.observe(processed_time)
-            self.status_server.set_height(self.db.fs_height, self.db.db_tip)
             if not self.db.first_sync:
                 s = '' if len(blocks) == 1 else 's'
                 self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
-            if self._caught_up_event.is_set():
-                await self.mempool.on_block(self.touched_hashXs, self.height)
             self.touched_hashXs.clear()
         elif hprevs[0] != chain[0]:
             min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
@@ -309,15 +286,6 @@ class BlockProcessor:
 
                     if self.env.cache_all_claim_txos:
                         await self.db._read_claim_txos()  # TODO: don't do this
-                    for touched in self.touched_claims_to_send_es:
-                        if not self.db.get_claim_txo(touched):
-                            self.removed_claims_to_send_es.add(touched)
-                    self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
-                    await self.db.search_index.claim_consumer(self.claim_producer())
-                    self.db.search_index.clear_caches()
-                    self.touched_claims_to_send_es.clear()
-                    self.removed_claims_to_send_es.clear()
-                    self.activation_info_to_send_es.clear()
                 await self.prefetcher.reset_height(self.height)
                 self.reorg_count_metric.inc()
             except:
@@ -652,8 +620,6 @@ class BlockProcessor:
             self.support_txo_to_claim.pop(support_txo_to_clear)
         self.support_txos_by_claim[claim_hash].clear()
         self.support_txos_by_claim.pop(claim_hash)
-        if claim_hash.hex() in self.activation_info_to_send_es:
-            self.activation_info_to_send_es.pop(claim_hash.hex())
         if normalized_name.startswith('@'):  # abandon a channel, invalidate signatures
             self._invalidate_channel_signatures(claim_hash)
 
@@ -1226,10 +1192,6 @@ class BlockProcessor:
                     self.touched_claim_hashes.add(controlling.claim_hash)
                 self.touched_claim_hashes.add(winning)
 
-    def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int,
-                                                  new_amount: int):
-        self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount))
-
     def _get_cumulative_update_ops(self, height: int):
         # update the last takeover height for names with takeovers
         for name in self.taken_over_names:
@@ -1493,7 +1455,6 @@ class BlockProcessor:
         self.utxo_cache.clear()
         self.hashXs_by_tx.clear()
         self.history_cache.clear()
-        self.mempool.notified_mempool_txs.clear()
         self.removed_claim_hashes.clear()
         self.touched_claim_hashes.clear()
         self.pending_reposted.clear()
@@ -1518,8 +1479,8 @@ class BlockProcessor:
         self.logger.info("backup block %i", self.height)
         # Check and update self.tip
 
-        self.db.headers.pop()
         self.db.tx_counts.pop()
+        reverted_block_hash = self.coin.header_hash(self.db.headers.pop())
         self.tip = self.coin.header_hash(self.db.headers[-1])
         if self.env.cache_all_tx_hashes:
             while len(self.db.total_transactions) > self.db.tx_counts[-1]:
@@ -1613,21 +1574,32 @@ class BlockProcessor:
             self.touched_hashXs.add(hashX)
             return hashX
 
-    async def _process_prefetched_blocks(self):
+    async def process_blocks_and_mempool_forever(self):
         """Loop forever processing blocks as they arrive."""
         while True:
             if self.height == self.daemon.cached_height():
                 if not self._caught_up_event.is_set():
                     await self._first_caught_up()
                     self._caught_up_event.set()
-            await self.blocks_event.wait()
+            try:
+                await asyncio.wait_for(self.blocks_event.wait(), 0.25)
+            except asyncio.TimeoutError:
+                pass
             self.blocks_event.clear()
             blocks = self.prefetcher.get_prefetched_blocks()
-            try:
-                await self.check_and_advance_blocks(blocks)
-            except Exception:
-                self.logger.exception("error while processing txs")
-                raise
+            if not blocks:
+                try:
+                    await self.check_mempool()
+                    self.db.prefix_db.unsafe_commit()
+                except Exception:
+                    self.logger.exception("error while updating mempool txs")
+                    raise
+            else:
+                try:
+                    await self.check_and_advance_blocks(blocks)
+                except Exception:
+                    self.logger.exception("error while processing txs")
+                    raise
 
     async def _es_caught_up(self):
         self.db.es_sync_height = self.height
@@ -1659,6 +1631,13 @@ class BlockProcessor:
                              f'height {self.height:,d}, halting here.')
             self.shutdown_event.set()
 
+    async def open(self):
+        self.db.open_db()
+        self.height = self.db.db_height
+        self.tip = self.db.db_tip
+        self.tx_count = self.db.db_tx_count
+        await self.db.initialize_caches()
+
     async def fetch_and_process_blocks(self, caught_up_event):
         """Fetch, process and index blocks from the daemon.
 
@@ -1674,16 +1653,9 @@ class BlockProcessor:
 
         self._caught_up_event = caught_up_event
         try:
-            self.db.open_db()
-            self.height = self.db.db_height
-            self.tip = self.db.db_tip
-            self.tx_count = self.db.db_tx_count
-            self.status_server.set_height(self.db.fs_height, self.db.db_tip)
-            await self.db.initialize_caches()
-            await self.db.search_index.start()
             await asyncio.wait([
                 self.prefetcher.main_loop(self.height),
-                self._process_prefetched_blocks()
+                self.process_blocks_and_mempool_forever()
             ])
         except asyncio.CancelledError:
             raise
@@ -1691,9 +1663,47 @@ class BlockProcessor:
             self.logger.exception("Block processing failed!")
             raise
         finally:
-            self.status_server.stop()
             # Shut down block processing
             self.logger.info('closing the DB for a clean shutdown...')
             self._sync_reader_executor.shutdown(wait=True)
             self._chain_executor.shutdown(wait=True)
             self.db.close()
+
+    async def start(self):
+        env = self.env
+        min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
+        self.logger.info(f'software version: {lbry.__version__}')
+        self.logger.info(f'supported protocol versions: {min_str}-{max_str}')
+        self.logger.info(f'event loop policy: {env.loop_policy}')
+        self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
+
+        await self.daemon.height()
+
+        def _start_cancellable(run, *args):
+            _flag = asyncio.Event()
+            self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
+            return _flag.wait()
+
+        await _start_cancellable(self.fetch_and_process_blocks)
+
+    async def stop(self):
+        for task in reversed(self.cancellable_tasks):
+            task.cancel()
+        await asyncio.wait(self.cancellable_tasks)
+        self.shutdown_event.set()
+        await self.daemon.close()
+
+    def run(self):
+        loop = asyncio.get_event_loop()
+
+        def __exit():
+            raise SystemExit()
+        try:
+            loop.add_signal_handler(signal.SIGINT, __exit)
+            loop.add_signal_handler(signal.SIGTERM, __exit)
+            loop.run_until_complete(self.start())
+            loop.run_until_complete(self.shutdown_event.wait())
+        except (SystemExit, KeyboardInterrupt):
+            pass
+        finally:
+            loop.run_until_complete(self.stop())
diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py
new file mode 100644
index 000000000..5f4a31aad
--- /dev/null
+++ b/lbry/wallet/server/chain_reader.py
@@ -0,0 +1,243 @@
+import signal
+import logging
+import asyncio
+from concurrent.futures.thread import ThreadPoolExecutor
+import typing
+
+import lbry
+from lbry.wallet.server.mempool import MemPool
+from lbry.wallet.server.db.prefixes import DBState
+from lbry.wallet.server.udp import StatusServer
+from lbry.wallet.server.db.db import HubDB
+from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierClientProtocol
+from lbry.wallet.server.session import LBRYSessionManager
+from lbry.prometheus import PrometheusServer
+
+
+class BlockchainReader:
+    def __init__(self, env, secondary_name: str):
+        self.env = env
+        self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
+        self.shutdown_event = asyncio.Event()
+        self.cancellable_tasks = []
+
+        self.db = HubDB(
+            env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
+            secondary_name=secondary_name, max_open_files=-1
+        )
+        self.last_state: typing.Optional[DBState] = None
+        self._refresh_interval = 0.1
+
+    def _detect_changes(self):
+        try:
+            self.db.prefix_db.try_catch_up_with_primary()
+        except:
+            self.log.exception('failed to update secondary db')
+            raise
+        state = self.db.prefix_db.db_state.get()
+        if not state or state.height <= 0:
+            return
+        # if state and self.last_state and self.db.headers and self.last_state.tip == self.db.coin.header_hash(self.db.headers[-1]):
+        #     return
+        if self.last_state and self.last_state.height > state.height:  # FIXME: derp
+            self.log.debug("reorg detected, waiting until the writer has flushed the new blocks to advance")
+            return
+        last_height = 0 if not self.last_state else self.last_state.height
+        if self.last_state:
+            while True:
+                if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
+                    self.log.debug("connects to block %i", last_height)
+                    break
+                else:
+                    self.log.warning("disconnect block %i", last_height)
+                    self.unwind()
+                    last_height -= 1
+        self.db.read_db_state()
+        if not self.last_state or self.last_state.height < state.height:
+            for height in range(last_height + 1, state.height + 1):
+                self.log.warning("advancing to %i", height)
+                self.advance(height)
+            self.clear_caches()
+            self.last_state = state
+
+        # elif self.last_state and self.last_state.height > state.height:
+        #     last_height = self.last_state.height
+        #     for height in range(last_height, state.height, -1):
+        #         self.log.warning("unwind %i", height)
+        #         self.unwind()
+        #     self.clear_caches()
+        #     self.last_state = state
+        #     self.log.warning("unwound to %i", self.last_state.height)
+
+            # print("reader rewound to ", self.last_state.height)
+
+    async def poll_for_changes(self):
+        await asyncio.get_event_loop().run_in_executor(None, self._detect_changes)
+
+    async def refresh_blocks_forever(self, synchronized: asyncio.Event):
+        self.log.warning("start refresh blocks forever")
+        while True:
+            try:
+                await self.poll_for_changes()
+            except:
+                self.log.exception("boom")
+                raise
+            await asyncio.sleep(self._refresh_interval)
+            synchronized.set()
+
+    def clear_caches(self):
+        pass
+
+    def advance(self, height: int):
+        tx_count = self.db.prefix_db.tx_count.get(height).tx_count
+        assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
+        assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
+        self.db.tx_counts.append(tx_count)
+        self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
+
+    def unwind(self):
+        self.db.tx_counts.pop()
+        self.db.headers.pop()
+
+
+class BlockchainReaderServer(BlockchainReader):
+    def __init__(self, env):
+        super().__init__(env, 'lbry-reader')
+        self.history_cache = {}
+        self.resolve_outputs_cache = {}
+        self.resolve_cache = {}
+        self.notifications_to_send = []
+        self.status_server = StatusServer()
+        self.daemon = env.coin.DAEMON(env.coin, env.daemon_url)  # only needed for broadcasting txs
+        self.prometheus_server: typing.Optional[PrometheusServer] = None
+        self.mempool = MemPool(self.env.coin, self.db)
+        self.session_manager = LBRYSessionManager(
+            env, self.db, self.mempool, self.history_cache, self.resolve_cache,
+            self.resolve_outputs_cache, self.daemon,
+            self.shutdown_event,
+            on_available_callback=self.status_server.set_available,
+            on_unavailable_callback=self.status_server.set_unavailable
+        )
+        self.mempool.session_manager = self.session_manager
+        self.es_notifications = asyncio.Queue()
+        self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
+        self.synchronized = asyncio.Event()
+        self._es_height = None
+
+    def clear_caches(self):
+        self.history_cache.clear()
+        self.resolve_outputs_cache.clear()
+        self.resolve_cache.clear()
+        # self.clear_search_cache()
+        # self.mempool.notified_mempool_txs.clear()
+
+    def clear_search_cache(self):
+        self.session_manager.search_index.clear_caches()
+
+    def advance(self, height: int):
+        super().advance(height)
+        touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs
+        self.notifications_to_send.append((set(touched_hashXs), height))
+
+    def _detect_changes(self):
+        super()._detect_changes()
+        self.mempool.raw_mempool.clear()
+        self.mempool.raw_mempool.update(
+            {k.tx_hash: v.raw_tx for k, v in self.db.prefix_db.mempool_tx.iterate()}
+        )
+
+    async def poll_for_changes(self):
+        await super().poll_for_changes()
+        self.status_server.set_height(self.db.fs_height, self.db.db_tip)
+        if self.notifications_to_send:
+            for (touched, height) in self.notifications_to_send:
+                await self.mempool.on_block(touched, height)
+                self.log.warning("reader advanced to %i", height)
+                if self._es_height == self.db.db_height:
+                    self.synchronized.set()
+                # print("reader notified")
+        await self.mempool.refresh_hashes(self.db.db_height)
+        self.notifications_to_send.clear()
+
+    async def receive_es_notifications(self, synchronized: asyncio.Event):
+        await asyncio.get_event_loop().create_connection(
+            lambda: self.es_notification_client, '127.0.0.1', self.env.elastic_notifier_port
+        )
+        synchronized.set()
+        try:
+            while True:
+                self._es_height = await self.es_notifications.get()
+                self.clear_search_cache()
+                if self._es_height == self.db.db_height:
+                    self.synchronized.set()
+                    self.log.warning("es and reader are in sync")
+                else:
+                    self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
+        finally:
+            self.es_notification_client.close()
+
+    async def start(self):
+        env = self.env
+        min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
+        self.log.info(f'software version: {lbry.__version__}')
+        self.log.info(f'supported protocol versions: {min_str}-{max_str}')
+        self.log.info(f'event loop policy: {env.loop_policy}')
+        self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
+        await self.daemon.height()
+
+        def _start_cancellable(run, *args):
+            _flag = asyncio.Event()
+            self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
+            return _flag.wait()
+
+        self.db.open_db()
+        await self.db.initialize_caches()
+
+        self.last_state = self.db.read_db_state()
+
+        await self.start_prometheus()
+        if self.env.udp_port:
+            await self.status_server.start(
+                0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
+                self.env.host, self.env.udp_port, self.env.allow_lan_udp
+            )
+        await _start_cancellable(self.receive_es_notifications)
+        await _start_cancellable(self.refresh_blocks_forever)
+        await self.session_manager.search_index.start()
+        await _start_cancellable(self.session_manager.serve, self.mempool)
+
+    async def stop(self):
+        self.status_server.stop()
+        for task in reversed(self.cancellable_tasks):
+            task.cancel()
+        await asyncio.wait(self.cancellable_tasks)
+        self.session_manager.search_index.stop()
+        self.db.close()
+        if self.prometheus_server:
+            await self.prometheus_server.stop()
+            self.prometheus_server = None
+        self.shutdown_event.set()
+        await self.daemon.close()
+
+    def run(self):
+        loop = asyncio.get_event_loop()
+        executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
+        loop.set_default_executor(executor)
+
+        def __exit():
+            raise SystemExit()
+        try:
+            loop.add_signal_handler(signal.SIGINT, __exit)
+            loop.add_signal_handler(signal.SIGTERM, __exit)
+            loop.run_until_complete(self.start())
+            loop.run_until_complete(self.shutdown_event.wait())
+        except (SystemExit, KeyboardInterrupt):
+            pass
+        finally:
+            loop.run_until_complete(self.stop())
+            executor.shutdown(True)
+
+    async def start_prometheus(self):
+        if not self.prometheus_server and self.env.prometheus_port:
+            self.prometheus_server = PrometheusServer()
+            await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)
diff --git a/lbry/wallet/server/coin.py b/lbry/wallet/server/coin.py
index 059b95502..afe2a9423 100644
--- a/lbry/wallet/server/coin.py
+++ b/lbry/wallet/server/coin.py
@@ -13,7 +13,6 @@ from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_
 from lbry.wallet.server.daemon import Daemon, LBCDaemon
 from lbry.wallet.server.script import ScriptPubKey, OpCodes
 from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
-from lbry.wallet.server.block_processor import BlockProcessor
 
 
 Block = namedtuple("Block", "raw header transactions")
@@ -37,7 +36,6 @@ class Coin:
     SESSIONCLS = LBRYElectrumX
     DESERIALIZER = lib_tx.Deserializer
     DAEMON = Daemon
-    BLOCK_PROCESSOR = BlockProcessor
     SESSION_MANAGER = LBRYSessionManager
     HEADER_VALUES = [
         'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce'
diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py
index 27d7a2352..cecb9f299 100644
--- a/lbry/wallet/server/mempool.py
+++ b/lbry/wallet/server/mempool.py
@@ -11,14 +11,13 @@ import itertools
 import time
 import attr
 import typing
-from typing import Set, Optional, Callable, Awaitable
 from collections import defaultdict
 from prometheus_client import Histogram
-from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash
-from lbry.wallet.server.util import class_logger, chunks
-from lbry.wallet.server.leveldb import UTXO
+from lbry.wallet.server.util import class_logger
+
 if typing.TYPE_CHECKING:
     from lbry.wallet.server.session import LBRYSessionManager
+    from wallet.server.db.db import LevelDB
 
 
 @attr.s(slots=True)
@@ -50,70 +49,99 @@ mempool_process_time_metric = Histogram(
 
 
 class MemPool:
-    def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0):
+    def __init__(self, coin, db: 'LevelDB', refresh_secs=1.0):
         self.coin = coin
-        self._daemon = daemon
         self._db = db
-        self._touched_mp = {}
-        self._touched_bp = {}
-        self._highest_block = -1
-
         self.logger = class_logger(__name__, self.__class__.__name__)
         self.txs = {}
-        self.hashXs = defaultdict(set)  # None can be a key
-        self.cached_compact_histogram = []
+        self.raw_mempool = {}
+        self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set)  # None can be a key
         self.refresh_secs = refresh_secs
-        self.log_status_secs = log_status_secs
-        # Prevents mempool refreshes during fee histogram calculation
-        self.lock = state_lock
-        self.wakeup = asyncio.Event()
         self.mempool_process_time_metric = mempool_process_time_metric
-        self.notified_mempool_txs = set()
-        self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None
+        self.session_manager: typing.Optional['LBRYSessionManager'] = None
 
-    async def _logging(self, synchronized_event):
-        """Print regular logs of mempool stats."""
-        self.logger.info('beginning processing of daemon mempool.  '
-                         'This can take some time...')
+    async def refresh_hashes(self, height: int):
         start = time.perf_counter()
-        await synchronized_event.wait()
-        elapsed = time.perf_counter() - start
-        self.logger.info(f'synced in {elapsed:.2f}s')
-        while True:
-            self.logger.info(f'{len(self.txs):,d} txs '
-                             f'touching {len(self.hashXs):,d} addresses')
-            await asyncio.sleep(self.log_status_secs)
-            await synchronized_event.wait()
+        new_touched = await self._process_mempool()
+        await self.on_mempool(set(self.touched_hashXs), new_touched, height)
+        duration = time.perf_counter() - start
+        self.mempool_process_time_metric.observe(duration)
 
-    def _accept_transactions(self, tx_map, utxo_map, touched):
-        """Accept transactions in tx_map to the mempool if all their inputs
-        can be found in the existing mempool or a utxo_map from the
-        DB.
+    async def _process_mempool(self) -> typing.Set[bytes]:  # returns list of new touched hashXs
+        # Re-sync with the new set of hashes
 
-        Returns an (unprocessed tx_map, unspent utxo_map) pair.
-        """
-        hashXs = self.hashXs
-        txs = self.txs
+        # hashXs = self.hashXs  # hashX: [tx_hash, ...]
+        touched_hashXs = set()
 
-        deferred = {}
-        unspent = set(utxo_map)
-        # Try to find all prevouts so we can accept the TX
-        for hash, tx in tx_map.items():
-            in_pairs = []
-            try:
-                for prevout in tx.prevouts:
-                    utxo = utxo_map.get(prevout)
-                    if not utxo:
-                        prev_hash, prev_index = prevout
-                        # Raises KeyError if prev_hash is not in txs
-                        utxo = txs[prev_hash].out_pairs[prev_index]
-                    in_pairs.append(utxo)
-            except KeyError:
-                deferred[hash] = tx
+        # Remove txs that aren't in mempool anymore
+        for tx_hash in set(self.txs).difference(self.raw_mempool.keys()):
+            tx = self.txs.pop(tx_hash)
+            tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs})
+            for hashX in tx_hashXs:
+                if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
+                    self.touched_hashXs[hashX].remove(tx_hash)
+                    if not self.touched_hashXs[hashX]:
+                        self.touched_hashXs.pop(hashX)
+            touched_hashXs.update(tx_hashXs)
+
+        tx_map = {}
+        for tx_hash, raw_tx in self.raw_mempool.items():
+            if tx_hash in self.txs:
                 continue
+            tx, tx_size = self.coin.DESERIALIZER(raw_tx).read_tx_and_vsize()
+            # Convert the inputs and outputs into (hashX, value) pairs
+            # Drop generation-like inputs from MemPoolTx.prevouts
+            txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
+                               for txin in tx.inputs
+                               if not txin.is_generation())
+            txout_pairs = tuple((self.coin.hashX_from_script(txout.pk_script), txout.value)
+                                for txout in tx.outputs)
 
-            # Spend the prevouts
-            unspent.difference_update(tx.prevouts)
+            tx_map[tx_hash] = MemPoolTx(txin_pairs, None, txout_pairs, 0, tx_size, raw_tx)
+
+        # Determine all prevouts not in the mempool, and fetch the
+        # UTXO information from the database.  Failed prevout lookups
+        # return None - concurrent database updates happen - which is
+        # relied upon by _accept_transactions. Ignore prevouts that are
+        # generation-like.
+        # prevouts = tuple(prevout for tx in tx_map.values()
+        #                  for prevout in tx.prevouts
+        #                  if prevout[0] not in self.raw_mempool)
+        # utxos = await self._db.lookup_utxos(prevouts)
+        # utxo_map = dict(zip(prevouts, utxos))
+        # unspent = set(utxo_map)
+
+        for tx_hash, tx in tx_map.items():
+            in_pairs = []
+            for prevout in tx.prevouts:
+                # utxo = utxo_map.get(prevout)
+                # if not utxo:
+                prev_hash, prev_index = prevout
+                if prev_hash in self.txs:  # accepted mempool
+                    utxo = self.txs[prev_hash].out_pairs[prev_index]
+                elif prev_hash in tx_map:  # this set of changes
+                    utxo = tx_map[prev_hash].out_pairs[prev_index]
+                else:  # get it from the db
+                    prev_tx_num = self._db.prefix_db.tx_num.get(prev_hash)
+                    if not prev_tx_num:
+                        continue
+                    prev_tx_num = prev_tx_num.tx_num
+                    hashX_val = self._db.prefix_db.hashX_utxo.get(tx_hash[:4], prev_tx_num, prev_index)
+                    if not hashX_val:
+                        continue
+                    hashX = hashX_val.hashX
+                    utxo_value = self._db.prefix_db.utxo.get(hashX, prev_tx_num, prev_index)
+                    utxo = (hashX, utxo_value.amount)
+                    # if not prev_raw:
+                    #     print("derp", prev_hash[::-1].hex())
+                    #     print(self._db.get_tx_num(prev_hash))
+                    # prev_tx, prev_tx_size = self.coin.DESERIALIZER(prev_raw.raw_tx).read_tx_and_vsize()
+                    # prev_txo = prev_tx.outputs[prev_index]
+                    # utxo = (self.coin.hashX_from_script(prev_txo.pk_script), prev_txo.value)
+                in_pairs.append(utxo)
+
+            # # Spend the prevouts
+            # unspent.difference_update(tx.prevouts)
 
             # Save the in_pairs, compute the fee and accept the TX
             tx.in_pairs = tuple(in_pairs)
@@ -121,198 +149,26 @@ class MemPool:
             # because some in_parts would be missing
             tx.fee = max(0, (sum(v for _, v in tx.in_pairs) -
                              sum(v for _, v in tx.out_pairs)))
-            txs[hash] = tx
+            self.txs[tx_hash] = tx
+            # print(f"added {tx_hash[::-1].hex()} reader to mempool")
 
             for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs):
-                touched.add(hashX)
-                hashXs[hashX].add(hash)
+                self.touched_hashXs[hashX].add(tx_hash)
+                touched_hashXs.add(hashX)
+        # utxo_map = {prevout: utxo_map[prevout] for prevout in unspent}
 
-        return deferred, {prevout: utxo_map[prevout] for prevout in unspent}
-
-    async def _mempool_loop(self, synchronized_event):
-        try:
-            return await self._refresh_hashes(synchronized_event)
-        except asyncio.CancelledError:
-            raise
-        except Exception as e:
-            self.logger.exception("MEMPOOL DIED")
-            raise e
-
-    async def _refresh_hashes(self, synchronized_event):
-        """Refresh our view of the daemon's mempool."""
-        while True:
-            start = time.perf_counter()
-            height = self._daemon.cached_height()
-            hex_hashes = await self._daemon.mempool_hashes()
-            if height != await self._daemon.height():
-                continue
-            hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
-            async with self.lock:
-                new_hashes = hashes.difference(self.notified_mempool_txs)
-                touched = await self._process_mempool(hashes)
-                self.notified_mempool_txs.update(new_hashes)
-                new_touched = {
-                    touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes)
-                }
-            synchronized_event.set()
-            synchronized_event.clear()
-            await self.on_mempool(touched, new_touched, height)
-            duration = time.perf_counter() - start
-            self.mempool_process_time_metric.observe(duration)
-            try:
-                # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
-                await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
-            except asyncio.TimeoutError:
-                pass
-            finally:
-                self.wakeup.clear()
-
-    async def _process_mempool(self, all_hashes):
-        # Re-sync with the new set of hashes
-        txs = self.txs
-
-        hashXs = self.hashXs  # hashX: [tx_hash, ...]
-        touched = set()
-
-        # First handle txs that have disappeared
-        for tx_hash in set(txs).difference(all_hashes):
-            tx = txs.pop(tx_hash)
-            tx_hashXs = {hashX for hashX, value in tx.in_pairs}
-            tx_hashXs.update(hashX for hashX, value in tx.out_pairs)
-            for hashX in tx_hashXs:
-                hashXs[hashX].remove(tx_hash)
-                if not hashXs[hashX]:
-                    del hashXs[hashX]
-            touched.update(tx_hashXs)
-
-        # Process new transactions
-        new_hashes = list(all_hashes.difference(txs))
-        if new_hashes:
-            fetches = []
-            for hashes in chunks(new_hashes, 200):
-                fetches.append(self._fetch_and_accept(hashes, all_hashes, touched))
-            tx_map = {}
-            utxo_map = {}
-            for fetch in asyncio.as_completed(fetches):
-                deferred, unspent = await fetch
-                tx_map.update(deferred)
-                utxo_map.update(unspent)
-
-            prior_count = 0
-            # FIXME: this is not particularly efficient
-            while tx_map and len(tx_map) != prior_count:
-                prior_count = len(tx_map)
-                tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched)
-
-            if tx_map:
-                self.logger.info(f'{len(tx_map)} txs dropped')
-
-        return touched
-
-    async def _fetch_and_accept(self, hashes, all_hashes, touched):
-        """Fetch a list of mempool transactions."""
-        raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes))
-
-        to_hashX = self.coin.hashX_from_script
-        deserializer = self.coin.DESERIALIZER
-
-        tx_map = {}
-        for hash, raw_tx in zip(hashes, raw_txs):
-            # The daemon may have evicted the tx from its
-            # mempool or it may have gotten in a block
-            if not raw_tx:
-                continue
-            tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
-            # Convert the inputs and outputs into (hashX, value) pairs
-            # Drop generation-like inputs from MemPoolTx.prevouts
-            txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
-                               for txin in tx.inputs
-                               if not txin.is_generation())
-            txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
-                                for txout in tx.outputs)
-            tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
-                                     0, tx_size, raw_tx)
-
-        # Determine all prevouts not in the mempool, and fetch the
-        # UTXO information from the database.  Failed prevout lookups
-        # return None - concurrent database updates happen - which is
-        # relied upon by _accept_transactions. Ignore prevouts that are
-        # generation-like.
-        prevouts = tuple(prevout for tx in tx_map.values()
-                         for prevout in tx.prevouts
-                         if prevout[0] not in all_hashes)
-        utxos = await self._db.lookup_utxos(prevouts)
-        utxo_map = dict(zip(prevouts, utxos))
-
-        return self._accept_transactions(tx_map, utxo_map, touched)
-
-    #
-    # External interface
-    #
-
-    async def keep_synchronized(self, synchronized_event):
-        """Keep the mempool synchronized with the daemon."""
-        await asyncio.wait([
-            self._mempool_loop(synchronized_event),
-            # self._refresh_histogram(synchronized_event),
-            self._logging(synchronized_event)
-        ])
-
-    async def balance_delta(self, hashX):
-        """Return the unconfirmed amount in the mempool for hashX.
-
-        Can be positive or negative.
-        """
-        value = 0
-        if hashX in self.hashXs:
-            for hash in self.hashXs[hashX]:
-                tx = self.txs[hash]
-                value -= sum(v for h168, v in tx.in_pairs if h168 == hashX)
-                value += sum(v for h168, v in tx.out_pairs if h168 == hashX)
-        return value
-
-    def compact_fee_histogram(self):
-        """Return a compact fee histogram of the current mempool."""
-        return self.cached_compact_histogram
-
-    async def potential_spends(self, hashX):
-        """Return a set of (prev_hash, prev_idx) pairs from mempool
-        transactions that touch hashX.
-
-        None, some or all of these may be spends of the hashX, but all
-        actual spends of it (in the DB or mempool) will be included.
-        """
-        result = set()
-        for tx_hash in self.hashXs.get(hashX, ()):
-            tx = self.txs[tx_hash]
-            result.update(tx.prevouts)
-        return result
+        return touched_hashXs
 
     def transaction_summaries(self, hashX):
         """Return a list of MemPoolTxSummary objects for the hashX."""
         result = []
-        for tx_hash in self.hashXs.get(hashX, ()):
+        for tx_hash in self.touched_hashXs.get(hashX, ()):
             tx = self.txs[tx_hash]
             has_ui = any(hash in self.txs for hash, idx in tx.prevouts)
             result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
         return result
 
-    async def unordered_UTXOs(self, hashX):
-        """Return an unordered list of UTXO named tuples from mempool
-        transactions that pay to hashX.
-
-        This does not consider if any other mempool transactions spend
-        the outputs.
-        """
-        utxos = []
-        for tx_hash in self.hashXs.get(hashX, ()):
-            tx = self.txs.get(tx_hash)
-            for pos, (hX, value) in enumerate(tx.out_pairs):
-                if hX == hashX:
-                    utxos.append(UTXO(-1, pos, tx_hash, 0, value))
-        return utxos
-
-    def get_mempool_height(self, tx_hash):
+    def get_mempool_height(self, tx_hash: bytes) -> int:
         # Height Progression
         #   -2: not broadcast
         #   -1: in mempool but has unconfirmed inputs
@@ -321,41 +177,57 @@ class MemPool:
         if tx_hash not in self.txs:
             return -2
         tx = self.txs[tx_hash]
-        unspent_inputs = sum(1 if hash in self.txs else 0 for hash, idx in tx.prevouts)
+        unspent_inputs = any(hash in self.raw_mempool for hash, idx in tx.prevouts)
         if unspent_inputs:
             return -1
         return 0
 
-    async def _maybe_notify(self, new_touched):
-        tmp, tbp = self._touched_mp, self._touched_bp
-        common = set(tmp).intersection(tbp)
-        if common:
-            height = max(common)
-        elif tmp and max(tmp) == self._highest_block:
-            height = self._highest_block
-        else:
-            # Either we are processing a block and waiting for it to
-            # come in, or we have not yet had a mempool update for the
-            # new block height
-            return
-        touched = tmp.pop(height)
-        for old in [h for h in tmp if h <= height]:
-            del tmp[old]
-        for old in [h for h in tbp if h <= height]:
-            touched.update(tbp.pop(old))
-        # print("notify", height, len(touched), len(new_touched))
-        await self.notify_sessions(height, touched, new_touched)
-
     async def start(self, height, session_manager: 'LBRYSessionManager'):
-        self._highest_block = height
         self.notify_sessions = session_manager._notify_sessions
-        await self.notify_sessions(height, set(), set())
+        await self._notify_sessions(height, set(), set())
 
     async def on_mempool(self, touched, new_touched, height):
-        self._touched_mp[height] = touched
-        await self._maybe_notify(new_touched)
+        await self._notify_sessions(height, touched, new_touched)
 
     async def on_block(self, touched, height):
-        self._touched_bp[height] = touched
-        self._highest_block = height
-        await self._maybe_notify(set())
+        await self._notify_sessions(height, touched, set())
+
+    async def _notify_sessions(self, height, touched, new_touched):
+        """Notify sessions about height changes and touched addresses."""
+        height_changed = height != self.session_manager.notified_height
+        if height_changed:
+            await self.session_manager._refresh_hsub_results(height)
+
+        if not self.session_manager.sessions:
+            return
+
+        if height_changed:
+            header_tasks = [
+                session.send_notification('blockchain.headers.subscribe', (self.session_manager.hsub_results[session.subscribe_headers_raw], ))
+                for session in self.session_manager.sessions.values() if session.subscribe_headers
+            ]
+            if header_tasks:
+                self.logger.warning(f'notify {len(header_tasks)} sessions of new header')
+                asyncio.create_task(asyncio.wait(header_tasks))
+            for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
+                self.session_manager.mempool_statuses.pop(hashX, None)
+        # self.bp._chain_executor
+        await asyncio.get_event_loop().run_in_executor(
+            None, touched.intersection_update, self.session_manager.hashx_subscriptions_by_session.keys()
+        )
+
+        if touched or new_touched or (height_changed and self.session_manager.mempool_statuses):
+            notified_hashxs = 0
+            session_hashxes_to_notify = defaultdict(list)
+            to_notify = touched if height_changed else new_touched
+
+            for hashX in to_notify:
+                if hashX not in self.session_manager.hashx_subscriptions_by_session:
+                    continue
+                for session_id in self.session_manager.hashx_subscriptions_by_session[hashX]:
+                    session_hashxes_to_notify[session_id].append(hashX)
+                    notified_hashxs += 1
+            for session_id, hashXes in session_hashxes_to_notify.items():
+                asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
+            if session_hashxes_to_notify:
+                self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py
deleted file mode 100644
index e53297541..000000000
--- a/lbry/wallet/server/server.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import signal
-import logging
-import asyncio
-from concurrent.futures.thread import ThreadPoolExecutor
-import typing
-
-import lbry
-from lbry.wallet.server.mempool import MemPool
-from lbry.wallet.server.block_processor import BlockProcessor
-from lbry.wallet.server.leveldb import LevelDB
-from lbry.wallet.server.session import LBRYSessionManager
-from lbry.prometheus import PrometheusServer
-
-
-class Server:
-
-    def __init__(self, env):
-        self.env = env
-        self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
-        self.shutdown_event = asyncio.Event()
-        self.cancellable_tasks = []
-
-        self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
-        self.db = db = LevelDB(env)
-        self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event)
-        self.prometheus_server: typing.Optional[PrometheusServer] = None
-
-        self.session_mgr = LBRYSessionManager(
-            env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
-            self.shutdown_event,
-            on_available_callback=bp.status_server.set_available,
-            on_unavailable_callback=bp.status_server.set_unavailable
-        )
-        self._indexer_task = None
-
-    async def start(self):
-        env = self.env
-        min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
-        self.log.info(f'software version: {lbry.__version__}')
-        self.log.info(f'supported protocol versions: {min_str}-{max_str}')
-        self.log.info(f'event loop policy: {env.loop_policy}')
-        self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
-
-        await self.daemon.height()
-
-        def _start_cancellable(run, *args):
-            _flag = asyncio.Event()
-            self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
-            return _flag.wait()
-
-        await self.start_prometheus()
-        if self.env.udp_port:
-            await self.bp.status_server.start(
-                0, bytes.fromhex(self.bp.coin.GENESIS_HASH)[::-1], self.env.country,
-                self.env.host, self.env.udp_port, self.env.allow_lan_udp
-            )
-        await _start_cancellable(self.bp.fetch_and_process_blocks)
-
-        await self.db.populate_header_merkle_cache()
-        await _start_cancellable(self.bp.mempool.keep_synchronized)
-        await _start_cancellable(self.session_mgr.serve, self.bp.mempool)
-
-    async def stop(self):
-        for task in reversed(self.cancellable_tasks):
-            task.cancel()
-        await asyncio.wait(self.cancellable_tasks)
-        if self.prometheus_server:
-            await self.prometheus_server.stop()
-            self.prometheus_server = None
-        self.shutdown_event.set()
-        await self.daemon.close()
-
-    def run(self):
-        loop = asyncio.get_event_loop()
-        executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
-        loop.set_default_executor(executor)
-
-        def __exit():
-            raise SystemExit()
-        try:
-            loop.add_signal_handler(signal.SIGINT, __exit)
-            loop.add_signal_handler(signal.SIGTERM, __exit)
-            loop.run_until_complete(self.start())
-            loop.run_until_complete(self.shutdown_event.wait())
-        except (SystemExit, KeyboardInterrupt):
-            pass
-        finally:
-            loop.run_until_complete(self.stop())
-            executor.shutdown(True)
-
-    async def start_prometheus(self):
-        if not self.prometheus_server and self.env.prometheus_port:
-            self.prometheus_server = PrometheusServer()
-            await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)
diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
index 4ff192dc5..0318e932c 100644
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ -20,8 +20,7 @@ import lbry
 from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
 from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
 from lbry.schema.result import Outputs
-from lbry.wallet.server.block_processor import BlockProcessor
-from lbry.wallet.server.leveldb import LevelDB
+from lbry.wallet.server.db.db import HubDB
 from lbry.wallet.server.websocket import AdminWebSocket
 from lbry.wallet.rpc.framing import NewlineFramer
 
@@ -34,9 +33,12 @@ from lbry.wallet.rpc import (
 from lbry.wallet.server import util
 from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error
 from lbry.wallet.server.daemon import DaemonError
+from lbry.wallet.server.db.elasticsearch import SearchIndex
+
 if typing.TYPE_CHECKING:
     from lbry.wallet.server.env import Env
     from lbry.wallet.server.daemon import Daemon
+    from lbry.wallet.server.mempool import MemPool
 
 BAD_REQUEST = 1
 DAEMON_ERROR = 2
@@ -170,7 +172,7 @@ class SessionManager:
         namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
     )
 
-    def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
+    def __init__(self, env: 'Env', db: HubDB, mempool: 'MemPool', history_cache, resolve_cache, resolve_outputs_cache,
                  daemon: 'Daemon', shutdown_event: asyncio.Event,
                  on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
         env.max_send = max(350000, env.max_send)
@@ -183,7 +185,7 @@ class SessionManager:
         self.shutdown_event = shutdown_event
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
-        self.sessions: typing.Dict[int, 'SessionBase'] = {}
+        self.sessions: typing.Dict[int, 'LBRYElectrumX'] = {}
         self.hashx_subscriptions_by_session: typing.DefaultDict[str, typing.Set[int]] = defaultdict(set)
         self.mempool_statuses = {}
         self.cur_group = SessionGroup(0)
@@ -198,8 +200,15 @@ class SessionManager:
 
         self.session_event = Event()
 
+        # Search index
+        self.search_index = SearchIndex(
+            self.env.es_index_prefix, self.env.database_query_timeout,
+            elastic_host=env.elastic_host, elastic_port=env.elastic_port
+        )
+
     async def _start_server(self, kind, *args, **kw_args):
         loop = asyncio.get_event_loop()
+
         if kind == 'RPC':
             protocol_class = LocalRPC
         else:
@@ -578,7 +587,7 @@ class SessionManager:
     async def raw_header(self, height):
         """Return the binary header at the given height."""
         try:
-            return self.db.raw_header(height)
+            return await self.db.raw_header(height)
         except IndexError:
             raise RPCError(BAD_REQUEST, f'height {height:,d} '
                                         'out of range') from None
@@ -686,12 +695,12 @@ class SessionBase(RPCSession):
     request_handlers: typing.Dict[str, typing.Callable] = {}
     version = '0.5.7'
 
-    def __init__(self, session_mgr, db, mempool, kind):
+    def __init__(self, session_manager: 'LBRYSessionManager', db: 'LevelDB', mempool: 'MemPool', kind: str):
         connection = JSONRPCConnection(JSONRPCAutoDetect)
-        self.env = session_mgr.env
+        self.env = session_manager.env
         super().__init__(connection=connection)
         self.logger = util.class_logger(__name__, self.__class__.__name__)
-        self.session_mgr = session_mgr
+        self.session_manager = session_manager
         self.db = db
         self.mempool = mempool
         self.kind = kind  # 'RPC', 'TCP' etc.
@@ -699,7 +708,7 @@ class SessionBase(RPCSession):
         self.anon_logs = self.env.anon_logs
         self.txs_sent = 0
         self.log_me = False
-        self.daemon_request = self.session_mgr.daemon_request
+        self.daemon_request = self.session_manager.daemon_request
         # Hijack the connection so we can log messages
         self._receive_message_orig = self.connection.receive_message
         self.connection.receive_message = self.receive_message
@@ -729,17 +738,17 @@ class SessionBase(RPCSession):
         self.session_id = next(self.session_counter)
         context = {'conn_id': f'{self.session_id}'}
         self.logger = util.ConnectionLogger(self.logger, context)
-        self.group = self.session_mgr.add_session(self)
-        self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
+        self.group = self.session_manager.add_session(self)
+        self.session_manager.session_count_metric.labels(version=self.client_version).inc()
         peer_addr_str = self.peer_address_str()
         self.logger.info(f'{self.kind} {peer_addr_str}, '
-                         f'{self.session_mgr.session_count():,d} total')
+                         f'{self.session_manager.session_count():,d} total')
 
     def connection_lost(self, exc):
         """Handle client disconnection."""
         super().connection_lost(exc)
-        self.session_mgr.remove_session(self)
-        self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
+        self.session_manager.remove_session(self)
+        self.session_manager.session_count_metric.labels(version=self.client_version).dec()
         msg = ''
         if not self._can_send.is_set():
             msg += ' whilst paused'
@@ -763,7 +772,7 @@ class SessionBase(RPCSession):
         """Handle an incoming request.  ElectrumX doesn't receive
         notifications from client sessions.
         """
-        self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc()
+        self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc()
         if isinstance(request, Request):
             handler = self.request_handlers.get(request.method)
             handler = partial(handler, self)
@@ -811,7 +820,7 @@ class LBRYElectrumX(SessionBase):
     PROTOCOL_MIN = VERSION.PROTOCOL_MIN
     PROTOCOL_MAX = VERSION.PROTOCOL_MAX
     max_errors = math.inf  # don't disconnect people for errors! let them happen...
-    session_mgr: LBRYSessionManager
+    session_manager: LBRYSessionManager
     version = lbry.__version__
     cached_server_features = {}
 
@@ -822,17 +831,17 @@ class LBRYElectrumX(SessionBase):
             'blockchain.block.get_header': cls.block_get_header,
             'blockchain.estimatefee': cls.estimatefee,
             'blockchain.relayfee': cls.relayfee,
-            'blockchain.scripthash.get_balance': cls.scripthash_get_balance,
+            # 'blockchain.scripthash.get_balance': cls.scripthash_get_balance,
             'blockchain.scripthash.get_history': cls.scripthash_get_history,
             'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool,
-            'blockchain.scripthash.listunspent': cls.scripthash_listunspent,
+            # 'blockchain.scripthash.listunspent': cls.scripthash_listunspent,
             'blockchain.scripthash.subscribe': cls.scripthash_subscribe,
             'blockchain.transaction.broadcast': cls.transaction_broadcast,
             'blockchain.transaction.get': cls.transaction_get,
             'blockchain.transaction.get_batch': cls.transaction_get_batch,
             'blockchain.transaction.info': cls.transaction_info,
             'blockchain.transaction.get_merkle': cls.transaction_merkle,
-            'server.add_peer': cls.add_peer,
+            # 'server.add_peer': cls.add_peer,
             'server.banner': cls.banner,
             'server.payment_address': cls.payment_address,
             'server.donation_address': cls.donation_address,
@@ -849,10 +858,10 @@ class LBRYElectrumX(SessionBase):
             'blockchain.block.headers': cls.block_headers,
             'server.ping': cls.ping,
             'blockchain.headers.subscribe': cls.headers_subscribe_False,
-            'blockchain.address.get_balance': cls.address_get_balance,
+            # 'blockchain.address.get_balance': cls.address_get_balance,
             'blockchain.address.get_history': cls.address_get_history,
             'blockchain.address.get_mempool': cls.address_get_mempool,
-            'blockchain.address.listunspent': cls.address_listunspent,
+            # 'blockchain.address.listunspent': cls.address_listunspent,
             'blockchain.address.subscribe': cls.address_subscribe,
             'blockchain.address.unsubscribe': cls.address_unsubscribe,
         })
@@ -871,8 +880,8 @@ class LBRYElectrumX(SessionBase):
         self.sv_seen = False
         self.protocol_tuple = self.PROTOCOL_MIN
         self.protocol_string = None
-        self.daemon = self.session_mgr.daemon
-        self.db: LevelDB = self.session_mgr.db
+        self.daemon = self.session_manager.daemon
+        self.db: LevelDB = self.session_manager.db
 
     @classmethod
     def protocol_min_max_strings(cls):
@@ -921,7 +930,7 @@ class LBRYElectrumX(SessionBase):
             else:
                 method = 'blockchain.address.subscribe'
             start = time.perf_counter()
-            db_history = await self.session_mgr.limited_history(hashX)
+            db_history = await self.session_manager.limited_history(hashX)
             mempool = self.mempool.transaction_summaries(hashX)
 
             status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
@@ -935,24 +944,25 @@ class LBRYElectrumX(SessionBase):
             else:
                 status = None
             if mempool:
-                self.session_mgr.mempool_statuses[hashX] = status
+                self.session_manager.mempool_statuses[hashX] = status
             else:
-                self.session_mgr.mempool_statuses.pop(hashX, None)
+                self.session_manager.mempool_statuses.pop(hashX, None)
 
-            self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
+            self.session_manager.address_history_metric.observe(time.perf_counter() - start)
             notifications.append((method, (alias, status)))
+            print(f"notify {alias} {method}")
 
         start = time.perf_counter()
-        self.session_mgr.notifications_in_flight_metric.inc()
+        self.session_manager.notifications_in_flight_metric.inc()
         for method, args in notifications:
             self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
         try:
             await self.send_notifications(
                 Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
             )
-            self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
+            self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start)
         finally:
-            self.session_mgr.notifications_in_flight_metric.dec()
+            self.session_manager.notifications_in_flight_metric.dec()
 
     # def get_metrics_or_placeholder_for_api(self, query_name):
     #     """ Do not hold on to a reference to the metrics
@@ -960,7 +970,7 @@ class LBRYElectrumX(SessionBase):
     #         you may be working with a stale metrics object.
     #     """
     #     if self.env.track_metrics:
-    #         # return self.session_mgr.metrics.for_api(query_name)
+    #         # return self.session_manager.metrics.for_api(query_name)
     #     else:
     #         return APICallMetrics(query_name)
 
@@ -970,17 +980,17 @@ class LBRYElectrumX(SessionBase):
     #     if isinstance(kwargs, dict):
     #         kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
     #     try:
-    #         self.session_mgr.pending_query_metric.inc()
+    #         self.session_manager.pending_query_metric.inc()
     #         return await self.db.search_index.session_query(query_name, kwargs)
     #     except ConnectionTimeout:
-    #         self.session_mgr.interrupt_count_metric.inc()
+    #         self.session_manager.interrupt_count_metric.inc()
     #         raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
     #     finally:
-    #         self.session_mgr.pending_query_metric.dec()
-    #         self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
+    #         self.session_manager.pending_query_metric.dec()
+    #         self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
 
     async def mempool_compact_histogram(self):
-        return self.mempool.compact_fee_histogram()
+        return [] #self.mempool.compact_fee_histogram()
 
     async def claimtrie_search(self, **kwargs):
         start = time.perf_counter()
@@ -992,16 +1002,16 @@ class LBRYElectrumX(SessionBase):
             except ValueError:
                 pass
         try:
-            self.session_mgr.pending_query_metric.inc()
+            self.session_manager.pending_query_metric.inc()
             if 'channel' in kwargs:
                 channel_url = kwargs.pop('channel')
                 _, channel_claim, _, _ = await self.db.resolve(channel_url)
                 if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
                     return Outputs.to_base64([], [], 0, None, None)
                 kwargs['channel_id'] = channel_claim.claim_hash.hex()
-            return await self.db.search_index.cached_search(kwargs)
+            return await self.session_manager.search_index.cached_search(kwargs)
         except ConnectionTimeout:
-            self.session_mgr.interrupt_count_metric.inc()
+            self.session_manager.interrupt_count_metric.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
         except TooManyClaimSearchParametersError as err:
             await asyncio.sleep(2)
@@ -1009,25 +1019,25 @@ class LBRYElectrumX(SessionBase):
                                 self.peer_address()[0], err.key, err.limit)
             return RPCError(1, str(err))
         finally:
-            self.session_mgr.pending_query_metric.dec()
-            self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
+            self.session_manager.pending_query_metric.dec()
+            self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
 
     async def _cached_resolve_url(self, url):
-        if url not in self.session_mgr.resolve_cache:
-            self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
-        return self.session_mgr.resolve_cache[url]
+        if url not in self.session_manager.resolve_cache:
+            self.session_manager.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
+        return self.session_manager.resolve_cache[url]
 
     async def claimtrie_resolve(self, *urls) -> str:
         sorted_urls = tuple(sorted(urls))
-        self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
+        self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls))
         try:
-            if sorted_urls in self.session_mgr.resolve_outputs_cache:
-                return self.session_mgr.resolve_outputs_cache[sorted_urls]
+            if sorted_urls in self.session_manager.resolve_outputs_cache:
+                return self.session_manager.resolve_outputs_cache[sorted_urls]
             rows, extra = [], []
             for url in urls:
-                if url not in self.session_mgr.resolve_cache:
-                    self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
-                stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
+                if url not in self.session_manager.resolve_cache:
+                    self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url)
+                stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url]
                 if isinstance(channel, ResolveCensoredError):
                     rows.append(channel)
                     extra.append(channel.censor_row)
@@ -1052,12 +1062,12 @@ class LBRYElectrumX(SessionBase):
                     if reposted_channel:
                         extra.append(reposted_channel)
                 await asyncio.sleep(0)
-            self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
+            self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
                 None, Outputs.to_base64, rows, extra, 0, None, None
             )
             return result
         finally:
-            self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
+            self.session_manager.resolved_url_count_metric.inc(len(sorted_urls))
 
     async def get_server_height(self):
         return self.db.db_height
@@ -1093,7 +1103,7 @@ class LBRYElectrumX(SessionBase):
 
     async def subscribe_headers_result(self):
         """The result of a header subscription or notification."""
-        return self.session_mgr.hsub_results[self.subscribe_headers_raw]
+        return self.session_manager.hsub_results[self.subscribe_headers_raw]
 
     async def _headers_subscribe(self, raw):
         """Subscribe to get headers of new blocks."""
@@ -1130,7 +1140,7 @@ class LBRYElectrumX(SessionBase):
         # Note history is ordered and mempool unordered in electrum-server
         # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
 
-        db_history = await self.session_mgr.limited_history(hashX)
+        db_history = await self.session_manager.limited_history(hashX)
         mempool = self.mempool.transaction_summaries(hashX)
 
         status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
@@ -1145,32 +1155,32 @@ class LBRYElectrumX(SessionBase):
             status = None
 
         if mempool:
-            self.session_mgr.mempool_statuses[hashX] = status
+            self.session_manager.mempool_statuses[hashX] = status
         else:
-            self.session_mgr.mempool_statuses.pop(hashX, None)
+            self.session_manager.mempool_statuses.pop(hashX, None)
         return status
 
-    async def hashX_listunspent(self, hashX):
-        """Return the list of UTXOs of a script hash, including mempool
-        effects."""
-        utxos = await self.db.all_utxos(hashX)
-        utxos = sorted(utxos)
-        utxos.extend(await self.mempool.unordered_UTXOs(hashX))
-        spends = await self.mempool.potential_spends(hashX)
-
-        return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
-                 'tx_pos': utxo.tx_pos,
-                 'height': utxo.height, 'value': utxo.value}
-                for utxo in utxos
-                if (utxo.tx_hash, utxo.tx_pos) not in spends]
+    # async def hashX_listunspent(self, hashX):
+    #     """Return the list of UTXOs of a script hash, including mempool
+    #     effects."""
+    #     utxos = await self.db.all_utxos(hashX)
+    #     utxos = sorted(utxos)
+    #     utxos.extend(await self.mempool.unordered_UTXOs(hashX))
+    #     spends = await self.mempool.potential_spends(hashX)
+    #
+    #     return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
+    #              'tx_pos': utxo.tx_pos,
+    #              'height': utxo.height, 'value': utxo.value}
+    #             for utxo in utxos
+    #             if (utxo.tx_hash, utxo.tx_pos) not in spends]
 
     async def hashX_subscribe(self, hashX, alias):
         self.hashX_subs[hashX] = alias
-        self.session_mgr.hashx_subscriptions_by_session[hashX].add(id(self))
+        self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self))
         return await self.address_status(hashX)
 
     async def hashX_unsubscribe(self, hashX, alias):
-        sessions = self.session_mgr.hashx_subscriptions_by_session[hashX]
+        sessions = self.session_manager.hashx_subscriptions_by_session[hashX]
         sessions.remove(id(self))
         if not sessions:
             self.hashX_subs.pop(hashX, None)
@@ -1182,10 +1192,10 @@ class LBRYElectrumX(SessionBase):
             pass
         raise RPCError(BAD_REQUEST, f'{address} is not a valid address')
 
-    async def address_get_balance(self, address):
-        """Return the confirmed and unconfirmed balance of an address."""
-        hashX = self.address_to_hashX(address)
-        return await self.get_balance(hashX)
+    # async def address_get_balance(self, address):
+    #     """Return the confirmed and unconfirmed balance of an address."""
+    #     hashX = self.address_to_hashX(address)
+    #     return await self.get_balance(hashX)
 
     async def address_get_history(self, address):
         """Return the confirmed and unconfirmed history of an address."""
@@ -1197,10 +1207,10 @@ class LBRYElectrumX(SessionBase):
         hashX = self.address_to_hashX(address)
         return self.unconfirmed_history(hashX)
 
-    async def address_listunspent(self, address):
-        """Return the list of UTXOs of an address."""
-        hashX = self.address_to_hashX(address)
-        return await self.hashX_listunspent(hashX)
+    # async def address_listunspent(self, address):
+    #     """Return the list of UTXOs of an address."""
+    #     hashX = self.address_to_hashX(address)
+    #     return await self.hashX_listunspent(hashX)
 
     async def address_subscribe(self, *addresses):
         """Subscribe to an address.
@@ -1221,16 +1231,16 @@ class LBRYElectrumX(SessionBase):
         hashX = self.address_to_hashX(address)
         return await self.hashX_unsubscribe(hashX, address)
 
-    async def get_balance(self, hashX):
-        utxos = await self.db.all_utxos(hashX)
-        confirmed = sum(utxo.value for utxo in utxos)
-        unconfirmed = await self.mempool.balance_delta(hashX)
-        return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
+    # async def get_balance(self, hashX):
+    #     utxos = await self.db.all_utxos(hashX)
+    #     confirmed = sum(utxo.value for utxo in utxos)
+    #     unconfirmed = await self.mempool.balance_delta(hashX)
+    #     return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
 
-    async def scripthash_get_balance(self, scripthash):
-        """Return the confirmed and unconfirmed balance of a scripthash."""
-        hashX = scripthash_to_hashX(scripthash)
-        return await self.get_balance(hashX)
+    # async def scripthash_get_balance(self, scripthash):
+    #     """Return the confirmed and unconfirmed balance of a scripthash."""
+    #     hashX = scripthash_to_hashX(scripthash)
+    #     return await self.get_balance(hashX)
 
     def unconfirmed_history(self, hashX):
         # Note unconfirmed history is unordered in electrum-server
@@ -1242,7 +1252,7 @@ class LBRYElectrumX(SessionBase):
 
     async def confirmed_and_unconfirmed_history(self, hashX):
         # Note history is ordered but unconfirmed is unordered in e-s
-        history = await self.session_mgr.limited_history(hashX)
+        history = await self.session_manager.limited_history(hashX)
         conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
                 for tx_hash, height in history]
         return conf + self.unconfirmed_history(hashX)
@@ -1257,10 +1267,10 @@ class LBRYElectrumX(SessionBase):
         hashX = scripthash_to_hashX(scripthash)
         return self.unconfirmed_history(hashX)
 
-    async def scripthash_listunspent(self, scripthash):
-        """Return the list of UTXOs of a scripthash."""
-        hashX = scripthash_to_hashX(scripthash)
-        return await self.hashX_listunspent(hashX)
+    # async def scripthash_listunspent(self, scripthash):
+    #     """Return the list of UTXOs of a scripthash."""
+    #     hashX = scripthash_to_hashX(scripthash)
+    #     return await self.hashX_listunspent(hashX)
 
     async def scripthash_subscribe(self, scripthash):
         """Subscribe to a script hash.
@@ -1295,7 +1305,7 @@ class LBRYElectrumX(SessionBase):
 
         max_size = self.MAX_CHUNK_SIZE
         count = min(count, max_size)
-        headers, count = self.db.read_headers(start_height, count)
+        headers, count = await self.db.read_headers(start_height, count)
 
         if b64:
             headers = self.db.encode_headers(start_height, count, headers)
@@ -1318,7 +1328,7 @@ class LBRYElectrumX(SessionBase):
         index = non_negative_integer(index)
         size = self.coin.CHUNK_SIZE
         start_height = index * size
-        headers, _ = self.db.read_headers(start_height, size)
+        headers, _ = await self.db.read_headers(start_height, size)
         return headers.hex()
 
     async def block_get_header(self, height):
@@ -1326,7 +1336,7 @@ class LBRYElectrumX(SessionBase):
 
         height: the header's height"""
         height = non_negative_integer(height)
-        return await self.session_mgr.electrum_header(height)
+        return await self.session_manager.electrum_header(height)
 
     def is_tor(self):
         """Try to detect if the connection is to a tor hidden service we are
@@ -1416,10 +1426,10 @@ class LBRYElectrumX(SessionBase):
                 self.close_after_send = True
                 raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}')
             if self.client_version != client_name[:17]:
-                self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
+                self.session_manager.session_count_metric.labels(version=self.client_version).dec()
                 self.client_version = client_name[:17]
-                self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
-        self.session_mgr.client_version_metric.labels(version=self.client_version).inc()
+                self.session_manager.session_count_metric.labels(version=self.client_version).inc()
+        self.session_manager.client_version_metric.labels(version=self.client_version).inc()
 
         # Find the highest common protocol version.  Disconnect if
         # that protocol version in unsupported.
@@ -1440,9 +1450,10 @@ class LBRYElectrumX(SessionBase):
         raw_tx: the raw transaction as a hexadecimal string"""
         # This returns errors as JSON RPC errors, as is natural
         try:
-            hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
+            hex_hash = await self.session_manager.broadcast_transaction(raw_tx)
             self.txs_sent += 1
-            self.mempool.wakeup.set()
+            # self.mempool.wakeup.set()
+            # await asyncio.sleep(0.5)
             self.logger.info(f'sent tx: {hex_hash}')
             return hex_hash
         except DaemonError as e:
@@ -1456,7 +1467,7 @@ class LBRYElectrumX(SessionBase):
         return (await self.transaction_get_batch(tx_hash))[tx_hash]
 
     async def transaction_get_batch(self, *tx_hashes):
-        self.session_mgr.tx_request_count_metric.inc(len(tx_hashes))
+        self.session_manager.tx_request_count_metric.inc(len(tx_hashes))
         if len(tx_hashes) > 100:
             raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
         for tx_hash in tx_hashes:
@@ -1495,8 +1506,11 @@ class LBRYElectrumX(SessionBase):
                     'block_height': block_height
                 }
                 await asyncio.sleep(0)  # heavy call, give other tasks a chance
+        print("return tx batch")
+        for tx_hash, (_, info) in batch_result.items():
+            print(tx_hash, info['block_height'])
 
-        self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
+        self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
         return batch_result
 
     async def transaction_get(self, tx_hash, verbose=False):