diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py index b119b436f..3e36db133 100644 --- a/lbry/wallet/server/cli.py +++ b/lbry/wallet/server/cli.py @@ -27,6 +27,8 @@ def main(): coin_class = get_coin_class(args.spvserver) logging.basicConfig(level=logging.INFO) logging.info('lbry.server starting') + logging.getLogger('aiohttp').setLevel(logging.WARNING) + logging.getLogger('elasticsearch').setLevel(logging.WARNING) try: server = Server(Env(coin_class)) server.run() diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index a5ef0b8d1..6583615c2 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -33,7 +33,7 @@ class StreamResolution(str): class SearchIndex: - def __init__(self, index_prefix: str, search_timeout=3.0): + def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200): self.search_timeout = search_timeout self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import self.search_client: Optional[AsyncElasticsearch] = None @@ -44,13 +44,17 @@ class SearchIndex: self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever self.search_cache = LRUCache(2 ** 17) self.resolution_cache = LRUCache(2 ** 17) + self._elastic_host = elastic_host + self._elastic_port = elastic_port async def start(self): if self.sync_client: return - self.sync_client = AsyncElasticsearch(timeout=self.sync_timeout) - self.search_client = AsyncElasticsearch(timeout=self.search_timeout) + hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] + self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) + self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout) while True: + self.logger.info("DOITDOITDOIT\n\n\n\n%s:%i\n\n\n", self._elastic_host, self._elastic_port) try: await self.sync_client.cluster.health(wait_for_status='yellow') break diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 0255c0c2a..b551aeeab 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -8,8 +8,9 @@ from multiprocessing import Process import apsw from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk - -from .search import extract_doc, SearchIndex +from lbry.wallet.server.env import Env +from lbry.wallet.server.coin import LBC +from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex INDEX = 'claims' @@ -48,7 +49,9 @@ ORDER BY claim.height desc async def consume(producer): - es = AsyncElasticsearch() + env = Env(LBC) + logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) + es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) try: await async_bulk(es, producer, request_timeout=120) await es.indices.refresh(index=INDEX) @@ -57,7 +60,8 @@ async def consume(producer): async def make_es_index(): - index = SearchIndex('') + env = Env(LBC) + index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) try: return await index.start() finally: @@ -83,6 +87,9 @@ def __run(args, shard): def run_elastic_sync(): logging.basicConfig(level=logging.INFO) + logging.getLogger('aiohttp').setLevel(logging.WARNING) + logging.getLogger('elasticsearch').setLevel(logging.WARNING) + logging.info('lbry.server starting') parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") parser.add_argument("db_path", type=str) diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 6f94a3c66..f7d30fd85 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -974,7 +974,9 @@ class LBRYLevelDB(LevelDB): ) # Search index - self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout) + self.search_index = SearchIndex( + self.env.es_index_prefix, self.env.database_query_timeout, self.env.elastic_host, self.env.elastic_port + ) def close(self): super().close() diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index a3e47a78f..18f594a96 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -33,6 +33,8 @@ class Env: self.allow_root = self.boolean('ALLOW_ROOT', False) self.host = self.default('HOST', 'localhost') self.rpc_host = self.default('RPC_HOST', 'localhost') + self.elastic_host = self.default('ELASTIC_HOST', 'localhost') + self.elastic_port = self.integer('ELASTIC_PORT', 9200) self.loop_policy = self.set_event_loop_policy() self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) self.db_dir = self.required('DB_DIRECTORY')