aioupnp/txupnp/upnp.py

176 lines
6.2 KiB
Python
Raw Normal View History

2018-07-27 01:49:33 +02:00
import logging
2018-07-30 23:48:20 +02:00
import json
2018-07-31 22:53:08 +02:00
import treq
2018-07-27 01:49:33 +02:00
from twisted.internet import defer
from txupnp.fault import UPnPError
2018-07-29 04:08:24 +02:00
from txupnp.soap import SOAPServiceManager
2018-07-31 17:23:21 +02:00
from txupnp.scpd import UPnPFallback
2018-07-30 23:48:20 +02:00
from txupnp.util import DeferredDict
2018-07-27 01:49:33 +02:00
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
2018-07-29 04:08:24 +02:00
class UPnP(object):
    """
    Client for discovering a UPnP gateway and running commands against it.

    Commands are run through the runner provided by a SOAPServiceManager. When
    that runner is unavailable and ``miniupnpc_fallback`` is enabled, a
    UPnPFallback instance found by discover() is used instead.
    """

    def __init__(self, reactor, miniupnpc_fallback=True):
        """
        :param reactor: reactor object, passed on to SOAPServiceManager
        :param miniupnpc_fallback: (bool) whether the UPnPFallback runner may be tried and used
        """
        self._reactor = reactor
        self._miniupnpc_fallback = miniupnpc_fallback
        self.soap_manager = SOAPServiceManager(reactor)
        self.miniupnpc_runner = None  # set by discover() when the fallback succeeds
        self._miniupnpc_igd_url = None  # device url reported by the fallback, kept for debug info

    @property
    def lan_address(self):
        """
        :return: the lan address reported by the soap manager
        """
        return self.soap_manager.lan_address

    @property
    def commands(self):
        """
        The object used to run gateway commands.

        :return: the soap manager's runner; if getting it raises UPnPError, the fallback runner
                 (when the fallback is enabled and one has been set), otherwise None after
                 logging a warning
        """
        try:
            return self.soap_manager.get_runner()
        except UPnPError as err:
            if self._miniupnpc_fallback and self.miniupnpc_runner:
                return self.miniupnpc_runner
            log.warning("upnp is not available: %s", err)
        return None

    def m_search(self, address, timeout=30, max_devices=2):
        """
        Perform a HTTP over UDP M-SEARCH query

        returns (list) [{
            'server': <gateway os and version string>,
            'location': <upnp gateway url>,
            'cache-control': <max age>,
            'date': <server time>,
            'usn': <usn>
        }, ...]
        """
        return self.soap_manager.sspd_factory.m_search(address, timeout=timeout, max_devices=max_devices)

    @defer.inlineCallbacks
    def discover(self, timeout=1, max_devices=1, keep_listening=False):
        """
        Discover gateway services, trying the fallback runner if no commands are available.

        :param timeout: timeout passed to the soap manager's service discovery
        :param max_devices: max_devices passed to the soap manager's service discovery
        :param keep_listening: (bool) if False, the sspd factory is disconnected after discovery
        :return: (bool) True if discovery succeeded or the fallback started, otherwise False
        """
        try:
            yield self.soap_manager.discover_services(timeout=timeout, max_devices=max_devices)
            found = True
        except defer.TimeoutError:
            found = False
        finally:
            if not keep_listening:
                self.soap_manager.sspd_factory.disconnect()
        # Only try the fallback when it is enabled: the `commands` property refuses to use
        # the fallback runner when the flag is off, so reporting success from it would be
        # misleading.
        if self._miniupnpc_fallback and not self.commands:
            log.debug("trying miniupnpc fallback")
            fallback = UPnPFallback()
            success = yield fallback.discover()
            self._miniupnpc_igd_url = fallback.device_url
            if success:
                log.info("successfully started miniupnpc fallback")
                self.miniupnpc_runner = fallback
                found = True
        if not found:
            log.warning("failed to find upnp gateway")
        defer.returnValue(found)

    def get_external_ip(self):
        """
        :return: result of the runner's GetExternalIPAddress command
        """
        return self.commands.GetExternalIPAddress()

    def add_port_mapping(self, external_port, protocol, internal_port, lan_address, description, lease_duration):
        """
        Run the AddPortMapping command with the mapping enabled and no remote host restriction.

        :param external_port: (int) external port to listen on
        :param protocol: (str) 'UDP' | 'TCP'
        :param internal_port: (int) internal port to redirect to
        :param lan_address: (str) lan address of the internal client
        :param description: (str) description of the port mapping
        :param lease_duration: lease duration passed as NewLeaseDuration
        :return: result of the runner's AddPortMapping command
        """
        return self.commands.AddPortMapping(
            NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol,
            NewInternalPort=internal_port, NewInternalClient=lan_address,
            NewEnabled=1, NewPortMappingDescription=description,
            NewLeaseDuration=lease_duration
        )

    def get_port_mapping_by_index(self, index):
        """
        :param index: (int) index of the port mapping entry
        :return: result of the runner's GetGenericPortMappingEntry command
        """
        return self.commands.GetGenericPortMappingEntry(NewPortMappingIndex=index)

    @defer.inlineCallbacks
    def get_redirects(self):
        """
        Collect port mapping entries by index, starting at 0, until a lookup raises UPnPError.

        :return: (list) the entries collected before the first UPnPError
        """
        redirects = []
        cnt = 0
        while True:
            try:
                redirect = yield self.get_port_mapping_by_index(cnt)
            except UPnPError:
                break
            redirects.append(redirect)
            cnt += 1
        defer.returnValue(redirects)

    @defer.inlineCallbacks
    def get_specific_port_mapping(self, external_port, protocol):
        """
        :param external_port: (int) external port to listen on
        :param protocol: (str) 'UDP' | 'TCP'
        :return: (int) <internal port>, (str) <lan ip>, (bool) <enabled>, (str) <description>, (int) <lease time>
                 or None if the gateway reports NoSuchEntryInArray
        """
        try:
            result = yield self.commands.GetSpecificPortMappingEntry(
                NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol
            )
        except UPnPError as err:
            if 'NoSuchEntryInArray' in str(err):
                defer.returnValue(None)
            raise  # bare raise keeps the original traceback
        defer.returnValue(result)

    def delete_port_mapping(self, external_port, protocol):
        """
        :param external_port: (int) external port to listen on
        :param protocol: (str) 'UDP' | 'TCP'
        :return: None
        """
        return self.commands.DeletePortMapping(
            NewRemoteHost=None, NewExternalPort=external_port, NewProtocol=protocol
        )

    def get_rsip_nat_status(self):
        """
        :return: (bool) NewRSIPAvailable, (bool) NewNATEnabled
        """
        return self.commands.GetNATRSIPStatus()

    def get_status_info(self):
        """
        :return: (str) NewConnectionStatus, (str) NewLastConnectionError, (int) NewUptime
        """
        return self.commands.GetStatusInfo()

    def get_connection_type_info(self):
        """
        :return: (str) NewConnectionType, (str) NewPossibleConnectionTypes
        """
        return self.commands.GetConnectionTypeInfo()

    @defer.inlineCallbacks
    def get_next_mapping(self, port, protocol, description):
        """
        Find a usable external port at or above `port`, adding a mapping if needed.

        For each candidate port, both the UDP and TCP mappings are looked up. If neither
        exists, a mapping for `protocol` to the same internal port on our lan address is
        added. If a mapping for `protocol` already redirects the port to our lan address
        it is reused. Otherwise the next port is tried.

        :param port: (int) first external port to try
        :param protocol: (str) 'UDP' | 'TCP'
        :param description: (str) description used when a mapping is added
        :return: (int) the external port that is mapped to us
        :raises UPnPError: if the protocol is unsupported or no port up to 65535 is usable
        """
        protocols = ("UDP", "TCP")
        if protocol not in protocols:
            raise UPnPError("unsupported protocol: {}".format(protocol))
        # Iterate instead of recursing so a long run of occupied ports cannot nest
        # generators without bound, and stop at the highest valid port number.
        while port <= 65535:
            mappings = yield DeferredDict({p: self.get_specific_port_mapping(port, p)
                                           for p in protocols})
            if all(m is None for m in mappings.values()):  # there are no redirects for this port
                yield self.add_port_mapping(  # set one up
                    port, protocol, port, self.lan_address, description, 0
                )
                defer.returnValue(port)
            existing = mappings[protocol]
            if existing:
                mapped_port = existing[0]
                mapped_address = existing[1]
                if mapped_port == port and mapped_address == self.lan_address:  # reuse redirect to us
                    defer.returnValue(port)
            port += 1  # try the next port
        raise UPnPError("no available port found for protocol: {}".format(protocol))

    def get_debug_info(self):
        """
        :return: (str) json dump of the soap manager's debug output and the fallback's device url
        """
        def default_byte(x):
            # json.dumps calls this only for objects it cannot serialize itself
            if isinstance(x, bytes):
                return x.decode()
            # Returning x unchanged would make json.dumps fail with a misleading
            # "Circular reference detected" error, so fall back to its repr.
            return repr(x)

        return json.dumps({
            'txupnp': self.soap_manager.debug(),
            'miniupnpc_igd_url': self._miniupnpc_igd_url
        },
            indent=2, default=default_byte
        )