2019-02-06 13:08:57 -05:00
|
|
|
import os
|
2019-08-07 12:55:18 -04:00
|
|
|
import shutil
|
2019-02-06 13:08:57 -05:00
|
|
|
import binascii
|
|
|
|
from unittest import mock
|
|
|
|
import asyncio
|
|
|
|
import time
|
2019-03-10 21:55:33 -04:00
|
|
|
import json
|
2019-03-22 02:59:04 -04:00
|
|
|
from decimal import Decimal
|
2019-02-06 13:08:57 -05:00
|
|
|
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
|
|
|
|
from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager
|
2019-06-20 21:02:58 -04:00
|
|
|
from lbry.utils import generate_id
|
|
|
|
from lbry.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, \
|
2019-03-10 21:55:33 -04:00
|
|
|
DownloadDataTimeout
|
2019-06-20 21:02:58 -04:00
|
|
|
from lbry.wallet.manager import LbryWalletManager
|
|
|
|
from lbry.extras.daemon.analytics import AnalyticsManager
|
|
|
|
from lbry.stream.stream_manager import StreamManager
|
|
|
|
from lbry.stream.descriptor import StreamDescriptor
|
|
|
|
from lbry.dht.node import Node
|
|
|
|
from lbry.dht.protocol.protocol import KademliaProtocol
|
|
|
|
from lbry.dht.protocol.routing_table import TreeRoutingTable
|
|
|
|
from lbry.schema.claim import Claim
|
2019-02-06 13:08:57 -05:00
|
|
|
|
|
|
|
|
2019-03-14 13:07:11 -04:00
|
|
|
def get_mock_node(peer=None):
    """Return a ``mock.Mock`` standing in for a DHT ``Node``.

    The mock exposes:

    * ``protocol.routing_table.get_peers`` -- always returns an empty list;
    * ``accumulate_peers(q1, q2)`` -- pushes ``[peer]`` onto ``q2`` when a
      truthy *peer* was supplied, then returns ``(q2, task)`` where ``task``
      wraps a coroutine that does nothing;
    * ``joined`` -- an ``asyncio.Event`` that is already set.

    :param peer: optional peer object handed out through ``accumulate_peers``.
    """

    async def _noop():
        # Only exists so that accumulate_peers can return a real task object.
        return None

    def mock_accumulate_peers(q1: asyncio.Queue, q2: asyncio.Queue):
        if peer:
            q2.put_nowait([peer])
        # create_task needs a running loop, so this must be called from a coroutine.
        return q2, asyncio.create_task(_noop())

    # Build the collaborators from the innermost one outwards.
    routing_table = mock.Mock(spec=TreeRoutingTable)
    routing_table.get_peers = lambda: []

    protocol = mock.Mock(spec=KademliaProtocol)
    protocol.routing_table = routing_table

    joined = asyncio.Event()
    joined.set()

    mock_node = mock.Mock(spec=Node)
    mock_node.protocol = protocol
    mock_node.accumulate_peers = mock_accumulate_peers
    mock_node.joined = joined
    return mock_node
|
|
|
|
|
|
|
|
|
2019-02-08 21:08:41 -05:00
|
|
|
def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
    """Return ``(mock_wallet, permanent_url)`` for a single fake stream claim.

    The wallet is a ``mock.Mock`` specced on ``LbryWalletManager`` whose
    ``ledger.resolve`` saves the fake claim through ``storage.save_claims`` and
    returns it keyed by its permanent URL, and whose
    ``default_account.get_balance`` returns *balance*.

    :param sd_hash: sd hash stored in the claim's stream source.
    :param storage: object providing an awaitable ``save_claims`` method.
    :param balance: value returned by the mocked ``get_balance``.
    :param fee: optional dict with ``currency`` (``'LBC'`` or ``'USD'``) and
        ``amount`` keys; other currencies leave the claim without a fee.
    """
    stream_claim = Claim()
    if fee:
        currency = fee['currency']
        if currency in ('LBC', 'USD'):
            # The fee object exposes one attribute per currency: ``lbc`` / ``usd``.
            setattr(stream_claim.stream.fee, currency.lower(), Decimal(fee['amount']))
    stream_claim.stream.title = "33rpm"
    stream_claim.stream.languages.append("en")
    stream_claim.stream.source.sd_hash = sd_hash
    stream_claim.stream.source.media_type = "image/png"

    permanent_url = "33rpm#c49566d631226492317d06ad7fdbe1ed32925124"
    claim = {
        "address": "bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF",
        "amount": "0.1",
        "claim_id": "c49566d631226492317d06ad7fdbe1ed32925124",
        "claim_sequence": 1,
        "decoded_claim": True,
        "confirmations": 1057,
        "effective_amount": "0.1",
        "has_signature": False,
        "height": 514081,
        "hex": "",
        "name": "33rpm",
        "nout": 0,
        "permanent_url": permanent_url,
        "supports": [],
        "txid": "81ac52662af926fdf639d56920069e0f63449d4cde074c61717cb99ddde40e3c",
        "value": stream_claim,
        "protobuf": binascii.hexlify(stream_claim.to_bytes()).decode(),
    }

    async def mock_resolve(*args):
        # Resolving also persists the claim, so later storage lookups find it.
        await storage.save_claims([claim])
        return {permanent_url: claim}

    async def get_balance(*_):
        return balance

    wallet = mock.Mock(spec=LbryWalletManager)
    wallet.ledger.resolve = mock_resolve
    wallet.ledger.network.client.server = ('fakespv.lbry.com', 50001)
    wallet.default_account.get_balance = get_balance
    return wallet, permanent_url
