Merge branch 'master' into patch-1

commit 2c79c7e2f6
Author: Alex Grin
Date: 2021-05-18 15:23:08 -04:00 (committed by GitHub)
GPG key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)

6 changed files with 67 additions and 28 deletions


@@ -92,11 +92,23 @@ To install on Windows:
 ```
 
 ## Run the tests
 
-To run the unit tests from the repo directory:
+### Elasticsearch
+
+For running integration tests, Elasticsearch is required to be available at localhost:9200/
+
+The easiest way to start it is using docker with:
+```bash
+make elastic-docker
+```
+
+Alternative installation methods are available [at Elasticsearch website](https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html).
+
+To run the unit and integration tests from the repo directory:
 ```
 python -m unittest discover tests.unit
+python -m unittest discover tests.integration
 ```
 
 ## Usage
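Before kicking off `tests.integration`, it can help to confirm that Elasticsearch is actually answering on localhost:9200. A minimal pre-flight sketch, assuming the synchronous `elasticsearch` Python client is installed (the code in this commit uses its async counterpart, `AsyncElasticsearch`):

```python
# Illustrative pre-flight check, not part of this commit: confirm that
# Elasticsearch answers on localhost:9200 before running tests.integration.
from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
if not es.ping():
    raise SystemExit("Elasticsearch not reachable at localhost:9200; "
                     "start it with `make elastic-docker` first")
print("Elasticsearch is up")
```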


@@ -30,3 +30,6 @@ test-integration:
 idea:
 	mkdir -p .idea
 	cp -r scripts/idea/* .idea
+
+elastic-docker:
+	docker run -d -v lbryhub:/usr/share/elasticsearch/data -p 9200:9200 -p 9300:9300 -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.12.1
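One practical wrinkle: `docker run -d` returns immediately, but Elasticsearch needs a few seconds before port 9200 answers. A standard-library-only sketch that polls the cluster health endpoint of the single-node container above (the endpoint and port are Elasticsearch defaults, not part of this commit):

```python
# Illustrative wait loop: poll Elasticsearch cluster health until the
# container started by `make elastic-docker` is ready (or give up after 30s).
import json
import time
import urllib.request

for _ in range(30):
    try:
        with urllib.request.urlopen("http://localhost:9200/_cluster/health") as resp:
            print("cluster status:", json.load(resp)["status"])  # "yellow" is normal on a single node
            break
    except OSError:
        time.sleep(1)  # not listening yet, retry
else:
    raise SystemExit("Elasticsearch did not come up within 30 seconds")
```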


@@ -68,9 +68,9 @@ class SearchIndex:
             self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
         )
 
-    async def start(self):
+    async def start(self) -> bool:
         if self.sync_client:
-            return
+            return False
         hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
         self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
         self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
@@ -162,6 +162,9 @@ class SearchIndex:
         await self.sync_client.update_by_query(
             self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4)
         await self.sync_client.indices.refresh(self.index)
+        self.clear_caches()
+
+    def clear_caches(self):
         self.search_cache.clear()
         self.short_id_cache.clear()
         self.claim_cache.clear()
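Taken together, these two hunks change the `SearchIndex` lifecycle: `start()` is now annotated `-> bool` and short-circuits with `False` when the sync client already exists, and cache invalidation is factored into a reusable `clear_caches()` that the filter update also calls. A sketch of the resulting caller-side contract, assuming a local node and the constructor form used elsewhere in this commit:

```python
# Illustrative use of the changed SearchIndex API (assumes ES on localhost).
import asyncio
from lbry.wallet.server.db.elasticsearch.search import SearchIndex

async def main():
    index = SearchIndex('', elastic_host='localhost', elastic_port=9200)
    await index.start()                  # opens the sync and search clients
    assert await index.start() is False  # second call: already running
    index.clear_caches()                 # drops search/short_id/claim caches
    await index.stop()

asyncio.run(main())
```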


@@ -12,10 +12,8 @@ from lbry.wallet.server.env import Env
 from lbry.wallet.server.coin import LBC
 from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
 
-INDEX = 'claims'
-
-async def get_all(db, shard_num, shards_total, limit=0):
+async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'):
     logging.info("shard %d starting", shard_num)
 
     def exec_factory(cursor, statement, bindings):
         tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
@@ -44,24 +42,25 @@ ORDER BY claim.height desc
             claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
             if num % 10_000 == 0:
                 logging.info("%d/%d", num, total)
-            yield extract_doc(claim, INDEX)
+            yield extract_doc(claim, index_name)
             if 0 < limit <= num:
                 break
 
-async def consume(producer):
+async def consume(producer, index_name):
     env = Env(LBC)
     logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
     es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
     try:
         await async_bulk(es, producer, request_timeout=120)
-        await es.indices.refresh(index=INDEX)
+        await es.indices.refresh(index=index_name)
     finally:
         await es.close()
 
-async def make_es_index():
+async def make_es_index(index=None):
     env = Env(LBC)
-    index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
+    if index is None:
+        index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
 
     try:
@@ -71,26 +70,27 @@ async def make_es_index():
             "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
         )
         await index.delete_index()
+        await index.stop()
         return await index.start()
     finally:
         index.stop()
 
-async def run(args, shard):
+async def run(db_path, clients, blocks, shard, index_name='claims'):
     def itsbusy(*_):
         logging.info("shard %d: db is busy, retry", shard)
         return True
-    db = apsw.Connection(args.db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
+    db = apsw.Connection(db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
     db.setbusyhandler(itsbusy)
     db.cursor().execute('pragma journal_mode=wal;')
     db.cursor().execute('pragma temp_store=memory;')
 
-    producer = get_all(db.cursor(), shard, args.clients, limit=args.blocks)
-    await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients))))
+    producer = get_all(db.cursor(), shard, clients, limit=blocks, index_name=index_name)
+    await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients))))
 
 def __run(args, shard):
-    asyncio.run(run(args, shard))
+    asyncio.run(run(args.db_path, args.clients, args.blocks, shard))
 
 def run_elastic_sync():
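With `run()` now taking plain arguments instead of an argparse namespace (the `__run` wrapper unpacks `args` for the CLI path), it can be driven directly from other code, as `TestESSync` below does. An illustrative call, with a placeholder database path:

```python
# Illustrative direct invocation of the refactored run(); the path is a
# placeholder, and index_name matches its default value.
import asyncio
from lbry.wallet.server.db.elasticsearch.sync import run as run_sync

asyncio.run(run_sync('/tmp/claims.db', clients=1, blocks=0, shard=0,
                     index_name='claims'))
```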


@@ -5,7 +5,6 @@ from typing import Union, Tuple, Set, List
 from itertools import chain
 from decimal import Decimal
 from collections import namedtuple
-from multiprocessing import Manager
 from binascii import unhexlify, hexlify
 from lbry.wallet.server.leveldb import LevelDB
 from lbry.wallet.server.util import class_logger
@@ -220,7 +219,6 @@ class SQLDB:
         self.db = None
         self.logger = class_logger(__name__, self.__class__.__name__)
         self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
-        self.state_manager = None
         self.blocked_streams = None
         self.blocked_channels = None
         self.blocking_channel_hashes = {
@@ -251,11 +249,10 @@ class SQLDB:
         self.execute(self.PRAGMAS)
         self.execute(self.CREATE_TABLES_QUERY)
         register_canonical_functions(self.db)
-        self.state_manager = Manager()
-        self.blocked_streams = self.state_manager.dict()
-        self.blocked_channels = self.state_manager.dict()
-        self.filtered_streams = self.state_manager.dict()
-        self.filtered_channels = self.state_manager.dict()
+        self.blocked_streams = {}
+        self.blocked_channels = {}
+        self.filtered_streams = {}
+        self.filtered_channels = {}
         self.update_blocked_and_filtered_claims()
         for algorithm in self.trending:
             algorithm.install(self.db)
@@ -263,8 +260,6 @@ class SQLDB:
     def close(self):
         if self.db is not None:
             self.db.close()
-        if self.state_manager is not None:
-            self.state_manager.shutdown()
 
     def update_blocked_and_filtered_claims(self):
         self.update_claims_from_channel_hashes(
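For contrast, a minimal standalone illustration of what changed here: a `multiprocessing.Manager` dict is a proxy served by a separate process and must be shut down (hence the removed `close()` branch), while the plain dicts replacing it are ordinary in-process state:

```python
# Contrast between a Manager-backed dict (removed above) and a plain dict.
from multiprocessing import Manager

if __name__ == '__main__':
    manager = Manager()
    shared = manager.dict()   # proxy object, usable across processes
    shared['key'] = 1
    manager.shutdown()        # required cleanup, as in the removed close() code

    local = {}                # plain dict: in-process only, no cleanup needed
    local['key'] = 1
```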


@@ -4,6 +4,7 @@ import lbry
 import lbry.wallet
 from lbry.error import ServerPaymentFeeAboveMaxAllowedError
 from lbry.wallet.network import ClientSession
+from lbry.wallet.server.db.elasticsearch.sync import run as run_sync, make_es_index
 from lbry.wallet.server.session import LBRYElectrumX
 from lbry.testcase import IntegrationTestCase, CommandTestCase
 from lbry.wallet.orchstr8.node import SPVNode
@@ -13,9 +14,6 @@ class TestSessions(IntegrationTestCase):
     """
     Tests that server cleans up stale connections after session timeout and client times out too.
     """
-
-    LEDGER = lbry.wallet
-
     async def test_session_bloat_from_socket_timeout(self):
         await self.conductor.stop_spv()
         await self.ledger.stop()
@@ -87,3 +85,31 @@ class TestUsagePayment(CommandTestCase):
         self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id))  # verify its broadcasted
         self.assertEqual(tx.outputs[0].amount, 100000000)
         self.assertEqual(tx.outputs[0].get_address(self.ledger), address)
+
+
+class TestESSync(CommandTestCase):
+    async def test_es_sync_utility(self):
+        for i in range(10):
+            await self.stream_create(f"stream{i}", bid='0.001')
+        await self.generate(1)
+        self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
+        db = self.conductor.spv_node.server.db
+        await db.search_index.delete_index()
+        db.search_index.clear_caches()
+        self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
+        await db.search_index.stop()
+        self.assertTrue(await make_es_index(db.search_index))
+
+        async def resync():
+            await db.search_index.start()
+            db.search_index.clear_caches()
+            await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index)
+            self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
+        await resync()
+
+        # this time we will test a migration from unversioned to v1
+        await db.search_index.sync_client.indices.delete_template(db.search_index.index)
+        await db.search_index.stop()
+        self.assertTrue(await make_es_index(db.search_index))
+        await db.search_index.start()
+        await resync()
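As the new test exercises, `make_es_index()` now accepts an existing `SearchIndex` and only builds one from the environment when called with no argument. A sketch of both call forms:

```python
# Illustrative: the two call forms of make_es_index() after this change.
import asyncio
from lbry.wallet.server.db.elasticsearch.sync import make_es_index

# With no argument, a SearchIndex is constructed from Env(LBC) host/port settings.
asyncio.run(make_es_index())

# Or reuse a pre-configured index object, as TestESSync does above:
# asyncio.run(make_es_index(db.search_index))
```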