Split up the db classes used by the different hub services #43

Merged
jackrobison merged 3 commits from cleanup-db into master 2022-05-27 18:54:21 +02:00
14 changed files with 482 additions and 389 deletions
Showing only changes of commit 0901f67d89 - Show all commits

View file

@ -1 +1 @@
from .db import HubDB
from .db import SecondaryDB

View file

@ -33,19 +33,18 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class HubDB:
class SecondaryDB:
DB_VERSIONS = [7, 8, 9]
def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
index_address_status=False):
self.logger = logging.getLogger(__name__)
self.coin = coin
self._executor = executor
self._db_dir = db_dir
self._reorg_limit = reorg_limit
self._cache_all_claim_txos = cache_all_claim_txos
self._cache_all_tx_hashes = cache_all_tx_hashes
@ -81,7 +80,7 @@ class HubDB:
}
self.tx_counts = None
self.headers = None
# self.headers = None
self.block_hashes = None
self.encoded_headers = LRUCacheWithMetrics(1024, metric_name='encoded_headers', namespace=NAMESPACE)
self.last_flush = time.time()
@ -470,245 +469,6 @@ class HubDB:
self.logger.exception("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
return
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
    """Build the search-index document (a plain dict) describing one claim.

    :param claim_hash: hash of the claim being indexed.
    :param claim: resolve result for that claim.
    :return: the document dict, or None when the claim cannot be indexed:
        its metadata (or the metadata of the claim it reposts) is missing or
        cannot be parsed, its type is not stream/channel/collection/repost,
        or a fee amount does not fit below the signed 64-bit maximum.
    """
    max_int64 = 9223372036854775807  # 2**63 - 1; fee amounts at or above this are rejected

    metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
    if not metadata:
        return
    if not metadata.is_stream or not metadata.stream.has_fee:
        fee_amount = 0
    else:
        fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
        if fee_amount >= max_int64:
            return

    reposted_claim_hash = claim.reposted_claim_hash
    reposted_claim = None
    reposted_metadata = None
    if reposted_claim_hash:
        reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
        if not reposted_claim:
            return
        reposted_metadata = self.get_claim_metadata(
            self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
        )
        if not reposted_metadata:
            return

    # Defaults used when the claim is not a repost.
    reposted_tags = []
    reposted_languages = []
    reposted_has_source = False
    reposted_claim_type = None
    reposted_stream_type = None
    reposted_media_type = None
    reposted_fee_amount = None
    reposted_fee_currency = None
    reposted_duration = None

    if reposted_claim:
        raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
        try:
            reposted_metadata = self.coin.transaction(
                raw_reposted_claim_tx
            ).outputs[reposted_claim.position].metadata
        except Exception:
            # Only genuine parse failures are swallowed here; KeyboardInterrupt and
            # SystemExit must keep propagating, which a bare `except:` would prevent.
            self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
                              claim.reposted_claim_hash.hex(), claim_hash.hex())
            return

    if reposted_metadata:
        if reposted_metadata.is_stream:
            meta = reposted_metadata.stream
        elif reposted_metadata.is_channel:
            meta = reposted_metadata.channel
        elif reposted_metadata.is_collection:
            meta = reposted_metadata.collection
        elif reposted_metadata.is_repost:
            meta = reposted_metadata.repost
        else:
            return
        reposted_tags = list(meta.tags)
        reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
        reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
        reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
        reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
            if reposted_has_source else 0
        reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
        if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
            reposted_fee_amount = 0
        else:
            reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
            if reposted_fee_amount >= max_int64:
                return
        reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
        if reposted_metadata.is_stream and \
                (reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
            reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration

    if metadata.is_stream:
        meta = metadata.stream
    elif metadata.is_channel:
        meta = metadata.channel
    elif metadata.is_collection:
        meta = metadata.collection
    elif metadata.is_repost:
        meta = metadata.repost
    else:
        return
    claim_tags = list(meta.tags)
    claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']

    # A repost is indexed under the union of its own and the reposted claim's tags/languages.
    tags = list(set(claim_tags).union(set(reposted_tags)))
    languages = list(set(claim_languages).union(set(reposted_languages)))

    # First matching entry wins: the claim itself, then the reposted claim, then the channel.
    blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
        reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
        reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
    filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
        reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
        reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)

    value = {
        'claim_id': claim_hash.hex(),
        'claim_name': claim.name,
        'normalized_name': claim.normalized_name,
        'tx_id': claim.tx_hash[::-1].hex(),
        'tx_num': claim.tx_num,
        'tx_nout': claim.position,
        'amount': claim.amount,
        'timestamp': self.estimate_timestamp(claim.height),
        'creation_timestamp': self.estimate_timestamp(claim.creation_height),
        'height': claim.height,
        'creation_height': claim.creation_height,
        'activation_height': claim.activation_height,
        'expiration_height': claim.expiration_height,
        'effective_amount': claim.effective_amount,
        'support_amount': claim.support_amount,
        'is_controlling': bool(claim.is_controlling),
        'last_take_over_height': claim.last_takeover_height,
        'short_url': claim.short_url,
        'canonical_url': claim.canonical_url,
        'title': None if not metadata.is_stream else metadata.stream.title,
        'author': None if not metadata.is_stream else metadata.stream.author,
        'description': None if not metadata.is_stream else metadata.stream.description,
        'claim_type': CLAIM_TYPES[metadata.claim_type],
        'has_source': reposted_has_source if metadata.is_repost else (
            False if not metadata.is_stream else metadata.stream.has_source),
        'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
        'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
        if metadata.is_stream and metadata.stream.has_source
        else reposted_stream_type if metadata.is_repost else 0,
        'media_type': metadata.stream.source.media_type
        if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
        'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
        'fee_currency': metadata.stream.fee.currency
        if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
        'repost_count': self.get_reposted_count(claim_hash),
        'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
        'reposted_claim_type': reposted_claim_type,
        'reposted_has_source': reposted_has_source,
        'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
        'public_key_id': None if not metadata.is_channel else
        self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
        'signature': (metadata.signature or b'').hex() or None,
        # 'signature_digest': metadata.signature,
        'is_signature_valid': bool(claim.signature_valid),
        'tags': tags,
        'languages': languages,
        'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
        'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
        'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
        'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
        'reposted_tx_position': claim.reposted_tx_position,
        'reposted_height': claim.reposted_height,
        'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
        'channel_tx_position': claim.channel_tx_position,
        'channel_height': claim.channel_height,
    }

    # 'duration' and 'release_time' are only present when they apply to the claim type.
    if metadata.is_repost and reposted_duration is not None:
        value['duration'] = reposted_duration
    elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
        value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
    if metadata.is_stream:
        value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
    elif metadata.is_repost or metadata.is_collection:
        value['release_time'] = value['creation_timestamp']
    return value
async def all_claims_producer(self, batch_size=500_000):
    """Asynchronously yield a search-index document for every claim in the db.

    Claims are accumulated into batches of `batch_size`; each batch is sorted
    by tx hash before its documents are built (the sort is to improve
    read-ahead hits). Claims whose normalized name has no takeover entry, that
    fail to resolve, or whose document cannot be built are skipped.
    """
    if self._cache_all_claim_txos:
        txo_items = self.claim_to_txo.items()
    else:
        txo_items = ((item[0].claim_hash, item[1]) for item in self.prefix_db.claim_to_txo.iterate())

    pending = []

    def drain():
        # Emit the documents for everything currently pending, then empty the batch.
        pending.sort(key=lambda resolved: resolved.tx_hash)  # sort is to improve read-ahead hits
        for resolved in pending:
            document = self._prepare_claim_metadata(resolved.claim_hash, resolved)
            if document:
                yield document
        pending.clear()

    for claim_hash, claim_txo in txo_items:
        # TODO: fix the couple of claim txos that dont have controlling names
        if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
            continue
        activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
        resolved = self._prepare_resolve_result(
            claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
            claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
        )
        if resolved:
            pending.append(resolved)
        if len(pending) == batch_size:
            for document in drain():
                yield document

    # Whatever is left over after the last full batch.
    for document in drain():
        yield document
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
    """Build the search-index document for a single claim.

    :param claim_hash: hash of the claim to index.
    :return: the document dict, or None (after logging a warning that names
        the reason) when the claim is unknown, its normalized name has no
        takeover entry, or it fails to resolve. None is also returned when
        `_prepare_claim_metadata` cannot build a document.
    """
    claim_txo = self.get_cached_claim_txo(claim_hash)
    if not claim_txo:
        self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
        return None
    if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
        # The claim exists, so say what is actually missing instead of "non existent".
        self.logger.warning("can't sync claim whose name has no takeover to ES: %s", claim_hash.hex())
        return None
    activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
    claim = self._prepare_resolve_result(
        claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
        claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
    )
    if not claim:
        self.logger.warning("can't sync claim that failed to resolve to ES: %s", claim_hash.hex())
        return None
    return self._prepare_claim_metadata(claim.claim_hash, claim)
def claims_producer(self, claim_hashes: Set[bytes]):
    """Build search-index documents for a set of claims.

    :param claim_hashes: hashes of the claims to index.
    :return: list of document dicts, built in tx-hash order. Claims that are
        unknown or whose normalized name has no takeover entry are skipped
        with a warning; claims that fail to resolve or whose document cannot
        be built are skipped silently.
    """
    batch = []
    for claim_hash in claim_hashes:
        claim_txo = self.get_cached_claim_txo(claim_hash)
        if not claim_txo:
            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
            continue
        if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
            # The claim exists, so say what is actually missing instead of "non existent".
            self.logger.warning("can't sync claim whose name has no takeover to ES: %s", claim_hash.hex())
            continue
        activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
        claim = self._prepare_resolve_result(
            claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
            claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
        )
        if claim:
            batch.append(claim)

    batch.sort(key=lambda x: x.tx_hash)
    results = []
    for claim in batch:
        _meta = self._prepare_claim_metadata(claim.claim_hash, claim)
        if _meta:
            results.append(_meta)
    return results
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)
for k, v in self.prefix_db.pending_activation.iterate(prefix=(height,)):
@ -761,20 +521,20 @@ class HubDB:
ts = time.perf_counter() - start
self.logger.info("loaded %i claim txos in %ss", len(self.claim_to_txo), round(ts, 4))
async def _read_headers(self):
# if self.headers is not None:
# return
def get_headers():
return [
header for header in self.prefix_db.header.iterate(
start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
)
]
headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers
# async def _read_headers(self):
# # if self.headers is not None:
# # return
#
# def get_headers():
# return [
# header for header in self.prefix_db.header.iterate(
# start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False, deserialize_value=False
# )
# ]
#
# headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
# assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
# self.headers = headers
async def _read_block_hashes(self):
def get_block_hashes():
@ -785,7 +545,7 @@ class HubDB:
]
block_hashes = await asyncio.get_event_loop().run_in_executor(self._executor, get_block_hashes)
assert len(block_hashes) == len(self.headers)
# assert len(block_hashes) == len(self.headers)
self.block_hashes = block_hashes
async def _read_tx_hashes(self):
@ -803,11 +563,6 @@ class HubDB:
ts = time.perf_counter() - start
self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4))
def estimate_timestamp(self, height: int) -> int:
    """Return the timestamp for `height`.

    For a height covered by the cached headers, the value is read from bytes
    100..104 of that header as a little-endian unsigned 32-bit integer.
    Beyond the cached headers it is extrapolated linearly from the height.
    """
    if height >= len(self.headers):
        return int(160.6855883050695 * height)
    (timestamp,) = struct.unpack('<I', self.headers[height][100:104])
    return timestamp
