Merge branch 'master' into dht_seed_script_metrics

Alex Grin 2021-10-22 11:54:19 -04:00 committed by GitHub
commit ca4d758db9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 780 additions and 536 deletions


@ -20,6 +20,7 @@ RUN apt-get update && \
python3-dev \
python3-pip \
python3-wheel \
python3-cffi \
python3-setuptools && \
update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
rm -rf /var/lib/apt/lists/*


@ -18,14 +18,18 @@ services:
- "wallet_server:/database"
environment:
- DAEMON_URL=http://lbry:lbry@127.0.0.1:9245
- MAX_QUERY_WORKERS=4
- CACHE_MB=1024
- CACHE_ALL_TX_HASHES=
- CACHE_ALL_CLAIM_TXOS=
- MAX_SEND=1000000000000000000
- MAX_RECEIVE=1000000000000000000
- MAX_SESSIONS=100000
- HOST=0.0.0.0
- TCP_PORT=50001
- PROMETHEUS_PORT=2112
- QUERY_TIMEOUT_MS=3000 # how long search queries are allowed to run before being cancelled, in milliseconds
- TRENDING_ALGORITHMS=variable_decay
- MAX_SEND=10000000000000 # deprecated. leave it high until it's removed
- MAX_SUBS=1000000000000 # deprecated. leave it high until it's removed
- FILTERING_CHANNEL_IDS=770bd7ecba84fd2f7607fb15aedd2b172c2e153f 95e5db68a3101df19763f3a5182e4b12ba393ee8
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6 e4e230b131082f6b10c8f7994bbb83f29e8e6fb9
- BLOCKING_CHANNEL_IDS=dd687b357950f6f271999971f43c785e8067c3a9 06871aa438032244202840ec59a469b303257cad b4a2528f436eca1bf3bf3e10ff3f98c57bd6c4c6
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: es01


@ -6,7 +6,7 @@ set -euo pipefail
SNAPSHOT_URL="${SNAPSHOT_URL:-}" #off by default. latest snapshot at https://lbry.com/snapshot/wallet
if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/lbry-leveldb ]]; then
files="$(ls)"
echo "Downloading wallet snapshot from $SNAPSHOT_URL"
wget --no-verbose --trust-server-names --content-disposition "$SNAPSHOT_URL"
@ -20,6 +20,6 @@ if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then
rm "$filename"
fi
/home/lbry/.local/bin/lbry-hub-elastic-sync /database/claims.db
/home/lbry/.local/bin/lbry-hub-elastic-sync
echo 'starting server'
/home/lbry/.local/bin/lbry-hub "$@"


@ -3,14 +3,16 @@ import asyncio
import typing
from bisect import bisect_right
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
from prometheus_client import Gauge, Histogram
from collections import defaultdict
import lbry
from lbry.schema.url import URL
from lbry.schema.claim import Claim
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.utils import LRUCache
from lbry.wallet.transaction import OutputScript, Output, Transaction
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError
@ -28,7 +30,6 @@ if typing.TYPE_CHECKING:
class TrendingNotification(NamedTuple):
height: int
added: bool
prev_amount: int
new_amount: int
@ -202,6 +203,8 @@ class BlockProcessor:
self.env = env
self.db = db
self.daemon = daemon
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event
self.coin = env.coin
@ -231,6 +234,9 @@ class BlockProcessor:
self.db_op_stack: Optional[RevertableOpStack] = None
# self.search_cache = {}
self.resolve_cache = LRUCache(2**16)
self.resolve_outputs_cache = LRUCache(2 ** 16)
self.history_cache = {}
self.status_server = StatusServer()
@ -297,7 +303,11 @@ class BlockProcessor:
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim
async def run_in_thread_with_lock(self, func, *args):
@ -308,13 +318,12 @@ class BlockProcessor:
# consistent and not being updated elsewhere.
async def run_in_thread_locked():
async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread_locked())
@staticmethod
async def run_in_thread(func, *args):
async def run_in_thread(self, func, *args):
async def run_in_thread():
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread())
async def check_and_advance_blocks(self, raw_blocks):
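The change above swaps the default executor (None) for a dedicated single-worker ThreadPoolExecutor named 'block-processor', so all chain-mutating work is serialized on one named thread, while asyncio.shield is kept so a cancelled caller cannot abandon a half-applied update. A minimal, self-contained sketch of that pattern; apply_block is a hypothetical stand-in for the blocking LevelDB work:

    import asyncio
    from concurrent.futures.thread import ThreadPoolExecutor

    def apply_block(height: int) -> int:
        # hypothetical stand-in for blocking database work
        return height + 1

    async def run_in_thread_with_lock(executor, lock, func, *args):
        async def locked():
            async with lock:  # nothing else touches chain state meanwhile
                return await asyncio.get_event_loop().run_in_executor(executor, func, *args)
        # shield: cancelling the caller does not abandon a half-applied block
        return await asyncio.shield(locked())

    async def main():
        # a single worker keeps chain updates strictly ordered on a named thread
        chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
        state_lock = asyncio.Lock()
        print(await run_in_thread_with_lock(chain_executor, state_lock, apply_block, 1000))
        chain_executor.shutdown(wait=True)

    asyncio.run(main())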
@ -357,6 +366,7 @@ class BlockProcessor:
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels)
await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
await self._es_caught_up()
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
@ -394,7 +404,8 @@ class BlockProcessor:
await self.backup_block()
self.logger.info(f'backed up to height {self.height:,d}')
await self.db._read_claim_txos() # TODO: don't do this
if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this
for touched in self.touched_claims_to_send_es:
if not self.db.get_claim_txo(touched):
self.removed_claims_to_send_es.add(touched)
@ -480,9 +491,7 @@ class BlockProcessor:
if signing_channel:
raw_channel_tx = self.db.prefix_db.tx.get(
self.db.prefix_db.tx_hash.get(
signing_channel.tx_num, deserialize_value=False
), deserialize_value=False
self.db.get_tx_hash(signing_channel.tx_num), deserialize_value=False
)
channel_pub_key_bytes = None
try:
@ -537,10 +546,11 @@ class BlockProcessor:
previous_amount = previous_claim.amount
self.updated_claims.add(claim_hash)
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
)
self.db.txo_to_claim[tx_num][nout] = claim_hash
if self.env.cache_all_claim_txos:
self.db.claim_to_txo[claim_hash] = ClaimToTXOValue(
tx_num, nout, root_tx_num, root_idx, txo.amount, channel_signature_is_valid, claim_name
)
self.db.txo_to_claim[tx_num][nout] = claim_hash
pending = StagedClaimtrieItem(
claim_name, normalized_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout,
@ -685,7 +695,7 @@ class BlockProcessor:
if (txin_num, txin.prev_idx) in self.txo_to_claim:
spent = self.txo_to_claim[(txin_num, txin.prev_idx)]
else:
if txin_num not in self.db.txo_to_claim or txin.prev_idx not in self.db.txo_to_claim[txin_num]:
if not self.db.get_cached_claim_exists(txin_num, txin.prev_idx):
# txo is not a claim
return False
spent_claim_hash_and_name = self.db.get_claim_from_txo(
@ -693,10 +703,12 @@ class BlockProcessor:
)
assert spent_claim_hash_and_name is not None
spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash)
claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx)
if not self.db.txo_to_claim[txin_num]:
self.db.txo_to_claim.pop(txin_num)
self.db.claim_to_txo.pop(claim_hash)
if spent.reposted_claim_hash:
self.pending_reposted.add(spent.reposted_claim_hash)
if spent.signing_hash and spent.channel_signature_is_valid and spent.signing_hash not in self.abandoned_claims:
@ -1014,8 +1026,8 @@ class BlockProcessor:
# prepare to activate or delay activation of the pending claims being added this block
for (tx_num, nout), staged in self.txo_to_claim.items():
is_delayed = not staged.is_update
if staged.claim_hash in self.db.claim_to_txo:
prev_txo = self.db.claim_to_txo[staged.claim_hash]
prev_txo = self.db.get_cached_claim_txo(staged.claim_hash)
if prev_txo:
prev_activation = self.db.get_activation(prev_txo.tx_num, prev_txo.position)
if height < prev_activation or prev_activation < 0:
is_delayed = True
@ -1309,9 +1321,9 @@ class BlockProcessor:
self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning)
def _add_claim_activation_change_notification(self, claim_id: str, height: int, added: bool, prev_amount: int,
def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int,
new_amount: int):
self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, added, prev_amount, new_amount))
self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount))
def _get_cumulative_update_ops(self, height: int):
# update the last takeover height for names with takeovers
@ -1389,25 +1401,13 @@ class BlockProcessor:
(name, prev_effective_amount, amt.tx_num, amt.position), (touched,)
)
if (name, touched) in self.activated_claim_amount_by_name_and_hash:
self._add_claim_activation_change_notification(
touched.hex(), height, True, prev_effective_amount,
self.activated_claim_amount_by_name_and_hash[(name, touched)]
)
if touched in self.activated_support_amount_by_claim:
for support_amount in self.activated_support_amount_by_claim[touched]:
self._add_claim_activation_change_notification(
touched.hex(), height, True, prev_effective_amount, support_amount
)
if touched in self.removed_active_support_amount_by_claim:
for support_amount in self.removed_active_support_amount_by_claim[touched]:
self._add_claim_activation_change_notification(
touched.hex(), height, False, prev_effective_amount, support_amount
)
new_effective_amount = self._get_pending_effective_amount(name, touched)
self.db.prefix_db.effective_amount.stage_put(
(name, new_effective_amount, tx_num, position), (touched,)
)
self._add_claim_activation_change_notification(
touched.hex(), height, prev_effective_amount, new_effective_amount
)
for channel_hash, count in self.pending_channel_counts.items():
if count != 0:
@ -1440,6 +1440,7 @@ class BlockProcessor:
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
for tx, tx_hash in txs:
spent_claims = {}
@ -1490,6 +1491,9 @@ class BlockProcessor:
self._abandon_claim(abandoned_claim_hash, tx_num, nout, normalized_name)
self.pending_transactions[tx_count] = tx_hash
self.pending_transaction_num_mapping[tx_hash] = tx_count
if self.env.cache_all_tx_hashes:
self.db.total_transactions.append(tx_hash)
self.db.tx_num_mapping[tx_hash] = tx_count
tx_count += 1
# handle expired claims
@ -1580,6 +1584,8 @@ class BlockProcessor:
self.pending_transaction_num_mapping.clear()
self.pending_transactions.clear()
self.pending_support_amount_change.clear()
self.resolve_cache.clear()
self.resolve_outputs_cache.clear()
async def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0
@ -1595,8 +1601,14 @@ class BlockProcessor:
self.db.headers.pop()
self.db.tx_counts.pop()
self.tip = self.coin.header_hash(self.db.headers[-1])
self.tx_count = self.db.tx_counts[-1]
if self.env.cache_all_tx_hashes:
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
self.db.tx_num_mapping.pop(self.db.total_transactions.pop())
self.tx_count -= 1
else:
self.tx_count = self.db.tx_counts[-1]
self.height -= 1
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched_hashXs.discard(None)
@ -1626,8 +1638,15 @@ class BlockProcessor:
self.db.last_flush = now
self.db.last_flush_tx_count = self.db.fs_tx_count
await self.run_in_thread_with_lock(self.db.prefix_db.rollback, self.height + 1)
def rollback():
self.db.prefix_db.rollback(self.height + 1)
self.db.es_sync_height = self.height
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
await self.run_in_thread_with_lock(rollback)
self.clear_after_advance_or_reorg()
self.db.assert_db_state()
elapsed = self.db.last_flush - start_time
self.logger.warning(f'backup flush #{self.db.hist_flush_count:,d} took {elapsed:.1f}s. '
@ -1646,7 +1665,7 @@ class BlockProcessor:
if tx_hash in self.pending_transaction_num_mapping:
return self.pending_transaction_num_mapping[tx_hash]
else:
return self.db.prefix_db.tx_num.get(tx_hash).tx_num
return self.db.get_tx_num(tx_hash)
def spend_utxo(self, tx_hash: bytes, nout: int):
hashX, amount = self.utxo_cache.pop((tx_hash, nout), (None, None))
@ -1690,6 +1709,17 @@ class BlockProcessor:
self.logger.exception("error while processing txs")
raise
async def _es_caught_up(self):
self.db.es_sync_height = self.height
def flush():
assert len(self.db.prefix_db._op_stack) == 0
self.db.write_db_state()
self.db.prefix_db.unsafe_commit()
self.db.assert_db_state()
await self.run_in_thread_with_lock(flush)
async def _first_caught_up(self):
self.logger.info(f'caught up to height {self.height}')
# Flush everything but with first_sync->False state.
@ -1742,5 +1772,6 @@ class BlockProcessor:
self.status_server.stop()
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True)
self.db.close()
# self.executor.shutdown(wait=True)


@ -1,7 +1,6 @@
import logging
import traceback
import argparse
import importlib
from lbry.wallet.server.env import Env
from lbry.wallet.server.server import Server
@ -10,27 +9,19 @@ def get_argument_parser():
parser = argparse.ArgumentParser(
prog="lbry-hub"
)
parser.add_argument("spvserver", type=str, help="Python class path to SPV server implementation.",
nargs="?", default="lbry.wallet.server.coin.LBC")
Env.contribute_to_arg_parser(parser)
return parser
def get_coin_class(spvserver):
spvserver_path, coin_class_name = spvserver.rsplit('.', 1)
spvserver_module = importlib.import_module(spvserver_path)
return getattr(spvserver_module, coin_class_name)
def main():
parser = get_argument_parser()
args = parser.parse_args()
coin_class = get_coin_class(args.spvserver)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
logging.info('lbry.server starting')
logging.getLogger('aiohttp').setLevel(logging.WARNING)
logging.getLogger('elasticsearch').setLevel(logging.WARNING)
try:
server = Server(Env(coin_class))
server = Server(Env.from_arg_parser(args))
server.run()
except Exception:
traceback.print_exc()


@ -55,7 +55,7 @@ class Daemon:
self.available_rpcs = {}
self.connector = aiohttp.TCPConnector()
self._block_hash_cache = LRUCacheWithMetrics(100000)
self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE)
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
async def close(self):
if self.connector:


@ -39,3 +39,4 @@ class DB_PREFIXES(enum.Enum):
db_state = b's'
channel_count = b'Z'
support_amount = b'a'
block_txs = b'b'


@ -42,8 +42,7 @@ class IndexVersionMismatch(Exception):
class SearchIndex:
VERSION = 1
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200,
half_life=0.4, whale_threshold=10000, whale_half_life=0.99):
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
self.search_timeout = search_timeout
self.sync_timeout = 600 # won't hit that 99% of the time, but can hit on a fresh import
self.search_client: Optional[AsyncElasticsearch] = None
@ -51,14 +50,9 @@ class SearchIndex:
self.index = index_prefix + 'claims'
self.logger = class_logger(__name__, self.__class__.__name__)
self.claim_cache = LRUCache(2 ** 15)
self.short_id_cache = LRUCache(2 ** 17)
self.search_cache = LRUCache(2 ** 17)
self.resolution_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
self._trending_half_life = half_life
self._trending_whale_threshold = whale_threshold
self._trending_whale_half_life = whale_half_life
async def get_index_version(self) -> int:
try:
@ -160,21 +154,102 @@ class SearchIndex:
async def update_trending_score(self, params):
update_trending_score_script = """
double softenLBC(double lbc) { Math.pow(lbc, 1.0f / 3.0f) }
double inflateUnits(int height) {
int renormalizationPeriod = 100000;
double doublingRate = 400.0f;
Math.pow(2.0, (height % renormalizationPeriod) / doublingRate)
double softenLBC(double lbc) { return (Math.pow(lbc, 1.0 / 3.0)); }
double logsumexp(double x, double y)
{
double top;
if(x > y)
top = x;
else
top = y;
double result = top + Math.log(Math.exp(x-top) + Math.exp(y-top));
return(result);
}
double logdiffexp(double big, double small)
{
return big + Math.log(1.0 - Math.exp(small - big));
}
double squash(double x)
{
if(x < 0.0)
return -Math.log(1.0 - x);
else
return Math.log(x + 1.0);
}
double unsquash(double x)
{
if(x < 0.0)
return 1.0 - Math.exp(-x);
else
return Math.exp(x) - 1.0;
}
double log_to_squash(double x)
{
return logsumexp(x, 0.0);
}
double squash_to_log(double x)
{
//assert x > 0.0;
return logdiffexp(x, 0.0);
}
double squashed_add(double x, double y)
{
// squash(unsquash(x) + unsquash(y)) but avoiding overflow.
// Cases where the signs are the same
if (x < 0.0 && y < 0.0)
return -logsumexp(-x, logdiffexp(-y, 0.0));
if (x >= 0.0 && y >= 0.0)
return logsumexp(x, logdiffexp(y, 0.0));
// Where the signs differ
if (x >= 0.0 && y < 0.0)
if (Math.abs(x) >= Math.abs(y))
return logsumexp(0.0, logdiffexp(x, -y));
else
return -logsumexp(0.0, logdiffexp(-y, x));
if (x < 0.0 && y >= 0.0)
{
// Addition is commutative, hooray for new math
return squashed_add(y, x);
}
return 0.0;
}
double squashed_multiply(double x, double y)
{
// squash(unsquash(x)*unsquash(y)) but avoiding overflow.
int sign;
if(x*y >= 0.0)
sign = 1;
else
sign = -1;
return sign*logsumexp(squash_to_log(Math.abs(x))
+ squash_to_log(Math.abs(y)), 0.0);
}
// Squashed inflated units
double inflateUnits(int height) {
double timescale = 576.0; // Half life of 400 = e-folding time of a day
// by coincidence, so may as well go with it
return log_to_squash(height / timescale);
}
double spikePower(double newAmount) {
if (newAmount < 50.0) {
0.5
return(0.5);
} else if (newAmount < 85.0) {
newAmount / 100.0
return(newAmount / 100.0);
} else {
0.85
return(0.85);
}
}
double spikeMass(double oldAmount, double newAmount) {
double softenedChange = softenLBC(Math.abs(newAmount - oldAmount));
double changeInSoftened = Math.abs(softenLBC(newAmount) - softenLBC(oldAmount));
@ -187,19 +262,11 @@ class SearchIndex:
}
for (i in params.src.changes) {
double units = inflateUnits(i.height);
if (i.added) {
if (ctx._source.trending_score == null) {
ctx._source.trending_score = (units * spikeMass(i.prev_amount, i.prev_amount + i.new_amount));
} else {
ctx._source.trending_score += (units * spikeMass(i.prev_amount, i.prev_amount + i.new_amount));
}
} else {
if (ctx._source.trending_score == null) {
ctx._source.trending_score = (units * spikeMass(i.prev_amount, i.prev_amount - i.new_amount));
} else {
ctx._source.trending_score += (units * spikeMass(i.prev_amount, i.prev_amount - i.new_amount));
}
if (ctx._source.trending_score == null) {
ctx._source.trending_score = 0.0;
}
double bigSpike = squashed_multiply(units, squash(spikeMass(i.prev_amount, i.new_amount)));
ctx._source.trending_score = squashed_add(ctx._source.trending_score, bigSpike);
}
"""
start = time.perf_counter()
@ -217,9 +284,8 @@ class SearchIndex:
'changes': [
{
'height': p.height,
'added': p.added,
'prev_amount': p.prev_amount * 1E-9,
'new_amount': p.new_amount * 1E-9,
'prev_amount': p.prev_amount / 1E8,
'new_amount': p.new_amount / 1E8,
} for p in claim_updates
]
}}
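The rewritten Painless script keeps trending_score in a "squashed" domain, squash(x) = ln(1 + x) for x >= 0 and -ln(1 - x) for x < 0, and does all arithmetic through squashed_add/squashed_multiply so very large LBC amounts cannot overflow a double; the amounts themselves now arrive in LBC (dewies divided by 1E8) rather than the old 1E-9 scaling. A quick standalone Python check of the identity the helpers rely on, squashed_add(a, b) == squash(unsquash(a) + unsquash(b)); the sample amounts are arbitrary:

    import math

    def squash(x):
        return -math.log(1.0 - x) if x < 0.0 else math.log(x + 1.0)

    def unsquash(x):
        return 1.0 - math.exp(-x) if x < 0.0 else math.exp(x) - 1.0

    def logsumexp(x, y):
        top = max(x, y)
        return top + math.log(math.exp(x - top) + math.exp(y - top))

    def logdiffexp(big, small):
        return big + math.log(1.0 - math.exp(small - big))

    def squashed_add(x, y):
        # squash(unsquash(x) + unsquash(y)) without ever leaving the squashed domain
        if x < 0.0 and y < 0.0:
            return -logsumexp(-x, logdiffexp(-y, 0.0))
        if x >= 0.0 and y >= 0.0:
            return logsumexp(x, logdiffexp(y, 0.0))
        if x >= 0.0 and y < 0.0:
            if abs(x) >= abs(y):
                return logsumexp(0.0, logdiffexp(x, -y))
            return -logsumexp(0.0, logdiffexp(-y, x))
        return squashed_add(y, x)

    a, b = squash(12.5), squash(3.0)   # e.g. 12.5 LBC and 3 LBC, already divided by 1E8
    assert abs(unsquash(squashed_add(a, b)) - 15.5) < 1e-9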
@ -260,9 +326,7 @@ class SearchIndex:
def clear_caches(self):
self.search_cache.clear()
self.short_id_cache.clear()
self.claim_cache.clear()
self.resolution_cache.clear()
async def cached_search(self, kwargs):
total_referenced = []
@ -354,21 +418,6 @@ class SearchIndex:
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
self.claim_cache.set(result['claim_id'], result)
async def full_id_from_short_id(self, name, short_id, channel_id=None):
key = '#'.join((channel_id or '', name, short_id))
if key not in self.short_id_cache:
query = {'name': name, 'claim_id': short_id}
if channel_id:
query['channel_id'] = channel_id
query['order_by'] = ['^channel_join']
query['signature_valid'] = True
else:
query['order_by'] = '^creation_height'
result, _, _ = await self.search(**query, limit=1)
if len(result) == 1:
result = result[0]['claim_id']
self.short_id_cache[key] = result
return self.short_id_cache.get(key, None)
async def search(self, **kwargs):
try:


@ -1,21 +1,72 @@
import os
import argparse
import asyncio
import logging
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from elasticsearch.helpers import async_streaming_bulk
from lbry.wallet.server.env import Env
from lbry.wallet.server.coin import LBC
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch
from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS
async def get_all_claims(index_name='claims', db=None):
env = Env(LBC)
async def get_recent_claims(env, index_name='claims', db=None):
need_open = db is None
db = db or LevelDB(env)
try:
if need_open:
await db.open_dbs()
db_state = db.prefix_db.db_state.get()
if db_state.es_sync_height == db_state.height:
return
cnt = 0
touched_claims = set()
deleted_claims = set()
for height in range(db_state.es_sync_height, db_state.height + 1):
touched_or_deleted = db.prefix_db.touched_or_deleted.get(height)
touched_claims.update(touched_or_deleted.touched_claims)
deleted_claims.update(touched_or_deleted.deleted_claims)
touched_claims.difference_update(deleted_claims)
for deleted in deleted_claims:
yield {
'_index': index_name,
'_op_type': 'delete',
'_id': deleted.hex()
}
for touched in touched_claims:
claim = db.claim_producer(touched)
if claim:
yield {
'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
'_id': claim['claim_id'],
'_index': index_name,
'_op_type': 'update',
'doc_as_upsert': True
}
cnt += 1
else:
logging.warning("could not sync claim %s", touched.hex())
if cnt % 10000 == 0:
logging.info("%i claims sent to ES", cnt)
db.es_sync_height = db.db_height
db.write_db_state()
db.prefix_db.unsafe_commit()
db.assert_db_state()
logging.info("finished sending %i claims to ES, deleted %i", cnt, len(touched_claims), len(deleted_claims))
finally:
if need_open:
db.close()
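get_recent_claims above replays only the blocks between the last Elasticsearch checkpoint (es_sync_height) and the current tip, folding each block's touched and deleted claim sets together and letting deletions win before emitting delete and doc_as_upsert actions. A standalone sketch of that set folding, with a plain dict standing in for the touched_or_deleted column:

    def fold_recent(touched_or_deleted_by_height, es_sync_height, height):
        touched, deleted = set(), set()
        for h in range(es_sync_height, height + 1):
            entry = touched_or_deleted_by_height.get(h)
            if entry:
                touched.update(entry['touched'])
                deleted.update(entry['deleted'])
        touched.difference_update(deleted)   # a later delete trumps an earlier touch
        return touched, deleted

    changes = {
        101: {'touched': {b'claimA', b'claimB'}, 'deleted': set()},
        102: {'touched': {b'claimC'}, 'deleted': {b'claimB'}},
    }
    touched, deleted = fold_recent(changes, 101, 102)
    assert touched == {b'claimA', b'claimC'} and deleted == {b'claimB'}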
async def get_all_claims(env, index_name='claims', db=None):
need_open = db is None
db = db or LevelDB(env)
if need_open:
await db.open_dbs()
logging.info("Fetching claims to send ES from leveldb")
try:
cnt = 0
async for claim in db.all_claims_producer():
@ -28,38 +79,36 @@ async def get_all_claims(index_name='claims', db=None):
}
cnt += 1
if cnt % 10000 == 0:
print(f"{cnt} claims sent")
logging.info("sent %i claims to ES", cnt)
finally:
if need_open:
db.close()
async def make_es_index(index=None):
env = Env(LBC)
if index is None:
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'):
index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
try:
return await index.start()
created = await index.start()
except IndexVersionMismatch as err:
logging.info(
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
)
await index.delete_index()
await index.stop()
return await index.start()
created = await index.start()
finally:
index.stop()
async def run_sync(index_name='claims', db=None, clients=32):
env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
claim_generator = get_all_claims(index_name=index_name, db=db)
if force or created:
claim_generator = get_all_claims(env, index_name=index_name, db=db)
else:
claim_generator = get_recent_claims(env, index_name=index_name, db=db)
try:
await async_bulk(es, claim_generator, request_timeout=600)
async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False):
if not ok:
logging.warning("indexing failed for an item: %s", item)
await es.indices.refresh(index=index_name)
finally:
await es.close()
@ -72,17 +121,14 @@ def run_elastic_sync():
logging.info('lbry.server starting')
parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
# parser.add_argument("db_path", type=str)
parser.add_argument("-c", "--clients", type=int, default=32)
parser.add_argument("-b", "--blocks", type=int, default=0)
parser.add_argument("-f", "--force", default=False, action='store_true')
Env.contribute_to_arg_parser(parser)
args = parser.parse_args()
env = Env.from_arg_parser(args)
# if not args.force and not os.path.exists(args.db_path):
# logging.info("DB path doesnt exist")
# return
if not args.force and not asyncio.run(make_es_index()):
logging.info("ES is already initialized")
if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')):
logging.info("DB path doesnt exist, nothing to sync to ES")
return
asyncio.run(run_sync(clients=args.clients))
asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force))


@ -85,6 +85,18 @@ class PrefixRow(metaclass=PrefixRowType):
if v:
return v if not deserialize_value else self.unpack_value(v)
def get_pending(self, *key_args, fill_cache=True, deserialize_value=True):
packed_key = self.pack_key(*key_args)
last_op = self._op_stack.get_last_op_for_key(packed_key)
if last_op:
if last_op.is_put:
return last_op.value if not deserialize_value else self.unpack_value(last_op.value)
else: # it's a delete
return
v = self._db.get(packed_key, fill_cache=fill_cache)
if v:
return v if not deserialize_value else self.unpack_value(v)
def stage_put(self, key_args=(), value_args=()):
self._op_stack.append_op(RevertablePut(self.pack_key(*key_args), self.pack_value(*value_args)))
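get_pending above lets readers observe values that are staged on the revertable op stack but not yet committed to LevelDB: the newest staged op for the key wins (a put returns its value, a delete hides the row), and only when nothing is staged does the lookup fall through to the database. A toy sketch of that overlay lookup, with hypothetical standalone names:

    def get_pending(key, staged_ops, db):
        # staged_ops: key -> list of ('put'|'del', value), newest last
        ops = staged_ops.get(key)
        if ops:
            kind, value = ops[-1]
            return value if kind == 'put' else None   # staged delete hides the row
        return db.get(key)                            # nothing pending: read storage

    db = {b'a': b'1'}
    staged = {b'a': [('del', b'1'), ('put', b'2')]}
    assert get_pending(b'a', staged, db) == b'2'
    assert get_pending(b'b', staged, db) is None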
@ -169,6 +181,14 @@ class BlockHashValue(NamedTuple):
return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})"
class BlockTxsKey(NamedTuple):
height: int
class BlockTxsValue(NamedTuple):
tx_hashes: typing.List[bytes]
class TxCountKey(NamedTuple):
height: int
@ -514,6 +534,7 @@ class DBState(typing.NamedTuple):
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
es_sync_height: int
class ActiveAmountPrefixRow(PrefixRow):
@ -1501,7 +1522,7 @@ class SupportAmountPrefixRow(PrefixRow):
class DBStatePrefixRow(PrefixRow):
prefix = DB_PREFIXES.db_state.value
value_struct = struct.Struct(b'>32sLL32sLLBBlll')
value_struct = struct.Struct(b'>32sLL32sLLBBlllL')
key_struct = struct.Struct(b'')
key_part_lambdas = [
@ -1519,15 +1540,19 @@ class DBStatePrefixRow(PrefixRow):
@classmethod
def pack_value(cls, genesis: bytes, height: int, tx_count: int, tip: bytes, utxo_flush_count: int, wall_time: int,
first_sync: bool, db_version: int, hist_flush_count: int, comp_flush_count: int,
comp_cursor: int) -> bytes:
comp_cursor: int, es_sync_height: int) -> bytes:
return super().pack_value(
genesis, height, tx_count, tip, utxo_flush_count,
wall_time, 1 if first_sync else 0, db_version, hist_flush_count,
comp_flush_count, comp_cursor
comp_flush_count, comp_cursor, es_sync_height
)
@classmethod
def unpack_value(cls, data: bytes) -> DBState:
if len(data) == 94:
# TODO: delete this after making a new snapshot - 10/20/21
# migrate in the es_sync_height if it doesn't exist
data += data[32:36]
return DBState(*super().unpack_value(data))
@classmethod
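DBState gains a trailing es_sync_height field, growing the packed value from 94 to 98 bytes; unpack_value migrates old 94-byte records by appending data[32:36], which is the packed height field, so migrated records start with es_sync_height equal to their height. A standalone check of those sizes and of the migration, using only struct:

    import struct

    OLD = struct.Struct(b'>32sLL32sLLBBlll')    # pre-upgrade DBState layout, 94 bytes
    NEW = struct.Struct(b'>32sLL32sLLBBlllL')   # adds trailing es_sync_height, 98 bytes
    assert OLD.size == 94 and NEW.size == 98

    # migration used by unpack_value: reuse bytes 32..36 (the packed height, right
    # after the 32-byte genesis hash) as the initial es_sync_height
    old_record = OLD.pack(b'\x00' * 32, 1059102, 7, b'\xff' * 32, 1, 2, 0, 7, 3, -1, -1)
    migrated = old_record + old_record[32:36]
    assert NEW.unpack(migrated)[-1] == 1059102  # es_sync_height == height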
@ -1540,6 +1565,36 @@ class DBStatePrefixRow(PrefixRow):
)
class BlockTxsPrefixRow(PrefixRow):
prefix = DB_PREFIXES.block_txs.value
key_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> BlockTxsKey:
return BlockTxsKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, tx_hashes: typing.List[bytes]) -> bytes:
assert all(len(tx_hash) == 32 for tx_hash in tx_hashes)
return b''.join(tx_hashes)
@classmethod
def unpack_value(cls, data: bytes) -> BlockTxsValue:
return BlockTxsValue([data[i*32:(i+1)*32] for i in range(len(data) // 32)])
@classmethod
def pack_item(cls, height, tx_hashes):
return cls.pack_key(height), cls.pack_value(tx_hashes)
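The new block_txs column stores a block's tx hashes as a single blob of concatenated fixed 32-byte entries. A round trip of that encoding, independent of the prefix-row machinery:

    import os

    def pack_block_txs(tx_hashes):
        assert all(len(h) == 32 for h in tx_hashes)
        return b''.join(tx_hashes)

    def unpack_block_txs(data):
        return [data[i * 32:(i + 1) * 32] for i in range(len(data) // 32)]

    hashes = [os.urandom(32) for _ in range(3)]
    assert unpack_block_txs(pack_block_txs(hashes)) == hashes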
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
@ -1604,6 +1659,7 @@ class HubDB(PrefixDB):
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
self.db_state = DBStatePrefixRow(db, self._op_stack)
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
self.block_txs = BlockTxsPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:


@ -83,11 +83,26 @@ class OpStackIntegrity(Exception):
class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
"""
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
are not deleted, and that when keys are deleted the current value is correctly known so that the delete
may be undone. When putting values, the integrity checks assure that existing values are not overwritten
without first being deleted. Updates are performed by applying a delete op for the old value and a put op
for the new value.
:param get_fn: getter function from an object implementing `KeyValueStorage`
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
"""
self._get = get_fn
self._items = defaultdict(list)
self._unsafe_prefixes = unsafe_prefixes or set()
def append_op(self, op: RevertableOp):
"""
Apply a put or delete op, checking that it introduces no integrity errors
"""
inverted = op.invert()
if self._items[op.key] and inverted == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
@ -106,19 +121,22 @@ class RevertableOpStack:
elif op.is_delete and has_stored_val and stored_val != op.value and not will_delete_existing_stored:
# there is a value and we're not deleting it in this op
# check that a delete for the stored value is in the stack
raise OpStackIntegrity(f"delete {op}")
raise OpStackIntegrity(f"db op tries to delete with incorrect existing value {op}")
elif op.is_delete and not has_stored_val:
raise OpStackIntegrity(f"db op tries to delete nonexistent key: {op}")
elif op.is_delete and stored_val != op.value:
raise OpStackIntegrity(f"db op tries to delete with incorrect value: {op}")
except OpStackIntegrity as err:
if op.key[:1] in self._unsafe_prefixes:
log.error(f"skipping over integrity error: {err}")
log.debug(f"skipping over integrity error: {err}")
else:
raise err
self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]):
"""
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
"""
for op in ops:
self.append_op(op)
@ -139,9 +157,19 @@ class RevertableOpStack:
yield op
def get_undo_ops(self) -> bytes:
"""
Get the serialized bytes to undo all of the changes made by the pending ops
"""
return b''.join(op.invert().pack() for op in reversed(self))
def apply_packed_undo_ops(self, packed: bytes):
"""
Unpack and apply a sequence of undo ops from serialized undo bytes
"""
while packed:
op, packed = RevertableOp.unpack(packed)
self.append_op(op)
def get_last_op_for_key(self, key: bytes) -> Optional[RevertableOp]:
if key in self._items and self._items[key]:
return self._items[key][-1]
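The docstrings added above describe the contract: every change is a put or a value-carrying delete, puts may not silently overwrite, deletes must name the currently stored value, and the undo log is simply the inverted ops replayed in reverse. A toy, standalone sketch of that idea (not the hub's implementation):

    def invert(op):
        kind, key, value = op
        return ('put' if kind == 'del' else 'del', key, value)

    def apply(db, ops):
        for kind, key, value in ops:
            if kind == 'put':
                assert key not in db            # puts may not silently overwrite
                db[key] = value
            else:
                assert db.get(key) == value     # deletes must name the stored value
                del db[key]

    db = {b'k': b'v1'}
    update = [('del', b'k', b'v1'), ('put', b'k', b'v2')]   # update k: v1 -> v2
    apply(db, update)
    assert db == {b'k': b'v2'}
    apply(db, [invert(op) for op in reversed(update)])      # undo = inverted ops, reversed
    assert db == {b'k': b'v1'}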


@ -13,7 +13,7 @@ from collections import namedtuple
from ipaddress import ip_address
from lbry.wallet.server.util import class_logger
from lbry.wallet.server.coin import Coin
from lbry.wallet.server.coin import Coin, LBC, LBCTestNet, LBCRegTest
import lbry.wallet.server.util as lib_util
@ -28,77 +28,84 @@ class Env:
class Error(Exception):
pass
def __init__(self, coin=None):
def __init__(self, coin=None, db_dir=None, daemon_url=None, host=None, rpc_host=None, elastic_host=None,
elastic_port=None, loop_policy=None, max_query_workers=None, websocket_host=None, websocket_port=None,
chain=None, es_index_prefix=None, es_mode=None, cache_MB=None, reorg_limit=None, tcp_port=None,
udp_port=None, ssl_port=None, ssl_certfile=None, ssl_keyfile=None, rpc_port=None,
prometheus_port=None, max_subscriptions=None, banner_file=None, anon_logs=None, log_sessions=None,
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=512):
self.logger = class_logger(__name__, self.__class__.__name__)
self.allow_root = self.boolean('ALLOW_ROOT', False)
self.host = self.default('HOST', 'localhost')
self.rpc_host = self.default('RPC_HOST', 'localhost')
self.elastic_host = self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = self.integer('ELASTIC_PORT', 9200)
self.loop_policy = self.set_event_loop_policy()
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.db_dir = self.required('DB_DIRECTORY')
self.db_engine = self.default('DB_ENGINE', 'leveldb')
# self.trending_algorithms = [
# trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
# ]
self.trending_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_DECAY_RATE', 48)))) + 1
self.trending_whale_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_WHALE_DECAY_RATE', 24)))) + 1
self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000)) * 1E8
self.max_query_workers = self.integer('MAX_QUERY_WORKERS', 4)
self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True)
self.track_metrics = self.boolean('TRACK_METRICS', False)
self.websocket_host = self.default('WEBSOCKET_HOST', self.host)
self.websocket_port = self.integer('WEBSOCKET_PORT', None)
self.daemon_url = self.required('DAEMON_URL')
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
self.daemon_url = daemon_url if daemon_url is not None else self.required('DAEMON_URL')
self.db_max_open_files = db_max_open_files
self.host = host if host is not None else self.default('HOST', 'localhost')
self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.loop_policy = self.set_event_loop_policy(
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
)
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.max_query_workers = max_query_workers if max_query_workers is not None else self.integer('MAX_QUERY_WORKERS', 4)
self.websocket_host = websocket_host if websocket_host is not None else self.default('WEBSOCKET_HOST', self.host)
self.websocket_port = websocket_port if websocket_port is not None else self.integer('WEBSOCKET_PORT', None)
if coin is not None:
assert issubclass(coin, Coin)
self.coin = coin
else:
coin_name = self.required('COIN').strip()
network = self.default('NET', 'mainnet').strip()
self.coin = Coin.lookup_coin_class(coin_name, network)
self.es_index_prefix = self.default('ES_INDEX_PREFIX', '')
self.es_mode = self.default('ES_MODE', 'writer')
self.cache_MB = self.integer('CACHE_MB', 4096)
self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
chain = chain if chain is not None else self.default('NET', 'mainnet').strip().lower()
if chain == 'mainnet':
self.coin = LBC
elif chain == 'testnet':
self.coin = LBCTestNet
else:
self.coin = LBCRegTest
self.es_index_prefix = es_index_prefix if es_index_prefix is not None else self.default('ES_INDEX_PREFIX', '')
self.es_mode = es_mode if es_mode is not None else self.default('ES_MODE', 'writer')
self.cache_MB = cache_MB if cache_MB is not None else self.integer('CACHE_MB', 1024)
self.reorg_limit = reorg_limit if reorg_limit is not None else self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
# Server stuff
self.tcp_port = self.integer('TCP_PORT', None)
self.udp_port = self.integer('UDP_PORT', self.tcp_port)
self.ssl_port = self.integer('SSL_PORT', None)
self.tcp_port = tcp_port if tcp_port is not None else self.integer('TCP_PORT', None)
self.udp_port = udp_port if udp_port is not None else self.integer('UDP_PORT', self.tcp_port)
self.ssl_port = ssl_port if ssl_port is not None else self.integer('SSL_PORT', None)
if self.ssl_port:
self.ssl_certfile = self.required('SSL_CERTFILE')
self.ssl_keyfile = self.required('SSL_KEYFILE')
self.rpc_port = self.integer('RPC_PORT', 8000)
self.prometheus_port = self.integer('PROMETHEUS_PORT', 0)
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None)
self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
self.anon_logs = self.boolean('ANON_LOGS', False)
self.log_sessions = self.integer('LOG_SESSIONS', 3600)
self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False)
self.country = self.default('COUNTRY', 'US')
self.ssl_certfile = ssl_certfile if ssl_certfile is not None else self.required('SSL_CERTFILE')
self.ssl_keyfile = ssl_keyfile if ssl_keyfile is not None else self.required('SSL_KEYFILE')
self.rpc_port = rpc_port if rpc_port is not None else self.integer('RPC_PORT', 8000)
self.prometheus_port = prometheus_port if prometheus_port is not None else self.integer('PROMETHEUS_PORT', 0)
self.max_subscriptions = max_subscriptions if max_subscriptions is not None else self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = banner_file if banner_file is not None else self.default('BANNER_FILE', None)
# self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file)
self.anon_logs = anon_logs if anon_logs is not None else self.boolean('ANON_LOGS', False)
self.log_sessions = log_sessions if log_sessions is not None else self.integer('LOG_SESSIONS', 3600)
self.allow_lan_udp = allow_lan_udp if allow_lan_udp is not None else self.boolean('ALLOW_LAN_UDP', False)
self.cache_all_tx_hashes = cache_all_tx_hashes if cache_all_tx_hashes is not None else self.boolean('CACHE_ALL_TX_HASHES', False)
self.cache_all_claim_txos = cache_all_claim_txos if cache_all_claim_txos is not None else self.boolean('CACHE_ALL_CLAIM_TXOS', False)
self.country = country if country is not None else self.default('COUNTRY', 'US')
# Peer discovery
self.peer_discovery = self.peer_discovery_enum()
self.peer_announce = self.boolean('PEER_ANNOUNCE', True)
self.peer_hubs = self.extract_peer_hubs()
self.force_proxy = self.boolean('FORCE_PROXY', False)
self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
# self.tor_proxy_host = self.default('TOR_PROXY_HOST', 'localhost')
# self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)
# The electrum client takes the empty string as unspecified
self.payment_address = self.default('PAYMENT_ADDRESS', '')
self.donation_address = self.default('DONATION_ADDRESS', '')
self.payment_address = payment_address if payment_address is not None else self.default('PAYMENT_ADDRESS', '')
self.donation_address = donation_address if donation_address is not None else self.default('DONATION_ADDRESS', '')
# Server limits to help prevent DoS
self.max_send = self.integer('MAX_SEND', 1000000)
self.max_receive = self.integer('MAX_RECEIVE', 1000000)
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_sessions = self.sane_max_sessions()
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
self.drop_client = self.custom("DROP_CLIENT", None, re.compile)
self.description = self.default('DESCRIPTION', '')
self.daily_fee = self.string_amount('DAILY_FEE', '0')
self.max_send = max_send if max_send is not None else self.integer('MAX_SEND', 1000000)
self.max_receive = max_receive if max_receive is not None else self.integer('MAX_RECEIVE', 1000000)
# self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_sessions = max_sessions if max_sessions is not None else self.sane_max_sessions()
# self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.session_timeout = session_timeout if session_timeout is not None else self.integer('SESSION_TIMEOUT', 600)
self.drop_client = drop_client if drop_client is not None else self.custom("DROP_CLIENT", None, re.compile)
self.description = description if description is not None else self.default('DESCRIPTION', '')
self.daily_fee = daily_fee if daily_fee is not None else self.string_amount('DAILY_FEE', '0')
# Identities
clearnet_identity = self.clearnet_identity()
@ -106,7 +113,8 @@ class Env:
self.identities = [identity
for identity in (clearnet_identity, tor_identity)
if identity is not None]
self.database_query_timeout = float(self.integer('QUERY_TIMEOUT_MS', 3000)) / 1000.0
self.database_query_timeout = database_query_timeout if database_query_timeout is not None else \
(float(self.integer('QUERY_TIMEOUT_MS', 10000)) / 1000.0)
@classmethod
def default(cls, envvar, default):
@ -158,9 +166,9 @@ class Env:
if bad:
raise cls.Error(f'remove obsolete environment variables {bad}')
def set_event_loop_policy(self):
policy_name = self.default('EVENT_LOOP_POLICY', None)
if not policy_name:
@classmethod
def set_event_loop_policy(cls, policy_name: str = None):
if not policy_name or policy_name == 'default':
import asyncio
return asyncio.get_event_loop_policy()
elif policy_name == 'uvloop':
@ -169,7 +177,7 @@ class Env:
loop_policy = uvloop.EventLoopPolicy()
asyncio.set_event_loop_policy(loop_policy)
return loop_policy
raise self.Error(f'unknown event loop policy "{policy_name}"')
raise cls.Error(f'unknown event loop policy "{policy_name}"')
def cs_host(self, *, for_rpc):
"""Returns the 'host' argument to pass to asyncio's create_server
@ -278,3 +286,99 @@ class Env:
def extract_peer_hubs(self):
return [hub.strip() for hub in self.default('PEER_HUBS', '').split(',') if hub.strip()]
@classmethod
def contribute_to_arg_parser(cls, parser):
parser.add_argument('--db_dir', type=str, help='path of the directory containing lbry-leveldb',
default=cls.default('DB_DIRECTORY', None))
parser.add_argument('--daemon_url',
help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>',
default=cls.default('DAEMON_URL', None))
parser.add_argument('--db_max_open_files', type=int, default=512,
help='number of files leveldb can have open at a time')
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help='Interface for hub server to listen on')
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),
help='TCP port to listen on for hub server')
parser.add_argument('--udp_port', type=int, default=cls.integer('UDP_PORT', 50001),
help='UDP port to listen on for hub server')
parser.add_argument('--rpc_host', default=cls.default('RPC_HOST', 'localhost'), type=str,
help='Listening interface for admin rpc')
parser.add_argument('--rpc_port', default=cls.integer('RPC_PORT', 8000), type=int,
help='Listening port for admin rpc')
parser.add_argument('--websocket_host', default=cls.default('WEBSOCKET_HOST', 'localhost'), type=str,
help='Listening interface for websocket')
parser.add_argument('--websocket_port', default=cls.integer('WEBSOCKET_PORT', None), type=int,
help='Listening port for websocket')
parser.add_argument('--ssl_port', default=cls.integer('SSL_PORT', None), type=int,
help='SSL port to listen on for hub server')
parser.add_argument('--ssl_certfile', default=cls.default('SSL_CERTFILE', None), type=str,
help='Path to SSL cert file')
parser.add_argument('--ssl_keyfile', default=cls.default('SSL_KEYFILE', None), type=str,
help='Path to SSL key file')
parser.add_argument('--reorg_limit', default=cls.integer('REORG_LIMIT', 200), type=int, help='Max reorg depth')
parser.add_argument('--elastic_host', default=cls.default('ELASTIC_HOST', 'localhost'), type=str,
help='elasticsearch host')
parser.add_argument('--elastic_port', default=cls.integer('ELASTIC_PORT', 9200), type=int,
help='elasticsearch port')
parser.add_argument('--es_mode', default=cls.default('ES_MODE', 'writer'), type=str,
choices=['reader', 'writer'])
parser.add_argument('--es_index_prefix', default=cls.default('ES_INDEX_PREFIX', ''), type=str)
parser.add_argument('--loop_policy', default=cls.default('EVENT_LOOP_POLICY', 'default'), type=str,
choices=['default', 'uvloop'])
parser.add_argument('--max_query_workers', type=int, default=cls.integer('MAX_QUERY_WORKERS', 4),
help='number of threads used by the request handler to read the database')
parser.add_argument('--cache_MB', type=int, default=cls.integer('CACHE_MB', 1024),
help='size of the leveldb lru cache, in megabytes')
parser.add_argument('--cache_all_tx_hashes', type=bool,
help='Load all tx hashes into memory. This will make address subscriptions and sync, '
'resolve, transaction fetching, and block sync all faster at the expense of higher '
'memory usage')
parser.add_argument('--cache_all_claim_txos', type=bool,
help='Load all claim txos into memory. This will make address subscriptions and sync, '
'resolve, transaction fetching, and block sync all faster at the expense of higher '
'memory usage')
parser.add_argument('--prometheus_port', type=int, default=cls.integer('PROMETHEUS_PORT', 0),
help='port for hub prometheus metrics to listen on, disabled by default')
parser.add_argument('--max_subscriptions', type=int, default=cls.integer('MAX_SUBSCRIPTIONS', 10000),
help='max subscriptions per connection')
parser.add_argument('--banner_file', type=str, default=cls.default('BANNER_FILE', None),
help='path to file containing banner text')
parser.add_argument('--anon_logs', type=bool, default=cls.boolean('ANON_LOGS', False),
help="don't log ip addresses")
parser.add_argument('--allow_lan_udp', type=bool, default=cls.boolean('ALLOW_LAN_UDP', False),
help='reply to hub UDP ping messages from LAN ip addresses')
parser.add_argument('--country', type=str, default=cls.default('COUNTRY', 'US'), help='')
parser.add_argument('--max_send', type=int, default=cls.default('MAX_SEND', 1000000), help='')
parser.add_argument('--max_receive', type=int, default=cls.default('MAX_RECEIVE', 1000000), help='')
parser.add_argument('--max_sessions', type=int, default=cls.default('MAX_SESSIONS', 1000), help='')
parser.add_argument('--session_timeout', type=int, default=cls.default('SESSION_TIMEOUT', 600), help='')
parser.add_argument('--drop_client', type=str, default=cls.default('DROP_CLIENT', None), help='')
parser.add_argument('--description', type=str, default=cls.default('DESCRIPTION', ''), help='')
parser.add_argument('--daily_fee', type=float, default=cls.default('DAILY_FEE', 0.0), help='')
parser.add_argument('--payment_address', type=str, default=cls.default('PAYMENT_ADDRESS', ''), help='')
parser.add_argument('--donation_address', type=str, default=cls.default('DONATION_ADDRESS', ''), help='')
parser.add_argument('--chain', type=str, default=cls.default('NET', 'mainnet'),
help="Which chain to use, default is mainnet")
parser.add_argument('--query_timeout_ms', type=int, default=cls.integer('QUERY_TIMEOUT_MS', 10000),
help="elasticsearch query timeout")
@classmethod
def from_arg_parser(cls, args):
return cls(
db_dir=args.db_dir, daemon_url=args.daemon_url, db_max_open_files=args.db_max_open_files,
host=args.host, rpc_host=args.rpc_host, elastic_host=args.elastic_host, elastic_port=args.elastic_port,
loop_policy=args.loop_policy, max_query_workers=args.max_query_workers, websocket_host=args.websocket_host,
websocket_port=args.websocket_port, chain=args.chain, es_index_prefix=args.es_index_prefix,
es_mode=args.es_mode, cache_MB=args.cache_MB, reorg_limit=args.reorg_limit, tcp_port=args.tcp_port,
udp_port=args.udp_port, ssl_port=args.ssl_port, ssl_certfile=args.ssl_certfile,
ssl_keyfile=args.ssl_keyfile, rpc_port=args.rpc_port, prometheus_port=args.prometheus_port,
max_subscriptions=args.max_subscriptions, banner_file=args.banner_file, anon_logs=args.anon_logs,
log_sessions=None, allow_lan_udp=args.allow_lan_udp,
cache_all_tx_hashes=args.cache_all_tx_hashes, cache_all_claim_txos=args.cache_all_claim_txos,
country=args.country, payment_address=args.payment_address, donation_address=args.donation_address,
max_send=args.max_send, max_receive=args.max_receive, max_sessions=args.max_sessions,
session_timeout=args.session_timeout, drop_client=args.drop_client, description=args.description,
daily_fee=args.daily_fee, database_query_timeout=(args.query_timeout_ms / 1000)
)
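With contribute_to_arg_parser and from_arg_parser, every setting is now resolvable three ways, in order of precedence: explicit constructor keyword, then CLI flag (whose argparse default is read from the corresponding environment variable), then the hard-coded default. A small sketch of that layering with a hypothetical ToyEnv (not the hub's Env) and two representative settings:

    import argparse
    import os

    def env_default(var, fallback):
        return os.environ.get(var, fallback)

    def contribute_to_arg_parser(parser):
        # the CLI default is read from the environment variable
        parser.add_argument('--tcp_port', type=int, default=env_default('TCP_PORT', 50001))
        parser.add_argument('--elastic_host', type=str, default=env_default('ELASTIC_HOST', 'localhost'))

    class ToyEnv:  # hypothetical stand-in for lbry.wallet.server.env.Env
        def __init__(self, tcp_port=None, elastic_host=None):
            # explicit keyword beats the environment, which beats the hard default
            self.tcp_port = int(tcp_port if tcp_port is not None else env_default('TCP_PORT', 50001))
            self.elastic_host = elastic_host if elastic_host is not None else env_default('ELASTIC_HOST', 'localhost')

        @classmethod
        def from_arg_parser(cls, args):
            return cls(tcp_port=args.tcp_port, elastic_host=args.elastic_host)

    parser = argparse.ArgumentParser(prog='toy-hub')
    contribute_to_arg_parser(parser)
    args = parser.parse_args(['--tcp_port', '50002'])
    env = ToyEnv.from_arg_parser(args)
    assert env.tcp_port == 50002
    assert env.elastic_host == os.environ.get('ELASTIC_HOST', 'localhost')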


@ -33,7 +33,7 @@ from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, HubDB
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue
from lbry.wallet.server.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow
from lbry.wallet.transaction import OutputScript
from lbry.schema.claim import Claim, guess_stream_type
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
@ -87,6 +87,8 @@ class LevelDB:
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.es_sync_height = 0
# blocking/filtering dicts
blocking_channels = self.env.default('BLOCKING_CHANNEL_IDS', '').split(' ')
filtering_channels = self.env.default('FILTERING_CHANNEL_IDS', '').split(' ')
@ -106,23 +108,24 @@ class LevelDB:
self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server')
self.last_flush = time.time()
self.logger.info(f'using {self.env.db_engine} for DB backend')
# Header merkle cache
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
# these are only used if the cache_all_tx_hashes setting is on
self.total_transactions: List[bytes] = []
self.tx_num_mapping: Dict[bytes, int] = {}
# these are only used if the cache_all_claim_txos setting is on
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
# Search index
self.search_index = SearchIndex(
self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port,
half_life=self.env.trending_half_life, whale_threshold=self.env.trending_whale_threshold,
whale_half_life=self.env.trending_whale_half_life
elastic_host=env.elastic_host, elastic_port=env.elastic_port
)
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
@ -201,14 +204,14 @@ class LevelDB:
normalized_name = name
controlling_claim = self.get_controlling_claim(normalized_name)
tx_hash = self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
tx_hash = self.get_tx_hash(tx_num)
height = bisect_right(self.tx_counts, tx_num)
created_height = bisect_right(self.tx_counts, root_tx_num)
last_take_over_height = controlling_claim.height
expiration_height = self.coin.get_expiration_height(height)
support_amount = self.get_support_amount(claim_hash)
claim_amount = self.claim_to_txo[claim_hash].amount
claim_amount = self.get_cached_claim_txo(claim_hash).amount
effective_amount = support_amount + claim_amount
channel_hash = self.get_channel_for_claim(claim_hash, tx_num, position)
@ -217,7 +220,7 @@ class LevelDB:
canonical_url = short_url
claims_in_channel = self.get_claims_in_channel_count(claim_hash)
if channel_hash:
channel_vals = self.claim_to_txo.get(channel_hash)
channel_vals = self.get_cached_claim_txo(channel_hash)
if channel_vals:
channel_short_url = self.get_short_claim_id_url(
channel_vals.name, channel_vals.normalized_name, channel_hash, channel_vals.root_tx_num,
@ -269,11 +272,13 @@ class LevelDB:
)
# resolve by partial/complete claim id
for key, claim_txo in self.prefix_db.claim_short_id.iterate(prefix=(normalized_name, claim_id[:10])):
claim_hash = self.txo_to_claim[claim_txo.tx_num][claim_txo.position]
non_normalized_name = self.claim_to_txo.get(claim_hash).name
signature_is_valid = self.claim_to_txo.get(claim_hash).channel_signature_is_valid
full_claim_hash = self.get_cached_claim_hash(claim_txo.tx_num, claim_txo.position)
c = self.get_cached_claim_txo(full_claim_hash)
non_normalized_name = c.name
signature_is_valid = c.channel_signature_is_valid
return self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, non_normalized_name, key.root_tx_num,
claim_txo.tx_num, claim_txo.position, full_claim_hash, non_normalized_name, key.root_tx_num,
key.root_position, self.get_activation(claim_txo.tx_num, claim_txo.position),
signature_is_valid
)
@ -283,7 +288,7 @@ class LevelDB:
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
if amount_order > idx + 1:
continue
claim_txo = self.claim_to_txo.get(claim_val.claim_hash)
claim_txo = self.get_cached_claim_txo(claim_val.claim_hash)
activation = self.get_activation(key.tx_num, key.position)
return self._prepare_resolve_result(
key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num,
@ -358,7 +363,7 @@ class LevelDB:
return await asyncio.get_event_loop().run_in_executor(None, self._resolve, url)
def _fs_get_claim_by_hash(self, claim_hash):
claim = self.claim_to_txo.get(claim_hash)
claim = self.get_cached_claim_txo(claim_hash)
if claim:
activation = self.get_activation(claim.tx_num, claim.position)
return self._prepare_resolve_result(
@ -462,7 +467,7 @@ class LevelDB:
def get_expired_by_height(self, height: int) -> Dict[bytes, Tuple[int, int, str, TxInput]]:
expired = {}
for k, v in self.prefix_db.claim_expiration.iterate(prefix=(height,)):
tx_hash = self.prefix_db.tx_hash.get(k.tx_num, deserialize_value=False)
tx_hash = self.get_tx_hash(k.tx_num)
tx = self.coin.transaction(self.prefix_db.tx.get(tx_hash, deserialize_value=False))
# treat it like a claim spend so it will delete/abandon properly
# the _spend_claim function this result is fed to expects a txi, so make a mock one
@ -495,19 +500,9 @@ class LevelDB:
script.parse()
return Claim.from_bytes(script.values['claim'])
except:
self.logger.error(
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
(raw or b'').hex()
)
self.logger.error("claim parsing for ES failed with tx: %s", tx_hash[::-1].hex())
return
def _prepare_claim_for_sync(self, claim_hash: bytes):
claim = self._fs_get_claim_by_hash(claim_hash)
if not claim:
print("wat")
return
return self._prepare_claim_metadata(claim_hash, claim)
def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult):
metadata = self.get_claim_metadata(claim.tx_hash, claim.position)
if not metadata:
@ -523,11 +518,11 @@ class LevelDB:
reposted_claim = None
reposted_metadata = None
if reposted_claim_hash:
reposted_claim = self.claim_to_txo.get(reposted_claim_hash)
reposted_claim = self.get_cached_claim_txo(reposted_claim_hash)
if not reposted_claim:
return
reposted_metadata = self.get_claim_metadata(
self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False), reposted_claim.position
self.get_tx_hash(reposted_claim.tx_num), reposted_claim.position
)
if not reposted_metadata:
return
@ -541,7 +536,7 @@ class LevelDB:
reposted_fee_currency = None
reposted_duration = None
if reposted_claim:
reposted_tx_hash = self.prefix_db.tx_hash.get(reposted_claim.tx_num, deserialize_value=False)
reposted_tx_hash = self.get_tx_hash(reposted_claim.tx_num)
raw_reposted_claim_tx = self.prefix_db.tx.get(reposted_tx_hash, deserialize_value=False)
try:
reposted_claim_txo = self.coin.transaction(
@ -549,19 +544,10 @@ class LevelDB:
).outputs[reposted_claim.position]
reposted_script = OutputScript(reposted_claim_txo.pk_script)
reposted_script.parse()
except:
self.logger.error(
"repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
raw_reposted_claim_tx.hex()
)
return
try:
reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
except:
self.logger.error(
"reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
raw_reposted_claim_tx.hex()
)
self.logger.error("failed to parse reposted claim in tx %s that was reposted by %s",
reposted_tx_hash[::-1].hex(), claim_hash.hex())
return
if reposted_metadata:
if reposted_metadata.is_stream:
@ -675,11 +661,21 @@ class LevelDB:
async def all_claims_producer(self, batch_size=500_000):
batch = []
for claim_hash, claim_txo in self.claim_to_txo.items():
if self.env.cache_all_claim_txos:
claim_iterator = self.claim_to_txo.items()
else:
claim_iterator = map(lambda item: (item[0].claim_hash, item[1]), self.prefix_db.claim_to_txo.iterate())
for claim_hash, claim_txo in claim_iterator:
# TODO: fix the couple of claim txos that don't have controlling names
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
continue
claim = self._fs_get_claim_by_hash(claim_hash)
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
if len(batch) == batch_size:
@ -696,23 +692,36 @@ class LevelDB:
yield meta
batch.clear()
async def claims_producer(self, claim_hashes: Set[bytes]):
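# Builds the ES sync document for a single claim hash; returns None when the claim
# txo or its controlling takeover record can't be found.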
def claim_producer(self, claim_hash: bytes) -> Optional[Dict]:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if not claim:
return
return self._prepare_claim_metadata(claim.claim_hash, claim)
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
loop = asyncio.get_event_loop()
def produce_claim(claim_hash):
if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
claim_txo = self.claim_to_txo.get(claim_hash)
for claim_hash in claim_hashes:
claim_txo = self.get_cached_claim_txo(claim_hash)
if not claim_txo:
return
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
@ -721,25 +730,13 @@ class LevelDB:
if claim:
batch.append(claim)
def get_metadata(claim):
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
results.append(meta)
if claim_hashes:
await asyncio.wait(
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
)
batch.sort(key=lambda x: x.tx_hash)
if batch:
await asyncio.wait(
[loop.run_in_executor(None, get_metadata, claim) for claim in batch]
)
for meta in results:
yield meta
batch.clear()
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)
@ -776,7 +773,6 @@ class LevelDB:
else:
assert self.db_tx_count == 0
async def _read_claim_txos(self):
def read_claim_txos():
set_claim_to_txo = self.claim_to_txo.__setitem__
@ -807,6 +803,21 @@ class LevelDB:
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers
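# Loads every tx hash into memory and builds the tx_hash -> tx_num mapping;
# only invoked when env.cache_all_tx_hashes is set (see the startup check below).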
async def _read_tx_hashes(self):
def _read_tx_hashes():
return list(self.prefix_db.tx_hash.iterate(include_key=False, fill_cache=False, deserialize_value=False))
self.logger.info("loading tx hashes")
self.total_transactions.clear()
self.tx_num_mapping.clear()
start = time.perf_counter()
self.total_transactions.extend(await asyncio.get_event_loop().run_in_executor(None, _read_tx_hashes))
self.tx_num_mapping = {
tx_hash: tx_num for tx_num, tx_hash in enumerate(self.total_transactions)
}
ts = time.perf_counter() - start
self.logger.info("loaded %i tx hashes in %ss", len(self.total_transactions), round(ts, 4))
def estimate_timestamp(self, height: int) -> int:
if height < len(self.headers):
return struct.unpack('<I', self.headers[height][100:104])[0]
@ -818,7 +829,8 @@ class LevelDB:
self.prefix_db = HubDB(
os.path.join(self.env.db_dir, 'lbry-leveldb'), cache_mb=self.env.cache_MB,
reorg_limit=self.env.reorg_limit, max_open_files=512
reorg_limit=self.env.reorg_limit, max_open_files=self.env.db_max_open_files,
unsafe_prefixes={DBStatePrefixRow.prefix}
)
self.logger.info(f'opened db: lbry-leveldb')
@ -850,7 +862,10 @@ class LevelDB:
# Read TX counts (requires meta directory)
await self._read_tx_counts()
await self._read_headers()
await self._read_claim_txos()
if self.env.cache_all_claim_txos:
await self._read_claim_txos()
if self.env.cache_all_tx_hashes:
await self._read_tx_hashes()
# start search index
await self.search_index.start()
@ -858,6 +873,32 @@ class LevelDB:
def close(self):
self.prefix_db.close()
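# Cache-aware lookups: with env.cache_all_tx_hashes set, tx hashes and numbers are
# served from the in-memory total_transactions / tx_num_mapping structures instead
# of the tx_hash / tx_num prefix columns.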
def get_tx_hash(self, tx_num: int) -> bytes:
if self.env.cache_all_tx_hashes:
return self.total_transactions[tx_num]
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
def get_tx_num(self, tx_hash: bytes) -> int:
if self.env.cache_all_tx_hashes:
return self.tx_num_mapping[tx_hash]
return self.prefix_db.tx_num.get(tx_hash).tx_num
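# The claim TXO helpers follow the same pattern, gated on env.cache_all_claim_txos
# and falling back to pending reads from the claim_to_txo / txo_to_claim columns.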
def get_cached_claim_txo(self, claim_hash: bytes) -> Optional[ClaimToTXOValue]:
if self.env.cache_all_claim_txos:
return self.claim_to_txo.get(claim_hash)
return self.prefix_db.claim_to_txo.get_pending(claim_hash)
def get_cached_claim_hash(self, tx_num: int, position: int) -> Optional[bytes]:
if self.env.cache_all_claim_txos:
if tx_num not in self.txo_to_claim:
return
return self.txo_to_claim[tx_num].get(position, None)
v = self.prefix_db.txo_to_claim.get_pending(tx_num, position)
return None if not v else v.claim_hash
def get_cached_claim_exists(self, tx_num: int, position: int) -> bool:
return self.get_cached_claim_hash(tx_num, position) is not None
# Header merkle cache
async def populate_header_merkle_cache(self):
@ -914,7 +955,7 @@ class LevelDB:
if tx_height > self.db_height:
return None, tx_height
try:
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), tx_height
return self.get_tx_hash(tx_num), tx_height
except IndexError:
self.logger.exception(
"Failed to access a cached transaction, known bug #3142 "
@ -923,57 +964,54 @@ class LevelDB:
return None, tx_height
def get_block_txs(self, height: int) -> List[bytes]:
return [
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
deserialize_value=False, include_key=False
)
]
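# block_txs keeps the ordered tx hashes of a whole block in a single row, replacing
# the range scan over the tx_hash column above.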
return self.prefix_db.block_txs.get(height).tx_hashes
def _fs_transactions(self, txids: Iterable[str]):
tx_counts = self.tx_counts
tx_db_get = self.prefix_db.tx.get
tx_cache = self._tx_and_merkle_cache
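# Replaces _fs_transactions: looks up (raw tx hex, merkle info) for each requested
# tx hash, running the per-tx db work in the default executor.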
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
tx_infos = {}
for tx_hash in txids:
cached_tx = tx_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(tx_counts, tx_num)
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
tx_cache[tx_hash] = tx, merkle
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
for tx_hash in tx_hashes:
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
None, self._get_transaction_and_merkle, tx_hash
)
await asyncio.sleep(0)
return tx_infos
async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
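# Synchronous helper executed off the event loop; returns (raw tx hex, merkle dict)
# for a single tx hash, consulting self._tx_and_merkle_cache before hitting the db.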
def _get_transaction_and_merkle(self, tx_hash):
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
if self.env.cache_all_claim_txos:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
else:
fill_cache = False
tx_height = bisect_right(self.tx_counts, tx_num)
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - self.tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
self._tx_and_merkle_cache[tx_hash] = tx, merkle
return (None if not tx else tx.hex(), merkle)
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
@ -984,13 +1022,13 @@ class LevelDB:
txs = []
txs_extend = txs.extend
for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
txs_extend([
(self.prefix_db.tx_hash.get(tx_num, deserialize_value=False), bisect_right(self.tx_counts, tx_num))
for tx_num in hist
])
txs_extend(hist)
if len(txs) >= limit:
break
return txs
return [
(self.get_tx_hash(tx_num), bisect_right(self.tx_counts, tx_num))
for tx_num in txs
]
async def limited_history(self, hashX, *, limit=1000):
"""Return an unpruned, sorted list of (tx_hash, height) tuples of
@ -1024,7 +1062,8 @@ class LevelDB:
self.prefix_db.db_state.stage_put((), (
self.genesis_bytes, self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor,
self.es_sync_height
)
)
@ -1066,11 +1105,12 @@ class LevelDB:
def assert_db_state(self):
state = self.prefix_db.db_state.get()
assert self.db_version == state.db_version
assert self.db_height == state.height
assert self.db_tx_count == state.tx_count
assert self.db_tip == state.tip
assert self.first_sync == state.first_sync
assert self.db_version == state.db_version, f"{self.db_version} != {state.db_version}"
assert self.db_height == state.height, f"{self.db_height} != {state.height}"
assert self.db_tx_count == state.tx_count, f"{self.db_tx_count} != {state.tx_count}"
assert self.db_tip == state.tip, f"{self.db_tip} != {state.tip}"
assert self.first_sync == state.first_sync, f"{self.first_sync} != {state.first_sync}"
assert self.es_sync_height == state.es_sync_height, f"{self.es_sync_height} != {state.es_sync_height}"
async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""

View file

@ -69,7 +69,7 @@ class Server:
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers)
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit():

View file

@ -2,8 +2,6 @@ import os
import ssl
import math
import time
import json
import base64
import codecs
import typing
import asyncio
@ -15,8 +13,6 @@ from asyncio import Event, sleep
from collections import defaultdict
from functools import partial
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from elasticsearch import ConnectionTimeout
from prometheus_client import Counter, Info, Histogram, Gauge
@ -27,7 +23,6 @@ from lbry.schema.result import Outputs
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
from lbry.wallet.rpc.framing import NewlineFramer
import lbry.wallet.server.version as VERSION
@ -36,13 +31,11 @@ from lbry.wallet.rpc import (
RPCSession, JSONRPCAutoDetect, JSONRPCConnection,
handler_invocation, RPCError, Request, JSONRPC, Notification, Batch
)
from lbry.wallet.server import text
from lbry.wallet.server import util
from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error
from lbry.wallet.server.daemon import DaemonError
if typing.TYPE_CHECKING:
from lbry.wallet.server.env import Env
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.daemon import Daemon
BAD_REQUEST = 1
@ -264,7 +257,6 @@ class SessionManager:
await self._start_external_servers()
paused = False
def _group_map(self):
group_map = defaultdict(list)
for session in self.sessions.values():
@ -548,6 +540,10 @@ class SessionManager:
self._clear_stale_sessions(),
self._manage_servers()
])
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
log.exception("hub server died")
raise err
finally:
await self._close_servers(list(self.servers.keys()))
log.warning("disconnect %i sessions", len(self.sessions))
@ -633,7 +629,7 @@ class SessionManager:
self.mempool_statuses.pop(hashX, None)
await asyncio.get_event_loop().run_in_executor(
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
)
if touched or new_touched or (height_changed and self.mempool_statuses):
@ -775,10 +771,9 @@ class LBRYSessionManager(SessionManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.query_executor = None
self.websocket = None
# self.metrics = ServerLoadData()
self.metrics_loop = None
# self.metrics_loop = None
self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self)
@ -795,12 +790,6 @@ class LBRYSessionManager(SessionManager):
async def start_other(self):
self.running = True
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1)
else:
self.query_executor = ProcessPoolExecutor(
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
)
if self.websocket is not None:
await self.websocket.start()
@ -808,7 +797,6 @@ class LBRYSessionManager(SessionManager):
self.running = False
if self.websocket is not None:
await self.websocket.stop()
self.query_executor.shutdown()
class LBRYElectrumX(SessionBase):
@ -971,24 +959,6 @@ class LBRYElectrumX(SessionBase):
# else:
# return APICallMetrics(query_name)
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
except asyncio.CancelledError:
raise
except Exception:
log.exception("dear devs, please handle this exception better")
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
return base64.b64encode(result).decode()
finally:
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
# async def run_and_cache_query(self, query_name, kwargs):
# start = time.perf_counter()
@ -1036,41 +1006,52 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
def _claimtrie_resolve(self, *urls):
rows, extra = [], []
for url in urls:
self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel, repost, reposted_channel = self.db._resolve(url)
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None)
async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url]
async def claimtrie_resolve(self, *urls):
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
self.session_mgr.resolved_url_count_metric.inc(len(urls))
return result
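# Resolve results are cached on the block processor: per-url results in
# bp.resolve_cache and the encoded outputs in bp.resolve_outputs_cache, keyed by the
# sorted url tuple (presumably invalidated by the block processor on new blocks).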
async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None
)
return result
finally:
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self):
return self.bp.height
@ -1221,9 +1202,11 @@ class LBRYElectrumX(SessionBase):
address: the address to subscribe to"""
if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
return [
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
]
results = []
for address in addresses:
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0)
return results
async def address_unsubscribe(self, address):
"""Unsubscribe an address.
@ -1472,7 +1455,7 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
for tx_hash in tx_hashes:
assert_tx_hash(tx_hash)
batch_result = await self.db.fs_transactions(tx_hashes)
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
needed_merkles = {}
for tx_hash in tx_hashes:

View file

@ -1,82 +0,0 @@
import time
from lbry.wallet.server import util
def sessions_lines(data):
"""A generator returning lines for a list of sessions.
data is the return value of rpc_sessions()."""
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}')
yield fmt.format('ID', 'Flags', 'Client', 'Proto',
'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer')
for (id_, flags, peer, client, proto, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size, time) in data:
yield fmt.format(id_, flags, client, proto,
f'{reqs:,d}',
f'{txs_sent:,d}',
f'{subs:,d}',
f'{recv_count:,d}',
'{:,d}'.format(recv_size // 1024),
f'{send_count:,d}',
'{:,d}'.format(send_size // 1024),
util.formatted_time(time, sep=''), peer)
def groups_lines(data):
"""A generator returning lines for a list of groups.
data is the return value of rpc_groups()."""
fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}'
'{:>7} {:>9} {:>7} {:>9}')
yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB')
for (id_, session_count, bandwidth, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size) in data:
yield fmt.format(id_,
f'{session_count:,d}',
'{:,d}'.format(bandwidth // 1024),
f'{reqs:,d}',
f'{txs_sent:,d}',
f'{subs:,d}',
f'{recv_count:,d}',
'{:,d}'.format(recv_size // 1024),
f'{send_count:,d}',
'{:,d}'.format(send_size // 1024))
def peers_lines(data):
"""A generator returning lines for a list of peers.
data is the return value of rpc_peers()."""
def time_fmt(t):
if not t:
return 'Never'
return util.formatted_time(now - t)
now = time.time()
fmt = ('{:<30} {:<6} {:>5} {:>5} {:<17} {:>4} '
'{:>4} {:>8} {:>11} {:>11} {:>5} {:>20} {:<15}')
yield fmt.format('Host', 'Status', 'TCP', 'SSL', 'Server', 'Min',
'Max', 'Pruning', 'Last Good', 'Last Try',
'Tries', 'Source', 'IP Address')
for item in data:
features = item['features']
hostname = item['host']
host = features['hosts'][hostname]
yield fmt.format(hostname[:30],
item['status'],
host.get('tcp_port') or '',
host.get('ssl_port') or '',
features['server_version'] or 'unknown',
features['protocol_min'],
features['protocol_max'],
features['pruning'] or '',
time_fmt(item['last_good']),
time_fmt(item['last_try']),
item['try_count'],
item['source'][:20],
item['ip_addr'] or '')

View file

@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids)
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are out of order')

View file

@ -5,7 +5,7 @@ import lbry.wallet
from lbry.error import ServerPaymentFeeAboveMaxAllowedError
from lbry.wallet.network import ClientSession
from lbry.wallet.rpc import RPCError
from lbry.wallet.server.db.elasticsearch.sync import run_sync, make_es_index
from lbry.wallet.server.db.elasticsearch.sync import make_es_index_and_run_sync
from lbry.wallet.server.session import LBRYElectrumX
from lbry.testcase import IntegrationTestCase, CommandTestCase
from lbry.wallet.orchstr8.node import SPVNode
@ -95,16 +95,17 @@ class TestESSync(CommandTestCase):
await self.generate(1)
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
db = self.conductor.spv_node.server.db
env = self.conductor.spv_node.server.env
await db.search_index.delete_index()
db.search_index.clear_caches()
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
async def resync():
await db.search_index.start()
db.search_index.clear_caches()
await run_sync(index_name=db.search_index.index, db=db)
await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
@ -114,9 +115,12 @@ class TestESSync(CommandTestCase):
# this time we will test a migration from unversioned to v1
await db.search_index.sync_client.indices.delete_template(db.search_index.index)
await db.search_index.stop()
self.assertTrue(await make_es_index(db.search_index))
await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True)
await db.search_index.start()
await resync()
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
class TestHubDiscovery(CommandTestCase):

View file

@ -1397,47 +1397,32 @@ class ResolveClaimTakeovers(BaseResolveTestCase):
))[0][0]['trending_score']
claim_id1 = (await self.stream_create('derp', '1.0'))['outputs'][0]['claim_id']
claim_id2 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
claim_id3 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
claim_id4 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
claim_id5 = (await self.stream_create('derp', '1.0', allow_duplicate_name=True))['outputs'][0]['claim_id']
COIN = 1E9
COIN = 1E8
height = 99000
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id1, height, True, 1 * COIN, 1_000_000 * COIN
)
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id2, height, True, 1 * COIN, 100_000 * COIN
)
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id2, height + 1, False, 100_001 * COIN, 100_000 * COIN
)
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id3, height, True, 1 * COIN, 1_000 * COIN
)
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id4, height, True, 1 * COIN, 10 * COIN
claim_id1, height, 0, 10 * COIN
)
await self.generate(1)
self.assertEqual(3.1711298570548195e+76, await get_trending_score(claim_id1))
self.assertEqual(-1.369652719234026e+74, await get_trending_score(claim_id2))
self.assertEqual(2.925275298842502e+75, await get_trending_score(claim_id3))
self.assertEqual(5.193711055804491e+74, await get_trending_score(claim_id4))
self.assertEqual(0.6690521635580086, await get_trending_score(claim_id5))
self.assertEqual(172.64252836433135, await get_trending_score(claim_id1))
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id5, height + 100, True, 2 * COIN, 10 * COIN
claim_id1, height + 1, 10 * COIN, 100 * COIN
)
await self.generate(1)
self.assertEqual(5.664516565750028e+74, await get_trending_score(claim_id5))
self.assertEqual(173.45931832928875, await get_trending_score(claim_id1))
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id1, height + 100, 100 * COIN, 1000000 * COIN
)
await self.generate(1)
self.assertEqual(176.65517070393514, await get_trending_score(claim_id1))
self.conductor.spv_node.server.bp._add_claim_activation_change_notification(
claim_id1, height + 200, 1000000 * COIN, 1 * COIN
)
await self.generate(1)
self.assertEqual(-174.951347102643, await get_trending_score(claim_id1))
search_results = (await self.conductor.spv_node.server.bp.db.search_index.search(claim_name="derp"))[0]
self.assertEqual(5, len(search_results))
self.assertListEqual([claim_id1, claim_id3, claim_id4, claim_id2, claim_id5], [c['claim_id'] for c in search_results])
self.assertEqual(1, len(search_results))
self.assertListEqual([claim_id1], [c['claim_id'] for c in search_results])
class ResolveAfterReorg(BaseResolveTestCase):
@ -1458,7 +1443,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
]
txs = await bp.db.fs_transactions(txids)
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are out of order')

View file

@ -123,6 +123,9 @@ class TestRevertablePrefixDB(unittest.TestCase):
self.assertIsNone(self.db.claim_takeover.get(name))
self.db.claim_takeover.stage_put((name,), (claim_hash1, takeover_height))
self.assertIsNone(self.db.claim_takeover.get(name))
self.assertEqual(10000000, self.db.claim_takeover.get_pending(name).height)
self.db.commit(10000000)
self.assertEqual(10000000, self.db.claim_takeover.get(name).height)
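# staged writes are only visible via get_pending() until commit() flushes them,
# after which get() returns the committed value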