Compare commits

..

1 commit

Author SHA1 Message Date
Jack Robison
1e50335429
fix content pattern regex
-fixes `<?xml version="1.0" encoding="utf-8"?>` breaking vs typical `<?xml version="1.0"?>`
2020-10-15 17:34:12 -04:00
28 changed files with 217 additions and 1228 deletions

View file

@ -1,31 +0,0 @@
# This workflows will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
name: pypi release
on:
release:
types: [created]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.6'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel twine
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
run: |
python setup.py sdist
twine upload dist/*

View file

@ -1,53 +0,0 @@
name: CI
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
code-quality:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.8]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install mypy lxml flake8
pip install -e .
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
- name: mypy
run: |
mypy aioupnp
tests:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [3.6, 3.7, 3.8]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install coverage
pip install -e .
- name: Test with coverage
run: |
coverage run -m unittest discover -v tests
bash <(curl -s https://codecov.io/bash)

85
.travis.yml Normal file
View file

@ -0,0 +1,85 @@
sudo: required
dist: xenial
language: python
python: "3.8"
jobs:
include:
- stage: code quality
name: "mypy"
before_install:
- pip install mypy lxml
- pip install -e .
script:
- mypy aioupnp --txt-report . --scripts-are-modules; cat index.txt; rm index.txt
- &tests
stage: test
name: "Unit Tests w/ Python 3.8"
python: "3.8"
before_install:
- pip install pylint coverage
- pip install -e .
script:
- HOME=/tmp coverage run -m unittest discover -v tests
after_success:
- bash <(curl -s https://codecov.io/bash)
- <<: *tests
name: "Unit Tests w/ Python 3.7"
python: "3.7"
- <<: *tests
name: "Unit Tests w/ Python 3.6"
python: "3.6"
#
#if: tag IS present
# - &build
# name: "Linux Release"
# python: "3.6"
# install:
# - pip install pyinstaller
# - pip install -e .
#
# script:
# - pyinstaller -F -n aioupnp aioupnp/__main__.py
# - chmod +x dist/aioupnp
# - zip -j dist/aioupnp-${OS}.zip dist/aioupnp
#
# env: OS=linux
#
# addons:
# artifacts:
# working_dir: dist
# paths:
# - aioupnp-${OS}.zip
#
# - <<: *build
# name: "Mac Release"
# os: osx
# osx_image: xcode9.4
# language: generic
# env: OS=mac
# install:
# - pip3 install pyinstaller
# - pip3 install -e .
#
# - <<: *build
# name: "Windows Release"
# language: generic
# services:
# - docker
# install:
# - docker pull cdrx/pyinstaller-windows:python3-32bit
# script:
# - docker run -v "$(pwd):/src/aioupnp" cdrx/pyinstaller-windows:python3-32bit aioupnp/wine_build.sh
# - sudo zip -j dist/aioupnp-windows.zip dist/aioupnp.exe
# addons:
# artifacts:
# working_dir: dist
# paths:
# - aioupnp-windows.zip

View file

@ -1,4 +1,4 @@
[![Build Status](https://github.com/lbryio/aioupnp/workflows/CI/badge.svg) [![Build Status](https://travis-ci.org/lbryio/aioupnp.svg?branch=master)](https://travis-ci.org/lbryio/aioupnp)
[![codecov](https://codecov.io/gh/lbryio/aioupnp/branch/master/graph/badge.svg)](https://codecov.io/gh/lbryio/aioupnp) [![codecov](https://codecov.io/gh/lbryio/aioupnp/branch/master/graph/badge.svg)](https://codecov.io/gh/lbryio/aioupnp)
[![PyPI version](https://badge.fury.io/py/aioupnp.svg)](https://badge.fury.io/py/aioupnp) [![PyPI version](https://badge.fury.io/py/aioupnp.svg)](https://badge.fury.io/py/aioupnp)
[![Python 3.6](https://img.shields.io/badge/python-3.6-blue.svg)](https://www.python.org/downloads/release/python-360/) [![Python 3.6](https://img.shields.io/badge/python-3.6-blue.svg)](https://www.python.org/downloads/release/python-360/)
@ -19,12 +19,12 @@ Verify python is version 3.6-8
python --version python --version
``` ```
#### Installation for normal usage Installation for normal usage
``` ```
pip install aioupnp pip install aioupnp
``` ```
#### Installation for development Installation for development
``` ```
git clone https://github.com/lbryio/aioupnp.git git clone https://github.com/lbryio/aioupnp.git
cd aioupnp cd aioupnp
@ -75,7 +75,7 @@ aioupnp [-h] [--debug_logging] [--interface=<interface>] [--gateway_address=<gat
aioupnp delete_port_mapping --external_port=1234 --protocol=TCP aioupnp delete_port_mapping --external_port=1234 --protocol=TCP
#### M-Search headers #### M-Search headers
UPnP uses a multicast protocol (SSDP) to locate the gateway. Gateway discovery is automatic by default, but you may provide specific headers for the search to use to override automatic discovery. UPnP uses multicast protocol - SSDP - to locate the gateway. Gateway discovery is automatic by default, but you may provide specific headers for the search to use to override automatic discovery.
If m-search headers are provided as keyword arguments then all of the headers to be used must be provided, in the order they are to be used. For example: If m-search headers are provided as keyword arguments then all of the headers to be used must be provided, in the order they are to be used. For example:
@ -109,33 +109,16 @@ By default, the network device will be automatically discovered. The interface m
## Troubleshooting ## Troubleshooting
If `aioupnp` is failing with m-search timeouts this means the UPnP gateway (the router) isn't being found at all. To see if this error is expected try running m_search with debug logging, which will print out the packets sent and received:
#### Debug logging
To enable verbose debug logging, add the `--debug_logging` argument before the command
aioupnp --debug_logging m_search aioupnp --debug_logging m_search
If you only see packets being sent or the replies are only from devices that aren't your router (smart devices, speakers, etc), then there are three options: #### It really doesn't work
* your router does not support UPnP (this is unlikely) If aioupnp doesn't work with a device, a debugging report can be collected with `aioupnp gather_debug_info`.
* UPnP is turned off in the web gui for your router (more likely)
* `aioupnp` has a bug (very likely if you don't see your router manufacturer doing well in the supported devices table)
If you see replies from the router but it still fails, then it's likely a bug in aioupnp. This will attempt to discover the UPnP gateway, and then perform a functionality check where it will request the external address and existing port mappings before attempting to make and remove a port mapping. The final result is the zipped packet dump of these attempts, which allows writing tests replaying it.
If there are no replies and UPnP is certainly turned on, then a local firewall is the likely culprit.
## Sending a bug report
If it still doesn't work, you can send a bug report using an included script. This script will try finding the UPnP gateway using `aioupnp` as well as `miniupnpc` and then try add and remove a port mapping using each library. The script does this while capturing the packets sent/received, which makes figuring out what went wrong possible. The script will create a file with this packet capture (`aioupnp-bug-report.json`) and automatically send it.
Note: the bug report script currently does not work on MacOS
```
git clone https://github.com/lbryio/aioupnp.git
cd aioupnp
python3 -m pip install -e .
python3 -m pip install --user certifi aiohttp miniupnpc
sudo -E python3 generate_bug_report.py
```
## License ## License

View file

@ -1,4 +1,4 @@
__version__ = "0.0.18" __version__ = "0.0.17"
__author__ = "Jack Robison" __author__ = "Jack Robison"
__maintainer__ = "Jack Robison" __maintainer__ = "Jack Robison"
__license__ = "MIT" __license__ = "MIT"

View file

@ -121,4 +121,4 @@ def main(argv: typing.Optional[typing.List[typing.Optional[str]]] = None,
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main()) # pragma: no cover sys.exit(main())

View file

@ -5,7 +5,6 @@ import logging
from aioupnp.protocols.scpd import scpd_post from aioupnp.protocols.scpd import scpd_post
from aioupnp.device import Service from aioupnp.device import Service
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
from aioupnp.util import is_valid_public_ipv4
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -135,7 +134,7 @@ class SOAPCommands:
def is_registered(self, name: str) -> bool: def is_registered(self, name: str) -> bool:
if name not in self.SOAP_COMMANDS: if name not in self.SOAP_COMMANDS:
raise ValueError("unknown command") # pragma: no cover raise ValueError("unknown command")
for service in self._registered.values(): for service in self._registered.values():
if name in service: if name in service:
return True return True
@ -143,11 +142,11 @@ class SOAPCommands:
def get_service(self, name: str) -> Service: def get_service(self, name: str) -> Service:
if name not in self.SOAP_COMMANDS: if name not in self.SOAP_COMMANDS:
raise ValueError("unknown command") # pragma: no cover raise ValueError("unknown command")
for service, commands in self._registered.items(): for service, commands in self._registered.items():
if name in commands: if name in commands:
return service return service
raise ValueError(name) # pragma: no cover raise ValueError(name)
def _register_soap_wrapper(self, name: str) -> None: def _register_soap_wrapper(self, name: str) -> None:
annotations: typing.Dict[str, typing.Any] = typing.get_type_hints(getattr(self, name)) annotations: typing.Dict[str, typing.Any] = typing.get_type_hints(getattr(self, name))
@ -174,7 +173,7 @@ class SOAPCommands:
self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, result, None, time.time())) self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, result, None, time.time()))
except Exception as err: except Exception as err:
if isinstance(err, asyncio.CancelledError): if isinstance(err, asyncio.CancelledError):
raise # pragma: no cover raise
self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, None, err, time.time())) self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, None, err, time.time()))
raise UPnPError(f"Raised {str(type(err).__name__)}({str(err)}) parsing response for {name}") raise UPnPError(f"Raised {str(type(err).__name__)}({str(err)}) parsing response for {name}")
return result return result
@ -201,7 +200,7 @@ class SOAPCommands:
"""Returns None""" """Returns None"""
name = "AddPortMapping" name = "AddPortMapping"
if not self.is_registered(name): if not self.is_registered(name):
raise NotImplementedError() # pragma: no cover raise NotImplementedError()
assert name in self._wrappers_kwargs assert name in self._wrappers_kwargs
await self._wrappers_kwargs[name]( await self._wrappers_kwargs[name](
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol, NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol,
@ -217,7 +216,7 @@ class SOAPCommands:
""" """
name = "GetGenericPortMappingEntry" name = "GetGenericPortMappingEntry"
if not self.is_registered(name): if not self.is_registered(name):
raise NotImplementedError() # pragma: no cover raise NotImplementedError()
assert name in self._wrappers_kwargs assert name in self._wrappers_kwargs
result: GetGenericPortMappingEntryResponse = await self._wrappers_kwargs[name]( result: GetGenericPortMappingEntryResponse = await self._wrappers_kwargs[name](
NewPortMappingIndex=NewPortMappingIndex NewPortMappingIndex=NewPortMappingIndex
@ -229,7 +228,7 @@ class SOAPCommands:
"""Returns (NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration)""" """Returns (NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration)"""
name = "GetSpecificPortMappingEntry" name = "GetSpecificPortMappingEntry"
if not self.is_registered(name): if not self.is_registered(name):
raise NotImplementedError() # pragma: no cover raise NotImplementedError()
assert name in self._wrappers_kwargs assert name in self._wrappers_kwargs
result: GetSpecificPortMappingEntryResponse = await self._wrappers_kwargs[name]( result: GetSpecificPortMappingEntryResponse = await self._wrappers_kwargs[name](
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
@ -240,7 +239,7 @@ class SOAPCommands:
"""Returns None""" """Returns None"""
name = "DeletePortMapping" name = "DeletePortMapping"
if not self.is_registered(name): if not self.is_registered(name):
raise NotImplementedError() # pragma: no cover raise NotImplementedError()
assert name in self._wrappers_kwargs assert name in self._wrappers_kwargs
await self._wrappers_kwargs[name]( await self._wrappers_kwargs[name](
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
@ -251,18 +250,18 @@ class SOAPCommands:
"""Returns (NewExternalIPAddress)""" """Returns (NewExternalIPAddress)"""
name = "GetExternalIPAddress" name = "GetExternalIPAddress"
if not self.is_registered(name): if not self.is_registered(name):
raise NotImplementedError() # pragma: no cover raise NotImplementedError()
assert name in self._wrappers_no_args assert name in self._wrappers_no_args
external_ip: str = await self._wrappers_no_args[name]() result: str = await self._wrappers_no_args[name]()
if not is_valid_public_ipv4(external_ip): if not result:
raise UPnPError(f"Got invalid external ipv4 address: {external_ip}") raise UPnPError("Got null external ip address")
return external_ip return result
# async def GetNATRSIPStatus(self) -> Tuple[bool, bool]: # async def GetNATRSIPStatus(self) -> Tuple[bool, bool]:
# """Returns (NewRSIPAvailable, NewNATEnabled)""" # """Returns (NewRSIPAvailable, NewNATEnabled)"""
# name = "GetNATRSIPStatus" # name = "GetNATRSIPStatus"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[bool, bool] = await self._wrappers_no_args[name]() # result: Tuple[bool, bool] = await self._wrappers_no_args[name]()
# return result[0], result[1] # return result[0], result[1]
@ -271,7 +270,7 @@ class SOAPCommands:
# """Returns None""" # """Returns None"""
# name = "SetConnectionType" # name = "SetConnectionType"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_kwargs # assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewConnectionType=NewConnectionType) # await self._wrappers_kwargs[name](NewConnectionType=NewConnectionType)
# return None # return None
@ -280,7 +279,7 @@ class SOAPCommands:
# """Returns (NewConnectionType, NewPossibleConnectionTypes)""" # """Returns (NewConnectionType, NewPossibleConnectionTypes)"""
# name = "GetConnectionTypeInfo" # name = "GetConnectionTypeInfo"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[str, str] = await self._wrappers_no_args[name]() # result: Tuple[str, str] = await self._wrappers_no_args[name]()
# return result # return result
@ -289,7 +288,7 @@ class SOAPCommands:
# """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)""" # """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
# name = "GetStatusInfo" # name = "GetStatusInfo"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[str, str, int] = await self._wrappers_no_args[name]() # result: Tuple[str, str, int] = await self._wrappers_no_args[name]()
# return result # return result
@ -298,7 +297,7 @@ class SOAPCommands:
# """Returns None""" # """Returns None"""
# name = "ForceTermination" # name = "ForceTermination"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# await self._wrappers_no_args[name]() # await self._wrappers_no_args[name]()
# return None # return None
@ -307,7 +306,7 @@ class SOAPCommands:
# """Returns None""" # """Returns None"""
# name = "RequestConnection" # name = "RequestConnection"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# await self._wrappers_no_args[name]() # await self._wrappers_no_args[name]()
# return None # return None
@ -317,7 +316,7 @@ class SOAPCommands:
# NewPhysicalLinkStatus)""" # NewPhysicalLinkStatus)"""
# name = "GetCommonLinkProperties" # name = "GetCommonLinkProperties"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[str, int, int, str] = await self._wrappers_no_args[name]() # result: Tuple[str, int, int, str] = await self._wrappers_no_args[name]()
# return result # return result
@ -326,7 +325,7 @@ class SOAPCommands:
# """Returns (NewTotalBytesSent)""" # """Returns (NewTotalBytesSent)"""
# name = "GetTotalBytesSent" # name = "GetTotalBytesSent"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]() # result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0] # return result[0]
@ -335,7 +334,7 @@ class SOAPCommands:
# """Returns (NewTotalBytesReceived)""" # """Returns (NewTotalBytesReceived)"""
# name = "GetTotalBytesReceived" # name = "GetTotalBytesReceived"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]() # result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0] # return result[0]
@ -344,7 +343,7 @@ class SOAPCommands:
# """Returns (NewTotalPacketsSent)""" # """Returns (NewTotalPacketsSent)"""
# name = "GetTotalPacketsSent" # name = "GetTotalPacketsSent"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]() # result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0] # return result[0]
@ -353,7 +352,7 @@ class SOAPCommands:
# """Returns (NewTotalPacketsReceived)""" # """Returns (NewTotalPacketsReceived)"""
# name = "GetTotalPacketsReceived" # name = "GetTotalPacketsReceived"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[int] = await self._wrappers_no_args[name]() # result: Tuple[int] = await self._wrappers_no_args[name]()
# return result[0] # return result[0]
@ -363,7 +362,7 @@ class SOAPCommands:
# Layer1DownstreamMaxBitRate, Uptime)""" # Layer1DownstreamMaxBitRate, Uptime)"""
# name = "X_GetICSStatistics" # name = "X_GetICSStatistics"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[int, int, int, int, str, str] = await self._wrappers_no_args[name]() # result: Tuple[int, int, int, int, str, str] = await self._wrappers_no_args[name]()
# return result # return result
@ -372,7 +371,7 @@ class SOAPCommands:
# """Returns (NewDefaultConnectionService)""" # """Returns (NewDefaultConnectionService)"""
# name = "GetDefaultConnectionService" # name = "GetDefaultConnectionService"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[str] = await self._wrappers_no_args[name]() # result: Tuple[str] = await self._wrappers_no_args[name]()
# return result[0] # return result[0]
@ -381,7 +380,7 @@ class SOAPCommands:
# """Returns (None)""" # """Returns (None)"""
# name = "SetDefaultConnectionService" # name = "SetDefaultConnectionService"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_kwargs # assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewDefaultConnectionService=NewDefaultConnectionService) # await self._wrappers_kwargs[name](NewDefaultConnectionService=NewDefaultConnectionService)
# return None # return None
@ -389,7 +388,7 @@ class SOAPCommands:
# async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None: # async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None:
# name = "SetEnabledForInternet" # name = "SetEnabledForInternet"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_kwargs # assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewEnabledForInternet=NewEnabledForInternet) # await self._wrappers_kwargs[name](NewEnabledForInternet=NewEnabledForInternet)
# return None # return None
@ -397,7 +396,7 @@ class SOAPCommands:
# async def GetEnabledForInternet(self) -> bool: # async def GetEnabledForInternet(self) -> bool:
# name = "GetEnabledForInternet" # name = "GetEnabledForInternet"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[bool] = await self._wrappers_no_args[name]() # result: Tuple[bool] = await self._wrappers_no_args[name]()
# return result[0] # return result[0]
@ -405,7 +404,7 @@ class SOAPCommands:
# async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int) -> None: # async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int) -> None:
# name = "GetMaximumActiveConnections" # name = "GetMaximumActiveConnections"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_kwargs # assert name in self._wrappers_kwargs
# await self._wrappers_kwargs[name](NewActiveConnectionIndex=NewActiveConnectionIndex) # await self._wrappers_kwargs[name](NewActiveConnectionIndex=NewActiveConnectionIndex)
# return None # return None
@ -414,7 +413,7 @@ class SOAPCommands:
# """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID""" # """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
# name = "GetActiveConnections" # name = "GetActiveConnections"
# if not self.is_registered(name): # if not self.is_registered(name):
# raise NotImplementedError() # pragma: no cover # raise NotImplementedError()
# assert name in self._wrappers_no_args # assert name in self._wrappers_no_args
# result: Tuple[str, str] = await self._wrappers_no_args[name]() # result: Tuple[str, str] = await self._wrappers_no_args[name]()
# return result # return result

View file

@ -1,10 +1,11 @@
import socket
from collections import OrderedDict from collections import OrderedDict
import typing import typing
import netifaces import netifaces
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
def get_netifaces(): # pragma: no cover def get_netifaces():
return netifaces return netifaces
@ -24,11 +25,11 @@ def _get_gateways() -> typing.Dict[typing.Union[str, int],
def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]: def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
gateways = _get_gateways() gateways = _get_gateways()
infos = gateways[netifaces.AF_INET] infos = gateways[socket.AF_INET]
assert isinstance(infos, list), TypeError(f"expected list from netifaces, got a dict") assert isinstance(infos, list), TypeError(f"expected list from netifaces, got a dict")
interface_infos: typing.List[typing.Tuple[str, str, bool]] = infos interface_infos: typing.List[typing.Tuple[str, str, bool]] = infos
result: typing.Dict[str, typing.Tuple[str, str]] = OrderedDict( result: typing.Dict[str, typing.Tuple[str, str]] = OrderedDict(
(interface_name, (router_address, ifaddresses(interface_name)[netifaces.AF_INET][0]['addr'])) (interface_name, (router_address, ifaddresses(interface_name)[socket.AF_INET][0]['addr']))
for router_address, interface_name, _ in interface_infos for router_address, interface_name, _ in interface_infos
) )
for interface_name in _get_interfaces(): for interface_name in _get_interfaces():
@ -42,7 +43,7 @@ def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
_default = gateways['default'] _default = gateways['default']
assert isinstance(_default, dict), TypeError(f"expected dict from netifaces, got a list") assert isinstance(_default, dict), TypeError(f"expected dict from netifaces, got a list")
default: typing.Dict[int, typing.Tuple[str, str]] = _default default: typing.Dict[int, typing.Tuple[str, str]] = _default
result['default'] = result[default[netifaces.AF_INET][1]] result['default'] = result[default[socket.AF_INET][1]]
return result return result

View file

@ -1,4 +1,3 @@
import sys
import struct import struct
import socket import socket
import typing import typing
@ -7,17 +6,11 @@ from asyncio.transports import DatagramTransport
from unittest import mock from unittest import mock
SOCKET_TYPES = (socket.SocketType, mock.MagicMock)
if sys.version_info >= (3, 8): # pragma: no cover
from asyncio.trsock import TransportSocket
SOCKET_TYPES = (socket.SocketType, TransportSocket, mock.MagicMock)
def _get_sock(transport: typing.Optional[DatagramTransport]) -> typing.Optional[socket.socket]: def _get_sock(transport: typing.Optional[DatagramTransport]) -> typing.Optional[socket.socket]:
if transport is None or not hasattr(transport, "_extra"): if transport is None or not hasattr(transport, "_extra"):
return None return None
sock: typing.Optional[socket.socket] = transport.get_extra_info('socket', None) sock: typing.Optional[socket.socket] = transport.get_extra_info('socket', None)
assert sock is None or isinstance(sock, SOCKET_TYPES) assert sock is None or isinstance(sock, (socket.SocketType, mock.MagicMock))
return sock return sock

View file

@ -56,7 +56,6 @@ class SCPDHTTPClientProtocol(Protocol):
self._response_msg = b"" self._response_msg = b""
self._content_length = 0 self._content_length = 0
self._got_headers = False self._got_headers = False
self._has_content_length = True
self._headers: typing.Dict[bytes, bytes] = {} self._headers: typing.Dict[bytes, bytes] = {}
self._body = b"" self._body = b""
self.transport: typing.Optional[asyncio.WriteTransport] = None self.transport: typing.Optional[asyncio.WriteTransport] = None
@ -68,57 +67,35 @@ class SCPDHTTPClientProtocol(Protocol):
return None return None
def data_received(self, data: bytes) -> None: def data_received(self, data: bytes) -> None:
if self.finished.done(): # possible to hit during tests
return
self.response_buff += data self.response_buff += data
for i, line in enumerate(self.response_buff.split(b'\r\n')): for i, line in enumerate(self.response_buff.split(b'\r\n')):
if not line: # we hit the blank line between the headers and the body if not line: # we hit the blank line between the headers and the body
if i == (len(self.response_buff.split(b'\r\n')) - 1): if i == (len(self.response_buff.split(b'\r\n')) - 1):
return None # the body is still yet to be written return None # the body is still yet to be written
if not self._got_headers: if not self._got_headers:
try: self._headers, self._response_code, self._response_msg = parse_headers(
self._headers, self._response_code, self._response_msg = parse_headers( b'\r\n'.join(self.response_buff.split(b'\r\n')[:i])
b'\r\n'.join(self.response_buff.split(b'\r\n')[:i]) )
)
except ValueError as err:
self.finished.set_exception(UPnPError(str(err)))
return
content_length = get_dict_val_case_insensitive( content_length = get_dict_val_case_insensitive(
self._headers, b'Content-Length' self._headers, b'Content-Length'
) )
if content_length is not None: if content_length is None:
self._content_length = int(content_length) return None
else: self._content_length = int(content_length or 0)
self._has_content_length = False
self._got_headers = True self._got_headers = True
if self._got_headers and self._has_content_length: body = b'\r\n'.join(self.response_buff.split(b'\r\n')[i+1:])
body = b'\r\n'.join(self.response_buff.split(b'\r\n')[i+1:]) if self._content_length == len(body):
if self._content_length == len(body):
self.finished.set_result((self.response_buff, body, self._response_code, self._response_msg))
elif self._content_length > len(body):
pass
else:
self.finished.set_exception(
UPnPError(
"too many bytes written to response (%i vs %i expected)" % (
len(body), self._content_length
)
)
)
elif any(map(self.response_buff.endswith, (b"</root>\r\n", b"</scpd>\r\n"))):
# Actiontec has a router that doesn't give a Content-Length for the gateway xml
body = b'\r\n'.join(self.response_buff.split(b'\r\n')[i+1:])
self.finished.set_result((self.response_buff, body, self._response_code, self._response_msg)) self.finished.set_result((self.response_buff, body, self._response_code, self._response_msg))
elif len(self.response_buff) >= 65535: elif self._content_length > len(body):
pass
else:
self.finished.set_exception( self.finished.set_exception(
UPnPError( UPnPError(
"too many bytes written to response (%i) with unspecified content length" % len(self.response_buff) "too many bytes written to response (%i vs %i expected)" % (
len(body), self._content_length
) )
) )
return )
else:
# needed for the actiontec case
pass
return None return None
return None return None

View file

@ -171,8 +171,8 @@ async def multi_m_search(lan_address: str, gateway_address: str, timeout: int =
protocol, gateway_address, lan_address = await listen_ssdp( protocol, gateway_address, lan_address = await listen_ssdp(
lan_address, gateway_address, loop lan_address, gateway_address, loop
) )
fut = protocol.send_m_searches( fut = asyncio.ensure_future(protocol.send_m_searches(
address=gateway_address, datagrams=list(packet_generator()) address=gateway_address, datagrams=list(packet_generator())
) ), loop=loop)
loop.call_later(timeout, lambda: None if not fut or fut.done() else fut.cancel()) loop.call_later(timeout, lambda: None if not fut or fut.done() else fut.cancel())
return protocol return protocol

View file

@ -39,7 +39,7 @@ def serialize_scpd_get(path: str, address: str) -> bytes:
def deserialize_scpd_get_response(content: bytes) -> Dict[str, Any]: def deserialize_scpd_get_response(content: bytes) -> Dict[str, Any]:
if XML_VERSION_PREFIX.encode() in content: if XML_VERSION_PREFIX.encode() in content:
parsed: List[Tuple[str, str]] = CONTENT_PATTERN.findall(content.decode()) parsed: List[Tuple[bytes, bytes]] = CONTENT_PATTERN.findall(content.decode())
xml_dict = xml_to_dict('' if not parsed else parsed[0][0]) xml_dict = xml_to_dict('' if not parsed else parsed[0][0])
return parse_device_dict(xml_dict) return parse_device_dict(xml_dict)
return {} return {}

View file

@ -13,7 +13,7 @@ CONTENT_NO_XML_VERSION_PATTERN = re.compile(
def serialize_soap_post(method: str, param_names: typing.List[str], service_id: bytes, gateway_address: bytes, def serialize_soap_post(method: str, param_names: typing.List[str], service_id: bytes, gateway_address: bytes,
control_url: bytes, **kwargs: typing.Dict[str, str]) -> bytes: control_url: bytes, **kwargs: typing.Dict[str, str]) -> bytes:
args = "".join(f"<{param_name}>{kwargs.get(param_name, '')}</{param_name}>" for param_name in param_names) args = "".join(f"<{n}>{kwargs.get(n)}</{n}>" for n in param_names)
soap_body = (f'\r\n{XML_VERSION}\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" ' soap_body = (f'\r\n{XML_VERSION}\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
f's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body>' f's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body>'
f'<u:{method} xmlns:u="{service_id.decode()}">{args}</u:{method}></s:Body></s:Envelope>') f'<u:{method} xmlns:u="{service_id.decode()}">{args}</u:{method}></s:Body></s:Envelope>')

View file

@ -3,13 +3,23 @@ import logging
import binascii import binascii
import json import json
from collections import OrderedDict from collections import OrderedDict
from typing import List, Optional, Dict, Union, Callable from typing import List, Optional, Dict, Union, Tuple, Callable
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
from aioupnp.constants import line_separator from aioupnp.constants import line_separator
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
_template = "(?i)^(%s):[ ]*(.*)$" _template = "(?i)^(%s):[ ]*(.*)$"
vendor_pattern = re.compile("^([\w|\d]*)\.([\w|\d]*\.com):([ \"|\w|\d\:]*)$")
def match_vendor(line: str) -> Optional[Tuple[str, str]]:
match: List[Tuple[str, str, str]] = vendor_pattern.findall(line)
if match:
vendor_key: str = match[-1][0].lstrip(" ").rstrip(" ")
vendor_value: str = match[-1][2].lstrip(" ").rstrip(" ")
return vendor_key, vendor_value
return None
def compile_find(pattern: str) -> Callable[[str], Optional[str]]: def compile_find(pattern: str) -> Callable[[str], Optional[str]]:
@ -59,6 +69,8 @@ class SSDPDatagram:
_OK: "m-search response" _OK: "m-search response"
} }
_vendor_field_pattern = vendor_pattern
_required_fields: Dict[str, List[str]] = { _required_fields: Dict[str, List[str]] = {
_M_SEARCH: [ _M_SEARCH: [
'host', 'host',
@ -118,7 +130,10 @@ class SSDPDatagram:
def get_cli_igd_kwargs(self) -> str: def get_cli_igd_kwargs(self) -> str:
fields = [] fields = []
for field in self._field_order: for field in self._field_order:
fields.append("--%s=%s" % (self._case_mappings.get(field, field), getattr(self, field, None))) v = getattr(self, field, None)
if v is None:
raise UPnPError("missing required field %s" % field)
fields.append("--%s=%s" % (self._case_mappings.get(field, field), v))
return " ".join(fields) return " ".join(fields)
def __repr__(self) -> str: def __repr__(self) -> str:
@ -159,6 +174,11 @@ class SSDPDatagram:
raise UPnPError( raise UPnPError(
f"failed to decode datagram: {binascii.hexlify(datagram).decode()}" f"failed to decode datagram: {binascii.hexlify(datagram).decode()}"
) )
for attr_name in packet._required_fields[packet._packet_type]:
if getattr(packet, attr_name, None) is None:
raise UPnPError(
"required field for {} is missing from m-search response: {}".format(packet._packet_type, attr_name)
)
return packet return packet
@classmethod @classmethod
@ -186,6 +206,10 @@ class SSDPDatagram:
matched = True matched = True
matched_keys.append(name) matched_keys.append(name)
break break
if not matched:
matched_vendor = match_vendor(line)
if matched_vendor and matched_vendor[0] not in result:
result[matched_vendor[0]] = matched_vendor[1]
return result return result
@classmethod @classmethod

View file

@ -37,7 +37,7 @@ class UPnP:
return cls.delete_port_mapping.__annotations__, cls.delete_port_mapping.__doc__ return cls.delete_port_mapping.__annotations__, cls.delete_port_mapping.__doc__
if command == "get_next_mapping": if command == "get_next_mapping":
return cls.get_next_mapping.__annotations__, cls.get_next_mapping.__doc__ return cls.get_next_mapping.__annotations__, cls.get_next_mapping.__doc__
raise AttributeError(command) # pragma: no cover raise AttributeError(command)
@staticmethod @staticmethod
def get_lan_and_gateway(lan_address: str = '', gateway_address: str = '', def get_lan_and_gateway(lan_address: str = '', gateway_address: str = '',
@ -107,7 +107,7 @@ class UPnP:
return await self.gateway.commands.GetExternalIPAddress() return await self.gateway.commands.GetExternalIPAddress()
async def add_port_mapping(self, external_port: int, protocol: str, internal_port: int, lan_address: str, async def add_port_mapping(self, external_port: int, protocol: str, internal_port: int, lan_address: str,
description: str, lease_time: int = 0) -> None: description: str) -> None:
""" """
Add a new port mapping Add a new port mapping
@ -116,13 +116,12 @@ class UPnP:
:param internal_port: (int) internal port :param internal_port: (int) internal port
:param lan_address: (str) internal lan address :param lan_address: (str) internal lan address
:param description: (str) mapping description :param description: (str) mapping description
:param lease_time: (int) lease time in seconds
:return: None :return: None
""" """
await self.gateway.commands.AddPortMapping( await self.gateway.commands.AddPortMapping(
NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol, NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol,
NewInternalPort=internal_port, NewInternalClient=lan_address, NewInternalPort=internal_port, NewInternalClient=lan_address,
NewEnabled=1, NewPortMappingDescription=description, NewLeaseDuration=str(lease_time) NewEnabled=1, NewPortMappingDescription=description, NewLeaseDuration='0'
) )
return None return None
@ -209,7 +208,7 @@ class UPnP:
return None return None
async def get_next_mapping(self, port: int, protocol: str, description: str, async def get_next_mapping(self, port: int, protocol: str, description: str,
internal_port: Optional[int] = None, lease_time: int = 0) -> int: internal_port: Optional[int] = None) -> int:
""" """
Get a new port mapping. If the requested port is not available, increment until the next free port is mapped Get a new port mapping. If the requested port is not available, increment until the next free port is mapped
@ -217,7 +216,6 @@ class UPnP:
:param protocol: (str) UDP | TCP :param protocol: (str) UDP | TCP
:param description: (str) mapping description :param description: (str) mapping description
:param internal_port: (int) internal port :param internal_port: (int) internal port
:param lease_time: (int) lease time in seconds
:return: (int) mapped port :return: (int) mapped port
""" """
@ -237,7 +235,7 @@ class UPnP:
if int_host == self.lan_address and int_port == requested_port and desc == description: if int_host == self.lan_address and int_port == requested_port and desc == description:
return port return port
port += 1 port += 1
await self.add_port_mapping(port, protocol, _internal_port, self.lan_address, description, lease_time) await self.add_port_mapping(port, protocol, _internal_port, self.lan_address, description)
return port return port
async def gather_debug_info(self) -> str: # pragma: no cover async def gather_debug_info(self) -> str: # pragma: no cover
@ -262,18 +260,15 @@ class UPnP:
await self.get_redirects() await self.get_redirects()
except UPnPError: except UPnPError:
pass pass
external_port = 0
made_mapping = False
try: try:
external_port = await self.get_next_mapping(1234, 'TCP', 'aioupnp testing') external_port = await self.get_next_mapping(1234, 'TCP', 'aioupnp testing')
made_mapping = True
except UPnPError: except UPnPError:
pass external_port = None
try: try:
await self.get_redirects() await self.get_redirects()
except UPnPError: except UPnPError:
pass pass
if made_mapping: if external_port:
try: try:
await self.delete_port_mapping(external_port, 'TCP') await self.delete_port_mapping(external_port, 'TCP')
except UPnPError: except UPnPError:
@ -412,12 +407,12 @@ def run_cli(method: str, igd_args: Dict[str, Union[bool, str, int]], lan_address
u = await UPnP.discover( u = await UPnP.discover(
lan_address, gateway_address, timeout, igd_args, interface_name, loop=loop lan_address, gateway_address, timeout, igd_args, interface_name, loop=loop
) )
except UPnPError as err: # pragma: no cover except UPnPError as err:
fut.set_exception(err) fut.set_exception(err)
return return
if method not in cli_commands: if method not in cli_commands:
fut.set_exception(UPnPError("\"%s\" is not a recognized command" % method)) # pragma: no cover fut.set_exception(UPnPError("\"%s\" is not a recognized command" % method))
return # pragma: no cover return
else: else:
fn = getattr(u, method) fn = getattr(u, method)
@ -426,7 +421,8 @@ def run_cli(method: str, igd_args: Dict[str, Union[bool, str, int]], lan_address
fut.set_result(result) fut.set_result(result)
except UPnPError as err: except UPnPError as err:
fut.set_exception(err) fut.set_exception(err)
except Exception as err: # pragma: no cover
except Exception as err:
log.exception("uncaught error") log.exception("uncaught error")
fut.set_exception(UPnPError("uncaught error: %s" % str(err))) fut.set_exception(UPnPError("uncaught error: %s" % str(err)))

View file

@ -1,5 +1,3 @@
import sys
import ipaddress
import typing import typing
from collections import OrderedDict from collections import OrderedDict
@ -49,21 +47,3 @@ def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyS
matched_key: typing.AnyStr = match[0] matched_key: typing.AnyStr = match[0]
return source[matched_key] return source[matched_key]
raise KeyError("overlapping keys") raise KeyError("overlapping keys")
# the ipaddress module does not show these subnets as reserved
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
def is_valid_public_ipv4(address):
try:
parsed_ip = ipaddress.ip_address(address)
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private, parsed_ip.is_reserved)):
return False
else:
return not any((CARRIER_GRADE_NAT_SUBNET.overlaps(ipaddress.ip_network(f"{address}/32")),
IPV4_TO_6_RELAY_SUBNET.overlaps(ipaddress.ip_network(f"{address}/32"))))
except (ipaddress.AddressValueError, ValueError):
return False

View file

@ -1,511 +0,0 @@
import os
import sys
import socket
import ctypes
import contextlib
import struct
import binascii
import enum
import re
import time
import base64
import ssl
import asyncio
import codecs
import typing
import json
from ctypes import c_char, c_short
from typing import Tuple
import aioupnp
from aioupnp.upnp import UPnP, UPnPError, get_gateway_and_lan_addresses
from aioupnp.constants import SSDP_IP_ADDRESS
import certifi
import aiohttp
import miniupnpc
_IFF_PROMISC = 0x0100
_SIOCGIFFLAGS = 0x8913 # get the active flags
_SIOCSIFFLAGS = 0x8914 # set the active flags
_ETH_P_ALL = 0x0003 # all protocols
ETHER_HEADER_LEN = 6 + 6 + 2
VLAN_HEADER_LEN = 2
printable = re.compile(b"([a-z0-9!\"#$%&'()*+,.\/:;<=>?@\[\] ^_`{|}~-]*)")
class PacketTypes(enum.Enum): # if_packet.h
HOST = 0
BROADCAST = 1
MULTICAST = 2
OTHERHOST = 3
OUTGOING = 4
LOOPBACK = 5
FASTROUTE = 6
class Layer2(enum.Enum): # https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
IPv4 = 0x0800
ARP = 0x0806
VLAN = 0x8100
MVRP = 0x88f5
MMRP = 0x88f6
IPv6 = 0x86dd
GRE = 0xb7ea
class Layer3(enum.Enum): # https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
ICMP = 1
IGMP = 2
TCP = 6
UDP = 17
class _ifreq(ctypes.Structure):
_fields_ = [("ifr_ifrn", c_char * 16),
("ifr_flags", c_short)]
@contextlib.contextmanager
def _promiscuous_posix_socket_context(interface: str):
import fcntl # posix-only
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(_ETH_P_ALL))
ifr = _ifreq()
ifr.ifr_ifrn = interface.encode()[:16]
fcntl.ioctl(sock, _SIOCGIFFLAGS, ifr) # get the flags
ifr.ifr_flags |= _IFF_PROMISC # add the promiscuous flag
fcntl.ioctl(sock, _SIOCSIFFLAGS, ifr) # update
sock.setblocking(False)
try:
yield sock
finally:
ifr.ifr_flags ^= _IFF_PROMISC # mask it off (remove)
fcntl.ioctl(sock, _SIOCSIFFLAGS, ifr) # update
print("closed posix promiscuous socket")
@contextlib.contextmanager
def _promiscuous_non_posix_socket_context():
# the public network interface
HOST = socket.gethostbyname(socket.gethostname())
# create a raw socket and bind it to the public interface
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
# prevent socket from being left in TIME_WAIT state, enabling reuse
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((HOST, 0))
# Include IP headers
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# receive all packages
sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
sock.setblocking(False)
try:
yield sock
finally:
# disable promiscuous mode
sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
print("closed non-posix promiscuous socket")
def promiscuous(interface: typing.Optional[str] = None) -> typing.ContextManager[socket.socket]:
if os.name == 'posix':
return _promiscuous_posix_socket_context(interface)
return _promiscuous_non_posix_socket_context()
def ipv4_to_str(addr: bytes) -> str:
return ".".join((str(b) for b in addr))
def pretty_mac(mac: bytes) -> str:
return ":".join((('0' if b < 16 else '') + hex(b)[2:] for b in mac))
def split_byte(b: int, bit=4) -> Tuple[bytes, bytes]:
return chr(((b >> (8-bit)) % 256) << (8-bit) >> (8-bit)).encode(), chr(((b << bit) % 256) >> bit).encode()
class EtherFrame:
__slots__ = [
'source_mac',
'target_mac',
'ether_type',
'vlan_id',
'tpid'
]
def __init__(self, source_mac: bytes, target_mac: bytes, ether_type: int,
vlan_id: typing.Optional[int] = None, tpid: typing.Optional[int] = None):
self.source_mac = source_mac
self.target_mac = target_mac
self.ether_type = ether_type
self.vlan_id = vlan_id
self.tpid = tpid
def encode(self) -> bytes:
if self.vlan_id is None:
return struct.pack("6s6sH", *(getattr(self, slot) for slot in self.__slots__[:-2]))
return struct.pack("6s6sHHH", *(getattr(self, slot) for slot in self.__slots__))
@classmethod
def decode(cls, packet: bytes) -> Tuple['EtherFrame', bytes]:
vlan_id = None
tpid = None
if struct.unpack(f'!H', packet[12:14])[0] == Layer2.VLAN.value:
target_mac, source_mac, tpid, vlan_id, ether_type, data = struct.unpack(f'!6s6sHHH{len(packet) - ETHER_HEADER_LEN - VLAN_HEADER_LEN}s', packet)
else:
target_mac, source_mac, ether_type, data = struct.unpack(f'!6s6sH{len(packet) - ETHER_HEADER_LEN}s', packet)
return cls(source_mac, target_mac, ether_type, vlan_id, tpid), data
def debug(self) -> str:
if self.vlan_id is None:
return f"EtherFrame(source={pretty_mac(self.source_mac)}, target={pretty_mac(self.target_mac)}, " \
f"ether_type={Layer2(self.ether_type).name})"
return f"EtherFrame(source={pretty_mac(self.source_mac)}, target={pretty_mac(self.target_mac)}, " \
f"ether_type={Layer2(self.ether_type).name}, vlan={self.vlan_id})"
class IPv4Packet:
__slots__ = [
'ether_frame',
'version',
'header_length',
'dscp',
'ecn',
'total_length',
'identification',
'df',
'mf',
'flag',
'fragment_offset',
'ttl',
'protocol',
'header_checksum',
'_source_address',
'_destination_address',
'data',
'packet_type',
'interface'
]
ETHER_TYPE = Layer2.IPv4
def __init__(self, ether_frame: EtherFrame, version: int, header_length: int, dscp: int, ecn: int,
total_length: int, identification: int, mf: bool, df: bool, flag: bool, fragment_offset: int, ttl: int,
protocol: int, header_checksum: int, source_address: bytes, destination_address: bytes, data: bytes,
packet_type: int, interface: str):
self.ether_frame = ether_frame
self.version = version
self.header_length = header_length
self.dscp = dscp
self.ecn = ecn
self.total_length = total_length
self.identification = identification
self.mf = mf
self.df = df
self.flag = flag
self.fragment_offset = fragment_offset
self.ttl = ttl
self.protocol = Layer3(protocol)
self.header_checksum = header_checksum
self._source_address = source_address
self._destination_address = destination_address
self.data = data
self.packet_type = PacketTypes(packet_type)
self.interface = interface
@property
def source(self) -> str:
return ipv4_to_str(self._source_address)
@property
def destination(self) -> str:
return ipv4_to_str(self._destination_address)
@staticmethod
def checksum(header: bytes) -> int:
c = 0
for i in range(0, len(header), 2):
c += int.from_bytes(header[i:i + 2], 'big')
while c > 0xffff:
c %= 0xffff
while c > 0xffff:
c %= 0xffff
return c ^ 0xffff
def get_header(self) -> bytes:
version_and_hlen = (self.version << 4) + self.header_length
dscp_and_ecn = (self.dscp << 2) + self.ecn
flags = (4 if self.flag else 0) + (2 if self.df else 0) + (1 if self.mf else 0)
df_mf_and_fragment = (flags << 12) + self.fragment_offset
return struct.pack(
'!BBHHHBBH4s4s', version_and_hlen, dscp_and_ecn, self.total_length,
self.identification, df_mf_and_fragment, self.ttl, self.protocol.value,
self.header_checksum, self._source_address, self._destination_address
)
@classmethod
def decode(cls, ether_frame: EtherFrame, packet: bytes, packet_type: int, interface: str) -> 'IPv4Packet':
if cls.checksum(packet[:20]):
raise ValueError(f'\nipv4 checksum failed, frame: {ether_frame.debug()}\n'
f'packet: {binascii.hexlify(packet).decode()}, checksum: {hex(cls.checksum(packet[:20]))}')
data_len = len(packet) - 20
version_and_hlen, dscp_and_ecn, tlen, ident, df_mf_and_fragment, ttl, proto, checksum, source, dest = \
struct.unpack(
f'!BBHHHBBH4s4s', packet[:20]
)
version, hlen = split_byte(version_and_hlen)
flags = df_mf_and_fragment >> 13
mask = (flags << 13) | df_mf_and_fragment
fragment = mask ^ df_mf_and_fragment
flag, df, mf = False, False, False
if flags % 2:
mf = True
flags -= 1
if flags % 2:
df = True
flags -= 2
if flags % 4:
flag = True
flags -= 4
dscp, ecn = split_byte(dscp_and_ecn, 6)
return cls(
ether_frame, ord(version), ord(hlen), ord(dscp), ord(ecn), tlen, ident, mf, df, flag, fragment, ttl,
proto, checksum, source, dest, packet[20:], packet_type, interface
)
def encode(self) -> bytes:
return self.ether_frame.encode() + self.get_header() + self.data
@property
def printable_data(self) -> str:
return b".".join(printable.findall(self.data)).decode()
def __repr__(self) -> str:
return f"IPv4(protocol={self.protocol.name}, " \
f"iface={self.interface}, " \
f"type={self.packet_type.name}, " \
f"source={ipv4_to_str(self._source_address)}, " \
f"destination={ipv4_to_str(self._destination_address)}, " \
f"data_len={len(self.data)})"
def make_filter(l3_protocol=None, src=None, dst=None, invert=False):
def filter_packet(packet: IPv4Packet):
if l3_protocol and not Layer3(packet.protocol) == l3_protocol:
return False
if src and not packet.source == src:
return False
if dst and not packet.destination == dst:
return False
return True
if invert:
return lambda packet: not filter_packet(packet)
return filter_packet
async def sniff_ipv4(filters=None, kill=None):
start = time.perf_counter()
loop = asyncio.get_event_loop()
async def sock_recv(sock, n):
"""Receive data from the socket.
The return value is a bytes object representing the data received.
The maximum amount of data to be received at once is specified by
nbytes.
"""
if loop._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = loop.create_future()
_sock_recv(fut, None, sock, n)
return await fut
def _sock_recv(fut, registered_fd, sock, n):
# _sock_recv() can add itself as an I/O callback if the operation can't
# be done immediately. Don't use it directly, call sock_recv().
if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
loop.remove_reader(registered_fd)
if fut.cancelled():
return
try:
data, flags = sock.recvfrom(n)
except (BlockingIOError, InterruptedError):
fd = sock.fileno()
loop.add_reader(fd, _sock_recv, fut, fd, sock, n)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result((data, flags))
with promiscuous('lo') as sock:
while True:
if not kill:
data, flags = await sock_recv(sock, 9000)
else:
t = asyncio.create_task(sock_recv(sock, 9000))
await asyncio.wait([t, kill.wait()], return_when=asyncio.FIRST_COMPLETED)
if kill.is_set():
break
data, flags = await t
# https://stackoverflow.com/questions/42821309/how-to-interpret-result-of-recvfrom-raw-socket/45215859#45215859
if data:
try:
ether_frame, packet = EtherFrame.decode(data)
if ether_frame.ether_type == Layer2.IPv4.value:
interface, _, packet_type, _, _ = flags
ipv4 = IPv4Packet.decode(ether_frame, packet, packet_type, interface)
if not filters or any((f(ipv4) for f in filters)):
yield time.perf_counter() - start, ipv4
except ValueError:
pass
async def main():
loop = asyncio.get_event_loop()
gateway, lan = get_gateway_and_lan_addresses('default')
done = asyncio.Event()
def discover_aioupnp():
async def _discover():
print("testing aioupnp")
try:
u = await UPnP.discover()
print("successfully detected router with aioupnp")
try:
await u.get_external_ip()
print("successfully detected external ip with aioupnp")
except UPnPError:
print("failed to detect external ip with aioupnp")
try:
await u.get_redirects()
print("successfully detected redirects with aioupnp")
except UPnPError:
print("failed to get redirects with aioupnp")
try:
external_port = await u.get_next_mapping(1234, 'TCP', 'aioupnp testing')
print("successfully set redirect with aioupnp")
except UPnPError:
print("failed to set redirect with aioupnp")
external_port = None
try:
await u.get_redirects()
print("successfully detected redirects with aioupnp")
except UPnPError:
print("failed to get redirects with aioupnp")
if external_port:
try:
print("successfully removed redirect with aioupnp")
await u.delete_port_mapping(external_port, 'TCP')
except UPnPError:
print("failed to delete redirect with aioupnp")
try:
await u.get_redirects()
print("successfully detected redirects with aioupnp")
except UPnPError:
print("failed to get redirects with aioupnp")
except UPnPError:
print("failed to discover router with aioupnp")
finally:
print("done with aioupnp test")
asyncio.create_task(_discover())
def discover_miniupnpc():
def _miniupnpc_discover():
try:
u = miniupnpc.UPnP()
except:
print("failed to create upnp object with miniupnpc")
return
try:
u.discover()
except:
print("failed to detect router with miniupnpc")
return
try:
u.selectigd()
print("successfully detected router with miniupnpc")
except:
print("failed to detect router with miniupnpc")
return
try:
u.externalipaddress()
print("successfully detected external ip with miniupnpc")
except:
print("failed to detect external ip with miniupnpc")
return
async def _discover():
print("testing miniupnpc")
try:
await loop.run_in_executor(None, _miniupnpc_discover)
finally:
done.set()
print("done with miniupnpc test")
asyncio.create_task(_discover())
loop.call_later(0, discover_aioupnp)
loop.call_later(8, discover_miniupnpc)
start = time.perf_counter()
packets = []
try:
async for (ts, ipv4_packet) in sniff_ipv4([
make_filter(l3_protocol=Layer3.UDP, src=SSDP_IP_ADDRESS),
make_filter(l3_protocol=Layer3.UDP, dst=SSDP_IP_ADDRESS),
make_filter(l3_protocol=Layer3.UDP, src=lan, dst=gateway),
make_filter(l3_protocol=Layer3.UDP, src=gateway, dst=lan),
make_filter(l3_protocol=Layer3.TCP, src=lan, dst=gateway),
make_filter(l3_protocol=Layer3.TCP, src=gateway, dst=lan)], done):
packets.append(
(time.perf_counter() - start, ipv4_packet.packet_type.name,
ipv4_packet.source, ipv4_packet.destination, base64.b64encode(ipv4_packet.data).decode())
)
except KeyboardInterrupt:
print("stopping")
finally:
with open("aioupnp-bug-report.json", "w") as cap_file:
cap_file.write(json.dumps(packets))
print(f"Wrote bug report: {os.path.abspath('aioupnp-bug-report.json')}")
print("Sending bug report")
ssl_ctx = ssl.create_default_context(
purpose=ssl.Purpose.CLIENT_AUTH, capath=certifi.where()
)
auth = aiohttp.BasicAuth(
base64.b64decode(codecs.encode('Ax5LZzR1o3q3Z3WjATASDwR5rKyHH0qOIRIbLmMXn2H=', 'rot_13')).decode(), ''
)
report_id = base64.b64encode(os.urandom(16)).decode()
async with aiohttp.ClientSession() as session:
for i, (ts, direction, source, destination, packet) in enumerate(packets):
post = {
'userId': report_id,
'event': 'aioupnp bug report',
'context': {
'library': {
'name': 'aioupnp',
'version': aioupnp.__version__
}
},
'properties': {
'sequence': i,
'ts': ts,
'direction': direction,
'source': source,
'destination': destination,
'packet': packet
},
}
async with session.request(method='POST', url='https://api.segment.io/v1/track',
headers={'Connection': 'Close'}, auth=auth, json=post, ssl=ssl_ctx):
sys.stdout.write(f"\r{'.' * i}")
sys.stdout.write("\n")
print("Successfully sent bug report, thanks for your contribution!")
if __name__ == "__main__":
asyncio.run(main())

View file

@ -18,7 +18,7 @@ except ImportError:
@contextlib.contextmanager @contextlib.contextmanager
def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_reply=0.0, sent_udp_packets=None, def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_reply=0.0, sent_udp_packets=None,
tcp_replies=None, tcp_delay_reply=0.0, sent_tcp_packets=None, add_potato_datagrams=False, tcp_replies=None, tcp_delay_reply=0.0, sent_tcp_packets=None, add_potato_datagrams=False,
raise_oserror_on_bind=False, raise_connectionerror=False, tcp_chunk_size=100): raise_oserror_on_bind=False, raise_connectionerror=False):
sent_udp_packets = sent_udp_packets if sent_udp_packets is not None else [] sent_udp_packets = sent_udp_packets if sent_udp_packets is not None else []
udp_replies = udp_replies or {} udp_replies = udp_replies or {}
@ -36,8 +36,8 @@ def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_r
reply = tcp_replies[data] reply = tcp_replies[data]
i = 0 i = 0
while i < len(reply): while i < len(reply):
loop.call_later(tcp_delay_reply, p.data_received, reply[i:i+tcp_chunk_size]) loop.call_later(tcp_delay_reply, p.data_received, reply[i:i+100])
i += tcp_chunk_size i += 100
return return
else: else:
pass pass
@ -71,7 +71,6 @@ def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_r
loop.call_later(udp_delay_reply, p.datagram_received, udp_replies[(data, addr)], loop.call_later(udp_delay_reply, p.datagram_received, udp_replies[(data, addr)],
(udp_expected_addr, 1900)) (udp_expected_addr, 1900))
return _sendto return _sendto
protocol = proto_lam() protocol = proto_lam()

View file

@ -31,28 +31,6 @@ class TestSCPDGet(AsyncioTestCase):
b"\r\n" \ b"\r\n" \
b"%s" % bad_xml b"%s" % bad_xml
bad_response2 = b"HTTP/1.1 200 OK\r\n" \
b"CONTENT-TYPE: text/xml\r\n" \
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
b"LAST-MODIFIED: Fri, 28 Sep 2018 18:35:48 GMT\r\n" \
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
b"X-User-Agent: redsonic\r\n" \
b"CONNECTION: close\r\n" \
b"\r\n" \
b"%s" % (b'.' * 65300)
bad_response3 = b"HTTP/1.1 200 OK\r\n" \
b"CONTENT-TYPE: text/xml\r\n" \
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
b"LAST-MODIFIED: Fri, 28 Sep 2018 18:35:48 GMT\r\n" \
b"CONTENT-TYPE: text/xml\r\n" \
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
b"X-User-Agent: redsonic\r\n" \
b"CONNECTION: close\r\n" \
b"\r\n" \
b"<?xml version=\"1.0\"?>\n<root xmlns=\"urn:schemas-upnp-org:device-1-0\">\n<specVersion>\n<major>1</major>\n<minor>0</minor>\n</specVersion>\n<device>\n<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>\n<friendlyName>CGA4131COM</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>\n<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/Layer3Forwarding</controlURL>\n<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n<device>\n<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>\n<friendlyName>WANDevice:1</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>\n<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>\n<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n <device>\n <deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>\n <friendlyName>WANConnectionDevice:1</friendlyName>\n <manufacturer>Cisco</manufacturer>\n <manufacturerURL>http://www.cisco.com/</manufacturerURL>\n <modelDescription>CGA4131COM</modelDescription>\n <modelName>CGA4131COM</modelName>\n <modelNumber>CGA4131COM</modelNumber>\n <modelURL>http://www.cisco.com</modelURL>\n <serialNumber></serialNumber>\n <UDN>uuid:11111111-2222-3333-4444-555555555555</UDN>\n <UPC>CGA4131COM</UPC>\n <serviceList>\n <service>\n <serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>\n <serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>\n <SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>\n <controlURL>/upnp/control/WANIPConnection0</controlURL>\n <eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>\n </service>\n </serviceList>\n </device>\n</deviceList>\n</device>\n</deviceList>\n<presentationURL>http://10.1.10.1/</presentationURL></device>\n</root>\n"
expected_parsed = { expected_parsed = {
'specVersion': {'major': '1', 'minor': '0'}, 'specVersion': {'major': '1', 'minor': '0'},
'device': { 'device': {
@ -162,26 +140,6 @@ class TestSCPDGet(AsyncioTestCase):
self.assertIsInstance(err, UPnPError) self.assertIsInstance(err, UPnPError)
self.assertTrue(str(err).startswith('too many bytes written')) self.assertTrue(str(err).startswith('too many bytes written'))
async def test_scpd_get_overrun_unspecified_content_length(self):
sent = []
replies = {self.get_request: self.bad_response2}
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent):
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
self.assertDictEqual({}, result)
self.assertEqual(self.bad_response2.decode(), raw.decode())
self.assertIsInstance(err, UPnPError)
self.assertTrue(str(err).endswith('with unspecified content length'))
async def test_scpd_duplicate_header(self):
sent = []
replies = {self.get_request: self.bad_response3}
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent):
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
self.assertDictEqual({}, result)
self.assertTrue(self.bad_response3.startswith(raw))
self.assertIsInstance(err, UPnPError)
self.assertEqual("duplicate headers", str(err))
class TestSCPDPost(AsyncioTestCase): class TestSCPDPost(AsyncioTestCase):
param_names: list = [] param_names: list = []
@ -244,7 +202,7 @@ class TestSCPDPost(AsyncioTestCase):
self.assertEqual(b'', raw) self.assertEqual(b'', raw)
self.assertDictEqual({}, result) self.assertDictEqual({}, result)
async def test_scpd_connection_error(self): async def test_scpd_post_connection_error(self):
sent = [] sent = []
replies = {} replies = {}
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent, raise_connectionerror=True): with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent, raise_connectionerror=True):
@ -256,14 +214,6 @@ class TestSCPDPost(AsyncioTestCase):
self.assertEqual(b'', raw) self.assertEqual(b'', raw)
self.assertDictEqual({}, result) self.assertDictEqual({}, result)
result, raw, err = await scpd_get(
self.path, self.gateway_address, self.port
)
self.assertIsInstance(err, UPnPError)
self.assertEqual('ConnectionRefusedError()', str(err))
self.assertEqual(b'', raw)
self.assertDictEqual({}, result)
async def test_scpd_post_bad_xml_response(self): async def test_scpd_post_bad_xml_response(self):
sent = [] sent = []
replies = {self.post_bytes: self.bad_envelope_response} replies = {self.post_bytes: self.bad_envelope_response}

View file

@ -1,4 +1,3 @@
import json
from collections import OrderedDict from collections import OrderedDict
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
from aioupnp.protocols.m_search_patterns import packet_generator from aioupnp.protocols.m_search_patterns import packet_generator
@ -51,16 +50,6 @@ class TestSSDP(AsyncioTestCase):
with self.assertRaises(UPnPError): with self.assertRaises(UPnPError):
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop) await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
async def test_ssdp_pretty_print(self):
self.assertEqual(
json.dumps({
"HOST": "239.255.255.250:1900",
"MAN": "ssdp:discover",
"MX": 1,
"ST": "urn:schemas-upnp-org:device:WANDevice:1"
}, indent=2), str(self.query_packet)
)
async def test_m_search_reply_unicast(self): async def test_m_search_reply_unicast(self):
replies = { replies = {
(self.query_packet.encode().encode(), ("10.0.0.1", 1900)): self.reply_packet.encode().encode() (self.query_packet.encode().encode(), ("10.0.0.1", 1900)): self.reply_packet.encode().encode()

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -28,15 +28,6 @@ class TestSSDPDatagram(unittest.TestCase):
with self.assertRaises(UPnPError): with self.assertRaises(UPnPError):
SSDPDatagram.decode(packet) SSDPDatagram.decode(packet)
packet = \
b'M-SEARCH * HTTP/1.1\r\n' \
b'Host: 239.255.255.250:1900\r\n' \
b'MX: 5\r\n' \
b'\r\n'
with self.assertRaises(UPnPError):
SSDPDatagram.decode(packet)
def test_fail_to_decode_blank(self): def test_fail_to_decode_blank(self):
packet = b'' packet = b''

View file

@ -2,7 +2,7 @@ import unittest
from aioupnp.device import CaseInsensitive from aioupnp.device import CaseInsensitive
class _TestService(CaseInsensitive): class TestService(CaseInsensitive):
serviceType = None serviceType = None
serviceId = None serviceId = None
controlURL = None controlURL = None
@ -12,14 +12,14 @@ class _TestService(CaseInsensitive):
class TestCaseInsensitive(unittest.TestCase): class TestCaseInsensitive(unittest.TestCase):
def test_initialize(self): def test_initialize(self):
s = _TestService( s = TestService(
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3" serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
) )
self.assertEqual('test', getattr(s, 'serviceType')) self.assertEqual('test', getattr(s, 'serviceType'))
self.assertEqual('test', getattr(s, 'servicetype')) self.assertEqual('test', getattr(s, 'servicetype'))
self.assertEqual('test', getattr(s, 'SERVICETYPE')) self.assertEqual('test', getattr(s, 'SERVICETYPE'))
s = _TestService( s = TestService(
servicetype="test", serviceid="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3" servicetype="test", serviceid="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
) )
self.assertEqual('test', getattr(s, 'serviceType')) self.assertEqual('test', getattr(s, 'serviceType'))
@ -35,7 +35,7 @@ class TestCaseInsensitive(unittest.TestCase):
}, s.as_dict()) }, s.as_dict())
def test_set_attr(self): def test_set_attr(self):
s = _TestService( s = TestService(
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3" serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
) )
self.assertEqual('test', getattr(s, 'serviceType')) self.assertEqual('test', getattr(s, 'serviceType'))

View file

@ -62,7 +62,7 @@ Get the external ip address from the gateway
""" """
expected_add_port_mapping_usage = """aioupnp [-h] [--debug_logging] add_port_mapping [--external_port=<int>] [--protocol=<str>] expected_add_port_mapping_usage = """aioupnp [-h] [--debug_logging] add_port_mapping [--external_port=<int>] [--protocol=<str>]
[--internal_port=<int>] [--lan_address=<str>] [--description=<str>] [--lease_time=<int>] [--internal_port=<int>] [--lan_address=<str>] [--description=<str>]
Add a new port mapping Add a new port mapping
@ -71,13 +71,12 @@ Add a new port mapping
:param internal_port: (int) internal port :param internal_port: (int) internal port
:param lan_address: (str) internal lan address :param lan_address: (str) internal lan address
:param description: (str) mapping description :param description: (str) mapping description
:param lease_time: (int) lease time in seconds
:return: None :return: None
""" """
expected_get_next_mapping_usage = """aioupnp [-h] [--debug_logging] get_next_mapping [--port=<int>] [--protocol=<str>] expected_get_next_mapping_usage = """aioupnp [-h] [--debug_logging] get_next_mapping [--port=<int>] [--protocol=<str>]
[--description=<str>] [--internal_port=<typing.Union[int, NoneType]>] [--lease_time=<int>] [--description=<str>] [--internal_port=<typing.Union[int, NoneType]>]
Get a new port mapping. If the requested port is not available, increment until the next free port is mapped Get a new port mapping. If the requested port is not available, increment until the next free port is mapped
@ -85,7 +84,6 @@ Get a new port mapping. If the requested port is not available, increment until
:param protocol: (str) UDP | TCP :param protocol: (str) UDP | TCP
:param description: (str) mapping description :param description: (str) mapping description
:param internal_port: (int) internal port :param internal_port: (int) internal port
:param lease_time: (int) lease time in seconds
:return: (int) mapped port :return: (int) mapped port

View file

@ -1,12 +1,8 @@
import os
import json
from collections import OrderedDict
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
from tests import AsyncioTestCase, mock_tcp_and_udp from tests import AsyncioTestCase, mock_tcp_and_udp
from collections import OrderedDict
from aioupnp.gateway import Gateway, get_action_list from aioupnp.gateway import Gateway, get_action_list
from aioupnp.serialization.ssdp import SSDPDatagram from aioupnp.serialization.ssdp import SSDPDatagram
from aioupnp.serialization.soap import serialize_soap_post
from aioupnp.upnp import UPnP
def gen_get_bytes(location: str, host: str) -> bytes: def gen_get_bytes(location: str, host: str) -> bytes:
@ -300,76 +296,3 @@ class TestDiscoverNetgearNighthawkAC2350(TestDiscoverDLinkDIR890L):
'RequestConnection', 'ForceTermination', 'RequestConnection', 'ForceTermination',
'GetStatusInfo', 'GetNATRSIPStatus']}, 'GetStatusInfo', 'GetNATRSIPStatus']},
'soap_requests': []} 'soap_requests': []}
class TestActiontec(AsyncioTestCase):
name = "Actiontec GT784WN"
_location_key = 'Location'
@property
def data_path(self):
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "replays", self.name)
def _get_location(self):
# return self.gateway_info['reply']['Location'].split(self.gateway_address)[-1]
return self.gateway_info['reply'][self._location_key].split(f"{self.gateway_address}:{self.gateway_info['soap_port']}")[-1]
def setUp(self) -> None:
with open(self.data_path, 'r') as f:
data = json.loads(f.read())
self.gateway_info = data['gateway']
self.client_address = data['client_address']
self.gateway_address = self.gateway_info['gateway_address']
self.udp_replies = {
(SSDPDatagram('M-SEARCH', self.gateway_info['m_search_args']).encode().encode(), ("239.255.255.250", 1900)): SSDPDatagram("OK", self.gateway_info['reply']).encode().encode()
}
self.tcp_replies = {
(
f"GET {path} HTTP/1.1\r\n"
f"Accept-Encoding: gzip\r\n"
f"Host: {self.gateway_info['gateway_address']}\r\n"
f"Connection: Close\r\n"
f"\r\n"
).encode(): xml_bytes.encode()
for path, xml_bytes in self.gateway_info['service_descriptors'].items()
}
self.tcp_replies.update({
(
f"GET {self._get_location()} HTTP/1.1\r\n"
f"Accept-Encoding: gzip\r\n"
f"Host: {self.gateway_info['gateway_address']}\r\n"
f"Connection: Close\r\n"
f"\r\n"
).encode(): self.gateway_info['gateway_xml'].encode()
})
self.registered_soap_commands = self.gateway_info['registered_soap_commands']
super().setUp()
async def setup_request_replay(self, u: UPnP):
for method, reqs in self.gateway_info['soap_requests'].items():
if not reqs:
continue
self.tcp_replies.update({
serialize_soap_post(
method, list(args.keys()), self.registered_soap_commands[method].encode(),
self.gateway_address.encode(), u.gateway.services[self.registered_soap_commands[method]].controlURL.encode()
): response.encode() for args, response in reqs
})
async def replay(self, u: UPnP):
self.assertEqual('11.222.33.111', await u.get_external_ip())
async def test_replay(self):
with mock_tcp_and_udp(self.loop, udp_replies=self.udp_replies, tcp_replies=self.tcp_replies, udp_expected_addr=self.gateway_address, tcp_chunk_size=1450):
u = await UPnP.discover(lan_address=self.client_address, gateway_address=self.gateway_address, loop=self.loop)
await self.setup_request_replay(u)
await self.replay(u)
class TestNewMediaNet(TestActiontec):
name = "NewMedia-NET GmbH Generic X86"
async def replay(self, u: UPnP):
self.assertEqual('11.222.33.111', await u.get_external_ip())
await u.get_redirects()
# print(await u.get_next_mapping(4567, 'UDP', 'aioupnp test mapping'))

View file

@ -1,6 +1,4 @@
from unittest import mock from unittest import mock
from collections import OrderedDict
from aioupnp import interfaces
from aioupnp.fault import UPnPError from aioupnp.fault import UPnPError
from aioupnp.upnp import UPnP from aioupnp.upnp import UPnP
from tests import AsyncioTestCase from tests import AsyncioTestCase
@ -31,26 +29,22 @@ class mock_netifaces:
@staticmethod @staticmethod
def ifaddresses(interface): def ifaddresses(interface):
return { return {
17: [ "test0": {
{ 17: [
"addr": "01:02:03:04:05:06", {
"broadcast": "ff:ff:ff:ff:ff:ff" "addr": "01:02:03:04:05:06",
} "broadcast": "ff:ff:ff:ff:ff:ff"
], }
2: [ ],
{ 2: [
"addr": "192.168.1.2", {
"netmask": "255.255.255.0", "addr": "192.168.1.2",
"broadcast": "192.168.1.255" "netmask": "255.255.255.0",
} "broadcast": "192.168.1.255"
], }
} ],
},
}[interface]
class mock_netifaces_extra_interface(mock_netifaces):
@staticmethod
def interfaces():
return ['lo', 'test0', 'test1']
class TestParseInterfaces(AsyncioTestCase): class TestParseInterfaces(AsyncioTestCase):
@ -74,16 +68,3 @@ class TestParseInterfaces(AsyncioTestCase):
else: else:
self.assertTrue(False) self.assertTrue(False)
self.assertEqual(len(checked), 1) self.assertEqual(len(checked), 1)
def test_guess_gateway(self):
# handle edge case where netifaces gives more interfaces than it does gateways
with mock.patch('aioupnp.interfaces.get_netifaces') as patch:
patch.return_value = mock_netifaces_extra_interface
self.assertDictEqual(
OrderedDict(
[
('test0', ('192.168.1.1', '192.168.1.2')),
('test1', ('192.168.1.1', '192.168.1.2')),
('default', ('192.168.1.1', '192.168.1.2'))
]), interfaces.get_interfaces()
)

View file

@ -51,42 +51,6 @@ class TestGetExternalIPAddress(UPnPCommandTestCase):
external_ip = await upnp.get_external_ip() external_ip = await upnp.get_external_ip()
self.assertEqual("11.222.3.44", external_ip) self.assertEqual("11.222.3.44", external_ip)
async def test_handle_got_ipv6(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 392 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>2a02:2e02:30c:db00:4a8d:36ff:feb0:5202</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
self.addCleanup(self.replies.pop, request)
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
await gateway.discover_commands()
upnp = UPnP(self.client_address, self.gateway_address, gateway)
with self.assertRaises(UPnPError) as err:
await upnp.get_external_ip()
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
async def test_handle_got_invalid_ip(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 360 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>potato</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
self.addCleanup(self.replies.pop, request)
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
await gateway.discover_commands()
upnp = UPnP(self.client_address, self.gateway_address, gateway)
with self.assertRaises(UPnPError) as err:
await upnp.get_external_ip()
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
async def test_handle_got_carrier_nat_ip(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 365 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>192.88.99.2</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
self.addCleanup(self.replies.pop, request)
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
await gateway.discover_commands()
upnp = UPnP(self.client_address, self.gateway_address, gateway)
with self.assertRaises(UPnPError) as err:
await upnp.get_external_ip()
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
async def test_null_external_ip(self): async def test_null_external_ip(self):
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n' request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
self.replies.update({ self.replies.update({