forked from LBRYCommunity/lbry-sdk
es writer
This commit is contained in:
parent
611ad5c655
commit
ebec12522b
4 changed files with 557 additions and 110 deletions
|
@ -1582,7 +1582,7 @@ class BlockProcessor:
|
|||
await self._first_caught_up()
|
||||
self._caught_up_event.set()
|
||||
try:
|
||||
await asyncio.wait_for(self.blocks_event.wait(), 0.25)
|
||||
await asyncio.wait_for(self.blocks_event.wait(), 0.1)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
self.blocks_event.clear()
|
||||
|
|
150
lbry/wallet/server/db/elasticsearch/common.py
Normal file
150
lbry/wallet/server/db/elasticsearch/common.py
Normal file
|
@ -0,0 +1,150 @@
|
|||
from decimal import Decimal
|
||||
from typing import Iterable
|
||||
|
||||
from lbry.error import TooManyClaimSearchParametersError
|
||||
from lbry.schema.tags import clean_tags
|
||||
from lbry.schema.url import normalize_name
|
||||
from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES
|
||||
from lbry.wallet.server.db.elasticsearch.constants import REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS
|
||||
|
||||
|
||||
def expand_query(**kwargs):
|
||||
if "amount_order" in kwargs:
|
||||
kwargs["limit"] = 1
|
||||
kwargs["order_by"] = "effective_amount"
|
||||
kwargs["offset"] = int(kwargs["amount_order"]) - 1
|
||||
if 'name' in kwargs:
|
||||
kwargs['name'] = normalize_name(kwargs.pop('name'))
|
||||
if kwargs.get('is_controlling') is False:
|
||||
kwargs.pop('is_controlling')
|
||||
query = {'must': [], 'must_not': []}
|
||||
collapse = None
|
||||
if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None:
|
||||
kwargs['fee_currency'] = kwargs['fee_currency'].upper()
|
||||
for key, value in kwargs.items():
|
||||
key = key.replace('claim.', '')
|
||||
many = key.endswith('__in') or isinstance(value, list)
|
||||
if many and len(value) > 2048:
|
||||
raise TooManyClaimSearchParametersError(key, 2048)
|
||||
if many:
|
||||
key = key.replace('__in', '')
|
||||
value = list(filter(None, value))
|
||||
if value is None or isinstance(value, list) and len(value) == 0:
|
||||
continue
|
||||
key = REPLACEMENTS.get(key, key)
|
||||
if key in FIELDS:
|
||||
partial_id = False
|
||||
if key == 'claim_type':
|
||||
if isinstance(value, str):
|
||||
value = CLAIM_TYPES[value]
|
||||
else:
|
||||
value = [CLAIM_TYPES[claim_type] for claim_type in value]
|
||||
elif key == 'stream_type':
|
||||
value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
|
||||
if key == '_id':
|
||||
if isinstance(value, Iterable):
|
||||
value = [item[::-1].hex() for item in value]
|
||||
else:
|
||||
value = value[::-1].hex()
|
||||
if not many and key in ('_id', 'claim_id') and len(value) < 20:
|
||||
partial_id = True
|
||||
if key in ('signature_valid', 'has_source'):
|
||||
continue # handled later
|
||||
if key in TEXT_FIELDS:
|
||||
key += '.keyword'
|
||||
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
|
||||
if partial_id:
|
||||
query['must'].append({"prefix": {"claim_id": value}})
|
||||
elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
|
||||
operator_length = 2 if value[:2] in ops else 1
|
||||
operator, value = value[:operator_length], value[operator_length:]
|
||||
if key == 'fee_amount':
|
||||
value = str(Decimal(value)*1000)
|
||||
query['must'].append({"range": {key: {ops[operator]: value}}})
|
||||
elif many:
|
||||
query['must'].append({"terms": {key: value}})
|
||||
else:
|
||||
if key == 'fee_amount':
|
||||
value = str(Decimal(value)*1000)
|
||||
query['must'].append({"term": {key: {"value": value}}})
|
||||
elif key == 'not_channel_ids':
|
||||
for channel_id in value:
|
||||
query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
|
||||
query['must_not'].append({"term": {'_id': channel_id}})
|
||||
elif key == 'channel_ids':
|
||||
query['must'].append({"terms": {'channel_id.keyword': value}})
|
||||
elif key == 'claim_ids':
|
||||
query['must'].append({"terms": {'claim_id.keyword': value}})
|
||||
elif key == 'media_types':
|
||||
query['must'].append({"terms": {'media_type.keyword': value}})
|
||||
elif key == 'any_languages':
|
||||
query['must'].append({"terms": {'languages': clean_tags(value)}})
|
||||
elif key == 'any_languages':
|
||||
query['must'].append({"terms": {'languages': value}})
|
||||
elif key == 'all_languages':
|
||||
query['must'].extend([{"term": {'languages': tag}} for tag in value])
|
||||
elif key == 'any_tags':
|
||||
query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
|
||||
elif key == 'all_tags':
|
||||
query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
|
||||
elif key == 'not_tags':
|
||||
query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
|
||||
elif key == 'not_claim_id':
|
||||
query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
|
||||
elif key == 'limit_claims_per_channel':
|
||||
collapse = ('channel_id.keyword', value)
|
||||
if kwargs.get('has_channel_signature'):
|
||||
query['must'].append({"exists": {"field": "signature"}})
|
||||
if 'signature_valid' in kwargs:
|
||||
query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
|
||||
elif 'signature_valid' in kwargs:
|
||||
query.setdefault('should', [])
|
||||
query["minimum_should_match"] = 1
|
||||
query['should'].append({"bool": {"must_not": {"exists": {"field": "signature"}}}})
|
||||
query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
|
||||
if 'has_source' in kwargs:
|
||||
query.setdefault('should', [])
|
||||
query["minimum_should_match"] = 1
|
||||
is_stream_or_repost = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
|
||||
query['should'].append(
|
||||
{"bool": {"must": [{"match": {"has_source": kwargs['has_source']}}, is_stream_or_repost]}})
|
||||
query['should'].append({"bool": {"must_not": [is_stream_or_repost]}})
|
||||
query['should'].append({"bool": {"must": [{"term": {"reposted_claim_type": CLAIM_TYPES['channel']}}]}})
|
||||
if kwargs.get('text'):
|
||||
query['must'].append(
|
||||
{"simple_query_string":
|
||||
{"query": kwargs["text"], "fields": [
|
||||
"claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
|
||||
]}})
|
||||
query = {
|
||||
"_source": {"excludes": ["description", "title"]},
|
||||
'query': {'bool': query},
|
||||
"sort": [],
|
||||
}
|
||||
if "limit" in kwargs:
|
||||
query["size"] = kwargs["limit"]
|
||||
if 'offset' in kwargs:
|
||||
query["from"] = kwargs["offset"]
|
||||
if 'order_by' in kwargs:
|
||||
if isinstance(kwargs["order_by"], str):
|
||||
kwargs["order_by"] = [kwargs["order_by"]]
|
||||
for value in kwargs['order_by']:
|
||||
if 'trending_group' in value:
|
||||
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
|
||||
continue
|
||||
is_asc = value.startswith('^')
|
||||
value = value[1:] if is_asc else value
|
||||
value = REPLACEMENTS.get(value, value)
|
||||
if value in TEXT_FIELDS:
|
||||
value += '.keyword'
|
||||
query['sort'].append({value: "asc" if is_asc else "desc"})
|
||||
if collapse:
|
||||
query["collapse"] = {
|
||||
"field": collapse[0],
|
||||
"inner_hits": {
|
||||
"name": collapse[0],
|
||||
"size": collapse[1],
|
||||
"sort": query["sort"]
|
||||
}
|
||||
}
|
||||
return query
|
117
lbry/wallet/server/db/elasticsearch/fast_ar_trending.py
Normal file
117
lbry/wallet/server/db/elasticsearch/fast_ar_trending.py
Normal file
|
@ -0,0 +1,117 @@
|
|||
FAST_AR_TRENDING_SCRIPT = """
|
||||
double softenLBC(double lbc) { return (Math.pow(lbc, 1.0 / 3.0)); }
|
||||
|
||||
double logsumexp(double x, double y)
|
||||
{
|
||||
double top;
|
||||
if(x > y)
|
||||
top = x;
|
||||
else
|
||||
top = y;
|
||||
double result = top + Math.log(Math.exp(x-top) + Math.exp(y-top));
|
||||
return(result);
|
||||
}
|
||||
|
||||
double logdiffexp(double big, double small)
|
||||
{
|
||||
return big + Math.log(1.0 - Math.exp(small - big));
|
||||
}
|
||||
|
||||
double squash(double x)
|
||||
{
|
||||
if(x < 0.0)
|
||||
return -Math.log(1.0 - x);
|
||||
else
|
||||
return Math.log(x + 1.0);
|
||||
}
|
||||
|
||||
double unsquash(double x)
|
||||
{
|
||||
if(x < 0.0)
|
||||
return 1.0 - Math.exp(-x);
|
||||
else
|
||||
return Math.exp(x) - 1.0;
|
||||
}
|
||||
|
||||
double log_to_squash(double x)
|
||||
{
|
||||
return logsumexp(x, 0.0);
|
||||
}
|
||||
|
||||
double squash_to_log(double x)
|
||||
{
|
||||
//assert x > 0.0;
|
||||
return logdiffexp(x, 0.0);
|
||||
}
|
||||
|
||||
double squashed_add(double x, double y)
|
||||
{
|
||||
// squash(unsquash(x) + unsquash(y)) but avoiding overflow.
|
||||
// Cases where the signs are the same
|
||||
if (x < 0.0 && y < 0.0)
|
||||
return -logsumexp(-x, logdiffexp(-y, 0.0));
|
||||
if (x >= 0.0 && y >= 0.0)
|
||||
return logsumexp(x, logdiffexp(y, 0.0));
|
||||
// Where the signs differ
|
||||
if (x >= 0.0 && y < 0.0)
|
||||
if (Math.abs(x) >= Math.abs(y))
|
||||
return logsumexp(0.0, logdiffexp(x, -y));
|
||||
else
|
||||
return -logsumexp(0.0, logdiffexp(-y, x));
|
||||
if (x < 0.0 && y >= 0.0)
|
||||
{
|
||||
// Addition is commutative, hooray for new math
|
||||
return squashed_add(y, x);
|
||||
}
|
||||
return 0.0;
|
||||
}
|
||||
|
||||
double squashed_multiply(double x, double y)
|
||||
{
|
||||
// squash(unsquash(x)*unsquash(y)) but avoiding overflow.
|
||||
int sign;
|
||||
if(x*y >= 0.0)
|
||||
sign = 1;
|
||||
else
|
||||
sign = -1;
|
||||
return sign*logsumexp(squash_to_log(Math.abs(x))
|
||||
+ squash_to_log(Math.abs(y)), 0.0);
|
||||
}
|
||||
|
||||
// Squashed inflated units
|
||||
double inflateUnits(int height) {
|
||||
double timescale = 576.0; // Half life of 400 = e-folding time of a day
|
||||
// by coincidence, so may as well go with it
|
||||
return log_to_squash(height / timescale);
|
||||
}
|
||||
|
||||
double spikePower(double newAmount) {
|
||||
if (newAmount < 50.0) {
|
||||
return(0.5);
|
||||
} else if (newAmount < 85.0) {
|
||||
return(newAmount / 100.0);
|
||||
} else {
|
||||
return(0.85);
|
||||
}
|
||||
}
|
||||
|
||||
double spikeMass(double oldAmount, double newAmount) {
|
||||
double softenedChange = softenLBC(Math.abs(newAmount - oldAmount));
|
||||
double changeInSoftened = Math.abs(softenLBC(newAmount) - softenLBC(oldAmount));
|
||||
double power = spikePower(newAmount);
|
||||
if (oldAmount > newAmount) {
|
||||
-1.0 * Math.pow(changeInSoftened, power) * Math.pow(softenedChange, 1.0 - power)
|
||||
} else {
|
||||
Math.pow(changeInSoftened, power) * Math.pow(softenedChange, 1.0 - power)
|
||||
}
|
||||
}
|
||||
|
||||
for (i in params.src.changes) {
|
||||
double units = inflateUnits(i.height);
|
||||
if (ctx._source.trending_score == null) {
|
||||
ctx._source.trending_score = 0.0;
|
||||
}
|
||||
double bigSpike = squashed_multiply(units, squash(spikeMass(i.prev_amount, i.new_amount)));
|
||||
ctx._source.trending_score = squashed_add(ctx._source.trending_score, bigSpike);
|
||||
}
|
||||
"""
|
|
@ -1,138 +1,318 @@
|
|||
import os
|
||||
import argparse
|
||||
import time
|
||||
import signal
|
||||
import json
|
||||
import typing
|
||||
from collections import defaultdict
|
||||
import asyncio
|
||||
import logging
|
||||
from elasticsearch import AsyncElasticsearch
|
||||
from elasticsearch import AsyncElasticsearch, NotFoundError
|
||||
from elasticsearch.helpers import async_streaming_bulk
|
||||
from lbry.wallet.server.env import Env
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
|
||||
from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS
|
||||
|
||||
from lbry.schema.result import Censor
|
||||
from lbry.wallet.server.db.elasticsearch.search import IndexVersionMismatch
|
||||
from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS
|
||||
from lbry.wallet.server.db.elasticsearch.common import expand_query
|
||||
from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol
|
||||
from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT
|
||||
from lbry.wallet.server.chain_reader import BlockchainReader
|
||||
from lbry.wallet.server.db.revertable import RevertableOp
|
||||
from lbry.wallet.server.db import DB_PREFIXES
|
||||
|
||||
|
||||
async def get_recent_claims(env, index_name='claims', db=None):
|
||||
log = logging.getLogger()
|
||||
need_open = db is None
|
||||
db = db or LevelDB(env)
|
||||
log = logging.getLogger()
|
||||
|
||||
|
||||
class ElasticWriter(BlockchainReader):
|
||||
VERSION = 1
|
||||
|
||||
def __init__(self, env):
|
||||
super().__init__(env, 'lbry-elastic-writer')
|
||||
# self._refresh_interval = 0.1
|
||||
self._task = None
|
||||
self.index = self.env.es_index_prefix + 'claims'
|
||||
self._elastic_host = env.elastic_host
|
||||
self._elastic_port = env.elastic_port
|
||||
self.sync_timeout = 1800
|
||||
self.sync_client = AsyncElasticsearch(
|
||||
[{'host': self._elastic_host, 'port': self._elastic_port}], timeout=self.sync_timeout
|
||||
)
|
||||
self._es_info_path = os.path.join(env.db_dir, 'es_info')
|
||||
self._last_wrote_height = 0
|
||||
self._last_wrote_block_hash = None
|
||||
|
||||
self._touched_claims = set()
|
||||
self._deleted_claims = set()
|
||||
|
||||
self._removed_during_undo = set()
|
||||
|
||||
self._trending = defaultdict(list)
|
||||
self._advanced = True
|
||||
self.synchronized = asyncio.Event()
|
||||
self._listeners: typing.List[ElasticNotifierProtocol] = []
|
||||
|
||||
async def run_es_notifier(self, synchronized: asyncio.Event):
|
||||
server = await asyncio.get_event_loop().create_server(
|
||||
lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port
|
||||
)
|
||||
self.log.warning("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port)
|
||||
synchronized.set()
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
|
||||
def notify_es_notification_listeners(self, height: int):
|
||||
for p in self._listeners:
|
||||
p.send_height(height)
|
||||
self.log.warning("notify listener %i", height)
|
||||
|
||||
def _read_es_height(self):
|
||||
with open(self._es_info_path, 'r') as f:
|
||||
info = json.loads(f.read())
|
||||
self._last_wrote_height = int(info.get('height', 0))
|
||||
self._last_wrote_block_hash = info.get('block_hash', None)
|
||||
|
||||
async def read_es_height(self):
|
||||
await asyncio.get_event_loop().run_in_executor(None, self._read_es_height)
|
||||
|
||||
def write_es_height(self, height: int, block_hash: str):
|
||||
with open(self._es_info_path, 'w') as f:
|
||||
f.write(json.dumps({'height': height, 'block_hash': block_hash}, indent=2))
|
||||
self._last_wrote_height = height
|
||||
self._last_wrote_block_hash = block_hash
|
||||
|
||||
async def get_index_version(self) -> int:
|
||||
try:
|
||||
if need_open:
|
||||
db.open_db()
|
||||
if db.es_sync_height == db.db_height or db.db_height <= 0:
|
||||
return
|
||||
if need_open:
|
||||
await db.initialize_caches()
|
||||
log.info(f"catching up ES ({db.es_sync_height}) to leveldb height: {db.db_height}")
|
||||
cnt = 0
|
||||
touched_claims = set()
|
||||
deleted_claims = set()
|
||||
for height in range(db.es_sync_height, db.db_height + 1):
|
||||
touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
|
||||
touched_claims.update(touched_or_deleted.touched_claims)
|
||||
deleted_claims.update(touched_or_deleted.deleted_claims)
|
||||
touched_claims.difference_update(deleted_claims)
|
||||
template = await self.sync_client.indices.get_template(self.index)
|
||||
return template[self.index]['version']
|
||||
except NotFoundError:
|
||||
return 0
|
||||
|
||||
for deleted in deleted_claims:
|
||||
async def set_index_version(self, version):
|
||||
await self.sync_client.indices.put_template(
|
||||
self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
|
||||
)
|
||||
|
||||
async def start_index(self) -> bool:
|
||||
if self.sync_client:
|
||||
return False
|
||||
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
|
||||
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
|
||||
while True:
|
||||
try:
|
||||
await self.sync_client.cluster.health(wait_for_status='yellow')
|
||||
break
|
||||
except ConnectionError:
|
||||
self.log.warning("Failed to connect to Elasticsearch. Waiting for it!")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
|
||||
acked = res.get('acknowledged', False)
|
||||
if acked:
|
||||
await self.set_index_version(self.VERSION)
|
||||
return acked
|
||||
index_version = await self.get_index_version()
|
||||
if index_version != self.VERSION:
|
||||
self.log.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
|
||||
raise IndexVersionMismatch(index_version, self.VERSION)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
return acked
|
||||
|
||||
async def stop_index(self):
|
||||
if self.sync_client:
|
||||
await self.sync_client.close()
|
||||
self.sync_client = None
|
||||
|
||||
def delete_index(self):
|
||||
return self.sync_client.indices.delete(self.index, ignore_unavailable=True)
|
||||
|
||||
def update_filter_query(self, censor_type, blockdict, channels=False):
|
||||
blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()}
|
||||
if channels:
|
||||
update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
|
||||
else:
|
||||
update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}")
|
||||
key = 'channel_id' if channels else 'claim_id'
|
||||
update['script'] = {
|
||||
"source": f"ctx._source.censor_type={censor_type}; "
|
||||
f"ctx._source.censoring_channel_id=params[ctx._source.{key}];",
|
||||
"lang": "painless",
|
||||
"params": blockdict
|
||||
}
|
||||
return update
|
||||
|
||||
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
|
||||
if filtered_streams:
|
||||
await self.sync_client.update_by_query(
|
||||
self.index, body=self.update_filter_query(Censor.SEARCH, filtered_streams), slices=4)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
if filtered_channels:
|
||||
await self.sync_client.update_by_query(
|
||||
self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels), slices=4)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
await self.sync_client.update_by_query(
|
||||
self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels, True), slices=4)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
if blocked_streams:
|
||||
await self.sync_client.update_by_query(
|
||||
self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_streams), slices=4)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
if blocked_channels:
|
||||
await self.sync_client.update_by_query(
|
||||
self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels), slices=4)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
await self.sync_client.update_by_query(
|
||||
self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4)
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
|
||||
async def _claim_producer(self):
|
||||
for deleted in self._deleted_claims:
|
||||
yield {
|
||||
'_index': index_name,
|
||||
'_index': self.index,
|
||||
'_op_type': 'delete',
|
||||
'_id': deleted.hex()
|
||||
}
|
||||
for touched in touched_claims:
|
||||
claim = db.claim_producer(touched)
|
||||
for touched in self._touched_claims:
|
||||
claim = self.db.claim_producer(touched)
|
||||
if claim:
|
||||
self.log.warning("send es %s %i", claim['claim_id'], claim['activation_height'])
|
||||
yield {
|
||||
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
|
||||
'_id': claim['claim_id'],
|
||||
'_index': index_name,
|
||||
'_index': self.index,
|
||||
'_op_type': 'update',
|
||||
'doc_as_upsert': True
|
||||
}
|
||||
cnt += 1
|
||||
else:
|
||||
logging.warning("could not sync claim %s", touched.hex())
|
||||
if cnt % 10000 == 0:
|
||||
logging.info("%i claims sent to ES", cnt)
|
||||
for claim_hash, notifications in self._trending.items():
|
||||
self.log.warning("send es trending for %s", claim_hash.hex())
|
||||
yield {
|
||||
'_id': claim_hash.hex(),
|
||||
'_index': self.index,
|
||||
'_op_type': 'update',
|
||||
'script': {
|
||||
'lang': 'painless',
|
||||
'source': FAST_AR_TRENDING_SCRIPT,
|
||||
'params': {'src': {
|
||||
'changes': [
|
||||
{
|
||||
'height': notify_height,
|
||||
'prev_amount': trending_v.previous_amount / 1E8,
|
||||
'new_amount': trending_v.new_amount / 1E8,
|
||||
} for (notify_height, trending_v) in notifications
|
||||
]
|
||||
}}
|
||||
},
|
||||
}
|
||||
|
||||
db.es_sync_height = db.db_height
|
||||
db.write_db_state()
|
||||
db.prefix_db.unsafe_commit()
|
||||
db.assert_db_state()
|
||||
def advance(self, height: int):
|
||||
super().advance(height)
|
||||
|
||||
logging.info("finished sending %i claims to ES, deleted %i", cnt, len(deleted_claims))
|
||||
finally:
|
||||
if need_open:
|
||||
db.close()
|
||||
touched_or_deleted = self.db.prefix_db.touched_or_deleted.get(height)
|
||||
for k, v in self.db.prefix_db.trending_notification.iterate((height,)):
|
||||
self._trending[k.claim_hash].append((k.height, v))
|
||||
if touched_or_deleted:
|
||||
readded_after_reorg = self._removed_during_undo.intersection(touched_or_deleted.touched_claims)
|
||||
self._deleted_claims.difference_update(readded_after_reorg)
|
||||
self._touched_claims.update(touched_or_deleted.touched_claims)
|
||||
self._deleted_claims.update(touched_or_deleted.deleted_claims)
|
||||
self._touched_claims.difference_update(self._deleted_claims)
|
||||
for to_del in touched_or_deleted.deleted_claims:
|
||||
if to_del in self._trending:
|
||||
self._trending.pop(to_del)
|
||||
self.log.warning("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims),
|
||||
len(self._touched_claims), len(self._deleted_claims))
|
||||
self._advanced = True
|
||||
|
||||
def unwind(self):
|
||||
self.db.tx_counts.pop()
|
||||
reverted_block_hash = self.db.coin.header_hash(self.db.headers.pop())
|
||||
packed = self.db.prefix_db.undo.get(len(self.db.tx_counts), reverted_block_hash)
|
||||
touched_or_deleted = None
|
||||
claims_to_delete = []
|
||||
# find and apply the touched_or_deleted items in the undos for the reverted blocks
|
||||
assert packed, f'missing undo information for block {len(self.db.tx_counts)}'
|
||||
while packed:
|
||||
op, packed = RevertableOp.unpack(packed)
|
||||
if op.is_delete and op.key.startswith(DB_PREFIXES.touched_or_deleted.value):
|
||||
assert touched_or_deleted is None, 'only should have one match'
|
||||
touched_or_deleted = self.db.prefix_db.touched_or_deleted.unpack_value(op.value)
|
||||
elif op.is_delete and op.key.startswith(DB_PREFIXES.claim_to_txo.value):
|
||||
v = self.db.prefix_db.claim_to_txo.unpack_value(op.value)
|
||||
if v.root_tx_num == v.tx_num and v.root_tx_num > self.db.tx_counts[-1]:
|
||||
claims_to_delete.append(self.db.prefix_db.claim_to_txo.unpack_key(op.key).claim_hash)
|
||||
if touched_or_deleted:
|
||||
self._touched_claims.update(set(touched_or_deleted.deleted_claims).union(
|
||||
touched_or_deleted.touched_claims.difference(set(claims_to_delete))))
|
||||
self._deleted_claims.update(claims_to_delete)
|
||||
self._removed_during_undo.update(claims_to_delete)
|
||||
self._advanced = True
|
||||
self.log.warning("delete %i claim and upsert %i from reorg", len(self._deleted_claims), len(self._touched_claims))
|
||||
|
||||
async def get_all_claims(env, index_name='claims', db=None):
|
||||
need_open = db is None
|
||||
db = db or LevelDB(env)
|
||||
if need_open:
|
||||
db.open_db()
|
||||
await db.initialize_caches()
|
||||
logging.info("Fetching claims to send ES from leveldb")
|
||||
try:
|
||||
async def poll_for_changes(self):
|
||||
await super().poll_for_changes()
|
||||
cnt = 0
|
||||
async for claim in db.all_claims_producer():
|
||||
yield {
|
||||
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
|
||||
'_id': claim['claim_id'],
|
||||
'_index': index_name,
|
||||
'_op_type': 'update',
|
||||
'doc_as_upsert': True
|
||||
}
|
||||
success = 0
|
||||
if self._advanced:
|
||||
if self._touched_claims or self._deleted_claims or self._trending:
|
||||
async for ok, item in async_streaming_bulk(
|
||||
self.sync_client, self._claim_producer(),
|
||||
raise_on_error=False):
|
||||
cnt += 1
|
||||
if cnt % 10000 == 0:
|
||||
logging.info("sent %i claims to ES", cnt)
|
||||
finally:
|
||||
if need_open:
|
||||
db.close()
|
||||
|
||||
|
||||
async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'):
|
||||
index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port)
|
||||
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
|
||||
try:
|
||||
created = await index.start()
|
||||
except IndexVersionMismatch as err:
|
||||
logging.info(
|
||||
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
|
||||
)
|
||||
await index.delete_index()
|
||||
await index.stop()
|
||||
created = await index.start()
|
||||
finally:
|
||||
index.stop()
|
||||
|
||||
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
|
||||
if force or created:
|
||||
claim_generator = get_all_claims(env, index_name=index_name, db=db)
|
||||
else:
|
||||
claim_generator = get_recent_claims(env, index_name=index_name, db=db)
|
||||
try:
|
||||
async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False):
|
||||
if not ok:
|
||||
logging.warning("indexing failed for an item: %s", item)
|
||||
await es.indices.refresh(index=index_name)
|
||||
self.log.warning("indexing failed for an item: %s", item)
|
||||
else:
|
||||
success += 1
|
||||
await self.sync_client.indices.refresh(self.index)
|
||||
self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex())
|
||||
self.log.warning("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt)
|
||||
self._touched_claims.clear()
|
||||
self._deleted_claims.clear()
|
||||
self._removed_during_undo.clear()
|
||||
self._trending.clear()
|
||||
self._advanced = False
|
||||
self.synchronized.set()
|
||||
self.notify_es_notification_listeners(self._last_wrote_height)
|
||||
|
||||
@property
|
||||
def last_synced_height(self) -> int:
|
||||
return self._last_wrote_height
|
||||
|
||||
async def start(self):
|
||||
env = self.env
|
||||
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
|
||||
|
||||
def _start_cancellable(run, *args):
|
||||
_flag = asyncio.Event()
|
||||
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
|
||||
return _flag.wait()
|
||||
|
||||
self.db.open_db()
|
||||
await self.db.initialize_caches()
|
||||
await self.start_index()
|
||||
self.last_state = self.db.read_db_state()
|
||||
|
||||
await _start_cancellable(self.run_es_notifier)
|
||||
await _start_cancellable(self.refresh_blocks_forever)
|
||||
|
||||
async def stop(self, delete_index=False):
|
||||
while self.cancellable_tasks:
|
||||
t = self.cancellable_tasks.pop()
|
||||
if not t.done():
|
||||
t.cancel()
|
||||
if delete_index:
|
||||
await self.delete_index()
|
||||
await self.stop_index()
|
||||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
def __exit():
|
||||
raise SystemExit()
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
loop.run_until_complete(self.start())
|
||||
loop.run_until_complete(self.shutdown_event.wait())
|
||||
except (SystemExit, KeyboardInterrupt):
|
||||
pass
|
||||
finally:
|
||||
await es.close()
|
||||
|
||||
|
||||
def run_elastic_sync():
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.getLogger('aiohttp').setLevel(logging.WARNING)
|
||||
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
|
||||
|
||||
logging.info('lbry.server starting')
|
||||
parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
|
||||
parser.add_argument("-c", "--clients", type=int, default=32)
|
||||
parser.add_argument("-f", "--force", default=False, action='store_true')
|
||||
Env.contribute_to_arg_parser(parser)
|
||||
args = parser.parse_args()
|
||||
env = Env.from_arg_parser(args)
|
||||
|
||||
if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')):
|
||||
logging.info("DB path doesnt exist, nothing to sync to ES")
|
||||
return
|
||||
|
||||
asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force))
|
||||
loop.run_until_complete(self.stop())
|
||||
|
|
Loading…
Add table
Reference in a new issue