def open_db(self):
if self.prefix_db:
return
@ -858,7 +613,6 @@ class HubDB:
async def initialize_caches(self):
await self._read_tx_counts()
await self._read_headers()
await self._read_block_hashes()
if self._cache_all_claim_txos:
await self._read_claim_txos()
@ -871,78 +625,6 @@ class HubDB:
self.prefix_db.close()
self.prefix_db = None
def _rebuild_hashX_status_index(self, start_height: int):
    """Recompute and store the status hash of address histories (blocking).

    :param start_height: when <= 0, every hashX found in the history table is
        re-indexed; otherwise only the hashXs recorded as touched from
        `start_height` through the current db height.

    For each hashX the status is the sha256 of the concatenated
    ``"<txid hex>:<height>:"`` entries of its history. Statuses that are
    already up to date are left alone. Writes are committed in chunks, and
    the db state is rewritten with address-status indexing enabled at the end.
    """
    self.logger.warning("rebuilding the address status index...")
    prefix_db = self.prefix_db

    def hashX_iterator():
        # Yield each distinct hashX exactly once, the first time it is seen.
        # (Previously the hashX *after* a change was yielded and the final one
        # was yielded again at the end, which skipped the first hashX entirely
        # and emitted the last one twice.)
        last_hashX = None
        for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
            hashX = k[1:12]
            if hashX != last_hashX:
                yield hashX
                last_hashX = hashX

    def hashX_status_from_history(history: bytes) -> bytes:
        tx_counts = self.tx_counts
        hist_tx_nums = array.array('I')
        hist_tx_nums.frombytes(history)
        # join instead of repeated += so long histories are not built quadratically
        hist = ''.join(
            f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
            for tx_num in hist_tx_nums
        )
        return sha256(hist.encode())

    start = time.perf_counter()

    if start_height <= 0:
        self.logger.info("loading all blockchain addresses, this will take a little while...")
        hashXs = list(hashX_iterator())
    else:
        self.logger.info("loading addresses since block %i...", start_height)
        hashXs = set()
        for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
                                                       include_key=False):
            hashXs.update(touched.touched_hashXs)
        hashXs = list(hashXs)

    self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
                     f"now building the status index...")
    op_cnt = 0
    hashX_cnt = 0
    for hashX_cnt, hashX in enumerate(hashXs, start=1):
        key = prefix_db.hashX_status.pack_key(hashX)
        history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
        status = hashX_status_from_history(history)
        existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
        if existing_status and existing_status == status:
            continue
        elif not existing_status:
            prefix_db.stage_raw_put(key, status)
            op_cnt += 1
        else:
            # replace a stale status: delete the old value, then put the new one
            prefix_db.stage_raw_delete(key, existing_status)
            prefix_db.stage_raw_put(key, status)
            op_cnt += 2
        if op_cnt > 100000:
            # commit periodically so the staged operations stay bounded
            prefix_db.unsafe_commit()
            self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
            op_cnt = 0
    if op_cnt:
        prefix_db.unsafe_commit()
        self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
    self._index_address_status = True
    self.write_db_state()
    self.prefix_db.unsafe_commit()
    self.logger.info("finished indexing address statuses")
def rebuild_hashX_status_index(self, start_height: int):
    """Schedule `_rebuild_hashX_status_index(start_height)` on `self._executor`.

    :return: the awaitable future produced by the event loop's `run_in_executor`.
    """
    loop = asyncio.get_event_loop()
    return loop.run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)
def _get_hashX_status(self, hashX: bytes):
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
if mempool_status:
@ -1169,9 +851,9 @@ class HubDB:
return {txid: tx_infos.get(txid) for txid in txids} # match ordering of the txs in the request
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
raise DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
if height + count > self.db_height + 1:
raise DBError(f'only got {len(self.block_hashes) - height:,d} headers starting at {height:,d}, not {count:,d}')
return self.block_hashes[height:height + count]
def _read_history(self, hashX: bytes, limit: Optional[int] = 1000) -> List[int]:
txs = []
@ -1209,33 +891,6 @@ class HubDB:
"""Returns a height from which we should store undo info."""
return max_height - self._reorg_limit + 1
def apply_expiration_extension_fork(self):
    """Re-key every claim expiration entry using the extended expiration time.

    Each entry is deleted and re-inserted under the height of its tx (looked
    up via `self.tx_counts`) plus `coin.nExtendedClaimExpirationTime`, then
    everything is committed in one go.
    """
    # TODO: this can't be reorged
    expirations = self.prefix_db.claim_expiration
    for old_key, value in expirations.iterate():
        expirations.stage_delete(old_key, value)
        # bisect_right over the cumulative tx counts gives the height containing tx_num
        new_expiration = bisect_right(self.tx_counts, old_key.tx_num) + self.coin.nExtendedClaimExpirationTime
        expirations.stage_put((new_expiration, old_key.tx_num, old_key.position), value)
    self.prefix_db.unsafe_commit()
def write_db_state(self):
    """Write (UTXO) state to the batch."""
    db_state = self.prefix_db.db_state

    last_indexed_address_status = 0
    if self.db_height > 0:
        # Replace the stored state: stage a delete of the old row and carry over
        # the height the address status index was last built at.
        existing = db_state.get()
        last_indexed_address_status = existing.hashX_status_last_indexed_height
        db_state.stage_delete((), existing.expanded)
    if self._index_address_status:
        # With indexing enabled the status index is current as of this height.
        last_indexed_address_status = self.db_height

    new_state = (
        self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
        self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
        self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
        self.es_sync_height, last_indexed_address_status
    )
    db_state.stage_put((), new_state)
def read_db_state(self):
state = self.prefix_db.db_state.get()

View file

@ -5,7 +5,7 @@ import typing
from bisect import bisect_right
from hub.common import sha256
if typing.TYPE_CHECKING:
from hub.db.db import HubDB
from hub.scribe.db import PrimaryDB
FROM_VERSION = 7
TO_VERSION = 8
@ -35,7 +35,7 @@ def hashX_history(db: 'HubDB', hashX: bytes):
return history, to_delete
def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes:
def hashX_status_from_history(db: 'PrimaryDB', history: bytes) -> bytes:
tx_counts = db.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)

253
hub/elastic_sync/db.py Normal file
View file

