Split up the db classes used by the different hub services #43

Merged
jackrobison merged 3 commits from cleanup-db into master 2022-05-27 18:54:21 +02:00
15 changed files with 500 additions and 399 deletions

View file

@ -347,14 +347,9 @@ INVALID_ARGS = -32602
class CodeMessageError(Exception): class CodeMessageError(Exception):
def __init__(self, code: int, message: str):
@property self.code = code
def code(self): self.message = message
return self.args[0]
@property
def message(self):
return self.args[1]
def __eq__(self, other): def __eq__(self, other):
return (isinstance(other, self.__class__) and return (isinstance(other, self.__class__) and
@ -382,7 +377,6 @@ class RPCError(CodeMessageError):
pass pass
class DaemonError(Exception): class DaemonError(Exception):
"""Raised when the daemon returns an error in its results.""" """Raised when the daemon returns an error in its results."""

View file

@ -1 +1 @@
from .db import HubDB from .db import SecondaryDB

View file

@ -33,19 +33,18 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db" NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class HubDB: class SecondaryDB:
DB_VERSIONS = [7, 8, 9] DB_VERSIONS = [7, 8, 9]
def __init__(self, coin, db_dir: str, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False): index_address_status=False):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.coin = coin self.coin = coin
self._executor = executor self._executor = executor
self._db_dir = db_dir self._db_dir = db_dir
self._reorg_limit = reorg_limit self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes self._cache_all_tx_hashes = cache_all_tx_hashes
@ -81,7 +80,7 @@ class HubDB:
} }
self.tx_counts = None self.tx_counts = None
self.headers = None # self.headers = None
self.block_hashes = None self.block_hashes = None
self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE) self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE)
self.last_flush = time.time() self.last_flush = time.time()
@ -470,245 +469,6 @@ class HubDB:
self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex()) self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
return return
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
if not metadata:
return
metadata = metadata
if not metadata.is_stream or not metadata.stream.has_fee:
fee_amount = 0
else:
fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
if fee_amount >= 9223372036854775807:
return
reposted_claim_hash = claim.reposted_claim_hash
reposted_claim = None
reposted_metadata = None
if reposted_claim_hash:
reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
if not reposted_claim:
return
reposted_metadata = self.get_claim_metadata(
self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
)
if not reposted_metadata:
return
reposted_tags = []
reposted_languages = []
reposted_has_source = False
reposted_claim_type = None
reposted_stream_type = None
reposted_media_type = None
reposted_fee_amount = None
reposted_fee_currency = None
reposted_duration = None
if reposted_claim:
raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
try:
reposted_metadata = self.coin.transaction(
raw_reposted_claim_tx
).outputs[reposted_claim.position].metadata
except:
self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
claim.reposted_claim_hash.hex(), claim_hash.hex())
return
if reposted_metadata:
if reposted_metadata.is_stream:
meta = reposted_metadata.stream
elif reposted_metadata.is_channel:
meta = reposted_metadata.channel
elif reposted_metadata.is_collection:
meta = reposted_metadata.collection
elif reposted_metadata.is_repost:
meta = reposted_metadata.repost
else:
return
reposted_tags = [tag for tag in meta.tags]
reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
if reposted_has_source else 0
reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
reposted_fee_amount = 0
else:
reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
if reposted_fee_amount >= 9223372036854775807:
return
reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
reposted_duration = None
if reposted_metadata.is_stream and \
(reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration
if metadata.is_stream:
meta = metadata.stream
elif metadata.is_channel:
meta = metadata.channel
elif metadata.is_collection:
meta = metadata.collection
elif metadata.is_repost:
meta = metadata.repost
else:
return
claim_tags = [tag for tag in meta.tags]
claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
tags = list(set(claim_tags).union(set(reposted_tags)))
languages = list(set(claim_languages).union(set(reposted_languages)))
blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
value = {
'claim_id': claim_hash.hex(),
'claim_name': claim.name,
'normalized_name': claim.normalized_name,
'tx_id': claim.tx_hash[::-1].hex(),
'tx_num': claim.tx_num,
'tx_nout': claim.position,
'amount': claim.amount,
'timestamp': self.estimate_timestamp(claim.height),
'creation_timestamp': self.estimate_timestamp(claim.creation_height),
'height': claim.height,
'creation_height': claim.creation_height,
'activation_height': claim.activation_height,
'expiration_height': claim.expiration_height,
'effective_amount': claim.effective_amount,
'support_amount': claim.support_amount,
'is_controlling': bool(claim.is_controlling),
'last_take_over_height': claim.last_takeover_height,
'short_url': claim.short_url,
'canonical_url': claim.canonical_url,
'title': None if not metadata.is_stream else metadata.stream.title,
'author': None if not metadata.is_stream else metadata.stream.author,
'description': None if not metadata.is_stream else metadata.stream.description,
'claim_type': CLAIM_TYPES[metadata.claim_type],
'has_source': reposted_has_source if metadata.is_repost else (
False if not metadata.is_stream else metadata.stream.has_source),
'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
if metadata.is_stream and metadata.stream.has_source
else reposted_stream_type if metadata.is_repost else 0,
'media_type': metadata.stream.source.media_type
if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
'fee_currency': metadata.stream.fee.currency
if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
'repost_count': self.get_reposted_count(claim_hash),
'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
'reposted_claim_type': reposted_claim_type,
'reposted_has_source': reposted_has_source,
'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
'public_key_id': None if not metadata.is_channel else
self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
'signature': (metadata.signature or b'').hex() or None,
# 'signature_digest': metadata.signature,
'is_signature_valid': bool(claim.signature_valid),
'tags': tags,
'languages': languages,
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
'reposted_tx_position': claim.reposted_tx_position,
'reposted_height': claim.reposted_height,
'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
'channel_tx_position': claim.channel_tx_position,
'channel_height': claim.channel_height,
}
if metadata.is_repost and reposted_duration is not None:
value['duration'] = reposted_duration
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
if metadata.is_stream:
value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
elif metadata.is_repost or metadata.is_collection:
value['release_time'] = value['creation_timestamp']
return value
async def all_claims_producer(self, batch_size=500_000):
batch = []
if self._cache_all_claim_txos:
claim_iterator = self.claim_to_txo.items()
else:
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
for claim_hash, claim_txo in claim_iterator:
# TODO: fix the couple of claim txos that dont have controlling names
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
if len(batch) == batch_size:
batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if not claim:
self.logger.warning("wat")
return
return self._prepare_claim_metadata(claim.claim_hash, claim)
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
for claim_hash in claim_hashes:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list) activated = defaultdict(list)
for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)): for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):
@ -761,20 +521,20 @@ class HubDB:
ts = time.perf_counter() - start ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4)) self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
async def _read_headers(self): # async def _read_headers(self):
# if self.headers is not None: # # if self.headers is not None:
# return # # return
#
def get_headers(): # def get_headers():
return [ # return [
header for header in self.prefix_db.header.iterate( # header for header in self.prefix_db.header.iterate(
start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False # start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
) # )
] # ]
#
headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers) # headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}" # assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers # self.headers = headers
async def _read_block_hashes(self): async def _read_block_hashes(self):
def get_block_hashes(): def get_block_hashes():
@ -785,7 +545,7 @@ class HubDB:
] ]
block_hashes = await asyncio.get_event_loop().run_in_executor(self._executor, get_block_hashes) block_hashes = await asyncio.get_event_loop().run_in_executor(self._executor, get_block_hashes)
assert len(block_hashes) == len(self.headers) # assert len(block_hashes) == len(self.headers)
self.block_hashes = block_hashes self.block_hashes = block_hashes
async def _read_tx_hashes(self): async def _read_tx_hashes(self):
@ -803,11 +563,6 @@ class HubDB:
ts = time.perf_counter() - start ts = time.perf_counter() - start
self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4)) self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4))
def estimate_timestamp(self, height: int) -> int:
if height < len(self.headers):
return struct.unpack('<I', self.headers[height][100:104])[0]
return int(160.6855883050695 * height)
def open_db(self): def open_db(self):
if self.prefix_db: if self.prefix_db:
return return
@ -858,7 +613,6 @@ class HubDB:
async def initialize_caches(self): async def initialize_caches(self):
await self._read_tx_counts() await self._read_tx_counts()
await self._read_headers()
await self._read_block_hashes() await self._read_block_hashes()
if self._cache_all_claim_txos: if self._cache_all_claim_txos:
await self._read_claim_txos() await self._read_claim_txos()
@ -871,78 +625,6 @@ class HubDB:
self.prefix_db.close() self.prefix_db.close()
self.prefix_db = None self.prefix_db = None
def _rebuild_hashX_status_index(self, start_height: int):
self.logger.warning("rebuilding the address status index...")
prefix_db = self.prefix_db
def hashX_iterator():
last_hashX = None
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
hashX = k[1:12]
if last_hashX is None:
last_hashX = hashX
if last_hashX != hashX:
yield hashX
last_hashX = hashX
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
return sha256(hist.encode())
start = time.perf_counter()
if start_height <= 0:
self.logger.info("loading all blockchain addresses, this will take a little while...")
hashXs = [hashX for hashX in hashX_iterator()]
else:
self.logger.info("loading addresses since block %i...", start_height)
hashXs = set()
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
include_key=False):
hashXs.update(touched.touched_hashXs)
hashXs = list(hashXs)
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
f"now building the status index...")
op_cnt = 0
hashX_cnt = 0
for hashX in hashXs:
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
prefix_db.stage_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stage_raw_delete(key, existing_status)
prefix_db.stage_raw_put(key, status)
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
op_cnt = 0
if op_cnt:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
self._index_address_status = True
self.write_db_state()
self.prefix_db.unsafe_commit()
self.logger.info("finished indexing address statuses")
def rebuild_hashX_status_index(self, start_height: int):
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
def _get_hashX_status(self, hashX: bytes): def _get_hashX_status(self, hashX: bytes):
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False) mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
if mempool_status: if mempool_status:
@ -1169,9 +851,9 @@ class HubDB:
return {txid: tx_infos.get(txid) for txid in txids} # match ordering of the txs in the request return {txid: tx_infos.get(txid) for txid in txids} # match ordering of the txs in the request
async def fs_block_hashes(self, height, count): async def fs_block_hashes(self, height, count):
if height + count > len(self.headers): if height + count > self.db_height + 1:
raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}') raise DBError(f'only got {len(self.block_hashes) - height:,d} headers starting at {height:,d}, not {count:,d}')
return [self.coin.header_hash(header) for header in self.headers[height:height + count]] return self.block_hashes[height:height + count]
def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]: def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]:
txs = [] txs = []
@ -1209,33 +891,6 @@ class HubDB:
"""Returns a height from which we should store undo info.""" """Returns a height from which we should store undo info."""
return max_height - self._reorg_limit + 1 return max_height - self._reorg_limit + 1
def apply_expiration_extension_fork(self):
# TODO: this can't be reorged
for k, v in self.prefix_db.claim_expiration.iterate():
self.prefix_db.claim_expiration.stage_delete(k, v)
self.prefix_db.claim_expiration.stage_put(
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
k.tx_num, k.position), v
)
self.prefix_db.unsafe_commit()
def write_db_state(self):
"""Write (UTXO) state to the batch."""
last_indexed_address_status = 0
if self.db_height > 0:
existing = self.prefix_db.db_state.get()
last_indexed_address_status = existing.hashX_status_last_indexed_height
self.prefix_db.db_state.stage_delete((), existing.expanded)
if self._index_address_status:
last_indexed_address_status = self.db_height
self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
self.es_sync_height, last_indexed_address_status
)
)
def read_db_state(self): def read_db_state(self):
state = self.prefix_db.db_state.get() state = self.prefix_db.db_state.get()

