diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 5e032e0b0..5a21b2853 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -15,7 +15,9 @@ import ast import os import time import zlib -import base64 +import pylru +import typing +from typing import Optional, List, Tuple, Iterable from asyncio import sleep from bisect import bisect_right from collections import namedtuple @@ -92,6 +94,9 @@ class LevelDB: self.headers_db = None self.tx_db = None + self._block_txs_cache = pylru.lrucache(50000) + self._merkle_tx_cache = pylru.lrucache(100000) + async def _read_tx_counts(self): if self.tx_counts is not None: return @@ -415,14 +420,14 @@ class LevelDB: # Truncate header_mc: header count is 1 more than the height. self.header_mc.truncate(height + 1) - async def raw_header(self, height): + def raw_header(self, height): """Return the binary header at the given height.""" - header, n = await self.read_headers(height, 1) + header, n = self.read_headers(height, 1) if n != 1: raise IndexError(f'height {height:,d} out of range') return header - async def read_headers(self, start_height, count, b16=False, b64=False): + def read_headers(self, start_height, count) -> typing.Tuple[bytes, int]: """Requires start_height >= 0, count >= 0. Reads as many headers as are available starting at start_height up to count. This would be zero if start_height is beyond self.db_height, for @@ -436,24 +441,10 @@ class LevelDB: raise self.DBError(f'{count:,d} headers starting at ' f'{start_height:,d} not on disk') - def read_headers(): - # Read some from disk - disk_count = max(0, min(count, self.db_height + 1 - start_height)) - if disk_count: - headers = b''.join(self.headers[start_height:start_height+disk_count]) - if b16: - return headers.hex().encode(), disk_count - elif b64: - compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9) - return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count - return headers, disk_count - return b'', 0 - - if not b16 and not b64: - disk_count = max(0, min(count, self.db_height + 1 - start_height)) - return b''.join(header for header in self.headers[start_height:start_height + disk_count]), disk_count - - return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) + disk_count = max(0, min(count, self.db_height + 1 - start_height)) + if disk_count: + return b''.join(self.headers[start_height:start_height + disk_count]), disk_count + return b'', 0 def fs_tx_hash(self, tx_num): """Return a par (tx_hash, tx_height) for the given tx number. @@ -466,49 +457,75 @@ class LevelDB: tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) return tx_hash, tx_height - def _fs_transactions(self, txids): - def _iter_transactions(): - block_txs = {} - branch_and_root = self.merkle.branch_and_root - tx_iterator = self.tx_db.iterator - tx_counts = self.tx_counts - tx_db_get = self.tx_db.get - unpack_be_uint64 = util.unpack_be_uint64 - pack_be_uint64 = util.pack_be_uint64 + async def tx_merkle(self, tx_num, tx_height): + if tx_height == -1: + return { + 'block_height': -1 + } + tx_counts = self.tx_counts + tx_pos = tx_num - tx_counts[tx_height - 1] - for tx_hash in txids: - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) - if tx_num is not None: - tx_num = unpack_be_uint64(tx_num) - tx_height = bisect_right(tx_counts, tx_num) - else: - yield tx_hash, (None, {'block_height': -1}) - continue - if tx_height >= self.db_height: - yield tx_hash, (None, {'block_height': -1}) - continue - tx = tx_db_get(TX_PREFIX + tx_hash_bytes) - if tx_height not in block_txs: - block_txs[tx_height] = list(tx_iterator( - start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]), - stop=None if tx_height + 1 == len(tx_counts) else - TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height] + 1), include_key=False - )) - tx_pos = tx_num - tx_counts[tx_height - 1] - branch, root = branch_and_root(block_txs[tx_height], tx_pos) - merkle = { - 'block_height': tx_height, - 'merkle': [hash_to_hex_str(hash) for hash in branch], - 'pos': tx_pos - } - yield tx_hash, (None if not tx else tx.hex(), merkle) - return { - _tx_hash: _val for (_tx_hash, _val) in _iter_transactions() + def _update_block_txs_cache(): + block_txs = list(self.tx_db.iterator( + start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]), + stop=None if tx_height + 1 == len(tx_counts) else + TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height] + 1), include_key=False + )) + if tx_height + 100 > self.db_height: + return block_txs + self._block_txs_cache[tx_height] = block_txs + + uncached = None + if (tx_num, tx_height) in self._merkle_tx_cache: + return self._merkle_tx_cache[(tx_num, tx_height)] + if tx_height not in self._block_txs_cache: + uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache) + merkle = { + 'block_height': tx_height, + 'merkle': [ + hash_to_hex_str(hash) for hash in self.merkle.branch_and_root( + self._block_txs_cache.get(tx_height, uncached), tx_pos + )[0] + ], + 'pos': tx_pos } + if tx_height + 100 < self.db_height: + self._merkle_tx_cache[(tx_num, tx_height)] = merkle + return merkle + + def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]: + unpack_be_uint64 = util.unpack_be_uint64 + tx_counts = self.tx_counts + tx_db_get = self.tx_db.get + tx_infos = [] + + for tx_hash in txids: + tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] + tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) + tx = None + tx_height = -1 + if tx_num is not None: + tx_num = unpack_be_uint64(tx_num) + tx_height = bisect_right(tx_counts, tx_num) + if tx_height < self.db_height: + tx = tx_db_get(TX_PREFIX + tx_hash_bytes) + tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height)) + + return tx_infos async def fs_transactions(self, txids): - return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) + txs = await asyncio.get_event_loop().run_in_executor( + self.executor, self._fs_transactions, txids + ) + + async def add_result(item): + _txid, _tx, _tx_num, _tx_height = item + result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height)) + + result = {} + if txs: + await asyncio.gather(*map(add_result, txs)) + return result async def fs_block_hashes(self, height, count): if height + count > len(self.headers): diff --git a/lbry/wallet/server/peers.py b/lbry/wallet/server/peers.py index f1407339b..a65ce07e3 100644 --- a/lbry/wallet/server/peers.py +++ b/lbry/wallet/server/peers.py @@ -319,7 +319,7 @@ class PeerManager: # Check prior header too in case of hard fork. check_height = min(our_height, their_height) - raw_header = await self.db.raw_header(check_height) + raw_header = self.db.raw_header(check_height) if ptuple >= (1, 4): ours = raw_header.hex() message = 'blockchain.block.header' diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 79fbda84f..3696f8bc8 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -607,7 +607,7 @@ class SessionManager: async def raw_header(self, height): """Return the binary header at the given height.""" try: - return await self.db.raw_header(height) + return self.db.raw_header(height) except IndexError: raise RPCError(BAD_REQUEST, f'height {height:,d} ' 'out of range') from None @@ -1329,31 +1329,12 @@ class LBRYElectrumX(SessionBase): f'require header height {height:,d} <= ' f'cp_height {cp_height:,d} <= ' f'chain height {max_height:,d}') - branch, root = await self.db.header_branch_and_root(cp_height + 1, - height) + branch, root = await self.db.header_branch_and_root(cp_height + 1, height) return { 'branch': [hash_to_hex_str(elt) for elt in branch], 'root': hash_to_hex_str(root), } - async def block_header(self, height, cp_height=0): - """Return a raw block header as a hexadecimal string, or as a - dictionary with a merkle proof.""" - height = non_negative_integer(height) - cp_height = non_negative_integer(cp_height) - raw_header_hex = (await self.session_mgr.raw_header(height)).hex() - if cp_height == 0: - return raw_header_hex - result = {'header': raw_header_hex} - result.update(await self._merkle_proof(cp_height, height)) - return result - - async def block_header_13(self, height): - """Return a raw block header as a hexadecimal string. - - height: the header's height""" - return await self.block_header(height) - async def block_headers(self, start_height, count, cp_height=0, b64=False): """Return count concatenated block headers as hex for the main chain; starting at start_height. @@ -1367,9 +1348,15 @@ class LBRYElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64) + headers, count = self.db.read_headers(start_height, count) + + if b64: + compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9) + headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode() + else: + headers = headers.hex() result = { - 'base64' if b64 else 'hex': headers.decode(), + 'base64' if b64 else 'hex': headers, 'count': count, 'max': max_size } @@ -1385,7 +1372,7 @@ class LBRYElectrumX(SessionBase): index = non_negative_integer(index) size = self.coin.CHUNK_SIZE start_height = index * size - headers, _ = await self.db.read_headers(start_height, size) + headers, _ = self.db.read_headers(start_height, size) return headers.hex() async def block_get_header(self, height): @@ -1537,6 +1524,7 @@ class LBRYElectrumX(SessionBase): assert_tx_hash(tx_hash) batch_result = await self.db.fs_transactions(tx_hashes) needed_merkles = {} + for tx_hash in tx_hashes: if tx_hash in batch_result and batch_result[tx_hash][0]: continue