move elasticsearch things into its own module

This commit is contained in:
Victor Shyba 2021-03-15 16:08:04 -03:00
parent cd66f7eb43
commit d855e6c8b1
6 changed files with 71 additions and 76 deletions

View file

@ -0,0 +1 @@
from .search import SearchIndex

View file

@ -0,0 +1,61 @@
INDEX_DEFAULT_SETTINGS = {
"settings":
{"analysis":
{"analyzer": {
"default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}},
"index":
{"refresh_interval": -1,
"number_of_shards": 1,
"number_of_replicas": 0,
"sort": {
"field": ["trending_mixed", "release_time"],
"order": ["desc", "desc"]
}}
},
"mappings": {
"properties": {
"claim_id": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text",
"index_prefixes": {
"min_chars": 1,
"max_chars": 10
}
},
"height": {"type": "integer"},
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_mixed": {"type": "float"},
"release_time": {"type": "long"},
}
}
}
FIELDS = {'is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount',
'timestamp', 'creation_timestamp', 'height', 'creation_height', 'activation_height', 'expiration_height',
'release_time', 'short_url', 'canonical_url', 'title', 'author', 'description', 'claim_type', 'reposted',
'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'reposted_claim_hash', 'censor_type',
'claims_in_channel', 'channel_join', 'signature_valid', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'channel_id', 'tx_id', 'tx_nout',
'signature', 'signature_digest', 'public_key_bytes', 'public_key_hash', 'public_key_id', '_id', 'tags',
'reposted_claim_id', 'has_source'}
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id',
'media_type', 'normalized', 'public_key_bytes', 'public_key_hash', 'short_url', 'signature',
'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'}
RANGE_FIELDS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'censor_type',
'trending_local', 'trending_global',
}
REPLACEMENTS = {
'name': 'normalized',
'txid': 'tx_id',
'claim_hash': '_id'
}

View file

@ -15,6 +15,8 @@ from lbry.schema.tags import clean_tags
from lbry.schema.url import URL, normalize_name
from lbry.utils import LRUCache
from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
RANGE_FIELDS
from lbry.wallet.server.util import class_logger
@ -51,46 +53,7 @@ class SearchIndex:
except ConnectionError:
self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!")
await asyncio.sleep(1)
res = await self.client.indices.create(
self.index,
{
"settings":
{"analysis":
{"analyzer": {
"default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}},
"index":
{"refresh_interval": -1,
"number_of_shards": 1,
"number_of_replicas": 0,
"sort": {
"field": ["trending_mixed", "release_time"],
"order": ["desc", "desc"]
}}
},
"mappings": {
"properties": {
"claim_id": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text",
"index_prefixes": {
"min_chars": 1,
"max_chars": 10
}
},
"height": {"type": "integer"},
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_mixed": {"type": "float"},
"release_time": {"type": "long"},
}
}
}, ignore=400
)
res = await self.client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
return res.get('acknowledged', False)
def stop(self):
@ -230,7 +193,6 @@ class SearchIndex:
if channel_id:
query['channel_id'] = channel_id
query['order_by'] = ['^channel_join']
query['channel_id'] = channel_id
query['signature_valid'] = True
else:
query['order_by'] = '^creation_height'
@ -242,10 +204,9 @@ class SearchIndex:
async def search(self, **kwargs):
if 'channel' in kwargs:
channel_id = await self.resolve_url(kwargs.pop('channel'))
if not channel_id or not isinstance(channel_id, str):
kwargs['channel_id'] = await self.resolve_url(kwargs.pop('channel'))
if not kwargs['channel_id'] or not isinstance(kwargs['channel_id'], str):
return [], 0, 0
kwargs['channel_id'] = channel_id
try:
result = (await self.search_client.search(
expand_query(**kwargs), index=self.index, track_total_hits=False if kwargs.get('no_totals') else 10_000
@ -365,34 +326,7 @@ def extract_doc(doc, index):
doc['claim_type'] = doc.get('claim_type', 0) or 0
doc['stream_type'] = int(doc.get('stream_type', 0) or 0)
doc['has_source'] = bool(doc['has_source'])
return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update',
'doc_as_upsert': True}
FIELDS = {'is_controlling', 'last_take_over_height', 'claim_id', 'claim_name', 'normalized', 'tx_position', 'amount',
'timestamp', 'creation_timestamp', 'height', 'creation_height', 'activation_height', 'expiration_height',
'release_time', 'short_url', 'canonical_url', 'title', 'author', 'description', 'claim_type', 'reposted',
'stream_type', 'media_type', 'fee_amount', 'fee_currency', 'duration', 'reposted_claim_hash', 'censor_type',
'claims_in_channel', 'channel_join', 'signature_valid', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'trending_local', 'trending_global', 'channel_id', 'tx_id', 'tx_nout',
'signature', 'signature_digest', 'public_key_bytes', 'public_key_hash', 'public_key_id', '_id', 'tags',
'reposted_claim_id', 'has_source'}
TEXT_FIELDS = {'author', 'canonical_url', 'channel_id', 'claim_name', 'description', 'claim_id',
'media_type', 'normalized', 'public_key_bytes', 'public_key_hash', 'short_url', 'signature',
'signature_digest', 'stream_type', 'title', 'tx_id', 'fee_currency', 'reposted_claim_id', 'tags'}
RANGE_FIELDS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed', 'censor_type',
'trending_local', 'trending_global',
}
REPLACEMENTS = {
'name': 'normalized',
'txid': 'tx_id',
'claim_hash': '_id'
}
return {'doc': doc, '_id': doc['claim_id'], '_index': index, '_op_type': 'update', 'doc_as_upsert': True}
def expand_query(**kwargs):

View file

@ -9,7 +9,7 @@ import apsw
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from lbry.wallet.server.db.elastic_search import extract_doc, SearchIndex
from .search import extract_doc, SearchIndex
INDEX = 'claims'

View file

@ -1,5 +1,4 @@
import os
from asyncio import Queue
import apsw
from typing import Union, Tuple, Set, List
@ -20,7 +19,7 @@ from lbry.wallet.server.db.canonical import register_canonical_functions
from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
from .elastic_search import SearchIndex
from lbry.wallet.server.db.elasticsearch import SearchIndex
ATTRIBUTE_ARRAY_MAX_LENGTH = 100

View file

@ -30,7 +30,7 @@ setup(
'lbrynet=lbry.extras.cli:main',
'torba-server=lbry.wallet.server.cli:main',
'orchstr8=lbry.wallet.orchstr8.cli:main',
'torba-elastic-sync=lbry.wallet.server.db.elastic_sync:run_elastic_sync'
'torba-elastic-sync=lbry.wallet.server.db.elasticsearch.sync:run_elastic_sync'
],
},
install_requires=[