forked from LBRYCommunity/lbry-sdk
update lbry-hub-elastic-sync to support resyncing recent blocks
This commit is contained in:
parent 64509ca95d
commit 9020e39a83
2 changed files with 73 additions and 6 deletions
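The change lets the hub operator re-index only the claims touched in the most recent N blocks instead of streaming the whole claim database into Elasticsearch. A minimal sketch of driving the new path from Python; the module path in the import is my assumption, not shown in the diff:

import asyncio
from lbry.wallet.server.db.elasticsearch.sync import run_sync  # module path assumed

# Re-index only claims touched in the last 50 blocks; blocks=0 keeps the
# old full-sync behaviour.
asyncio.run(run_sync(index_name='claims', clients=32, blocks=50))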
@@ -2,7 +2,7 @@ import argparse
 import asyncio
 import logging
 from elasticsearch import AsyncElasticsearch
-from elasticsearch.helpers import async_bulk
+from elasticsearch.helpers import async_streaming_bulk
 from lbry.wallet.server.env import Env
 from lbry.wallet.server.coin import LBC
 from lbry.wallet.server.leveldb import LevelDB
@@ -10,6 +10,50 @@ from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersion
 from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS


+async def get_recent_claims(blocks: int, index_name='claims', db=None):
+    env = Env(LBC)
+    need_open = db is None
+    db = db or LevelDB(env)
+    if need_open:
+        await db.open_dbs()
+    try:
+        cnt = 0
+        state = db.prefix_db.db_state.get()
+        touched_claims = set()
+        deleted_claims = set()
+        for height in range(state.height - blocks + 1, state.height + 1):
+            touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
+            touched_claims.update(touched_or_deleted.touched_claims)
+            deleted_claims.update(touched_or_deleted.deleted_claims)
+        touched_claims.difference_update(deleted_claims)
+
+        for deleted in deleted_claims:
+            yield {
+                '_index': index_name,
+                '_op_type': 'delete',
+                '_id': deleted.hex()
+            }
+        for touched in touched_claims:
+            claim = db.claim_producer(touched)
+            if claim:
+                yield {
+                    'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
+                    '_id': claim['claim_id'],
+                    '_index': index_name,
+                    '_op_type': 'update',
+                    'doc_as_upsert': True
+                }
+                cnt += 1
+            else:
+                logging.warning("could not sync claim %s", touched.hex())
+            if cnt % 10000 == 0:
+                print(f"{cnt} claims sent")
+        print("sent %i claims, deleted %i" % (len(touched_claims), len(deleted_claims)))
+    finally:
+        if need_open:
+            db.close()
+
+
 async def get_all_claims(index_name='claims', db=None):
     env = Env(LBC)
     need_open = db is None
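A quick illustration of the height window above (my example, not part of the diff): the range is inclusive of the chain tip, so blocks=3 with the tip at height 1000 visits heights 998 through 1000.

# Hypothetical numbers showing the inclusive window used in get_recent_claims.
tip_height = 1000
blocks = 3
heights = list(range(tip_height - blocks + 1, tip_height + 1))
assert heights == [998, 999, 1000]  # exactly the last `blocks` heights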
@@ -52,14 +96,20 @@ async def make_es_index(index=None):
         index.stop()


-async def run_sync(index_name='claims', db=None, clients=32):
+async def run_sync(index_name='claims', db=None, clients=32, blocks=0):
     env = Env(LBC)
     logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
     es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
-    claim_generator = get_all_claims(index_name=index_name, db=db)
-
+    if blocks > 0:
+        blocks = min(blocks, 200)
+        logging.info("Resyncing last %i blocks", blocks)
+        claim_generator = get_recent_claims(blocks, index_name=index_name, db=db)
+    else:
+        claim_generator = get_all_claims(index_name=index_name, db=db)
     try:
-        await async_bulk(es, claim_generator, request_timeout=600)
+        async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False):
+            if not ok:
+                logging.warning("indexing failed for an item: %s", item)
         await es.indices.refresh(index=index_name)
     finally:
         await es.close()
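Note on the bulk-helper swap: async_bulk raises on the first failed batch and aborts the sync, while async_streaming_bulk with raise_on_error=False yields an (ok, item) pair per action, so bad documents get logged and skipped instead of killing the run. A standalone sketch, assuming an Elasticsearch node on localhost:9200:

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

async def demo():
    es = AsyncElasticsearch([{'host': 'localhost', 'port': 9200}])  # assumed local node
    actions = [{
        '_index': 'claims', '_op_type': 'update', '_id': 'abc',
        'doc': {'name': 'example'}, 'doc_as_upsert': True,
    }]
    try:
        # Each action produces (ok, item); failures are reported, not raised.
        async for ok, item in async_streaming_bulk(es, actions, raise_on_error=False):
            if not ok:
                print('failed:', item)
    finally:
        await es.close()

asyncio.run(demo())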
@@ -85,4 +135,4 @@ def run_elastic_sync():
     if not args.force and not asyncio.run(make_es_index()):
         logging.info("ES is already initialized")
         return
-    asyncio.run(run_sync(clients=args.clients))
+    asyncio.run(run_sync(clients=args.clients, blocks=args.blocks))

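run_elastic_sync() now forwards args.blocks, but the argparse change itself is not visible in these hunks; presumably the parser gains a flag along these lines (flag name and default are my assumptions):

import argparse

parser = argparse.ArgumentParser()
# Assumed wiring, not shown in the diff: 0 keeps the full sync, a positive
# value resyncs only that many recent blocks (run_sync caps it at 200).
parser.add_argument('--blocks', type=int, default=0)
args = parser.parse_args(['--blocks', '50'])  # example invocation
assert args.blocks == 50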
@@ -713,6 +713,23 @@ class LevelDB:
             yield meta
             batch.clear()

+    def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
+        claim_txo = self.get_cached_claim_txo(claim_hash)
+        if not claim_txo:
+            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
+            return
+        if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
+            self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
+            return
+        activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
+        claim = self._prepare_resolve_result(
+            claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
+            claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
+        )
+        if not claim:
+            return
+        return self._prepare_claim_metadata(claim.claim_hash, claim)
+
     def claims_producer(self, claim_hashes: Set[bytes]):
         batch = []
         results = []
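The new LevelDB.claim_producer mirrors the plural claims_producer below it but resolves a single claim hash, returning the prepared ES document or None when the claim no longer resolves; get_recent_claims calls it once per touched claim. A usage sketch, assuming `db` is an already-opened LevelDB instance and using a placeholder hash:

# `db` is assumed to be an opened LevelDB; the 20-byte hash is a placeholder.
doc = db.claim_producer(bytes.fromhex('00' * 20))
if doc is None:
    print('claim not found or spent; nothing to index')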