diff --git a/lbry/wallet/database.py b/lbry/wallet/database.py
index bc6d880f8..55a10b62b 100644
--- a/lbry/wallet/database.py
+++ b/lbry/wallet/database.py
@@ -529,13 +529,14 @@ def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decode
 
 def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List,
                                     amount_to_reserve: int, floor: int,
-                                    fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool):
+                                    fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool,
+                                    base_multiplier: int = 100):
     txs = defaultdict(list)
     decoded_transactions = {}
     reserved = []
 
     reserved_dewies = 0
-    multiplier = 10
+    multiplier = base_multiplier
     gap_count = 0
 
     while reserved_dewies < amount_to_reserve and gap_count < 5 and floor * multiplier < SQLITE_MAX_INTEGER:
@@ -550,7 +551,7 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts:
             multiplier **= 2
         else:
             gap_count = 0
-            multiplier = 10
+            multiplier = base_multiplier
 
     # reserve the accumulated txos if enough were found
     if reserved_dewies >= amount_to_reserve:
@@ -762,7 +763,7 @@ class Database(SQLiteMixin):
         # 2. update address histories removing deleted TXs
         return True
 
-    async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
+    async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 1,
                                   fee_per_byte: int = 50, set_reserved: bool = True,
                                   return_insufficient_funds: bool = False) -> List:
         to_spend = await self.db.run(
diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py
index 997a0dfeb..fbd90aa53 100644
--- a/lbry/wallet/ledger.py
+++ b/lbry/wallet/ledger.py
@@ -244,8 +244,7 @@ class Ledger(metaclass=LedgerRegistry):
     def get_address_count(self, **constraints):
         return self.db.get_address_count(**constraints)
 
-    async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
-                                  min_amount=100000):
+    async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], min_amount=1):
         min_amount = min(amount // 10, min_amount)
         fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
         selector = CoinSelector(amount, fee)
@@ -698,7 +697,7 @@ class Ledger(metaclass=LedgerRegistry):
                     local_height, height
                 )
                 return False
-            log.warning(
+            log.debug(
                 "local history does not contain %s, requested height %i", tx.id, height
             )
             return False
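The coin-selection change above is easier to see in isolation: the reserve loop scans UTXOs in amount buckets bounded by `floor * multiplier`, squaring the multiplier whenever a pass reserves nothing new, and stopping after five consecutive empty passes or when the bound would overflow SQLite's integer range. A minimal sketch of that escalation schedule (`escalation_ceilings` is a hypothetical helper, not code from the patch):

```python
# Hypothetical sketch of the bucket escalation in get_and_reserve_spendable_utxos:
# each pass looks at UTXOs up to floor * multiplier, and the multiplier squares
# whenever a pass reserves nothing (a "gap").
SQLITE_MAX_INTEGER = 2 ** 63 - 1

def escalation_ceilings(floor: int, base_multiplier: int = 100, max_gaps: int = 5):
    multiplier = base_multiplier
    gaps = 0
    while gaps < max_gaps and floor * multiplier < SQLITE_MAX_INTEGER:
        yield floor * multiplier
        gaps += 1         # pretend this bucket reserved nothing
        multiplier **= 2  # 100 -> 10_000 -> 100_000_000 -> ...

print(list(escalation_ceilings(1000)))  # [100000, 10000000, 100000000000]
```

Raising the starting multiplier from 10 to 100 (and making it injectable via `base_multiplier`) means the first bucket already covers 100x the floor, so typical reservations should complete in fewer passes.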
diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 437b24f18..eceb62e08 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -251,7 +251,7 @@ class BlockProcessor:
         async def get_raw_blocks(last_height, hex_hashes):
             heights = range(last_height, last_height - len(hex_hashes), -1)
             try:
-                blocks = [self.db.read_raw_block(height) for height in heights]
+                blocks = [await self.db.read_raw_block(height) for height in heights]
                 self.logger.info(f'read {len(blocks)} blocks from disk')
                 return blocks
             except FileNotFoundError:
@@ -351,11 +351,9 @@ class BlockProcessor:
        # performed on the DB.
         if self._caught_up_event.is_set():
             await self.flush(True)
-        elif time.time() > self.next_cache_check:
-            flush_arg = self.check_cache_size()
-            if flush_arg is not None:
-                await self.flush(flush_arg)
-            self.next_cache_check = time.time() + 30
+        elif time.perf_counter() > self.next_cache_check:
+            await self.flush(True)
+            self.next_cache_check = time.perf_counter() + 30
 
     def check_cache_size(self):
         """Flush a cache if it gets too big."""
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index cdcbbe50a..ccd3fcd60 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -32,6 +32,9 @@ from lbry.wallet.server.history import History
 
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 
+HEADER_PREFIX = b'H'
+TX_COUNT_PREFIX = b'T'
+TX_HASH_PREFIX = b'X'
 
 
 @attr.s(slots=True)
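The three new single-byte prefixes carve each column family's keyspace into headers, tx counts, and tx hashes, keyed by big-endian integers. Big-endian matters because LevelDB compares keys as raw bytes, so big-endian encoding makes byte order coincide with numeric order and keeps prefix iteration height-ordered. A self-contained check of that property:

```python
# Why the new keys use big-endian integers: LevelDB orders keys
# lexicographically by bytes, and big-endian encoding makes that
# match numeric order.
HEADER_PREFIX = b'H'

def be_key(prefix: bytes, n: int) -> bytes:
    return prefix + n.to_bytes(8, byteorder='big')

keys = [be_key(HEADER_PREFIX, h) for h in (1, 255, 256, 65536)]
assert keys == sorted(keys)  # big-endian keys sort numerically

# Little-endian would break ordered scans: 256 would sort before 1.
le = [n.to_bytes(8, byteorder='little') for n in (1, 256)]
assert sorted(le) != le
```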
@@ -63,15 +66,7 @@ class LevelDB:
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.env = env
         self.coin = env.coin
-        self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
-
-        # Setup block header size handlers
-        if self.coin.STATIC_BLOCK_HEADERS:
-            self.header_offset = self.coin.static_header_offset
-            self.header_len = self.coin.static_header_len
-        else:
-            self.header_offset = self.dynamic_header_offset
-            self.header_len = self.dynamic_header_len
+        self.executor = None
 
         self.logger.info(f'switching current directory to {env.db_dir}')
 
@@ -87,51 +82,71 @@ class LevelDB:
         self.merkle = Merkle()
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
 
-        path = partial(os.path.join, self.env.db_dir)
-        self.headers_file = util.LogicalFile(path('meta/headers'), 2, 16000000)
-        self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000)
-        self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000)
-        if not self.coin.STATIC_BLOCK_HEADERS:
-            self.headers_offsets_file = util.LogicalFile(
-                path('meta/headers_offsets'), 2, 16000000)
+        self.hashes_db = None
+        self.headers_db = None
+        self.tx_count_db = None
 
     async def _read_tx_counts(self):
         if self.tx_counts is not None:
             return
         # tx_counts[N] has the cumulative number of txs at the end of
         # height N.  So tx_counts[0] is 1 - the genesis coinbase
-        size = (self.db_height + 1) * 4
-        tx_counts = self.tx_counts_file.read(0, size)
-        assert len(tx_counts) == size
+
+        def get_counts():
+            return tuple(
+                util.unpack_be_uint64(tx_count)
+                for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
+            )
+
+        tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
+        assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
         self.tx_counts = array.array('I', tx_counts)
+
         if self.tx_counts:
-            assert self.db_tx_count == self.tx_counts[-1]
+            assert self.db_tx_count == self.tx_counts[-1], \
+                f"{self.db_tx_count} vs {self.tx_counts[-1]} ({len(self.tx_counts)} counts)"
         else:
             assert self.db_tx_count == 0
 
     async def _open_dbs(self, for_sync, compacting):
-        assert self.utxo_db is None
-
-        # First UTXO DB
-        self.utxo_db = self.db_class('utxo', for_sync)
-        if self.utxo_db.is_new:
-            self.logger.info('created new database')
-            self.logger.info('creating metadata directory')
-            os.mkdir(os.path.join(self.env.db_dir, 'meta'))
-            coin_path = os.path.join(self.env.db_dir, 'meta', 'COIN')
+        if self.executor is None:
+            self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
+        coin_path = os.path.join(self.env.db_dir, 'COIN')
+        if not os.path.isfile(coin_path):
             with util.open_file(coin_path, create=True) as f:
                 f.write(f'ElectrumX databases and metadata for '
                         f'{self.coin.NAME} {self.coin.NET}'.encode())
-            if not self.coin.STATIC_BLOCK_HEADERS:
-                self.headers_offsets_file.write(0, bytes(8))
-        else:
-            self.logger.info(f'opened UTXO DB (for sync: {for_sync})')
+
+        assert self.headers_db is None
+        self.headers_db = self.db_class('headers', for_sync)
+        if self.headers_db.is_new:
+            self.logger.info('created new headers db')
+        self.logger.info(f'opened headers DB (for sync: {for_sync})')
+
+        assert self.tx_count_db is None
+        self.tx_count_db = self.db_class('tx_count', for_sync)
+        if self.tx_count_db.is_new:
+            self.logger.info('created new tx count db')
+        self.logger.info(f'opened tx count DB (for sync: {for_sync})')
+
+        assert self.hashes_db is None
+        self.hashes_db = self.db_class('hashes', for_sync)
+        if self.hashes_db.is_new:
+            self.logger.info('created new tx hashes db')
+        self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
+
+        assert self.utxo_db is None
+        # First UTXO DB
+        self.utxo_db = self.db_class('utxo', for_sync)
+        if self.utxo_db.is_new:
+            self.logger.info('created new utxo db')
+        self.logger.info(f'opened utxo db (for sync: {for_sync})')
         self.read_utxo_state()
 
         # Then history DB
-        self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
-                                                     self.utxo_flush_count,
-                                                     compacting)
+        self.utxo_flush_count = self.history.open_db(
+            self.db_class, for_sync, self.utxo_flush_count, compacting
+        )
         self.clear_excess_undo_info()
 
         # Read TX counts (requires meta directory)
@@ -140,7 +155,11 @@ class LevelDB:
     def close(self):
         self.utxo_db.close()
         self.history.close_db()
+        self.headers_db.close()
+        self.tx_count_db.close()
+        self.hashes_db.close()
         self.executor.shutdown(wait=True)
+        self.executor = None
 
     async def open_for_compacting(self):
         await self._open_dbs(True, True)
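`_read_tx_counts` now rebuilds the cumulative `tx_counts` array from the `tx_count` column family instead of a flat file; that array is what later lookups bisect to turn a global transaction number into a block height. A toy illustration of the invariant, mirroring the `bisect_right` lookup used further down in `fs_tx_hash`:

```python
# How a cumulative tx_counts array maps a global tx number to a height.
# Toy data, not real chain counts.
from array import array
from bisect import bisect_right

tx_counts = array('I', [1, 3, 6])  # 1 tx in block 0, 2 in block 1, 3 in block 2

def height_of(tx_num: int) -> int:
    return bisect_right(tx_counts, tx_num)

assert height_of(0) == 0  # the genesis coinbase
assert height_of(1) == 1 and height_of(2) == 1
assert height_of(5) == 2
```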
""" + self.logger.info('closing DBs to re-open for serving') if self.utxo_db: self.logger.info('closing DBs to re-open for serving') self.utxo_db.close() self.history.close_db() self.utxo_db = None + if self.headers_db: + self.headers_db.close() + self.headers_db = None + if self.tx_count_db: + self.tx_count_db.close() + self.tx_count_db = None + if self.hashes_db: + self.hashes_db.close() + self.hashes_db = None + await self._open_dbs(False, False) + self.logger.info("opened for serving") # Header merkle cache @@ -248,31 +280,30 @@ class LevelDB: assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts else 0) assert len(self.tx_counts) == flush_data.height + 1 - hashes = b''.join(flush_data.block_tx_hashes) - flush_data.block_tx_hashes.clear() - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count + assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count # Write the headers, tx counts, and tx hashes - start_time = time.time() + start_time = time.perf_counter() height_start = self.fs_height + 1 - offset = self.header_offset(height_start) - self.headers_file.write(offset, b''.join(flush_data.headers)) - self.fs_update_header_offsets(offset, height_start, flush_data.headers) - flush_data.headers.clear() + tx_num = prior_tx_count - offset = height_start * self.tx_counts.itemsize - self.tx_counts_file.write(offset, - self.tx_counts[height_start:].tobytes()) - offset = prior_tx_count * 32 - self.hashes_file.write(offset, hashes) + for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes): + tx_count = self.tx_counts[height_start] + self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) + self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) + height_start += 1 + offset = 0 + while offset < len(tx_hashes): + self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) + tx_num += 1 + offset += 32 + flush_data.block_tx_hashes.clear() self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count - - if self.utxo_db.for_sync: - elapsed = time.time() - start_time - self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') + flush_data.headers.clear() + elapsed = time.perf_counter() - start_time + self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') def flush_history(self): self.history.flush() @@ -350,28 +381,6 @@ class LevelDB: f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - def fs_update_header_offsets(self, offset_start, height_start, headers): - if self.coin.STATIC_BLOCK_HEADERS: - return - offset = offset_start - offsets = [] - for h in headers: - offset += len(h) - offsets.append(pack(" self.db_height: tx_hash = None else: - tx_hash = self.hashes_file.read(tx_num * 32, 32) + tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) return tx_hash, tx_height async def fs_block_hashes(self, height, count): @@ -428,9 +441,8 @@ class LevelDB: offset = 0 headers = [] for n in range(count): - hlen = self.header_len(height + n) - headers.append(headers_concat[offset:offset + hlen]) - offset += hlen + headers.append(headers_concat[offset:offset + self.coin.BASIC_HEADER_SIZE]) + offset += self.coin.BASIC_HEADER_SIZE return [self.coin.header_hash(header) for header in headers] @@ -442,9 +454,20 @@ class LevelDB: limit to None to get them all. 
""" def read_history(): - tx_nums = list(self.history.get_txnums(hashX, limit)) - fs_tx_hash = self.fs_tx_hash - return [fs_tx_hash(tx_num) for tx_num in tx_nums] + hashx_history = [] + for key, hist in self.history.db.iterator(prefix=hashX): + a = array.array('I') + a.frombytes(hist) + for tx_num in a: + tx_height = bisect_right(self.tx_counts, tx_num) + if tx_height > self.db_height: + tx_hash = None + else: + tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) + hashx_history.append((tx_hash, tx_height)) + if limit and len(hashx_history) >= limit: + return hashx_history + return hashx_history while True: history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) @@ -474,16 +497,20 @@ class LevelDB: batch_put(self.undo_key(height), b''.join(undo_info)) def raw_block_prefix(self): - return 'meta/block' + return 'block' def raw_block_path(self, height): return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}') - def read_raw_block(self, height): + async def read_raw_block(self, height): """Returns a raw block read from disk. Raises FileNotFoundError if the block isn't on-disk.""" - with util.open_file(self.raw_block_path(height)) as f: - return f.read(-1) + + def read(): + with util.open_file(self.raw_block_path(height)) as f: + return f.read(-1) + + return await asyncio.get_event_loop().run_in_executor(self.executor, read) def write_raw_block(self, block, height): """Write a raw block to disk.""" diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 03d3cd03c..7ca887892 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -13,6 +13,7 @@ import time from abc import ABC, abstractmethod from collections import defaultdict from concurrent.futures.thread import ThreadPoolExecutor +from prometheus_client import Histogram import attr @@ -79,6 +80,16 @@ class MemPoolAPI(ABC): daemon's height at the time the mempool was obtained.""" +NAMESPACE = "wallet_server" +HISTOGRAM_BUCKETS = ( + .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') +) +mempool_process_time_metric = Histogram( + "processed_mempool", "Time to process mempool and notify touched addresses", + namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS +) + + class MemPool: """Representation of the daemon's mempool. 
@@ -93,7 +104,7 @@ class MemPool:
        hashXs: hashX -> set of all hashes of txs touching the hashX
     """
 
-    def __init__(self, coin, api, refresh_secs=5.0, log_status_secs=120.0):
+    def __init__(self, coin, api, refresh_secs=1.0, log_status_secs=120.0):
         assert isinstance(api, MemPoolAPI)
         self.coin = coin
         self.api = api
@@ -107,6 +118,7 @@ class MemPool:
         self.lock = asyncio.Lock()
         self.wakeup = asyncio.Event()
         self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
+        self.mempool_process_time_metric = mempool_process_time_metric
 
     async def _logging(self, synchronized_event):
         """Print regular logs of mempool stats."""
@@ -207,6 +219,7 @@ class MemPool:
     async def _refresh_hashes(self, synchronized_event):
         """Refresh our view of the daemon's mempool."""
         while True:
+            start = time.perf_counter()
             height = self.api.cached_height()
             hex_hashes = await self.api.mempool_hashes()
             if height != await self.api.height():
@@ -217,6 +230,8 @@ class MemPool:
             synchronized_event.set()
             synchronized_event.clear()
             await self.api.on_mempool(touched, height)
+            duration = time.perf_counter() - start
+            self.mempool_process_time_metric.observe(duration)
             try:
                 # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
                 await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
index 3e6bfb771..748060da2 100644
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ -123,6 +123,7 @@ HISTOGRAM_BUCKETS = (
     .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
 )
 
+
 class SessionManager:
     """Holds global state about all sessions."""
 
@@ -160,6 +161,18 @@ class SessionManager:
         "clients", "Number of connections received per client version",
         namespace=NAMESPACE, labelnames=("version",)
     )
+    address_history_metric = Histogram(
+        "address_history", "Time to fetch an address history",
+        namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
+    )
+    notifications_in_flight_metric = Gauge(
+        "notifications_in_flight", "Count of notifications in flight",
+        namespace=NAMESPACE
+    )
+    notifications_sent_metric = Histogram(
+        "notifications_sent", "Time to send an address notification",
+        namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
+    )
 
     def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
                  shutdown_event: asyncio.Event):
@@ -602,7 +615,6 @@ class SessionManager:
 
     async def broadcast_transaction(self, raw_tx):
         hex_hash = await self.daemon.broadcast_transaction(raw_tx)
-        self.mempool.wakeup.set()
         self.txs_sent += 1
         return hex_hash
 
@@ -923,36 +935,40 @@ class LBRYElectrumX(SessionBase):
             args = (await self.subscribe_headers_result(), )
             if not (await self.send_notification('blockchain.headers.subscribe', args)):
                 return
+
+        async def send_history_notification(alias, hashX):
+            start = time.perf_counter()
+            if len(alias) == 64:
+                method = 'blockchain.scripthash.subscribe'
+            else:
+                method = 'blockchain.address.subscribe'
+            try:
+                self.session_mgr.notifications_in_flight_metric.inc()
+                status = await self.address_status(hashX)
+                self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
+                start = time.perf_counter()
+                await self.send_notification(method, (alias, status))
+                self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
+            finally:
+                self.session_mgr.notifications_in_flight_metric.dec()
+
         touched = touched.intersection(self.hashX_subs)
         if touched or (height_changed and self.mempool_statuses):
-            changed = {}
-
+            notified = set()
+            mempool_addrs = tuple(self.mempool_statuses.keys())
             for hashX in touched:
                 alias = self.hashX_subs[hashX]
-                status = await self.address_status(hashX)
-                changed[alias] = status
-
-            # Check mempool hashXs - the status is a function of the
-            # confirmed state of other transactions.  Note: we cannot
-            # iterate over mempool_statuses as it changes size.
-            for hashX in tuple(self.mempool_statuses):
-                # Items can be evicted whilst await-ing status; False
-                # ensures such hashXs are notified
-                old_status = self.mempool_statuses.get(hashX, False)
-                status = await self.address_status(hashX)
-                if status != old_status:
+                asyncio.create_task(send_history_notification(alias, hashX))
+                notified.add(hashX)
+            for hashX in mempool_addrs:
+                if hashX not in notified:
                     alias = self.hashX_subs[hashX]
-                    changed[alias] = status
+                    asyncio.create_task(send_history_notification(alias, hashX))
+                    notified.add(hashX)
 
-            for alias, status in changed.items():
-                if len(alias) == 64:
-                    method = 'blockchain.scripthash.subscribe'
-                else:
-                    method = 'blockchain.address.subscribe'
-                asyncio.create_task(self.send_notification(method, (alias, status)))
-            if changed:
-                es = '' if len(changed) == 1 else 'es'
-                self.logger.info(f'notified of {len(changed):,d} address{es}')
+            if touched:
+                es = '' if len(touched) == 1 else 'es'
+                self.logger.info(f'notified {len(notified)} mempool/{len(touched):,d} touched address{es}')
 
     def get_metrics_or_placeholder_for_api(self, query_name):
         """ Do not hold on to a reference to the metrics
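The notification rewrite above replaces a serial loop, which awaited every address status before sending anything, with one fire-and-forget task per subscription, bracketed by the new in-flight gauge. The shape of that pattern, reduced to a runnable toy (all names here are illustrative):

```python
# Stripped-down version of the new notify pattern: each subscription gets
# its own task, and try/finally keeps the in-flight counter accurate even
# if a send fails.
import asyncio

in_flight = 0  # stands in for notifications_in_flight_metric

async def send_one(alias: str) -> None:
    global in_flight
    in_flight += 1
    try:
        await asyncio.sleep(0.01)  # stand-in for address_status + send_notification
        print(f"notified {alias}")
    finally:
        in_flight -= 1

async def notify_all(aliases):
    # fire-and-forget, like asyncio.create_task(send_history_notification(...));
    # the server never gathers, we do here only so the demo exits cleanly
    await asyncio.gather(*(asyncio.create_task(send_one(a)) for a in aliases))

asyncio.run(notify_all(["addr1", "addr2"]))
```

Note the companion change: `wakeup.set()` moves out of `SessionManager.broadcast_transaction` and into the session-level `transaction.broadcast` handler (further down), so the mempool refresh is still kicked immediately after a client broadcasts.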
@@ -1181,7 +1197,6 @@ class LBRYElectrumX(SessionBase):
                 self.mempool_statuses[hashX] = status
             else:
                 self.mempool_statuses.pop(hashX, None)
-
         return status
 
     async def hashX_listunspent(self, hashX):
@@ -1497,6 +1512,7 @@ class LBRYElectrumX(SessionBase):
         try:
             hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
             self.txs_sent += 1
+            self.mempool.wakeup.set()
             self.logger.info(f'sent tx: {hex_hash}')
             return hex_hash
         except DaemonError as e:
diff --git a/lbry/wallet/server/util.py b/lbry/wallet/server/util.py
index 915a6975c..bc27f7d51 100644
--- a/lbry/wallet/server/util.py
+++ b/lbry/wallet/server/util.py
@@ -333,11 +333,13 @@ unpack_le_uint64_from = struct_le_Q.unpack_from
 unpack_be_uint16_from = struct_be_H.unpack_from
 unpack_be_uint32_from = struct_be_I.unpack_from
 
+unpack_be_uint64 = lambda x: int.from_bytes(x, byteorder='big')
+
 pack_le_int32 = struct_le_i.pack
 pack_le_int64 = struct_le_q.pack
 pack_le_uint16 = struct_le_H.pack
 pack_le_uint32 = struct_le_I.pack
-pack_le_uint64 = struct_le_Q.pack
+pack_be_uint64 = lambda x: x.to_bytes(8, byteorder='big')
 pack_be_uint16 = struct_be_H.pack
 pack_be_uint32 = struct_be_I.pack
 pack_byte = structB.pack
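For reference, the two new helpers in util.py are byte-for-byte equivalent to struct's big-endian `Q` format and round-trip cleanly. Note also that the hunk replaces `pack_le_uint64` rather than keeping it alongside the new packer; if anything elsewhere still imports the little-endian variant it would need to stay. A quick check of the equivalence:

```python
# The new helpers match struct's big-endian uint64 format exactly.
import struct

pack_be_uint64 = lambda x: x.to_bytes(8, byteorder='big')
unpack_be_uint64 = lambda x: int.from_bytes(x, byteorder='big')

for n in (0, 1, 2**32, 2**64 - 1):
    assert pack_be_uint64(n) == struct.pack('>Q', n)
    assert unpack_be_uint64(pack_be_uint64(n)) == n
```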
diff --git a/tests/integration/other/test_chris45.py b/tests/integration/other/test_chris45.py
index e837a4ec5..51c3cd3e8 100644
--- a/tests/integration/other/test_chris45.py
+++ b/tests/integration/other/test_chris45.py
@@ -4,6 +4,7 @@ from lbry.testcase import CommandTestCase
 
 class EpicAdventuresOfChris45(CommandTestCase):
     async def test_no_this_is_not_a_test_its_an_adventure(self):
+
         # Chris45 is an avid user of LBRY and this is his story. It's fact and fiction
         # and everything in between; it's also the setting of some record setting
         # integration tests.
@@ -194,3 +195,9 @@ class EpicAdventuresOfChris45(CommandTestCase):
                 }},
                 await self.resolve(uri)
             )
+
+        # He closes and opens the wallet server databases to see how horribly they break
+        db = self.conductor.spv_node.server.db
+        db.close()
+        await db.open_for_serving()
+        # They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup)
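The new test tail exercises exactly the failure mode named in its comment: a `close()` followed by `open_for_serving()` must leave the tx-count rows consistent with the DB height, or `_read_tx_counts` fails its assertion on the next open. The invariant, reduced to a self-contained check (a plain list stands in for the real `tx_count` column family):

```python
# The consistency check a close/re-open cycle must survive, mirroring the
# asserts in LevelDB._read_tx_counts (e.g. "AssertionError: 276 vs 266").
def check_tx_counts(tx_counts, db_height, db_tx_count):
    assert len(tx_counts) == db_height + 1, f"{len(tx_counts)} vs {db_height + 1}"
    if tx_counts:
        assert db_tx_count == tx_counts[-1]
    else:
        assert db_tx_count == 0

check_tx_counts([1, 3, 6], db_height=2, db_tx_count=6)  # passes
```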