Merge pull request #3248 from lbryio/add-es-host-setting

add ELASTIC_HOST and ELASTIC_PORT settings to hub
This commit is contained in:
Jack Robison 2021-03-30 13:09:18 -04:00 committed by GitHub
commit 1e28e21ab5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 8 deletions

View file

@ -27,6 +27,8 @@ def main():
coin_class = get_coin_class(args.spvserver)
logging.basicConfig(level=logging.INFO)
logging.info('lbry.server starting')
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
try:
server = Server(Env(coin_class))
server.run()

View file

@ -33,7 +33,7 @@ class StreamResolution(str):
class SearchIndex:
def __init__(self, index_prefix: str, search_timeout=3.0):
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
self.search_timeout = search_timeout
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
self.search_client: Optional[AsyncElasticsearch] = None
@ -44,13 +44,17 @@ class SearchIndex:
self.short_id_cache = LRUCache(2 ** 17) # never invalidated, since short ids are forever
self.search_cache = LRUCache(2 ** 17)
self.resolution_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
async def start(self):
if self.sync_client:
return
self.sync_client = AsyncElasticsearch(timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(timeout=self.search_timeout)
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
while True:
self.logger.info("DOITDOITDOIT\n\n\n\n%s:%i\n\n\n", self._elastic_host, self._elastic_port)
try:
await self.sync_client.cluster.health(wait_for_status='yellow')
break

View file

@ -8,8 +8,9 @@ from multiprocessing import Process
import apsw
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from .search import extract_doc, SearchIndex
from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex
INDEX = 'claims'
@ -48,7 +49,9 @@ ORDER BY claim.height desc
async def consume(producer):
es = AsyncElasticsearch()
env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
try:
await async_bulk(es, producer, request_timeout=120)
await es.indices.refresh(index=INDEX)
@ -57,7 +60,8 @@ async def consume(producer):
async def make_es_index():
index = SearchIndex('')
env = Env(LBC)
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
try:
return await index.start()
finally:
@ -83,6 +87,9 @@ def __run(args, shard):
def run_elastic_sync():
logging.basicConfig(level=logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
logging.info('lbry.server starting')
parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
parser.add_argument("db_path", type=str)

View file

@ -974,7 +974,9 @@ class LBRYLevelDB(LevelDB):
)
# Search index
self.search_index = SearchIndex(self.env.es_index_prefix, self.env.database_query_timeout)
self.search_index = SearchIndex(
self.env.es_index_prefix, self.env.database_query_timeout, self.env.elastic_host, self.env.elastic_port
)
def close(self):
super().close()

View file

@ -33,6 +33,8 @@ class Env:
self.allow_root = self.boolean('ALLOW_ROOT', False)
self.host = self.default('HOST', 'localhost')
self.rpc_host = self.default('RPC_HOST', 'localhost')
self.elastic_host = self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = self.integer('ELASTIC_PORT', 9200)
self.loop_policy = self.set_event_loop_policy()
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.db_dir = self.required('DB_DIRECTORY')