@ -0,0 +1,253 @@
from typing import Optional, Set, Dict
from hub.schema.claim import guess_stream_type
from hub.schema.result import Censor
from hub.common import hash160, STREAM_TYPES, CLAIM_TYPES
from hub.db import SecondaryDB
from hub.db.common import ResolveResult
class ElasticSyncDB(SecondaryDB):
    """SecondaryDB extended with the readers that turn claims into search-index (ES) documents."""

    def estimate_timestamp(self, height: int) -> int:
        """Return the timestamp for `height`.

        When a header is stored for the height, the value is read from bytes
        100..104 of the raw header as a little-endian integer; otherwise it is
        extrapolated linearly from the height.
        """
        header = self.prefix_db.header.get(height, deserialize_value=False)
        if header:
            return int.from_bytes(header[100:104], byteorder='little')
        return int(160.6855883050695 * height)

    def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
        """Build the search-index document (a plain dict) describing one claim.

        :param claim_hash: hash of the claim being indexed.
        :param claim: resolve result for that claim.
        :return: the document dict, or None when the claim cannot be indexed:
            its metadata (or the metadata of the claim it reposts) is missing or
            cannot be parsed, its type is not stream/channel/collection/repost,
            or a fee amount does not fit below the signed 64-bit maximum.
        """
        max_int64 = 9223372036854775807  # 2**63 - 1; fee amounts at or above this are rejected

        metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
        if not metadata:
            return
        if not metadata.is_stream or not metadata.stream.has_fee:
            fee_amount = 0
        else:
            fee_amount = int(max(metadata.stream.fee.amount or 0, 0) * 1000)
            if fee_amount >= max_int64:
                return

        reposted_claim_hash = claim.reposted_claim_hash
        reposted_claim = None
        reposted_metadata = None
        if reposted_claim_hash:
            reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
            if not reposted_claim:
                return
            reposted_metadata = self.get_claim_metadata(
                self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
            )
            if not reposted_metadata:
                return

        # Defaults used when the claim is not a repost.
        reposted_tags = []
        reposted_languages = []
        reposted_has_source = False
        reposted_claim_type = None
        reposted_stream_type = None
        reposted_media_type = None
        reposted_fee_amount = None
        reposted_fee_currency = None
        reposted_duration = None

        if reposted_claim:
            raw_reposted_claim_tx = self.prefix_db.tx.get(claim.reposted_tx_hash, deserialize_value=False)
            try:
                reposted_metadata = self.coin.transaction(
                    raw_reposted_claim_tx
                ).outputs[reposted_claim.position].metadata
            except Exception:
                # Only genuine parse failures are swallowed here; KeyboardInterrupt and
                # SystemExit must keep propagating, which a bare `except:` would prevent.
                self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
                                  claim.reposted_claim_hash.hex(), claim_hash.hex())
                return

        if reposted_metadata:
            if reposted_metadata.is_stream:
                meta = reposted_metadata.stream
            elif reposted_metadata.is_channel:
                meta = reposted_metadata.channel
            elif reposted_metadata.is_collection:
                meta = reposted_metadata.collection
            elif reposted_metadata.is_repost:
                meta = reposted_metadata.repost
            else:
                return
            reposted_tags = list(meta.tags)
            reposted_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
            reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
            reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
            reposted_stream_type = STREAM_TYPES[guess_stream_type(reposted_metadata.stream.source.media_type)] \
                if reposted_has_source else 0
            reposted_media_type = reposted_metadata.stream.source.media_type if reposted_metadata.is_stream else 0
            if not reposted_metadata.is_stream or not reposted_metadata.stream.has_fee:
                reposted_fee_amount = 0
            else:
                reposted_fee_amount = int(max(reposted_metadata.stream.fee.amount or 0, 0) * 1000)
                if reposted_fee_amount >= max_int64:
                    return
            reposted_fee_currency = None if not reposted_metadata.is_stream else reposted_metadata.stream.fee.currency
            if reposted_metadata.is_stream and \
                    (reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
                reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration

        if metadata.is_stream:
            meta = metadata.stream
        elif metadata.is_channel:
            meta = metadata.channel
        elif metadata.is_collection:
            meta = metadata.collection
        elif metadata.is_repost:
            meta = metadata.repost
        else:
            return
        claim_tags = list(meta.tags)
        claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']

        # A repost is indexed under the union of its own and the reposted claim's tags/languages.
        tags = list(set(claim_tags).union(set(reposted_tags)))
        languages = list(set(claim_languages).union(set(reposted_languages)))

        # First matching entry wins: the claim itself, then the reposted claim, then the channel.
        blocked_hash = self.blocked_streams.get(claim_hash) or self.blocked_streams.get(
            reposted_claim_hash) or self.blocked_channels.get(claim_hash) or self.blocked_channels.get(
            reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
        filtered_hash = self.filtered_streams.get(claim_hash) or self.filtered_streams.get(
            reposted_claim_hash) or self.filtered_channels.get(claim_hash) or self.filtered_channels.get(
            reposted_claim_hash) or self.filtered_channels.get(claim.channel_hash)

        value = {
            'claim_id': claim_hash.hex(),
            'claim_name': claim.name,
            'normalized_name': claim.normalized_name,
            'tx_id': claim.tx_hash[::-1].hex(),
            'tx_num': claim.tx_num,
            'tx_nout': claim.position,
            'amount': claim.amount,
            'timestamp': self.estimate_timestamp(claim.height),
            'creation_timestamp': self.estimate_timestamp(claim.creation_height),
            'height': claim.height,
            'creation_height': claim.creation_height,
            'activation_height': claim.activation_height,
            'expiration_height': claim.expiration_height,
            'effective_amount': claim.effective_amount,
            'support_amount': claim.support_amount,
            'is_controlling': bool(claim.is_controlling),
            'last_take_over_height': claim.last_takeover_height,
            'short_url': claim.short_url,
            'canonical_url': claim.canonical_url,
            'title': None if not metadata.is_stream else metadata.stream.title,
            'author': None if not metadata.is_stream else metadata.stream.author,
            'description': None if not metadata.is_stream else metadata.stream.description,
            'claim_type': CLAIM_TYPES[metadata.claim_type],
            'has_source': reposted_has_source if metadata.is_repost else (
                False if not metadata.is_stream else metadata.stream.has_source),
            'sd_hash': metadata.stream.source.sd_hash if metadata.is_stream and metadata.stream.has_source else None,
            'stream_type': STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)]
            if metadata.is_stream and metadata.stream.has_source
            else reposted_stream_type if metadata.is_repost else 0,
            'media_type': metadata.stream.source.media_type
            if metadata.is_stream else reposted_media_type if metadata.is_repost else None,
            'fee_amount': fee_amount if not metadata.is_repost else reposted_fee_amount,
            'fee_currency': metadata.stream.fee.currency
            if metadata.is_stream else reposted_fee_currency if metadata.is_repost else None,
            'repost_count': self.get_reposted_count(claim_hash),
            'reposted_claim_id': None if not reposted_claim_hash else reposted_claim_hash.hex(),
            'reposted_claim_type': reposted_claim_type,
            'reposted_has_source': reposted_has_source,
            'channel_id': None if not metadata.is_signed else metadata.signing_channel_hash[::-1].hex(),
            'public_key_id': None if not metadata.is_channel else
            self.coin.P2PKH_address_from_hash160(hash160(metadata.channel.public_key_bytes)),
            'signature': (metadata.signature or b'').hex() or None,
            # 'signature_digest': metadata.signature,
            'is_signature_valid': bool(claim.signature_valid),
            'tags': tags,
            'languages': languages,
            'censor_type': Censor.RESOLVE if blocked_hash else Censor.SEARCH if filtered_hash else Censor.NOT_CENSORED,
            'censoring_channel_id': (blocked_hash or filtered_hash or b'').hex() or None,
            'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash),
            'reposted_tx_id': None if not claim.reposted_tx_hash else claim.reposted_tx_hash[::-1].hex(),
            'reposted_tx_position': claim.reposted_tx_position,
            'reposted_height': claim.reposted_height,
            'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
            'channel_tx_position': claim.channel_tx_position,
            'channel_height': claim.channel_height,
        }

        # 'duration' and 'release_time' are only present when they apply to the claim type.
        if metadata.is_repost and reposted_duration is not None:
            value['duration'] = reposted_duration
        elif metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
            value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
        if metadata.is_stream:
            value['release_time'] = metadata.stream.release_time or value['creation_timestamp']
        elif metadata.is_repost or metadata.is_collection:
            value['release_time'] = value['creation_timestamp']
        return value

    def _resolve_claim_for_es(self, claim_hash: bytes, claim_txo):
        """Turn a claim txo into a resolve result (falsy when it cannot be resolved)."""
        activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
        return self._prepare_resolve_result(
            claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
            claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
        )

    def _iter_es_documents(self, batch):
        """Sort `batch` in place by tx hash and yield the document of each claim that has one."""
        batch.sort(key=lambda claim: claim.tx_hash)  # sort is to improve read-ahead hits
        for claim in batch:
            document = self._prepare_claim_metadata(claim.claim_hash, claim)
            if document:
                yield document

    async def all_claims_producer(self, batch_size=500_000):
        """Asynchronously yield a search-index document for every claim in the db.

        Claims are accumulated into batches of `batch_size`; each batch is
        sorted by tx hash before its documents are built. Claims whose
        normalized name has no takeover entry, that fail to resolve, or whose
        document cannot be built are skipped.
        """
        if self._cache_all_claim_txos:
            claim_iterator = self.claim_to_txo.items()
        else:
            claim_iterator = (
                (item[0].claim_hash, item[1]) for item in self.prefix_db.claim_to_txo.iterate()
            )

        batch = []
        for claim_hash, claim_txo in claim_iterator:
            # TODO: fix the couple of claim txos that dont have controlling names
            if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
                continue
            claim = self._resolve_claim_for_es(claim_hash, claim_txo)
            if claim:
                batch.append(claim)
            if len(batch) == batch_size:
                for document in self._iter_es_documents(batch):
                    yield document
                batch.clear()
        # Whatever is left over after the last full batch.
        for document in self._iter_es_documents(batch):
            yield document

    def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
        """Build the search-index document for a single claim.

        :return: the document dict, or None (after logging a warning that names
            the reason) when the claim is unknown, its normalized name has no
            takeover entry, or it fails to resolve. None is also returned when
            the document itself cannot be built.
        """
        claim_txo = self.get_cached_claim_txo(claim_hash)
        if not claim_txo:
            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
            return None
        if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
            # The claim exists, so say what is actually missing instead of "non existent".
            self.logger.warning("can't sync claim whose name has no takeover to ES: %s", claim_hash.hex())
            return None
        claim = self._resolve_claim_for_es(claim_hash, claim_txo)
        if not claim:
            self.logger.warning("can't sync claim that failed to resolve to ES: %s", claim_hash.hex())
            return None
        return self._prepare_claim_metadata(claim.claim_hash, claim)

    def claims_producer(self, claim_hashes: Set[bytes]):
        """Build search-index documents for a set of claims.

        :return: list of document dicts, built in tx-hash order. Claims that
            are unknown or whose normalized name has no takeover entry are
            skipped with a warning; claims that fail to resolve or whose
            document cannot be built are skipped silently.
        """
        batch = []
        for claim_hash in claim_hashes:
            claim_txo = self.get_cached_claim_txo(claim_hash)
            if not claim_txo:
                self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
                continue
            if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
                self.logger.warning("can't sync claim whose name has no takeover to ES: %s", claim_hash.hex())
                continue
            claim = self._resolve_claim_for_es(claim_hash, claim_txo)
            if claim:
                batch.append(claim)
        return list(self._iter_es_documents(batch))

