pylint, more mypy refactoring, improve tests

This commit is contained in:
Jack Robison 2019-05-21 23:42:03 -04:00
parent 836271c6e0
commit 65dab4ce9f
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
15 changed files with 540 additions and 384 deletions

View file

@ -33,7 +33,7 @@ unsafe-load-any-extension=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code
# extension-pkg-whitelist=
extension-pkg-whitelist=netifaces,
# Allow optimization of some AST trees. This will activate a peephole AST
# optimizer, which will apply various small optimizations. For instance, it can
@ -123,7 +123,9 @@ disable=
assignment-from-no-return,
useless-return,
assignment-from-none,
stop-iteration-return
stop-iteration-return,
unsubscriptable-object,
unsupported-membership-test
[REPORTS]

View file

@ -1,7 +1,6 @@
__version__ = "0.0.12"
__version__ = "0.0.13a"
__name__ = "aioupnp"
__author__ = "Jack Robison"
__maintainer__ = "Jack Robison"
__license__ = "MIT"
__email__ = "jackrobison@lbry.io"

View file

@ -1,7 +1,6 @@
import asyncio
import time
import typing
import functools
import logging
from typing import Tuple
from aioupnp.protocols.scpd import scpd_post
@ -18,7 +17,7 @@ def soap_bool(x: typing.Optional[str]) -> bool:
return False if not x or str(x).lower() in ['false', 'False'] else True
def recast_single_result(t, result):
def recast_single_result(t: type, result: typing.Any) -> typing.Optional[typing.Union[str, int, float, bool]]:
if t is bool:
return soap_bool(result)
if t is str:
@ -26,44 +25,22 @@ def recast_single_result(t, result):
return t(result)
def recast_return(return_annotation, result, result_keys: typing.List[str]):
if return_annotation is None:
return None
def recast_return(return_annotation, result: typing.Dict[str, typing.Union[int, str]],
result_keys: typing.List[str]) -> typing.Tuple:
if return_annotation is None or len(result_keys) == 0:
return ()
if len(result_keys) == 1:
assert len(result_keys) == 1
single_result = result[result_keys[0]]
return recast_single_result(return_annotation, single_result)
return (recast_single_result(return_annotation, single_result), )
annotated_args: typing.List[type] = list(return_annotation.__args__)
assert len(annotated_args) == len(result_keys)
recast_results: typing.List[typing.Optional[typing.Union[str, int, bool, bytes]]] = []
recast_results: typing.List[typing.Optional[typing.Union[str, int, float, bool]]] = []
for type_annotation, result_key in zip(annotated_args, result_keys):
recast_results.append(recast_single_result(type_annotation, result[result_key]))
recast_results.append(recast_single_result(type_annotation, result.get(result_key, None)))
return tuple(recast_results)
def soap_command(fn):
@functools.wraps(fn)
async def wrapper(self: 'SOAPCommands', **kwargs):
if not self.is_registered(fn.__name__):
return fn(self, **kwargs)
service = self.get_service(fn.__name__)
assert service.controlURL is not None
assert service.serviceType is not None
response, xml_bytes, err = await scpd_post(
service.controlURL, self._base_address.decode(), self._port, fn.__name__, self._registered[service][fn.__name__][0],
service.serviceType.encode(), self._loop, **kwargs
)
if err is not None:
self._requests.append((fn.__name__, kwargs, xml_bytes, None, err, time.time()))
raise err
result = recast_return(fn.__annotations__.get('return'), response, self._registered[service][fn.__name__][1])
self._requests.append((fn.__name__, kwargs, xml_bytes, result, None, time.time()))
return result
return wrapper
class SOAPCommands:
"""
Type annotated wrappers for common UPnP SOAP functions
@ -77,39 +54,41 @@ class SOAPCommands:
SOAP_COMMANDS: typing.List[str] = [
'AddPortMapping',
'GetNATRSIPStatus',
'GetGenericPortMappingEntry',
'GetSpecificPortMappingEntry',
'SetConnectionType',
'GetExternalIPAddress',
'GetConnectionTypeInfo',
'GetStatusInfo',
'ForceTermination',
'DeletePortMapping',
'RequestConnection',
'GetCommonLinkProperties',
'GetTotalBytesSent',
'GetTotalBytesReceived',
'GetTotalPacketsSent',
'GetTotalPacketsReceived',
'X_GetICSStatistics',
'GetDefaultConnectionService',
'SetDefaultConnectionService',
'SetEnabledForInternet',
'GetEnabledForInternet',
'GetMaximumActiveConnections',
'GetActiveConnections'
'GetExternalIPAddress',
# 'SetConnectionType',
# 'GetNATRSIPStatus',
# 'GetConnectionTypeInfo',
# 'GetStatusInfo',
# 'ForceTermination',
# 'RequestConnection',
# 'GetCommonLinkProperties',
# 'GetTotalBytesSent',
# 'GetTotalBytesReceived',
# 'GetTotalPacketsSent',
# 'GetTotalPacketsReceived',
# 'X_GetICSStatistics',
# 'GetDefaultConnectionService',
# 'SetDefaultConnectionService',
# 'SetEnabledForInternet',
# 'GetEnabledForInternet',
# 'GetMaximumActiveConnections',
# 'GetActiveConnections'
]
def __init__(self, loop: asyncio.AbstractEventLoop, base_address: bytes, port: int) -> None:
self._loop = loop
self._registered: typing.Dict[Service,
typing.Dict[str, typing.Tuple[typing.List[str], typing.List[str]]]] = {}
self._wrappers_no_args: typing.Dict[str, typing.Callable[[], typing.Awaitable[typing.Any]]] = {}
self._wrappers_kwargs: typing.Dict[str, typing.Callable[..., typing.Awaitable[typing.Any]]] = {}
self._base_address = base_address
self._port = port
self._requests: typing.List[typing.Tuple[str, typing.Dict[str, typing.Any], bytes,
typing.Optional[typing.Dict[str, typing.Any]],
typing.Optional[Exception], float]] = []
typing.Tuple, typing.Optional[Exception], float]] = []
def is_registered(self, name: str) -> bool:
if name not in self.SOAP_COMMANDS:
@ -127,8 +106,37 @@ class SOAPCommands:
return service
raise ValueError(name)
def _register_soap_wrapper(self, name: str) -> None:
annotations: typing.Dict[str, typing.Any] = typing.get_type_hints(getattr(self, name))
service = self.get_service(name)
input_names: typing.List[str] = self._registered[service][name][0]
output_names: typing.List[str] = self._registered[service][name][1]
async def wrapper(**kwargs: typing.Any) -> typing.Tuple:
assert service.controlURL is not None
assert service.serviceType is not None
response, xml_bytes, err = await scpd_post(
service.controlURL, self._base_address.decode(), self._port, name, input_names,
service.serviceType.encode(), self._loop, **kwargs
)
if err is not None:
assert isinstance(xml_bytes, bytes)
self._requests.append((name, kwargs, xml_bytes, (), err, time.time()))
raise err
assert 'return' in annotations
result = recast_return(annotations['return'], response, output_names)
self._requests.append((name, kwargs, xml_bytes, result, None, time.time()))
return result
if not len(list(k for k in annotations if k != 'return')):
self._wrappers_no_args[name] = wrapper
else:
self._wrappers_kwargs[name] = wrapper
return None
def register(self, name: str, service: Service, inputs: typing.List[str], outputs: typing.List[str]) -> None:
# control_url: str, service_type: bytes,
if name not in self.SOAP_COMMANDS:
raise AttributeError(name)
if self.is_registered(name):
@ -136,122 +144,227 @@ class SOAPCommands:
if service not in self._registered:
self._registered[service] = {}
self._registered[service][name] = inputs, outputs
self._register_soap_wrapper(name)
@soap_command
async def AddPortMapping(self, NewRemoteHost: str, NewExternalPort: int, NewProtocol: str, NewInternalPort: int,
NewInternalClient: str, NewEnabled: int, NewPortMappingDescription: str,
NewLeaseDuration: str) -> None:
"""Returns None"""
raise NotImplementedError()
name = "AddPortMapping"
if not self.is_registered(name):
raise NotImplementedError()
assert name in self._wrappers_kwargs
await self._wrappers_kwargs[name](
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol,
NewInternalPort=NewInternalPort, NewInternalClient=NewInternalClient, NewEnabled=NewEnabled,
NewPortMappingDescription=NewPortMappingDescription, NewLeaseDuration=NewLeaseDuration
)
return None
@soap_command
async def GetNATRSIPStatus(self) -> Tuple[bool, bool]:
"""Returns (NewRSIPAvailable, NewNATEnabled)"""
raise NotImplementedError()
@soap_command
async def GetGenericPortMappingEntry(self, NewPortMappingIndex: int) -> Tuple[str, int, str, int, str,
bool, str, int]:
"""
Returns (NewRemoteHost, NewExternalPort, NewProtocol, NewInternalPort, NewInternalClient, NewEnabled,
NewPortMappingDescription, NewLeaseDuration)
"""
raise NotImplementedError()
name = "GetGenericPortMappingEntry"
if not self.is_registered(name):
raise NotImplementedError()
assert name in self._wrappers_kwargs
result: Tuple[str, int, str, int, str, bool, str, int] = await self._wrappers_kwargs[name](
NewPortMappingIndex=NewPortMappingIndex
)
return result
@soap_command
async def GetSpecificPortMappingEntry(self, NewRemoteHost: str, NewExternalPort: int,
NewProtocol: str) -> Tuple[int, str, bool, str, int]:
"""Returns (NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration)"""
raise NotImplementedError()
name = "GetSpecificPortMappingEntry"
if not self.is_registered(name):
raise NotImplementedError()
assert name in self._wrappers_kwargs
result: Tuple[int, str, bool, str, int] = await self._wrappers_kwargs[name](
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
)
return result
@soap_command
async def SetConnectionType(self, NewConnectionType: str) -> None:
"""Returns None"""
raise NotImplementedError()
@soap_command
async def GetExternalIPAddress(self) -> str:
"""Returns (NewExternalIPAddress)"""
raise NotImplementedError()
@soap_command
async def GetConnectionTypeInfo(self) -> Tuple[str, str]:
"""Returns (NewConnectionType, NewPossibleConnectionTypes)"""
raise NotImplementedError()
@soap_command
async def GetStatusInfo(self) -> Tuple[str, str, int]:
"""Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
raise NotImplementedError()
@soap_command
async def ForceTermination(self) -> None:
"""Returns None"""
raise NotImplementedError()
@soap_command
async def DeletePortMapping(self, NewRemoteHost: str, NewExternalPort: int, NewProtocol: str) -> None:
"""Returns None"""
raise NotImplementedError()
name = "DeletePortMapping"
if not self.is_registered(name):
raise NotImplementedError()
assert name in self._wrappers_kwargs
await self._wrappers_kwargs[name](
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
)
return None
@soap_command
async def RequestConnection(self) -> None:
"""Returns None"""
raise NotImplementedError()
async def GetExternalIPAddress(self) -> str:
"""Returns (NewExternalIPAddress)"""
name = "GetExternalIPAddress"
if not self.is_registered(name):
raise NotImplementedError()
assert name in self._wrappers_no_args
result: Tuple[str] = await self._wrappers_no_args[name]()
return result[0]
@soap_command
async def GetCommonLinkProperties(self) -> Tuple[str, int, int, str]:
"""Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, NewPhysicalLinkStatus)"""
raise NotImplementedError()
@soap_command
async def GetTotalBytesSent(self) -> int:
"""Returns (NewTotalBytesSent)"""
raise NotImplementedError()
@soap_command
async def GetTotalBytesReceived(self) -> int:
"""Returns (NewTotalBytesReceived)"""
raise NotImplementedError()
@soap_command
async def GetTotalPacketsSent(self) -> int:
"""Returns (NewTotalPacketsSent)"""
raise NotImplementedError()
@soap_command
async def GetTotalPacketsReceived(self) -> int:
"""Returns (NewTotalPacketsReceived)"""
raise NotImplementedError()
@soap_command
async def X_GetICSStatistics(self) -> Tuple[int, int, int, int, str, str]:
"""Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, Layer1DownstreamMaxBitRate, Uptime)"""
raise NotImplementedError()
@soap_command
async def GetDefaultConnectionService(self) -> str:
"""Returns (NewDefaultConnectionService)"""
raise NotImplementedError()
@soap_command
async def SetDefaultConnectionService(self, NewDefaultConnectionService: str) -> None:
"""Returns (None)"""
raise NotImplementedError()
@soap_command
async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None:
raise NotImplementedError()
@soap_command
async def GetEnabledForInternet(self) -> bool:
raise NotImplementedError()
@soap_command
async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int):
raise NotImplementedError()
@soap_command
async def GetActiveConnections(self) -> Tuple[str, str]:
"""Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
raise NotImplementedError()
# async def GetNATRSIPStatus(self) -> Tuple[bool, bool]:
# """Returns (NewRSIPAvailable, NewNATEnabled)"""
# name = "GetNATRSIPStatus"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[bool, bool] = await self._wrappers_no_args[name]()
# return result[0], result[1]
#
# async def SetConnectionType(self, NewConnectionType: str) -> None:
# """Returns None"""
# name = "SetConnectionType"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewConnectionType=NewConnectionType)
# return None
#
# async def GetConnectionTypeInfo(self) -> Tuple[str, str]:
# """Returns (NewConnectionType, NewPossibleConnectionTypes)"""
# name = "GetConnectionTypeInfo"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[str, str] = await self._wrappers_no_args[name]()
# return result
#
# async def GetStatusInfo(self) -> Tuple[str, str, int]:
# """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
# name = "GetStatusInfo"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[str, str, int] = await self._wrappers_no_args[name]()
# return result
#
# async def ForceTermination(self) -> None:
# """Returns None"""
# name = "ForceTermination"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# await self._wrappers_no_args[name]()
# return None
#
# async def RequestConnection(self) -> None:
# """Returns None"""
# name = "RequestConnection"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# await self._wrappers_no_args[name]()
# return None
#
# async def GetCommonLinkProperties(self) -> Tuple[str, int, int, str]:
# """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate,
# NewPhysicalLinkStatus)"""
# name = "GetCommonLinkProperties"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[str, int, int, str] = await self._wrappers_no_args[name]()
# return result
#
# async def GetTotalBytesSent(self) -> int:
# """Returns (NewTotalBytesSent)"""
# name = "GetTotalBytesSent"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0]
#
# async def GetTotalBytesReceived(self) -> int:
# """Returns (NewTotalBytesReceived)"""
# name = "GetTotalBytesReceived"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0]
#
# async def GetTotalPacketsSent(self) -> int:
# """Returns (NewTotalPacketsSent)"""
# name = "GetTotalPacketsSent"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0]
#
# async def GetTotalPacketsReceived(self) -> int:
# """Returns (NewTotalPacketsReceived)"""
# name = "GetTotalPacketsReceived"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0]
#
# async def X_GetICSStatistics(self) -> Tuple[int, int, int, int, str, str]:
# """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived,
# Layer1DownstreamMaxBitRate, Uptime)"""
# name = "X_GetICSStatistics"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[int, int, int, int, str, str] = await self._wrappers_no_args[name]()
# return result
#
# async def GetDefaultConnectionService(self) -> str:
# """Returns (NewDefaultConnectionService)"""
# name = "GetDefaultConnectionService"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[str] = await self._wrappers_no_args[name]()
# return result[0]
#
# async def SetDefaultConnectionService(self, NewDefaultConnectionService: str) -> None:
# """Returns (None)"""
# name = "SetDefaultConnectionService"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewDefaultConnectionService=NewDefaultConnectionService)
# return None
#
# async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None:
# name = "SetEnabledForInternet"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewEnabledForInternet=NewEnabledForInternet)
# return None
#
# async def GetEnabledForInternet(self) -> bool:
# name = "GetEnabledForInternet"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[bool] = await self._wrappers_no_args[name]()
# return result[0]
#
# async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int) -> None:
# name = "GetMaximumActiveConnections"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewActiveConnectionIndex=NewActiveConnectionIndex)
# return None
#
# async def GetActiveConnections(self) -> Tuple[str, str]:
# """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
# name = "GetActiveConnections"
# if not self.is_registered(name):
# raise NotImplementedError()
# assert name in self._wrappers_no_args
# result: Tuple[str, str] = await self._wrappers_no_args[name]()
# return result