View file

@ -5,7 +5,7 @@ import typing
from bisect import bisect_right from bisect import bisect_right
from hub.common import sha256 from hub.common import sha256
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from hub.db.db import HubDB from hub.scribe.db import PrimaryDB
FROM_VERSION = 7 FROM_VERSION = 7
TO_VERSION = 8 TO_VERSION = 8
@ -35,7 +35,7 @@ def hashX_history(db: 'HubDB', hashX: bytes):
return history, to_delete return history, to_delete
def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes: def hashX_status_from_history(db: 'PrimaryDB', history: bytes) -> bytes:
tx_counts = db.tx_counts tx_counts = db.tx_counts
hist_tx_nums = array.array('I') hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history) hist_tx_nums.frombytes(history)

266
hub/elastic_sync/db.py Normal file
View file

@ -0,0 +1,266 @@
from typing import Optional, Set, Dict, List
from concurrent.futures.thread import ThreadPoolExecutor
from hub.schema.claim import guess_stream_type
from hub.schema.result import Censor
from hub.common import hash160, STREAM_TYPES, CLAIM_TYPES, LRUCache
from hub.db import SecondaryDB
from hub.db.common import ResolveResult
class ElasticSyncDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status)
self.block_timestamp_cache = LRUCache(1024)
def estimate_timestamp(self, height: int) -> int:
if height in self.block_timestamp_cache:
return self.block_timestamp_cache[height]
header = self.prefix_db.header.get(height, deserialize_value=False)
timestamp = int(160.6855883050695 * height) if header else int.from_bytes(header[100:104], byteorder='little')
self.block_timestamp_cache[height] = timestamp
return timestamp
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
if not metadata:
return
metadata = metadata
if not metadata.is_stream or not metadata.stream.has_fee:
fee_amount = 0
else:
fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
if fee_amount >= 9223372036854775807:
return
reposted_claim_hash = claim.reposted_claim_hash
reposted_claim = None
reposted_metadata = None
if reposted_claim_hash:
reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
if not reposted_claim:
return
reposted_metadata = self.get_claim_metadata(
self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
)
if not reposted_metadata:
return
reposted_tags = []
reposted_languages = []
reposted_has_source = False
reposted_claim_type = None
reposted_stream_type = None
reposted_media_type = None
reposted_fee_amount = None
reposted_fee_currency = None
reposted_duration = None
if reposted_claim:
raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
try:
reposted_metadata = self.coin.transaction(
raw_reposted_claim_tx
).outputs[reposted_claim.position].metadata
except:
self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
claim.reposted_claim_hash.hex(), claim_hash.hex())
return
if reposted_metadata:
if reposted_metadata.is_stream:
meta = reposted_metadata.stream
elif reposted_metadata.is_channel:
meta = reposted_metadata.channel
elif reposted_metadata.is_collection:
meta = reposted_metadata.collection
elif reposted_metadata.is_repost:
meta = reposted_metadata.repost
else:
return
reposted_tags = [tag for tag in meta.tags]
reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
if reposted_has_source else 0
reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
reposted_fee_amount = 0
else:
reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
if reposted_fee_amount >= 9223372036854775807:
return
reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
reposted_duration = None
if reposted_metadata.is_stream and \
(reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration
if metadata.is_stream:
meta = metadata.stream
elif metadata.is_channel:
meta = metadata.channel
elif metadata.is_collection:
meta = metadata.collection
elif metadata.is_repost:
meta = metadata.repost
else:
return
claim_tags = [tag for tag in meta.tags]
claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
tags = list(set(claim_tags).union(set(reposted_tags)))
languages = list(set(claim_languages).union(set(reposted_languages)))
blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)
value = {
'claim_id': claim_hash.hex(),
'claim_name': claim.name,
'normalized_name': claim.normalized_name,
'tx_id': claim.tx_hash[::-1].hex(),
'tx_num': claim.tx_num,
'tx_nout': claim.position,
'amount': claim.amount,
'timestamp': self.estimate_timestamp(claim.height),
'creation_timestamp': self.estimate_timestamp(claim.creation_height),
'height': claim.height,
'creation_height': claim.creation_height,
'activation_height': claim.activation_height,
'expiration_height': claim.expiration_height,
'effective_amount': claim.effective_amount,
'support_amount': claim.support_amount,
'is_controlling': bool(claim.is_controlling),
'last_take_over_height': claim.last_takeover_height,
'short_url': claim.short_url,
'canonical_url': claim.canonical_url,
'title': None if not metadata.is_stream else metadata.stream.title,
'author': None if not metadata.is_stream else metadata.stream.author,
'description': None if not metadata.is_stream else metadata.stream.description,
'claim_type': CLAIM_TYPES[metadata.claim_type],
'has_source': reposted_has_source if metadata.is_repost else (
False if not metadata.is_stream else metadata.stream.has_source),
'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
if metadata.is_stream and metadata.stream.has_source
else reposted_stream_type if metadata.is_repost else 0,
'media_type': metadata.stream.source.media_type
if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
'fee_currency': metadata.stream.fee.currency
if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
'repost_count': self.get_reposted_count(claim_hash),
'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
'reposted_claim_type': reposted_claim_type,
'reposted_has_source': reposted_has_source,
'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
'public_key_id': None if not metadata.is_channel else
self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
'signature': (metadata.signature or b'').hex() or None,
# 'signature_digest': metadata.signature,
'is_signature_valid': bool(claim.signature_valid),
'tags': tags,
'languages': languages,
'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
'reposted_tx_position': claim.reposted_tx_position,
'reposted_height': claim.reposted_height,
'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
'channel_tx_position': claim.channel_tx_position,
'channel_height': claim.channel_height,
}
if metadata.is_repost and reposted_duration is not None:
value['duration'] = reposted_duration
elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
if metadata.is_stream:
value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
elif metadata.is_repost or metadata.is_collection:
value['release_time'] = value['creation_timestamp']
return value
async def all_claims_producer(self, batch_size=500_000):
batch = []
if self._cache_all_claim_txos:
claim_iterator = self.claim_to_txo.items()
else:
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
for claim_hash, claim_txo in claim_iterator:
# TODO: fix the couple of claim txos that dont have controlling names
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
if len(batch) == batch_size:
batch.sort(key=lambda x: x.tx_hash) # sort is to improve read-ahead hits
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
yield meta
batch.clear()
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if not claim:
self.logger.warning("wat")
return
return self._prepare_claim_metadata(claim.claim_hash, claim)
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
for claim_hash in claim_hashes:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results

