refactor and fix ttfb for new api

Victor Shyba 2019-07-16 23:32:05 -03:00 committed by Jack Robison
parent 95017b5fc8
commit 0f8cf46626


@@ -1,43 +1,19 @@
 import os
+import sys
 import json
 import argparse
 import asyncio
-import aiohttp
 import time
+import aiohttp
 from aiohttp import ClientConnectorError
 from lbry import __version__
 from lbry.blob.blob_file import MAX_BLOB_SIZE
 from lbry.conf import Config
-from lbry.schema.uri import parse_lbry_uri
 from lbry.extras.daemon.client import daemon_rpc
 from lbry.extras import system_info
 
 
-def extract_uris(response):
-    uris = list()
-    for key in response:
-        for value in response[key]:
-            uris.append(value)
-    return uris
-
-
-async def get_frontpage_uris():
-    session = aiohttp.ClientSession()
-    try:
-        response = await session.get("https://api.lbry.com/file/list_homepage", timeout=10.0)
-        if response.status != 200:
-            print("API returned non 200 code!!")
-            return
-        body = await response.json()
-        await session.close()
-        uris = extract_uris(body['data']['Uris'])
-        return uris
-    finally:
-        await session.close()
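Aside: the deleted helper closed its session in two places (once after reading the body, once in the finally block). For reference only, a minimal sketch of the same fetch with the session as an async context manager, using the endpoint and response shape from the deleted code; this is not part of the commit:

```python
import aiohttp

async def get_frontpage_uris():
    # Sketch only: same endpoint and response shape as the deleted helper.
    # The context manager closes the session on success, error, and early return.
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.lbry.com/file/list_homepage",
                               timeout=aiohttp.ClientTimeout(total=10.0)) as response:
            if response.status != 200:
                print("API returned non 200 code!!")
                return []
            body = await response.json()
    # Flatten {'category': [uri, ...], ...} exactly as extract_uris did.
    return [uri for uris in body['data']['Uris'].values() for uri in uris]
```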
 async def report_to_slack(output, webhook):
     payload = {
         "text": f"lbrynet {__version__} ({system_info.get_platform()['platform']}) time to first byte:\n{output}"
@@ -58,120 +34,119 @@ def variance(times):
     return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)
 
 
-async def wait_for_done(conf, uri, timeout):
-    name = uri.split("#")[0]
-    last_complete = 0
-    hang_count = 0
+async def wait_for_done(conf, claim_name, timeout):
+    blobs_completed, last_completed = 0, time.time()
     while True:
-        files = await daemon_rpc(conf, "file_list", claim_name=name)
-        file = files[0]
+        file = (await daemon_rpc(conf, "file_list", claim_name=claim_name))[0]
         if file['status'] in ['finished', 'stopped']:
             return True, file['blobs_completed'], file['blobs_in_stream']
-        if last_complete < int(file['blobs_completed']):
-            hang_count = 0
-            last_complete = int(file['blobs_completed'])
-        else:
-            hang_count += 1
-            await asyncio.sleep(1.0)
-        if hang_count > timeout:
+        elif blobs_completed < int(file['blobs_completed']):
+            blobs_completed, last_completed = int(file['blobs_completed']), time.time()
+        elif (time.time() - last_completed) > timeout:
             return False, file['blobs_completed'], file['blobs_in_stream']
+        await asyncio.sleep(1.0)
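The rewrite drops the hang_count poll counter in favor of wall-clock stall detection: it remembers when blobs_completed last increased and gives up once more than timeout seconds pass without progress. The general pattern, distilled into a standalone sketch with hypothetical names (poll, is_done, progress are stand-ins, not anything from this file):

```python
import asyncio
import time

async def poll_until_done_or_stalled(poll, is_done, progress, stall_timeout, interval=1.0):
    # Hypothetical distillation of wait_for_done: poll() fetches a status object,
    # is_done()/progress() inspect it, and the loop aborts once stall_timeout
    # seconds elapse with no increase in progress.
    best, last_progress_at = 0, time.time()
    while True:
        status = await poll()
        if is_done(status):
            return True, status
        if progress(status) > best:
            best, last_progress_at = progress(status), time.time()
        elif time.time() - last_progress_at > stall_timeout:
            return False, status
        await asyncio.sleep(interval)
```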
-async def main(uris=None, cmd_args=None):
-    if not uris:
-        uris = await get_frontpage_uris()
+async def main(cmd_args=None):
+    print('Time to first byte started using parameters:')
+    for key, value in vars(cmd_args).items():
+        print(f"{key}: {value}")
     conf = Config()
+    url_to_claim = {}
     try:
-        await daemon_rpc(conf, 'status')
+        for page in range(1, cmd_args.download_pages + 1):
+            start = time.time()
+            response = await daemon_rpc(
+                conf, 'claim_search', page=page, claim_type='stream', fee_amount=None if cmd_args.allow_fees else 0,
+                order_by=['trending_global'], no_totals=True
+            )
+            if 'error' in response or not response.get('items'):
+                print(f'Error getting claim list page {page}:')
+                print(response)
+                return 1
+            else:
+                url_to_claim.update({
+                    claim['permanent_url']: claim for claim in response['items']
+                })
+            print(f'Claim search page {page} took: {time.time() - start}')
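daemon_rpc wraps the daemon's JSON-RPC API, so each claim_search page above is one request. A rough equivalent as a raw call, assuming the default lbrynet API address http://localhost:5279 (an assumption for illustration; daemon_rpc takes the real address from Config):

```python
import aiohttp

async def claim_search_page(page: int, allow_fees: bool = False) -> dict:
    # Sketch of the request behind daemon_rpc(conf, 'claim_search', ...) above.
    params = {
        'page': page, 'claim_type': 'stream', 'order_by': ['trending_global'],
        'no_totals': True, 'fee_amount': None if allow_fees else 0,
    }
    async with aiohttp.ClientSession() as session:
        async with session.post('http://localhost:5279',  # assumed default API address
                                json={'method': 'claim_search', 'params': params}) as resp:
            return (await resp.json())['result']
```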
     except (ClientConnectorError, ConnectionError):
         print("Could not connect to daemon")
         return 1
-    print(f"Checking {len(uris)} uris from the front page")
     print("**********************************************")
-    resolvable = []
-    async def __resolve(name):
-        resolved = await daemon_rpc(conf, 'resolve', urls=[name])
-        if 'error' not in resolved.get(name, {}):
-            if ("fee" not in resolved[name]['claim']['value']) or cmd_args.allow_fees:
-                resolvable.append(name)
-            else:
-                print(f"{name} has a fee, skipping it")
-        else:
-            print(f"failed to resolve {name}: {resolved[name]['error']}")
-    await asyncio.gather(*(__resolve(name) for name in uris))
-    print(f"attempting to download {len(resolvable)}/{len(uris)} frontpage streams")
+    print(f"Attempting to download {len(url_to_claim)} claim_search streams")
 
     first_byte_times = []
     download_speeds = []
     download_successes = []
-    failed_to_start = []
-    download_failures = []
+    failed_to = {}
 
-    for uri in resolvable:
-        await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+    await asyncio.gather(*(
+        daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=claim['name'])
+        for claim in url_to_claim.values() if not cmd_args.keep_files
+    ))
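Note the filter inside the generator: with --keep_files set, the generator yields nothing and gather() simply awaits zero coroutines. A tiny self-contained check of that behavior, with asyncio.sleep standing in for daemon_rpc:

```python
import asyncio

async def demo(keep_files: bool):
    # Same shape as the file_delete cleanup above: a false condition leaves
    # gather() with nothing to run, so it returns an empty list immediately.
    results = await asyncio.gather(*(
        asyncio.sleep(0, result=n) for n in range(3) if not keep_files
    ))
    print(results)  # [] when keep_files, [0, 1, 2] otherwise

asyncio.run(demo(keep_files=True))
```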
-    for i, uri in enumerate(resolvable):
+    for i, (url, claim) in enumerate(url_to_claim.items()):
         start = time.time()
-        try:
-            await daemon_rpc(conf, 'get', uri=uri, save_file=True)
-            first_byte = time.time()
-            first_byte_times.append(first_byte - start)
-            print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            if not cmd_args.head_blob_only:
-                downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
-                    conf, uri, cmd_args.stall_download_timeout
-                )
-                if downloaded:
-                    download_successes.append(uri)
-                else:
-                    download_failures.append(uri)
-                mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
-                download_speeds.append(mbs)
-                print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
-                      f"{mbs}mb/s")
-        except Exception as e:
-            print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
-            failed_to_start.append(uri)
+        response = await daemon_rpc(conf, 'get', uri=url, save_file=not cmd_args.head_blob_only)
+        if 'error' in response:
+            print(f"{i + 1}/{len(url_to_claim)} - failed to start {url}: {response['error']}")
+            failed_to[url] = 'start'
             if cmd_args.exit_on_error:
                 return
-        if cmd_args.delete_after_download or cmd_args.head_blob_only:
-            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            continue
+        first_byte = time.time()
+        first_byte_times.append(first_byte - start)
+        print(f"{i + 1}/{len(url_to_claim)} - {first_byte - start} {url}")
+        if not cmd_args.head_blob_only:
+            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
+                conf, claim['name'], cmd_args.stall_download_timeout
+            )
+            if downloaded:
+                download_successes.append(url)
+            else:
+                failed_to[url] = 'finish'
+            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
+            download_speeds.append(mbs)
+            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {url} at "
+                  f"{mbs}mb/s")
+        if not cmd_args.keep_files:
+            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=claim['name'])
 
         await asyncio.sleep(0.1)
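The speed estimate treats every blob as full-sized: MAX_BLOB_SIZE in lbry.blob.blob_file was 2 MiB (2097152 bytes) at the time of this commit, and the code counts MAX_BLOB_SIZE - 1 bytes per blob (presumably the maximum usable payload), dividing by decimal millions, so "mb/s" here means megabytes per second. A worked example:

```python
from lbry.blob.blob_file import MAX_BLOB_SIZE  # 2097152 (2 MiB) at the time of this commit

# A 20-blob stream that finished 10 seconds after 'get' was issued:
blobs_in_stream, elapsed = 20, 10.0
mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / elapsed / 1000000, 2)
print(mbs)  # 4.19 -> printed as "4.19mb/s"
```

Because the final blob of a stream is usually short, this slightly overstates the true throughput.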
print("**********************************************") print("**********************************************")
result = f"Started {len(first_byte_times)} of {len(resolvable)} attempted front page streams\n" \ result = f"Started {len(first_byte_times)} of {len(url_to_claim)} attempted front page streams\n"
f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \ if first_byte_times:
result += f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
f"Best first byte time: {round(min(first_byte_times), 2)}\n" \ f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \ f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \ f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
f"Variance: {variance(first_byte_times)}\n" f"Variance: {variance(first_byte_times)}\n"
if not cmd_args.head_blob_only: if download_successes:
result += f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \ result += f"Downloaded {len(download_successes)}/{len(url_to_claim)}\n" \
f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \ f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \ f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \ f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n" f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
if failed_to_start: for reason in ('start', 'finish'):
result += "\nFailed to start:" + "\n".join([f for f in failed_to_start]) failures = [url for url, why in failed_to.items() if reason == why]
if download_failures: if failures:
result += "\nFailed to finish:" + "\n".join([f for f in download_failures]) result += f"\nFailed to {reason}:\n" + "\n".join(failures)
print(result) print(result)
webhook = os.environ.get('TTFB_SLACK_TOKEN', None) webhook = os.environ.get('TTFB_SLACK_TOKEN', None)
if webhook: if webhook:
await report_to_slack(result, webhook) await report_to_slack(result, webhook)
return 0
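The constants 1.984 and 2.626 are two-tailed Student-t critical values at roughly 100 degrees of freedom, matching the 95% and 99% labels. The confidence() helper is defined above this hunk and unchanged, so it does not appear in the diff; a sketch consistent with how it is called here (the optional third argument flips which side of the interval is reported: upper bound for latency, lower bound for speed) might look like:

```python
import math

def confidence(values, t, plus_err=True):
    # Sketch only; the real helper is not shown in this diff.
    # One-sided bound of a Student-t confidence interval: mean ± t * s / sqrt(n).
    n = len(values)
    mean = sum(values) / n
    s = math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))  # sample std dev
    bound = mean + t * s / math.sqrt(n) if plus_err else mean - t * s / math.sqrt(n)
    return round(bound, 3)
```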
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
#parser.add_argument("--data_dir")
#parser.add_argument("--wallet_dir")
#parser.add_argument("--download_directory")
parser.add_argument("--allow_fees", action='store_true') parser.add_argument("--allow_fees", action='store_true')
parser.add_argument("--exit_on_error", action='store_true') parser.add_argument("--exit_on_error", action='store_true')
parser.add_argument("--stall_download_timeout", default=10, type=int) parser.add_argument("--stall_download_timeout", default=0, type=int)
parser.add_argument("--delete_after_download", action='store_true') parser.add_argument("--keep_files", action='store_true')
parser.add_argument("--head_blob_only", action='store_true') parser.add_argument("--head_blob_only", action='store_true')
asyncio.run(main(cmd_args=parser.parse_args())) parser.add_argument("--download_pages", type=int, default=10)
sys.exit(asyncio.run(main(cmd_args=parser.parse_args())) or 0)
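For reference, a typical invocation under the new flags, against an already-running lbrynet daemon (the path assumes the script's usual location in the repo):

    python scripts/time_to_first_byte.py --download_pages 2 --stall_download_timeout 10 --keep_files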