From c745bbac01cd85eec3e71307b041db062147439f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 2 Aug 2018 09:38:00 -0400 Subject: [PATCH] add more cli commands --- txupnp/cli.py | 30 ++++++++++++++++++++++++++++-- txupnp/fault.py | 3 ++- txupnp/gateway.py | 2 +- txupnp/scpd.py | 2 +- txupnp/upnp.py | 4 ++-- 5 files changed, 34 insertions(+), 7 deletions(-) diff --git a/txupnp/cli.py b/txupnp/cli.py index 3bbfd12..7f80763 100644 --- a/txupnp/cli.py +++ b/txupnp/cli.py @@ -2,6 +2,7 @@ import argparse import logging from twisted.internet import reactor, defer from txupnp.upnp import UPnP +from txupnp.fault import UPnPError log = logging.getLogger("txupnp") @@ -28,10 +29,35 @@ def list_mappings(u, *_): ) +@defer.inlineCallbacks +def add_mapping(u, *_): + port = 4567 + protocol = "UDP" + description = "txupnp test mapping" + yield u.get_next_mapping(port, protocol, description) + result = yield u.get_specific_port_mapping(port, protocol) + if result: + print("added mapping") + + +@defer.inlineCallbacks +def delete_mapping(u, *_): + port = 4567 + protocol = "UDP" + yield u.delete_port_mapping(port, protocol) + mapping = yield u.get_specific_port_mapping(port, protocol) + if mapping: + print("failed to remove mapping") + else: + print("removed mapping") + + cli_commands = { "debug_device": debug_device, "get_external_ip": get_external_ip, - "list_mappings": list_mappings + "list_mappings": list_mappings, + "add_mapping": add_mapping, + "delete_mapping": delete_mapping } @@ -49,7 +75,7 @@ def run_command(found, u, command, debug_xml): def main(): parser = argparse.ArgumentParser(description="upnp command line utility") - parser.add_argument(dest="command", type=str, help="debug_gateway | list_mappings | get_external_ip") + parser.add_argument(dest="command", type=str, help="debug_gateway | list_mappings | get_external_ip | add_mapping | delete_mapping") parser.add_argument("--debug_logging", dest="debug_logging", default=False, action="store_true") parser.add_argument("--include_igd_xml", dest="include_igd_xml", default=False, action="store_true") args = parser.parse_args() diff --git a/txupnp/fault.py b/txupnp/fault.py index c76e3c4..94b1084 100644 --- a/txupnp/fault.py +++ b/txupnp/fault.py @@ -9,5 +9,6 @@ class UPnPError(Exception): def handle_fault(response): if FAULT in response: fault = flatten_keys(response[FAULT], "{%s}" % CONTROL) - raise UPnPError(fault['detail']['UPnPError']['errorDescription']) + error_description = fault['detail']['UPnPError']['errorDescription'] + raise UPnPError(error_description) return response diff --git a/txupnp/gateway.py b/txupnp/gateway.py index 7dee8b6..726ad5e 100644 --- a/txupnp/gateway.py +++ b/txupnp/gateway.py @@ -32,7 +32,7 @@ class CaseInsensitive(object): except AttributeError as err: not_evaluated[k] = v if not_evaluated: - log.warning("%s did not apply kwargs: %s", self.__class__.__name__, not_evaluated) + log.debug("%s did not apply kwargs: %s", self.__class__.__name__, not_evaluated) def _get_attr_name(self, case_insensitive): for k, v in self.__dict__.items(): diff --git a/txupnp/scpd.py b/txupnp/scpd.py index 14c6f9f..effef59 100644 --- a/txupnp/scpd.py +++ b/txupnp/scpd.py @@ -92,7 +92,7 @@ class _SCPDCommand(object): try: response = self.extract_response(self.extract_body(xml_response)) except Exception as err: - log.error("error extracting response (%s) to %s:\n%s", err, self.method, xml_response) + log.debug("error extracting response (%s) to %s:\n%s", err, self.method, xml_response) raise err if not response: log.debug("empty response to %s\n%s", self.method, xml_response) diff --git a/txupnp/upnp.py b/txupnp/upnp.py index 3f6f4f9..60b9196 100644 --- a/txupnp/upnp.py +++ b/txupnp/upnp.py @@ -113,14 +113,14 @@ class UPnP(object): defer.returnValue(None) raise err - def delete_port_mapping(self, external_port, protocol): + def delete_port_mapping(self, external_port, protocol, new_remote_host=""): """ :param external_port: (int) external port to listen on :param protocol: (str) 'UDP' | 'TCP' :return: None """ return self.commands.DeletePortMapping( - NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol + NewRemoteHost=new_remote_host, NewExternalPort=external_port, NewProtocol=protocol ) def get_rsip_nat_status(self):