client side hub discovery pub/sub and hub metadata stored, removed old peers implementation

This commit is contained in:
Lex Berezhny 2021-05-21 09:50:47 -04:00
parent ad670f721a
commit 73e239cc5f
8 changed files with 116 additions and 865 deletions

View file

@ -2,7 +2,7 @@ import os
import re import re
import sys import sys
import logging import logging
from typing import List, Tuple, Union, TypeVar, Generic, Optional from typing import List, Dict, Tuple, Union, TypeVar, Generic, Optional
from argparse import ArgumentParser from argparse import ArgumentParser
from contextlib import contextmanager from contextlib import contextmanager
from appdirs import user_data_dir, user_config_dir from appdirs import user_data_dir, user_config_dir
@ -276,46 +276,61 @@ class Strings(ListSetting):
class KnownHubsList: class KnownHubsList:
def __init__(self, config: 'Config' = None, file_name: str = 'known_hubs.yml'): def __init__(self, config: 'Config' = None, file_name: str = 'known_hubs.yml'):
self.config = config
self.file_name = file_name self.file_name = file_name
self.hubs: List[Tuple[str, int]] = [] self.path = os.path.join(config.wallet_dir, self.file_name) if config else None
self.hubs: Dict[Tuple[str, int], Dict] = {}
@property if self.exists:
def path(self): self.load()
if self.config:
return os.path.join(self.config.wallet_dir, self.file_name)
@property @property
def exists(self): def exists(self):
return self.path and os.path.exists(self.path) return self.path and os.path.exists(self.path)
@property @property
def serialized(self) -> List[str]: def serialized(self) -> Dict[str, Dict]:
return [f"{host}:{port}" for host, port in self.hubs] return {f"{host}:{port}": details for (host, port), details in self.hubs.items()}
def filter(self, match_none=False, **kwargs):
if not kwargs:
return self.hubs
result = {}
for hub, details in self.hubs.items():
for key, constraint in kwargs.items():
value = details.get(key)
if value == constraint or (match_none and value is None):
result[hub] = details
break
return result
def load(self): def load(self):
if self.exists: if self.path:
with open(self.path, 'r') as known_hubs_file: with open(self.path, 'r') as known_hubs_file:
raw = known_hubs_file.read() raw = known_hubs_file.read()
for hub in yaml.safe_load(raw) or []: for hub, details in yaml.safe_load(raw).items():
self.append(hub) self.set(hub, details)
def save(self): def save(self):
if self.path: if self.path:
with open(self.path, 'w') as known_hubs_file: with open(self.path, 'w') as known_hubs_file:
known_hubs_file.write(yaml.safe_dump(self.serialized, default_flow_style=False)) known_hubs_file.write(yaml.safe_dump(self.serialized, default_flow_style=False))
def append(self, hub: str): def set(self, hub: str, details: Dict):
if hub and ':' in hub: if hub and hub.count(':') == 1:
host, port = hub.split(':') host, port = hub.split(':')
hub_parts = (host, int(port)) hub_parts = (host, int(port))
if hub_parts not in self.hubs: if hub_parts not in self.hubs:
self.hubs.append(hub_parts) self.hubs[hub_parts] = details
return hub return hub
def extend(self, hubs: List[str]): def add_hubs(self, hubs: List[str]):
added = False
for hub in hubs: for hub in hubs:
self.append(hub) if self.set(hub, {}) is not None:
added = True
return added
def items(self):
return self.hubs.items()
def __bool__(self): def __bool__(self):
return len(self) > 0 return len(self) > 0

View file

@ -350,7 +350,11 @@ class CommandTestCase(IntegrationTestCase):
server_tmp_dir = tempfile.mkdtemp() server_tmp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, server_tmp_dir) self.addCleanup(shutil.rmtree, server_tmp_dir)
self.server_config = Config() self.server_config = Config(
data_dir=server_tmp_dir,
wallet_dir=server_tmp_dir,
download_dir=server_tmp_dir
)
self.server_config.transaction_cache_size = 10000 self.server_config.transaction_cache_size = 10000
self.server_storage = SQLiteStorage(self.server_config, ':memory:') self.server_storage = SQLiteStorage(self.server_config, ':memory:')
await self.server_storage.open() await self.server_storage.open()
@ -387,10 +391,12 @@ class CommandTestCase(IntegrationTestCase):
upload_dir = os.path.join(wallet_node.data_path, 'uploads') upload_dir = os.path.join(wallet_node.data_path, 'uploads')
os.mkdir(upload_dir) os.mkdir(upload_dir)
conf = Config() conf = Config(
conf.data_dir = wallet_node.data_path # needed during instantiation to access known_hubs path
conf.wallet_dir = wallet_node.data_path data_dir=wallet_node.data_path,
conf.download_dir = wallet_node.data_path wallet_dir=wallet_node.data_path,
download_dir=wallet_node.data_path
)
conf.upload_dir = upload_dir # not a real conf setting conf.upload_dir = upload_dir # not a real conf setting
conf.share_usage_data = False conf.share_usage_data = False
conf.use_upnp = False conf.use_upnp = False

View file

