Split up the db classes used by the different hub services #43
15 changed files with 500 additions and 399 deletions
|
@ -347,14 +347,9 @@ INVALID_ARGS = -32602
|
|||
|
||||
|
||||
class CodeMessageError(Exception):
|
||||
|
||||
@property
|
||||
def code(self):
|
||||
return self.args[0]
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
return self.args[1]
|
||||
def __init__(self, code: int, message: str):
|
||||
self.code = code
|
||||
self.message = message
|
||||
|
||||
def __eq__(self, other):
|
||||
return (isinstance(other, self.__class__) and
|
||||
|
@ -382,7 +377,6 @@ class RPCError(CodeMessageError):
|
|||
pass
|
||||
|
||||
|
||||
|
||||
class DaemonError(Exception):
|
||||
"""Raised when the daemon returns an error in its results."""
|
||||
|
||||
|
|
|
@ -1 +1 @@
|
|||
from .db import HubDB
|
||||
from .db import SecondaryDB
|
||||
|
|
389
hub/db/db.py
389
hub/db/db.py
|
@ -33,19 +33,18 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
|
|||
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
|
||||
|
||||
|
||||
class HubDB:
|
||||
class SecondaryDB:
|
||||
DB_VERSIONS = [7, 8, 9]
|
||||
|
||||
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.coin = coin
|
||||
self._executor = executor
|
||||
self._db_dir = db_dir
|
||||
|
||||
self._reorg_limit = reorg_limit
|
||||
self._cache_all_claim_txos = cache_all_claim_txos
|
||||
self._cache_all_tx_hashes = cache_all_tx_hashes
|
||||
|
@ -81,7 +80,7 @@ class HubDB:
|
|||
}
|
||||
|
||||
self.tx_counts = None
|
||||
self.headers = None
|
||||
# self.headers = None
|
||||
self.block_hashes = None
|
||||
self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE)
|
||||
self.last_flush = time.time()
|
||||
|
@ -470,245 +469,6 @@ class HubDB:
|
|||
self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
|
||||
return
|
||||
|
||||
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
|
||||
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
|
||||
if not metadata:
|
||||
return
|
||||
metadata = metadata
|
||||
if not metadata.is_stream or not metadata.stream.has_fee:
|
||||
fee_amount = 0
|
||||
else:
|
||||
fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
|
||||
if fee_amount >= 9223372036854775807:
|
||||
return
|
||||
reposted_claim_hash = claim.reposted_claim_hash
|
||||
reposted_claim = None
|
||||
reposted_metadata = None
|
||||
if reposted_claim_hash:
|
||||
reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
|
||||
if not reposted_claim:
|
||||
return
|
||||
reposted_metadata = self.get_claim_metadata(
|
||||
self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
|
||||
)
|
||||
if not reposted_metadata:
|
||||
return
|
||||
reposted_tags = []
|
||||
reposted_languages = []
|
||||
reposted_has_source = False
|
||||
reposted_claim_type = None
|
||||
reposted_stream_type = None
|
||||
reposted_media_type = None
|
||||
reposted_fee_amount = None
|
||||
reposted_fee_currency = None
|
||||
reposted_duration = None
|
||||
if reposted_claim:
|
||||
raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
|
||||
try:
|
||||
reposted_metadata = self.coin.transaction(
|
||||
raw_reposted_claim_tx
|
||||
).outputs[reposted_claim.position].metadata
|
||||
except:
|
||||
self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
|
||||
claim.reposted_claim_hash.hex(), claim_hash.hex())
|
||||
return
|
||||
if reposted_metadata:
|
||||
if reposted_metadata.is_stream:
|
||||
meta = reposted_metadata.stream
|
||||
elif reposted_metadata.is_channel:
|
||||
meta = reposted_metadata.channel
|
||||
elif reposted_metadata.is_collection:
|
||||
meta = reposted_metadata.collection
|
||||
elif reposted_metadata.is_repost:
|
||||
meta = reposted_metadata.repost
|
||||
else:
|
||||
return
|
||||
reposted_tags = [tag for tag in meta.tags]
|
||||
reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
|
||||
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
|
||||
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
|
||||
reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
|
||||
if reposted_has_source else 0
|
||||
reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
|
||||
if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
|
||||
reposted_fee_amount = 0
|
||||
else:
|
||||
reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
|
||||
if reposted_fee_amount >= 9223372036854775807:
|
||||
return
|
||||
reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
|
||||
reposted_duration = None
|
||||
if reposted_metadata.is_stream and \
|
||||
(reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
|
||||
reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration
|
||||
if metadata.is_stream:
|
||||
meta = metadata.stream
|
||||
elif metadata.is_channel:
|
||||
meta = metadata.channel
|
||||
elif metadata.is_collection:
|
||||
meta = metadata.collection
|
||||
elif metadata.is_repost:
|
||||
meta = metadata.repost
|
||||
else:
|
||||
return
|
||||
claim_tags = [tag for tag in meta.tags]
|
||||
claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
|
||||
|
||||
tags = list(set(claim_tags).union(set(reposted_tags)))
|
||||
languages = list(set(claim_languages).union(set(reposted_languages)))
|
||||
blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
|
||||
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
|
||||
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
|
||||
filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
|
||||
reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
|
||||
reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
|
||||
value = {
|
||||
'claim_id': claim_hash.hex(),
|
||||
'claim_name': claim.name,
|
||||
'normalized_name': claim.normalized_name,
|
||||
'tx_id': claim.tx_hash[::-1].hex(),
|
||||
'tx_num': claim.tx_num,
|
||||
'tx_nout': claim.position,
|
||||
'amount': claim.amount,
|
||||
'timestamp': self.estimate_timestamp(claim.height),
|
||||
'creation_timestamp': self.estimate_timestamp(claim.creation_height),
|
||||
'height': claim.height,
|
||||
'creation_height': claim.creation_height,
|
||||
'activation_height': claim.activation_height,
|
||||
'expiration_height': claim.expiration_height,
|
||||
'effective_amount': claim.effective_amount,
|
||||
'support_amount': claim.support_amount,
|
||||
'is_controlling': bool(claim.is_controlling),
|
||||
'last_take_over_height': claim.last_takeover_height,
|
||||
'short_url': claim.short_url,
|
||||
'canonical_url': claim.canonical_url,
|
||||
'title': None if not metadata.is_stream else metadata.stream.title,
|
||||
'author': None if not metadata.is_stream else metadata.stream.author,
|
||||
'description': None if not metadata.is_stream else metadata.stream.description,
|
||||
'claim_type': CLAIM_TYPES[metadata.claim_type],
|
||||
'has_source': reposted_has_source if metadata.is_repost else (
|
||||
False if not metadata.is_stream else metadata.stream.has_source),
|
||||
'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
|
||||
'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
|
||||
if metadata.is_stream and metadata.stream.has_source
|
||||
else reposted_stream_type if metadata.is_repost else 0,
|
||||
'media_type': metadata.stream.source.media_type
|
||||
if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
|
||||
'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
|
||||
'fee_currency': metadata.stream.fee.currency
|
||||
if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
|
||||
'repost_count': self.get_reposted_count(claim_hash),
|
||||
'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
|
||||
'reposted_claim_type': reposted_claim_type,
|
||||
'reposted_has_source': reposted_has_source,
|
||||
'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
|
||||
'public_key_id': None if not metadata.is_channel else
|
||||
self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
|
||||
'signature': (metadata.signature or b'').hex() or None,
|
||||
# 'signature_digest': metadata.signature,
|
||||
'is_signature_valid': bool(claim.signature_valid),
|
||||
'tags': tags,
|
||||
'languages': languages,
|
||||
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
|
||||
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
|
||||
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
|
||||
'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
|
||||
'reposted_tx_position': claim.reposted_tx_position,
|
||||
'reposted_height': claim.reposted_height,
|
||||
'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
|
||||
'channel_tx_position': claim.channel_tx_position,
|
||||
'channel_height': claim.channel_height,
|
||||
}
|
||||
|
||||
if metadata.is_repost and reposted_duration is not None:
|
||||
value['duration'] = reposted_duration
|
||||
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
|
||||
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
|
||||
if metadata.is_stream:
|
||||
value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
|
||||
elif metadata.is_repost or metadata.is_collection:
|
||||
value['release_time'] = value['creation_timestamp']
|
||||
return value
|
||||
|
||||
async def all_claims_producer(self, batch_size=500_000):
|
||||
batch = []
|
||||
if self._cache_all_claim_txos:
|
||||
claim_iterator = self.claim_to_txo.items()
|
||||
else:
|
||||
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
|
||||
|
||||
for claim_hash, claim_txo in claim_iterator:
|
||||
# TODO: fix the couple of claim txos that dont have controlling names
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
continue
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if claim:
|
||||
batch.append(claim)
|
||||
if len(batch) == batch_size:
|
||||
batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits
|
||||
for claim in batch:
|
||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if meta:
|
||||
yield meta
|
||||
batch.clear()
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
for claim in batch:
|
||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if meta:
|
||||
yield meta
|
||||
batch.clear()
|
||||
|
||||
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
|
||||
claim_txo = self.get_cached_claim_txo(claim_hash)
|
||||
if not claim_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
return
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
return
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if not claim:
|
||||
self.logger.warning("wat")
|
||||
return
|
||||
return self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
|
||||
def claims_producer(self, claim_hashes: Set[bytes]):
|
||||
batch = []
|
||||
results = []
|
||||
|
||||
for claim_hash in claim_hashes:
|
||||
claim_txo = self.get_cached_claim_txo(claim_hash)
|
||||
if not claim_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if claim:
|
||||
batch.append(claim)
|
||||
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
|
||||
for claim in batch:
|
||||
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if _meta:
|
||||
results.append(_meta)
|
||||
return results
|
||||
|
||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||
activated = defaultdict(list)
|
||||
for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):
|
||||
|
@ -761,20 +521,20 @@ class HubDB:
|
|||
ts = time.perf_counter() - start
|
||||
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
|
||||
|
||||
async def _read_headers(self):
|
||||
# if self.headers is not None:
|
||||
# return
|
||||
|
||||
def get_headers():
|
||||
return [
|
||||
header for header in self.prefix_db.header.iterate(
|
||||
start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
|
||||
)
|
||||
]
|
||||
|
||||
headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
|
||||
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
|
||||
self.headers = headers
|
||||
# async def _read_headers(self):
|
||||
# # if self.headers is not None:
|
||||
# # return
|
||||
#
|
||||
# def get_headers():
|
||||
# return [
|
||||
# header for header in self.prefix_db.header.iterate(
|
||||
# start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
|
||||
# )
|
||||
# ]
|
||||
#
|
||||
# headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
|
||||
# assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
|
||||
# self.headers = headers
|
||||
|
||||
async def _read_block_hashes(self):
|
||||
def get_block_hashes():
|
||||
|
@ -785,7 +545,7 @@ class HubDB:
|
|||
]
|
||||
|
||||
block_hashes = await asyncio.get_event_loop().run_in_executor(self._executor, get_block_hashes)
|
||||
assert len(block_hashes) == len(self.headers)
|
||||
# assert len(block_hashes) == len(self.headers)
|
||||
self.block_hashes = block_hashes
|
||||
|
||||
async def _read_tx_hashes(self):
|
||||
|
@ -803,11 +563,6 @@ class HubDB:
|
|||
ts = time.perf_counter() - start
|
||||
self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4))
|
||||
|
||||
def estimate_timestamp(self, height: int) -> int:
|
||||
if height < len(self.headers):
|
||||
return struct.unpack('<I', self.headers[height][100:104])[0]
|
||||
return int(160.6855883050695 * height)
|
||||
|
||||
def open_db(self):
|
||||
if self.prefix_db:
|
||||
return
|
||||
|
@ -858,7 +613,6 @@ class HubDB:
|
|||
|
||||
async def initialize_caches(self):
|
||||
await self._read_tx_counts()
|
||||
await self._read_headers()
|
||||
await self._read_block_hashes()
|
||||
if self._cache_all_claim_txos:
|
||||
await self._read_claim_txos()
|
||||
|
@ -871,78 +625,6 @@ class HubDB:
|
|||
self.prefix_db.close()
|
||||
self.prefix_db = None
|
||||
|
||||
def _rebuild_hashX_status_index(self, start_height: int):
|
||||
self.logger.warning("rebuilding the address status index...")
|
||||
prefix_db = self.prefix_db
|
||||
|
||||
def hashX_iterator():
|
||||
last_hashX = None
|
||||
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
|
||||
hashX = k[1:12]
|
||||
if last_hashX is None:
|
||||
last_hashX = hashX
|
||||
if last_hashX != hashX:
|
||||
yield hashX
|
||||
last_hashX = hashX
|
||||
if last_hashX:
|
||||
yield last_hashX
|
||||
|
||||
def hashX_status_from_history(history: bytes) -> bytes:
|
||||
tx_counts = self.tx_counts
|
||||
hist_tx_nums = array.array('I')
|
||||
hist_tx_nums.frombytes(history)
|
||||
hist = ''
|
||||
for tx_num in hist_tx_nums:
|
||||
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
|
||||
return sha256(hist.encode())
|
||||
|
||||
start = time.perf_counter()
|
||||
|
||||
if start_height <= 0:
|
||||
self.logger.info("loading all blockchain addresses, this will take a little while...")
|
||||
hashXs = [hashX for hashX in hashX_iterator()]
|
||||
else:
|
||||
self.logger.info("loading addresses since block %i...", start_height)
|
||||
hashXs = set()
|
||||
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
|
||||
include_key=False):
|
||||
hashXs.update(touched.touched_hashXs)
|
||||
hashXs = list(hashXs)
|
||||
|
||||
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
|
||||
f"now building the status index...")
|
||||
op_cnt = 0
|
||||
hashX_cnt = 0
|
||||
for hashX in hashXs:
|
||||
hashX_cnt += 1
|
||||
key = prefix_db.hashX_status.pack_key(hashX)
|
||||
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
|
||||
status = hashX_status_from_history(history)
|
||||
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
|
||||
if existing_status and existing_status == status:
|
||||
continue
|
||||
elif not existing_status:
|
||||
prefix_db.stage_raw_put(key, status)
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.stage_raw_delete(key, existing_status)
|
||||
prefix_db.stage_raw_put(key, status)
|
||||
op_cnt += 2
|
||||
if op_cnt > 100000:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
op_cnt = 0
|
||||
if op_cnt:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
self._index_address_status = True
|
||||
self.write_db_state()
|
||||
self.prefix_db.unsafe_commit()
|
||||
self.logger.info("finished indexing address statuses")
|
||||
|
||||
def rebuild_hashX_status_index(self, start_height: int):
|
||||
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
|
||||
|
||||
def _get_hashX_status(self, hashX: bytes):
|
||||
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
|
||||
if mempool_status:
|
||||
|
@ -1169,9 +851,9 @@ class HubDB:
|
|||
return {txid: tx_infos.get(txid) for txid in txids} # match ordering of the txs in the request
|
||||
|
||||
async def fs_block_hashes(self, height, count):
|
||||
if height + count > len(self.headers):
|
||||
raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
|
||||
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
|
||||
if height + count > self.db_height + 1:
|
||||
raise DBError(f'only got {len(self.block_hashes) - height:,d} headers starting at {height:,d}, not {count:,d}')
|
||||
return self.block_hashes[height:height + count]
|
||||
|
||||
def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]:
|
||||
txs = []
|
||||
|
@ -1209,33 +891,6 @@ class HubDB:
|
|||
"""Returns a height from which we should store undo info."""
|
||||
return max_height - self._reorg_limit + 1
|
||||
|
||||
def apply_expiration_extension_fork(self):
|
||||
# TODO: this can't be reorged
|
||||
for k, v in self.prefix_db.claim_expiration.iterate():
|
||||
self.prefix_db.claim_expiration.stage_delete(k, v)
|
||||
self.prefix_db.claim_expiration.stage_put(
|
||||
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
|
||||
k.tx_num, k.position), v
|
||||
)
|
||||
self.prefix_db.unsafe_commit()
|
||||
|
||||
def write_db_state(self):
|
||||
"""Write (UTXO) state to the batch."""
|
||||
last_indexed_address_status = 0
|
||||
if self.db_height > 0:
|
||||
existing = self.prefix_db.db_state.get()
|
||||
last_indexed_address_status = existing.hashX_status_last_indexed_height
|
||||
self.prefix_db.db_state.stage_delete((), existing.expanded)
|
||||
if self._index_address_status:
|
||||
last_indexed_address_status = self.db_height
|
||||
self.prefix_db.db_state.stage_put((), (
|
||||
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
|
||||
self.es_sync_height, last_indexed_address_status
|
||||
)
|
||||
)
|
||||
|
||||
def read_db_state(self):
|
||||
state = self.prefix_db.db_state.get()
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ import typing
|
|||
from bisect import bisect_right
|
||||
from hub.common import sha256
|
||||
if typing.TYPE_CHECKING:
|
||||
from hub.db.db import HubDB
|
||||
from hub.scribe.db import PrimaryDB
|
||||
|
||||
FROM_VERSION = 7
|
||||
TO_VERSION = 8
|
||||
|
@ -35,7 +35,7 @@ def hashX_history(db: 'HubDB', hashX: bytes):
|
|||
return history, to_delete
|
||||
|
||||
|
||||
def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes:
|
||||
def hashX_status_from_history(db: 'PrimaryDB', history: bytes) -> bytes:
|
||||
tx_counts = db.tx_counts
|
||||
hist_tx_nums = array.array('I')
|
||||
hist_tx_nums.frombytes(history)
|
||||
|
|
266
hub/elastic_sync/db.py
Normal file
266
hub/elastic_sync/db.py
Normal file
|
@ -0,0 +1,266 @@
|
|||
from typing import Optional, Set, Dict, List
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from hub.schema.claim import guess_stream_type
|
||||
from hub.schema.result import Censor
|
||||
from hub.common import hash160, STREAM_TYPES, CLAIM_TYPES, LRUCache
|
||||
from hub.db import SecondaryDB
|
||||
from hub.db.common import ResolveResult
|
||||
|
||||
|
||||
class ElasticSyncDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
|
||||
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
|
||||
index_address_status)
|
||||
self.block_timestamp_cache = LRUCache(1024)
|
||||
|
||||
def estimate_timestamp(self, height: int) -> int:
|
||||
if height in self.block_timestamp_cache:
|
||||
return self.block_timestamp_cache[height]
|
||||
header = self.prefix_db.header.get(height, deserialize_value=False)
|
||||
timestamp = int(160.6855883050695 * height) if header else int.from_bytes(header[100:104], byteorder='little')
|
||||
self.block_timestamp_cache[height] = timestamp
|
||||
return timestamp
|
||||
|
||||
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
|
||||
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
|
||||
if not metadata:
|
||||
return
|
||||
metadata = metadata
|
||||
if not metadata.is_stream or not metadata.stream.has_fee:
|
||||
fee_amount = 0
|
||||
else:
|
||||
fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
|
||||
if fee_amount >= 9223372036854775807:
|
||||
return
|
||||
reposted_claim_hash = claim.reposted_claim_hash
|
||||
reposted_claim = None
|
||||
reposted_metadata = None
|
||||
if reposted_claim_hash:
|
||||
reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
|
||||
if not reposted_claim:
|
||||
return
|
||||
reposted_metadata = self.get_claim_metadata(
|
||||
self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
|
||||
)
|
||||
if not reposted_metadata:
|
||||
return
|
||||
reposted_tags = []
|
||||
reposted_languages = []
|
||||
reposted_has_source = False
|
||||
reposted_claim_type = None
|
||||
reposted_stream_type = None
|
||||
reposted_media_type = None
|
||||
reposted_fee_amount = None
|
||||
reposted_fee_currency = None
|
||||
reposted_duration = None
|
||||
if reposted_claim:
|
||||
raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
|
||||
try:
|
||||
reposted_metadata = self.coin.transaction(
|
||||
raw_reposted_claim_tx
|
||||
).outputs[reposted_claim.position].metadata
|
||||
except:
|
||||
self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
|
||||
claim.reposted_claim_hash.hex(), claim_hash.hex())
|
||||
return
|
||||
if reposted_metadata:
|
||||
if reposted_metadata.is_stream:
|
||||
meta = reposted_metadata.stream
|
||||
elif reposted_metadata.is_channel:
|
||||
meta = reposted_metadata.channel
|
||||
elif reposted_metadata.is_collection:
|
||||
meta = reposted_metadata.collection
|
||||
elif reposted_metadata.is_repost:
|
||||
meta = reposted_metadata.repost
|
||||
else:
|
||||
return
|
||||
reposted_tags = [tag for tag in meta.tags]
|
||||
reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
|
||||
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
|
||||
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
|
||||
reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
|
||||
if reposted_has_source else 0
|
||||
reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
|
||||
if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
|
||||
reposted_fee_amount = 0
|
||||
else:
|
||||
reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
|
||||
if reposted_fee_amount >= 9223372036854775807:
|
||||
return
|
||||
reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
|
||||
reposted_duration = None
|
||||
if reposted_metadata.is_stream and \
|
||||
(reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
|
||||
reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration
|
||||
if metadata.is_stream:
|
||||
meta = metadata.stream
|
||||
elif metadata.is_channel:
|
||||
meta = metadata.channel
|
||||
elif metadata.is_collection:
|
||||
meta = metadata.collection
|
||||
elif metadata.is_repost:
|
||||
meta = metadata.repost
|
||||
else:
|
||||
return
|
||||
claim_tags = [tag for tag in meta.tags]
|
||||
claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
|
||||
|
||||
tags = list(set(claim_tags).union(set(reposted_tags)))
|
||||
languages = list(set(claim_languages).union(set(reposted_languages)))
|
||||
blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
|
||||
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
|
||||
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
|
||||
filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
|
||||
reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
|
||||
reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
|
||||
value = {
|
||||
'claim_id': claim_hash.hex(),
|
||||
'claim_name': claim.name,
|
||||
'normalized_name': claim.normalized_name,
|
||||
'tx_id': claim.tx_hash[::-1].hex(),
|
||||
'tx_num': claim.tx_num,
|
||||
'tx_nout': claim.position,
|
||||
'amount': claim.amount,
|
||||
'timestamp': self.estimate_timestamp(claim.height),
|
||||
'creation_timestamp': self.estimate_timestamp(claim.creation_height),
|
||||
'height': claim.height,
|
||||
'creation_height': claim.creation_height,
|
||||
'activation_height': claim.activation_height,
|
||||
'expiration_height': claim.expiration_height,
|
||||
'effective_amount': claim.effective_amount,
|
||||
'support_amount': claim.support_amount,
|
||||
'is_controlling': bool(claim.is_controlling),
|
||||
'last_take_over_height': claim.last_takeover_height,
|
||||
'short_url': claim.short_url,
|
||||
'canonical_url': claim.canonical_url,
|
||||
'title': None if not metadata.is_stream else metadata.stream.title,
|
||||
'author': None if not metadata.is_stream else metadata.stream.author,
|
||||
'description': None if not metadata.is_stream else metadata.stream.description,
|
||||
'claim_type': CLAIM_TYPES[metadata.claim_type],
|
||||
'has_source': reposted_has_source if metadata.is_repost else (
|
||||
False if not metadata.is_stream else metadata.stream.has_source),
|
||||
'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
|
||||
'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
|
||||
if metadata.is_stream and metadata.stream.has_source
|
||||
else reposted_stream_type if metadata.is_repost else 0,
|
||||
'media_type': metadata.stream.source.media_type
|
||||
if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
|
||||
'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
|
||||
'fee_currency': metadata.stream.fee.currency
|
||||
if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
|
||||
'repost_count': self.get_reposted_count(claim_hash),
|
||||
'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
|
||||
'reposted_claim_type': reposted_claim_type,
|
||||
'reposted_has_source': reposted_has_source,
|
||||
'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
|
||||
'public_key_id': None if not metadata.is_channel else
|
||||
self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
|
||||
'signature': (metadata.signature or b'').hex() or None,
|
||||
# 'signature_digest': metadata.signature,
|
||||
'is_signature_valid': bool(claim.signature_valid),
|
||||
'tags': tags,
|
||||
'languages': languages,
|
||||
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
|
||||
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
|
||||
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
|
||||
'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
|
||||
'reposted_tx_position': claim.reposted_tx_position,
|
||||
'reposted_height': claim.reposted_height,
|
||||
'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
|
||||
'channel_tx_position': claim.channel_tx_position,
|
||||
'channel_height': claim.channel_height,
|
||||
}
|
||||
|
||||
if metadata.is_repost and reposted_duration is not None:
|
||||
value['duration'] = reposted_duration
|
||||
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
|
||||
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
|
||||
if metadata.is_stream:
|
||||
value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
|
||||
elif metadata.is_repost or metadata.is_collection:
|
||||
value['release_time'] = value['creation_timestamp']
|
||||
return value
|
||||
|
||||
async def all_claims_producer(self, batch_size=500_000):
|
||||
batch = []
|
||||
if self._cache_all_claim_txos:
|
||||
claim_iterator = self.claim_to_txo.items()
|
||||
else:
|
||||
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
|
||||
|
||||
for claim_hash, claim_txo in claim_iterator:
|
||||
# TODO: fix the couple of claim txos that dont have controlling names
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
continue
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if claim:
|
||||
batch.append(claim)
|
||||
if len(batch) == batch_size:
|
||||
batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits
|
||||
for claim in batch:
|
||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if meta:
|
||||
yield meta
|
||||
batch.clear()
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
for claim in batch:
|
||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if meta:
|
||||
yield meta
|
||||
batch.clear()
|
||||
|
||||
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
|
||||
claim_txo = self.get_cached_claim_txo(claim_hash)
|
||||
if not claim_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
return
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
return
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if not claim:
|
||||
self.logger.warning("wat")
|
||||
return
|
||||
return self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
|
||||
def claims_producer(self, claim_hashes: Set[bytes]):
|
||||
batch = []
|
||||
results = []
|
||||
|
||||
for claim_hash in claim_hashes:
|
||||
claim_txo = self.get_cached_claim_txo(claim_hash)
|
||||
if not claim_txo:
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
|
||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||
continue
|
||||
|
||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||
claim = self._prepare_resolve_result(
|
||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||
)
|
||||
if claim:
|
||||
batch.append(claim)
|
||||
|
||||
batch.sort(key=lambda x: x.tx_hash)
|
||||
|
||||
for claim in batch:
|
||||
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||
if _meta:
|
||||
results.append(_meta)
|
||||
return results
|
|
@ -12,6 +12,7 @@ from hub.db.revertable import RevertableOp
|
|||
from hub.db.common import TrendingNotification, DB_PREFIXES
|
||||
from hub.notifier_protocol import ElasticNotifierProtocol
|
||||
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
|
||||
from hub.elastic_sync.db import ElasticSyncDB
|
||||
if typing.TYPE_CHECKING:
|
||||
from hub.elastic_sync.env import ElasticEnv
|
||||
|
||||
|
@ -44,6 +45,15 @@ class ElasticSyncService(BlockchainReaderService):
|
|||
self._listeners: typing.List[ElasticNotifierProtocol] = []
|
||||
self._force_reindex = False
|
||||
|
||||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = ElasticSyncDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
)
|
||||
|
||||
async def run_es_notifier(self, synchronized: asyncio.Event):
|
||||
server = await asyncio.get_event_loop().create_server(
|
||||
lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port
|
||||
|
@ -233,6 +243,7 @@ class ElasticSyncService(BlockchainReaderService):
|
|||
self._advanced = True
|
||||
|
||||
def unwind(self):
|
||||
self.db.block_timestamp_cache.clear()
|
||||
reverted_block_hash = self.db.block_hashes[-1]
|
||||
super().unwind()
|
||||
packed = self.db.prefix_db.undo.get(len(self.db.tx_counts), reverted_block_hash)
|
||||
|
|
33
hub/herald/db.py
Normal file
33
hub/herald/db.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
import asyncio
|
||||
from typing import List
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from hub.db import SecondaryDB
|
||||
|
||||
|
||||
class HeraldDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
|
||||
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
|
||||
index_address_status)
|
||||
# self.headers = None
|
||||
|
||||
# async def _read_headers(self):
|
||||
# def get_headers():
|
||||
# return [
|
||||
# header for header in self.prefix_db.header.iterate(
|
||||
# start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False,
|
||||
# deserialize_value=False
|
||||
# )
|
||||
# ]
|
||||
#
|
||||
# headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
|
||||
# assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
|
||||
# self.headers = headers
|
||||
|
||||
# async def initialize_caches(self):
|
||||
# await super().initialize_caches()
|
||||
# await self._read_headers()
|
|
@ -13,7 +13,7 @@ from hub.scribe.transaction.deserializer import Deserializer
|
|||
|
||||
if typing.TYPE_CHECKING:
|
||||
from hub.herald.session import SessionManager
|
||||
from hub.db import HubDB
|
||||
from hub.db import SecondaryDB
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
|
@ -46,7 +46,7 @@ mempool_touched_address_count_metric = Gauge(
|
|||
|
||||
|
||||
class HubMemPool:
|
||||
def __init__(self, coin, db: 'HubDB', refresh_secs=1.0):
|
||||
def __init__(self, coin, db: 'SecondaryDB', refresh_secs=1.0):
|
||||
self.coin = coin
|
||||
self._db = db
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
|
|
@ -10,7 +10,7 @@ from hub.schema.result import Censor, Outputs
|
|||
from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
|
||||
from hub.db.common import ResolveResult
|
||||
if TYPE_CHECKING:
|
||||
from hub.db import HubDB
|
||||
from hub.db import SecondaryDB
|
||||
|
||||
|
||||
class ChannelResolution(str):
|
||||
|
@ -28,7 +28,7 @@ class StreamResolution(str):
|
|||
class SearchIndex:
|
||||
VERSION = 1
|
||||
|
||||
def __init__(self, hub_db: 'HubDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
|
||||
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
|
||||
elastic_port=9200):
|
||||
self.hub_db = hub_db
|
||||
self.search_timeout = search_timeout
|
||||
|
|
|
@ -5,6 +5,7 @@ from hub.scribe.daemon import LBCDaemon
|
|||
from hub.herald.session import SessionManager
|
||||
from hub.herald.mempool import HubMemPool
|
||||
from hub.herald.udp import StatusServer
|
||||
from hub.herald.db import HeraldDB
|
||||
from hub.service import BlockchainReaderService
|
||||
from hub.notifier_protocol import ElasticNotifierClientProtocol
|
||||
if typing.TYPE_CHECKING:
|
||||
|
@ -35,6 +36,15 @@ class HubServerService(BlockchainReaderService):
|
|||
self._es_height = None
|
||||
self._es_block_hash = None
|
||||
|
||||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = HeraldDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
)
|
||||
|
||||
def clear_caches(self):
|
||||
self.session_manager.clear_caches()
|
||||
# self.clear_search_cache()
|
||||
|
@ -54,7 +64,6 @@ class HubServerService(BlockchainReaderService):
|
|||
self.session_manager.hashX_history_cache.clear()
|
||||
prev_count = self.db.tx_counts.pop()
|
||||
tx_count = self.db.tx_counts[-1]
|
||||
self.db.headers.pop()
|
||||
self.db.block_hashes.pop()
|
||||
current_count = prev_count
|
||||
for _ in range(prev_count - tx_count):
|
||||
|
|
|
@ -28,7 +28,7 @@ from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2,
|
|||
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
|
||||
from hub.herald.framer import NewlineFramer
|
||||
if typing.TYPE_CHECKING:
|
||||
from hub.db import HubDB
|
||||
from hub.db import SecondaryDB
|
||||
from hub.herald.env import ServerEnv
|
||||
from hub.scribe.daemon import LBCDaemon
|
||||
from hub.herald.mempool import HubMemPool
|
||||
|
@ -179,7 +179,7 @@ class SessionManager:
|
|||
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||
)
|
||||
|
||||
def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool',
|
||||
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
|
||||
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
|
||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||
env.max_send = max(350000, env.max_send)
|
||||
|
@ -1750,7 +1750,7 @@ class LBRYElectrumX(asyncio.Protocol):
|
|||
if not verbose:
|
||||
return raw_tx.hex()
|
||||
return self.coin.transaction(raw_tx).as_dict(self.coin)
|
||||
return RPCError("No such mempool or blockchain transaction.")
|
||||
return RPCError(BAD_REQUEST, "No such mempool or blockchain transaction.")
|
||||
|
||||
def _get_merkle_branch(self, tx_hashes, tx_pos):
|
||||
"""Return a merkle branch to a transaction.
|
||||
|
|
117
hub/scribe/db.py
Normal file
117
hub/scribe/db.py
Normal file
|
@ -0,0 +1,117 @@
|
|||
import asyncio
|
||||
import array
|
||||
import time
|
||||
from typing import List
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from bisect import bisect_right
|
||||
from hub.common import sha256
|
||||
from hub.db import SecondaryDB
|
||||
|
||||
|
||||
class PrimaryDB(SecondaryDB):
|
||||
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
|
||||
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
|
||||
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
|
||||
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
|
||||
index_address_status=False):
|
||||
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
|
||||
blocking_channel_ids, filtering_channel_ids, executor, index_address_status)
|
||||
|
||||
def _rebuild_hashX_status_index(self, start_height: int):
|
||||
self.logger.warning("rebuilding the address status index...")
|
||||
prefix_db = self.prefix_db
|
||||
|
||||
def hashX_iterator():
|
||||
last_hashX = None
|
||||
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
|
||||
hashX = k[1:12]
|
||||
if last_hashX is None:
|
||||
last_hashX = hashX
|
||||
if last_hashX != hashX:
|
||||
yield hashX
|
||||
last_hashX = hashX
|
||||
if last_hashX:
|
||||
yield last_hashX
|
||||
|
||||
def hashX_status_from_history(history: bytes) -> bytes:
|
||||
tx_counts = self.tx_counts
|
||||
hist_tx_nums = array.array('I')
|
||||
hist_tx_nums.frombytes(history)
|
||||
hist = ''
|
||||
for tx_num in hist_tx_nums:
|
||||
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
|
||||
return sha256(hist.encode())
|
||||
|
||||
start = time.perf_counter()
|
||||
|
||||
if start_height <= 0:
|
||||
self.logger.info("loading all blockchain addresses, this will take a little while...")
|
||||
hashXs = [hashX for hashX in hashX_iterator()]
|
||||
else:
|
||||
self.logger.info("loading addresses since block %i...", start_height)
|
||||
hashXs = set()
|
||||
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
|
||||
include_key=False):
|
||||
hashXs.update(touched.touched_hashXs)
|
||||
hashXs = list(hashXs)
|
||||
|
||||
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
|
||||
f"now building the status index...")
|
||||
op_cnt = 0
|
||||
hashX_cnt = 0
|
||||
for hashX in hashXs:
|
||||
hashX_cnt += 1
|
||||
key = prefix_db.hashX_status.pack_key(hashX)
|
||||
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
|
||||
status = hashX_status_from_history(history)
|
||||
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
|
||||
if existing_status and existing_status == status:
|
||||
continue
|
||||
elif not existing_status:
|
||||
prefix_db.stage_raw_put(key, status)
|
||||
op_cnt += 1
|
||||
else:
|
||||
prefix_db.stage_raw_delete(key, existing_status)
|
||||
prefix_db.stage_raw_put(key, status)
|
||||
op_cnt += 2
|
||||
if op_cnt > 100000:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
op_cnt = 0
|
||||
if op_cnt:
|
||||
prefix_db.unsafe_commit()
|
||||
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
|
||||
self._index_address_status = True
|
||||
self.write_db_state()
|
||||
self.prefix_db.unsafe_commit()
|
||||
self.logger.info("finished indexing address statuses")
|
||||
|
||||
def rebuild_hashX_status_index(self, start_height: int):
|
||||
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
|
||||
|
||||
def apply_expiration_extension_fork(self):
|
||||
# TODO: this can't be reorged
|
||||
for k, v in self.prefix_db.claim_expiration.iterate():
|
||||
self.prefix_db.claim_expiration.stage_delete(k, v)
|
||||
self.prefix_db.claim_expiration.stage_put(
|
||||
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
|
||||
k.tx_num, k.position), v
|
||||
)
|
||||
self.prefix_db.unsafe_commit()
|
||||
|
||||
def write_db_state(self):
|
||||
"""Write (UTXO) state to the batch."""
|
||||
last_indexed_address_status = 0
|
||||
if self.db_height > 0:
|
||||
existing = self.prefix_db.db_state.get()
|
||||
last_indexed_address_status = existing.hashX_status_last_indexed_height
|
||||
self.prefix_db.db_state.stage_delete((), existing.expanded)
|
||||
if self._index_address_status:
|
||||
last_indexed_address_status = self.db_height
|
||||
self.prefix_db.db_state.stage_put((), (
|
||||
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
|
||||
self.es_sync_height, last_indexed_address_status
|
||||
)
|
||||
)
|
|
@ -5,7 +5,7 @@ from collections import defaultdict
|
|||
from hub.scribe.transaction.deserializer import Deserializer
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from hub.db import HubDB
|
||||
from hub.scribe.db import PrimaryDB
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
|
@ -27,7 +27,7 @@ class MemPoolTxSummary:
|
|||
|
||||
|
||||
class MemPool:
|
||||
def __init__(self, coin, db: 'HubDB'):
|
||||
def __init__(self, coin, db: 'PrimaryDB'):
|
||||
self.coin = coin
|
||||
self._db = db
|
||||
self.txs = {}
|
||||
|
|
|
@ -12,6 +12,7 @@ from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
|||
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
|
||||
from hub.error.base import ChainError
|
||||
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
|
||||
from hub.scribe.db import PrimaryDB
|
||||
from hub.scribe.daemon import LBCDaemon
|
||||
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
|
||||
from hub.scribe.prefetcher import Prefetcher
|
||||
|
@ -121,6 +122,15 @@ class BlockchainProcessorService(BlockchainService):
|
|||
self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
|
||||
self.history_tx_info_cache = LRUCache(2 ** 16)
|
||||
|
||||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = PrimaryDB(
|
||||
env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
|
||||
cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
|
||||
blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
|
||||
executor=self._executor, index_address_status=env.index_address_status
|
||||
)
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
# Run in a thread to prevent blocking. Shielded so that
|
||||
# cancellations from shutdown don't lose work - when the task
|
||||
|
@ -1383,7 +1393,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
)
|
||||
|
||||
self.height = height
|
||||
self.db.headers.append(block.header)
|
||||
self.db.block_hashes.append(self.env.coin.header_hash(block.header))
|
||||
self.tip = self.coin.header_hash(block.header)
|
||||
|
||||
|
@ -1549,7 +1558,6 @@ class BlockchainProcessorService(BlockchainService):
|
|||
# Check and update self.tip
|
||||
|
||||
self.db.tx_counts.pop()
|
||||
self.db.headers.pop()
|
||||
reverted_block_hash = self.db.block_hashes.pop()
|
||||
self.tip = self.db.block_hashes[-1]
|
||||
if self.env.cache_all_tx_hashes:
|
||||
|
|
|
@ -7,7 +7,7 @@ from prometheus_client import Gauge, Histogram
|
|||
|
||||
from hub import __version__, PROMETHEUS_NAMESPACE
|
||||
from hub.env import Env
|
||||
from hub.db import HubDB
|
||||
from hub.db import SecondaryDB
|
||||
from hub.db.prefixes import DBState
|
||||
from hub.common import HISTOGRAM_BUCKETS
|
||||
from hub.metrics import PrometheusServer
|
||||
|
@ -17,6 +17,7 @@ class BlockchainService:
|
|||
"""
|
||||
Base class for blockchain readers as well as the block processor
|
||||
"""
|
||||
|
||||
def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'):
|
||||
self.env = env
|
||||
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
|
||||
|
@ -27,13 +28,19 @@ class BlockchainService:
|
|||
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
|
||||
self.lock = asyncio.Lock()
|
||||
self.last_state: typing.Optional[DBState] = None
|
||||
self.db = HubDB(
|
||||
env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
||||
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
|
||||
self.secondary_name = secondary_name
|
||||
self._stopping = False
|
||||
self.db = None
|
||||
self.open_db()
|
||||
|
||||
def open_db(self):
|
||||
env = self.env
|
||||
self.db = SecondaryDB(
|
||||
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
|
||||
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
|
||||
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
|
||||
index_address_status=env.index_address_status
|
||||
)
|
||||
self._stopping = False
|
||||
|
||||
def start_cancellable(self, run, *args):
|
||||
_flag = asyncio.Event()
|
||||
|
@ -167,7 +174,7 @@ class BlockchainReaderService(BlockchainService):
|
|||
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
|
||||
|
||||
header = self.db.prefix_db.header.get(height, deserialize_value=False)
|
||||
self.db.headers.append(header)
|
||||
# self.db.headers.append(header)
|
||||
self.db.block_hashes.append(self.env.coin.header_hash(header))
|
||||
|
||||
def unwind(self):
|
||||
|
@ -176,7 +183,7 @@ class BlockchainReaderService(BlockchainService):
|
|||
"""
|
||||
prev_count = self.db.tx_counts.pop()
|
||||
tx_count = self.db.tx_counts[-1]
|
||||
self.db.headers.pop()
|
||||
# self.db.headers.pop()
|
||||
self.db.block_hashes.pop()
|
||||
if self.db._cache_all_tx_hashes:
|
||||
for _ in range(prev_count - tx_count):
|
||||
|
@ -202,7 +209,8 @@ class BlockchainReaderService(BlockchainService):
|
|||
rewound = False
|
||||
if self.last_state:
|
||||
while True:
|
||||
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
|
||||
if self.db.block_hashes[-1] == self.env.coin.header_hash(
|
||||
self.db.prefix_db.header.get(last_height, deserialize_value=False)):
|
||||
self.log.debug("connects to block %i", last_height)
|
||||
break
|
||||
else:
|
||||
|
|
Loading…
Reference in a new issue