Merge pull request #2981 from lbryio/wallet-server-meta-db

Move wallet server tx hashes and headers to leveldb to increase the rate at which mempool notifications are sent
This commit is contained in:
Lex Berezhny 2020-07-06 10:50:24 -04:00 committed by GitHub
commit d4bec79451
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 196 additions and 131 deletions

View file

@ -529,13 +529,14 @@ def _get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, decode
def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: List, amount_to_reserve: int, floor: int,
fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool):
fee_per_byte: int, set_reserved: bool, return_insufficient_funds: bool,
base_multiplier: int = 100):
txs = defaultdict(list)
decoded_transactions = {}
reserved = []
reserved_dewies = 0
multiplier = 10
multiplier = base_multiplier
gap_count = 0
while reserved_dewies < amount_to_reserve and gap_count < 5 and floor * multiplier < SQLITE_MAX_INTEGER:
@ -550,7 +551,7 @@ def get_and_reserve_spendable_utxos(transaction: sqlite3.Connection, accounts: L
multiplier **= 2
else:
gap_count = 0
multiplier = 10
multiplier = base_multiplier
# reserve the accumulated txos if enough were found
if reserved_dewies >= amount_to_reserve:
@ -762,7 +763,7 @@ class Database(SQLiteMixin):
# 2. update address histories removing deleted TXs
return True
async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 1,
fee_per_byte: int = 50, set_reserved: bool = True,
return_insufficient_funds: bool = False) -> List:
to_spend = await self.db.run(

View file

@ -244,8 +244,7 @@ class Ledger(metaclass=LedgerRegistry):
def get_address_count(self, **constraints):
return self.db.get_address_count(**constraints)
async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
min_amount=100000):
async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']], min_amount=1):
min_amount = min(amount // 10, min_amount)
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
selector = CoinSelector(amount, fee)
@ -698,7 +697,7 @@ class Ledger(metaclass=LedgerRegistry):
local_height, height
)
return False
log.warning(
log.debug(
"local history does not contain %s, requested height %i", tx.id, height
)
return False

View file

@ -251,7 +251,7 @@ class BlockProcessor:
async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1)
try:
blocks = [self.db.read_raw_block(height) for height in heights]
blocks = [await self.db.read_raw_block(height) for height in heights]
self.logger.info(f'read {len(blocks)} blocks from disk')
return blocks
except FileNotFoundError:
@ -351,11 +351,9 @@ class BlockProcessor:
# performed on the DB.
if self._caught_up_event.is_set():
await self.flush(True)
elif time.time() > self.next_cache_check:
flush_arg = self.check_cache_size()
if flush_arg is not None:
await self.flush(flush_arg)
self.next_cache_check = time.time() + 30
elif time.perf_counter() > self.next_cache_check:
await self.flush(True)
self.next_cache_check = time.perf_counter() + 30
def check_cache_size(self):
"""Flush a cache if it gets too big."""

View file