View file

@ -6,7 +6,8 @@ log = logging.getLogger(__name__)
class CaseInsensitive:
def __init__(self, **kwargs: typing.Dict[str, typing.Union[str, typing.Dict[str, typing.Any], typing.List[typing.Any]]]) -> None:
def __init__(self, **kwargs: typing.Dict[str, typing.Union[str, typing.Dict[str, typing.Any],
typing.List[typing.Any]]]) -> None:
keys: typing.List[str] = list(kwargs.keys())
for k in keys:
if not k.startswith("_"):

View file

@ -69,8 +69,6 @@ def parse_location(location: bytes) -> typing.Tuple[bytes, int]:
class Gateway:
commands: SOAPCommands
def __init__(self, ok_packet: SSDPDatagram, m_search_args: typing.Dict[str, typing.Union[int, str]],
lan_address: str, gateway_address: str,
loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None:
@ -144,10 +142,10 @@ class Gateway:
@property
def soap_requests(self) -> typing.List[typing.Tuple[str, typing.Dict[str, typing.Any], bytes,
typing.Optional[typing.Dict[str, typing.Any]],
typing.Optional[typing.Tuple],
typing.Optional[Exception], float]]:
soap_call_infos: typing.List[typing.Tuple[str, typing.Dict[str, typing.Any], bytes,
typing.Optional[typing.Dict[str, typing.Any]],
typing.Optional[typing.Tuple],
typing.Optional[Exception], float]] = []
soap_call_infos.extend([
(name, request_args, raw_response, decoded_response, soap_error, ts)
@ -241,7 +239,7 @@ class Gateway:
task.exception()
except asyncio.CancelledError:
pass
results: typing.List[asyncio.Future['Gateway']] = list(done)
results: typing.List['asyncio.Future[Gateway]'] = list(done)
return results[0].result()
async def discover_commands(self, loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> None:

View file

@ -8,15 +8,17 @@ def get_netifaces():
return netifaces
def ifaddresses(iface: str):
def ifaddresses(iface: str) -> typing.Dict[int, typing.List[typing.Dict[str, str]]]:
return get_netifaces().ifaddresses(iface)
def _get_interfaces():
def _get_interfaces() -> typing.List[str]:
return get_netifaces().interfaces()
def _get_gateways():
def _get_gateways() -> typing.Dict[typing.Union[str, int],
typing.Union[typing.Dict[int, typing.Tuple[str, str]],
typing.List[typing.Tuple[str, str, bool]]]]:
return get_netifaces().gateways()
@ -26,7 +28,7 @@ def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
assert isinstance(infos, list), TypeError(f"expected list from netifaces, got a dict")
interface_infos: typing.List[typing.Tuple[str, str, bool]] = infos
result: typing.Dict[str, typing.Tuple[str, str]] = OrderedDict(
(interface_name, (router_address, ifaddresses(interface_name)[netifaces.AF_INET][0]['addr']))
(interface_name, (router_address, ifaddresses(interface_name)[socket.AF_INET][0]['addr']))
for router_address, interface_name, _ in interface_infos
)
for interface_name in _get_interfaces():
@ -40,7 +42,7 @@ def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
_default = gateways['default']
assert isinstance(_default, dict), TypeError(f"expected dict from netifaces, got a list")
default: typing.Dict[int, typing.Tuple[str, str]] = _default
result['default'] = result[default[netifaces.AF_INET][1]]
result['default'] = result[default[socket.AF_INET][1]]
return result

View file

@ -10,7 +10,7 @@ def _get_sock(transport: typing.Optional[BaseTransport]) -> typing.Optional[sock
if transport is None or not hasattr(transport, "_extra"):
return None
sock: typing.Optional[socket.socket] = transport.get_extra_info('socket', None)
assert sock is None or isinstance(sock, socket.SocketType) or isinstance(sock, mock.MagicMock)
assert sock is None or isinstance(sock, (socket.SocketType, mock.MagicMock))
return sock

View file

@ -101,11 +101,11 @@ class SCPDHTTPClientProtocol(Protocol):
async def scpd_get(control_url: str, address: str, port: int,
loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> typing.Tuple[typing.Dict[str, typing.Any], bytes,
typing.Optional[Exception]]:
loop: typing.Optional[asyncio.AbstractEventLoop] = None) -> typing.Tuple[
typing.Dict[str, typing.Any], bytes, typing.Optional[Exception]]:
loop = loop or asyncio.get_event_loop()
packet = serialize_scpd_get(control_url, address)
finished: asyncio.Future[typing.Tuple[bytes, int, bytes]] = asyncio.Future(loop=loop)
finished: 'asyncio.Future[typing.Tuple[bytes, int, bytes]]' = asyncio.Future(loop=loop)
proto_factory: typing.Callable[[], SCPDHTTPClientProtocol] = lambda: SCPDHTTPClientProtocol(packet, finished)
connect_tup: typing.Tuple[asyncio.BaseTransport, asyncio.BaseProtocol] = await loop.create_connection(
proto_factory, address, port
@ -140,7 +140,7 @@ async def scpd_post(control_url: str, address: str, port: int, method: str, para
**kwargs: typing.Dict[str, typing.Any]
) -> typing.Tuple[typing.Dict, bytes, typing.Optional[Exception]]:
loop = loop or asyncio.get_event_loop()
finished: asyncio.Future[typing.Tuple[bytes, int, bytes]] = asyncio.Future(loop=loop)
finished: 'asyncio.Future[typing.Tuple[bytes, int, bytes]]' = asyncio.Future(loop=loop)
packet = serialize_soap_post(method, param_names, service_id, address.encode(), control_url.encode(), **kwargs)
proto_factory: typing.Callable[[], SCPDHTTPClientProtocol] = lambda:\
SCPDHTTPClientProtocol(packet, finished, soap_method=method, soap_service_id=service_id.decode())
@ -159,6 +159,7 @@ async def scpd_post(control_url: str, address: str, port: int, method: str, para
except UPnPError as err:
return {}, protocol.response_buff, err
finally:
# raw_response = protocol.response_buff
transport.close()
try:
return (

View file

@ -4,7 +4,6 @@ import asyncio
import logging
import typing
import socket
from collections import OrderedDict
from asyncio.transports import DatagramTransport
from aioupnp.fault import UPnPError
from aioupnp.serialization.ssdp import SSDPDatagram
@ -25,7 +24,7 @@ class SSDPProtocol(MulticastProtocol):
self.transport: typing.Optional[DatagramTransport] = None
self._unicast = unicast
self._ignored: typing.Set[str] = ignored or set() # ignored locations
self._pending_searches: typing.List[typing.Tuple[str, str, asyncio.Future[SSDPDatagram], asyncio.Handle]] = []
self._pending_searches: typing.List[typing.Tuple[str, str, 'asyncio.Future[SSDPDatagram]', asyncio.Handle]] = []
self.notifications: typing.List[SSDPDatagram] = []
self.connected = asyncio.Event(loop=self.loop)
@ -54,12 +53,12 @@ class SSDPProtocol(MulticastProtocol):
if packet.location in self._ignored:
return None
# TODO: fix this
tmp: typing.List[typing.Tuple[str, str, asyncio.Future[SSDPDatagram], asyncio.Handle]] = []
set_futures: typing.List[asyncio.Future[SSDPDatagram]] = []
tmp: typing.List[typing.Tuple[str, str, 'asyncio.Future[SSDPDatagram]', asyncio.Handle]] = []
set_futures: typing.List['asyncio.Future[SSDPDatagram]'] = []
while len(self._pending_searches):
t: typing.Tuple[str, str, asyncio.Future[SSDPDatagram], asyncio.Handle] = self._pending_searches.pop()
t: typing.Tuple[str, str, 'asyncio.Future[SSDPDatagram]', asyncio.Handle] = self._pending_searches.pop()
if (address == t[0]) and (t[1] in [packet.st, "upnp:rootdevice"]):
f: asyncio.Future[SSDPDatagram] = t[2]
f = t[2]
if f not in set_futures:
set_futures.append(f)
if not f.done():
@ -80,7 +79,7 @@ class SSDPProtocol(MulticastProtocol):
async def m_search(self, address: str, timeout: float,
datagrams: typing.List[typing.Dict[str, typing.Union[str, int]]]) -> SSDPDatagram:
fut: asyncio.Future[SSDPDatagram] = asyncio.Future(loop=self.loop)
fut: 'asyncio.Future[SSDPDatagram]' = asyncio.Future(loop=self.loop)
for datagram in datagrams:
packet = SSDPDatagram("M-SEARCH", datagram)
assert packet.st is not None

View file

@ -1,11 +1,11 @@
import os
# import os
# import zlib
# import base64
import logging
import json
import asyncio
import zlib
import base64
from collections import OrderedDict
from typing import Tuple, Dict, List, Union, Optional, Callable
from typing import Tuple, Dict, List, Union, Optional
from aioupnp.fault import UPnPError
from aioupnp.gateway import Gateway
from aioupnp.interfaces import get_gateway_and_lan_addresses
@ -49,7 +49,8 @@ class UPnP:
@classmethod
async def discover(cls, lan_address: str = '', gateway_address: str = '', timeout: int = 30,
igd_args: OrderedDict = None, interface_name: str = 'default', loop=None):
igd_args: Optional[Dict[str, Union[str, int]]] = None, interface_name: str = 'default',
loop: Optional[asyncio.AbstractEventLoop] = None) -> 'UPnP':
try:
lan_address, gateway_address = cls.get_lan_and_gateway(lan_address, gateway_address, interface_name)
except Exception as err:
@ -64,7 +65,8 @@ class UPnP:
async def m_search(cls, lan_address: str = '', gateway_address: str = '', timeout: int = 1,
igd_args: Optional[Dict[str, Union[int, str]]] = None,
unicast: bool = True, interface_name: str = 'default',
loop=None) -> Dict[str, Union[str, Dict[str, Union[int, str]]]]:
loop: Optional[asyncio.AbstractEventLoop] = None
) -> Dict[str, Union[str, Dict[str, Union[int, str]]]]:
if not lan_address or not gateway_address:
try:
lan_address, gateway_address = cls.get_lan_and_gateway(lan_address, gateway_address, interface_name)
@ -98,16 +100,16 @@ class UPnP:
)
@cli
async def get_port_mapping_by_index(self, index: int) -> Dict:
result = await self._get_port_mapping_by_index(index)
if result:
if self.gateway.commands.is_registered('GetGenericPortMappingEntry'):
return {
k: v for k, v in zip(self.gateway.commands.GetGenericPortMappingEntry.return_order, result)
}
return {}
async def get_port_mapping_by_index(self, index: int):
return await self._get_port_mapping_by_index(index)
# if result:
# if self.gateway.commands.is_registered('GetGenericPortMappingEntry'):
# return {
# k: v for k, v in zip(self.gateway.commands.GetGenericPortMappingEntry.return_order, result)
# }
# return {}
async def _get_port_mapping_by_index(self, index: int) -> Union[None, Tuple[Optional[str], int, str,
async def _get_port_mapping_by_index(self, index: int) -> Optional[Tuple[Optional[str], int, str,
int, str, bool, str, int]]:
try:
redirect = await self.gateway.commands.GetGenericPortMappingEntry(NewPortMappingIndex=index)
@ -127,22 +129,20 @@ class UPnP:
return redirects
@cli
async def get_specific_port_mapping(self, external_port: int, protocol: str) -> Dict:
async def get_specific_port_mapping(self, external_port: int, protocol: str):
"""
:param external_port: (int) external port to listen on
:param protocol: (str) 'UDP' | 'TCP'
:return: (int) <internal port>, (str) <lan ip>, (bool) <enabled>, (str) <description>, (int) <lease time>
"""
try:
result = await self.gateway.commands.GetSpecificPortMappingEntry(
NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol
)
if result and self.gateway.commands.is_registered('GetSpecificPortMappingEntry'):
return {k: v for k, v in zip(self.gateway.commands.GetSpecificPortMappingEntry.return_order, result)}
except UPnPError:
pass
return {}
# try:
return await self.gateway.commands.GetSpecificPortMappingEntry(
NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol
)
# except UPnPError:
# pass
# return {}
@cli
async def delete_port_mapping(self, external_port: int, protocol: str) -> None:
@ -193,156 +193,103 @@ class UPnP:
"client_address": self.lan_address,
}, default=_encode, indent=2)
@property
def zipped_debugging_info(self) -> str:
return base64.b64encode(zlib.compress(
json.dumps({
"gateway": self.gateway.debug_gateway(),
"client_address": self.lan_address,
}, default=_encode, indent=2).encode()
)).decode()
@cli
async def generate_test_data(self):
print("found gateway via M-SEARCH")
try:
external_ip = await self.get_external_ip()
print("got external ip: %s" % external_ip)
except (UPnPError, NotImplementedError):
print("failed to get the external ip")
try:
await self.get_redirects()
print("got redirects")
except (UPnPError, NotImplementedError):
print("failed to get redirects")
try:
await self.get_specific_port_mapping(4567, "UDP")
print("got specific mapping")
except (UPnPError, NotImplementedError):
print("failed to get specific mapping")
try:
ext_port = await self.get_next_mapping(4567, "UDP", "aioupnp test mapping")
print("set up external mapping to port %i" % ext_port)
try:
await self.get_specific_port_mapping(4567, "UDP")
print("got specific mapping")
except (UPnPError, NotImplementedError):
print("failed to get specific mapping")
try:
await self.get_redirects()
print("got redirects")
except (UPnPError, NotImplementedError):
print("failed to get redirects")
await self.delete_port_mapping(ext_port, "UDP")
print("deleted mapping")
except (UPnPError, NotImplementedError):
print("failed to add and remove a mapping")
try:
await self.get_redirects()
print("got redirects")
except (UPnPError, NotImplementedError):
print("failed to get redirects")
try:
await self.get_specific_port_mapping(4567, "UDP")
print("got specific mapping")
except (UPnPError, NotImplementedError):
print("failed to get specific mapping")
if self.gateway.devices:
device = list(self.gateway.devices.values())[0]
assert device.manufacturer and device.modelName
device_path = os.path.join(os.getcwd(), self.gateway.manufacturer_string)
else:
device_path = os.path.join(os.getcwd(), "UNKNOWN GATEWAY")
with open(device_path, "w") as f:
f.write(await self.debug_gateway())
return "Generated test data! -> %s" % device_path
@cli
async def get_natrsip_status(self) -> Tuple[bool, bool]:
"""Returns (NewRSIPAvailable, NewNATEnabled)"""
return await self.gateway.commands.GetNATRSIPStatus()
@cli
async def set_connection_type(self, NewConnectionType: str) -> None:
"""Returns None"""
return await self.gateway.commands.SetConnectionType(NewConnectionType)
@cli
async def get_connection_type_info(self) -> Tuple[str, str]:
"""Returns (NewConnectionType, NewPossibleConnectionTypes)"""
return await self.gateway.commands.GetConnectionTypeInfo()
@cli
async def get_status_info(self) -> Tuple[str, str, int]:
"""Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
return await self.gateway.commands.GetStatusInfo()
@cli
async def force_termination(self) -> None:
"""Returns None"""
return await self.gateway.commands.ForceTermination()
@cli
async def request_connection(self) -> None:
"""Returns None"""
return await self.gateway.commands.RequestConnection()
@cli
async def get_common_link_properties(self):
"""Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, NewPhysicalLinkStatus)"""
return await self.gateway.commands.GetCommonLinkProperties()
@cli
async def get_total_bytes_sent(self):
"""Returns (NewTotalBytesSent)"""
return await self.gateway.commands.GetTotalBytesSent()
@cli
async def get_total_bytes_received(self):
"""Returns (NewTotalBytesReceived)"""
return await self.gateway.commands.GetTotalBytesReceived()
@cli
async def get_total_packets_sent(self):
"""Returns (NewTotalPacketsSent)"""
return await self.gateway.commands.GetTotalPacketsSent()
@cli
async def get_total_packets_received(self):
"""Returns (NewTotalPacketsReceived)"""
return await self.gateway.commands.GetTotalPacketsReceived()
@cli
async def x_get_ics_statistics(self) -> Tuple[int, int, int, int, str, str]:
"""Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, Layer1DownstreamMaxBitRate, Uptime)"""
return await self.gateway.commands.X_GetICSStatistics()
@cli
async def get_default_connection_service(self):
"""Returns (NewDefaultConnectionService)"""
return await self.gateway.commands.GetDefaultConnectionService()
@cli
async def set_default_connection_service(self, NewDefaultConnectionService: str) -> None:
"""Returns (None)"""
return await self.gateway.commands.SetDefaultConnectionService(NewDefaultConnectionService)
@cli
async def set_enabled_for_internet(self, NewEnabledForInternet: bool) -> None:
return await self.gateway.commands.SetEnabledForInternet(NewEnabledForInternet)
@cli
async def get_enabled_for_internet(self) -> bool:
return await self.gateway.commands.GetEnabledForInternet()
@cli
async def get_maximum_active_connections(self, NewActiveConnectionIndex: int):
return await self.gateway.commands.GetMaximumActiveConnections(NewActiveConnectionIndex)
@cli
async def get_active_connections(self) -> Tuple[str, str]:
"""Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
return await self.gateway.commands.GetActiveConnections()
# @property
# def zipped_debugging_info(self) -> str:
# return base64.b64encode(zlib.compress(
# json.dumps({
# "gateway": self.gateway.debug_gateway(),
# "client_address": self.lan_address,
# }, default=_encode, indent=2).encode()
# )).decode()
#
# @cli
# async def get_natrsip_status(self) -> Tuple[bool, bool]:
# """Returns (NewRSIPAvailable, NewNATEnabled)"""
# return await self.gateway.commands.GetNATRSIPStatus()
#
# @cli
# async def set_connection_type(self, NewConnectionType: str) -> None:
# """Returns None"""
# return await self.gateway.commands.SetConnectionType(NewConnectionType)
#
# @cli
# async def get_connection_type_info(self) -> Tuple[str, str]:
# """Returns (NewConnectionType, NewPossibleConnectionTypes)"""
# return await self.gateway.commands.GetConnectionTypeInfo()
#
# @cli
# async def get_status_info(self) -> Tuple[str, str, int]:
# """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
# return await self.gateway.commands.GetStatusInfo()
#
# @cli
# async def force_termination(self) -> None:
# """Returns None"""
# return await self.gateway.commands.ForceTermination()
#
# @cli
# async def request_connection(self) -> None:
# """Returns None"""
# return await self.gateway.commands.RequestConnection()
#
# @cli
# async def get_common_link_properties(self):
# """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate,
# NewPhysicalLinkStatus)"""
# return await self.gateway.commands.GetCommonLinkProperties()
#
# @cli
# async def get_total_bytes_sent(self) -> int:
# """Returns (NewTotalBytesSent)"""
# return await self.gateway.commands.GetTotalBytesSent()
#
# @cli
# async def get_total_bytes_received(self):
# """Returns (NewTotalBytesReceived)"""
# return await self.gateway.commands.GetTotalBytesReceived()
#
# @cli
# async def get_total_packets_sent(self):
# """Returns (NewTotalPacketsSent)"""
# return await self.gateway.commands.GetTotalPacketsSent()
#
# @cli
# async def get_total_packets_received(self):
# """Returns (NewTotalPacketsReceived)"""
# return await self.gateway.commands.GetTotalPacketsReceived()
#
# @cli
# async def x_get_ics_statistics(self) -> Tuple[int, int, int, int, str, str]:
# """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived,
# Layer1DownstreamMaxBitRate, Uptime)"""
# return await self.gateway.commands.X_GetICSStatistics()
#
# @cli
# async def get_default_connection_service(self):
# """Returns (NewDefaultConnectionService)"""
# return await self.gateway.commands.GetDefaultConnectionService()
#
# @cli
# async def set_default_connection_service(self, NewDefaultConnectionService: str) -> None:
# """Returns (None)"""
# return await self.gateway.commands.SetDefaultConnectionService(NewDefaultConnectionService)
#
# @cli
# async def set_enabled_for_internet(self, NewEnabledForInternet: bool) -> None:
# return await self.gateway.commands.SetEnabledForInternet(NewEnabledForInternet)
#
# @cli
# async def get_enabled_for_internet(self) -> bool:
# return await self.gateway.commands.GetEnabledForInternet()
#
# @cli
# async def get_maximum_active_connections(self, NewActiveConnectionIndex: int):
# return await self.gateway.commands.GetMaximumActiveConnections(NewActiveConnectionIndex)
#
# @cli
# async def get_active_connections(self) -> Tuple[str, str]:
# """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
# return await self.gateway.commands.GetActiveConnections()
@classmethod
def run_cli(cls, method, igd_args: Dict[str, Union[bool, str, int]], lan_address: str = '',
@ -361,8 +308,8 @@ class UPnP:
kwargs = kwargs or {}
igd_args = igd_args
timeout = int(timeout)
loop = loop or asyncio.get_event_loop_policy().get_event_loop()
fut: asyncio.Future = loop.create_future()
loop = loop or asyncio.get_event_loop()
fut: 'asyncio.Future' = asyncio.Future(loop=loop)
async def wrapper(): # wrap the upnp setup and call of the command in a coroutine
@ -404,6 +351,6 @@ class UPnP:
return
if isinstance(result, (list, tuple, dict)):
print(json.dumps(result, indent=2, default=_encode))
print(json.dumps(result, indent=2))
else:
print(result)

View file

@ -36,7 +36,8 @@ def flatten_keys(to_flatten: str_any_dict, strip: str) -> str_any_dict:
return copy
def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyStr], key: typing.AnyStr) -> typing.Optional[typing.AnyStr]:
def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyStr],
key: typing.AnyStr) -> typing.Optional[typing.AnyStr]:
match: typing.List[typing.AnyStr] = list(filter(lambda x: x.lower() == key.lower(), source.keys()))
if not len(match):
return None

