add --index_address_statuses option

-scribe no longer writes address statuses nor compacts them during initial sync
-scribe will only precompute address statuses if `--index_address_statuses` is set
-combine history compaction with updating the address status
This commit is contained in:
Jack Robison 2022-05-10 18:55:55 -04:00
parent e4ac106b98
commit d244136efd
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
7 changed files with 128 additions and 34 deletions

View file

@ -5,14 +5,16 @@ class BlockchainEnv(Env):
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None,
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None):
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
index_address_status=None):
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
self.db_max_open_files = db_max_open_files
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
@classmethod
def contribute_to_arg_parser(cls, parser):
super().contribute_to_arg_parser(parser)

View file

@ -146,8 +146,9 @@ class BlockchainProcessorService(BlockchainService):
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
if self.env.index_address_status:
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
for tx_hash, raw_tx in to_put:
mempool_prefix.stage_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items():
@ -1290,22 +1291,6 @@ class BlockchainProcessorService(BlockchainService):
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes):
if height > self.env.reorg_limit: # compactify existing history
hist_txs = b''
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit
for k, hist in self.db.prefix_db.hashX_history.iterate(
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
deserialize_key=False, deserialize_value=False):
hist_txs += hist
self.db.prefix_db.stage_raw_delete(k, hist)
if hist_txs:
# add the accumulated histories onto the existing compacted history at height 0
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
existing = self.db.prefix_db.get(key)
if existing is not None:
self.db.prefix_db.stage_raw_delete(key, existing)
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
def advance_block(self, block: Block):
height = self.height + 1
@ -1398,15 +1383,11 @@ class BlockchainProcessorService(BlockchainService):
# clear the mempool tx index
self._get_clear_mempool_ops()
for hashX, new_history in self.hashXs_by_tx.items():
# TODO: combine this with compaction so that we only read the history once
self._get_update_hashX_status_ops(
hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
)
self._get_compactify_hashX_history_ops(height, hashX)
if not new_history:
continue
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
# update hashX history status hashes and compactify the histories
self._get_update_hashX_histories_ops(height)
if not self.db.catching_up and self.env.index_address_status:
self._get_compactify_ops(height)
self.tx_count = tx_count
self.db.tx_counts.append(self.tx_count)
@ -1455,6 +1436,88 @@ class BlockchainProcessorService(BlockchainService):
deserialize_key=False, deserialize_value=False))
)
def _get_update_hashX_histories_ops(self, height: int):
self.db.prefix_db.hashX_history.stage_multi_put(
[((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()]
)
def _get_compactify_ops(self, height: int):
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
if existing_hashX_statuses:
pack_key = self.db.prefix_db.hashX_status.pack_key
keys = [
pack_key(hashX) for hashX, existing in zip(
self.hashXs_by_tx, existing_hashX_statuses
)
]
self.db.prefix_db.multi_delete([(k, v) for k, v in zip(keys, existing_hashX_statuses) if v is not None])
block_hashX_history_deletes = []
append_deletes_hashX_history = block_hashX_history_deletes.append
block_hashX_history_puts = []
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
tx_nums = []
txs_extend = tx_nums.extend
compact_hist_txs = []
compact_txs_extend = compact_hist_txs.extend
history_item_0 = None
existing_item_0 = None
reorg_limit = self.env.reorg_limit
unpack_history = self.db.prefix_db.hashX_history.unpack_value
unpack_key = self.db.prefix_db.hashX_history.unpack_key
needs_compaction = False
total_hist_txs = b''
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
deserialize_value=False):
hist_txs = unpack_history(hist)
total_hist_txs += hist
txs_extend(hist_txs)
hist_height = unpack_key(k).height
if height > reorg_limit and hist_height < height - reorg_limit:
compact_txs_extend(hist_txs)
if hist_height == 0:
history_item_0 = (k, hist)
elif hist_height > 0:
needs_compaction = True
# self.db.prefix_db.stage_raw_delete(k, hist)
append_deletes_hashX_history((k, hist))
existing_item_0 = history_item_0
if needs_compaction:
# add the accumulated histories onto the existing compacted history at height 0
if existing_item_0 is not None: # delete if key 0 exists
key, existing = existing_item_0
append_deletes_hashX_history((key, existing))
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
if not new_history:
continue
needed_tx_infos = []
append_needed_tx_info = needed_tx_infos.append
tx_infos = {}
for tx_num in tx_nums:
if tx_num in self.history_tx_info_cache:
tx_infos[tx_num] = self.history_tx_info_cache[tx_num]
else:
append_needed_tx_info(tx_num)
if needed_tx_infos:
for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
tx_infos[tx_num] = tx_info
self.history_tx_info_cache[tx_num] = tx_info
history = ''.join(map(tx_infos.__getitem__, tx_nums))
for tx_hash, height in new_history:
history += f'{tx_hash[::-1].hex()}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts)
def clear_after_advance_or_reorg(self):
self.txo_to_claim.clear()
self.claim_hash_to_txo.clear()

View file

@ -39,7 +39,8 @@ class HubDB:
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
self.logger = logging.getLogger(__name__)
self.coin = coin
self._executor = executor
@ -52,6 +53,7 @@ class HubDB:
if secondary_name:
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
self._db_max_open_files = max_open_files
self._index_address_status = index_address_status
self.prefix_db: typing.Optional[PrefixDB] = None
self.hist_unflushed = defaultdict(partial(array.array, 'I'))

View file

@ -31,7 +31,7 @@ class Env:
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
blocking_channel_ids=None, filtering_channel_ids=None):
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
self.logger = logging.getLogger(__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@ -52,6 +52,8 @@ class Env:
'BLOCKING_CHANNEL_IDS', '').split(' ')
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
'FILTERING_CHANNEL_IDS', '').split(' ')
self.index_address_status = index_address_status if index_address_status is not None else \
self.boolean('INDEX_ADDRESS_STATUS', False)
@classmethod
def default(cls, envvar, default):
@ -187,6 +189,12 @@ class Env:
"Claims that are reposted by these channels aren't returned in search results. "
"Can be set in env with 'FILTERING_CHANNEL_IDS'",
default=cls.default('FILTERING_CHANNEL_IDS', '').split(' '))
parser.add_argument('--index_address_statuses', action='store_true',
help="Use precomputed address statuses, must be enabled in the reader and the writer to "
"use it. If disabled (the default), the status of an address must be calculated at "
"runtime when clients request it (address subscriptions, address history sync). "
"If enabled, scribe will maintain an index of precomputed statuses",
default=cls.boolean('INDEX_ADDRESS_STATUS', False))
@classmethod
def from_arg_parser(cls, args):

View file

@ -157,6 +157,14 @@ class HubMemPool:
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
return result
def mempool_history(self, hashX: bytes) -> str:
result = ''
for tx_hash in self.touched_hashXs.get(hashX, ()):
if tx_hash not in self.txs:
continue # the tx hash for the touched address is an input that isn't in mempool anymore
result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:'
return result
def unordered_UTXOs(self, hashX):
"""Return an unordered list of UTXO named tuples from mempool
transactions that pay to hashX.
@ -276,7 +284,6 @@ class HubMemPool:
if session.subscribe_headers and height_changed:
sent_headers += 1
self._notification_q.put_nowait((session_id, height_changed, hashXes))
if sent_headers:
self.logger.info(f'notified {sent_headers} sessions of new block header')
if session_hashxes_to_notify:

View file

@ -1089,7 +1089,18 @@ class LBRYElectrumX(asyncio.Protocol):
return len(self.hashX_subs)
async def get_hashX_status(self, hashX: bytes):
return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX)
self.session_manager.db.last_flush
if self.env.index_address_status:
loop = self.loop
return await loop.run_in_executor(None, self.db.get_hashX_status, hashX)
history = ''.join(
f"{tx_hash[::-1].hex()}:{height:d}:"
for tx_hash, height in await self.db.limited_history(hashX, limit=None)
) + self.mempool.mempool_history(hashX)
if not history:
return
status = sha256(history.encode())
return status.hex()
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
notifications = []

View file

@ -30,7 +30,8 @@ class BlockchainService:
self.db = HubDB(
env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status
)
self._stopping = False