validate public ipv4 address for get_external_ip

-fixes invalid, ipv6, or private addresses being returned
This commit is contained in:
Jack Robison 2020-11-02 14:29:56 -05:00
parent 7223f36c1e
commit 8cf07ecb91
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 48 additions and 4 deletions

View file

@ -5,6 +5,7 @@ import logging
from aioupnp.protocols.scpd import scpd_post from aioupnp.protocols.scpd import scpd_post
from aioupnp.device import Service from aioupnp.device import Service
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
from aioupnp.util import is_valid_public_ipv4
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -252,10 +253,10 @@ class SOAPCommands:
if not self.is_registered(name): if not self.is_registered(name):
raise NotImplementedError() # pragma: no cover raise NotImplementedError() # pragma: no cover
assert name in self._wrappers_no_args assert name in self._wrappers_no_args
result: str = await self._wrappers_no_args[name]() external_ip: str = await self._wrappers_no_args[name]()
# if not result: if not is_valid_public_ipv4(external_ip):
# raise UPnPError("Got null external ip address") raise UPnPError(f"Got invalid external ipv4 address: {external_ip}")
return result return external_ip
# async def GetNATRSIPStatus(self) -> Tuple[bool, bool]: # async def GetNATRSIPStatus(self) -> Tuple[bool, bool]:
# """Returns (NewRSIPAvailable, NewNATEnabled)""" # """Returns (NewRSIPAvailable, NewNATEnabled)"""

View file

@ -1,3 +1,4 @@
import ipaddress
import typing import typing
from collections import OrderedDict from collections import OrderedDict
@ -47,3 +48,21 @@ def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyS
matched_key: typing.AnyStr = match[0] matched_key: typing.AnyStr = match[0]
return source[matched_key] return source[matched_key]
raise KeyError("overlapping keys") raise KeyError("overlapping keys")
# the ipaddress module does not show these subnets as reserved
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
def is_valid_public_ipv4(address):
try:
parsed_ip = ipaddress.ip_address(address)
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private, parsed_ip.is_reserved)):
return False
else:
return not any((CARRIER_GRADE_NAT_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32")),
IPV4_TO_6_RELAY_SUBNET.supernet_of(ipaddress.ip_network(f"{address}/32"))))
except (ipaddress.AddressValueError, ValueError):
return False

View file

@ -51,6 +51,30 @@ class TestGetExternalIPAddress(UPnPCommandTestCase):
external_ip = await upnp.get_external_ip() external_ip = await upnp.get_external_ip()
self.assertEqual("11.222.3.44", external_ip) self.assertEqual("11.222.3.44", external_ip)
async def test_handle_got_ipv6(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 392 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>2a02:2e02:30c:db00:4a8d:36ff:feb0:5202</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
self.addCleanup(self.replies.pop, request)
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
await gateway.discover_commands()
upnp = UPnP(self.client_address, self.gateway_address, gateway)
with self.assertRaises(UPnPError) as err:
await upnp.get_external_ip()
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
async def test_handle_got_invalid_ip(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 360 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>potato</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
self.addCleanup(self.replies.pop, request)
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
await gateway.discover_commands()
upnp = UPnP(self.client_address, self.gateway_address, gateway)
with self.assertRaises(UPnPError) as err:
await upnp.get_external_ip()
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
async def test_null_external_ip(self): async def test_null_external_ip(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n' request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({ self.replies.update({