Compare commits

...
Sign in to create a new pull request.

7 commits

Author SHA1 Message Date
Alex Grintsvayg
e2634974e7
api accepts urls, resolves them, returns seeder count 2021-01-26 14:47:27 -05:00
Alex Grintsvayg
9ed985d6ec
prune stale peers when loading node state 2021-01-25 12:39:30 -05:00
Alex Grintsvayg
2d898f802d
improve db 2021-01-22 12:01:53 -05:00
Alex Grintsvayg
99c64021eb
up to 128 nodes 2021-01-22 09:41:51 -05:00
Alex Grintsvayg
366b0d590c
api to check seeder count, better saving of routing table 2021-01-20 17:12:20 -05:00
Alex Grintsvayg
53e683d307
16-node tracker works 2021-01-19 16:04:04 -05:00
Alex Grintsvayg
411698f306
basic tracker works 2021-01-18 14:20:25 -05:00
6 changed files with 331 additions and 11 deletions

View file

@ -3,6 +3,7 @@ import asyncio
import typing import typing
import binascii import binascii
import socket import socket
import time
from lbry.utils import resolve_host from lbry.utils import resolve_host
from lbry.dht import constants from lbry.dht import constants
from lbry.dht.peer import make_kademlia_peer from lbry.dht.peer import make_kademlia_peer
@ -17,6 +18,16 @@ if typing.TYPE_CHECKING:
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class NodeState:
def __init__(self,
routing_table_peers: typing.List[typing.Tuple[bytes, str, int, int]],
datastore: typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]):
# List[Tuple[node_id, address, udp_port, tcp_port]]
self.routing_table_peers = routing_table_peers
# List[Tuple[node_id, address, udp_port, tcp_port, blob_hash, added_at]]
self.datastore = datastore
class Node: class Node:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
@ -31,6 +42,7 @@ class Node:
self._join_task: asyncio.Task = None self._join_task: asyncio.Task = None
self._refresh_task: asyncio.Task = None self._refresh_task: asyncio.Task = None
self._storage = storage self._storage = storage
self.started_listening = asyncio.Event()
async def refresh_node(self, force_once=False): async def refresh_node(self, force_once=False):
while True: while True:
@ -103,6 +115,7 @@ class Node:
return stored_to return stored_to
def stop(self) -> None: def stop(self) -> None:
self.started_listening.clear()
if self.joined.is_set(): if self.joined.is_set():
self.joined.clear() self.joined.clear()
if self._join_task: if self._join_task:
@ -118,18 +131,35 @@ class Node:
self.listening_port = None self.listening_port = None
log.info("Stopped DHT node") log.info("Stopped DHT node")
def get_state(self) -> NodeState:
return NodeState(
routing_table_peers=[(p.node_id, p.address, p.udp_port, p.tcp_port)
for p in self.protocol.routing_table.get_peers()],
datastore=self.protocol.data_store.dump()
)
def load_state(self, state: NodeState):
now = self.loop.time()
for node_id, address, udp_port, tcp_port, blob_hash, added_at in state.datastore:
if added_at + constants.DATA_EXPIRATION < now:
continue
p = make_kademlia_peer(node_id, address, udp_port, tcp_port)
self.protocol.data_store.add_peer_to_blob(p, blob_hash)
async def start_listening(self, interface: str = '0.0.0.0') -> None: async def start_listening(self, interface: str = '0.0.0.0') -> None:
if not self.listening_port: if not self.listening_port:
self.listening_port, _ = await self.loop.create_datagram_endpoint( self.listening_port, _ = await self.loop.create_datagram_endpoint(
lambda: self.protocol, (interface, self.internal_udp_port) lambda: self.protocol, (interface, self.internal_udp_port)
) )
self.started_listening.set()
log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port) log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port)
self.protocol.start() self.protocol.start()
else: else:
log.warning("Already bound to port %s", self.listening_port) log.warning("Already bound to port %s", self.listening_port)
async def join_network(self, interface: str = '0.0.0.0', async def join_network(self, interface: str = '0.0.0.0',
known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None): known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
persisted_peers: typing.List[typing.Tuple[bytes, str, int, int]] = []):
def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, int, int]]]): def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, int, int]]]):
peer_addresses = [] peer_addresses = []
for node_id, address, udp_port, tcp_port in urls: for node_id, address, udp_port, tcp_port in urls:
@ -148,15 +178,15 @@ class Node:
if not self.joined.is_set(): if not self.joined.is_set():
self.joined.set() self.joined.set()
log.info( log.info(
"joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), "%s: joined dht, %i peers known in %i buckets",
bytes.fromhex(self.protocol.node_id)[:8],
len(self.protocol.routing_table.get_peers()),
self.protocol.routing_table.buckets_with_contacts() self.protocol.routing_table.buckets_with_contacts()
) )
else: else:
if self.joined.is_set(): if self.joined.is_set():
self.joined.clear() self.joined.clear()
seed_peers = peers_from_urls( seed_peers = peers_from_urls(persisted_peers) if persisted_peers else []
await self._storage.get_persisted_kademlia_peers()
) if self._storage else []
if not seed_peers: if not seed_peers:
try: try:
seed_peers.extend(peers_from_urls([ seed_peers.extend(peers_from_urls([
@ -173,8 +203,11 @@ class Node:
await asyncio.sleep(1, loop=self.loop) await asyncio.sleep(1, loop=self.loop)
def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None): def start(self, interface: str,
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
persisted_peers: typing.List[typing.Tuple[bytes, str, int, int]] = []):
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls, persisted_peers))
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT, bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,

View file

@ -68,3 +68,9 @@ class DictDataStore:
for _, stored in self._data_store.items(): for _, stored in self._data_store.items():
peers.update(set(map(lambda tup: tup[0], stored))) peers.update(set(map(lambda tup: tup[0], stored)))
return list(peers) return list(peers)
def dump(self) -> typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]:
data = []
for k, peers in self._data_store.items():
data.extend([(p.node_id, p.address, p.udp_port, p.tcp_port, k, added_at) for (p, added_at) in peers])
return data

View file

@ -287,6 +287,7 @@ class KademliaProtocol(DatagramProtocol):
self._to_add: typing.Set['KademliaPeer'] = set() self._to_add: typing.Set['KademliaPeer'] = set()
self._wakeup_routing_task = asyncio.Event(loop=self.loop) self._wakeup_routing_task = asyncio.Event(loop=self.loop)
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None self.maintaing_routing_task: typing.Optional[asyncio.Task] = None
self.event_queue = asyncio.Queue(maxsize=100)
@functools.lru_cache(128) @functools.lru_cache(128)
def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC: def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
@ -428,6 +429,11 @@ class KademliaProtocol(DatagramProtocol):
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(), log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
sender_contact.address, sender_contact.udp_port) sender_contact.address, sender_contact.udp_port)
try:
self.event_queue.put_nowait((sender_contact.node_id, sender_contact.address, method, args))
except asyncio.QueueFull:
pass
if method == b'ping': if method == b'ping':
result = self.node_rpc.ping() result = self.node_rpc.ping()
elif method == b'store': elif method == b'store':

View file

@ -286,7 +286,7 @@ class TreeRoutingTable:
to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0] to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
if not to_pop: if not to_pop:
return return
log.info("join buckets %i", len(to_pop)) log.debug("%s: join buckets %i", bytes.hex(self._parent_node_id)[:8], len(to_pop))
bucket_index_to_pop = to_pop[0] bucket_index_to_pop = to_pop[0]
assert len(self.buckets[bucket_index_to_pop]) == 0 assert len(self.buckets[bucket_index_to_pop]) == 0
can_go_lower = bucket_index_to_pop - 1 >= 0 can_go_lower = bucket_index_to_pop - 1 >= 0

View file

@ -5,8 +5,10 @@ from binascii import hexlify
from itertools import chain from itertools import chain
from lbry.error import ResolveCensoredError from lbry.error import ResolveCensoredError
from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage from lbry.schema.types.v2.result_pb2 import \
from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage Outputs as OutputsMessage, \
Output as OutputMessage, \
Error as ErrorMessage
INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID) INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID)
NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND) NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND)
@ -70,7 +72,7 @@ class Outputs:
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total' __slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total'
def __init__(self, txos: List, extra_txos: List, txs: set, def __init__(self, txos: List[OutputMessage], extra_txos: List, txs: set,
offset: int, total: int, blocked: List, blocked_total: int): offset: int, total: int, blocked: List, blocked_total: int):
self.txos = txos self.txos = txos
self.txs = txs self.txs = txs

