From 98565eb67cab40e8214b8f96ed9f4fb42c39acad Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 9 Jun 2020 16:04:14 -0400 Subject: [PATCH 01/19] run read_raw_block in executor --- lbry/wallet/server/block_processor.py | 2 +- lbry/wallet/server/leveldb.py | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 437b24f18..e2ecb69ce 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -251,7 +251,7 @@ class BlockProcessor: async def get_raw_blocks(last_height, hex_hashes): heights = range(last_height, last_height - len(hex_hashes), -1) try: - blocks = [self.db.read_raw_block(height) for height in heights] + blocks = [await self.db.read_raw_block(height) for height in heights] self.logger.info(f'read {len(blocks)} blocks from disk') return blocks except FileNotFoundError: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index cdcbbe50a..fe048c9b4 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -479,11 +479,15 @@ class LevelDB: def raw_block_path(self, height): return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}') - def read_raw_block(self, height): + async def read_raw_block(self, height): """Returns a raw block read from disk. Raises FileNotFoundError if the block isn't on-disk.""" - with util.open_file(self.raw_block_path(height)) as f: - return f.read(-1) + + def read(): + with util.open_file(self.raw_block_path(height)) as f: + return f.read(-1) + + return await asyncio.get_event_loop().run_in_executor(self.executor, read) def write_raw_block(self, block, height): """Write a raw block to disk.""" From 22540390e1f8711143a5dfd4ab4afb81287e4a31 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 9 Jun 2020 16:04:39 -0400 Subject: [PATCH 02/19] break the wallet server with chris45 test --- tests/integration/other/test_chris45.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/integration/other/test_chris45.py b/tests/integration/other/test_chris45.py index e837a4ec5..51c3cd3e8 100644 --- a/tests/integration/other/test_chris45.py +++ b/tests/integration/other/test_chris45.py @@ -4,6 +4,7 @@ from lbry.testcase import CommandTestCase class EpicAdventuresOfChris45(CommandTestCase): async def test_no_this_is_not_a_test_its_an_adventure(self): + # Chris45 is an avid user of LBRY and this is his story. It's fact and fiction # and everything in between; it's also the setting of some record setting # integration tests. @@ -194,3 +195,9 @@ class EpicAdventuresOfChris45(CommandTestCase): }}, await self.resolve(uri) ) + + # He closes and opens the wallet server databases to see how horribly they break + db = self.conductor.spv_node.server.db + db.close() + await db.open_for_serving() + # They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup) From cc51543851728e376e73f14eee1acc801370b797 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 9 Jun 2020 16:21:48 -0400 Subject: [PATCH 03/19] headers db --- lbry/wallet/server/leveldb.py | 89 ++++++++++++++--------------------- lbry/wallet/server/util.py | 4 +- 2 files changed, 38 insertions(+), 55 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index fe048c9b4..86dba8955 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -32,6 +32,9 @@ from lbry.wallet.server.history import History UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") +HEADER_PREFIX = b'H' +TX_COUNT_PREFIX = b'T' +TX_HASH_PREFIX = b'X' @attr.s(slots=True) @@ -63,15 +66,7 @@ class LevelDB: self.logger = util.class_logger(__name__, self.__class__.__name__) self.env = env self.coin = env.coin - self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) - - # Setup block header size handlers - if self.coin.STATIC_BLOCK_HEADERS: - self.header_offset = self.coin.static_header_offset - self.header_len = self.coin.static_header_len - else: - self.header_offset = self.dynamic_header_offset - self.header_len = self.dynamic_header_len + self.executor = None self.logger.info(f'switching current directory to {env.db_dir}') @@ -88,12 +83,9 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) path = partial(os.path.join, self.env.db_dir) - self.headers_file = util.LogicalFile(path('meta/headers'), 2, 16000000) self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000) self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000) - if not self.coin.STATIC_BLOCK_HEADERS: - self.headers_offsets_file = util.LogicalFile( - path('meta/headers_offsets'), 2, 16000000) + self.headers_db = None async def _read_tx_counts(self): if self.tx_counts is not None: @@ -110,22 +102,24 @@ class LevelDB: assert self.db_tx_count == 0 async def _open_dbs(self, for_sync, compacting): - assert self.utxo_db is None + if self.executor is None: + self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) + assert self.headers_db is None + self.headers_db = self.db_class('headers', for_sync) + if self.headers_db.is_new: + self.logger.info('created new headers db') + self.logger.info(f'opened headers DB (for sync: {for_sync})') + + assert self.utxo_db is None # First UTXO DB self.utxo_db = self.db_class('utxo', for_sync) if self.utxo_db.is_new: self.logger.info('created new database') self.logger.info('creating metadata directory') os.mkdir(os.path.join(self.env.db_dir, 'meta')) - coin_path = os.path.join(self.env.db_dir, 'meta', 'COIN') - with util.open_file(coin_path, create=True) as f: - f.write(f'ElectrumX databases and metadata for ' - f'{self.coin.NAME} {self.coin.NET}'.encode()) - if not self.coin.STATIC_BLOCK_HEADERS: - self.headers_offsets_file.write(0, bytes(8)) - else: - self.logger.info(f'opened UTXO DB (for sync: {for_sync})') + self.logger.info('created new utxo db') + self.logger.info(f'opened utxo db (for sync: {for_sync})') self.read_utxo_state() # Then history DB @@ -140,7 +134,9 @@ class LevelDB: def close(self): self.utxo_db.close() self.history.close_db() + self.headers_db.close() self.executor.shutdown(wait=True) + self.executor = None async def open_for_compacting(self): await self._open_dbs(True, True) @@ -163,6 +159,9 @@ class LevelDB: self.utxo_db.close() self.history.close_db() self.utxo_db = None + if self.headers_db: + self.headers_db.close() + self.headers_db = None await self._open_dbs(False, False) # Header merkle cache @@ -256,11 +255,11 @@ class LevelDB: # Write the headers, tx counts, and tx hashes start_time = time.time() height_start = self.fs_height + 1 - offset = self.header_offset(height_start) - self.headers_file.write(offset, b''.join(flush_data.headers)) - self.fs_update_header_offsets(offset, height_start, flush_data.headers) - flush_data.headers.clear() + for header in flush_data.headers: + tx_count = self.tx_counts[height_start] + self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) + height_start += 1 offset = height_start * self.tx_counts.itemsize self.tx_counts_file.write(offset, self.tx_counts[height_start:].tobytes()) @@ -270,6 +269,7 @@ class LevelDB: self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count + flush_data.headers.clear() if self.utxo_db.for_sync: elapsed = time.time() - start_time self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') @@ -350,28 +350,6 @@ class LevelDB: f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - def fs_update_header_offsets(self, offset_start, height_start, headers): - if self.coin.STATIC_BLOCK_HEADERS: - return - offset = offset_start - offsets = [] - for h in headers: - offset += len(h) - offsets.append(pack(" Date: Tue, 9 Jun 2020 17:54:13 -0400 Subject: [PATCH 04/19] tx count db --- lbry/wallet/server/leveldb.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 86dba8955..69c6da06b 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -83,21 +83,29 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) path = partial(os.path.join, self.env.db_dir) - self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000) self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000) self.headers_db = None + self.tx_count_db = None async def _read_tx_counts(self): if self.tx_counts is not None: return # tx_counts[N] has the cumulative number of txs at the end of # height N. So tx_counts[0] is 1 - the genesis coinbase - size = (self.db_height + 1) * 4 - tx_counts = self.tx_counts_file.read(0, size) - assert len(tx_counts) == size + + def get_counts(): + return tuple( + util.unpack_be_uint64(tx_count) + for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + ) + + tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) + assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}" self.tx_counts = array.array('I', tx_counts) + if self.tx_counts: - assert self.db_tx_count == self.tx_counts[-1] + assert self.db_tx_count == self.tx_counts[-1], \ + f"{self.db_tx_count} vs {self.tx_counts[-1]} ({len(self.tx_counts)} counts)" else: assert self.db_tx_count == 0 @@ -111,6 +119,12 @@ class LevelDB: self.logger.info('created new headers db') self.logger.info(f'opened headers DB (for sync: {for_sync})') + assert self.tx_count_db is None + self.tx_count_db = self.db_class('tx_count', for_sync) + if self.tx_count_db.is_new: + self.logger.info('created new tx count db') + self.logger.info(f'opened tx count DB (for sync: {for_sync})') + assert self.utxo_db is None # First UTXO DB self.utxo_db = self.db_class('utxo', for_sync) @@ -135,6 +149,7 @@ class LevelDB: self.utxo_db.close() self.history.close_db() self.headers_db.close() + self.tx_count_db.close() self.executor.shutdown(wait=True) self.executor = None @@ -162,6 +177,9 @@ class LevelDB: if self.headers_db: self.headers_db.close() self.headers_db = None + if self.tx_count_db: + self.tx_count_db.close() + self.tx_count_db = None await self._open_dbs(False, False) # Header merkle cache @@ -259,10 +277,8 @@ class LevelDB: for header in flush_data.headers: tx_count = self.tx_counts[height_start] self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) + self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 - offset = height_start * self.tx_counts.itemsize - self.tx_counts_file.write(offset, - self.tx_counts[height_start:].tobytes()) offset = prior_tx_count * 32 self.hashes_file.write(offset, hashes) From 639b1e48f5cf2107043a74b56ae73e920f8eff46 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 11 Jun 2020 14:22:47 -0400 Subject: [PATCH 05/19] blocks dir --- lbry/wallet/server/leveldb.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 69c6da06b..a69c94e70 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -112,6 +112,11 @@ class LevelDB: async def _open_dbs(self, for_sync, compacting): if self.executor is None: self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) + coin_path = os.path.join(self.env.db_dir, 'COIN') + if not os.path.isfile(coin_path): + with util.open_file(coin_path, create=True) as f: + f.write(f'ElectrumX databases and metadata for ' + f'{self.coin.NAME} {self.coin.NET}'.encode()) assert self.headers_db is None self.headers_db = self.db_class('headers', for_sync) @@ -129,17 +134,14 @@ class LevelDB: # First UTXO DB self.utxo_db = self.db_class('utxo', for_sync) if self.utxo_db.is_new: - self.logger.info('created new database') - self.logger.info('creating metadata directory') - os.mkdir(os.path.join(self.env.db_dir, 'meta')) self.logger.info('created new utxo db') self.logger.info(f'opened utxo db (for sync: {for_sync})') self.read_utxo_state() # Then history DB - self.utxo_flush_count = self.history.open_db(self.db_class, for_sync, - self.utxo_flush_count, - compacting) + self.utxo_flush_count = self.history.open_db( + self.db_class, for_sync, self.utxo_flush_count, compacting + ) self.clear_excess_undo_info() # Read TX counts (requires meta directory) @@ -163,12 +165,14 @@ class LevelDB: synchronization. When serving clients we want the open files for serving network connections. """ + self.logger.info("opened for sync") await self._open_dbs(True, False) async def open_for_serving(self): """Open the databases for serving. If they are already open they are closed first. """ + self.logger.info('closing DBs to re-open for serving') if self.utxo_db: self.logger.info('closing DBs to re-open for serving') self.utxo_db.close() @@ -181,6 +185,7 @@ class LevelDB: self.tx_count_db.close() self.tx_count_db = None await self._open_dbs(False, False) + self.logger.info("opened for serving") # Header merkle cache @@ -471,7 +476,7 @@ class LevelDB: batch_put(self.undo_key(height), b''.join(undo_info)) def raw_block_prefix(self): - return 'meta/block' + return 'block' def raw_block_path(self, height): return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}') From 71eccdc0e3903261f5925d9bc9b08c9214a22f43 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 11 Jun 2020 19:09:09 -0400 Subject: [PATCH 06/19] hashes path --- lbry/wallet/server/leveldb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index a69c94e70..1db38f26f 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -83,7 +83,7 @@ class LevelDB: self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) path = partial(os.path.join, self.env.db_dir) - self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000) + self.hashes_file = util.LogicalFile(path('hashes'), 4, 16000000) self.headers_db = None self.tx_count_db = None From 375187aa708ee7cf92f192a07ad32e1f8754ed1a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 12 Jun 2020 00:11:26 -0400 Subject: [PATCH 07/19] tx hashes db --- lbry/wallet/server/leveldb.py | 41 +++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 1db38f26f..f7bb7faba 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -82,8 +82,7 @@ class LevelDB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - path = partial(os.path.join, self.env.db_dir) - self.hashes_file = util.LogicalFile(path('hashes'), 4, 16000000) + self.hashes_db = None self.headers_db = None self.tx_count_db = None @@ -130,6 +129,12 @@ class LevelDB: self.logger.info('created new tx count db') self.logger.info(f'opened tx count DB (for sync: {for_sync})') + assert self.hashes_db is None + self.hashes_db = self.db_class('hashes', for_sync) + if self.hashes_db.is_new: + self.logger.info('created new tx hashes db') + self.logger.info(f'opened tx hashes DB (for sync: {for_sync})') + assert self.utxo_db is None # First UTXO DB self.utxo_db = self.db_class('utxo', for_sync) @@ -152,6 +157,7 @@ class LevelDB: self.history.close_db() self.headers_db.close() self.tx_count_db.close() + self.hashes_db.close() self.executor.shutdown(wait=True) self.executor = None @@ -184,6 +190,10 @@ class LevelDB: if self.tx_count_db: self.tx_count_db.close() self.tx_count_db = None + if self.hashes_db: + self.hashes_db.close() + self.hashes_db = None + await self._open_dbs(False, False) self.logger.info("opened for serving") @@ -270,13 +280,10 @@ class LevelDB: assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts else 0) assert len(self.tx_counts) == flush_data.height + 1 - hashes = b''.join(flush_data.block_tx_hashes) - flush_data.block_tx_hashes.clear() - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count + assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count # Write the headers, tx counts, and tx hashes - start_time = time.time() + start_time = time.perf_counter() height_start = self.fs_height + 1 for header in flush_data.headers: @@ -284,16 +291,22 @@ class LevelDB: self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 - offset = prior_tx_count * 32 - self.hashes_file.write(offset, hashes) + + tx_num = prior_tx_count + for tx_hashes in flush_data.block_tx_hashes: + offset = 0 + while offset < len(tx_hashes): + self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) + tx_num += 1 + offset += 32 + + flush_data.block_tx_hashes.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count - flush_data.headers.clear() - if self.utxo_db.for_sync: - elapsed = time.time() - start_time - self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') + elapsed = time.perf_counter() - start_time + self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') def flush_history(self): self.history.flush() @@ -421,7 +434,7 @@ class LevelDB: if tx_height > self.db_height: tx_hash = None else: - tx_hash = self.hashes_file.read(tx_num * 32, 32) + tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) return tx_hash, tx_height async def fs_block_hashes(self, height, count): From caf616234bb7986d2d890e6a25935f75fb91eb3f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 12 Jun 2020 00:51:02 -0400 Subject: [PATCH 08/19] flush databases during sync --- lbry/wallet/server/block_processor.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e2ecb69ce..eceb62e08 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -351,11 +351,9 @@ class BlockProcessor: # performed on the DB. if self._caught_up_event.is_set(): await self.flush(True) - elif time.time() > self.next_cache_check: - flush_arg = self.check_cache_size() - if flush_arg is not None: - await self.flush(flush_arg) - self.next_cache_check = time.time() + 30 + elif time.perf_counter() > self.next_cache_check: + await self.flush(True) + self.next_cache_check = time.perf_counter() + 30 def check_cache_size(self): """Flush a cache if it gets too big.""" From 70596042d603bc0400b45c0ca6900973bbc6f93c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 12 Jun 2020 14:01:50 -0400 Subject: [PATCH 09/19] mempool_process_time_metric --- lbry/wallet/server/mempool.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 03d3cd03c..bffd4e8ee 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -13,6 +13,7 @@ import time from abc import ABC, abstractmethod from collections import defaultdict from concurrent.futures.thread import ThreadPoolExecutor +from prometheus_client import Histogram import attr @@ -79,6 +80,12 @@ class MemPoolAPI(ABC): daemon's height at the time the mempool was obtained.""" +NAMESPACE = "wallet_server" +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) + + class MemPool: """Representation of the daemon's mempool. @@ -107,6 +114,9 @@ class MemPool: self.lock = asyncio.Lock() self.wakeup = asyncio.Event() self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1)) + self.mempool_process_time_metric = Histogram( + "processed_mempool", "Time to process mempool and notify touched addresses", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + ) async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" @@ -207,6 +217,7 @@ class MemPool: async def _refresh_hashes(self, synchronized_event): """Refresh our view of the daemon's mempool.""" while True: + start = time.perf_counter() height = self.api.cached_height() hex_hashes = await self.api.mempool_hashes() if height != await self.api.height(): @@ -217,13 +228,16 @@ class MemPool: synchronized_event.set() synchronized_event.clear() await self.api.on_mempool(touched, height) + timed_out = False try: # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) except asyncio.TimeoutError: - pass + timed_out = True finally: self.wakeup.clear() + duration = time.perf_counter() - start - (0 if not timed_out else self.refresh_secs) + self.mempool_process_time_metric.observe(duration) async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes From 0aa7fd47d584c48bf027a3149d899bc74ea9e6b6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 12 Jun 2020 17:49:29 -0400 Subject: [PATCH 10/19] combine loops --- lbry/wallet/server/leveldb.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f7bb7faba..ab89cd2c6 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -285,15 +285,13 @@ class LevelDB: # Write the headers, tx counts, and tx hashes start_time = time.perf_counter() height_start = self.fs_height + 1 + tx_num = prior_tx_count - for header in flush_data.headers: + for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes): tx_count = self.tx_counts[height_start] self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) height_start += 1 - - tx_num = prior_tx_count - for tx_hashes in flush_data.block_tx_hashes: offset = 0 while offset < len(tx_hashes): self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) @@ -301,7 +299,6 @@ class LevelDB: offset += 32 flush_data.block_tx_hashes.clear() - self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count flush_data.headers.clear() From 8c695e42ca551b4b37dccfe6493574fef72f9b2f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 10:20:55 -0400 Subject: [PATCH 11/19] fix sqlite coin chooser floor --- lbry/wallet/database.py | 9 +++++---- lbry/wallet/ledger.py | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py index bc6d880f8..55a10b62b 100644 --- a/lbry/wallet/database.py +++ b/lbry/wallet/database.py @@ -529,13 +529,14 @@ def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decode def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List, amount_to_reserve: int, floor: int, - fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool): + fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool, + base_multiplier: int = 100): txs = defaultdict(list) decoded_transactions = {} reserved = [] reserved_dewies = 0 - multiplier = 10 + multiplier = base_multiplier gap_count = 0 while reserved_dewies < amount_to_reserve and gap_count < 5 and floor * multiplier < SQLITE_MAX_INTEGER: @@ -550,7 +551,7 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: L multiplier **= 2 else: gap_count = 0 - multiplier = 10 + multiplier = base_multiplier # reserve the accumulated txos if enough were found if reserved_dewies >= amount_to_reserve: @@ -762,7 +763,7 @@ class Database(SQLiteMixin): # 2. update address histories removing deleted TXs return True - async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000, + async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 1, fee_per_byte: int = 50, set_reserved: bool = True, return_insufficient_funds: bool = False) -> List: to_spend = await self.db.run( diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 997a0dfeb..54c26f521 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -244,8 +244,7 @@ class Ledger(metaclass=LedgerRegistry): def get_address_count(self, **constraints): return self.db.get_address_count(**constraints) - async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], - min_amount=100000): + async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], min_amount=1): min_amount = min(amount // 10, min_amount) fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self) selector = CoinSelector(amount, fee) From a9eeca1302319e143e38154833285a280d06654d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 10:21:40 -0400 Subject: [PATCH 12/19] mempool processing time metric --- lbry/wallet/server/mempool.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index bffd4e8ee..40311bd20 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -84,6 +84,10 @@ NAMESPACE = "wallet_server" HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) +mempool_process_time_metric = Histogram( + "processed_mempool", "Time to process mempool and notify touched addresses", + namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS +) class MemPool: @@ -114,9 +118,7 @@ class MemPool: self.lock = asyncio.Lock() self.wakeup = asyncio.Event() self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1)) - self.mempool_process_time_metric = Histogram( - "processed_mempool", "Time to process mempool and notify touched addresses", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS - ) + self.mempool_process_time_metric = mempool_process_time_metric async def _logging(self, synchronized_event): """Print regular logs of mempool stats.""" From e6cae9bcc37db47bc94aafcaabcd3c1418a11bef Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 10:22:30 -0400 Subject: [PATCH 13/19] remove mempool wakeup event, lower refresh delay --- lbry/wallet/server/mempool.py | 13 +++---------- lbry/wallet/server/session.py | 1 - 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 40311bd20..fd045ced7 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -104,7 +104,7 @@ class MemPool: hashXs: hashX -> set of all hashes of txs touching the hashX """ - def __init__(self, coin, api, refresh_secs=5.0, log_status_secs=120.0): + def __init__(self, coin, api, refresh_secs=1.0, log_status_secs=120.0): assert isinstance(api, MemPoolAPI) self.coin = coin self.api = api @@ -230,16 +230,9 @@ class MemPool: synchronized_event.set() synchronized_event.clear() await self.api.on_mempool(touched, height) - timed_out = False - try: - # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) - await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) - except asyncio.TimeoutError: - timed_out = True - finally: - self.wakeup.clear() - duration = time.perf_counter() - start - (0 if not timed_out else self.refresh_secs) + duration = time.perf_counter() - start self.mempool_process_time_metric.observe(duration) + await asyncio.sleep(self.refresh_secs) async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 3e6bfb771..6eb514335 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -602,7 +602,6 @@ class SessionManager: async def broadcast_transaction(self, raw_tx): hex_hash = await self.daemon.broadcast_transaction(raw_tx) - self.mempool.wakeup.set() self.txs_sent += 1 return hex_hash From fc9023386c66d62184b4b6e10e88bdfb43afca42 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 15:56:44 -0400 Subject: [PATCH 14/19] non-blocking history lookup in notify --- lbry/wallet/server/session.py | 39 ++++++++++++++--------------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6eb514335..98f7b6fd5 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -922,36 +922,27 @@ class LBRYElectrumX(SessionBase): args = (await self.subscribe_headers_result(), ) if not (await self.send_notification('blockchain.headers.subscribe', args)): return + + async def send_history_notification(alias, hashX): + if len(alias) == 64: + method = 'blockchain.scripthash.subscribe' + else: + method = 'blockchain.address.subscribe' + status = await self.address_status(hashX) + await self.send_notification(method, (alias, status)) + touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): - changed = {} - for hashX in touched: alias = self.hashX_subs[hashX] - status = await self.address_status(hashX) - changed[alias] = status + asyncio.create_task(send_history_notification(alias, hashX)) + + if touched: + es = '' if len(touched) == 1 else 'es' + self.logger.info(f'notified of {len(touched):,d} address{es}') + - # Check mempool hashXs - the status is a function of the - # confirmed state of other transactions. Note: we cannot - # iterate over mempool_statuses as it changes size. - for hashX in tuple(self.mempool_statuses): - # Items can be evicted whilst await-ing status; False - # ensures such hashXs are notified - old_status = self.mempool_statuses.get(hashX, False) - status = await self.address_status(hashX) - if status != old_status: - alias = self.hashX_subs[hashX] - changed[alias] = status - for alias, status in changed.items(): - if len(alias) == 64: - method = 'blockchain.scripthash.subscribe' - else: - method = 'blockchain.address.subscribe' - asyncio.create_task(self.send_notification(method, (alias, status))) - if changed: - es = '' if len(changed) == 1 else 'es' - self.logger.info(f'notified of {len(changed):,d} address{es}') def get_metrics_or_placeholder_for_api(self, query_name): """ Do not hold on to a reference to the metrics From 6c28713a4ce9dbd9fa116af3dd8812d0ec5a51b5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 15:57:51 -0400 Subject: [PATCH 15/19] read history in one loop --- lbry/wallet/server/leveldb.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index ab89cd2c6..ccd3fcd60 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -454,9 +454,20 @@ class LevelDB: limit to None to get them all. """ def read_history(): - tx_nums = list(self.history.get_txnums(hashX, limit)) - fs_tx_hash = self.fs_tx_hash - return [fs_tx_hash(tx_num) for tx_num in tx_nums] + hashx_history = [] + for key, hist in self.history.db.iterator(prefix=hashX): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + tx_height = bisect_right(self.tx_counts, tx_num) + if tx_height > self.db_height: + tx_hash = None + else: + tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) + hashx_history.append((tx_hash, tx_height)) + if limit and len(hashx_history) >= limit: + return hashx_history + return hashx_history while True: history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) From e22bc01cbdf71647ad5ba35a8e0aa92e185ff0dd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 15 Jun 2020 16:21:43 -0400 Subject: [PATCH 16/19] re-add wakeup event, add address history metric --- lbry/wallet/server/mempool.py | 8 +++++++- lbry/wallet/server/session.py | 11 ++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index fd045ced7..7ca887892 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -232,7 +232,13 @@ class MemPool: await self.api.on_mempool(touched, height) duration = time.perf_counter() - start self.mempool_process_time_metric.observe(duration) - await asyncio.sleep(self.refresh_secs) + try: + # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) + await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) + except asyncio.TimeoutError: + pass + finally: + self.wakeup.clear() async def _process_mempool(self, all_hashes): # Re-sync with the new set of hashes diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 98f7b6fd5..810b11dba 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -123,6 +123,7 @@ HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) + class SessionManager: """Holds global state about all sessions.""" @@ -160,6 +161,10 @@ class SessionManager: "clients", "Number of connections received per client version", namespace=NAMESPACE, labelnames=("version",) ) + address_history_metric = Histogram( + "address_history", "Time to fetch an address history", + namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + ) def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', shutdown_event: asyncio.Event): @@ -924,11 +929,13 @@ class LBRYElectrumX(SessionBase): return async def send_history_notification(alias, hashX): + start = time.perf_counter() if len(alias) == 64: method = 'blockchain.scripthash.subscribe' else: method = 'blockchain.address.subscribe' status = await self.address_status(hashX) + self.session_mgr.address_history_metric.observe(time.perf_counter() - start) await self.send_notification(method, (alias, status)) touched = touched.intersection(self.hashX_subs) @@ -941,9 +948,6 @@ class LBRYElectrumX(SessionBase): es = '' if len(touched) == 1 else 'es' self.logger.info(f'notified of {len(touched):,d} address{es}') - - - def get_metrics_or_placeholder_for_api(self, query_name): """ Do not hold on to a reference to the metrics returned by this method past an `await` or @@ -1487,6 +1491,7 @@ class LBRYElectrumX(SessionBase): try: hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) self.txs_sent += 1 + self.mempool.wakeup.set() self.logger.info(f'sent tx: {hex_hash}') return hex_hash except DaemonError as e: From 50ecb0dac9366b0e3a846ea0f591e184613c9d1b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 16 Jun 2020 17:58:37 -0400 Subject: [PATCH 17/19] fix notify --- lbry/wallet/server/session.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 810b11dba..6a831ec1a 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -940,13 +940,21 @@ class LBRYElectrumX(SessionBase): touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): + notified = set() + mempool_addrs = tuple(self.mempool_statuses.keys()) for hashX in touched: alias = self.hashX_subs[hashX] asyncio.create_task(send_history_notification(alias, hashX)) + notified.add(hashX) + for hashX in mempool_addrs: + if hashX not in notified: + alias = self.hashX_subs[hashX] + asyncio.create_task(send_history_notification(alias, hashX)) + notified.add(hashX) if touched: es = '' if len(touched) == 1 else 'es' - self.logger.info(f'notified of {len(touched):,d} address{es}') + self.logger.info(f'notified {len(notified)} mempool/{len(touched):,d} touched address{es}') def get_metrics_or_placeholder_for_api(self, query_name): """ Do not hold on to a reference to the metrics @@ -1175,7 +1183,6 @@ class LBRYElectrumX(SessionBase): self.mempool_statuses[hashX] = status else: self.mempool_statuses.pop(hashX, None) - return status async def hashX_listunspent(self, hashX): From 28838c17590521a10368890f030e5fa7f5b6304b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 16 Jun 2020 23:07:19 -0400 Subject: [PATCH 18/19] notifications_in_flight_metric --- lbry/wallet/ledger.py | 2 +- lbry/wallet/server/session.py | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 54c26f521..fbd90aa53 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -697,7 +697,7 @@ class Ledger(metaclass=LedgerRegistry): local_height, height ) return False - log.warning( + log.debug( "local history does not contain %s, requested height %i", tx.id, height ) return False diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6a831ec1a..d3e9634b1 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -165,6 +165,10 @@ class SessionManager: "address_history", "Time to fetch an address history", namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) + notifications_in_flight_metric = Gauge( + "notifications_in_flight", "Count of notifications in flight", + namespace=NAMESPACE + ) def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', shutdown_event: asyncio.Event): @@ -934,9 +938,13 @@ class LBRYElectrumX(SessionBase): method = 'blockchain.scripthash.subscribe' else: method = 'blockchain.address.subscribe' - status = await self.address_status(hashX) - self.session_mgr.address_history_metric.observe(time.perf_counter() - start) - await self.send_notification(method, (alias, status)) + try: + self.session_mgr.notifications_in_flight_metric.inc() + status = await self.address_status(hashX) + self.session_mgr.address_history_metric.observe(time.perf_counter() - start) + await self.send_notification(method, (alias, status)) + finally: + self.session_mgr.notifications_in_flight_metric.dec() touched = touched.intersection(self.hashX_subs) if touched or (height_changed and self.mempool_statuses): From ac1a8b4dafaa168218385ef4d0c4c73b9e7465cc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 18 Jun 2020 12:39:19 -0400 Subject: [PATCH 19/19] metric for time to send notifications --- lbry/wallet/server/session.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index d3e9634b1..748060da2 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -169,6 +169,10 @@ class SessionManager: "notifications_in_flight", "Count of notifications in flight", namespace=NAMESPACE ) + notifications_sent_metric = Histogram( + "notifications_sent", "Time to send an address notification", + namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS + ) def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool', shutdown_event: asyncio.Event): @@ -942,7 +946,9 @@ class LBRYElectrumX(SessionBase): self.session_mgr.notifications_in_flight_metric.inc() status = await self.address_status(hashX) self.session_mgr.address_history_metric.observe(time.perf_counter() - start) + start = time.perf_counter() await self.send_notification(method, (alias, status)) + self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start) finally: self.session_mgr.notifications_in_flight_metric.dec()