add language table and indexes

This commit is contained in:
Jack Robison 2020-09-24 13:00:18 -04:00
parent 7000ac3f3f
commit 8dc654b513
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 44 additions and 4 deletions
lbry/wallet/server
tests/integration/blockchain

View file

@ -768,6 +768,7 @@ class LBRYBlockProcessor(BlockProcessor):
self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES') self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES')
if self.env.individual_tag_indexes: if self.env.individual_tag_indexes:
self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES') self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES')
def advance_txs(self, height, txs, header): def advance_txs(self, height, txs, header):
timer = self.timer.sub_timers['advance_blocks'] timer = self.timer.sub_timers['advance_blocks']

View file

@ -6,7 +6,6 @@ from decimal import Decimal
from collections import namedtuple from collections import namedtuple
from multiprocessing import Manager from multiprocessing import Manager
from binascii import unhexlify from binascii import unhexlify
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.util import class_logger from lbry.wallet.server.util import class_logger
from lbry.wallet.database import query, constraints_to_sql from lbry.wallet.database import query, constraints_to_sql
@ -19,7 +18,7 @@ from lbry.wallet.server.db.canonical import register_canonical_functions
from lbry.wallet.server.db.full_text_search import update_full_text_search, CREATE_FULL_TEXT_SEARCH, first_sync_finished from lbry.wallet.server.db.full_text_search import update_full_text_search, CREATE_FULL_TEXT_SEARCH, first_sync_finished
from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
ATTRIBUTE_ARRAY_MAX_LENGTH = 100 ATTRIBUTE_ARRAY_MAX_LENGTH = 100
@ -117,6 +116,15 @@ class SQLDB:
create unique index if not exists tag_claim_hash_tag_idx on tag (claim_hash, tag); create unique index if not exists tag_claim_hash_tag_idx on tag (claim_hash, tag);
""" """
# SQL script creating the `language` table, which stores one row per
# (language, claim_hash) pair together with a height value.
# The unique index on (claim_hash, language) rejects duplicate pairs.
CREATE_LANGUAGE_TABLE = """
create table if not exists language (
language text not null,
claim_hash bytes not null,
height integer not null
);
create unique index if not exists language_claim_hash_language_idx on language (claim_hash, language);
"""
CREATE_CLAIMTRIE_TABLE = """ CREATE_CLAIMTRIE_TABLE = """
create table if not exists claimtrie ( create table if not exists claimtrie (
normalized text primary key, normalized text primary key,
@ -174,12 +182,18 @@ class SQLDB:
for tag_value, tag_key in COMMON_TAGS.items() for tag_value, tag_key in COMMON_TAGS.items()
) )
# SQL script with one partial unique index per entry of INDEXED_LANGUAGES.
# Each index covers only the rows of `language` whose language column equals
# that entry; the statements are joined with newlines into a single script.
# NOTE(review): each entry is interpolated directly into the SQL text (index
# name and string literal), so this assumes INDEXED_LANGUAGES holds only
# trusted, identifier-safe codes -- confirm against its definition.
LANGUAGE_INDEXES = '\n'.join([
f"create unique index if not exists language_{language}_idx on language (language, claim_hash) WHERE language='{language}';"
for language in INDEXED_LANGUAGES
])
CREATE_TABLES_QUERY = ( CREATE_TABLES_QUERY = (
CREATE_CLAIM_TABLE + CREATE_CLAIM_TABLE +
CREATE_FULL_TEXT_SEARCH + CREATE_FULL_TEXT_SEARCH +
CREATE_SUPPORT_TABLE + CREATE_SUPPORT_TABLE +
CREATE_CLAIMTRIE_TABLE + CREATE_CLAIMTRIE_TABLE +
CREATE_TAG_TABLE CREATE_TAG_TABLE +
CREATE_LANGUAGE_TABLE
) )
def __init__( def __init__(
@ -305,7 +319,7 @@ class SQLDB:
self.execute('commit;') self.execute('commit;')
def _upsertable_claims(self, txos: List[Output], header, clear_first=False): def _upsertable_claims(self, txos: List[Output], header, clear_first=False):
claim_hashes, claims, tags = set(), [], {} claim_hashes, claims, tags, languages = set(), [], {}, {}
for txo in txos: for txo in txos:
tx = txo.tx_ref.tx tx = txo.tx_ref.tx
@ -316,6 +330,13 @@ class SQLDB:
#self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.") #self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
continue continue
language = None
try:
if txo.claim.is_stream and txo.claim.stream.languages:
language = txo.claim.stream.languages[0].language
except:
pass
claim_hash = txo.claim_hash claim_hash = txo.claim_hash
claim_hashes.add(claim_hash) claim_hashes.add(claim_hash)
claim_record = { claim_record = {
@ -373,6 +394,9 @@ class SQLDB:
elif claim.is_channel: elif claim.is_channel:
claim_record['claim_type'] = CLAIM_TYPES['channel'] claim_record['claim_type'] = CLAIM_TYPES['channel']
if language:
languages[(language, claim_hash)] = (language, claim_hash, tx.height)
for tag in clean_tags(claim.message.tags): for tag in clean_tags(claim.message.tags):
tags[(tag, claim_hash)] = (tag, claim_hash, tx.height) tags[(tag, claim_hash)] = (tag, claim_hash, tx.height)
@ -383,6 +407,10 @@ class SQLDB:
self.executemany( self.executemany(
"INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values() "INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values()
) )
if languages:
self.executemany(
"INSERT OR IGNORE INTO language (language, claim_hash, height) VALUES (?, ?, ?)", languages.values()
)
return claims return claims

View file

@ -262,6 +262,17 @@ class ClaimSearchCommand(ClaimTestCase):
await self.assertFindsClaims([claim4, claim3, claim2], fee_amount='<1.0', fee_currency='lbc') await self.assertFindsClaims([claim4, claim3, claim2], fee_amount='<1.0', fee_currency='lbc')
await self.assertFindsClaims([claim3], fee_amount='0.5', fee_currency='lbc') await self.assertFindsClaims([claim3], fee_amount='0.5', fee_currency='lbc')
await self.assertFindsClaims([claim5], fee_currency='usd') await self.assertFindsClaims([claim5], fee_currency='usd')
async def test_search_by_language(self):
claim1 = await self.stream_create('claim1', fee_amount='1.0', fee_currency='lbc')
claim2 = await self.stream_create('claim2', fee_amount='0.9', fee_currency='lbc')
claim3 = await self.stream_create('claim3', fee_amount='0.5', fee_currency='lbc', languages='en')
claim4 = await self.stream_create('claim4', fee_amount='0.1', fee_currency='lbc', languages='en')
claim5 = await self.stream_create('claim5', fee_amount='1.0', fee_currency='usd', languages='es')
await self.assertFindsClaims([claim4, claim3], any_languages=['en'])
await self.assertFindsClaims([claim5], any_languages=['es'])
await self.assertFindsClaims([claim5, claim4, claim3], any_languages=['en', 'es'])
await self.assertFindsClaims([], fee_currency='foo') await self.assertFindsClaims([], fee_currency='foo')
async def test_search_by_channel(self): async def test_search_by_channel(self):