forked from LBRYCommunity/lbry-sdk
update and add scripts
This commit is contained in:
parent
5bb94d744e
commit
c01728a6a7
4 changed files with 158 additions and 78 deletions
|
@ -1,7 +1,9 @@
|
|||
import curses
|
||||
import time
|
||||
import logging
|
||||
from lbrynet.daemon import get_client
|
||||
import asyncio
|
||||
from lbrynet import conf
|
||||
from lbrynet.extras.daemon.client import LBRYAPIClient
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
log.addHandler(logging.FileHandler("dht contacts.log"))
|
||||
|
@ -9,8 +11,6 @@ log.addHandler(logging.FileHandler("dht contacts.log"))
|
|||
log.setLevel(logging.INFO)
|
||||
stdscr = curses.initscr()
|
||||
|
||||
api = get_client()
|
||||
|
||||
|
||||
def init_curses():
|
||||
curses.noecho()
|
||||
|
@ -26,79 +26,47 @@ def teardown_curses():
|
|||
curses.endwin()
|
||||
|
||||
|
||||
def refresh(last_contacts, last_blobs):
|
||||
def refresh(routing_table_info):
|
||||
height, width = stdscr.getmaxyx()
|
||||
|
||||
try:
|
||||
routing_table_info = api.routing_table_get()
|
||||
node_id = routing_table_info['node_id']
|
||||
except:
|
||||
node_id = "UNKNOWN"
|
||||
routing_table_info = {
|
||||
'buckets': {},
|
||||
'contacts': [],
|
||||
'blob_hashes': []
|
||||
}
|
||||
node_id = routing_table_info['node_id']
|
||||
|
||||
for y in range(height):
|
||||
stdscr.addstr(y, 0, " " * (width - 1))
|
||||
|
||||
buckets = routing_table_info['buckets']
|
||||
stdscr.addstr(0, 0, "node id: %s" % node_id)
|
||||
stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" %
|
||||
(len(buckets), len(routing_table_info['contacts']),
|
||||
len(routing_table_info['blob_hashes'])))
|
||||
stdscr.addstr(0, 0, f"node id: {node_id}")
|
||||
stdscr.addstr(1, 0, f"{len(buckets)} buckets")
|
||||
|
||||
y = 3
|
||||
for i in sorted(buckets.keys()):
|
||||
for i in range(len(buckets)):
|
||||
stdscr.addstr(y, 0, "bucket %s" % i)
|
||||
y += 1
|
||||
for h in sorted(buckets[i], key=lambda x: x['node_id'].decode('hex')):
|
||||
stdscr.addstr(y, 0, '%s (%s:%i) - %i blobs' % (h['node_id'], h['address'], h['port'],
|
||||
len(h['blobs'])))
|
||||
for peer in buckets[str(i)]:
|
||||
stdscr.addstr(y, 0, f"{peer['node_id'][:8]} ({peer['address']}:{peer['udp_port']})")
|
||||
y += 1
|
||||
y += 1
|
||||
|
||||
new_contacts = set(routing_table_info['contacts']) - last_contacts
|
||||
lost_contacts = last_contacts - set(routing_table_info['contacts'])
|
||||
|
||||
if new_contacts:
|
||||
for c in new_contacts:
|
||||
log.debug("added contact %s", c)
|
||||
if lost_contacts:
|
||||
for c in lost_contacts:
|
||||
log.info("lost contact %s", c)
|
||||
|
||||
new_blobs = set(routing_table_info['blob_hashes']) - last_blobs
|
||||
lost_blobs = last_blobs - set(routing_table_info['blob_hashes'])
|
||||
|
||||
if new_blobs:
|
||||
for c in new_blobs:
|
||||
log.debug("added blob %s", c)
|
||||
if lost_blobs:
|
||||
for c in lost_blobs:
|
||||
log.info("lost blob %s", c)
|
||||
|
||||
stdscr.addstr(y + 1, 0, str(time.time()))
|
||||
stdscr.refresh()
|
||||
return set(routing_table_info['contacts']), set(routing_table_info['blob_hashes'])
|
||||
|
||||
|
||||
def do_main():
|
||||
c = None
|
||||
last_contacts, last_blobs = set(), set()
|
||||
while c not in [ord('q'), ord('Q')]:
|
||||
last_contacts, last_blobs = refresh(last_contacts, last_blobs)
|
||||
c = stdscr.getch()
|
||||
time.sleep(0.1)
|
||||
async def main():
|
||||
conf.initialize_settings()
|
||||
api = await LBRYAPIClient.get_client()
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
init_curses()
|
||||
do_main()
|
||||
c = None
|
||||
while c not in [ord('q'), ord('Q')]:
|
||||
routing_info = await api.routing_table_get()
|
||||
refresh(routing_info)
|
||||
c = stdscr.getch()
|
||||
time.sleep(0.1)
|
||||
finally:
|
||||
await api.session.close()
|
||||
teardown_curses()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
asyncio.run(main())
|
||||
|
|
|
@ -2,7 +2,6 @@ import os
|
|||
import re
|
||||
import json
|
||||
import inspect
|
||||
from textwrap import dedent
|
||||
from lbrynet.extras.daemon.Daemon import Daemon
|
||||
|
||||
|
||||
|
|
33
scripts/standalone_blob_server.py
Normal file
33
scripts/standalone_blob_server.py
Normal file
|
@ -0,0 +1,33 @@
|
|||
import sys
|
||||
import os
|
||||
import asyncio
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.blob_exchange.server import BlobServer
|
||||
from lbrynet.schema.address import decode_address
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
|
||||
|
||||
async def main(address: str):
|
||||
try:
|
||||
decode_address(address)
|
||||
except:
|
||||
print(f"'{address}' is not a valid lbrycrd address")
|
||||
return 1
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite"))
|
||||
await storage.open()
|
||||
blob_manager = BlobFileManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage)
|
||||
await blob_manager.setup()
|
||||
|
||||
server = await loop.create_server(
|
||||
lambda: BlobServer(loop, blob_manager, address),
|
||||
'0.0.0.0', 4444)
|
||||
try:
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
finally:
|
||||
await storage.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main(sys.argv[1]))
|
|
@ -1,12 +1,15 @@
|
|||
import os
|
||||
import json
|
||||
import argparse
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import time
|
||||
|
||||
from aiohttp import ClientConnectorError
|
||||
from lbrynet import conf
|
||||
from lbrynet import conf, __version__
|
||||
from lbrynet.schema.uri import parse_lbry_uri
|
||||
from lbrynet.extras.daemon.DaemonConsole import LBRYAPIClient
|
||||
from lbrynet.extras.daemon.client import LBRYAPIClient
|
||||
from lbrynet.extras import system_info, cli
|
||||
|
||||
|
||||
def extract_uris(response):
|
||||
|
@ -33,21 +36,80 @@ async def get_frontpage_uris():
|
|||
await session.close()
|
||||
|
||||
|
||||
async def main():
|
||||
uris = await get_frontpage_uris()
|
||||
print("got %i uris" % len(uris))
|
||||
api = await LBRYAPIClient.get_client()
|
||||
async def report_to_slack(output, webhook):
|
||||
payload = {
|
||||
"text": f"lbrynet {__version__} ({system_info.get_platform()['platform']}) time to first byte:\n{output}"
|
||||
}
|
||||
async with aiohttp.request('post', webhook, data=json.dumps(payload)):
|
||||
pass
|
||||
|
||||
|
||||
def confidence(times, z):
|
||||
mean = sum(times) / len(times)
|
||||
standard_dev = (sum(((t - sum(times) / len(times)) ** 2.0 for t in times)) / len(times)) ** 0.5
|
||||
err = (z * standard_dev) / (len(times) ** 0.5)
|
||||
return f"{round(mean, 3) + round(err, 3)}s"
|
||||
|
||||
|
||||
def variance(times):
|
||||
mean = sum(times) / len(times)
|
||||
return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)
|
||||
|
||||
|
||||
async def wait_for_done(api, uri):
|
||||
name = uri.split("#")[0]
|
||||
last_complete = 0
|
||||
hang_count = 0
|
||||
while True:
|
||||
files = await api.file_list(claim_name=name)
|
||||
file = files[0]
|
||||
if file['status'] in ['finished', 'stopped']:
|
||||
return True, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
|
||||
if last_complete < int(file['blobs_completed']):
|
||||
print(f"{file['blobs_completed']}/{file['blobs_in_stream']}...")
|
||||
hang_count = 0
|
||||
last_complete = int(file['blobs_completed'])
|
||||
else:
|
||||
hang_count += 1
|
||||
await asyncio.sleep(1.0)
|
||||
if hang_count > 30:
|
||||
return False, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
|
||||
|
||||
|
||||
async def main(start_daemon=True, uris=None):
|
||||
if not uris:
|
||||
uris = await get_frontpage_uris()
|
||||
api = await LBRYAPIClient.get_client()
|
||||
daemon = None
|
||||
try:
|
||||
await api.status()
|
||||
except (ClientConnectorError, ConnectionError):
|
||||
await api.session.close()
|
||||
print("Could not connect to daemon. Are you sure it's running?")
|
||||
return
|
||||
if start_daemon:
|
||||
print("Could not connect to running daemon, starting...")
|
||||
daemon = await cli.start_daemon(console_output=False)
|
||||
await daemon.component_manager.started.wait()
|
||||
print("Started daemon")
|
||||
return await main(start_daemon=False, uris=uris)
|
||||
print("Could not connect to daemon")
|
||||
return 1
|
||||
print(f"Checking {len(uris)} uris from the front page")
|
||||
print("**********************************************")
|
||||
|
||||
resolvable = []
|
||||
for name in uris:
|
||||
resolved = await api.resolve(uri=name)
|
||||
if 'error' not in resolved.get(name, {}):
|
||||
resolvable.append(name)
|
||||
|
||||
print(f"{len(resolvable)}/{len(uris)} are resolvable")
|
||||
|
||||
first_byte_times = []
|
||||
downloaded_times = []
|
||||
failures = []
|
||||
download_failures = []
|
||||
|
||||
for uri in uris:
|
||||
for uri in resolvable:
|
||||
await api.call(
|
||||
"file_delete", {
|
||||
"delete_from_download_dir": True,
|
||||
|
@ -56,37 +118,55 @@ async def main():
|
|||
}
|
||||
)
|
||||
|
||||
for i, uri in enumerate(uris):
|
||||
for i, uri in enumerate(resolvable):
|
||||
start = time.time()
|
||||
try:
|
||||
await api.call("get", {"uri": uri})
|
||||
first_byte = time.time()
|
||||
first_byte_times.append(first_byte - start)
|
||||
print(f"{i + 1}/{len(uris)} - {first_byte - start} {uri}")
|
||||
print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
|
||||
# downloaded, msg, blobs_in_stream = await wait_for_done(api, uri)
|
||||
# if downloaded:
|
||||
# downloaded_times.append((time.time()-start) / downloaded)
|
||||
# print(f"{i + 1}/{len(uris)} - downloaded @ {(time.time()-start) / blobs_in_stream}, {msg} {uri}")
|
||||
# else:
|
||||
# print(f"failed to downlload {uri}, got {msg}")
|
||||
# download_failures.append(uri)
|
||||
except:
|
||||
print(f"{i + 1}/{len(uris)} - timed out in {time.time() - start} {uri}")
|
||||
print(f"{i + 1}/{len(uris)} - timeout in {time.time() - start} {uri}")
|
||||
failures.append(uri)
|
||||
await api.call(
|
||||
"file_delete", {
|
||||
"delete_from_download_dir": True,
|
||||
"claim_name": parse_lbry_uri(uri).name
|
||||
}
|
||||
)
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
avg = sum(first_byte_times) / len(first_byte_times)
|
||||
print()
|
||||
print(f"Average time to first byte: {avg} ({len(first_byte_times)} streams)")
|
||||
print(f"Started {len(first_byte_times)} Timed out {len(uris) - len(first_byte_times)}")
|
||||
|
||||
print("**********************************************")
|
||||
result = f"Tried to start downloading {len(resolvable)} streams from the front page\n" \
|
||||
f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}\n" \
|
||||
f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}\n" \
|
||||
f"Variance: {variance(first_byte_times)}\n" \
|
||||
f"Started {len(first_byte_times)}/{len(resolvable)} streams"
|
||||
if failures:
|
||||
nt = '\n\t'
|
||||
result += f"\nFailures:\n\t{nt.join([f for f in failures])}"
|
||||
print(result)
|
||||
await api.session.close()
|
||||
if daemon:
|
||||
await daemon.shutdown()
|
||||
webhook = os.environ.get('TTFB_SLACK_TOKEN', None)
|
||||
if webhook:
|
||||
await report_to_slack(result, webhook)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--data_dir", default=None)
|
||||
parser.add_argument("--wallet_dir", default=None)
|
||||
parser.add_argument("--download_directory", default=None)
|
||||
parser.add_argument("--data_dir")
|
||||
parser.add_argument("--wallet_dir")
|
||||
parser.add_argument("--download_directory")
|
||||
args = parser.parse_args()
|
||||
conf.initialize_settings(
|
||||
data_dir=args.data_dir, wallet_dir=args.wallet_dir, download_dir=args.download_directory
|
||||
)
|
||||
|
||||
conf.initialize_settings()
|
||||
asyncio.run(main())
|
||||
|
|
Loading…
Reference in a new issue