From bdc7f4b3f5f76db945299084c92940d058dd5d1e Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 9 Nov 2020 13:09:00 -0500
Subject: [PATCH 1/5] combine tx_count_db and hashes_db, add tx db

---
 lbry/wallet/server/block_processor.py |  6 +--
 lbry/wallet/server/leveldb.py         | 59 ++++++++++++---------
 2 files changed, 29 insertions(+), 36 deletions(-)

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 7fe0ae059..68c735265 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -171,7 +171,7 @@ class BlockProcessor:

         # Caches of unflushed items.
         self.headers = []
-        self.tx_hashes = []
+        self.block_txs = []
         self.undo_infos = []

         # UTXO cache
@@ -337,7 +337,7 @@ class BlockProcessor:
         """The data for a flush.  The lock must be taken."""
         assert self.state_lock.locked()
         return FlushData(self.height, self.tx_count, self.headers,
-                         self.tx_hashes, self.undo_infos, self.utxo_cache,
+                         self.block_txs, self.undo_infos, self.utxo_cache,
                          self.db_deletes, self.tip)

     async def flush(self, flush_utxos):
@@ -404,7 +404,7 @@ class BlockProcessor:
         self.tip = self.coin.header_hash(headers[-1])

     def advance_txs(self, height, txs, header):
-        self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
+        self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx for tx, _ in txs]))

         # Use local vars for speed in the loops
         undo_info = []
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index ccd3fcd60..a7f84afe6 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -35,6 +35,7 @@ UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 HEADER_PREFIX = b'H'
 TX_COUNT_PREFIX = b'T'
 TX_HASH_PREFIX = b'X'
+TX_PREFIX = b'B'


 @attr.s(slots=True)
@@ -42,7 +43,7 @@ class FlushData:
     height = attr.ib()
     tx_count = attr.ib()
     headers = attr.ib()
-    block_tx_hashes = attr.ib()
+    block_txs = attr.ib()
     # The following are flushed to the UTXO DB if undo_infos is not None
     undo_infos = attr.ib()
     adds = attr.ib()
@@ -82,9 +83,8 @@ class LevelDB:
         self.merkle = Merkle()
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)

-        self.hashes_db = None
         self.headers_db = None
-        self.tx_count_db = None
+        self.tx_db = None

     async def _read_tx_counts(self):
         if self.tx_counts is not None:
@@ -95,7 +95,7 @@ class LevelDB:
         def get_counts():
             return tuple(
                 util.unpack_be_uint64(tx_count)
-                for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
+                for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
             )

         tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
@@ -123,17 +123,11 @@ class LevelDB:
             self.logger.info('created new headers db')
         self.logger.info(f'opened headers DB (for sync: {for_sync})')

-        assert self.tx_count_db is None
-        self.tx_count_db = self.db_class('tx_count', for_sync)
-        if self.tx_count_db.is_new:
-            self.logger.info('created new tx count db')
-        self.logger.info(f'opened tx count DB (for sync: {for_sync})')
-
-        assert self.hashes_db is None
-        self.hashes_db = self.db_class('hashes', for_sync)
-        if self.hashes_db.is_new:
-            self.logger.info('created new tx hashes db')
-        self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
+        assert self.tx_db is None
+        self.tx_db = self.db_class('tx', for_sync)
+        if self.tx_db.is_new:
+            self.logger.info('created new tx db')
+        self.logger.info(f'opened tx DB (for sync: {for_sync})')

         assert self.utxo_db is None
         # First UTXO DB
@@ -156,8 +150,7 @@ class LevelDB:
         self.utxo_db.close()
         self.history.close_db()
         self.headers_db.close()
-        self.tx_count_db.close()
-        self.hashes_db.close()
+        self.tx_db.close()
         self.executor.shutdown(wait=True)
         self.executor = None
@@ -187,12 +180,9 @@ class LevelDB:
         if self.headers_db:
             self.headers_db.close()
             self.headers_db = None
-        if self.tx_count_db:
-            self.tx_count_db.close()
-            self.tx_count_db = None
-        if self.hashes_db:
-            self.hashes_db.close()
-            self.hashes_db = None
+        if self.tx_db:
+            self.tx_db.close()
+            self.tx_db = None
         await self._open_dbs(False, False)
         self.logger.info("opened for serving")
@@ -217,7 +207,7 @@ class LevelDB:
         assert flush_data.height == self.fs_height == self.db_height
         assert flush_data.tip == self.db_tip
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert not flush_data.adds
         assert not flush_data.deletes
         assert not flush_data.undo_infos
@@ -275,30 +265,33 @@ class LevelDB:
         """
         prior_tx_count = (self.tx_counts[self.fs_height]
                           if self.fs_height >= 0 else 0)
-        assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
+        assert len(flush_data.block_txs) == len(flush_data.headers)
         assert flush_data.height == self.fs_height + len(flush_data.headers)
         assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts else 0)
         assert len(self.tx_counts) == flush_data.height + 1
-        assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count
+        assert len(
+            b''.join(hashes for hashes, _ in flush_data.block_txs)
+        ) // 32 == flush_data.tx_count - prior_tx_count

         # Write the headers, tx counts, and tx hashes
         start_time = time.perf_counter()
         height_start = self.fs_height + 1
         tx_num = prior_tx_count
-        for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes):
+        for header, (tx_hashes, txs) in zip(flush_data.headers, flush_data.block_txs):
             tx_count = self.tx_counts[height_start]
             self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
-            self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
+            self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
             height_start += 1
             offset = 0
             while offset < len(tx_hashes):
-                self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_PREFIX + util.pack_be_uint64(tx_num), txs[offset // 32])
                 tx_num += 1
                 offset += 32

-        flush_data.block_tx_hashes.clear()
+        flush_data.block_txs.clear()
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
         flush_data.headers.clear()
@@ -362,7 +355,7 @@ class LevelDB:

     def flush_backup(self, flush_data, touched):
         """Like flush_dbs() but when backing up.  All UTXOs are flushed."""
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert flush_data.height < self.db_height
         self.history.assert_flushed()
@@ -431,7 +424,7 @@ class LevelDB:
         if tx_height > self.db_height:
             tx_hash = None
         else:
-            tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+            tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
         return tx_hash, tx_height

     async def fs_block_hashes(self, height, count):
@@ -463,7 +456,7 @@ class LevelDB:
                     if tx_height > self.db_height:
                         tx_hash = None
                     else:
-                        tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+                        tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
                     hashx_history.append((tx_hash, tx_height))
                     if limit and len(hashx_history) >= limit:
                         return hashx_history
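NOTE: after PATCH 1 the former tx_count and hashes databases survive as row types inside the single 'tx' LevelDB, distinguished by a one-byte key prefix. A minimal sketch of the resulting key layout (illustrative only, not part of the patch; it assumes util.pack_be_uint64 is a plain 8-byte big-endian pack, which matches how it is used above):

    # Key layout of the merged 'tx' DB after PATCH 1 (illustrative).
    import struct

    TX_COUNT_PREFIX = b'T'  # b'T' + be_uint64(height) -> be_uint64(cumulative tx count)
    TX_HASH_PREFIX = b'X'   # b'X' + be_uint64(tx_num) -> 32-byte tx hash
    TX_PREFIX = b'B'        # b'B' + be_uint64(tx_num) -> raw serialized tx

    def pack_be_uint64(n: int) -> bytes:
        # stand-in for util.pack_be_uint64
        return struct.pack('>Q', n)

    def tx_count_key(height: int) -> bytes:
        return TX_COUNT_PREFIX + pack_be_uint64(height)

    def tx_hash_key(tx_num: int) -> bytes:
        return TX_HASH_PREFIX + pack_be_uint64(tx_num)

    def tx_key(tx_num: int) -> bytes:
        return TX_PREFIX + pack_be_uint64(tx_num)

Because each row type keeps its own prefix, a scan like tx_db.iterator(prefix=TX_COUNT_PREFIX) in _read_tx_counts() still sees exactly the rows the standalone tx_count DB used to hold, while all three row types now share one set of open files and one write path.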
From 83bcab9cd2e0723ff2a999aba9e2720c0b69b27f Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 9 Nov 2020 13:56:46 -0500
Subject: [PATCH 2/5] cleanup

---
 lbry/wallet/server/block_processor.py | 39 +++++++++------------------
 1 file changed, 13 insertions(+), 26 deletions(-)

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 68c735265..72d72cef9 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -404,44 +404,36 @@ class BlockProcessor:
         self.tip = self.coin.header_hash(headers[-1])

     def advance_txs(self, height, txs, header):
-        self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx for tx, _ in txs]))
+        self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))

         # Use local vars for speed in the loops
         undo_info = []
         tx_num = self.tx_count
-        script_hashX = self.coin.hashX_from_script
-        s_pack = pack
-        put_utxo = self.utxo_cache.__setitem__
-        spend_utxo = self.spend_utxo
-        undo_info_append = undo_info.append
-        update_touched = self.touched.update
         hashXs_by_tx = []
-        append_hashXs = hashXs_by_tx.append

         for tx, tx_hash in txs:
             hashXs = []
             append_hashX = hashXs.append
-            tx_numb = s_pack('<I', tx_num)
+            tx_numb = pack('<I', tx_num)

             # Spend the inputs
             for txin in tx.inputs:
                 if txin.is_generation():
                     continue
-                cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
-                undo_info_append(cache_value)
+                cache_value = self.spend_utxo(txin.prev_hash, txin.prev_idx)
+                undo_info.append(cache_value)
                 append_hashX(cache_value[:-12])

             # Add the new UTXOs
             for idx, txout in enumerate(tx.outputs):
                 # Get the hashX.  Ignore unspendable outputs
-                hashX = script_hashX(txout.pk_script)
+                hashX = self.coin.hashX_from_script(txout.pk_script)
                 if hashX:
                     append_hashX(hashX)
-                    put_utxo(tx_hash + s_pack('<H', idx),
-                             hashX + tx_numb + s_pack('<Q', txout.value))
+                    self.utxo_cache[tx_hash + pack('<H', idx)] = hashX + tx_numb + pack('<Q', txout.value)

-            append_hashXs(hashXs)
-            update_touched(hashXs)
+            hashXs_by_tx.append(hashXs)
+            self.touched.update(hashXs)
             tx_num += 1

         self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
@@ -487,20 +479,16 @@ class BlockProcessor:

         # Use local vars for speed in the loops
         s_pack = pack
-        put_utxo = self.utxo_cache.__setitem__
-        spend_utxo = self.spend_utxo
-        script_hashX = self.coin.hashX_from_script
-        touched = self.touched
         undo_entry_len = 12 + HASHX_LEN

         for tx, tx_hash in reversed(txs):
             for idx, txout in enumerate(tx.outputs):
                 # Spend the TX outputs.  Be careful with unspendable
                 # outputs - we didn't save those in the first place.
-                hashX = script_hashX(txout.pk_script)
+                hashX = self.coin.hashX_from_script(txout.pk_script)
                 if hashX:
-                    cache_value = spend_utxo(tx_hash, idx)
-                    touched.add(cache_value[:-12])
+                    cache_value = self.spend_utxo(tx_hash, idx)
+                    self.touched.add(cache_value[:-12])

             # Restore the inputs
             for txin in reversed(tx.inputs):
@@ -508,9 +496,8 @@ class BlockProcessor:
                     continue
                 n -= undo_entry_len
                 undo_item = undo_info[n:n + undo_entry_len]
-                put_utxo(txin.prev_hash + s_pack('<H', txin.prev_idx),
-                         undo_item)
-                touched.add(undo_item[:-12])
+                self.utxo_cache[txin.prev_hash + s_pack('<H', txin.prev_idx)] = undo_item
+                self.touched.add(undo_item[:-12])

         assert n == 0
         self.tx_count -= len(txs)

From 1694af8b5e55103253c38e1af3333530bd861a1c Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 9 Nov 2020 15:34:42 -0500
Subject: [PATCH 3/5] get transactions from leveldb instead of lbrycrd rpc

---
 lbry/wallet/ledger.py                 |   2 +-
 lbry/wallet/server/block_processor.py |  35 ++++---
 lbry/wallet/server/leveldb.py         | 128 +++++++++++++++++++++++---
 lbry/wallet/server/merkle.py          |   2 +-
 lbry/wallet/server/session.py         |  29 ++++--
 lbry/wallet/server/storage.py         |  10 +-
 6 files changed, 161 insertions(+), 45 deletions(-)

diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py
index 77666e509..1676b01f1 100644
--- a/lbry/wallet/ledger.py
+++ b/lbry/wallet/ledger.py
@@ -722,7 +722,7 @@ class Ledger(metaclass=LedgerRegistry):
                 if cache_item is None:
                     cache_item = TransactionCacheItem()
                     self._tx_cache[txid] = cache_item
-            tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height)
+            tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height)
             tx.height = remote_height
             cache_item.tx = tx
             if 'merkle' in merkle and remote_heights[txid] > 0:
diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 72d72cef9..bfd70ec50 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -171,6 +171,7 @@ class BlockProcessor:

         # Caches of unflushed items.
         self.headers = []
+        self.block_hashes = []
         self.block_txs = []
         self.undo_infos = []

@@ -336,7 +337,7 @@ class BlockProcessor:

     def flush_data(self):
         """The data for a flush.  The lock must be taken."""
         assert self.state_lock.locked()
-        return FlushData(self.height, self.tx_count, self.headers,
+        return FlushData(self.height, self.tx_count, self.headers, self.block_hashes,
                          self.block_txs, self.undo_infos, self.utxo_cache,
                          self.db_deletes, self.tip)

@@ -392,7 +393,8 @@ class BlockProcessor:
             for block in blocks:
                 height += 1
                 undo_info = self.advance_txs(
-                    height, block.transactions, self.coin.electrum_header(block.header, height)
+                    height, block.transactions, self.coin.electrum_header(block.header, height),
+                    self.coin.header_hash(block.header)
                 )
                 if height >= min_height:
                     self.undo_infos.append((undo_info, height))
@@ -403,14 +405,22 @@ class BlockProcessor:
         self.headers.extend(headers)
         self.tip = self.coin.header_hash(headers[-1])

-    def advance_txs(self, height, txs, header):
+    def advance_txs(self, height, txs, header, block_hash):
+        self.block_hashes.append(block_hash)
         self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))

-        # Use local vars for speed in the loops
         undo_info = []
         tx_num = self.tx_count
         hashXs_by_tx = []

+        # Use local vars for speed in the loops
+        put_utxo = self.utxo_cache.__setitem__
+        spend_utxo = self.spend_utxo
+        undo_info_append = undo_info.append
+        update_touched = self.touched.update
+        append_hashX_by_tx = hashXs_by_tx.append
+        hashX_from_script = self.coin.hashX_from_script
+
         for tx, tx_hash in txs:
             hashXs = []
             append_hashX = hashXs.append
@@ -420,24 +430,23 @@ class BlockProcessor:
             for txin in tx.inputs:
                 if txin.is_generation():
                     continue
-                cache_value = self.spend_utxo(txin.prev_hash, txin.prev_idx)
-                undo_info.append(cache_value)
+                cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
+                undo_info_append(cache_value)
                 append_hashX(cache_value[:-12])

             # Add the new UTXOs
             for idx, txout in enumerate(tx.outputs):
                 # Get the hashX.  Ignore unspendable outputs
-                hashX = self.coin.hashX_from_script(txout.pk_script)
+                hashX = hashX_from_script(txout.pk_script)
                 if hashX:
                     append_hashX(hashX)
-                    self.utxo_cache[tx_hash + pack('<H', idx)] = hashX + tx_numb + pack('<Q', txout.value)
+                    put_utxo(tx_hash + pack('<H', idx), hashX + tx_numb + pack('<Q', txout.value))

-            hashXs_by_tx.append(hashXs)
-            self.touched.update(hashXs)
+            append_hashX_by_tx(hashXs)
+            update_touched(hashXs)
             tx_num += 1

         self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
-
         self.tx_count = tx_num
         self.db.tx_counts.append(tx_num)

@@ -757,9 +766,9 @@ class LBRYBlockProcessor(BlockProcessor):
             self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
             self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES')

-    def advance_txs(self, height, txs, header):
+    def advance_txs(self, height, txs, header, block_hash):
         timer = self.timer.sub_timers['advance_blocks']
-        undo = timer.run(super().advance_txs, height, txs, header, timer_name='super().advance_txs')
+        undo = timer.run(super().advance_txs, height, txs, header, block_hash, timer_name='super().advance_txs')
         timer.run(self.sql.advance_txs, height, txs, header, self.daemon.cached_height(), forward_timer=True)
         if (height % 10000 == 0 or not self.db.first_sync) and self.logger.isEnabledFor(10):
             self.timer.show(height=height)
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index a7f84afe6..a7b24e657 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -18,6 +18,7 @@ from asyncio import sleep
 from bisect import bisect_right
 from collections import namedtuple
 from functools import partial
+from binascii import unhexlify, hexlify
 from glob import glob
 from struct import pack, unpack
 from concurrent.futures.thread import ThreadPoolExecutor
@@ -36,6 +37,9 @@ HEADER_PREFIX = b'H'
 TX_COUNT_PREFIX = b'T'
 TX_HASH_PREFIX = b'X'
 TX_PREFIX = b'B'
+TX_NUM_PREFIX = b'N'
+BLOCK_HASH_PREFIX = b'C'
+


 @attr.s(slots=True)
@@ -43,6 +47,7 @@ class FlushData:
     height = attr.ib()
     tx_count = attr.ib()
     headers = attr.ib()
+    block_hashes = attr.ib()
     block_txs = attr.ib()
     # The following are flushed to the UTXO DB if undo_infos is not None
     undo_infos = attr.ib()
@@ -279,19 +284,24 @@ class LevelDB:
         height_start = self.fs_height + 1
         tx_num = prior_tx_count
-        for header, (tx_hashes, txs) in zip(flush_data.headers, flush_data.block_txs):
+        for header, block_hash, (tx_hashes, txs) in zip(
+                flush_data.headers, flush_data.block_hashes, flush_data.block_txs):
             tx_count = self.tx_counts[height_start]
             self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
+            self.tx_db.put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
             self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
             height_start += 1
             offset = 0
             while offset < len(tx_hashes):
                 self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
-                self.tx_db.put(TX_PREFIX + util.pack_be_uint64(tx_num), txs[offset // 32])
+                self.tx_db.put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
+                self.tx_db.put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
                 tx_num += 1
                 offset += 32

         flush_data.block_txs.clear()
+        flush_data.block_hashes.clear()
+
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
         flush_data.headers.clear()
@@ -416,6 +426,19 @@ class LevelDB:
         return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)

+    async def fs_block_tx_hashes(self, height):
+        def _get_tx_hashes():
+            return self.tx_db.get(BLOCK_HASH_PREFIX + util.pack_be_uint64(height)).hex(), list(map(
+                lambda tx_hash: hash_to_hex_str(tx_hash),
+                self.tx_db.iterator(
+                    start=TX_HASH_PREFIX + util.pack_be_uint64(self.tx_counts[height - 1]),
+                    stop=None if height + 1 == len(self.tx_counts) else TX_HASH_PREFIX + util.pack_be_uint64(self.tx_counts[height]),
+                    include_key=False
+                )
+            ))
+
+        return await asyncio.get_event_loop().run_in_executor(self.executor, _get_tx_hashes)
+
     def fs_tx_hash(self, tx_num):
         """Return a pair (tx_hash, tx_height) for the given tx number.

@@ -427,6 +450,51 @@ class LevelDB:
             tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
         return tx_hash, tx_height

+    def _fs_transactions(self, txids):
+        def _iter_transactions():
+            block_txs = {}
+            branch_and_root = self.merkle.branch_and_root
+            tx_iterator = self.tx_db.iterator
+            tx_counts = self.tx_counts
+            tx_db_get = self.tx_db.get
+            unpack_be_uint64 = util.unpack_be_uint64
+            pack_be_uint64 = util.pack_be_uint64
+
+            for tx_hash in txids:
+                tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
+                tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
+                if tx_num is not None:
+                    tx_num = unpack_be_uint64(tx_num)
+                    tx_height = bisect_right(tx_counts, tx_num)
+                else:
+                    yield tx_hash, (None, {'block_height': -1})
+                    continue
+                if tx_height >= self.db_height:
+                    yield tx_hash, (None, {'block_height': -1})
+                    continue
+                tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
+                if tx_height not in block_txs:
+                    block_txs[tx_height] = list(tx_iterator(
+                        start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]),
+                        stop=None if tx_height + 1 == len(tx_counts) else
+                        TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]),
+                        include_key=False
+                    ))
+                tx_pos = tx_num - (tx_counts[tx_height - 1] if tx_height else 0)
+                branch, root = branch_and_root(block_txs[tx_height], tx_pos)
+                merkle = {
+                    'block_height': tx_height,
+                    'merkle': [hash_to_hex_str(hash) for hash in branch],
+                    'pos': tx_pos
+                }
+                yield tx_hash, (None if not tx else tx.hex(), merkle)
+        return {
+            _tx_hash: _val for (_tx_hash, _val) in _iter_transactions()
+        }
+
+    async def fs_transactions(self, txids):
+        return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
+
     async def fs_block_hashes(self, height, count):
         headers_concat, headers_count = await self.read_headers(height, count)
         if headers_count != count:
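NOTE: the lookups above lean on one invariant: tx_counts[h] is the cumulative number of transactions through block h, so bisect_right recovers the height for a global tx_num, and subtracting the previous block's cumulative count recovers the position inside the block (the tx_pos computation in _fs_transactions). A tiny self-contained check (illustrative only; the tx_counts values are hypothetical):

    from bisect import bisect_right

    tx_counts = [1, 3, 7]  # blocks 0..2 holding 1, 2 and 4 txs (made-up numbers)

    def tx_num_to_height_pos(tx_num):
        height = bisect_right(tx_counts, tx_num)  # first block whose cumulative count exceeds tx_num
        pos = tx_num - (tx_counts[height - 1] if height else 0)  # index within that block
        return height, pos

    assert tx_num_to_height_pos(0) == (0, 0)  # the only tx of block 0
    assert tx_num_to_height_pos(4) == (2, 1)  # second tx of block 2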
""" - def read_history(): - hashx_history = [] + # def read_history(): + # hashx_history = [] + # for key, hist in self.history.db.iterator(prefix=hashX): + # a = array.array('I') + # a.frombytes(hist) + # for tx_num in a: + # tx_height = bisect_right(self.tx_counts, tx_num) + # if tx_height > self.db_height: + # tx_hash = None + # else: + # tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) + # + # hashx_history.append((tx_hash, tx_height)) + # if limit and len(hashx_history) >= limit: + # return hashx_history + # return hashx_history + + def iter_tx_heights(): + db_height = self.db_height + tx_counts = self.tx_counts + tx_db_get = self.tx_db.get + pack_be_uint64 = util.pack_be_uint64 + + cnt = 0 + for key, hist in self.history.db.iterator(prefix=hashX): a = array.array('I') a.frombytes(hist) for tx_num in a: - tx_height = bisect_right(self.tx_counts, tx_num) - if tx_height > self.db_height: - tx_hash = None - else: - tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) - hashx_history.append((tx_hash, tx_height)) - if limit and len(hashx_history) >= limit: - return hashx_history - return hashx_history + tx_height = bisect_right(tx_counts, tx_num) + if tx_height > db_height: + yield None, tx_height + return + yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height + cnt += 1 + if limit and cnt >= limit: + return + if limit and cnt >= limit: + return + + def read_history(): + return [ + (tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights() + ] while True: history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) - if all(hash is not None for hash, height in history): + if not history or history[-1][0] is not None: return history self.logger.warning(f'limited_history: tx hash ' f'not found (reorg?), retrying...') @@ -622,13 +719,14 @@ class LevelDB: utxos = [] utxos_append = utxos.append s_unpack = unpack + fs_tx_hash = self.fs_tx_hash # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer prefix = b'u' + hashX for db_key, db_value in self.utxo_db.iterator(prefix=prefix): tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) value, = unpack('<Q', db_value) - tx_hash, height = self.fs_tx_hash(tx_num) + tx_hash, height = fs_tx_hash(tx_num) utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value)) return utxos diff --git a/lbry/wallet/server/merkle.py b/lbry/wallet/server/merkle.py index 174e77b8e..1a42b0185 100644 --- a/lbry/wallet/server/merkle.py +++ b/lbry/wallet/server/merkle.py @@ -63,7 +63,7 @@ class Merkle: raise TypeError('index must be an integer') # This also asserts hashes is not empty if not 0 <= index < len(hashes): - raise ValueError('index out of range') + raise ValueError(f"index '{index}/{len(hashes)}' out of range") natural_length = self.branch_length(len(hashes)) if length is None: length = natural_length diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9e897e29a..9f4a7ba52 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -17,7 +17,7 @@ from asyncio import Event, sleep from collections import defaultdict from functools import partial -from binascii import hexlify +from binascii import hexlify, unhexlify from pylru import lrucache from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from prometheus_client import Counter, Info, Histogram, Gauge @@ -141,7 +141,11 @@ class SessionManager: session_count_metric = Gauge("session_count", "Number of connected client sessions", 
                                 namespace=NAMESPACE, labelnames=("version",))
     request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
-                                   labelnames=("method", "version"))
+                                   labelnames=("method", "version"))
+    tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE)
+    tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
+    urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
+    resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)

     interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
     db_operational_error_metric = Counter(
@@ -1045,7 +1049,12 @@ class LBRYElectrumX(SessionBase):

     async def claimtrie_resolve(self, *urls):
         if urls:
-            return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls)
+            count = len(urls)
+            try:
+                self.session_mgr.urls_to_resolve_count_metric.inc(count)
+                return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls)
+            finally:
+                self.session_mgr.resolved_url_count_metric.inc(count)

     async def get_server_height(self):
         return self.bp.height
@@ -1524,6 +1533,7 @@ class LBRYElectrumX(SessionBase):

     async def transaction_info(self, tx_hash: str):
         assert_tx_hash(tx_hash)
+        self.session_mgr.tx_request_count_metric.inc()
         tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
         raw_tx = tx_info['hex']
         block_hash = tx_info.get('blockhash')
@@ -1531,15 +1541,19 @@ class LBRYElectrumX(SessionBase):
             return raw_tx, {'block_height': -1}
         merkle_height = (await self.daemon.deserialised_block(block_hash))['height']
         merkle = await self.transaction_merkle(tx_hash, merkle_height)
+        self.session_mgr.tx_replied_count_metric.inc()
         return raw_tx, merkle

     async def transaction_get_batch(self, *tx_hashes):
+        self.session_mgr.tx_request_count_metric.inc(len(tx_hashes))
         if len(tx_hashes) > 100:
             raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
         for tx_hash in tx_hashes:
             assert_tx_hash(tx_hash)
-        batch_result = {}
+        batch_result = await self.db.fs_transactions(tx_hashes)
         for tx_hash in tx_hashes:
+            if tx_hash in batch_result and batch_result[tx_hash][0]:
+                continue
             tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
             raw_tx = tx_info['hex']
             block_hash = tx_info.get('blockhash')
@@ -1558,6 +1572,7 @@ class LBRYElectrumX(SessionBase):
                 height = -1
             merkle['block_height'] = height
             batch_result[tx_hash] = [raw_tx, merkle]
+        self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
         return batch_result

     async def transaction_get(self, tx_hash, verbose=False):
@@ -1579,11 +1594,7 @@ class LBRYElectrumX(SessionBase):
         block_hash is a hexadecimal string, and tx_hashes is an
         ordered list of hexadecimal strings.
         """
-        height = non_negative_integer(height)
-        hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
-        block_hash = hex_hashes[0]
-        block = await self.daemon.deserialised_block(block_hash)
-        return block_hash, block['tx']
+        return await self.db.fs_block_tx_hashes(height)

     def _get_merkle_branch(self, tx_hashes, tx_pos):
         """Return a merkle branch to a transaction.
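NOTE: the session-level effect of this patch is that transaction_get_batch becomes LevelDB-first, with the lbrycrd RPC kept only as a fallback for transactions not yet flushed. Reduced to a sketch (names and signatures schematic, not the actual code; the real version above also rebuilds merkle branches for daemon-served confirmed transactions):

    # Illustrative only: serve from the local DB, fall back to the daemon RPC.
    async def get_batch(db, daemon_request, tx_hashes):
        results = await db.fs_transactions(tx_hashes)  # {txid: (raw_hex or None, merkle dict)}
        for tx_hash in tx_hashes:
            if tx_hash in results and results[tx_hash][0]:
                continue  # raw tx found locally, merkle already attached
            tx_info = await daemon_request('getrawtransaction', tx_hash, True)
            results[tx_hash] = [tx_info['hex'], {'block_height': -1}]
        return results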
diff --git a/lbry/wallet/server/storage.py b/lbry/wallet/server/storage.py
index 5e7db97dd..127166204 100644
--- a/lbry/wallet/server/storage.py
+++ b/lbry/wallet/server/storage.py
@@ -77,18 +77,16 @@ class LevelDB(Storage):
         import plyvel
         cls.module = plyvel

-    def open(self, name, create):
-        mof = 512 if self.for_sync else 128
+    def open(self, name, create, lru_cache_size=None):
+        mof = 10000
         path = os.path.join(self.db_dir, name)
         # Use snappy compression (the default)
-        self.db = self.module.DB(path, create_if_missing=create,
-                                 max_open_files=mof)
+        self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof)
         self.close = self.db.close
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
-        self.write_batch = partial(self.db.write_batch, transaction=True,
-                                   sync=True)
+        self.write_batch = partial(self.db.write_batch, transaction=True, sync=True)


 class RocksDB(Storage):
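NOTE: with the on-disk data consolidated into fewer LevelDBs, storage.py now opens each plyvel database with a flat max_open_files=10000 instead of the old 512/128 split, keeping write batches transactional and fsynced. How that open call looks in isolation (illustrative only; the path is made up, the keyword arguments are the standard plyvel API):

    from functools import partial
    import plyvel

    db = plyvel.DB('/tmp/example_db', create_if_missing=True, max_open_files=10000)
    write_batch = partial(db.write_batch, transaction=True, sync=True)

    with write_batch() as batch:  # atomic batch, synced to disk on exit
        batch.put(b'B' + b'\x00' * 32, b'raw tx bytes')
    db.close()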
From 48aeb26e02cd929ba1f51f4dba80db933f20a67d Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Tue, 17 Nov 2020 16:05:08 -0500
Subject: [PATCH 4/5] threaded_get_merkle

-remove dead code
---
 lbry/wallet/server/session.py | 64 +++++++++++------------------------
 1 file changed, 19 insertions(+), 45 deletions(-)

diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
index 9f4a7ba52..520613892 100644
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ -1532,17 +1532,7 @@ class LBRYElectrumX(SessionBase):
                                   f'network rules.\n\n{message}\n[{raw_tx}]')

     async def transaction_info(self, tx_hash: str):
-        assert_tx_hash(tx_hash)
-        self.session_mgr.tx_request_count_metric.inc()
-        tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
-        raw_tx = tx_info['hex']
-        block_hash = tx_info.get('blockhash')
-        if not block_hash:
-            return raw_tx, {'block_height': -1}
-        merkle_height = (await self.daemon.deserialised_block(block_hash))['height']
-        merkle = await self.transaction_merkle(tx_hash, merkle_height)
-        self.session_mgr.tx_replied_count_metric.inc()
-        return raw_tx, merkle
+        return (await self.transaction_get_batch(tx_hash))[tx_hash]

     async def transaction_get_batch(self, *tx_hashes):
         self.session_mgr.tx_request_count_metric.inc(len(tx_hashes))
@@ -1551,13 +1541,13 @@ class LBRYElectrumX(SessionBase):
         for tx_hash in tx_hashes:
             assert_tx_hash(tx_hash)
         batch_result = await self.db.fs_transactions(tx_hashes)
+        needed_merkles = {}

         for tx_hash in tx_hashes:
             if tx_hash in batch_result and batch_result[tx_hash][0]:
                 continue
             tx_info = await self.daemon_request('getrawtransaction', tx_hash, True)
             raw_tx = tx_info['hex']
             block_hash = tx_info.get('blockhash')
-            merkle = {}
             if block_hash:
                 block = await self.daemon.deserialised_block(block_hash)
                 height = block['height']
@@ -1566,12 +1556,20 @@ class LBRYElectrumX(SessionBase):
                 except ValueError:
                     raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
                                    f'block {block_hash} at height {height:,d}')
-                merkle["merkle"] = self._get_merkle_branch(block['tx'], pos)
-                merkle["pos"] = pos
+                needed_merkles[tx_hash] = raw_tx, block['tx'], pos, height
             else:
-                height = -1
-            merkle['block_height'] = height
-            batch_result[tx_hash] = [raw_tx, merkle]
+                batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
+
+        def threaded_get_merkle():
+            for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
+                batch_result[tx_hash] = raw_tx, {
+                    'merkle': self._get_merkle_branch(block_txs, pos),
+                    'pos': pos,
+                    'block_height': block_height
+                }
+        if needed_merkles:
+            await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
+
         self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
         return batch_result

@@ -1615,35 +1613,11 @@ class LBRYElectrumX(SessionBase):
             height: the height of the block it is in
         """
         assert_tx_hash(tx_hash)
-        block_hash, tx_hashes = await self._block_hash_and_tx_hashes(height)
-        try:
-            pos = tx_hashes.index(tx_hash)
-        except ValueError:
+        result = await self.transaction_get_batch(tx_hash)
+        if tx_hash not in result or result[tx_hash][1]['block_height'] <= 0:
             raise RPCError(BAD_REQUEST, f'tx hash {tx_hash} not in '
-                           f'block {block_hash} at height {height:,d}')
-        branch = self._get_merkle_branch(tx_hashes, pos)
-        return {"block_height": height, "merkle": branch, "pos": pos}
-
-    async def transaction_id_from_pos(self, height, tx_pos, merkle=False):
-        """Return the txid and optionally a merkle proof, given
-        a block height and position in the block.
-        """
-        tx_pos = non_negative_integer(tx_pos)
-        if merkle not in (True, False):
-            raise RPCError(BAD_REQUEST, f'"merkle" must be a boolean')
-
-        block_hash, tx_hashes = await self._block_hash_and_tx_hashes(height)
-        try:
-            tx_hash = tx_hashes[tx_pos]
-        except IndexError:
-            raise RPCError(BAD_REQUEST, f'no tx at position {tx_pos:,d} in '
-                           f'block {block_hash} at height {height:,d}')
-
-        if merkle:
-            branch = self._get_merkle_branch(tx_hashes, tx_pos)
-            return {"tx_hash": tx_hash, "merkle": branch}
-        else:
-            return tx_hash
+                           f'block at height {height:,d}')
+        return result[tx_hash][1]


 class LocalRPC(SessionBase):

From 632d8d02d2a501af2b1292f48aedb2f8e7d994a8 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Tue, 17 Nov 2020 16:17:24 -0500
Subject: [PATCH 5/5] remove dead code

---
 lbry/wallet/server/leveldb.py | 13 -------------
 lbry/wallet/server/session.py |  9 ---------
 2 files changed, 22 deletions(-)

diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index a7b24e657..437240072 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -426,19 +426,6 @@ class LevelDB:

         return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)

-    async def fs_block_tx_hashes(self, height):
-        def _get_tx_hashes():
-            return self.tx_db.get(BLOCK_HASH_PREFIX + util.pack_be_uint64(height)).hex(), list(map(
-                lambda tx_hash: hash_to_hex_str(tx_hash),
-                self.tx_db.iterator(
-                    start=TX_HASH_PREFIX + util.pack_be_uint64(self.tx_counts[height - 1]),
-                    stop=None if height + 1 == len(self.tx_counts) else TX_HASH_PREFIX + util.pack_be_uint64(self.tx_counts[height]),
-                    include_key=False
-                )
-            ))
-
-        return await asyncio.get_event_loop().run_in_executor(self.executor, _get_tx_hashes)
-
     def fs_tx_hash(self, tx_num):
         """Return a pair (tx_hash, tx_height) for the given tx number.

diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
index 520613892..2e4c5d0e6 100644
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ -1585,15 +1585,6 @@ class LBRYElectrumX(SessionBase):

         return await self.daemon_request('getrawtransaction', tx_hash, verbose)

-    async def _block_hash_and_tx_hashes(self, height):
-        """Returns a pair (block_hash, tx_hashes) for the main chain block at
-        the given height.
-
-        block_hash is a hexadecimal string, and tx_hashes is an
-        ordered list of hexadecimal strings.
-        """
-        return await self.db.fs_block_tx_hashes(height)
-
     def _get_merkle_branch(self, tx_hashes, tx_pos):
         """Return a merkle branch to a transaction.
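NOTE: after PATCH 4 and PATCH 5, transaction_info and transaction_merkle both funnel through transaction_get_batch, and the CPU-bound merkle-branch hashing runs on the DB executor instead of the event loop. The underlying pattern, boiled down (illustrative only, not taken from the patches):

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)

    def threaded_get_merkle(needed):
        # stand-in for hashing a whole block's tx hashes into branches
        return {tx_hash: {'pos': pos} for tx_hash, pos in needed.items()}

    async def main():
        needed = {'00' * 32: 3}  # hypothetical txid -> in-block position
        loop = asyncio.get_running_loop()
        merkles = await loop.run_in_executor(executor, threaded_get_merkle, needed)
        print(merkles)

    asyncio.run(main())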