diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 05d95dacd..df83e6374 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -5,7 +5,7 @@ import os from collections import namedtuple from multiprocessing import Process -import apsw +import sqlite3 from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk from lbry.wallet.server.env import Env @@ -15,12 +15,11 @@ from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'): logging.info("shard %d starting", shard_num) - def exec_factory(cursor, statement, bindings): - tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) - cursor.setrowtrace(lambda cursor, row: tpl(*row)) - return True - db.setexectrace(exec_factory) + def namedtuple_factory(cursor, row): + Row = namedtuple('Row', (d[0] for d in cursor.description)) + return Row(*row) + db.row_factory = namedtuple_factory total = db.execute(f"select count(*) as total from claim where height % {shards_total} = {shard_num};").fetchone()[0] for num, claim in enumerate(db.execute(f""" SELECT claimtrie.claim_hash as is_controlling, @@ -77,15 +76,10 @@ async def make_es_index(index=None): async def run(db_path, clients, blocks, shard, index_name='claims'): - def itsbusy(*_): - logging.info("shard %d: db is busy, retry", shard) - return True - db = apsw.Connection(db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) - db.setbusyhandler(itsbusy) - db.cursor().execute('pragma journal_mode=wal;') - db.cursor().execute('pragma temp_store=memory;') - - producer = get_all(db.cursor(), shard, clients, limit=blocks, index_name=index_name) + db = sqlite3.connect(db_path, isolation_level=None, uri=True) + db.execute('pragma journal_mode=wal;') + db.execute('pragma temp_store=memory;') + producer = get_all(db, shard, clients, limit=blocks, index_name=index_name) await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients))))