diff --git a/Makefile b/Makefile index 9911c24da..a6221fa03 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,6 @@ install: --global-option=fetch \ --global-option=--version --global-option=3.30.1 --global-option=--all \ --global-option=build --global-option=--enable --global-option=fts5 - python -m pip install elasticsearch[async] pip install -e . tools: diff --git a/docker/Dockerfile.wallet_server b/docker/Dockerfile.wallet_server index 284437cd3..6cc35ff43 100644 --- a/docker/Dockerfile.wallet_server +++ b/docker/Dockerfile.wallet_server @@ -13,6 +13,8 @@ RUN apt-get update && \ wget \ tar unzip \ build-essential \ + pkg-config \ + libleveldb-dev \ python3 \ python3-dev \ python3-pip \ diff --git a/docker/docker-compose-wallet-server.yml b/docker/docker-compose-wallet-server.yml index 221dfc780..8a1af34e2 100644 --- a/docker/docker-compose-wallet-server.yml +++ b/docker/docker-compose-wallet-server.yml @@ -3,6 +3,7 @@ version: "3" volumes: lbrycrd: wallet_server: + es01: services: lbrycrd: @@ -34,3 +35,18 @@ services: # Curently not snapshot provided # - SNAPSHOT_URL=${WALLET_SERVER_SNAPSHOT_URL-https://lbry.com/snapshot/wallet} - DAEMON_URL=http://lbry:lbry@lbrycrd:9245 + es01: + image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0 + container_name: es01 + environment: + - node.name=es01 + - discovery.type=single-node + - bootstrap.memory_lock=true + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - es01:/usr/share/elasticsearch/data + ports: + - 127.0.0.1:9200:9200 diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 649972c8a..50b04ba9b 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -5,7 +5,7 @@ from decimal import Decimal from operator import itemgetter from typing import Optional, List, Iterable -from elasticsearch import AsyncElasticsearch, NotFoundError +from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch.helpers import async_bulk from lbry.crypto.base58 import Base58 @@ -14,6 +14,7 @@ from lbry.schema.result import Outputs, Censor from lbry.schema.tags import clean_tags from lbry.schema.url import URL, normalize_name from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES +from lbry.wallet.server.util import class_logger class SearchIndex: @@ -21,51 +22,54 @@ class SearchIndex: self.client: Optional[AsyncElasticsearch] = None self.index = index_prefix + 'claims' self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import + self.logger = class_logger(__name__, self.__class__.__name__) async def start(self): if self.client: return self.client = AsyncElasticsearch(timeout=self.sync_timeout) - try: - if await self.client.indices.exists(self.index): - return - await self.client.indices.create( - self.index, - { - "settings": - {"analysis": - {"analyzer": { - "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, - "index": - {"refresh_interval": -1, - "number_of_shards": 1, - "number_of_replicas": 0} - }, - "mappings": { - "properties": { - "claim_id": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - }, - "type": "text", - "index_prefixes": { - "min_chars": 1, - "max_chars": 10 + while True: + try: + await self.client.cluster.health(wait_for_status='yellow') + break + except ConnectionError: + self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!") + await asyncio.sleep(1) + await self.client.indices.create( + self.index, + { + "settings": + {"analysis": + {"analyzer": { + "default": {"tokenizer": "whitespace", "filter": ["lowercase", "porter_stem"]}}}, + "index": + {"refresh_interval": -1, + "number_of_shards": 1, + "number_of_replicas": 0} + }, + "mappings": { + "properties": { + "claim_id": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" } }, - "height": {"type": "integer"}, - "claim_type": {"type": "byte"}, - "censor_type": {"type": "byte"}, - "trending_mixed": {"type": "float"}, - } + "type": "text", + "index_prefixes": { + "min_chars": 1, + "max_chars": 10 + } + }, + "height": {"type": "integer"}, + "claim_type": {"type": "byte"}, + "censor_type": {"type": "byte"}, + "trending_mixed": {"type": "float"}, } } - ) - except Exception as e: - raise + }, ignore=400 + ) def stop(self): client = self.client