From a2953ac9ac289784246c2622c88638ac9866ed56 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 26 Jun 2019 16:28:13 -0400 Subject: [PATCH] test session bloat from timed out sockets --- .../test_wallet_server_sessions.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 lbry/tests/integration/test_wallet_server_sessions.py diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py new file mode 100644 index 000000000..0ba1914cf --- /dev/null +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -0,0 +1,132 @@ +import asyncio +import socket +import time +import logging +from unittest.mock import Mock +from torba.testcase import IntegrationTestCase, Conductor +import lbry.wallet +from lbry.schema.claim import Claim +from lbry.wallet.transaction import Transaction, Output +from lbry.wallet.dewies import dewies_to_lbc as d2l, lbc_to_dewies as l2d + + +log = logging.getLogger(__name__) +def wrap_callback_event(fn, callback): + def inner(*a, **kw): + callback() + return fn(*a, **kw) + return inner + + +class TestSessionBloat(IntegrationTestCase): + """ + ERROR:asyncio:Fatal read error on socket transport + protocol: + transport: <_SelectorSocketTransport fd=3236 read=polling write=> + Traceback (most recent call last): + File "/usr/lib/python3.7/asyncio/selector_events.py", line 801, in _read_ready__data_received + data = self._sock.recv(self.max_size) + TimeoutError: [Errno 110] Connection timed out + """ + + LEDGER = lbry.wallet + + async def asyncSetUp(self): + self.conductor = Conductor( + ledger_module=self.LEDGER, manager_module=self.MANAGER, verbosity=self.VERBOSITY + ) + await self.conductor.start_blockchain() + self.addCleanup(self.conductor.stop_blockchain) + + await self.conductor.start_spv() + + self.session_manager = self.conductor.spv_node.server.session_mgr + self.session_manager.servers['TCP'].sockets[0].setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 64) + self.session_manager.servers['TCP'].sockets[0].setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 64) + + self.addCleanup(self.conductor.stop_spv) + await self.conductor.start_wallet() + self.addCleanup(self.conductor.stop_wallet) + + self.client_session = list(self.session_manager.sessions)[0] + self.client_session.transport.set_write_buffer_limits(0, 0) + + self.paused_session = asyncio.Event(loop=self.loop) + self.resumed_session = asyncio.Event(loop=self.loop) + + def paused(): + self.resumed_session.clear() + self.paused_session.set() + + def delayed_resume(): + self.paused_session.clear() + + time.sleep(1) + self.resumed_session.set() + + self.client_session.pause_writing = wrap_callback_event(self.client_session.pause_writing, paused) + self.client_session.resume_writing = wrap_callback_event(self.client_session.resume_writing, delayed_resume) + + self.blockchain = self.conductor.blockchain_node + self.wallet_node = self.conductor.wallet_node + self.manager = self.wallet_node.manager + self.ledger = self.wallet_node.ledger + self.wallet = self.wallet_node.wallet + self.account = self.wallet_node.wallet.default_account + + async def test_session_bloat_from_socket_timeout(self): + await self.account.ensure_address_gap() + + address1, address2 = await self.account.receiving.get_addresses(limit=2, only_usable=True) + sendtxid1 = await self.blockchain.send_to_address(address1, 5) + sendtxid2 = await self.blockchain.send_to_address(address2, 5) + + await self.blockchain.generate(1) + await asyncio.wait([ + self.on_transaction_id(sendtxid1), + self.on_transaction_id(sendtxid2) + ]) + + self.assertEqual(d2l(await self.account.get_balance()), '10.0') + + channel = Claim() + channel_txo = Output.pay_claim_name_pubkey_hash( + l2d('1.0'), '@bar', channel, self.account.ledger.address_to_hash160(address1) + ) + channel_txo.generate_channel_private_key() + channel_txo.script.generate() + channel_tx = await Transaction.create([], [channel_txo], [self.account], self.account) + + stream = Claim() + stream.stream.description = "0" * 8000 + stream_txo = Output.pay_claim_name_pubkey_hash( + l2d('1.0'), 'foo', stream, self.account.ledger.address_to_hash160(address1) + ) + stream_tx = await Transaction.create([], [stream_txo], [self.account], self.account) + stream_txo.sign(channel_txo) + await stream_tx.sign([self.account]) + self.paused_session.clear() + self.resumed_session.clear() + + await self.broadcast(channel_tx) + await self.broadcast(stream_tx) + await asyncio.wait_for(self.paused_session.wait(), 2) + self.assertEqual(1, len(self.session_manager.sessions)) + + real_sock = self.client_session.transport._extra.pop('socket') + mock_sock = Mock(spec=socket.socket) + + for attr in dir(real_sock): + if not attr.startswith('__'): + setattr(mock_sock, attr, getattr(real_sock, attr)) + + def recv(*a, **kw): + raise TimeoutError("[Errno 110] Connection timed out") + + mock_sock.recv = recv + self.client_session.transport._sock = mock_sock + self.client_session.transport._extra['socket'] = mock_sock + self.assertFalse(self.resumed_session.is_set()) + self.assertFalse(self.session_manager.session_event.is_set()) + await self.session_manager.session_event.wait() + self.assertEqual(0, len(self.session_manager.sessions))