diff --git a/scribe/blockchain/daemon.py b/scribe/blockchain/daemon.py index 6567d01..fdf0e4e 100644 --- a/scribe/blockchain/daemon.py +++ b/scribe/blockchain/daemon.py @@ -104,7 +104,7 @@ class LBCDaemon: async with self.workqueue_semaphore: async with self.client_session() as session: async with session.post(self.current_url(), data=data) as resp: - kind = resp.headers.get('Content-Type', None) + kind = resp.headers.get('Content-Type') if kind == 'application/json': return await resp.json() # bitcoind's HTTP protocol "handling" is a bad joke @@ -319,10 +319,10 @@ class LBCDaemon: @handles_errors async def getclaimsforname(self, name): - '''Given a name, retrieves all claims matching that name.''' + """Given a name, retrieves all claims matching that name.""" return await self._send_single('getclaimsforname', (name,)) @handles_errors async def getbestblockhash(self): - '''Given a name, retrieves all claims matching that name.''' + """Given a name, retrieves all claims matching that name.""" return await self._send_single('getbestblockhash') diff --git a/scribe/common.py b/scribe/common.py index 490c3e4..45d122d 100644 --- a/scribe/common.py +++ b/scribe/common.py @@ -22,16 +22,6 @@ HISTOGRAM_BUCKETS = ( .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf') ) -# class cachedproperty: -# def __init__(self, f): -# self.f = f -# -# def __get__(self, obj, type): -# obj = obj or type -# value = self.f(obj) -# setattr(obj, self.f.__name__, value) -# return value - class StagedClaimtrieItem(typing.NamedTuple): """ diff --git a/scribe/db/db.py b/scribe/db/db.py index b43e17d..462694b 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -988,14 +988,14 @@ class HubDB: merkle = { 'block_height': tx_height, 'merkle': [ - hash_to_hex_str(hash) - for hash in branch + hash_to_hex_str(_hash) + for _hash in branch ], 'pos': tx_pos } if tx_height + 10 < self.db_height: self._tx_and_merkle_cache[tx_hash] = tx, merkle - return (None if not tx else tx.hex(), merkle) + return None if not tx else tx.hex(), merkle async def fs_block_hashes(self, height, count): if height + count > len(self.headers): diff --git a/scribe/db/interface.py b/scribe/db/interface.py index e0f569e..e9b5b19 100644 --- a/scribe/db/interface.py +++ b/scribe/db/interface.py @@ -56,13 +56,13 @@ class PrefixRow(metaclass=PrefixRowType): stop = self.pack_partial_key(*stop) if deserialize_key: - key_getter = lambda k: self.unpack_key(k) + key_getter = lambda _k: self.unpack_key(_k) else: - key_getter = lambda k: k + key_getter = lambda _k: _k if deserialize_value: - value_getter = lambda v: self.unpack_value(v) + value_getter = lambda _v: self.unpack_value(_v) else: - value_getter = lambda v: v + value_getter = lambda _v: _v it = self._db.iterator( start or prefix, self._column_family, iterate_lower_bound=(start or prefix), diff --git a/scribe/elasticsearch/search.py b/scribe/elasticsearch/search.py index b4d335b..6ba31ef 100644 --- a/scribe/elasticsearch/search.py +++ b/scribe/elasticsearch/search.py @@ -20,149 +20,6 @@ if TYPE_CHECKING: from scribe.db import HubDB -def expand_query(**kwargs): - if "amount_order" in kwargs: - kwargs["limit"] = 1 - kwargs["order_by"] = "effective_amount" - kwargs["offset"] = int(kwargs["amount_order"]) - 1 - if 'name' in kwargs: - kwargs['name'] = normalize_name(kwargs.pop('name')) - if kwargs.get('is_controlling') is False: - kwargs.pop('is_controlling') - query = {'must': [], 'must_not': []} - collapse = None - if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None: - kwargs['fee_currency'] = kwargs['fee_currency'].upper() - for key, value in kwargs.items(): - key = key.replace('claim.', '') - many = key.endswith('__in') or isinstance(value, list) - if many and len(value) > 2048: - raise TooManyClaimSearchParametersError(key, 2048) - if many: - key = key.replace('__in', '') - value = list(filter(None, value)) - if value is None or isinstance(value, list) and len(value) == 0: - continue - key = REPLACEMENTS.get(key, key) - if key in FIELDS: - partial_id = False - if key == 'claim_type': - if isinstance(value, str): - value = CLAIM_TYPES[value] - else: - value = [CLAIM_TYPES[claim_type] for claim_type in value] - elif key == 'stream_type': - value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value)) - if key == '_id': - if isinstance(value, Iterable): - value = [item[::-1].hex() for item in value] - else: - value = value[::-1].hex() - if not many and key in ('_id', 'claim_id') and len(value) < 20: - partial_id = True - if key in ('signature_valid', 'has_source'): - continue # handled later - if key in TEXT_FIELDS: - key += '.keyword' - ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} - if partial_id: - query['must'].append({"prefix": {"claim_id": value}}) - elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops: - operator_length = 2 if value[:2] in ops else 1 - operator, value = value[:operator_length], value[operator_length:] - if key == 'fee_amount': - value = str(Decimal(value)*1000) - query['must'].append({"range": {key: {ops[operator]: value}}}) - elif many: - query['must'].append({"terms": {key: value}}) - else: - if key == 'fee_amount': - value = str(Decimal(value)*1000) - query['must'].append({"term": {key: {"value": value}}}) - elif key == 'not_channel_ids': - for channel_id in value: - query['must_not'].append({"term": {'channel_id.keyword': channel_id}}) - query['must_not'].append({"term": {'_id': channel_id}}) - elif key == 'channel_ids': - query['must'].append({"terms": {'channel_id.keyword': value}}) - elif key == 'claim_ids': - query['must'].append({"terms": {'claim_id.keyword': value}}) - elif key == 'media_types': - query['must'].append({"terms": {'media_type.keyword': value}}) - elif key == 'any_languages': - query['must'].append({"terms": {'languages': clean_tags(value)}}) - elif key == 'any_languages': - query['must'].append({"terms": {'languages': value}}) - elif key == 'all_languages': - query['must'].extend([{"term": {'languages': tag}} for tag in value]) - elif key == 'any_tags': - query['must'].append({"terms": {'tags.keyword': clean_tags(value)}}) - elif key == 'all_tags': - query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) - elif key == 'not_tags': - query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) - elif key == 'not_claim_id': - query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value]) - elif key == 'limit_claims_per_channel': - collapse = ('channel_id.keyword', value) - if kwargs.get('has_channel_signature'): - query['must'].append({"exists": {"field": "signature"}}) - if 'signature_valid' in kwargs: - query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) - elif 'signature_valid' in kwargs: - query.setdefault('should', []) - query["minimum_should_match"] = 1 - query['should'].append({"bool": {"must_not": {"exists": {"field": "signature"}}}}) - query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) - if 'has_source' in kwargs: - query.setdefault('should', []) - query["minimum_should_match"] = 1 - is_stream_or_repost = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}} - query['should'].append( - {"bool": {"must": [{"match": {"has_source": kwargs['has_source']}}, is_stream_or_repost]}}) - query['should'].append({"bool": {"must_not": [is_stream_or_repost]}}) - query['should'].append({"bool": {"must": [{"term": {"reposted_claim_type": CLAIM_TYPES['channel']}}]}}) - if kwargs.get('text'): - query['must'].append( - {"simple_query_string": - {"query": kwargs["text"], "fields": [ - "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5" - ]}}) - query = { - "_source": {"excludes": ["description", "title"]}, - 'query': {'bool': query}, - "sort": [], - } - if "limit" in kwargs: - query["size"] = kwargs["limit"] - if 'offset' in kwargs: - query["from"] = kwargs["offset"] - if 'order_by' in kwargs: - if isinstance(kwargs["order_by"], str): - kwargs["order_by"] = [kwargs["order_by"]] - for value in kwargs['order_by']: - if 'trending_group' in value: - # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. - continue - is_asc = value.startswith('^') - value = value[1:] if is_asc else value - value = REPLACEMENTS.get(value, value) - if value in TEXT_FIELDS: - value += '.keyword' - query['sort'].append({value: "asc" if is_asc else "desc"}) - if collapse: - query["collapse"] = { - "field": collapse[0], - "inner_hits": { - "name": collapse[0], - "size": collapse[1], - "sort": query["sort"] - } - } - return query - - - class ChannelResolution(str): @classmethod def lookup_error(cls, url): diff --git a/scribe/hub/jsonrpc.py b/scribe/hub/jsonrpc.py index a92c0ec..aae8c4e 100644 --- a/scribe/hub/jsonrpc.py +++ b/scribe/hub/jsonrpc.py @@ -437,6 +437,12 @@ class JSONRPCAutoDetect(JSONRPCv2): return protocol_for_payload(main) +class ResultEvent(asyncio.Event): + def __init__(self, loop=None): + super().__init__(loop=loop) + self.result = None + + class JSONRPCConnection: """Maintains state of a JSON RPC connection, in particular encapsulating the handling of request IDs. @@ -453,7 +459,7 @@ class JSONRPCConnection: # Sent Requests and Batches that have not received a response. # The key is its request ID; for a batch it is sorted tuple # of request IDs - self._requests: typing.Dict[str, typing.Tuple[Request, Event]] = {} + self._requests: typing.Dict[str, typing.Tuple[Request, ResultEvent]] = {} # A public attribute intended to be settable dynamically self.max_response_size = 0 @@ -533,7 +539,7 @@ class JSONRPCConnection: return message def _event(self, request, request_id): - event = Event() + event = ResultEvent() self._requests[request_id] = (request, event) return event diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index 3613749..1403310 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -142,7 +142,7 @@ class MemPool: if tx_hash not in self.txs: continue # the tx hash for the touched address is an input that isn't in mempool anymore tx = self.txs[tx_hash] - has_ui = any(hash in self.txs for hash, idx in tx.in_pairs) + has_ui = any(_hash in self.txs for _hash, idx in tx.in_pairs) result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) return result @@ -193,7 +193,7 @@ class MemPool: if tx_hash not in self.txs: return -2 tx = self.txs[tx_hash] - unspent_inputs = any(hash in self.raw_mempool for hash, idx in tx.in_pairs) + unspent_inputs = any(_hash in self.raw_mempool for _hash, idx in tx.in_pairs) if unspent_inputs: return -1 return 0 diff --git a/scribe/hub/session.py b/scribe/hub/session.py index b1cc13c..c2fa2a1 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -409,115 +409,115 @@ class SessionManager: self.notified_height = height # --- LocalRPC command handlers - - async def rpc_add_peer(self, real_name): - """Add a peer. - - real_name: "bch.electrumx.cash t50001 s50002" for example - """ - await self._notify_peer(real_name) - return f"peer '{real_name}' added" - - async def rpc_disconnect(self, session_ids): - """Disconnect sessions. - - session_ids: array of session IDs - """ - async def close(session): - """Close the session's transport.""" - await session.close(force_after=2) - return f'disconnected {session.session_id}' - - return await self._for_each_session(session_ids, close) - - async def rpc_log(self, session_ids): - """Toggle logging of sessions. - - session_ids: array of session IDs - """ - async def toggle_logging(session): - """Toggle logging of the session.""" - session.toggle_logging() - return f'log {session.session_id}: {session.log_me}' - - return await self._for_each_session(session_ids, toggle_logging) - - async def rpc_daemon_url(self, daemon_url): - """Replace the daemon URL.""" - daemon_url = daemon_url or self.env.daemon_url - try: - self.daemon.set_url(daemon_url) - except Exception as e: - raise RPCError(BAD_REQUEST, f'an error occurred: {e!r}') - return f'now using daemon at {self.daemon.logged_url()}' - - async def rpc_stop(self): - """Shut down the server cleanly.""" - self.shutdown_event.set() - return 'stopping' - - async def rpc_getinfo(self): - """Return summary information about the server process.""" - return self._get_info() - - async def rpc_groups(self): - """Return statistics about the session groups.""" - return self._group_data() - - async def rpc_peers(self): - """Return a list of data about server peers.""" - return self.env.peer_hubs - - async def rpc_query(self, items, limit): - """Return a list of data about server peers.""" - coin = self.env.coin - db = self.db - lines = [] - - def arg_to_hashX(arg): - try: - script = bytes.fromhex(arg) - lines.append(f'Script: {arg}') - return coin.hashX_from_script(script) - except ValueError: - pass - - try: - hashX = coin.address_to_hashX(arg) - except Base58Error as e: - lines.append(e.args[0]) - return None - lines.append(f'Address: {arg}') - return hashX - - for arg in items: - hashX = arg_to_hashX(arg) - if not hashX: - continue - n = None - history = await db.limited_history(hashX, limit=limit) - for n, (tx_hash, height) in enumerate(history): - lines.append(f'History #{n:,d}: height {height:,d} ' - f'tx_hash {hash_to_hex_str(tx_hash)}') - if n is None: - lines.append('No history found') - n = None - utxos = await db.all_utxos(hashX) - for n, utxo in enumerate(utxos, start=1): - lines.append(f'UTXO #{n:,d}: tx_hash ' - f'{hash_to_hex_str(utxo.tx_hash)} ' - f'tx_pos {utxo.tx_pos:,d} height ' - f'{utxo.height:,d} value {utxo.value:,d}') - if n == limit: - break - if n is None: - lines.append('No UTXOs found') - - balance = sum(utxo.value for utxo in utxos) - lines.append(f'Balance: {coin.decimal_value(balance):,f} ' - f'{coin.SHORTNAME}') - - return lines + # + # async def rpc_add_peer(self, real_name): + # """Add a peer. + # + # real_name: "bch.electrumx.cash t50001 s50002" for example + # """ + # await self._notify_peer(real_name) + # return f"peer '{real_name}' added" + # + # async def rpc_disconnect(self, session_ids): + # """Disconnect sessions. + # + # session_ids: array of session IDs + # """ + # async def close(session): + # """Close the session's transport.""" + # await session.close(force_after=2) + # return f'disconnected {session.session_id}' + # + # return await self._for_each_session(session_ids, close) + # + # async def rpc_log(self, session_ids): + # """Toggle logging of sessions. + # + # session_ids: array of session IDs + # """ + # async def toggle_logging(session): + # """Toggle logging of the session.""" + # session.toggle_logging() + # return f'log {session.session_id}: {session.log_me}' + # + # return await self._for_each_session(session_ids, toggle_logging) + # + # async def rpc_daemon_url(self, daemon_url): + # """Replace the daemon URL.""" + # daemon_url = daemon_url or self.env.daemon_url + # try: + # self.daemon.set_url(daemon_url) + # except Exception as e: + # raise RPCError(BAD_REQUEST, f'an error occurred: {e!r}') + # return f'now using daemon at {self.daemon.logged_url()}' + # + # async def rpc_stop(self): + # """Shut down the server cleanly.""" + # self.shutdown_event.set() + # return 'stopping' + # + # async def rpc_getinfo(self): + # """Return summary information about the server process.""" + # return self._get_info() + # + # async def rpc_groups(self): + # """Return statistics about the session groups.""" + # return self._group_data() + # + # async def rpc_peers(self): + # """Return a list of data about server peers.""" + # return self.env.peer_hubs + # + # async def rpc_query(self, items, limit): + # """Return a list of data about server peers.""" + # coin = self.env.coin + # db = self.db + # lines = [] + # + # def arg_to_hashX(arg): + # try: + # script = bytes.fromhex(arg) + # lines.append(f'Script: {arg}') + # return coin.hashX_from_script(script) + # except ValueError: + # pass + # + # try: + # hashX = coin.address_to_hashX(arg) + # except Base58Error as e: + # lines.append(e.args[0]) + # return None + # lines.append(f'Address: {arg}') + # return hashX + # + # for arg in items: + # hashX = arg_to_hashX(arg) + # if not hashX: + # continue + # n = None + # history = await db.limited_history(hashX, limit=limit) + # for n, (tx_hash, height) in enumerate(history): + # lines.append(f'History #{n:,d}: height {height:,d} ' + # f'tx_hash {hash_to_hex_str(tx_hash)}') + # if n is None: + # lines.append('No history found') + # n = None + # utxos = await db.all_utxos(hashX) + # for n, utxo in enumerate(utxos, start=1): + # lines.append(f'UTXO #{n:,d}: tx_hash ' + # f'{hash_to_hex_str(utxo.tx_hash)} ' + # f'tx_pos {utxo.tx_pos:,d} height ' + # f'{utxo.height:,d} value {utxo.value:,d}') + # if n == limit: + # break + # if n is None: + # lines.append('No UTXOs found') + # + # balance = sum(utxo.value for utxo in utxos) + # lines.append(f'Balance: {coin.decimal_value(balance):,f} ' + # f'{coin.SHORTNAME}') + # + # return lines # async def rpc_reorg(self, count): # """Force a reorg of the given number of blocks. @@ -804,11 +804,6 @@ class LBRYElectrumX(asyncio.Protocol): else: return f'{ip_addr_str}:{port}' - def receive_message(self, message): - if self.log_me: - self.logger.info(f'processing {message}') - return self._receive_message_orig(message) - def toggle_logging(self): self.log_me = not self.log_me @@ -1538,14 +1533,14 @@ class LBRYElectrumX(asyncio.Protocol): height = non_negative_integer(height) return await self.session_manager.electrum_header(height) - def is_tor(self): - """Try to detect if the connection is to a tor hidden service we are - running.""" - peername = self.peer_mgr.proxy_peername() - if not peername: - return False - peer_address = self.peer_address() - return peer_address and peer_address[0] == peername[0] + # def is_tor(self): + # """Try to detect if the connection is to a tor hidden service we are + # running.""" + # peername = self.peer_mgr.proxy_peername() + # if not peername: + # return False + # peer_address = self.peer_address() + # return peer_address and peer_address[0] == peername[0] async def replaced_banner(self, banner): network_info = await self.daemon_request('getnetworkinfo') @@ -1727,9 +1722,9 @@ class LBRYElectrumX(asyncio.Protocol): tx_hashes: ordered list of hex strings of tx hashes in a block tx_pos: index of transaction in tx_hashes to create branch for """ - hashes = [hex_str_to_hash(hash) for hash in tx_hashes] + hashes = [hex_str_to_hash(_hash) for _hash in tx_hashes] branch, root = self.db.merkle.branch_and_root(hashes, tx_pos) - branch = [hash_to_hex_str(hash) for hash in branch] + branch = [hash_to_hex_str(_hash) for _hash in branch] return branch async def transaction_merkle(self, tx_hash, height): @@ -1747,7 +1742,6 @@ class LBRYElectrumX(asyncio.Protocol): return result[tx_hash][1] - def get_from_possible_keys(dictionary, *keys): for key in keys: if key in dictionary: