faster read_headers

This commit is contained in:
Jack Robison 2020-11-21 15:48:20 -05:00
parent 715451b5fb
commit 190d238a1f
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 20 additions and 46 deletions

View file

@ -415,14 +415,14 @@ class LevelDB:
# Truncate header_mc: header count is 1 more than the height. # Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1) self.header_mc.truncate(height + 1)
async def raw_header(self, height): def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""
header, n = await self.read_headers(height, 1) header, n = self.read_headers(height, 1)
if n != 1: if n != 1:
raise IndexError(f'height {height:,d} out of range') raise IndexError(f'height {height:,d} out of range')
return header return header
async def read_headers(self, start_height, count, b16=False, b64=False): def read_headers(self, start_height, count) -> typing.Tuple[bytes, int]:
"""Requires start_height >= 0, count >= 0. Reads as many headers as """Requires start_height >= 0, count >= 0. Reads as many headers as
are available starting at start_height up to count. This are available starting at start_height up to count. This
would be zero if start_height is beyond self.db_height, for would be zero if start_height is beyond self.db_height, for
@ -436,25 +436,11 @@ class LevelDB:
raise self.DBError(f'{count:,d} headers starting at ' raise self.DBError(f'{count:,d} headers starting at '
f'{start_height:,d} not on disk') f'{start_height:,d} not on disk')
def read_headers():
# Read some from disk
disk_count = max(0, min(count, self.db_height + 1 - start_height)) disk_count = max(0, min(count, self.db_height + 1 - start_height))
if disk_count: if disk_count:
headers = b''.join(self.headers[start_height:start_height+disk_count]) return b''.join(self.headers[start_height:start_height + disk_count]), disk_count
if b16:
return headers.hex().encode(), disk_count
elif b64:
compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count
return headers, disk_count
return b'', 0 return b'', 0
if not b16 and not b64:
disk_count = max(0, min(count, self.db_height + 1 - start_height))
return b''.join(header for header in self.headers[start_height:start_height + disk_count]), disk_count
return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
def fs_tx_hash(self, tx_num): def fs_tx_hash(self, tx_num):
"""Return a par (tx_hash, tx_height) for the given tx number. """Return a par (tx_hash, tx_height) for the given tx number.

View file

@ -319,7 +319,7 @@ class PeerManager:
# Check prior header too in case of hard fork. # Check prior header too in case of hard fork.
check_height = min(our_height, their_height) check_height = min(our_height, their_height)
raw_header = await self.db.raw_header(check_height) raw_header = self.db.raw_header(check_height)
if ptuple >= (1, 4): if ptuple >= (1, 4):
ours = raw_header.hex() ours = raw_header.hex()
message = 'blockchain.block.header' message = 'blockchain.block.header'

View file

@ -607,7 +607,7 @@ class SessionManager:
async def raw_header(self, height): async def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""
try: try:
return await self.db.raw_header(height) return self.db.raw_header(height)
except IndexError: except IndexError:
raise RPCError(BAD_REQUEST, f'height {height:,d} ' raise RPCError(BAD_REQUEST, f'height {height:,d} '
'out of range') from None 'out of range') from None
@ -1329,31 +1329,12 @@ class LBRYElectrumX(SessionBase):
f'require header height {height:,d} <= ' f'require header height {height:,d} <= '
f'cp_height {cp_height:,d} <= ' f'cp_height {cp_height:,d} <= '
f'chain height {max_height:,d}') f'chain height {max_height:,d}')
branch, root = await self.db.header_branch_and_root(cp_height + 1, branch, root = await self.db.header_branch_and_root(cp_height + 1, height)
height)
return { return {
'branch': [hash_to_hex_str(elt) for elt in branch], 'branch': [hash_to_hex_str(elt) for elt in branch],
'root': hash_to_hex_str(root), 'root': hash_to_hex_str(root),
} }
async def block_header(self, height, cp_height=0):
"""Return a raw block header as a hexadecimal string, or as a
dictionary with a merkle proof."""
height = non_negative_integer(height)
cp_height = non_negative_integer(cp_height)
raw_header_hex = (await self.session_mgr.raw_header(height)).hex()
if cp_height == 0:
return raw_header_hex
result = {'header': raw_header_hex}
result.update(await self._merkle_proof(cp_height, height))
return result
async def block_header_13(self, height):
"""Return a raw block header as a hexadecimal string.
height: the header's height"""
return await self.block_header(height)
async def block_headers(self, start_height, count, cp_height=0, b64=False): async def block_headers(self, start_height, count, cp_height=0, b64=False):
"""Return count concatenated block headers as hex for the main chain; """Return count concatenated block headers as hex for the main chain;
starting at start_height. starting at start_height.
@ -1367,9 +1348,15 @@ class LBRYElectrumX(SessionBase):
max_size = self.MAX_CHUNK_SIZE max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size) count = min(count, max_size)
headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64) headers, count = self.db.read_headers(start_height, count)
if b64:
compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode()
else:
headers = headers.hex()
result = { result = {
'base64' if b64 else 'hex': headers.decode(), 'base64' if b64 else 'hex': headers,
'count': count, 'count': count,
'max': max_size 'max': max_size
} }
@ -1385,7 +1372,7 @@ class LBRYElectrumX(SessionBase):
index = non_negative_integer(index) index = non_negative_integer(index)
size = self.coin.CHUNK_SIZE size = self.coin.CHUNK_SIZE
start_height = index * size start_height = index * size
headers, _ = await self.db.read_headers(start_height, size) headers, _ = self.db.read_headers(start_height, size)
return headers.hex() return headers.hex()
async def block_get_header(self, height): async def block_get_header(self, height):
@ -1537,6 +1524,7 @@ class LBRYElectrumX(SessionBase):
assert_tx_hash(tx_hash) assert_tx_hash(tx_hash)
batch_result = await self.db.fs_transactions(tx_hashes) batch_result = await self.db.fs_transactions(tx_hashes)
needed_merkles = {} needed_merkles = {}
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
if tx_hash in batch_result and batch_result[tx_hash][0]: if tx_hash in batch_result and batch_result[tx_hash][0]:
continue continue