Merge pull request #3092 from lbryio/faster-transaction-getbatch

faster `blockchain.transaction.get_batch`
This commit is contained in:
Jack Robison 2020-11-23 12:29:14 -05:00 committed by GitHub
commit 7204ddafec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 86 deletions

View file

@ -15,7 +15,9 @@ import ast
import os import os
import time import time
import zlib import zlib
import base64 import pylru
import typing
from typing import Optional, List, Tuple, Iterable
from asyncio import sleep from asyncio import sleep
from bisect import bisect_right from bisect import bisect_right
from collections import namedtuple from collections import namedtuple
@ -92,6 +94,9 @@ class LevelDB:
self.headers_db = None self.headers_db = None
self.tx_db = None self.tx_db = None
self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000)
async def _read_tx_counts(self): async def _read_tx_counts(self):
if self.tx_counts is not None: if self.tx_counts is not None:
return return
@ -415,14 +420,14 @@ class LevelDB:
# Truncate header_mc: header count is 1 more than the height. # Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1) self.header_mc.truncate(height + 1)
async def raw_header(self, height): def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""
header, n = await self.read_headers(height, 1) header, n = self.read_headers(height, 1)
if n != 1: if n != 1:
raise IndexError(f'height {height:,d} out of range') raise IndexError(f'height {height:,d} out of range')
return header return header
async def read_headers(self, start_height, count, b16=False, b64=False): def read_headers(self, start_height, count) -> typing.Tuple[bytes, int]:
"""Requires start_height >= 0, count >= 0. Reads as many headers as """Requires start_height >= 0, count >= 0. Reads as many headers as
are available starting at start_height up to count. This are available starting at start_height up to count. This
would be zero if start_height is beyond self.db_height, for would be zero if start_height is beyond self.db_height, for
@ -436,24 +441,10 @@ class LevelDB:
raise self.DBError(f'{count:,d} headers starting at ' raise self.DBError(f'{count:,d} headers starting at '
f'{start_height:,d} not on disk') f'{start_height:,d} not on disk')
def read_headers(): disk_count = max(0, min(count, self.db_height + 1 - start_height))
# Read some from disk if disk_count:
disk_count = max(0, min(count, self.db_height + 1 - start_height)) return b''.join(self.headers[start_height:start_height + disk_count]), disk_count
if disk_count: return b'', 0
headers = b''.join(self.headers[start_height:start_height+disk_count])
if b16:
return headers.hex().encode(), disk_count
elif b64:
compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count
return headers, disk_count
return b'', 0
if not b16 and not b64:
disk_count = max(0, min(count, self.db_height + 1 - start_height))
return b''.join(header for header in self.headers[start_height:start_height + disk_count]), disk_count
return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
def fs_tx_hash(self, tx_num): def fs_tx_hash(self, tx_num):
"""Return a pair (tx_hash, tx_height) for the given tx number. """Return a pair (tx_hash, tx_height) for the given tx number.
@ -466,49 +457,75 @@ class LevelDB:
tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
return tx_hash, tx_height return tx_hash, tx_height
def _fs_transactions(self, txids): async def tx_merkle(self, tx_num, tx_height):
def _iter_transactions(): if tx_height == -1:
block_txs = {} return {
branch_and_root = self.merkle.branch_and_root 'block_height': -1
tx_iterator = self.tx_db.iterator }
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.tx_db.get tx_pos = tx_num - tx_counts[tx_height - 1]
unpack_be_uint64 = util.unpack_be_uint64
pack_be_uint64 = util.pack_be_uint64
for tx_hash in txids: def _update_block_txs_cache():
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] block_txs = list(self.tx_db.iterator(
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
if tx_num is not None: stop=None if tx_height + 1 == len(tx_counts) else
tx_num = unpack_be_uint64(tx_num) TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height] + 1), include_key=False
tx_height = bisect_right(tx_counts, tx_num) ))
else: if tx_height + 100 > self.db_height:
yield tx_hash, (None, {'block_height': -1}) return block_txs
continue self._block_txs_cache[tx_height] = block_txs
if tx_height >= self.db_height:
yield tx_hash, (None, {'block_height': -1}) uncached = None
continue if (tx_num, tx_height) in self._merkle_tx_cache:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes) return self._merkle_tx_cache[(tx_num, tx_height)]
if tx_height not in block_txs: if tx_height not in self._block_txs_cache:
block_txs[tx_height] = list(tx_iterator( uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
start=TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height - 1]), merkle = {
stop=None if tx_height + 1 == len(tx_counts) else 'block_height': tx_height,
TX_HASH_PREFIX + pack_be_uint64(tx_counts[tx_height] + 1), include_key=False 'merkle': [
)) hash_to_hex_str(hash) for hash in self.merkle.branch_and_root(
tx_pos = tx_num - tx_counts[tx_height - 1] self._block_txs_cache.get(tx_height, uncached), tx_pos
branch, root = branch_and_root(block_txs[tx_height], tx_pos) )[0]
merkle = { ],
'block_height': tx_height, 'pos': tx_pos
'merkle': [hash_to_hex_str(hash) for hash in branch],
'pos': tx_pos
}
yield tx_hash, (None if not tx else tx.hex(), merkle)
return {
_tx_hash: _val for (_tx_hash, _val) in _iter_transactions()
} }
if tx_height + 100 < self.db_height:
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
tx_infos = []
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))
return tx_infos
async def fs_transactions(self, txids): async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) txs = await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_transactions, txids
)
async def add_result(item):
_txid, _tx, _tx_num, _tx_height = item
result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
result = {}
if txs:
await asyncio.gather(*map(add_result, txs))
return result
async def fs_block_hashes(self, height, count): async def fs_block_hashes(self, height, count):
if height + count > len(self.headers): if height + count > len(self.headers):