@ -32,6 +32,9 @@ from lbry.wallet.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H'
TX_COUNT_PREFIX = b'T'
TX_HASH_PREFIX = b'X'
@attr.s(slots=True)
@ -63,15 +66,7 @@ class LevelDB:
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.env = env
self.coin = env.coin
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
# Setup block header size handlers
if self.coin.STATIC_BLOCK_HEADERS:
self.header_offset = self.coin.static_header_offset
self.header_len = self.coin.static_header_len
else:
self.header_offset = self.dynamic_header_offset
self.header_len = self.dynamic_header_len
self.executor = None
self.logger.info(f'switching current directory to {env.db_dir}')
@ -87,51 +82,71 @@ class LevelDB:
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
path = partial(os.path.join, self.env.db_dir)
self.headers_file = util.LogicalFile(path('meta/headers'), 2, 16000000)
self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000)
self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000)
if not self.coin.STATIC_BLOCK_HEADERS:
self.headers_offsets_file = util.LogicalFile(
path('meta/headers_offsets'), 2, 16000000)
self.hashes_db = None
self.headers_db = None
self.tx_count_db = None
async def _read_tx_counts(self):
if self.tx_counts is not None:
return
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
size = (self.db_height + 1) * 4
tx_counts = self.tx_counts_file.read(0, size)
assert len(tx_counts) == size
def get_counts():
return tuple(
util.unpack_be_uint64(tx_count)
for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
)
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
self.tx_counts = array.array('I', tx_counts)
if self.tx_counts:
assert self.db_tx_count == self.tx_counts[-1]
assert self.db_tx_count == self.tx_counts[-1], \
f"{self.db_tx_count} vs {self.tx_counts[-1]} ({len(self.tx_counts)} counts)"
else:
assert self.db_tx_count == 0
async def _open_dbs(self, for_sync, compacting):
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new database')
self.logger.info('creating metadata directory')
os.mkdir(os.path.join(self.env.db_dir, 'meta'))
coin_path = os.path.join(self.env.db_dir, 'meta', 'COIN')
if self.executor is None:
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
coin_path = os.path.join(self.env.db_dir, 'COIN')
if not os.path.isfile(coin_path):
with util.open_file(coin_path, create=True) as f:
f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode())
if not self.coin.STATIC_BLOCK_HEADERS:
self.headers_offsets_file.write(0, bytes(8))
else:
self.logger.info(f'opened UTXO DB (for sync: {for_sync})')
assert self.headers_db is None
self.headers_db = self.db_class('headers', for_sync)
if self.headers_db.is_new:
self.logger.info('created new headers db')
self.logger.info(f'opened headers DB (for sync: {for_sync})')
assert self.tx_count_db is None
self.tx_count_db = self.db_class('tx_count', for_sync)
if self.tx_count_db.is_new:
self.logger.info('created new tx count db')
self.logger.info(f'opened tx count DB (for sync: {for_sync})')
assert self.hashes_db is None
self.hashes_db = self.db_class('hashes', for_sync)
if self.hashes_db.is_new:
self.logger.info('created new tx hashes db')
self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new utxo db')
self.logger.info(f'opened utxo db (for sync: {for_sync})')
self.read_utxo_state()
# Then history DB
self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
self.utxo_flush_count,
compacting)
self.utxo_flush_count = self.history.open_db(
self.db_class, for_sync, self.utxo_flush_count, compacting
)
self.clear_excess_undo_info()
# Read TX counts (requires the tx_count DB to be open)
@ -140,7 +155,11 @@ class LevelDB:
def close(self):
self.utxo_db.close()
self.history.close_db()
self.headers_db.close()
self.tx_count_db.close()
self.hashes_db.close()
self.executor.shutdown(wait=True)
self.executor = None
async def open_for_compacting(self):
await self._open_dbs(True, True)
@ -152,18 +171,31 @@ class LevelDB:
synchronization. When serving clients we want the open files for
serving network connections.
"""
self.logger.info("opened for sync")
await self._open_dbs(True, False)
async def open_for_serving(self):
"""Open the databases for serving. If they are already open they are
closed first.
"""
self.logger.info('closing DBs to re-open for serving')
if self.utxo_db:
self.logger.info('closing DBs to re-open for serving')
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
if self.headers_db:
self.headers_db.close()
self.headers_db = None
if self.tx_count_db:
self.tx_count_db.close()
self.tx_count_db = None
if self.hashes_db:
self.hashes_db.close()
self.hashes_db = None
await self._open_dbs(False, False)
self.logger.info("opened for serving")
# Header merkle cache
@ -248,30 +280,29 @@ class LevelDB:
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)
assert len(self.tx_counts) == flush_data.height + 1
hashes = b''.join(flush_data.block_tx_hashes)
flush_data.block_tx_hashes.clear()
assert len(hashes) % 32 == 0
assert len(hashes) // 32 == flush_data.tx_count - prior_tx_count
assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers, tx counts, and tx hashes
start_time = time.time()
start_time = time.perf_counter()
height_start = self.fs_height + 1
offset = self.header_offset(height_start)
self.headers_file.write(offset, b''.join(flush_data.headers))
self.fs_update_header_offsets(offset, height_start, flush_data.headers)
flush_data.headers.clear()
tx_num = prior_tx_count
offset = height_start * self.tx_counts.itemsize
self.tx_counts_file.write(offset,
self.tx_counts[height_start:].tobytes())
offset = prior_tx_count * 32
self.hashes_file.write(offset, hashes)
for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes):
tx_count = self.tx_counts[height_start]
self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
height_start += 1
offset = 0
while offset < len(tx_hashes):
self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
tx_num += 1
offset += 32
flush_data.block_tx_hashes.clear()
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
if self.utxo_db.for_sync:
elapsed = time.time() - start_time
flush_data.headers.clear()
elapsed = time.perf_counter() - start_time
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self):
@ -350,28 +381,6 @@ class LevelDB:
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def fs_update_header_offsets(self, offset_start, height_start, headers):
if self.coin.STATIC_BLOCK_HEADERS:
return
offset = offset_start
offsets = []
for h in headers:
offset += len(h)
offsets.append(pack("<Q", offset))
# For each header we get the offset of the next header, hence we
# start writing from the next height
pos = (height_start + 1) * 8
self.headers_offsets_file.write(pos, b''.join(offsets))
def dynamic_header_offset(self, height):
assert not self.coin.STATIC_BLOCK_HEADERS
offset, = unpack('<Q', self.headers_offsets_file.read(height * 8, 8))
return offset
def dynamic_header_len(self, height):
return self.dynamic_header_offset(height + 1)\
- self.dynamic_header_offset(height)
def backup_fs(self, height, tx_count):
"""Back up during a reorg. This just updates our pointers."""
self.fs_height = height
@ -403,9 +412,13 @@ class LevelDB:
# Read some from disk
disk_count = max(0, min(count, self.db_height + 1 - start_height))
if disk_count:
offset = self.header_offset(start_height)
size = self.header_offset(start_height + disk_count) - offset
return self.headers_file.read(offset, size), disk_count
return b''.join(
self.headers_db.iterator(
start=HEADER_PREFIX + util.pack_be_uint64(start_height),
stop=HEADER_PREFIX + util.pack_be_uint64(start_height + disk_count),
include_key=False
)
), disk_count
return b'', 0
return await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
@ -418,7 +431,7 @@ class LevelDB:
if tx_height > self.db_height:
tx_hash = None
else:
tx_hash = self.hashes_file.read(tx_num * 32, 32)
tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
return tx_hash, tx_height
async def fs_block_hashes(self, height, count):
@ -428,9 +441,8 @@ class LevelDB:
offset = 0
headers = []
for n in range(count):
hlen = self.header_len(height + n)
headers.append(headers_concat[offset:offset + hlen])
offset += hlen
headers.append(headers_concat[offset:offset + self.coin.BASIC_HEADER_SIZE])
offset += self.coin.BASIC_HEADER_SIZE
return [self.coin.header_hash(header) for header in headers]
@ -442,9 +454,20 @@ class LevelDB:
limit to None to get them all.
"""
def read_history():
tx_nums = list(self.history.get_txnums(hashX, limit))
fs_tx_hash = self.fs_tx_hash
return [fs_tx_hash(tx_num) for tx_num in tx_nums]
hashx_history = []
for key, hist in self.history.db.iterator(prefix=hashX):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > self.db_height:
tx_hash = None
else:
tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
hashx_history.append((tx_hash, tx_height))
if limit and len(hashx_history) >= limit:
return hashx_history
return hashx_history
while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
@ -474,17 +497,21 @@ class LevelDB:
batch_put(self.undo_key(height), b''.join(undo_info))
def raw_block_prefix(self):
return 'meta/block'
return 'block'
def raw_block_path(self, height):
return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}')
def read_raw_block(self, height):
async def read_raw_block(self, height):
"""Returns a raw block read from disk. Raises FileNotFoundError
if the block isn't on-disk."""
def read():
with util.open_file(self.raw_block_path(height)) as f:
return f.read(-1)
return await asyncio.get_event_loop().run_in_executor(self.executor, read)
def write_raw_block(self, block, height):
"""Write a raw block to disk."""
with util.open_truncate(self.raw_block_path(height)) as f:

