diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index 5eddb6c54..5e032e0b0 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -80,6 +80,7 @@ class LevelDB:
         self.history = History()
         self.utxo_db = None
         self.tx_counts = None
+        self.headers = None
         self.last_flush = time.time()
 
         self.logger.info(f'using {self.env.db_engine} for DB backend')
@@ -113,6 +114,19 @@ class LevelDB:
         else:
             assert self.db_tx_count == 0
 
+    async def _read_headers(self):
+        if self.headers is not None:
+            return
+
+        def get_headers():
+            return [
+                header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
+            ]
+
+        headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
+        assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
+        self.headers = headers
+
     async def _open_dbs(self, for_sync, compacting):
         if self.executor is None:
             self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
@@ -150,6 +164,7 @@ class LevelDB:
 
         # Read TX counts (requires meta directory)
         await self._read_tx_counts()
+        await self._read_headers()
 
     def close(self):
         self.utxo_db.close()
@@ -286,6 +301,7 @@ class LevelDB:
             batch_put = batch.put
             for i, header in enumerate(flush_data.headers):
                 batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
+                self.headers.append(header)
             flush_data.headers.clear()
 
         height_start = self.fs_height + 1
@@ -392,7 +408,9 @@ class LevelDB:
 
     def backup_fs(self, height, tx_count):
         """Back up during a reorg. This just updates our pointers."""
-        self.fs_height = height
+        while self.fs_height > height:
+            self.fs_height -= 1
+            self.headers.pop()
         self.fs_tx_count = tx_count
         # Truncate header_mc: header count is 1 more than the height.
         self.header_mc.truncate(height + 1)
@@ -422,13 +440,7 @@ class LevelDB:
             # Read some from disk
             disk_count = max(0, min(count, self.db_height + 1 - start_height))
             if disk_count:
-                headers = b''.join(
-                    self.headers_db.iterator(
-                        start=HEADER_PREFIX + util.pack_be_uint64(start_height),
-                        stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count),
-                        include_key=False
-                    )
-                )
+                headers = b''.join(self.headers[start_height:start_height+disk_count])
                 if b16:
                     return headers.hex().encode(), disk_count
                 elif b64:
@@ -437,6 +449,10 @@ class LevelDB:
                 return headers, disk_count
             return b'', 0
 
+        if not b16 and not b64:
+            disk_count = max(0, min(count, self.db_height + 1 - start_height))
+            return b''.join(header for header in self.headers[start_height:start_height + disk_count]), disk_count
+
         return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
 
     def fs_tx_hash(self, tx_num):
@@ -495,16 +511,9 @@ class LevelDB:
         return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids)
 
     async def fs_block_hashes(self, height, count):
-        headers_concat, headers_count = await self.read_headers(height, count)
-        if headers_count != count:
-            raise self.DBError(f'only got {headers_count:,d} headers starting at {height:,d}, not {count:,d}')
-        offset = 0
-        headers = []
-        for n in range(count):
-            headers.append(headers_concat[offset:offset + self.coin.BASIC_HEADER_SIZE])
-            offset += self.coin.BASIC_HEADER_SIZE
-
-        return [self.coin.header_hash(header) for header in headers]
+        if height + count > len(self.headers):
+            raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
+        return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
 
     async def limited_history(self, hashX, *, limit=1000):
         """Return an unpruned, sorted list of (tx_hash, height) tuples of