View file

@ -12,6 +12,7 @@ from hub.db.revertable import RevertableOp
from hub.db.common import TrendingNotification, DB_PREFIXES from hub.db.common import TrendingNotification, DB_PREFIXES
from hub.notifier_protocol import ElasticNotifierProtocol from hub.notifier_protocol import ElasticNotifierProtocol
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from hub.elastic_sync.db import ElasticSyncDB
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from hub.elastic_sync.env import ElasticEnv from hub.elastic_sync.env import ElasticEnv
@ -44,6 +45,15 @@ class ElasticSyncService(BlockchainReaderService):
self._listeners: typing.List[ElasticNotifierProtocol] = [] self._listeners: typing.List[ElasticNotifierProtocol] = []
self._force_reindex = False self._force_reindex = False
def open_db(self):
env = self.env
self.db = ElasticSyncDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status
)
async def run_es_notifier(self, synchronized: asyncio.Event): async def run_es_notifier(self, synchronized: asyncio.Event):
server = await asyncio.get_event_loop().create_server( server = await asyncio.get_event_loop().create_server(
lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port
@ -233,6 +243,7 @@ class ElasticSyncService(BlockchainReaderService):
self._advanced = True self._advanced = True
def unwind(self): def unwind(self):
self.db.block_timestamp_cache.clear()
reverted_block_hash = self.db.block_hashes[-1] reverted_block_hash = self.db.block_hashes[-1]
super().unwind() super().unwind()
packed = self.db.prefix_db.undo.get(len(self.db.tx_counts), reverted_block_hash) packed = self.db.prefix_db.undo.get(len(self.db.tx_counts), reverted_block_hash)

33
hub/herald/db.py Normal file
View file

@ -0,0 +1,33 @@
import asyncio
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from hub.db import SecondaryDB
class HeraldDB(SecondaryDB):
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, secondary_name, max_open_files, reorg_limit, cache_all_claim_txos,
cache_all_tx_hashes, blocking_channel_ids, filtering_channel_ids, executor,
index_address_status)
# self.headers = None
# async def _read_headers(self):
# def get_headers():
# return [
# header for header in self.prefix_db.header.iterate(
# start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False,
# deserialize_value=False
# )
# ]
#
# headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
# assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
# self.headers = headers
# async def initialize_caches(self):
# await super().initialize_caches()
# await self._read_headers()

View file

@ -13,7 +13,7 @@ from hub.scribe.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from hub.herald.session import SessionManager from hub.herald.session import SessionManager
from hub.db import HubDB from hub.db import SecondaryDB
@attr.s(slots=True) @attr.s(slots=True)
@ -46,7 +46,7 @@ mempool_touched_address_count_metric = Gauge(
class HubMemPool: class HubMemPool:
def __init__(self, coin, db: 'HubDB', refresh_secs=1.0): def __init__(self, coin, db: 'SecondaryDB', refresh_secs=1.0):
self.coin = coin self.coin = coin
self._db = db self._db = db
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)

View file

@ -10,7 +10,7 @@ from hub.schema.result import Censor, Outputs
from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
from hub.db.common import ResolveResult from hub.db.common import ResolveResult
if TYPE_CHECKING: if TYPE_CHECKING:
from hub.db import HubDB from hub.db import SecondaryDB
class ChannelResolution(str): class ChannelResolution(str):
@ -28,7 +28,7 @@ class StreamResolution(str):
class SearchIndex: class SearchIndex:
VERSION = 1 VERSION = 1
def __init__(self, hub_db: 'HubDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost', def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
elastic_port=9200): elastic_port=9200):
self.hub_db = hub_db self.hub_db = hub_db
self.search_timeout = search_timeout self.search_timeout = search_timeout

