headers db

Jack Robison 2020-06-09 16:21:48 -04:00
parent 22540390e1
commit cc51543851
2 changed files with 38 additions and 55 deletions


@@ -32,6 +32,9 @@ from lbry.wallet.server.history import History
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")

+HEADER_PREFIX = b'H'
+TX_COUNT_PREFIX = b'T'
+TX_HASH_PREFIX = b'X'

 @attr.s(slots=True)
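These single-byte prefixes partition one LevelDB keyspace by record type: a header's key is the prefix plus its height packed as a big-endian uint64, so byte-wise key order equals height order. A minimal sketch of the key layout, using the pack_be_uint64 helper added in the second file of this commit:

    HEADER_PREFIX = b'H'

    def pack_be_uint64(x):
        # Big-endian: most significant byte first (defined in util below).
        return x.to_bytes(8, byteorder='big')

    # Key for the header at height 1000.
    key = HEADER_PREFIX + pack_be_uint64(1000)
    assert key == b'H' + b'\x00\x00\x00\x00\x00\x00\x03\xe8'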
@@ -63,15 +66,7 @@ class LevelDB:
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.env = env
         self.coin = env.coin
-        self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
-
-        # Setup block header size handlers
-        if self.coin.STATIC_BLOCK_HEADERS:
-            self.header_offset = self.coin.static_header_offset
-            self.header_len = self.coin.static_header_len
-        else:
-            self.header_offset = self.dynamic_header_offset
-            self.header_len = self.dynamic_header_len
+        self.executor = None

         self.logger.info(f'switching current directory to {env.db_dir}')
@@ -88,12 +83,9 @@ class LevelDB:
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)

         path = partial(os.path.join, self.env.db_dir)
-        self.headers_file = util.LogicalFile(path('meta/headers'), 2, 16000000)
         self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000)
         self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000)
-        if not self.coin.STATIC_BLOCK_HEADERS:
-            self.headers_offsets_file = util.LogicalFile(
-                path('meta/headers_offsets'), 2, 16000000)
+        self.headers_db = None

     async def _read_tx_counts(self):
         if self.tx_counts is not None:
@@ -110,22 +102,24 @@ class LevelDB:
         assert self.db_tx_count == 0

     async def _open_dbs(self, for_sync, compacting):
-        assert self.utxo_db is None
+        if self.executor is None:
+            self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
+
+        assert self.headers_db is None
+        self.headers_db = self.db_class('headers', for_sync)
+        if self.headers_db.is_new:
+            self.logger.info('created new headers db')
+        self.logger.info(f'opened headers DB (for sync: {for_sync})')

+        assert self.utxo_db is None
         # First UTXO DB
         self.utxo_db = self.db_class('utxo', for_sync)
         if self.utxo_db.is_new:
-            self.logger.info('created new database')
             self.logger.info('creating metadata directory')
             os.mkdir(os.path.join(self.env.db_dir, 'meta'))
             coin_path = os.path.join(self.env.db_dir, 'meta', 'COIN')
             with util.open_file(coin_path, create=True) as f:
                 f.write(f'ElectrumX databases and metadata for '
                         f'{self.coin.NAME} {self.coin.NET}'.encode())
-            if not self.coin.STATIC_BLOCK_HEADERS:
-                self.headers_offsets_file.write(0, bytes(8))
-        else:
-            self.logger.info(f'opened UTXO DB (for sync: {for_sync})')
+            self.logger.info('created new utxo db')
+        self.logger.info(f'opened utxo db (for sync: {for_sync})')

         self.read_utxo_state()

         # Then history DB
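For orientation, db_class comes from the coin/environment configuration (this storage layer follows ElectrumX), so 'headers' and 'utxo' are two independent LevelDB instances under db_dir. A rough equivalent using plyvel directly, which the iterator call later in this diff suggests is the underlying interface; the path and the is_new check here are illustrative stand-ins, not the project's actual wrapper:

    import os
    import plyvel

    db_dir = '/tmp/wallet-server-db'  # hypothetical data directory
    os.makedirs(db_dir, exist_ok=True)

    def open_db(name):
        path = os.path.join(db_dir, name)
        is_new = not os.path.exists(path)  # stands in for the wrapper's is_new flag
        return plyvel.DB(path, create_if_missing=True), is_new

    headers_db, headers_is_new = open_db('headers')
    utxo_db, utxo_is_new = open_db('utxo')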
@@ -140,7 +134,9 @@ class LevelDB:
     def close(self):
         self.utxo_db.close()
         self.history.close_db()
+        self.headers_db.close()
         self.executor.shutdown(wait=True)
+        self.executor = None

     async def open_for_compacting(self):
         await self._open_dbs(True, True)
@@ -163,6 +159,9 @@ class LevelDB:
         self.utxo_db.close()
         self.history.close_db()
         self.utxo_db = None
+        if self.headers_db:
+            self.headers_db.close()
+            self.headers_db = None
         await self._open_dbs(False, False)

     # Header merkle cache
@@ -256,11 +255,11 @@ class LevelDB:
         # Write the headers, tx counts, and tx hashes
         start_time = time.time()
         height_start = self.fs_height + 1
-        offset = self.header_offset(height_start)
-        self.headers_file.write(offset, b''.join(flush_data.headers))
-        self.fs_update_header_offsets(offset, height_start, flush_data.headers)
-        flush_data.headers.clear()
+        for header in flush_data.headers:
+            tx_count = self.tx_counts[height_start]
+            self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
+            height_start += 1
         offset = height_start * self.tx_counts.itemsize
         self.tx_counts_file.write(offset,
                                   self.tx_counts[height_start:].tobytes())
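Each header now lives under its own height key rather than at a byte offset in a flat file, so fetching one header becomes a point lookup. A sketch of the inverse of the put loop above, assuming the plyvel-style get implied by the rest of the diff:

    def read_header(headers_db, height):
        # Point lookup of the record written by the flush loop above.
        raw = headers_db.get(HEADER_PREFIX + pack_be_uint64(height))
        if raw is None:
            raise KeyError(f'no header stored for height {height}')
        return raw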
@@ -270,6 +269,7 @@ class LevelDB:
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
+        flush_data.headers.clear()

         if self.utxo_db.for_sync:
             elapsed = time.time() - start_time
             self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
@@ -350,28 +350,6 @@ class LevelDB:
                     f'{elapsed:.1f}s. Height {flush_data.height:,d} '
                     f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')

-    def fs_update_header_offsets(self, offset_start, height_start, headers):
-        if self.coin.STATIC_BLOCK_HEADERS:
-            return
-        offset = offset_start
-        offsets = []
-        for h in headers:
-            offset += len(h)
-            offsets.append(pack("<Q", offset))
-        # For each header we get the offset of the next header, hence we
-        # start writing from the next height
-        pos = (height_start + 1) * 8
-        self.headers_offsets_file.write(pos, b''.join(offsets))
-
-    def dynamic_header_offset(self, height):
-        assert not self.coin.STATIC_BLOCK_HEADERS
-        offset, = unpack('<Q', self.headers_offsets_file.read(height * 8, 8))
-        return offset
-
-    def dynamic_header_len(self, height):
-        return self.dynamic_header_offset(height + 1)\
-               - self.dynamic_header_offset(height)
-
     def backup_fs(self, height, tx_count):
         """Back up during a reorg. This just updates our pointers."""
         self.fs_height = height
@@ -403,9 +381,13 @@ class LevelDB:
             # Read some from disk
             disk_count = max(0, min(count, self.db_height + 1 - start_height))
             if disk_count:
-                offset = self.header_offset(start_height)
-                size = self.header_offset(start_height + disk_count) - offset
-                return self.headers_file.read(offset, size), disk_count
+                return b''.join(
+                    self.headers_db.iterator(
+                        start=HEADER_PREFIX + util.pack_be_uint64(start_height),
+                        stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count),
+                        include_key=False
+                    )
+                ), disk_count
             return b'', 0

         return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
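Because heights are packed big-endian, LevelDB's lexicographic iteration order coincides with numeric height order, so a half-open [start, stop) scan replaces the old offset arithmetic. A self-contained demonstration of the same pattern with plyvel; the path and dummy header values are made up:

    import plyvel

    HEADER_PREFIX = b'H'
    pack_be_uint64 = lambda x: x.to_bytes(8, byteorder='big')

    db = plyvel.DB('/tmp/headers-demo', create_if_missing=True)
    for h in range(5):
        db.put(HEADER_PREFIX + pack_be_uint64(h), b'header-%d' % h)

    # Values for heights 1..3 (the stop key is exclusive), in height order.
    chunk = b''.join(db.iterator(
        start=HEADER_PREFIX + pack_be_uint64(1),
        stop=HEADER_PREFIX + pack_be_uint64(4),
        include_key=False,
    ))
    assert chunk == b'header-1header-2header-3'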
@@ -428,9 +410,8 @@ class LevelDB:
         offset = 0
         headers = []
         for n in range(count):
-            hlen = self.header_len(height + n)
-            headers.append(headers_concat[offset:offset + hlen])
-            offset += hlen
+            headers.append(headers_concat[offset:offset + self.coin.BASIC_HEADER_SIZE])
+            offset += self.coin.BASIC_HEADER_SIZE

         return [self.coin.header_hash(header) for header in headers]
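With the dynamic header_len machinery removed, every slice is the coin's fixed BASIC_HEADER_SIZE. The slicing logic in isolation; 112 bytes is LBRY's block header size, treated here as an assumption:

    BASIC_HEADER_SIZE = 112  # assumed LBRY block header size

    def split_headers(headers_concat: bytes):
        # Cut a concatenated blob into fixed-size headers.
        count, remainder = divmod(len(headers_concat), BASIC_HEADER_SIZE)
        assert remainder == 0, 'blob must hold a whole number of headers'
        return [headers_concat[i * BASIC_HEADER_SIZE:(i + 1) * BASIC_HEADER_SIZE]
                for i in range(count)]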


@@ -333,11 +333,13 @@ unpack_le_uint64_from = struct_le_Q.unpack_from
 unpack_be_uint16_from = struct_be_H.unpack_from
 unpack_be_uint32_from = struct_be_I.unpack_from
+unpack_be_uint64 = lambda x: int.from_bytes(x, byteorder='big')

 pack_le_int32 = struct_le_i.pack
 pack_le_int64 = struct_le_q.pack
 pack_le_uint16 = struct_le_H.pack
 pack_le_uint32 = struct_le_I.pack
 pack_le_uint64 = struct_le_Q.pack
+pack_be_uint64 = lambda x: x.to_bytes(8, byteorder='big')
 pack_be_uint16 = struct_be_H.pack
 pack_be_uint32 = struct_be_I.pack
 pack_byte = structB.pack
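A quick round trip of the two new helpers, plus the ordering property the headers db relies on:

    pack_be_uint64 = lambda x: x.to_bytes(8, byteorder='big')
    unpack_be_uint64 = lambda x: int.from_bytes(x, byteorder='big')

    assert unpack_be_uint64(pack_be_uint64(123456)) == 123456

    # Big-endian bytes compare the same way the integers do, which is what
    # makes LevelDB range scans over height-prefixed keys return heights in order.
    assert pack_be_uint64(2) < pack_be_uint64(256)
    # A little-endian encoding would invert this pair:
    assert (2).to_bytes(8, 'little') > (256).to_bytes(8, 'little')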