forked from LBRYCommunity/lbry-sdk
Merge pull request #3443 from lbryio/fix-resolve-reposted-channel
Fix including channels for reposted claims when resolving a repost
This commit is contained in:
commit
02d356ef12
6 changed files with 70 additions and 27 deletions
|
@ -180,7 +180,13 @@ class Outputs:
|
|||
if blocked is not None:
|
||||
blocked.to_message(page, extra_txo_rows)
|
||||
for row in extra_txo_rows:
|
||||
cls.encode_txo(page.extra_txos.add(), row)
|
||||
txo_message: 'OutputsMessage' = page.extra_txos.add()
|
||||
if not isinstance(row, Exception):
|
||||
if row.channel_hash:
|
||||
set_reference(txo_message.claim.channel, row.channel_hash, extra_txo_rows)
|
||||
if row.reposted_claim_hash:
|
||||
set_reference(txo_message.claim.repost, row.reposted_claim_hash, extra_txo_rows)
|
||||
cls.encode_txo(txo_message, row)
|
||||
|
||||
for row in txo_rows:
|
||||
# cls.row_to_message(row, page.txos.add(), extra_txo_rows)
|
||||
|
|
|
@ -345,6 +345,11 @@ class BlockProcessor:
|
|||
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
||||
)
|
||||
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
|
||||
if self.db.first_sync:
|
||||
self.db.search_index.clear_caches()
|
||||
self.touched_claims_to_send_es.clear()
|
||||
self.removed_claims_to_send_es.clear()
|
||||
self.activation_info_to_send_es.clear()
|
||||
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
||||
if not self.db.first_sync:
|
||||
await self.db.reload_blocking_filtering_streams()
|
||||
|
|
|
@ -58,6 +58,13 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
|
|||
OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, ResolveCensoredError, LookupError, ValueError]]
|
||||
|
||||
|
||||
class ExpandedResolveResult(typing.NamedTuple):
|
||||
stream: OptionalResolveResultOrError
|
||||
channel: OptionalResolveResultOrError
|
||||
repost: OptionalResolveResultOrError
|
||||
reposted_channel: OptionalResolveResultOrError
|
||||
|
||||
|
||||
class DBError(Exception):
|
||||
"""Raised on general DB errors generally indicating corruption."""
|
||||
|
||||
|
@ -228,8 +235,8 @@ class LevelDB:
|
|||
signature_valid=None if not channel_hash else signature_valid
|
||||
)
|
||||
|
||||
def _resolve(self, name: str, claim_id: Optional[str] = None,
|
||||
amount_order: Optional[int] = None) -> Optional[ResolveResult]:
|
||||
def _resolve_parsed_url(self, name: str, claim_id: Optional[str] = None,
|
||||
amount_order: Optional[int] = None) -> Optional[ResolveResult]:
|
||||
"""
|
||||
:param normalized_name: name
|
||||
:param claim_id: partial or complete claim id
|
||||
|
@ -296,12 +303,11 @@ class LevelDB:
|
|||
return
|
||||
return list(sorted(candidates, key=lambda item: item[1]))[0]
|
||||
|
||||
def _fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError,
|
||||
OptionalResolveResultOrError]:
|
||||
def _resolve(self, url) -> ExpandedResolveResult:
|
||||
try:
|
||||
parsed = URL.parse(url)
|
||||
except ValueError as e:
|
||||
return e, None, None
|
||||
return ExpandedResolveResult(e, None, None, None)
|
||||
|
||||
stream = channel = resolved_channel = resolved_stream = None
|
||||
if parsed.has_stream_in_channel:
|
||||
|
@ -312,9 +318,9 @@ class LevelDB:
|
|||
elif parsed.has_stream:
|
||||
stream = parsed.stream
|
||||
if channel:
|
||||
resolved_channel = self._resolve(channel.name, channel.claim_id, channel.amount_order)
|
||||
resolved_channel = self._resolve_parsed_url(channel.name, channel.claim_id, channel.amount_order)
|
||||
if not resolved_channel:
|
||||
return None, LookupError(f'Could not find channel in "{url}".'), None
|
||||
return ExpandedResolveResult(None, LookupError(f'Could not find channel in "{url}".'), None, None)
|
||||
if stream:
|
||||
if resolved_channel:
|
||||
stream_claim = self._resolve_claim_in_channel(resolved_channel.claim_hash, stream.normalized)
|
||||
|
@ -322,13 +328,14 @@ class LevelDB:
|
|||
stream_claim_id, stream_tx_num, stream_tx_pos, effective_amount = stream_claim
|
||||
resolved_stream = self._fs_get_claim_by_hash(stream_claim_id)
|
||||
else:
|
||||
resolved_stream = self._resolve(stream.name, stream.claim_id, stream.amount_order)
|
||||
resolved_stream = self._resolve_parsed_url(stream.name, stream.claim_id, stream.amount_order)
|
||||
if not channel and not resolved_channel and resolved_stream and resolved_stream.channel_hash:
|
||||
resolved_channel = self._fs_get_claim_by_hash(resolved_stream.channel_hash)
|
||||
if not resolved_stream:
|
||||
return LookupError(f'Could not find claim at "{url}".'), None, None
|
||||
return ExpandedResolveResult(LookupError(f'Could not find claim at "{url}".'), None, None, None)
|
||||
|
||||
repost = None
|
||||
reposted_channel = None
|
||||
if resolved_stream or resolved_channel:
|
||||
claim_hash = resolved_stream.claim_hash if resolved_stream else resolved_channel.claim_hash
|
||||
claim = resolved_stream if resolved_stream else resolved_channel
|
||||
|
@ -338,14 +345,17 @@ class LevelDB:
|
|||
reposted_claim_hash) or self.blocked_channels.get(claim.channel_hash)
|
||||
if blocker_hash:
|
||||
reason_row = self._fs_get_claim_by_hash(blocker_hash)
|
||||
return None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row), None
|
||||
return ExpandedResolveResult(
|
||||
None, ResolveCensoredError(url, blocker_hash, censor_row=reason_row), None, None
|
||||
)
|
||||
if claim.reposted_claim_hash:
|
||||
repost = self._fs_get_claim_by_hash(claim.reposted_claim_hash)
|
||||
return resolved_stream, resolved_channel, repost
|
||||
if repost and repost.channel_hash and repost.signature_valid:
|
||||
reposted_channel = self._fs_get_claim_by_hash(repost.channel_hash)
|
||||
return ExpandedResolveResult(resolved_stream, resolved_channel, repost, reposted_channel)
|
||||
|
||||
async def fs_resolve(self, url) -> typing.Tuple[OptionalResolveResultOrError, OptionalResolveResultOrError,
|
||||
OptionalResolveResultOrError]:
|
||||
return await asyncio.get_event_loop().run_in_executor(None, self._fs_resolve, url)
|
||||
async def resolve(self, url) -> ExpandedResolveResult:
|
||||
return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url)
|
||||
|
||||
def _fs_get_claim_by_hash(self, claim_hash):
|
||||
claim = self.claim_to_txo.get(claim_hash)
|
||||
|
|
|
@ -1019,7 +1019,7 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.pending_query_metric.inc()
|
||||
if 'channel' in kwargs:
|
||||
channel_url = kwargs.pop('channel')
|
||||
_, channel_claim, _ = await self.db.fs_resolve(channel_url)
|
||||
_, channel_claim, _, _ = await self.db.resolve(channel_url)
|
||||
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
|
||||
return Outputs.to_base64([], [], 0, None, None)
|
||||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||
|
@ -1036,12 +1036,11 @@ class LBRYElectrumX(SessionBase):
|
|||
self.session_mgr.pending_query_metric.dec()
|
||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
def _claimtrie_resolve(self, *urls):
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc()
|
||||
stream, channel, repost = await self.db.fs_resolve(url)
|
||||
self.session_mgr.resolved_url_count_metric.inc()
|
||||
stream, channel, repost, reposted_channel = self.db._resolve(url)
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
|
@ -1053,6 +1052,8 @@ class LBRYElectrumX(SessionBase):
|
|||
# print("resolved channel", channel.name.decode())
|
||||
if repost:
|
||||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
elif stream:
|
||||
# print("resolved stream", stream.name.decode())
|
||||
rows.append(stream)
|
||||
|
@ -1061,9 +1062,16 @@ class LBRYElectrumX(SessionBase):
|
|||
extra.append(channel)
|
||||
if repost:
|
||||
extra.append(repost)
|
||||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
|
||||
return Outputs.to_base64(rows, extra, 0, None, None)
|
||||
|
||||
async def claimtrie_resolve(self, *urls):
|
||||
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
|
||||
self.session_mgr.resolved_url_count_metric.inc(len(urls))
|
||||
return result
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.bp.height
|
||||
|
||||
|
|
|
@ -1433,7 +1433,11 @@ class StreamCommands(ClaimTestCase):
|
|||
self.assertTrue(signed['outputs'][0]['is_channel_signature_valid'])
|
||||
|
||||
async def test_repost(self):
|
||||
await self.channel_create('@goodies', '1.0')
|
||||
tx = await self.channel_create('@goodies', '1.0')
|
||||
goodies_claim_id = self.get_claim_id(tx)
|
||||
tx = await self.channel_create('@spam', '1.0')
|
||||
spam_claim_id = self.get_claim_id(tx)
|
||||
|
||||
tx = await self.stream_create('newstuff', '1.1', channel_name='@goodies', tags=['foo', 'gaming'])
|
||||
claim_id = self.get_claim_id(tx)
|
||||
|
||||
|
@ -1441,8 +1445,18 @@ class StreamCommands(ClaimTestCase):
|
|||
self.assertItemCount(await self.daemon.jsonrpc_txo_list(reposted_claim_id=claim_id), 0)
|
||||
self.assertItemCount(await self.daemon.jsonrpc_txo_list(type='repost'), 0)
|
||||
|
||||
tx = await self.stream_repost(claim_id, 'newstuff-again', '1.1')
|
||||
tx = await self.stream_repost(claim_id, 'newstuff-again', '1.1', channel_name='@spam')
|
||||
repost_id = self.get_claim_id(tx)
|
||||
|
||||
# test inflating reposted channels works
|
||||
repost_url = f'newstuff-again:{repost_id}'
|
||||
self.ledger._tx_cache.clear()
|
||||
self.assertEqual(
|
||||
goodies_claim_id,
|
||||
(await self.out(self.daemon.jsonrpc_resolve(repost_url))
|
||||
)[repost_url]['reposted_claim']['signing_channel']['claim_id']
|
||||
)
|
||||
|
||||
self.assertItemCount(await self.daemon.jsonrpc_claim_list(claim_type='repost'), 1)
|
||||
self.assertEqual((await self.claim_search(name='newstuff'))[0]['meta']['reposted'], 1)
|
||||
self.assertEqual((await self.claim_search(reposted_claim_id=claim_id))[0]['claim_id'], repost_id)
|
||||
|
|
|
@ -35,7 +35,7 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
|
||||
async def assertNoClaimForName(self, name: str):
|
||||
lbrycrd_winning = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
|
||||
stream, channel, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(name)
|
||||
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
|
||||
self.assertNotIn('claimId', lbrycrd_winning)
|
||||
if stream is not None:
|
||||
self.assertIsInstance(stream, LookupError)
|
||||
|
@ -55,7 +55,7 @@ class BaseResolveTestCase(CommandTestCase):
|
|||
|
||||
async def assertMatchWinningClaim(self, name):
|
||||
expected = json.loads(await self.blockchain._cli_cmnd('getvalueforname', name))
|
||||
stream, channel, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(name)
|
||||
stream, channel, _, _ = await self.conductor.spv_node.server.bp.db.resolve(name)
|
||||
claim = stream if stream else channel
|
||||
claim_from_es = await self.conductor.spv_node.server.bp.db.search_index.search(
|
||||
claim_id=claim.claim_hash.hex()
|
||||
|
@ -983,7 +983,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
await self.generate(32 * 10 - 1)
|
||||
self.assertEqual(1120, self.conductor.spv_node.server.bp.db.db_height)
|
||||
claim_id_B = (await self.stream_create(name, '20.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
|
||||
claim_B, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_B}")
|
||||
claim_B, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_B}")
|
||||
self.assertEqual(1121, self.conductor.spv_node.server.bp.db.db_height)
|
||||
self.assertEqual(1131, claim_B.activation_height)
|
||||
await self.assertMatchClaimIsWinning(name, claim_id_A)
|
||||
|
@ -1000,7 +1000,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
# State: A(10+14) is controlling, B(20) is accepted, C(50) is accepted.
|
||||
claim_id_C = (await self.stream_create(name, '50.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
|
||||
self.assertEqual(1123, self.conductor.spv_node.server.bp.db.db_height)
|
||||
claim_C, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_C}")
|
||||
claim_C, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_C}")
|
||||
self.assertEqual(1133, claim_C.activation_height)
|
||||
await self.assertMatchClaimIsWinning(name, claim_id_A)
|
||||
|
||||
|
@ -1018,7 +1018,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
# State: A(10+14) is controlling, B(20) is active, C(50) is accepted, D(300) is accepted.
|
||||
claim_id_D = (await self.stream_create(name, '300.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
|
||||
self.assertEqual(1132, self.conductor.spv_node.server.bp.db.db_height)
|
||||
claim_D, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}")
|
||||
claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}")
|
||||
self.assertEqual(False, claim_D.is_controlling)
|
||||
self.assertEqual(801, claim_D.last_takeover_height)
|
||||
self.assertEqual(1142, claim_D.activation_height)
|
||||
|
@ -1028,7 +1028,7 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
|
|||
# State: A(10+14) is active, B(20) is active, C(50) is active, D(300) is controlling
|
||||
await self.generate(1)
|
||||
self.assertEqual(1133, self.conductor.spv_node.server.bp.db.db_height)
|
||||
claim_D, _, _ = await self.conductor.spv_node.server.bp.db.fs_resolve(f"{name}:{claim_id_D}")
|
||||
claim_D, _, _, _ = await self.conductor.spv_node.server.bp.db.resolve(f"{name}:{claim_id_D}")
|
||||
self.assertEqual(True, claim_D.is_controlling)
|
||||
self.assertEqual(1133, claim_D.last_takeover_height)
|
||||
self.assertEqual(1133, claim_D.activation_height)
|
||||
|
|
Loading…
Reference in a new issue