From c1dad347ec97c418ee5ecde9e0a5edb5ff44d10b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 28 Jul 2018 22:08:24 -0400 Subject: [PATCH] more --- setup.py | 9 +- txupnp/__init__.py | 4 +- txupnp/constants.py | 20 ++- txupnp/fault.py | 4 +- txupnp/gateway.py | 95 +++++++++++++++ txupnp/scpd.py | 132 ++++++++++++++++---- txupnp/soap.py | 47 +++++++ txupnp/ssdp.py | 162 ++++++++++++++++++------- txupnp/tests/__init__.py | 0 {tests => txupnp/tests}/test_txupnp.py | 8 +- txupnp/upnp.py | 132 +++++++------------- txupnp/util.py | 2 + 12 files changed, 451 insertions(+), 164 deletions(-) create mode 100644 txupnp/gateway.py create mode 100644 txupnp/soap.py create mode 100644 txupnp/tests/__init__.py rename {tests => txupnp/tests}/test_txupnp.py (88%) diff --git a/setup.py b/setup.py index 88df1bc..a982002 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,9 @@ from setuptools import setup, find_packages +console_scripts = [ + 'test-txupnp = txupnp.tests.test_txupnp:main', +] + setup( name="txupnp", version="0.0.1", @@ -8,9 +12,12 @@ setup( description="UPnP for twisted", license='MIT', packages=find_packages(), + entry_points={'console_scripts': console_scripts}, install_requires=[ 'Twisted', 'treq', - 'netifaces' + 'netifaces', + 'pycryptodome', + 'service-identity' ], ) diff --git a/txupnp/__init__.py b/txupnp/__init__.py index 2bb3785..50ef9fe 100644 --- a/txupnp/__init__.py +++ b/txupnp/__init__.py @@ -1,5 +1,7 @@ import logging - +# from twisted.python import log +# observer = log.PythonLoggingObserver(loggerName=__name__) +# observer.start() log = logging.getLogger(__name__) handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)-15s-%(filename)s:%(lineno)s->%(message)s')) diff --git a/txupnp/constants.py b/txupnp/constants.py index 14a150f..1a5ca1f 100644 --- a/txupnp/constants.py +++ b/txupnp/constants.py @@ -1,16 +1,26 @@ POST = "POST" +ROOT = "root" +SPEC_VERSION = "specVersion" XML_VERSION = "" FAULT = "{http://schemas.xmlsoap.org/soap/envelope/}Fault" ENVELOPE = "{http://schemas.xmlsoap.org/soap/envelope/}Envelope" BODY = "{http://schemas.xmlsoap.org/soap/envelope/}Body" SOAP_ENCODING = "http://schemas.xmlsoap.org/soap/encoding/" SOAP_ENVELOPE = "http://schemas.xmlsoap.org/soap/envelope" -CONTROL_KEY = 'urn:schemas-upnp-org:control-1-0' -SERVICE_KEY = 'urn:schemas-upnp-org:service-1-0' +CONTROL = 'urn:schemas-upnp-org:control-1-0' +SERVICE = 'urn:schemas-upnp-org:service-1-0' +DEVICE = 'urn:schemas-upnp-org:device-1-0' GATEWAY_SCHEMA = 'urn:schemas-upnp-org:device:InternetGatewayDevice:1' -WAN_INTERFACE_KEY = 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1' -LAYER_FORWARD_KEY = 'urn:schemas-upnp-org:service:Layer3Forwarding:1' -WAN_IP_KEY = 'urn:schemas-upnp-org:service:WANIPConnection:1' +WAN_SCHEMA = 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1' +LAYER_SCHEMA = 'urn:schemas-upnp-org:service:Layer3Forwarding:1' +IP_SCHEMA = 'urn:schemas-upnp-org:service:WANIPConnection:1' + +service_types = [ + GATEWAY_SCHEMA, + WAN_SCHEMA, + LAYER_SCHEMA, + IP_SCHEMA, +] SSDP_IP_ADDRESS = '239.255.255.250' SSDP_PORT = 1900 diff --git a/txupnp/fault.py b/txupnp/fault.py index 3a6a94e..c76e3c4 100644 --- a/txupnp/fault.py +++ b/txupnp/fault.py @@ -1,5 +1,5 @@ from txupnp.util import flatten_keys -from txupnp.constants import FAULT, CONTROL_KEY +from txupnp.constants import FAULT, CONTROL class UPnPError(Exception): @@ -8,6 +8,6 @@ class UPnPError(Exception): def handle_fault(response): if FAULT in response: - fault = flatten_keys(response[FAULT], "{%s}" % CONTROL_KEY) + fault = flatten_keys(response[FAULT], "{%s}" % CONTROL) raise UPnPError(fault['detail']['UPnPError']['errorDescription']) return response diff --git a/txupnp/gateway.py b/txupnp/gateway.py new file mode 100644 index 0000000..7f017b7 --- /dev/null +++ b/txupnp/gateway.py @@ -0,0 +1,95 @@ +import logging +from twisted.internet import defer +import treq +from xml.etree import ElementTree +from txupnp.util import etree_to_dict, flatten_keys +from txupnp.util import BASE_PORT_REGEX, BASE_ADDRESS_REGEX +from txupnp.constants import DEVICE, ROOT +from txupnp.constants import SPEC_VERSION + +log = logging.getLogger(__name__) + + +class Service(object): + def __init__(self, serviceType, serviceId, SCPDURL, eventSubURL, controlURL): + self.service_type = serviceType + self.service_id = serviceId + self.control_path = controlURL + self.subscribe_path = eventSubURL + self.scpd_path = SCPDURL + + +class Device(object): + def __init__(self, _root_device, deviceType=None, friendlyName=None, manufacturer=None, manufacturerURL=None, + modelDescription=None, modelName=None, modelNumber=None, modelURL=None, serialNumber=None, + UDN=None, serviceList=None, deviceList=None, **kwargs): + serviceList = serviceList or {} + deviceList = deviceList or {} + self._root_device = _root_device + self.device_type = deviceType + self.friendly_name = friendlyName + self.manufacturer = manufacturer + self.manufacturer_url = manufacturerURL + self.model_description = modelDescription + self.model_name = modelName + self.model_number = modelNumber + self.model_url = modelURL + self.serial_number = serialNumber + self.udn = UDN + services = serviceList["service"] + if isinstance(services, dict): + services = [services] + services = [Service(**service) for service in services] + self._root_device.services.extend(services) + devices = [Device(self._root_device, **deviceList[k]) for k in deviceList] + self._root_device.devices.extend(devices) + + +class RootDevice(object): + def __init__(self, xml_string): + root = flatten_keys(etree_to_dict(ElementTree.fromstring(xml_string)), "{%s}" % DEVICE)[ROOT] + self.spec_version = root.get(SPEC_VERSION) + self.url_base = root["URLBase"] + self.devices = [] + self.services = [] + root_device = Device(self, **(root["device"])) + self.devices.append(root_device) + log.info("finished setting up root device. %i devices and %i services", len(self.devices), len(self.services)) + + +class Gateway(object): + def __init__(self, usn, ext, server, location, cache_control, date, st): + self.usn = usn.encode() + self.ext = ext.encode() + self.server = server.encode() + self.location = location.encode() + self.cache_control = cache_control.encode() + self.date = date.encode() + self.urn = st.encode() + self.base_address = BASE_ADDRESS_REGEX.findall(self.location)[0] + self.port = int(BASE_PORT_REGEX.findall(self.location)[0]) + self._device = None + + @defer.inlineCallbacks + def discover_services(self): + log.info("querying %s", self.location) + response = yield treq.get(self.location) + response_xml = yield response.text() + self._device = RootDevice(response_xml) + + @property + def services(self): + if not self._device: + return {} + return {service.service_type: service for service in self._device.services} + + @property + def devices(self): + if not self._device: + return {} + return {device.udn: device for device in self._device.devices} + + def get_service(self, service_type): + for service in self._device.services: + if service.service_type == service_type: + return service diff --git a/txupnp/scpd.py b/txupnp/scpd.py index 6ad4522..486e6ec 100644 --- a/txupnp/scpd.py +++ b/txupnp/scpd.py @@ -1,13 +1,13 @@ import logging -from collections import namedtuple, OrderedDict +from collections import OrderedDict from twisted.internet import defer from twisted.web.client import Agent, HTTPConnectionPool import treq from treq.client import HTTPClient from xml.etree import ElementTree -from txupnp.util import etree_to_dict, flatten_keys, return_types, _return_types, none_or_str -from txupnp.fault import handle_fault -from txupnp.constants import POST, ENVELOPE, BODY, XML_VERSION, WAN_IP_KEY, SERVICE_KEY, SSDP_IP_ADDRESS +from txupnp.util import etree_to_dict, flatten_keys, return_types, _return_types, none_or_str, none +from txupnp.fault import handle_fault, UPnPError +from txupnp.constants import POST, ENVELOPE, BODY, XML_VERSION, IP_SCHEMA, SERVICE, SSDP_IP_ADDRESS, DEVICE, ROOT, service_types log = logging.getLogger(__name__) @@ -54,7 +54,7 @@ class _SCPDCommand(object): self.param_names = param_names self.returns = returns - def extract_body(self, xml_response, service_key=WAN_IP_KEY): + def extract_body(self, xml_response, service_key=IP_SCHEMA): content_dict = etree_to_dict(ElementTree.fromstring(xml_response)) envelope = content_dict[ENVELOPE] return flatten_keys(envelope[BODY], "{%s}" % service_key) @@ -64,9 +64,8 @@ class _SCPDCommand(object): if '%sResponse' % self.method in body: response_key = '%sResponse' % self.method else: - 1/0 - return - + log.error(body.keys()) + raise UPnPError("unknown response fields") response = body[response_key] extracted_response = tuple([response[n] for n in self.returns]) if len(extracted_response) == 1: @@ -81,8 +80,7 @@ class _SCPDCommand(object): ('Host', ('%s:%i' % (SSDP_IP_ADDRESS, self.service_port))), ('Content-Type', 'text/xml'), ('Content-Length', len(soap_body)) - ) - ) + )) response = yield self._http_client.request( POST, url=self.control_url, data=soap_body, headers=headers ) @@ -107,20 +105,50 @@ class _SCPDCommand(object): defer.returnValue(result) -class SCPDCommandManager(object): - def __init__(self, upnp): - self._upnp = upnp +class SCPDResponse(object): + def __init__(self, url, headers, content): + self.url = url + self.headers = headers + self.content = content + + def get_element_tree(self): + return ElementTree.fromstring(self.content) + + def get_element_dict(self, service_key): + return flatten_keys(etree_to_dict(self.get_element_tree()), "{%s}" % service_key) + + def get_action_list(self): + return self.get_element_dict(SERVICE)["scpd"]["actionList"]["action"] + + def get_device_info(self): + return self.get_element_dict(DEVICE)[ROOT] + + +class SCPDCommandRunner(object): + def __init__(self, gateway): + self._gateway = gateway + self._unsupported_actions = [] + self._scpd_responses = [] + + @defer.inlineCallbacks + def _discover_commands(self, service): + scpd_url = self._gateway.base_address + service.scpd_path.encode() + response = yield treq.get(scpd_url) + content = yield response.content() + scpd_response = SCPDResponse(scpd_url, + response.headers, content) + self._scpd_responses.append(scpd_response) + for action_dict in scpd_response.get_action_list(): + self._register_command(action_dict, service.service_type) + defer.returnValue(None) @defer.inlineCallbacks def discover_commands(self): - response = yield treq.get(self._upnp.wan_ip.scpd_url) - content = yield response.content() - tree = ElementTree.fromstring(content) - actions = flatten_keys(etree_to_dict(tree), "{%s}" % SERVICE_KEY)["scpd"]["actionList"]["action"] - for action_dict in actions: - self._register_command(action_dict) - log.info("registered %i commands", len(actions)) - defer.returnValue(None) + for service_type in service_types: + service = self._gateway.get_service(service_type) + if not service: + continue + yield self._discover_commands(service) @staticmethod def _soap_function_info(action_dict): @@ -139,24 +167,31 @@ class SCPDCommandManager(object): [i['name'] for i in arg_dicts if i['direction'] == 'out'] ) - def _register_command(self, action_info): - command = _SCPDCommand(self._upnp.gateway_ip, self._upnp.gateway_port, self._upnp.wan_ip.control_url, - self._upnp.wan_ip.service_id, *self._soap_function_info(action_info)) + def _register_command(self, action_info, service_type): + func_info = self._soap_function_info(action_info) + command = _SCPDCommand(self._gateway.base_address, self._gateway.port, + self._gateway.base_address + self._gateway.get_service(service_type).control_path.encode(), + self._gateway.get_service(service_type).service_id.encode(), *func_info) if not hasattr(self, command.method): - raise NotImplementedError(command.method) + self._unsupported_actions.append(action_info) + print(("# send this to jack!\n\n@staticmethod\ndef %s(" % func_info[0]) + ("" if not func_info[1] else ", ".join(func_info[1])) + ("):\n \"\"\"Returns (%s)\"\"\"\n raise NotImplementedError()\n\n" % ("None" if not func_info[2] else ", ".join(func_info[2])))) + return current = getattr(self, command.method) if hasattr(current, "_return_types"): command._process_result = _return_types(*current._return_types)(command._process_result) setattr(command, "__doc__", current.__doc__) setattr(self, command.method, command) + # log.info("registered %s::%s", service_type, action_info['name']) @staticmethod + @return_types(none) def AddPortMapping(NewRemoteHost, NewExternalPort, NewProtocol, NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration): """Returns None""" raise NotImplementedError() @staticmethod + @return_types(bool, bool) def GetNATRSIPStatus(): """Returns (NewRSIPAvailable, NewNATEnabled)""" raise NotImplementedError() @@ -177,36 +212,83 @@ class SCPDCommandManager(object): raise NotImplementedError() @staticmethod + @return_types(none) def SetConnectionType(NewConnectionType): """Returns None""" raise NotImplementedError() @staticmethod + @return_types(str) def GetExternalIPAddress(): """Returns (NewExternalIPAddress)""" raise NotImplementedError() @staticmethod + @return_types(str, str) def GetConnectionTypeInfo(): """Returns (NewConnectionType, NewPossibleConnectionTypes)""" raise NotImplementedError() @staticmethod + @return_types(str, str, int) def GetStatusInfo(): """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)""" raise NotImplementedError() @staticmethod + @return_types(none) def ForceTermination(): """Returns None""" raise NotImplementedError() @staticmethod + @return_types(none) def DeletePortMapping(NewRemoteHost, NewExternalPort, NewProtocol): """Returns None""" raise NotImplementedError() @staticmethod + @return_types(none) def RequestConnection(): """Returns None""" raise NotImplementedError() + + @staticmethod + def GetCommonLinkProperties(): + """Returns (NewWANAccessType, NewLayer1UpstreamMaxBitRate, NewLayer1DownstreamMaxBitRate, NewPhysicalLinkStatus)""" + raise NotImplementedError() + + @staticmethod + def GetTotalBytesSent(): + """Returns (NewTotalBytesSent)""" + raise NotImplementedError() + + @staticmethod + def GetTotalBytesReceived(): + """Returns (NewTotalBytesReceived)""" + raise NotImplementedError() + + @staticmethod + def GetTotalPacketsSent(): + """Returns (NewTotalPacketsSent)""" + raise NotImplementedError() + + @staticmethod + def GetTotalPacketsReceived(): + """Returns (NewTotalPacketsReceived)""" + raise NotImplementedError() + + @staticmethod + def X_GetICSStatistics(): + """Returns (TotalBytesSent, TotalBytesReceived, TotalPacketsSent, TotalPacketsReceived, Layer1DownstreamMaxBitRate, Uptime)""" + raise NotImplementedError() + + @staticmethod + def GetDefaultConnectionService(): + """Returns (NewDefaultConnectionService)""" + raise NotImplementedError() + + @staticmethod + def SetDefaultConnectionService(NewDefaultConnectionService): + """Returns (None)""" + raise NotImplementedError() diff --git a/txupnp/soap.py b/txupnp/soap.py new file mode 100644 index 0000000..425717d --- /dev/null +++ b/txupnp/soap.py @@ -0,0 +1,47 @@ +import logging +from twisted.internet import defer +from txupnp.util import get_lan_info +from txupnp.ssdp import SSDPFactory +from txupnp.scpd import SCPDCommandRunner +from txupnp.gateway import Gateway +from txupnp.constants import GATEWAY_SCHEMA + +log = logging.getLogger(__name__) + + +class SOAPServiceManager(object): + def __init__(self, reactor): + self._reactor = reactor + self.iface_name, self.router_ip, self.lan_address = get_lan_info() + self.sspd_factory = SSDPFactory(self.lan_address, self._reactor) + self._command_runners = {} + self._selected_runner = GATEWAY_SCHEMA + + @defer.inlineCallbacks + def discover_services(self, address=None, ttl=30, max_devices=2): + server_infos = yield self.sspd_factory.m_search( + address or self.router_ip, ttl=ttl, max_devices=max_devices + ) + locations = [] + for server_info in server_infos: + if server_info['st'] not in self._command_runners: + locations.append(server_info['location']) + gateway = Gateway(**server_info) + yield gateway.discover_services() + command_runner = SCPDCommandRunner(gateway) + yield command_runner.discover_commands() + self._command_runners[gateway.urn.decode()] = command_runner + defer.returnValue(len(self._command_runners)) + + def set_runner(self, urn): + if urn not in self._command_runners: + raise IndexError(urn) + self._command_runners = urn + + def get_runner(self): + if self._selected_runner and self._command_runners and self._selected_runner not in self._command_runners: + self._selected_runner = self._command_runners.keys()[0] + return self._command_runners[self._selected_runner] + + def get_available_runners(self): + return self._command_runners.keys() diff --git a/txupnp/ssdp.py b/txupnp/ssdp.py index 6eb9473..177dff5 100644 --- a/txupnp/ssdp.py +++ b/txupnp/ssdp.py @@ -1,15 +1,48 @@ import logging +import binascii from twisted.internet import defer from twisted.internet.protocol import DatagramProtocol -from txupnp.util import get_lan_info +from txupnp.fault import UPnPError from txupnp.constants import GATEWAY_SCHEMA, M_SEARCH_TEMPLATE, SSDP_DISCOVER, SSDP_IP_ADDRESS, SSDP_PORT log = logging.getLogger(__name__) +def parse_http_fields(content_lines): + return { + (k.lower().rstrip(":".encode()).replace("-".encode(), "_".encode())).decode(): v.decode() + for k, v in { + l.split(": ".encode())[0]: "".encode().join(l.split(": ".encode())[1:]) + for l in content_lines + }.items() if k + } + + +def parse_ssdp_request(operation, port, protocol, content_lines): + if operation != "NOTIFY".encode(): + log.warning("unsupported operation: %s", operation) + raise UPnPError("unsupported operation: %s" % operation) + if port != "*".encode(): + log.warning("unexpected port: %s", port) + raise UPnPError("unexpected port: %s" % port) + return parse_http_fields(content_lines) + + +def parse_ssdp_response(code, response, content_lines): + try: + if int(code) != 200: + raise UPnPError("unexpected http response code: %i" % int(code)) + except ValueError: + log.error(response) + raise UPnPError("unexpected http response code: %s" % code) + if response != "OK".encode(): + raise UPnPError("unexpected response: %s" % response) + return parse_http_fields(content_lines) + + class SSDPProtocol(DatagramProtocol): def __init__(self, reactor, finished_deferred, iface, router, ssdp_address=SSDP_IP_ADDRESS, - ssdp_port=SSDP_PORT, ttl=1): + ssdp_port=SSDP_PORT, ttl=1, max_devices=None): self._reactor = reactor self._sem = defer.DeferredSemaphore(1) self.finished_deferred = finished_deferred @@ -19,41 +52,51 @@ class SSDPProtocol(DatagramProtocol): self.ssdp_port = ssdp_port self.ttl = ttl self._start = None - - @staticmethod - def parse_ssdp_response(datagram): - lines = datagram.split("\r\n".encode()) - if not lines: - return - protocol, code, response = lines[0].split(" ".encode()) - if int(code) != 200: - raise Exception("unexpected http response code") - if response != "OK".encode(): - raise Exception("unexpected response") - fields = { - k.lower(): v - for k, v in { - l.split(": ".encode())[0]: "".encode().join(l.split(": ".encode())[1:]) - for l in lines[1:] - }.items() if k - } - return fields + self.max_devices = max_devices + self.devices = [] def startProtocol(self): return self._sem.run(self.do_start) + def send_m_search(self): + data = M_SEARCH_TEMPLATE.format(self.ssdp_address, self.ssdp_port, GATEWAY_SCHEMA, SSDP_DISCOVER, self.ttl) + try: + log.info("sending m-search (%i bytes) to %s:%i", len(data), self.ssdp_address, self.ssdp_port) + self.transport.write(data.encode(), (self.ssdp_address, self.ssdp_port)) + except Exception as err: + log.exception("failed to write %s to %s:%i", binascii.hexlify(data), self.ssdp_address, self.ssdp_port) + raise err + + def parse_ssdp_datagram(self, datagram): + lines = datagram.split("\r\n".encode()) + header_pieces = lines[0].split(" ".encode()) + protocols = { + "HTTP/1.1".encode() + } + operations = { + "M-SEARCH".encode(), + "NOTIFY".encode() + } + if header_pieces[0] in operations: + if header_pieces[2] not in protocols: + raise UPnPError("unknown protocol: %s" % header_pieces[2]) + return parse_ssdp_request(header_pieces[0], header_pieces[1], header_pieces[2], lines[1:]) + if header_pieces[0] in protocols: + parsed = parse_ssdp_response(header_pieces[1], header_pieces[2], lines[1:]) + log.info("received reply (%i bytes) to SSDP request (%f) (%s) %s", len(datagram), + self._reactor.seconds() - self._start, parsed['location'], parsed['server']) + return parsed + raise UPnPError("don't know how to decode datagram: %s" % binascii.hexlify(datagram)) + def do_start(self): self._start = self._reactor.seconds() self.finished_deferred.addTimeout(self.ttl, self._reactor) self.transport.setTTL(self.ttl) self.transport.joinGroup(self.ssdp_address, interface=self.iface) - data = M_SEARCH_TEMPLATE.format(self.ssdp_address, self.ssdp_port, GATEWAY_SCHEMA, SSDP_DISCOVER, self.ttl) - self.transport.write(data.encode(), (self.ssdp_address, self.ssdp_port)) + self.send_m_search() - def do_stop(self, gateway_xml_location): + def leave_group(self): self.transport.leaveGroup(self.ssdp_address, interface=self.iface) - if not self.finished_deferred.called: - self.finished_deferred.callback(gateway_xml_location) def datagramReceived(self, datagram, addr): self._sem.run(self.handle_datagram, datagram, addr) @@ -61,28 +104,61 @@ class SSDPProtocol(DatagramProtocol): def handle_datagram(self, datagram, address): if address[0] == self.router: try: - server_info = self.parse_ssdp_response(datagram) - except: - log.exception("error parsing response: %s", datagram.encode('hex')) - raise - if server_info: - log.info("received reply (%i bytes) to SSDP request (%fs)", len(datagram), - self._reactor.seconds() - self._start) - self._sem.run(self.do_stop, server_info) - elif address[0] != get_lan_info()[2]: - log.info("received %i bytes from %s:%i", len(datagram), address[0], address[1]) + parsed = self.parse_ssdp_datagram(datagram) + self.devices.append(parsed) + log.info("found %i/%s so far", len(self.devices), self.max_devices) + if not self.finished_deferred.called: + if not self.max_devices or (self.max_devices and len(self.devices) >= self.max_devices): + self._sem.run(self.finished_deferred.callback, self.devices) + except UPnPError as err: + log.error("error decoding SSDP response from %s:%s (error: %s)\n%s", address[0], address[1], str(err), binascii.hexlify(datagram)) + raise err + elif address[0] != self.iface: + log.info("received %i bytes from %s:%i\n%s", len(datagram), address[0], address[1], binascii.hexlify(datagram)) + else: + pass # loopback class SSDPFactory(object): def __init__(self, lan_address, reactor): self.lan_address = lan_address self._reactor = reactor + self.protocol = None + self.port = None + self.finished_deferred = defer.Deferred() + + def stop(self): + try: + self.protocol.leave_group() + self.port.stopListening() + except: + pass + + def connect(self, address, ttl, max_devices=1): + self.protocol = SSDPProtocol(self._reactor, self.finished_deferred, self.lan_address, address, ttl=ttl, + max_devices=max_devices) + self.port = self._reactor.listenMulticast(self.protocol.ssdp_port, self.protocol, listenMultiple=True) + self._reactor.addSystemEventTrigger("before", "shutdown", self.stop) + return self.finished_deferred @defer.inlineCallbacks - def m_search(self, address): - finished_d = defer.Deferred() - ssdp_protocol = SSDPProtocol(self._reactor, finished_d, self.lan_address, address, ttl=30) - port = ssdp_protocol._reactor.listenMulticast(ssdp_protocol.ssdp_port, ssdp_protocol, listenMultiple=True) - server_info = yield finished_d - port.stopListening() - defer.returnValue(server_info) + def m_search(self, address, ttl=30, max_devices=2): + """ + Perform a HTTP over UDP M-SEARCH query + + returns (list) [{ + 'server: + 'location': , + 'cache-control': , + 'date': , + 'usn': + }, ...] + """ + d = self.connect(address, ttl, max_devices=max_devices) + try: + server_infos = yield d + except defer.TimeoutError: + server_infos = self.protocol.devices + log.info("found %i devices", len(server_infos)) + self.stop() + defer.returnValue(server_infos) diff --git a/txupnp/tests/__init__.py b/txupnp/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_txupnp.py b/txupnp/tests/test_txupnp.py similarity index 88% rename from tests/test_txupnp.py rename to txupnp/tests/test_txupnp.py index 9a4785d..ff5a1c0 100644 --- a/tests/test_txupnp.py +++ b/txupnp/tests/test_txupnp.py @@ -7,7 +7,7 @@ log = logging.getLogger("txupnp") @defer.inlineCallbacks -def test(ext_port=4446, int_port=4445, proto='UDP'): +def test(ext_port=4446, int_port=4446, proto='UDP'): u = UPnP(reactor) found = yield u.discover() assert found, "M-SEARCH failed to find gateway" @@ -36,6 +36,12 @@ def test(ext_port=4446, int_port=4445, proto='UDP'): else: log.error("failed to tear down redirect") raise AssertionError() + r = yield u.get_rsip_nat_status() + log.info(r) + r = yield u.get_status_info() + log.info(r) + r = yield u.get_connection_type_info() + log.info(r) @defer.inlineCallbacks diff --git a/txupnp/upnp.py b/txupnp/upnp.py index 78f8035..faf85e5 100644 --- a/txupnp/upnp.py +++ b/txupnp/upnp.py @@ -1,111 +1,49 @@ import logging -from xml.etree import ElementTree from twisted.internet import defer -import treq from txupnp.fault import UPnPError -from txupnp.ssdp import SSDPFactory -from txupnp.scpd import SCPDCommandManager -from txupnp.util import get_lan_info, BASE_ADDRESS_REGEX, flatten_keys, etree_to_dict, DEVICE_ELEMENT_REGEX -from txupnp.util import find_inner_service_info, BASE_PORT_REGEX -from txupnp.constants import LAYER_FORWARD_KEY, WAN_INTERFACE_KEY, WAN_IP_KEY +from txupnp.soap import SOAPServiceManager log = logging.getLogger(__name__) -class Service(object): - def __init__(self, base_address, serviceId=None, SCPDURL=None, eventSubURL=None, controlURL=None, **kwargs): - self.base_address = base_address - self.service_id = serviceId - self._control_path = controlURL - self._subscribe_path = eventSubURL - self._scpd_path = SCPDURL - - @property - def scpd_url(self): - return self.base_address.decode() + self._scpd_path - - @property - def control_url(self): - return self.base_address.decode() + self._control_path - - class UPnP(object): def __init__(self, reactor): self._reactor = reactor - self.iface_name, self.gateway_ip, self.lan_address = get_lan_info() - self._m_search_factory = SSDPFactory(self.lan_address, self._reactor) - self.gateway_url = "" - self.gateway_base = "" - self.gateway_port = None - self.layer_3_forwarding = None - self.wan_ip = None - self.wan_interface = None - self.commands = SCPDCommandManager(self) + self.soap_manager = SOAPServiceManager(reactor) - def m_search(self, address): + @property + def lan_address(self): + return self.soap_manager.lan_address + + @property + def commands(self): + return self.soap_manager.get_runner() + + def m_search(self, address, ttl=30, max_devices=2): """ Perform a HTTP over UDP M-SEARCH query - returns (dict) { + returns (list) [{ + 'server: 'location': , 'cache-control': , 'date': , 'usn': - } + }, ...] """ - return self._m_search_factory.m_search(address) + return self.soap_manager.sspd_factory.m_search(address, ttl=ttl, max_devices=max_devices) @defer.inlineCallbacks - def _discover_gateway(self): - server_info = yield self.m_search(self.gateway_ip) - if 'server'.encode() in server_info: - log.info("gateway version: %s", server_info['server'.encode()]) - else: - log.info("discovered gateway") - self.gateway_url = server_info['location'.encode()] - self.gateway_base = BASE_ADDRESS_REGEX.findall(self.gateway_url)[0] - self.gateway_port = int(BASE_PORT_REGEX.findall(self.gateway_url)[0]) # the tcp port - response = yield treq.get(self.gateway_url) - response_xml = yield response.text() - elements = ElementTree.fromstring(response_xml) - for element in elements: - if DEVICE_ELEMENT_REGEX.findall(element.tag): - tag = DEVICE_ELEMENT_REGEX.findall(element.tag)[0] - prefix = tag[:-6] - device_info = flatten_keys(etree_to_dict(elements.find(tag)), prefix) - self.layer_3_forwarding = Service(self.gateway_base, **find_inner_service_info( - device_info['device']['serviceList']['service'], LAYER_FORWARD_KEY - ) - ) - self.wan_interface = Service(self.gateway_base, **find_inner_service_info( - device_info['device']['deviceList']['device']['serviceList']['service'], WAN_INTERFACE_KEY - ) - ) - self.wan_ip = Service(self.gateway_base, **find_inner_service_info( - device_info['device']['deviceList']['device']['deviceList']['device']['serviceList']['service'], - WAN_IP_KEY - ) - ) - defer.returnValue(None) - - @defer.inlineCallbacks - def discover(self): + def discover(self, ttl=30, max_devices=2): try: - yield self._discover_gateway() + yield self.soap_manager.discover_services(ttl=ttl, max_devices=max_devices) except defer.TimeoutError: - log.warning("failed to find gateway") + log.warning("failed to find upnp gateway") defer.returnValue(False) - yield self.commands.discover_commands() defer.returnValue(True) def get_external_ip(self): return self.commands.GetExternalIPAddress() - # - # def GetStatusInfo(self): - # return self._commands['GetStatusInfo']() - # - # def GetConnectionTypeInfo(self): - # return self._commands['GetConnectionTypeInfo']() def add_port_mapping(self, external_port, protocol, internal_port, lan_address, description, lease_duration): return self.commands.AddPortMapping( @@ -115,9 +53,6 @@ class UPnP(object): NewLeaseDuration=lease_duration ) - # def GetNATRSIPStatus(self): - # return self._commands['GetNATRSIPStatus']() - def get_port_mapping_by_index(self, index): return self.commands.GetGenericPortMappingEntry(NewPortMappingIndex=index) @@ -135,14 +70,39 @@ class UPnP(object): defer.returnValue(redirects) def get_specific_port_mapping(self, external_port, protocol): + """ + :param external_port: (int) external port to listen on + :param protocol: (str) 'UDP' | 'TCP' + :return: (int) , (str) , (bool) , (str) , (int) + """ return self.commands.GetSpecificPortMappingEntry( NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol ) - # def ForceTermination(self): - # return self._commands['ForceTermination']() - def delete_port_mapping(self, external_port, protocol): + """ + :param external_port: (int) external port to listen on + :param protocol: (str) 'UDP' | 'TCP' + :return: None + """ return self.commands.DeletePortMapping( NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol ) + + def get_rsip_nat_status(self): + """ + :return: (bool) NewRSIPAvailable, (bool) NewNATEnabled + """ + return self.commands.GetNATRSIPStatus() + + def get_status_info(self): + """ + :return: (str) NewConnectionStatus, (str) NewLastConnectionError, (int) NewUptime + """ + return self.commands.GetStatusInfo() + + def get_connection_type_info(self): + """ + :return: (str) NewConnectionType (str), NewPossibleConnectionTypes (str) + """ + return self.commands.GetConnectionTypeInfo() diff --git a/txupnp/util.py b/txupnp/util.py index ea59637..c032252 100644 --- a/txupnp/util.py +++ b/txupnp/util.py @@ -81,3 +81,5 @@ def return_types(*types): none_or_str = lambda x: None if not x or x == 'None' else str(x) + +none = lambda _: None