From c465d6a6c2687723e6da5f15dc3798268b55d284 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 28 Apr 2021 12:20:37 -0400 Subject: [PATCH 1/3] ignore udp packets with low source ports --- lbry/dht/peer.py | 4 +-- lbry/wallet/server/udp.py | 13 +++++++--- tests/unit/dht/test_peer.py | 51 +++++++++++++++++++------------------ 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index afba9bc56..cf30b359e 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -155,9 +155,9 @@ class KademliaPeer: if self._node_id is not None: if not len(self._node_id) == constants.HASH_LENGTH: raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode())) - if self.udp_port is not None and not 1 <= self.udp_port <= 65535: + if self.udp_port is not None and not 1024 <= self.udp_port <= 65535: raise ValueError("invalid udp port") - if self.tcp_port is not None and not 1 <= self.tcp_port <= 65535: + if self.tcp_port is not None and not 1024 <= self.tcp_port <= 65535: raise ValueError("invalid tcp port") if not is_valid_public_ipv4(self.address, self.allow_localhost): raise ValueError(f"invalid ip address: '{self.address}'") diff --git a/lbry/wallet/server/udp.py b/lbry/wallet/server/udp.py index fd043ebfa..2e44654d1 100644 --- a/lbry/wallet/server/udp.py +++ b/lbry/wallet/server/udp.py @@ -3,7 +3,7 @@ import struct from time import perf_counter import logging from typing import Optional, Tuple, NamedTuple -from lbry.utils import LRUCache +from lbry.utils import LRUCache, is_valid_public_ipv4 # from prometheus_client import Counter @@ -69,7 +69,8 @@ class SPVPong(NamedTuple): class SPVServerStatusProtocol(asyncio.DatagramProtocol): PROTOCOL_VERSION = 1 - def __init__(self, height: int, tip: bytes, throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10): + def __init__(self, height: int, tip: bytes, throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10, + allow_localhost: bool = False): super().__init__() self.transport: Optional[asyncio.transports.DatagramTransport] = None self._height = height @@ -80,6 +81,7 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): self._throttle = LRUCache(throttle_cache_size) self._should_log = LRUCache(throttle_cache_size) self._min_delay = 1 / throttle_reqs_per_sec + self._allow_localhost = allow_localhost def update_cached_response(self): self._cached_response = SPVPong.make(self._height, self._tip, self._flags, self.PROTOCOL_VERSION) @@ -119,7 +121,10 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): except (ValueError, struct.error, AttributeError, TypeError): # log.exception("derp") return - self.transport.sendto(self.make_pong(addr[0]), addr) + if is_valid_public_ipv4(addr[0], allow_localhost=self._allow_localhost) and addr[1] >= 1024: + self.transport.sendto(self.make_pong(addr[0]), addr) + else: + log.warning("odd packet from %s:%i", addr[0], addr[1]) # ping_count_metric.inc() def connection_made(self, transport) -> None: @@ -141,8 +146,8 @@ class StatusServer: if self.is_running: return loop = asyncio.get_event_loop() - self._protocol = SPVServerStatusProtocol(height, tip) interface = interface if interface.lower() != 'localhost' else '127.0.0.1' + self._protocol = SPVServerStatusProtocol(height, tip, allow_localhost=interface == '127.0.0.1') await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port)) log.info("started udp status server on %s:%i", interface, port) diff --git a/tests/unit/dht/test_peer.py b/tests/unit/dht/test_peer.py index 8a61dad14..2659db012 100644 --- a/tests/unit/dht/test_peer.py +++ b/tests/unit/dht/test_peer.py @@ -10,8 +10,8 @@ class PeerTest(AsyncioTestCase): self.loop = asyncio.get_event_loop() self.peer_manager = PeerManager(self.loop) self.node_ids = [generate_id(), generate_id(), generate_id()] - self.first_contact = make_kademlia_peer(self.node_ids[1], '1.0.0.1', udp_port=1000) - self.second_contact = make_kademlia_peer(self.node_ids[0], '1.0.0.2', udp_port=1000) + self.first_contact = make_kademlia_peer(self.node_ids[1], '1.0.0.1', udp_port=1024) + self.second_contact = make_kademlia_peer(self.node_ids[0], '1.0.0.2', udp_port=1024) def test_peer_is_good_unknown_peer(self): # Scenario: peer replied, but caller doesn't know the node_id. @@ -25,36 +25,37 @@ class PeerTest(AsyncioTestCase): def test_make_contact_error_cases(self): self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4', 100000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4.5', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], 'this is not an ip', 1000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4.5', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], 'this is not an ip', 1024) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4', -1000) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4', 0) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4', 1023) self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '1.2.3.4', 70000) - self.assertRaises(ValueError, make_kademlia_peer, b'not valid node id', '1.2.3.4', 1000) + self.assertRaises(ValueError, make_kademlia_peer, b'not valid node id', '1.2.3.4', 1024) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '0.0.0.0', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '10.0.0.1', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '100.64.0.1', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '127.0.0.1', 1000) - self.assertIsNotNone(make_kademlia_peer(self.node_ids[1], '127.0.0.1', 1000, allow_localhost=True)) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.0.1', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '172.16.0.1', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '169.254.1.1', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.0.0.2', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.0.2.2', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.88.99.2', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '198.18.1.1', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '198.51.100.2', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '198.51.100.2', 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '203.0.113.4', 1000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '0.0.0.0', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '10.0.0.1', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '100.64.0.1', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '127.0.0.1', 1024) + self.assertIsNotNone(make_kademlia_peer(self.node_ids[1], '127.0.0.1', 1024, allow_localhost=True)) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.168.0.1', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '172.16.0.1', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '169.254.1.1', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.0.0.2', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.0.2.2', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '192.88.99.2', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '198.18.1.1', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '198.51.100.2', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '198.51.100.2', 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '203.0.113.4', 1024) for i in range(32): - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], f"{224 + i}.0.0.0", 1000) - self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '255.255.255.255', 1000) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], f"{224 + i}.0.0.0", 1024) + self.assertRaises(ValueError, make_kademlia_peer, self.node_ids[1], '255.255.255.255', 1024) self.assertRaises( - ValueError, make_kademlia_peer, self.node_ids[1], 'beee:eeee:eeee:eeee:eeee:eeee:eeee:eeef', 1000 + ValueError, make_kademlia_peer, self.node_ids[1], 'beee:eeee:eeee:eeee:eeee:eeee:eeee:eeef', 1024 ) self.assertRaises( - ValueError, make_kademlia_peer, self.node_ids[1], '2001:db8::ff00:42:8329', 1000 + ValueError, make_kademlia_peer, self.node_ids[1], '2001:db8::ff00:42:8329', 1024 ) def test_is_valid_ipv4(self): @@ -79,7 +80,7 @@ class PeerTest(AsyncioTestCase): def test_boolean(self): self.assertNotEqual(self.first_contact, self.second_contact) self.assertEqual( - self.second_contact, make_kademlia_peer(self.node_ids[0], '1.0.0.2', udp_port=1000) + self.second_contact, make_kademlia_peer(self.node_ids[0], '1.0.0.2', udp_port=1024) ) def test_compact_ip(self): From c094d8f2e81ec4d1c95d983930cfe6eaaa5fe9e5 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 28 Apr 2021 15:28:00 -0400 Subject: [PATCH 2/3] add ALLOW_LAN_UDP hub setting --- lbry/utils.py | 6 ++++-- lbry/wallet/server/env.py | 1 + lbry/wallet/server/server.py | 2 +- lbry/wallet/server/udp.py | 12 ++++++++---- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/lbry/utils.py b/lbry/utils.py index de55504f2..e5d624991 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -380,13 +380,15 @@ CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10') IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24') -def is_valid_public_ipv4(address, allow_localhost: bool = False): +def is_valid_public_ipv4(address, allow_localhost: bool = False, allow_lan: bool = False): try: parsed_ip = ipaddress.ip_address(address) if parsed_ip.is_loopback and allow_localhost: return True + if allow_lan and parsed_ip.is_private: + return True if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback, - parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private, parsed_ip.is_reserved)): + parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private)): return False else: return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")), diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 18f594a96..5477cf9e1 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -73,6 +73,7 @@ class Env: self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file) self.anon_logs = self.boolean('ANON_LOGS', False) self.log_sessions = self.integer('LOG_SESSIONS', 3600) + self.allow_lan_udp = self.boolean('ALLOW_LAN_UDP', False) # Peer discovery self.peer_discovery = self.peer_discovery_enum() self.peer_announce = self.boolean('PEER_ANNOUNCE', True) diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index fd85bd202..956f06ed2 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -115,7 +115,7 @@ class Server: if self.env.udp_port: await self.bp.status_server.start( 0, bytes.fromhex(self.bp.coin.GENESIS_HASH)[::-1], - self.env.host, self.env.udp_port + self.env.host, self.env.udp_port, self.env.allow_lan_udp ) await _start_cancellable(self.bp.fetch_and_process_blocks) diff --git a/lbry/wallet/server/udp.py b/lbry/wallet/server/udp.py index 2e44654d1..1357beb04 100644 --- a/lbry/wallet/server/udp.py +++ b/lbry/wallet/server/udp.py @@ -70,7 +70,7 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): PROTOCOL_VERSION = 1 def __init__(self, height: int, tip: bytes, throttle_cache_size: int = 1024, throttle_reqs_per_sec: int = 10, - allow_localhost: bool = False): + allow_localhost: bool = False, allow_lan: bool = False): super().__init__() self.transport: Optional[asyncio.transports.DatagramTransport] = None self._height = height @@ -82,6 +82,7 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): self._should_log = LRUCache(throttle_cache_size) self._min_delay = 1 / throttle_reqs_per_sec self._allow_localhost = allow_localhost + self._allow_lan = allow_lan def update_cached_response(self): self._cached_response = SPVPong.make(self._height, self._tip, self._flags, self.PROTOCOL_VERSION) @@ -121,7 +122,8 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): except (ValueError, struct.error, AttributeError, TypeError): # log.exception("derp") return - if is_valid_public_ipv4(addr[0], allow_localhost=self._allow_localhost) and addr[1] >= 1024: + if addr[1] >= 1024 and is_valid_public_ipv4( + addr[0], allow_localhost=self._allow_localhost, allow_lan=self._allow_lan): self.transport.sendto(self.make_pong(addr[0]), addr) else: log.warning("odd packet from %s:%i", addr[0], addr[1]) @@ -142,12 +144,14 @@ class StatusServer: def __init__(self): self._protocol: Optional[SPVServerStatusProtocol] = None - async def start(self, height: int, tip: bytes, interface: str, port: int): + async def start(self, height: int, tip: bytes, interface: str, port: int, allow_lan: bool = False): if self.is_running: return loop = asyncio.get_event_loop() interface = interface if interface.lower() != 'localhost' else '127.0.0.1' - self._protocol = SPVServerStatusProtocol(height, tip, allow_localhost=interface == '127.0.0.1') + self._protocol = SPVServerStatusProtocol( + height, tip, allow_localhost=interface == '127.0.0.1', allow_lan=allow_lan + ) await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port)) log.info("started udp status server on %s:%i", interface, port) From 21d0038ff2c34e4f48a78355e43ea8055555e9bd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 28 Apr 2021 16:46:24 -0400 Subject: [PATCH 3/3] add timestamps to hub log --- lbry/wallet/server/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py index 3e36db133..d99512b93 100644 --- a/lbry/wallet/server/cli.py +++ b/lbry/wallet/server/cli.py @@ -25,7 +25,7 @@ def main(): parser = get_argument_parser() args = parser.parse_args() coin_class = get_coin_class(args.spvserver) - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") logging.info('lbry.server starting') logging.getLogger('aiohttp').setLevel(logging.WARNING) logging.getLogger('elasticsearch').setLevel(logging.WARNING)