time to first bytes analytics
This commit is contained in:
parent 0cc602ca53
commit 5d212a0f82
8 changed files with 268 additions and 203 deletions
@@ -28,8 +28,7 @@ class DownloadTimeoutError(Exception):
 
 class DownloadDataTimeout(Exception):
     def __init__(self, download):
-        super().__init__('Failed to download data blobs for sd hash '
-                         '{} within timeout'.format(download))
+        super().__init__(f'Failed to download data blobs for sd hash {download} within timeout')
         self.download = download
 
 
@@ -51,16 +51,6 @@ async def gather_dict(tasks: dict):
     )))
 
 
-async def get_external_ip():  # used if upnp is disabled or non-functioning
-    try:
-        async with utils.aiohttp_request("get", "https://api.lbry.io/ip") as resp:
-            response = await resp.json()
-            if response['success']:
-                return response['data']['ip']
-    except Exception as e:
-        pass
-
-
 class DatabaseComponent(Component):
     component_name = DATABASE_COMPONENT
 
@@ -358,7 +348,7 @@ class DHTComponent(Component):
         external_ip = self.upnp_component.external_ip
         if not external_ip:
             log.warning("UPnP component failed to get external ip")
-            external_ip = await get_external_ip()
+            external_ip = await utils.get_external_ip()
         if not external_ip:
             log.warning("failed to get external ip")
 
@@ -523,7 +513,7 @@ class UPnPComponent(Component):
 
         if external_ip == "0.0.0.0" or not external_ip:
             log.warning("unable to get external ip from UPnP, checking lbry.io fallback")
-            external_ip = await get_external_ip()
+            external_ip = await utils.get_external_ip()
         if self.external_ip and self.external_ip != external_ip:
             log.info("external ip changed from %s to %s", self.external_ip, external_ip)
             self.external_ip = external_ip
@@ -574,7 +564,7 @@ class UPnPComponent(Component):
     async def start(self):
         log.info("detecting external ip")
         if not self.use_upnp:
-            self.external_ip = await get_external_ip()
+            self.external_ip = await utils.get_external_ip()
             return
         success = False
         await self._maintain_redirects()
@@ -416,7 +416,7 @@ class Daemon(metaclass=JSONRPCServerType):
         self.ensure_wallet_dir()
         self.ensure_download_dir()
         if not self.analytics_manager.is_started:
-            self.analytics_manager.start()
+            await self.analytics_manager.start()
         self.component_startup_task = asyncio.create_task(self.component_manager.start())
         await self.component_startup_task
 
@@ -3,7 +3,6 @@ import collections
 import logging
 import aiohttp
 import typing
-import binascii
 from lbrynet import utils
 from lbrynet.conf import Config
 from lbrynet.extras import system_info
@@ -26,6 +25,10 @@ UPNP_SETUP = "UPnP Setup"
 
 BLOB_BYTES_UPLOADED = 'Blob Bytes Uploaded'
 
+
+TIME_TO_FIRST_BYTES = "Time To First Bytes"
+
+
 log = logging.getLogger(__name__)
 
 
@@ -39,13 +42,40 @@ def _event_properties(installation_id: str, session_id: str,
     return properties
 
 
-def _download_properties(download_id: str, name: str, sd_hash: str) -> typing.Dict:
-    p = {
-        'download_id': download_id,
+def _download_properties(conf: Config, external_ip: str, resolve_duration: float,
+                         total_duration: typing.Optional[float], download_id: str, name: str,
+                         outpoint: str, active_peer_count: int, tried_peers_count: int,
+                         sd_hash: str, sd_download_duration: typing.Optional[float] = None,
+                         head_blob_hash: typing.Optional[str] = None,
+                         head_blob_length: typing.Optional[int] = None,
+                         head_blob_download_duration: typing.Optional[float] = None,
+                         error: typing.Optional[str] = None) -> typing.Dict:
+    return {
+        "external_ip": external_ip,
+        "download_id": download_id,
+        "total_duration": round(total_duration, 4),
+        "resolve_duration": None if not resolve_duration else round(resolve_duration, 4),
+        "error": error,
         'name': name,
-        'stream_info': sd_hash
+        "outpoint": outpoint,
+
+        "node_rpc_timeout": conf.node_rpc_timeout,
+        "peer_connect_timeout": conf.peer_connect_timeout,
+        "blob_download_timeout": conf.blob_download_timeout,
+        "use_fixed_peers": len(conf.reflector_servers) > 0,
+        "fixed_peer_delay": conf.fixed_peer_delay,
+        "added_fixed_peers": (conf.fixed_peer_delay < total_duration) and len(conf.reflector_servers) > 0,
+
+        "active_peer_count": active_peer_count,
+        "tried_peers_count": tried_peers_count,
+
+        "sd_blob_hash": sd_hash,
+        "sd_blob_duration": None if not sd_download_duration else round(sd_download_duration, 4),
+
+        "head_blob_hash": head_blob_hash,
+        "head_blob_length": head_blob_length,
+        "head_blob_duration": None if not head_blob_download_duration else round(head_blob_download_duration, 4)
     }
-    return p
 
 
 def _make_context(platform):
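Note on the duration fields above: every duration is reported rounded to four decimal places, with None standing in for a phase that never completed. A minimal sketch of that convention (the helper name _round_duration is hypothetical, not part of the commit):

def _round_duration(duration):
    # None (or 0) means the phase never completed; otherwise report 4 decimal places
    return None if not duration else round(duration, 4)

assert _round_duration(None) is None
assert _round_duration(0.123456) == 0.1235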
@@ -71,6 +101,7 @@ def _make_context(platform):
 class AnalyticsManager:
 
     def __init__(self, conf: Config, installation_id: str, session_id: str):
+        self.conf = conf
         self.cookies = {}
         self.url = ANALYTICS_ENDPOINT
         self._write_key = utils.deobfuscate(ANALYTICS_TOKEN)
@@ -80,19 +111,22 @@ class AnalyticsManager:
         self.installation_id = installation_id
         self.session_id = session_id
         self.task: asyncio.Task = None
+        self.external_ip: typing.Optional[str] = None
 
     @property
     def is_started(self):
         return self.task is not None
 
-    def start(self):
+    async def start(self):
         if self._enabled and self.task is None:
+            self.external_ip = await utils.get_external_ip()
             self.task = asyncio.create_task(self.run())
 
     async def run(self):
         while True:
             await self._send_heartbeat()
             await asyncio.sleep(1800)
+            self.external_ip = await utils.get_external_ip()
 
     def stop(self):
         if self.task is not None and not self.task.done():
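Because start() now awaits the external-IP lookup before spawning the heartbeat task (and run() refreshes it every 30 minutes), callers such as the Daemon must await it. A self-contained sketch of the pattern, with a stand-in for utils.get_external_ip():

import asyncio

async def fake_get_external_ip():
    return "203.0.113.7"  # stand-in; the real helper queries https://api.lbry.io/ip

class Manager:
    def __init__(self):
        self.task = None
        self.external_ip = None

    async def start(self):
        # resolve the ip first so every tracked event can include it
        self.external_ip = await fake_get_external_ip()
        self.task = asyncio.ensure_future(self.run())

    async def run(self):
        while True:
            await asyncio.sleep(1800)
            self.external_ip = await fake_get_external_ip()  # periodic refresh

asyncio.get_event_loop().run_until_complete(Manager().start())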
@@ -111,7 +145,7 @@ class AnalyticsManager:
             async with utils.aiohttp_request(**request_kwargs) as response:
                 self.cookies.update(response.cookies)
         except Exception as e:
-            log.exception('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e)
+            log.debug('Encountered an exception while POSTing to %s: ', self.url + '/track', exc_info=e)
 
     async def track(self, event: typing.Dict):
         """Send a single tracking event"""
@@ -136,23 +170,31 @@ class AnalyticsManager:
     async def send_server_startup_error(self, message):
         await self.track(self._event(SERVER_STARTUP_ERROR, {'message': message}))
 
-    async def send_download_started(self, download_id, name, sd_hash):
-        await self.track(
-            self._event(DOWNLOAD_STARTED, _download_properties(download_id, name, sd_hash))
-        )
+    async def send_time_to_first_bytes(self, resolve_duration: typing.Optional[float],
+                                       total_duration: typing.Optional[float], download_id: str,
+                                       name: str, outpoint: str, found_peers_count: int,
+                                       tried_peers_count: int, sd_hash: str,
+                                       sd_download_duration: typing.Optional[float] = None,
+                                       head_blob_hash: typing.Optional[str] = None,
+                                       head_blob_length: typing.Optional[int] = None,
+                                       head_blob_duration: typing.Optional[int] = None,
+                                       error: typing.Optional[str] = None):
+        await self.track(self._event(TIME_TO_FIRST_BYTES, _download_properties(
+            self.conf, self.external_ip, resolve_duration, total_duration, download_id, name, outpoint,
+            found_peers_count, tried_peers_count, sd_hash, sd_download_duration, head_blob_hash, head_blob_length,
+            head_blob_duration, error
+        )))
 
     async def send_download_finished(self, download_id, name, sd_hash):
-        await self.track(self._event(DOWNLOAD_FINISHED, _download_properties(download_id, name, sd_hash)))
-
-    async def send_download_errored(self, error: Exception, name: str):
-        await self.track(self._event(DOWNLOAD_ERRORED, {
-            'download_id': binascii.hexlify(utils.generate_id()).decode(),
-            'name': name,
-            'stream_info': None,
-            'error': type(error).__name__,
-            'reason': str(error),
-            'report': None
-        }))
+        await self.track(
+            self._event(
+                DOWNLOAD_FINISHED, {
+                    'download_id': download_id,
+                    'name': name,
+                    'stream_info': sd_hash
+                }
+            )
+        )
 
     async def send_claim_action(self, action):
         await self.track(self._event(CLAIM_ACTION, {'action': action}))
@@ -25,7 +25,8 @@ class ManagedStream:
     def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int,
                  descriptor: 'StreamDescriptor', download_directory: str, file_name: typing.Optional[str],
                  downloader: typing.Optional[StreamDownloader] = None,
-                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None):
+                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
+                 download_id: typing.Optional[str] = None):
         self.loop = loop
         self.blob_manager = blob_manager
         self.rowid = rowid
@@ -39,7 +40,7 @@ class ManagedStream:
 
         self.fully_reflected = asyncio.Event(loop=self.loop)
         self.tx = None
-        self.download_id = binascii.hexlify(generate_id()).decode()
+        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
 
     @property
     def file_name(self) -> typing.Optional[str]:
@@ -6,6 +6,7 @@ import logging
 import random
 from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError, \
     DownloadDataTimeout, DownloadSDTimeout
+from lbrynet.utils import generate_id
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.managed_stream import ManagedStream
|
@ -275,101 +276,6 @@ class StreamManager:
|
||||||
if delete_file and stream.output_file_exists:
|
if delete_file and stream.output_file_exists:
|
||||||
os.remove(stream.full_path)
|
os.remove(stream.full_path)
|
||||||
|
|
||||||
def wait_for_stream_finished(self, stream: ManagedStream):
|
|
||||||
async def _wait_for_stream_finished():
|
|
||||||
if stream.downloader and stream.running:
|
|
||||||
await stream.downloader.stream_finished_event.wait()
|
|
||||||
stream.update_status(ManagedStream.STATUS_FINISHED)
|
|
||||||
if self.analytics_manager:
|
|
||||||
self.loop.create_task(self.analytics_manager.send_download_finished(
|
|
||||||
stream.download_id, stream.claim_name, stream.sd_hash
|
|
||||||
))
|
|
||||||
|
|
||||||
task = self.loop.create_task(_wait_for_stream_finished())
|
|
||||||
self.update_stream_finished_futs.append(task)
|
|
||||||
task.add_done_callback(
|
|
||||||
lambda _: None if task not in self.update_stream_finished_futs else
|
|
||||||
self.update_stream_finished_futs.remove(task)
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict,
|
|
||||||
file_name: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
|
|
||||||
|
|
||||||
claim = smart_decode(claim_info['value'])
|
|
||||||
downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.source_hash.decode(),
|
|
||||||
download_directory, file_name)
|
|
||||||
try:
|
|
||||||
downloader.download(node)
|
|
||||||
await downloader.got_descriptor.wait()
|
|
||||||
log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name'])
|
|
||||||
except (asyncio.TimeoutError, asyncio.CancelledError):
|
|
||||||
log.info("stream timeout")
|
|
||||||
downloader.stop()
|
|
||||||
log.info("stopped stream")
|
|
||||||
raise DownloadSDTimeout(downloader.sd_hash)
|
|
||||||
rowid = await self._store_stream(downloader)
|
|
||||||
await self.storage.save_content_claim(
|
|
||||||
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
|
||||||
)
|
|
||||||
stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, download_directory,
|
|
||||||
file_name, downloader, ManagedStream.STATUS_RUNNING)
|
|
||||||
stream.set_claim(claim_info, claim)
|
|
||||||
self.streams.add(stream)
|
|
||||||
try:
|
|
||||||
await stream.downloader.wrote_bytes_event.wait()
|
|
||||||
self.wait_for_stream_finished(stream)
|
|
||||||
return stream
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
downloader.stop()
|
|
||||||
log.debug("stopped stream")
|
|
||||||
await self.stop_stream(stream)
|
|
||||||
raise DownloadDataTimeout(downloader.sd_hash)
|
|
||||||
|
|
||||||
async def _store_stream(self, downloader: StreamDownloader) -> int:
|
|
||||||
file_name = os.path.basename(downloader.output_path)
|
|
||||||
download_directory = os.path.dirname(downloader.output_path)
|
|
||||||
if not await self.storage.stream_exists(downloader.sd_hash):
|
|
||||||
await self.storage.store_stream(downloader.sd_blob, downloader.descriptor)
|
|
||||||
if not await self.storage.file_exists(downloader.sd_hash):
|
|
||||||
return await self.storage.save_downloaded_file(
|
|
||||||
downloader.descriptor.stream_hash, file_name, download_directory,
|
|
||||||
0.0
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash)
|
|
||||||
|
|
||||||
async def download_stream_from_claim(self, node: 'Node', claim_info: typing.Dict,
|
|
||||||
file_name: typing.Optional[str] = None,
|
|
||||||
timeout: typing.Optional[float] = 60,
|
|
||||||
fee_amount: typing.Optional[float] = 0.0,
|
|
||||||
fee_address: typing.Optional[str] = None,
|
|
||||||
should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]:
|
|
||||||
claim = ClaimDict.load_dict(claim_info['value'])
|
|
||||||
sd_hash = claim.source_hash.decode()
|
|
||||||
if sd_hash in self.starting_streams:
|
|
||||||
return await self.starting_streams[sd_hash]
|
|
||||||
already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams))
|
|
||||||
if already_started:
|
|
||||||
return already_started[0]
|
|
||||||
self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
|
|
||||||
stream_task = self.loop.create_task(
|
|
||||||
self._download_stream_from_claim(node, self.config.download_dir, claim_info, file_name)
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(stream_task, timeout or self.config.download_timeout)
|
|
||||||
stream = await stream_task
|
|
||||||
self.starting_streams[sd_hash].set_result(stream)
|
|
||||||
if should_pay and fee_address and fee_amount:
|
|
||||||
stream.tx = await self.wallet.send_amount_to_address(
|
|
||||||
lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
|
|
||||||
return stream
|
|
||||||
except asyncio.TimeoutError as e:
|
|
||||||
if stream_task.exception():
|
|
||||||
raise stream_task.exception()
|
|
||||||
finally:
|
|
||||||
if sd_hash in self.starting_streams:
|
|
||||||
del self.starting_streams[sd_hash]
|
|
||||||
|
|
||||||
def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]:
|
def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]:
|
||||||
streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams))
|
streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams))
|
||||||
if streams:
|
if streams:
|
||||||
|
@@ -411,30 +317,111 @@ class StreamManager:
         streams.reverse()
         return streams
 
+    def wait_for_stream_finished(self, stream: ManagedStream):
+        async def _wait_for_stream_finished():
+            if stream.downloader and stream.running:
+                await stream.downloader.stream_finished_event.wait()
+                stream.update_status(ManagedStream.STATUS_FINISHED)
+                if self.analytics_manager:
+                    self.loop.create_task(self.analytics_manager.send_download_finished(
+                        stream.download_id, stream.claim_name, stream.sd_hash
+                    ))
+
+        task = self.loop.create_task(_wait_for_stream_finished())
+        self.update_stream_finished_futs.append(task)
+        task.add_done_callback(
+            lambda _: None if task not in self.update_stream_finished_futs else
+            self.update_stream_finished_futs.remove(task)
+        )
+
+    async def _store_stream(self, downloader: StreamDownloader) -> int:
+        file_name = os.path.basename(downloader.output_path)
+        download_directory = os.path.dirname(downloader.output_path)
+        if not await self.storage.stream_exists(downloader.sd_hash):
+            await self.storage.store_stream(downloader.sd_blob, downloader.descriptor)
+        if not await self.storage.file_exists(downloader.sd_hash):
+            return await self.storage.save_downloaded_file(
+                downloader.descriptor.stream_hash, file_name, download_directory,
+                0.0
+            )
+        else:
+            return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash)
+
+    async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: ClaimDict) -> typing.Tuple[
+            typing.Optional[ManagedStream], typing.Optional[ManagedStream]]:
+        existing = self.get_filtered_streams(outpoint=outpoint)
+        if existing:
+            await self.start_stream(existing[0])
+            return existing[0], None
+        existing = self.get_filtered_streams(sd_hash=claim.source_hash.decode())
+        if existing and existing[0].claim_id != claim_id:
+            raise ResolveError(f"stream for {existing[0].claim_id} collides with existing "
+                               f"download {claim_id}")
+        if existing:
+            log.info("claim contains a metadata only update to a stream we have")
+            await self.storage.save_content_claim(
+                existing[0].stream_hash, outpoint
+            )
+            await self._update_content_claim(existing[0])
+            await self.start_stream(existing[0])
+            return existing[0], None
+        else:
+            existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
+            if existing_for_claim_id:
+                log.info("claim contains an update to a stream we have, downloading it")
+                return None, existing_for_claim_id[0]
+        return None, None
+
+    async def start_downloader(self, got_descriptor_time: asyncio.Future, downloader: StreamDownloader,
+                               download_id: str, outpoint: str, claim: ClaimDict, resolved: typing.Dict,
+                               file_name: typing.Optional[str] = None) -> ManagedStream:
+        start_time = self.loop.time()
+        downloader.download(self.node)
+        await downloader.got_descriptor.wait()
+        got_descriptor_time.set_result(self.loop.time() - start_time)
+        rowid = await self._store_stream(downloader)
+        await self.storage.save_content_claim(
+            downloader.descriptor.stream_hash, outpoint
+        )
+        stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, self.config.download_dir,
+                               file_name, downloader, ManagedStream.STATUS_RUNNING, download_id=download_id)
+        stream.set_claim(resolved, claim)
+        await stream.downloader.wrote_bytes_event.wait()
+        self.streams.add(stream)
+        return stream
+
     async def _download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
                                         file_name: typing.Optional[str] = None,
-                                        timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
-        timeout = timeout or self.config.download_timeout
+                                        timeout: typing.Optional[float] = None) -> ManagedStream:
+        start_time = self.loop.time()
         parsed_uri = parse_lbry_uri(uri)
         if parsed_uri.is_channel:
             raise ResolveError("cannot download a channel claim, specify a /path")
 
+        # resolve the claim
         resolved = (await self.wallet.resolve(uri)).get(uri, {})
         resolved = resolved if 'value' in resolved else resolved.get('claim')
 
         if not resolved:
-            raise ResolveError(
-                "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
-            )
+            raise ResolveError(f"Failed to resolve stream at '{uri}'")
         if 'error' in resolved:
             raise ResolveError(f"error resolving stream: {resolved['error']}")
 
         claim = ClaimDict.load_dict(resolved['value'])
+        outpoint = f"{resolved['txid']}:{resolved['nout']}"
+        resolved_time = self.loop.time() - start_time
+
+        # resume or update an existing stream, if the stream changed download it and delete the old one after
+        updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
+        if updated_stream:
+            return updated_stream
+
+        # check that the fee is payable
         fee_amount, fee_address = None, None
         if claim.has_fee:
             fee_amount = round(exchange_rate_manager.convert_currency(
                 claim.source_fee.currency, "LBC", claim.source_fee.amount
             ), 5)
             max_fee_amount = round(exchange_rate_manager.convert_currency(
                 self.config.max_key_fee['currency'], "LBC", self.config.max_key_fee['amount']
            ), 5)
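start_downloader above uses an asyncio future as a side channel: the caller learns how long the descriptor took even while the enclosing coroutine keeps running toward first bytes. A stripped-down, self-contained sketch of the technique (the sleeps stand in for downloader.got_descriptor.wait() and wrote_bytes_event.wait()):

import asyncio

async def start_downloader(got_descriptor_time: asyncio.Future) -> str:
    loop = asyncio.get_event_loop()
    start = loop.time()
    await asyncio.sleep(0.1)  # stand-in for downloader.got_descriptor.wait()
    got_descriptor_time.set_result(loop.time() - start)
    await asyncio.sleep(0.2)  # stand-in for stream.downloader.wrote_bytes_event.wait()
    return "stream"

async def main():
    loop = asyncio.get_event_loop()
    descriptor_time_fut = loop.create_future()
    stream = await asyncio.wait_for(start_downloader(descriptor_time_fut), timeout=1.0)
    print(await descriptor_time_fut, stream)

asyncio.get_event_loop().run_until_complete(main())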
@@ -448,52 +435,76 @@ class StreamManager:
             log.warning(msg)
             raise InsufficientFundsError(msg)
         fee_address = claim.source_fee.address.decode()
-        outpoint = f"{resolved['txid']}:{resolved['nout']}"
-        existing = self.get_filtered_streams(outpoint=outpoint)
 
-        if not existing:
-            existing.extend(self.get_filtered_streams(sd_hash=claim.source_hash.decode()))
-            if existing and existing[0].claim_id != resolved['claim_id']:
-                raise Exception(f"stream for {existing[0].claim_id} collides with existing "
-                                f"download {resolved['claim_id']}")
-            existing.extend(self.get_filtered_streams(claim_id=resolved['claim_id']))
-            if existing and existing[0].sd_hash != claim.source_hash.decode():
-                log.info("claim contains an update to a stream we have, downloading it")
-                stream = await self.download_stream_from_claim(
-                    self.node, resolved, file_name, timeout, fee_amount, fee_address, False
-                )
-                log.info("started new stream, deleting old one")
-                if self.analytics_manager:
-                    self.loop.create_task(self.analytics_manager.send_download_started(
-                        stream.download_id, parsed_uri.name, claim.source_hash.decode()
-                    ))
-                await self.delete_stream(existing[0])
-                return stream
-            elif existing:
-                log.info("already have matching stream for %s", uri)
-                stream = existing[0]
-                await self.start_stream(stream)
-                return stream
-        else:
-            stream = existing[0]
-            await self.start_stream(stream)
-            return stream
-        log.info("download stream from %s", uri)
-        stream = await self.download_stream_from_claim(
-            self.node, resolved, file_name, timeout, fee_amount, fee_address
-        )
+        # download the stream
+        download_id = binascii.hexlify(generate_id()).decode()
+        downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.source_hash.decode(),
+                                      self.config.download_dir, file_name)
+
+        stream = None
+        descriptor_time_fut = self.loop.create_future()
+        start_download_time = self.loop.time()
+        time_to_descriptor = None
+        time_to_first_bytes = None
+        error = None
+        try:
+            stream = await asyncio.wait_for(
+                asyncio.ensure_future(
+                    self.start_downloader(descriptor_time_fut, downloader, download_id, outpoint, claim, resolved,
+                                          file_name)
+                ), timeout
+            )
+            time_to_descriptor = await descriptor_time_fut
+            time_to_first_bytes = self.loop.time() - start_download_time
+            self.wait_for_stream_finished(stream)
+            if fee_address and fee_amount and not to_replace:
+                stream.tx = await self.wallet.send_amount_to_address(
+                    lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
+            elif to_replace:  # delete old stream now that the replacement has started downloading
+                await self.delete_stream(to_replace)
+        except asyncio.TimeoutError:
+            if descriptor_time_fut.done():
+                time_to_descriptor = descriptor_time_fut.result()
+                error = DownloadDataTimeout(downloader.descriptor.blobs[0].blob_hash)
+            else:
+                descriptor_time_fut.cancel()
+                error = DownloadSDTimeout(downloader.sd_hash)
+            if stream:
+                await self.stop_stream(stream)
+            elif downloader:
+                downloader.stop()
+        if error:
+            log.warning(error)
         if self.analytics_manager:
-            self.loop.create_task(self.analytics_manager.send_download_started(
-                stream.download_id, parsed_uri.name, claim.source_hash.decode()
-            ))
+            self.loop.create_task(
+                self.analytics_manager.send_time_to_first_bytes(
+                    resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint,
+                    None if not stream else len(stream.downloader.blob_downloader.active_connections),
+                    None if not stream else len(stream.downloader.blob_downloader.scores),
+                    claim.source_hash.decode(), time_to_descriptor,
+                    None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
+                    None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
+                    time_to_first_bytes, None if not error else error.__class__.__name__
+                )
+            )
+        if error:
+            raise error
         return stream
 
     async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
                                        file_name: typing.Optional[str] = None,
-                                       timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
+                                       timeout: typing.Optional[float] = None) -> ManagedStream:
+        if uri in self.starting_streams:
+            return await self.starting_streams[uri]
+        fut = asyncio.Future(loop=self.loop)
+        self.starting_streams[uri] = fut
         try:
-            return await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout)
-        except Exception as e:
-            if self.analytics_manager:
-                await self.analytics_manager.send_download_errored(e, uri)
-            raise e
+            stream = await self._download_stream_from_uri(uri, exchange_rate_manager, file_name, timeout)
+            fut.set_result(stream)
+            return stream
+        except Exception as err:
+            fut.set_exception(err)
+            try:
+                return await fut
+            finally:
+                del self.starting_streams[uri]
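The except asyncio.TimeoutError branch above distinguishes the two failure modes by whether the descriptor future ever resolved: done means the sd blob arrived but the data blobs did not (DownloadDataTimeout), not done means even the descriptor never came (DownloadSDTimeout). A minimal sketch of that disambiguation, with a toy downloader whose sleeps stand in for blob fetches:

import asyncio

async def download(descriptor_fut: asyncio.Future, sd_delay: float, data_delay: float):
    await asyncio.sleep(sd_delay)
    descriptor_fut.set_result(True)   # descriptor (sd blob) arrived
    await asyncio.sleep(data_delay)   # now fetch the data blobs
    return "stream"

async def try_download(sd_delay, data_delay, timeout):
    loop = asyncio.get_event_loop()
    descriptor_fut = loop.create_future()
    try:
        return await asyncio.wait_for(download(descriptor_fut, sd_delay, data_delay), timeout)
    except asyncio.TimeoutError:
        if descriptor_fut.done():
            return "DownloadDataTimeout"  # got the descriptor, data blobs timed out
        descriptor_fut.cancel()
        return "DownloadSDTimeout"        # never even got the descriptor

loop = asyncio.get_event_loop()
print(loop.run_until_complete(try_download(0.01, 10, 0.1)))  # DownloadDataTimeout
print(loop.run_until_complete(try_download(10, 0, 0.1)))     # DownloadSDTimeout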
@@ -173,3 +173,13 @@ async def aiohttp_request(method, url, **kwargs) -> typing.AsyncContextManager[a
     async with aiohttp.ClientSession() as session:
         async with session.request(method, url, ssl=get_ssl_context(), **kwargs) as response:
             yield response
+
+
+async def get_external_ip() -> typing.Optional[str]:  # used if upnp is disabled or non-functioning
+    try:
+        async with aiohttp_request("get", "https://api.lbry.io/ip") as resp:
+            response = await resp.json()
+            if response['success']:
+                return response['data']['ip']
+    except Exception as e:
+        return
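The helper moved into lbrynet.utils can be exercised on its own; a self-contained equivalent using a plain aiohttp session (without lbrynet's SSL context) might look like:

import asyncio
import aiohttp

async def get_external_ip():
    # ask the lbry.io echo endpoint for our public address; None on any failure
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get("https://api.lbry.io/ip") as resp:
                response = await resp.json()
                if response['success']:
                    return response['data']['ip']
    except Exception:
        return None

print(asyncio.get_event_loop().run_until_complete(get_external_ip()))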
@@ -3,11 +3,12 @@ import binascii
 from unittest import mock
 import asyncio
 import time
-import typing
+import json
 from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
 from tests.unit.lbrynet_daemon.test_ExchangeRateManager import get_dummy_exchange_rate_manager
 from lbrynet.utils import generate_id
-from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout
+from lbrynet.error import InsufficientFundsError, KeyFeeAboveMaxAllowed, ResolveError, DownloadSDTimeout, \
+    DownloadDataTimeout
 from lbrynet.extras.wallet.manager import LbryWalletManager
 from lbrynet.extras.daemon.analytics import AnalyticsManager
 from lbrynet.stream.stream_manager import StreamManager
@@ -100,8 +101,9 @@ class TestStreamManager(BlobExchangeTestBase):
         file_path = os.path.join(self.server_dir, "test_file")
         with open(file_path, 'wb') as f:
             f.write(os.urandom(20000000))
-        descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path,
-                                                          old_sort=old_sort)
+        descriptor = await StreamDescriptor.create_stream(
+            self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
+        )
         self.sd_hash = descriptor.sd_hash
         self.mock_wallet, self.uri = get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
         self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
@@ -114,7 +116,7 @@ class TestStreamManager(BlobExchangeTestBase):
     async def test_download_stop_resume_delete(self):
         await self.setup_stream_manager()
         received = []
-        expected_events = ['Download Started', 'Download Finished']
+        expected_events = ['Time To First Bytes', 'Download Finished']
 
         async def check_post(event):
             received.append(event['event'])
@@ -145,7 +147,7 @@ class TestStreamManager(BlobExchangeTestBase):
 
         await self.stream_manager.start_stream(stream)
         await stream.downloader.stream_finished_event.wait()
-        await asyncio.sleep(0.01)
+        await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(stream.finished)
         self.assertFalse(stream.running)
         self.assertTrue(os.path.isfile(os.path.join(self.client_dir, "test_file")))
@@ -163,17 +165,20 @@ class TestStreamManager(BlobExchangeTestBase):
         self.assertEqual(stored_status, None)
         self.assertListEqual(expected_events, received)
 
-    async def _test_download_error(self, expected_error):
+    async def _test_download_error_on_start(self, expected_error, timeout=None):
+        with self.assertRaises(expected_error):
+            await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout=timeout)
+
+    async def _test_download_error_analytics_on_start(self, expected_error, timeout=None):
         received = []
 
         async def check_post(event):
-            self.assertEqual("Download Errored", event['event'])
+            self.assertEqual("Time To First Bytes", event['event'])
             received.append(event['properties']['error'])
 
         self.stream_manager.analytics_manager._post = check_post
-        with self.assertRaises(expected_error):
-            await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+        await self._test_download_error_on_start(expected_error, timeout)
+        await asyncio.sleep(0, loop=self.loop)
         self.assertListEqual([expected_error.__name__], received)
 
     async def test_insufficient_funds(self):
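The added await asyncio.sleep(0, loop=self.loop) matters because the stream manager fires analytics with loop.create_task(); yielding for one loop iteration lets that task run before the assertions. The pattern in isolation:

import asyncio

received = []

async def send_event():
    received.append('Time To First Bytes')  # stands in for the analytics POST

async def main():
    loop = asyncio.get_event_loop()
    loop.create_task(send_event())  # fire-and-forget, as the stream manager does
    assert received == []           # the task has not run yet
    await asyncio.sleep(0)          # yield once so pending tasks get scheduled
    assert received == ['Time To First Bytes']

asyncio.get_event_loop().run_until_complete(main())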
@@ -184,7 +189,7 @@ class TestStreamManager(BlobExchangeTestBase):
             'version': '_0_0_1'
         }
         await self.setup_stream_manager(10.0, fee)
-        await self._test_download_error(InsufficientFundsError)
+        await self._test_download_error_on_start(InsufficientFundsError)
 
     async def test_fee_above_max_allowed(self):
         fee = {
@@ -194,22 +199,28 @@ class TestStreamManager(BlobExchangeTestBase):
             'version': '_0_0_1'
         }
         await self.setup_stream_manager(1000000.0, fee)
-        await self._test_download_error(KeyFeeAboveMaxAllowed)
+        await self._test_download_error_on_start(KeyFeeAboveMaxAllowed)
 
     async def test_resolve_error(self):
         await self.setup_stream_manager()
         self.uri = "fake"
-        await self._test_download_error(ResolveError)
+        await self._test_download_error_on_start(ResolveError)
 
-    async def test_download_timeout(self):
+    async def test_download_sd_timeout(self):
         self.server.stop_server()
-        self.client_config.download_timeout = 1.0
         await self.setup_stream_manager()
-        await self._test_download_error(DownloadSDTimeout)
+        await self._test_download_error_analytics_on_start(DownloadSDTimeout, timeout=1)
+
+    async def test_download_data_timeout(self):
+        await self.setup_stream_manager()
+        with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
+            head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash']
+        self.server_blob_manager.delete_blob(head_blob_hash)
+        await self._test_download_error_analytics_on_start(DownloadDataTimeout, timeout=1)
 
     async def test_download_then_recover_stream_on_startup(self, old_sort=False):
         expected_analytics_events = [
-            'Download Started',
+            'Time To First Bytes',
             'Download Finished'
         ]
         received_events = []
|
@ -223,6 +234,7 @@ class TestStreamManager(BlobExchangeTestBase):
|
||||||
self.assertSetEqual(self.stream_manager.streams, set())
|
self.assertSetEqual(self.stream_manager.streams, set())
|
||||||
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
|
||||||
await stream.downloader.stream_finished_event.wait()
|
await stream.downloader.stream_finished_event.wait()
|
||||||
|
await asyncio.sleep(0, loop=self.loop)
|
||||||
self.stream_manager.stop()
|
self.stream_manager.stop()
|
||||||
self.client_blob_manager.stop()
|
self.client_blob_manager.stop()
|
||||||
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
|
os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
|
||||||