View file

@ -5,6 +5,7 @@ from hub.scribe.daemon import LBCDaemon
from hub.herald.session import SessionManager from hub.herald.session import SessionManager
from hub.herald.mempool import HubMemPool from hub.herald.mempool import HubMemPool
from hub.herald.udp import StatusServer from hub.herald.udp import StatusServer
from hub.herald.db import HeraldDB
from hub.service import BlockchainReaderService from hub.service import BlockchainReaderService
from hub.notifier_protocol import ElasticNotifierClientProtocol from hub.notifier_protocol import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
@ -35,6 +36,15 @@ class HubServerService(BlockchainReaderService):
self._es_height = None self._es_height = None
self._es_block_hash = None self._es_block_hash = None
def open_db(self):
env = self.env
self.db = HeraldDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status
)
def clear_caches(self): def clear_caches(self):
self.session_manager.clear_caches() self.session_manager.clear_caches()
# self.clear_search_cache() # self.clear_search_cache()
@ -54,7 +64,6 @@ class HubServerService(BlockchainReaderService):
self.session_manager.hashX_history_cache.clear() self.session_manager.hashX_history_cache.clear()
prev_count = self.db.tx_counts.pop() prev_count = self.db.tx_counts.pop()
tx_count = self.db.tx_counts[-1] tx_count = self.db.tx_counts[-1]
self.db.headers.pop()
self.db.block_hashes.pop() self.db.block_hashes.pop()
current_count = prev_count current_count = prev_count
for _ in range(prev_count - tx_count): for _ in range(prev_count - tx_count):

