forked from LBRYCommunity/lbry-sdk
Profile get using rpc calls
This commit is contained in:
parent
ab3db2e626
commit
2de155f90f
1 changed files with 75 additions and 180 deletions
|
@ -1,127 +1,18 @@
|
|||
# Script to download the sd_blobs of the front page
|
||||
# Author: hackrush(hackrush@lbry.io)
|
||||
import asyncio
|
||||
|
||||
from twisted.internet import asyncioreactor
|
||||
asyncioreactor.install()
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import keyring
|
||||
import logging
|
||||
import tempfile
|
||||
import time
|
||||
import treq
|
||||
import typing
|
||||
import shutil
|
||||
|
||||
from binascii import unhexlify
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
from lbrynet import conf, log_support
|
||||
from lbrynet.p2p.Peer import Peer
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.p2p import SinglePeerDownloader
|
||||
from lbrynet.extras.daemon.ComponentManager import ComponentManager
|
||||
from lbrynet.extras.daemon.Components import DatabaseComponent
|
||||
from lbrynet.extras.daemon.PeerFinder import DummyPeerFinder
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
component_manager = None
|
||||
from aiohttp import ClientConnectorError
|
||||
from lbrynet import conf
|
||||
from lbrynet.extras.daemon.auth.client import UnAuthAPIClient
|
||||
|
||||
|
||||
def f2d(future):
|
||||
return defer.Deferred.fromFuture(asyncio.ensure_future(future))
|
||||
|
||||
|
||||
class FakeAnalytics:
|
||||
@property
|
||||
def is_started(self):
|
||||
return True
|
||||
|
||||
def send_server_startup_success(self):
|
||||
pass
|
||||
|
||||
def send_server_startup(self):
|
||||
pass
|
||||
|
||||
def shutdown(self):
|
||||
pass
|
||||
|
||||
def send_upnp_setup_success_fail(self, success, status):
|
||||
pass
|
||||
|
||||
|
||||
class TempDatabaseComponent(DatabaseComponent):
|
||||
@defer.inlineCallbacks
|
||||
def start(self):
|
||||
self.storage = SQLiteStorage(tempfile.tempdir)
|
||||
yield self.storage.setup()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop(self):
|
||||
yield self.storage.stop()
|
||||
self.storage = None
|
||||
|
||||
|
||||
class MultiplePeerFinder(DummyPeerFinder):
|
||||
def __init__(self, peer):
|
||||
super().__init__()
|
||||
# This is just a dud!! It has absolutely no use
|
||||
self.peer = peer
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):
|
||||
dht = component_manager.get_component('dht')
|
||||
peers = yield dht.iterativeFindValue(unhexlify(blob_hash))
|
||||
peers = [Peer(host, port) for node_id, host, port in peers]
|
||||
return peers
|
||||
|
||||
|
||||
# monkeypatching custom peer finder
|
||||
SinglePeerDownloader.SinglePeerFinder = MultiplePeerFinder
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def download_blob_for_uri(blob_hash: typing.Text):
|
||||
tmp_blob_manager = DiskBlobManager(tempfile.tempdir, component_manager.get_component('database'))
|
||||
|
||||
downloader = SinglePeerDownloader.SinglePeerDownloader()
|
||||
downloader.setup(component_manager.get_component('wallet'))
|
||||
|
||||
peer = Peer(None, None) # required for the log statements in SinglePeerDownloader
|
||||
result = yield downloader.download_blob_from_peer(peer, 180, blob_hash, tmp_blob_manager)
|
||||
tmp_blob_manager.stop()
|
||||
return result
|
||||
|
||||
|
||||
async def get_sd_hash_from_uri(uri: typing.Text):
|
||||
wallet = component_manager.get_component("wallet")
|
||||
resolved = await wallet.resolve(uri)
|
||||
sd_hash = resolved[uri]["claim"]["value"]["stream"]["source"]["source"]
|
||||
print(sd_hash)
|
||||
return sd_hash
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def benchmark_performance(uris: list, output_path: str) -> dict:
|
||||
results = dict()
|
||||
for uri in uris:
|
||||
sd_hash = yield f2d(get_sd_hash_from_uri(uri))
|
||||
|
||||
start = time.time()
|
||||
was_download_successful = yield download_blob_for_uri(sd_hash)
|
||||
end = time.time()
|
||||
|
||||
if was_download_successful:
|
||||
results[uri] = end - start
|
||||
else:
|
||||
results[uri] = "Could not download"
|
||||
|
||||
print(results[uri], uri, file=open(output_path, "a"))
|
||||
|
||||
return results
|
||||
def kill_loop():
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.stop()
|
||||
# loop.close()
|
||||
|
||||
|
||||
def extract_uris(response):
|
||||
|
@ -133,85 +24,89 @@ def extract_uris(response):
|
|||
return uris
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_frontpage_uris():
|
||||
async def get_frontpage_uris():
|
||||
kr = keyring.get_keyring()
|
||||
c = kr.get_preferred_collection()
|
||||
lbry_keyring = None
|
||||
for col in c.get_all_items():
|
||||
if col.get_label() == "LBRY/auth_token":
|
||||
lbry_keyring = col
|
||||
lbry_keyring = lbry_keyring.get_secret().decode("ascii")
|
||||
response = yield treq.get("https://api.lbry.io/file/list_homepage?auth_token={}".format(lbry_keyring))
|
||||
if response.code != 200:
|
||||
log.error("API returned non 200 code!!")
|
||||
reactor.callLater(0, reactor.stop)
|
||||
break
|
||||
|
||||
body = yield response.json()
|
||||
if lbry_keyring is None:
|
||||
print("An auth token is needed to fetch the front page uris")
|
||||
print("To generate the auth token, run the LBRY app at least once")
|
||||
print("Then run the script again")
|
||||
|
||||
lbry_keyring = lbry_keyring.get_secret().decode("ascii")
|
||||
|
||||
session = aiohttp.ClientSession()
|
||||
response = await session.get("https://api.lbry.io/file/list_homepage?auth_token={}".format(lbry_keyring))
|
||||
if response.status != 200:
|
||||
print("API returned non 200 code!!")
|
||||
await session.close()
|
||||
kill_loop()
|
||||
|
||||
body = await response.json()
|
||||
await session.close()
|
||||
uris = extract_uris(body['data']['Uris'])
|
||||
return uris
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def main(output_path):
|
||||
global component_manager
|
||||
yield component_manager.setup()
|
||||
yield component_manager.get_component('dht')._join_deferred
|
||||
uris = yield get_frontpage_uris()
|
||||
# uris = [
|
||||
# "linux-kernel-patch-replaces-naughty#e4fc36ac921970b3d15138781d697841dfb745f7",
|
||||
# "linux-thursday-dec-1-2018-linux-y#062615ab5b974f5536b7e3d47731e7c74279ea23",
|
||||
# "what"
|
||||
# ]
|
||||
results = yield benchmark_performance(uris)
|
||||
yield component_manager.stop()
|
||||
shutil.rmtree(tempfile.tempdir)
|
||||
async def main():
|
||||
uris = await get_frontpage_uris()
|
||||
api = await UnAuthAPIClient.from_url(conf.settings.get_api_connection_string())
|
||||
|
||||
max_len = len(max(uris, key=len))
|
||||
_sum = num = 0
|
||||
for result, value in results.items():
|
||||
if not isinstance(value, str):
|
||||
_sum += value
|
||||
num += 1
|
||||
print("{0:>{1:d}s}: {2}".format(result, max_len, value))
|
||||
try:
|
||||
await api.status()
|
||||
except (ClientConnectorError, ConnectionError):
|
||||
await api.session.close()
|
||||
kill_loop()
|
||||
print("Could not connect to daemon. Are you sure it's running?")
|
||||
return 1
|
||||
|
||||
avg = _sum / num
|
||||
print("Average for {0:d} downloaded files is: {1:f}".format(num, avg))
|
||||
could_not_download = len(uris) - num
|
||||
smiley = ":-)" if could_not_download == 0 else ":-("
|
||||
print("{0:d} files could not be downloaded {1:s}".format(could_not_download, smiley))
|
||||
results = dict()
|
||||
|
||||
reactor.callLater(0, reactor.stop)
|
||||
# uris = ["what", "holi", "aweqwfq"]
|
||||
_sum = 0
|
||||
downloaded = len(uris)
|
||||
|
||||
for uri in uris:
|
||||
start = time.time()
|
||||
resp = await api.call("get", {"uri": uri})
|
||||
end = time.time()
|
||||
|
||||
await api.call("file_delete", {"delete_from_download_dir": True,
|
||||
"delete_all": True,
|
||||
"claim_name": uri
|
||||
})
|
||||
|
||||
time_taken = end - start
|
||||
results[uri] = time_taken
|
||||
_sum += time_taken
|
||||
|
||||
if resp.get('error'):
|
||||
results[uri] = "Could not download"
|
||||
downloaded -= 1
|
||||
_sum -= time_taken
|
||||
|
||||
print(results[uri], uri)
|
||||
|
||||
avg = _sum / downloaded
|
||||
print()
|
||||
print("Average time taken:", avg)
|
||||
print("Downloaded {} Not Downloaded {}".format(downloaded, len(uris) - downloaded))
|
||||
|
||||
await api.session.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--wallet_path", help="Enter None to use a temporary directory")
|
||||
parser.add_argument("--output_path")
|
||||
parser.add_argument("--data_dir")
|
||||
parser.add_argument("--wallet_dir")
|
||||
parser.add_argument("--download_directory")
|
||||
args = parser.parse_args()
|
||||
wallet_path = args.wallet_path
|
||||
output_path = args.output_path
|
||||
|
||||
tempfile.tempdir = tempfile.mkdtemp()
|
||||
if wallet_path == "None":
|
||||
wallet_path = tempfile.tempdir
|
||||
|
||||
log_support.configure_console(level='INFO')
|
||||
log_support.configure_twisted()
|
||||
|
||||
conf.initialize_settings()
|
||||
conf.settings.set('download_directory', tempfile.tempdir)
|
||||
conf.settings.set('lbryum_wallet_dir', wallet_path)
|
||||
conf.settings.set('data_dir', tempfile.tempdir)
|
||||
conf.settings.set('use_upnp', False)
|
||||
|
||||
skip_components = ["blob_manager", "hash_announcer", "file_manager", "peer_protocol_server", "reflector",
|
||||
"exchange_rate_manager", "rate_limiter", "payment_rate_manager"]
|
||||
component_manager = ComponentManager(
|
||||
analytics_manager=FakeAnalytics,
|
||||
skip_components=skip_components,
|
||||
database=TempDatabaseComponent
|
||||
)
|
||||
|
||||
main(output_path)
|
||||
reactor.run()
|
||||
conf.initialize_settings(data_dir=args.data_dir, wallet_dir=args.wallet_dir, download_dir=args.download_directory)
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
|
|
Loading…
Reference in a new issue