forked from LBRYCommunity/lbry-sdk
Merge pull request #1986 from lbryio/fix-timeout-no-peers
fix default download timeout not being set
This commit is contained in:
commit
8106eb1fa5
4 changed files with 129 additions and 32 deletions
|
@ -45,7 +45,8 @@ def _event_properties(installation_id: str, session_id: str,
|
||||||
def _download_properties(conf: Config, external_ip: str, resolve_duration: float,
|
def _download_properties(conf: Config, external_ip: str, resolve_duration: float,
|
||||||
total_duration: typing.Optional[float], download_id: str, name: str,
|
total_duration: typing.Optional[float], download_id: str, name: str,
|
||||||
outpoint: str, active_peer_count: int, tried_peers_count: int,
|
outpoint: str, active_peer_count: int, tried_peers_count: int,
|
||||||
sd_hash: str, sd_download_duration: typing.Optional[float] = None,
|
added_fixed_peers: bool, fixed_peer_delay: float, sd_hash: str,
|
||||||
|
sd_download_duration: typing.Optional[float] = None,
|
||||||
head_blob_hash: typing.Optional[str] = None,
|
head_blob_hash: typing.Optional[str] = None,
|
||||||
head_blob_length: typing.Optional[int] = None,
|
head_blob_length: typing.Optional[int] = None,
|
||||||
head_blob_download_duration: typing.Optional[float] = None,
|
head_blob_download_duration: typing.Optional[float] = None,
|
||||||
|
@ -63,9 +64,8 @@ def _download_properties(conf: Config, external_ip: str, resolve_duration: float
|
||||||
"peer_connect_timeout": conf.peer_connect_timeout,
|
"peer_connect_timeout": conf.peer_connect_timeout,
|
||||||
"blob_download_timeout": conf.blob_download_timeout,
|
"blob_download_timeout": conf.blob_download_timeout,
|
||||||
"use_fixed_peers": len(conf.reflector_servers) > 0,
|
"use_fixed_peers": len(conf.reflector_servers) > 0,
|
||||||
"fixed_peer_delay": conf.fixed_peer_delay,
|
"fixed_peer_delay": fixed_peer_delay,
|
||||||
"added_fixed_peers": (conf.fixed_peer_delay < total_duration) and len(conf.reflector_servers) > 0,
|
"added_fixed_peers": added_fixed_peers,
|
||||||
|
|
||||||
"active_peer_count": active_peer_count,
|
"active_peer_count": active_peer_count,
|
||||||
"tried_peers_count": tried_peers_count,
|
"tried_peers_count": tried_peers_count,
|
||||||
|
|
||||||
|
@ -173,7 +173,8 @@ class AnalyticsManager:
|
||||||
async def send_time_to_first_bytes(self, resolve_duration: typing.Optional[float],
|
async def send_time_to_first_bytes(self, resolve_duration: typing.Optional[float],
|
||||||
total_duration: typing.Optional[float], download_id: str,
|
total_duration: typing.Optional[float], download_id: str,
|
||||||
name: str, outpoint: str, found_peers_count: int,
|
name: str, outpoint: str, found_peers_count: int,
|
||||||
tried_peers_count: int, sd_hash: str,
|
tried_peers_count: int, added_fixed_peers: bool,
|
||||||
|
fixed_peers_delay: float, sd_hash: str,
|
||||||
sd_download_duration: typing.Optional[float] = None,
|
sd_download_duration: typing.Optional[float] = None,
|
||||||
head_blob_hash: typing.Optional[str] = None,
|
head_blob_hash: typing.Optional[str] = None,
|
||||||
head_blob_length: typing.Optional[int] = None,
|
head_blob_length: typing.Optional[int] = None,
|
||||||
|
@ -181,8 +182,8 @@ class AnalyticsManager:
|
||||||
error: typing.Optional[str] = None):
|
error: typing.Optional[str] = None):
|
||||||
await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
|
await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
|
||||||
self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
|
self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
|
||||||
found_peers_count, tried_peers_count, sd_hash, sd_download_duration, head_blob_hash, head_blob_length,
|
found_peers_count, tried_peers_count, added_fixed_peers, fixed_peers_delay, sd_hash,
|
||||||
head_blob_duration, error
|
sd_download_duration, head_blob_hash, head_blob_length, head_blob_duration, error
|
||||||
)))
|
)))
|
||||||
|
|
||||||
async def send_download_finished(self, download_id, name, sd_hash):
|
async def send_download_finished(self, download_id, name, sd_hash):
|
||||||
|
|
|
@ -36,6 +36,8 @@ class StreamDownloader(StreamAssembler):
|
||||||
self.node: typing.Optional['Node'] = None
|
self.node: typing.Optional['Node'] = None
|
||||||
self.assemble_task: typing.Optional[asyncio.Task] = None
|
self.assemble_task: typing.Optional[asyncio.Task] = None
|
||||||
self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None
|
self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None
|
||||||
|
self.fixed_peers_delay: typing.Optional[float] = None
|
||||||
|
self.added_fixed_peers = False
|
||||||
|
|
||||||
async def setup(self): # start the peer accumulator and initialize the downloader
|
async def setup(self): # start the peer accumulator and initialize the downloader
|
||||||
if self.blob_downloader:
|
if self.blob_downloader:
|
||||||
|
@ -82,16 +84,22 @@ class StreamDownloader(StreamAssembler):
|
||||||
(await resolve_host(url, port + 1, proto='tcp'), port)
|
(await resolve_host(url, port + 1, proto='tcp'), port)
|
||||||
for url, port in self.config.reflector_servers
|
for url, port in self.config.reflector_servers
|
||||||
]
|
]
|
||||||
delay = self.config.fixed_peer_delay if (
|
|
||||||
'dht' not in self.config.components_to_skip
|
def _delayed_add_fixed_peers():
|
||||||
and self.node and len(self.node.protocol.routing_table.get_peers())
|
self.added_fixed_peers = True
|
||||||
) else 0.0
|
|
||||||
self.loop.call_later(delay, lambda:
|
|
||||||
self.peer_queue.put_nowait([
|
self.peer_queue.put_nowait([
|
||||||
KademliaPeer(self.loop, address=address, tcp_port=port + 1)
|
KademliaPeer(self.loop, address=address, tcp_port=port + 1)
|
||||||
for address, port in addresses
|
for address, port in addresses
|
||||||
]))
|
])
|
||||||
if self.config.reflector_servers:
|
|
||||||
|
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)
|
||||||
|
if not self.config.reflector_servers:
|
||||||
|
return
|
||||||
|
if 'dht' in self.config.components_to_skip or not self.node or not \
|
||||||
|
len(self.node.protocol.routing_table.get_peers()):
|
||||||
|
self.fixed_peers_delay = 0.0
|
||||||
|
else:
|
||||||
|
self.fixed_peers_delay = self.config.fixed_peer_delay
|
||||||
self.loop.create_task(_add_fixed_peers())
|
self.loop.create_task(_add_fixed_peers())
|
||||||
|
|
||||||
def download(self, node: typing.Optional['Node'] = None):
|
def download(self, node: typing.Optional['Node'] = None):
|
||||||
|
|
|
@ -390,9 +390,8 @@ class StreamManager:
|
||||||
self.streams.add(stream)
|
self.streams.add(stream)
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
async def _download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
|
async def _download_stream_from_uri(self, uri, timeout: float, exchange_rate_manager: 'ExchangeRateManager',
|
||||||
file_name: typing.Optional[str] = None,
|
file_name: typing.Optional[str] = None) -> ManagedStream:
|
||||||
timeout: typing.Optional[float] = None) -> ManagedStream:
|
|
||||||
start_time = self.loop.time()
|
start_time = self.loop.time()
|
||||||
parsed_uri = parse_lbry_uri(uri)
|
parsed_uri = parse_lbry_uri(uri)
|
||||||
if parsed_uri.is_channel:
|
if parsed_uri.is_channel:
|
||||||
|
@ -483,6 +482,8 @@ class StreamManager:
|
||||||
resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint,
|
resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint,
|
||||||
None if not stream else len(stream.downloader.blob_downloader.active_connections),
|
None if not stream else len(stream.downloader.blob_downloader.active_connections),
|
||||||
None if not stream else len(stream.downloader.blob_downloader.scores),
|
None if not stream else len(stream.downloader.blob_downloader.scores),
|
||||||
|
False if not downloader else downloader.added_fixed_peers,
|
||||||
|
self.config.fixed_peer_delay if not downloader else downloader.fixed_peers_delay,
|
||||||
claim.source_hash.decode(), time_to_descriptor,
|
claim.source_hash.decode(), time_to_descriptor,
|
||||||
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
|
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
|
||||||
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
|
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
|
||||||
|
@ -496,14 +497,14 @@ class StreamManager:
|
||||||
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
|
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
|
||||||
file_name: typing.Optional[str] = None,
|
file_name: typing.Optional[str] = None,
|
||||||
timeout: typing.Optional[float] = None) -> ManagedStream:
|
timeout: typing.Optional[float] = None) -> ManagedStream:
|
||||||
|
timeout = timeout or self.config.download_timeout
|
||||||
if uri in self.starting_streams:
|
if uri in self.starting_streams:
|
||||||
return await self.starting_streams[uri]
|
return await self.starting_streams[uri]
|
||||||
fut = asyncio.Future(loop=self.loop)
|
fut = asyncio.Future(loop=self.loop)
|
||||||
self.starting_streams[uri] = fut
|
self.starting_streams[uri] = fut
|
||||||
try:
|
try:
|
||||||
stream = await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout)
|
stream = await self._download_stream_from_uri(uri, timeout, exchange_rate_manager, file_name)
|
||||||
fut.set_result(stream)
|
fut.set_result(stream)
|
||||||
return stream
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
fut.set_exception(err)
|
fut.set_exception(err)
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -14,18 +14,23 @@ from lbrynet.extras.daemon.analytics import AnalyticsManager
|
||||||
from lbrynet.stream.stream_manager import StreamManager
|
from lbrynet.stream.stream_manager import StreamManager
|
||||||
from lbrynet.stream.descriptor import StreamDescriptor
|
from lbrynet.stream.descriptor import StreamDescriptor
|
||||||
from lbrynet.dht.node import Node
|
from lbrynet.dht.node import Node
|
||||||
|
from lbrynet.dht.protocol.protocol import KademliaProtocol
|
||||||
|
from lbrynet.dht.protocol.routing_table import TreeRoutingTable
|
||||||
from lbrynet.schema.claim import ClaimDict
|
from lbrynet.schema.claim import ClaimDict
|
||||||
|
|
||||||
|
|
||||||
def get_mock_node(peer):
|
def get_mock_node(peer=None):
|
||||||
def mock_accumulate_peers(q1: asyncio.Queue, q2: asyncio.Queue):
|
def mock_accumulate_peers(q1: asyncio.Queue, q2: asyncio.Queue):
|
||||||
async def _task():
|
async def _task():
|
||||||
pass
|
pass
|
||||||
|
if peer:
|
||||||
q2.put_nowait([peer])
|
q2.put_nowait([peer])
|
||||||
return q2, asyncio.create_task(_task())
|
return q2, asyncio.create_task(_task())
|
||||||
|
|
||||||
mock_node = mock.Mock(spec=Node)
|
mock_node = mock.Mock(spec=Node)
|
||||||
|
mock_node.protocol = mock.Mock(spec=KademliaProtocol)
|
||||||
|
mock_node.protocol.routing_table = mock.Mock(spec=TreeRoutingTable)
|
||||||
|
mock_node.protocol.routing_table.get_peers = lambda: []
|
||||||
mock_node.accumulate_peers = mock_accumulate_peers
|
mock_node.accumulate_peers = mock_accumulate_peers
|
||||||
mock_node.joined = asyncio.Event()
|
mock_node.joined = asyncio.Event()
|
||||||
mock_node.joined.set()
|
mock_node.joined.set()
|
||||||
|
@ -113,24 +118,106 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
binascii.hexlify(generate_id()).decode()))
|
binascii.hexlify(generate_id()).decode()))
|
||||||
self.exchange_rate_manager = get_dummy_exchange_rate_manager(time)
|
self.exchange_rate_manager = get_dummy_exchange_rate_manager(time)
|
||||||
|
|
||||||
async def test_time_to_first_bytes(self):
|
async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
checked_post = False
|
if after_setup:
|
||||||
|
after_setup()
|
||||||
|
checked_analytics_event = False
|
||||||
|
|
||||||
async def check_post(event):
|
async def _check_post(event):
|
||||||
|
check_post(event)
|
||||||
|
nonlocal checked_analytics_event
|
||||||
|
checked_analytics_event = True
|
||||||
|
|
||||||
|
self.stream_manager.analytics_manager._post = _check_post
|
||||||
|
if error:
|
||||||
|
with self.assertRaises(error):
|
||||||
|
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||||
|
else:
|
||||||
|
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||||
|
await asyncio.sleep(0, loop=self.loop)
|
||||||
|
self.assertTrue(checked_analytics_event)
|
||||||
|
|
||||||
|
async def test_time_to_first_bytes(self):
|
||||||
|
def check_post(event):
|
||||||
self.assertEqual(event['event'], 'Time To First Bytes')
|
self.assertEqual(event['event'], 'Time To First Bytes')
|
||||||
total_duration = event['properties']['total_duration']
|
total_duration = event['properties']['total_duration']
|
||||||
resolve_duration = event['properties']['resolve_duration']
|
resolve_duration = event['properties']['resolve_duration']
|
||||||
head_blob_duration = event['properties']['head_blob_duration']
|
head_blob_duration = event['properties']['head_blob_duration']
|
||||||
sd_blob_duration = event['properties']['sd_blob_duration']
|
sd_blob_duration = event['properties']['sd_blob_duration']
|
||||||
|
self.assertFalse(event['properties']['added_fixed_peers'])
|
||||||
self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration)
|
self.assertTrue(total_duration >= resolve_duration + head_blob_duration + sd_blob_duration)
|
||||||
nonlocal checked_post
|
|
||||||
checked_post = True
|
|
||||||
|
|
||||||
self.stream_manager.analytics_manager._post = check_post
|
await self._test_time_to_first_bytes(check_post)
|
||||||
await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
|
||||||
await asyncio.sleep(0, loop=self.loop)
|
async def test_fixed_peer_delay_dht_peers_found(self):
|
||||||
self.assertTrue(checked_post)
|
self.client_config.reflector_servers = [(self.server_from_client.address, self.server_from_client.tcp_port - 1)]
|
||||||
|
server_from_client = None
|
||||||
|
self.server_from_client, server_from_client = server_from_client, self.server_from_client
|
||||||
|
|
||||||
|
def after_setup():
|
||||||
|
self.stream_manager.node.protocol.routing_table.get_peers = lambda: [server_from_client]
|
||||||
|
|
||||||
|
def check_post(event):
|
||||||
|
self.assertEqual(event['event'], 'Time To First Bytes')
|
||||||
|
total_duration = event['properties']['total_duration']
|
||||||
|
resolve_duration = event['properties']['resolve_duration']
|
||||||
|
head_blob_duration = event['properties']['head_blob_duration']
|
||||||
|
sd_blob_duration = event['properties']['sd_blob_duration']
|
||||||
|
|
||||||
|
self.assertEqual(event['event'], 'Time To First Bytes')
|
||||||
|
self.assertEqual(event['properties']['tried_peers_count'], 1)
|
||||||
|
self.assertEqual(event['properties']['active_peer_count'], 1)
|
||||||
|
self.assertEqual(event['properties']['use_fixed_peers'], True)
|
||||||
|
self.assertEqual(event['properties']['added_fixed_peers'], True)
|
||||||
|
self.assertEqual(event['properties']['fixed_peer_delay'], self.client_config.fixed_peer_delay)
|
||||||
|
self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration)
|
||||||
|
|
||||||
|
await self._test_time_to_first_bytes(check_post, after_setup=after_setup)
|
||||||
|
|
||||||
|
async def test_override_fixed_peer_delay_dht_disabled(self):
|
||||||
|
self.client_config.reflector_servers = [(self.server_from_client.address, self.server_from_client.tcp_port - 1)]
|
||||||
|
self.client_config.components_to_skip = ['dht', 'hash_announcer']
|
||||||
|
self.client_config.fixed_peer_delay = 9001.0
|
||||||
|
self.server_from_client = None
|
||||||
|
|
||||||
|
def check_post(event):
|
||||||
|
total_duration = event['properties']['total_duration']
|
||||||
|
resolve_duration = event['properties']['resolve_duration']
|
||||||
|
head_blob_duration = event['properties']['head_blob_duration']
|
||||||
|
sd_blob_duration = event['properties']['sd_blob_duration']
|
||||||
|
|
||||||
|
self.assertEqual(event['event'], 'Time To First Bytes')
|
||||||
|
self.assertEqual(event['properties']['tried_peers_count'], 1)
|
||||||
|
self.assertEqual(event['properties']['active_peer_count'], 1)
|
||||||
|
self.assertEqual(event['properties']['use_fixed_peers'], True)
|
||||||
|
self.assertEqual(event['properties']['added_fixed_peers'], True)
|
||||||
|
self.assertEqual(event['properties']['fixed_peer_delay'], 0.0)
|
||||||
|
self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration)
|
||||||
|
|
||||||
|
start = self.loop.time()
|
||||||
|
await self._test_time_to_first_bytes(check_post)
|
||||||
|
self.assertTrue(self.loop.time() - start < 3)
|
||||||
|
|
||||||
|
async def test_no_peers_timeout(self):
|
||||||
|
# FIXME: the download should ideally fail right away if there are no peers
|
||||||
|
# to initialize the shortlist and fixed peers are disabled
|
||||||
|
self.server_from_client = None
|
||||||
|
self.client_config.download_timeout = 3.0
|
||||||
|
|
||||||
|
def check_post(event):
|
||||||
|
self.assertEqual(event['event'], 'Time To First Bytes')
|
||||||
|
self.assertEqual(event['properties']['error'], 'DownloadSDTimeout')
|
||||||
|
self.assertEqual(event['properties']['tried_peers_count'], None)
|
||||||
|
self.assertEqual(event['properties']['active_peer_count'], None)
|
||||||
|
self.assertEqual(event['properties']['use_fixed_peers'], False)
|
||||||
|
self.assertEqual(event['properties']['added_fixed_peers'], False)
|
||||||
|
self.assertEqual(event['properties']['fixed_peer_delay'], None)
|
||||||
|
|
||||||
|
start = self.loop.time()
|
||||||
|
await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
|
||||||
|
duration = self.loop.time() - start
|
||||||
|
self.assertTrue(4.0 >= duration >= 3.0)
|
||||||
|
|
||||||
async def test_download_stop_resume_delete(self):
|
async def test_download_stop_resume_delete(self):
|
||||||
await self.setup_stream_manager()
|
await self.setup_stream_manager()
|
||||||
|
|
Loading…
Reference in a new issue