View file

@ -13,6 +13,7 @@ import time
from abc import ABC, abstractmethod
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from prometheus_client import Histogram
import attr
@ -79,6 +80,16 @@ class MemPoolAPI(ABC):
daemon's height at the time the mempool was obtained."""
NAMESPACE = "wallet_server"
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
mempool_process_time_metric = Histogram(
"processed_mempool", "Time to process mempool and notify touched addresses",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
class MemPool:
"""Representation of the daemon's mempool.
@ -93,7 +104,7 @@ class MemPool:
hashXs: hashX -> set of all hashes of txs touching the hashX
"""
def __init__(self, coin, api, refresh_secs=5.0, log_status_secs=120.0):
def __init__(self, coin, api, refresh_secs=1.0, log_status_secs=120.0):
assert isinstance(api, MemPoolAPI)
self.coin = coin
self.api = api
@ -107,6 +118,7 @@ class MemPool:
self.lock = asyncio.Lock()
self.wakeup = asyncio.Event()
self.executor = ThreadPoolExecutor(max(os.cpu_count() - 1, 1))
self.mempool_process_time_metric = mempool_process_time_metric
async def _logging(self, synchronized_event):
"""Print regular logs of mempool stats."""
@ -207,6 +219,7 @@ class MemPool:
async def _refresh_hashes(self, synchronized_event):
"""Refresh our view of the daemon's mempool."""
while True:
start = time.perf_counter()
height = self.api.cached_height()
hex_hashes = await self.api.mempool_hashes()
if height != await self.api.height():
@ -217,6 +230,8 @@ class MemPool:
synchronized_event.set()
synchronized_event.clear()
await self.api.on_mempool(touched, height)
duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration)
try:
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)

View file

@ -123,6 +123,7 @@ HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
class SessionManager:
"""Holds global state about all sessions."""
@ -160,6 +161,18 @@ class SessionManager:
"clients", "Number of connections received per client version",
namespace=NAMESPACE, labelnames=("version",)
)
address_history_metric = Histogram(
"address_history", "Time to fetch an address history",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
notifications_in_flight_metric = Gauge(
"notifications_in_flight", "Count of notifications in flight",
namespace=NAMESPACE
)
notifications_sent_metric = Histogram(
"notifications_sent", "Time to send an address notification",
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
@ -602,7 +615,6 @@ class SessionManager:
async def broadcast_transaction(self, raw_tx):
hex_hash = await self.daemon.broadcast_transaction(raw_tx)
self.mempool.wakeup.set()
self.txs_sent += 1
return hex_hash
@ -923,36 +935,40 @@ class LBRYElectrumX(SessionBase):
args = (await self.subscribe_headers_result(), )
if not (await self.send_notification('blockchain.headers.subscribe', args)):
return
touched = touched.intersection(self.hashX_subs)
if touched or (height_changed and self.mempool_statuses):
changed = {}
for hashX in touched:
alias = self.hashX_subs[hashX]
status = await self.address_status(hashX)
changed[alias] = status
# Check mempool hashXs - the status is a function of the
# confirmed state of other transactions. Note: we cannot
# iterate over mempool_statuses as it changes size.
for hashX in tuple(self.mempool_statuses):
# Items can be evicted whilst await-ing status; False
# ensures such hashXs are notified
old_status = self.mempool_statuses.get(hashX, False)
status = await self.address_status(hashX)
if status != old_status:
alias = self.hashX_subs[hashX]
changed[alias] = status
for alias, status in changed.items():
async def send_history_notification(alias, hashX):
start = time.perf_counter()
if len(alias) == 64:
method = 'blockchain.scripthash.subscribe'
else:
method = 'blockchain.address.subscribe'
asyncio.create_task(self.send_notification(method, (alias, status)))
if changed:
es = '' if len(changed) == 1 else 'es'
self.logger.info(f'notified of {len(changed):,d} address{es}')
try:
self.session_mgr.notifications_in_flight_metric.inc()
status = await self.address_status(hashX)
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
start = time.perf_counter()
await self.send_notification(method, (alias, status))
self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
finally:
self.session_mgr.notifications_in_flight_metric.dec()
touched = touched.intersection(self.hashX_subs)
if touched or (height_changed and self.mempool_statuses):
notified = set()
mempool_addrs = tuple(self.mempool_statuses.keys())
for hashX in touched:
alias = self.hashX_subs[hashX]
asyncio.create_task(send_history_notification(alias, hashX))
notified.add(hashX)
for hashX in mempool_addrs:
if hashX not in notified:
alias = self.hashX_subs[hashX]
asyncio.create_task(send_history_notification(alias, hashX))
notified.add(hashX)
if touched:
es = '' if len(touched) == 1 else 'es'
self.logger.info(f'notified {len(notified)} mempool/{len(touched):,d} touched address{es}')
def get_metrics_or_placeholder_for_api(self, query_name):
""" Do not hold on to a reference to the metrics
@ -1181,7 +1197,6 @@ class LBRYElectrumX(SessionBase):
self.mempool_statuses[hashX] = status
else:
self.mempool_statuses.pop(hashX, None)
return status
async def hashX_listunspent(self, hashX):
@ -1497,6 +1512,7 @@ class LBRYElectrumX(SessionBase):
try:
hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
self.txs_sent += 1
self.mempool.wakeup.set()
self.logger.info(f'sent tx: {hex_hash}')
return hex_hash
except DaemonError as e:

View file

@ -333,11 +333,13 @@ unpack_le_uint64_from = struct_le_Q.unpack_from
unpack_be_uint16_from = struct_be_H.unpack_from
unpack_be_uint32_from = struct_be_I.unpack_from
unpack_be_uint64 = lambda x: int.from_bytes(x, byteorder='big')
pack_le_int32 = struct_le_i.pack
pack_le_int64 = struct_le_q.pack
pack_le_uint16 = struct_le_H.pack
pack_le_uint32 = struct_le_I.pack
pack_le_uint64 = struct_le_Q.pack
pack_be_uint64 = lambda x: x.to_bytes(8, byteorder='big')
pack_be_uint16 = struct_be_H.pack
pack_be_uint32 = struct_be_I.pack
pack_byte = structB.pack

View file

@ -4,6 +4,7 @@ from lbry.testcase import CommandTestCase
class EpicAdventuresOfChris45(CommandTestCase):
async def test_no_this_is_not_a_test_its_an_adventure(self):
# Chris45 is an avid user of LBRY and this is his story. It's fact and fiction
# and everything in between; it's also the setting of some record-setting
# integration tests.
@ -194,3 +195,9 @@ class EpicAdventuresOfChris45(CommandTestCase):
}},
await self.resolve(uri)
)
# He closes and opens the wallet server databases to see how horribly they break
db = self.conductor.spv_node.server.db
db.close()
await db.open_for_serving()
# They didn't! (error would be AssertionError: 276 vs 266 (264 counts) on startup)