View file

@ -28,7 +28,7 @@ from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2,
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
from hub.herald.framer import NewlineFramer from hub.herald.framer import NewlineFramer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from hub.db import HubDB from hub.db import SecondaryDB
from hub.herald.env import ServerEnv from hub.herald.env import ServerEnv
from hub.scribe.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from hub.herald.mempool import HubMemPool from hub.herald.mempool import HubMemPool
@ -179,7 +179,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool', def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event, daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
@ -1750,7 +1750,7 @@ class LBRYElectrumX(asyncio.Protocol):
if not verbose: if not verbose:
return raw_tx.hex() return raw_tx.hex()
return self.coin.transaction(raw_tx).as_dict(self.coin) return self.coin.transaction(raw_tx).as_dict(self.coin)
return RPCError("No such mempool or blockchain transaction.") return RPCError(BAD_REQUEST, "No such mempool or blockchain transaction.")
def _get_merkle_branch(self, tx_hashes, tx_pos): def _get_merkle_branch(self, tx_hashes, tx_pos):
"""Return a merkle branch to a transaction. """Return a merkle branch to a transaction.

117
hub/scribe/db.py Normal file
View file

@ -0,0 +1,117 @@
import asyncio
import array
import time
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right
from hub.common import sha256
from hub.db import SecondaryDB
class PrimaryDB(SecondaryDB):
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
blocking_channel_ids, filtering_channel_ids, executor, index_address_status)
def _rebuild_hashX_status_index(self, start_height: int):
self.logger.warning("rebuilding the address status index...")
prefix_db = self.prefix_db
def hashX_iterator():
last_hashX = None
for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
hashX = k[1:12]
if last_hashX is None:
last_hashX = hashX
if last_hashX != hashX:
yield hashX
last_hashX = hashX
if last_hashX:
yield last_hashX
def hashX_status_from_history(history: bytes) -> bytes:
tx_counts = self.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
hist += f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
return sha256(hist.encode())
start = time.perf_counter()
if start_height <= 0:
self.logger.info("loading all blockchain addresses, this will take a little while...")
hashXs = [hashX for hashX in hashX_iterator()]
else:
self.logger.info("loading addresses since block %i...", start_height)
hashXs = set()
for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
include_key=False):
hashXs.update(touched.touched_hashXs)
hashXs = list(hashXs)
self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
f"now building the status index...")
op_cnt = 0
hashX_cnt = 0
for hashX in hashXs:
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
status = hashX_status_from_history(history)
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status == status:
continue
elif not existing_status:
prefix_db.stage_raw_put(key, status)
op_cnt += 1
else:
prefix_db.stage_raw_delete(key, existing_status)
prefix_db.stage_raw_put(key, status)
op_cnt += 2
if op_cnt > 100000:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
op_cnt = 0
if op_cnt:
prefix_db.unsafe_commit()
self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
self._index_address_status = True
self.write_db_state()
self.prefix_db.unsafe_commit()
self.logger.info("finished indexing address statuses")
def rebuild_hashX_status_index(self, start_height: int):
return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
def apply_expiration_extension_fork(self):
# TODO: this can't be reorged
for k, v in self.prefix_db.claim_expiration.iterate():
self.prefix_db.claim_expiration.stage_delete(k, v)
self.prefix_db.claim_expiration.stage_put(
(bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
k.tx_num, k.position), v
)
self.prefix_db.unsafe_commit()
def write_db_state(self):
"""Write (UTXO) state to the batch."""
last_indexed_address_status = 0
if self.db_height > 0:
existing = self.prefix_db.db_state.get()
last_indexed_address_status = existing.hashX_status_last_indexed_height
self.prefix_db.db_state.stage_delete((), existing.expanded)
if self._index_address_status:
last_indexed_address_status = self.db_height
self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
self.es_sync_height, last_indexed_address_status
)
)

