This commit is contained in:
8 changed files with 139 additions and 292 deletions
@ -104,7 +104,7 @@ class LBCDaemon:
async with self.workqueue_semaphore:
async with self.client_session() as session:
async with, data=data) as resp:
kind = resp.headers.get('Content-Type', None)
kind = resp.headers.get('Content-Type')
if kind == 'application/json':
return await resp.json()
# bitcoind's HTTP protocol "handling" is a bad joke
@ -319,10 +319,10 @@ class LBCDaemon:
async def getclaimsforname(self, name):
'''Given a name, retrieves all claims matching that name.'''
"""Given a name, retrieves all claims matching that name."""
return await self._send_single('getclaimsforname', (name,))
async def getbestblockhash(self):
'''Given a name, retrieves all claims matching that name.'''
"""Given a name, retrieves all claims matching that name."""
return await self._send_single('getbestblockhash')
@ -22,16 +22,6 @@ HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
# class cachedproperty:
# def __init__(self, f):
# self.f = f
# def __get__(self, obj, type):
# obj = obj or type
# value = self.f(obj)
# setattr(obj, self.f.__name__, value)
# return value
class StagedClaimtrieItem(typing.NamedTuple):
@ -988,14 +988,14 @@ class HubDB:
merkle = {
'block_height': tx_height,
'merkle': [
for hash in branch
for _hash in branch
'pos': tx_pos
if tx_height + 10 < self.db_height:
self._tx_and_merkle_cache[tx_hash] = tx, merkle
return (None if not tx else tx.hex(), merkle)
return None if not tx else tx.hex(), merkle
async def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
@ -56,13 +56,13 @@ class PrefixRow(metaclass=PrefixRowType):
stop = self.pack_partial_key(*stop)
if deserialize_key:
key_getter = lambda k: self.unpack_key(k)
key_getter = lambda _k: self.unpack_key(_k)
key_getter = lambda k: k
key_getter = lambda _k: _k
if deserialize_value:
value_getter = lambda v: self.unpack_value(v)
value_getter = lambda _v: self.unpack_value(_v)
value_getter = lambda v: v
value_getter = lambda _v: _v
it = self._db.iterator(
start or prefix, self._column_family, iterate_lower_bound=(start or prefix),
@ -20,149 +20,6 @@ if TYPE_CHECKING:
from scribe.db import HubDB
def expand_query(**kwargs):
if "amount_order" in kwargs:
kwargs["limit"] = 1
kwargs["order_by"] = "effective_amount"
kwargs["offset"] = int(kwargs["amount_order"]) - 1
if 'name' in kwargs:
kwargs['name'] = normalize_name(kwargs.pop('name'))
if kwargs.get('is_controlling') is False:
query = {'must': [], 'must_not': []}
collapse = None
if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None:
kwargs['fee_currency'] = kwargs['fee_currency'].upper()
for key, value in kwargs.items():
key = key.replace('claim.', '')
many = key.endswith('__in') or isinstance(value, list)
if many and len(value) > 2048:
raise TooManyClaimSearchParametersError(key, 2048)
if many:
key = key.replace('__in', '')
value = list(filter(None, value))
if value is None or isinstance(value, list) and len(value) == 0:
key = REPLACEMENTS.get(key, key)
if key in FIELDS:
partial_id = False
if key == 'claim_type':
if isinstance(value, str):
value = CLAIM_TYPES[value]
value = [CLAIM_TYPES[claim_type] for claim_type in value]
elif key == 'stream_type':
value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value))
if key == '_id':
if isinstance(value, Iterable):
value = [item[::-1].hex() for item in value]
value = value[::-1].hex()
if not many and key in ('_id', 'claim_id') and len(value) < 20:
partial_id = True
if key in ('signature_valid', 'has_source'):
continue # handled later
if key in TEXT_FIELDS:
key += '.keyword'
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
if partial_id:
query['must'].append({"prefix": {"claim_id": value}})
elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
operator_length = 2 if value[:2] in ops else 1
operator, value = value[:operator_length], value[operator_length:]
if key == 'fee_amount':
value = str(Decimal(value)*1000)
query['must'].append({"range": {key: {ops[operator]: value}}})
elif many:
query['must'].append({"terms": {key: value}})
if key == 'fee_amount':
value = str(Decimal(value)*1000)
query['must'].append({"term": {key: {"value": value}}})
elif key == 'not_channel_ids':
for channel_id in value:
query['must_not'].append({"term": {'channel_id.keyword': channel_id}})
query['must_not'].append({"term": {'_id': channel_id}})
elif key == 'channel_ids':
query['must'].append({"terms": {'channel_id.keyword': value}})
elif key == 'claim_ids':
query['must'].append({"terms": {'claim_id.keyword': value}})
elif key == 'media_types':
query['must'].append({"terms": {'media_type.keyword': value}})
elif key == 'any_languages':
query['must'].append({"terms": {'languages': clean_tags(value)}})
elif key == 'any_languages':
query['must'].append({"terms": {'languages': value}})
elif key == 'all_languages':
query['must'].extend([{"term": {'languages': tag}} for tag in value])
elif key == 'any_tags':
query['must'].append({"terms": {'tags.keyword': clean_tags(value)}})
elif key == 'all_tags':
query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
elif key == 'not_tags':
query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)])
elif key == 'not_claim_id':
query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value])
elif key == 'limit_claims_per_channel':
collapse = ('channel_id.keyword', value)
if kwargs.get('has_channel_signature'):
query['must'].append({"exists": {"field": "signature"}})
if 'signature_valid' in kwargs:
query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
elif 'signature_valid' in kwargs:
query.setdefault('should', [])
query["minimum_should_match"] = 1
query['should'].append({"bool": {"must_not": {"exists": {"field": "signature"}}}})
query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}})
if 'has_source' in kwargs:
query.setdefault('should', [])
query["minimum_should_match"] = 1
is_stream_or_repost = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}}
{"bool": {"must": [{"match": {"has_source": kwargs['has_source']}}, is_stream_or_repost]}})
query['should'].append({"bool": {"must_not": [is_stream_or_repost]}})
query['should'].append({"bool": {"must": [{"term": {"reposted_claim_type": CLAIM_TYPES['channel']}}]}})
if kwargs.get('text'):
{"query": kwargs["text"], "fields": [
"claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5"
query = {
"_source": {"excludes": ["description", "title"]},
'query': {'bool': query},
"sort": [],
if "limit" in kwargs:
query["size"] = kwargs["limit"]
if 'offset' in kwargs:
query["from"] = kwargs["offset"]
if 'order_by' in kwargs:
if isinstance(kwargs["order_by"], str):
kwargs["order_by"] = [kwargs["order_by"]]
for value in kwargs['order_by']:
if 'trending_group' in value:
# fixme: trending_mixed is 0 for all records on variable decay, making sort slow.
is_asc = value.startswith('^')
value = value[1:] if is_asc else value
value = REPLACEMENTS.get(value, value)
if value in TEXT_FIELDS:
value += '.keyword'
query['sort'].append({value: "asc" if is_asc else "desc"})
if collapse:
query["collapse"] = {
"field": collapse[0],
"inner_hits": {
"name": collapse[0],
"size": collapse[1],
"sort": query["sort"]
return query
class ChannelResolution(str):
def lookup_error(cls, url):
@ -437,6 +437,12 @@ class JSONRPCAutoDetect(JSONRPCv2):
return protocol_for_payload(main)
class ResultEvent(asyncio.Event):
def __init__(self, loop=None):
self.result = None
class JSONRPCConnection:
"""Maintains state of a JSON RPC connection, in particular
encapsulating the handling of request IDs.
@ -453,7 +459,7 @@ class JSONRPCConnection:
# Sent Requests and Batches that have not received a response.
# The key is its request ID; for a batch it is sorted tuple
# of request IDs
self._requests: typing.Dict[str, typing.Tuple[Request, Event]] = {}
self._requests: typing.Dict[str, typing.Tuple[Request, ResultEvent]] = {}
# A public attribute intended to be settable dynamically
self.max_response_size = 0
@ -533,7 +539,7 @@ class JSONRPCConnection:
return message
def _event(self, request, request_id):
event = Event()
event = ResultEvent()
self._requests[request_id] = (request, event)
return event
@ -142,7 +142,7 @@ class MemPool:
if tx_hash not in self.txs:
continue # the tx hash for the touched address is an input that isn't in mempool anymore
tx = self.txs[tx_hash]
has_ui = any(hash in self.txs for hash, idx in tx.in_pairs)
has_ui = any(_hash in self.txs for _hash, idx in tx.in_pairs)
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
return result
@ -193,7 +193,7 @@ class MemPool:
if tx_hash not in self.txs:
return -2
tx = self.txs[tx_hash]
unspent_inputs = any(hash in self.raw_mempool for hash, idx in tx.in_pairs)
unspent_inputs = any(_hash in self.raw_mempool for _hash, idx in tx.in_pairs)
if unspent_inputs:
return -1
return 0
@ -409,115 +409,115 @@ class SessionManager:
self.notified_height = height
# --- LocalRPC command handlers
async def rpc_add_peer(self, real_name):
"""Add a peer.
real_name: " t50001 s50002" for example
await self._notify_peer(real_name)
return f"peer '{real_name}' added"
async def rpc_disconnect(self, session_ids):
"""Disconnect sessions.
session_ids: array of session IDs
async def close(session):
"""Close the session's transport."""
await session.close(force_after=2)
return f'disconnected {session.session_id}'
return await self._for_each_session(session_ids, close)
async def rpc_log(self, session_ids):
"""Toggle logging of sessions.
session_ids: array of session IDs
async def toggle_logging(session):
"""Toggle logging of the session."""
return f'log {session.session_id}: {session.log_me}'
return await self._for_each_session(session_ids, toggle_logging)
async def rpc_daemon_url(self, daemon_url):
"""Replace the daemon URL."""
daemon_url = daemon_url or self.env.daemon_url
except Exception as e:
raise RPCError(BAD_REQUEST, f'an error occurred: {e!r}')
return f'now using daemon at {self.daemon.logged_url()}'
async def rpc_stop(self):
"""Shut down the server cleanly."""
return 'stopping'
async def rpc_getinfo(self):
"""Return summary information about the server process."""
return self._get_info()
async def rpc_groups(self):
"""Return statistics about the session groups."""
return self._group_data()
async def rpc_peers(self):
"""Return a list of data about server peers."""
return self.env.peer_hubs
async def rpc_query(self, items, limit):
"""Return a list of data about server peers."""
coin = self.env.coin
db = self.db
lines = []
def arg_to_hashX(arg):
script = bytes.fromhex(arg)
lines.append(f'Script: {arg}')
return coin.hashX_from_script(script)
except ValueError:
hashX = coin.address_to_hashX(arg)
except Base58Error as e:
return None
lines.append(f'Address: {arg}')
return hashX
for arg in items:
hashX = arg_to_hashX(arg)
if not hashX:
n = None
history = await db.limited_history(hashX, limit=limit)
for n, (tx_hash, height) in enumerate(history):
lines.append(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
lines.append('No history found')
n = None
utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}')
if n == limit:
if n is None:
lines.append('No UTXOs found')
balance = sum(utxo.value for utxo in utxos)
lines.append(f'Balance: {coin.decimal_value(balance):,f} '
return lines
# async def rpc_add_peer(self, real_name):
# """Add a peer.
# real_name: " t50001 s50002" for example
# """
# await self._notify_peer(real_name)
# return f"peer '{real_name}' added"
# async def rpc_disconnect(self, session_ids):
# """Disconnect sessions.
# session_ids: array of session IDs
# """
# async def close(session):
# """Close the session's transport."""
# await session.close(force_after=2)
# return f'disconnected {session.session_id}'
# return await self._for_each_session(session_ids, close)
# async def rpc_log(self, session_ids):
# """Toggle logging of sessions.
# session_ids: array of session IDs
# """
# async def toggle_logging(session):
# """Toggle logging of the session."""
# session.toggle_logging()
# return f'log {session.session_id}: {session.log_me}'
# return await self._for_each_session(session_ids, toggle_logging)
# async def rpc_daemon_url(self, daemon_url):
# """Replace the daemon URL."""
# daemon_url = daemon_url or self.env.daemon_url
# try:
# self.daemon.set_url(daemon_url)
# except Exception as e:
# raise RPCError(BAD_REQUEST, f'an error occurred: {e!r}')
# return f'now using daemon at {self.daemon.logged_url()}'
# async def rpc_stop(self):
# """Shut down the server cleanly."""
# self.shutdown_event.set()
# return 'stopping'
# async def rpc_getinfo(self):
# """Return summary information about the server process."""
# return self._get_info()
# async def rpc_groups(self):
# """Return statistics about the session groups."""
# return self._group_data()
# async def rpc_peers(self):
# """Return a list of data about server peers."""
# return self.env.peer_hubs
# async def rpc_query(self, items, limit):
# """Return a list of data about server peers."""
# coin = self.env.coin
# db = self.db
# lines = []
# def arg_to_hashX(arg):
# try:
# script = bytes.fromhex(arg)
# lines.append(f'Script: {arg}')
# return coin.hashX_from_script(script)
# except ValueError:
# pass
# try:
# hashX = coin.address_to_hashX(arg)
# except Base58Error as e:
# lines.append(e.args[0])
# return None
# lines.append(f'Address: {arg}')
# return hashX
# for arg in items:
# hashX = arg_to_hashX(arg)
# if not hashX:
# continue
# n = None
# history = await db.limited_history(hashX, limit=limit)
# for n, (tx_hash, height) in enumerate(history):
# lines.append(f'History #{n:,d}: height {height:,d} '
# f'tx_hash {hash_to_hex_str(tx_hash)}')
# if n is None:
# lines.append('No history found')
# n = None
# utxos = await db.all_utxos(hashX)
# for n, utxo in enumerate(utxos, start=1):
# lines.append(f'UTXO #{n:,d}: tx_hash '
# f'{hash_to_hex_str(utxo.tx_hash)} '
# f'tx_pos {utxo.tx_pos:,d} height '
# f'{utxo.height:,d} value {utxo.value:,d}')
# if n == limit:
# break
# if n is None:
# lines.append('No UTXOs found')
# balance = sum(utxo.value for utxo in utxos)
# lines.append(f'Balance: {coin.decimal_value(balance):,f} '
# f'{coin.SHORTNAME}')
# return lines
# async def rpc_reorg(self, count):
# """Force a reorg of the given number of blocks.
@ -804,11 +804,6 @@ class LBRYElectrumX(asyncio.Protocol):
return f'{ip_addr_str}:{port}'
def receive_message(self, message):
if self.log_me:
||||'processing {message}')
return self._receive_message_orig(message)
def toggle_logging(self):
self.log_me = not self.log_me
@ -1538,14 +1533,14 @@ class LBRYElectrumX(asyncio.Protocol):
height = non_negative_integer(height)
return await self.session_manager.electrum_header(height)
def is_tor(self):
"""Try to detect if the connection is to a tor hidden service we are
peername = self.peer_mgr.proxy_peername()
if not peername:
return False
peer_address = self.peer_address()
return peer_address and peer_address[0] == peername[0]
# def is_tor(self):
# """Try to detect if the connection is to a tor hidden service we are
# running."""
# peername = self.peer_mgr.proxy_peername()
# if not peername:
# return False
# peer_address = self.peer_address()
# return peer_address and peer_address[0] == peername[0]
async def replaced_banner(self, banner):
network_info = await self.daemon_request('getnetworkinfo')
@ -1727,9 +1722,9 @@ class LBRYElectrumX(asyncio.Protocol):
tx_hashes: ordered list of hex strings of tx hashes in a block
tx_pos: index of transaction in tx_hashes to create branch for
hashes = [hex_str_to_hash(hash) for hash in tx_hashes]
hashes = [hex_str_to_hash(_hash) for _hash in tx_hashes]
branch, root = self.db.merkle.branch_and_root(hashes, tx_pos)
branch = [hash_to_hex_str(hash) for hash in branch]
branch = [hash_to_hex_str(_hash) for _hash in branch]
return branch
async def transaction_merkle(self, tx_hash, height):
@ -1747,7 +1742,6 @@ class LBRYElectrumX(asyncio.Protocol):
return result[tx_hash][1]
def get_from_possible_keys(dictionary, *keys):
for key in keys:
if key in dictionary:
Add table
Reference in a new issue