View file

@ -319,7 +319,7 @@ class PeerManager:
# Check prior header too in case of hard fork. # Check prior header too in case of hard fork.
check_height = min(our_height, their_height) check_height = min(our_height, their_height)
raw_header = await self.db.raw_header(check_height) raw_header = self.db.raw_header(check_height)
if ptuple >= (1, 4): if ptuple >= (1, 4):
ours = raw_header.hex() ours = raw_header.hex()
message = 'blockchain.block.header' message = 'blockchain.block.header'

View file

@ -607,7 +607,7 @@ class SessionManager:
async def raw_header(self, height): async def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""
try: try:
return await self.db.raw_header(height) return self.db.raw_header(height)
except IndexError: except IndexError:
raise RPCError(BAD_REQUEST, f'height {height:,d} ' raise RPCError(BAD_REQUEST, f'height {height:,d} '
'out of range') from None 'out of range') from None
@ -1329,31 +1329,12 @@ class LBRYElectrumX(SessionBase):
f'require header height {height:,d} <= ' f'require header height {height:,d} <= '
f'cp_height {cp_height:,d} <= ' f'cp_height {cp_height:,d} <= '
f'chain height {max_height:,d}') f'chain height {max_height:,d}')
branch, root = await self.db.header_branch_and_root(cp_height + 1, branch, root = await self.db.header_branch_and_root(cp_height + 1, height)
height)
return { return {
'branch': [hash_to_hex_str(elt) for elt in branch], 'branch': [hash_to_hex_str(elt) for elt in branch],
'root': hash_to_hex_str(root), 'root': hash_to_hex_str(root),
} }
async def block_header(self, height, cp_height=0):
"""Return a raw block header as a hexadecimal string, or as a
dictionary with a merkle proof."""
height = non_negative_integer(height)
cp_height = non_negative_integer(cp_height)
raw_header_hex = (await self.session_mgr.raw_header(height)).hex()
if cp_height == 0:
return raw_header_hex
result = {'header': raw_header_hex}
result.update(await self._merkle_proof(cp_height, height))
return result
async def block_header_13(self, height):
"""Return a raw block header as a hexadecimal string.
height: the header's height"""
return await self.block_header(height)
async def block_headers(self, start_height, count, cp_height=0, b64=False): async def block_headers(self, start_height, count, cp_height=0, b64=False):
"""Return count concatenated block headers as hex for the main chain; """Return count concatenated block headers as hex for the main chain;
starting at start_height. starting at start_height.
@ -1367,9 +1348,15 @@ class LBRYElectrumX(SessionBase):
max_size = self.MAX_CHUNK_SIZE max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size) count = min(count, max_size)
headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64) headers, count = self.db.read_headers(start_height, count)
if b64:
compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode()
else:
headers = headers.hex()
result = { result = {
'base64' if b64 else 'hex': headers.decode(), 'base64' if b64 else 'hex': headers,
'count': count, 'count': count,
'max': max_size 'max': max_size
} }
@ -1385,7 +1372,7 @@ class LBRYElectrumX(SessionBase):
index = non_negative_integer(index) index = non_negative_integer(index)
size = self.coin.CHUNK_SIZE size = self.coin.CHUNK_SIZE
start_height = index * size start_height = index * size
headers, _ = await self.db.read_headers(start_height, size) headers, _ = self.db.read_headers(start_height, size)
return headers.hex() return headers.hex()
async def block_get_header(self, height): async def block_get_header(self, height):
@ -1537,6 +1524,7 @@ class LBRYElectrumX(SessionBase):
assert_tx_hash(tx_hash) assert_tx_hash(tx_hash)
batch_result = await self.db.fs_transactions(tx_hashes) batch_result = await self.db.fs_transactions(tx_hashes)
needed_merkles = {} needed_merkles = {}
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
if tx_hash in batch_result and batch_result[tx_hash][0]: if tx_hash in batch_result and batch_result[tx_hash][0]:
continue continue