Compare commits
1 commit
master
...
fix-motoro
Author | SHA1 | Date | |
---|---|---|---|
|
1e50335429 |
28 changed files with 217 additions and 1228 deletions
31
.github/workflows/pypi-publish.yml
vendored
31
.github/workflows/pypi-publish.yml
vendored
|
@ -1,31 +0,0 @@
|
||||||
# This workflows will upload a Python Package using Twine when a release is created
|
|
||||||
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
|
|
||||||
|
|
||||||
name: pypi release
|
|
||||||
|
|
||||||
on:
|
|
||||||
release:
|
|
||||||
types: [created]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
deploy:
|
|
||||||
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
- name: Set up Python
|
|
||||||
uses: actions/setup-python@v2
|
|
||||||
with:
|
|
||||||
python-version: '3.6'
|
|
||||||
- name: Install dependencies
|
|
||||||
run: |
|
|
||||||
python -m pip install --upgrade pip
|
|
||||||
pip install setuptools wheel twine
|
|
||||||
- name: Build and publish
|
|
||||||
env:
|
|
||||||
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
|
|
||||||
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
|
|
||||||
run: |
|
|
||||||
python setup.py sdist
|
|
||||||
twine upload dist/*
|
|
53
.github/workflows/python-package.yml
vendored
53
.github/workflows/python-package.yml
vendored
|
@ -1,53 +0,0 @@
|
||||||
name: CI
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches: [ master ]
|
|
||||||
pull_request:
|
|
||||||
branches: [ master ]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
code-quality:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
strategy:
|
|
||||||
matrix:
|
|
||||||
python-version: [3.8]
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
- name: Set up Python ${{ matrix.python-version }}
|
|
||||||
uses: actions/setup-python@v2
|
|
||||||
with:
|
|
||||||
python-version: ${{ matrix.python-version }}
|
|
||||||
- name: Install dependencies
|
|
||||||
run: |
|
|
||||||
python -m pip install --upgrade pip
|
|
||||||
pip install mypy lxml flake8
|
|
||||||
pip install -e .
|
|
||||||
- name: Lint with flake8
|
|
||||||
run: |
|
|
||||||
# stop the build if there are Python syntax errors or undefined names
|
|
||||||
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
|
||||||
- name: mypy
|
|
||||||
run: |
|
|
||||||
mypy aioupnp
|
|
||||||
|
|
||||||
tests:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
strategy:
|
|
||||||
matrix:
|
|
||||||
python-version: [3.6, 3.7, 3.8]
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
- name: Set up Python ${{ matrix.python-version }}
|
|
||||||
uses: actions/setup-python@v2
|
|
||||||
with:
|
|
||||||
python-version: ${{ matrix.python-version }}
|
|
||||||
- name: Install dependencies
|
|
||||||
run: |
|
|
||||||
python -m pip install --upgrade pip
|
|
||||||
pip install coverage
|
|
||||||
pip install -e .
|
|
||||||
- name: Test with coverage
|
|
||||||
run: |
|
|
||||||
coverage run -m unittest discover -v tests
|
|
||||||
bash <(curl -s https://codecov.io/bash)
|
|
85
.travis.yml
Normal file
85
.travis.yml
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
sudo: required
|
||||||
|
dist: xenial
|
||||||
|
language: python
|
||||||
|
python: "3.8"
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
include:
|
||||||
|
- stage: code quality
|
||||||
|
name: "mypy"
|
||||||
|
before_install:
|
||||||
|
- pip install mypy lxml
|
||||||
|
- pip install -e .
|
||||||
|
|
||||||
|
script:
|
||||||
|
- mypy aioupnp --txt-report . --scripts-are-modules; cat index.txt; rm index.txt
|
||||||
|
|
||||||
|
- &tests
|
||||||
|
stage: test
|
||||||
|
name: "Unit Tests w/ Python 3.8"
|
||||||
|
python: "3.8"
|
||||||
|
before_install:
|
||||||
|
- pip install pylint coverage
|
||||||
|
- pip install -e .
|
||||||
|
|
||||||
|
script:
|
||||||
|
- HOME=/tmp coverage run -m unittest discover -v tests
|
||||||
|
|
||||||
|
after_success:
|
||||||
|
- bash <(curl -s https://codecov.io/bash)
|
||||||
|
|
||||||
|
- <<: *tests
|
||||||
|
name: "Unit Tests w/ Python 3.7"
|
||||||
|
python: "3.7"
|
||||||
|
|
||||||
|
- <<: *tests
|
||||||
|
name: "Unit Tests w/ Python 3.6"
|
||||||
|
python: "3.6"
|
||||||
|
|
||||||
|
#
|
||||||
|
#if: tag IS present
|
||||||
|
# - &build
|
||||||
|
# name: "Linux Release"
|
||||||
|
# python: "3.6"
|
||||||
|
# install:
|
||||||
|
# - pip install pyinstaller
|
||||||
|
# - pip install -e .
|
||||||
|
#
|
||||||
|
# script:
|
||||||
|
# - pyinstaller -F -n aioupnp aioupnp/__main__.py
|
||||||
|
# - chmod +x dist/aioupnp
|
||||||
|
# - zip -j dist/aioupnp-${OS}.zip dist/aioupnp
|
||||||
|
#
|
||||||
|
# env: OS=linux
|
||||||
|
#
|
||||||
|
# addons:
|
||||||
|
# artifacts:
|
||||||
|
# working_dir: dist
|
||||||
|
# paths:
|
||||||
|
# - aioupnp-${OS}.zip
|
||||||
|
#
|
||||||
|
# - <<: *build
|
||||||
|
# name: "Mac Release"
|
||||||
|
# os: osx
|
||||||
|
# osx_image: xcode9.4
|
||||||
|
# language: generic
|
||||||
|
# env: OS=mac
|
||||||
|
# install:
|
||||||
|
# - pip3 install pyinstaller
|
||||||
|
# - pip3 install -e .
|
||||||
|
#
|
||||||
|
# - <<: *build
|
||||||
|
# name: "Windows Release"
|
||||||
|
# language: generic
|
||||||
|
# services:
|
||||||
|
# - docker
|
||||||
|
# install:
|
||||||
|
# - docker pull cdrx/pyinstaller-windows:python3-32bit
|
||||||
|
# script:
|
||||||
|
# - docker run -v "$(pwd):/src/aioupnp" cdrx/pyinstaller-windows:python3-32bit aioupnp/wine_build.sh
|
||||||
|
# - sudo zip -j dist/aioupnp-windows.zip dist/aioupnp.exe
|
||||||
|
# addons:
|
||||||
|
# artifacts:
|
||||||
|
# working_dir: dist
|
||||||
|
# paths:
|
||||||
|
# - aioupnp-windows.zip
|
37
README.md
37
README.md
|
@ -1,4 +1,4 @@
|
||||||
[![Build Status](https://github.com/lbryio/aioupnp/workflows/CI/badge.svg)
|
[![Build Status](https://travis-ci.org/lbryio/aioupnp.svg?branch=master)](https://travis-ci.org/lbryio/aioupnp)
|
||||||
[![codecov](https://codecov.io/gh/lbryio/aioupnp/branch/master/graph/badge.svg)](https://codecov.io/gh/lbryio/aioupnp)
|
[![codecov](https://codecov.io/gh/lbryio/aioupnp/branch/master/graph/badge.svg)](https://codecov.io/gh/lbryio/aioupnp)
|
||||||
[![PyPI version](https://badge.fury.io/py/aioupnp.svg)](https://badge.fury.io/py/aioupnp)
|
[![PyPI version](https://badge.fury.io/py/aioupnp.svg)](https://badge.fury.io/py/aioupnp)
|
||||||
[![Python 3.6](https://img.shields.io/badge/python-3.6-blue.svg)](https://www.python.org/downloads/release/python-360/)
|
[![Python 3.6](https://img.shields.io/badge/python-3.6-blue.svg)](https://www.python.org/downloads/release/python-360/)
|
||||||
|
@ -19,12 +19,12 @@ Verify python is version 3.6-8
|
||||||
python --version
|
python --version
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Installation for normal usage
|
Installation for normal usage
|
||||||
```
|
```
|
||||||
pip install aioupnp
|
pip install aioupnp
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Installation for development
|
Installation for development
|
||||||
```
|
```
|
||||||
git clone https://github.com/lbryio/aioupnp.git
|
git clone https://github.com/lbryio/aioupnp.git
|
||||||
cd aioupnp
|
cd aioupnp
|
||||||
|
@ -75,7 +75,7 @@ aioupnp [-h] [--debug_logging] [--interface=<interface>] [--gateway_address=<gat
|
||||||
aioupnp delete_port_mapping --external_port=1234 --protocol=TCP
|
aioupnp delete_port_mapping --external_port=1234 --protocol=TCP
|
||||||
|
|
||||||
#### M-Search headers
|
#### M-Search headers
|
||||||
UPnP uses a multicast protocol (SSDP) to locate the gateway. Gateway discovery is automatic by default, but you may provide specific headers for the search to use to override automatic discovery.
|
UPnP uses multicast protocol - SSDP - to locate the gateway. Gateway discovery is automatic by default, but you may provide specific headers for the search to use to override automatic discovery.
|
||||||
|
|
||||||
If m-search headers are provided as keyword arguments then all of the headers to be used must be provided, in the order they are to be used. For example:
|
If m-search headers are provided as keyword arguments then all of the headers to be used must be provided, in the order they are to be used. For example:
|
||||||
|
|
||||||
|
@ -109,33 +109,16 @@ By default, the network device will be automatically discovered. The interface m
|
||||||
|
|
||||||
|
|
||||||
## Troubleshooting
|
## Troubleshooting
|
||||||
If `aioupnp` is failing with m-search timeouts this means the UPnP gateway (the router) isn't being found at all. To see if this error is expected try running m_search with debug logging, which will print out the packets sent and received:
|
|
||||||
|
#### Debug logging
|
||||||
|
To enable verbose debug logging, add the `--debug_logging` argument before the command
|
||||||
|
|
||||||
aioupnp --debug_logging m_search
|
aioupnp --debug_logging m_search
|
||||||
|
|
||||||
If you only see packets being sent or the replies are only from devices that aren't your router (smart devices, speakers, etc), then there are three options:
|
#### It really doesn't work
|
||||||
* your router does not support UPnP (this is unlikely)
|
If aioupnp doesn't work with a device, a debugging report can be collected with `aioupnp gather_debug_info`.
|
||||||
* UPnP is turned off in the web gui for your router (more likely)
|
|
||||||
* `aioupnp` has a bug (very likely if you don't see your router manufacturer doing well in the supported devices table)
|
|
||||||
|
|
||||||
If you see replies from the router but it still fails, then it's likely a bug in aioupnp.
|
This will attempt to discover the UPnP gateway, and then perform a functionality check where it will request the external address and existing port mappings before attempting to make and remove a port mapping. The final result is the zipped packet dump of these attempts, which allows writing tests replaying it.
|
||||||
|
|
||||||
If there are no replies and UPnP is certainly turned on, then a local firewall is the likely culprit.
|
|
||||||
|
|
||||||
|
|
||||||
## Sending a bug report
|
|
||||||
|
|
||||||
If it still doesn't work, you can send a bug report using an included script. This script will try finding the UPnP gateway using `aioupnp` as well as `miniupnpc` and then try add and remove a port mapping using each library. The script does this while capturing the packets sent/received, which makes figuring out what went wrong possible. The script will create a file with this packet capture (`aioupnp-bug-report.json`) and automatically send it.
|
|
||||||
|
|
||||||
Note: the bug report script currently does not work on MacOS
|
|
||||||
```
|
|
||||||
git clone https://github.com/lbryio/aioupnp.git
|
|
||||||
cd aioupnp
|
|
||||||
python3 -m pip install -e .
|
|
||||||
|
|
||||||
python3 -m pip install --user certifi aiohttp miniupnpc
|
|
||||||
sudo -E python3 generate_bug_report.py
|
|
||||||
```
|
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
__version__ = "0.0.18"
|
__version__ = "0.0.17"
|
||||||
__author__ = "Jack Robison"
|
__author__ = "Jack Robison"
|
||||||
__maintainer__ = "Jack Robison"
|
__maintainer__ = "Jack Robison"
|
||||||
__license__ = "MIT"
|
__license__ = "MIT"
|
||||||
|
|
|
@ -121,4 +121,4 @@ def main(argv: typing.Optional[typing.List[typing.Optional[str]]] = None,
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.exit(main()) # pragma: no cover
|
sys.exit(main())
|
||||||
|
|
|
@ -5,7 +5,6 @@ import logging
|
||||||
from aioupnp.protocols.scpd import scpd_post
|
from aioupnp.protocols.scpd import scpd_post
|
||||||
from aioupnp.device import Service
|
from aioupnp.device import Service
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
from aioupnp.util import is_valid_public_ipv4
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -135,7 +134,7 @@ class SOAPCommands:
|
||||||
|
|
||||||
def is_registered(self, name: str) -> bool:
|
def is_registered(self, name: str) -> bool:
|
||||||
if name not in self.SOAP_COMMANDS:
|
if name not in self.SOAP_COMMANDS:
|
||||||
raise ValueError("unknown command") # pragma: no cover
|
raise ValueError("unknown command")
|
||||||
for service in self._registered.values():
|
for service in self._registered.values():
|
||||||
if name in service:
|
if name in service:
|
||||||
return True
|
return True
|
||||||
|
@ -143,11 +142,11 @@ class SOAPCommands:
|
||||||
|
|
||||||
def get_service(self, name: str) -> Service:
|
def get_service(self, name: str) -> Service:
|
||||||
if name not in self.SOAP_COMMANDS:
|
if name not in self.SOAP_COMMANDS:
|
||||||
raise ValueError("unknown command") # pragma: no cover
|
raise ValueError("unknown command")
|
||||||
for service, commands in self._registered.items():
|
for service, commands in self._registered.items():
|
||||||
if name in commands:
|
if name in commands:
|
||||||
return service
|
return service
|
||||||
raise ValueError(name) # pragma: no cover
|
raise ValueError(name)
|
||||||
|
|
||||||
def _register_soap_wrapper(self, name: str) -> None:
|
def _register_soap_wrapper(self, name: str) -> None:
|
||||||
annotations: typing.Dict[str, typing.Any] = typing.get_type_hints(getattr(self, name))
|
annotations: typing.Dict[str, typing.Any] = typing.get_type_hints(getattr(self, name))
|
||||||
|
@ -174,7 +173,7 @@ class SOAPCommands:
|
||||||
self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, result, None, time.time()))
|
self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, result, None, time.time()))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if isinstance(err, asyncio.CancelledError):
|
if isinstance(err, asyncio.CancelledError):
|
||||||
raise # pragma: no cover
|
raise
|
||||||
self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, None, err, time.time()))
|
self._request_debug_infos.append(SCPDRequestDebuggingInfo(name, kwargs, xml_bytes, None, err, time.time()))
|
||||||
raise UPnPError(f"Raised {str(type(err).__name__)}({str(err)}) parsing response for {name}")
|
raise UPnPError(f"Raised {str(type(err).__name__)}({str(err)}) parsing response for {name}")
|
||||||
return result
|
return result
|
||||||
|
@ -201,7 +200,7 @@ class SOAPCommands:
|
||||||
"""Returns None"""
|
"""Returns None"""
|
||||||
name = "AddPortMapping"
|
name = "AddPortMapping"
|
||||||
if not self.is_registered(name):
|
if not self.is_registered(name):
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError()
|
||||||
assert name in self._wrappers_kwargs
|
assert name in self._wrappers_kwargs
|
||||||
await self._wrappers_kwargs[name](
|
await self._wrappers_kwargs[name](
|
||||||
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol,
|
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol,
|
||||||
|
@ -217,7 +216,7 @@ class SOAPCommands:
|
||||||
"""
|
"""
|
||||||
name = "GetGenericPortMappingEntry"
|
name = "GetGenericPortMappingEntry"
|
||||||
if not self.is_registered(name):
|
if not self.is_registered(name):
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError()
|
||||||
assert name in self._wrappers_kwargs
|
assert name in self._wrappers_kwargs
|
||||||
result: GetGenericPortMappingEntryResponse = await self._wrappers_kwargs[name](
|
result: GetGenericPortMappingEntryResponse = await self._wrappers_kwargs[name](
|
||||||
NewPortMappingIndex=NewPortMappingIndex
|
NewPortMappingIndex=NewPortMappingIndex
|
||||||
|
@ -229,7 +228,7 @@ class SOAPCommands:
|
||||||
"""Returns (NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration)"""
|
"""Returns (NewInternalPort, NewInternalClient, NewEnabled, NewPortMappingDescription, NewLeaseDuration)"""
|
||||||
name = "GetSpecificPortMappingEntry"
|
name = "GetSpecificPortMappingEntry"
|
||||||
if not self.is_registered(name):
|
if not self.is_registered(name):
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError()
|
||||||
assert name in self._wrappers_kwargs
|
assert name in self._wrappers_kwargs
|
||||||
result: GetSpecificPortMappingEntryResponse = await self._wrappers_kwargs[name](
|
result: GetSpecificPortMappingEntryResponse = await self._wrappers_kwargs[name](
|
||||||
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
|
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
|
||||||
|
@ -240,7 +239,7 @@ class SOAPCommands:
|
||||||
"""Returns None"""
|
"""Returns None"""
|
||||||
name = "DeletePortMapping"
|
name = "DeletePortMapping"
|
||||||
if not self.is_registered(name):
|
if not self.is_registered(name):
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError()
|
||||||
assert name in self._wrappers_kwargs
|
assert name in self._wrappers_kwargs
|
||||||
await self._wrappers_kwargs[name](
|
await self._wrappers_kwargs[name](
|
||||||
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
|
NewRemoteHost=NewRemoteHost, NewExternalPort=NewExternalPort, NewProtocol=NewProtocol
|
||||||
|
@ -251,18 +250,18 @@ class SOAPCommands:
|
||||||
"""Returns (NewExternalIPAddress)"""
|
"""Returns (NewExternalIPAddress)"""
|
||||||
name = "GetExternalIPAddress"
|
name = "GetExternalIPAddress"
|
||||||
if not self.is_registered(name):
|
if not self.is_registered(name):
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError()
|
||||||
assert name in self._wrappers_no_args
|
assert name in self._wrappers_no_args
|
||||||
external_ip: str = await self._wrappers_no_args[name]()
|
result: str = await self._wrappers_no_args[name]()
|
||||||
if not is_valid_public_ipv4(external_ip):
|
if not result:
|
||||||
raise UPnPError(f"Got invalid external ipv4 address: {external_ip}")
|
raise UPnPError("Got null external ip address")
|
||||||
return external_ip
|
return result
|
||||||
|
|
||||||
# async def GetNATRSIPStatus(self) -> Tuple[bool, bool]:
|
# async def GetNATRSIPStatus(self) -> Tuple[bool, bool]:
|
||||||
# """Returns (NewRSIPAvailable, NewNATEnabled)"""
|
# """Returns (NewRSIPAvailable, NewNATEnabled)"""
|
||||||
# name = "GetNATRSIPStatus"
|
# name = "GetNATRSIPStatus"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[bool, bool] = await self._wrappers_no_args[name]()
|
# result: Tuple[bool, bool] = await self._wrappers_no_args[name]()
|
||||||
# return result[0], result[1]
|
# return result[0], result[1]
|
||||||
|
@ -271,7 +270,7 @@ class SOAPCommands:
|
||||||
# """Returns None"""
|
# """Returns None"""
|
||||||
# name = "SetConnectionType"
|
# name = "SetConnectionType"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_kwargs
|
# assert name in self._wrappers_kwargs
|
||||||
# await self._wrappers_kwargs[name](NewConnectionType=NewConnectionType)
|
# await self._wrappers_kwargs[name](NewConnectionType=NewConnectionType)
|
||||||
# return None
|
# return None
|
||||||
|
@ -280,7 +279,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewConnectionType, NewPossibleConnectionTypes)"""
|
# """Returns (NewConnectionType, NewPossibleConnectionTypes)"""
|
||||||
# name = "GetConnectionTypeInfo"
|
# name = "GetConnectionTypeInfo"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[str, str] = await self._wrappers_no_args[name]()
|
# result: Tuple[str, str] = await self._wrappers_no_args[name]()
|
||||||
# return result
|
# return result
|
||||||
|
@ -289,7 +288,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
|
# """Returns (NewConnectionStatus, NewLastConnectionError, NewUptime)"""
|
||||||
# name = "GetStatusInfo"
|
# name = "GetStatusInfo"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[str, str, int] = await self._wrappers_no_args[name]()
|
# result: Tuple[str, str, int] = await self._wrappers_no_args[name]()
|
||||||
# return result
|
# return result
|
||||||
|
@ -298,7 +297,7 @@ class SOAPCommands:
|
||||||
# """Returns None"""
|
# """Returns None"""
|
||||||
# name = "ForceTermination"
|
# name = "ForceTermination"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# await self._wrappers_no_args[name]()
|
# await self._wrappers_no_args[name]()
|
||||||
# return None
|
# return None
|
||||||
|
@ -307,7 +306,7 @@ class SOAPCommands:
|
||||||
# """Returns None"""
|
# """Returns None"""
|
||||||
# name = "RequestConnection"
|
# name = "RequestConnection"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# await self._wrappers_no_args[name]()
|
# await self._wrappers_no_args[name]()
|
||||||
# return None
|
# return None
|
||||||
|
@ -317,7 +316,7 @@ class SOAPCommands:
|
||||||
# NewPhysicalLinkStatus)"""
|
# NewPhysicalLinkStatus)"""
|
||||||
# name = "GetCommonLinkProperties"
|
# name = "GetCommonLinkProperties"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[str, int, int, str] = await self._wrappers_no_args[name]()
|
# result: Tuple[str, int, int, str] = await self._wrappers_no_args[name]()
|
||||||
# return result
|
# return result
|
||||||
|
@ -326,7 +325,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewTotalBytesSent)"""
|
# """Returns (NewTotalBytesSent)"""
|
||||||
# name = "GetTotalBytesSent"
|
# name = "GetTotalBytesSent"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
||||||
# return result[0]
|
# return result[0]
|
||||||
|
@ -335,7 +334,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewTotalBytesReceived)"""
|
# """Returns (NewTotalBytesReceived)"""
|
||||||
# name = "GetTotalBytesReceived"
|
# name = "GetTotalBytesReceived"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
||||||
# return result[0]
|
# return result[0]
|
||||||
|
@ -344,7 +343,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewTotalPacketsSent)"""
|
# """Returns (NewTotalPacketsSent)"""
|
||||||
# name = "GetTotalPacketsSent"
|
# name = "GetTotalPacketsSent"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
||||||
# return result[0]
|
# return result[0]
|
||||||
|
@ -353,7 +352,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewTotalPacketsReceived)"""
|
# """Returns (NewTotalPacketsReceived)"""
|
||||||
# name = "GetTotalPacketsReceived"
|
# name = "GetTotalPacketsReceived"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
# result: Tuple[int] = await self._wrappers_no_args[name]()
|
||||||
# return result[0]
|
# return result[0]
|
||||||
|
@ -363,7 +362,7 @@ class SOAPCommands:
|
||||||
# Layer1DownstreamMaxBitRate, Uptime)"""
|
# Layer1DownstreamMaxBitRate, Uptime)"""
|
||||||
# name = "X_GetICSStatistics"
|
# name = "X_GetICSStatistics"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[int, int, int, int, str, str] = await self._wrappers_no_args[name]()
|
# result: Tuple[int, int, int, int, str, str] = await self._wrappers_no_args[name]()
|
||||||
# return result
|
# return result
|
||||||
|
@ -372,7 +371,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewDefaultConnectionService)"""
|
# """Returns (NewDefaultConnectionService)"""
|
||||||
# name = "GetDefaultConnectionService"
|
# name = "GetDefaultConnectionService"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[str] = await self._wrappers_no_args[name]()
|
# result: Tuple[str] = await self._wrappers_no_args[name]()
|
||||||
# return result[0]
|
# return result[0]
|
||||||
|
@ -381,7 +380,7 @@ class SOAPCommands:
|
||||||
# """Returns (None)"""
|
# """Returns (None)"""
|
||||||
# name = "SetDefaultConnectionService"
|
# name = "SetDefaultConnectionService"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_kwargs
|
# assert name in self._wrappers_kwargs
|
||||||
# await self._wrappers_kwargs[name](NewDefaultConnectionService=NewDefaultConnectionService)
|
# await self._wrappers_kwargs[name](NewDefaultConnectionService=NewDefaultConnectionService)
|
||||||
# return None
|
# return None
|
||||||
|
@ -389,7 +388,7 @@ class SOAPCommands:
|
||||||
# async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None:
|
# async def SetEnabledForInternet(self, NewEnabledForInternet: bool) -> None:
|
||||||
# name = "SetEnabledForInternet"
|
# name = "SetEnabledForInternet"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_kwargs
|
# assert name in self._wrappers_kwargs
|
||||||
# await self._wrappers_kwargs[name](NewEnabledForInternet=NewEnabledForInternet)
|
# await self._wrappers_kwargs[name](NewEnabledForInternet=NewEnabledForInternet)
|
||||||
# return None
|
# return None
|
||||||
|
@ -397,7 +396,7 @@ class SOAPCommands:
|
||||||
# async def GetEnabledForInternet(self) -> bool:
|
# async def GetEnabledForInternet(self) -> bool:
|
||||||
# name = "GetEnabledForInternet"
|
# name = "GetEnabledForInternet"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[bool] = await self._wrappers_no_args[name]()
|
# result: Tuple[bool] = await self._wrappers_no_args[name]()
|
||||||
# return result[0]
|
# return result[0]
|
||||||
|
@ -405,7 +404,7 @@ class SOAPCommands:
|
||||||
# async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int) -> None:
|
# async def GetMaximumActiveConnections(self, NewActiveConnectionIndex: int) -> None:
|
||||||
# name = "GetMaximumActiveConnections"
|
# name = "GetMaximumActiveConnections"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_kwargs
|
# assert name in self._wrappers_kwargs
|
||||||
# await self._wrappers_kwargs[name](NewActiveConnectionIndex=NewActiveConnectionIndex)
|
# await self._wrappers_kwargs[name](NewActiveConnectionIndex=NewActiveConnectionIndex)
|
||||||
# return None
|
# return None
|
||||||
|
@ -414,7 +413,7 @@ class SOAPCommands:
|
||||||
# """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
|
# """Returns (NewActiveConnDeviceContainer, NewActiveConnectionServiceID"""
|
||||||
# name = "GetActiveConnections"
|
# name = "GetActiveConnections"
|
||||||
# if not self.is_registered(name):
|
# if not self.is_registered(name):
|
||||||
# raise NotImplementedError() # pragma: no cover
|
# raise NotImplementedError()
|
||||||
# assert name in self._wrappers_no_args
|
# assert name in self._wrappers_no_args
|
||||||
# result: Tuple[str, str] = await self._wrappers_no_args[name]()
|
# result: Tuple[str, str] = await self._wrappers_no_args[name]()
|
||||||
# return result
|
# return result
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
|
import socket
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
import typing
|
import typing
|
||||||
import netifaces
|
import netifaces
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
|
|
||||||
|
|
||||||
def get_netifaces(): # pragma: no cover
|
def get_netifaces():
|
||||||
return netifaces
|
return netifaces
|
||||||
|
|
||||||
|
|
||||||
|
@ -24,11 +25,11 @@ def _get_gateways() -> typing.Dict[typing.Union[str, int],
|
||||||
|
|
||||||
def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
|
def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
|
||||||
gateways = _get_gateways()
|
gateways = _get_gateways()
|
||||||
infos = gateways[netifaces.AF_INET]
|
infos = gateways[socket.AF_INET]
|
||||||
assert isinstance(infos, list), TypeError(f"expected list from netifaces, got a dict")
|
assert isinstance(infos, list), TypeError(f"expected list from netifaces, got a dict")
|
||||||
interface_infos: typing.List[typing.Tuple[str, str, bool]] = infos
|
interface_infos: typing.List[typing.Tuple[str, str, bool]] = infos
|
||||||
result: typing.Dict[str, typing.Tuple[str, str]] = OrderedDict(
|
result: typing.Dict[str, typing.Tuple[str, str]] = OrderedDict(
|
||||||
(interface_name, (router_address, ifaddresses(interface_name)[netifaces.AF_INET][0]['addr']))
|
(interface_name, (router_address, ifaddresses(interface_name)[socket.AF_INET][0]['addr']))
|
||||||
for router_address, interface_name, _ in interface_infos
|
for router_address, interface_name, _ in interface_infos
|
||||||
)
|
)
|
||||||
for interface_name in _get_interfaces():
|
for interface_name in _get_interfaces():
|
||||||
|
@ -42,7 +43,7 @@ def get_interfaces() -> typing.Dict[str, typing.Tuple[str, str]]:
|
||||||
_default = gateways['default']
|
_default = gateways['default']
|
||||||
assert isinstance(_default, dict), TypeError(f"expected dict from netifaces, got a list")
|
assert isinstance(_default, dict), TypeError(f"expected dict from netifaces, got a list")
|
||||||
default: typing.Dict[int, typing.Tuple[str, str]] = _default
|
default: typing.Dict[int, typing.Tuple[str, str]] = _default
|
||||||
result['default'] = result[default[netifaces.AF_INET][1]]
|
result['default'] = result[default[socket.AF_INET][1]]
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import sys
|
|
||||||
import struct
|
import struct
|
||||||
import socket
|
import socket
|
||||||
import typing
|
import typing
|
||||||
|
@ -7,17 +6,11 @@ from asyncio.transports import DatagramTransport
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
|
|
||||||
SOCKET_TYPES = (socket.SocketType, mock.MagicMock)
|
|
||||||
if sys.version_info >= (3, 8): # pragma: no cover
|
|
||||||
from asyncio.trsock import TransportSocket
|
|
||||||
SOCKET_TYPES = (socket.SocketType, TransportSocket, mock.MagicMock)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_sock(transport: typing.Optional[DatagramTransport]) -> typing.Optional[socket.socket]:
|
def _get_sock(transport: typing.Optional[DatagramTransport]) -> typing.Optional[socket.socket]:
|
||||||
if transport is None or not hasattr(transport, "_extra"):
|
if transport is None or not hasattr(transport, "_extra"):
|
||||||
return None
|
return None
|
||||||
sock: typing.Optional[socket.socket] = transport.get_extra_info('socket', None)
|
sock: typing.Optional[socket.socket] = transport.get_extra_info('socket', None)
|
||||||
assert sock is None or isinstance(sock, SOCKET_TYPES)
|
assert sock is None or isinstance(sock, (socket.SocketType, mock.MagicMock))
|
||||||
return sock
|
return sock
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,6 @@ class SCPDHTTPClientProtocol(Protocol):
|
||||||
self._response_msg = b""
|
self._response_msg = b""
|
||||||
self._content_length = 0
|
self._content_length = 0
|
||||||
self._got_headers = False
|
self._got_headers = False
|
||||||
self._has_content_length = True
|
|
||||||
self._headers: typing.Dict[bytes, bytes] = {}
|
self._headers: typing.Dict[bytes, bytes] = {}
|
||||||
self._body = b""
|
self._body = b""
|
||||||
self.transport: typing.Optional[asyncio.WriteTransport] = None
|
self.transport: typing.Optional[asyncio.WriteTransport] = None
|
||||||
|
@ -68,57 +67,35 @@ class SCPDHTTPClientProtocol(Protocol):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def data_received(self, data: bytes) -> None:
|
def data_received(self, data: bytes) -> None:
|
||||||
if self.finished.done(): # possible to hit during tests
|
|
||||||
return
|
|
||||||
self.response_buff += data
|
self.response_buff += data
|
||||||
for i, line in enumerate(self.response_buff.split(b'\r\n')):
|
for i, line in enumerate(self.response_buff.split(b'\r\n')):
|
||||||
if not line: # we hit the blank line between the headers and the body
|
if not line: # we hit the blank line between the headers and the body
|
||||||
if i == (len(self.response_buff.split(b'\r\n')) - 1):
|
if i == (len(self.response_buff.split(b'\r\n')) - 1):
|
||||||
return None # the body is still yet to be written
|
return None # the body is still yet to be written
|
||||||
if not self._got_headers:
|
if not self._got_headers:
|
||||||
try:
|
self._headers, self._response_code, self._response_msg = parse_headers(
|
||||||
self._headers, self._response_code, self._response_msg = parse_headers(
|
b'\r\n'.join(self.response_buff.split(b'\r\n')[:i])
|
||||||
b'\r\n'.join(self.response_buff.split(b'\r\n')[:i])
|
)
|
||||||
)
|
|
||||||
except ValueError as err:
|
|
||||||
self.finished.set_exception(UPnPError(str(err)))
|
|
||||||
return
|
|
||||||
content_length = get_dict_val_case_insensitive(
|
content_length = get_dict_val_case_insensitive(
|
||||||
self._headers, b'Content-Length'
|
self._headers, b'Content-Length'
|
||||||
)
|
)
|
||||||
if content_length is not None:
|
if content_length is None:
|
||||||
self._content_length = int(content_length)
|
return None
|
||||||
else:
|
self._content_length = int(content_length or 0)
|
||||||
self._has_content_length = False
|
|
||||||
self._got_headers = True
|
self._got_headers = True
|
||||||
if self._got_headers and self._has_content_length:
|
body = b'\r\n'.join(self.response_buff.split(b'\r\n')[i+1:])
|
||||||
body = b'\r\n'.join(self.response_buff.split(b'\r\n')[i+1:])
|
if self._content_length == len(body):
|
||||||
if self._content_length == len(body):
|
|
||||||
self.finished.set_result((self.response_buff, body, self._response_code, self._response_msg))
|
|
||||||
elif self._content_length > len(body):
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
self.finished.set_exception(
|
|
||||||
UPnPError(
|
|
||||||
"too many bytes written to response (%i vs %i expected)" % (
|
|
||||||
len(body), self._content_length
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
elif any(map(self.response_buff.endswith, (b"</root>\r\n", b"</scpd>\r\n"))):
|
|
||||||
# Actiontec has a router that doesn't give a Content-Length for the gateway xml
|
|
||||||
body = b'\r\n'.join(self.response_buff.split(b'\r\n')[i+1:])
|
|
||||||
self.finished.set_result((self.response_buff, body, self._response_code, self._response_msg))
|
self.finished.set_result((self.response_buff, body, self._response_code, self._response_msg))
|
||||||
elif len(self.response_buff) >= 65535:
|
elif self._content_length > len(body):
|
||||||
|
pass
|
||||||
|
else:
|
||||||
self.finished.set_exception(
|
self.finished.set_exception(
|
||||||
UPnPError(
|
UPnPError(
|
||||||
"too many bytes written to response (%i) with unspecified content length" % len(self.response_buff)
|
"too many bytes written to response (%i vs %i expected)" % (
|
||||||
|
len(body), self._content_length
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return
|
)
|
||||||
else:
|
|
||||||
# needed for the actiontec case
|
|
||||||
pass
|
|
||||||
return None
|
return None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
|
@ -171,8 +171,8 @@ async def multi_m_search(lan_address: str, gateway_address: str, timeout: int =
|
||||||
protocol, gateway_address, lan_address = await listen_ssdp(
|
protocol, gateway_address, lan_address = await listen_ssdp(
|
||||||
lan_address, gateway_address, loop
|
lan_address, gateway_address, loop
|
||||||
)
|
)
|
||||||
fut = protocol.send_m_searches(
|
fut = asyncio.ensure_future(protocol.send_m_searches(
|
||||||
address=gateway_address, datagrams=list(packet_generator())
|
address=gateway_address, datagrams=list(packet_generator())
|
||||||
)
|
), loop=loop)
|
||||||
loop.call_later(timeout, lambda: None if not fut or fut.done() else fut.cancel())
|
loop.call_later(timeout, lambda: None if not fut or fut.done() else fut.cancel())
|
||||||
return protocol
|
return protocol
|
||||||
|
|
|
@ -39,7 +39,7 @@ def serialize_scpd_get(path: str, address: str) -> bytes:
|
||||||
|
|
||||||
def deserialize_scpd_get_response(content: bytes) -> Dict[str, Any]:
|
def deserialize_scpd_get_response(content: bytes) -> Dict[str, Any]:
|
||||||
if XML_VERSION_PREFIX.encode() in content:
|
if XML_VERSION_PREFIX.encode() in content:
|
||||||
parsed: List[Tuple[str, str]] = CONTENT_PATTERN.findall(content.decode())
|
parsed: List[Tuple[bytes, bytes]] = CONTENT_PATTERN.findall(content.decode())
|
||||||
xml_dict = xml_to_dict('' if not parsed else parsed[0][0])
|
xml_dict = xml_to_dict('' if not parsed else parsed[0][0])
|
||||||
return parse_device_dict(xml_dict)
|
return parse_device_dict(xml_dict)
|
||||||
return {}
|
return {}
|
||||||
|
|
|
@ -13,7 +13,7 @@ CONTENT_NO_XML_VERSION_PATTERN = re.compile(
|
||||||
|
|
||||||
def serialize_soap_post(method: str, param_names: typing.List[str], service_id: bytes, gateway_address: bytes,
|
def serialize_soap_post(method: str, param_names: typing.List[str], service_id: bytes, gateway_address: bytes,
|
||||||
control_url: bytes, **kwargs: typing.Dict[str, str]) -> bytes:
|
control_url: bytes, **kwargs: typing.Dict[str, str]) -> bytes:
|
||||||
args = "".join(f"<{param_name}>{kwargs.get(param_name, '')}</{param_name}>" for param_name in param_names)
|
args = "".join(f"<{n}>{kwargs.get(n)}</{n}>" for n in param_names)
|
||||||
soap_body = (f'\r\n{XML_VERSION}\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
|
soap_body = (f'\r\n{XML_VERSION}\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" '
|
||||||
f's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body>'
|
f's:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body>'
|
||||||
f'<u:{method} xmlns:u="{service_id.decode()}">{args}</u:{method}></s:Body></s:Envelope>')
|
f'<u:{method} xmlns:u="{service_id.decode()}">{args}</u:{method}></s:Body></s:Envelope>')
|
||||||
|
|
|
@ -3,13 +3,23 @@ import logging
|
||||||
import binascii
|
import binascii
|
||||||
import json
|
import json
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from typing import List, Optional, Dict, Union, Callable
|
from typing import List, Optional, Dict, Union, Tuple, Callable
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
from aioupnp.constants import line_separator
|
from aioupnp.constants import line_separator
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
_template = "(?i)^(%s):[ ]*(.*)$"
|
_template = "(?i)^(%s):[ ]*(.*)$"
|
||||||
|
vendor_pattern = re.compile("^([\w|\d]*)\.([\w|\d]*\.com):([ \"|\w|\d\:]*)$")
|
||||||
|
|
||||||
|
|
||||||
|
def match_vendor(line: str) -> Optional[Tuple[str, str]]:
|
||||||
|
match: List[Tuple[str, str, str]] = vendor_pattern.findall(line)
|
||||||
|
if match:
|
||||||
|
vendor_key: str = match[-1][0].lstrip(" ").rstrip(" ")
|
||||||
|
vendor_value: str = match[-1][2].lstrip(" ").rstrip(" ")
|
||||||
|
return vendor_key, vendor_value
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def compile_find(pattern: str) -> Callable[[str], Optional[str]]:
|
def compile_find(pattern: str) -> Callable[[str], Optional[str]]:
|
||||||
|
@ -59,6 +69,8 @@ class SSDPDatagram:
|
||||||
_OK: "m-search response"
|
_OK: "m-search response"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_vendor_field_pattern = vendor_pattern
|
||||||
|
|
||||||
_required_fields: Dict[str, List[str]] = {
|
_required_fields: Dict[str, List[str]] = {
|
||||||
_M_SEARCH: [
|
_M_SEARCH: [
|
||||||
'host',
|
'host',
|
||||||
|
@ -118,7 +130,10 @@ class SSDPDatagram:
|
||||||
def get_cli_igd_kwargs(self) -> str:
|
def get_cli_igd_kwargs(self) -> str:
|
||||||
fields = []
|
fields = []
|
||||||
for field in self._field_order:
|
for field in self._field_order:
|
||||||
fields.append("--%s=%s" % (self._case_mappings.get(field, field), getattr(self, field, None)))
|
v = getattr(self, field, None)
|
||||||
|
if v is None:
|
||||||
|
raise UPnPError("missing required field %s" % field)
|
||||||
|
fields.append("--%s=%s" % (self._case_mappings.get(field, field), v))
|
||||||
return " ".join(fields)
|
return " ".join(fields)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
|
@ -159,6 +174,11 @@ class SSDPDatagram:
|
||||||
raise UPnPError(
|
raise UPnPError(
|
||||||
f"failed to decode datagram: {binascii.hexlify(datagram).decode()}"
|
f"failed to decode datagram: {binascii.hexlify(datagram).decode()}"
|
||||||
)
|
)
|
||||||
|
for attr_name in packet._required_fields[packet._packet_type]:
|
||||||
|
if getattr(packet, attr_name, None) is None:
|
||||||
|
raise UPnPError(
|
||||||
|
"required field for {} is missing from m-search response: {}".format(packet._packet_type, attr_name)
|
||||||
|
)
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -186,6 +206,10 @@ class SSDPDatagram:
|
||||||
matched = True
|
matched = True
|
||||||
matched_keys.append(name)
|
matched_keys.append(name)
|
||||||
break
|
break
|
||||||
|
if not matched:
|
||||||
|
matched_vendor = match_vendor(line)
|
||||||
|
if matched_vendor and matched_vendor[0] not in result:
|
||||||
|
result[matched_vendor[0]] = matched_vendor[1]
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
|
|
@ -37,7 +37,7 @@ class UPnP:
|
||||||
return cls.delete_port_mapping.__annotations__, cls.delete_port_mapping.__doc__
|
return cls.delete_port_mapping.__annotations__, cls.delete_port_mapping.__doc__
|
||||||
if command == "get_next_mapping":
|
if command == "get_next_mapping":
|
||||||
return cls.get_next_mapping.__annotations__, cls.get_next_mapping.__doc__
|
return cls.get_next_mapping.__annotations__, cls.get_next_mapping.__doc__
|
||||||
raise AttributeError(command) # pragma: no cover
|
raise AttributeError(command)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_lan_and_gateway(lan_address: str = '', gateway_address: str = '',
|
def get_lan_and_gateway(lan_address: str = '', gateway_address: str = '',
|
||||||
|
@ -107,7 +107,7 @@ class UPnP:
|
||||||
return await self.gateway.commands.GetExternalIPAddress()
|
return await self.gateway.commands.GetExternalIPAddress()
|
||||||
|
|
||||||
async def add_port_mapping(self, external_port: int, protocol: str, internal_port: int, lan_address: str,
|
async def add_port_mapping(self, external_port: int, protocol: str, internal_port: int, lan_address: str,
|
||||||
description: str, lease_time: int = 0) -> None:
|
description: str) -> None:
|
||||||
"""
|
"""
|
||||||
Add a new port mapping
|
Add a new port mapping
|
||||||
|
|
||||||
|
@ -116,13 +116,12 @@ class UPnP:
|
||||||
:param internal_port: (int) internal port
|
:param internal_port: (int) internal port
|
||||||
:param lan_address: (str) internal lan address
|
:param lan_address: (str) internal lan address
|
||||||
:param description: (str) mapping description
|
:param description: (str) mapping description
|
||||||
:param lease_time: (int) lease time in seconds
|
|
||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
await self.gateway.commands.AddPortMapping(
|
await self.gateway.commands.AddPortMapping(
|
||||||
NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol,
|
NewRemoteHost='', NewExternalPort=external_port, NewProtocol=protocol,
|
||||||
NewInternalPort=internal_port, NewInternalClient=lan_address,
|
NewInternalPort=internal_port, NewInternalClient=lan_address,
|
||||||
NewEnabled=1, NewPortMappingDescription=description, NewLeaseDuration=str(lease_time)
|
NewEnabled=1, NewPortMappingDescription=description, NewLeaseDuration='0'
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -209,7 +208,7 @@ class UPnP:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def get_next_mapping(self, port: int, protocol: str, description: str,
|
async def get_next_mapping(self, port: int, protocol: str, description: str,
|
||||||
internal_port: Optional[int] = None, lease_time: int = 0) -> int:
|
internal_port: Optional[int] = None) -> int:
|
||||||
"""
|
"""
|
||||||
Get a new port mapping. If the requested port is not available, increment until the next free port is mapped
|
Get a new port mapping. If the requested port is not available, increment until the next free port is mapped
|
||||||
|
|
||||||
|
@ -217,7 +216,6 @@ class UPnP:
|
||||||
:param protocol: (str) UDP | TCP
|
:param protocol: (str) UDP | TCP
|
||||||
:param description: (str) mapping description
|
:param description: (str) mapping description
|
||||||
:param internal_port: (int) internal port
|
:param internal_port: (int) internal port
|
||||||
:param lease_time: (int) lease time in seconds
|
|
||||||
|
|
||||||
:return: (int) mapped port
|
:return: (int) mapped port
|
||||||
"""
|
"""
|
||||||
|
@ -237,7 +235,7 @@ class UPnP:
|
||||||
if int_host == self.lan_address and int_port == requested_port and desc == description:
|
if int_host == self.lan_address and int_port == requested_port and desc == description:
|
||||||
return port
|
return port
|
||||||
port += 1
|
port += 1
|
||||||
await self.add_port_mapping(port, protocol, _internal_port, self.lan_address, description, lease_time)
|
await self.add_port_mapping(port, protocol, _internal_port, self.lan_address, description)
|
||||||
return port
|
return port
|
||||||
|
|
||||||
async def gather_debug_info(self) -> str: # pragma: no cover
|
async def gather_debug_info(self) -> str: # pragma: no cover
|
||||||
|
@ -262,18 +260,15 @@ class UPnP:
|
||||||
await self.get_redirects()
|
await self.get_redirects()
|
||||||
except UPnPError:
|
except UPnPError:
|
||||||
pass
|
pass
|
||||||
external_port = 0
|
|
||||||
made_mapping = False
|
|
||||||
try:
|
try:
|
||||||
external_port = await self.get_next_mapping(1234, 'TCP', 'aioupnp testing')
|
external_port = await self.get_next_mapping(1234, 'TCP', 'aioupnp testing')
|
||||||
made_mapping = True
|
|
||||||
except UPnPError:
|
except UPnPError:
|
||||||
pass
|
external_port = None
|
||||||
try:
|
try:
|
||||||
await self.get_redirects()
|
await self.get_redirects()
|
||||||
except UPnPError:
|
except UPnPError:
|
||||||
pass
|
pass
|
||||||
if made_mapping:
|
if external_port:
|
||||||
try:
|
try:
|
||||||
await self.delete_port_mapping(external_port, 'TCP')
|
await self.delete_port_mapping(external_port, 'TCP')
|
||||||
except UPnPError:
|
except UPnPError:
|
||||||
|
@ -412,12 +407,12 @@ def run_cli(method: str, igd_args: Dict[str, Union[bool, str, int]], lan_address
|
||||||
u = await UPnP.discover(
|
u = await UPnP.discover(
|
||||||
lan_address, gateway_address, timeout, igd_args, interface_name, loop=loop
|
lan_address, gateway_address, timeout, igd_args, interface_name, loop=loop
|
||||||
)
|
)
|
||||||
except UPnPError as err: # pragma: no cover
|
except UPnPError as err:
|
||||||
fut.set_exception(err)
|
fut.set_exception(err)
|
||||||
return
|
return
|
||||||
if method not in cli_commands:
|
if method not in cli_commands:
|
||||||
fut.set_exception(UPnPError("\"%s\" is not a recognized command" % method)) # pragma: no cover
|
fut.set_exception(UPnPError("\"%s\" is not a recognized command" % method))
|
||||||
return # pragma: no cover
|
return
|
||||||
else:
|
else:
|
||||||
fn = getattr(u, method)
|
fn = getattr(u, method)
|
||||||
|
|
||||||
|
@ -426,7 +421,8 @@ def run_cli(method: str, igd_args: Dict[str, Union[bool, str, int]], lan_address
|
||||||
fut.set_result(result)
|
fut.set_result(result)
|
||||||
except UPnPError as err:
|
except UPnPError as err:
|
||||||
fut.set_exception(err)
|
fut.set_exception(err)
|
||||||
except Exception as err: # pragma: no cover
|
|
||||||
|
except Exception as err:
|
||||||
log.exception("uncaught error")
|
log.exception("uncaught error")
|
||||||
fut.set_exception(UPnPError("uncaught error: %s" % str(err)))
|
fut.set_exception(UPnPError("uncaught error: %s" % str(err)))
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
import sys
|
|
||||||
import ipaddress
|
|
||||||
import typing
|
import typing
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
@ -49,21 +47,3 @@ def get_dict_val_case_insensitive(source: typing.Dict[typing.AnyStr, typing.AnyS
|
||||||
matched_key: typing.AnyStr = match[0]
|
matched_key: typing.AnyStr = match[0]
|
||||||
return source[matched_key]
|
return source[matched_key]
|
||||||
raise KeyError("overlapping keys")
|
raise KeyError("overlapping keys")
|
||||||
|
|
||||||
|
|
||||||
# the ipaddress module does not show these subnets as reserved
|
|
||||||
CARRIER_GRADE_NAT_SUBNET = ipaddress.ip_network('100.64.0.0/10')
|
|
||||||
IPV4_TO_6_RELAY_SUBNET = ipaddress.ip_network('192.88.99.0/24')
|
|
||||||
|
|
||||||
|
|
||||||
def is_valid_public_ipv4(address):
|
|
||||||
try:
|
|
||||||
parsed_ip = ipaddress.ip_address(address)
|
|
||||||
if any((parsed_ip.version != 4, parsed_ip.is_unspecified, parsed_ip.is_link_local, parsed_ip.is_loopback,
|
|
||||||
parsed_ip.is_multicast, parsed_ip.is_reserved, parsed_ip.is_private, parsed_ip.is_reserved)):
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
return not any((CARRIER_GRADE_NAT_SUBNET.overlaps(ipaddress.ip_network(f"{address}/32")),
|
|
||||||
IPV4_TO_6_RELAY_SUBNET.overlaps(ipaddress.ip_network(f"{address}/32"))))
|
|
||||||
except (ipaddress.AddressValueError, ValueError):
|
|
||||||
return False
|
|
||||||
|
|
|
@ -1,511 +0,0 @@
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import socket
|
|
||||||
import ctypes
|
|
||||||
import contextlib
|
|
||||||
import struct
|
|
||||||
import binascii
|
|
||||||
import enum
|
|
||||||
import re
|
|
||||||
import time
|
|
||||||
import base64
|
|
||||||
import ssl
|
|
||||||
import asyncio
|
|
||||||
import codecs
|
|
||||||
import typing
|
|
||||||
import json
|
|
||||||
from ctypes import c_char, c_short
|
|
||||||
from typing import Tuple
|
|
||||||
import aioupnp
|
|
||||||
from aioupnp.upnp import UPnP, UPnPError, get_gateway_and_lan_addresses
|
|
||||||
from aioupnp.constants import SSDP_IP_ADDRESS
|
|
||||||
import certifi
|
|
||||||
import aiohttp
|
|
||||||
import miniupnpc
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
_IFF_PROMISC = 0x0100
|
|
||||||
_SIOCGIFFLAGS = 0x8913 # get the active flags
|
|
||||||
_SIOCSIFFLAGS = 0x8914 # set the active flags
|
|
||||||
_ETH_P_ALL = 0x0003 # all protocols
|
|
||||||
ETHER_HEADER_LEN = 6 + 6 + 2
|
|
||||||
VLAN_HEADER_LEN = 2
|
|
||||||
|
|
||||||
printable = re.compile(b"([a-z0-9!\"#$%&'()*+,.\/:;<=>?@\[\] ^_`{|}~-]*)")
|
|
||||||
|
|
||||||
|
|
||||||
class PacketTypes(enum.Enum): # if_packet.h
|
|
||||||
HOST = 0
|
|
||||||
BROADCAST = 1
|
|
||||||
MULTICAST = 2
|
|
||||||
OTHERHOST = 3
|
|
||||||
OUTGOING = 4
|
|
||||||
LOOPBACK = 5
|
|
||||||
FASTROUTE = 6
|
|
||||||
|
|
||||||
|
|
||||||
class Layer2(enum.Enum): # https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
|
|
||||||
IPv4 = 0x0800
|
|
||||||
ARP = 0x0806
|
|
||||||
VLAN = 0x8100
|
|
||||||
MVRP = 0x88f5
|
|
||||||
MMRP = 0x88f6
|
|
||||||
IPv6 = 0x86dd
|
|
||||||
GRE = 0xb7ea
|
|
||||||
|
|
||||||
|
|
||||||
class Layer3(enum.Enum): # https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
|
|
||||||
ICMP = 1
|
|
||||||
IGMP = 2
|
|
||||||
TCP = 6
|
|
||||||
UDP = 17
|
|
||||||
|
|
||||||
|
|
||||||
class _ifreq(ctypes.Structure):
|
|
||||||
_fields_ = [("ifr_ifrn", c_char * 16),
|
|
||||||
("ifr_flags", c_short)]
|
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def _promiscuous_posix_socket_context(interface: str):
|
|
||||||
import fcntl # posix-only
|
|
||||||
sock = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(_ETH_P_ALL))
|
|
||||||
ifr = _ifreq()
|
|
||||||
ifr.ifr_ifrn = interface.encode()[:16]
|
|
||||||
fcntl.ioctl(sock, _SIOCGIFFLAGS, ifr) # get the flags
|
|
||||||
ifr.ifr_flags |= _IFF_PROMISC # add the promiscuous flag
|
|
||||||
fcntl.ioctl(sock, _SIOCSIFFLAGS, ifr) # update
|
|
||||||
sock.setblocking(False)
|
|
||||||
try:
|
|
||||||
yield sock
|
|
||||||
finally:
|
|
||||||
ifr.ifr_flags ^= _IFF_PROMISC # mask it off (remove)
|
|
||||||
fcntl.ioctl(sock, _SIOCSIFFLAGS, ifr) # update
|
|
||||||
print("closed posix promiscuous socket")
|
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def _promiscuous_non_posix_socket_context():
|
|
||||||
# the public network interface
|
|
||||||
HOST = socket.gethostbyname(socket.gethostname())
|
|
||||||
# create a raw socket and bind it to the public interface
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
|
|
||||||
# prevent socket from being left in TIME_WAIT state, enabling reuse
|
|
||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
||||||
sock.bind((HOST, 0))
|
|
||||||
# Include IP headers
|
|
||||||
sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
|
|
||||||
# receive all packages
|
|
||||||
sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
|
|
||||||
sock.setblocking(False)
|
|
||||||
try:
|
|
||||||
yield sock
|
|
||||||
finally:
|
|
||||||
# disable promiscuous mode
|
|
||||||
sock.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
|
|
||||||
print("closed non-posix promiscuous socket")
|
|
||||||
|
|
||||||
|
|
||||||
def promiscuous(interface: typing.Optional[str] = None) -> typing.ContextManager[socket.socket]:
|
|
||||||
if os.name == 'posix':
|
|
||||||
return _promiscuous_posix_socket_context(interface)
|
|
||||||
return _promiscuous_non_posix_socket_context()
|
|
||||||
|
|
||||||
|
|
||||||
def ipv4_to_str(addr: bytes) -> str:
|
|
||||||
return ".".join((str(b) for b in addr))
|
|
||||||
|
|
||||||
|
|
||||||
def pretty_mac(mac: bytes) -> str:
|
|
||||||
return ":".join((('0' if b < 16 else '') + hex(b)[2:] for b in mac))
|
|
||||||
|
|
||||||
|
|
||||||
def split_byte(b: int, bit=4) -> Tuple[bytes, bytes]:
|
|
||||||
return chr(((b >> (8-bit)) % 256) << (8-bit) >> (8-bit)).encode(), chr(((b << bit) % 256) >> bit).encode()
|
|
||||||
|
|
||||||
|
|
||||||
class EtherFrame:
|
|
||||||
__slots__ = [
|
|
||||||
'source_mac',
|
|
||||||
'target_mac',
|
|
||||||
'ether_type',
|
|
||||||
'vlan_id',
|
|
||||||
'tpid'
|
|
||||||
]
|
|
||||||
|
|
||||||
def __init__(self, source_mac: bytes, target_mac: bytes, ether_type: int,
|
|
||||||
vlan_id: typing.Optional[int] = None, tpid: typing.Optional[int] = None):
|
|
||||||
self.source_mac = source_mac
|
|
||||||
self.target_mac = target_mac
|
|
||||||
self.ether_type = ether_type
|
|
||||||
self.vlan_id = vlan_id
|
|
||||||
self.tpid = tpid
|
|
||||||
|
|
||||||
def encode(self) -> bytes:
|
|
||||||
if self.vlan_id is None:
|
|
||||||
return struct.pack("6s6sH", *(getattr(self, slot) for slot in self.__slots__[:-2]))
|
|
||||||
return struct.pack("6s6sHHH", *(getattr(self, slot) for slot in self.__slots__))
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def decode(cls, packet: bytes) -> Tuple['EtherFrame', bytes]:
|
|
||||||
vlan_id = None
|
|
||||||
tpid = None
|
|
||||||
if struct.unpack(f'!H', packet[12:14])[0] == Layer2.VLAN.value:
|
|
||||||
target_mac, source_mac, tpid, vlan_id, ether_type, data = struct.unpack(f'!6s6sHHH{len(packet) - ETHER_HEADER_LEN - VLAN_HEADER_LEN}s', packet)
|
|
||||||
else:
|
|
||||||
target_mac, source_mac, ether_type, data = struct.unpack(f'!6s6sH{len(packet) - ETHER_HEADER_LEN}s', packet)
|
|
||||||
return cls(source_mac, target_mac, ether_type, vlan_id, tpid), data
|
|
||||||
|
|
||||||
def debug(self) -> str:
|
|
||||||
if self.vlan_id is None:
|
|
||||||
return f"EtherFrame(source={pretty_mac(self.source_mac)}, target={pretty_mac(self.target_mac)}, " \
|
|
||||||
f"ether_type={Layer2(self.ether_type).name})"
|
|
||||||
return f"EtherFrame(source={pretty_mac(self.source_mac)}, target={pretty_mac(self.target_mac)}, " \
|
|
||||||
f"ether_type={Layer2(self.ether_type).name}, vlan={self.vlan_id})"
|
|
||||||
|
|
||||||
|
|
||||||
class IPv4Packet:
|
|
||||||
__slots__ = [
|
|
||||||
'ether_frame',
|
|
||||||
'version',
|
|
||||||
'header_length',
|
|
||||||
'dscp',
|
|
||||||
'ecn',
|
|
||||||
'total_length',
|
|
||||||
'identification',
|
|
||||||
'df',
|
|
||||||
'mf',
|
|
||||||
'flag',
|
|
||||||
'fragment_offset',
|
|
||||||
'ttl',
|
|
||||||
'protocol',
|
|
||||||
'header_checksum',
|
|
||||||
'_source_address',
|
|
||||||
'_destination_address',
|
|
||||||
'data',
|
|
||||||
'packet_type',
|
|
||||||
'interface'
|
|
||||||
]
|
|
||||||
|
|
||||||
ETHER_TYPE = Layer2.IPv4
|
|
||||||
|
|
||||||
def __init__(self, ether_frame: EtherFrame, version: int, header_length: int, dscp: int, ecn: int,
|
|
||||||
total_length: int, identification: int, mf: bool, df: bool, flag: bool, fragment_offset: int, ttl: int,
|
|
||||||
protocol: int, header_checksum: int, source_address: bytes, destination_address: bytes, data: bytes,
|
|
||||||
packet_type: int, interface: str):
|
|
||||||
self.ether_frame = ether_frame
|
|
||||||
self.version = version
|
|
||||||
self.header_length = header_length
|
|
||||||
self.dscp = dscp
|
|
||||||
self.ecn = ecn
|
|
||||||
self.total_length = total_length
|
|
||||||
self.identification = identification
|
|
||||||
self.mf = mf
|
|
||||||
self.df = df
|
|
||||||
self.flag = flag
|
|
||||||
self.fragment_offset = fragment_offset
|
|
||||||
self.ttl = ttl
|
|
||||||
self.protocol = Layer3(protocol)
|
|
||||||
self.header_checksum = header_checksum
|
|
||||||
self._source_address = source_address
|
|
||||||
self._destination_address = destination_address
|
|
||||||
self.data = data
|
|
||||||
self.packet_type = PacketTypes(packet_type)
|
|
||||||
self.interface = interface
|
|
||||||
|
|
||||||
@property
|
|
||||||
def source(self) -> str:
|
|
||||||
return ipv4_to_str(self._source_address)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def destination(self) -> str:
|
|
||||||
return ipv4_to_str(self._destination_address)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def checksum(header: bytes) -> int:
|
|
||||||
c = 0
|
|
||||||
for i in range(0, len(header), 2):
|
|
||||||
c += int.from_bytes(header[i:i + 2], 'big')
|
|
||||||
while c > 0xffff:
|
|
||||||
c %= 0xffff
|
|
||||||
while c > 0xffff:
|
|
||||||
c %= 0xffff
|
|
||||||
return c ^ 0xffff
|
|
||||||
|
|
||||||
def get_header(self) -> bytes:
|
|
||||||
version_and_hlen = (self.version << 4) + self.header_length
|
|
||||||
dscp_and_ecn = (self.dscp << 2) + self.ecn
|
|
||||||
flags = (4 if self.flag else 0) + (2 if self.df else 0) + (1 if self.mf else 0)
|
|
||||||
df_mf_and_fragment = (flags << 12) + self.fragment_offset
|
|
||||||
return struct.pack(
|
|
||||||
'!BBHHHBBH4s4s', version_and_hlen, dscp_and_ecn, self.total_length,
|
|
||||||
self.identification, df_mf_and_fragment, self.ttl, self.protocol.value,
|
|
||||||
self.header_checksum, self._source_address, self._destination_address
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def decode(cls, ether_frame: EtherFrame, packet: bytes, packet_type: int, interface: str) -> 'IPv4Packet':
|
|
||||||
if cls.checksum(packet[:20]):
|
|
||||||
raise ValueError(f'\nipv4 checksum failed, frame: {ether_frame.debug()}\n'
|
|
||||||
f'packet: {binascii.hexlify(packet).decode()}, checksum: {hex(cls.checksum(packet[:20]))}')
|
|
||||||
data_len = len(packet) - 20
|
|
||||||
version_and_hlen, dscp_and_ecn, tlen, ident, df_mf_and_fragment, ttl, proto, checksum, source, dest = \
|
|
||||||
struct.unpack(
|
|
||||||
f'!BBHHHBBH4s4s', packet[:20]
|
|
||||||
)
|
|
||||||
version, hlen = split_byte(version_and_hlen)
|
|
||||||
flags = df_mf_and_fragment >> 13
|
|
||||||
mask = (flags << 13) | df_mf_and_fragment
|
|
||||||
fragment = mask ^ df_mf_and_fragment
|
|
||||||
flag, df, mf = False, False, False
|
|
||||||
if flags % 2:
|
|
||||||
mf = True
|
|
||||||
flags -= 1
|
|
||||||
if flags % 2:
|
|
||||||
df = True
|
|
||||||
flags -= 2
|
|
||||||
if flags % 4:
|
|
||||||
flag = True
|
|
||||||
flags -= 4
|
|
||||||
dscp, ecn = split_byte(dscp_and_ecn, 6)
|
|
||||||
return cls(
|
|
||||||
ether_frame, ord(version), ord(hlen), ord(dscp), ord(ecn), tlen, ident, mf, df, flag, fragment, ttl,
|
|
||||||
proto, checksum, source, dest, packet[20:], packet_type, interface
|
|
||||||
)
|
|
||||||
|
|
||||||
def encode(self) -> bytes:
|
|
||||||
return self.ether_frame.encode() + self.get_header() + self.data
|
|
||||||
|
|
||||||
@property
|
|
||||||
def printable_data(self) -> str:
|
|
||||||
return b".".join(printable.findall(self.data)).decode()
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
return f"IPv4(protocol={self.protocol.name}, " \
|
|
||||||
f"iface={self.interface}, " \
|
|
||||||
f"type={self.packet_type.name}, " \
|
|
||||||
f"source={ipv4_to_str(self._source_address)}, " \
|
|
||||||
f"destination={ipv4_to_str(self._destination_address)}, " \
|
|
||||||
f"data_len={len(self.data)})"
|
|
||||||
|
|
||||||
|
|
||||||
def make_filter(l3_protocol=None, src=None, dst=None, invert=False):
|
|
||||||
def filter_packet(packet: IPv4Packet):
|
|
||||||
if l3_protocol and not Layer3(packet.protocol) == l3_protocol:
|
|
||||||
return False
|
|
||||||
if src and not packet.source == src:
|
|
||||||
return False
|
|
||||||
if dst and not packet.destination == dst:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
if invert:
|
|
||||||
return lambda packet: not filter_packet(packet)
|
|
||||||
return filter_packet
|
|
||||||
|
|
||||||
|
|
||||||
async def sniff_ipv4(filters=None, kill=None):
|
|
||||||
start = time.perf_counter()
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
async def sock_recv(sock, n):
|
|
||||||
"""Receive data from the socket.
|
|
||||||
|
|
||||||
The return value is a bytes object representing the data received.
|
|
||||||
The maximum amount of data to be received at once is specified by
|
|
||||||
nbytes.
|
|
||||||
"""
|
|
||||||
if loop._debug and sock.gettimeout() != 0:
|
|
||||||
raise ValueError("the socket must be non-blocking")
|
|
||||||
fut = loop.create_future()
|
|
||||||
_sock_recv(fut, None, sock, n)
|
|
||||||
return await fut
|
|
||||||
|
|
||||||
def _sock_recv(fut, registered_fd, sock, n):
|
|
||||||
# _sock_recv() can add itself as an I/O callback if the operation can't
|
|
||||||
# be done immediately. Don't use it directly, call sock_recv().
|
|
||||||
if registered_fd is not None:
|
|
||||||
# Remove the callback early. It should be rare that the
|
|
||||||
# selector says the fd is ready but the call still returns
|
|
||||||
# EAGAIN, and I am willing to take a hit in that case in
|
|
||||||
# order to simplify the common case.
|
|
||||||
loop.remove_reader(registered_fd)
|
|
||||||
if fut.cancelled():
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
data, flags = sock.recvfrom(n)
|
|
||||||
except (BlockingIOError, InterruptedError):
|
|
||||||
fd = sock.fileno()
|
|
||||||
loop.add_reader(fd, _sock_recv, fut, fd, sock, n)
|
|
||||||
except Exception as exc:
|
|
||||||
fut.set_exception(exc)
|
|
||||||
else:
|
|
||||||
fut.set_result((data, flags))
|
|
||||||
|
|
||||||
with promiscuous('lo') as sock:
|
|
||||||
while True:
|
|
||||||
if not kill:
|
|
||||||
data, flags = await sock_recv(sock, 9000)
|
|
||||||
else:
|
|
||||||
t = asyncio.create_task(sock_recv(sock, 9000))
|
|
||||||
await asyncio.wait([t, kill.wait()], return_when=asyncio.FIRST_COMPLETED)
|
|
||||||
if kill.is_set():
|
|
||||||
break
|
|
||||||
data, flags = await t
|
|
||||||
# https://stackoverflow.com/questions/42821309/how-to-interpret-result-of-recvfrom-raw-socket/45215859#45215859
|
|
||||||
if data:
|
|
||||||
try:
|
|
||||||
ether_frame, packet = EtherFrame.decode(data)
|
|
||||||
if ether_frame.ether_type == Layer2.IPv4.value:
|
|
||||||
interface, _, packet_type, _, _ = flags
|
|
||||||
ipv4 = IPv4Packet.decode(ether_frame, packet, packet_type, interface)
|
|
||||||
if not filters or any((f(ipv4) for f in filters)):
|
|
||||||
yield time.perf_counter() - start, ipv4
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
gateway, lan = get_gateway_and_lan_addresses('default')
|
|
||||||
|
|
||||||
done = asyncio.Event()
|
|
||||||
|
|
||||||
def discover_aioupnp():
|
|
||||||
async def _discover():
|
|
||||||
print("testing aioupnp")
|
|
||||||
try:
|
|
||||||
u = await UPnP.discover()
|
|
||||||
print("successfully detected router with aioupnp")
|
|
||||||
try:
|
|
||||||
await u.get_external_ip()
|
|
||||||
print("successfully detected external ip with aioupnp")
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to detect external ip with aioupnp")
|
|
||||||
try:
|
|
||||||
await u.get_redirects()
|
|
||||||
print("successfully detected redirects with aioupnp")
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to get redirects with aioupnp")
|
|
||||||
try:
|
|
||||||
external_port = await u.get_next_mapping(1234, 'TCP', 'aioupnp testing')
|
|
||||||
print("successfully set redirect with aioupnp")
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to set redirect with aioupnp")
|
|
||||||
external_port = None
|
|
||||||
try:
|
|
||||||
await u.get_redirects()
|
|
||||||
print("successfully detected redirects with aioupnp")
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to get redirects with aioupnp")
|
|
||||||
if external_port:
|
|
||||||
try:
|
|
||||||
print("successfully removed redirect with aioupnp")
|
|
||||||
await u.delete_port_mapping(external_port, 'TCP')
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to delete redirect with aioupnp")
|
|
||||||
try:
|
|
||||||
await u.get_redirects()
|
|
||||||
print("successfully detected redirects with aioupnp")
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to get redirects with aioupnp")
|
|
||||||
except UPnPError:
|
|
||||||
print("failed to discover router with aioupnp")
|
|
||||||
finally:
|
|
||||||
print("done with aioupnp test")
|
|
||||||
asyncio.create_task(_discover())
|
|
||||||
|
|
||||||
def discover_miniupnpc():
|
|
||||||
def _miniupnpc_discover():
|
|
||||||
try:
|
|
||||||
u = miniupnpc.UPnP()
|
|
||||||
except:
|
|
||||||
print("failed to create upnp object with miniupnpc")
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
u.discover()
|
|
||||||
except:
|
|
||||||
print("failed to detect router with miniupnpc")
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
u.selectigd()
|
|
||||||
print("successfully detected router with miniupnpc")
|
|
||||||
except:
|
|
||||||
print("failed to detect router with miniupnpc")
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
u.externalipaddress()
|
|
||||||
print("successfully detected external ip with miniupnpc")
|
|
||||||
except:
|
|
||||||
print("failed to detect external ip with miniupnpc")
|
|
||||||
return
|
|
||||||
|
|
||||||
async def _discover():
|
|
||||||
print("testing miniupnpc")
|
|
||||||
try:
|
|
||||||
await loop.run_in_executor(None, _miniupnpc_discover)
|
|
||||||
finally:
|
|
||||||
done.set()
|
|
||||||
print("done with miniupnpc test")
|
|
||||||
|
|
||||||
asyncio.create_task(_discover())
|
|
||||||
|
|
||||||
loop.call_later(0, discover_aioupnp)
|
|
||||||
loop.call_later(8, discover_miniupnpc)
|
|
||||||
start = time.perf_counter()
|
|
||||||
packets = []
|
|
||||||
try:
|
|
||||||
async for (ts, ipv4_packet) in sniff_ipv4([
|
|
||||||
make_filter(l3_protocol=Layer3.UDP, src=SSDP_IP_ADDRESS),
|
|
||||||
make_filter(l3_protocol=Layer3.UDP, dst=SSDP_IP_ADDRESS),
|
|
||||||
make_filter(l3_protocol=Layer3.UDP, src=lan, dst=gateway),
|
|
||||||
make_filter(l3_protocol=Layer3.UDP, src=gateway, dst=lan),
|
|
||||||
make_filter(l3_protocol=Layer3.TCP, src=lan, dst=gateway),
|
|
||||||
make_filter(l3_protocol=Layer3.TCP, src=gateway, dst=lan)], done):
|
|
||||||
packets.append(
|
|
||||||
(time.perf_counter() - start, ipv4_packet.packet_type.name,
|
|
||||||
ipv4_packet.source, ipv4_packet.destination, base64.b64encode(ipv4_packet.data).decode())
|
|
||||||
)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print("stopping")
|
|
||||||
finally:
|
|
||||||
with open("aioupnp-bug-report.json", "w") as cap_file:
|
|
||||||
cap_file.write(json.dumps(packets))
|
|
||||||
print(f"Wrote bug report: {os.path.abspath('aioupnp-bug-report.json')}")
|
|
||||||
print("Sending bug report")
|
|
||||||
ssl_ctx = ssl.create_default_context(
|
|
||||||
purpose=ssl.Purpose.CLIENT_AUTH, capath=certifi.where()
|
|
||||||
)
|
|
||||||
auth = aiohttp.BasicAuth(
|
|
||||||
base64.b64decode(codecs.encode('Ax5LZzR1o3q3Z3WjATASDwR5rKyHH0qOIRIbLmMXn2H=', 'rot_13')).decode(), ''
|
|
||||||
)
|
|
||||||
report_id = base64.b64encode(os.urandom(16)).decode()
|
|
||||||
async with aiohttp.ClientSession() as session:
|
|
||||||
for i, (ts, direction, source, destination, packet) in enumerate(packets):
|
|
||||||
post = {
|
|
||||||
'userId': report_id,
|
|
||||||
'event': 'aioupnp bug report',
|
|
||||||
'context': {
|
|
||||||
'library': {
|
|
||||||
'name': 'aioupnp',
|
|
||||||
'version': aioupnp.__version__
|
|
||||||
}
|
|
||||||
},
|
|
||||||
'properties': {
|
|
||||||
'sequence': i,
|
|
||||||
'ts': ts,
|
|
||||||
'direction': direction,
|
|
||||||
'source': source,
|
|
||||||
'destination': destination,
|
|
||||||
'packet': packet
|
|
||||||
},
|
|
||||||
}
|
|
||||||
async with session.request(method='POST', url='https://api.segment.io/v1/track',
|
|
||||||
headers={'Connection': 'Close'}, auth=auth, json=post, ssl=ssl_ctx):
|
|
||||||
sys.stdout.write(f"\r{'.' * i}")
|
|
||||||
sys.stdout.write("\n")
|
|
||||||
print("Successfully sent bug report, thanks for your contribution!")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(main())
|
|
|
@ -18,7 +18,7 @@ except ImportError:
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_reply=0.0, sent_udp_packets=None,
|
def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_reply=0.0, sent_udp_packets=None,
|
||||||
tcp_replies=None, tcp_delay_reply=0.0, sent_tcp_packets=None, add_potato_datagrams=False,
|
tcp_replies=None, tcp_delay_reply=0.0, sent_tcp_packets=None, add_potato_datagrams=False,
|
||||||
raise_oserror_on_bind=False, raise_connectionerror=False, tcp_chunk_size=100):
|
raise_oserror_on_bind=False, raise_connectionerror=False):
|
||||||
sent_udp_packets = sent_udp_packets if sent_udp_packets is not None else []
|
sent_udp_packets = sent_udp_packets if sent_udp_packets is not None else []
|
||||||
udp_replies = udp_replies or {}
|
udp_replies = udp_replies or {}
|
||||||
|
|
||||||
|
@ -36,8 +36,8 @@ def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_r
|
||||||
reply = tcp_replies[data]
|
reply = tcp_replies[data]
|
||||||
i = 0
|
i = 0
|
||||||
while i < len(reply):
|
while i < len(reply):
|
||||||
loop.call_later(tcp_delay_reply, p.data_received, reply[i:i+tcp_chunk_size])
|
loop.call_later(tcp_delay_reply, p.data_received, reply[i:i+100])
|
||||||
i += tcp_chunk_size
|
i += 100
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
@ -71,7 +71,6 @@ def mock_tcp_and_udp(loop, udp_expected_addr=None, udp_replies=None, udp_delay_r
|
||||||
loop.call_later(udp_delay_reply, p.datagram_received, udp_replies[(data, addr)],
|
loop.call_later(udp_delay_reply, p.datagram_received, udp_replies[(data, addr)],
|
||||||
(udp_expected_addr, 1900))
|
(udp_expected_addr, 1900))
|
||||||
|
|
||||||
|
|
||||||
return _sendto
|
return _sendto
|
||||||
|
|
||||||
protocol = proto_lam()
|
protocol = proto_lam()
|
||||||
|
|
|
@ -31,28 +31,6 @@ class TestSCPDGet(AsyncioTestCase):
|
||||||
b"\r\n" \
|
b"\r\n" \
|
||||||
b"%s" % bad_xml
|
b"%s" % bad_xml
|
||||||
|
|
||||||
|
|
||||||
bad_response2 = b"HTTP/1.1 200 OK\r\n" \
|
|
||||||
b"CONTENT-TYPE: text/xml\r\n" \
|
|
||||||
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
|
|
||||||
b"LAST-MODIFIED: Fri, 28 Sep 2018 18:35:48 GMT\r\n" \
|
|
||||||
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
|
|
||||||
b"X-User-Agent: redsonic\r\n" \
|
|
||||||
b"CONNECTION: close\r\n" \
|
|
||||||
b"\r\n" \
|
|
||||||
b"%s" % (b'.' * 65300)
|
|
||||||
|
|
||||||
bad_response3 = b"HTTP/1.1 200 OK\r\n" \
|
|
||||||
b"CONTENT-TYPE: text/xml\r\n" \
|
|
||||||
b"DATE: Thu, 18 Oct 2018 01:20:23 GMT\r\n" \
|
|
||||||
b"LAST-MODIFIED: Fri, 28 Sep 2018 18:35:48 GMT\r\n" \
|
|
||||||
b"CONTENT-TYPE: text/xml\r\n" \
|
|
||||||
b"SERVER: Linux/3.14.28-Prod_17.2, UPnP/1.0, Portable SDK for UPnP devices/1.6.22\r\n" \
|
|
||||||
b"X-User-Agent: redsonic\r\n" \
|
|
||||||
b"CONNECTION: close\r\n" \
|
|
||||||
b"\r\n" \
|
|
||||||
b"<?xml version=\"1.0\"?>\n<root xmlns=\"urn:schemas-upnp-org:device-1-0\">\n<specVersion>\n<major>1</major>\n<minor>0</minor>\n</specVersion>\n<device>\n<deviceType>urn:schemas-upnp-org:device:InternetGatewayDevice:1</deviceType>\n<friendlyName>CGA4131COM</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:11111111-2222-3333-4444-555555555556</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:Layer3Forwarding:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:L3Forwarding1</serviceId>\n<SCPDURL>/Layer3ForwardingSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/Layer3Forwarding</controlURL>\n<eventSubURL>/upnp/event/Layer3Forwarding</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n<device>\n<deviceType>urn:schemas-upnp-org:device:WANDevice:1</deviceType>\n<friendlyName>WANDevice:1</friendlyName>\n<manufacturer>Cisco</manufacturer>\n<manufacturerURL>http://www.cisco.com/</manufacturerURL>\n<modelDescription>CGA4131COM</modelDescription>\n<modelName>CGA4131COM</modelName>\n<modelNumber>CGA4131COM</modelNumber>\n<modelURL>http://www.cisco.com</modelURL>\n<serialNumber></serialNumber>\n<UDN>uuid:ebf5a0a0-1dd1-11b2-a92f-603d266f9915</UDN>\n<UPC>CGA4131COM</UPC>\n<serviceList>\n<service>\n<serviceType>urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1</serviceType>\n<serviceId>urn:upnp-org:serviceId:WANCommonIFC1</serviceId>\n<SCPDURL>/WANCommonInterfaceConfigSCPD.xml</SCPDURL>\n<controlURL>/upnp/control/WANCommonInterfaceConfig0</controlURL>\n<eventSubURL>/upnp/event/WANCommonInterfaceConfig0</eventSubURL>\n</service>\n</serviceList>\n<deviceList>\n <device>\n <deviceType>urn:schemas-upnp-org:device:WANConnectionDevice:1</deviceType>\n <friendlyName>WANConnectionDevice:1</friendlyName>\n <manufacturer>Cisco</manufacturer>\n <manufacturerURL>http://www.cisco.com/</manufacturerURL>\n <modelDescription>CGA4131COM</modelDescription>\n <modelName>CGA4131COM</modelName>\n <modelNumber>CGA4131COM</modelNumber>\n <modelURL>http://www.cisco.com</modelURL>\n <serialNumber></serialNumber>\n <UDN>uuid:11111111-2222-3333-4444-555555555555</UDN>\n <UPC>CGA4131COM</UPC>\n <serviceList>\n <service>\n <serviceType>urn:schemas-upnp-org:service:WANIPConnection:1</serviceType>\n <serviceId>urn:upnp-org:serviceId:WANIPConn1</serviceId>\n <SCPDURL>/WANIPConnectionServiceSCPD.xml</SCPDURL>\n <controlURL>/upnp/control/WANIPConnection0</controlURL>\n <eventSubURL>/upnp/event/WANIPConnection0</eventSubURL>\n </service>\n </serviceList>\n </device>\n</deviceList>\n</device>\n</deviceList>\n<presentationURL>http://10.1.10.1/</presentationURL></device>\n</root>\n"
|
|
||||||
|
|
||||||
expected_parsed = {
|
expected_parsed = {
|
||||||
'specVersion': {'major': '1', 'minor': '0'},
|
'specVersion': {'major': '1', 'minor': '0'},
|
||||||
'device': {
|
'device': {
|
||||||
|
@ -162,26 +140,6 @@ class TestSCPDGet(AsyncioTestCase):
|
||||||
self.assertIsInstance(err, UPnPError)
|
self.assertIsInstance(err, UPnPError)
|
||||||
self.assertTrue(str(err).startswith('too many bytes written'))
|
self.assertTrue(str(err).startswith('too many bytes written'))
|
||||||
|
|
||||||
async def test_scpd_get_overrun_unspecified_content_length(self):
|
|
||||||
sent = []
|
|
||||||
replies = {self.get_request: self.bad_response2}
|
|
||||||
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent):
|
|
||||||
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
|
|
||||||
self.assertDictEqual({}, result)
|
|
||||||
self.assertEqual(self.bad_response2.decode(), raw.decode())
|
|
||||||
self.assertIsInstance(err, UPnPError)
|
|
||||||
self.assertTrue(str(err).endswith('with unspecified content length'))
|
|
||||||
|
|
||||||
async def test_scpd_duplicate_header(self):
|
|
||||||
sent = []
|
|
||||||
replies = {self.get_request: self.bad_response3}
|
|
||||||
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent):
|
|
||||||
result, raw, err = await scpd_get(self.path, self.lan_address, self.port, self.loop)
|
|
||||||
self.assertDictEqual({}, result)
|
|
||||||
self.assertTrue(self.bad_response3.startswith(raw))
|
|
||||||
self.assertIsInstance(err, UPnPError)
|
|
||||||
self.assertEqual("duplicate headers", str(err))
|
|
||||||
|
|
||||||
|
|
||||||
class TestSCPDPost(AsyncioTestCase):
|
class TestSCPDPost(AsyncioTestCase):
|
||||||
param_names: list = []
|
param_names: list = []
|
||||||
|
@ -244,7 +202,7 @@ class TestSCPDPost(AsyncioTestCase):
|
||||||
self.assertEqual(b'', raw)
|
self.assertEqual(b'', raw)
|
||||||
self.assertDictEqual({}, result)
|
self.assertDictEqual({}, result)
|
||||||
|
|
||||||
async def test_scpd_connection_error(self):
|
async def test_scpd_post_connection_error(self):
|
||||||
sent = []
|
sent = []
|
||||||
replies = {}
|
replies = {}
|
||||||
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent, raise_connectionerror=True):
|
with mock_tcp_and_udp(self.loop, tcp_replies=replies, sent_tcp_packets=sent, raise_connectionerror=True):
|
||||||
|
@ -256,14 +214,6 @@ class TestSCPDPost(AsyncioTestCase):
|
||||||
self.assertEqual(b'', raw)
|
self.assertEqual(b'', raw)
|
||||||
self.assertDictEqual({}, result)
|
self.assertDictEqual({}, result)
|
||||||
|
|
||||||
result, raw, err = await scpd_get(
|
|
||||||
self.path, self.gateway_address, self.port
|
|
||||||
)
|
|
||||||
self.assertIsInstance(err, UPnPError)
|
|
||||||
self.assertEqual('ConnectionRefusedError()', str(err))
|
|
||||||
self.assertEqual(b'', raw)
|
|
||||||
self.assertDictEqual({}, result)
|
|
||||||
|
|
||||||
async def test_scpd_post_bad_xml_response(self):
|
async def test_scpd_post_bad_xml_response(self):
|
||||||
sent = []
|
sent = []
|
||||||
replies = {self.post_bytes: self.bad_envelope_response}
|
replies = {self.post_bytes: self.bad_envelope_response}
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import json
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
from aioupnp.protocols.m_search_patterns import packet_generator
|
from aioupnp.protocols.m_search_patterns import packet_generator
|
||||||
|
@ -51,16 +50,6 @@ class TestSSDP(AsyncioTestCase):
|
||||||
with self.assertRaises(UPnPError):
|
with self.assertRaises(UPnPError):
|
||||||
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
|
await m_search("10.0.0.2", "10.0.0.1", self.successful_args, timeout=1, loop=self.loop)
|
||||||
|
|
||||||
async def test_ssdp_pretty_print(self):
|
|
||||||
self.assertEqual(
|
|
||||||
json.dumps({
|
|
||||||
"HOST": "239.255.255.250:1900",
|
|
||||||
"MAN": "ssdp:discover",
|
|
||||||
"MX": 1,
|
|
||||||
"ST": "urn:schemas-upnp-org:device:WANDevice:1"
|
|
||||||
}, indent=2), str(self.query_packet)
|
|
||||||
)
|
|
||||||
|
|
||||||
async def test_m_search_reply_unicast(self):
|
async def test_m_search_reply_unicast(self):
|
||||||
replies = {
|
replies = {
|
||||||
(self.query_packet.encode().encode(), ("10.0.0.1", 1900)): self.reply_packet.encode().encode()
|
(self.query_packet.encode().encode(), ("10.0.0.1", 1900)): self.reply_packet.encode().encode()
|
||||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -28,15 +28,6 @@ class TestSSDPDatagram(unittest.TestCase):
|
||||||
with self.assertRaises(UPnPError):
|
with self.assertRaises(UPnPError):
|
||||||
SSDPDatagram.decode(packet)
|
SSDPDatagram.decode(packet)
|
||||||
|
|
||||||
packet = \
|
|
||||||
b'M-SEARCH * HTTP/1.1\r\n' \
|
|
||||||
b'Host: 239.255.255.250:1900\r\n' \
|
|
||||||
b'MX: 5\r\n' \
|
|
||||||
b'\r\n'
|
|
||||||
|
|
||||||
with self.assertRaises(UPnPError):
|
|
||||||
SSDPDatagram.decode(packet)
|
|
||||||
|
|
||||||
def test_fail_to_decode_blank(self):
|
def test_fail_to_decode_blank(self):
|
||||||
packet = b''
|
packet = b''
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ import unittest
|
||||||
from aioupnp.device import CaseInsensitive
|
from aioupnp.device import CaseInsensitive
|
||||||
|
|
||||||
|
|
||||||
class _TestService(CaseInsensitive):
|
class TestService(CaseInsensitive):
|
||||||
serviceType = None
|
serviceType = None
|
||||||
serviceId = None
|
serviceId = None
|
||||||
controlURL = None
|
controlURL = None
|
||||||
|
@ -12,14 +12,14 @@ class _TestService(CaseInsensitive):
|
||||||
|
|
||||||
class TestCaseInsensitive(unittest.TestCase):
|
class TestCaseInsensitive(unittest.TestCase):
|
||||||
def test_initialize(self):
|
def test_initialize(self):
|
||||||
s = _TestService(
|
s = TestService(
|
||||||
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
|
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
|
||||||
)
|
)
|
||||||
self.assertEqual('test', getattr(s, 'serviceType'))
|
self.assertEqual('test', getattr(s, 'serviceType'))
|
||||||
self.assertEqual('test', getattr(s, 'servicetype'))
|
self.assertEqual('test', getattr(s, 'servicetype'))
|
||||||
self.assertEqual('test', getattr(s, 'SERVICETYPE'))
|
self.assertEqual('test', getattr(s, 'SERVICETYPE'))
|
||||||
|
|
||||||
s = _TestService(
|
s = TestService(
|
||||||
servicetype="test", serviceid="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
|
servicetype="test", serviceid="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
|
||||||
)
|
)
|
||||||
self.assertEqual('test', getattr(s, 'serviceType'))
|
self.assertEqual('test', getattr(s, 'serviceType'))
|
||||||
|
@ -35,7 +35,7 @@ class TestCaseInsensitive(unittest.TestCase):
|
||||||
}, s.as_dict())
|
}, s.as_dict())
|
||||||
|
|
||||||
def test_set_attr(self):
|
def test_set_attr(self):
|
||||||
s = _TestService(
|
s = TestService(
|
||||||
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
|
serviceType="test", serviceId="test id", controlURL="/test", eventSubURL="/test2", SCPDURL="/test3"
|
||||||
)
|
)
|
||||||
self.assertEqual('test', getattr(s, 'serviceType'))
|
self.assertEqual('test', getattr(s, 'serviceType'))
|
||||||
|
|
|
@ -62,7 +62,7 @@ Get the external ip address from the gateway
|
||||||
"""
|
"""
|
||||||
|
|
||||||
expected_add_port_mapping_usage = """aioupnp [-h] [--debug_logging] add_port_mapping [--external_port=<int>] [--protocol=<str>]
|
expected_add_port_mapping_usage = """aioupnp [-h] [--debug_logging] add_port_mapping [--external_port=<int>] [--protocol=<str>]
|
||||||
[--internal_port=<int>] [--lan_address=<str>] [--description=<str>] [--lease_time=<int>]
|
[--internal_port=<int>] [--lan_address=<str>] [--description=<str>]
|
||||||
|
|
||||||
Add a new port mapping
|
Add a new port mapping
|
||||||
|
|
||||||
|
@ -71,13 +71,12 @@ Add a new port mapping
|
||||||
:param internal_port: (int) internal port
|
:param internal_port: (int) internal port
|
||||||
:param lan_address: (str) internal lan address
|
:param lan_address: (str) internal lan address
|
||||||
:param description: (str) mapping description
|
:param description: (str) mapping description
|
||||||
:param lease_time: (int) lease time in seconds
|
|
||||||
:return: None
|
:return: None
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
expected_get_next_mapping_usage = """aioupnp [-h] [--debug_logging] get_next_mapping [--port=<int>] [--protocol=<str>]
|
expected_get_next_mapping_usage = """aioupnp [-h] [--debug_logging] get_next_mapping [--port=<int>] [--protocol=<str>]
|
||||||
[--description=<str>] [--internal_port=<typing.Union[int, NoneType]>] [--lease_time=<int>]
|
[--description=<str>] [--internal_port=<typing.Union[int, NoneType]>]
|
||||||
|
|
||||||
Get a new port mapping. If the requested port is not available, increment until the next free port is mapped
|
Get a new port mapping. If the requested port is not available, increment until the next free port is mapped
|
||||||
|
|
||||||
|
@ -85,7 +84,6 @@ Get a new port mapping. If the requested port is not available, increment until
|
||||||
:param protocol: (str) UDP | TCP
|
:param protocol: (str) UDP | TCP
|
||||||
:param description: (str) mapping description
|
:param description: (str) mapping description
|
||||||
:param internal_port: (int) internal port
|
:param internal_port: (int) internal port
|
||||||
:param lease_time: (int) lease time in seconds
|
|
||||||
|
|
||||||
:return: (int) mapped port
|
:return: (int) mapped port
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,8 @@
|
||||||
import os
|
|
||||||
import json
|
|
||||||
from collections import OrderedDict
|
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
from tests import AsyncioTestCase, mock_tcp_and_udp
|
from tests import AsyncioTestCase, mock_tcp_and_udp
|
||||||
|
from collections import OrderedDict
|
||||||
from aioupnp.gateway import Gateway, get_action_list
|
from aioupnp.gateway import Gateway, get_action_list
|
||||||
from aioupnp.serialization.ssdp import SSDPDatagram
|
from aioupnp.serialization.ssdp import SSDPDatagram
|
||||||
from aioupnp.serialization.soap import serialize_soap_post
|
|
||||||
from aioupnp.upnp import UPnP
|
|
||||||
|
|
||||||
|
|
||||||
def gen_get_bytes(location: str, host: str) -> bytes:
|
def gen_get_bytes(location: str, host: str) -> bytes:
|
||||||
|
@ -300,76 +296,3 @@ class TestDiscoverNetgearNighthawkAC2350(TestDiscoverDLinkDIR890L):
|
||||||
'RequestConnection', 'ForceTermination',
|
'RequestConnection', 'ForceTermination',
|
||||||
'GetStatusInfo', 'GetNATRSIPStatus']},
|
'GetStatusInfo', 'GetNATRSIPStatus']},
|
||||||
'soap_requests': []}
|
'soap_requests': []}
|
||||||
|
|
||||||
|
|
||||||
class TestActiontec(AsyncioTestCase):
|
|
||||||
name = "Actiontec GT784WN"
|
|
||||||
_location_key = 'Location'
|
|
||||||
|
|
||||||
@property
|
|
||||||
def data_path(self):
|
|
||||||
return os.path.join(os.path.dirname(os.path.abspath(__file__)), "replays", self.name)
|
|
||||||
|
|
||||||
def _get_location(self):
|
|
||||||
# return self.gateway_info['reply']['Location'].split(self.gateway_address)[-1]
|
|
||||||
return self.gateway_info['reply'][self._location_key].split(f"{self.gateway_address}:{self.gateway_info['soap_port']}")[-1]
|
|
||||||
|
|
||||||
def setUp(self) -> None:
|
|
||||||
with open(self.data_path, 'r') as f:
|
|
||||||
data = json.loads(f.read())
|
|
||||||
self.gateway_info = data['gateway']
|
|
||||||
self.client_address = data['client_address']
|
|
||||||
self.gateway_address = self.gateway_info['gateway_address']
|
|
||||||
self.udp_replies = {
|
|
||||||
(SSDPDatagram('M-SEARCH', self.gateway_info['m_search_args']).encode().encode(), ("239.255.255.250", 1900)): SSDPDatagram("OK", self.gateway_info['reply']).encode().encode()
|
|
||||||
}
|
|
||||||
self.tcp_replies = {
|
|
||||||
(
|
|
||||||
f"GET {path} HTTP/1.1\r\n"
|
|
||||||
f"Accept-Encoding: gzip\r\n"
|
|
||||||
f"Host: {self.gateway_info['gateway_address']}\r\n"
|
|
||||||
f"Connection: Close\r\n"
|
|
||||||
f"\r\n"
|
|
||||||
).encode(): xml_bytes.encode()
|
|
||||||
for path, xml_bytes in self.gateway_info['service_descriptors'].items()
|
|
||||||
}
|
|
||||||
self.tcp_replies.update({
|
|
||||||
(
|
|
||||||
f"GET {self._get_location()} HTTP/1.1\r\n"
|
|
||||||
f"Accept-Encoding: gzip\r\n"
|
|
||||||
f"Host: {self.gateway_info['gateway_address']}\r\n"
|
|
||||||
f"Connection: Close\r\n"
|
|
||||||
f"\r\n"
|
|
||||||
).encode(): self.gateway_info['gateway_xml'].encode()
|
|
||||||
})
|
|
||||||
self.registered_soap_commands = self.gateway_info['registered_soap_commands']
|
|
||||||
super().setUp()
|
|
||||||
|
|
||||||
async def setup_request_replay(self, u: UPnP):
|
|
||||||
for method, reqs in self.gateway_info['soap_requests'].items():
|
|
||||||
if not reqs:
|
|
||||||
continue
|
|
||||||
self.tcp_replies.update({
|
|
||||||
serialize_soap_post(
|
|
||||||
method, list(args.keys()), self.registered_soap_commands[method].encode(),
|
|
||||||
self.gateway_address.encode(), u.gateway.services[self.registered_soap_commands[method]].controlURL.encode()
|
|
||||||
): response.encode() for args, response in reqs
|
|
||||||
})
|
|
||||||
|
|
||||||
async def replay(self, u: UPnP):
|
|
||||||
self.assertEqual('11.222.33.111', await u.get_external_ip())
|
|
||||||
|
|
||||||
async def test_replay(self):
|
|
||||||
with mock_tcp_and_udp(self.loop, udp_replies=self.udp_replies, tcp_replies=self.tcp_replies, udp_expected_addr=self.gateway_address, tcp_chunk_size=1450):
|
|
||||||
u = await UPnP.discover(lan_address=self.client_address, gateway_address=self.gateway_address, loop=self.loop)
|
|
||||||
await self.setup_request_replay(u)
|
|
||||||
await self.replay(u)
|
|
||||||
|
|
||||||
|
|
||||||
class TestNewMediaNet(TestActiontec):
|
|
||||||
name = "NewMedia-NET GmbH Generic X86"
|
|
||||||
|
|
||||||
async def replay(self, u: UPnP):
|
|
||||||
self.assertEqual('11.222.33.111', await u.get_external_ip())
|
|
||||||
await u.get_redirects()
|
|
||||||
# print(await u.get_next_mapping(4567, 'UDP', 'aioupnp test mapping'))
|
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
from collections import OrderedDict
|
|
||||||
from aioupnp import interfaces
|
|
||||||
from aioupnp.fault import UPnPError
|
from aioupnp.fault import UPnPError
|
||||||
from aioupnp.upnp import UPnP
|
from aioupnp.upnp import UPnP
|
||||||
from tests import AsyncioTestCase
|
from tests import AsyncioTestCase
|
||||||
|
@ -31,26 +29,22 @@ class mock_netifaces:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def ifaddresses(interface):
|
def ifaddresses(interface):
|
||||||
return {
|
return {
|
||||||
17: [
|
"test0": {
|
||||||
{
|
17: [
|
||||||
"addr": "01:02:03:04:05:06",
|
{
|
||||||
"broadcast": "ff:ff:ff:ff:ff:ff"
|
"addr": "01:02:03:04:05:06",
|
||||||
}
|
"broadcast": "ff:ff:ff:ff:ff:ff"
|
||||||
],
|
}
|
||||||
2: [
|
],
|
||||||
{
|
2: [
|
||||||
"addr": "192.168.1.2",
|
{
|
||||||
"netmask": "255.255.255.0",
|
"addr": "192.168.1.2",
|
||||||
"broadcast": "192.168.1.255"
|
"netmask": "255.255.255.0",
|
||||||
}
|
"broadcast": "192.168.1.255"
|
||||||
],
|
}
|
||||||
}
|
],
|
||||||
|
},
|
||||||
|
}[interface]
|
||||||
class mock_netifaces_extra_interface(mock_netifaces):
|
|
||||||
@staticmethod
|
|
||||||
def interfaces():
|
|
||||||
return ['lo', 'test0', 'test1']
|
|
||||||
|
|
||||||
|
|
||||||
class TestParseInterfaces(AsyncioTestCase):
|
class TestParseInterfaces(AsyncioTestCase):
|
||||||
|
@ -74,16 +68,3 @@ class TestParseInterfaces(AsyncioTestCase):
|
||||||
else:
|
else:
|
||||||
self.assertTrue(False)
|
self.assertTrue(False)
|
||||||
self.assertEqual(len(checked), 1)
|
self.assertEqual(len(checked), 1)
|
||||||
|
|
||||||
def test_guess_gateway(self):
|
|
||||||
# handle edge case where netifaces gives more interfaces than it does gateways
|
|
||||||
with mock.patch('aioupnp.interfaces.get_netifaces') as patch:
|
|
||||||
patch.return_value = mock_netifaces_extra_interface
|
|
||||||
self.assertDictEqual(
|
|
||||||
OrderedDict(
|
|
||||||
[
|
|
||||||
('test0', ('192.168.1.1', '192.168.1.2')),
|
|
||||||
('test1', ('192.168.1.1', '192.168.1.2')),
|
|
||||||
('default', ('192.168.1.1', '192.168.1.2'))
|
|
||||||
]), interfaces.get_interfaces()
|
|
||||||
)
|
|
||||||
|
|
|
@ -51,42 +51,6 @@ class TestGetExternalIPAddress(UPnPCommandTestCase):
|
||||||
external_ip = await upnp.get_external_ip()
|
external_ip = await upnp.get_external_ip()
|
||||||
self.assertEqual("11.222.3.44", external_ip)
|
self.assertEqual("11.222.3.44", external_ip)
|
||||||
|
|
||||||
async def test_handle_got_ipv6(self):
|
|
||||||
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
|
|
||||||
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 392 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>2a02:2e02:30c:db00:4a8d:36ff:feb0:5202</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
|
|
||||||
self.addCleanup(self.replies.pop, request)
|
|
||||||
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
|
|
||||||
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
|
|
||||||
await gateway.discover_commands()
|
|
||||||
upnp = UPnP(self.client_address, self.gateway_address, gateway)
|
|
||||||
with self.assertRaises(UPnPError) as err:
|
|
||||||
await upnp.get_external_ip()
|
|
||||||
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
|
|
||||||
|
|
||||||
async def test_handle_got_invalid_ip(self):
|
|
||||||
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
|
|
||||||
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 360 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>potato</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
|
|
||||||
self.addCleanup(self.replies.pop, request)
|
|
||||||
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
|
|
||||||
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
|
|
||||||
await gateway.discover_commands()
|
|
||||||
upnp = UPnP(self.client_address, self.gateway_address, gateway)
|
|
||||||
with self.assertRaises(UPnPError) as err:
|
|
||||||
await upnp.get_external_ip()
|
|
||||||
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
|
|
||||||
|
|
||||||
async def test_handle_got_carrier_nat_ip(self):
|
|
||||||
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
|
|
||||||
self.replies.update({request: b"HTTP/1.1 200 OK\r\nServer: WebServer\r\nDate: Wed, 22 May 2019 03:25:57 GMT\r\nConnection: close\r\nCONTENT-TYPE: text/xml; charset=\"utf-8\"\r\nCONTENT-LENGTH: 365 \r\nEXT:\r\n\r\n<?xml version=\"1.0\"?>\n<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\" s:encodingStyle=\"http://schemas.xmlsoap.org/soap/encoding/\">\n\t<s:Body>\n\t\t<u:GetExternalIPAddressResponse xmlns:u=\"urn:schemas-upnp-org:service:WANIPConnection:1\">\n<NewExternalIPAddress>192.88.99.2</NewExternalIPAddress>\n</u:GetExternalIPAddressResponse>\n\t</s:Body>\n</s:Envelope>\n"})
|
|
||||||
self.addCleanup(self.replies.pop, request)
|
|
||||||
with mock_tcp_and_udp(self.loop, tcp_replies=self.replies):
|
|
||||||
gateway = Gateway(self.reply, self.client_address, self.gateway_address, loop=self.loop)
|
|
||||||
await gateway.discover_commands()
|
|
||||||
upnp = UPnP(self.client_address, self.gateway_address, gateway)
|
|
||||||
with self.assertRaises(UPnPError) as err:
|
|
||||||
await upnp.get_external_ip()
|
|
||||||
self.assertTrue(str(err).startswith("Got invalid external ipv4 address"))
|
|
||||||
|
|
||||||
async def test_null_external_ip(self):
|
async def test_null_external_ip(self):
|
||||||
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
|
request = b'POST /soap.cgi?service=WANIPConn1 HTTP/1.1\r\nHost: 11.2.3.4\r\nUser-Agent: python3/aioupnp, UPnP/1.0, MiniUPnPc/1.9\r\nContent-Length: 285\r\nContent-Type: text/xml\r\nSOAPAction: "urn:schemas-upnp-org:service:WANIPConnection:1#GetExternalIPAddress"\r\nConnection: Close\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n\r\n<?xml version="1.0"?>\r\n<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"><s:Body><u:GetExternalIPAddress xmlns:u="urn:schemas-upnp-org:service:WANIPConnection:1"></u:GetExternalIPAddress></s:Body></s:Envelope>\r\n'
|
||||||
self.replies.update({
|
self.replies.update({
|
||||||
|
|
Loading…
Reference in a new issue