This commit is contained in:
Jack Robison 2018-10-12 13:18:46 -04:00
parent a34d62e256
commit e8ec88cbf8
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 21 additions and 21 deletions

View file

@ -3,8 +3,8 @@ import socket
import binascii
import asyncio
import logging
import typing
from collections import OrderedDict
from typing import Dict, List, Tuple
from asyncio.futures import Future
from asyncio.transports import DatagramTransport
from aioupnp.fault import UPnPError
@ -22,13 +22,13 @@ class SSDPProtocol(MulticastProtocol):
def __init__(self, multicast_address: str, lan_address: str) -> None:
super().__init__(multicast_address, lan_address)
self.lan_address = lan_address
self._pending_searches: List[Tuple[str, str, Future, asyncio.Handle]] = []
self._pending_searches: typing.List[typing.Tuple[str, str, Future, asyncio.Handle]] = []
self.notifications: List = []
self.notifications: typing.List = []
def _callback_m_search_ok(self, address: str, packet: SSDPDatagram) -> None:
tmp: list = []
set_futures: list = []
tmp: typing.List = []
set_futures: typing.List = []
while self._pending_searches:
t: tuple = self._pending_searches.pop()
a, s = t[0], t[1]
@ -45,14 +45,14 @@ class SSDPProtocol(MulticastProtocol):
while tmp:
self._pending_searches.append(tmp.pop())
def send_many_m_searches(self, address: str, packets: List[SSDPDatagram]):
def send_many_m_searches(self, address: str, packets: typing.List[SSDPDatagram]):
for packet in packets:
log.debug("send m search to %s: %s", address, packet.st)
self.transport.sendto(packet.encode().encode(), (address, SSDP_PORT))
async def m_search(self, address: str, timeout: float, datagrams: List[OrderedDict]) -> SSDPDatagram:
async def m_search(self, address: str, timeout: float, datagrams: typing.List[OrderedDict]) -> SSDPDatagram:
fut: Future = Future()
packets: List[SSDPDatagram] = []
packets: typing.List[SSDPDatagram] = []
for datagram in datagrams:
packet = SSDPDatagram(SSDPDatagram._M_SEARCH, datagram)
assert packet.st is not None
@ -97,12 +97,12 @@ class SSDPProtocol(MulticastProtocol):
async def listen_ssdp(lan_address: str, gateway_address: str,
ssdp_socket: socket.socket = None) -> Tuple[DatagramTransport, SSDPProtocol,
ssdp_socket: socket.socket = None) -> typing.Tuple[DatagramTransport, SSDPProtocol,
str, str]:
loop = asyncio.get_running_loop()
try:
sock = ssdp_socket or SSDPProtocol.create_multicast_socket(lan_address)
listen_result: Tuple = await loop.create_datagram_endpoint(
listen_result: typing.Tuple = await loop.create_datagram_endpoint(
lambda: SSDPProtocol(SSDP_IP_ADDRESS, lan_address), sock=sock
)
transport: DatagramTransport = listen_result[0]
@ -133,7 +133,7 @@ async def m_search(lan_address: str, gateway_address: str, datagram_args: Ordere
async def _fuzzy_m_search(lan_address: str, gateway_address: str, timeout: int = 30,
ssdp_socket: socket.socket = None) -> List[OrderedDict]:
ssdp_socket: socket.socket = None) -> typing.List[OrderedDict]:
transport, protocol, gateway_address, lan_address = await listen_ssdp(
lan_address, gateway_address, ssdp_socket
)
@ -155,7 +155,7 @@ async def _fuzzy_m_search(lan_address: str, gateway_address: str, timeout: int =
async def fuzzy_m_search(lan_address: str, gateway_address: str, timeout: int = 30,
ssdp_socket: socket.socket = None) -> Tuple[OrderedDict, SSDPDatagram]:
ssdp_socket: socket.socket = None) -> typing.Tuple[OrderedDict, SSDPDatagram]:
args_to_try = await _fuzzy_m_search(lan_address, gateway_address, timeout, ssdp_socket)
for args in args_to_try:
try:

View file

@ -73,14 +73,13 @@ class UPnP:
return await self.gateway.commands.GetExternalIPAddress()
@cli
async def add_port_mapping(self, external_port: int, protocol: str, internal_port, lan_address: str,
description: str) -> None:
await self.gateway.commands.AddPortMapping(
async def add_port_mapping(self, external_port: int, protocol: str, internal_port: int, lan_address: str,
description: str, lease_duration: int) -> None:
return await self.gateway.commands.AddPortMapping(
NewRemoteHost="", NewExternalPort=external_port, NewProtocol=protocol,
NewInternalPort=internal_port, NewInternalClient=lan_address,
NewEnabled=True, NewPortMappingDescription=description, NewLeaseDuration=""
NewEnabled=True, NewPortMappingDescription=description, NewLeaseDuration=str(lease_duration)
)
return
@cli
async def get_port_mapping_by_index(self, index: int) -> Dict:
@ -92,8 +91,8 @@ class UPnP:
}
return {}
async def _get_port_mapping_by_index(self, index: int) -> Union[None,
Tuple[Union[None, str], int, str, int, str, bool, str, int]]:
async def _get_port_mapping_by_index(self, index: int) -> Union[None, Tuple[Union[None, str], int, str,
int, str, bool, str, int]]:
try:
redirect = await self.gateway.commands.GetGenericPortMappingEntry(NewPortMappingIndex=index)
return redirect
@ -141,7 +140,8 @@ class UPnP:
)
@cli
async def get_next_mapping(self, port: int, protocol: str, description: str, internal_port: int=None) -> int:
async def get_next_mapping(self, port: int, protocol: str, description: str, internal_port: int=None,
lease_duration: int=86400) -> int:
if protocol not in ["UDP", "TCP"]:
raise UPnPError("unsupported protocol: {}".format(protocol))
internal_port = internal_port or port
@ -166,7 +166,7 @@ class UPnP:
port += 1
await self.add_port_mapping( # set one up
port, protocol, internal_port, self.lan_address, description
port, protocol, internal_port, self.lan_address, description, lease_duration
)
return port