Merge pull request #35 from lbryio/optional-address-history-status

Expose `--index_address_statuses` setting and improve first sync performance
This commit is contained in:
Jack Robison 2022-05-17 20:14:59 -04:00 committed by GitHub
commit 3b7850802a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 476 additions and 107 deletions

View file

@ -88,6 +88,8 @@ For various reasons it may be desirable to block or filtering content from claim
#### Options for `scribe`
- `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64.
- `--address_history_cache_size` The count of items in the address history cache used for processing blocks and mempool updates. A higher number will use more memory, shouldn't ever need to be higher than 10000. Defaults to 1000.
- `--index_address_statuses` Maintain an index of the statuses of address transaction histories, this makes handling notifications for transactions in a block uniformly fast at the expense of more time to process new blocks and somewhat more disk space (~10gb as of block 1161417).
#### Options for `scribe-elastic-sync`
- `--reindex` If this flag is set drop and rebuild the elasticsearch index.
@ -103,6 +105,7 @@ For various reasons it may be desirable to block or filtering content from claim
- `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS`
- `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`.
- `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS`
- `--index_address_statuses` Use the address history status index, this makes handling notifications for transactions in a block uniformly fast (must be turned on in `scribe` too).
## Contributing

View file

@ -55,8 +55,8 @@ class LBCDaemon:
self._height = None
self.available_rpcs = {}
self.connector = aiohttp.TCPConnector(ssl=False)
self._block_hash_cache = LRUCacheWithMetrics(100000)
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
self._block_hash_cache = LRUCacheWithMetrics(1024)
self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE)
async def close(self):
if self.connector:

View file

@ -5,11 +5,16 @@ class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None):
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None, rebuild_address_status_from_height=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
self.rebuild_address_status_from_height = rebuild_address_status_from_height \
if isinstance(rebuild_address_status_from_height, int) else -1
@classmethod
def contribute_to_arg_parser(cls, parser):
@ -22,6 +27,14 @@ class BlockchainEnv(Env):
parser.add_argument('--db_max_open_files', type=int, default=64,
help='This setting translates into the max_open_files option given to rocksdb. '
'A higher number will use more memory. Defaults to 64.')
parser.add_argument('--address_history_cache_size', type=int,
default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000),
help="LRU cache size for address histories, used when processing new blocks "
"and when processing mempool updates. Can be set in env with "
"'ADDRESS_HISTORY_CACHE_SIZE'")
parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1,
help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a "
"block height to start reindexing from. Defaults to -1 (off).")
@classmethod
def from_arg_parser(cls, args):
@ -29,5 +42,7 @@ class BlockchainEnv(Env):
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
cache_all_claim_txos=args.cache_all_claim_txos
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
hashX_history_cache_size=args.address_history_cache_size,
rebuild_address_status_from_height=args.rebuild_address_status_from_height
)

View file

@ -45,30 +45,22 @@ class BlockchainProcessorService(BlockchainService):
def __init__(self, env: 'BlockchainEnv'):
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.env = env
self.daemon = LBCDaemon(env.coin, env.daemon_url)
self.mempool = MemPool(env.coin, self.db)
self.coin = env.coin
self.wait_for_blocks_duration = 0.1
self._ready_to_stop = asyncio.Event()
self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
self._caught_up_event: Optional[asyncio.Event] = None
self.height = 0
self.tip = bytes.fromhex(self.coin.GENESIS_HASH)[::-1]
self.tx_count = 0
self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
# self.logger = logging.getLogger(__name__)
# Meta
self.touched_hashXs: Set[bytes] = set()
# UTXO cache
self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {}
# Claimtrie cache
self.db_op_stack: Optional['RevertableOpStack'] = None
#################################
# attributes used for calculating stake activations and takeovers per block
#################################
@ -125,8 +117,9 @@ class BlockchainProcessorService(BlockchainService):
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
self.pending_transactions: Dict[int, bytes] = {}
self.hashX_history_cache = LRUCache(1000)
self.hashX_full_cache = LRUCache(1000)
self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
self.history_tx_info_cache = LRUCache(2 ** 16)
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
@ -154,6 +147,7 @@ class BlockchainProcessorService(BlockchainService):
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
if self.env.index_address_status:
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
for tx_hash, raw_tx in to_put:
@ -1254,29 +1248,24 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None)
else:
tx_nums = self.hashX_history_cache[hashX]
needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append
tx_infos = {}
for tx_num in tx_nums:
if tx_num in self.history_tx_info_cache:
tx_infos[tx_num] = self.history_tx_info_cache[tx_num]
else:
append_needed_tx_info(tx_num)
if needed_tx_infos:
for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
history = ''
for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
for tx_num in tx_nums:
history += tx_infos[tx_num]
self.hashX_full_cache[hashX] = history
return history
def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
existing = self.db.prefix_db.hashX_status.get(hashX)
if existing:
self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
if hashX not in self.hashX_history_cache:
tx_nums = self.db.read_history(hashX, limit=None)
else:
tx_nums = self.hashX_history_cache[hashX]
history = ''
for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
for tx_hash, height in new_history:
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
@ -1286,23 +1275,6 @@ class BlockchainProcessorService(BlockchainService):
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes):
if height > self.env.reorg_limit: # compactify existing history
hist_txs = b''
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit
for k, hist in self.db.prefix_db.hashX_history.iterate(
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
deserialize_key=False, deserialize_value=False):
hist_txs += hist
self.db.prefix_db.stage_raw_delete(k, hist)
if hist_txs:
# add the accumulated histories onto the existing compacted history at height 0
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
existing = self.db.prefix_db.get(key)
if existing is not None:
self.db.prefix_db.stage_raw_delete(key, existing)
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
def advance_block(self, block: Block):
height = self.height + 1
# print("advance ", height)
@ -1391,19 +1363,14 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
for k, v in self.db.prefix_db.hashX_mempool_status.iterate(
start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False):
self.db.prefix_db.stage_raw_delete(k, v)
# clear the mempool tx index
self._get_clear_mempool_ops()
for hashX, new_history in self.hashXs_by_tx.items():
# TODO: combine this with compaction so that we only read the history once
self._get_update_hashX_status_ops(
hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
)
self._get_compactify_hashX_history_ops(height, hashX)
if not new_history:
continue
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
# update hashX history status hashes and compactify the histories
self._get_update_hashX_histories_ops(height)
if not self.db.catching_up and self.env.index_address_status:
self._get_compactify_ops(height)
self.tx_count = tx_count
self.db.tx_counts.append(self.tx_count)
@ -1446,6 +1413,94 @@ class BlockchainProcessorService(BlockchainService):
# print("*************\n")
return txo_count
def _get_clear_mempool_ops(self):
self.db.prefix_db.multi_delete(
list(self.db.prefix_db.hashX_mempool_status.iterate(start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ),
deserialize_key=False, deserialize_value=False))
)
def _get_update_hashX_histories_ops(self, height: int):
self.db.prefix_db.hashX_history.stage_multi_put(
[((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()]
)
def _get_compactify_ops(self, height: int):
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
if existing_hashX_statuses:
pack_key = self.db.prefix_db.hashX_status.pack_key
keys = [
pack_key(hashX) for hashX, existing in zip(
self.hashXs_by_tx, existing_hashX_statuses
)
]
self.db.prefix_db.multi_delete([(k, v) for k, v in zip(keys, existing_hashX_statuses) if v is not None])
block_hashX_history_deletes = []
append_deletes_hashX_history = block_hashX_history_deletes.append
block_hashX_history_puts = []
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
tx_nums = []
txs_extend = tx_nums.extend
compact_hist_txs = []
compact_txs_extend = compact_hist_txs.extend
history_item_0 = None
existing_item_0 = None
reorg_limit = self.env.reorg_limit
unpack_history = self.db.prefix_db.hashX_history.unpack_value
unpack_key = self.db.prefix_db.hashX_history.unpack_key
needs_compaction = False
total_hist_txs = b''
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
deserialize_value=False):
hist_txs = unpack_history(hist)
total_hist_txs += hist
txs_extend(hist_txs)
hist_height = unpack_key(k).height
if height > reorg_limit and hist_height < height - reorg_limit:
compact_txs_extend(hist_txs)
if hist_height == 0:
history_item_0 = (k, hist)
elif hist_height > 0:
needs_compaction = True
# self.db.prefix_db.stage_raw_delete(k, hist)
append_deletes_hashX_history((k, hist))
existing_item_0 = history_item_0
if needs_compaction:
# add the accumulated histories onto the existing compacted history at height 0
if existing_item_0 is not None: # delete if key 0 exists
key, existing = existing_item_0
append_deletes_hashX_history((key, existing))
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
if not new_history:
continue
needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append
tx_infos = {}
for tx_num in tx_nums:
if tx_num in self.history_tx_info_cache:
tx_infos[tx_num] = self.history_tx_info_cache[tx_num]
else:
append_needed_tx_info(tx_num)
if needed_tx_infos:
for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
tx_infos[tx_num] = tx_info
self.history_tx_info_cache[tx_num] = tx_info
history = ''.join(map(tx_infos.__getitem__, tx_nums))
for tx_hash, height in new_history:
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts)
def clear_after_advance_or_reorg(self):
self.txo_to_claim.clear()
self.claim_hash_to_txo.clear()
@ -1500,9 +1555,15 @@ class BlockchainProcessorService(BlockchainService):
if self.env.cache_all_tx_hashes:
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
if self.tx_count in self.history_tx_info_cache:
self.history_tx_info_cache.pop(self.tx_count)
self.tx_count -= 1
else:
self.tx_count = self.db.tx_counts[-1]
new_tx_count = self.db.tx_counts[-1]
while self.tx_count > new_tx_count:
if self.tx_count in self.history_tx_info_cache:
self.history_tx_info_cache.pop(self.tx_count)
self.tx_count -= 1
self.height -= 1
# self.touched can include other addresses which is
@ -1632,6 +1693,8 @@ class BlockchainProcessorService(BlockchainService):
self._ready_to_stop.set()
async def _need_catch_up(self):
self.log.info("database has fallen behind blockchain daemon, catching up")
self.db.catching_up = True
def flush():
@ -1644,6 +1707,10 @@ class BlockchainProcessorService(BlockchainService):
async def _finished_initial_catch_up(self):
self.log.info(f'caught up to height {self.height}')
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
# Flush everything but with catching_up->False state.
self.db.catching_up = False
@ -1659,6 +1726,9 @@ class BlockchainProcessorService(BlockchainService):
while self.db.db_version < max(self.db.DB_VERSIONS):
if self.db.db_version == 7:
from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
elif self.db.db_version == 8:
from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
self.db._index_address_status = self.env.index_address_status
else:
raise RuntimeError("unknown db version")
self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
@ -1666,6 +1736,18 @@ class BlockchainProcessorService(BlockchainService):
self.log.info("finished migration")
self.db.read_db_state()
# update the hashX status index if was off before and is now on of if requested from a height
if (self.env.index_address_status and not self.db._index_address_status and self.db.last_indexed_address_status_height < self.db.db_height) or self.env.rebuild_address_status_from_height >= 0:
starting_height = self.db.last_indexed_address_status_height
if self.env.rebuild_address_status_from_height >= 0:
starting_height = self.env.rebuild_address_status_from_height
yield self.db.rebuild_hashX_status_index(starting_height)
elif self.db._index_address_status and not self.env.index_address_status:
self.log.warning("turned off address indexing at block %i", self.db.db_height)
self.db._index_address_status = False
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count

View file

@ -18,7 +18,7 @@ from scribe.schema.url import URL, normalize_name
from scribe.schema.claim import guess_stream_type
from scribe.schema.result import Censor
from scribe.blockchain.transaction import TxInput
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256
from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO
from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
@ -34,12 +34,13 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class HubDB:
DB_VERSIONS = [7, 8]
DB_VERSIONS = [7, 8, 9]
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
self.logger = logging.getLogger(__name__)
self.coin = coin
self._executor = executor
@ -52,6 +53,7 @@ class HubDB:
if secondary_name:
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
self._db_max_open_files = max_open_files
self._index_address_status = index_address_status
self.prefix_db: typing.Optional[PrefixDB] = None
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
@ -61,6 +63,7 @@ class HubDB:
self.hist_comp_cursor = -1
self.es_sync_height = 0
self.last_indexed_address_status_height = 0
# blocking/filtering dicts
blocking_channels = blocking_channel_ids or []
@ -80,7 +83,7 @@ class HubDB:
self.tx_counts = None
self.headers = None
self.block_hashes = None
self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace=NAMESPACE)
self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE)
self.last_flush = time.time()
# Header merkle cache
@ -826,6 +829,8 @@ class HubDB:
# read db state
self.read_db_state()
self.logger.info("index address statuses: %s", self._index_address_status)
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
@ -860,7 +865,79 @@ class HubDB:
self.prefix_db.close()
self.prefix_db = None
def get_hashX_status(self, hashX: bytes):
def _rebuild_hashX_status_index(self, start_height: int):
self.logger.warning("rebuilding the address status index...")
prefix_db = self.prefix_db
def hashX_iterator():
last_hashX = None
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
hashX = k[1:12]
if last_hashX is None:
last_hashX = hashX
if last_hashX != hashX:
yield hashX
last_hashX = hashX
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
return sha256(hist.encode())
start = time.perf_counter()
if start_height <= 0:
self.logger.info("loading all blockchain addresses, this will take a little while...")
hashXs = [hashX for hashX in hashX_iterator()]
else:
self.logger.info("loading addresses since block %i...", start_height)
hashXs = set()
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
include_key=False):
hashXs.update(touched.touched_hashXs)
hashXs = list(hashXs)
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
f"now building the status index...")
op_cnt = 0
hashX_cnt = 0
for hashX in hashXs:
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
prefix_db.stage_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stage_raw_delete(key, existing_status)
prefix_db.stage_raw_put(key, status)
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
op_cnt = 0
if op_cnt:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
self._index_address_status = True
self.write_db_state()
self.prefix_db.unsafe_commit()
self.logger.info("finished indexing address statuses")
def rebuild_hashX_status_index(self, start_height: int):
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
def _get_hashX_status(self, hashX: bytes):
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
if mempool_status:
return mempool_status.hex()
@ -868,6 +945,9 @@ class HubDB:
if status:
return status.hex()
async def get_hashX_status(self, hashX: bytes):
return await asyncio.get_event_loop().run_in_executor(self._executor, self._get_hashX_status, hashX)
def get_tx_hash(self, tx_num: int) -> bytes:
if self._cache_all_tx_hashes:
return self.total_transactions[tx_num]
@ -1119,13 +1199,18 @@ class HubDB:
def write_db_state(self):
"""Write (UTXO) state to the batch."""
last_indexed_address_status = 0
if self.db_height > 0:
self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
existing = self.prefix_db.db_state.get()
last_indexed_address_status = existing.hashX_status_last_indexed_height
self.prefix_db.db_state.stage_delete((), existing.expanded)
if self._index_address_status:
last_indexed_address_status = self.db_height
self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
self.es_sync_height
self.es_sync_height, last_indexed_address_status
)
)
@ -1145,6 +1230,7 @@ class HubDB:
self.hist_comp_cursor = -1
self.hist_db_version = max(self.DB_VERSIONS)
self.es_sync_height = 0
self.last_indexed_address_status_height = 0
else:
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
@ -1166,6 +1252,7 @@ class HubDB:
self.hist_comp_cursor = state.comp_cursor
self.hist_db_version = state.db_version
self.es_sync_height = state.es_sync_height
self.last_indexed_address_status_height = state.hashX_status_last_indexed_height
return state
def assert_db_state(self):

View file

@ -101,6 +101,9 @@ class PrefixRow(metaclass=PrefixRowType):
handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
]
def stage_multi_put(self, items):
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
packed_key = self.pack_key(*key_args)
last_op = self._op_stack.get_last_op_for_key(packed_key)
@ -178,7 +181,7 @@ class BasePrefixDB:
cf = self._db.get_column_family(prefix.value)
self.column_families[prefix.value] = cf
self._op_stack = RevertableOpStack(self.get, unsafe_prefixes=unsafe_prefixes)
self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes)
self._max_undo_depth = max_undo_depth
def unsafe_commit(self):
@ -259,6 +262,17 @@ class BasePrefixDB:
cf = self.column_families[key[:1]]
return self._db.get((cf, key), fill_cache=fill_cache)
def multi_get(self, keys: typing.List[bytes], fill_cache=True):
first_key = keys[0]
if not all(first_key[0] == key[0] for key in keys):
raise ValueError('cannot multi-delete across column families')
cf = self.column_families[first_key[:1]]
db_result = self._db.multi_get([(cf, k) for k in keys], fill_cache=fill_cache)
return list(db_result.values())
def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items])
def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None,
iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None,
reverse: bool = False, include_key: bool = True, include_value: bool = True,

View file

@ -0,0 +1,26 @@
import logging
FROM_VERSION = 8
TO_VERSION = 9
def migrate(db):
log = logging.getLogger(__name__)
prefix_db = db.prefix_db
index_address_status = db._index_address_status
log.info("migrating the db to version 9")
if not index_address_status:
log.info("deleting the existing address status index")
to_delete = list(prefix_db.hashX_status.iterate(deserialize_key=False, deserialize_value=False))
while to_delete:
batch, to_delete = to_delete[:10000], to_delete[10000:]
if batch:
prefix_db.multi_delete(batch)
prefix_db.unsafe_commit()
db.db_version = 9
db.write_db_state()
db.prefix_db.unsafe_commit()
log.info("finished migration")

View file

@ -420,12 +420,40 @@ class DBState(typing.NamedTuple):
tip: bytes
utxo_flush_count: int
wall_time: int
catching_up: bool
bit_fields: int
db_version: int
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
es_sync_height: int
hashX_status_last_indexed_height: int
@property
def catching_up(self) -> bool:
return self.bit_fields & 1 == 1
@property
def index_address_statuses(self) -> bool:
return self.bit_fields & 2 == 1
@property
def expanded(self):
return (
self.genesis,
self.height,
self.tx_count,
self.tip,
self.utxo_flush_count,
self.wall_time,
self.catching_up,
self.index_address_statuses,
self.db_version,
self.hist_flush_count,
self.comp_flush_count,
self.comp_cursor,
self.es_sync_height,
self.hashX_status_last_indexed_height
)
class ActiveAmountPrefixRow(PrefixRow):
@ -1420,7 +1448,7 @@ class SupportAmountPrefixRow(PrefixRow):
class DBStatePrefixRow(PrefixRow):
prefix = DB_PREFIXES.db_state.value
value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
value_struct = struct.Struct(b'>32sLL32sLLBBlllLL')
key_struct = struct.Struct(b'')
key_part_lambdas = [
@ -1437,12 +1465,16 @@ class DBStatePrefixRow(PrefixRow):
@classmethod
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int, es_sync_height: int) -> bytes:
catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
comp_flush_count: int, comp_cursor: int, es_sync_height: int,
last_indexed_address_statuses: int) -> bytes:
bit_fields = 0
bit_fields |= int(catching_up) << 0
bit_fields |= int(index_address_statuses) << 1
return super().pack_value(
genesis, height, tx_count, tip, utxo_flush_count,
wall_time, 1 if catching_up else 0, db_version, hist_flush_count,
comp_flush_count, comp_cursor, es_sync_height
wall_time, bit_fields, db_version, hist_flush_count,
comp_flush_count, comp_cursor, es_sync_height, last_indexed_address_statuses
)
@classmethod
@ -1451,15 +1483,18 @@ class DBStatePrefixRow(PrefixRow):
# TODO: delete this after making a new snapshot - 10/20/21
# migrate in the es_sync_height if it doesnt exist
data += data[32:36]
if len(data) == 98:
data += data[32:36]
return DBState(*super().unpack_value(data))
@classmethod
def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int, es_sync_height: int):
catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
comp_flush_count: int, comp_cursor: int, es_sync_height: int, last_indexed_address_statuses: int):
return cls.pack_key(), cls.pack_value(
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count,
comp_flush_count, comp_cursor, es_sync_height
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, index_address_statuses,
db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height,
last_indexed_address_statuses
)

View file

@ -2,7 +2,7 @@ import struct
import logging
from string import printable
from collections import defaultdict
from typing import Tuple, Iterable, Callable, Optional
from typing import Tuple, Iterable, Callable, Optional, List
from scribe.db.common import DB_PREFIXES
_OP_STRUCT = struct.Struct('>BLL')
@ -82,7 +82,8 @@ class OpStackIntegrity(Exception):
class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]],
multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None):
"""
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
@ -95,6 +96,7 @@ class RevertableOpStack:
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
"""
self._get = get_fn
self._multi_get = multi_get_fn
self._items = defaultdict(list)
self._unsafe_prefixes = unsafe_prefixes or set()
@ -133,6 +135,88 @@ class RevertableOpStack:
raise err
self._items[op.key].append(op)
def multi_put(self, ops: List[RevertablePut]):
"""
Apply a put or delete op, checking that it introduces no integrity errors
"""
if not ops:
return
need_put = []
if not all(op.is_put for op in ops):
raise ValueError(f"list must contain only puts")
if not len(set(map(lambda op: op.key, ops))) == len(ops):
raise ValueError(f"list must contain unique keys")
for op in ops:
if self._items[op.key] and op.invert() == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
continue
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
continue # raise an error?
else:
need_put.append(op)
for op, stored_val in zip(need_put, self._multi_get(list(map(lambda item: item.key, need_put)))):
has_stored_val = stored_val is not None
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
try:
if has_stored_val and not will_delete_existing_stored:
raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.debug(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op)
def multi_delete(self, ops: List[RevertableDelete]):
"""
Apply a put or delete op, checking that it introduces no integrity errors
"""
if not ops:
return
need_delete = []
if not all(op.is_delete for op in ops):
raise ValueError(f"list must contain only deletes")
if not len(set(map(lambda op: op.key, ops))) == len(ops):
raise ValueError(f"list must contain unique keys")
for op in ops:
if self._items[op.key] and op.invert() == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
continue
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
continue # raise an error?
else:
need_delete.append(op)
for op, stored_val in zip(need_delete, self._multi_get(list(map(lambda item: item.key, need_delete)))):
has_stored_val = stored_val is not None
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
try:
if op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}")
elif not stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif op.is_delete and stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.debug(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]):
"""
Apply a sequence of put or delete ops, checking that they introduce no integrity errors

View file

@ -31,7 +31,7 @@ class Env:
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None):
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@ -52,6 +52,8 @@ class Env:
'BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
'FILTERING_CHANNEL_IDS', '').split(' ')
self.index_address_status = index_address_status if index_address_status is not None else \
self.boolean('INDEX_ADDRESS_STATUS', False)
@classmethod
def default(cls, envvar, default):
@ -187,6 +189,12 @@ class Env:
"Claims that are reposted by these channels aren't returned in search results. "
"Can be set in env with 'FILTERING_CHANNEL_IDS'",
default=cls.default('FILTERING_CHANNEL_IDS', '').split(' '))
parser.add_argument('--index_address_statuses', action='store_true',
help="Use precomputed address statuses, must be enabled in the reader and the writer to "
"use it. If disabled (the default), the status of an address must be calculated at "
"runtime when clients request it (address subscriptions, address history sync). "
"If enabled, scribe will maintain an index of precomputed statuses",
default=cls.boolean('INDEX_ADDRESS_STATUS', False))
@classmethod
def from_arg_parser(cls, args):

View file

@ -10,9 +10,10 @@ class ServerEnv(Env):
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
index_address_status=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.host = host if host is not None else self.default('HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
@ -109,5 +110,5 @@ class ServerEnv(Env):
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
elastic_notifier_port=args.elastic_notifier_port
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses
)

View file

@ -157,6 +157,14 @@ class HubMemPool:
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
return result
def mempool_history(self, hashX: bytes) -> str:
result = ''
for tx_hash in self.touched_hashXs.get(hashX, ()):
if tx_hash not in self.txs:
continue # the tx hash for the touched address is an input that isn't in mempool anymore
result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:'
return result
def unordered_UTXOs(self, hashX):
"""Return an unordered list of UTXO named tuples from mempool
transactions that pay to hashX.
@ -276,7 +284,6 @@ class HubMemPool:
if session.subscribe_headers and height_changed:
sent_headers += 1
self._notification_q.put_nowait((session_id, height_changed, hashXes))
if sent_headers:
self.logger.info(f'notified {sent_headers} sessions of new block header')
if session_hashxes_to_notify:

View file

@ -128,7 +128,7 @@ class SessionManager:
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",))
request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
labelnames=("method", "version"))
labelnames=("method",))
tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE)
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
@ -644,9 +644,9 @@ class LBRYElectrumX(asyncio.Protocol):
MAX_CHUNK_SIZE = 40960
session_counter = itertools.count()
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE,
labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS)
labelnames=("method",), buckets=HISTOGRAM_BUCKETS)
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
namespace=NAMESPACE, labelnames=("method", "version"))
namespace=NAMESPACE, labelnames=("method",))
REQUEST_ERRORS_COUNT = Counter(
"request_error", "Number of requests that returned errors", namespace=NAMESPACE,
labelnames=("method", "version")
@ -793,7 +793,7 @@ class LBRYElectrumX(asyncio.Protocol):
"""Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions.
"""
self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc()
self.session_manager.request_count_metric.labels(method=request.method).inc()
if isinstance(request, Request):
method = request.method
@ -981,10 +981,7 @@ class LBRYElectrumX(asyncio.Protocol):
'internal server error')
if isinstance(request, Request):
message = request.send_result(result)
self.RESPONSE_TIMES.labels(
method=request.method,
version=self.client_version
).observe(time.perf_counter() - start)
self.RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start)
if message:
await self._send_message(message)
if isinstance(result, Exception):
@ -1014,7 +1011,7 @@ class LBRYElectrumX(asyncio.Protocol):
async def send_notification(self, method, args=()) -> bool:
"""Send an RPC notification over the network."""
message = self.connection.send_notification(Notification(method, args))
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
self.NOTIFICATION_COUNT.labels(method=method).inc()
try:
await self._send_message(message)
return True
@ -1089,7 +1086,16 @@ class LBRYElectrumX(asyncio.Protocol):
return len(self.hashX_subs)
async def get_hashX_status(self, hashX: bytes):
return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX)
if self.env.index_address_status:
return await self.db.get_hashX_status(hashX)
history = ''.join(
f"{tx_hash[::-1].hex()}:{height:d}:"
for tx_hash, height in await self.db.limited_history(hashX, limit=None)
) + self.mempool.mempool_history(hashX)
if not history:
return
status = sha256(history.encode())
return status.hex()
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
notifications = []
@ -1110,7 +1116,7 @@ class LBRYElectrumX(asyncio.Protocol):
start = time.perf_counter()
self.session_manager.notifications_in_flight_metric.inc()
for method, args in notifications:
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
self.NOTIFICATION_COUNT.labels(method=method,).inc()
try:
await self.send_notifications(
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])

View file

@ -30,7 +30,8 @@ class BlockchainService:
self.db = HubDB(
env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status
)
self._stopping = False