Merge pull request #43 from lbryio/cleanup-db
Split up the db classes used by the different hub services
Commit 9f2e329d99
15 changed files with 500 additions and 399 deletions
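
For orientation, a minimal sketch of the class layout this change introduces, inferred from the diff below (constructor signatures abbreviated; see the hunks for the real ones):

    # hub/db/db.py - shared read-side base class (renamed from HubDB)
    class SecondaryDB:
        DB_VERSIONS = [7, 8, 9]

    # hub/scribe/db.py - writer-only additions: address status index rebuild,
    # claim expiration fork handling, write_db_state
    class PrimaryDB(SecondaryDB): ...

    # hub/herald/db.py - electrum (herald) server reader
    class HeraldDB(SecondaryDB): ...

    # hub/elastic_sync/db.py - ES sync reader: claim metadata preparation and
    # the claim producers move here
    class ElasticSyncDB(SecondaryDB): ...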
@@ -347,14 +347,9 @@ INVALID_ARGS = -32602


 class CodeMessageError(Exception):
-    @property
-    def code(self):
-        return self.args[0]
-
-    @property
-    def message(self):
-        return self.args[1]
+    def __init__(self, code: int, message: str):
+        self.code = code
+        self.message = message

     def __eq__(self, other):
         return (isinstance(other, self.__class__) and
@@ -382,7 +377,6 @@ class RPCError(CodeMessageError):
     pass


-
 class DaemonError(Exception):
     """Raised when the daemon returns an error in its results."""

@@ -1 +1 @@
-from .db import HubDB
+from .db import SecondaryDB
hub/db/db.py (389 changed lines)

@@ -33,19 +33,18 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
 NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"


-class HubDB:
+class SecondaryDB:
     DB_VERSIONS = [7, 8, 9]

-    def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
+    def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
                  cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
-                 secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
+                 blocking_channel_ids: List[str] = None,
                  filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                  index_address_status=False):
         self.logger = logging.getLogger(__name__)
         self.coin = coin
         self._executor = executor
         self._db_dir = db_dir
-
         self._reorg_limit = reorg_limit
         self._cache_all_claim_txos = cache_all_claim_txos
         self._cache_all_tx_hashes = cache_all_tx_hashes
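
The reader-side constructor now takes secondary_name and max_open_files positionally, ahead of reorg_limit. A minimal usage sketch that mirrors how the open_db() hooks added later in this diff call it; the env object and the 'lbry-reader' name are illustrative, not from this PR:

    # hypothetical call site; matches the argument order of the new signature
    db = SecondaryDB(
        env.coin, env.db_dir, 'lbry-reader', -1, env.reorg_limit,
        cache_all_claim_txos=env.cache_all_claim_txos,
        cache_all_tx_hashes=env.cache_all_tx_hashes,
        executor=executor, index_address_status=env.index_address_status,
    )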
@@ -81,7 +80,7 @@ class HubDB:
         }

         self.tx_counts = None
-        self.headers = None
+        # self.headers = None
         self.block_hashes = None
         self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE)
         self.last_flush = time.time()
@@ -470,245 +469,6 @@ class HubDB:
             self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
             return

     [239 lines removed here: the _prepare_claim_metadata(), all_claims_producer(), claim_producer()
      and claims_producer() methods; the identical code is added as the new hub/elastic_sync/db.py below]

     def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
         activated = defaultdict(list)
         for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):
@@ -761,20 +521,20 @@ class HubDB:
         ts = time.perf_counter() - start
         self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))

-    async def _read_headers(self):
-        # if self.headers is not None:
-        #     return
-
-        def get_headers():
-            return [
-                header for header in self.prefix_db.header.iterate(
-                    start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
-                )
-            ]
-
-        headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
-        assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
-        self.headers = headers
+    # async def _read_headers(self):
+    #     # if self.headers is not None:
+    #     #     return
+    #
+    #     def get_headers():
+    #         return [
+    #             header for header in self.prefix_db.header.iterate(
+    #                 start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
+    #             )
+    #         ]
+    #
+    #     headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
+    #     assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
+    #     self.headers = headers

     async def _read_block_hashes(self):
         def get_block_hashes():
@@ -785,7 +545,7 @@ class HubDB:
             ]

         block_hashes = await asyncio.get_event_loop().run_in_executor(self._executor, get_block_hashes)
-        assert len(block_hashes) == len(self.headers)
+        # assert len(block_hashes) == len(self.headers)
         self.block_hashes = block_hashes

     async def _read_tx_hashes(self):
@@ -803,11 +563,6 @@ class HubDB:
         ts = time.perf_counter() - start
         self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4))

-    def estimate_timestamp(self, height: int) -> int:
-        if height < len(self.headers):
-            return struct.unpack('<I', self.headers[height][100:104])[0]
-        return int(160.6855883050695 * height)
-
     def open_db(self):
         if self.prefix_db:
             return
@@ -858,7 +613,6 @@ class HubDB:

     async def initialize_caches(self):
         await self._read_tx_counts()
-        await self._read_headers()
         await self._read_block_hashes()
         if self._cache_all_claim_txos:
             await self._read_claim_txos()
@@ -871,78 +625,6 @@ class HubDB:
         self.prefix_db.close()
         self.prefix_db = None

     [72 lines removed here: _rebuild_hashX_status_index() and rebuild_hashX_status_index();
      the identical code is added as part of the new hub/scribe/db.py below]

     def _get_hashX_status(self, hashX: bytes):
         mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
         if mempool_status:
@@ -1169,9 +851,9 @@ class HubDB:
         return {txid: tx_infos.get(txid) for txid in txids}  # match ordering of the txs in the request

     async def fs_block_hashes(self, height, count):
-        if height + count > len(self.headers):
-            raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
-        return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
+        if height + count > self.db_height + 1:
+            raise DBError(f'only got {len(self.block_hashes) - height:,d} headers starting at {height:,d}, not {count:,d}')
+        return self.block_hashes[height:height + count]

     def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]:
         txs = []
@@ -1209,33 +891,6 @@ class HubDB:
         """Returns a height from which we should store undo info."""
         return max_height - self._reorg_limit + 1

     [27 lines removed here: apply_expiration_extension_fork() and write_db_state();
      the identical code is added as part of the new hub/scribe/db.py below]

     def read_db_state(self):
         state = self.prefix_db.db_state.get()

@@ -5,7 +5,7 @@ import typing
 from bisect import bisect_right
 from hub.common import sha256
 if typing.TYPE_CHECKING:
-    from hub.db.db import HubDB
+    from hub.scribe.db import PrimaryDB

 FROM_VERSION = 7
 TO_VERSION = 8
@@ -35,7 +35,7 @@ def hashX_history(db: 'HubDB', hashX: bytes):
     return history, to_delete


-def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes:
+def hashX_status_from_history(db: 'PrimaryDB', history: bytes) -> bytes:
     tx_counts = db.tx_counts
     hist_tx_nums = array.array('I')
     hist_tx_nums.frombytes(history)
hub/elastic_sync/db.py (new file, 266 lines)

from typing import Optional, Set, Dict, List
from concurrent.futures.thread import ThreadPoolExecutor
from hub.schema.claim import guess_stream_type
from hub.schema.result import Censor
from hub.common import hash160, STREAM_TYPES, CLAIM_TYPES, LRUCache
from hub.db import SecondaryDB
from hub.db.common import ResolveResult


class ElasticSyncDB(SecondaryDB):
    def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                 blocking_channel_ids: List[str] = None,
                 filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                 index_address_status=False):
        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
                         cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
                         index_address_status)
        self.block_timestamp_cache = LRUCache(1024)

    def estimate_timestamp(self, height: int) -> int:
        if height in self.block_timestamp_cache:
            return self.block_timestamp_cache[height]
        header = self.prefix_db.header.get(height, deserialize_value=False)
        timestamp = int(160.6855883050695 * height) if header else int.from_bytes(header[100:104], byteorder='little')
        self.block_timestamp_cache[height] = timestamp
        return timestamp

    def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
        metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
        if not metadata:
            return
        metadata = metadata
        if not metadata.is_stream or not metadata.stream.has_fee:
            fee_amount = 0
        else:
            fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
            if fee_amount >= 9223372036854775807:
                return
        reposted_claim_hash = claim.reposted_claim_hash
        reposted_claim = None
        reposted_metadata = None
        if reposted_claim_hash:
            reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
            if not reposted_claim:
                return
            reposted_metadata = self.get_claim_metadata(
                self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
            )
            if not reposted_metadata:
                return
        reposted_tags = []
        reposted_languages = []
        reposted_has_source = False
        reposted_claim_type = None
        reposted_stream_type = None
        reposted_media_type = None
        reposted_fee_amount = None
        reposted_fee_currency = None
        reposted_duration = None
        if reposted_claim:
            raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
            try:
                reposted_metadata = self.coin.transaction(
                    raw_reposted_claim_tx
                ).outputs[reposted_claim.position].metadata
            except:
                self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
                                  claim.reposted_claim_hash.hex(), claim_hash.hex())
                return
        if reposted_metadata:
            if reposted_metadata.is_stream:
                meta = reposted_metadata.stream
            elif reposted_metadata.is_channel:
                meta = reposted_metadata.channel
            elif reposted_metadata.is_collection:
                meta = reposted_metadata.collection
            elif reposted_metadata.is_repost:
                meta = reposted_metadata.repost
            else:
                return
            reposted_tags = [tag for tag in meta.tags]
            reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
            reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
            reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
            reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
                if reposted_has_source else 0
            reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
            if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
                reposted_fee_amount = 0
            else:
                reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
                if reposted_fee_amount >= 9223372036854775807:
                    return
            reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
            reposted_duration = None
            if reposted_metadata.is_stream and \
                    (reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
                reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration
        if metadata.is_stream:
            meta = metadata.stream
        elif metadata.is_channel:
            meta = metadata.channel
        elif metadata.is_collection:
            meta = metadata.collection
        elif metadata.is_repost:
            meta = metadata.repost
        else:
            return
        claim_tags = [tag for tag in meta.tags]
        claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']

        tags = list(set(claim_tags).union(set(reposted_tags)))
        languages = list(set(claim_languages).union(set(reposted_languages)))
        blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
            reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
            reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
        filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
            reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
            reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
        value = {
            'claim_id': claim_hash.hex(),
            'claim_name': claim.name,
            'normalized_name': claim.normalized_name,
            'tx_id': claim.tx_hash[::-1].hex(),
            'tx_num': claim.tx_num,
            'tx_nout': claim.position,
            'amount': claim.amount,
            'timestamp': self.estimate_timestamp(claim.height),
            'creation_timestamp': self.estimate_timestamp(claim.creation_height),
            'height': claim.height,
            'creation_height': claim.creation_height,
            'activation_height': claim.activation_height,
            'expiration_height': claim.expiration_height,
            'effective_amount': claim.effective_amount,
            'support_amount': claim.support_amount,
            'is_controlling': bool(claim.is_controlling),
            'last_take_over_height': claim.last_takeover_height,
            'short_url': claim.short_url,
            'canonical_url': claim.canonical_url,
            'title': None if not metadata.is_stream else metadata.stream.title,
            'author': None if not metadata.is_stream else metadata.stream.author,
            'description': None if not metadata.is_stream else metadata.stream.description,
            'claim_type': CLAIM_TYPES[metadata.claim_type],
            'has_source': reposted_has_source if metadata.is_repost else (
                False if not metadata.is_stream else metadata.stream.has_source),
            'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
            'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
                if metadata.is_stream and metadata.stream.has_source
                else reposted_stream_type if metadata.is_repost else 0,
            'media_type': metadata.stream.source.media_type
                if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
            'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
            'fee_currency': metadata.stream.fee.currency
                if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
            'repost_count': self.get_reposted_count(claim_hash),
            'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
            'reposted_claim_type': reposted_claim_type,
            'reposted_has_source': reposted_has_source,
            'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
            'public_key_id': None if not metadata.is_channel else
                self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
            'signature': (metadata.signature or b'').hex() or None,
            # 'signature_digest': metadata.signature,
            'is_signature_valid': bool(claim.signature_valid),
            'tags': tags,
            'languages': languages,
            'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
            'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
            'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
            'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
            'reposted_tx_position': claim.reposted_tx_position,
            'reposted_height': claim.reposted_height,
            'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
            'channel_tx_position': claim.channel_tx_position,
            'channel_height': claim.channel_height,
        }

        if metadata.is_repost and reposted_duration is not None:
            value['duration'] = reposted_duration
        elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
            value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
        if metadata.is_stream:
            value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
        elif metadata.is_repost or metadata.is_collection:
            value['release_time'] = value['creation_timestamp']
        return value

    async def all_claims_producer(self, batch_size=500_000):
        batch = []
        if self._cache_all_claim_txos:
            claim_iterator = self.claim_to_txo.items()
        else:
            claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())

        for claim_hash, claim_txo in claim_iterator:
            # TODO: fix the couple of claim txos that dont have controlling names
            if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
                continue
            activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
            claim = self._prepare_resolve_result(
                claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
                claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
            )
            if claim:
                batch.append(claim)
            if len(batch) == batch_size:
                batch.sort(key=lambda x: x.tx_hash)  # sort is to improve read-ahead hits
                for claim in batch:
                    meta = self._prepare_claim_metadata(claim.claim_hash, claim)
                    if meta:
                        yield meta
                batch.clear()
        batch.sort(key=lambda x: x.tx_hash)
        for claim in batch:
            meta = self._prepare_claim_metadata(claim.claim_hash, claim)
            if meta:
                yield meta
        batch.clear()

    def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
        claim_txo = self.get_cached_claim_txo(claim_hash)
        if not claim_txo:
            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
            return
        if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
            return
        activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
        claim = self._prepare_resolve_result(
            claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
            claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
        )
        if not claim:
            self.logger.warning("wat")
            return
        return self._prepare_claim_metadata(claim.claim_hash, claim)

    def claims_producer(self, claim_hashes: Set[bytes]):
        batch = []
        results = []

        for claim_hash in claim_hashes:
            claim_txo = self.get_cached_claim_txo(claim_hash)
            if not claim_txo:
                self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
                continue
            if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
                self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
                continue

            activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
            claim = self._prepare_resolve_result(
                claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
                claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
            )
            if claim:
                batch.append(claim)

        batch.sort(key=lambda x: x.tx_hash)

        for claim in batch:
            _meta = self._prepare_claim_metadata(claim.claim_hash, claim)
            if _meta:
                results.append(_meta)
        return results
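
For orientation, a minimal sketch of how these producers can be consumed; the helper below is illustrative and not part of this PR (all_claims_producer is an async generator yielding ES-ready dicts):

    # hypothetical driver; db is an opened ElasticSyncDB instance
    async def dump_all_claims(db):
        async for doc in db.all_claims_producer(batch_size=100_000):
            print(doc['claim_id'], doc['claim_name'])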
@@ -12,6 +7,7 @@ from hub.db.revertable import RevertableOp
 from hub.db.common import TrendingNotification, DB_PREFIXES
 from hub.notifier_protocol import ElasticNotifierProtocol
 from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
+from hub.elastic_sync.db import ElasticSyncDB
 if typing.TYPE_CHECKING:
     from hub.elastic_sync.env import ElasticEnv

@@ -44,6 +45,15 @@ class ElasticSyncService(BlockchainReaderService):
         self._listeners: typing.List[ElasticNotifierProtocol] = []
         self._force_reindex = False

+    def open_db(self):
+        env = self.env
+        self.db = ElasticSyncDB(
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
+            env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
+            filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
+            index_address_status=env.index_address_status
+        )
+
     async def run_es_notifier(self, synchronized: asyncio.Event):
         server = await asyncio.get_event_loop().create_server(
             lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port
@@ -233,6 +243,7 @@ class ElasticSyncService(BlockchainReaderService):
         self._advanced = True

     def unwind(self):
+        self.db.block_timestamp_cache.clear()
         reverted_block_hash = self.db.block_hashes[-1]
         super().unwind()
         packed = self.db.prefix_db.undo.get(len(self.db.tx_counts), reverted_block_hash)
hub/herald/db.py (new file, 33 lines)

import asyncio
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from hub.db import SecondaryDB


class HeraldDB(SecondaryDB):
    def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                 blocking_channel_ids: List[str] = None,
                 filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                 index_address_status=False):
        super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
                         cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
                         index_address_status)
        # self.headers = None

    # async def _read_headers(self):
    #     def get_headers():
    #         return [
    #             header for header in self.prefix_db.header.iterate(
    #                 start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False,
    #                 deserialize_value=False
    #             )
    #         ]
    #
    #     headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
    #     assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
    #     self.headers = headers

    # async def initialize_caches(self):
    #     await super().initialize_caches()
    #     await self._read_headers()
@@ -13,7 +13,7 @@ from hub.scribe.transaction.deserializer import Deserializer

 if typing.TYPE_CHECKING:
     from hub.herald.session import SessionManager
-    from hub.db import HubDB
+    from hub.db import SecondaryDB


 @attr.s(slots=True)
@@ -46,7 +46,7 @@ mempool_touched_address_count_metric = Gauge(


 class HubMemPool:
-    def __init__(self, coin, db: 'HubDB', refresh_secs=1.0):
+    def __init__(self, coin, db: 'SecondaryDB', refresh_secs=1.0):
         self.coin = coin
         self._db = db
         self.logger = logging.getLogger(__name__)
@@ -10,7 +10,7 @@ from hub.schema.result import Censor, Outputs
 from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
 from hub.db.common import ResolveResult
 if TYPE_CHECKING:
-    from hub.db import HubDB
+    from hub.db import SecondaryDB


 class ChannelResolution(str):
@@ -28,7 +28,7 @@ class StreamResolution(str):
 class SearchIndex:
     VERSION = 1

-    def __init__(self, hub_db: 'HubDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
+    def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
                  elastic_port=9200):
         self.hub_db = hub_db
         self.search_timeout = search_timeout
@@ -5,6 +5,7 @@ from hub.scribe.daemon import LBCDaemon
 from hub.herald.session import SessionManager
 from hub.herald.mempool import HubMemPool
 from hub.herald.udp import StatusServer
+from hub.herald.db import HeraldDB
 from hub.service import BlockchainReaderService
 from hub.notifier_protocol import ElasticNotifierClientProtocol
 if typing.TYPE_CHECKING:
@@ -35,6 +36,15 @@ class HubServerService(BlockchainReaderService):
         self._es_height = None
         self._es_block_hash = None

+    def open_db(self):
+        env = self.env
+        self.db = HeraldDB(
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
+            env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
+            filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
+            index_address_status=env.index_address_status
+        )
+
     def clear_caches(self):
         self.session_manager.clear_caches()
         # self.clear_search_cache()
@@ -54,7 +64,6 @@ class HubServerService(BlockchainReaderService):
         self.session_manager.hashX_history_cache.clear()
         prev_count = self.db.tx_counts.pop()
         tx_count = self.db.tx_counts[-1]
-        self.db.headers.pop()
         self.db.block_hashes.pop()
         current_count = prev_count
         for _ in range(prev_count - tx_count):
@@ -28,7 +28,7 @@ from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2,
 from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
 from hub.herald.framer import NewlineFramer
 if typing.TYPE_CHECKING:
-    from hub.db import HubDB
+    from hub.db import SecondaryDB
     from hub.herald.env import ServerEnv
     from hub.scribe.daemon import LBCDaemon
     from hub.herald.mempool import HubMemPool
@@ -179,7 +179,7 @@ class SessionManager:
         namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
     )

-    def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool',
+    def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
                  daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
                  on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
         env.max_send = max(350000, env.max_send)
@@ -1750,7 +1750,7 @@ class LBRYElectrumX(asyncio.Protocol):
             if not verbose:
                 return raw_tx.hex()
             return self.coin.transaction(raw_tx).as_dict(self.coin)
-        return RPCError("No such mempool or blockchain transaction.")
+        return RPCError(BAD_REQUEST, "No such mempool or blockchain transaction.")

     def _get_merkle_branch(self, tx_hashes, tx_pos):
         """Return a merkle branch to a transaction.
117
hub/scribe/db.py
Normal file
117
hub/scribe/db.py
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
import asyncio
|
||||||
|
import array
|
||||||
|
import time
|
||||||
|
from typing import List
|
||||||
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
|
from bisect import bisect_right
|
||||||
|
from hub.common import sha256
|
||||||
|
from hub.db import SecondaryDB
|
||||||
|
|
||||||
|
|
||||||
|
class PrimaryDB(SecondaryDB):
|
||||||
|
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||||
|
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||||
|
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||||
|
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||||
|
index_address_status=False):
|
||||||
|
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
|
||||||
|
blocking_channel_ids, filtering_channel_ids, executor, index_address_status)
|
||||||
|
|
||||||
|
def _rebuild_hashX_status_index(self, start_height: int):
|
||||||
|
self.logger.warning("rebuilding the address status index...")
|
||||||
|
prefix_db = self.prefix_db
|
||||||
|
|
||||||
|
def hashX_iterator():
|
||||||
|
last_hashX = None
|
||||||
|
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
|
||||||
|
hashX = k[1:12]
|
||||||
|
if last_hashX is None:
|
||||||
|
last_hashX = hashX
|
||||||
|
if last_hashX != hashX:
|
||||||
|
yield hashX
|
||||||
|
last_hashX = hashX
|
||||||
|
if last_hashX:
|
||||||
|
yield last_hashX
|
||||||
|
|
||||||
|
def hashX_status_from_history(history: bytes) -> bytes:
|
||||||
|
tx_counts = self.tx_counts
|
||||||
|
hist_tx_nums = array.array('I')
|
||||||
|
hist_tx_nums.frombytes(history)
|
||||||
|
hist = ''
|
||||||
|
for tx_num in hist_tx_nums:
|
||||||
|
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
|
||||||
|
return sha256(hist.encode())
|
||||||
|
|
||||||
|
start = time.perf_counter()
|
||||||
|
|
||||||
|
if start_height <= 0:
|
||||||
|
self.logger.info("loading all blockchain addresses, this will take a little while...")
|
||||||
|
hashXs = [hashX for hashX in hashX_iterator()]
|
||||||
|
else:
|
||||||
|
self.logger.info("loading addresses since block %i...", start_height)
|
||||||
|
hashXs = set()
|
||||||
|
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
|
||||||
|
include_key=False):
|
||||||
|
hashXs.update(touched.touched_hashXs)
|
||||||
|
hashXs = list(hashXs)
|
||||||
|
|
||||||
|
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
|
||||||
|
f"now building the status index...")
|
||||||
|
op_cnt = 0
|
||||||
|
hashX_cnt = 0
|
||||||
|
for hashX in hashXs:
|
||||||
|
hashX_cnt += 1
|
||||||
|
key = prefix_db.hashX_status.pack_key(hashX)
|
||||||
|
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
|
||||||
|
status = hashX_status_from_history(history)
|
||||||
|
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
|
||||||
|
if existing_status and existing_status == status:
|
||||||
|
continue
|
||||||
|
elif not existing_status:
|
||||||
|
prefix_db.stage_raw_put(key, status)
|
||||||
|
op_cnt += 1
|
||||||
|
else:
|
||||||
|
prefix_db.stage_raw_delete(key, existing_status)
|
||||||
|
prefix_db.stage_raw_put(key, status)
|
||||||
|
op_cnt += 2
|
||||||
|
if op_cnt > 100000:
|
||||||
|
prefix_db.unsafe_commit()
|
||||||
|
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||||
|
op_cnt = 0
|
||||||
|
if op_cnt:
|
||||||
|
prefix_db.unsafe_commit()
|
||||||
|
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||||
|
self._index_address_status = True
|
||||||
|
self.write_db_state()
|
||||||
|
self.prefix_db.unsafe_commit()
|
||||||
|
self.logger.info("finished indexing address statuses")
|
||||||
|
|
||||||
|
def rebuild_hashX_status_index(self, start_height: int):
|
||||||
|
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
|
||||||
|
|
||||||

    def apply_expiration_extension_fork(self):
        # TODO: this can't be reorged
        for k, v in self.prefix_db.claim_expiration.iterate():
            self.prefix_db.claim_expiration.stage_delete(k, v)
            self.prefix_db.claim_expiration.stage_put(
                (bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
                 k.tx_num, k.position), v
            )
        self.prefix_db.unsafe_commit()
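
Note (illustrative, not part of this diff): bisect_right(self.tx_counts, tx_num) recovers the height of the block containing tx_num, since tx_counts holds the cumulative transaction count per height; the fork then re-keys each expiration entry to that height plus nExtendedClaimExpirationTime. A small sketch of the height lookup with made-up numbers:

from bisect import bisect_right

# hypothetical cumulative tx counts: tx_counts[h] = total txs through block h
tx_counts = [1, 3, 6, 10]

# tx_num 4 lands in block 2, because tx_counts[1] == 3 <= 4 < tx_counts[2] == 6
assert bisect_right(tx_counts, 4) == 2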

    def write_db_state(self):
        """Write (UTXO) state to the batch."""
        last_indexed_address_status = 0
        if self.db_height > 0:
            existing = self.prefix_db.db_state.get()
            last_indexed_address_status = existing.hashX_status_last_indexed_height
            self.prefix_db.db_state.stage_delete((), existing.expanded)
        if self._index_address_status:
            last_indexed_address_status = self.db_height
        self.prefix_db.db_state.stage_put((), (
            self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
            self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
            self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
            self.es_sync_height, last_indexed_address_status
            )
        )
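
Note (illustrative, not part of this diff): readers pick this row back up through prefix_db.db_state.get(). A hypothetical sanity check right after a flush, assuming `db` is the open database; only hashX_status_last_indexed_height is taken from the read path above, the rest is an assumption for illustration:

state = db.prefix_db.db_state.get()
if state is not None and db._index_address_status:
    # the status index should have been brought up to the flushed height
    assert state.hashX_status_last_indexed_height == db.db_height
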
@ -5,7 +5,7 @@ from collections import defaultdict
 from hub.scribe.transaction.deserializer import Deserializer

 if typing.TYPE_CHECKING:
-    from hub.db import HubDB
+    from hub.scribe.db import PrimaryDB


 @attr.s(slots=True)

@ -27,7 +27,7 @@ class MemPoolTxSummary:


 class MemPool:
-    def __init__(self, coin, db: 'HubDB'):
+    def __init__(self, coin, db: 'PrimaryDB'):
         self.coin = coin
         self._db = db
         self.txs = {}

@ -12,6 +12,7 @@ from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from hub.error.base import ChainError
 from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
+from hub.scribe.db import PrimaryDB
 from hub.scribe.daemon import LBCDaemon
 from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
 from hub.scribe.prefetcher import Prefetcher

@ -121,6 +122,15 @@ class BlockchainProcessorService(BlockchainService):
         self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
         self.history_tx_info_cache = LRUCache(2 ** 16)

+    def open_db(self):
+        env = self.env
+        self.db = PrimaryDB(
+            env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
+            cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
+            blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
+            executor=self._executor, index_address_status=env.index_address_status
+        )
+
     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking. Shielded so that
         # cancellations from shutdown don't lose work - when the task

@ -1383,7 +1393,6 @@ class BlockchainProcessorService(BlockchainService):
         )

         self.height = height
-        self.db.headers.append(block.header)
         self.db.block_hashes.append(self.env.coin.header_hash(block.header))
         self.tip = self.coin.header_hash(block.header)

@ -1549,7 +1558,6 @@ class BlockchainProcessorService(BlockchainService):
         # Check and update self.tip

         self.db.tx_counts.pop()
-        self.db.headers.pop()
         reverted_block_hash = self.db.block_hashes.pop()
         self.tip = self.db.block_hashes[-1]
         if self.env.cache_all_tx_hashes:

@ -7,7 +7,7 @@ from prometheus_client import Gauge, Histogram

 from hub import __version__, PROMETHEUS_NAMESPACE
 from hub.env import Env
-from hub.db import HubDB
+from hub.db import SecondaryDB
 from hub.db.prefixes import DBState
 from hub.common import HISTOGRAM_BUCKETS
 from hub.metrics import PrometheusServer

@ -17,6 +17,7 @@ class BlockchainService:
     """
     Base class for blockchain readers as well as the block processor
     """
+
     def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'):
         self.env = env
         self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)

@ -27,13 +28,19 @@ class BlockchainService:
         self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
         self.lock = asyncio.Lock()
         self.last_state: typing.Optional[DBState] = None
-        self.db = HubDB(
-            env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
-            secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
+        self.secondary_name = secondary_name
+        self._stopping = False
+        self.db = None
+        self.open_db()
+
+    def open_db(self):
+        env = self.env
+        self.db = SecondaryDB(
+            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
+            env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
             filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
             index_address_status=env.index_address_status
         )
-        self._stopping = False

     def start_cancellable(self, run, *args):
         _flag = asyncio.Event()

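Note (illustrative, not part of this diff): with the constructor deferring to an overridable open_db(), reader services get the SecondaryDB default while the block processor (earlier in this diff) overrides it to open the writer PrimaryDB. A hypothetical reader subclass that only wants different DB options would override the same hook, e.g.:

class CustomReaderService(BlockchainReaderService):
    # hypothetical subclass for illustration; reuses the base wiring and only
    # changes how the secondary database is opened
    def open_db(self):
        env = self.env
        self.db = SecondaryDB(
            env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit,
            env.cache_all_claim_txos, env.cache_all_tx_hashes,
            blocking_channel_ids=env.blocking_channel_ids,
            filtering_channel_ids=env.filtering_channel_ids,
            executor=self._executor, index_address_status=True  # assumption: force status indexing
        )
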
@ -167,7 +174,7 @@ class BlockchainReaderService(BlockchainService):
             assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"

         header = self.db.prefix_db.header.get(height, deserialize_value=False)
-        self.db.headers.append(header)
+        # self.db.headers.append(header)
         self.db.block_hashes.append(self.env.coin.header_hash(header))

     def unwind(self):

@ -176,7 +183,7 @@ class BlockchainReaderService(BlockchainService):
         """
         prev_count = self.db.tx_counts.pop()
         tx_count = self.db.tx_counts[-1]
-        self.db.headers.pop()
+        # self.db.headers.pop()
         self.db.block_hashes.pop()
         if self.db._cache_all_tx_hashes:
             for _ in range(prev_count - tx_count):

@ -202,7 +209,8 @@ class BlockchainReaderService(BlockchainService):
         rewound = False
         if self.last_state:
             while True:
-                if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
+                if self.db.block_hashes[-1] == self.env.coin.header_hash(
+                        self.db.prefix_db.header.get(last_height, deserialize_value=False)):
                     self.log.debug("connects to block %i", last_height)
                     break
                 else:

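Note (illustrative, not part of this diff): because readers no longer keep whole headers in memory (only block hashes), the reorg check now recomputes the hash of the header stored in the DB and compares it against the cached tip hash. A minimal sketch of the same comparison, assuming `db` is an open secondary database and `coin` its coin class:

def connects_to(db, coin, height: int) -> bool:
    # hypothetical helper for illustration: does the cached chain tip still
    # match the header stored at `height`?
    stored_header = db.prefix_db.header.get(height, deserialize_value=False)
    return db.block_hashes[-1] == coin.header_hash(stored_header)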