View file

@ -5,7 +5,7 @@ from collections import defaultdict
from hub.scribe.transaction.deserializer import Deserializer from hub.scribe.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from hub.db import HubDB from hub.scribe.db import PrimaryDB
@attr.s(slots=True) @attr.s(slots=True)
@ -27,7 +27,7 @@ class MemPoolTxSummary:
class MemPool: class MemPool:
def __init__(self, coin, db: 'HubDB'): def __init__(self, coin, db: 'PrimaryDB'):
self.coin = coin self.coin = coin
self._db = db self._db = db
self.txs = {} self.txs = {}

View file

@ -12,6 +12,7 @@ from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
from hub.scribe.prefetcher import Prefetcher from hub.scribe.prefetcher import Prefetcher
@ -121,6 +122,15 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size))) self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
self.history_tx_info_cache = LRUCache(2 ** 16) self.history_tx_info_cache = LRUCache(2 ** 16)
def open_db(self):
env = self.env
self.db = PrimaryDB(
env.coin, env.db_dir, env.reorg_limit, cache_all_claim_txos=env.cache_all_claim_txos,
cache_all_tx_hashes=env.cache_all_tx_hashes, max_open_files=env.db_max_open_files,
blocking_channel_ids=env.blocking_channel_ids, filtering_channel_ids=env.filtering_channel_ids,
executor=self._executor, index_address_status=env.index_address_status
)
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task # cancellations from shutdown don't lose work - when the task
@ -1383,7 +1393,6 @@ class BlockchainProcessorService(BlockchainService):
) )
self.height = height self.height = height
self.db.headers.append(block.header)
self.db.block_hashes.append(self.env.coin.header_hash(block.header)) self.db.block_hashes.append(self.env.coin.header_hash(block.header))
self.tip = self.coin.header_hash(block.header) self.tip = self.coin.header_hash(block.header)
@ -1549,7 +1558,6 @@ class BlockchainProcessorService(BlockchainService):
# Check and update self.tip # Check and update self.tip
self.db.tx_counts.pop() self.db.tx_counts.pop()
self.db.headers.pop()
reverted_block_hash = self.db.block_hashes.pop() reverted_block_hash = self.db.block_hashes.pop()
self.tip = self.db.block_hashes[-1] self.tip = self.db.block_hashes[-1]
if self.env.cache_all_tx_hashes: if self.env.cache_all_tx_hashes:

