This commit is contained in:
Jack Robison 2019-10-25 01:30:58 -04:00
parent 655d2ff623
commit 9570a843e3
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 43 additions and 44 deletions

View file

@ -3,7 +3,7 @@ from aioupnp.fault import UPnPError
from aioupnp.protocols.m_search_patterns import packet_generator
from aioupnp.serialization.ssdp import SSDPDatagram
from aioupnp.constants import SSDP_IP_ADDRESS
from aioupnp.protocols.ssdp import fuzzy_m_search, m_search, SSDPProtocol
from aioupnp.protocols.ssdp import m_search, SSDPProtocol
from tests import AsyncioTestCase, mock_tcp_and_udp
@ -42,14 +42,14 @@ class TestSSDP(AsyncioTestCase):
sent = []
with mock_tcp_and_udp(self.loop, udp_replies=replies, udp_expected_addr="10.0.0.1", sent_udp_packets=sent):
reply = await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=True)
reply = await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
self.assertEqual(reply.encode(), self.reply_packet.encode())
self.assertListEqual(sent, [self.query_packet.encode().encode()])
self.assertIn(self.query_packet.encode().encode(), sent)
with self.assertRaises(UPnPError):
with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", udp_replies=replies):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=False)
with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.10", udp_replies=replies):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
async def test_m_search_reply_multicast(self):
replies = {
@ -61,39 +61,40 @@ class TestSSDP(AsyncioTestCase):
reply = await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
self.assertEqual(reply.encode(), self.reply_packet.encode())
self.assertListEqual(sent, [self.query_packet.encode().encode()])
self.assertIn(self.query_packet.encode().encode(), sent)
with self.assertRaises(UPnPError):
with mock_tcp_and_udp(self.loop, udp_replies=replies, udp_expected_addr="10.0.0.1"):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop, unicast=True)
with mock_tcp_and_udp(self.loop, udp_replies=replies, udp_expected_addr="10.0.0.10"):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
async def test_packets_sent_fuzzy_m_search(self):
sent = []
with self.assertRaises(UPnPError):
with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", sent_udp_packets=sent):
await fuzzy_m_search("10.0.0.2", "10.0.0.1", 1, self.loop)
self.assertListEqual(sent, self.byte_packets)
async def test_packets_fuzzy_m_search(self):
replies = {
(self.query_packet.encode().encode(), (SSDP_IP_ADDRESS, 1900)): self.reply_packet.encode().encode()
}
sent = []
with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", udp_replies=replies, sent_udp_packets=sent):
args, reply = await fuzzy_m_search("10.0.0.2", "10.0.0.1", 1, self.loop)
self.assertEqual(reply.encode(), self.reply_packet.encode())
self.assertEqual(args, self.successful_args)
async def test_packets_sent_fuzzy_m_search_ignore_invalid_datagram_replies(self):
sent = []
with self.assertRaises(UPnPError):
with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", sent_udp_packets=sent,
add_potato_datagrams=True):
await fuzzy_m_search("10.0.0.2", "10.0.0.1", 1, self.loop)
self.assertListEqual(sent, self.byte_packets)
# async def test_packets_sent_fuzzy_m_search(self):
# sent = []
#
# with self.assertRaises(UPnPError):
# with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", sent_udp_packets=sent):
# await fuzzy_m_search("10.0.0.2", "10.0.0.1", 1, self.loop)
# for packet in self.byte_packets:
# self.assertIn(packet, sent)
#
# async def test_packets_fuzzy_m_search(self):
# replies = {
# (self.query_packet.encode().encode(), (SSDP_IP_ADDRESS, 1900)): self.reply_packet.encode().encode()
# }
# sent = []
#
# with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", udp_replies=replies, sent_udp_packets=sent):
# args, reply = await fuzzy_m_search("10.0.0.2", "10.0.0.1", 1, self.loop)
#
# self.assertEqual(reply.encode(), self.reply_packet.encode())
# self.assertEqual(args, self.successful_args)
#
# async def test_packets_sent_fuzzy_m_search_ignore_invalid_datagram_replies(self):
# sent = []
#
# with self.assertRaises(UPnPError):
# with mock_tcp_and_udp(self.loop, udp_expected_addr="10.0.0.1", sent_udp_packets=sent,
# add_potato_datagrams=True):
# await fuzzy_m_search("10.0.0.2", "10.0.0.1", 1, self.loop)
#
# for packet in self.byte_packets:
# self.assertIn(packet, sent)