View file

@ -12,6 +12,7 @@ from hub.db.revertable import RevertableOp
from hub.db.common import TrendingNotification, DB_PREFIXES
from hub.notifier_protocol import ElasticNotifierProtocol
from hub.elastic_sync.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
from hub.elastic_sync.db import ElasticSyncDB
if typing.TYPE_CHECKING:
from hub.elastic_sync.env import ElasticEnv
@ -44,6 +45,15 @@ class ElasticSyncService(BlockchainReaderService):
self._listeners: typing.List[ElasticNotifierProtocol] = []
self._force_reindex = False
def open_db(self):
    """Create the `ElasticSyncDB` for this service from its env and store it on `self.db`."""
    env = self.env
    self.db = ElasticSyncDB(
        coin=env.coin,
        db_dir=env.db_dir,
        secondary_name=self.secondary_name,
        max_open_files=-1,  # NOTE(review): presumably "no limit" - confirm against the db backend
        reorg_limit=env.reorg_limit,
        cache_all_claim_txos=env.cache_all_claim_txos,
        cache_all_tx_hashes=env.cache_all_tx_hashes,
        blocking_channel_ids=env.blocking_channel_ids,
        filtering_channel_ids=env.filtering_channel_ids,
        executor=self._executor,
        index_address_status=env.index_address_status,
    )
