From 2c7fd58e34d9d9b4428a8e4bf621b51d559862cd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 20 Nov 2020 12:52:15 -0500 Subject: [PATCH] threaded compress headers --- lbry/wallet/server/leveldb.py | 17 ++++++++++++----- lbry/wallet/server/session.py | 6 ++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 6ead4946d..5eddb6c54 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -14,11 +14,11 @@ import array import ast import os import time +import zlib +import base64 from asyncio import sleep from bisect import bisect_right from collections import namedtuple -from functools import partial -from binascii import unhexlify, hexlify from glob import glob from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor @@ -404,7 +404,7 @@ class LevelDB: raise IndexError(f'height {height:,d} out of range') return header - async def read_headers(self, start_height, count): + async def read_headers(self, start_height, count, b16=False, b64=False): """Requires start_height >= 0, count >= 0. Reads as many headers as are available starting at start_height up to count. This would be zero if start_height is beyond self.db_height, for @@ -413,6 +413,7 @@ class LevelDB: Returns a (binary, n) pair where binary is the concatenated binary headers, and n is the count of headers returned. """ + if start_height < 0 or count < 0: raise self.DBError(f'{count:,d} headers starting at ' f'{start_height:,d} not on disk') @@ -421,13 +422,19 @@ class LevelDB: # Read some from disk disk_count = max(0, min(count, self.db_height + 1 - start_height)) if disk_count: - return b''.join( + headers = b''.join( self.headers_db.iterator( start=HEADER_PREFIX + util.pack_be_uint64(start_height), stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count), include_key=False ) - ), disk_count + ) + if b16: + return headers.hex().encode(), disk_count + elif b64: + compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9) + return base64.b64encode(compressobj.compress(headers) + compressobj.flush()), disk_count + return headers, disk_count return b'', 0 return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 2e4c5d0e6..c5f79b116 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -1370,11 +1370,9 @@ class LBRYElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = await self.db.read_headers(start_height, count) - compressobj = zlib.compressobj(wbits=-15, level=1, memLevel=9) - headers = base64.b64encode(compressobj.compress(headers) + compressobj.flush()).decode() if b64 else headers.hex() + headers, count = await self.db.read_headers(start_height, count, b16=not b64, b64=b64) result = { - 'base64' if b64 else 'hex': headers, + 'base64' if b64 else 'hex': headers.decode(), 'count': count, 'max': max_size }