@ -166,9 +166,13 @@ class Network:
self._on_status_controller = StreamController(merge_repeated_events=True) self._on_status_controller = StreamController(merge_repeated_events=True)
self.on_status = self._on_status_controller.stream self.on_status = self._on_status_controller.stream
self._on_hub_controller = StreamController(merge_repeated_events=True)
self.on_hub = self._on_hub_controller.stream
self.subscription_controllers = { self.subscription_controllers = {
'blockchain.headers.subscribe': self._on_header_controller, 'blockchain.headers.subscribe': self._on_header_controller,
'blockchain.address.subscribe': self._on_status_controller, 'blockchain.address.subscribe': self._on_status_controller,
'blockchain.peers.subscribe': self._on_hub_controller,
} }
self.aiohttp_session: Optional[aiohttp.ClientSession] = None self.aiohttp_session: Optional[aiohttp.ClientSession] = None
@ -194,6 +198,7 @@ class Network:
self.running = True self.running = True
self.aiohttp_session = aiohttp.ClientSession() self.aiohttp_session = aiohttp.ClientSession()
self.on_header.listen(self._update_remote_height) self.on_header.listen(self._update_remote_height)
self.on_hub.listen(self._update_hubs)
self._loop_task = asyncio.create_task(self.network_loop()) self._loop_task = asyncio.create_task(self.network_loop())
self._urgent_need_reconnect.set() self._urgent_need_reconnect.set()
@ -300,13 +305,7 @@ class Network:
features = await client.send_request('server.features', []) features = await client.send_request('server.features', [])
self.client, self.server_features = client, features self.client, self.server_features = client, features
log.debug("discover other hubs %s:%i", *client.server) log.debug("discover other hubs %s:%i", *client.server)
peers = await client.send_request('server.peers.get', []) self._update_hubs(await client.send_request('server.peers.subscribe', []))
if peers:
try:
self.known_hubs.extend(peers)
self.known_hubs.save()
except Exception:
log.exception("could not add hub peers: %s", peers)
log.info("subscribe to headers %s:%i", *client.server) log.info("subscribe to headers %s:%i", *client.server)
self._update_remote_height((await self.subscribe_headers(),)) self._update_remote_height((await self.subscribe_headers(),))
self._on_connected_controller.add(True) self._on_connected_controller.add(True)
@ -376,6 +375,14 @@ class Network:
def _update_remote_height(self, header_args): def _update_remote_height(self, header_args):
self.remote_height = header_args[0]["height"] self.remote_height = header_args[0]["height"]
def _update_hubs(self, hubs):
if hubs:
try:
if self.known_hubs.add_hubs(hubs):
self.known_hubs.save()
except Exception:
log.exception("could not add hubs: %s", hubs)
def get_transaction(self, tx_hash, known_height=None): def get_transaction(self, tx_hash, known_height=None):
# use any server if its old, otherwise restrict to who gave us the history # use any server if its old, otherwise restrict to who gave us the history
restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10 restricted = known_height in (None, -1, 0) or 0 > known_height > self.remote_height - 10

View file

@ -1,302 +0,0 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# The MIT License (MIT)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Representation of a peer server."""
from ipaddress import ip_address
from lbry.wallet.server import util
from lbry.wallet.server.util import cachedproperty
from typing import Dict
class Peer:
# Protocol version
ATTRS = ('host', 'features',
# metadata
'source', 'ip_addr',
'last_good', 'last_try', 'try_count')
FEATURES = ('pruning', 'server_version', 'protocol_min', 'protocol_max',
'ssl_port', 'tcp_port')
# This should be set by the application
DEFAULT_PORTS: Dict[str, int] = {}
def __init__(self, host, features, source='unknown', ip_addr=None,
last_good=0, last_try=0, try_count=0):
"""Create a peer given a host name (or IP address as a string),
a dictionary of features, and a record of the source."""
assert isinstance(host, str)
assert isinstance(features, dict)
assert host in features.get('hosts', {})
self.host = host
self.features = features.copy()
# Canonicalize / clean-up
for feature in self.FEATURES:
self.features[feature] = getattr(self, feature)
# Metadata
self.source = source
self.ip_addr = ip_addr
# last_good represents the last connection that was
# successful *and* successfully verified, at which point
# try_count is set to 0. Failure to connect or failure to
# verify increment the try_count.
self.last_good = last_good
self.last_try = last_try
self.try_count = try_count
# Transient, non-persisted metadata
self.bad = False
self.other_port_pairs = set()
self.status = 2
@classmethod
def peers_from_features(cls, features, source):
peers = []
if isinstance(features, dict):
hosts = features.get('hosts')
if isinstance(hosts, dict):
peers = [Peer(host, features, source=source)
for host in hosts if isinstance(host, str)]
return peers
@classmethod
def deserialize(cls, item):
"""Deserialize from a dictionary."""
return cls(**item)
def matches(self, peers):
"""Return peers whose host matches our hostname or IP address.
Additionally include all peers whose IP address matches our
hostname if that is an IP address.
"""
candidates = (self.host.lower(), self.ip_addr)
return [peer for peer in peers
if peer.host.lower() in candidates
or peer.ip_addr == self.host]
def __str__(self):
return self.host
def update_features(self, features):
"""Update features in-place."""
try:
tmp = Peer(self.host, features)
except Exception:
pass
else:
self.update_features_from_peer(tmp)
def update_features_from_peer(self, peer):
if peer != self:
self.features = peer.features
for feature in self.FEATURES:
setattr(self, feature, getattr(peer, feature))
def connection_port_pairs(self):
"""Return a list of (kind, port) pairs to try when making a
connection."""
# Use a list not a set - it's important to try the registered
# ports first.
pairs = [('SSL', self.ssl_port), ('TCP', self.tcp_port)]
while self.other_port_pairs:
pairs.append(self.other_port_pairs.pop())
return [pair for pair in pairs if pair[1]]
def mark_bad(self):
"""Mark as bad to avoid reconnects but also to remember for a
while."""
self.bad = True
def check_ports(self, other):
"""Remember differing ports in case server operator changed them
or removed one."""
if other.ssl_port != self.ssl_port:
self.other_port_pairs.add(('SSL', other.ssl_port))
if other.tcp_port != self.tcp_port:
self.other_port_pairs.add(('TCP', other.tcp_port))
return bool(self.other_port_pairs)
@cachedproperty
def is_tor(self):
return self.host.endswith('.onion')
@cachedproperty
def is_valid(self):
ip = self.ip_address
if ip:
return ((ip.is_global or ip.is_private)
and not (ip.is_multicast or ip.is_unspecified))
return util.is_valid_hostname(self.host)
@cachedproperty
def is_public(self):
ip = self.ip_address
if ip:
return self.is_valid and not ip.is_private
else:
return self.is_valid and self.host != 'localhost'
@cachedproperty
def ip_address(self):
"""The host as a python ip_address object, or None."""
try:
return ip_address(self.host)
except ValueError:
return None
def bucket(self):
if self.is_tor:
return 'onion'
if not self.ip_addr:
return ''
return tuple(self.ip_addr.split('.')[:2])
def serialize(self):
"""Serialize to a dictionary."""
return {attr: getattr(self, attr) for attr in self.ATTRS}
def _port(self, key):
hosts = self.features.get('hosts')
if isinstance(hosts, dict):
host = hosts.get(self.host)
port = self._integer(key, host)
if port and 0 < port < 65536:
return port
return None
def _integer(self, key, d=None):
d = d or self.features
result = d.get(key) if isinstance(d, dict) else None
if isinstance(result, str):
try:
result = int(result)
except ValueError:
pass
return result if isinstance(result, int) else None
def _string(self, key):
result = self.features.get(key)
return result if isinstance(result, str) else None
@cachedproperty
def genesis_hash(self):
"""Returns None if no SSL port, otherwise the port as an integer."""
return self._string('genesis_hash')
@cachedproperty
def ssl_port(self):
"""Returns None if no SSL port, otherwise the port as an integer."""
return self._port('ssl_port')
@cachedproperty
def tcp_port(self):
"""Returns None if no TCP port, otherwise the port as an integer."""
return self._port('tcp_port')
@cachedproperty
def server_version(self):
"""Returns the server version as a string if known, otherwise None."""
return self._string('server_version')
@cachedproperty
def pruning(self):
"""Returns the pruning level as an integer. None indicates no
pruning."""
pruning = self._integer('pruning')
if pruning and pruning > 0:
return pruning
return None
def _protocol_version_string(self, key):
version_str = self.features.get(key)
ptuple = util.protocol_tuple(version_str)
return util.version_string(ptuple)
@cachedproperty
def protocol_min(self):
"""Minimum protocol version as a string, e.g., 1.0"""
return self._protocol_version_string('protocol_min')
@cachedproperty
def protocol_max(self):
"""Maximum protocol version as a string, e.g., 1.1"""
return self._protocol_version_string('protocol_max')
def to_tuple(self):
"""The tuple ((ip, host, details) expected in response
to a peers subscription."""
details = self.real_name().split()[1:]
return (self.ip_addr or self.host, self.host, details)
def real_name(self):
"""Real name of this peer as used on IRC."""
def port_text(letter, port):
if port == self.DEFAULT_PORTS.get(letter):
return letter
else:
return letter + str(port)
parts = [self.host, 'v' + self.protocol_max]
if self.pruning:
parts.append(f'p{self.pruning:d}')
for letter, port in (('s', self.ssl_port), ('t', self.tcp_port)):
if port:
parts.append(port_text(letter, port))
return ' '.join(parts)
@classmethod
def from_real_name(cls, real_name, source):
"""Real name is a real name as on IRC, such as
"erbium1.sytes.net v1.0 s t"
Returns an instance of this Peer class.
"""
host = 'nohost'
features = {}
ports = {}
for n, part in enumerate(real_name.split()):
if n == 0:
host = part
continue
if part[0] in ('s', 't'):
if len(part) == 1:
port = cls.DEFAULT_PORTS[part[0]]
else:
port = part[1:]
if part[0] == 's':
ports['ssl_port'] = port
else:
ports['tcp_port'] = port
elif part[0] == 'v':
features['protocol_max'] = features['protocol_min'] = part[1:]
elif part[0] == 'p':
features['pruning'] = part[1:]
features.update(ports)
features['hosts'] = {host: ports}
return cls(host, features, source)

View file

@ -1,506 +0,0 @@
# Copyright (c) 2017-2018, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
"""Peer management."""
import asyncio
import random
import socket
import ssl
import time
import typing
from asyncio import Event, sleep
from collections import defaultdict, Counter
from lbry.wallet.tasks import TaskGroup
from lbry.wallet.rpc import (
Connector, RPCSession, SOCKSProxy, Notification, handler_invocation,
SOCKSError, RPCError
)
from lbry.wallet.server.peer import Peer
from lbry.wallet.server.util import class_logger, protocol_tuple
PEER_GOOD, PEER_STALE, PEER_NEVER, PEER_BAD = range(4)
STALE_SECS = 24 * 3600
WAKEUP_SECS = 300
class BadPeerError(Exception):
pass
def assert_good(message, result, instance):
if not isinstance(result, instance):
raise BadPeerError(f'{message} returned bad result type '
f'{type(result).__name__}')
class PeerSession(RPCSession):
"""An outgoing session to a peer."""
async def handle_request(self, request):
# We subscribe so might be unlucky enough to get a notification...
if (isinstance(request, Notification) and
request.method == 'blockchain.headers.subscribe'):
pass
else:
await handler_invocation(None, request) # Raises
class PeerManager:
"""Looks after the DB of peer network servers.
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
"""
def __init__(self, env, db):
self.logger = class_logger(__name__, self.__class__.__name__)
# Initialise the Peer class
Peer.DEFAULT_PORTS = env.coin.PEER_DEFAULT_PORTS
self.env = env
self.db = db
# Our clearnet and Tor Peers, if any
sclass = env.coin.SESSIONCLS
self.myselves = [Peer(ident.host, sclass.server_features(env), 'env')
for ident in env.identities]
self.server_version_args = sclass.server_version_args()
# Peers have one entry per hostname. Once connected, the
# ip_addr property is either None, an onion peer, or the
# IP address that was connected to. Adding a peer will evict
# any other peers with the same host name or IP address.
self.peers: typing.Set[Peer] = set()
self.permit_onion_peer_time = time.time()
self.proxy = None
self.group = TaskGroup()
def _my_clearnet_peer(self):
"""Returns the clearnet peer representing this server, if any."""
clearnet = [peer for peer in self.myselves if not peer.is_tor]
return clearnet[0] if clearnet else None
def _set_peer_statuses(self):
"""Set peer statuses."""
cutoff = time.time() - STALE_SECS
for peer in self.peers:
if peer.bad:
peer.status = PEER_BAD
elif peer.last_good > cutoff:
peer.status = PEER_GOOD
elif peer.last_good:
peer.status = PEER_STALE
else:
peer.status = PEER_NEVER
def _features_to_register(self, peer, remote_peers):
"""If we should register ourselves to the remote peer, which has
reported the given list of known peers, return the clearnet
identity features to register, otherwise None.
"""
# Announce ourself if not present. Don't if disabled, we
# are a non-public IP address, or to ourselves.
if not self.env.peer_announce or peer in self.myselves:
return None
my = self._my_clearnet_peer()
if not my or not my.is_public:
return None
# Register if no matches, or ports have changed
for peer in my.matches(remote_peers):
if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port:
return None
return my.features
def _permit_new_onion_peer(self):
"""Accept a new onion peer only once per random time interval."""
now = time.time()
if now < self.permit_onion_peer_time:
return False
self.permit_onion_peer_time = now + random.randrange(0, 1200)
return True
async def _import_peers(self):
"""Import hard-coded peers from a file or the coin defaults."""
imported_peers = self.myselves.copy()
# Add the hard-coded ones unless only reporting ourself
if self.env.peer_discovery != self.env.PD_SELF:
imported_peers.extend(Peer.from_real_name(real_name, 'coins.py')
for real_name in self.env.coin.PEERS)
await self._note_peers(imported_peers, limit=None)
async def _detect_proxy(self):
"""Detect a proxy if we don't have one and some time has passed since
the last attempt.
If found self.proxy is set to a SOCKSProxy instance, otherwise
None.
"""
host = self.env.tor_proxy_host
if self.env.tor_proxy_port is None:
ports = [9050, 9150, 1080]
else:
ports = [self.env.tor_proxy_port]
while True:
self.logger.info(f'trying to detect proxy on "{host}" '
f'ports {ports}')
proxy = await SOCKSProxy.auto_detect_host(host, ports, None)
if proxy:
self.proxy = proxy
self.logger.info(f'detected {proxy}')
return
self.logger.info('no proxy detected, will try later')
await sleep(900)
async def _note_peers(self, peers, limit=2, check_ports=False,
source=None):
"""Add a limited number of peers that are not already present."""
new_peers = []
for peer in peers:
if not peer.is_public or (peer.is_tor and not self.proxy):
continue
matches = peer.matches(self.peers)
if not matches:
new_peers.append(peer)
elif check_ports:
for match in matches:
if match.check_ports(peer):
self.logger.info(f'ports changed for {peer}')
match.retry_event.set()
if new_peers:
source = source or new_peers[0].source
if limit:
random.shuffle(new_peers)
use_peers = new_peers[:limit]
else:
use_peers = new_peers
for peer in use_peers:
self.logger.info(f'accepted new peer {peer} from {source}')
peer.retry_event = Event()
self.peers.add(peer)
await self.group.add(self._monitor_peer(peer))
async def _monitor_peer(self, peer):
# Stop monitoring if we were dropped (a duplicate peer)
while peer in self.peers:
if await self._should_drop_peer(peer):
self.peers.discard(peer)
break
# Figure out how long to sleep before retrying. Retry a
# good connection when it is about to turn stale, otherwise
# exponentially back off retries.
if peer.try_count == 0:
pause = STALE_SECS - WAKEUP_SECS * 2
else:
pause = WAKEUP_SECS * 2 ** peer.try_count
pending, done = await asyncio.wait([peer.retry_event.wait()], timeout=pause)
if done:
peer.retry_event.clear()
async def _should_drop_peer(self, peer):
peer.try_count += 1
is_good = False
for kind, port in peer.connection_port_pairs():
peer.last_try = time.time()
kwargs = {}
if kind == 'SSL':
kwargs['ssl'] = ssl.SSLContext(ssl.PROTOCOL_TLS)
host = self.env.cs_host(for_rpc=False)
if isinstance(host, list):
host = host[0]
if self.env.force_proxy or peer.is_tor:
if not self.proxy:
return
kwargs['proxy'] = self.proxy
kwargs['resolve'] = not peer.is_tor
elif host:
# Use our listening Host/IP for outgoing non-proxy
# connections so our peers see the correct source.
kwargs['local_addr'] = (host, None)
peer_text = f'[{peer}:{port} {kind}]'
try:
async with Connector(PeerSession, peer.host, port,
**kwargs) as session:
await asyncio.wait_for(
self._verify_peer(session, peer),
120 if peer.is_tor else 30
)
is_good = True
break
except BadPeerError as e:
self.logger.error(f'{peer_text} marking bad: ({e})')
peer.mark_bad()
break
except RPCError as e:
self.logger.error(f'{peer_text} RPC error: {e.message} '
f'({e.code})')
except (OSError, SOCKSError, ConnectionError, asyncio.TimeoutError) as e:
self.logger.info(f'{peer_text} {e}')
if is_good:
now = time.time()
elapsed = now - peer.last_try
self.logger.info(f'{peer_text} verified in {elapsed:.1f}s')
peer.try_count = 0
peer.last_good = now
peer.source = 'peer'
# At most 2 matches if we're a host name, potentially
# several if we're an IP address (several instances
# can share a NAT).
matches = peer.matches(self.peers)
for match in matches:
if match.ip_address:
if len(matches) > 1:
self.peers.remove(match)
# Force the peer's monitoring task to exit
match.retry_event.set()
elif peer.host in match.features['hosts']:
match.update_features_from_peer(peer)
else:
# Forget the peer if long-term unreachable
if peer.last_good and not peer.bad:
try_limit = 10
else:
try_limit = 3
if peer.try_count >= try_limit:
desc = 'bad' if peer.bad else 'unreachable'
self.logger.info(f'forgetting {desc} peer: {peer}')
return True
return False
async def _verify_peer(self, session, peer):
if not peer.is_tor:
address = session.peer_address()
if address:
peer.ip_addr = address[0]
# server.version goes first
message = 'server.version'
result = await session.send_request(message, self.server_version_args)
assert_good(message, result, list)
# Protocol version 1.1 returns a pair with the version first
if len(result) != 2 or not all(isinstance(x, str) for x in result):
raise BadPeerError(f'bad server.version result: {result}')
server_version, protocol_version = result
peer.server_version = server_version
peer.features['server_version'] = server_version
ptuple = protocol_tuple(protocol_version)
await asyncio.wait([
self._send_headers_subscribe(session, peer, ptuple),
self._send_server_features(session, peer),
self._send_peers_subscribe(session, peer)
])
async def _send_headers_subscribe(self, session, peer, ptuple):
message = 'blockchain.headers.subscribe'
result = await session.send_request(message)
assert_good(message, result, dict)
our_height = self.db.db_height
if ptuple < (1, 3):
their_height = result.get('block_height')
else:
their_height = result.get('height')
if not isinstance(their_height, int):
raise BadPeerError(f'invalid height {their_height}')
if abs(our_height - their_height) > 5:
raise BadPeerError(f'bad height {their_height:,d} '
f'(ours: {our_height:,d})')
# Check prior header too in case of hard fork.
check_height = min(our_height, their_height)
raw_header = self.db.raw_header(check_height)
if ptuple >= (1, 4):
ours = raw_header.hex()
message = 'blockchain.block.header'
theirs = await session.send_request(message, [check_height])
assert_good(message, theirs, str)
if ours != theirs:
raise BadPeerError(f'our header {ours} and '
f'theirs {theirs} differ')
else:
ours = self.env.coin.electrum_header(raw_header, check_height)
ours = ours.get('prev_block_hash')
message = 'blockchain.block.get_header'
theirs = await session.send_request(message, [check_height])
assert_good(message, theirs, dict)
theirs = theirs.get('prev_block_hash')
if ours != theirs:
raise BadPeerError(f'our header hash {ours} and '
f'theirs {theirs} differ')
async def _send_server_features(self, session, peer):
message = 'server.features'
features = await session.send_request(message)
assert_good(message, features, dict)
hosts = [host.lower() for host in features.get('hosts', {})]
if self.env.coin.GENESIS_HASH != features.get('genesis_hash'):
raise BadPeerError('incorrect genesis hash')
elif peer.host.lower() in hosts:
peer.update_features(features)
else:
raise BadPeerError(f'not listed in own hosts list {hosts}')
async def _send_peers_subscribe(self, session, peer):
message = 'server.peers.subscribe'
raw_peers = await session.send_request(message)
assert_good(message, raw_peers, list)
# Check the peers list we got from a remote peer.
# Each is expected to be of the form:
# [ip_addr, hostname, ['v1.0', 't51001', 's51002']]
# Call add_peer if the remote doesn't appear to know about us.
try:
real_names = [' '.join([u[1]] + u[2]) for u in raw_peers]
peers = [Peer.from_real_name(real_name, str(peer))
for real_name in real_names]
except Exception:
raise BadPeerError('bad server.peers.subscribe response')
await self._note_peers(peers)
features = self._features_to_register(peer, peers)
if not features:
return
self.logger.info(f'registering ourself with {peer}')
# We only care to wait for the response
await session.send_request('server.add_peer', [features])
#
# External interface
#
async def discover_peers(self):
"""Perform peer maintenance. This includes
1) Forgetting unreachable peers.
2) Verifying connectivity of new peers.
3) Retrying old peers at regular intervals.
"""
if self.env.peer_discovery != self.env.PD_ON:
self.logger.info('peer discovery is disabled')
return
self.logger.info(f'beginning peer discovery. Force use of '
f'proxy: {self.env.force_proxy}')
self.group.add(self._detect_proxy())
self.group.add(self._import_peers())
def info(self) -> typing.Dict[str, int]:
"""The number of peers."""
self._set_peer_statuses()
counter = Counter(peer.status for peer in self.peers)
return {
'bad': counter[PEER_BAD],
'good': counter[PEER_GOOD],
'never': counter[PEER_NEVER],
'stale': counter[PEER_STALE],
'total': len(self.peers),
}
async def add_localRPC_peer(self, real_name):
"""Add a peer passed by the admin over LocalRPC."""
await self._note_peers([Peer.from_real_name(real_name, 'RPC')])
async def on_add_peer(self, features, source_info):
"""Add a peer (but only if the peer resolves to the source)."""
if not source_info:
self.logger.info('ignored add_peer request: no source info')
return False
source = source_info[0]
peers = Peer.peers_from_features(features, source)
if not peers:
self.logger.info('ignored add_peer request: no peers given')
return False
# Just look at the first peer, require it
peer = peers[0]
host = peer.host
if peer.is_tor:
permit = self._permit_new_onion_peer()
reason = 'rate limiting'
else:
getaddrinfo = asyncio.get_event_loop().getaddrinfo
try:
infos = await getaddrinfo(host, 80, type=socket.SOCK_STREAM)
except socket.gaierror:
permit = False
reason = 'address resolution failure'
else:
permit = any(source == info[-1][0] for info in infos)
reason = 'source-destination mismatch'
if permit:
self.logger.info(f'accepted add_peer request from {source} '
f'for {host}')
await self._note_peers([peer], check_ports=True)
else:
self.logger.warning(f'rejected add_peer request from {source} '
f'for {host} ({reason})')
return permit
def on_peers_subscribe(self, is_tor):
"""Returns the server peers as a list of (ip, host, details) tuples.
We return all peers we've connected to in the last day.
Additionally, if we don't have onion routing, we return a few
hard-coded onion servers.
"""
cutoff = time.time() - STALE_SECS
recent = [peer for peer in self.peers
if peer.last_good > cutoff and
not peer.bad and peer.is_public]
onion_peers = []
# Always report ourselves if valid (even if not public)
peers = {myself for myself in self.myselves
if myself.last_good > cutoff}
# Bucket the clearnet peers and select up to two from each
buckets = defaultdict(list)
for peer in recent:
if peer.is_tor:
onion_peers.append(peer)
else:
buckets[peer.bucket()].append(peer)
for bucket_peers in buckets.values():
random.shuffle(bucket_peers)
peers.update(bucket_peers[:2])
# Add up to 20% onion peers (but up to 10 is OK anyway)
random.shuffle(onion_peers)
max_onion = 50 if is_tor else max(10, len(peers) // 4)
peers.update(onion_peers[:max_onion])
return [peer.to_tuple() for peer in peers]
def proxy_peername(self):
"""Return the peername of the proxy, if there is a proxy, otherwise
None."""
return self.proxy.peername if self.proxy else None
def rpc_data(self):
"""Peer data for the peers RPC method."""
self._set_peer_statuses()
descs = ['good', 'stale', 'never', 'bad']
def peer_data(peer):
data = peer.serialize()
data['status'] = descs[peer.status]
return data
def peer_key(peer):
return (peer.bad, -peer.last_good)
return [peer_data(peer) for peer in sorted(self.peers, key=peer_key)]

View file

@ -39,7 +39,6 @@ from lbry.wallet.server import text
from lbry.wallet.server import util from lbry.wallet.server import util
from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.peers import PeerManager
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
from lbry.wallet.server.mempool import MemPool from lbry.wallet.server.mempool import MemPool
@ -185,7 +184,6 @@ class SessionManager:
self.bp = bp self.bp = bp
self.daemon = daemon self.daemon = daemon
self.mempool = mempool self.mempool = mempool
self.peer_mgr = PeerManager(env, db)
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {} self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@ -209,7 +207,7 @@ class SessionManager:
else: else:
protocol_class = self.env.coin.SESSIONCLS protocol_class = self.env.coin.SESSIONCLS
protocol_factory = partial(protocol_class, self, self.db, protocol_factory = partial(protocol_class, self, self.db,
self.mempool, self.peer_mgr, kind) self.mempool, kind)
host, port = args[:2] host, port = args[:2]
try: try:
@ -369,7 +367,7 @@ class SessionManager:
'logged': logged, 'logged': logged,
'paused': paused, 'paused': paused,
'pid': os.getpid(), 'pid': os.getpid(),
'peers': self.peer_mgr.info(), 'peers': [],
'requests': pending_requests, 'requests': pending_requests,
'method_counts': method_counts, 'method_counts': method_counts,
'sessions': self.session_count(), 'sessions': self.session_count(),
@ -436,7 +434,7 @@ class SessionManager:
real_name: "bch.electrumx.cash t50001 s50002" for example real_name: "bch.electrumx.cash t50001 s50002" for example
""" """
await self.peer_mgr.add_localRPC_peer(real_name) await self._notify_peer(real_name)
return f"peer '{real_name}' added" return f"peer '{real_name}' added"
async def rpc_disconnect(self, session_ids): async def rpc_disconnect(self, session_ids):
@ -487,7 +485,7 @@ class SessionManager:
async def rpc_peers(self): async def rpc_peers(self):
"""Return a list of data about server peers.""" """Return a list of data about server peers."""
return self.peer_mgr.rpc_data() return self.env.peer_hubs
async def rpc_query(self, items, limit): async def rpc_query(self, items, limit):
"""Return a list of data about server peers.""" """Return a list of data about server peers."""
@ -578,7 +576,6 @@ class SessionManager:
# Peer discovery should start after the external servers # Peer discovery should start after the external servers
# because we connect to ourself # because we connect to ourself
await asyncio.wait([ await asyncio.wait([
self.peer_mgr.discover_peers(),
self._clear_stale_sessions(), self._clear_stale_sessions(),
self._log_sessions(), self._log_sessions(),
self._manage_servers() self._manage_servers()
@ -638,6 +635,15 @@ class SessionManager:
self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit) self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
return self.history_cache[hashX] return self.history_cache[hashX]
def _notify_peer(self, peer):
notify_tasks = [
session.send_notification('blockchain.peers.subscribe', [peer])
for session in self.sessions.values() if session.subscribe_peers
]
if notify_tasks:
self.logger.info(f'notify {len(notify_tasks)} sessions of new peers')
asyncio.create_task(asyncio.wait(notify_tasks))
async def _notify_sessions(self, height, touched, new_touched): async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses.""" """Notify sessions about height changes and touched addresses."""
height_changed = height != self.notified_height height_changed = height != self.notified_height
@ -704,7 +710,7 @@ class SessionBase(RPCSession):
request_handlers: typing.Dict[str, typing.Callable] = {} request_handlers: typing.Dict[str, typing.Callable] = {}
version = '0.5.7' version = '0.5.7'
def __init__(self, session_mgr, db, mempool, peer_mgr, kind): def __init__(self, session_mgr, db, mempool, kind):
connection = JSONRPCConnection(JSONRPCAutoDetect) connection = JSONRPCConnection(JSONRPCAutoDetect)
self.env = session_mgr.env self.env = session_mgr.env
super().__init__(connection=connection) super().__init__(connection=connection)
@ -712,7 +718,6 @@ class SessionBase(RPCSession):
self.session_mgr = session_mgr self.session_mgr = session_mgr
self.db = db self.db = db
self.mempool = mempool self.mempool = mempool
self.peer_mgr = peer_mgr
self.kind = kind # 'RPC', 'TCP' etc. self.kind = kind # 'RPC', 'TCP' etc.
self.coin = self.env.coin self.coin = self.env.coin
self.anon_logs = self.env.anon_logs self.anon_logs = self.env.anon_logs
@ -878,7 +883,6 @@ class LBRYElectrumX(SessionBase):
'server.payment_address': cls.payment_address, 'server.payment_address': cls.payment_address,
'server.donation_address': cls.donation_address, 'server.donation_address': cls.donation_address,
'server.features': cls.server_features_async, 'server.features': cls.server_features_async,
'server.peers.get': cls.peers_get,
'server.peers.subscribe': cls.peers_subscribe, 'server.peers.subscribe': cls.peers_subscribe,
'server.version': cls.server_version, 'server.version': cls.server_version,
'blockchain.transaction.get_height': cls.transaction_get_height, 'blockchain.transaction.get_height': cls.transaction_get_height,
@ -905,6 +909,7 @@ class LBRYElectrumX(SessionBase):
LBRYElectrumX.set_server_features(self.env) LBRYElectrumX.set_server_features(self.env)
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_headers_raw = False self.subscribe_headers_raw = False
self.subscribe_peers = False
self.connection.max_response_size = self.env.max_send self.connection.max_response_size = self.env.max_send
self.hashX_subs = {} self.hashX_subs = {}
self.sv_seen = False self.sv_seen = False
@ -1083,13 +1088,10 @@ class LBRYElectrumX(SessionBase):
"""Add a peer (but only if the peer resolves to the source).""" """Add a peer (but only if the peer resolves to the source)."""
return await self.peer_mgr.on_add_peer(features, self.peer_address()) return await self.peer_mgr.on_add_peer(features, self.peer_address())
async def peers_get(self):
"""Return the server peers as a list of (ip, host, details) tuples."""
return self.env.peer_hubs
async def peers_subscribe(self): async def peers_subscribe(self):
"""Return the server peers as a list of (ip, host, details) tuples.""" """Return the server peers as a list of (ip, host, details) tuples."""
return self.peer_mgr.on_peers_subscribe(self.is_tor()) self.subscribe_peers = True
return self.env.peer_hubs
async def address_status(self, hashX): async def address_status(self, hashX):
"""Returns an address status. """Returns an address status.
@ -1332,11 +1334,7 @@ class LBRYElectrumX(SessionBase):
async def banner(self): async def banner(self):
"""Return the server banner text.""" """Return the server banner text."""
banner = f'You are connected to an {self.version} server.' banner = f'You are connected to an {self.version} server.'
banner_file = self.env.banner_file
if self.is_tor():
banner_file = self.env.tor_banner_file
else:
banner_file = self.env.banner_file
if banner_file: if banner_file:
try: try:
with codecs.open(banner_file, 'r', 'utf-8') as f: with codecs.open(banner_file, 'r', 'utf-8') as f:

View file

@ -147,3 +147,10 @@ class TestHubDiscovery(CommandTestCase):
self.assertEqual( self.assertEqual(
self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', final_node.port) self.daemon.ledger.network.client.server_address_and_port, ('127.0.0.1', final_node.port)
) )
final_node.server.session_mgr._notify_peer('127.0.0.1:9988')
self.assertEqual(list(self.daemon.conf.known_hubs), [(final_node.hostname, final_node.port)])
await self.daemon.ledger.network.on_hub.first
self.assertEqual(list(self.daemon.conf.known_hubs), [
(final_node.hostname, final_node.port), ('127.0.0.1', 9988)
])

View file

@ -258,9 +258,35 @@ class ConfigurationTests(unittest.TestCase):
def test_known_hubs_list(self): def test_known_hubs_list(self):
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
c1 = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir) hubs = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir).known_hubs
self.assertEqual(list(c1.known_hubs), [])
c1.known_hubs.append('new.hub.io:99') self.assertEqual(hubs.serialized, {})
c1.known_hubs.save() self.assertEqual(list(hubs), [])
c2 = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir) self.assertFalse(hubs)
self.assertEqual(list(c2.known_hubs), [('new.hub.io', 99)]) hubs.set('new.hub.io:99', {'jurisdiction': 'us'})
self.assertTrue(hubs)
self.assertFalse(hubs.exists)
hubs.save()
self.assertTrue(hubs.exists)
hubs = Config(config=os.path.join(temp_dir, 'settings.yml'), wallet_dir=temp_dir).known_hubs
self.assertEqual(list(hubs), [('new.hub.io', 99)])
self.assertEqual(hubs.serialized, {'new.hub.io:99': {'jurisdiction': 'us'}})
hubs.set('any.hub.io:99', {})
hubs.set('oth.hub.io:99', {'jurisdiction': 'other'})
self.assertEqual(list(hubs), [('new.hub.io', 99), ('any.hub.io', 99), ('oth.hub.io', 99)])
self.assertEqual(hubs.filter(), {
('new.hub.io', 99): {'jurisdiction': 'us'},
('oth.hub.io', 99): {'jurisdiction': 'other'},
('any.hub.io', 99): {}
})
self.assertEqual(hubs.filter(foo="bar"), {})
self.assertEqual(hubs.filter(jurisdiction="us"), {
('new.hub.io', 99): {'jurisdiction': 'us'}
})
self.assertEqual(hubs.filter(jurisdiction="us", match_none=True), {
('new.hub.io', 99): {'jurisdiction': 'us'},
('any.hub.io', 99): {}
})