threaded compress headers

This commit is contained in:
Jack Robison 2020-11-20 12:52:15 -05:00
parent 982f2c9634
commit 2c7fd58e34
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 14 additions and 9 deletions

View file

@ -14,11 +14,11 @@ import array
import ast import ast
import os import os
import time import time
import zlib
import base64
from asyncio import sleep from asyncio import sleep
from bisect import bisect_right from bisect import bisect_right
from collections import namedtuple from collections import namedtuple
from functools import partial
from binascii import unhexlify, hexlify
from glob import glob from glob import glob
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
@ -404,7 +404,7 @@ class LevelDB:
raise IndexError(f'height {height:,d} out of range') raise IndexError(f'height {height:,d} out of range')
return header return header
async def read_headers(self, start_height, count): async def read_headers(self, start_height, count, b16=False, b64=False):
"""Requires start_height >= 0, count >= 0. Reads as many headers as """Requires start_height >= 0, count >= 0. Reads as many headers as
are available starting at start_height up to count. This are available starting at start_height up to count. This
would be zero if start_height is beyond self.db_height, for would be zero if start_height is beyond self.db_height, for
@ -413,6 +413,7 @@ class LevelDB:
Returns a (binary, n) pair where binary is the concatenated Returns a (binary, n) pair where binary is the concatenated
binary headers, and n is the count of headers returned. binary headers, and n is the count of headers returned.
""" """
if start_height < 0 or count < 0: if start_height < 0 or count < 0:
raise self.DBError(f'{count:,d} headers starting at ' raise self.DBError(f'{count:,d} headers starting at '
f'{start_height:,d} not on disk') f'{start_height:,d} not on disk')
@ -421,13 +422,19 @@ class LevelDB:
# Read some from disk # Read some from disk
disk_count = max(0, min(count, self.db_height + 1 - start_height)) disk_count = max(0, min(count, self.db_height + 1 - start_height))
if disk_count: if disk_count:
return b''.join( headers = b''.join(
self.headers_db.iterator( self.headers_db.iterator(
start=HEADER_PREFIX + util.pack_be_uint64(start_height), start=HEADER_PREFIX + util.pack_be_uint64(start_height),
stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count), stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count),
include_key=False include_key=False
) )
), disk_count )
if b16:
return headers.hex().encode(), disk_count
elif b64:
compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count
return headers, disk_count
return b'', 0 return b'', 0
return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)

View file

@ -1370,11 +1370,9 @@ class LBRYElectrumX(SessionBase):
max_size = self.MAX_CHUNK_SIZE max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size) count = min(count, max_size)
headers, count = await self.db.read_headers(start_height, count) headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64)
compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9)
headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode() if b64 else headers.hex()
result = { result = {
'base64' if b64 else 'hex': headers, 'base64' if b64 else 'hex': headers.decode(),
'count': count, 'count': count,
'max': max_size 'max': max_size
} }