Merge pull request #3301 from lbryio/no_repeat_claim_id

add `--remove_duplicates` to the search api
This commit is contained in:
Alex Grin 2021-05-28 11:01:15 -04:00 committed by GitHub
commit 935adfb51a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 213 additions and 155 deletions

File diff suppressed because one or more lines are too long

View file

@ -2352,7 +2352,7 @@ class Daemon(metaclass=JSONRPCServerType):
[--not_locations=<not_locations>...] [--not_locations=<not_locations>...]
[--order_by=<order_by>...] [--no_totals] [--page=<page>] [--page_size=<page_size>] [--order_by=<order_by>...] [--no_totals] [--page=<page>] [--page_size=<page_size>]
[--wallet_id=<wallet_id>] [--include_purchase_receipt] [--include_is_my_output] [--wallet_id=<wallet_id>] [--include_purchase_receipt] [--include_is_my_output]
[--has_source | --has_no_source] [--remove_duplicates] [--has_source | --has_no_source]
[--new_sdk_server=<new_sdk_server>] [--new_sdk_server=<new_sdk_server>]
Options: Options:
@ -2461,6 +2461,8 @@ class Daemon(metaclass=JSONRPCServerType):
has purchased the claim has purchased the claim
--include_is_my_output : (bool) lookup and include a boolean indicating --include_is_my_output : (bool) lookup and include a boolean indicating
if claim being resolved is yours if claim being resolved is yours
--remove_duplicates : (bool) removes duplicated content from search by picking either the
original claim or the oldest matching repost
--has_source : (bool) find claims containing a source field --has_source : (bool) find claims containing a source field
--has_no_source : (bool) find claims not containing a source field --has_no_source : (bool) find claims not containing a source field
--new_sdk_server=<new_sdk_server> : (str) URL of the new SDK server (EXPERIMENTAL) --new_sdk_server=<new_sdk_server> : (str) URL of the new SDK server (EXPERIMENTAL)

View file

@ -252,20 +252,15 @@ class SearchIndex:
if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str): if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str):
return [], 0, 0 return [], 0, 0
try: try:
if 'limit_claims_per_channel' in kwargs: return await self.search_ahead(**kwargs)
return await self.search_ahead(**kwargs), 0, 0
else:
result = (await self.search_client.search(
expand_query(**kwargs), index=self.index,
track_total_hits=False if kwargs.get('no_totals') else 10_000
))['hits']
except NotFoundError: except NotFoundError:
return [], 0, 0 return [], 0, 0
return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0) return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)
async def search_ahead(self, **kwargs): async def search_ahead(self, **kwargs):
# 'limit_claims_per_channel' case. Fetch 1000 results, reorder, slice, inflate and return # 'limit_claims_per_channel' case. Fetch 1000 results, reorder, slice, inflate and return
per_channel_per_page = kwargs.pop('limit_claims_per_channel') per_channel_per_page = kwargs.pop('limit_claims_per_channel', 0) or 0
remove_duplicates = kwargs.pop('remove_duplicates', False)
page_size = kwargs.pop('limit', 10) page_size = kwargs.pop('limit', 10)
offset = kwargs.pop('offset', 0) offset = kwargs.pop('offset', 0)
kwargs['limit'] = 1000 kwargs['limit'] = 1000
@ -278,14 +273,37 @@ class SearchIndex:
reordered_hits = cache_item.result reordered_hits = cache_item.result
else: else:
query = expand_query(**kwargs) query = expand_query(**kwargs)
reordered_hits = await self.__search_ahead(query, page_size, per_channel_per_page) search_hits = deque((await self.search_client.search(
query, index=self.index, track_total_hits=False,
_source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
))['hits']['hits'])
if remove_duplicates:
search_hits = self.__remove_duplicates(search_hits)
if per_channel_per_page > 0:
reordered_hits = self.__search_ahead(search_hits, page_size, per_channel_per_page)
else:
reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
cache_item.result = reordered_hits cache_item.result = reordered_hits
return list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)]))) result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
return result, 0, len(reordered_hits)
async def __search_ahead(self, query: dict, page_size: int, per_channel_per_page: int): def __remove_duplicates(self, search_hits: deque) -> deque:
search_hits = deque((await self.search_client.search( known_ids = {} # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
query, index=self.index, track_total_hits=False, _source_includes=['_id', 'channel_id'] dropped = set()
))['hits']['hits']) for hit in search_hits:
hit_height, hit_id = hit['_source']['creation_height'], hit['_source']['reposted_claim_id'] or hit['_id']
if hit_id not in known_ids:
known_ids[hit_id] = (hit_height, hit['_id'])
else:
previous_height, previous_id = known_ids[hit_id]
if hit_height < previous_height:
known_ids[hit_id] = (hit_height, hit['_id'])
dropped.add(previous_id)
else:
dropped.add(hit['_id'])
return deque(hit for hit in search_hits if hit['_id'] not in dropped)
def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
reordered_hits = [] reordered_hits = []
channel_counters = Counter() channel_counters = Counter()
next_page_hits_maybe_check_later = deque() next_page_hits_maybe_check_later = deque()
@ -298,7 +316,7 @@ class SearchIndex:
break # means last page was incomplete and we are left with bad replacements break # means last page was incomplete and we are left with bad replacements
for _ in range(len(next_page_hits_maybe_check_later)): for _ in range(len(next_page_hits_maybe_check_later)):
claim_id, channel_id = next_page_hits_maybe_check_later.popleft() claim_id, channel_id = next_page_hits_maybe_check_later.popleft()
if channel_counters[channel_id] < per_channel_per_page: if per_channel_per_page > 0 and channel_counters[channel_id] < per_channel_per_page:
reordered_hits.append((claim_id, channel_id)) reordered_hits.append((claim_id, channel_id))
channel_counters[channel_id] += 1 channel_counters[channel_id] += 1
else: else:
@ -306,7 +324,7 @@ class SearchIndex:
while search_hits: while search_hits:
hit = search_hits.popleft() hit = search_hits.popleft()
hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id'] hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id']
if hit_channel_id is None: if hit_channel_id is None or per_channel_per_page <= 0:
reordered_hits.append((hit_id, hit_channel_id)) reordered_hits.append((hit_id, hit_channel_id))
elif channel_counters[hit_channel_id] < per_channel_per_page: elif channel_counters[hit_channel_id] < per_channel_per_page:
reordered_hits.append((hit_id, hit_channel_id)) reordered_hits.append((hit_id, hit_channel_id))

