Merge pull request #3090 from lbryio/fast-headers

store headers in memory on wallet server
This commit is contained in:
Jack Robison 2020-11-20 23:33:18 -05:00 committed by GitHub
commit 87f1895405
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -80,6 +80,7 @@ class LevelDB:
self.history = History()
self.utxo_db = None
self.tx_counts = None
self.headers = None
self.last_flush = time.time()
self.logger.info(f'using {self.env.db_engine} for DB backend')
@ -113,6 +114,19 @@ class LevelDB:
else:
assert self.db_tx_count == 0
async def _read_headers(self):
if self.headers is not None:
return
def get_headers():
return [
header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers
async def _open_dbs(self, for_sync, compacting):
if self.executor is None:
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
@ -150,6 +164,7 @@ class LevelDB:
# Read TX counts (requires meta directory)
await self._read_tx_counts()
await self._read_headers()
def close(self):
self.utxo_db.close()
@ -286,6 +301,7 @@ class LevelDB:
batch_put = batch.put
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
self.headers.append(header)
flush_data.headers.clear()
height_start = self.fs_height + 1
@ -392,7 +408,9 @@ class LevelDB:
def backup_fs(self, height, tx_count):
"""Back up during a reorg. This just updates our pointers."""
self.fs_height = height
while self.fs_height > height:
self.fs_height -= 1
self.headers.pop()
self.fs_tx_count = tx_count
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1)
@ -422,13 +440,7 @@ class LevelDB:
# Read some from disk
disk_count = max(0, min(count, self.db_height + 1 - start_height))
if disk_count:
headers = b''.join(
self.headers_db.iterator(
start=HEADER_PREFIX + util.pack_be_uint64(start_height),
stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count),
include_key=False
)
)
headers = b''.join(self.headers[start_height:start_height+disk_count])
if b16:
return headers.hex().encode(), disk_count
elif b64:
@ -437,6 +449,10 @@ class LevelDB:
return headers, disk_count
return b'', 0
if not b16 and not b64:
disk_count = max(0, min(count, self.db_height + 1 - start_height))
return b''.join(header for header in self.headers[start_height:start_height + disk_count]), disk_count
return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
def fs_tx_hash(self, tx_num):
@ -495,16 +511,9 @@ class LevelDB:
return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
async def fs_block_hashes(self, height, count):
headers_concat, headers_count = await self.read_headers(height, count)
if headers_count != count:
raise self.DBError(f'only got {headers_count:,d} headers starting at {height:,d}, not {count:,d}')
offset = 0
headers = []
for n in range(count):
headers.append(headers_concat[offset:offset + self.coin.BASIC_HEADER_SIZE])
offset += self.coin.BASIC_HEADER_SIZE
return [self.coin.header_hash(header) for header in headers]
if height + count > len(self.headers):
raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
async def limited_history(self, hashX, *, limit=1000):
"""Return an unpruned, sorted list of (tx_hash, height) tuples of