Expose --index_address_statuses
setting and improve first sync performance #35
14 changed files with 476 additions and 107 deletions
|
@ -88,6 +88,8 @@ For various reasons it may be desirable to block or filtering content from claim
|
|||
|
||||
#### Options for `scribe`
|
||||
- `--db_max_open_files` This setting translates into the max_open_files option given to rocksdb. A higher number will use more memory. Defaults to 64.
|
||||
- `--address_history_cache_size` The count of items in the address history cache used for processing blocks and mempool updates. A higher number will use more memory, shouldn't ever need to be higher than 10000. Defaults to 1000.
|
||||
- `--index_address_statuses` Maintain an index of the statuses of address transaction histories, this makes handling notifications for transactions in a block uniformly fast at the expense of more time to process new blocks and somewhat more disk space (~10gb as of block 1161417).
|
||||
|
||||
#### Options for `scribe-elastic-sync`
|
||||
- `--reindex` If this flag is set drop and rebuild the elasticsearch index.
|
||||
|
@ -103,6 +105,7 @@ For various reasons it may be desirable to block or filtering content from claim
|
|||
- `--query_timeout_ms` Timeout for claim searches in elasticsearch in milliseconds. Can be set from the environment with `QUERY_TIMEOUT_MS`
|
||||
- `--blocking_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels can't be resolved or returned in search results. Can be set from the environment with `BLOCKING_CHANNEL_IDS`.
|
||||
- `--filtering_channel_ids` Space separated list of channel claim ids used for blocking. Claims that are reposted by these channels aren't returned in search results. Can be set from the environment with `FILTERING_CHANNEL_IDS`
|
||||
- `--index_address_statuses` Use the address history status index, this makes handling notifications for transactions in a block uniformly fast (must be turned on in `scribe` too).
|
||||
|
||||
## Contributing
|
||||
|
||||
|
|
|
@ -55,8 +55,8 @@ class LBCDaemon:
|
|||
self._height = None
|
||||
self.available_rpcs = {}
|
||||
self.connector = aiohttp.TCPConnector(ssl=False)
|
||||
self._block_hash_cache = LRUCacheWithMetrics(100000)
|
||||
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
|
||||
self._block_hash_cache = LRUCacheWithMetrics(1024)
|
||||
self._block_cache = LRUCacheWithMetrics(64, metric_name='block', namespace=NAMESPACE)
|
||||
|
||||
async def close(self):
|
||||
if self.connector:
|
||||
|
|
|
@ -5,11 +5,16 @@ class BlockchainEnv(Env):
|
|||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None,
|
||||
db_max_open_files=64, daemon_url=None):
|
||||
db_max_open_files=64, daemon_url=None, hashX_history_cache_size=None,
|
||||
index_address_status=None, rebuild_address_status_from_height=None):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.db_max_open_files = db_max_open_files
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.hashX_history_cache_size = hashX_history_cache_size if hashX_history_cache_size is not None \
|
||||
else self.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000)
|
||||
self.rebuild_address_status_from_height = rebuild_address_status_from_height \
|
||||
if isinstance(rebuild_address_status_from_height, int) else -1
|
||||
|
||||
@classmethod
|
||||
def contribute_to_arg_parser(cls, parser):
|
||||
|
@ -22,6 +27,14 @@ class BlockchainEnv(Env):
|
|||
parser.add_argument('--db_max_open_files', type=int, default=64,
|
||||
help='This setting translates into the max_open_files option given to rocksdb. '
|
||||
'A higher number will use more memory. Defaults to 64.')
|
||||
parser.add_argument('--address_history_cache_size', type=int,
|
||||
default=cls.integer('ADDRESS_HISTORY_CACHE_SIZE', 1000),
|
||||
help="LRU cache size for address histories, used when processing new blocks "
|
||||
"and when processing mempool updates. Can be set in env with "
|
||||
"'ADDRESS_HISTORY_CACHE_SIZE'")
|
||||
parser.add_argument('--rebuild_address_status_from_height', type=int, default=-1,
|
||||
help="Rebuild address statuses, set to 0 to reindex all address statuses or provide a "
|
||||
"block height to start reindexing from. Defaults to -1 (off).")
|
||||
|
||||
@classmethod
|
||||
def from_arg_parser(cls, args):
|
||||
|
@ -29,5 +42,7 @@ class BlockchainEnv(Env):
|
|||
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
|
||||
max_query_workers=args.max_query_workers, chain=args.chain, reorg_limit=args.reorg_limit,
|
||||
prometheus_port=args.prometheus_port, cache_all_tx_hashes=args.cache_all_tx_hashes,
|
||||
cache_all_claim_txos=args.cache_all_claim_txos
|
||||
cache_all_claim_txos=args.cache_all_claim_txos, index_address_status=args.index_address_statuses,
|
||||
hashX_history_cache_size=args.address_history_cache_size,
|
||||
rebuild_address_status_from_height=args.rebuild_address_status_from_height
|
||||
)
|
||||
|
|
|
@ -45,30 +45,22 @@ class BlockchainProcessorService(BlockchainService):
|
|||
|
||||
def __init__(self, env: 'BlockchainEnv'):
|
||||
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
|
||||
self.env = env
|
||||
self.daemon = LBCDaemon(env.coin, env.daemon_url)
|
||||
self.mempool = MemPool(env.coin, self.db)
|
||||
self.coin = env.coin
|
||||
self.wait_for_blocks_duration = 0.1
|
||||
self._ready_to_stop = asyncio.Event()
|
||||
|
||||
self.blocks_event = asyncio.Event()
|
||||
self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
|
||||
self._caught_up_event: Optional[asyncio.Event] = None
|
||||
self.height = 0
|
||||
self.tip = bytes.fromhex(self.coin.GENESIS_HASH)[::-1]
|
||||
self.tx_count = 0
|
||||
|
||||
self.blocks_event = asyncio.Event()
|
||||
self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
|
||||
# self.logger = logging.getLogger(__name__)
|
||||
|
||||
# Meta
|
||||
self.touched_hashXs: Set[bytes] = set()
|
||||
|
||||
# UTXO cache
|
||||
self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {}
|
||||
|
||||
# Claimtrie cache
|
||||
self.db_op_stack: Optional['RevertableOpStack'] = None
|
||||
|
||||
#################################
|
||||
# attributes used for calculating stake activations and takeovers per block
|
||||
#################################
|
||||
|
@ -125,8 +117,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.pending_transaction_num_mapping: Dict[bytes, int] = {}
|
||||
self.pending_transactions: Dict[int, bytes] = {}
|
||||
|
||||
self.hashX_history_cache = LRUCache(1000)
|
||||
self.hashX_full_cache = LRUCache(1000)
|
||||
self.hashX_history_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
|
||||
self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
|
||||
self.history_tx_info_cache = LRUCache(2 ** 16)
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
# Run in a thread to prevent blocking. Shielded so that
|
||||
|
@ -154,8 +147,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
|
||||
self.mempool.remove(to_delete)
|
||||
touched_hashXs = self.mempool.update_mempool(to_put)
|
||||
for hashX in touched_hashXs:
|
||||
self._get_update_hashX_mempool_status_ops(hashX)
|
||||
if self.env.index_address_status:
|
||||
for hashX in touched_hashXs:
|
||||
self._get_update_hashX_mempool_status_ops(hashX)
|
||||
for tx_hash, raw_tx in to_put:
|
||||
mempool_prefix.stage_put((tx_hash,), (raw_tx,))
|
||||
for tx_hash, raw_tx in to_delete.items():
|
||||
|
@ -1254,29 +1248,24 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.hashX_history_cache[hashX] = tx_nums = self.db.read_history(hashX, limit=None)
|
||||
else:
|
||||
tx_nums = self.hashX_history_cache[hashX]
|
||||
needed_tx_infos = []
|
||||
append_needed_tx_info = needed_tx_infos.append
|
||||
tx_infos = {}
|
||||
for tx_num in tx_nums:
|
||||
if tx_num in self.history_tx_info_cache:
|
||||
tx_infos[tx_num] = self.history_tx_info_cache[tx_num]
|
||||
else:
|
||||
append_needed_tx_info(tx_num)
|
||||
if needed_tx_infos:
|
||||
for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
|
||||
tx_infos[tx_num] = self.history_tx_info_cache[tx_num] = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
|
||||
history = ''
|
||||
for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
|
||||
history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
for tx_num in tx_nums:
|
||||
history += tx_infos[tx_num]
|
||||
self.hashX_full_cache[hashX] = history
|
||||
return history
|
||||
|
||||
def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
|
||||
existing = self.db.prefix_db.hashX_status.get(hashX)
|
||||
if existing:
|
||||
self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
|
||||
if hashX not in self.hashX_history_cache:
|
||||
tx_nums = self.db.read_history(hashX, limit=None)
|
||||
else:
|
||||
tx_nums = self.hashX_history_cache[hashX]
|
||||
history = ''
|
||||
for tx_num, tx_hash in zip(tx_nums, self.db.get_tx_hashes(tx_nums)):
|
||||
history += f'{hash_to_hex_str(tx_hash)}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
for tx_hash, height in new_history:
|
||||
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
|
||||
if history:
|
||||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
|
||||
|
||||
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
|
||||
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
|
||||
if existing:
|
||||
|
@ -1286,23 +1275,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
|
||||
|
||||
def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes):
|
||||
if height > self.env.reorg_limit: # compactify existing history
|
||||
hist_txs = b''
|
||||
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit
|
||||
for k, hist in self.db.prefix_db.hashX_history.iterate(
|
||||
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
|
||||
deserialize_key=False, deserialize_value=False):
|
||||
hist_txs += hist
|
||||
self.db.prefix_db.stage_raw_delete(k, hist)
|
||||
if hist_txs:
|
||||
# add the accumulated histories onto the existing compacted history at height 0
|
||||
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
|
||||
existing = self.db.prefix_db.get(key)
|
||||
if existing is not None:
|
||||
self.db.prefix_db.stage_raw_delete(key, existing)
|
||||
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
|
||||
|
||||
def advance_block(self, block: Block):
|
||||
height = self.height + 1
|
||||
# print("advance ", height)
|
||||
|
@ -1391,19 +1363,14 @@ class BlockchainProcessorService(BlockchainService):
|
|||
|
||||
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
|
||||
|
||||
for k, v in self.db.prefix_db.hashX_mempool_status.iterate(
|
||||
start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False):
|
||||
self.db.prefix_db.stage_raw_delete(k, v)
|
||||
# clear the mempool tx index
|
||||
self._get_clear_mempool_ops()
|
||||
|
||||
for hashX, new_history in self.hashXs_by_tx.items():
|
||||
# TODO: combine this with compaction so that we only read the history once
|
||||
self._get_update_hashX_status_ops(
|
||||
hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
|
||||
)
|
||||
self._get_compactify_hashX_history_ops(height, hashX)
|
||||
if not new_history:
|
||||
continue
|
||||
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
|
||||
# update hashX history status hashes and compactify the histories
|
||||
self._get_update_hashX_histories_ops(height)
|
||||
|
||||
if not self.db.catching_up and self.env.index_address_status:
|
||||
self._get_compactify_ops(height)
|
||||
|
||||
self.tx_count = tx_count
|
||||
self.db.tx_counts.append(self.tx_count)
|
||||
|
@ -1446,6 +1413,94 @@ class BlockchainProcessorService(BlockchainService):
|
|||
# print("*************\n")
|
||||
return txo_count
|
||||
|
||||
def _get_clear_mempool_ops(self):
|
||||
self.db.prefix_db.multi_delete(
|
||||
list(self.db.prefix_db.hashX_mempool_status.iterate(start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ),
|
||||
deserialize_key=False, deserialize_value=False))
|
||||
)
|
||||
|
||||
def _get_update_hashX_histories_ops(self, height: int):
|
||||
self.db.prefix_db.hashX_history.stage_multi_put(
|
||||
[((hashX, height), (new_tx_nums,)) for hashX, new_tx_nums in self.hashXs_by_tx.items()]
|
||||
)
|
||||
|
||||
def _get_compactify_ops(self, height: int):
|
||||
existing_hashX_statuses = self.db.prefix_db.hashX_status.multi_get([(hashX,) for hashX in self.hashXs_by_tx.keys()], deserialize_value=False)
|
||||
if existing_hashX_statuses:
|
||||
pack_key = self.db.prefix_db.hashX_status.pack_key
|
||||
keys = [
|
||||
pack_key(hashX) for hashX, existing in zip(
|
||||
self.hashXs_by_tx, existing_hashX_statuses
|
||||
)
|
||||
]
|
||||
self.db.prefix_db.multi_delete([(k, v) for k, v in zip(keys, existing_hashX_statuses) if v is not None])
|
||||
|
||||
block_hashX_history_deletes = []
|
||||
append_deletes_hashX_history = block_hashX_history_deletes.append
|
||||
block_hashX_history_puts = []
|
||||
|
||||
for (hashX, new_tx_nums), existing in zip(self.hashXs_by_tx.items(), existing_hashX_statuses):
|
||||
new_history = [(self.pending_transactions[tx_num], height) for tx_num in new_tx_nums]
|
||||
|
||||
tx_nums = []
|
||||
txs_extend = tx_nums.extend
|
||||
compact_hist_txs = []
|
||||
compact_txs_extend = compact_hist_txs.extend
|
||||
history_item_0 = None
|
||||
existing_item_0 = None
|
||||
reorg_limit = self.env.reorg_limit
|
||||
unpack_history = self.db.prefix_db.hashX_history.unpack_value
|
||||
unpack_key = self.db.prefix_db.hashX_history.unpack_key
|
||||
needs_compaction = False
|
||||
|
||||
total_hist_txs = b''
|
||||
for k, hist in self.db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_key=False,
|
||||
deserialize_value=False):
|
||||
hist_txs = unpack_history(hist)
|
||||
total_hist_txs += hist
|
||||
txs_extend(hist_txs)
|
||||
hist_height = unpack_key(k).height
|
||||
if height > reorg_limit and hist_height < height - reorg_limit:
|
||||
compact_txs_extend(hist_txs)
|
||||
if hist_height == 0:
|
||||
history_item_0 = (k, hist)
|
||||
elif hist_height > 0:
|
||||
needs_compaction = True
|
||||
# self.db.prefix_db.stage_raw_delete(k, hist)
|
||||
append_deletes_hashX_history((k, hist))
|
||||
existing_item_0 = history_item_0
|
||||
if needs_compaction:
|
||||
# add the accumulated histories onto the existing compacted history at height 0
|
||||
if existing_item_0 is not None: # delete if key 0 exists
|
||||
key, existing = existing_item_0
|
||||
append_deletes_hashX_history((key, existing))
|
||||
block_hashX_history_puts.append(((hashX, 0), (compact_hist_txs,)))
|
||||
if not new_history:
|
||||
continue
|
||||
|
||||
needed_tx_infos = []
|
||||
append_needed_tx_info = needed_tx_infos.append
|
||||
tx_infos = {}
|
||||
for tx_num in tx_nums:
|
||||
if tx_num in self.history_tx_info_cache:
|
||||
tx_infos[tx_num] = self.history_tx_info_cache[tx_num]
|
||||
else:
|
||||
append_needed_tx_info(tx_num)
|
||||
if needed_tx_infos:
|
||||
for tx_num, tx_hash in zip(needed_tx_infos, self.db.get_tx_hashes(needed_tx_infos)):
|
||||
tx_info = f'{tx_hash[::-1].hex()}:{bisect_right(self.db.tx_counts, tx_num):d}:'
|
||||
tx_infos[tx_num] = tx_info
|
||||
self.history_tx_info_cache[tx_num] = tx_info
|
||||
history = ''.join(map(tx_infos.__getitem__, tx_nums))
|
||||
for tx_hash, height in new_history:
|
||||
history += f'{tx_hash[::-1].hex()}:{height:d}:'
|
||||
if history:
|
||||
status = sha256(history.encode())
|
||||
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
|
||||
|
||||
self.db.prefix_db.multi_delete(block_hashX_history_deletes)
|
||||
self.db.prefix_db.hashX_history.stage_multi_put(block_hashX_history_puts)
|
||||
|
||||
def clear_after_advance_or_reorg(self):
|
||||
self.txo_to_claim.clear()
|
||||
self.claim_hash_to_txo.clear()
|
||||
|
@ -1500,9 +1555,15 @@ class BlockchainProcessorService(BlockchainService):
|
|||
if self.env.cache_all_tx_hashes:
|
||||
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
|
||||
self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
|
||||
if self.tx_count in self.history_tx_info_cache:
|
||||
self.history_tx_info_cache.pop(self.tx_count)
|
||||
self.tx_count -= 1
|
||||
else:
|
||||
self.tx_count = self.db.tx_counts[-1]
|
||||
new_tx_count = self.db.tx_counts[-1]
|
||||
while self.tx_count > new_tx_count:
|
||||
if self.tx_count in self.history_tx_info_cache:
|
||||
self.history_tx_info_cache.pop(self.tx_count)
|
||||
self.tx_count -= 1
|
||||
self.height -= 1
|
||||
|
||||
# self.touched can include other addresses which is
|
||||
|
@ -1632,6 +1693,8 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self._ready_to_stop.set()
|
||||
|
||||
async def _need_catch_up(self):
|
||||
self.log.info("database has fallen behind blockchain daemon, catching up")
|
||||
|
||||
self.db.catching_up = True
|
||||
|
||||
def flush():
|
||||
|
@ -1644,6 +1707,10 @@ class BlockchainProcessorService(BlockchainService):
|
|||
|
||||
async def _finished_initial_catch_up(self):
|
||||
self.log.info(f'caught up to height {self.height}')
|
||||
|
||||
if self.env.index_address_status and self.db.last_indexed_address_status_height < self.db.db_height:
|
||||
await self.db.rebuild_hashX_status_index(self.db.last_indexed_address_status_height)
|
||||
|
||||
# Flush everything but with catching_up->False state.
|
||||
self.db.catching_up = False
|
||||
|
||||
|
@ -1659,6 +1726,9 @@ class BlockchainProcessorService(BlockchainService):
|
|||
while self.db.db_version < max(self.db.DB_VERSIONS):
|
||||
if self.db.db_version == 7:
|
||||
from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
|
||||
elif self.db.db_version == 8:
|
||||
from scribe.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
|
||||
self.db._index_address_status = self.env.index_address_status
|
||||
else:
|
||||
raise RuntimeError("unknown db version")
|
||||
self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
|
||||
|
@ -1666,6 +1736,18 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.log.info("finished migration")
|
||||
self.db.read_db_state()
|
||||
|
||||
# update the hashX status index if was off before and is now on of if requested from a height
|
||||
if (self.env.index_address_status and not self.db._index_address_status and self.db.last_indexed_address_status_height < self.db.db_height) or self.env.rebuild_address_status_from_height >= 0:
|
||||
starting_height = self.db.last_indexed_address_status_height
|
||||
if self.env.rebuild_address_status_from_height >= 0:
|
||||
starting_height = self.env.rebuild_address_status_from_height
|
||||
yield self.db.rebuild_hashX_status_index(starting_height)
|
||||
elif self.db._index_address_status and not self.env.index_address_status:
|
||||
self.log.warning("turned off address indexing at block %i", self.db.db_height)
|
||||
self.db._index_address_status = False
|
||||
self.db.write_db_state()
|
||||
self.db.prefix_db.unsafe_commit()
|
||||
|
||||
self.height = self.db.db_height
|
||||
self.tip = self.db.db_tip
|
||||
self.tx_count = self.db.db_tx_count
|
||||
|
|
103
scribe/db/db.py
103
scribe/db/db.py
|
@ -18,7 +18,7 @@ from scribe.schema.url import URL, normalize_name
|
|||
from scribe.schema.claim import guess_stream_type
|
||||
from scribe.schema.result import Censor
|
||||
from scribe.blockchain.transaction import TxInput
|
||||
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics
|
||||
from scribe.common import hash_to_hex_str, hash160, LRUCacheWithMetrics, sha256
|
||||
from scribe.db.merkle import Merkle, MerkleCache, FastMerkleCacheItem
|
||||
from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedResolveResult, DBError, UTXO
|
||||
from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
|
||||
|
@ -34,12 +34,13 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
|
|||
|
||||
|
||||
class HubDB:
|
||||
DB_VERSIONS = [7, 8]
|
||||
DB_VERSIONS = [7, 8, 9]
|
||||
|
||||
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.coin = coin
|
||||
self._executor = executor
|
||||
|
@ -52,6 +53,7 @@ class HubDB:
|
|||
if secondary_name:
|
||||
assert max_open_files == -1, 'max open files must be -1 for secondary readers'
|
||||
self._db_max_open_files = max_open_files
|
||||
self._index_address_status = index_address_status
|
||||
self.prefix_db: typing.Optional[PrefixDB] = None
|
||||
|
||||
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
|
||||
|
@ -61,6 +63,7 @@ class HubDB:
|
|||
self.hist_comp_cursor = -1
|
||||
|
||||
self.es_sync_height = 0
|
||||
self.last_indexed_address_status_height = 0
|
||||
|
||||
# blocking/filtering dicts
|
||||
blocking_channels = blocking_channel_ids or []
|
||||
|
@ -80,7 +83,7 @@ class HubDB:
|
|||
self.tx_counts = None
|
||||
self.headers = None
|
||||
self.block_hashes = None
|
||||
self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace=NAMESPACE)
|
||||
self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE)
|
||||
self.last_flush = time.time()
|
||||
|
||||
# Header merkle cache
|
||||
|
@ -826,6 +829,8 @@ class HubDB:
|
|||
# read db state
|
||||
self.read_db_state()
|
||||
|
||||
self.logger.info("index address statuses: %s", self._index_address_status)
|
||||
|
||||
# These are our state as we move ahead of DB state
|
||||
self.fs_height = self.db_height
|
||||
self.fs_tx_count = self.db_tx_count
|
||||
|
@ -860,7 +865,79 @@ class HubDB:
|
|||
self.prefix_db.close()
|
||||
self.prefix_db = None
|
||||
|
||||
def get_hashX_status(self, hashX: bytes):
|
||||
def _rebuild_hashX_status_index(self, start_height: int):
|
||||
self.logger.warning("rebuilding the address status index...")
|
||||
prefix_db = self.prefix_db
|
||||
|
||||
def hashX_iterator():
|
||||
last_hashX = None
|
||||
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
|
||||
hashX = k[1:12]
|
||||
if last_hashX is None:
|
||||
last_hashX = hashX
|
||||
if last_hashX != hashX:
|
||||
yield hashX
|
||||
last_hashX = hashX
|
||||
if last_hashX:
|
||||
yield last_hashX
|
||||
|
||||
def hashX_status_from_history(history: bytes) -> bytes:
|
||||
tx_counts = self.tx_counts
|
||||
hist_tx_nums = array.array('I')
|
||||
hist_tx_nums.frombytes(history)
|
||||
hist = ''
|
||||
for tx_num in hist_tx_nums:
|
||||
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
|
||||
return sha256(hist.encode())
|
||||
|
||||
start = time.perf_counter()
|
||||
|
||||
if start_height <= 0:
|
||||
self.logger.info("loading all blockchain addresses, this will take a little while...")
|
||||
hashXs = [hashX for hashX in hashX_iterator()]
|
||||
else:
|
||||
self.logger.info("loading addresses since block %i...", start_height)
|
||||
hashXs = set()
|
||||
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
|
||||
include_key=False):
|
||||
hashXs.update(touched.touched_hashXs)
|
||||
hashXs = list(hashXs)
|
||||
|
||||
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
|
||||
f"now building the status index...")
|
||||
op_cnt = 0
|
||||
hashX_cnt = 0
|
||||
for hashX in hashXs:
|
||||
hashX_cnt += 1
|
||||
key = prefix_db.hashX_status.pack_key(hashX)
|
||||
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
|
||||
status = hashX_status_from_history(history)
|
||||
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
|
||||
if existing_status and existing_status == status:
|
||||
continue
|
||||
elif not existing_status:
|
||||
prefix_db.stage_raw_put(key, status)
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.stage_raw_delete(key, existing_status)
|
||||
prefix_db.stage_raw_put(key, status)
|
||||
op_cnt += 2
|
||||
if op_cnt > 100000:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
op_cnt = 0
|
||||
if op_cnt:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
self._index_address_status = True
|
||||
self.write_db_state()
|
||||
self.prefix_db.unsafe_commit()
|
||||
self.logger.info("finished indexing address statuses")
|
||||
|
||||
def rebuild_hashX_status_index(self, start_height: int):
|
||||
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
|
||||
|
||||
def _get_hashX_status(self, hashX: bytes):
|
||||
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
|
||||
if mempool_status:
|
||||
return mempool_status.hex()
|
||||
|
@ -868,6 +945,9 @@ class HubDB:
|
|||
if status:
|
||||
return status.hex()
|
||||
|
||||
async def get_hashX_status(self, hashX: bytes):
|
||||
return await asyncio.get_event_loop().run_in_executor(self._executor, self._get_hashX_status, hashX)
|
||||
|
||||
def get_tx_hash(self, tx_num: int) -> bytes:
|
||||
if self._cache_all_tx_hashes:
|
||||
return self.total_transactions[tx_num]
|
||||
|
@ -1119,13 +1199,18 @@ class HubDB:
|
|||
|
||||
def write_db_state(self):
|
||||
"""Write (UTXO) state to the batch."""
|
||||
last_indexed_address_status = 0
|
||||
if self.db_height > 0:
|
||||
self.prefix_db.db_state.stage_delete((), self.prefix_db.db_state.get())
|
||||
existing = self.prefix_db.db_state.get()
|
||||
last_indexed_address_status = existing.hashX_status_last_indexed_height
|
||||
self.prefix_db.db_state.stage_delete((), existing.expanded)
|
||||
if self._index_address_status:
|
||||
last_indexed_address_status = self.db_height
|
||||
self.prefix_db.db_state.stage_put((), (
|
||||
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.catching_up, self.db_version,
|
||||
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
|
||||
self.es_sync_height
|
||||
self.es_sync_height, last_indexed_address_status
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -1145,6 +1230,7 @@ class HubDB:
|
|||
self.hist_comp_cursor = -1
|
||||
self.hist_db_version = max(self.DB_VERSIONS)
|
||||
self.es_sync_height = 0
|
||||
self.last_indexed_address_status_height = 0
|
||||
else:
|
||||
self.db_version = state.db_version
|
||||
if self.db_version not in self.DB_VERSIONS:
|
||||
|
@ -1166,6 +1252,7 @@ class HubDB:
|
|||
self.hist_comp_cursor = state.comp_cursor
|
||||
self.hist_db_version = state.db_version
|
||||
self.es_sync_height = state.es_sync_height
|
||||
self.last_indexed_address_status_height = state.hashX_status_last_indexed_height
|
||||
return state
|
||||
|
||||
def assert_db_state(self):
|
||||
|
|
|
@ -101,6 +101,9 @@ class PrefixRow(metaclass=PrefixRowType):
|
|||
handle_value(result[packed_keys[tuple(k_args)]]) for k_args in key_args
|
||||
]
|
||||
|
||||
def stage_multi_put(self, items):
|
||||
self._op_stack.multi_put([RevertablePut(self.pack_key(*k), self.pack_value(*v)) for k, v in items])
|
||||
|
||||
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
|
||||
packed_key = self.pack_key(*key_args)
|
||||
last_op = self._op_stack.get_last_op_for_key(packed_key)
|
||||
|
@ -178,7 +181,7 @@ class BasePrefixDB:
|
|||
cf = self._db.get_column_family(prefix.value)
|
||||
self.column_families[prefix.value] = cf
|
||||
|
||||
self._op_stack = RevertableOpStack(self.get, unsafe_prefixes=unsafe_prefixes)
|
||||
self._op_stack = RevertableOpStack(self.get, self.multi_get, unsafe_prefixes=unsafe_prefixes)
|
||||
self._max_undo_depth = max_undo_depth
|
||||
|
||||
def unsafe_commit(self):
|
||||
|
@ -259,6 +262,17 @@ class BasePrefixDB:
|
|||
cf = self.column_families[key[:1]]
|
||||
return self._db.get((cf, key), fill_cache=fill_cache)
|
||||
|
||||
def multi_get(self, keys: typing.List[bytes], fill_cache=True):
|
||||
first_key = keys[0]
|
||||
if not all(first_key[0] == key[0] for key in keys):
|
||||
raise ValueError('cannot multi-delete across column families')
|
||||
cf = self.column_families[first_key[:1]]
|
||||
db_result = self._db.multi_get([(cf, k) for k in keys], fill_cache=fill_cache)
|
||||
return list(db_result.values())
|
||||
|
||||
def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]):
|
||||
self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items])
|
||||
|
||||
def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None,
|
||||
iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None,
|
||||
reverse: bool = False, include_key: bool = True, include_value: bool = True,
|
||||
|
|
26
scribe/db/migrators/migrate8to9.py
Normal file
26
scribe/db/migrators/migrate8to9.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
import logging
|
||||
|
||||
FROM_VERSION = 8
|
||||
TO_VERSION = 9
|
||||
|
||||
|
||||
def migrate(db):
|
||||
log = logging.getLogger(__name__)
|
||||
prefix_db = db.prefix_db
|
||||
index_address_status = db._index_address_status
|
||||
|
||||
log.info("migrating the db to version 9")
|
||||
|
||||
if not index_address_status:
|
||||
log.info("deleting the existing address status index")
|
||||
to_delete = list(prefix_db.hashX_status.iterate(deserialize_key=False, deserialize_value=False))
|
||||
while to_delete:
|
||||
batch, to_delete = to_delete[:10000], to_delete[10000:]
|
||||
if batch:
|
||||
prefix_db.multi_delete(batch)
|
||||
prefix_db.unsafe_commit()
|
||||
|
||||
db.db_version = 9
|
||||
db.write_db_state()
|
||||
db.prefix_db.unsafe_commit()
|
||||
log.info("finished migration")
|
|
@ -420,12 +420,40 @@ class DBState(typing.NamedTuple):
|
|||
tip: bytes
|
||||
utxo_flush_count: int
|
||||
wall_time: int
|
||||
catching_up: bool
|
||||
bit_fields: int
|
||||
db_version: int
|
||||
hist_flush_count: int
|
||||
comp_flush_count: int
|
||||
comp_cursor: int
|
||||
es_sync_height: int
|
||||
hashX_status_last_indexed_height: int
|
||||
|
||||
@property
|
||||
def catching_up(self) -> bool:
|
||||
return self.bit_fields & 1 == 1
|
||||
|
||||
@property
|
||||
def index_address_statuses(self) -> bool:
|
||||
return self.bit_fields & 2 == 1
|
||||
|
||||
@property
|
||||
def expanded(self):
|
||||
return (
|
||||
self.genesis,
|
||||
self.height,
|
||||
self.tx_count,
|
||||
self.tip,
|
||||
self.utxo_flush_count,
|
||||
self.wall_time,
|
||||
self.catching_up,
|
||||
self.index_address_statuses,
|
||||
self.db_version,
|
||||
self.hist_flush_count,
|
||||
self.comp_flush_count,
|
||||
self.comp_cursor,
|
||||
self.es_sync_height,
|
||||
self.hashX_status_last_indexed_height
|
||||
)
|
||||
|
||||
|
||||
class ActiveAmountPrefixRow(PrefixRow):
|
||||
|
@ -1420,7 +1448,7 @@ class SupportAmountPrefixRow(PrefixRow):
|
|||
|
||||
class DBStatePrefixRow(PrefixRow):
|
||||
prefix = DB_PREFIXES.db_state.value
|
||||
value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
|
||||
value_struct = struct.Struct(b'>32sLL32sLLBBlllLL')
|
||||
key_struct = struct.Struct(b'')
|
||||
|
||||
key_part_lambdas = [
|
||||
|
@ -1437,12 +1465,16 @@ class DBStatePrefixRow(PrefixRow):
|
|||
|
||||
@classmethod
|
||||
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
|
||||
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
comp_cursor: int, es_sync_height: int) -> bytes:
|
||||
catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
|
||||
comp_flush_count: int, comp_cursor: int, es_sync_height: int,
|
||||
last_indexed_address_statuses: int) -> bytes:
|
||||
bit_fields = 0
|
||||
bit_fields |= int(catching_up) << 0
|
||||
bit_fields |= int(index_address_statuses) << 1
|
||||
return super().pack_value(
|
||||
genesis, height, tx_count, tip, utxo_flush_count,
|
||||
wall_time, 1 if catching_up else 0, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor, es_sync_height
|
||||
wall_time, bit_fields, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor, es_sync_height, last_indexed_address_statuses
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
@ -1451,15 +1483,18 @@ class DBStatePrefixRow(PrefixRow):
|
|||
# TODO: delete this after making a new snapshot - 10/20/21
|
||||
# migrate in the es_sync_height if it doesnt exist
|
||||
data += data[32:36]
|
||||
if len(data) == 98:
|
||||
data += data[32:36]
|
||||
return DBState(*super().unpack_value(data))
|
||||
|
||||
@classmethod
|
||||
def pack_item(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
|
||||
catching_up: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
|
||||
comp_cursor: int, es_sync_height: int):
|
||||
catching_up: bool, index_address_statuses: bool, db_version: int, hist_flush_count: int,
|
||||
comp_flush_count: int, comp_cursor: int, es_sync_height: int, last_indexed_address_statuses: int):
|
||||
return cls.pack_key(), cls.pack_value(
|
||||
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, db_version, hist_flush_count,
|
||||
comp_flush_count, comp_cursor, es_sync_height
|
||||
genesis, height, tx_count, tip, utxo_flush_count, wall_time, catching_up, index_address_statuses,
|
||||
db_version, hist_flush_count, comp_flush_count, comp_cursor, es_sync_height,
|
||||
last_indexed_address_statuses
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ import struct
|
|||
import logging
|
||||
from string import printable
|
||||
from collections import defaultdict
|
||||
from typing import Tuple, Iterable, Callable, Optional
|
||||
from typing import Tuple, Iterable, Callable, Optional, List
|
||||
from scribe.db.common import DB_PREFIXES
|
||||
|
||||
_OP_STRUCT = struct.Struct('>BLL')
|
||||
|
@ -82,7 +82,8 @@ class OpStackIntegrity(Exception):
|
|||
|
||||
|
||||
class RevertableOpStack:
|
||||
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
|
||||
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]],
|
||||
multi_get_fn: Callable[[List[bytes]], Iterable[Optional[bytes]]], unsafe_prefixes=None):
|
||||
"""
|
||||
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
|
||||
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
|
||||
|
@ -95,6 +96,7 @@ class RevertableOpStack:
|
|||
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
|
||||
"""
|
||||
self._get = get_fn
|
||||
self._multi_get = multi_get_fn
|
||||
self._items = defaultdict(list)
|
||||
self._unsafe_prefixes = unsafe_prefixes or set()
|
||||
|
||||
|
@ -133,6 +135,88 @@ class RevertableOpStack:
|
|||
raise err
|
||||
self._items[op.key].append(op)
|
||||
|
||||
def multi_put(self, ops: List[RevertablePut]):
|
||||
"""
|
||||
Apply a put or delete op, checking that it introduces no integrity errors
|
||||
"""
|
||||
|
||||
if not ops:
|
||||
return
|
||||
|
||||
need_put = []
|
||||
|
||||
if not all(op.is_put for op in ops):
|
||||
raise ValueError(f"list must contain only puts")
|
||||
if not len(set(map(lambda op: op.key, ops))) == len(ops):
|
||||
raise ValueError(f"list must contain unique keys")
|
||||
|
||||
for op in ops:
|
||||
if self._items[op.key] and op.invert() == self._items[op.key][-1]:
|
||||
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
|
||||
continue
|
||||
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
|
||||
continue # raise an error?
|
||||
else:
|
||||
need_put.append(op)
|
||||
|
||||
for op, stored_val in zip(need_put, self._multi_get(list(map(lambda item: item.key, need_put)))):
|
||||
has_stored_val = stored_val is not None
|
||||
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
|
||||
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
|
||||
try:
|
||||
if has_stored_val and not will_delete_existing_stored:
|
||||
raise OpStackIntegrity(f"db op tries to overwrite before deleting existing: {op}")
|
||||
except OpStackIntegrity as err:
|
||||
if op.key[:1] in self._unsafe_prefixes:
|
||||
log.debug(f"skipping over integrity error: {err}")
|
||||
else:
|
||||
raise err
|
||||
self._items[op.key].append(op)
|
||||
|
||||
def multi_delete(self, ops: List[RevertableDelete]):
|
||||
"""
|
||||
Apply a put or delete op, checking that it introduces no integrity errors
|
||||
"""
|
||||
|
||||
if not ops:
|
||||
return
|
||||
|
||||
need_delete = []
|
||||
|
||||
if not all(op.is_delete for op in ops):
|
||||
raise ValueError(f"list must contain only deletes")
|
||||
if not len(set(map(lambda op: op.key, ops))) == len(ops):
|
||||
raise ValueError(f"list must contain unique keys")
|
||||
|
||||
for op in ops:
|
||||
if self._items[op.key] and op.invert() == self._items[op.key][-1]:
|
||||
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
|
||||
continue
|
||||
elif self._items[op.key] and self._items[op.key][-1] == op: # duplicate of last op
|
||||
continue # raise an error?
|
||||
else:
|
||||
need_delete.append(op)
|
||||
|
||||
for op, stored_val in zip(need_delete, self._multi_get(list(map(lambda item: item.key, need_delete)))):
|
||||
has_stored_val = stored_val is not None
|
||||
delete_stored_op = None if not has_stored_val else RevertableDelete(op.key, stored_val)
|
||||
will_delete_existing_stored = False if delete_stored_op is None else (delete_stored_op in self._items[op.key])
|
||||
try:
|
||||
if op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
|
||||
# there is a value and we're not deleting it in this op
|
||||
# check that a delete for the stored value is in the stack
|
||||
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}")
|
||||
elif not stored_val:
|
||||
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
|
||||
elif op.is_delete and stored_val != op.value:
|
||||
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
|
||||
except OpStackIntegrity as err:
|
||||
if op.key[:1] in self._unsafe_prefixes:
|
||||
log.debug(f"skipping over integrity error: {err}")
|
||||
else:
|
||||
raise err
|
||||
self._items[op.key].append(op)
|
||||
|
||||
def extend_ops(self, ops: Iterable[RevertableOp]):
|
||||
"""
|
||||
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
|
||||
|
|
|
@ -31,7 +31,7 @@ class Env:
|
|||
|
||||
def __init__(self, db_dir=None, max_query_workers=None, chain=None, reorg_limit=None,
|
||||
prometheus_port=None, cache_all_tx_hashes=None, cache_all_claim_txos=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None):
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, index_address_status=None):
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
||||
|
@ -52,6 +52,8 @@ class Env:
|
|||
'BLOCKING_CHANNEL_IDS', '').split(' ')
|
||||
self.filtering_channel_ids = filtering_channel_ids if filtering_channel_ids is not None else self.default(
|
||||
'FILTERING_CHANNEL_IDS', '').split(' ')
|
||||
self.index_address_status = index_address_status if index_address_status is not None else \
|
||||
self.boolean('INDEX_ADDRESS_STATUS', False)
|
||||
|
||||
@classmethod
|
||||
def default(cls, envvar, default):
|
||||
|
@ -187,6 +189,12 @@ class Env:
|
|||
"Claims that are reposted by these channels aren't returned in search results. "
|
||||
"Can be set in env with 'FILTERING_CHANNEL_IDS'",
|
||||
default=cls.default('FILTERING_CHANNEL_IDS', '').split(' '))
|
||||
parser.add_argument('--index_address_statuses', action='store_true',
|
||||
help="Use precomputed address statuses, must be enabled in the reader and the writer to "
|
||||
"use it. If disabled (the default), the status of an address must be calculated at "
|
||||
"runtime when clients request it (address subscriptions, address history sync). "
|
||||
"If enabled, scribe will maintain an index of precomputed statuses",
|
||||
default=cls.boolean('INDEX_ADDRESS_STATUS', False))
|
||||
|
||||
@classmethod
|
||||
def from_arg_parser(cls, args):
|
||||
|
|
|
@ -10,9 +10,10 @@ class ServerEnv(Env):
|
|||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||
database_query_timeout=None, elastic_notifier_host=None, elastic_notifier_port=None,
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
|
||||
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None,
|
||||
index_address_status=None):
|
||||
super().__init__(db_dir, max_query_workers, chain, reorg_limit, prometheus_port, cache_all_tx_hashes,
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids)
|
||||
cache_all_claim_txos, blocking_channel_ids, filtering_channel_ids, index_address_status)
|
||||
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
|
||||
self.host = host if host is not None else self.default('HOST', 'localhost')
|
||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
||||
|
@ -109,5 +110,5 @@ class ServerEnv(Env):
|
|||
drop_client=args.drop_client, description=args.description, daily_fee=args.daily_fee,
|
||||
database_query_timeout=args.query_timeout_ms, blocking_channel_ids=args.blocking_channel_ids,
|
||||
filtering_channel_ids=args.filtering_channel_ids, elastic_notifier_host=args.elastic_notifier_host,
|
||||
elastic_notifier_port=args.elastic_notifier_port
|
||||
elastic_notifier_port=args.elastic_notifier_port, index_address_status=args.index_address_statuses
|
||||
)
|
||||
|
|
|
@ -157,6 +157,14 @@ class HubMemPool:
|
|||
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
|
||||
return result
|
||||
|
||||
def mempool_history(self, hashX: bytes) -> str:
|
||||
result = ''
|
||||
for tx_hash in self.touched_hashXs.get(hashX, ()):
|
||||
if tx_hash not in self.txs:
|
||||
continue # the tx hash for the touched address is an input that isn't in mempool anymore
|
||||
result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:'
|
||||
return result
|
||||
|
||||
def unordered_UTXOs(self, hashX):
|
||||
"""Return an unordered list of UTXO named tuples from mempool
|
||||
transactions that pay to hashX.
|
||||
|
@ -276,7 +284,6 @@ class HubMemPool:
|
|||
if session.subscribe_headers and height_changed:
|
||||
sent_headers += 1
|
||||
self._notification_q.put_nowait((session_id, height_changed, hashXes))
|
||||
|
||||
if sent_headers:
|
||||
self.logger.info(f'notified {sent_headers} sessions of new block header')
|
||||
if session_hashxes_to_notify:
|
||||
|
|
|
@ -128,7 +128,7 @@ class SessionManager:
|
|||
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
|
||||
labelnames=("version",))
|
||||
request_count_metric = Counter("requests_count", "Number of requests received", namespace=NAMESPACE,
|
||||
labelnames=("method", "version"))
|
||||
labelnames=("method",))
|
||||
tx_request_count_metric = Counter("requested_transaction", "Number of transactions requested", namespace=NAMESPACE)
|
||||
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
|
||||
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
|
||||
|
@ -644,9 +644,9 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
MAX_CHUNK_SIZE = 40960
|
||||
session_counter = itertools.count()
|
||||
RESPONSE_TIMES = Histogram("response_time", "Response times", namespace=NAMESPACE,
|
||||
labelnames=("method", "version"), buckets=HISTOGRAM_BUCKETS)
|
||||
labelnames=("method",), buckets=HISTOGRAM_BUCKETS)
|
||||
NOTIFICATION_COUNT = Counter("notification", "Number of notifications sent (for subscriptions)",
|
||||
namespace=NAMESPACE, labelnames=("method", "version"))
|
||||
namespace=NAMESPACE, labelnames=("method",))
|
||||
REQUEST_ERRORS_COUNT = Counter(
|
||||
"request_error", "Number of requests that returned errors", namespace=NAMESPACE,
|
||||
labelnames=("method", "version")
|
||||
|
@ -793,7 +793,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
"""Handle an incoming request. ElectrumX doesn't receive
|
||||
notifications from client sessions.
|
||||
"""
|
||||
self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc()
|
||||
self.session_manager.request_count_metric.labels(method=request.method).inc()
|
||||
|
||||
if isinstance(request, Request):
|
||||
method = request.method
|
||||
|
@ -981,10 +981,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
'internal server error')
|
||||
if isinstance(request, Request):
|
||||
message = request.send_result(result)
|
||||
self.RESPONSE_TIMES.labels(
|
||||
method=request.method,
|
||||
version=self.client_version
|
||||
).observe(time.perf_counter() - start)
|
||||
self.RESPONSE_TIMES.labels(method=request.method).observe(time.perf_counter() - start)
|
||||
if message:
|
||||
await self._send_message(message)
|
||||
if isinstance(result, Exception):
|
||||
|
@ -1014,7 +1011,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
async def send_notification(self, method, args=()) -> bool:
|
||||
"""Send an RPC notification over the network."""
|
||||
message = self.connection.send_notification(Notification(method, args))
|
||||
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
|
||||
self.NOTIFICATION_COUNT.labels(method=method).inc()
|
||||
try:
|
||||
await self._send_message(message)
|
||||
return True
|
||||
|
@ -1089,7 +1086,16 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
return len(self.hashX_subs)
|
||||
|
||||
async def get_hashX_status(self, hashX: bytes):
|
||||
return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX)
|
||||
if self.env.index_address_status:
|
||||
return await self.db.get_hashX_status(hashX)
|
||||
history = ''.join(
|
||||
f"{tx_hash[::-1].hex()}:{height:d}:"
|
||||
for tx_hash, height in await self.db.limited_history(hashX, limit=None)
|
||||
) + self.mempool.mempool_history(hashX)
|
||||
if not history:
|
||||
return
|
||||
status = sha256(history.encode())
|
||||
return status.hex()
|
||||
|
||||
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
|
||||
notifications = []
|
||||
|
@ -1110,7 +1116,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
start = time.perf_counter()
|
||||
self.session_manager.notifications_in_flight_metric.inc()
|
||||
for method, args in notifications:
|
||||
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
|
||||
self.NOTIFICATION_COUNT.labels(method=method,).inc()
|
||||
try:
|
||||
await self.send_notifications(
|
||||
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
|
||||
|
|
|
@ -30,7 +30,8 @@ class BlockchainService:
|
|||
self.db = HubDB(
|
||||
env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
||||
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
)
|
||||
self._stopping = False
|
||||
|
||||
|
|
Loading…
Reference in a new issue