From e2634974e7b47029f132783445790db81aa33321 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 26 Jan 2021 14:47:27 -0500 Subject: [PATCH] api accepts urls, resolves them, returns seeder count --- lbry/dht/protocol/routing_table.py | 2 +- lbry/schema/result.py | 8 ++- scripts/tracker.py | 109 +++++++++++++++++++++++------ 3 files changed, 93 insertions(+), 26 deletions(-) diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 815ea7b98..e5d487c40 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -286,7 +286,7 @@ class TreeRoutingTable: to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0] if not to_pop: return - log.info("%s: join buckets %i", bytes.hex(self._parent_node_id)[:8], len(to_pop)) + log.debug("%s: join buckets %i", bytes.hex(self._parent_node_id)[:8], len(to_pop)) bucket_index_to_pop = to_pop[0] assert len(self.buckets[bucket_index_to_pop]) == 0 can_go_lower = bucket_index_to_pop - 1 >= 0 diff --git a/lbry/schema/result.py b/lbry/schema/result.py index 9ecca5888..2013cac3b 100644 --- a/lbry/schema/result.py +++ b/lbry/schema/result.py @@ -5,8 +5,10 @@ from binascii import hexlify from itertools import chain from lbry.error import ResolveCensoredError -from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage -from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage +from lbry.schema.types.v2.result_pb2 import \ + Outputs as OutputsMessage, \ + Output as OutputMessage, \ + Error as ErrorMessage INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) @@ -70,7 +72,7 @@ class Outputs: __slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total' - def __init__(self, txos: List, extra_txos: List, txs: set, + def __init__(self, txos: List[OutputMessage], extra_txos: List, txs: set, offset: int, total: int, blocked: List, blocked_total: int): self.txos = txos self.txs = txs diff --git a/scripts/tracker.py b/scripts/tracker.py index 4f77582da..23d8c9cd7 100755 --- a/scripts/tracker.py +++ b/scripts/tracker.py @@ -1,18 +1,23 @@ #!/usr/bin/env python import asyncio +import json import logging -import signal -import time -import sqlite3 import pickle +import signal +import sqlite3 +import time from os import path from pprint import pprint +from urllib.parse import unquote + from aioupnp import upnp, fault as upnpfault from aiohttp import web -import json - +from lbry.wallet.network import ClientSession +from lbry.schema.result import Outputs +from lbry.wallet.transaction import Transaction +from binascii import hexlify, unhexlify from lbry.dht import node, peer log = logging.getLogger("lbry") @@ -35,7 +40,11 @@ async def main(): db.execute('CREATE TABLE IF NOT EXISTS announce (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)') db.execute('CREATE UNIQUE INDEX IF NOT EXISTS node_id_hash_idx ON announce (node_id, hash)') - asyncio.create_task(run_web_api(loop, db)) + spv_host = 'spv13.lbry.com' + wallet_client = ClientSession(network=None, server=(spv_host, 50001)) + await wallet_client.create_connection() + + asyncio.create_task(run_web_api(loop, db, wallet_client)) num_nodes = 128 u = await upnp.UPnP.discover() @@ -58,6 +67,11 @@ async def main(): # print(f"{local_node_id[:8]}: {method} from {bytes.hex(node_id)} ({ip})") continue + if len(args)< 5: + print(f'malformed args to Store') + pprint(args) + continue + blob_hash, token, port, original_publisher_id, age = args[:5] print(f"STORE to {local_node_id[:8]} from {bytes.hex(node_id)[:8]} ({ip}) for blob {bytes.hex(blob_hash)[:8]}") @@ -74,7 +88,7 @@ async def main(): db.commit() cur.close() except sqlite3.Error as err: - print("failed insert", err) + print("failed sqlite insert", err) finally: print("shutting down") for n in nodes: @@ -99,6 +113,7 @@ async def main(): # pprint(state.datastore) print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store') pickle.dump(state, f) + await wallet_client.close() db.close() @@ -159,7 +174,7 @@ async def start_nodes(loop, num_nodes, external_ip, state_dir): async def drain_events(n, q): - print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}') + # print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}') while True: (node_id, ip, method, args) = await n.protocol.event_queue.get() try: @@ -168,20 +183,25 @@ async def drain_events(n, q): pass -async def run_web_api(loop, db): - app = web.Application(loop=loop) - app['db'] = db - app.add_routes([ - web.get('/', api_handler), - web.get('/seeds/{hash}', seeds_handler), - ]) - # server = web.Server(api_handler, loop=loop) - # runner = web.ServerRunner(server) - runner = web.AppRunner(app) - await runner.setup() - site = web.TCPSite(runner, 'localhost', 8080) - await site.start() - # web.run_app(app) +async def run_web_api(loop, db, wallet_client): + try: + app = web.Application(loop=loop) + app['db'] = db + app['wallet_client'] = wallet_client + app.add_routes([ + web.get('/', api_handler), + web.get('/seeds/hash/{hash}', seeds_handler), + web.get('/seeds/url/{url}', url_handler), + ]) + # server = web.Server(api_handler, loop=loop) + # runner = web.ServerRunner(server) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, 'localhost', 8080) + await site.start() + except Exception as err: + pprint(err) + await runner.cleanup() async def seeds_handler(request): @@ -198,9 +218,54 @@ async def seeds_handler(request): return web.Response(text=json.dumps({'error': err})+"\n") +async def url_handler(request): + url = unquote(request.match_info['url']) + log.warning(url) + db = request.app['db'] + wallet_client = request.app['wallet_client'] + + try: + sd_hash = await get_sd_hash(wallet_client, url) + if sd_hash is None: + return web.Response(text=json.dumps({'error': 'Could not get sd hash for url', 'url': url})+"\n") + seeds = get_seeds(db, sd_hash) + return web.Response(text=json.dumps({'url': url, 'sd_hash': sd_hash, 'seeds': seeds})+"\n") + except Exception as err: + return web.Response(text=json.dumps({'error': err})+"\n") + + +def get_seeds(db, blobhash): + cur = db.cursor() + c = cur.execute( + "select count(distinct(node_id)) from announce where hash = ? and timestamp > strftime('%s','now','-1 day')", + (blobhash,) + ).fetchone()[0] + cur.close() + return c + + +async def get_sd_hash(wallet_client, url): + try: + resolved_txos = Outputs.from_base64(await wallet_client.send_request('blockchain.claimtrie.resolve', [url])) + if not resolved_txos.txos: + return None + raw_txs = await wallet_client.send_request('blockchain.transaction.get_batch', [txid for (txid, height) in resolved_txos.txs]) + txo_proto = resolved_txos.txos[0] + txid = txo_proto.tx_hash[::-1].hex() + raw_tx_hex, _ = raw_txs[txid] + txo = Transaction(bytes.fromhex(raw_tx_hex)).outputs[txo_proto.nout] + return txo.claim.stream.source.sd_hash + except Exception as err: # claim is not a stream, stream has no source, protobuf err, etc + if isinstance(err, asyncio.CancelledError): + raise err + log.exception("failed to get sd_hash") + return None + + async def api_handler(request): return web.Response(text="tracker OK") + if __name__ == "__main__": try: asyncio.run(main())