async def run_es_notifier(self, synchronized: asyncio.Event):
server = await asyncio.get_event_loop().create_server(
lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port

33
hub/herald/db.py Normal file
View file

@ -0,0 +1,33 @@
import asyncio
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from hub.db import SecondaryDB
class HeraldDB(SecondaryDB):
    """SecondaryDB subclass for the herald service.

    It currently adds no behavior of its own: the constructor forwards every
    argument unchanged, and the header cache below is commented out.
    """

    def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                 blocking_channel_ids: List[str] = None,
                 filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                 index_address_status=False):
        super().__init__(
            coin=coin,
            db_dir=db_dir,
            secondary_name=secondary_name,
            max_open_files=max_open_files,
            reorg_limit=reorg_limit,
            cache_all_claim_txos=cache_all_claim_txos,
            cache_all_tx_hashes=cache_all_tx_hashes,
            blocking_channel_ids=blocking_channel_ids,
            filtering_channel_ids=filtering_channel_ids,
            executor=executor,
            index_address_status=index_address_status,
        )
        # self.headers = None

    # Disabled in-memory header cache, kept for reference:
    #
    # async def _read_headers(self):
    #     def get_headers():
    #         return [
    #             header for header in self.prefix_db.header.iterate(
    #                 start=(0, ), stop=(self.db_height + 1, ), include_key=False, fill_cache=False,
    #                 deserialize_value=False
    #             )
    #         ]
    #
    #     headers = await asyncio.get_event_loop().run_in_executor(self._executor, get_headers)
    #     assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
    #     self.headers = headers

    # async def initialize_caches(self):
    #     await super().initialize_caches()
    #     await self._read_headers()

View file