View file

@ -7,7 +7,7 @@ from prometheus_client import Gauge, Histogram
from hub import __version__, PROMETHEUS_NAMESPACE from hub import __version__, PROMETHEUS_NAMESPACE
from hub.env import Env from hub.env import Env
from hub.db import HubDB from hub.db import SecondaryDB
from hub.db.prefixes import DBState from hub.db.prefixes import DBState
from hub.common import HISTOGRAM_BUCKETS from hub.common import HISTOGRAM_BUCKETS
from hub.metrics import PrometheusServer from hub.metrics import PrometheusServer
@ -17,6 +17,7 @@ class BlockchainService:
""" """
Base class for blockchain readers as well as the block processor Base class for blockchain readers as well as the block processor
""" """
def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'): def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'):
self.env = env self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
@ -27,13 +28,19 @@ class BlockchainService:
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix) self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.last_state: typing.Optional[DBState] = None self.last_state: typing.Optional[DBState] = None
self.db = HubDB( self.secondary_name = secondary_name
env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, self._stopping = False
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids, self.db = None
self.open_db()
def open_db(self):
env = self.env
self.db = SecondaryDB(
env.coin, env.db_dir, self.secondary_name, -1, env.reorg_limit, env.cache_all_claim_txos,
env.cache_all_tx_hashes, blocking_channel_ids=env.blocking_channel_ids,
filtering_channel_ids=env.filtering_channel_ids, executor=self._executor, filtering_channel_ids=env.filtering_channel_ids, executor=self._executor,
index_address_status=env.index_address_status index_address_status=env.index_address_status
) )
self._stopping = False
def start_cancellable(self, run, *args): def start_cancellable(self, run, *args):
_flag = asyncio.Event() _flag = asyncio.Event()
@ -167,7 +174,7 @@ class BlockchainReaderService(BlockchainService):
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}" assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
header = self.db.prefix_db.header.get(height, deserialize_value=False) header = self.db.prefix_db.header.get(height, deserialize_value=False)
self.db.headers.append(header) # self.db.headers.append(header)
self.db.block_hashes.append(self.env.coin.header_hash(header)) self.db.block_hashes.append(self.env.coin.header_hash(header))
def unwind(self): def unwind(self):
@ -176,7 +183,7 @@ class BlockchainReaderService(BlockchainService):
""" """
prev_count = self.db.tx_counts.pop() prev_count = self.db.tx_counts.pop()
tx_count = self.db.tx_counts[-1] tx_count = self.db.tx_counts[-1]
self.db.headers.pop() # self.db.headers.pop()
self.db.block_hashes.pop() self.db.block_hashes.pop()
if self.db._cache_all_tx_hashes: if self.db._cache_all_tx_hashes:
for _ in range(prev_count - tx_count): for _ in range(prev_count - tx_count):
@ -202,7 +209,8 @@ class BlockchainReaderService(BlockchainService):
rewound = False rewound = False
if self.last_state: if self.last_state:
while True: while True:
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False): if self.db.block_hashes[-1] == self.env.coin.header_hash(
self.db.prefix_db.header.get(last_height, deserialize_value=False)):
self.log.debug("connects to block %i", last_height) self.log.debug("connects to block %i", last_height)
break break
else: else: