From 73e239cc5f35755b0f90fbaba216e5c579cbea6a Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 21 May 2021 09:50:47 -0400 Subject: [PATCH] client side hub discovery pub/sub and hub metadata stored, removed old peers implementation --- lbry/conf.py | 53 +- lbry/testcase.py | 16 +- lbry/wallet/network.py | 21 +- lbry/wallet/server/peer.py | 302 ----------- lbry/wallet/server/peers.py | 506 ------------------ lbry/wallet/server/session.py | 38 +- .../blockchain/test_wallet_server_sessions.py | 7 + tests/unit/test_conf.py | 38 +- 8 files changed, 116 insertions(+), 865 deletions(-) delete mode 100644 lbry/wallet/server/peer.py delete mode 100644 lbry/wallet/server/peers.py diff --git a/lbry/conf.py b/lbry/conf.py index e82e59cbc..a77719d5d 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -2,7 +2,7 @@ import os import re import sys import logging -from typing import List, Tuple, Union, TypeVar, Generic, Optional +from typing import List, Dict, Tuple, Union, TypeVar, Generic, Optional from argparse import ArgumentParser from contextlib import contextmanager from appdirs import user_data_dir, user_config_dir @@ -276,46 +276,61 @@ class Strings(ListSetting): class KnownHubsList: def __init__(self, config: 'Config' = None, file_name: str = 'known_hubs.yml'): - self.config = config self.file_name = file_name - self.hubs: List[Tuple[str, int]] = [] - - @property - def path(self): - if self.config: - return os.path.join(self.config.wallet_dir, self.file_name) + self.path = os.path.join(config.wallet_dir, self.file_name) if config else None + self.hubs: Dict[Tuple[str, int], Dict] = {} + if self.exists: + self.load() @property def exists(self): return self.path and os.path.exists(self.path) @property - def serialized(self) -> List[str]: - return [f"{host}:{port}" for host, port in self.hubs] + def serialized(self) -> Dict[str, Dict]: + return {f"{host}:{port}": details for (host, port), details in self.hubs.items()} + + def filter(self, match_none=False, **kwargs): + if not kwargs: + return self.hubs + result = {} + for hub, details in self.hubs.items(): + for key, constraint in kwargs.items(): + value = details.get(key) + if value == constraint or (match_none and value is None): + result[hub] = details + break + return result def load(self): - if self.exists: + if self.path: with open(self.path, 'r') as known_hubs_file: raw = known_hubs_file.read() - for hub in yaml.safe_load(raw) or []: - self.append(hub) + for hub, details in yaml.safe_load(raw).items(): + self.set(hub, details) def save(self): if self.path: with open(self.path, 'w') as known_hubs_file: known_hubs_file.write(yaml.safe_dump(self.serialized, default_flow_style=False)) - def append(self, hub: str): - if hub and ':' in hub: + def set(self, hub: str, details: Dict): + if hub and hub.count(':') == 1: host, port = hub.split(':') hub_parts = (host, int(port)) if hub_parts not in self.hubs: - self.hubs.append(hub_parts) - return hub + self.hubs[hub_parts] = details + return hub - def extend(self, hubs: List[str]): + def add_hubs(self, hubs: List[str]): + added = False for hub in hubs: - self.append(hub) + if self.set(hub, {}) is not None: + added = True + return added + + def items(self): + return self.hubs.items() def __bool__(self): return len(self) > 0 diff --git a/lbry/testcase.py b/lbry/testcase.py index 33d265952..f482fe2c6 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -350,7 +350,11 @@ class CommandTestCase(IntegrationTestCase): server_tmp_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, server_tmp_dir) - self.server_config = Config() + self.server_config = Config( + data_dir=server_tmp_dir, + wallet_dir=server_tmp_dir, + download_dir=server_tmp_dir + ) self.server_config.transaction_cache_size = 10000 self.server_storage = SQLiteStorage(self.server_config, ':memory:') await self.server_storage.open() @@ -387,10 +391,12 @@ class CommandTestCase(IntegrationTestCase): upload_dir = os.path.join(wallet_node.data_path, 'uploads') os.mkdir(upload_dir) - conf = Config() - conf.data_dir = wallet_node.data_path - conf.wallet_dir = wallet_node.data_path - conf.download_dir = wallet_node.data_path + conf = Config( + # needed during instantiation to access known_hubs path + data_dir=wallet_node.data_path, + wallet_dir=wallet_node.data_path, + download_dir=wallet_node.data_path + ) conf.upload_dir = upload_dir # not a real conf setting conf.share_usage_data = False conf.use_upnp = False diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 07b3f7405..32b1d70f9 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -166,9 +166,13 @@ class Network: self._on_status_controller = StreamController(merge_repeated_events=True) self.on_status = self._on_status_controller.stream + self._on_hub_controller = StreamController(merge_repeated_events=True) + self.on_hub = self._on_hub_controller.stream + self.subscription_controllers = { 'blockchain.headers.subscribe': self._on_header_controller, 'blockchain.address.subscribe': self._on_status_controller, + 'blockchain.peers.subscribe': self._on_hub_controller, } self.aiohttp_session: Optional[aiohttp.ClientSession] = None @@ -194,6 +198,7 @@ class Network: self.running = True self.aiohttp_session = aiohttp.ClientSession() self.on_header.listen(self._update_remote_height) + self.on_hub.listen(self._update_hubs) self._loop_task = asyncio.create_task(self.network_loop()) self._urgent_need_reconnect.set() @@ -300,13 +305,7 @@ class Network: features = await client.send_request('server.features', []) self.client, self.server_features = client, features log.debug("discover other hubs %s:%i", *client.server) - peers = await client.send_request('server.peers.get', []) - if peers: - try: - self.known_hubs.extend(peers) - self.known_hubs.save() - except Exception: - log.exception("could not add hub peers: %s", peers) + self._update_hubs(await client.send_request('server.peers.subscribe', [])) log.info("subscribe to headers %s:%i", *client.server) self._update_remote_height((await self.subscribe_headers(),)) self._on_connected_controller.add(True) @@ -376,6 +375,14 @@ class Network: def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] + def _update_hubs(self, hubs): + if hubs: + try: + if self.known_hubs.add_hubs(hubs): + self.known_hubs.save() + except Exception: + log.exception("could not add hubs: %s", hubs) + def get_transaction(self, tx_hash, known_height=None): # use any server if its old, otherwise restrict to who gave us the history restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 diff --git a/lbry/wallet/server/peer.py b/lbry/wallet/server/peer.py deleted file mode 100644 index 078fda9e4..000000000 --- a/lbry/wallet/server/peer.py +++ /dev/null @@ -1,302 +0,0 @@ -# Copyright (c) 2017, Neil Booth -# -# All rights reserved. -# -# The MIT License (MIT) -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -"""Representation of a peer server.""" - -from ipaddress import ip_address - -from lbry.wallet.server import util -from lbry.wallet.server.util import cachedproperty - -from typing import Dict - - -class Peer: - - # Protocol version - ATTRS = ('host', 'features', - # metadata - 'source', 'ip_addr', - 'last_good', 'last_try', 'try_count') - FEATURES = ('pruning', 'server_version', 'protocol_min', 'protocol_max', - 'ssl_port', 'tcp_port') - # This should be set by the application - DEFAULT_PORTS: Dict[str, int] = {} - - def __init__(self, host, features, source='unknown', ip_addr=None, - last_good=0, last_try=0, try_count=0): - """Create a peer given a host name (or IP address as a string), - a dictionary of features, and a record of the source.""" - assert isinstance(host, str) - assert isinstance(features, dict) - assert host in features.get('hosts', {}) - self.host = host - self.features = features.copy() - # Canonicalize / clean-up - for feature in self.FEATURES: - self.features[feature] = getattr(self, feature) - # Metadata - self.source = source - self.ip_addr = ip_addr - # last_good represents the last connection that was - # successful *and* successfully verified, at which point - # try_count is set to 0. Failure to connect or failure to - # verify increment the try_count. - self.last_good = last_good - self.last_try = last_try - self.try_count = try_count - # Transient, non-persisted metadata - self.bad = False - self.other_port_pairs = set() - self.status = 2 - - @classmethod - def peers_from_features(cls, features, source): - peers = [] - if isinstance(features, dict): - hosts = features.get('hosts') - if isinstance(hosts, dict): - peers = [Peer(host, features, source=source) - for host in hosts if isinstance(host, str)] - return peers - - @classmethod - def deserialize(cls, item): - """Deserialize from a dictionary.""" - return cls(**item) - - def matches(self, peers): - """Return peers whose host matches our hostname or IP address. - Additionally include all peers whose IP address matches our - hostname if that is an IP address. - """ - candidates = (self.host.lower(), self.ip_addr) - return [peer for peer in peers - if peer.host.lower() in candidates - or peer.ip_addr == self.host] - - def __str__(self): - return self.host - - def update_features(self, features): - """Update features in-place.""" - try: - tmp = Peer(self.host, features) - except Exception: - pass - else: - self.update_features_from_peer(tmp) - - def update_features_from_peer(self, peer): - if peer != self: - self.features = peer.features - for feature in self.FEATURES: - setattr(self, feature, getattr(peer, feature)) - - def connection_port_pairs(self): - """Return a list of (kind, port) pairs to try when making a - connection.""" - # Use a list not a set - it's important to try the registered - # ports first. - pairs = [('SSL', self.ssl_port), ('TCP', self.tcp_port)] - while self.other_port_pairs: - pairs.append(self.other_port_pairs.pop()) - return [pair for pair in pairs if pair[1]] - - def mark_bad(self): - """Mark as bad to avoid reconnects but also to remember for a - while.""" - self.bad = True - - def check_ports(self, other): - """Remember differing ports in case server operator changed them - or removed one.""" - if other.ssl_port != self.ssl_port: - self.other_port_pairs.add(('SSL', other.ssl_port)) - if other.tcp_port != self.tcp_port: - self.other_port_pairs.add(('TCP', other.tcp_port)) - return bool(self.other_port_pairs) - - @cachedproperty - def is_tor(self): - return self.host.endswith('.onion') - - @cachedproperty - def is_valid(self): - ip = self.ip_address - if ip: - return ((ip.is_global or ip.is_private) - and not (ip.is_multicast or ip.is_unspecified)) - return util.is_valid_hostname(self.host) - - @cachedproperty - def is_public(self): - ip = self.ip_address - if ip: - return self.is_valid and not ip.is_private - else: - return self.is_valid and self.host != 'localhost' - - @cachedproperty - def ip_address(self): - """The host as a python ip_address object, or None.""" - try: - return ip_address(self.host) - except ValueError: - return None - - def bucket(self): - if self.is_tor: - return 'onion' - if not self.ip_addr: - return '' - return tuple(self.ip_addr.split('.')[:2]) - - def serialize(self): - """Serialize to a dictionary.""" - return {attr: getattr(self, attr) for attr in self.ATTRS} - - def _port(self, key): - hosts = self.features.get('hosts') - if isinstance(hosts, dict): - host = hosts.get(self.host) - port = self._integer(key, host) - if port and 0 < port < 65536: - return port - return None - - def _integer(self, key, d=None): - d = d or self.features - result = d.get(key) if isinstance(d, dict) else None - if isinstance(result, str): - try: - result = int(result) - except ValueError: - pass - return result if isinstance(result, int) else None - - def _string(self, key): - result = self.features.get(key) - return result if isinstance(result, str) else None - - @cachedproperty - def genesis_hash(self): - """Returns None if no SSL port, otherwise the port as an integer.""" - return self._string('genesis_hash') - - @cachedproperty - def ssl_port(self): - """Returns None if no SSL port, otherwise the port as an integer.""" - return self._port('ssl_port') - - @cachedproperty - def tcp_port(self): - """Returns None if no TCP port, otherwise the port as an integer.""" - return self._port('tcp_port') - - @cachedproperty - def server_version(self): - """Returns the server version as a string if known, otherwise None.""" - return self._string('server_version') - - @cachedproperty - def pruning(self): - """Returns the pruning level as an integer. None indicates no - pruning.""" - pruning = self._integer('pruning') - if pruning and pruning > 0: - return pruning - return None - - def _protocol_version_string(self, key): - version_str = self.features.get(key) - ptuple = util.protocol_tuple(version_str) - return util.version_string(ptuple) - - @cachedproperty - def protocol_min(self): - """Minimum protocol version as a string, e.g., 1.0""" - return self._protocol_version_string('protocol_min') - - @cachedproperty - def protocol_max(self): - """Maximum protocol version as a string, e.g., 1.1""" - return self._protocol_version_string('protocol_max') - - def to_tuple(self): - """The tuple ((ip, host, details) expected in response - to a peers subscription.""" - details = self.real_name().split()[1:] - return (self.ip_addr or self.host, self.host, details) - - def real_name(self): - """Real name of this peer as used on IRC.""" - def port_text(letter, port): - if port == self.DEFAULT_PORTS.get(letter): - return letter - else: - return letter + str(port) - - parts = [self.host, 'v' + self.protocol_max] - if self.pruning: - parts.append(f'p{self.pruning:d}') - for letter, port in (('s', self.ssl_port), ('t', self.tcp_port)): - if port: - parts.append(port_text(letter, port)) - return ' '.join(parts) - - @classmethod - def from_real_name(cls, real_name, source): - """Real name is a real name as on IRC, such as - - "erbium1.sytes.net v1.0 s t" - - Returns an instance of this Peer class. - """ - host = 'nohost' - features = {} - ports = {} - for n, part in enumerate(real_name.split()): - if n == 0: - host = part - continue - if part[0] in ('s', 't'): - if len(part) == 1: - port = cls.DEFAULT_PORTS[part[0]] - else: - port = part[1:] - if part[0] == 's': - ports['ssl_port'] = port - else: - ports['tcp_port'] = port - elif part[0] == 'v': - features['protocol_max'] = features['protocol_min'] = part[1:] - elif part[0] == 'p': - features['pruning'] = part[1:] - - features.update(ports) - features['hosts'] = {host: ports} - - return cls(host, features, source) diff --git a/lbry/wallet/server/peers.py b/lbry/wallet/server/peers.py deleted file mode 100644 index a65ce07e3..000000000 --- a/lbry/wallet/server/peers.py +++ /dev/null @@ -1,506 +0,0 @@ -# Copyright (c) 2017-2018, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -"""Peer management.""" - -import asyncio -import random -import socket -import ssl -import time -import typing -from asyncio import Event, sleep -from collections import defaultdict, Counter - -from lbry.wallet.tasks import TaskGroup -from lbry.wallet.rpc import ( - Connector, RPCSession, SOCKSProxy, Notification, handler_invocation, - SOCKSError, RPCError -) -from lbry.wallet.server.peer import Peer -from lbry.wallet.server.util import class_logger, protocol_tuple - -PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4) -STALE_SECS = 24 * 3600 -WAKEUP_SECS = 300 - - -class BadPeerError(Exception): - pass - - -def assert_good(message, result, instance): - if not isinstance(result, instance): - raise BadPeerError(f'{message} returned bad result type ' - f'{type(result).__name__}') - - -class PeerSession(RPCSession): - """An outgoing session to a peer.""" - - async def handle_request(self, request): - # We subscribe so might be unlucky enough to get a notification... - if (isinstance(request, Notification) and - request.method == 'blockchain.headers.subscribe'): - pass - else: - await handler_invocation(None, request) # Raises - - -class PeerManager: - """Looks after the DB of peer network servers. - - Attempts to maintain a connection with up to 8 peers. - Issues a 'peers.subscribe' RPC to them and tells them our data. - """ - def __init__(self, env, db): - self.logger = class_logger(__name__, self.__class__.__name__) - # Initialise the Peer class - Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS - self.env = env - self.db = db - - # Our clearnet and Tor Peers, if any - sclass = env.coin.SESSIONCLS - self.myselves = [Peer(ident.host, sclass.server_features(env), 'env') - for ident in env.identities] - self.server_version_args = sclass.server_version_args() - # Peers have one entry per hostname. Once connected, the - # ip_addr property is either None, an onion peer, or the - # IP address that was connected to. Adding a peer will evict - # any other peers with the same host name or IP address. - self.peers: typing.Set[Peer] = set() - self.permit_onion_peer_time = time.time() - self.proxy = None - self.group = TaskGroup() - - def _my_clearnet_peer(self): - """Returns the clearnet peer representing this server, if any.""" - clearnet = [peer for peer in self.myselves if not peer.is_tor] - return clearnet[0] if clearnet else None - - def _set_peer_statuses(self): - """Set peer statuses.""" - cutoff = time.time() - STALE_SECS - for peer in self.peers: - if peer.bad: - peer.status = PEER_BAD - elif peer.last_good > cutoff: - peer.status = PEER_GOOD - elif peer.last_good: - peer.status = PEER_STALE - else: - peer.status = PEER_NEVER - - def _features_to_register(self, peer, remote_peers): - """If we should register ourselves to the remote peer, which has - reported the given list of known peers, return the clearnet - identity features to register, otherwise None. - """ - # Announce ourself if not present. Don't if disabled, we - # are a non-public IP address, or to ourselves. - if not self.env.peer_announce or peer in self.myselves: - return None - my = self._my_clearnet_peer() - if not my or not my.is_public: - return None - # Register if no matches, or ports have changed - for peer in my.matches(remote_peers): - if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port: - return None - return my.features - - def _permit_new_onion_peer(self): - """Accept a new onion peer only once per random time interval.""" - now = time.time() - if now < self.permit_onion_peer_time: - return False - self.permit_onion_peer_time = now + random.randrange(0, 1200) - return True - - async def _import_peers(self): - """Import hard-coded peers from a file or the coin defaults.""" - imported_peers = self.myselves.copy() - # Add the hard-coded ones unless only reporting ourself - if self.env.peer_discovery != self.env.PD_SELF: - imported_peers.extend(Peer.from_real_name(real_name, 'coins.py') - for real_name in self.env.coin.PEERS) - await self._note_peers(imported_peers, limit=None) - - async def _detect_proxy(self): - """Detect a proxy if we don't have one and some time has passed since - the last attempt. - - If found self.proxy is set to a SOCKSProxy instance, otherwise - None. - """ - host = self.env.tor_proxy_host - if self.env.tor_proxy_port is None: - ports = [9050, 9150, 1080] - else: - ports = [self.env.tor_proxy_port] - while True: - self.logger.info(f'trying to detect proxy on "{host}" ' - f'ports {ports}') - proxy = await SOCKSProxy.auto_detect_host(host, ports, None) - if proxy: - self.proxy = proxy - self.logger.info(f'detected {proxy}') - return - self.logger.info('no proxy detected, will try later') - await sleep(900) - - async def _note_peers(self, peers, limit=2, check_ports=False, - source=None): - """Add a limited number of peers that are not already present.""" - new_peers = [] - for peer in peers: - if not peer.is_public or (peer.is_tor and not self.proxy): - continue - - matches = peer.matches(self.peers) - if not matches: - new_peers.append(peer) - elif check_ports: - for match in matches: - if match.check_ports(peer): - self.logger.info(f'ports changed for {peer}') - match.retry_event.set() - - if new_peers: - source = source or new_peers[0].source - if limit: - random.shuffle(new_peers) - use_peers = new_peers[:limit] - else: - use_peers = new_peers - for peer in use_peers: - self.logger.info(f'accepted new peer {peer} from {source}') - peer.retry_event = Event() - self.peers.add(peer) - await self.group.add(self._monitor_peer(peer)) - - async def _monitor_peer(self, peer): - # Stop monitoring if we were dropped (a duplicate peer) - while peer in self.peers: - if await self._should_drop_peer(peer): - self.peers.discard(peer) - break - # Figure out how long to sleep before retrying. Retry a - # good connection when it is about to turn stale, otherwise - # exponentially back off retries. - if peer.try_count == 0: - pause = STALE_SECS - WAKEUP_SECS * 2 - else: - pause = WAKEUP_SECS * 2 ** peer.try_count - pending, done = await asyncio.wait([peer.retry_event.wait()], timeout=pause) - if done: - peer.retry_event.clear() - - async def _should_drop_peer(self, peer): - peer.try_count += 1 - is_good = False - for kind, port in peer.connection_port_pairs(): - peer.last_try = time.time() - - kwargs = {} - if kind == 'SSL': - kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS) - - host = self.env.cs_host(for_rpc=False) - if isinstance(host, list): - host = host[0] - - if self.env.force_proxy or peer.is_tor: - if not self.proxy: - return - kwargs['proxy'] = self.proxy - kwargs['resolve'] = not peer.is_tor - elif host: - # Use our listening Host/IP for outgoing non-proxy - # connections so our peers see the correct source. - kwargs['local_addr'] = (host, None) - - peer_text = f'[{peer}:{port} {kind}]' - try: - async with Connector(PeerSession, peer.host, port, - **kwargs) as session: - await asyncio.wait_for( - self._verify_peer(session, peer), - 120 if peer.is_tor else 30 - ) - is_good = True - break - except BadPeerError as e: - self.logger.error(f'{peer_text} marking bad: ({e})') - peer.mark_bad() - break - except RPCError as e: - self.logger.error(f'{peer_text} RPC error: {e.message} ' - f'({e.code})') - except (OSError, SOCKSError, ConnectionError, asyncio.TimeoutError) as e: - self.logger.info(f'{peer_text} {e}') - - if is_good: - now = time.time() - elapsed = now - peer.last_try - self.logger.info(f'{peer_text} verified in {elapsed:.1f}s') - peer.try_count = 0 - peer.last_good = now - peer.source = 'peer' - # At most 2 matches if we're a host name, potentially - # several if we're an IP address (several instances - # can share a NAT). - matches = peer.matches(self.peers) - for match in matches: - if match.ip_address: - if len(matches) > 1: - self.peers.remove(match) - # Force the peer's monitoring task to exit - match.retry_event.set() - elif peer.host in match.features['hosts']: - match.update_features_from_peer(peer) - else: - # Forget the peer if long-term unreachable - if peer.last_good and not peer.bad: - try_limit = 10 - else: - try_limit = 3 - if peer.try_count >= try_limit: - desc = 'bad' if peer.bad else 'unreachable' - self.logger.info(f'forgetting {desc} peer: {peer}') - return True - return False - - async def _verify_peer(self, session, peer): - if not peer.is_tor: - address = session.peer_address() - if address: - peer.ip_addr = address[0] - - # server.version goes first - message = 'server.version' - result = await session.send_request(message, self.server_version_args) - assert_good(message, result, list) - - # Protocol version 1.1 returns a pair with the version first - if len(result) != 2 or not all(isinstance(x, str) for x in result): - raise BadPeerError(f'bad server.version result: {result}') - server_version, protocol_version = result - peer.server_version = server_version - peer.features['server_version'] = server_version - ptuple = protocol_tuple(protocol_version) - - await asyncio.wait([ - self._send_headers_subscribe(session, peer, ptuple), - self._send_server_features(session, peer), - self._send_peers_subscribe(session, peer) - ]) - - async def _send_headers_subscribe(self, session, peer, ptuple): - message = 'blockchain.headers.subscribe' - result = await session.send_request(message) - assert_good(message, result, dict) - - our_height = self.db.db_height - if ptuple < (1, 3): - their_height = result.get('block_height') - else: - their_height = result.get('height') - if not isinstance(their_height, int): - raise BadPeerError(f'invalid height {their_height}') - if abs(our_height - their_height) > 5: - raise BadPeerError(f'bad height {their_height:,d} ' - f'(ours: {our_height:,d})') - - # Check prior header too in case of hard fork. - check_height = min(our_height, their_height) - raw_header = self.db.raw_header(check_height) - if ptuple >= (1, 4): - ours = raw_header.hex() - message = 'blockchain.block.header' - theirs = await session.send_request(message, [check_height]) - assert_good(message, theirs, str) - if ours != theirs: - raise BadPeerError(f'our header {ours} and ' - f'theirs {theirs} differ') - else: - ours = self.env.coin.electrum_header(raw_header, check_height) - ours = ours.get('prev_block_hash') - message = 'blockchain.block.get_header' - theirs = await session.send_request(message, [check_height]) - assert_good(message, theirs, dict) - theirs = theirs.get('prev_block_hash') - if ours != theirs: - raise BadPeerError(f'our header hash {ours} and ' - f'theirs {theirs} differ') - - async def _send_server_features(self, session, peer): - message = 'server.features' - features = await session.send_request(message) - assert_good(message, features, dict) - hosts = [host.lower() for host in features.get('hosts', {})] - if self.env.coin.GENESIS_HASH != features.get('genesis_hash'): - raise BadPeerError('incorrect genesis hash') - elif peer.host.lower() in hosts: - peer.update_features(features) - else: - raise BadPeerError(f'not listed in own hosts list {hosts}') - - async def _send_peers_subscribe(self, session, peer): - message = 'server.peers.subscribe' - raw_peers = await session.send_request(message) - assert_good(message, raw_peers, list) - - # Check the peers list we got from a remote peer. - # Each is expected to be of the form: - # [ip_addr, hostname, ['v1.0', 't51001', 's51002']] - # Call add_peer if the remote doesn't appear to know about us. - try: - real_names = [' '.join([u[1]] + u[2]) for u in raw_peers] - peers = [Peer.from_real_name(real_name, str(peer)) - for real_name in real_names] - except Exception: - raise BadPeerError('bad server.peers.subscribe response') - - await self._note_peers(peers) - features = self._features_to_register(peer, peers) - if not features: - return - self.logger.info(f'registering ourself with {peer}') - # We only care to wait for the response - await session.send_request('server.add_peer', [features]) - - # - # External interface - # - async def discover_peers(self): - """Perform peer maintenance. This includes - - 1) Forgetting unreachable peers. - 2) Verifying connectivity of new peers. - 3) Retrying old peers at regular intervals. - """ - if self.env.peer_discovery != self.env.PD_ON: - self.logger.info('peer discovery is disabled') - return - - self.logger.info(f'beginning peer discovery. Force use of ' - f'proxy: {self.env.force_proxy}') - - self.group.add(self._detect_proxy()) - self.group.add(self._import_peers()) - - def info(self) -> typing.Dict[str, int]: - """The number of peers.""" - self._set_peer_statuses() - counter = Counter(peer.status for peer in self.peers) - return { - 'bad': counter[PEER_BAD], - 'good': counter[PEER_GOOD], - 'never': counter[PEER_NEVER], - 'stale': counter[PEER_STALE], - 'total': len(self.peers), - } - - async def add_localRPC_peer(self, real_name): - """Add a peer passed by the admin over LocalRPC.""" - await self._note_peers([Peer.from_real_name(real_name, 'RPC')]) - - async def on_add_peer(self, features, source_info): - """Add a peer (but only if the peer resolves to the source).""" - if not source_info: - self.logger.info('ignored add_peer request: no source info') - return False - source = source_info[0] - peers = Peer.peers_from_features(features, source) - if not peers: - self.logger.info('ignored add_peer request: no peers given') - return False - - # Just look at the first peer, require it - peer = peers[0] - host = peer.host - if peer.is_tor: - permit = self._permit_new_onion_peer() - reason = 'rate limiting' - else: - getaddrinfo = asyncio.get_event_loop().getaddrinfo - try: - infos = await getaddrinfo(host, 80, type=socket.SOCK_STREAM) - except socket.gaierror: - permit = False - reason = 'address resolution failure' - else: - permit = any(source == info[-1][0] for info in infos) - reason = 'source-destination mismatch' - - if permit: - self.logger.info(f'accepted add_peer request from {source} ' - f'for {host}') - await self._note_peers([peer], check_ports=True) - else: - self.logger.warning(f'rejected add_peer request from {source} ' - f'for {host} ({reason})') - - return permit - - def on_peers_subscribe(self, is_tor): - """Returns the server peers as a list of (ip, host, details) tuples. - - We return all peers we've connected to in the last day. - Additionally, if we don't have onion routing, we return a few - hard-coded onion servers. - """ - cutoff = time.time() - STALE_SECS - recent = [peer for peer in self.peers - if peer.last_good > cutoff and - not peer.bad and peer.is_public] - onion_peers = [] - - # Always report ourselves if valid (even if not public) - peers = {myself for myself in self.myselves - if myself.last_good > cutoff} - - # Bucket the clearnet peers and select up to two from each - buckets = defaultdict(list) - for peer in recent: - if peer.is_tor: - onion_peers.append(peer) - else: - buckets[peer.bucket()].append(peer) - for bucket_peers in buckets.values(): - random.shuffle(bucket_peers) - peers.update(bucket_peers[:2]) - - # Add up to 20% onion peers (but up to 10 is OK anyway) - random.shuffle(onion_peers) - max_onion = 50 if is_tor else max(10, len(peers) // 4) - - peers.update(onion_peers[:max_onion]) - - return [peer.to_tuple() for peer in peers] - - def proxy_peername(self): - """Return the peername of the proxy, if there is a proxy, otherwise - None.""" - return self.proxy.peername if self.proxy else None - - def rpc_data(self): - """Peer data for the peers RPC method.""" - self._set_peer_statuses() - descs = ['good', 'stale', 'never', 'bad'] - - def peer_data(peer): - data = peer.serialize() - data['status'] = descs[peer.status] - return data - - def peer_key(peer): - return (peer.bad, -peer.last_good) - - return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)] diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 15da6f7d2..f55120eea 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -39,7 +39,6 @@ from lbry.wallet.server import text from lbry.wallet.server import util from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error from lbry.wallet.server.daemon import DaemonError -from lbry.wallet.server.peers import PeerManager if typing.TYPE_CHECKING: from lbry.wallet.server.env import Env from lbry.wallet.server.mempool import MemPool @@ -185,7 +184,6 @@ class SessionManager: self.bp = bp self.daemon = daemon self.mempool = mempool - self.peer_mgr = PeerManager(env, db) self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers: typing.Dict[str, asyncio.AbstractServer] = {} @@ -209,7 +207,7 @@ class SessionManager: else: protocol_class = self.env.coin.SESSIONCLS protocol_factory = partial(protocol_class, self, self.db, - self.mempool, self.peer_mgr, kind) + self.mempool, kind) host, port = args[:2] try: @@ -369,7 +367,7 @@ class SessionManager: 'logged': logged, 'paused': paused, 'pid': os.getpid(), - 'peers': self.peer_mgr.info(), + 'peers': [], 'requests': pending_requests, 'method_counts': method_counts, 'sessions': self.session_count(), @@ -436,7 +434,7 @@ class SessionManager: real_name: "bch.electrumx.cash t50001 s50002" for example """ - await self.peer_mgr.add_localRPC_peer(real_name) + await self._notify_peer(real_name) return f"peer '{real_name}' added" async def rpc_disconnect(self, session_ids): @@ -487,7 +485,7 @@ class SessionManager: async def rpc_peers(self): """Return a list of data about server peers.""" - return self.peer_mgr.rpc_data() + return self.env.peer_hubs async def rpc_query(self, items, limit): """Return a list of data about server peers.""" @@ -578,7 +576,6 @@ class SessionManager: # Peer discovery should start after the external servers # because we connect to ourself await asyncio.wait([ - self.peer_mgr.discover_peers(), self._clear_stale_sessions(), self._log_sessions(), self._manage_servers() @@ -638,6 +635,15 @@ class SessionManager: self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) return self.history_cache[hashX] + def _notify_peer(self, peer): + notify_tasks = [ + session.send_notification('blockchain.peers.subscribe', [peer]) + for session in self.sessions.values() if session.subscribe_peers + ] + if notify_tasks: + self.logger.info(f'notify {len(notify_tasks)} sessions of new peers') + asyncio.create_task(asyncio.wait(notify_tasks)) + async def _notify_sessions(self, height, touched, new_touched): """Notify sessions about height changes and touched addresses.""" height_changed = height != self.notified_height @@ -704,7 +710,7 @@ class SessionBase(RPCSession): request_handlers: typing.Dict[str, typing.Callable] = {} version = '0.5.7' - def __init__(self, session_mgr, db, mempool, peer_mgr, kind): + def __init__(self, session_mgr, db, mempool, kind): connection = JSONRPCConnection(JSONRPCAutoDetect) self.env = session_mgr.env super().__init__(connection=connection) @@ -712,7 +718,6 @@ class SessionBase(RPCSession): self.session_mgr = session_mgr self.db = db self.mempool = mempool - self.peer_mgr = peer_mgr self.kind = kind # 'RPC', 'TCP' etc. self.coin = self.env.coin self.anon_logs = self.env.anon_logs @@ -878,7 +883,6 @@ class LBRYElectrumX(SessionBase): 'server.payment_address': cls.payment_address, 'server.donation_address': cls.donation_address, 'server.features': cls.server_features_async, - 'server.peers.get': cls.peers_get, 'server.peers.subscribe': cls.peers_subscribe, 'server.version': cls.server_version, 'blockchain.transaction.get_height': cls.transaction_get_height, @@ -905,6 +909,7 @@ class LBRYElectrumX(SessionBase): LBRYElectrumX.set_server_features(self.env) self.subscribe_headers = False self.subscribe_headers_raw = False + self.subscribe_peers = False self.connection.max_response_size = self.env.max_send self.hashX_subs = {} self.sv_seen = False @@ -1083,13 +1088,10 @@ class LBRYElectrumX(SessionBase): """Add a peer (but only if the peer resolves to the source).""" return await self.peer_mgr.on_add_peer(features, self.peer_address()) - async def peers_get(self): - """Return the server peers as a list of (ip, host, details) tuples.""" - return self.env.peer_hubs - async def peers_subscribe(self): """Return the server peers as a list of (ip, host, details) tuples.""" - return self.peer_mgr.on_peers_subscribe(self.is_tor()) + self.subscribe_peers = True + return self.env.peer_hubs async def address_status(self, hashX): """Returns an address status. @@ -1332,11 +1334,7 @@ class LBRYElectrumX(SessionBase): async def banner(self): """Return the server banner text.""" banner = f'You are connected to an {self.version} server.' - - if self.is_tor(): - banner_file = self.env.tor_banner_file - else: - banner_file = self.env.banner_file + banner_file = self.env.banner_file if banner_file: try: with codecs.open(banner_file, 'r', 'utf-8') as f: diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 2b24c0040..d3f3e131f 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -147,3 +147,10 @@ class TestHubDiscovery(CommandTestCase): self.assertEqual( self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', final_node.port) ) + + final_node.server.session_mgr._notify_peer('127.0.0.1:9988') + self.assertEqual(list(self.daemon.conf.known_hubs), [(final_node.hostname, final_node.port)]) + await self.daemon.ledger.network.on_hub.first + self.assertEqual(list(self.daemon.conf.known_hubs), [ + (final_node.hostname, final_node.port), ('127.0.0.1', 9988) + ]) diff --git a/tests/unit/test_conf.py b/tests/unit/test_conf.py index 4710219cc..3650f8844 100644 --- a/tests/unit/test_conf.py +++ b/tests/unit/test_conf.py @@ -258,9 +258,35 @@ class ConfigurationTests(unittest.TestCase): def test_known_hubs_list(self): with tempfile.TemporaryDirectory() as temp_dir: - c1 = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir) - self.assertEqual(list(c1.known_hubs), []) - c1.known_hubs.append('new.hub.io:99') - c1.known_hubs.save() - c2 = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir) - self.assertEqual(list(c2.known_hubs), [('new.hub.io', 99)]) + hubs = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir).known_hubs + + self.assertEqual(hubs.serialized, {}) + self.assertEqual(list(hubs), []) + self.assertFalse(hubs) + hubs.set('new.hub.io:99', {'jurisdiction': 'us'}) + self.assertTrue(hubs) + + self.assertFalse(hubs.exists) + hubs.save() + self.assertTrue(hubs.exists) + + hubs = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir).known_hubs + self.assertEqual(list(hubs), [('new.hub.io', 99)]) + self.assertEqual(hubs.serialized, {'new.hub.io:99': {'jurisdiction': 'us'}}) + + hubs.set('any.hub.io:99', {}) + hubs.set('oth.hub.io:99', {'jurisdiction': 'other'}) + self.assertEqual(list(hubs), [('new.hub.io', 99), ('any.hub.io', 99), ('oth.hub.io', 99)]) + self.assertEqual(hubs.filter(), { + ('new.hub.io', 99): {'jurisdiction': 'us'}, + ('oth.hub.io', 99): {'jurisdiction': 'other'}, + ('any.hub.io', 99): {} + }) + self.assertEqual(hubs.filter(foo="bar"), {}) + self.assertEqual(hubs.filter(jurisdiction="us"), { + ('new.hub.io', 99): {'jurisdiction': 'us'} + }) + self.assertEqual(hubs.filter(jurisdiction="us", match_none=True), { + ('new.hub.io', 99): {'jurisdiction': 'us'}, + ('any.hub.io', 99): {} + })