From 8cf07ecb91c7c903bd2c4997f237576bd4932fed Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 2 Nov 2020 14:29:56 -0500 Subject: [PATCH] validate public ipv4 address for `get_external_ip` -fixes invalid, ipv6, or private addresses being returned --- aioupnp/commands.py | 9 +++++---- aioupnp/util.py | 19 +++++++++++++++++++ tests/test_upnp.py | 24 ++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/aioupnp/commands.py b/aioupnp/commands.py index fa7b46e..0433e0a 100644 --- a/aioupnp/commands.py +++ b/aioupnp/commands.py @@ -5,6 +5,7 @@ import logging from aioupnp.protocols.scpd import scpd_post from aioupnp.device import Service from aioupnp.fault import UPnPError +from aioupnp.util import is_valid_public_ipv4 log = logging.getLogger(__name__) @@ -252,10 +253,10 @@ class SOAPCommands: if not self.is_registered(name): raise NotImplementedError() # pragma: no cover assert name in self._wrappers_no_args - result: str = await self._wrappers_no_args[name]() - # if not result: - # raise UPnPError("Got null external ip address") - return result + external_ip: str = await self._wrappers_no_args[name]() + if not is_valid_public_ipv4(external_ip): + raise UPnPError(f"Got invalid external ipv4 address: {external_ip}") + return external_ip # async def GetNATRSIPStatus(self) -> Tuple[bool, bool]: # """Returns (NewRSIPAvailable, NewNATEnabled)""" diff --git a/aioupnp/util.py b/aioupnp/util.py index 7553639..540d33e 100644 --- a/aioupnp/util.py +++ b/aioupnp/util.py @@ -1,3 +1,4 @@ +import ipaddress import typing from collections import OrderedDict @@ -47,3 +48,21 @@ def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyS matched_key: typing.AnyStr = match[0] return source[matched_key] raise KeyError("overlapping keys") + + +# the ipaddress module does not show these subnets as reserved +CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10') +IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24') + + +def is_valid_public_ipv4(address): + try: + parsed_ip = ipaddress.ip_address(address) + if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback, + parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private, parsed_ip.is_reserved)): + return False + else: + return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")), + IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")))) + except (ipaddress.AddressValueError, ValueError): + return False diff --git a/tests/test_upnp.py b/tests/test_upnp.py index 096a0d4..cec00f6 100644 --- a/tests/test_upnp.py +++ b/tests/test_upnp.py @@ -51,6 +51,30 @@ class TestGetExternalIPAddress(UPnPCommandTestCase): external_ip = await upnp.get_external_ip() self.assertEqual("11.222.3.44", external_ip) + async def test_handle_got_ipv6(self): + request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n\r\n' + self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 392 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n2a02:2e02:30c:db00:4a8d:36ff:feb0:5202\n\n\t\n\n"}) + self.addCleanup(self.replies.pop, request) + with mock_tcp_and_udp(self.loop, tcp_replies=self.replies): + gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop) + await gateway.discover_commands() + upnp = UPnP(self.client_address, self.gateway_address, gateway) + with self.assertRaises(UPnPError) as err: + await upnp.get_external_ip() + self.assertTrue(str(err).startswith("Got invalid external ipv4 address")) + + async def test_handle_got_invalid_ip(self): + request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n\r\n' + self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 360 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\npotato\n\n\t\n\n"}) + self.addCleanup(self.replies.pop, request) + with mock_tcp_and_udp(self.loop, tcp_replies=self.replies): + gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop) + await gateway.discover_commands() + upnp = UPnP(self.client_address, self.gateway_address, gateway) + with self.assertRaises(UPnPError) as err: + await upnp.get_external_ip() + self.assertTrue(str(err).startswith("Got invalid external ipv4 address")) + async def test_null_external_ip(self): request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n\r\n' self.replies.update({