273
scripts/tracker.py Executable file
View file

@ -0,0 +1,273 @@
#!/usr/bin/env python
import asyncio
import json
import logging
import pickle
import signal
import sqlite3
import time
from os import path
from pprint import pprint
from urllib.parse import unquote
from aioupnp import upnp, fault as upnpfault
from aiohttp import web
from lbry.wallet.network import ClientSession
from lbry.schema.result import Outputs
from lbry.wallet.transaction import Transaction
from binascii import hexlify, unhexlify
from lbry.dht import node, peer
log = logging.getLogger("lbry")
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)
async def main():
data_dir = "/home/grin/code/lbry/sdk"
state_dir = data_dir + '/nodestate/'
loop = asyncio.get_event_loop()
try:
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
except NotImplementedError:
pass # Not implemented on Windows
db = sqlite3.connect(data_dir + "/tracker.sqlite3")
db.execute('CREATE TABLE IF NOT EXISTS announce (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)')
db.execute('CREATE UNIQUE INDEX IF NOT EXISTS node_id_hash_idx ON announce (node_id, hash)')
spv_host = 'spv13.lbry.com'
wallet_client = ClientSession(network=None, server=(spv_host, 50001))
await wallet_client.create_connection()
asyncio.create_task(run_web_api(loop, db, wallet_client))
num_nodes = 128
u = await upnp.UPnP.discover()
external_ip = await u.get_external_ip()
try:
nodes = await start_nodes(loop, num_nodes, external_ip, state_dir)
await asyncio.gather(*map(lambda n: n.started_listening.wait(), nodes), loop=loop)
print("joined")
queue = asyncio.Queue(maxsize=100*num_nodes)
for n in nodes:
asyncio.create_task(drain_events(n, queue))
while True:
(n, node_id, ip, method, args) = await queue.get()
local_node_id = bytes.hex(n.protocol.node_id)
if method != b'store':
# print(f"{local_node_id[:8]}: {method} from {bytes.hex(node_id)} ({ip})")
continue
if len(args)< 5:
print(f'malformed args to Store')
pprint(args)
continue
blob_hash, token, port, original_publisher_id, age = args[:5]
print(f"STORE to {local_node_id[:8]} from {bytes.hex(node_id)[:8]} ({ip}) for blob {bytes.hex(blob_hash)[:8]}")
try:
cur = db.cursor()
cur.execute(
'''
INSERT INTO announce (local_id, hash, node_id, ip, port, timestamp) VALUES (?,?,?,?,?,?)
ON CONFLICT (node_id, hash) DO UPDATE SET
local_id=excluded.local_id, ip=excluded.ip, port=excluded.port, timestamp=excluded.timestamp
''',
(local_node_id, bytes.hex(blob_hash), bytes.hex(node_id), ip, port, int(time.time()))
)
db.commit()
cur.close()
except sqlite3.Error as err:
print("failed sqlite insert", err)
finally:
print("shutting down")
for n in nodes:
node_id = bytes.hex(n.protocol.node_id)
n.stop()
# print(f'deleting upnp port mapping {n.protocol.udp_port}')
try:
await u.delete_port_mapping(n.protocol.udp_port, "UDP")
except upnpfault.UPnPError:
pass
state = n.get_state()
# keep existing rt if there is one
if len(state.routing_table_peers) == 0 and path.exists(state_dir + node_id):
with open(state_dir + node_id, 'rb') as f:
existing_state = pickle.load(f)
if len(existing_state.routing_table_peers) > 0:
state.routing_table_peers = existing_state.routing_table_peers
print(f'rt empty on save, but old rt was recovered ({len(state.routing_table_peers)} peers)')
with open(state_dir + node_id, 'wb') as f:
# pprint(state.routing_table_peers)
# pprint(state.datastore)
print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
pickle.dump(state, f)
await wallet_client.close()
db.close()
class ShutdownErr(BaseException):
pass
def shutdown():
print("got interrupt signal...")
raise ShutdownErr()
def make_node_id(i: int, n: int) -> str:
"""
split dht address space into N chunks and return the first id of the i'th chunk
make_node_id(0,n) returns 000...000 for any n
"""
if not 0 <= i < n:
raise ValueError("i must be between 0 (inclusive) and n (exclusive)")
bytes_in_id = 48
return "{0:0{1}x}".format(i * ((2**8)**bytes_in_id // n), bytes_in_id*2)
async def start_nodes(loop, num_nodes, external_ip, state_dir):
start_port = 4445
known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
peer_manager = peer.PeerManager(loop)
nodes = []
for i in range(num_nodes):
node_id = make_node_id(i, num_nodes)
# pprint(node_id)
port = start_port + i
# await u.get_next_mapping(port, "UDP", "lbry dht tracker") # not necessary, i just opened ports in router
n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip,
udp_port=port, internal_udp_port=port, peer_port=3333)
persisted_peers = []
if path.exists(state_dir + node_id):
with open(state_dir + node_id, 'rb') as f:
state = pickle.load(f)
# pprint(state.routing_table_peers)
# pprint(state.datastore)
print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
n.load_state(state)
persisted_peers = state.routing_table_peers
# if len(persisted_peers) == 0 and len(state.datastore) > 0:
# peers_to_import = map(lambda p: (p[0], p[1], p[2], p[3]), n.get_state().datastore)
# persisted_peers.extend(peers_to_import)
# print(f'{node_id[:8]}: rt is empty but we recovered {len(state.datastore)} '
# f'peers from the datastore. {len(peers_to_import)} of those were recent enough to import')
n.start("0.0.0.0", known_node_urls, persisted_peers)
nodes.append(n)
return nodes
async def drain_events(n, q):
# print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}')
while True:
(node_id, ip, method, args) = await n.protocol.event_queue.get()
try:
q.put_nowait((n, node_id, ip, method, args))
except asyncio.QueueFull:
pass
async def run_web_api(loop, db, wallet_client):
try:
app = web.Application(loop=loop)
app['db'] = db
app['wallet_client'] = wallet_client
app.add_routes([
web.get('/', api_handler),
web.get('/seeds/hash/{hash}', seeds_handler),
web.get('/seeds/url/{url}', url_handler),
])
# server = web.Server(api_handler, loop=loop)
# runner = web.ServerRunner(server)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
except Exception as err:
pprint(err)
await runner.cleanup()
async def seeds_handler(request):
blobhash = request.match_info['hash']
db = request.app['db']
try:
cur = db.cursor()
c = cur.execute("""
select count(distinct(node_id)) from announce where hash = ? and timestamp > strftime('%s','now','-1 day')
""", (blobhash,)).fetchone()[0]
cur.close()
return web.Response(text=json.dumps({'seeds': c})+"\n")
except Exception as err:
return web.Response(text=json.dumps({'error': err})+"\n")
async def url_handler(request):
url = unquote(request.match_info['url'])
log.warning(url)
db = request.app['db']
wallet_client = request.app['wallet_client']
try:
sd_hash = await get_sd_hash(wallet_client, url)
if sd_hash is None:
return web.Response(text=json.dumps({'error': 'Could not get sd hash for url', 'url': url})+"\n")
seeds = get_seeds(db, sd_hash)
return web.Response(text=json.dumps({'url': url, 'sd_hash': sd_hash, 'seeds': seeds})+"\n")
except Exception as err:
return web.Response(text=json.dumps({'error': err})+"\n")
def get_seeds(db, blobhash):
cur = db.cursor()
c = cur.execute(
"select count(distinct(node_id)) from announce where hash = ? and timestamp > strftime('%s','now','-1 day')",
(blobhash,)
).fetchone()[0]
cur.close()
return c
async def get_sd_hash(wallet_client, url):
try:
resolved_txos = Outputs.from_base64(await wallet_client.send_request('blockchain.claimtrie.resolve', [url]))
if not resolved_txos.txos:
return None
raw_txs = await wallet_client.send_request('blockchain.transaction.get_batch', [txid for (txid, height) in resolved_txos.txs])
txo_proto = resolved_txos.txos[0]
txid = txo_proto.tx_hash[::-1].hex()
raw_tx_hex, _ = raw_txs[txid]
txo = Transaction(bytes.fromhex(raw_tx_hex)).outputs[txo_proto.nout]
return txo.claim.stream.source.sd_hash
except Exception as err: # claim is not a stream, stream has no source, protobuf err, etc
if isinstance(err, asyncio.CancelledError):
raise err
log.exception("failed to get sd_hash")
return None
async def api_handler(request):
return web.Response(text="tracker OK")
if __name__ == "__main__":
try:
asyncio.run(main())
except ShutdownErr:
pass