drop apsw in wallet.server.db.elasticsearch.sync

This commit is contained in:
Lex Berezhny 2021-06-15 16:12:23 -04:00
parent b0371dd33d
commit c9cf7fd4d4

View file

@ -5,7 +5,7 @@ import os
from collections import namedtuple from collections import namedtuple
from multiprocessing import Process from multiprocessing import Process
import apsw import sqlite3
from elasticsearch import AsyncElasticsearch from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk from elasticsearch.helpers import async_bulk
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
@ -15,12 +15,11 @@ from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex,
async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'): async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'):
logging.info("shard %d starting", shard_num) logging.info("shard %d starting", shard_num)
def exec_factory(cursor, statement, bindings):
tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
cursor.setrowtrace(lambda cursor, row: tpl(*row))
return True
db.setexectrace(exec_factory) def namedtuple_factory(cursor, row):
Row = namedtuple('Row', (d[0] for d in cursor.description))
return Row(*row)
db.row_factory = namedtuple_factory
total = db.execute(f"select count(*) as total from claim where height % {shards_total} = {shard_num};").fetchone()[0] total = db.execute(f"select count(*) as total from claim where height % {shards_total} = {shard_num};").fetchone()[0]
for num, claim in enumerate(db.execute(f""" for num, claim in enumerate(db.execute(f"""
SELECT claimtrie.claim_hash as is_controlling, SELECT claimtrie.claim_hash as is_controlling,
@ -77,15 +76,10 @@ async def make_es_index(index=None):
async def run(db_path, clients, blocks, shard, index_name='claims'): async def run(db_path, clients, blocks, shard, index_name='claims'):
def itsbusy(*_): db = sqlite3.connect(db_path, isolation_level=None, uri=True)
logging.info("shard %d: db is busy, retry", shard) db.execute('pragma journal_mode=wal;')
return True db.execute('pragma temp_store=memory;')
db = apsw.Connection(db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) producer = get_all(db, shard, clients, limit=blocks, index_name=index_name)
db.setbusyhandler(itsbusy)
db.cursor().execute('pragma journal_mode=wal;')
db.cursor().execute('pragma temp_store=memory;')
producer = get_all(db.cursor(), shard, clients, limit=blocks, index_name=index_name)
await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients)))) await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients))))