@ -13,7 +13,7 @@ from hub.scribe.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING:
from hub.herald.session import SessionManager
from hub.db import HubDB
from hub.db import SecondaryDB
@attr.s(slots=True)
@ -46,7 +46,7 @@ mempool_touched_address_count_metric = Gauge(
class HubMemPool:
def __init__(self, coin, db: 'HubDB', refresh_secs=1.0):
def __init__(self, coin, db: 'SecondaryDB', refresh_secs=1.0):
self.coin = coin
self._db = db
self.logger = logging.getLogger(__name__)

View file

@ -10,7 +10,7 @@ from hub.schema.result import Censor, Outputs
from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
from hub.db.common import ResolveResult
if TYPE_CHECKING:
from hub.db import HubDB
from hub.db import SecondaryDB
class ChannelResolution(str):
@ -28,7 +28,7 @@ class StreamResolution(str):
class SearchIndex:
VERSION = 1
def __init__(self, hub_db: 'HubDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
elastic_port=9200):
self.hub_db = hub_db
self.search_timeout = search_timeout

View file

@ -5,6 +5,7 @@ from hub.scribe.daemon import LBCDaemon
from hub.herald.session import SessionManager
from hub.herald.mempool import HubMemPool
from hub.herald.udp import StatusServer
from hub.herald.db import HeraldDB
from hub.service import BlockchainReaderService
from hub.notifier_protocol import ElasticNotifierClientProtocol
if typing.TYPE_CHECKING:
@ -35,6 +36,15 @@ class HubServerService(BlockchainReaderService):
self._es_height = None
self._es_block_hash = None
def open_db(self):
    """Create this service's database handle as a ``HeraldDB`` and store it on ``self.db``.

    All settings come from the service environment (``self.env``) except the
    secondary name and the executor, which are taken from the service itself.
    """
    env = self.env
    self.db = HeraldDB(
        coin=env.coin,
        db_dir=env.db_dir,
        secondary_name=self.secondary_name,
        # -1 is also HeraldDB's default; presumably "no limit" in the
        # underlying store -- confirm.
        max_open_files=-1,
        reorg_limit=env.reorg_limit,
        cache_all_claim_txos=env.cache_all_claim_txos,
        cache_all_tx_hashes=env.cache_all_tx_hashes,
        blocking_channel_ids=env.blocking_channel_ids,
        filtering_channel_ids=env.filtering_channel_ids,
        executor=self._executor,
        index_address_status=env.index_address_status,
    )
def clear_caches(self):
self.session_manager.clear_caches()
# self.clear_search_cache()
@ -54,7 +64,6 @@ class HubServerService(BlockchainReaderService):
self.session_manager.hashX_history_cache.clear()
prev_count = self.db.tx_counts.pop()
tx_count = self.db.tx_counts[-1]
self.db.headers.pop()
self.db.block_hashes.pop()
current_count = prev_count
for _ in range(prev_count - tx_count):

View file

@ -28,7 +28,7 @@ from hub.herald.jsonrpc import JSONRPCAutoDetect, JSONRPCConnection, JSONRPCv2,
from hub.herald.common import BatchRequest, ProtocolError, Request, Batch, Notification
from hub.herald.framer import NewlineFramer
if typing.TYPE_CHECKING:
from hub.db import HubDB
from hub.db import SecondaryDB
from hub.herald.env import ServerEnv
from hub.scribe.daemon import LBCDaemon
from hub.herald.mempool import HubMemPool
@ -179,7 +179,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
def __init__(self, env: 'ServerEnv', db: 'HubDB', mempool: 'HubMemPool',
def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send)

117
hub/scribe/db.py Normal file
View file

@ -0,0 +1,117 @@
import asyncio
import array
import time
from typing import List
from concurrent.futures.thread import ThreadPoolExecutor
from bisect import bisect_right
from hub.common import sha256
from hub.db import SecondaryDB
class PrimaryDB(SecondaryDB):
    """Database handle constructed by the block processor service.

    Extends ``SecondaryDB`` with operations that stage and commit writes:
    rebuilding the address (hashX) status index, applying the claim expiration
    extension fork and writing the db state record. The parent constructor is
    always called with an empty ``secondary_name``.
    """

    def __init__(self, coin, db_dir: str, reorg_limit: int = 200,
                 cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
                 max_open_files: int = 64, blocking_channel_ids: List[str] = None,
                 filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None,
                 index_address_status=False):
        """Forward the arguments to ``SecondaryDB.__init__`` with ``secondary_name=''``."""
        super().__init__(coin, db_dir, '', max_open_files, reorg_limit, cache_all_claim_txos, cache_all_tx_hashes,
                         blocking_channel_ids, filtering_channel_ids, executor, index_address_status)

    def _rebuild_hashX_status_index(self, start_height: int):
        """Recompute and store the status hash of address histories (blocking).

        :param start_height: when <= 0 every hashX found in the history table is
            re-indexed; otherwise only the hashXs recorded as touched from
            ``start_height`` up to and including ``self.db_height``.

        Writes are committed in batches with ``unsafe_commit``; afterwards
        ``_index_address_status`` is set and the db state is rewritten.
        """
        self.logger.warning("rebuilding the address status index...")
        prefix_db = self.prefix_db

        def hashX_iterator():
            """Yield each distinct hashX of the history table exactly once, in key order."""
            last_hashX = None
            for k in prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
                # presumably a 1-byte prefix followed by an 11-byte hashX -- confirm against the key layout
                hashX = k[1:12]
                if last_hashX is None:
                    last_hashX = hashX
                if last_hashX != hashX:
                    # A new run of keys starts here, so the previous hashX is complete.
                    # Yielding the previous value (not the new one) ensures the very
                    # first hashX is emitted and the final one is not emitted twice.
                    yield last_hashX
                    last_hashX = hashX
            if last_hashX:
                yield last_hashX

        def hashX_status_from_history(history: bytes) -> bytes:
            """Hash the concatenated ``<txid hex>:<height>:`` entries of a raw history."""
            tx_counts = self.tx_counts
            # NOTE(review): type code 'I' has a platform-dependent item size
            # (commonly 4 bytes); assumes it matches the stored tx_num width.
            hist_tx_nums = array.array('I')
            hist_tx_nums.frombytes(history)
            hist = ''.join(
                f'{self.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
                for tx_num in hist_tx_nums
            )
            return sha256(hist.encode())

        start = time.perf_counter()

        if start_height <= 0:
            self.logger.info("loading all blockchain addresses, this will take a little while...")
            hashXs = list(hashX_iterator())
        else:
            self.logger.info("loading addresses since block %i...", start_height)
            hashXs = set()
            for touched in prefix_db.touched_hashX.iterate(start=(start_height,), stop=(self.db_height + 1,),
                                                           include_key=False):
                hashXs.update(touched.touched_hashXs)
            hashXs = list(hashXs)

        self.logger.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
                         f"now building the status index...")
        op_cnt = 0
        hashX_cnt = 0
        for hashX in hashXs:
            hashX_cnt += 1
            key = prefix_db.hashX_status.pack_key(hashX)
            history = b''.join(prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, include_key=False))
            status = hashX_status_from_history(history)
            existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
            if existing_status and existing_status == status:
                # stored status is already correct, nothing to stage
                continue
            elif not existing_status:
                prefix_db.stage_raw_put(key, status)
                op_cnt += 1
            else:
                # replace a stale status: delete the old value, then put the new one
                prefix_db.stage_raw_delete(key, existing_status)
                prefix_db.stage_raw_put(key, status)
                op_cnt += 2
            if op_cnt > 100000:
                # commit periodically so the number of staged operations stays bounded
                prefix_db.unsafe_commit()
                self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
                op_cnt = 0
        if op_cnt:
            prefix_db.unsafe_commit()
            self.logger.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses...")
        self._index_address_status = True
        self.write_db_state()
        self.prefix_db.unsafe_commit()
        self.logger.info("finished indexing address statuses")

    def rebuild_hashX_status_index(self, start_height: int):
        """Run ``_rebuild_hashX_status_index`` in ``self._executor``; returns the awaitable future."""
        return asyncio.get_event_loop().run_in_executor(self._executor, self._rebuild_hashX_status_index, start_height)

    def apply_expiration_extension_fork(self):
        """Re-key every claim expiration entry using the extended expiration time, then commit.

        Each entry is deleted and re-staged under
        ``height_of(tx_num) + coin.nExtendedClaimExpirationTime``, where the height
        is looked up in ``self.tx_counts`` with ``bisect_right``.
        """
        # TODO: this can't be reorged
        for k, v in self.prefix_db.claim_expiration.iterate():
            self.prefix_db.claim_expiration.stage_delete(k, v)
            self.prefix_db.claim_expiration.stage_put(
                (bisect_right(self.tx_counts, k.tx_num) + self.coin.nExtendedClaimExpirationTime,
                 k.tx_num, k.position), v
            )
        self.prefix_db.unsafe_commit()

    def write_db_state(self):
        """Write (UTXO) state to the batch.

        Stages a delete of the existing db state record (only when
        ``db_height > 0``) followed by a put of the current state. This method
        does not commit. The last indexed address status height is carried over
        from the existing record unless address status indexing is enabled, in
        which case it is set to the current ``db_height``.
        """
        last_indexed_address_status = 0
        if self.db_height > 0:
            existing = self.prefix_db.db_state.get()
            last_indexed_address_status = existing.hashX_status_last_indexed_height
            self.prefix_db.db_state.stage_delete((), existing.expanded)
        if self._index_address_status:
            last_indexed_address_status = self.db_height
        self.prefix_db.db_state.stage_put((), (
            self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
            self.utxo_flush_count, int(self.wall_time), self.catching_up, self._index_address_status, self.db_version,
            self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
            self.es_sync_height, last_indexed_address_status
            )
        )

