From 0901f67d89a17b403dd43aed939dd3f056af3d58 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 25 May 2022 16:30:57 -0400 Subject: [PATCH] split up secondary/primary db classes --- hub/db/__init__.py | 2 +- hub/db/db.py | 389 ++------------------------------ hub/db/migrators/migrate7to8.py | 4 +- hub/elastic_sync/db.py | 253 +++++++++++++++++++++ hub/elastic_sync/service.py | 10 + hub/herald/db.py | 33 +++ hub/herald/mempool.py | 4 +- hub/herald/search.py | 4 +- hub/herald/service.py | 11 +- hub/herald/session.py | 4 +- hub/scribe/db.py | 117 ++++++++++ hub/scribe/mempool.py | 4 +- hub/scribe/service.py | 12 +- hub/service.py | 24 +- 14 files changed, 482 insertions(+), 389 deletions(-) create mode 100644 hub/elastic_sync/db.py create mode 100644 hub/herald/db.py create mode 100644 hub/scribe/db.py diff --git a/hub/db/__init__.py b/hub/db/__init__.py index 0b2ebca..dc6cf05 100644 --- a/hub/db/__init__.py +++ b/hub/db/__init__.py @@ -1 +1 @@ -from .db import HubDB +from .db import SecondaryDB diff --git a/hub/db/db.py b/hub/db/db.py index 117501f..e2110d4 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -33,19 +33,18 @@ TXO_STRUCT_pack = TXO_STRUCT.pack NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db" -class HubDB: +class SecondaryDB: DB_VERSIONS = [7, 8, 9] - def __init__(self, coin, db_dir: str, reorg_limit: int = 200, + def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, - secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, + blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, index_address_status=False): self.logger = logging.getLogger(__name__) self.coin = coin self._executor = executor self._db_dir = db_dir - self._reorg_limit = reorg_limit self._cache_all_claim_txos = cache_all_claim_txos self._cache_all_tx_hashes = cache_all_tx_hashes @@ -81,7 +80,7 @@ class HubDB: } self.tx_counts = None - self.headers = None + # self.headers = None self.block_hashes = None self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE) self.last_flush = time.time() @@ -470,245 +469,6 @@ class HubDB: self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex()) return - def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult): - metadata = self.get_claim_metadata(claim.tx_hash, claim.position) - if not metadata: - return - metadata = metadata - if not metadata.is_stream or not metadata.stream.has_fee: - fee_amount = 0 - else: - fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000) - if fee_amount >= 9223372036854775807: - return - reposted_claim_hash = claim.reposted_claim_hash - reposted_claim = None - reposted_metadata = None - if reposted_claim_hash: - reposted_claim = self.get_cached_claim_txo(reposted_claim_hash) - if not reposted_claim: - return - reposted_metadata = self.get_claim_metadata( - self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position - ) - if not reposted_metadata: - return - reposted_tags = [] - reposted_languages = [] - reposted_has_source = False - reposted_claim_type = None - reposted_stream_type = None - reposted_media_type = None - reposted_fee_amount = None - reposted_fee_currency = None - reposted_duration = None - if reposted_claim: - raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False) - try: - reposted_metadata = self.coin.transaction( - raw_reposted_claim_tx - ).outputs[reposted_claim.position].metadata - except: - self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s", - claim.reposted_claim_hash.hex(), claim_hash.hex()) - return - if reposted_metadata: - if reposted_metadata.is_stream: - meta = reposted_metadata.stream - elif reposted_metadata.is_channel: - meta = reposted_metadata.channel - elif reposted_metadata.is_collection: - meta = reposted_metadata.collection - elif reposted_metadata.is_repost: - meta = reposted_metadata.repost - else: - return - reposted_tags = [tag for tag in meta.tags] - reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none'] - reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source - reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type] - reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \ - if reposted_has_source else 0 - reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0 - if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee: - reposted_fee_amount = 0 - else: - reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000) - if reposted_fee_amount >= 9223372036854775807: - return - reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency - reposted_duration = None - if reposted_metadata.is_stream and \ - (reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration): - reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration - if metadata.is_stream: - meta = metadata.stream - elif metadata.is_channel: - meta = metadata.channel - elif metadata.is_collection: - meta = metadata.collection - elif metadata.is_repost: - meta = metadata.repost - else: - return - claim_tags = [tag for tag in meta.tags] - claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none'] - - tags = list(set(claim_tags).union(set(reposted_tags))) - languages = list(set(claim_languages).union(set(reposted_languages))) - blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get( - reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get( - reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash) - filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get( - reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get( - reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash) - value = { - 'claim_id': claim_hash.hex(), - 'claim_name': claim.name, - 'normalized_name': claim.normalized_name, - 'tx_id': claim.tx_hash[::-1].hex(), - 'tx_num': claim.tx_num, - 'tx_nout': claim.position, - 'amount': claim.amount, - 'timestamp': self.estimate_timestamp(claim.height), - 'creation_timestamp': self.estimate_timestamp(claim.creation_height), - 'height': claim.height, - 'creation_height': claim.creation_height, - 'activation_height': claim.activation_height, - 'expiration_height': claim.expiration_height, - 'effective_amount': claim.effective_amount, - 'support_amount': claim.support_amount, - 'is_controlling': bool(claim.is_controlling), - 'last_take_over_height': claim.last_takeover_height, - 'short_url': claim.short_url, - 'canonical_url': claim.canonical_url, - 'title': None if not metadata.is_stream else metadata.stream.title, - 'author': None if not metadata.is_stream else metadata.stream.author, - 'description': None if not metadata.is_stream else metadata.stream.description, - 'claim_type': CLAIM_TYPES[metadata.claim_type], - 'has_source': reposted_has_source if metadata.is_repost else ( - False if not metadata.is_stream else metadata.stream.has_source), - 'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None, - 'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)] - if metadata.is_stream and metadata.stream.has_source - else reposted_stream_type if metadata.is_repost else 0, - 'media_type': metadata.stream.source.media_type - if metadata.is_stream else reposted_media_type if metadata.is_repost else None, - 'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount, - 'fee_currency': metadata.stream.fee.currency - if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None, - 'repost_count': self.get_reposted_count(claim_hash), - 'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(), - 'reposted_claim_type': reposted_claim_type, - 'reposted_has_source': reposted_has_source, - 'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(), - 'public_key_id': None if not metadata.is_channel else - self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)), - 'signature': (metadata.signature or b'').hex() or None, - # 'signature_digest': metadata.signature, - 'is_signature_valid': bool(claim.signature_valid), - 'tags': tags, - 'languages': languages, - 'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED, - 'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None, - 'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash), - 'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(), - 'reposted_tx_position': claim.reposted_tx_position, - 'reposted_height': claim.reposted_height, - 'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(), - 'channel_tx_position': claim.channel_tx_position, - 'channel_height': claim.channel_height, - } - - if metadata.is_repost and reposted_duration is not None: - value['duration'] = reposted_duration - elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration): - value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration - if metadata.is_stream: - value['release_time'] = metadata.stream.release_time or value['creation_timestamp'] - elif metadata.is_repost or metadata.is_collection: - value['release_time'] = value['creation_timestamp'] - return value - - async def all_claims_producer(self, batch_size=500_000): - batch = [] - if self._cache_all_claim_txos: - claim_iterator = self.claim_to_txo.items() - else: - claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate()) - - for claim_hash, claim_txo in claim_iterator: - # TODO: fix the couple of claim txos that dont have controlling names - if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): - continue - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) - if len(batch) == batch_size: - batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits - for claim in batch: - meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if meta: - yield meta - batch.clear() - batch.sort(key=lambda x: x.tx_hash) - for claim in batch: - meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if meta: - yield meta - batch.clear() - - def claim_producer(self, claim_hash: bytes) -> Optional[Dict]: - claim_txo = self.get_cached_claim_txo(claim_hash) - if not claim_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if not claim: - self.logger.warning("wat") - return - return self._prepare_claim_metadata(claim.claim_hash, claim) - - def claims_producer(self, claim_hashes: Set[bytes]): - batch = [] - results = [] - - for claim_hash in claim_hashes: - claim_txo = self.get_cached_claim_txo(claim_hash) - if not claim_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) - - batch.sort(key=lambda x: x.tx_hash) - - for claim in batch: - _meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if _meta: - results.append(_meta) - return results - def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list) for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)): @@ -761,20 +521,20 @@ class HubDB: ts = time.perf_counter() - start self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4)) - async def _read_headers(self): - # if self.headers is not None: - # return - - def get_headers(): - return [ - header for header in self.prefix_db.header.iterate( - start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False - ) - ] - - headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers) - assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" - self.headers = headers + # async def _read_headers(self): + # # if self.headers is not None: + # # return + # + # def get_headers(): + # return [ + # header for header in self.prefix_db.header.iterate( + # start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False + # ) + # ] + # + # headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers) + # assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" + # self.headers = headers async def _read_block_hashes(self): def get_block_hashes(): @@ -785,7 +545,7 @@ class HubDB: ] block_hashes = await asyncio.get_event_loop().run_in_executor(self._executor, get_block_hashes) - assert len(block_hashes) == len(self.headers) + # assert len(block_hashes) == len(self.headers) self.block_hashes = block_hashes async def _read_tx_hashes(self): @@ -803,11 +563,6 @@ class HubDB: ts = time.perf_counter() - start self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4)) - def estimate_timestamp(self, height: int) -> int: - if height < len(self.headers): - return struct.unpack(' bytes: - tx_counts = self.tx_counts - hist_tx_nums = array.array('I') - hist_tx_nums.frombytes(history) - hist = '' - for tx_num in hist_tx_nums: - hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:' - return sha256(hist.encode()) - - start = time.perf_counter() - - if start_height <= 0: - self.logger.info("loading all blockchain addresses, this will take a little while...") - hashXs = [hashX for hashX in hashX_iterator()] - else: - self.logger.info("loading addresses since block %i...", start_height) - hashXs = set() - for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,), - include_key=False): - hashXs.update(touched.touched_hashXs) - hashXs = list(hashXs) - - self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, " - f"now building the status index...") - op_cnt = 0 - hashX_cnt = 0 - for hashX in hashXs: - hashX_cnt += 1 - key = prefix_db.hashX_status.pack_key(hashX) - history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False)) - status = hashX_status_from_history(history) - existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) - if existing_status and existing_status == status: - continue - elif not existing_status: - prefix_db.stage_raw_put(key, status) - op_cnt += 1 - else: - prefix_db.stage_raw_delete(key, existing_status) - prefix_db.stage_raw_put(key, status) - op_cnt += 2 - if op_cnt > 100000: - prefix_db.unsafe_commit() - self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") - op_cnt = 0 - if op_cnt: - prefix_db.unsafe_commit() - self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") - self._index_address_status = True - self.write_db_state() - self.prefix_db.unsafe_commit() - self.logger.info("finished indexing address statuses") - - def rebuild_hashX_status_index(self, start_height: int): - return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height) - def _get_hashX_status(self, hashX: bytes): mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False) if mempool_status: @@ -1169,9 +851,9 @@ class HubDB: return {txid: tx_infos.get(txid) for txid in txids} # match ordering of the txs in the request async def fs_block_hashes(self, height, count): - if height + count > len(self.headers): - raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') - return [self.coin.header_hash(header) for header in self.headers[height:height + count]] + if height + count > self.db_height + 1: + raise DBError(f'only got {len(self.block_hashes) - height:,d} headers starting at {height:,d}, not {count:,d}') + return self.block_hashes[height:height + count] def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]: txs = [] @@ -1209,33 +891,6 @@ class HubDB: """Returns a height from which we should store undo info.""" return max_height - self._reorg_limit + 1 - def apply_expiration_extension_fork(self): - # TODO: this can't be reorged - for k, v in self.prefix_db.claim_expiration.iterate(): - self.prefix_db.claim_expiration.stage_delete(k, v) - self.prefix_db.claim_expiration.stage_put( - (bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime, - k.tx_num, k.position), v - ) - self.prefix_db.unsafe_commit() - - def write_db_state(self): - """Write (UTXO) state to the batch.""" - last_indexed_address_status = 0 - if self.db_height > 0: - existing = self.prefix_db.db_state.get() - last_indexed_address_status = existing.hashX_status_last_indexed_height - self.prefix_db.db_state.stage_delete((), existing.expanded) - if self._index_address_status: - last_indexed_address_status = self.db_height - self.prefix_db.db_state.stage_put((), ( - self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, - self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version, - self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, - self.es_sync_height, last_indexed_address_status - ) - ) - def read_db_state(self): state = self.prefix_db.db_state.get() diff --git a/hub/db/migrators/migrate7to8.py b/hub/db/migrators/migrate7to8.py index cfeaae5..0c5db9b 100644 --- a/hub/db/migrators/migrate7to8.py +++ b/hub/db/migrators/migrate7to8.py @@ -5,7 +5,7 @@ import typing from bisect import bisect_right from hub.common import sha256 if typing.TYPE_CHECKING: - from hub.db.db import HubDB + from hub.scribe.db import PrimaryDB FROM_VERSION = 7 TO_VERSION = 8 @@ -35,7 +35,7 @@ def hashX_history(db: 'HubDB', hashX: bytes): return history, to_delete -def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes: +def hashX_status_from_history(db: 'PrimaryDB', history: bytes) -> bytes: tx_counts = db.tx_counts hist_tx_nums = array.array('I') hist_tx_nums.frombytes(history) diff --git a/hub/elastic_sync/db.py b/hub/elastic_sync/db.py new file mode 100644 index 0000000..96ce8fa --- /dev/null +++ b/hub/elastic_sync/db.py @@ -0,0 +1,253 @@ +from typing import Optional, Set, Dict +from hub.schema.claim import guess_stream_type +from hub.schema.result import Censor +from hub.common import hash160, STREAM_TYPES, CLAIM_TYPES +from hub.db import SecondaryDB +from hub.db.common import ResolveResult + + +class ElasticSyncDB(SecondaryDB): + def estimate_timestamp(self, height: int) -> int: + header = self.prefix_db.header.get(height, deserialize_value=False) + if header: + return int.from_bytes(header[100:104], byteorder='little') + return int(160.6855883050695 * height) + + def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult): + metadata = self.get_claim_metadata(claim.tx_hash, claim.position) + if not metadata: + return + metadata = metadata + if not metadata.is_stream or not metadata.stream.has_fee: + fee_amount = 0 + else: + fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000) + if fee_amount >= 9223372036854775807: + return + reposted_claim_hash = claim.reposted_claim_hash + reposted_claim = None + reposted_metadata = None + if reposted_claim_hash: + reposted_claim = self.get_cached_claim_txo(reposted_claim_hash) + if not reposted_claim: + return + reposted_metadata = self.get_claim_metadata( + self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position + ) + if not reposted_metadata: + return + reposted_tags = [] + reposted_languages = [] + reposted_has_source = False + reposted_claim_type = None + reposted_stream_type = None + reposted_media_type = None + reposted_fee_amount = None + reposted_fee_currency = None + reposted_duration = None + if reposted_claim: + raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False) + try: + reposted_metadata = self.coin.transaction( + raw_reposted_claim_tx + ).outputs[reposted_claim.position].metadata + except: + self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s", + claim.reposted_claim_hash.hex(), claim_hash.hex()) + return + if reposted_metadata: + if reposted_metadata.is_stream: + meta = reposted_metadata.stream + elif reposted_metadata.is_channel: + meta = reposted_metadata.channel + elif reposted_metadata.is_collection: + meta = reposted_metadata.collection + elif reposted_metadata.is_repost: + meta = reposted_metadata.repost + else: + return + reposted_tags = [tag for tag in meta.tags] + reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none'] + reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source + reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type] + reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \ + if reposted_has_source else 0 + reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0 + if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee: + reposted_fee_amount = 0 + else: + reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000) + if reposted_fee_amount >= 9223372036854775807: + return + reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency + reposted_duration = None + if reposted_metadata.is_stream and \ + (reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration): + reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration + if metadata.is_stream: + meta = metadata.stream + elif metadata.is_channel: + meta = metadata.channel + elif metadata.is_collection: + meta = metadata.collection + elif metadata.is_repost: + meta = metadata.repost + else: + return + claim_tags = [tag for tag in meta.tags] + claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none'] + + tags = list(set(claim_tags).union(set(reposted_tags))) + languages = list(set(claim_languages).union(set(reposted_languages))) + blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get( + reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get( + reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash) + filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get( + reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get( + reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash) + value = { + 'claim_id': claim_hash.hex(), + 'claim_name': claim.name, + 'normalized_name': claim.normalized_name, + 'tx_id': claim.tx_hash[::-1].hex(), + 'tx_num': claim.tx_num, + 'tx_nout': claim.position, + 'amount': claim.amount, + 'timestamp': self.estimate_timestamp(claim.height), + 'creation_timestamp': self.estimate_timestamp(claim.creation_height), + 'height': claim.height, + 'creation_height': claim.creation_height, + 'activation_height': claim.activation_height, + 'expiration_height': claim.expiration_height, + 'effective_amount': claim.effective_amount, + 'support_amount': claim.support_amount, + 'is_controlling': bool(claim.is_controlling), + 'last_take_over_height': claim.last_takeover_height, + 'short_url': claim.short_url, + 'canonical_url': claim.canonical_url, + 'title': None if not metadata.is_stream else metadata.stream.title, + 'author': None if not metadata.is_stream else metadata.stream.author, + 'description': None if not metadata.is_stream else metadata.stream.description, + 'claim_type': CLAIM_TYPES[metadata.claim_type], + 'has_source': reposted_has_source if metadata.is_repost else ( + False if not metadata.is_stream else metadata.stream.has_source), + 'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None, + 'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)] + if metadata.is_stream and metadata.stream.has_source + else reposted_stream_type if metadata.is_repost else 0, + 'media_type': metadata.stream.source.media_type + if metadata.is_stream else reposted_media_type if metadata.is_repost else None, + 'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount, + 'fee_currency': metadata.stream.fee.currency + if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None, + 'repost_count': self.get_reposted_count(claim_hash), + 'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(), + 'reposted_claim_type': reposted_claim_type, + 'reposted_has_source': reposted_has_source, + 'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(), + 'public_key_id': None if not metadata.is_channel else + self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)), + 'signature': (metadata.signature or b'').hex() or None, + # 'signature_digest': metadata.signature, + 'is_signature_valid': bool(claim.signature_valid), + 'tags': tags, + 'languages': languages, + 'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED, + 'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None, + 'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash), + 'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(), + 'reposted_tx_position': claim.reposted_tx_position, + 'reposted_height': claim.reposted_height, + 'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(), + 'channel_tx_position': claim.channel_tx_position, + 'channel_height': claim.channel_height, + } + + if metadata.is_repost and reposted_duration is not None: + value['duration'] = reposted_duration + elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration): + value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration + if metadata.is_stream: + value['release_time'] = metadata.stream.release_time or value['creation_timestamp'] + elif metadata.is_repost or metadata.is_collection: + value['release_time'] = value['creation_timestamp'] + return value + + async def all_claims_producer(self, batch_size=500_000): + batch = [] + if self._cache_all_claim_txos: + claim_iterator = self.claim_to_txo.items() + else: + claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate()) + + for claim_hash, claim_txo in claim_iterator: + # TODO: fix the couple of claim txos that dont have controlling names + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: + batch.append(claim) + if len(batch) == batch_size: + batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if meta: + yield meta + batch.clear() + batch.sort(key=lambda x: x.tx_hash) + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if meta: + yield meta + batch.clear() + + def claim_producer(self, claim_hash: bytes) -> Optional[Dict]: + claim_txo = self.get_cached_claim_txo(claim_hash) + if not claim_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + return + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + return + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if not claim: + self.logger.warning("wat") + return + return self._prepare_claim_metadata(claim.claim_hash, claim) + + def claims_producer(self, claim_hashes: Set[bytes]): + batch = [] + results = [] + + for claim_hash in claim_hashes: + claim_txo = self.get_cached_claim_txo(claim_hash) + if not claim_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: + batch.append(claim) + + batch.sort(key=lambda x: x.tx_hash) + + for claim in batch: + _meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if _meta: + results.append(_meta) + return results diff --git a/hub/elastic_sync/service.py b/hub/elastic_sync/service.py index 67e9fd4..4a09451 100644 --- a/hub/elastic_sync/service.py +++ b/hub/elastic_sync/service.py @@ -12,6 +12,7 @@ from hub.db.revertable import RevertableOp from hub.db.common import TrendingNotification, DB_PREFIXES from hub.notifier_protocol import ElasticNotifierProtocol from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT +from hub.elastic_sync.db import ElasticSyncDB if typing.TYPE_CHECKING: from hub.elastic_sync.env import ElasticEnv @@ -44,6 +45,15 @@ class ElasticSyncService(BlockchainReaderService): self._listeners: typing.List[ElasticNotifierProtocol] = [] self._force_reindex = False + def open_db(self): + env = self.env + self.db = ElasticSyncDB( + env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos, + env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, + index_address_status=env.index_address_status + ) + async def run_es_notifier(self, synchronized: asyncio.Event): server = await asyncio.get_event_loop().create_server( lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port diff --git a/hub/herald/db.py b/hub/herald/db.py new file mode 100644 index 0000000..0f9a612 --- /dev/null +++ b/hub/herald/db.py @@ -0,0 +1,33 @@ +import asyncio +from typing import List +from concurrent.futures.thread import ThreadPoolExecutor +from hub.db import SecondaryDB + + +class HeraldDB(SecondaryDB): + def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, + cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, + blocking_channel_ids: List[str] = None, + filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, + index_address_status=False): + super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos, + cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor, + index_address_status) + # self.headers = None + + # async def _read_headers(self): + # def get_headers(): + # return [ + # header for header in self.prefix_db.header.iterate( + # start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, + # deserialize_value=False + # ) + # ] + # + # headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers) + # assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" + # self.headers = headers + + # async def initialize_caches(self): + # await super().initialize_caches() + # await self._read_headers() diff --git a/hub/herald/mempool.py b/hub/herald/mempool.py index 0ec1e0d..26e1ad7 100644 --- a/hub/herald/mempool.py +++ b/hub/herald/mempool.py @@ -13,7 +13,7 @@ from hub.scribe.transaction.deserializer import Deserializer if typing.TYPE_CHECKING: from hub.herald.session import SessionManager - from hub.db import HubDB + from hub.db import SecondaryDB @attr.s(slots=True) @@ -46,7 +46,7 @@ mempool_touched_address_count_metric = Gauge( class HubMemPool: - def __init__(self, coin, db: 'HubDB', refresh_secs=1.0): + def __init__(self, coin, db: 'SecondaryDB', refresh_secs=1.0): self.coin = coin self._db = db self.logger = logging.getLogger(__name__) diff --git a/hub/herald/search.py b/hub/herald/search.py index 4ef0acd..d3b6b7f 100644 --- a/hub/herald/search.py +++ b/hub/herald/search.py @@ -10,7 +10,7 @@ from hub.schema.result import Censor, Outputs from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result from hub.db.common import ResolveResult if TYPE_CHECKING: - from hub.db import HubDB + from hub.db import SecondaryDB class ChannelResolution(str): @@ -28,7 +28,7 @@ class StreamResolution(str): class SearchIndex: VERSION = 1 - def __init__(self, hub_db: 'HubDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', + def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200): self.hub_db = hub_db self.search_timeout = search_timeout diff --git a/hub/herald/service.py b/hub/herald/service.py index df7bedb..f5a7d3d 100644 --- a/hub/herald/service.py +++ b/hub/herald/service.py @@ -5,6 +5,7 @@ from hub.scribe.daemon import LBCDaemon from hub.herald.session import SessionManager from hub.herald.mempool import HubMemPool from hub.herald.udp import StatusServer +from hub.herald.db import HeraldDB from hub.service import BlockchainReaderService from hub.notifier_protocol import ElasticNotifierClientProtocol if typing.TYPE_CHECKING: @@ -35,6 +36,15 @@ class HubServerService(BlockchainReaderService): self._es_height = None self._es_block_hash = None + def open_db(self): + env = self.env + self.db = HeraldDB( + env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos, + env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, + filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, + index_address_status=env.index_address_status + ) + def clear_caches(self): self.session_manager.clear_caches() # self.clear_search_cache() @@ -54,7 +64,6 @@ class HubServerService(BlockchainReaderService): self.session_manager.hashX_history_cache.clear() prev_count = self.db.tx_counts.pop() tx_count = self.db.tx_counts[-1] - self.db.headers.pop() self.db.block_hashes.pop() current_count = prev_count for _ in range(prev_count - tx_count): diff --git a/hub/herald/session.py b/hub/herald/session.py index d452eea..953e6a9 100644 --- a/hub/herald/session.py +++ b/hub/herald/session.py @@ -28,7 +28,7 @@ from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2, from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.framer import NewlineFramer if typing.TYPE_CHECKING: - from hub.db import HubDB + from hub.db import SecondaryDB from hub.herald.env import ServerEnv from hub.scribe.daemon import LBCDaemon from hub.herald.mempool import HubMemPool @@ -179,7 +179,7 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool', + def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool', daemon: 'LBCDaemon', shutdown_event: asyncio.Event, on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): env.max_send = max(350000, env.max_send) diff --git a/hub/scribe/db.py b/hub/scribe/db.py new file mode 100644 index 0000000..b9f453e --- /dev/null +++ b/hub/scribe/db.py @@ -0,0 +1,117 @@ +import asyncio +import array +import time +from typing import List +from concurrent.futures.thread import ThreadPoolExecutor +from bisect import bisect_right +from hub.common import sha256 +from hub.db import SecondaryDB + + +class PrimaryDB(SecondaryDB): + def __init__(self, coin, db_dir: str, reorg_limit: int = 200, + cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, + max_open_files: int = 64, blocking_channel_ids: List[str] = None, + filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, + index_address_status=False): + super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes, + blocking_channel_ids, filtering_channel_ids, executor, index_address_status) + + def _rebuild_hashX_status_index(self, start_height: int): + self.logger.warning("rebuilding the address status index...") + prefix_db = self.prefix_db + + def hashX_iterator(): + last_hashX = None + for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False): + hashX = k[1:12] + if last_hashX is None: + last_hashX = hashX + if last_hashX != hashX: + yield hashX + last_hashX = hashX + if last_hashX: + yield last_hashX + + def hashX_status_from_history(history: bytes) -> bytes: + tx_counts = self.tx_counts + hist_tx_nums = array.array('I') + hist_tx_nums.frombytes(history) + hist = '' + for tx_num in hist_tx_nums: + hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:' + return sha256(hist.encode()) + + start = time.perf_counter() + + if start_height <= 0: + self.logger.info("loading all blockchain addresses, this will take a little while...") + hashXs = [hashX for hashX in hashX_iterator()] + else: + self.logger.info("loading addresses since block %i...", start_height) + hashXs = set() + for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,), + include_key=False): + hashXs.update(touched.touched_hashXs) + hashXs = list(hashXs) + + self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, " + f"now building the status index...") + op_cnt = 0 + hashX_cnt = 0 + for hashX in hashXs: + hashX_cnt += 1 + key = prefix_db.hashX_status.pack_key(hashX) + history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False)) + status = hashX_status_from_history(history) + existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) + if existing_status and existing_status == status: + continue + elif not existing_status: + prefix_db.stage_raw_put(key, status) + op_cnt += 1 + else: + prefix_db.stage_raw_delete(key, existing_status) + prefix_db.stage_raw_put(key, status) + op_cnt += 2 + if op_cnt > 100000: + prefix_db.unsafe_commit() + self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") + op_cnt = 0 + if op_cnt: + prefix_db.unsafe_commit() + self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...") + self._index_address_status = True + self.write_db_state() + self.prefix_db.unsafe_commit() + self.logger.info("finished indexing address statuses") + + def rebuild_hashX_status_index(self, start_height: int): + return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height) + + def apply_expiration_extension_fork(self): + # TODO: this can't be reorged + for k, v in self.prefix_db.claim_expiration.iterate(): + self.prefix_db.claim_expiration.stage_delete(k, v) + self.prefix_db.claim_expiration.stage_put( + (bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime, + k.tx_num, k.position), v + ) + self.prefix_db.unsafe_commit() + + def write_db_state(self): + """Write (UTXO) state to the batch.""" + last_indexed_address_status = 0 + if self.db_height > 0: + existing = self.prefix_db.db_state.get() + last_indexed_address_status = existing.hashX_status_last_indexed_height + self.prefix_db.db_state.stage_delete((), existing.expanded) + if self._index_address_status: + last_indexed_address_status = self.db_height + self.prefix_db.db_state.stage_put((), ( + self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip, + self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version, + self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor, + self.es_sync_height, last_indexed_address_status + ) + ) diff --git a/hub/scribe/mempool.py b/hub/scribe/mempool.py index 9bf6221..799611e 100644 --- a/hub/scribe/mempool.py +++ b/hub/scribe/mempool.py @@ -5,7 +5,7 @@ from collections import defaultdict from hub.scribe.transaction.deserializer import Deserializer if typing.TYPE_CHECKING: - from hub.db import HubDB + from hub.scribe.db import PrimaryDB @attr.s(slots=True) @@ -27,7 +27,7 @@ class MemPoolTxSummary: class MemPool: - def __init__(self, coin, db: 'HubDB'): + def __init__(self, coin, db: 'PrimaryDB'): self.coin = coin self._db = db self.txs = {} diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 74bd73c..9dedc30 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -12,6 +12,7 @@ from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from hub.error.base import ChainError from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache +from hub.scribe.db import PrimaryDB from hub.scribe.daemon import LBCDaemon from hub.scribe.transaction import Tx, TxOutput, TxInput, Block from hub.scribe.prefetcher import Prefetcher @@ -121,6 +122,15 @@ class BlockchainProcessorService(BlockchainService): self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) self.history_tx_info_cache = LRUCache(2 ** 16) + def open_db(self): + env = self.env + self.db = PrimaryDB( + env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos, + cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files, + blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, + executor=self._executor, index_address_status=env.index_address_status + ) + async def run_in_thread_with_lock(self, func, *args): # Run in a thread to prevent blocking. Shielded so that # cancellations from shutdown don't lose work - when the task @@ -1383,7 +1393,6 @@ class BlockchainProcessorService(BlockchainService): ) self.height = height - self.db.headers.append(block.header) self.db.block_hashes.append(self.env.coin.header_hash(block.header)) self.tip = self.coin.header_hash(block.header) @@ -1549,7 +1558,6 @@ class BlockchainProcessorService(BlockchainService): # Check and update self.tip self.db.tx_counts.pop() - self.db.headers.pop() reverted_block_hash = self.db.block_hashes.pop() self.tip = self.db.block_hashes[-1] if self.env.cache_all_tx_hashes: diff --git a/hub/service.py b/hub/service.py index 52ddaf1..d1d4f0e 100644 --- a/hub/service.py +++ b/hub/service.py @@ -7,7 +7,7 @@ from prometheus_client import Gauge, Histogram from hub import __version__, PROMETHEUS_NAMESPACE from hub.env import Env -from hub.db import HubDB +from hub.db import SecondaryDB from hub.db.prefixes import DBState from hub.common import HISTOGRAM_BUCKETS from hub.metrics import PrometheusServer @@ -17,6 +17,7 @@ class BlockchainService: """ Base class for blockchain readers as well as the block processor """ + def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'): self.env = env self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) @@ -27,13 +28,19 @@ class BlockchainService: self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) self.lock = asyncio.Lock() self.last_state: typing.Optional[DBState] = None - self.db = HubDB( - env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, - secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, + self.secondary_name = secondary_name + self._stopping = False + self.db = None + self.open_db() + + def open_db(self): + env = self.env + self.db = SecondaryDB( + env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos, + env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, index_address_status=env.index_address_status ) - self._stopping = False def start_cancellable(self, run, *args): _flag = asyncio.Event() @@ -167,7 +174,7 @@ class BlockchainReaderService(BlockchainService): assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" header = self.db.prefix_db.header.get(height, deserialize_value=False) - self.db.headers.append(header) + # self.db.headers.append(header) self.db.block_hashes.append(self.env.coin.header_hash(header)) def unwind(self): @@ -176,7 +183,7 @@ class BlockchainReaderService(BlockchainService): """ prev_count = self.db.tx_counts.pop() tx_count = self.db.tx_counts[-1] - self.db.headers.pop() + # self.db.headers.pop() self.db.block_hashes.pop() if self.db._cache_all_tx_hashes: for _ in range(prev_count - tx_count): @@ -202,7 +209,8 @@ class BlockchainReaderService(BlockchainService): rewound = False if self.last_state: while True: - if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False): + if self.db.block_hashes[-1] == self.env.coin.header_hash( + self.db.prefix_db.header.get(last_height, deserialize_value=False)): self.log.debug("connects to block %i", last_height) break else: