refactoring, add protocol tests

This commit is contained in:
Jack Robison 2018-10-24 19:16:17 -04:00
parent 3878521def
commit 0d01972b9b
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
19 changed files with 775 additions and 179 deletions

View file

@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/) The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/) with and this project adheres to [Semantic Versioning](http://semver.org/) with
regard to the json-rpc api. As we're currently pre-1.0 release, we regard to the json-rpc api. As we're currently pre-1.0 release, we
can and probably will change functionality and break backwards compatability can and probably will change functionality and break backwards compatibility
at anytime. at anytime.
## [Unreleased] ## [Unreleased]

View file

@ -16,6 +16,7 @@
Cisco CGA4131COM Cisco CGA4131COM
Linksys WRT1200AC Linksys WRT1200AC
Netgear Nighthawk X4 AC2350 Netgear Nighthawk X4 AC2350
ASUS RT-N66U
## Installation ## Installation

View file

@ -1,9 +1,135 @@
from typing import Tuple, Union import logging
import time
import typing
from typing import Tuple, Union, List
from aioupnp.protocols.scpd import scpd_post
log = logging.getLogger(__name__)
none_or_str = Union[None, str] none_or_str = Union[None, str]
return_type_lambas = {
Union[None, str]: lambda x: x if x is not None and str(x).lower() not in ['none', 'nil'] else None
}
def safe_type(t):
if t is typing.Tuple:
return tuple
if t is typing.List:
return list
if t is typing.Dict:
return dict
if t is typing.Set:
return set
return t
class SOAPCommand:
def __init__(self, gateway_address: str, service_port: int, control_url: str, service_id: bytes, method: str,
param_types: dict, return_types: dict, param_order: list, return_order: list, loop=None) -> None:
self.gateway_address = gateway_address
self.service_port = service_port
self.control_url = control_url
self.service_id = service_id
self.method = method
self.param_types = param_types
self.param_order = param_order
self.return_types = return_types
self.return_order = return_order
self.loop = loop
self._requests: typing.List = []
async def __call__(self, **kwargs) -> typing.Union[None, typing.Dict, typing.List, typing.Tuple]:
if set(kwargs.keys()) != set(self.param_types.keys()):
raise Exception("argument mismatch: %s vs %s" % (kwargs.keys(), self.param_types.keys()))
soap_kwargs = {n: safe_type(self.param_types[n])(kwargs[n]) for n in self.param_types.keys()}
response, xml_bytes, err = await scpd_post(
self.control_url, self.gateway_address, self.service_port, self.method, self.param_order,
self.service_id, self.loop, **soap_kwargs
)
if err is not None:
self._requests.append((soap_kwargs, xml_bytes, None, err, time.time()))
raise err
if not response:
result = None
else:
recast_result = tuple([safe_type(self.return_types[n])(response.get(n)) for n in self.return_order])
if len(recast_result) == 1:
result = recast_result[0]
else:
result = recast_result
self._requests.append((soap_kwargs, xml_bytes, result, None, time.time()))
return result
class SOAPCommands: class SOAPCommands:
"""
Type annotated wrappers for common UPnP SOAP functions
A SOAPCommands object has its command attributes overridden during device discovery with SOAPCommand objects
for the commands implemented by the gateway.
SOAPCommand will use the typing annotations provided here to properly cast the types of arguments and results
to their expected types.
"""
SOAP_COMMANDS = [
'AddPortMapping',
'GetNATRSIPStatus',
'GetGenericPortMappingEntry',
'GetSpecificPortMappingEntry',
'SetConnectionType',
'GetExternalIPAddress',
'GetConnectionTypeInfo',
'GetStatusInfo',
'ForceTermination',
'DeletePortMapping',
'RequestConnection',
'GetCommonLinkProperties',
'GetTotalBytesSent',
'GetTotalBytesReceived',
'GetTotalPacketsSent',
'GetTotalPacketsReceived',
'X_GetICSStatistics',
'GetDefaultConnectionService',
'NewDefaultConnectionService',
'NewEnabledForInternet',
'SetDefaultConnectionService',
'SetEnabledForInternet',
'GetEnabledForInternet',
'NewActiveConnectionIndex',
'GetMaximumActiveConnections',
'GetActiveConnections'
]
def __init__(self):
self._registered = set()
def register(self, base_ip: bytes, port: int, name: str, control_url: str,
service_type: bytes, inputs: List, outputs: List, loop=None) -> None:
if name not in self.SOAP_COMMANDS or name in self._registered:
raise AttributeError(name)
current = getattr(self, name)
annotations = current.__annotations__
return_types = annotations.get('return', None)
if return_types:
if hasattr(return_types, '__args__'):
return_types = tuple([return_type_lambas.get(a, a) for a in return_types.__args__])
elif isinstance(return_types, type):
return_types = (return_types,)
return_types = {r: t for r, t in zip(outputs, return_types)}
param_types = {}
for param_name, param_type in annotations.items():
if param_name == "return":
continue
param_types[param_name] = param_type
command = SOAPCommand(
base_ip.decode(), port, control_url, service_type,
name, param_types, return_types, inputs, outputs, loop=loop
)
setattr(command, "__doc__", current.__doc__)
setattr(self, command.method, command)
self._registered.add(command.method)
@staticmethod @staticmethod
async def AddPortMapping(NewRemoteHost: str, NewExternalPort: int, NewProtocol: str, NewInternalPort: int, async def AddPortMapping(NewRemoteHost: str, NewExternalPort: int, NewProtocol: str, NewInternalPort: int,
NewInternalClient: str, NewEnabled: int, NewPortMappingDescription: str, NewInternalClient: str, NewEnabled: int, NewPortMappingDescription: str,

View file

@ -6,45 +6,23 @@ log = logging.getLogger(__name__)
class CaseInsensitive: class CaseInsensitive:
def __init__(self, **kwargs) -> None: def __init__(self, **kwargs) -> None:
not_evaluated = {}
for k, v in kwargs.items(): for k, v in kwargs.items():
if k.startswith("_"): if not k.startswith("_"):
not_evaluated[k] = v
continue
try:
getattr(self, k) getattr(self, k)
setattr(self, k, v) setattr(self, k, v)
except AttributeError as err:
not_evaluated[k] = v
if not_evaluated:
log.debug("%s did not apply kwargs: %s", self.__class__.__name__, not_evaluated)
def _get_attr_name(self, case_insensitive: str) -> str:
for k, v in self.__dict__.items():
if k.lower() == case_insensitive.lower():
return k
raise AttributeError(case_insensitive)
def __getattr__(self, item): def __getattr__(self, item):
if item in self.__dict__: for k in self.__class__.__dict__.keys():
return self.__dict__[item]
for k, v in self.__class__.__dict__.items():
if k.lower() == item.lower(): if k.lower() == item.lower():
if k not in self.__dict__: return self.__dict__.get(k)
self.__dict__[k] = v
return v
raise AttributeError(item) raise AttributeError(item)
def __setattr__(self, item, value): def __setattr__(self, item, value):
if item in self.__dict__: for k, v in self.__class__.__dict__.items():
self.__dict__[item] = value
return
to_update = None
for k, v in self.__dict__.items():
if k.lower() == item.lower(): if k.lower() == item.lower():
to_update = k self.__dict__[k] = value
break return
self.__dict__[to_update or item] = value raise AttributeError(item)
def as_dict(self) -> dict: def as_dict(self) -> dict:
return { return {

View file

@ -9,7 +9,6 @@ from aioupnp.commands import SOAPCommands
from aioupnp.device import Device, Service from aioupnp.device import Device, Service
from aioupnp.protocols.ssdp import fuzzy_m_search, m_search from aioupnp.protocols.ssdp import fuzzy_m_search, m_search
from aioupnp.protocols.scpd import scpd_get from aioupnp.protocols.scpd import scpd_get
from aioupnp.protocols.soap import SOAPCommand
from aioupnp.serialization.ssdp import SSDPDatagram from aioupnp.serialization.ssdp import SSDPDatagram
from aioupnp.util import flatten_keys from aioupnp.util import flatten_keys
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
@ -156,9 +155,8 @@ class Gateway:
} }
@classmethod @classmethod
async def _discover_gateway(cls, lan_address: str, gateway_address: str, timeout: int = 30, async def _discover_gateway(cls, lan_address: str, gateway_address: str, timeout: int=30,
igd_args: OrderedDict = None, ssdp_socket: socket.socket = None, igd_args: OrderedDict=None, loop=None, unicast: bool=False):
soap_socket: socket.socket = None, unicast: bool = False):
ignored: set = set() ignored: set = set()
required_commands = [ required_commands = [
'AddPortMapping', 'AddPortMapping',
@ -167,16 +165,17 @@ class Gateway:
] ]
while True: while True:
if not igd_args: if not igd_args:
m_search_args, datagram = await asyncio.wait_for(fuzzy_m_search(lan_address, gateway_address, timeout, ssdp_socket, m_search_args, datagram = await asyncio.wait_for(
ignored, unicast), timeout) fuzzy_m_search(lan_address, gateway_address, timeout, loop, ignored, unicast),
timeout
)
else: else:
m_search_args = OrderedDict(igd_args) m_search_args = OrderedDict(igd_args)
datagram = await m_search(lan_address, gateway_address, igd_args, timeout, ssdp_socket, ignored, datagram = await m_search(lan_address, gateway_address, igd_args, timeout, loop, ignored, unicast)
unicast)
try: try:
gateway = cls(datagram, m_search_args, lan_address, gateway_address) gateway = cls(datagram, m_search_args, lan_address, gateway_address)
log.debug('get gateway descriptor %s', datagram.location) log.debug('get gateway descriptor %s', datagram.location)
await gateway.discover_commands(soap_socket) await gateway.discover_commands(loop)
requirements_met = all([required in gateway._registered_commands for required in required_commands]) requirements_met = all([required in gateway._registered_commands for required in required_commands])
if not requirements_met: if not requirements_met:
not_met = [ not_met = [
@ -196,17 +195,15 @@ class Gateway:
@classmethod @classmethod
async def discover_gateway(cls, lan_address: str, gateway_address: str, timeout: int = 30, async def discover_gateway(cls, lan_address: str, gateway_address: str, timeout: int = 30,
igd_args: OrderedDict = None, ssdp_socket: socket.socket = None, igd_args: OrderedDict = None, loop=None, unicast: bool = None):
soap_socket: socket.socket = None, unicast: bool = None):
if unicast is not None: if unicast is not None:
return await cls._discover_gateway(lan_address, gateway_address, timeout, igd_args, ssdp_socket, return await cls._discover_gateway(lan_address, gateway_address, timeout, igd_args, loop)
soap_socket, unicast=unicast)
done, pending = await asyncio.wait([ done, pending = await asyncio.wait([
cls._discover_gateway( cls._discover_gateway(
lan_address, gateway_address, timeout, igd_args, ssdp_socket, soap_socket, unicast=True lan_address, gateway_address, timeout, igd_args, loop, unicast=True
), ),
cls._discover_gateway( cls._discover_gateway(
lan_address, gateway_address, timeout, igd_args, ssdp_socket, soap_socket, unicast=False lan_address, gateway_address, timeout, igd_args, loop, unicast=False
)], return_when=asyncio.tasks.FIRST_COMPLETED )], return_when=asyncio.tasks.FIRST_COMPLETED
) )
for task in list(pending): for task in list(pending):
@ -214,8 +211,8 @@ class Gateway:
result = list(done)[0].result() result = list(done)[0].result()
return result return result
async def discover_commands(self, soap_socket: socket.socket = None): async def discover_commands(self, loop=None):
response, xml_bytes, get_err = await scpd_get(self.path.decode(), self.base_ip.decode(), self.port) response, xml_bytes, get_err = await scpd_get(self.path.decode(), self.base_ip.decode(), self.port, loop=loop)
self._xml_response = xml_bytes self._xml_response = xml_bytes
if get_err is not None: if get_err is not None:
raise get_err raise get_err
@ -230,9 +227,9 @@ class Gateway:
else: else:
self._device = Device(self._devices, self._services) self._device = Device(self._devices, self._services)
for service_type in self.services.keys(): for service_type in self.services.keys():
await self.register_commands(self.services[service_type], soap_socket) await self.register_commands(self.services[service_type], loop)
async def register_commands(self, service: Service, soap_socket: socket.socket = None): async def register_commands(self, service: Service, loop=None):
if not service.SCPDURL: if not service.SCPDURL:
raise UPnPError("no scpd url") raise UPnPError("no scpd url")
@ -252,27 +249,10 @@ class Gateway:
for name, inputs, outputs in action_list: for name, inputs, outputs in action_list:
try: try:
current = getattr(self.commands, name) self.commands.register(self.base_ip, self.port, name, service.controlURL, service.serviceType.encode(),
annotations = current.__annotations__ inputs, outputs, loop)
return_types = annotations.get('return', None) self._registered_commands[name] = service.serviceType
if return_types: log.debug("registered %s::%s", service.serviceType, name)
if hasattr(return_types, '__args__'):
return_types = tuple([return_type_lambas.get(a, a) for a in return_types.__args__])
elif isinstance(return_types, type):
return_types = (return_types, )
return_types = {r: t for r, t in zip(outputs, return_types)}
param_types = {}
for param_name, param_type in annotations.items():
if param_name == "return":
continue
param_types[param_name] = param_type
command = SOAPCommand(
self.base_ip.decode(), self.port, service.controlURL, service.serviceType.encode(),
name, param_types, return_types, inputs, outputs, soap_socket)
setattr(command, "__doc__", current.__doc__)
setattr(self.commands, command.method, command)
self._registered_commands[command.method] = service.serviceType
log.debug("registered %s::%s", service.serviceType, command.method)
except AttributeError: except AttributeError:
s = self._unsupported_actions.get(service.serviceType, []) s = self._unsupported_actions.get(service.serviceType, [])
s.append(name) s.append(name)

View file

@ -1,5 +1,4 @@
import logging import logging
import socket
import typing import typing
import re import re
from collections import OrderedDict from collections import OrderedDict
@ -64,7 +63,7 @@ class SCPDHTTPClientProtocol(Protocol):
for i, line in enumerate(self.response_buff.split(b'\r\n')): for i, line in enumerate(self.response_buff.split(b'\r\n')):
if not line: # we hit the blank line between the headers and the body if not line: # we hit the blank line between the headers and the body
if i == (len(self.response_buff.split(b'\r\n')) - 1): if i == (len(self.response_buff.split(b'\r\n')) - 1):
continue # the body is still yet to be written return # the body is still yet to be written
if not self._got_headers: if not self._got_headers:
self._headers, self._response_code, self._response_msg = parse_headers( self._headers, self._response_code, self._response_msg = parse_headers(
b'\r\n'.join(self.response_buff.split(b'\r\n')[:i]) b'\r\n'.join(self.response_buff.split(b'\r\n')[:i])
@ -82,17 +81,17 @@ class SCPDHTTPClientProtocol(Protocol):
else: else:
self.finished.set_exception( self.finished.set_exception(
UPnPError( UPnPError(
"too many bytes written to response (%i vs %i expected)" % ( "too many bytes written to response (%i vs %i expected)" % (
len(body), self._content_length len(body), self._content_length
) )
) )
) )
return return
async def scpd_get(control_url: str, address: str, port: int) -> typing.Tuple[typing.Dict, bytes, async def scpd_get(control_url: str, address: str, port: int, loop=None) -> typing.Tuple[typing.Dict, bytes,
typing.Optional[Exception]]: typing.Optional[Exception]]:
loop = asyncio.get_event_loop_policy().get_event_loop() loop = loop or asyncio.get_event_loop_policy().get_event_loop()
finished: asyncio.Future = asyncio.Future() finished: asyncio.Future = asyncio.Future()
packet = serialize_scpd_get(control_url, address) packet = serialize_scpd_get(control_url, address)
transport, protocol = await loop.create_connection( transport, protocol = await loop.create_connection(
@ -105,6 +104,9 @@ async def scpd_get(control_url: str, address: str, port: int) -> typing.Tuple[ty
except asyncio.TimeoutError: except asyncio.TimeoutError:
error = UPnPError("get request timed out") error = UPnPError("get request timed out")
body = b'' body = b''
except UPnPError as err:
error = err
body = protocol.response_buff
finally: finally:
transport.close() transport.close()
if not error: if not error:
@ -116,21 +118,22 @@ async def scpd_get(control_url: str, address: str, port: int) -> typing.Tuple[ty
async def scpd_post(control_url: str, address: str, port: int, method: str, param_names: list, service_id: bytes, async def scpd_post(control_url: str, address: str, port: int, method: str, param_names: list, service_id: bytes,
soap_socket: socket.socket = None, **kwargs) -> typing.Tuple[typing.Dict, bytes, loop=None, **kwargs) -> typing.Tuple[typing.Dict, bytes, typing.Optional[Exception]]:
typing.Optional[Exception]]: loop = loop or asyncio.get_event_loop_policy().get_event_loop()
loop = asyncio.get_event_loop_policy().get_event_loop()
finished: asyncio.Future = asyncio.Future() finished: asyncio.Future = asyncio.Future()
packet = serialize_soap_post(method, param_names, service_id, address.encode(), control_url.encode(), **kwargs) packet = serialize_soap_post(method, param_names, service_id, address.encode(), control_url.encode(), **kwargs)
transport, protocol = await loop.create_connection( transport, protocol = await loop.create_connection(
lambda : SCPDHTTPClientProtocol( lambda : SCPDHTTPClientProtocol(
packet, finished, soap_method=method, soap_service_id=service_id.decode(), packet, finished, soap_method=method, soap_service_id=service_id.decode(),
), address, port, sock=soap_socket ), address, port
) )
assert isinstance(protocol, SCPDHTTPClientProtocol) assert isinstance(protocol, SCPDHTTPClientProtocol)
try: try:
body, response_code, response_msg = await asyncio.wait_for(finished, 1.0) body, response_code, response_msg = await asyncio.wait_for(finished, 1.0)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return {}, b'', UPnPError("Timeout") return {}, b'', UPnPError("Timeout")
except UPnPError as err:
return {}, protocol.response_buff, err
finally: finally:
transport.close() transport.close()
try: try:

View file

@ -1,60 +0,0 @@
import logging
import socket
import asyncio
import typing
import time
from aioupnp.protocols.scpd import scpd_post
from aioupnp.fault import UPnPError
log = logging.getLogger(__name__)
def safe_type(t):
if t is typing.Tuple:
return tuple
if t is typing.List:
return list
if t is typing.Dict:
return dict
if t is typing.Set:
return set
return t
class SOAPCommand:
def __init__(self, gateway_address: str, service_port: int, control_url: str, service_id: bytes, method: str,
param_types: dict, return_types: dict, param_order: list, return_order: list,
soap_socket: socket.socket = None) -> None:
self.gateway_address = gateway_address
self.service_port = service_port
self.control_url = control_url
self.service_id = service_id
self.method = method
self.param_types = param_types
self.param_order = param_order
self.return_types = return_types
self.return_order = return_order
self.soap_socket = soap_socket
self._requests: typing.List = []
async def __call__(self, **kwargs) -> typing.Union[None, typing.Dict, typing.List, typing.Tuple]:
if set(kwargs.keys()) != set(self.param_types.keys()):
raise Exception("argument mismatch: %s vs %s" % (kwargs.keys(), self.param_types.keys()))
soap_kwargs = {n: safe_type(self.param_types[n])(kwargs[n]) for n in self.param_types.keys()}
response, xml_bytes, err = await scpd_post(
self.control_url, self.gateway_address, self.service_port, self.method, self.param_order,
self.service_id, self.soap_socket, **soap_kwargs
)
if err is not None:
self._requests.append((soap_kwargs, xml_bytes, None, err, time.time()))
raise err
if not response:
result = None
else:
recast_result = tuple([safe_type(self.return_types[n])(response.get(n)) for n in self.return_order])
if len(recast_result) == 1:
result = recast_result[0]
else:
result = recast_result
self._requests.append((soap_kwargs, xml_bytes, result, None, time.time()))
return result

View file

@ -11,7 +11,7 @@ from aioupnp.fault import UPnPError
from aioupnp.gateway import Gateway from aioupnp.gateway import Gateway
from aioupnp.util import get_gateway_and_lan_addresses from aioupnp.util import get_gateway_and_lan_addresses
from aioupnp.protocols.ssdp import m_search, fuzzy_m_search from aioupnp.protocols.ssdp import m_search, fuzzy_m_search
from aioupnp.protocols.soap import SOAPCommand from aioupnp.commands import SOAPCommand
from aioupnp.serialization.ssdp import SSDPDatagram from aioupnp.serialization.ssdp import SSDPDatagram
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -47,14 +47,13 @@ class UPnP:
@classmethod @classmethod
async def discover(cls, lan_address: str = '', gateway_address: str = '', timeout: int = 30, async def discover(cls, lan_address: str = '', gateway_address: str = '', timeout: int = 30,
igd_args: OrderedDict = None, interface_name: str = 'default', igd_args: OrderedDict = None, interface_name: str = 'default', loop=None):
ssdp_socket: socket.socket = None, soap_socket: socket.socket = None):
try: try:
lan_address, gateway_address = cls.get_lan_and_gateway(lan_address, gateway_address, interface_name) lan_address, gateway_address = cls.get_lan_and_gateway(lan_address, gateway_address, interface_name)
except Exception as err: except Exception as err:
raise UPnPError("failed to get lan and gateway addresses: %s" % str(err)) raise UPnPError("failed to get lan and gateway addresses: %s" % str(err))
gateway = await Gateway.discover_gateway( gateway = await Gateway.discover_gateway(
lan_address, gateway_address, timeout, igd_args, ssdp_socket, soap_socket lan_address, gateway_address, timeout, igd_args, loop
) )
return cls(lan_address, gateway_address, gateway) return cls(lan_address, gateway_address, gateway)
@ -342,21 +341,31 @@ class UPnP:
@classmethod @classmethod
def run_cli(cls, method, igd_args: OrderedDict, lan_address: str = '', gateway_address: str = '', timeout: int = 30, def run_cli(cls, method, igd_args: OrderedDict, lan_address: str = '', gateway_address: str = '', timeout: int = 30,
interface_name: str = 'default', kwargs: dict = None) -> None: interface_name: str = 'default', kwargs: dict = None) -> None:
"""
:param method: the command name
:param igd_args: ordered case sensitive M-SEARCH headers, if provided all headers to be used must be provided
:param lan_address: the ip address of the local interface
:param gateway_address: the ip address of the gateway
:param timeout: timeout, in seconds
:param interface_name: name of the network interface, the default is aliased to 'default'
:param kwargs: keyword arguments for the command
"""
kwargs = kwargs or {} kwargs = kwargs or {}
igd_args = igd_args igd_args = igd_args
timeout = int(timeout) timeout = int(timeout)
loop = asyncio.get_event_loop_policy().get_event_loop() loop = asyncio.get_event_loop_policy().get_event_loop()
fut: asyncio.Future = asyncio.Future() fut: asyncio.Future = asyncio.Future()
async def wrapper(): async def wrapper(): # wrap the upnp setup and call of the command in a coroutine
if method == 'm_search':
if method == 'm_search': # if we're only m_searching don't do any device discovery
fn = lambda *_a, **_kw: cls.m_search( fn = lambda *_a, **_kw: cls.m_search(
lan_address, gateway_address, timeout, igd_args, interface_name lan_address, gateway_address, timeout, igd_args, interface_name
) )
else: else: # automatically discover the gateway
try: try:
u = await cls.discover( u = await cls.discover(
lan_address, gateway_address, timeout, igd_args, interface_name lan_address, gateway_address, timeout, igd_args, interface_name, loop=loop
) )
except UPnPError as err: except UPnPError as err:
fut.set_exception(err) fut.set_exception(err)
@ -366,7 +375,7 @@ class UPnP:
else: else:
fut.set_exception(UPnPError("\"%s\" is not a recognized command" % method)) fut.set_exception(UPnPError("\"%s\" is not a recognized command" % method))
return return
try: try: # call the command
result = await fn(**{k: fn.__annotations__[k](v) for k, v in kwargs.items()}) result = await fn(**{k: fn.__annotations__[k](v) for k, v in kwargs.items()})
fut.set_result(result) fut.set_result(result)
except UPnPError as err: except UPnPError as err:

View file

@ -2,3 +2,6 @@
python_version = 3.7 python_version = 3.7
mypy_path=stubs mypy_path=stubs
cache_dir=/dev/null cache_dir=/dev/null
[mypy-tests]
ignore_errors=true

113
tests/protocols/__init__.py Normal file
View file

@ -0,0 +1,113 @@
import asyncio
import unittest
from unittest.case import _Outcome
try:
from asyncio.runners import _cancel_all_tasks
except ImportError:
# this is only available in py3.7
def _cancel_all_tasks(loop):
pass
class TestBase(unittest.TestCase):
# Implementation inspired by discussion:
# https://bugs.python.org/issue32972
async def asyncSetUp(self):
pass
async def asyncTearDown(self):
pass
async def doAsyncCleanups(self):
pass
def run(self, result=None):
orig_result = result
if result is None:
result = self.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
startTestRun()
result.startTest(self)
testMethod = getattr(self, self._testMethodName)
if (getattr(self.__class__, "__unittest_skip__", False) or
getattr(testMethod, "__unittest_skip__", False)):
# If the class or method was skipped.
try:
skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
or getattr(testMethod, '__unittest_skip_why__', ''))
self._addSkip(result, self, skip_why)
finally:
result.stopTest(self)
return
expecting_failure_method = getattr(testMethod,
"__unittest_expecting_failure__", False)
expecting_failure_class = getattr(self,
"__unittest_expecting_failure__", False)
expecting_failure = expecting_failure_class or expecting_failure_method
outcome = _Outcome(result)
try:
self._outcome = outcome
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
loop.set_debug(True)
with outcome.testPartExecutor(self):
self.setUp()
loop.run_until_complete(self.asyncSetUp())
if outcome.success:
outcome.expecting_failure = expecting_failure
with outcome.testPartExecutor(self, isTest=True):
possible_coroutine = testMethod()
if asyncio.iscoroutine(possible_coroutine):
loop.run_until_complete(possible_coroutine)
outcome.expecting_failure = False
with outcome.testPartExecutor(self):
loop.run_until_complete(self.asyncTearDown())
self.tearDown()
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
loop.close()
self.doCleanups()
for test, reason in outcome.skipped:
self._addSkip(result, test, reason)
self._feedErrorsToResult(result, outcome.errors)
if outcome.success:
if expecting_failure:
if outcome.expectedFailure:
self._addExpectedFailure(result, outcome.expectedFailure)
else:
self._addUnexpectedSuccess(result)
else:
result.addSuccess(self)
return result
finally:
result.stopTest(self)
if orig_result is None:
stopTestRun = getattr(result, 'stopTestRun', None)
if stopTestRun is not None:
stopTestRun()
# explicitly break reference cycles:
# outcome.errors -> frame -> outcome -> outcome.errors
# outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure
outcome.errors.clear()
outcome.expectedFailure = None
# clear the outcome, no more needed
self._outcome = None
def setUp(self):
self.loop = asyncio.get_event_loop_policy().get_event_loop()

View file

@ -1,26 +1,7 @@
import asyncio import asyncio
import inspect
import contextlib import contextlib
import socket import socket
import mock import mock
import unittest
def async_test(f):
def wrapper(*args, **kwargs):
if inspect.iscoroutinefunction(f):
future = f(*args, **kwargs)
else:
coroutine = asyncio.coroutine(f)
future = coroutine(*args, **kwargs)
asyncio.get_event_loop().run_until_complete(future)
return wrapper
class TestBase(unittest.TestCase):
def setUp(self):
self.loop = asyncio.get_event_loop_policy().get_event_loop()
@contextlib.contextmanager @contextlib.contextmanager
@ -58,3 +39,39 @@ def mock_datagram_endpoint_factory(loop, expected_addr, replies=None, delay_repl
mock_socket.return_value = mock_sock mock_socket.return_value = mock_sock
loop.create_datagram_endpoint = create_datagram_endpoint loop.create_datagram_endpoint = create_datagram_endpoint
yield yield
@contextlib.contextmanager
def mock_tcp_endpoint_factory(loop, replies=None, delay_reply=0.0, sent_packets=None):
sent_packets = sent_packets if sent_packets is not None else []
replies = replies or {}
def write(p: asyncio.Protocol):
def _write(data):
sent_packets.append(data)
if data in replies:
loop.call_later(delay_reply, p.data_received, replies[data])
return _write
async def create_connection(protocol_factory, host=None, port=None):
protocol = protocol_factory()
transport = asyncio.Transport(extra={'socket': mock_sock})
transport.close = lambda: mock_sock.close()
mock_sock.write = write(protocol)
transport.write = mock_sock.write
protocol.connection_made(transport)
return transport, protocol
with mock.patch('socket.socket') as mock_socket:
mock_sock = mock.Mock(spec=socket.socket)
mock_sock.setsockopt = lambda *_: None
mock_sock.bind = lambda *_: None
mock_sock.setblocking = lambda *_: None
mock_sock.getsockname = lambda: "0.0.0.0"
mock_sock.getpeername = lambda: ""
mock_sock.close = lambda: None
mock_sock.type = socket.SOCK_STREAM
mock_sock.fileno = lambda: 7
mock_socket.return_value = mock_sock
loop.create_connection = create_connection
yield

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,228 @@
from aioupnp.fault import UPnPError
from aioupnp.protocols.scpd import scpd_post, scpd_get
from . import TestBase
from .mocks import mock_tcp_endpoint_factory
class TestSCPDGet(TestBase):
path, lan_address, port = '/IGDdevicedesc_brlan0.xml', '10.1.10.1', 49152
get_request = b'GET /IGDdevicedesc_brlan0.xml HTTP/1.1\r\n' \
b'Accept-Encoding: gzip\r\nHost: 10.1.10.1\r\nConnection: Close\r\n\r\n'
response = b"HTTP/1.1 200 OK\r\n" \
b"CONTENT-LENGTH: 2972\r\n" \
b"CONTENT-TYPE: text/xml\r\n" \
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
b"LAST-MODIFIED: Fri, 28 Sep 2018 18:35:48 GMT\r\n" \
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
b"X-User-Agent: redsonic\r\n" \
b"CONNECTION: close\r\n" \
b"\r\n" \
b"<?xml version=\"1.0\"?>\n<root xmlns=\"urn:schemas-upnp-org:device-1-0\">\n<specVersion>\n<major>1</major>\n<minor>0</minor>\n</specVersion>\n<device>\n<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>\n<friendlyName>CGA4131COM</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>\n<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/Layer3Forwarding</controlURL>\n<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n<device>\n<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>\n<friendlyName>WANDevice:1</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>\n<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>\n<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n <device>\n <deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>\n <friendlyName>WANConnectionDevice:1</friendlyName>\n <manufacturer>Cisco</manufacturer>\n <manufacturerURL>http://www.cisco.com/</manufacturerURL>\n <modelDescription>CGA4131COM</modelDescription>\n <modelName>CGA4131COM</modelName>\n <modelNumber>CGA4131COM</modelNumber>\n <modelURL>http://www.cisco.com</modelURL>\n <serialNumber></serialNumber>\n <UDN>uuid:11111111-2222-3333-4444-555555555555</UDN>\n <UPC>CGA4131COM</UPC>\n <serviceList>\n <service>\n <serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>\n <serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>\n <SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>\n <controlURL>/upnp/control/WANIPConnection0</controlURL>\n <eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>\n </service>\n </serviceList>\n </device>\n</deviceList>\n</device>\n</deviceList>\n<presentationURL>http://10.1.10.1/</presentationURL></device>\n</root>\n"
bad_xml = b"<?xml version=\"1.0\"?>\n<root xmlns=\"urn:schemas-upnp-org:device-1-0\">\n<specVersion>\n<major>1</major>\n<minor>0</minor>\n</specVersion>\n<device>\n<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>\n<friendlyName>CGA4131COM</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>\n<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/Layer3Forwarding</controlURL>\n<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n<device>\n<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>\n<friendlyName>WANDevice:1</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>\n<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>\n<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n <device>\n <deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>\n <friendlyName>WANConnectionDevice:1</friendlyName>\n <manufacturer>Cisco</manufacturer>\n <manufacturerURL>http://www.cisco.com/</manufacturerURL>\n <modelDescription>CGA4131COM</modelDescription>\n <modelName>CGA4131COM</modelName>\n <modelNumber>CGA4131COM</modelNumber>\n <modelURL>http://www.cisco.com</modelURL>\n <serialNumber></serialNumber>\n <UDN>uuid:11111111-2222-3333-4444-555555555555</UDN>\n <UPC>CGA4131COM</UPC>\n <serviceList>\n <service>\n <serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>\n <serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>\n <SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>\n <controlURL>/upnp/control/WANIPConnection0</controlURL>\n <eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>\n </service>\n </serviceList>\n </device>\n</deviceList>\n</device>\n</deviceList>\n<presentationURL>http://10.1.10.1/</presentationURL></device>\n/root>\n"
bad_response = b"HTTP/1.1 200 OK\r\n" \
b"CONTENT-LENGTH: 2971\r\n" \
b"CONTENT-TYPE: text/xml\r\n" \
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
b"LAST-MODIFIED: Fri, 28 Sep 2018 18:35:48 GMT\r\n" \
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
b"X-User-Agent: redsonic\r\n" \
b"CONNECTION: close\r\n" \
b"\r\n" \
b"%s" % bad_xml
expected_parsed = {
'specVersion': {'major': '1', 'minor': '0'},
'device': {
'deviceType': 'urn:schemas-upnp-org:device:InternetGatewayDevice:1',
'friendlyName': 'CGA4131COM',
'manufacturer': 'Cisco',
'manufacturerURL': 'http://www.cisco.com/',
'modelDescription': 'CGA4131COM',
'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com',
'UDN': 'uuid:11111111-2222-3333-4444-555555555556',
'UPC': 'CGA4131COM',
'serviceList': {
'service': {
'serviceType': 'urn:schemas-upnp-org:service:Layer3Forwarding:1',
'serviceId': 'urn:upnp-org:serviceId:L3Forwarding1',
'SCPDURL': '/Layer3ForwardingSCPD.xml',
'controlURL': '/upnp/control/Layer3Forwarding',
'eventSubURL': '/upnp/event/Layer3Forwarding'
}
},
'deviceList': {
'device': {
'deviceType': 'urn:schemas-upnp-org:device:WANDevice:1',
'friendlyName': 'WANDevice:1',
'manufacturer': 'Cisco',
'manufacturerURL': 'http://www.cisco.com/',
'modelDescription': 'CGA4131COM',
'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com',
'UDN': 'uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915',
'UPC': 'CGA4131COM',
'serviceList': {
'service': {
'serviceType': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'serviceId': 'urn:upnp-org:serviceId:WANCommonIFC1',
'SCPDURL': '/WANCommonInterfaceConfigSCPD.xml',
'controlURL': '/upnp/control/WANCommonInterfaceConfig0',
'eventSubURL': '/upnp/event/WANCommonInterfaceConfig0'
}
},
'deviceList': {
'device': {
'deviceType': 'urn:schemas-upnp-org:device:WANConnectionDevice:1',
'friendlyName': 'WANConnectionDevice:1',
'manufacturer': 'Cisco',
'manufacturerURL': 'http://www.cisco.com/',
'modelDescription': 'CGA4131COM',
'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com',
'UDN': 'uuid:11111111-2222-3333-4444-555555555555',
'UPC': 'CGA4131COM',
'serviceList': {
'service': {
'serviceType': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'serviceId': 'urn:upnp-org:serviceId:WANIPConn1',
'SCPDURL': '/WANIPConnectionServiceSCPD.xml',
'controlURL': '/upnp/control/WANIPConnection0',
'eventSubURL': '/upnp/event/WANIPConnection0'
}
}
}
}
}
},
'presentationURL': 'http://10.1.10.1/'
}
}
async def test_scpd_get(self):
sent = []
replies = {self.get_request: self.response}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
self.assertEqual(None, err)
self.assertDictEqual(self.expected_parsed, result)
async def test_scpd_get_timeout(self):
sent = []
replies = {}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
self.assertTrue(isinstance(err, UPnPError))
self.assertDictEqual({}, result)
self.assertEqual(b'', raw)
async def test_scpd_get_bad_xml(self):
sent = []
replies = {self.get_request: self.bad_response}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
self.assertDictEqual({}, result)
self.assertEqual(self.bad_xml, raw)
self.assertTrue(isinstance(err, UPnPError))
self.assertTrue(str(err).startswith('no element found'))
async def test_scpd_get_overrun_content_length(self):
sent = []
replies = {self.get_request: self.bad_response + b'\r\n'}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
self.assertDictEqual({}, result)
self.assertEqual(self.bad_response + b'\r\n', raw)
self.assertTrue(isinstance(err, UPnPError))
self.assertTrue(str(err).startswith('too many bytes written'))
class TestSCPDPost(TestBase):
param_names: list = []
kwargs: dict = {}
method, gateway_address, port = "GetExternalIPAddress", '10.0.0.1', 49152
st, lan_address, path = b'urn:schemas-upnp-org:service:WANIPConnection:1', '10.0.0.2', '/soap.cgi?service=WANIPConn1'
post_bytes = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\n' \
b'Host: 10.0.0.1\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\n' \
b'Content-Length: 285\r\nContent-Type: text/xml\r\n' \
b'SOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\n' \
b'Connection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n' \
b'<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"' \
b' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">' \
b'<s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1">' \
b'</u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
bad_envelope = b"s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\"><s:Body>\n<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n<NewExternalIPAddress>11.22.33.44</NewExternalIPAddress>\r\n</u:GetExternalIPAddressResponse>\r\n</s:Body> </s:Envelope>"
envelope = b"<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\"><s:Body>\n<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n<NewExternalIPAddress>11.22.33.44</NewExternalIPAddress>\r\n</u:GetExternalIPAddressResponse>\r\n</s:Body> </s:Envelope>"
post_response = b"HTTP/1.1 200 OK\r\n" \
b"CONTENT-LENGTH: 340\r\n" \
b"CONTENT-TYPE: text/xml; charset=\"utf-8\"\r\n" \
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
b"EXT:\r\n" \
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
b"X-User-Agent: redsonic\r\n" \
b"\r\n" \
b"%s" % envelope
bad_envelope_response = b"HTTP/1.1 200 OK\r\n" \
b"CONTENT-LENGTH: 339\r\n" \
b"CONTENT-TYPE: text/xml; charset=\"utf-8\"\r\n" \
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
b"EXT:\r\n" \
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
b"X-User-Agent: redsonic\r\n" \
b"\r\n" \
b"%s" % bad_envelope
async def test_scpd_post(self):
sent = []
replies = {self.post_bytes: self.post_response}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_post(
self.path, self.gateway_address, self.port, self.method, self.param_names, self.st, self.loop
)
self.assertEqual(None, err)
self.assertEqual(self.envelope, raw)
self.assertDictEqual({'NewExternalIPAddress': '11.22.33.44'}, result)
async def test_scpd_post_timeout(self):
sent = []
replies = {}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_post(
self.path, self.gateway_address, self.port, self.method, self.param_names, self.st, self.loop
)
self.assertTrue(isinstance(err, UPnPError))
self.assertTrue(str(err).startswith('Timeout'))
self.assertEqual(b'', raw)
self.assertDictEqual({}, result)
async def test_scpd_post_bad_xml_response(self):
sent = []
replies = {self.post_bytes: self.bad_envelope_response}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_post(
self.path, self.gateway_address, self.port, self.method, self.param_names, self.st, self.loop
)
self.assertTrue(isinstance(err, UPnPError))
self.assertTrue(str(err).startswith('no element found'))
self.assertEqual(self.bad_envelope, raw)
self.assertDictEqual({}, result)
async def test_scpd_post_overrun_response(self):
sent = []
replies = {self.post_bytes: self.post_response + b'\r\n'}
with mock_tcp_endpoint_factory(self.loop, replies, sent_packets=sent):
result, raw, err = await scpd_post(
self.path, self.gateway_address, self.port, self.method, self.param_names, self.st, self.loop
)
self.assertTrue(isinstance(err, UPnPError))
self.assertTrue(str(err).startswith('too many bytes written'))
self.assertEqual(self.post_response + b'\r\n', raw)
self.assertDictEqual({}, result)

View file

@ -4,7 +4,8 @@ from aioupnp.protocols.m_search_patterns import packet_generator
from aioupnp.serialization.ssdp import SSDPDatagram from aioupnp.serialization.ssdp import SSDPDatagram
from aioupnp.constants import SSDP_IP_ADDRESS from aioupnp.constants import SSDP_IP_ADDRESS
from aioupnp.protocols.ssdp import fuzzy_m_search, m_search from aioupnp.protocols.ssdp import fuzzy_m_search, m_search
from aioupnp.protocols.test_common import TestBase, async_test, mock_datagram_endpoint_factory from . import TestBase
from .mocks import mock_datagram_endpoint_factory
class TestSSDP(TestBase): class TestSSDP(TestBase):
@ -28,7 +29,6 @@ class TestSSDP(TestBase):
]) ])
reply_packet = SSDPDatagram("OK", reply_args) reply_packet = SSDPDatagram("OK", reply_args)
@async_test
async def test_m_search_reply_unicast(self): async def test_m_search_reply_unicast(self):
replies = { replies = {
(self.query_packet.encode().encode(), ("10.0.0.1", 1900)): self.reply_packet.encode().encode() (self.query_packet.encode().encode(), ("10.0.0.1", 1900)): self.reply_packet.encode().encode()
@ -45,7 +45,6 @@ class TestSSDP(TestBase):
with mock_datagram_endpoint_factory(self.loop, "10.0.0.1", replies=replies): with mock_datagram_endpoint_factory(self.loop, "10.0.0.1", replies=replies):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=False) await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=False)
@async_test
async def test_m_search_reply_multicast(self): async def test_m_search_reply_multicast(self):
replies = { replies = {
(self.query_packet.encode().encode(), (SSDP_IP_ADDRESS, 1900)): self.reply_packet.encode().encode() (self.query_packet.encode().encode(), (SSDP_IP_ADDRESS, 1900)): self.reply_packet.encode().encode()
@ -62,7 +61,6 @@ class TestSSDP(TestBase):
with mock_datagram_endpoint_factory(self.loop, "10.0.0.1", replies=replies): with mock_datagram_endpoint_factory(self.loop, "10.0.0.1", replies=replies):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=True) await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=True)
@async_test
async def test_packets_sent_fuzzy_m_search(self): async def test_packets_sent_fuzzy_m_search(self):
sent = [] sent = []
@ -72,7 +70,6 @@ class TestSSDP(TestBase):
self.assertListEqual(sent, self.byte_packets) self.assertListEqual(sent, self.byte_packets)
@async_test
async def test_packets_fuzzy_m_search(self): async def test_packets_fuzzy_m_search(self):
replies = { replies = {
(self.query_packet.encode().encode(), (SSDP_IP_ADDRESS, 1900)): self.reply_packet.encode().encode() (self.query_packet.encode().encode(), (SSDP_IP_ADDRESS, 1900)): self.reply_packet.encode().encode()

View file

View file

@ -1,5 +1,7 @@
import unittest import unittest
from aioupnp.serialization.scpd import serialize_scpd_get, deserialize_scpd_get_response from aioupnp.serialization.scpd import serialize_scpd_get, deserialize_scpd_get_response
from aioupnp.device import Device
from aioupnp.util import get_dict_val_case_insensitive
class TestSCPDSerialization(unittest.TestCase): class TestSCPDSerialization(unittest.TestCase):
@ -16,7 +18,7 @@ class TestSCPDSerialization(unittest.TestCase):
b"X-User-Agent: redsonic\r\n" \ b"X-User-Agent: redsonic\r\n" \
b"CONNECTION: close\r\n" \ b"CONNECTION: close\r\n" \
b"\r\n" \ b"\r\n" \
b"<?xml version=\"1.0\"?>\n<root xmlns=\"urn:schemas-upnp-org:device-1-0\">\n<specVersion>\n<major>1</major>\n<minor>0</minor>\n</specVersion>\n<device>\n<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>\n<friendlyName>CGA4131COM</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>\n<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/Layer3Forwarding</controlURL>\n<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n<device>\n<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>\n<friendlyName>WANDevice:1</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>\n<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>\n<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n <device>\n <deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>\n <friendlyName>WANConnectionDevice:1</friendlyName>\n <manufacturer>Cisco</manufacturer>\n <manufacturerURL>http://www.cisco.com/</manufacturerURL>\n <modelDescription>CGA4131COM</modelDescription>\n <modelName>CGA4131COM</modelName>\n <modelNumber>CGA4131COM</modelNumber>\n <modelURL>http://www.cisco.com</modelURL>\n <serialNumber></serialNumber>\n <UDN>uuid:11111111-2222-3333-4444-555555555555</UDN>\n <UPC>CGA4131COM</UPC>\n <serviceList>\n <service>\n <serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>\n <serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>\n <SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>\n <controlURL>/upnp/control/WANIPConnection0</controlURL>\n <eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>\n </service>\n </serviceList>\n </device>\n</deviceList>\n</device>\n</deviceList>\n<presentationURL>http://10.1.10.1/</presentationURL></device>\n</root>\n" b"<?xml version=\"1.0\"?>\n<root xmlns=\"urn:schemas-upnp-org:device-1-0\">\n<specVersion>\n<major>1</major>\n<minor>0</minor>\n</specVersion>\n<device>\n<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>\n<friendlyName>CGA4131COM</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>\n<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/Layer3Forwarding</controlURL>\n<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n<device>\n<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>\n<friendlyName>WANDevice:1</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>\n<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>\n<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n <device>\n <deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>\n <friendlyName>WANConnectionDevice:1</friendlyName>\n <manufacturer>Cisco</manufacturer>\n <manufacturerURL>http://www.cisco.com/</manufacturerURL>\n <modelDescription>CGA4131COM</modelDescription>\n <modelName>CGA4131COM</modelName>\n <modelNumber>CGA4131COM</modelNumber>\n <modelURL>http://www.cisco.com</modelURL>\n <serialNumber></serialNumber>\n <UDN>uuid:11111111-2222-3333-4444-555555555555</UDN>\n <UPC>CGA4131COM</UPC>\n <serviceList>\n <service>\n <serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>\n <serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>\n <SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>\n <controlURL>/upnp/control/WANIPConnection0</controlURL>\n <eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>\n </service>\n </serviceList>\n </device>\n</deviceList>\n</device>\n</deviceList>\n<presentationURL>http://10.1.10.1/</presentationURL></device>\n</root>\n"
expected_parsed = { expected_parsed = {
'specVersion': {'major': '1', 'minor': '0'}, 'specVersion': {'major': '1', 'minor': '0'},
@ -50,7 +52,7 @@ class TestSCPDSerialization(unittest.TestCase):
'modelName': 'CGA4131COM', 'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM', 'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com', 'modelURL': 'http://www.cisco.com',
'UDN': 'uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915', 'UDN': 'uuid:11111111-2222-3333-4444-555555555556',
'UPC': 'CGA4131COM', 'UPC': 'CGA4131COM',
'serviceList': { 'serviceList': {
'service': { 'service': {
@ -98,3 +100,76 @@ class TestSCPDSerialization(unittest.TestCase):
def test_deserialize_blank(self): def test_deserialize_blank(self):
self.assertDictEqual(deserialize_scpd_get_response(b''), {}) self.assertDictEqual(deserialize_scpd_get_response(b''), {})
def test_deserialize_to_device_object(self):
devices = []
services = []
device = Device(devices, services, **get_dict_val_case_insensitive(self.expected_parsed, "device"))
expected_result = {
'deviceType': 'urn:schemas-upnp-org:device:InternetGatewayDevice:1',
'friendlyName': 'CGA4131COM',
'manufacturer': 'Cisco',
'manufacturerURL': 'http://www.cisco.com/',
'modelDescription': 'CGA4131COM',
'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com',
'udn': 'uuid:11111111-2222-3333-4444-555555555556',
'upc': 'CGA4131COM',
'serviceList': {
'service': {
'serviceType': 'urn:schemas-upnp-org:service:Layer3Forwarding:1',
'serviceId': 'urn:upnp-org:serviceId:L3Forwarding1',
'SCPDURL': '/Layer3ForwardingSCPD.xml',
'controlURL': '/upnp/control/Layer3Forwarding',
'eventSubURL': '/upnp/event/Layer3Forwarding'
}
},
'deviceList': {
'device': {
'deviceType': 'urn:schemas-upnp-org:device:WANDevice:1',
'friendlyName': 'WANDevice:1',
'manufacturer': 'Cisco',
'manufacturerURL': 'http://www.cisco.com/',
'modelDescription': 'CGA4131COM',
'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com',
'UDN': 'uuid:11111111-2222-3333-4444-555555555556',
'UPC': 'CGA4131COM',
'serviceList': {
'service': {
'serviceType': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
'serviceId': 'urn:upnp-org:serviceId:WANCommonIFC1',
'SCPDURL': '/WANCommonInterfaceConfigSCPD.xml',
'controlURL': '/upnp/control/WANCommonInterfaceConfig0',
'eventSubURL': '/upnp/event/WANCommonInterfaceConfig0'
}
},
'deviceList': {
'device': {
'deviceType': 'urn:schemas-upnp-org:device:WANConnectionDevice:1',
'friendlyName': 'WANConnectionDevice:1',
'manufacturer': 'Cisco',
'manufacturerURL': 'http://www.cisco.com/',
'modelDescription': 'CGA4131COM',
'modelName': 'CGA4131COM',
'modelNumber': 'CGA4131COM',
'modelURL': 'http://www.cisco.com',
'UDN': 'uuid:11111111-2222-3333-4444-555555555555',
'UPC': 'CGA4131COM',
'serviceList': {
'service': {
'serviceType': 'urn:schemas-upnp-org:service:WANIPConnection:1',
'serviceId': 'urn:upnp-org:serviceId:WANIPConn1',
'SCPDURL': '/WANIPConnectionServiceSCPD.xml',
'controlURL': '/upnp/control/WANIPConnection0',
'eventSubURL': '/upnp/event/WANIPConnection0'
}
}
}
}
}
}, 'presentationURL': 'http://10.1.10.1/'
}
self.assertDictEqual(expected_result, device.as_dict())

View file

@ -1,4 +1,5 @@
import unittest import unittest
from aioupnp.fault import UPnPError
from aioupnp.serialization.soap import serialize_soap_post, deserialize_soap_post_response from aioupnp.serialization.soap import serialize_soap_post, deserialize_soap_post_response
@ -27,6 +28,16 @@ class TestSOAPSerialization(unittest.TestCase):
b"\r\n" \ b"\r\n" \
b"<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\"><s:Body>\n<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n<NewExternalIPAddress>11.22.33.44</NewExternalIPAddress>\r\n</u:GetExternalIPAddressResponse>\r\n</s:Body> </s:Envelope>" b"<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\"><s:Body>\n<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\r\n<NewExternalIPAddress>11.22.33.44</NewExternalIPAddress>\r\n</u:GetExternalIPAddressResponse>\r\n</s:Body> </s:Envelope>"
error_response = b"HTTP/1.1 500 Internal Server Error\r\n" \
b"Server: WebServer\r\n" \
b"Date: Thu, 11 Oct 2018 22:16:17 GMT\r\n" \
b"Connection: close\r\n" \
b"CONTENT-TYPE: text/xml; charset=\"utf-8\"\r\n" \
b"CONTENT-LENGTH: 482 \r\n" \
b"EXT:\r\n" \
b"\r\n" \
b"<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<s:Fault>\n\t\t\t<faultcode>s:Client</faultcode>\n\t\t\t<faultstring>UPnPError</faultstring>\n\t\t\t<detail>\n\t\t\t\t<UPnPError xmlns=\"urn:schemas-upnp-org:control-1-0\">\n\t\t\t\t\t<errorCode>713</errorCode>\n\t\t\t\t\t<errorDescription>SpecifiedArrayIndexInvalid</errorDescription>\n\t\t\t\t</UPnPError>\n\t\t\t</detail>\n\t\t</s:Fault>\n\t</s:Body>\n</s:Envelope>\n"
def test_serialize_post(self): def test_serialize_post(self):
self.assertEqual(serialize_soap_post( self.assertEqual(serialize_soap_post(
self.method, self.param_names, self.st, self.gateway_address, self.path, **self.kwargs self.method, self.param_names, self.st, self.gateway_address, self.path, **self.kwargs
@ -37,3 +48,12 @@ class TestSOAPSerialization(unittest.TestCase):
deserialize_soap_post_response(self.post_response, self.method, service_id=self.st.decode()), deserialize_soap_post_response(self.post_response, self.method, service_id=self.st.decode()),
{'NewExternalIPAddress': '11.22.33.44'} {'NewExternalIPAddress': '11.22.33.44'}
) )
def test_raise_from_error_response(self):
raised = False
try:
deserialize_soap_post_response(self.error_response, self.method, service_id=self.st.decode())
except UPnPError as err:
raised = True
self.assertTrue(str(err) == 'SpecifiedArrayIndexInvalid')
self.assertTrue(raised)

View file

@ -0,0 +1,45 @@
import unittest
from aioupnp.device import CaseInsensitive
class TestService(CaseInsensitive):
serviceType = None
serviceId = None
controlURL = None
eventSubURL = None
SCPDURL = None
class TestCaseInsensitive(unittest.TestCase):
def test_initialize(self):
s = TestService(
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
)
self.assertEqual('test', getattr(s, 'serviceType'))
self.assertEqual('test', getattr(s, 'servicetype'))
self.assertEqual('test', getattr(s, 'SERVICETYPE'))
s = TestService(
servicetype="test", serviceid="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
)
self.assertEqual('test', getattr(s, 'serviceType'))
self.assertEqual('test', getattr(s, 'servicetype'))
self.assertEqual('test', getattr(s, 'SERVICETYPE'))
self.assertDictEqual({
'serviceType': 'test',
'serviceId': 'test id',
'controlURL': "/test",
'eventSubURL': "/test2",
'SCPDURL': "/test3"
}, s.as_dict())
def test_set_attr(self):
s = TestService(
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
)
self.assertEqual('test', getattr(s, 'serviceType'))
s.servicetype = 'foo'
self.assertEqual('foo', getattr(s, 'serviceType'))
self.assertEqual('foo', getattr(s, 'servicetype'))
self.assertEqual('foo', getattr(s, 'SERVICETYPE'))