Fix bug #2070 where the blob_get RPC timed out.

Both the stream.downloader and blob_exchange.downloader paths now add the fixed_peers list to the peer queue, so blob downloads no longer rely solely on DHT peer discovery.
Tested with the jsonrpc_blob_get daemon call.

Authored by Cristian Vicas on 2021-07-20 16:50:24 +03:00, committed by Victor Shyba
parent 6eeabb1a1a
commit 90aaf64b77
4 changed files with 54 additions and 16 deletions
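
The commit message notes the fix was verified through the jsonrpc_blob_get daemon call. For reference, a hedged sketch of exercising that call against a locally running daemon over its HTTP JSON-RPC interface; the http://localhost:5279 endpoint and the aiohttp dependency are assumptions of this sketch, not part of the commit:

# Hedged sketch: invoke blob_get on a running lbrynet daemon over JSON-RPC.
# The API address (http://localhost:5279) is assumed to be the daemon default.
import asyncio
import aiohttp

async def blob_get(blob_hash: str, api_url: str = "http://localhost:5279") -> str:
    payload = {"method": "blob_get", "params": {"blob_hash": blob_hash, "read": True}}
    async with aiohttp.ClientSession() as session:
        async with session.post(api_url, json=payload) as response:
            body = await response.json()
            return body["result"]  # with read=True the decoded blob contents come back

# Example (fill in a real blob hash):
# print(asyncio.run(blob_get("<blob hash>")))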


@@ -3,6 +3,7 @@ import typing
 import logging
 from lbry.utils import cache_concurrent
 from lbry.blob_exchange.client import request_blob
+from lbry.dht.node import get_kademlia_peers_from_hosts
 if typing.TYPE_CHECKING:
     from lbry.conf import Config
     from lbry.dht.node import Node
@@ -133,11 +134,14 @@ class BlobDownloader:
             protocol.close()
 
 
-async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', node: 'Node',
+async def download_blob(loop, config: 'Config', blob_manager: 'BlobManager', dht_node: 'Node',
                         blob_hash: str) -> 'AbstractBlob':
     search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
     search_queue.put_nowait(blob_hash)
-    peer_queue, accumulate_task = node.accumulate_peers(search_queue)
+    peer_queue, accumulate_task = dht_node.accumulate_peers(search_queue)
+    fixed_peers = None if not config.fixed_peers else await get_kademlia_peers_from_hosts(config.fixed_peers)
+    if fixed_peers:
+        loop.call_later(config.fixed_peer_delay, peer_queue.put_nowait, fixed_peers)
     downloader = BlobDownloader(loop, config, blob_manager, peer_queue)
     try:
         return await downloader.download_blob(blob_hash)
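
A hedged call-site sketch for the renamed dht_node parameter above; the config, blob_manager and dht_node objects are assumed to come from an already running daemon and are not defined in this commit:

# Hedged sketch: calling the updated download_blob() with a DHT node handle.
import asyncio
from lbry.blob_exchange.downloader import download_blob

async def fetch_blob(config, blob_manager, dht_node, blob_hash: str):
    loop = asyncio.get_running_loop()
    # With config.fixed_peers set, download_blob resolves those hosts itself and
    # schedules them onto the peer queue after config.fixed_peer_delay seconds,
    # so the download no longer depends solely on DHT peer discovery.
    return await download_blob(loop, config, blob_manager, dht_node, blob_hash)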


@@ -257,3 +257,10 @@ class Node:
                          ) -> typing.Tuple[asyncio.Queue, asyncio.Task]:
         queue = peer_queue or asyncio.Queue(loop=self.loop)
         return queue, self.loop.create_task(self._accumulate_peers_for_value(search_queue, queue))
+
+
+async def get_kademlia_peers_from_hosts(peer_list: typing.List[typing.Tuple[str, int]]) -> typing.List['KademliaPeer']:
+    peer_address_list = [(await resolve_host(url, port, proto='tcp'), port) for url, port in peer_list]
+    kademlia_peer_list = [make_kademlia_peer(None, address, None, tcp_port=port, allow_localhost=True)
+                          for address, port in peer_address_list]
+    return kademlia_peer_list
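
A hedged usage sketch for the new module-level helper; the host/port pair below is a placeholder rather than a value taken from this commit:

# Hedged sketch: turn (host, tcp_port) pairs such as config.fixed_peers entries
# into KademliaPeer objects that can be put straight onto a peer queue.
import asyncio
from lbry.dht.node import get_kademlia_peers_from_hosts

async def main():
    peers = await get_kademlia_peers_from_hosts([("127.0.0.1", 5567)])  # placeholder host/port
    for peer in peers:
        print(peer.address, peer.tcp_port)

asyncio.run(main())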


@@ -3,9 +3,9 @@ import typing
 import logging
 import binascii
-from lbry.dht.peer import make_kademlia_peer
+from lbry.dht.node import get_kademlia_peers_from_hosts
 from lbry.error import DownloadSDTimeoutError
-from lbry.utils import resolve_host, lru_cache_concurrent
+from lbry.utils import lru_cache_concurrent
 from lbry.stream.descriptor import StreamDescriptor
 from lbry.blob_exchange.downloader import BlobDownloader
 if typing.TYPE_CHECKING:
@@ -48,26 +48,19 @@ class StreamDownloader:
         self.cached_read_blob = cached_read_blob
 
     async def add_fixed_peers(self):
-        def _delayed_add_fixed_peers():
+        def _add_fixed_peers(fixed_peers):
+            self.peer_queue.put_nowait(fixed_peers)
             self.added_fixed_peers = True
-            self.peer_queue.put_nowait([
-                make_kademlia_peer(None, address, None, tcp_port=port, allow_localhost=True)
-                for address, port in addresses
-            ])
 
         if not self.config.fixed_peers:
             return
-        addresses = [
-            (await resolve_host(url, port, proto='tcp'), port)
-            for url, port in self.config.fixed_peers
-        ]
         if 'dht' in self.config.components_to_skip or not self.node or not \
                 len(self.node.protocol.routing_table.get_peers()) > 0:
             self.fixed_peers_delay = 0.0
         else:
             self.fixed_peers_delay = self.config.fixed_peer_delay
-
-        self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)
+        fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers)
+        self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)
 
     async def load_descriptor(self, connection_id: int = 0):
         # download or get the sd blob
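
The reshuffle above follows from how loop.call_later() works: it takes a plain (non-async) callback plus positional arguments, so the async host resolution is now awaited before scheduling and its result is handed to the callback. A self-contained sketch of that pattern, with a stand-in resolver instead of the LBRY helper:

# Self-contained sketch of the call_later pattern used by add_fixed_peers():
# finish the async work first, then schedule a synchronous callback with the result.
import asyncio

async def resolve_fixed_peers(hosts):
    await asyncio.sleep(0)  # stand-in for get_kademlia_peers_from_hosts()
    return list(hosts)

async def main():
    loop = asyncio.get_running_loop()
    peer_queue: asyncio.Queue = asyncio.Queue()
    fixed_peers = await resolve_fixed_peers([("127.0.0.1", 5567)])  # async step up front
    loop.call_later(0.1, peer_queue.put_nowait, fixed_peers)        # sync callback, args passed through
    print("delayed peer batch:", await peer_queue.get())

asyncio.run(main())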


@@ -1,19 +1,22 @@
 import asyncio
 import tempfile
 from io import BytesIO
+from unittest import mock
 import shutil
 import os
+import copy
 from lbry.blob_exchange.serialization import BlobRequest
 from lbry.testcase import AsyncioTestCase
 from lbry.conf import Config
 from lbry.extras.daemon.storage import SQLiteStorage
+from lbry.extras.daemon.daemon import Daemon
 from lbry.blob.blob_manager import BlobManager
 from lbry.blob_exchange.server import BlobServer, BlobServerProtocol
 from lbry.blob_exchange.client import request_blob
 from lbry.dht.peer import PeerManager, make_kademlia_peer
+from lbry.dht.node import Node
 
 # import logging
 # logging.getLogger("lbry").setLevel(logging.DEBUG)
@@ -326,3 +329,34 @@ class TestBlobExchange(BlobExchangeTestBase):
         with self.assertRaises(asyncio.CancelledError):
             await request_blob(self.loop, client_blob, self.server_from_client.address,
                                self.server_from_client.tcp_port, 2, 3)
+
+    async def test_download_blob_using_jsonrpc_blob_get(self):
+        blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
+        mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
+        await self._add_blob_to_server(blob_hash, mock_blob_bytes)
+
+        # setup RPC Daemon
+        daemon_config = copy.deepcopy(self.client_config)
+        daemon_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)]
+        daemon = Daemon(daemon_config)
+        mock_node = mock.Mock(spec=Node)
+
+        def _mock_accumulate_peers(q1, q2=None):
+            async def _task():
+                pass
+            q2 = q2 or asyncio.Queue(loop=self.loop)
+            return q2, self.loop.create_task(_task())
+
+        mock_node.accumulate_peers = _mock_accumulate_peers
+
+        with mock.patch('lbry.extras.daemon.componentmanager.ComponentManager.all_components_running',
+                        return_value=True):
+            with mock.patch('lbry.extras.daemon.daemon.Daemon.dht_node', new_callable=mock.PropertyMock) \
+                    as daemon_mock_dht:
+                with mock.patch('lbry.extras.daemon.daemon.Daemon.blob_manager', new_callable=mock.PropertyMock) \
+                        as daemon_mock_blob_manager:
+                    daemon_mock_dht.return_value = mock_node
+                    daemon_mock_blob_manager.return_value = self.client_blob_manager
+                    result = await daemon.jsonrpc_blob_get(blob_hash, read=True)
+                    self.assertIsNotNone(result)
+                    self.assertEqual(mock_blob_bytes.decode(), result, "Downloaded blob is different than server blob")
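
The nested patches in this test replace read-only daemon properties. A minimal self-contained illustration of the new_callable=mock.PropertyMock pattern, using a throwaway class rather than LBRY code:

# Self-contained sketch of patching a property with mock.PropertyMock,
# the same pattern used for Daemon.dht_node and Daemon.blob_manager above.
from unittest import mock

class Service:
    @property
    def node(self):
        raise RuntimeError("real node unavailable in unit tests")

with mock.patch.object(Service, 'node', new_callable=mock.PropertyMock) as node_prop:
    node_prop.return_value = "fake node"
    assert Service().node == "fake node"  # attribute access returns the mocked value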