View file

@ -29,6 +29,9 @@ def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_r
sent_tcp_packets.append(data)
if data in tcp_replies:
loop.call_later(tcp_delay_reply, p.data_received, tcp_replies[data])
return
else:
pass
return _write

0
tests/generate_test.py Normal file
View file

View file

@ -171,20 +171,20 @@ class TestDiscoverDLinkDIR890L(AsyncioTestCase):
}
expected_commands = {
'GetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1',
'SetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1',
'GetCommonLinkProperties': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'GetTotalBytesSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'GetTotalBytesReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'GetTotalPacketsSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'GetTotalPacketsReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'X_GetICSStatistics': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'SetConnectionType': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'GetConnectionTypeInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'RequestConnection': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'ForceTermination': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'GetStatusInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'GetNATRSIPStatus': 'urn:schemas-upnp-org:service:WANIPConnection:1',
# 'GetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1',
# 'SetDefaultConnectionService': 'urn:schemas-upnp-org:service:Layer3Forwarding:1',
# 'GetCommonLinkProperties': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
# 'GetTotalBytesSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
# 'GetTotalBytesReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
# 'GetTotalPacketsSent': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
# 'GetTotalPacketsReceived': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
# 'X_GetICSStatistics': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
# 'SetConnectionType': 'urn:schemas-upnp-org:service:WANIPConnection:1',
# 'GetConnectionTypeInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1',
# 'RequestConnection': 'urn:schemas-upnp-org:service:WANIPConnection:1',
# 'ForceTermination': 'urn:schemas-upnp-org:service:WANIPConnection:1',
# 'GetStatusInfo': 'urn:schemas-upnp-org:service:WANIPConnection:1',
# 'GetNATRSIPStatus': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'GetGenericPortMappingEntry': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'GetSpecificPortMappingEntry': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'AddPortMapping': 'urn:schemas-upnp-org:service:WANIPConnection:1',
@ -238,22 +238,22 @@ class TestDiscoverNetgearNighthawkAC2350(TestDiscoverDLinkDIR890L):
}
expected_commands = {
"SetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1",
"GetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1",
"GetCommonLinkProperties": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
"GetTotalBytesSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
"GetTotalBytesReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
"GetTotalPacketsSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
"GetTotalPacketsReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
# "SetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1",
# "GetDefaultConnectionService": "urn:schemas-upnp-org:service:Layer3Forwarding:1",
# "GetCommonLinkProperties": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
# "GetTotalBytesSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
# "GetTotalBytesReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
# "GetTotalPacketsSent": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
# "GetTotalPacketsReceived": "urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1",
"AddPortMapping": "urn:schemas-upnp-org:service:WANIPConnection:1",
"GetExternalIPAddress": "urn:schemas-upnp-org:service:WANIPConnection:1",
"DeletePortMapping": "urn:schemas-upnp-org:service:WANIPConnection:1",
"SetConnectionType": "urn:schemas-upnp-org:service:WANIPConnection:1",
"GetConnectionTypeInfo": "urn:schemas-upnp-org:service:WANIPConnection:1",
"RequestConnection": "urn:schemas-upnp-org:service:WANIPConnection:1",
"ForceTermination": "urn:schemas-upnp-org:service:WANIPConnection:1",
"GetStatusInfo": "urn:schemas-upnp-org:service:WANIPConnection:1",
"GetNATRSIPStatus": "urn:schemas-upnp-org:service:WANIPConnection:1",
# "SetConnectionType": "urn:schemas-upnp-org:service:WANIPConnection:1",
# "GetConnectionTypeInfo": "urn:schemas-upnp-org:service:WANIPConnection:1",
# "RequestConnection": "urn:schemas-upnp-org:service:WANIPConnection:1",
# "ForceTermination": "urn:schemas-upnp-org:service:WANIPConnection:1",
# "GetStatusInfo": "urn:schemas-upnp-org:service:WANIPConnection:1",
# "GetNATRSIPStatus": "urn:schemas-upnp-org:service:WANIPConnection:1",
"GetGenericPortMappingEntry": "urn:schemas-upnp-org:service:WANIPConnection:1",
"GetSpecificPortMappingEntry": "urn:schemas-upnp-org:service:WANIPConnection:1"
}

90
tests/test_upnp.py Normal file

File diff suppressed because one or more lines are too long