From 69bcb052dede8877713371f220c6a8563071abf7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 25 Jan 2019 13:10:40 -0500 Subject: [PATCH] fix blob server protocol factory --- lbrynet/blob_exchange/server.py | 16 ++++- .../unit/blob_exchange/test_transfer_blob.py | 72 ++++++++++++++++++- 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/lbrynet/blob_exchange/server.py b/lbrynet/blob_exchange/server.py index 2f6c8b5c1..b080e8139 100644 --- a/lbrynet/blob_exchange/server.py +++ b/lbrynet/blob_exchange/server.py @@ -13,7 +13,7 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -class BlobServer(asyncio.Protocol): +class BlobServerProtocol(asyncio.Protocol): def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', lbrycrd_address: str): self.loop = loop self.blob_manager = blob_manager @@ -87,12 +87,24 @@ class BlobServer(asyncio.Protocol): return self.loop.create_task(self.handle_request(request)) + +class BlobServer: + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', lbrycrd_address: str): + self.loop = loop + self.blob_manager = blob_manager + self.server_task: asyncio.Task = None + self.started_listening = asyncio.Event(loop=self.loop) + self.lbrycrd_address = lbrycrd_address + def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): if self.server_task is not None: raise Exception("already running") async def _start_server(): - server = await self.loop.create_server(lambda: self, interface, port) + server = await self.loop.create_server( + lambda: BlobServerProtocol(self.loop, self.blob_manager, self.lbrycrd_address), + interface, port + ) self.started_listening.set() log.info("Blob server listening on TCP %s:%i", interface, port) async with server: diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 1aa0c6b27..c3bd576ff 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -42,7 +42,7 @@ class BlobExchangeTestBase(AsyncioTestCase): class TestBlobExchange(BlobExchangeTestBase): - async def _test_transfer_blob(self, blob_hash: str, blob_bytes: bytes): + async def _add_blob_to_server(self, blob_hash: str, blob_bytes: bytes): # add the blob on the server server_blob = self.server_blob_manager.get_blob(blob_hash, len(blob_bytes)) writer = server_blob.open_for_writing() @@ -51,6 +51,7 @@ class TestBlobExchange(BlobExchangeTestBase): self.assertTrue(os.path.isfile(server_blob.file_path)) self.assertEqual(server_blob.get_is_verified(), True) + async def _test_transfer_blob(self, blob_hash: str): client_blob = self.client_blob_manager.get_blob(blob_hash) protocol = BlobExchangeClientProtocol(self.loop, 2) @@ -64,9 +65,74 @@ class TestBlobExchange(BlobExchangeTestBase): async def test_transfer_sd_blob(self): sd_hash = "3e2706157a59aaa47ef52bc264fce488078b4026c0b9bab649a8f2fe1ecc5e5cad7182a2bb7722460f856831a1ac0f02" mock_sd_blob_bytes = b"""{"blobs": [{"blob_hash": "6f53c72de100f6f007aa1b9720632e2d049cc6049e609ad790b556dba262159f739d5a14648d5701afc84b991254206a", "blob_num": 0, "iv": "3b6110c2d8e742bff66e4314863dee7e", "length": 2097152}, {"blob_hash": "18493bc7c5164b00596153859a0faffa45765e47a6c3f12198a4f7be4658111505b7f8a15ed0162306a0672c4a9b505d", "blob_num": 1, "iv": "df973fa64e73b4ff2677d682cdc32d3e", "length": 2097152}, {"blob_num": 2, "iv": "660d2dc2645da7c7d4540a466fcb0c60", "length": 0}], "key": "6465616462656566646561646265656664656164626565666465616462656566", "stream_hash": "22423c6786584974bd6b462af47ecb03e471da0ef372fe85a4e71a78bef7560c4afb0835c689f03916105404653b7bdf", "stream_name": "746573745f66696c65", "stream_type": "lbryfile", "suggested_file_name": "746573745f66696c65"}""" - return await self._test_transfer_blob(sd_hash, mock_sd_blob_bytes) + await self._add_blob_to_server(sd_hash, mock_sd_blob_bytes) + return await self._test_transfer_blob(sd_hash) async def test_transfer_blob(self): blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1) - return await self._test_transfer_blob(blob_hash, mock_blob_bytes) + await self._add_blob_to_server(blob_hash, mock_blob_bytes) + return await self._test_transfer_blob(blob_hash) + + async def test_host_same_blob_to_multiple_peers_at_once(self): + blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" + mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1) + + second_client_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, second_client_dir) + + second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite")) + second_client_blob_manager = BlobFileManager(self.loop, second_client_dir, second_client_storage) + server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + + await second_client_storage.open() + await second_client_blob_manager.setup() + + await self._add_blob_to_server(blob_hash, mock_blob_bytes) + + second_client_blob = self.client_blob_manager.get_blob(blob_hash) + protocol = BlobExchangeClientProtocol(self.loop, 2) + + # download the blob + await asyncio.gather( + request_blob( + self.loop, second_client_blob, protocol, server_from_second_client.address, + server_from_second_client.tcp_port, 2 + ), + self._test_transfer_blob(blob_hash) + ) + await protocol.close() + self.assertEqual(second_client_blob.get_is_verified(), True) + + async def test_host_different_blobs_to_multiple_peers_at_once(self): + blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" + mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1) + + sd_hash = "3e2706157a59aaa47ef52bc264fce488078b4026c0b9bab649a8f2fe1ecc5e5cad7182a2bb7722460f856831a1ac0f02" + mock_sd_blob_bytes = b"""{"blobs": [{"blob_hash": "6f53c72de100f6f007aa1b9720632e2d049cc6049e609ad790b556dba262159f739d5a14648d5701afc84b991254206a", "blob_num": 0, "iv": "3b6110c2d8e742bff66e4314863dee7e", "length": 2097152}, {"blob_hash": "18493bc7c5164b00596153859a0faffa45765e47a6c3f12198a4f7be4658111505b7f8a15ed0162306a0672c4a9b505d", "blob_num": 1, "iv": "df973fa64e73b4ff2677d682cdc32d3e", "length": 2097152}, {"blob_num": 2, "iv": "660d2dc2645da7c7d4540a466fcb0c60", "length": 0}], "key": "6465616462656566646561646265656664656164626565666465616462656566", "stream_hash": "22423c6786584974bd6b462af47ecb03e471da0ef372fe85a4e71a78bef7560c4afb0835c689f03916105404653b7bdf", "stream_name": "746573745f66696c65", "stream_type": "lbryfile", "suggested_file_name": "746573745f66696c65"}""" + + second_client_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, second_client_dir) + + second_client_storage = SQLiteStorage(Config(), os.path.join(second_client_dir, "lbrynet.sqlite")) + second_client_blob_manager = BlobFileManager(self.loop, second_client_dir, second_client_storage) + server_from_second_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + + await second_client_storage.open() + await second_client_blob_manager.setup() + + await self._add_blob_to_server(blob_hash, mock_blob_bytes) + await self._add_blob_to_server(sd_hash, mock_sd_blob_bytes) + + second_client_blob = self.client_blob_manager.get_blob(blob_hash) + protocol = BlobExchangeClientProtocol(self.loop, 2) + + await asyncio.gather( + request_blob( + self.loop, second_client_blob, protocol, server_from_second_client.address, + server_from_second_client.tcp_port, 2 + ), + self._test_transfer_blob(sd_hash) + ) + await protocol.close() + self.assertEqual(second_client_blob.get_is_verified(), True)