View file

@ -398,6 +398,32 @@ class ClaimSearchCommand(ClaimTestCase):
limit_claims_per_channel=3, claim_type='stream' limit_claims_per_channel=3, claim_type='stream'
) )
async def test_no_duplicates(self):
await self.generate(10)
match = self.assertFindsClaims
claims = []
channels = []
first = await self.stream_create('original_claim0')
second = await self.stream_create('original_claim1')
for i in range(10):
repost_id = self.get_claim_id(second if i % 2 == 0 else first)
channel = await self.channel_create(f'@chan{i}', bid='0.001')
channels.append(channel)
claims.append(
await self.stream_repost(repost_id, f'claim{i}', bid='0.001', channel_id=self.get_claim_id(channel)))
await match([first, second] + channels,
remove_duplicates=True, order_by=['^height'])
await match(list(reversed(channels)) + [second, first],
remove_duplicates=True, order_by=['height'])
# the original claims doesn't show up, so we pick the oldest reposts
await match([channels[0], claims[0], channels[1], claims[1]] + channels[2:],
height='>218',
remove_duplicates=True, order_by=['^height'])
# limit claims per channel, invert order, oldest ones are still chosen
await match(channels[2:][::-1] + [claims[1], channels[1], claims[0], channels[0]],
height='>218', limit_claims_per_channel=1,
remove_duplicates=True, order_by=['height'])
async def test_limit_claims_per_channel_across_sorted_pages(self): async def test_limit_claims_per_channel_across_sorted_pages(self):
await self.generate(10) await self.generate(10)
match = self.assertFindsClaims match = self.assertFindsClaims
@ -429,6 +455,12 @@ class ClaimSearchCommand(ClaimTestCase):
[claims[6], claims[7], last], page_size=4, page=3, [claims[6], claims[7], last], page_size=4, page=3,
limit_claims_per_channel=1, claim_type='stream', order_by=['^height'] limit_claims_per_channel=1, claim_type='stream', order_by=['^height']
) )
# feature disabled on 0 or negative values
for limit in [None, 0, -1]:
await match(
[first, second] + claims + [last],
limit_claims_per_channel=limit, claim_type='stream', order_by=['^height']
)
async def test_claim_type_and_media_type_search(self): async def test_claim_type_and_media_type_search(self):
# create an invalid/unknown claim # create an invalid/unknown claim