View file

@ -5,7 +5,7 @@ from collections import defaultdict
from hub.scribe.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING:
from hub.db import HubDB
from hub.scribe.db import PrimaryDB
@attr.s(slots=True)
@ -27,7 +27,7 @@ class MemPoolTxSummary:
class MemPool:
def __init__(self, coin, db: 'HubDB'):
def __init__(self, coin, db: 'PrimaryDB'):
self.coin = coin
self._db = db
self.txs = {}

View file

@ -12,6 +12,7 @@ from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from hub.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from hub.error.base import ChainError
from hub.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256, LRUCache
from hub.scribe.db import PrimaryDB
from hub.scribe.daemon import LBCDaemon
from hub.scribe.transaction import Tx, TxOutput, TxInput, Block
from hub.scribe.prefetcher import Prefetcher
@ -121,6 +122,15 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache = LRUCache(min(100, max(0, env.hashX_history_cache_size)))
self.history_tx_info_cache = LRUCache(2 ** 16)
def open_db(self):
    """Create the block processor's database handle as a ``PrimaryDB`` on ``self.db``.

    Unlike the reader services, no secondary name is passed and the open file
    limit comes from ``env.db_max_open_files``.
    """
    env = self.env
    self.db = PrimaryDB(
        coin=env.coin,
        db_dir=env.db_dir,
        reorg_limit=env.reorg_limit,
        cache_all_claim_txos=env.cache_all_claim_txos,
        cache_all_tx_hashes=env.cache_all_tx_hashes,
        max_open_files=env.db_max_open_files,
        blocking_channel_ids=env.blocking_channel_ids,
        filtering_channel_ids=env.filtering_channel_ids,
        executor=self._executor,
        index_address_status=env.index_address_status,
    )
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
@ -1383,7 +1393,6 @@ class BlockchainProcessorService(BlockchainService):
)
self.height = height
self.db.headers.append(block.header)
self.db.block_hashes.append(self.env.coin.header_hash(block.header))
self.tip = self.coin.header_hash(block.header)
@ -1549,7 +1558,6 @@ class BlockchainProcessorService(BlockchainService):
# Check and update self.tip
self.db.tx_counts.pop()
self.db.headers.pop()
reverted_block_hash = self.db.block_hashes.pop()
self.tip = self.db.block_hashes[-1]
if self.env.cache_all_tx_hashes:

View file

@ -7,7 +7,7 @@ from prometheus_client import Gauge, Histogram
from hub import __version__, PROMETHEUS_NAMESPACE
from hub.env import Env
from hub.db import HubDB
from hub.db import SecondaryDB
from hub.db.prefixes import DBState
from hub.common import HISTOGRAM_BUCKETS
from hub.metrics import PrometheusServer
@ -17,6 +17,7 @@ class BlockchainService:
"""
Base class for blockchain readers as well as the block processor
"""
def __init__(self, env: Env, secondary_name: str, thread_workers: int = 1, thread_prefix: str = 'scribe'):
self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
@ -27,13 +28,19 @@ class BlockchainService:
self._executor = ThreadPoolExecutor(thread_workers, thread_name_prefix=thread_prefix)
self.lock = asyncio.Lock()
self.last_state: typing.Optional[DBState] = None
self.db = HubDB(
env.coin, env.db_dir, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1, blocking_channel_ids=env.blocking_channel_ids,
self.secondary_name = secondary_name
self._stopping = False
self.db = None
self.open_db()
def open_db(self):
    """Create the default database handle, a ``SecondaryDB``, and store it on ``self.db``.

    Called from ``__init__``. Services that need a different database class
    define their own ``open_db``.
    """
    env = self.env
    self.db = SecondaryDB(
        coin=env.coin,
        db_dir=env.db_dir,
        secondary_name=self.secondary_name,
        # -1 is also SecondaryDB's default; presumably "no limit" in the
        # underlying store -- confirm.
        max_open_files=-1,
        reorg_limit=env.reorg_limit,
        cache_all_claim_txos=env.cache_all_claim_txos,
        cache_all_tx_hashes=env.cache_all_tx_hashes,
        blocking_channel_ids=env.blocking_channel_ids,
        filtering_channel_ids=env.filtering_channel_ids,
        executor=self._executor,
        index_address_status=env.index_address_status,
    )
self._stopping = False
def start_cancellable(self, run, *args):
_flag = asyncio.Event()
@ -167,7 +174,7 @@ class BlockchainReaderService(BlockchainService):
assert len(self.db.total_transactions) == tx_count, f"{len(self.db.total_transactions)} vs {tx_count}"
header = self.db.prefix_db.header.get(height, deserialize_value=False)
self.db.headers.append(header)
# self.db.headers.append(header)
self.db.block_hashes.append(self.env.coin.header_hash(header))
def unwind(self):
@ -176,7 +183,7 @@ class BlockchainReaderService(BlockchainService):
"""
prev_count = self.db.tx_counts.pop()
tx_count = self.db.tx_counts[-1]
self.db.headers.pop()
# self.db.headers.pop()
self.db.block_hashes.pop()
if self.db._cache_all_tx_hashes:
for _ in range(prev_count - tx_count):
@ -202,7 +209,8 @@ class BlockchainReaderService(BlockchainService):
rewound = False
if self.last_state:
while True:
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
if self.db.block_hashes[-1] == self.env.coin.header_hash(
self.db.prefix_db.header.get(last_height, deserialize_value=False)):
self.log.debug("connects to block %i", last_height)
break
else: