From 90aaf64b779e2d8e4b1fd9c31bc3a963897a360d Mon Sep 17 00:00:00 2001 From: Cristian Vicas Date: Tue, 20 Jul 2021 16:50:24 +0300 Subject: [PATCH] Bug [#2070] where blob_get RPC timed out. Both stream.downloader and blob_exchange.downloader paths are adding the fixed_peers list to the DHT node. Tested jsonrpc_blob_get daemon call. Bug [#2070] where blob_get RPC timed out. Both stream.downloader and blob_exchange.downloader paths are adding the fixed_peers list to the DHT node. Tested jsonrpc_blob_get daemon call. --- lbry/blob_exchange/downloader.py | 8 +++-- lbry/dht/node.py | 7 ++++ lbry/stream/downloader.py | 19 ++++------ .../unit/blob_exchange/test_transfer_blob.py | 36 ++++++++++++++++++- 4 files changed, 54 insertions(+), 16 deletions(-) diff --git a/lbry/blob_exchange/downloader.py b/lbry/blob_exchange/downloader.py index 8584bb464..5a8c90a27 100644 --- a/lbry/blob_exchange/downloader.py +++ b/lbry/blob_exchange/downloader.py @@ -3,6 +3,7 @@ import typing import logging from lbry.utils import cache_concurrent from lbry.blob_exchange.client import request_blob +from lbry.dht.node import get_kademlia_peers_from_hosts if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.dht.node import Node @@ -133,11 +134,14 @@ class BlobDownloader: protocol.close() -async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node', +async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', dht_node: 'Node', blob_hash: str) -> 'AbstractBlob': search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download) search_queue.put_nowait(blob_hash) - peer_queue, accumulate_task = node.accumulate_peers(search_queue) + peer_queue, accumulate_task = dht_node.accumulate_peers(search_queue) + fixed_peers = None if not config.fixed_peers else await get_kademlia_peers_from_hosts(config.fixed_peers) + if fixed_peers: + loop.call_later(config.fixed_peer_delay, peer_queue.put_nowait, fixed_peers) downloader = BlobDownloader(loop, config, blob_manager, peer_queue) try: return await downloader.download_blob(blob_hash) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index d44ce5cd9..de278a5f5 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -257,3 +257,10 @@ class Node: ) -> typing.Tuple[asyncio.Queue, asyncio.Task]: queue = peer_queue or asyncio.Queue(loop=self.loop) return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue)) + + +async def get_kademlia_peers_from_hosts(peer_list: typing.List[typing.Tuple[str, int]]) -> typing.List['KademliaPeer']: + peer_address_list = [(await resolve_host(url, port, proto='tcp'), port) for url, port in peer_list] + kademlia_peer_list = [make_kademlia_peer(None, address, None, tcp_port=port, allow_localhost=True) + for address, port in peer_address_list] + return kademlia_peer_list diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 4e6bf1641..83ff67fef 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -3,9 +3,9 @@ import typing import logging import binascii -from lbry.dht.peer import make_kademlia_peer +from lbry.dht.node import get_kademlia_peers_from_hosts from lbry.error import DownloadSDTimeoutError -from lbry.utils import resolve_host, lru_cache_concurrent +from lbry.utils import lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader if typing.TYPE_CHECKING: @@ -48,26 +48,19 @@ class StreamDownloader: self.cached_read_blob = cached_read_blob async def add_fixed_peers(self): - def _delayed_add_fixed_peers(): + def _add_fixed_peers(fixed_peers): + self.peer_queue.put_nowait(fixed_peers) self.added_fixed_peers = True - self.peer_queue.put_nowait([ - make_kademlia_peer(None, address, None, tcp_port=port, allow_localhost=True) - for address, port in addresses - ]) if not self.config.fixed_peers: return - addresses = [ - (await resolve_host(url, port, proto='tcp'), port) - for url, port in self.config.fixed_peers - ] if 'dht' in self.config.components_to_skip or not self.node or not \ len(self.node.protocol.routing_table.get_peers()) > 0: self.fixed_peers_delay = 0.0 else: self.fixed_peers_delay = self.config.fixed_peer_delay - - self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers) + fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers) + self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers) async def load_descriptor(self, connection_id: int = 0): # download or get the sd blob diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 5bdf2aac2..a50a89e54 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -1,19 +1,22 @@ import asyncio import tempfile from io import BytesIO +from unittest import mock import shutil import os +import copy from lbry.blob_exchange.serialization import BlobRequest from lbry.testcase import AsyncioTestCase from lbry.conf import Config from lbry.extras.daemon.storage import SQLiteStorage +from lbry.extras.daemon.daemon import Daemon from lbry.blob.blob_manager import BlobManager from lbry.blob_exchange.server import BlobServer, BlobServerProtocol from lbry.blob_exchange.client import request_blob from lbry.dht.peer import PeerManager, make_kademlia_peer - +from lbry.dht.node import Node # import logging # logging.getLogger("lbry").setLevel(logging.DEBUG) @@ -326,3 +329,34 @@ class TestBlobExchange(BlobExchangeTestBase): with self.assertRaises(asyncio.CancelledError): await request_blob(self.loop, client_blob, self.server_from_client.address, self.server_from_client.tcp_port, 2, 3) + + async def test_download_blob_using_jsonrpc_blob_get(self): + blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" + mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1) + await self._add_blob_to_server(blob_hash, mock_blob_bytes) + + # setup RPC Daemon + daemon_config = copy.deepcopy(self.client_config) + daemon_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)] + daemon = Daemon(daemon_config) + + mock_node = mock.Mock(spec=Node) + + def _mock_accumulate_peers(q1, q2=None): + async def _task(): + pass + q2 = q2 or asyncio.Queue(loop=self.loop) + return q2, self.loop.create_task(_task()) + + mock_node.accumulate_peers = _mock_accumulate_peers + with mock.patch('lbry.extras.daemon.componentmanager.ComponentManager.all_components_running', + return_value=True): + with mock.patch('lbry.extras.daemon.daemon.Daemon.dht_node', new_callable=mock.PropertyMock) \ + as daemon_mock_dht: + with mock.patch('lbry.extras.daemon.daemon.Daemon.blob_manager', new_callable=mock.PropertyMock) \ + as daemon_mock_blob_manager: + daemon_mock_dht.return_value = mock_node + daemon_mock_blob_manager.return_value = self.client_blob_manager + result = await daemon.jsonrpc_blob_get(blob_hash, read=True) + self.assertIsNotNone(result) + self.assertEqual(mock_blob_bytes.decode(), result, "Downloaded blob is different than server blob")