|
|
|
|
|
|
|
|
|
|
|
|
class TestStreamManager(BlobExchangeTestBase):
    """Tests for ``StreamManager`` downloads using a mocked wallet and DHT node.

    Covers the analytics events posted while downloading, errors raised when a
    download starts, stopping/resuming/deleting a stream, and re-loading
    streams when the manager is restarted.
    """

    async def setup_stream_manager(self, balance=10.0, fee=None, old_sort=False):
        """Create a server-side stream and a client ``StreamManager`` to download it.

        Writes 20,000,000 random bytes to ``test_file`` in the server directory,
        builds a stream descriptor from it and sets ``self.sd_hash``,
        ``self.mock_wallet``, ``self.uri``, ``self.stream_manager`` and
        ``self.exchange_rate_manager``.

        :param balance: balance reported by the mocked wallet.
        :param fee: optional fee dict placed on the mocked claim.
        :param old_sort: forwarded to ``StreamDescriptor.create_stream``.
        """
        file_path = os.path.join(self.server_dir, "test_file")
        with open(file_path, 'wb') as f:
            f.write(os.urandom(20000000))
        descriptor = await StreamDescriptor.create_stream(
            self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
        )
        self.sd_hash = descriptor.sd_hash
        self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
        analytics_manager = AnalyticsManager(
            self.client_config,
            binascii.hexlify(generate_id()).decode(),
            binascii.hexlify(generate_id()).decode()
        )
        self.stream_manager = StreamManager(
            self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
            self.client_storage, get_mock_node(self.server_from_client), analytics_manager
        )
        self.exchange_rate_manager = get_dummy_exchange_rate_manager(time)

    async def _stored_file_status(self, stream_hash):
        """Return the ``status`` stored in the client ``file`` table for *stream_hash*."""
        return await self.client_storage.run_and_return_one_or_none(
            "select status from file where stream_hash=?", stream_hash
        )

    async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None):
        """Download ``self.uri`` and verify that an analytics event was posted.

        :param check_post: synchronous callable invoked with each posted event.
        :param error: exception class the download is expected to raise, if any.
        :param after_setup: optional callable run right after
            ``setup_stream_manager`` and before the download starts.
        """
        await self.setup_stream_manager()
        if after_setup:
            after_setup()
        checked_analytics_event = False

        async def _check_post(event):
            nonlocal checked_analytics_event
            check_post(event)
            # Only reached when check_post did not raise.
            checked_analytics_event = True

        self.stream_manager.analytics_manager._post = _check_post
        if error:
            with self.assertRaises(error):
                await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
        else:
            await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
        # Yield once so a pending analytics post gets a chance to run.
        # (The ``loop`` argument of asyncio.sleep was removed in Python 3.10.)
        await asyncio.sleep(0)
        self.assertTrue(checked_analytics_event)

    async def test_time_to_first_bytes(self):
        """A plain download posts a 'Time To First Bytes' event with consistent durations."""
        def check_post(event):
            self.assertEqual(event['event'], 'Time To First Bytes')
            total_duration = event['properties']['total_duration']
            resolve_duration = event['properties']['resolve_duration']
            head_blob_duration = event['properties']['head_blob_duration']
            sd_blob_duration = event['properties']['sd_blob_duration']
            self.assertFalse(event['properties']['added_fixed_peers'])
            self.assertEqual(event['properties']['wallet_server'], "fakespv.lbry.com:50001")
            self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration)

        await self._test_time_to_first_bytes(check_post)

    async def test_fixed_peer_delay_dht_peers_found(self):
        """With no accumulated peer but a routing-table peer, fixed peers are added after the delay."""
        self.client_config.reflector_servers = [(self.server_from_client.address, self.server_from_client.tcp_port - 1)]
        # Hide the peer from get_mock_node (built during setup) but keep it for the routing table.
        server_from_client = self.server_from_client
        self.server_from_client = None

        def after_setup():
            self.stream_manager.node.protocol.routing_table.get_peers = lambda: [server_from_client]

        def check_post(event):
            self.assertEqual(event['event'], 'Time To First Bytes')
            total_duration = event['properties']['total_duration']
            resolve_duration = event['properties']['resolve_duration']
            head_blob_duration = event['properties']['head_blob_duration']
            sd_blob_duration = event['properties']['sd_blob_duration']

            self.assertEqual(event['properties']['tried_peers_count'], 1)
            self.assertEqual(event['properties']['active_peer_count'], 1)
            self.assertEqual(event['properties']['connection_failures_count'], 0)
            self.assertEqual(event['properties']['use_fixed_peers'], True)
            self.assertEqual(event['properties']['added_fixed_peers'], True)
            self.assertEqual(event['properties']['fixed_peer_delay'], self.client_config.fixed_peer_delay)
            self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration)

        await self._test_time_to_first_bytes(check_post, after_setup=after_setup)

    async def test_tcp_connection_failure_analytics(self):
        """A stopped server yields DownloadSDTimeout and one recorded connection failure."""
        self.client_config.download_timeout = 3.0

        def after_setup():
            self.server.stop_server()

        def check_post(event):
            self.assertEqual(event['event'], 'Time To First Bytes')
            self.assertIsNone(event['properties']['head_blob_duration'])
            self.assertIsNone(event['properties']['sd_blob_duration'])
            self.assertFalse(event['properties']['added_fixed_peers'])
            self.assertEqual(event['properties']['connection_failures_count'], 1)
            self.assertEqual(
                event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout'
            )

        await self._test_time_to_first_bytes(check_post, DownloadSDTimeout, after_setup=after_setup)

    async def test_override_fixed_peer_delay_dht_disabled(self):
        """When the dht component is skipped the configured fixed peer delay is reported as 0.0."""
        self.client_config.reflector_servers = [(self.server_from_client.address, self.server_from_client.tcp_port - 1)]
        self.client_config.components_to_skip = ['dht', 'hash_announcer']
        self.client_config.fixed_peer_delay = 9001.0
        self.server_from_client = None

        def check_post(event):
            total_duration = event['properties']['total_duration']
            resolve_duration = event['properties']['resolve_duration']
            head_blob_duration = event['properties']['head_blob_duration']
            sd_blob_duration = event['properties']['sd_blob_duration']

            self.assertEqual(event['event'], 'Time To First Bytes')
            self.assertEqual(event['properties']['tried_peers_count'], 1)
            self.assertEqual(event['properties']['active_peer_count'], 1)
            self.assertEqual(event['properties']['use_fixed_peers'], True)
            self.assertEqual(event['properties']['added_fixed_peers'], True)
            self.assertEqual(event['properties']['fixed_peer_delay'], 0.0)
            self.assertGreaterEqual(total_duration, resolve_duration + head_blob_duration + sd_blob_duration)

        start = self.loop.time()
        await self._test_time_to_first_bytes(check_post)
        # The 9001 second delay must not actually have been waited for.
        self.assertLess(self.loop.time() - start, 3)

    async def test_no_peers_timeout(self):
        """Without any peers the download fails with DownloadSDTimeout after the download timeout."""
        # FIXME: the download should ideally fail right away if there are no peers
        # to initialize the shortlist and fixed peers are disabled
        self.server_from_client = None
        self.client_config.download_timeout = 3.0

        def check_post(event):
            self.assertEqual(event['event'], 'Time To First Bytes')
            self.assertEqual(event['properties']['error'], 'DownloadSDTimeout')
            self.assertEqual(event['properties']['tried_peers_count'], 0)
            self.assertEqual(event['properties']['active_peer_count'], 0)
            self.assertEqual(event['properties']['use_fixed_peers'], False)
            self.assertEqual(event['properties']['added_fixed_peers'], False)
            self.assertIsNone(event['properties']['fixed_peer_delay'])
            self.assertEqual(
                event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout'
            )

        start = self.loop.time()
        await self._test_time_to_first_bytes(check_post, DownloadSDTimeout)
        duration = self.loop.time() - start
        self.assertGreaterEqual(duration, 3.0)
        self.assertLessEqual(duration, 4.0)

    async def test_download_stop_resume_delete(self):
        """Walk a stream through running -> stopped -> finished -> deleted and check file and db state."""
        await self.setup_stream_manager()
        received = []
        expected_events = ['Time To First Bytes', 'Download Finished']

        async def check_post(event):
            received.append(event['event'])

        self.stream_manager.analytics_manager._post = check_post
        client_file = os.path.join(self.client_dir, "test_file")

        self.assertDictEqual(self.stream_manager.streams, {})
        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
        stream_hash = stream.stream_hash
        self.assertDictEqual(self.stream_manager.streams, {stream.sd_hash: stream})
        self.assertTrue(stream.running)
        self.assertFalse(stream.finished)
        self.assertTrue(os.path.isfile(client_file))
        self.assertEqual(await self._stored_file_status(stream_hash), "running")

        await stream.stop()

        self.assertFalse(stream.finished)
        self.assertFalse(stream.running)
        self.assertFalse(os.path.isfile(client_file))
        self.assertEqual(await self._stored_file_status(stream_hash), "stopped")

        await stream.save_file(node=self.stream_manager.node)
        await stream.finished_writing.wait()
        # Yield once so a pending analytics post gets a chance to run.
        await asyncio.sleep(0)
        self.assertTrue(stream.finished)
        self.assertFalse(stream.running)
        self.assertTrue(os.path.isfile(client_file))
        self.assertEqual(await self._stored_file_status(stream_hash), "finished")

        await self.stream_manager.delete_stream(stream, True)
        self.assertDictEqual(self.stream_manager.streams, {})
        self.assertFalse(os.path.isfile(client_file))
        self.assertIsNone(await self._stored_file_status(stream_hash))
        self.assertListEqual(expected_events, received)

    async def _test_download_error_on_start(self, expected_error, timeout=None):
        """Start a download of ``self.uri`` and assert it raises exactly *expected_error*.

        The exception type is compared with equality, so subclasses of
        *expected_error* do not satisfy the check.

        :param expected_error: exception class the download must raise.
        :param timeout: forwarded to ``download_stream_from_uri``.
        """
        error = None
        try:
            await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
        except Exception as err:
            # Never swallow cancellation of the test itself.
            if isinstance(err, asyncio.CancelledError):
                raise
            error = err
        self.assertEqual(expected_error, type(error))

    async def _test_download_error_analytics_on_start(self, expected_error, error_message, timeout=None):
        """Like ``_test_download_error_on_start`` but also verify the posted analytics event.

        :param expected_error: exception class the download must raise; its
            name must be the only ``error`` reported to analytics.
        :param error_message: expected ``error_message`` property of the event.
        :param timeout: forwarded to ``download_stream_from_uri``.
        """
        received = []

        async def check_post(event):
            self.assertEqual("Time To First Bytes", event['event'])
            self.assertEqual(event['properties']['error_message'], error_message)
            received.append(event['properties']['error'])

        self.stream_manager.analytics_manager._post = check_post
        await self._test_download_error_on_start(expected_error, timeout)
        # Yield once so a pending analytics post gets a chance to run.
        await asyncio.sleep(0)
        self.assertListEqual([expected_error.__name__], received)

    async def test_insufficient_funds(self):
        """A fee of 11 LBC with a balance of 10 raises InsufficientFundsError."""
        fee = {
            'currency': 'LBC',
            'amount': 11.0,
            'address': 'bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF',
            'version': '_0_0_1'
        }
        await self.setup_stream_manager(10.0, fee)
        # No second argument: that parameter is the download timeout, not an error message.
        await self._test_download_error_on_start(InsufficientFundsError)

    async def test_fee_above_max_allowed(self):
        """A 51 USD fee raises KeyFeeAboveMaxAllowed even with a large balance."""
        fee = {
            'currency': 'USD',
            'amount': 51.0,
            'address': 'bYFeMtSL7ARuG1iMpjFyrnTe4oJHSAVNXF',
            'version': '_0_0_1'
        }
        await self.setup_stream_manager(1000000.0, fee)
        # No second argument: that parameter is the download timeout, not an error message.
        await self._test_download_error_on_start(KeyFeeAboveMaxAllowed)

    async def test_resolve_error(self):
        """A uri missing from the resolve result raises ResolveError."""
        await self.setup_stream_manager()
        self.uri = "fake"
        await self._test_download_error_on_start(ResolveError)

    async def test_download_sd_timeout(self):
        """With the server stopped the sd blob download times out."""
        self.server.stop_server()
        await self.setup_stream_manager()
        await self._test_download_error_analytics_on_start(
            DownloadSDTimeout, f'Failed to download sd blob {self.sd_hash} within timeout', timeout=1
        )

    async def test_download_data_timeout(self):
        """Deleting the first data blob from the server makes the data download time out."""
        await self.setup_stream_manager()
        with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
            head_blob_hash = json.load(sdf)['blobs'][0]['blob_hash']
        self.server_blob_manager.delete_blob(head_blob_hash)
        await self._test_download_error_analytics_on_start(
            DownloadDataTimeout, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout', timeout=1
        )

    async def test_unexpected_error(self):
        """An OSError caused by a missing client directory is reported to analytics."""
        await self.setup_stream_manager()
        err_msg = f"invalid blob directory '{self.client_dir}'"
        shutil.rmtree(self.client_dir)
        try:
            await self._test_download_error_analytics_on_start(
                OSError, err_msg, timeout=1
            )
        finally:
            # Recreate the directory even when the assertions fail,
            # so the test cleanup doesn't error and mask the real failure.
            os.mkdir(self.client_dir)

    async def test_non_head_data_timeout(self):
        """A missing non-head blob stops the stream and it stays stopped after a manager restart."""
        await self.setup_stream_manager()
        with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
            # Second-to-last entry of the descriptor's blob list.
            last_blob_hash = json.load(sdf)['blobs'][-2]['blob_hash']
        self.server_blob_manager.delete_blob(last_blob_hash)
        self.client_config.blob_download_timeout = 0.1
        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
        await stream.started_writing.wait()
        self.assertEqual('running', stream.status)
        self.assertIsNotNone(stream.full_path)
        self.assertGreater(stream.written_bytes, 0)
        await stream.finished_write_attempt.wait()
        self.assertEqual('stopped', stream.status)
        self.assertIsNone(stream.full_path)
        self.assertEqual(0, stream.written_bytes)

        self.stream_manager.stop()
        await self.stream_manager.start()
        self.assertEqual(1, len(self.stream_manager.streams))
        stream = list(self.stream_manager.streams.values())[0]
        self.assertEqual('stopped', stream.status)
        self.assertIsNone(stream.full_path)
        self.assertEqual(0, stream.written_bytes)

    async def test_download_then_recover_stream_on_startup(self, old_sort=False):
        """After blobs are removed from disk, restarting the manager still loads the finished stream.

        :param old_sort: forwarded to ``setup_stream_manager``.
        """
        expected_analytics_events = [
            'Time To First Bytes',
            'Download Finished'
        ]
        received_events = []

        async def check_post(event):
            received_events.append(event['event'])

        await self.setup_stream_manager(old_sort=old_sort)
        self.stream_manager.analytics_manager._post = check_post

        self.assertDictEqual(self.stream_manager.streams, {})
        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
        await stream.finished_writing.wait()
        # Yield once so a pending analytics post gets a chance to run.
        await asyncio.sleep(0)
        self.stream_manager.stop()
        self.client_blob_manager.stop()
        # Remove the sd blob and every blob but the last descriptor entry from disk.
        os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
        for blob in stream.descriptor.blobs[:-1]:
            os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
        await self.client_blob_manager.setup()
        await self.stream_manager.start()
        self.assertEqual(1, len(self.stream_manager.streams))
        self.assertListEqual([self.sd_hash], list(self.stream_manager.streams.keys()))
        for blob_hash in [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]:
            blob_status = await self.client_storage.get_blob_status(blob_hash)
            self.assertEqual('pending', blob_status)
        self.assertEqual('finished', self.stream_manager.streams[self.sd_hash].status)

        sd_blob = self.client_blob_manager.get_blob(stream.sd_hash)
        self.assertTrue(sd_blob.file_exists)
        self.assertTrue(sd_blob.get_is_verified())
        self.assertListEqual(expected_analytics_events, received_events)

    async def test_download_then_recover_old_sort_stream_on_startup(self):
        """Same as ``test_download_then_recover_stream_on_startup`` with ``old_sort=True``."""
        await self.test_download_then_recover_stream_on_startup(old_sort=True)
|