add more cli commands
This commit is contained in:
parent
198315055f
commit
c745bbac01
5 changed files with 34 additions and 7 deletions
|
@ -2,6 +2,7 @@ import argparse
|
||||||
import logging
|
import logging
|
||||||
from twisted.internet import reactor, defer
|
from twisted.internet import reactor, defer
|
||||||
from txupnp.upnp import UPnP
|
from txupnp.upnp import UPnP
|
||||||
|
from txupnp.fault import UPnPError
|
||||||
|
|
||||||
log = logging.getLogger("txupnp")
|
log = logging.getLogger("txupnp")
|
||||||
|
|
||||||
|
@ -28,10 +29,35 @@ def list_mappings(u, *_):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def add_mapping(u, *_):
|
||||||
|
port = 4567
|
||||||
|
protocol = "UDP"
|
||||||
|
description = "txupnp test mapping"
|
||||||
|
yield u.get_next_mapping(port, protocol, description)
|
||||||
|
result = yield u.get_specific_port_mapping(port, protocol)
|
||||||
|
if result:
|
||||||
|
print("added mapping")
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def delete_mapping(u, *_):
|
||||||
|
port = 4567
|
||||||
|
protocol = "UDP"
|
||||||
|
yield u.delete_port_mapping(port, protocol)
|
||||||
|
mapping = yield u.get_specific_port_mapping(port, protocol)
|
||||||
|
if mapping:
|
||||||
|
print("failed to remove mapping")
|
||||||
|
else:
|
||||||
|
print("removed mapping")
|
||||||
|
|
||||||
|
|
||||||
cli_commands = {
|
cli_commands = {
|
||||||
"debug_device": debug_device,
|
"debug_device": debug_device,
|
||||||
"get_external_ip": get_external_ip,
|
"get_external_ip": get_external_ip,
|
||||||
"list_mappings": list_mappings
|
"list_mappings": list_mappings,
|
||||||
|
"add_mapping": add_mapping,
|
||||||
|
"delete_mapping": delete_mapping
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -49,7 +75,7 @@ def run_command(found, u, command, debug_xml):
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="upnp command line utility")
|
parser = argparse.ArgumentParser(description="upnp command line utility")
|
||||||
parser.add_argument(dest="command", type=str, help="debug_gateway | list_mappings | get_external_ip")
|
parser.add_argument(dest="command", type=str, help="debug_gateway | list_mappings | get_external_ip | add_mapping | delete_mapping")
|
||||||
parser.add_argument("--debug_logging", dest="debug_logging", default=False, action="store_true")
|
parser.add_argument("--debug_logging", dest="debug_logging", default=False, action="store_true")
|
||||||
parser.add_argument("--include_igd_xml", dest="include_igd_xml", default=False, action="store_true")
|
parser.add_argument("--include_igd_xml", dest="include_igd_xml", default=False, action="store_true")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
|
@ -9,5 +9,6 @@ class UPnPError(Exception):
|
||||||
def handle_fault(response):
|
def handle_fault(response):
|
||||||
if FAULT in response:
|
if FAULT in response:
|
||||||
fault = flatten_keys(response[FAULT], "{%s}" % CONTROL)
|
fault = flatten_keys(response[FAULT], "{%s}" % CONTROL)
|
||||||
raise UPnPError(fault['detail']['UPnPError']['errorDescription'])
|
error_description = fault['detail']['UPnPError']['errorDescription']
|
||||||
|
raise UPnPError(error_description)
|
||||||
return response
|
return response
|
||||||
|
|
|
@ -32,7 +32,7 @@ class CaseInsensitive(object):
|
||||||
except AttributeError as err:
|
except AttributeError as err:
|
||||||
not_evaluated[k] = v
|
not_evaluated[k] = v
|
||||||
if not_evaluated:
|
if not_evaluated:
|
||||||
log.warning("%s did not apply kwargs: %s", self.__class__.__name__, not_evaluated)
|
log.debug("%s did not apply kwargs: %s", self.__class__.__name__, not_evaluated)
|
||||||
|
|
||||||
def _get_attr_name(self, case_insensitive):
|
def _get_attr_name(self, case_insensitive):
|
||||||
for k, v in self.__dict__.items():
|
for k, v in self.__dict__.items():
|
||||||
|
|
|
@ -92,7 +92,7 @@ class _SCPDCommand(object):
|
||||||
try:
|
try:
|
||||||
response = self.extract_response(self.extract_body(xml_response))
|
response = self.extract_response(self.extract_body(xml_response))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.error("error extracting response (%s) to %s:\n%s", err, self.method, xml_response)
|
log.debug("error extracting response (%s) to %s:\n%s", err, self.method, xml_response)
|
||||||
raise err
|
raise err
|
||||||
if not response:
|
if not response:
|
||||||
log.debug("empty response to %s\n%s", self.method, xml_response)
|
log.debug("empty response to %s\n%s", self.method, xml_response)
|
||||||
|
|
|
@ -113,14 +113,14 @@ class UPnP(object):
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
def delete_port_mapping(self, external_port, protocol):
|
def delete_port_mapping(self, external_port, protocol, new_remote_host=""):
|
||||||
"""
|
"""
|
||||||
:param external_port: (int) external port to listen on
|
:param external_port: (int) external port to listen on
|
||||||
:param protocol: (str) 'UDP' | 'TCP'
|
:param protocol: (str) 'UDP' | 'TCP'
|
||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
return self.commands.DeletePortMapping(
|
return self.commands.DeletePortMapping(
|
||||||
NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol
|
NewRemoteHost=new_remote_host, NewExternalPort=external_port, NewProtocol=protocol
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_rsip_nat_status(self):
|
def get_rsip_nat_status(self):
|
||||||
|
|
Loading…
Reference in a new issue