137 lines
5.4 KiB
Python
137 lines
5.4 KiB
Python
import asyncio
|
|
import contextlib
|
|
import socket
|
|
import mock
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def mock_datagram_endpoint_factory(loop, expected_addr, replies=None, delay_reply=0.0, sent_packets=None):
|
|
sent_packets = sent_packets if sent_packets is not None else []
|
|
replies = replies or {}
|
|
|
|
def sendto(p: asyncio.DatagramProtocol):
|
|
def _sendto(data, addr):
|
|
sent_packets.append(data)
|
|
if (data, addr) in replies:
|
|
loop.call_later(delay_reply, p.datagram_received, replies[(data, addr)], (expected_addr, 1900))
|
|
return _sendto
|
|
|
|
async def create_datagram_endpoint(proto_lam, sock=None):
|
|
protocol = proto_lam()
|
|
transport = asyncio.DatagramTransport(extra={'socket': mock_sock})
|
|
transport.close = lambda: mock_sock.close()
|
|
mock_sock.sendto = sendto(protocol)
|
|
transport.sendto = mock_sock.sendto
|
|
protocol.connection_made(transport)
|
|
return transport, protocol
|
|
|
|
with mock.patch('socket.socket') as mock_socket:
|
|
mock_sock = mock.Mock(spec=socket.socket)
|
|
mock_sock.setsockopt = lambda *_: None
|
|
mock_sock.bind = lambda *_: None
|
|
mock_sock.setblocking = lambda *_: None
|
|
mock_sock.getsockname = lambda: "0.0.0.0"
|
|
mock_sock.getpeername = lambda: ""
|
|
mock_sock.close = lambda: None
|
|
mock_sock.type = socket.SOCK_DGRAM
|
|
mock_sock.fileno = lambda: 7
|
|
|
|
mock_socket.return_value = mock_sock
|
|
loop.create_datagram_endpoint = create_datagram_endpoint
|
|
yield
|
|
|
|
@contextlib.contextmanager
|
|
def mock_tcp_endpoint_factory(loop, replies=None, delay_reply=0.0, sent_packets=None):
|
|
sent_packets = sent_packets if sent_packets is not None else []
|
|
replies = replies or {}
|
|
|
|
def write(p: asyncio.Protocol):
|
|
def _write(data):
|
|
sent_packets.append(data)
|
|
if data in replies:
|
|
loop.call_later(delay_reply, p.data_received, replies[data])
|
|
return _write
|
|
|
|
async def create_connection(protocol_factory, host=None, port=None):
|
|
protocol = protocol_factory()
|
|
transport = asyncio.Transport(extra={'socket': mock_sock})
|
|
transport.close = lambda: mock_sock.close()
|
|
mock_sock.write = write(protocol)
|
|
transport.write = mock_sock.write
|
|
protocol.connection_made(transport)
|
|
return transport, protocol
|
|
|
|
with mock.patch('socket.socket') as mock_socket:
|
|
mock_sock = mock.Mock(spec=socket.socket)
|
|
mock_sock.setsockopt = lambda *_: None
|
|
mock_sock.bind = lambda *_: None
|
|
mock_sock.setblocking = lambda *_: None
|
|
mock_sock.getsockname = lambda: "0.0.0.0"
|
|
mock_sock.getpeername = lambda: ""
|
|
mock_sock.close = lambda: None
|
|
mock_sock.type = socket.SOCK_STREAM
|
|
mock_sock.fileno = lambda: 7
|
|
|
|
mock_socket.return_value = mock_sock
|
|
loop.create_connection = create_connection
|
|
yield
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def mock_tcp_and_udp(loop, udp_expected_addr, udp_replies=None, udp_delay_reply=0.0, sent_udp_packets=None,
|
|
tcp_replies=None, tcp_delay_reply=0.0, tcp_sent_packets=None):
|
|
sent_udp_packets = sent_udp_packets if sent_udp_packets is not None else []
|
|
udp_replies = udp_replies or {}
|
|
|
|
tcp_sent_packets = tcp_sent_packets if tcp_sent_packets is not None else []
|
|
tcp_replies = tcp_replies or {}
|
|
|
|
async def create_connection(protocol_factory, host=None, port=None):
|
|
def write(p: asyncio.Protocol):
|
|
def _write(data):
|
|
tcp_sent_packets.append(data)
|
|
if data in tcp_replies:
|
|
loop.call_later(tcp_delay_reply, p.data_received, tcp_replies[data])
|
|
|
|
return _write
|
|
|
|
protocol = protocol_factory()
|
|
transport = asyncio.Transport(extra={'socket': mock.Mock(spec=socket.socket)})
|
|
transport.close = lambda: None
|
|
transport.write = write(protocol)
|
|
protocol.connection_made(transport)
|
|
return transport, protocol
|
|
|
|
async def create_datagram_endpoint(proto_lam, sock=None):
|
|
def sendto(p: asyncio.DatagramProtocol):
|
|
def _sendto(data, addr):
|
|
sent_udp_packets.append(data)
|
|
if (data, addr) in udp_replies:
|
|
loop.call_later(udp_delay_reply, p.datagram_received, udp_replies[(data, addr)],
|
|
(udp_expected_addr, 1900))
|
|
|
|
return _sendto
|
|
|
|
protocol = proto_lam()
|
|
transport = asyncio.DatagramTransport(extra={'socket': mock_sock})
|
|
transport.close = lambda: mock_sock.close()
|
|
mock_sock.sendto = sendto(protocol)
|
|
transport.sendto = mock_sock.sendto
|
|
protocol.connection_made(transport)
|
|
return transport, protocol
|
|
|
|
with mock.patch('socket.socket') as mock_socket:
|
|
mock_sock = mock.Mock(spec=socket.socket)
|
|
mock_sock.setsockopt = lambda *_: None
|
|
mock_sock.bind = lambda *_: None
|
|
mock_sock.setblocking = lambda *_: None
|
|
mock_sock.getsockname = lambda: "0.0.0.0"
|
|
mock_sock.getpeername = lambda: ""
|
|
mock_sock.close = lambda: None
|
|
mock_sock.type = socket.SOCK_DGRAM
|
|
mock_sock.fileno = lambda: 7
|
|
|
|
mock_socket.return_value = mock_sock
|
|
loop.create_datagram_endpoint = create_datagram_endpoint
|
|
loop.create_connection = create_connection
|
|
yield
|