Compare commits

14 commits

Author        SHA1        Message                                              Date
Victor Shyba  1f64275da1  fix ordering                                         2021-10-14 19:45:41 -04:00
Victor Shyba  8c605864e7  ignore request if protocol closed                    2021-10-14 19:45:41 -04:00
Victor Shyba  6ec88e2101  initialize new sessions with high priority           2021-10-14 19:45:41 -04:00
Victor Shyba  17eeafd62c  schedule_requests is not async                       2021-10-14 19:45:41 -04:00
Victor Shyba  28e4b3eefd  request scheduler                                    2021-10-14 19:45:41 -04:00
Jack Robison  b2de89ca29  sleeps                                               2021-10-14 18:57:46 -04:00
Jack Robison  8bfff2d549  improve resolve caching                              2021-10-14 16:08:28 -04:00
Jack Robison  39d5078788  threadpools for block processor and es sync reader   2021-10-14 15:40:20 -04:00
Jack Robison  f8c46647d7  doc strings                                          2021-10-14 13:17:16 -04:00
Jack Robison  82de92c324  claim producer fix                                   2021-10-13 15:24:25 -04:00
Jack Robison  3dc3792478  improve claims_producer performance                  2021-10-13 14:09:16 -04:00
Jack Robison  ba1d0a12d1  resolve lru cache                                    2021-10-13 10:18:03 -04:00
Jack Robison  e75047a0ab  add block_txs index                                  2021-10-12 11:18:11 -04:00
Jack Robison  83a167bd37  smaller caches                                       2021-10-12 11:18:11 -04:00
12 changed files with 229 additions and 172 deletions

View file

@@ -424,10 +424,11 @@ class RPCSession(SessionBase):
self.max_errors = 0
self._bump_errors()
else:
for request in requests:
await self._task_group.add(self._handle_request(request))
self.schedule_requests(requests)
async def _handle_request(self, request):
if self.is_closing():
return
start = time.perf_counter()
try:
result = await self.handle_request(request)
@@ -472,6 +473,10 @@ class RPCSession(SessionBase):
async def handle_request(self, request):
pass
def schedule_requests(self, requests):
for request in requests:
self._task_group.add(self._handle_request(request))
async def send_request(self, method, args=()):
"""Send an RPC request over the network."""
if self.is_closing():
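
Context for the hunk above: request dispatch moves out of the batch loop and behind a synchronous schedule_requests hook, so a subclass can reorder or defer queued requests instead of spawning a task per request on arrival. A minimal sketch of that hook pattern, assuming nothing beyond asyncio (the class names and toy handler are illustrative, not from this PR):

    import asyncio

    class BaseSession:
        """Stand-in for RPCSession: schedule_requests is now a sync hook."""
        def schedule_requests(self, requests):
            # default: one task per request, started in arrival order
            for request in requests:
                asyncio.ensure_future(self._handle_request(request))

        async def _handle_request(self, request):
            print("handled", request)

    class QueueingSession(BaseSession):
        """Override: enqueue work so shared consumers control pacing."""
        def __init__(self, queue):
            self.queue = queue

        def schedule_requests(self, requests):
            for request in requests:
                self.queue.put_nowait((0, id(request), self._handle_request(request)))

    async def main():
        queue = asyncio.PriorityQueue()
        QueueingSession(queue).schedule_requests(["a", "b"])
        while not queue.empty():
            _, _, coro = await queue.get()
            await coro

    asyncio.run(main())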

View file

@@ -3,14 +3,16 @@ import asyncio
import typing
from bisect import bisect_right
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
from prometheus_client import Gauge, Histogram
from collections import defaultdict
import lbry
from lbry.schema.url import URL
from lbry.schema.claim import Claim
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.utils import LRUCache
from lbry.wallet.transaction import OutputScript, Output, Transaction
from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError
@@ -202,6 +204,8 @@ class BlockProcessor:
self.env = env
self.db = db
self.daemon = daemon
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event
self.coin = env.coin
@@ -231,6 +235,9 @@ class BlockProcessor:
self.db_op_stack: Optional[RevertableOpStack] = None
# self.search_cache = {}
self.resolve_cache = LRUCache(2**16)
self.resolve_outputs_cache = LRUCache(2 ** 16)
self.history_cache = {}
self.status_server = StatusServer()
@@ -297,7 +304,11 @@ class BlockProcessor:
for claim_hash in self.removed_claims_to_send_es:
yield 'delete', claim_hash.hex()
async for claim in self.db.claims_producer(self.touched_claims_to_send_es):
to_update = await asyncio.get_event_loop().run_in_executor(
self._sync_reader_executor, self.db.claims_producer, self.touched_claims_to_send_es
)
for claim in to_update:
yield 'update', claim
async def run_in_thread_with_lock(self, func, *args):
@@ -308,13 +319,12 @@ class BlockProcessor:
# consistent and not being updated elsewhere.
async def run_in_thread_locked():
async with self.state_lock:
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread_locked())
@staticmethod
async def run_in_thread(func, *args):
async def run_in_thread(self, func, *args):
async def run_in_thread():
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread())
async def check_and_advance_blocks(self, raw_blocks):
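
Both thread helpers above now pin blocking work to the single-threaded _chain_executor rather than the loop's default executor, and keep the asyncio.shield wrapper so a cancelled caller cannot abandon a half-applied database change. The same pattern in isolation (the executor and function names here are illustrative):

    import asyncio
    from concurrent.futures.thread import ThreadPoolExecutor

    # a single named thread: chain mutations stay serialized and show up
    # clearly in thread dumps instead of hiding in the default executor
    chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')

    async def run_in_thread(func, *args):
        async def inner():
            return await asyncio.get_event_loop().run_in_executor(chain_executor, func, *args)
        # shield: cancelling the awaiting task cannot cancel the executor job
        return await asyncio.shield(inner())

    print(asyncio.run(run_in_thread(sum, [1, 2, 3])))  # -> 6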
@@ -1440,6 +1450,7 @@ class BlockProcessor:
self.db.prefix_db.block_hash.stage_put(key_args=(height,), value_args=(self.coin.header_hash(block.header),))
self.db.prefix_db.header.stage_put(key_args=(height,), value_args=(block.header,))
self.db.prefix_db.block_txs.stage_put(key_args=(height,), value_args=([tx_hash for tx, tx_hash in txs],))
for tx, tx_hash in txs:
spent_claims = {}
@@ -1580,6 +1591,8 @@ class BlockProcessor:
self.pending_transaction_num_mapping.clear()
self.pending_transactions.clear()
self.pending_support_amount_change.clear()
self.resolve_cache.clear()
self.resolve_outputs_cache.clear()
async def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0
@@ -1742,5 +1755,6 @@ class BlockProcessor:
self.status_server.stop()
# Shut down block processing
self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True)
self.db.close()
# self.executor.shutdown(wait=True)

View file

@@ -55,7 +55,7 @@ class Daemon:
self.available_rpcs = {}
self.connector = aiohttp.TCPConnector()
self._block_hash_cache = LRUCacheWithMetrics(100000)
self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE)
self._block_cache = LRUCacheWithMetrics(2 ** 13, metric_name='block', namespace=NAMESPACE)
async def close(self):
if self.connector:

View file

@@ -39,3 +39,4 @@ class DB_PREFIXES(enum.Enum):
db_state = b's'
channel_count = b'Z'
support_amount = b'a'
block_txs = b'b'

View file

@@ -51,9 +51,7 @@ class SearchIndex:
self.index = index_prefix + 'claims'
self.logger = class_logger(__name__, self.__class__.__name__)
self.claim_cache = LRUCache(2 ** 15)
self.short_id_cache = LRUCache(2 ** 17)
self.search_cache = LRUCache(2 ** 17)
self.resolution_cache = LRUCache(2 ** 17)
self._elastic_host = elastic_host
self._elastic_port = elastic_port
self._trending_half_life = half_life
@@ -260,9 +258,7 @@ class SearchIndex:
def clear_caches(self):
self.search_cache.clear()
self.short_id_cache.clear()
self.claim_cache.clear()
self.resolution_cache.clear()
async def cached_search(self, kwargs):
total_referenced = []
@@ -354,21 +350,6 @@ class SearchIndex:
for result in expand_result(filter(lambda doc: doc['found'], results["docs"])):
self.claim_cache.set(result['claim_id'], result)
async def full_id_from_short_id(self, name, short_id, channel_id=None):
key = '#'.join((channel_id or '', name, short_id))
if key not in self.short_id_cache:
query = {'name': name, 'claim_id': short_id}
if channel_id:
query['channel_id'] = channel_id
query['order_by'] = ['^channel_join']
query['signature_valid'] = True
else:
query['order_by'] = '^creation_height'
result, _, _ = await self.search(**query, limit=1)
if len(result) == 1:
result = result[0]['claim_id']
self.short_id_cache[key] = result
return self.short_id_cache.get(key, None)
async def search(self, **kwargs):
try:

View file

@@ -169,6 +169,14 @@ class BlockHashValue(NamedTuple):
return f"{self.__class__.__name__}(block_hash={self.block_hash.hex()})"
class BlockTxsKey(NamedTuple):
height: int
class BlockTxsValue(NamedTuple):
tx_hashes: typing.List[bytes]
class TxCountKey(NamedTuple):
height: int
@@ -1540,6 +1548,36 @@ class DBStatePrefixRow(PrefixRow):
)
class BlockTxsPrefixRow(PrefixRow):
prefix = DB_PREFIXES.block_txs.value
key_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> BlockTxsKey:
return BlockTxsKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, tx_hashes: typing.List[bytes]) -> bytes:
assert all(len(tx_hash) == 32 for tx_hash in tx_hashes)
return b''.join(tx_hashes)
@classmethod
def unpack_value(cls, data: bytes) -> BlockTxsValue:
return BlockTxsValue([data[i*32:(i+1)*32] for i in range(len(data) // 32)])
@classmethod
def pack_item(cls, height, tx_hashes):
return cls.pack_key(height), cls.pack_value(tx_hashes)
class LevelDBStore(KeyValueStorage):
def __init__(self, path: str, cache_mb: int, max_open_files: int):
import plyvel
@@ -1604,6 +1642,7 @@ class HubDB(PrefixDB):
self.channel_count = ChannelCountPrefixRow(db, self._op_stack)
self.db_state = DBStatePrefixRow(db, self._op_stack)
self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
self.block_txs = BlockTxsPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
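
The BlockTxsPrefixRow added above keys on a big-endian uint32 height and stores its value as nothing but concatenated 32-byte tx hashes, which is what lets get_block_txs become a single point lookup. A standalone round trip of the same encoding (helper names are illustrative):

    import struct

    def pack_block_txs_item(height, tx_hashes):
        assert all(len(tx_hash) == 32 for tx_hash in tx_hashes)
        return struct.pack(b'>L', height), b''.join(tx_hashes)

    def unpack_block_txs_value(data):
        # fixed-width records: slice every 32 bytes back into a hash
        return [data[i * 32:(i + 1) * 32] for i in range(len(data) // 32)]

    key, value = pack_block_txs_item(1050000, [bytes([n]) * 32 for n in range(3)])
    assert struct.unpack(b'>L', key) == (1050000,)
    assert unpack_block_txs_value(value)[2] == b'\x02' * 32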

View file

@@ -83,11 +83,26 @@ class OpStackIntegrity(Exception):
class RevertableOpStack:
def __init__(self, get_fn: Callable[[bytes], Optional[bytes]], unsafe_prefixes=None):
"""
This represents a sequence of revertable puts and deletes to a key-value database that checks for integrity
violations when applying the puts and deletes. The integrity checks assure that keys that do not exist
are not deleted, and that when keys are deleted the current value is correctly known so that the delete
may be undone. When putting values, the integrity checks assure that existing values are not overwritten
without first being deleted. Updates are performed by applying a delete op for the old value and a put op
for the new value.
:param get_fn: getter function from an object implementing `KeyValueStorage`
:param unsafe_prefixes: optional set of prefixes to ignore integrity errors for, violations are still logged
"""
self._get = get_fn
self._items = defaultdict(list)
self._unsafe_prefixes = unsafe_prefixes or set()
def append_op(self, op: RevertableOp):
"""
Apply a put or delete op, checking that it introduces no integrity errors
"""
inverted = op.invert()
if self._items[op.key] and inverted == self._items[op.key][-1]:
self._items[op.key].pop() # if the new op is the inverse of the last op, we can safely null both
@@ -119,6 +134,9 @@ class RevertableOpStack:
self._items[op.key].append(op)
def extend_ops(self, ops: Iterable[RevertableOp]):
"""
Apply a sequence of put or delete ops, checking that they introduce no integrity errors
"""
for op in ops:
self.append_op(op)
@@ -139,9 +157,15 @@ class RevertableOpStack:
yield op
def get_undo_ops(self) -> bytes:
"""
Get the serialized bytes to undo all of the changes made by the pending ops
"""
return b''.join(op.invert().pack() for op in reversed(self))
def apply_packed_undo_ops(self, packed: bytes):
"""
Unpack and apply a sequence of undo ops from serialized undo bytes
"""
while packed:
op, packed = RevertableOp.unpack(packed)
self.append_op(op)
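
The docstrings added in this file pin down the central invariant: every op has an exact inverse, an op followed immediately by its inverse cancels out, and an undo log is the pending ops inverted and replayed in reverse. A toy model of that invariant, not the real RevertableOp classes:

    from typing import NamedTuple

    class Op(NamedTuple):
        is_put: bool
        key: bytes
        value: bytes

        def invert(self) -> 'Op':
            # a put undoes to a delete of the same key/value, and vice versa
            return Op(not self.is_put, self.key, self.value)

    # create, then update (an update is delete-old followed by put-new)
    pending = [Op(True, b'k1', b'old'), Op(False, b'k1', b'old'), Op(True, b'k1', b'new')]
    undo = [op.invert() for op in reversed(pending)]
    # replaying `undo` after `pending` restores the original state
    assert undo == [Op(False, b'k1', b'new'), Op(True, b'k1', b'old'), Op(False, b'k1', b'old')]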

View file

@@ -112,7 +112,7 @@ class LevelDB:
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 16, metric_name='tx_and_merkle', namespace="wallet_server")
self.claim_to_txo: Dict[bytes, ClaimToTXOValue] = {}
self.txo_to_claim: DefaultDict[int, Dict[int, bytes]] = defaultdict(dict)
@@ -696,23 +696,21 @@ class LevelDB:
yield meta
batch.clear()
async def claims_producer(self, claim_hashes: Set[bytes]):
def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
loop = asyncio.get_event_loop()
def produce_claim(claim_hash):
for claim_hash in claim_hashes:
if claim_hash not in self.claim_to_txo:
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
continue
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
continue
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
return
continue
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
@@ -721,25 +719,13 @@ class LevelDB:
if claim:
batch.append(claim)
def get_metadata(claim):
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
results.append(meta)
if claim_hashes:
await asyncio.wait(
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
)
batch.sort(key=lambda x: x.tx_hash)
if batch:
await asyncio.wait(
[loop.run_in_executor(None, get_metadata, claim) for claim in batch]
)
for meta in results:
yield meta
batch.clear()
for claim in batch:
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if _meta:
results.append(_meta)
return results
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list)
@@ -923,57 +909,51 @@ class LevelDB:
return None, tx_height
def get_block_txs(self, height: int) -> List[bytes]:
return [
tx_hash for tx_hash in self.prefix_db.tx_hash.iterate(
start=(self.tx_counts[height-1],), stop=(self.tx_counts[height],),
deserialize_value=False, include_key=False
)
]
return self.prefix_db.block_txs.get(height).tx_hashes
def _fs_transactions(self, txids: Iterable[str]):
tx_counts = self.tx_counts
tx_db_get = self.prefix_db.tx.get
tx_cache = self._tx_and_merkle_cache
async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]):
tx_infos = {}
for tx_hash in txids:
cached_tx = tx_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(tx_counts, tx_num)
tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
tx_cache[tx_hash] = tx, merkle
tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle)
for tx_hash in tx_hashes:
tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor(
None, self._get_transaction_and_merkle, tx_hash
)
await asyncio.sleep(0)
return tx_infos
async def fs_transactions(self, txids):
return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids)
def _get_transaction_and_merkle(self, tx_hash):
cached_tx = self._tx_and_merkle_cache.get(tx_hash)
if cached_tx:
tx, merkle = cached_tx
else:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = self.prefix_db.tx_num.get(tx_hash_bytes)
tx = None
tx_height = -1
tx_num = None if not tx_num else tx_num.tx_num
if tx_num is not None:
fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0
tx_height = bisect_right(self.tx_counts, tx_num)
tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False)
if tx_height == -1:
merkle = {
'block_height': -1
}
else:
tx_pos = tx_num - self.tx_counts[tx_height - 1]
branch, root = self.merkle.branch_and_root(
self.get_block_txs(tx_height), tx_pos
)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 10 < self.db_height:
self._tx_and_merkle_cache[tx_hash] = tx, merkle
return (None if not tx else tx.hex(), merkle)
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
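
The rewritten get_transactions_and_merkles runs one blocking lookup per hash in the executor and yields to the event loop between items, so one large batch can no longer monopolize a worker thread or starve other sessions. The shape of that loop on its own (the function name is illustrative):

    import asyncio

    async def gather_serially(items, blocking_fn, executor=None):
        loop = asyncio.get_event_loop()
        results = {}
        for item in items:
            # one executor hop per item keeps each blocking call short...
            results[item] = await loop.run_in_executor(executor, blocking_fn, item)
            # ...and an explicit yield lets other tasks run between items
            await asyncio.sleep(0)
        return results

    print(asyncio.run(gather_serially(["a", "bb", "ccc"], len)))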

View file

@@ -69,7 +69,7 @@ class Server:
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers)
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit():

View file

@@ -2,7 +2,6 @@ import os
import ssl
import math
import time
import json
import base64
import codecs
import typing
@@ -197,6 +196,8 @@ class SessionManager:
self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.consumers = 32
self.priority_queue = asyncio.PriorityQueue()
self.session_event = Event()
@@ -546,8 +547,13 @@ class SessionManager:
# because we connect to ourself
await asyncio.wait([
self._clear_stale_sessions(),
self._manage_servers()
self._manage_servers(),
self.serve_requests()
])
except Exception as err:
if not isinstance(err, asyncio.CancelledError):
log.exception("hub server died")
raise err
finally:
await self._close_servers(list(self.servers.keys()))
log.warning("disconnect %i sessions", len(self.sessions))
@@ -557,6 +563,18 @@ class SessionManager:
])
await self.stop_other()
async def serve_requests(self):
async def consumer():
while True:
_, _, fut = await self.priority_queue.get()
try:
await fut
except asyncio.CancelledError:
raise
except Exception as e:
log.exception("raised while serving a request. This should never happen.")
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
async def start_other(self):
pass
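
serve_requests above fans one shared asyncio.PriorityQueue out to a fixed pool of consumers (self.consumers = 32), capping concurrently executing requests no matter how many sessions are connected. A runnable miniature of that consumer pool (the pool size and toy handler are illustrative):

    import asyncio

    async def main():
        queue = asyncio.PriorityQueue()
        pool_size = 4  # the PR uses self.consumers = 32

        async def handle(name):
            await asyncio.sleep(0)
            print("served", name)

        async def consumer():
            while True:
                _, _, coro = await queue.get()  # (priority, tiebreaker, coroutine)
                try:
                    await coro
                except asyncio.CancelledError:
                    raise
                except Exception:
                    pass  # a failing request must not kill the consumer

        # smaller priority values are served first; id() breaks ties
        for priority, name in enumerate(("first", "second", "third")):
            queue.put_nowait((priority, id(name), handle(name)))
        workers = [asyncio.ensure_future(consumer()) for _ in range(pool_size)]
        await asyncio.sleep(0.1)  # let the queue drain
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    asyncio.run(main())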
@@ -633,7 +651,7 @@ class SessionManager:
self.mempool_statuses.pop(hashX, None)
await asyncio.get_event_loop().run_in_executor(
None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
)
if touched or new_touched or (height_changed and self.mempool_statuses):
@@ -775,10 +793,9 @@ class LBRYSessionManager(SessionManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.query_executor = None
self.websocket = None
# self.metrics = ServerLoadData()
self.metrics_loop = None
# self.metrics_loop = None
self.running = False
if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self)
@@ -795,12 +812,6 @@ class LBRYSessionManager(SessionManager):
async def start_other(self):
self.running = True
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1)
else:
self.query_executor = ProcessPoolExecutor(
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4)
)
if self.websocket is not None:
await self.websocket.start()
@@ -808,7 +819,6 @@ class LBRYSessionManager(SessionManager):
self.running = False
if self.websocket is not None:
await self.websocket.stop()
self.query_executor.shutdown()
class LBRYElectrumX(SessionBase):
@@ -880,6 +890,14 @@ class LBRYElectrumX(SessionBase):
self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp
self.db: LevelDB = self.bp.db
self.last_request_received_at = 0
def schedule_requests(self, requests):
for request in requests:
current = time.perf_counter()
elapsed = (1 << 65) - current - self.last_request_received_at
self.last_request_received_at = current
self.session_mgr.priority_queue.put_nowait((elapsed, id(request), self._handle_request(request)))
@classmethod
def protocol_min_max_strings(cls):
@@ -971,24 +989,6 @@ class LBRYElectrumX(SessionBase):
# else:
# return APICallMetrics(query_name)
async def run_in_executor(self, query_name, func, kwargs):
start = time.perf_counter()
try:
self.session_mgr.pending_query_metric.inc()
result = await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, func, kwargs
)
except asyncio.CancelledError:
raise
except Exception:
log.exception("dear devs, please handle this exception better")
self.session_mgr.db_error_metric.inc()
raise RPCError(JSONRPC.INTERNAL_ERROR, 'unknown server error')
else:
return base64.b64encode(result).decode()
finally:
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
# async def run_and_cache_query(self, query_name, kwargs):
# start = time.perf_counter()
@@ -1036,41 +1036,52 @@ class LBRYElectrumX(SessionBase):
self.session_mgr.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
def _claimtrie_resolve(self, *urls):
rows, extra = [], []
for url in urls:
self.session_mgr.urls_to_resolve_count_metric.inc()
stream, channel, repost, reposted_channel = self.db._resolve(url)
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
# print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra)))
return Outputs.to_base64(rows, extra, 0, None, None)
async def _cached_resolve_url(self, url):
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.bp.resolve_cache[url]
async def claimtrie_resolve(self, *urls):
result = await self.loop.run_in_executor(None, self._claimtrie_resolve, *urls)
self.session_mgr.resolved_url_count_metric.inc(len(urls))
return result
async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
try:
if sorted_urls in self.bp.resolve_outputs_cache:
return self.bp.resolve_outputs_cache[sorted_urls]
rows, extra = [], []
for url in urls:
if url not in self.bp.resolve_cache:
self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
if isinstance(channel, ResolveCensoredError):
rows.append(channel)
extra.append(channel.censor_row)
elif isinstance(stream, ResolveCensoredError):
rows.append(stream)
extra.append(stream.censor_row)
elif channel and not stream:
rows.append(channel)
# print("resolved channel", channel.name.decode())
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
elif stream:
# print("resolved stream", stream.name.decode())
rows.append(stream)
if channel:
# print("and channel", channel.name.decode())
extra.append(channel)
if repost:
extra.append(repost)
if reposted_channel:
extra.append(reposted_channel)
await asyncio.sleep(0)
self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None
)
return result
finally:
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self):
return self.bp.height
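
claimtrie_resolve now answers from two cache layers: resolve_cache holds each URL's raw resolution and resolve_outputs_cache holds the encoded batch keyed by the sorted URL tuple, so any permutation of an already-seen URL set is one lookup. The same idea reduced to plain dicts (the resolve_one and encode parameters are illustrative stand-ins):

    resolve_cache = {}   # url -> resolved row
    outputs_cache = {}   # tuple(sorted(urls)) -> encoded batch result

    def resolve_batch(urls, resolve_one, encode):
        key = tuple(sorted(urls))
        if key in outputs_cache:          # whole-batch hit, order-insensitive
            return outputs_cache[key]
        rows = []
        for url in urls:
            if url not in resolve_cache:  # per-URL hit for partial overlap
                resolve_cache[url] = resolve_one(url)
            rows.append(resolve_cache[url])
        outputs_cache[key] = result = encode(rows)
        return result

    assert resolve_batch(["b", "a"], str.upper, tuple) == ("B", "A")
    assert resolve_batch(["a", "b"], str.upper, tuple) == ("B", "A")  # same sorted key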
@@ -1221,9 +1232,11 @@ class LBRYElectrumX(SessionBase):
address: the address to subscribe to"""
if len(addresses) > 1000:
raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}')
return [
await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses
]
results = []
for address in addresses:
results.append(await self.hashX_subscribe(self.address_to_hashX(address), address))
await asyncio.sleep(0)
return results
async def address_unsubscribe(self, address):
"""Unsubscribe an address.
@@ -1472,7 +1485,7 @@ class LBRYElectrumX(SessionBase):
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
for tx_hash in tx_hashes:
assert_tx_hash(tx_hash)
batch_result = await self.db.fs_transactions(tx_hashes)
batch_result = await self.db.get_transactions_and_merkles(tx_hashes)
needed_merkles = {}
for tx_hash in tx_hashes:

View file

@@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase):
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(None, get_txids)
txs = await bp.db.fs_transactions(txids)
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are out of order')

View file

@@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase):
txids = [
tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height)
]
txs = await bp.db.fs_transactions(txids)
txs = await bp.db.get_transactions_and_merkles(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are out of order')