From 1694af8b5e55103253c38e1af3333530bd861a1c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 9 Nov 2020 15:34:42 -0500 Subject: [PATCH] get transactions from leveldb instead of lbrycrd rpc --- lbry/wallet/ledger.py | 2 +- lbry/wallet/server/block_processor.py | 35 ++++--- lbry/wallet/server/leveldb.py | 128 +++++++++++++++++++++++--- lbry/wallet/server/merkle.py | 2 +- lbry/wallet/server/session.py | 29 ++++-- lbry/wallet/server/storage.py | 10 +- 6 files changed, 161 insertions(+), 45 deletions(-) diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 77666e509..1676b01f1 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -722,7 +722,7 @@ class Ledger(metaclass=LedgerRegistry): if cache_item is None: cache_item = TransactionCacheItem() self._tx_cache[txid] = cache_item - tx = cache_item.tx or Transaction(unhexlify(raw), height=remote_height) + tx = cache_item.tx or Transaction(bytes.fromhex(raw), height=remote_height) tx.height = remote_height cache_item.tx = tx if 'merkle' in merkle and remote_heights[txid] > 0: diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 72d72cef9..bfd70ec50 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -171,6 +171,7 @@ class BlockProcessor: # Caches of unflushed items. self.headers = [] + self.block_hashes = [] self.block_txs = [] self.undo_infos = [] @@ -336,7 +337,7 @@ class BlockProcessor: def flush_data(self): """The data for a flush. The lock must be taken.""" assert self.state_lock.locked() - return FlushData(self.height, self.tx_count, self.headers, + return FlushData(self.height, self.tx_count, self.headers, self.block_hashes, self.block_txs, self.undo_infos, self.utxo_cache, self.db_deletes, self.tip) @@ -392,7 +393,8 @@ class BlockProcessor: for block in blocks: height += 1 undo_info = self.advance_txs( - height, block.transactions, self.coin.electrum_header(block.header, height) + height, block.transactions, self.coin.electrum_header(block.header, height), + self.coin.header_hash(block.header) ) if height >= min_height: self.undo_infos.append((undo_info, height)) @@ -403,14 +405,22 @@ class BlockProcessor: self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) - def advance_txs(self, height, txs, header): + def advance_txs(self, height, txs, header, block_hash): + self.block_hashes.append(block_hash) self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs])) - # Use local vars for speed in the loops undo_info = [] tx_num = self.tx_count hashXs_by_tx = [] + # Use local vars for speed in the loops + put_utxo = self.utxo_cache.__setitem__ + spend_utxo = self.spend_utxo + undo_info_append = undo_info.append + update_touched = self.touched.update + append_hashX_by_tx = hashXs_by_tx.append + hashX_from_script = self.coin.hashX_from_script + for tx, tx_hash in txs: hashXs = [] append_hashX = hashXs.append @@ -420,24 +430,23 @@ class BlockProcessor: for txin in tx.inputs: if txin.is_generation(): continue - cache_value = self.spend_utxo(txin.prev_hash, txin.prev_idx) - undo_info.append(cache_value) + cache_value = spend_utxo(txin.prev_hash, txin.prev_idx) + undo_info_append(cache_value) append_hashX(cache_value[:-12]) # Add the new UTXOs for idx, txout in enumerate(tx.outputs): # Get the hashX. Ignore unspendable outputs - hashX = self.coin.hashX_from_script(txout.pk_script) + hashX = hashX_from_script(txout.pk_script) if hashX: append_hashX(hashX) - self.utxo_cache[tx_hash + pack('= self.db_height: + yield tx_hash, (None, {'block_height': -1}) + continue + tx = tx_db_get(TX_PREFIX + tx_hash_bytes) + if tx_height not in block_txs: + block_txs[tx_height] = list(tx_iterator( + start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]), + stop=None if tx_height + 1 == len(tx_counts) else + TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height]), + include_key=False + )) + tx_pos = tx_counts[tx_height] - tx_num + branch, root = branch_and_root(block_txs[tx_height], tx_pos) + merkle = { + 'block_height': tx_height, + 'merkle': [hash_to_hex_str(hash) for hash in branch], + 'pos': tx_pos + } + yield tx_hash, (None if not tx else tx.hex(), merkle) + return { + _tx_hash: _val for (_tx_hash, _val) in _iter_transactions() + } + + async def fs_transactions(self, txids): + return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) + async def fs_block_hashes(self, height, count): headers_concat, headers_count = await self.read_headers(height, count) if headers_count != count: @@ -446,25 +514,54 @@ class LevelDB: transactions. By default returns at most 1000 entries. Set limit to None to get them all. """ - def read_history(): - hashx_history = [] + # def read_history(): + # hashx_history = [] + # for key, hist in self.history.db.iterator(prefix=hashX): + # a = array.array('I') + # a.frombytes(hist) + # for tx_num in a: + # tx_height = bisect_right(self.tx_counts, tx_num) + # if tx_height > self.db_height: + # tx_hash = None + # else: + # tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) + # + # hashx_history.append((tx_hash, tx_height)) + # if limit and len(hashx_history) >= limit: + # return hashx_history + # return hashx_history + + def iter_tx_heights(): + db_height = self.db_height + tx_counts = self.tx_counts + tx_db_get = self.tx_db.get + pack_be_uint64 = util.pack_be_uint64 + + cnt = 0 + for key, hist in self.history.db.iterator(prefix=hashX): a = array.array('I') a.frombytes(hist) for tx_num in a: - tx_height = bisect_right(self.tx_counts, tx_num) - if tx_height > self.db_height: - tx_hash = None - else: - tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) - hashx_history.append((tx_hash, tx_height)) - if limit and len(hashx_history) >= limit: - return hashx_history - return hashx_history + tx_height = bisect_right(tx_counts, tx_num) + if tx_height > db_height: + yield None, tx_height + return + yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height + cnt += 1 + if limit and cnt >= limit: + return + if limit and cnt >= limit: + return + + def read_history(): + return [ + (tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights() + ] while True: history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) - if all(hash is not None for hash, height in history): + if not history or history[-1][0] is not None: return history self.logger.warning(f'limited_history: tx hash ' f'not found (reorg?), retrying...') @@ -622,13 +719,14 @@ class LevelDB: utxos = [] utxos_append = utxos.append s_unpack = unpack + fs_tx_hash = self.fs_tx_hash # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer prefix = b'u' + hashX for db_key, db_value in self.utxo_db.iterator(prefix=prefix): tx_pos, tx_num = s_unpack(' 100: raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') for tx_hash in tx_hashes: assert_tx_hash(tx_hash) - batch_result = {} + batch_result = await self.db.fs_transactions(tx_hashes) for tx_hash in tx_hashes: + if tx_hash in batch_result and batch_result[tx_hash][0]: + continue tx_info = await self.daemon_request('getrawtransaction', tx_hash, True) raw_tx = tx_info['hex'] block_hash = tx_info.get('blockhash') @@ -1558,6 +1572,7 @@ class LBRYElectrumX(SessionBase): height = -1 merkle['block_height'] = height batch_result[tx_hash] = [raw_tx, merkle] + self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes)) return batch_result async def transaction_get(self, tx_hash, verbose=False): @@ -1579,11 +1594,7 @@ class LBRYElectrumX(SessionBase): block_hash is a hexadecimal string, and tx_hashes is an ordered list of hexadecimal strings. """ - height = non_negative_integer(height) - hex_hashes = await self.daemon_request('block_hex_hashes', height, 1) - block_hash = hex_hashes[0] - block = await self.daemon.deserialised_block(block_hash) - return block_hash, block['tx'] + return await self.db.fs_block_tx_hashes(height) def _get_merkle_branch(self, tx_hashes, tx_pos): """Return a merkle branch to a transaction. diff --git a/lbry/wallet/server/storage.py b/lbry/wallet/server/storage.py index 5e7db97dd..127166204 100644 --- a/lbry/wallet/server/storage.py +++ b/lbry/wallet/server/storage.py @@ -77,18 +77,16 @@ class LevelDB(Storage): import plyvel cls.module = plyvel - def open(self, name, create): - mof = 512 if self.for_sync else 128 + def open(self, name, create, lru_cache_size=None): + mof = 10000 path = os.path.join(self.db_dir, name) # Use snappy compression (the default) - self.db = self.module.DB(path, create_if_missing=create, - max_open_files=mof) + self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof) self.close = self.db.close self.get = self.db.get self.put = self.db.put self.iterator = self.db.iterator - self.write_batch = partial(self.db.write_batch, transaction=True, - sync=True) + self.write_batch = partial(self.db.write_batch, transaction=True, sync=True) class RocksDB(Storage):