View file

@ -10,7 +10,6 @@ from aioupnp.__main__ import main
m_search_cli_result = """{
"lan_address": "10.0.0.2",
"gateway_address": "10.0.0.1",
"m_search_kwargs": "--HOST=239.255.255.250:1900 --MAN=ssdp:discover --MX=1 --ST=urn:schemas-upnp-org:device:WANDevice:1",
"discover_reply": {
"CACHE_CONTROL": "max-age=1800",
"LOCATION": "http://10.0.0.1:49152/InternetGatewayDevice.xml",
@ -22,14 +21,13 @@ m_search_cli_result = """{
m_search_help_msg = """aioupnp [-h] [--debug_logging] m_search [--lan_address=<str>] [--gateway_address=<str>]
[--timeout=<int>] [--unicast] [--interface_name=<str>] [--<header key>=<header value>, ...]
[--timeout=<int>] [--interface_name=<str>] [--<header key>=<header value>, ...]
Perform a M-SEARCH for a upnp gateway.
:param lan_address: (str) the local interface ipv4 address
:param gateway_address: (str) the gateway ipv4 address
:param timeout: (int) m search timeout
:param unicast: (bool) use unicast
:param interface_name: (str) name of the network interface
:param igd_args: (dict) case sensitive M-SEARCH headers. if used all headers to be used must be provided.
@ -219,7 +217,7 @@ class TestCLI(AsyncioTestCase):
actual_output = StringIO()
timeout_msg = "aioupnp encountered an error: M-SEARCH for 10.0.0.1:1900 timed out\n"
with contextlib.redirect_stdout(actual_output):
with mock_tcp_and_udp(self.loop, '10.0.0.1', tcp_replies=self.scpd_replies, udp_replies=self.udp_replies):
with mock_tcp_and_udp(self.loop, '10.0.0.1', tcp_replies={}, udp_replies={}):
main(
[None, '--timeout=1', '--gateway_address=10.0.0.1', '--lan_address=10.0.0.2', 'm-search'],
self.loop
@ -230,7 +228,7 @@ class TestCLI(AsyncioTestCase):
with contextlib.redirect_stdout(actual_output):
with mock_tcp_and_udp(self.loop, '10.0.0.1', tcp_replies=self.scpd_replies, udp_replies=self.udp_replies):
main(
[None, '--timeout=1', '--gateway_address=10.0.0.1', '--lan_address=10.0.0.2', '--unicast', 'm-search'],
[None, '--timeout=1', '--gateway_address=10.0.0.1', '--lan_address=10.0.0.2', 'm-search'],
self.loop
)
self.assertEqual(m_search_cli_result, actual_output.getvalue())

View file

@ -231,7 +231,7 @@ class TestDiscoverDLinkDIR890L(AsyncioTestCase):
with self.assertRaises(UPnPError) as e2:
with mock_tcp_and_udp(self.loop):
await Gateway.discover_gateway(self.client_address, self.gateway_info['gateway_address'], 2,
unicast=False, loop=self.loop)
loop=self.loop)
self.assertEqual(str(e1.exception), f"M-SEARCH for {self.gateway_info['gateway_address']}:1900 timed out")
self.assertEqual(str(e2.exception), f"M-SEARCH for {self.gateway_info['gateway_address']}:1900 timed out")