bid ordered resolve, feed ES claim data from block processor

Jack Robison 2021-05-27 13:35:41 -04:00 committed by Victor Shyba
parent 8711ece274
commit 966f47a5b1
5 changed files with 187 additions and 58 deletions

View file

@ -18,8 +18,8 @@ from lbry.crypto.hash import hash160
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrieSupport
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation
from lbry.wallet.server.db.claimtrie import get_remove_name_ops
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue
from lbry.wallet.server.udp import StatusServer
@ -237,6 +237,75 @@ class BlockProcessor:
self.possible_future_activated_claim: Dict[Tuple[str, bytes], int] = {}
self.possible_future_support_txos: DefaultDict[bytes, List[Tuple[int, int]]] = defaultdict(list)
self.removed_claims_to_send_es = set()
self.touched_claims_to_send_es = set()
def claim_producer(self):
if self.db.db_height <= 1:
return
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
for claim_hash in self.touched_claims_to_send_es:
claim = self.db._fs_get_claim_by_hash(claim_hash)
yield ('update', {
'claim_hash': claim_hash,
# 'claim_id': claim_hash.hex(),
'claim_name': claim.name,
'normalized': claim.name,
'tx_id': claim.tx_hash[::-1].hex(),
'tx_nout': claim.position,
'amount': claim.amount,
'timestamp': 0,
'creation_timestamp': 0,
'height': claim.height,
'creation_height': claim.creation_height,
'activation_height': claim.activation_height,
'expiration_height': claim.expiration_height,
'effective_amount': claim.effective_amount,
'support_amount': claim.support_amount,
'is_controlling': claim.is_controlling,
'last_take_over_height': claim.last_takeover_height,
'short_url': '',
'canonical_url': '',
'release_time': 0,
'title': '',
'author': '',
'description': '',
'claim_type': 0,
'has_source': False,
'stream_type': '',
'media_type': '',
'fee_amount': 0,
'fee_currency': '',
'duration': 0,
'reposted': 0,
'reposted_claim_hash': None,
'reposted_claim_type': None,
'reposted_has_source': False,
'channel_hash': None,
'public_key_bytes': None,
'public_key_hash': None,
'signature': None,
'signature_digest': None,
'signature_valid': False,
'claims_in_channel': 0,
'tags': [],
'languages': [],
'censor_type': 0,
'censoring_channel_hash': None,
# 'trending_group': 0,
# 'trending_mixed': 0,
# 'trending_local': 0,
# 'trending_global': 0,
})
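Review note: claim_producer yields ('delete', claim_id) and ('update', doc) tuples that the search index consumes. Below is a minimal sketch of how such a consumer could translate those tuples into Elasticsearch bulk actions; the index name, the action shapes, and the consume_claims helper are illustrative assumptions, not the project's actual claim_consumer.

# Sketch only: assumes index name "claims" and standard bulk action shapes.
from typing import Dict, Generator, Iterable, Tuple, Union

ClaimEvent = Tuple[str, Union[str, Dict]]

def consume_claims(producer: Iterable[ClaimEvent], index: str = "claims") -> Generator[Dict, None, None]:
    """Turn ('delete', claim_id) / ('update', doc) events into bulk index actions."""
    for op, payload in producer:
        if op == 'delete':
            # payload is the hex claim id yielded by claim_producer
            yield {'_op_type': 'delete', '_index': index, '_id': payload}
        elif op == 'update':
            # payload is the document dict built above; upsert it keyed by claim_hash
            claim_hash = payload['claim_hash']
            yield {
                '_op_type': 'update',
                '_index': index,
                '_id': claim_hash.hex() if isinstance(claim_hash, bytes) else claim_hash,
                'doc': payload,
                'doc_as_upsert': True,
            }

# usage sketch: actions = list(consume_claims(block_processor.claim_producer()))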
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
# cancellations from shutdown don't lose work - when the task
@ -266,12 +335,15 @@ class BlockProcessor:
try:
for block in blocks:
await self.run_in_thread_with_lock(self.advance_block, block)
await self.db.search_index.claim_consumer(self.claim_producer())
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
print("******************\n")
except:
self.logger.exception("advance blocks failed")
raise
# if self.sql:
# await self.db.search_index.claim_consumer(self.db.claim_producer())
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear() # TODO: is this needed?
@ -948,6 +1020,43 @@ class BlockProcessor:
if (controlling and winning != controlling.claim_hash) or (not controlling and winning):
print(f"\ttakeover from abandoned support {controlling.claim_hash.hex() if controlling else None} -> {winning.hex()}")
ops.extend(get_takeover_name_ops(name, winning, height, controlling))
# gather cumulative removed/touched sets to update the search index
self.removed_claims_to_send_es.update(set(self.staged_pending_abandoned.keys()))
self.touched_claims_to_send_es.update(
set(self.staged_activated_support.keys()).union(set(claim_hash for (_, claim_hash) in self.staged_activated_claim.keys())).difference(
self.removed_claims_to_send_es)
)
# use the cumulative changes to update the bid ordered resolve index
for removed in self.removed_claims_to_send_es:
removed_claim = self.db.get_claim_txo(removed)
if not removed_claim:
continue
k, v = removed_claim
name, tx_num, position = v.name, k.tx_num, k.position
ops.extend(get_remove_effective_amount_ops(
name, self.db.get_effective_amount(removed), tx_num, position, removed
))
for touched in self.touched_claims_to_send_es:
if touched in self.pending_claim_txos:
pending = self.pending_claims[self.pending_claim_txos[touched]]
name, tx_num, position = pending.name, pending.tx_num, pending.position
claim_from_db = self.db.get_claim_txo(touched)
if claim_from_db:
k, v = claim_from_db
prev_tx_num, prev_position = k.tx_num, k.position
ops.extend(get_remove_effective_amount_ops(
name, self.db.get_effective_amount(touched), prev_tx_num, prev_position, touched
))
else:
k, v = self.db.get_claim_txo(touched)
name, tx_num, position = v.name, k.tx_num, k.position
ops.extend(get_remove_effective_amount_ops(
name, self.db.get_effective_amount(touched), tx_num, position, touched
))
ops.extend(get_add_effective_amount_ops(name, self._get_pending_effective_amount(name, touched),
tx_num, position, touched))
return ops
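Review note: the block above keeps the bid ordered index consistent by deleting the row keyed on a claim's old effective amount and inserting a row keyed on its pending amount. A toy in-memory version of that delete-then-reinsert pattern follows; the (name, -effective_amount, tx_num, position) key is an assumption chosen so that sorting yields the highest bid first, while the real layout is EffectiveAmountPrefixRow in prefixes.py.

# Toy sketch (assumption): key = (name, -effective_amount, tx_num, position), so the
# smallest key for a name is the highest bid. Not the project's actual storage layout.
from typing import Dict, Tuple

BidKey = Tuple[str, int, int, int]

class ToyBidIndex:
    def __init__(self):
        self.rows: Dict[BidKey, bytes] = {}

    def add(self, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
        self.rows[(name, -effective_amount, tx_num, position)] = claim_hash

    def remove(self, name: str, effective_amount: int, tx_num: int, position: int):
        self.rows.pop((name, -effective_amount, tx_num, position), None)

    def winner(self, name: str) -> bytes:
        # first key in sorted order for a name == highest effective amount
        return self.rows[sorted(k for k in self.rows if k[0] == name)[0]]

index = ToyBidIndex()
index.add('foo', 70_000_000, 1, 0, b'claim1')
index.add('foo', 80_000_000, 2, 0, b'claim2')
# a new support raises claim1 to 0.9 LBC: delete the stale row, insert the new one
index.remove('foo', 70_000_000, 1, 0)
index.add('foo', 90_000_000, 1, 0, b'claim1')
assert index.winner('foo') == b'claim1'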
def advance_block(self, block):
@ -1060,8 +1169,6 @@ class BlockProcessor:
self.db.flush_dbs(self.flush_data())
# self.effective_amount_changes.clear()
self.pending_claims.clear()
self.pending_claim_txos.clear()
self.pending_supports.clear()

View file

@ -2,7 +2,7 @@ import typing
from typing import Optional
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue
from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE
@ -115,6 +115,18 @@ def get_takeover_name_ops(name: str, claim_hash: bytes, takeover_height: int,
]
def get_remove_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
return [
RevertableDelete(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash))
]
def get_add_effective_amount_ops(name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
return [
RevertablePut(*EffectiveAmountPrefixRow.pack_item(name, effective_amount, tx_num, position, claim_hash))
]
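Review note: the two helpers emit a RevertableDelete and a RevertablePut over the same packed item, which is what makes the effective-amount index change easy to undo on reorg. The sketch below illustrates why packing the (inverted) amount big-endian into the key makes a plain ascending key scan bid ordered; the struct layout here is an assumption for illustration only, and the real packing is EffectiveAmountPrefixRow.pack_item.

# Illustrative key packing (assumption): 2-byte name length + name + inverted
# big-endian amount + tx_num + position. Inverting the amount means an ascending
# key scan visits claims for a name from highest to lowest effective amount.
import struct

MAX_AMOUNT = 0xffffffffffffffff

def pack_key(name: str, effective_amount: int, tx_num: int, position: int) -> bytes:
    name_bytes = name.encode()
    return struct.pack(
        f'>H{len(name_bytes)}sQLH',
        len(name_bytes), name_bytes, MAX_AMOUNT - effective_amount, tx_num, position
    )

keys = sorted([
    pack_key('foo', 70_000_000, 1, 0),
    pack_key('foo', 90_000_000, 3, 0),
    pack_key('foo', 80_000_000, 2, 0),
])
# the first key after sorting belongs to the 0.9 LBC claim, i.e. the bid ordered winner
assert struct.unpack('>Q', keys[0][5:13])[0] == MAX_AMOUNT - 90_000_000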
class StagedClaimtrieItem(typing.NamedTuple):
name: str
claim_hash: bytes

View file

@ -170,48 +170,43 @@ class SearchIndex:
self.claim_cache.clear()
self.resolution_cache.clear()
async def session_query(self, query_name, kwargs):
offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0
async def cached_search(self, kwargs):
total_referenced = []
if query_name == 'resolve':
total_referenced, response, censor = await self.resolve(*kwargs)
else:
cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
if cache_item.result is not None:
cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
if cache_item.result is not None:
return cache_item.result
async with cache_item.lock:
if cache_item.result:
return cache_item.result
async with cache_item.lock:
if cache_item.result:
return cache_item.result
censor = Censor(Censor.SEARCH)
if kwargs.get('no_totals'):
response, offset, total = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
else:
response, offset, total = await self.search(**kwargs)
censor.apply(response)
censor = Censor(Censor.SEARCH)
if kwargs.get('no_totals'):
response, offset, total = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
else:
response, offset, total = await self.search(**kwargs)
censor.apply(response)
total_referenced.extend(response)
if censor.censored:
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
total_referenced.extend(response)
if censor.censored:
response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
total_referenced.extend(response)
result = Outputs.to_base64(
response, await self._get_referenced_rows(total_referenced), offset, total, censor
)
cache_item.result = result
return result
return Outputs.to_base64(response, await self._get_referenced_rows(total_referenced), offset, total, censor)
result = Outputs.to_base64(
response, await self._get_referenced_rows(total_referenced), offset, total, censor
)
cache_item.result = result
return result
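Review note: cached_search keeps one ResultCacheItem per serialized query and takes its lock before running the search, so concurrent identical queries execute once and the rest reuse the cached result. A stripped-down sketch of that cache-then-lock-then-recheck pattern follows; QueryCache, CacheItem, and get_or_run are illustrative names, not the project's classes.

# Minimal sketch of the double-checked async cache used by cached_search.
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

class CacheItem:
    def __init__(self):
        self.lock = asyncio.Lock()
        self.result: Optional[Any] = None

class QueryCache:
    def __init__(self):
        self.items: Dict[str, CacheItem] = {}

    async def get_or_run(self, key: str, run: Callable[[], Awaitable[Any]]) -> Any:
        item = self.items.setdefault(key, CacheItem())
        if item.result is not None:       # fast path: already cached
            return item.result
        async with item.lock:             # serialize concurrent identical queries
            if item.result is not None:   # another task finished while we waited
                return item.result
            item.result = await run()
            return item.result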
async def resolve(self, *urls):
censor = Censor(Censor.RESOLVE)
results = [await self.resolve_url(url) for url in urls]
# just heat the cache
await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results))
results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)]
censored = [
result if not isinstance(result, dict) or not censor.censor(result)
else ResolveCensoredError(url, result['censoring_channel_id'])
for url, result in zip(urls, results)
]
return results, censored, censor
# async def resolve(self, *urls):
# censor = Censor(Censor.RESOLVE)
# results = [await self.resolve_url(url) for url in urls]
# # just heat the cache
# await self.populate_claim_cache(*filter(lambda x: isinstance(x, str), results))
# results = [self._get_from_cache_or_error(url, result) for url, result in zip(urls, results)]
#
# censored = [
# result if not isinstance(result, dict) or not censor.censor(result)
# else ResolveCensoredError(url, result['censoring_channel_hash'])
# for url, result in zip(urls, results)
# ]
# return results, censored, censor
def _get_from_cache_or_error(self, url: str, resolution: Union[LookupError, StreamResolution, ChannelResolution]):
cached = self.claim_cache.get(resolution)
@ -432,10 +427,11 @@ def extract_doc(doc, index):
doc['reposted_claim_id'] = None
channel_hash = doc.pop('channel_hash')
doc['channel_id'] = channel_hash[::-1].hex() if channel_hash else channel_hash
doc['censoring_channel_id'] = doc.get('censoring_channel_id')
txo_hash = doc.pop('txo_hash')
doc['tx_id'] = txo_hash[:32][::-1].hex()
doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
channel_hash = doc.pop('censoring_channel_hash')
doc['censoring_channel_hash'] = channel_hash[::-1].hex() if channel_hash else channel_hash
# txo_hash = doc.pop('txo_hash')
# doc['tx_id'] = txo_hash[:32][::-1].hex()
# doc['tx_nout'] = struct.unpack('<I', txo_hash[32:])[0]
doc['repost_count'] = doc.pop('reposted')
doc['is_controlling'] = bool(doc['is_controlling'])
doc['signature'] = (doc.pop('signature') or b'').hex() or None
@ -613,6 +609,8 @@ def expand_result(results):
result['reposted'] = result.pop('repost_count')
result['signature_valid'] = result.pop('is_signature_valid')
result['normalized'] = result.pop('normalized_name')
# if result['censoring_channel_hash']:
# result['censoring_channel_hash'] = unhexlify(result['censoring_channel_hash'])[::-1]
expanded.append(result)
if inner_hits:
return expand_result(inner_hits)

View file

@ -1001,13 +1001,30 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def run_and_cache_query(self, query_name, kwargs):
# async def run_and_cache_query(self, query_name, kwargs):
# start = time.perf_counter()
# if isinstance(kwargs, dict):
# kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
# try:
# self.session_mgr.pending_query_metric.inc()
# return await self.db.search_index.session_query(query_name, kwargs)
# except ConnectionTimeout:
# self.session_mgr.interrupt_count_metric.inc()
# raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
# finally:
# self.session_mgr.pending_query_metric.dec()
# self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def mempool_compact_histogram(self):
return self.mempool.compact_fee_histogram()
async def claimtrie_search(self, **kwargs):
start = time.perf_counter()
if isinstance(kwargs, dict):
kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
try:
self.session_mgr.pending_query_metric.inc()
return await self.db.search_index.session_query(query_name, kwargs)
return await self.db.search_index.cached_search(kwargs)
except ConnectionTimeout:
self.session_mgr.interrupt_count_metric.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
@ -1015,14 +1032,6 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
async def mempool_compact_histogram(self):
return self.mempool.compact_fee_histogram()
async def claimtrie_search(self, **kwargs):
raise NotImplementedError()
# if kwargs:
# return await self.run_and_cache_query('search', kwargs)
async def claimtrie_resolve(self, *urls):
rows, extra = [], []
for url in urls:

View file

@ -182,8 +182,11 @@ class ResolveCommand(BaseResolveTestCase):
async def test_advanced_resolve(self):
claim_id1 = self.get_claim_id(
await self.stream_create('foo', '0.7', allow_duplicate_name=True))
await self.assertResolvesToClaimId('foo$1', claim_id1)
claim_id2 = self.get_claim_id(
await self.stream_create('foo', '0.8', allow_duplicate_name=True))
await self.assertResolvesToClaimId('foo$1', claim_id2)
await self.assertResolvesToClaimId('foo$2', claim_id1)
claim_id3 = self.get_claim_id(
await self.stream_create('foo', '0.9', allow_duplicate_name=True))
# plain winning claim