diff --git a/.pylintrc b/.pylintrc index c97b3d2..8fe95e7 100644 --- a/.pylintrc +++ b/.pylintrc @@ -33,7 +33,7 @@ unsafe-load-any-extension=no # A comma-separated list of package or module names from where C extensions may # be loaded. Extensions are loading into the active Python interpreter and may # run arbitrary code -# extension-pkg-whitelist= +extension-pkg-whitelist=netifaces, # Allow optimization of some AST trees. This will activate a peephole AST # optimizer, which will apply various small optimizations. For instance, it can @@ -123,7 +123,9 @@ disable= assignment-from-no-return, useless-return, assignment-from-none, - stop-iteration-return + stop-iteration-return, + unsubscriptable-object, + unsupported-membership-test [REPORTS] diff --git a/aioupnp/__init__.py b/aioupnp/__init__.py index 523847a..4f107a6 100644 --- a/aioupnp/__init__.py +++ b/aioupnp/__init__.py @@ -1,7 +1,6 @@ -__version__ = "0.0.12" +__version__ = "0.0.13a" __name__ = "aioupnp" __author__ = "Jack Robison" __maintainer__ = "Jack Robison" __license__ = "MIT" __email__ = "jackrobison@lbry.io" - diff --git a/aioupnp/commands.py b/aioupnp/commands.py index 4e514bf..3200a62 100644 --- a/aioupnp/commands.py +++ b/aioupnp/commands.py @@ -1,7 +1,6 @@ import asyncio import time import typing -import functools import logging from typing import Tuple from aioupnp.protocols.scpd import scpd_post @@ -18,7 +17,7 @@ def soap_bool(x: typing.Optional[str]) -> bool: return False if not x or str(x).lower() in ['false', 'False'] else True -def recast_single_result(t, result): +def recast_single_result(t: type, result: typing.Any) -> typing.Optional[typing.Union[str, int, float, bool]]: if t is bool: return soap_bool(result) if t is str: @@ -26,44 +25,22 @@ def recast_single_result(t, result): return t(result) -def recast_return(return_annotation, result, result_keys: typing.List[str]): - if return_annotation is None: - return None +def recast_return(return_annotation, result: typing.Dict[str, typing.Union[int, str]], + result_keys: typing.List[str]) -> typing.Tuple: + if return_annotation is None or len(result_keys) == 0: + return () if len(result_keys) == 1: assert len(result_keys) == 1 single_result = result[result_keys[0]] - return recast_single_result(return_annotation, single_result) - + return (recast_single_result(return_annotation, single_result), ) annotated_args: typing.List[type] = list(return_annotation.__args__) assert len(annotated_args) == len(result_keys) - recast_results: typing.List[typing.Optional[typing.Union[str, int, bool, bytes]]] = [] + recast_results: typing.List[typing.Optional[typing.Union[str, int, float, bool]]] = [] for type_annotation, result_key in zip(annotated_args, result_keys): - recast_results.append(recast_single_result(type_annotation, result[result_key])) + recast_results.append(recast_single_result(type_annotation, result.get(result_key, None))) return tuple(recast_results) -def soap_command(fn): - @functools.wraps(fn) - async def wrapper(self: 'SOAPCommands', **kwargs): - if not self.is_registered(fn.__name__): - return fn(self, **kwargs) - service = self.get_service(fn.__name__) - assert service.controlURL is not None - assert service.serviceType is not None - response, xml_bytes, err = await scpd_post( - service.controlURL, self._base_address.decode(), self._port, fn.__name__, self._registered[service][fn.__name__][0], - service.serviceType.encode(), self._loop, **kwargs - ) - if err is not None: - - self._requests.append((fn.__name__, kwargs, xml_bytes, None, err, time.time())) - raise err - result = recast_return(fn.__annotations__.get('return'), response, self._registered[service][fn.__name__][1]) - self._requests.append((fn.__name__, kwargs, xml_bytes, result, None, time.time())) - return result - return wrapper - - class SOAPCommands: """ Type annotated wrappers for common UPnP SOAP functions @@ -77,39 +54,41 @@ class SOAPCommands: SOAP_COMMANDS: typing.List[str] = [ 'AddPortMapping', - 'GetNATRSIPStatus', 'GetGenericPortMappingEntry', 'GetSpecificPortMappingEntry', - 'SetConnectionType', - 'GetExternalIPAddress', - 'GetConnectionTypeInfo', - 'GetStatusInfo', - 'ForceTermination', 'DeletePortMapping', - 'RequestConnection', - 'GetCommonLinkProperties', - 'GetTotalBytesSent', - 'GetTotalBytesReceived', - 'GetTotalPacketsSent', - 'GetTotalPacketsReceived', - 'X_GetICSStatistics', - 'GetDefaultConnectionService', - 'SetDefaultConnectionService', - 'SetEnabledForInternet', - 'GetEnabledForInternet', - 'GetMaximumActiveConnections', - 'GetActiveConnections' + 'GetExternalIPAddress', + # 'SetConnectionType', + # 'GetNATRSIPStatus', + # 'GetConnectionTypeInfo', + # 'GetStatusInfo', + # 'ForceTermination', + # 'RequestConnection', + # 'GetCommonLinkProperties', + # 'GetTotalBytesSent', + # 'GetTotalBytesReceived', + # 'GetTotalPacketsSent', + # 'GetTotalPacketsReceived', + # 'X_GetICSStatistics', + # 'GetDefaultConnectionService', + # 'SetDefaultConnectionService', + # 'SetEnabledForInternet', + # 'GetEnabledForInternet', + # 'GetMaximumActiveConnections', + # 'GetActiveConnections' ] def __init__(self, loop: asyncio.AbstractEventLoop, base_address: bytes, port: int) -> None: self._loop = loop self._registered: typing.Dict[Service, typing.Dict[str, typing.Tuple[typing.List[str], typing.List[str]]]] = {} + self._wrappers_no_args: typing.Dict[str, typing.Callable[[], typing.Awaitable[typing.Any]]] = {} + self._wrappers_kwargs: typing.Dict[str, typing.Callable[..., typing.Awaitable[typing.Any]]] = {} + self._base_address = base_address self._port = port self._requests: typing.List[typing.Tuple[str, typing.Dict[str, typing.Any], bytes, - typing.Optional[typing.Dict[str, typing.Any]], - typing.Optional[Exception], float]] = [] + typing.Tuple, typing.Optional[Exception], float]] = [] def is_registered(self, name: str) -> bool: if name not in self.SOAP_COMMANDS: @@ -127,8 +106,37 @@ class SOAPCommands: return service raise ValueError(name) + def _register_soap_wrapper(self, name: str) -> None: + annotations: typing.Dict[str, typing.Any] = typing.get_type_hints(getattr(self, name)) + service = self.get_service(name) + input_names: typing.List[str] = self._registered[service][name][0] + output_names: typing.List[str] = self._registered[service][name][1] + + async def wrapper(**kwargs: typing.Any) -> typing.Tuple: + + assert service.controlURL is not None + assert service.serviceType is not None + response, xml_bytes, err = await scpd_post( + service.controlURL, self._base_address.decode(), self._port, name, input_names, + service.serviceType.encode(), self._loop, **kwargs + ) + if err is not None: + assert isinstance(xml_bytes, bytes) + self._requests.append((name, kwargs, xml_bytes, (), err, time.time())) + raise err + assert 'return' in annotations + result = recast_return(annotations['return'], response, output_names) + + self._requests.append((name, kwargs, xml_bytes, result, None, time.time())) + return result + + if not len(list(k for k in annotations if k != 'return')): + self._wrappers_no_args[name] = wrapper + else: + self._wrappers_kwargs[name] = wrapper + return None + def register(self, name: str, service: Service, inputs: typing.List[str], outputs: typing.List[str]) -> None: - # control_url: str, service_type: bytes, if name not in self.SOAP_COMMANDS: raise AttributeError(name) if self.is_registered(name): @@ -136,122 +144,227 @@ class SOAPCommands: if service not in self._registered: self._registered[service] = {} self._registered[service][name] = inputs, outputs + self._register_soap_wrapper(name) - @soap_command async def AddPortMapping(self, NewRemoteHost: str, NewExternalPort: int, NewProtocol: str, NewInternalPort: int, NewInternalClient: str, NewEnabled: int, NewPortMappingDescription: str, NewLeaseDuration: str) -> None: """Returns None""" - raise NotImplementedError() + name = "AddPortMapping" + if not self.is_registered(name): + raise NotImplementedError() + assert name in self._wrappers_kwargs + await self._wrappers_kwargs[name]( + NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol, + NewInternalPort=NewInternalPort, NewInternalClient=NewInternalClient, NewEnabled=NewEnabled, + NewPortMappingDescription=NewPortMappingDescription, NewLeaseDuration=NewLeaseDuration + ) + return None - @soap_command - async def GetNATRSIPStatus(self) -> Tuple[bool, bool]: - """Returns (NewRSIPAvailable, NewNATEnabled)""" - raise NotImplementedError() - - @soap_command async def GetGenericPortMappingEntry(self, NewPortMappingIndex: int) -> Tuple[str, int, str, int, str, bool, str, int]: """ Returns (NewRemoteHost, NewExternalPort, NewProtocol, NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration) """ - raise NotImplementedError() + name = "GetGenericPortMappingEntry" + if not self.is_registered(name): + raise NotImplementedError() + assert name in self._wrappers_kwargs + result: Tuple[str, int, str, int, str, bool, str, int] = await self._wrappers_kwargs[name]( + NewPortMappingIndex=NewPortMappingIndex + ) + return result - @soap_command async def GetSpecificPortMappingEntry(self, NewRemoteHost: str, NewExternalPort: int, NewProtocol: str) -> Tuple[int, str, bool, str, int]: """Returns (NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration)""" - raise NotImplementedError() + name = "GetSpecificPortMappingEntry" + if not self.is_registered(name): + raise NotImplementedError() + assert name in self._wrappers_kwargs + result: Tuple[int, str, bool, str, int] = await self._wrappers_kwargs[name]( + NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol + ) + return result - @soap_command - async def SetConnectionType(self, NewConnectionType: str) -> None: - """Returns None""" - raise NotImplementedError() - - @soap_command - async def GetExternalIPAddress(self) -> str: - """Returns (NewExternalIPAddress)""" - raise NotImplementedError() - - @soap_command - async def GetConnectionTypeInfo(self) -> Tuple[str, str]: - """Returns (NewConnectionType, NewPossibleConnectionTypes)""" - raise NotImplementedError() - - @soap_command - async def GetStatusInfo(self) -> Tuple[str, str, int]: - """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)""" - raise NotImplementedError() - - @soap_command - async def ForceTermination(self) -> None: - """Returns None""" - raise NotImplementedError() - - @soap_command async def DeletePortMapping(self, NewRemoteHost: str, NewExternalPort: int, NewProtocol: str) -> None: """Returns None""" - raise NotImplementedError() + name = "DeletePortMapping" + if not self.is_registered(name): + raise NotImplementedError() + assert name in self._wrappers_kwargs + await self._wrappers_kwargs[name]( + NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol + ) + return None - @soap_command - async def RequestConnection(self) -> None: - """Returns None""" - raise NotImplementedError() + async def GetExternalIPAddress(self) -> str: + """Returns (NewExternalIPAddress)""" + name = "GetExternalIPAddress" + if not self.is_registered(name): + raise NotImplementedError() + assert name in self._wrappers_no_args + result: Tuple[str] = await self._wrappers_no_args[name]() + return result[0] - @soap_command - async def GetCommonLinkProperties(self) -> Tuple[str, int, int, str]: - """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, NewPhysicalLinkStatus)""" - raise NotImplementedError() - - @soap_command - async def GetTotalBytesSent(self) -> int: - """Returns (NewTotalBytesSent)""" - raise NotImplementedError() - - @soap_command - async def GetTotalBytesReceived(self) -> int: - """Returns (NewTotalBytesReceived)""" - raise NotImplementedError() - - @soap_command - async def GetTotalPacketsSent(self) -> int: - """Returns (NewTotalPacketsSent)""" - raise NotImplementedError() - - @soap_command - async def GetTotalPacketsReceived(self) -> int: - """Returns (NewTotalPacketsReceived)""" - raise NotImplementedError() - - @soap_command - async def X_GetICSStatistics(self) -> Tuple[int, int, int, int, str, str]: - """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, Layer1DownstreamMaxBitRate, Uptime)""" - raise NotImplementedError() - - @soap_command - async def GetDefaultConnectionService(self) -> str: - """Returns (NewDefaultConnectionService)""" - raise NotImplementedError() - - @soap_command - async def SetDefaultConnectionService(self, NewDefaultConnectionService: str) -> None: - """Returns (None)""" - raise NotImplementedError() - - @soap_command - async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None: - raise NotImplementedError() - - @soap_command - async def GetEnabledForInternet(self) -> bool: - raise NotImplementedError() - - @soap_command - async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int): - raise NotImplementedError() - - @soap_command - async def GetActiveConnections(self) -> Tuple[str, str]: - """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID""" - raise NotImplementedError() + # async def GetNATRSIPStatus(self) -> Tuple[bool, bool]: + # """Returns (NewRSIPAvailable, NewNATEnabled)""" + # name = "GetNATRSIPStatus" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[bool, bool] = await self._wrappers_no_args[name]() + # return result[0], result[1] + # + # async def SetConnectionType(self, NewConnectionType: str) -> None: + # """Returns None""" + # name = "SetConnectionType" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_kwargs + # await self._wrappers_kwargs[name](NewConnectionType=NewConnectionType) + # return None + # + # async def GetConnectionTypeInfo(self) -> Tuple[str, str]: + # """Returns (NewConnectionType, NewPossibleConnectionTypes)""" + # name = "GetConnectionTypeInfo" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[str, str] = await self._wrappers_no_args[name]() + # return result + # + # async def GetStatusInfo(self) -> Tuple[str, str, int]: + # """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)""" + # name = "GetStatusInfo" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[str, str, int] = await self._wrappers_no_args[name]() + # return result + # + # async def ForceTermination(self) -> None: + # """Returns None""" + # name = "ForceTermination" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # await self._wrappers_no_args[name]() + # return None + # + # async def RequestConnection(self) -> None: + # """Returns None""" + # name = "RequestConnection" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # await self._wrappers_no_args[name]() + # return None + # + # async def GetCommonLinkProperties(self) -> Tuple[str, int, int, str]: + # """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, + # NewPhysicalLinkStatus)""" + # name = "GetCommonLinkProperties" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[str, int, int, str] = await self._wrappers_no_args[name]() + # return result + # + # async def GetTotalBytesSent(self) -> int: + # """Returns (NewTotalBytesSent)""" + # name = "GetTotalBytesSent" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[int] = await self._wrappers_no_args[name]() + # return result[0] + # + # async def GetTotalBytesReceived(self) -> int: + # """Returns (NewTotalBytesReceived)""" + # name = "GetTotalBytesReceived" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[int] = await self._wrappers_no_args[name]() + # return result[0] + # + # async def GetTotalPacketsSent(self) -> int: + # """Returns (NewTotalPacketsSent)""" + # name = "GetTotalPacketsSent" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[int] = await self._wrappers_no_args[name]() + # return result[0] + # + # async def GetTotalPacketsReceived(self) -> int: + # """Returns (NewTotalPacketsReceived)""" + # name = "GetTotalPacketsReceived" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[int] = await self._wrappers_no_args[name]() + # return result[0] + # + # async def X_GetICSStatistics(self) -> Tuple[int, int, int, int, str, str]: + # """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, + # Layer1DownstreamMaxBitRate, Uptime)""" + # name = "X_GetICSStatistics" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[int, int, int, int, str, str] = await self._wrappers_no_args[name]() + # return result + # + # async def GetDefaultConnectionService(self) -> str: + # """Returns (NewDefaultConnectionService)""" + # name = "GetDefaultConnectionService" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[str] = await self._wrappers_no_args[name]() + # return result[0] + # + # async def SetDefaultConnectionService(self, NewDefaultConnectionService: str) -> None: + # """Returns (None)""" + # name = "SetDefaultConnectionService" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_kwargs + # await self._wrappers_kwargs[name](NewDefaultConnectionService=NewDefaultConnectionService) + # return None + # + # async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None: + # name = "SetEnabledForInternet" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_kwargs + # await self._wrappers_kwargs[name](NewEnabledForInternet=NewEnabledForInternet) + # return None + # + # async def GetEnabledForInternet(self) -> bool: + # name = "GetEnabledForInternet" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[bool] = await self._wrappers_no_args[name]() + # return result[0] + # + # async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int) -> None: + # name = "GetMaximumActiveConnections" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_kwargs + # await self._wrappers_kwargs[name](NewActiveConnectionIndex=NewActiveConnectionIndex) + # return None + # + # async def GetActiveConnections(self) -> Tuple[str, str]: + # """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID""" + # name = "GetActiveConnections" + # if not self.is_registered(name): + # raise NotImplementedError() + # assert name in self._wrappers_no_args + # result: Tuple[str, str] = await self._wrappers_no_args[name]() + # return result diff --git a/aioupnp/device.py b/aioupnp/device.py index d6d84d4..e3892c4 100644 --- a/aioupnp/device.py +++ b/aioupnp/device.py @@ -6,7 +6,8 @@ log = logging.getLogger(__name__) class CaseInsensitive: - def __init__(self, **kwargs: typing.Dict[str, typing.Union[str, typing.Dict[str, typing.Any], typing.List[typing.Any]]]) -> None: + def __init__(self, **kwargs: typing.Dict[str, typing.Union[str, typing.Dict[str, typing.Any], + typing.List[typing.Any]]]) -> None: keys: typing.List[str] = list(kwargs.keys()) for k in keys: if not k.startswith("_"): diff --git a/aioupnp/gateway.py b/aioupnp/gateway.py index 1ed4b1e..b40d8cb 100644 --- a/aioupnp/gateway.py +++ b/aioupnp/gateway.py @@ -69,8 +69,6 @@ def parse_location(location: bytes) -> typing.Tuple[bytes, int]: class Gateway: - commands: SOAPCommands - def __init__(self, ok_packet: SSDPDatagram, m_search_args: typing.Dict[str, typing.Union[int, str]], lan_address: str, gateway_address: str, loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None: @@ -144,10 +142,10 @@ class Gateway: @property def soap_requests(self) -> typing.List[typing.Tuple[str, typing.Dict[str, typing.Any], bytes, - typing.Optional[typing.Dict[str, typing.Any]], + typing.Optional[typing.Tuple], typing.Optional[Exception], float]]: soap_call_infos: typing.List[typing.Tuple[str, typing.Dict[str, typing.Any], bytes, - typing.Optional[typing.Dict[str, typing.Any]], + typing.Optional[typing.Tuple], typing.Optional[Exception], float]] = [] soap_call_infos.extend([ (name, request_args, raw_response, decoded_response, soap_error, ts) @@ -241,7 +239,7 @@ class Gateway: task.exception() except asyncio.CancelledError: pass - results: typing.List[asyncio.Future['Gateway']] = list(done) + results: typing.List['asyncio.Future[Gateway]'] = list(done) return results[0].result() async def discover_commands(self, loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None: diff --git a/aioupnp/interfaces.py b/aioupnp/interfaces.py index 78f11ad..6a03e3d 100644 --- a/aioupnp/interfaces.py +++ b/aioupnp/interfaces.py @@ -8,15 +8,17 @@ def get_netifaces(): return netifaces -def ifaddresses(iface: str): +def ifaddresses(iface: str) -> typing.Dict[int, typing.List[typing.Dict[str, str]]]: return get_netifaces().ifaddresses(iface) -def _get_interfaces(): +def _get_interfaces() -> typing.List[str]: return get_netifaces().interfaces() -def _get_gateways(): +def _get_gateways() -> typing.Dict[typing.Union[str, int], + typing.Union[typing.Dict[int, typing.Tuple[str, str]], + typing.List[typing.Tuple[str, str, bool]]]]: return get_netifaces().gateways() @@ -26,7 +28,7 @@ def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]: assert isinstance(infos, list), TypeError(f"expected list from netifaces, got a dict") interface_infos: typing.List[typing.Tuple[str, str, bool]] = infos result: typing.Dict[str, typing.Tuple[str, str]] = OrderedDict( - (interface_name, (router_address, ifaddresses(interface_name)[netifaces.AF_INET][0]['addr'])) + (interface_name, (router_address, ifaddresses(interface_name)[socket.AF_INET][0]['addr'])) for router_address, interface_name, _ in interface_infos ) for interface_name in _get_interfaces(): @@ -40,7 +42,7 @@ def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]: _default = gateways['default'] assert isinstance(_default, dict), TypeError(f"expected dict from netifaces, got a list") default: typing.Dict[int, typing.Tuple[str, str]] = _default - result['default'] = result[default[netifaces.AF_INET][1]] + result['default'] = result[default[socket.AF_INET][1]] return result diff --git a/aioupnp/protocols/multicast.py b/aioupnp/protocols/multicast.py index 452b304..4aadb2e 100644 --- a/aioupnp/protocols/multicast.py +++ b/aioupnp/protocols/multicast.py @@ -10,7 +10,7 @@ def _get_sock(transport: typing.Optional[BaseTransport]) -> typing.Optional[sock if transport is None or not hasattr(transport, "_extra"): return None sock: typing.Optional[socket.socket] = transport.get_extra_info('socket', None) - assert sock is None or isinstance(sock, socket.SocketType) or isinstance(sock, mock.MagicMock) + assert sock is None or isinstance(sock, (socket.SocketType, mock.MagicMock)) return sock diff --git a/aioupnp/protocols/scpd.py b/aioupnp/protocols/scpd.py index 3543ba8..9412946 100644 --- a/aioupnp/protocols/scpd.py +++ b/aioupnp/protocols/scpd.py @@ -101,11 +101,11 @@ class SCPDHTTPClientProtocol(Protocol): async def scpd_get(control_url: str, address: str, port: int, - loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> typing.Tuple[typing.Dict[str, typing.Any], bytes, - typing.Optional[Exception]]: + loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> typing.Tuple[ + typing.Dict[str, typing.Any], bytes, typing.Optional[Exception]]: loop = loop or asyncio.get_event_loop() packet = serialize_scpd_get(control_url, address) - finished: asyncio.Future[typing.Tuple[bytes, int, bytes]] = asyncio.Future(loop=loop) + finished: 'asyncio.Future[typing.Tuple[bytes, int, bytes]]' = asyncio.Future(loop=loop) proto_factory: typing.Callable[[], SCPDHTTPClientProtocol] = lambda: SCPDHTTPClientProtocol(packet, finished) connect_tup: typing.Tuple[asyncio.BaseTransport, asyncio.BaseProtocol] = await loop.create_connection( proto_factory, address, port @@ -140,7 +140,7 @@ async def scpd_post(control_url: str, address: str, port: int, method: str, para **kwargs: typing.Dict[str, typing.Any] ) -> typing.Tuple[typing.Dict, bytes, typing.Optional[Exception]]: loop = loop or asyncio.get_event_loop() - finished: asyncio.Future[typing.Tuple[bytes, int, bytes]] = asyncio.Future(loop=loop) + finished: 'asyncio.Future[typing.Tuple[bytes, int, bytes]]' = asyncio.Future(loop=loop) packet = serialize_soap_post(method, param_names, service_id, address.encode(), control_url.encode(), **kwargs) proto_factory: typing.Callable[[], SCPDHTTPClientProtocol] = lambda:\ SCPDHTTPClientProtocol(packet, finished, soap_method=method, soap_service_id=service_id.decode()) @@ -159,6 +159,7 @@ async def scpd_post(control_url: str, address: str, port: int, method: str, para except UPnPError as err: return {}, protocol.response_buff, err finally: + # raw_response = protocol.response_buff transport.close() try: return ( diff --git a/aioupnp/protocols/ssdp.py b/aioupnp/protocols/ssdp.py index a43e117..846f1ce 100644 --- a/aioupnp/protocols/ssdp.py +++ b/aioupnp/protocols/ssdp.py @@ -4,7 +4,6 @@ import asyncio import logging import typing import socket -from collections import OrderedDict from asyncio.transports import DatagramTransport from aioupnp.fault import UPnPError from aioupnp.serialization.ssdp import SSDPDatagram @@ -25,7 +24,7 @@ class SSDPProtocol(MulticastProtocol): self.transport: typing.Optional[DatagramTransport] = None self._unicast = unicast self._ignored: typing.Set[str] = ignored or set() # ignored locations - self._pending_searches: typing.List[typing.Tuple[str, str, asyncio.Future[SSDPDatagram], asyncio.Handle]] = [] + self._pending_searches: typing.List[typing.Tuple[str, str, 'asyncio.Future[SSDPDatagram]', asyncio.Handle]] = [] self.notifications: typing.List[SSDPDatagram] = [] self.connected = asyncio.Event(loop=self.loop) @@ -54,12 +53,12 @@ class SSDPProtocol(MulticastProtocol): if packet.location in self._ignored: return None # TODO: fix this - tmp: typing.List[typing.Tuple[str, str, asyncio.Future[SSDPDatagram], asyncio.Handle]] = [] - set_futures: typing.List[asyncio.Future[SSDPDatagram]] = [] + tmp: typing.List[typing.Tuple[str, str, 'asyncio.Future[SSDPDatagram]', asyncio.Handle]] = [] + set_futures: typing.List['asyncio.Future[SSDPDatagram]'] = [] while len(self._pending_searches): - t: typing.Tuple[str, str, asyncio.Future[SSDPDatagram], asyncio.Handle] = self._pending_searches.pop() + t: typing.Tuple[str, str, 'asyncio.Future[SSDPDatagram]', asyncio.Handle] = self._pending_searches.pop() if (address == t[0]) and (t[1] in [packet.st, "upnp:rootdevice"]): - f: asyncio.Future[SSDPDatagram] = t[2] + f = t[2] if f not in set_futures: set_futures.append(f) if not f.done(): @@ -80,7 +79,7 @@ class SSDPProtocol(MulticastProtocol): async def m_search(self, address: str, timeout: float, datagrams: typing.List[typing.Dict[str, typing.Union[str, int]]]) -> SSDPDatagram: - fut: asyncio.Future[SSDPDatagram] = asyncio.Future(loop=self.loop) + fut: 'asyncio.Future[SSDPDatagram]' = asyncio.Future(loop=self.loop) for datagram in datagrams: packet = SSDPDatagram("M-SEARCH", datagram) assert packet.st is not None diff --git a/aioupnp/upnp.py b/aioupnp/upnp.py index e16bb0f..7e8a27f 100644 --- a/aioupnp/upnp.py +++ b/aioupnp/upnp.py @@ -1,11 +1,11 @@ -import os +# import os +# import zlib +# import base64 import logging import json import asyncio -import zlib -import base64 from collections import OrderedDict -from typing import Tuple, Dict, List, Union, Optional, Callable +from typing import Tuple, Dict, List, Union, Optional from aioupnp.fault import UPnPError from aioupnp.gateway import Gateway from aioupnp.interfaces import get_gateway_and_lan_addresses @@ -49,7 +49,8 @@ class UPnP: @classmethod async def discover(cls, lan_address: str = '', gateway_address: str = '', timeout: int = 30, - igd_args: OrderedDict = None, interface_name: str = 'default', loop=None): + igd_args: Optional[Dict[str, Union[str, int]]] = None, interface_name: str = 'default', + loop: Optional[asyncio.AbstractEventLoop] = None) -> 'UPnP': try: lan_address, gateway_address = cls.get_lan_and_gateway(lan_address, gateway_address, interface_name) except Exception as err: @@ -64,7 +65,8 @@ class UPnP: async def m_search(cls, lan_address: str = '', gateway_address: str = '', timeout: int = 1, igd_args: Optional[Dict[str, Union[int, str]]] = None, unicast: bool = True, interface_name: str = 'default', - loop=None) -> Dict[str, Union[str, Dict[str, Union[int, str]]]]: + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Dict[str, Union[str, Dict[str, Union[int, str]]]]: if not lan_address or not gateway_address: try: lan_address, gateway_address = cls.get_lan_and_gateway(lan_address, gateway_address, interface_name) @@ -98,16 +100,16 @@ class UPnP: ) @cli - async def get_port_mapping_by_index(self, index: int) -> Dict: - result = await self._get_port_mapping_by_index(index) - if result: - if self.gateway.commands.is_registered('GetGenericPortMappingEntry'): - return { - k: v for k, v in zip(self.gateway.commands.GetGenericPortMappingEntry.return_order, result) - } - return {} + async def get_port_mapping_by_index(self, index: int): + return await self._get_port_mapping_by_index(index) + # if result: + # if self.gateway.commands.is_registered('GetGenericPortMappingEntry'): + # return { + # k: v for k, v in zip(self.gateway.commands.GetGenericPortMappingEntry.return_order, result) + # } + # return {} - async def _get_port_mapping_by_index(self, index: int) -> Union[None, Tuple[Optional[str], int, str, + async def _get_port_mapping_by_index(self, index: int) -> Optional[Tuple[Optional[str], int, str, int, str, bool, str, int]]: try: redirect = await self.gateway.commands.GetGenericPortMappingEntry(NewPortMappingIndex=index) @@ -127,22 +129,20 @@ class UPnP: return redirects @cli - async def get_specific_port_mapping(self, external_port: int, protocol: str) -> Dict: + async def get_specific_port_mapping(self, external_port: int, protocol: str): """ :param external_port: (int) external port to listen on :param protocol: (str) 'UDP' | 'TCP' :return: (int) , (str) , (bool) , (str) , (int) """ - try: - result = await self.gateway.commands.GetSpecificPortMappingEntry( - NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol - ) - if result and self.gateway.commands.is_registered('GetSpecificPortMappingEntry'): - return {k: v for k, v in zip(self.gateway.commands.GetSpecificPortMappingEntry.return_order, result)} - except UPnPError: - pass - return {} + # try: + return await self.gateway.commands.GetSpecificPortMappingEntry( + NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol + ) + # except UPnPError: + # pass + # return {} @cli async def delete_port_mapping(self, external_port: int, protocol: str) -> None: @@ -193,156 +193,103 @@ class UPnP: "client_address": self.lan_address, }, default=_encode, indent=2) - @property - def zipped_debugging_info(self) -> str: - return base64.b64encode(zlib.compress( - json.dumps({ - "gateway": self.gateway.debug_gateway(), - "client_address": self.lan_address, - }, default=_encode, indent=2).encode() - )).decode() - - @cli - async def generate_test_data(self): - print("found gateway via M-SEARCH") - try: - external_ip = await self.get_external_ip() - print("got external ip: %s" % external_ip) - except (UPnPError, NotImplementedError): - print("failed to get the external ip") - try: - await self.get_redirects() - print("got redirects") - except (UPnPError, NotImplementedError): - print("failed to get redirects") - try: - await self.get_specific_port_mapping(4567, "UDP") - print("got specific mapping") - except (UPnPError, NotImplementedError): - print("failed to get specific mapping") - try: - ext_port = await self.get_next_mapping(4567, "UDP", "aioupnp test mapping") - print("set up external mapping to port %i" % ext_port) - try: - await self.get_specific_port_mapping(4567, "UDP") - print("got specific mapping") - except (UPnPError, NotImplementedError): - print("failed to get specific mapping") - try: - await self.get_redirects() - print("got redirects") - except (UPnPError, NotImplementedError): - print("failed to get redirects") - await self.delete_port_mapping(ext_port, "UDP") - print("deleted mapping") - except (UPnPError, NotImplementedError): - print("failed to add and remove a mapping") - try: - await self.get_redirects() - print("got redirects") - except (UPnPError, NotImplementedError): - print("failed to get redirects") - try: - await self.get_specific_port_mapping(4567, "UDP") - print("got specific mapping") - except (UPnPError, NotImplementedError): - print("failed to get specific mapping") - if self.gateway.devices: - device = list(self.gateway.devices.values())[0] - assert device.manufacturer and device.modelName - device_path = os.path.join(os.getcwd(), self.gateway.manufacturer_string) - else: - device_path = os.path.join(os.getcwd(), "UNKNOWN GATEWAY") - with open(device_path, "w") as f: - f.write(await self.debug_gateway()) - return "Generated test data! -> %s" % device_path - - @cli - async def get_natrsip_status(self) -> Tuple[bool, bool]: - """Returns (NewRSIPAvailable, NewNATEnabled)""" - return await self.gateway.commands.GetNATRSIPStatus() - - @cli - async def set_connection_type(self, NewConnectionType: str) -> None: - """Returns None""" - return await self.gateway.commands.SetConnectionType(NewConnectionType) - - @cli - async def get_connection_type_info(self) -> Tuple[str, str]: - """Returns (NewConnectionType, NewPossibleConnectionTypes)""" - return await self.gateway.commands.GetConnectionTypeInfo() - - @cli - async def get_status_info(self) -> Tuple[str, str, int]: - """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)""" - return await self.gateway.commands.GetStatusInfo() - - @cli - async def force_termination(self) -> None: - """Returns None""" - return await self.gateway.commands.ForceTermination() - - @cli - async def request_connection(self) -> None: - """Returns None""" - return await self.gateway.commands.RequestConnection() - - @cli - async def get_common_link_properties(self): - """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, NewPhysicalLinkStatus)""" - return await self.gateway.commands.GetCommonLinkProperties() - - @cli - async def get_total_bytes_sent(self): - """Returns (NewTotalBytesSent)""" - return await self.gateway.commands.GetTotalBytesSent() - - @cli - async def get_total_bytes_received(self): - """Returns (NewTotalBytesReceived)""" - return await self.gateway.commands.GetTotalBytesReceived() - - @cli - async def get_total_packets_sent(self): - """Returns (NewTotalPacketsSent)""" - return await self.gateway.commands.GetTotalPacketsSent() - - @cli - async def get_total_packets_received(self): - """Returns (NewTotalPacketsReceived)""" - return await self.gateway.commands.GetTotalPacketsReceived() - - @cli - async def x_get_ics_statistics(self) -> Tuple[int, int, int, int, str, str]: - """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, Layer1DownstreamMaxBitRate, Uptime)""" - return await self.gateway.commands.X_GetICSStatistics() - - @cli - async def get_default_connection_service(self): - """Returns (NewDefaultConnectionService)""" - return await self.gateway.commands.GetDefaultConnectionService() - - @cli - async def set_default_connection_service(self, NewDefaultConnectionService: str) -> None: - """Returns (None)""" - return await self.gateway.commands.SetDefaultConnectionService(NewDefaultConnectionService) - - @cli - async def set_enabled_for_internet(self, NewEnabledForInternet: bool) -> None: - return await self.gateway.commands.SetEnabledForInternet(NewEnabledForInternet) - - @cli - async def get_enabled_for_internet(self) -> bool: - return await self.gateway.commands.GetEnabledForInternet() - - @cli - async def get_maximum_active_connections(self, NewActiveConnectionIndex: int): - return await self.gateway.commands.GetMaximumActiveConnections(NewActiveConnectionIndex) - - @cli - async def get_active_connections(self) -> Tuple[str, str]: - """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID""" - return await self.gateway.commands.GetActiveConnections() + # @property + # def zipped_debugging_info(self) -> str: + # return base64.b64encode(zlib.compress( + # json.dumps({ + # "gateway": self.gateway.debug_gateway(), + # "client_address": self.lan_address, + # }, default=_encode, indent=2).encode() + # )).decode() + # + # @cli + # async def get_natrsip_status(self) -> Tuple[bool, bool]: + # """Returns (NewRSIPAvailable, NewNATEnabled)""" + # return await self.gateway.commands.GetNATRSIPStatus() + # + # @cli + # async def set_connection_type(self, NewConnectionType: str) -> None: + # """Returns None""" + # return await self.gateway.commands.SetConnectionType(NewConnectionType) + # + # @cli + # async def get_connection_type_info(self) -> Tuple[str, str]: + # """Returns (NewConnectionType, NewPossibleConnectionTypes)""" + # return await self.gateway.commands.GetConnectionTypeInfo() + # + # @cli + # async def get_status_info(self) -> Tuple[str, str, int]: + # """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)""" + # return await self.gateway.commands.GetStatusInfo() + # + # @cli + # async def force_termination(self) -> None: + # """Returns None""" + # return await self.gateway.commands.ForceTermination() + # + # @cli + # async def request_connection(self) -> None: + # """Returns None""" + # return await self.gateway.commands.RequestConnection() + # + # @cli + # async def get_common_link_properties(self): + # """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, + # NewPhysicalLinkStatus)""" + # return await self.gateway.commands.GetCommonLinkProperties() + # + # @cli + # async def get_total_bytes_sent(self) -> int: + # """Returns (NewTotalBytesSent)""" + # return await self.gateway.commands.GetTotalBytesSent() + # + # @cli + # async def get_total_bytes_received(self): + # """Returns (NewTotalBytesReceived)""" + # return await self.gateway.commands.GetTotalBytesReceived() + # + # @cli + # async def get_total_packets_sent(self): + # """Returns (NewTotalPacketsSent)""" + # return await self.gateway.commands.GetTotalPacketsSent() + # + # @cli + # async def get_total_packets_received(self): + # """Returns (NewTotalPacketsReceived)""" + # return await self.gateway.commands.GetTotalPacketsReceived() + # + # @cli + # async def x_get_ics_statistics(self) -> Tuple[int, int, int, int, str, str]: + # """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, + # Layer1DownstreamMaxBitRate, Uptime)""" + # return await self.gateway.commands.X_GetICSStatistics() + # + # @cli + # async def get_default_connection_service(self): + # """Returns (NewDefaultConnectionService)""" + # return await self.gateway.commands.GetDefaultConnectionService() + # + # @cli + # async def set_default_connection_service(self, NewDefaultConnectionService: str) -> None: + # """Returns (None)""" + # return await self.gateway.commands.SetDefaultConnectionService(NewDefaultConnectionService) + # + # @cli + # async def set_enabled_for_internet(self, NewEnabledForInternet: bool) -> None: + # return await self.gateway.commands.SetEnabledForInternet(NewEnabledForInternet) + # + # @cli + # async def get_enabled_for_internet(self) -> bool: + # return await self.gateway.commands.GetEnabledForInternet() + # + # @cli + # async def get_maximum_active_connections(self, NewActiveConnectionIndex: int): + # return await self.gateway.commands.GetMaximumActiveConnections(NewActiveConnectionIndex) + # + # @cli + # async def get_active_connections(self) -> Tuple[str, str]: + # """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID""" + # return await self.gateway.commands.GetActiveConnections() @classmethod def run_cli(cls, method, igd_args: Dict[str, Union[bool, str, int]], lan_address: str = '', @@ -361,8 +308,8 @@ class UPnP: kwargs = kwargs or {} igd_args = igd_args timeout = int(timeout) - loop = loop or asyncio.get_event_loop_policy().get_event_loop() - fut: asyncio.Future = loop.create_future() + loop = loop or asyncio.get_event_loop() + fut: 'asyncio.Future' = asyncio.Future(loop=loop) async def wrapper(): # wrap the upnp setup and call of the command in a coroutine @@ -404,6 +351,6 @@ class UPnP: return if isinstance(result, (list, tuple, dict)): - print(json.dumps(result, indent=2, default=_encode)) + print(json.dumps(result, indent=2)) else: print(result) diff --git a/aioupnp/util.py b/aioupnp/util.py index 3b19504..7553639 100644 --- a/aioupnp/util.py +++ b/aioupnp/util.py @@ -36,7 +36,8 @@ def flatten_keys(to_flatten: str_any_dict, strip: str) -> str_any_dict: return copy -def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyStr], key: typing.AnyStr) -> typing.Optional[typing.AnyStr]: +def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyStr], + key: typing.AnyStr) -> typing.Optional[typing.AnyStr]: match: typing.List[typing.AnyStr] = list(filter(lambda x: x.lower() == key.lower(), source.keys())) if not len(match): return None diff --git a/tests/__init__.py b/tests/__init__.py index 51ce45a..9d87eeb 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -29,6 +29,9 @@ def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_r sent_tcp_packets.append(data) if data in tcp_replies: loop.call_later(tcp_delay_reply, p.data_received, tcp_replies[data]) + return + else: + pass return _write diff --git a/tests/generate_test.py b/tests/generate_test.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_gateway.py b/tests/test_gateway.py index 6366e27..1680ed4 100644 --- a/tests/test_gateway.py +++ b/tests/test_gateway.py @@ -171,20 +171,20 @@ class TestDiscoverDLinkDIR890L(AsyncioTestCase): } expected_commands = { - 'GetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1', - 'SetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1', - 'GetCommonLinkProperties': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', - 'GetTotalBytesSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', - 'GetTotalBytesReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', - 'GetTotalPacketsSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', - 'GetTotalPacketsReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', - 'X_GetICSStatistics': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', - 'SetConnectionType': 'urn:schemas-upnp-org:service:WANIPConnection:1', - 'GetConnectionTypeInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1', - 'RequestConnection': 'urn:schemas-upnp-org:service:WANIPConnection:1', - 'ForceTermination': 'urn:schemas-upnp-org:service:WANIPConnection:1', - 'GetStatusInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1', - 'GetNATRSIPStatus': 'urn:schemas-upnp-org:service:WANIPConnection:1', + # 'GetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1', + # 'SetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1', + # 'GetCommonLinkProperties': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', + # 'GetTotalBytesSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', + # 'GetTotalBytesReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', + # 'GetTotalPacketsSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', + # 'GetTotalPacketsReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', + # 'X_GetICSStatistics': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1', + # 'SetConnectionType': 'urn:schemas-upnp-org:service:WANIPConnection:1', + # 'GetConnectionTypeInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1', + # 'RequestConnection': 'urn:schemas-upnp-org:service:WANIPConnection:1', + # 'ForceTermination': 'urn:schemas-upnp-org:service:WANIPConnection:1', + # 'GetStatusInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1', + # 'GetNATRSIPStatus': 'urn:schemas-upnp-org:service:WANIPConnection:1', 'GetGenericPortMappingEntry': 'urn:schemas-upnp-org:service:WANIPConnection:1', 'GetSpecificPortMappingEntry': 'urn:schemas-upnp-org:service:WANIPConnection:1', 'AddPortMapping': 'urn:schemas-upnp-org:service:WANIPConnection:1', @@ -238,22 +238,22 @@ class TestDiscoverNetgearNighthawkAC2350(TestDiscoverDLinkDIR890L): } expected_commands = { - "SetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1", - "GetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1", - "GetCommonLinkProperties": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", - "GetTotalBytesSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", - "GetTotalBytesReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", - "GetTotalPacketsSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", - "GetTotalPacketsReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", + # "SetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1", + # "GetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1", + # "GetCommonLinkProperties": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", + # "GetTotalBytesSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", + # "GetTotalBytesReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", + # "GetTotalPacketsSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", + # "GetTotalPacketsReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1", "AddPortMapping": "urn:schemas-upnp-org:service:WANIPConnection:1", "GetExternalIPAddress": "urn:schemas-upnp-org:service:WANIPConnection:1", "DeletePortMapping": "urn:schemas-upnp-org:service:WANIPConnection:1", - "SetConnectionType": "urn:schemas-upnp-org:service:WANIPConnection:1", - "GetConnectionTypeInfo": "urn:schemas-upnp-org:service:WANIPConnection:1", - "RequestConnection": "urn:schemas-upnp-org:service:WANIPConnection:1", - "ForceTermination": "urn:schemas-upnp-org:service:WANIPConnection:1", - "GetStatusInfo": "urn:schemas-upnp-org:service:WANIPConnection:1", - "GetNATRSIPStatus": "urn:schemas-upnp-org:service:WANIPConnection:1", + # "SetConnectionType": "urn:schemas-upnp-org:service:WANIPConnection:1", + # "GetConnectionTypeInfo": "urn:schemas-upnp-org:service:WANIPConnection:1", + # "RequestConnection": "urn:schemas-upnp-org:service:WANIPConnection:1", + # "ForceTermination": "urn:schemas-upnp-org:service:WANIPConnection:1", + # "GetStatusInfo": "urn:schemas-upnp-org:service:WANIPConnection:1", + # "GetNATRSIPStatus": "urn:schemas-upnp-org:service:WANIPConnection:1", "GetGenericPortMappingEntry": "urn:schemas-upnp-org:service:WANIPConnection:1", "GetSpecificPortMappingEntry": "urn:schemas-upnp-org:service:WANIPConnection:1" } diff --git a/tests/test_upnp.py b/tests/test_upnp.py new file mode 100644 index 0000000..b6a2537 --- /dev/null +++ b/tests/test_upnp.py @@ -0,0 +1,90 @@ +from tests import AsyncioTestCase, mock_tcp_and_udp +from collections import OrderedDict +from aioupnp.upnp import UPnP +from aioupnp.gateway import Gateway +from aioupnp.serialization.ssdp import SSDPDatagram + + +class UPnPCommandTestCase(AsyncioTestCase): + gateway_address = "11.2.3.4" + client_address = "11.2.3.5" + soap_port = 49152 + + m_search_args = OrderedDict([ + ("HOST", "239.255.255.250:1900"), + ("MAN", "ssdp:discover"), + ("MX", 1), + ("ST", "urn:schemas-upnp-org:device:WANDevice:1") + ]) + + reply = SSDPDatagram("OK", OrderedDict([ + ("CACHE_CONTROL", "max-age=1800"), + ("LOCATION", "http://11.2.3.4:49152/InternetGatewayDevice.xml"), + ("SERVER", "Linux, UPnP/1.0, DIR-890L Ver 1.20"), + ("ST", "urn:schemas-upnp-org:device:WANDevice:1"), + ("USN", "uuid:11111111-2222-3333-4444-555555555555::urn:schemas-upnp-org:device:WANDevice:1") + ])) + + replies = { + b'GET /InternetGatewayDevice.xml HTTP/1.1\r\nAccept-Encoding: gzip\r\nHost: 11.2.3.4\r\nConnection: Close\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Thu, 11 Oct 2018 22:16:16 GMT\r\nContent-Type: text/xml\r\nContent-Length: 3921\r\nLast-Modified: Thu, 09 Aug 2018 12:41:07 GMT\r\nConnection: close\r\n\r\n\n\n\t\n\t\t1\n\t\t0\n\t\n\thttp://11.2.3.4:49152\n\t\n\t\turn:schemas-upnp-org:device:InternetGatewayDevice:1\n\t\tWireless Broadband Router\n\t\tD-Link Corporation\n\t\thttp://www.dlink.com\n\t\tD-Link Router\n\t\tD-Link Router\n\t\tDIR-890L\n\t\thttp://www.dlink.com\n\t\t120\n\t\tuuid:11111111-2222-3333-4444-555555555555\n\t\t\n\t\t\t\n\t\t\t\timage/gif\n\t\t\t\t118\n\t\t\t\t119\n\t\t\t\t8\n\t\t\t\t/ligd.gif\n\t\t\t\n\t\t\n\t\t\n\t\t\t\n\t\t\t\turn:schemas-microsoft-com:service:OSInfo:1\n\t\t\t\turn:microsoft-com:serviceId:OSInfo1\n\t\t\t\t/soap.cgi?service=OSInfo1\n\t\t\t\t/gena.cgi?service=OSInfo1\n\t\t\t\t/OSInfo.xml\n\t\t\t\n\t\t\t\n\t\t\t\turn:schemas-upnp-org:service:Layer3Forwarding:1\n\t\t\t\turn:upnp-org:serviceId:L3Forwarding1\n\t\t\t\t/soap.cgi?service=L3Forwarding1\n\t\t\t\t/gena.cgi?service=L3Forwarding1\n\t\t\t\t/Layer3Forwarding.xml\n\t\t\t\n\t\t\n\t\t\n\t\t\t\n\t\t\t\turn:schemas-upnp-org:device:WANDevice:1\n\t\t\t\tWANDevice\n\t\t\t\tD-Link\n\t\t\t\thttp://www.dlink.com\n\t\t\t\tWANDevice\n\t\t\t\tDIR-890L\n\t\t\t\t1\n\t\t\t\thttp://www.dlink.com\n\t\t\t\t120\n\t\t\t\tuuid:11111111-2222-3333-4444-555555555555\n\t\t\t\t\n\t\t\t\t\t\n\t\t\t\t\t\turn:schemas-upnp-org:service:WANCommonInterfaceConfig:1\n\t\t\t\t\t\turn:upnp-org:serviceId:WANCommonIFC1\n\t\t\t\t\t\t/soap.cgi?service=WANCommonIFC1\n\t\t\t\t\t\t/gena.cgi?service=WANCommonIFC1\n\t\t\t\t\t\t/WANCommonInterfaceConfig.xml\n\t\t\t\t\t\n\t\t\t\t\n\t\t\t\t\n\t\t\t\t\t\n\t\t\t\t\t\turn:schemas-upnp-org:device:WANConnectionDevice:1\n\t\t\t\t\t\tWANConnectionDevice\n\t\t\t\t\t\tD-Link\n\t\t\t\t\t\thttp://www.dlink.com\n\t\t\t\t\t\tWanConnectionDevice\n\t\t\t\t\t\tDIR-890L\n\t\t\t\t\t\t1\n\t\t\t\t\t\thttp://www.dlink.com\n\t\t\t\t\t\t120\n\t\t\t\t\t\tuuid:11111111-2222-3333-4444-555555555555\n\t\t\t\t\t\t\n\t\t\t\t\t\t\t\n\t\t\t\t\t\t\t\turn:schemas-upnp-org:service:WANEthernetLinkConfig:1\n\t\t\t\t\t\t\t\turn:upnp-org:serviceId:WANEthLinkC1\n\t\t\t\t\t\t\t\t/soap.cgi?service=WANEthLinkC1\n\t\t\t\t\t\t\t\t/gena.cgi?service=WANEthLinkC1\n\t\t\t\t\t\t\t\t/WANEthernetLinkConfig.xml\n\t\t\t\t\t\t\t\n\t\t\t\t\t\t\t\n\t\t\t\t\t\t\t\turn:schemas-upnp-org:service:WANIPConnection:1\n\t\t\t\t\t\t\t\turn:upnp-org:serviceId:WANIPConn1\n\t\t\t\t\t\t\t\t/soap.cgi?service=WANIPConn1\n\t\t\t\t\t\t\t\t/gena.cgi?service=WANIPConn1\n\t\t\t\t\t\t\t\t/WANIPConnection.xml\n\t\t\t\t\t\t\t\n\t\t\t\t\t\t\n\t\t\t\t\t\n\t\t\t\t\n\t\t\t\n\t\t\n\t\thttp://11.2.3.4\n\t\n\n", + b'GET /OSInfo.xml HTTP/1.1\r\nAccept-Encoding: gzip\r\nHost: 11.2.3.4\r\nConnection: Close\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Thu, 11 Oct 2018 22:16:16 GMT\r\nContent-Type: text/xml\r\nContent-Length: 219\r\nLast-Modified: Thu, 09 Aug 2018 12:41:07 GMT\r\nConnection: close\r\n\r\n\n\n\t\n\t\t1\n\t\t0\n\t\n\t\n\t\n\t\n\t\n\n", + b'GET /Layer3Forwarding.xml HTTP/1.1\r\nAccept-Encoding: gzip\r\nHost: 11.2.3.4\r\nConnection: Close\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Thu, 11 Oct 2018 22:16:16 GMT\r\nContent-Type: text/xml\r\nContent-Length: 920\r\nLast-Modified: Thu, 09 Aug 2018 12:41:07 GMT\r\nConnection: close\r\n\r\n\n\n\t\n\t\t1\n\t\t0\n\t\n\t\n\t\t\n\t\t\tGetDefaultConnectionService\n\t\t\t\n\t\t\t\t\n\t\t\t\t\tNewDefaultConnectionService\n\t\t\t\t\tout\n\t\t\t\t\tDefaultConnectionService\n\t\t\t\t\n\t\t\t\n\t\t\n\t\t\n\t\t\tSetDefaultConnectionService\n\t\t\t\n\t\t\t\t\n\t\t\t\t\tNewDefaultConnectionService\n\t\t\t\t\tin\n\t\t\t\t\tDefaultConnectionService\n\t\t\t\t\n\t\t\t\n\t\t\n\t\n\t\n\t\t\n\t\t\tDefaultConnectionService\n\t\t\tstring\n\t\t\n\t\n\n", + b'GET /WANCommonInterfaceConfig.xml HTTP/1.1\r\nAccept-Encoding: gzip\r\nHost: 11.2.3.4\r\nConnection: Close\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Thu, 11 Oct 2018 22:16:16 GMT\r\nContent-Type: text/xml\r\nContent-Length: 5343\r\nLast-Modified: Thu, 09 Aug 2018 12:41:07 GMT\r\nConnection: close\r\n\r\n\r\n\r\n\t\r\n\t\t1\r\n\t\t0\r\n\t\r\n\t\r\n\t\t\r\n\t\t\tGetCommonLinkProperties\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewWANAccessType\r\n\t\t\t\t\tout\r\n\t\t\t\t\tWANAccessType\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewLayer1UpstreamMaxBitRate\r\n\t\t\t\t\tout\r\n\t\t\t\t\tLayer1UpstreamMaxBitRate\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewLayer1DownstreamMaxBitRate\r\n\t\t\t\t\tout\r\n\t\t\t\t\tLayer1DownstreamMaxBitRate\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewPhysicalLinkStatus\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPhysicalLinkStatus\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetTotalBytesSent\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewTotalBytesSent\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalBytesSent\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetTotalBytesReceived\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewTotalBytesReceived\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalBytesReceived\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetTotalPacketsSent\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewTotalPacketsSent\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalPacketsSent\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetTotalPacketsReceived\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewTotalPacketsReceived\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalPacketsReceived\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tX_GetICSStatistics\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tTotalBytesSent\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalBytesSent\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tTotalBytesReceived\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalBytesReceived\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tTotalPacketsSent\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalPacketsSent\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tTotalPacketsReceived\r\n\t\t\t\t\tout\r\n\t\t\t\t\tTotalPacketsReceived\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tLayer1DownstreamMaxBitRate\r\n\t\t\t\t\tout\r\n\t\t\t\t\tLayer1DownstreamMaxBitRate\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tUptime\r\n\t\t\t\t\tout\r\n\t\t\t\t\tX_Uptime\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\r\n\t\r\n\t\t\r\n\t\t\tWANAccessType\r\n\t\t\tstring\r\n\t\t\t\r\n\t\t\t\tDSL\r\n\t\t\t\tPOTS\r\n\t\t\t\tCable\r\n\t\t\t\tEthernet\r\n\t\t\t\tOther\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tLayer1UpstreamMaxBitRate\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tLayer1DownstreamMaxBitRate\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tPhysicalLinkStatus\r\n\t\t\tstring\r\n\t\t\t\r\n\t\t\t\tUp\r\n\t\t\t\tDown\r\n\t\t\t\tInitializing\r\n\t\t\t\tUnavailable\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tWANAccessProvider\r\n\t\t\tstring\r\n\t\t\r\n\t\t\r\n\t\t\tMaximumActiveConnections\r\n\t\t\tui2\r\n\t\t\t\r\n\t\t\t\t1\r\n\t\t\t\t\r\n\t\t\t\t1\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tTotalBytesSent\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tTotalBytesReceived\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tTotalPacketsSent\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tTotalPacketsReceived\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tX_PersonalFirewallEnabled\r\n\t\t\tboolean\r\n\t\t\r\n\t\t\r\n\t\t\tX_Uptime\r\n\t\t\tui4\r\n\t\t\r\n\t\r\n\r\n", + b'GET /WANEthernetLinkConfig.xml HTTP/1.1\r\nAccept-Encoding: gzip\r\nHost: 11.2.3.4\r\nConnection: Close\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Thu, 11 Oct 2018 22:16:16 GMT\r\nContent-Type: text/xml\r\nContent-Length: 773\r\nLast-Modified: Thu, 09 Aug 2018 12:41:07 GMT\r\nConnection: close\r\n\r\n\n\n\t\n\t\t1\n\t\t0\n\t\n\t\n\t\t\n\t\t\tGetEthernetLinkStatus\n\t\t\t\n\t\t\t\t\n\t\t\t\t\tNewEthernetLinkStatus\n\t\t\t\t\tout\n\t\t\t\t\tEthernetLinkStatus\n\t\t\t\t\n\t\t\t\n\t\t\n\t\n\t\n\t\t\n\t\t\tEthernetLinkStatus\n\t\t\tstring\n\t\t\t\n\t\t\t\tUp\n\t\t\t\tDown\n\t\t\t\tUnavailable\n\t\t\t\n\t\t\n\t\n\n", + b'GET /WANIPConnection.xml HTTP/1.1\r\nAccept-Encoding: gzip\r\nHost: 11.2.3.4\r\nConnection: Close\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Thu, 11 Oct 2018 22:16:16 GMT\r\nContent-Type: text/xml\r\nContent-Length: 12078\r\nLast-Modified: Thu, 09 Aug 2018 12:41:07 GMT\r\nConnection: close\r\n\r\n\r\n\r\n\t\r\n\t\t1\r\n\t\t0\r\n\t\r\n\t\r\n\t\t\r\n\t\t\tSetConnectionType\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewConnectionType\r\n\t\t\t\t\tin\r\n\t\t\t\t\tConnectionType\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t \r\n\t\t\r\n\t\t\tGetConnectionTypeInfo\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewConnectionType\r\n\t\t\t\t\tout\r\n\t\t\t\t\tConnectionType\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewPossibleConnectionTypes\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPossibleConnectionTypes\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tRequestConnection\r\n\t\t\r\n\t\t\r\n\t\t\tForceTermination\r\n\t\t\r\n\t\t\r\n\t\t\tGetStatusInfo\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewConnectionStatus\r\n\t\t\t\t\tout\r\n\t\t\t\t\tConnectionStatus\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewLastConnectionError\r\n\t\t\t\t\tout\r\n\t\t\t\t\tLastConnectionError\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewUptime\r\n\t\t\t\t\tout\r\n\t\t\t\t\tUptime\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetNATRSIPStatus\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewRSIPAvailable\r\n\t\t\t\t\tout\r\n\t\t\t\t\tRSIPAvailable\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewNATEnabled\r\n\t\t\t\t\tout\r\n\t\t\t\t\tNATEnabled\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetGenericPortMappingEntry\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewPortMappingIndex\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingNumberOfEntries\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewRemoteHost\r\n\t\t\t\t\tout\r\n\t\t\t\t\tRemoteHost\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewExternalPort\r\n\t\t\t\t\tout\r\n\t\t\t\t\tExternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewProtocol\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingProtocol\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewInternalPort\r\n\t\t\t\t\tout\r\n\t\t\t\t\tInternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewInternalClient\r\n\t\t\t\t\tout\r\n\t\t\t\t\tInternalClient\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewEnabled\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingEnabled\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewPortMappingDescription\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingDescription\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewLeaseDuration\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingLeaseDuration\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetSpecificPortMappingEntry\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewRemoteHost\r\n\t\t\t\t\tin\r\n\t\t\t\t\tRemoteHost\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewExternalPort\r\n\t\t\t\t\tin\r\n\t\t\t\t\tExternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewProtocol\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingProtocol\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewInternalPort\r\n\t\t\t\t\tout\r\n\t\t\t\t\tInternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewInternalClient\r\n\t\t\t\t\tout\r\n\t\t\t\t\tInternalClient\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewEnabled\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingEnabled\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewPortMappingDescription\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingDescription\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewLeaseDuration\r\n\t\t\t\t\tout\r\n\t\t\t\t\tPortMappingLeaseDuration\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tAddPortMapping\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewRemoteHost\r\n\t\t\t\t\tin\r\n\t\t\t\t\tRemoteHost\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewExternalPort\r\n\t\t\t\t\tin\r\n\t\t\t\t\tExternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewProtocol\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingProtocol\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewInternalPort\r\n\t\t\t\t\tin\r\n\t\t\t\t\tInternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewInternalClient\r\n\t\t\t\t\tin\r\n\t\t\t\t\tInternalClient\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewEnabled\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingEnabled\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewPortMappingDescription\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingDescription\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewLeaseDuration\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingLeaseDuration\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tDeletePortMapping\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewRemoteHost\r\n\t\t\t\t\tin\r\n\t\t\t\t\tRemoteHost\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewExternalPort\r\n\t\t\t\t\tin\r\n\t\t\t\t\tExternalPort\r\n\t\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewProtocol\r\n\t\t\t\t\tin\r\n\t\t\t\t\tPortMappingProtocol\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tGetExternalIPAddress\r\n\t\t\t\r\n\t\t\t\t\r\n\t\t\t\t\tNewExternalIPAddress\r\n\t\t\t\t\tout\r\n\t\t\t\t\tExternalIPAddress\r\n\t\t\t\t\r\n\t\t\t\r\n\t\t\r\n\t\r\n\t\r\n\t\t\r\n\t\t\tConnectionType\r\n\t\t\tstring\r\n\t\t\tUnconfigured\r\n\t\t\r\n\t\t\r\n\t\t\tPossibleConnectionTypes\r\n\t\t\tstring\r\n\t\t\t\r\n\t\t\t\tUnconfigured\r\n\t\t\t\tIP_Routed\r\n\t\t\t\tIP_Bridged\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tConnectionStatus\r\n\t\t\tstring\r\n\t\t\tUnconfigured\r\n\t\t\t\r\n\t\t\t\tUnconfigured\r\n\t\t\t\tConnecting\r\n\t\t\t\tAuthenticating\r\n\t\t\t\tPendingDisconnect\r\n\t\t\t\tDisconnecting\r\n\t\t\t\tDisconnected\r\n\t\t\t\tConnected\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tUptime\r\n\t\t\tui4\r\n\t\t\t0\r\n\t\t\t\r\n\t\t\t\t0\r\n\t\t\t\t\r\n\t\t\t\t1\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tRSIPAvailable\r\n\t\tboolean\r\n\t\t\t0\r\n\t\t\r\n\t\t\r\n\t\t\tNATEnabled\r\n\t\t\tboolean\r\n\t\t\t1\r\n\t\t \r\n\t\t\r\n\t\t\tX_Name\r\n\t\t\tstring\r\n\t\t\r\n\t\t\r\n\t\t\tLastConnectionError\r\n\t\t\tstring\r\n\t\t\tERROR_NONE\r\n\t\t\t\r\n\t\t\t\tERROR_NONE\r\n\t\t\t\tERROR_ISP_TIME_OUT\r\n\t\t\t\tERROR_COMMAND_ABORTED\r\n\t\t\t\tERROR_NOT_ENABLED_FOR_INTERNET\r\n\t\t\t\tERROR_BAD_PHONE_NUMBER\r\n\t\t\t\tERROR_USER_DISCONNECT\r\n\t\t\t\tERROR_ISP_DISCONNECT\r\n\t\t\t\tERROR_IDLE_DISCONNECT\r\n\t\t\t\tERROR_FORCED_DISCONNECT\r\n\t\t\t\tERROR_SERVER_OUT_OF_RESOURCES\r\n\t\t\t\tERROR_RESTRICTED_LOGON_HOURS\r\n\t\t\t\tERROR_ACCOUNT_DISABLED\r\n\t\t\t\tERROR_ACCOUNT_EXPIRED\r\n\t\t\t\tERROR_PASSWORD_EXPIRED\r\n\t\t\t\tERROR_AUTHENTICATION_FAILURE\r\n\t\t\t\tERROR_NO_DIALTONE\r\n\t\t\t\tERROR_NO_CARRIER\r\n\t\t\t\tERROR_NO_ANSWER\r\n\t\t\t\tERROR_LINE_BUSY\r\n\t\t\t\tERROR_UNSUPPORTED_BITSPERSECOND\r\n\t\t\t\tERROR_TOO_MANY_LINE_ERRORS\r\n\t\t\t\tERROR_IP_CONFIGURATION\r\n\t\t\t\tERROR_UNKNOWN\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tExternalIPAddress\r\n\t\t\tstring\r\n\t\t\r\n\t\t\r\n\t\t\tRemoteHost\r\n\t\t\tstring\r\n\t\t\r\n\t\t\r\n\t\t\tExternalPort\r\n\t\t\tui2\r\n\t\t\r\n\t\t\r\n\t\t\tInternalPort\r\n\t\t\tui2\r\n\t\t\r\n\t\t\r\n\t\t\tPortMappingProtocol\r\n\t\t\tstring\r\n\t\t\t\r\n\t\t\t\tTCP\r\n\t\t\t\tUDP\r\n\t\t\t\r\n\t\t\r\n\t\t\r\n\t\t\tInternalClient\r\n\t\t\tstring\r\n\t\t\r\n\t\t\r\n\t\t\tPortMappingDescription\r\n\t\t\tstring\r\n\t\t\r\n\t\t\r\n\t\t\tPortMappingEnabled\r\n\t\t\tboolean\r\n\t\t\r\n\t\t\r\n\t\t\tPortMappingLeaseDuration\r\n\t\t\tui4\r\n\t\t\r\n\t\t\r\n\t\t\tPortMappingNumberOfEntries\r\n\t\t\tui2\r\n\t\t\r\n\t\r\n\r\n", + } + + +class TestGetExternalIPAddress(UPnPCommandTestCase): + client_address = '11.2.3.222' + def setUp(self) -> None: + self.replies.update({b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n\r\n': b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 365 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n11.222.3.44\n\n\t\n\n"}) + + async def test_get_external_ip(self): + with mock_tcp_and_udp(self.loop, tcp_replies=self.replies): + gateway = Gateway(self.reply, self.m_search_args, self.client_address, self.gateway_address) + await gateway.discover_commands(self.loop) + upnp = UPnP(self.client_address, self.gateway_address, gateway) + external_ip = await upnp.get_external_ip() + self.assertEqual("11.222.3.44", external_ip) + + +class TestGetGenericPortMappingEntry(UPnPCommandTestCase): + def setUp(self) -> None: + query = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 341\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetGenericPortMappingEntry"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n0\r\n' + response = b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 663 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n\n\n9308\nUDP\n9308\n11.2.3.44\n1\n11.2.3.44:9308 to 9308 (UDP)\n0\n\n\t\n\n" + + self.replies.update({ + query: response + }) + + async def test_get_port_mapping_by_index(self): + with mock_tcp_and_udp(self.loop, tcp_replies=self.replies): + gateway = Gateway(self.reply, self.m_search_args, self.client_address, self.gateway_address) + await gateway.discover_commands(self.loop) + upnp = UPnP(self.client_address, self.gateway_address, gateway) + result = await upnp.get_port_mapping_by_index(0) + self.assertEqual((None, 9308, 'UDP', 9308, "11.2.3.44", True, "11.2.3.44:9308 to 9308 (UDP)", 0), result) + + +class TestGetNextPortMapping(UPnPCommandTestCase): + client_address = '11.2.3.4' + + def setUp(self) -> None: + self.replies.update({ + b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 341\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetGenericPortMappingEntry"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n0\r\n': b'HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:55:24 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 663 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n\n\n9308\nUDP\n9308\n11.2.3.55\n1\n11.2.3.55:9308 to 9308 (UDP)\n0\n\n\t\n\n', + b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 341\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetGenericPortMappingEntry"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n1\r\n': b'HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:55:24 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 663 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n\n\n9305\nUDP\n9305\n11.2.3.55\n1\n11.2.3.55:9305 to 9305 (UDP)\n0\n\n\t\n\n', + b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 341\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetGenericPortMappingEntry"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n2\r\n': b'HTTP/1.1 500 Internal Server Error\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:55:24 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 482 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n\t\t\ts:Client\n\t\t\tUPnPError\n\t\t\t\n\t\t\t\t\n\t\t\t\t\t713\n\t\t\t\t\tSpecifiedArrayIndexInvalid\n\t\t\t\t\n\t\t\t\n\t\t\n\t\n\n', + b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 598\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#AddPortMapping"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n4567UDP456711.2.3.41aioupnp test mapping0\r\n': b'HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:55:24 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 295 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n\t\n\n', + b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 379\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#DeletePortMapping"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n\r\n4567UDP\r\n': b'HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:55:24 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 301 \r\nEXT:\r\n\r\n\n\n\t\n\t\t\n\t\n\n' + }) + + async def test_get_next_mapping(self): + with mock_tcp_and_udp(self.loop, tcp_replies=self.replies): + gateway = Gateway(self.reply, self.m_search_args, self.client_address, self.gateway_address) + await gateway.discover_commands(self.loop) + upnp = UPnP(self.client_address, self.gateway_address, gateway) + ext_port = await upnp.get_next_mapping(4567, "UDP", "aioupnp test mapping") + self.assertEqual(4567, ext_port) + result = await upnp.delete_port_mapping(ext_port, "UDP") + self.assertEqual(None, result)