Compare commits
7 commits
Author | SHA1 | Date | |
---|---|---|---|
|
e2634974e7 | ||
|
9ed985d6ec | ||
|
2d898f802d | ||
|
99c64021eb | ||
|
366b0d590c | ||
|
53e683d307 | ||
|
411698f306 |
6 changed files with 331 additions and 11 deletions
|
@ -3,6 +3,7 @@ import asyncio
|
||||||
import typing
|
import typing
|
||||||
import binascii
|
import binascii
|
||||||
import socket
|
import socket
|
||||||
|
import time
|
||||||
from lbry.utils import resolve_host
|
from lbry.utils import resolve_host
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
from lbry.dht.peer import make_kademlia_peer
|
from lbry.dht.peer import make_kademlia_peer
|
||||||
|
@ -17,6 +18,16 @@ if typing.TYPE_CHECKING:
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class NodeState:
|
||||||
|
def __init__(self,
|
||||||
|
routing_table_peers: typing.List[typing.Tuple[bytes, str, int, int]],
|
||||||
|
datastore: typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]):
|
||||||
|
# List[Tuple[node_id, address, udp_port, tcp_port]]
|
||||||
|
self.routing_table_peers = routing_table_peers
|
||||||
|
# List[Tuple[node_id, address, udp_port, tcp_port, blob_hash, added_at]]
|
||||||
|
self.datastore = datastore
|
||||||
|
|
||||||
|
|
||||||
class Node:
|
class Node:
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
||||||
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
|
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
|
||||||
|
@ -31,6 +42,7 @@ class Node:
|
||||||
self._join_task: asyncio.Task = None
|
self._join_task: asyncio.Task = None
|
||||||
self._refresh_task: asyncio.Task = None
|
self._refresh_task: asyncio.Task = None
|
||||||
self._storage = storage
|
self._storage = storage
|
||||||
|
self.started_listening = asyncio.Event()
|
||||||
|
|
||||||
async def refresh_node(self, force_once=False):
|
async def refresh_node(self, force_once=False):
|
||||||
while True:
|
while True:
|
||||||
|
@ -103,6 +115,7 @@ class Node:
|
||||||
return stored_to
|
return stored_to
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
|
self.started_listening.clear()
|
||||||
if self.joined.is_set():
|
if self.joined.is_set():
|
||||||
self.joined.clear()
|
self.joined.clear()
|
||||||
if self._join_task:
|
if self._join_task:
|
||||||
|
@ -118,18 +131,35 @@ class Node:
|
||||||
self.listening_port = None
|
self.listening_port = None
|
||||||
log.info("Stopped DHT node")
|
log.info("Stopped DHT node")
|
||||||
|
|
||||||
|
def get_state(self) -> NodeState:
|
||||||
|
return NodeState(
|
||||||
|
routing_table_peers=[(p.node_id, p.address, p.udp_port, p.tcp_port)
|
||||||
|
for p in self.protocol.routing_table.get_peers()],
|
||||||
|
datastore=self.protocol.data_store.dump()
|
||||||
|
)
|
||||||
|
|
||||||
|
def load_state(self, state: NodeState):
|
||||||
|
now = self.loop.time()
|
||||||
|
for node_id, address, udp_port, tcp_port, blob_hash, added_at in state.datastore:
|
||||||
|
if added_at + constants.DATA_EXPIRATION < now:
|
||||||
|
continue
|
||||||
|
p = make_kademlia_peer(node_id, address, udp_port, tcp_port)
|
||||||
|
self.protocol.data_store.add_peer_to_blob(p, blob_hash)
|
||||||
|
|
||||||
async def start_listening(self, interface: str = '0.0.0.0') -> None:
|
async def start_listening(self, interface: str = '0.0.0.0') -> None:
|
||||||
if not self.listening_port:
|
if not self.listening_port:
|
||||||
self.listening_port, _ = await self.loop.create_datagram_endpoint(
|
self.listening_port, _ = await self.loop.create_datagram_endpoint(
|
||||||
lambda: self.protocol, (interface, self.internal_udp_port)
|
lambda: self.protocol, (interface, self.internal_udp_port)
|
||||||
)
|
)
|
||||||
|
self.started_listening.set()
|
||||||
log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port)
|
log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port)
|
||||||
self.protocol.start()
|
self.protocol.start()
|
||||||
else:
|
else:
|
||||||
log.warning("Already bound to port %s", self.listening_port)
|
log.warning("Already bound to port %s", self.listening_port)
|
||||||
|
|
||||||
async def join_network(self, interface: str = '0.0.0.0',
|
async def join_network(self, interface: str = '0.0.0.0',
|
||||||
known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
|
known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||||
|
persisted_peers: typing.List[typing.Tuple[bytes, str, int, int]] = []):
|
||||||
def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, int, int]]]):
|
def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, int, int]]]):
|
||||||
peer_addresses = []
|
peer_addresses = []
|
||||||
for node_id, address, udp_port, tcp_port in urls:
|
for node_id, address, udp_port, tcp_port in urls:
|
||||||
|
@ -148,15 +178,15 @@ class Node:
|
||||||
if not self.joined.is_set():
|
if not self.joined.is_set():
|
||||||
self.joined.set()
|
self.joined.set()
|
||||||
log.info(
|
log.info(
|
||||||
"joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()),
|
"%s: joined dht, %i peers known in %i buckets",
|
||||||
|
bytes.fromhex(self.protocol.node_id)[:8],
|
||||||
|
len(self.protocol.routing_table.get_peers()),
|
||||||
self.protocol.routing_table.buckets_with_contacts()
|
self.protocol.routing_table.buckets_with_contacts()
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
if self.joined.is_set():
|
if self.joined.is_set():
|
||||||
self.joined.clear()
|
self.joined.clear()
|
||||||
seed_peers = peers_from_urls(
|
seed_peers = peers_from_urls(persisted_peers) if persisted_peers else []
|
||||||
await self._storage.get_persisted_kademlia_peers()
|
|
||||||
) if self._storage else []
|
|
||||||
if not seed_peers:
|
if not seed_peers:
|
||||||
try:
|
try:
|
||||||
seed_peers.extend(peers_from_urls([
|
seed_peers.extend(peers_from_urls([
|
||||||
|
@ -173,8 +203,11 @@ class Node:
|
||||||
|
|
||||||
await asyncio.sleep(1, loop=self.loop)
|
await asyncio.sleep(1, loop=self.loop)
|
||||||
|
|
||||||
def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
|
def start(self, interface: str,
|
||||||
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
|
known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||||
|
persisted_peers: typing.List[typing.Tuple[bytes, str, int, int]] = []):
|
||||||
|
|
||||||
|
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls, persisted_peers))
|
||||||
|
|
||||||
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||||
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
|
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
|
||||||
|
|
|
@ -68,3 +68,9 @@ class DictDataStore:
|
||||||
for _, stored in self._data_store.items():
|
for _, stored in self._data_store.items():
|
||||||
peers.update(set(map(lambda tup: tup[0], stored)))
|
peers.update(set(map(lambda tup: tup[0], stored)))
|
||||||
return list(peers)
|
return list(peers)
|
||||||
|
|
||||||
|
def dump(self) -> typing.List[typing.Tuple[bytes, str, int, int, bytes, float]]:
|
||||||
|
data = []
|
||||||
|
for k, peers in self._data_store.items():
|
||||||
|
data.extend([(p.node_id, p.address, p.udp_port, p.tcp_port, k, added_at) for (p, added_at) in peers])
|
||||||
|
return data
|
||||||
|
|
|
@ -287,6 +287,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self._to_add: typing.Set['KademliaPeer'] = set()
|
self._to_add: typing.Set['KademliaPeer'] = set()
|
||||||
self._wakeup_routing_task = asyncio.Event(loop=self.loop)
|
self._wakeup_routing_task = asyncio.Event(loop=self.loop)
|
||||||
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None
|
self.maintaing_routing_task: typing.Optional[asyncio.Task] = None
|
||||||
|
self.event_queue = asyncio.Queue(maxsize=100)
|
||||||
|
|
||||||
@functools.lru_cache(128)
|
@functools.lru_cache(128)
|
||||||
def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
|
def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
|
||||||
|
@ -428,6 +429,11 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
|
log.debug("%s:%i RECV CALL %s %s:%i", self.external_ip, self.udp_port, message.method.decode(),
|
||||||
sender_contact.address, sender_contact.udp_port)
|
sender_contact.address, sender_contact.udp_port)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.event_queue.put_nowait((sender_contact.node_id, sender_contact.address, method, args))
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
pass
|
||||||
|
|
||||||
if method == b'ping':
|
if method == b'ping':
|
||||||
result = self.node_rpc.ping()
|
result = self.node_rpc.ping()
|
||||||
elif method == b'store':
|
elif method == b'store':
|
||||||
|
|
|
@ -286,7 +286,7 @@ class TreeRoutingTable:
|
||||||
to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
|
to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
|
||||||
if not to_pop:
|
if not to_pop:
|
||||||
return
|
return
|
||||||
log.info("join buckets %i", len(to_pop))
|
log.debug("%s: join buckets %i", bytes.hex(self._parent_node_id)[:8], len(to_pop))
|
||||||
bucket_index_to_pop = to_pop[0]
|
bucket_index_to_pop = to_pop[0]
|
||||||
assert len(self.buckets[bucket_index_to_pop]) == 0
|
assert len(self.buckets[bucket_index_to_pop]) == 0
|
||||||
can_go_lower = bucket_index_to_pop - 1 >= 0
|
can_go_lower = bucket_index_to_pop - 1 >= 0
|
||||||
|
|
|
@ -5,8 +5,10 @@ from binascii import hexlify
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
|
|
||||||
from lbry.error import ResolveCensoredError
|
from lbry.error import ResolveCensoredError
|
||||||
from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
|
from lbry.schema.types.v2.result_pb2 import \
|
||||||
from lbry.schema.types.v2.result_pb2 import Error as ErrorMessage
|
Outputs as OutputsMessage, \
|
||||||
|
Output as OutputMessage, \
|
||||||
|
Error as ErrorMessage
|
||||||
|
|
||||||
INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID)
|
INVALID = ErrorMessage.Code.Name(ErrorMessage.INVALID)
|
||||||
NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND)
|
NOT_FOUND = ErrorMessage.Code.Name(ErrorMessage.NOT_FOUND)
|
||||||
|
@ -70,7 +72,7 @@ class Outputs:
|
||||||
|
|
||||||
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total'
|
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total'
|
||||||
|
|
||||||
def __init__(self, txos: List, extra_txos: List, txs: set,
|
def __init__(self, txos: List[OutputMessage], extra_txos: List, txs: set,
|
||||||
offset: int, total: int, blocked: List, blocked_total: int):
|
offset: int, total: int, blocked: List, blocked_total: int):
|
||||||
self.txos = txos
|
self.txos = txos
|
||||||
self.txs = txs
|
self.txs = txs
|
||||||
|
|
273
scripts/tracker.py
Executable file
273
scripts/tracker.py
Executable file
|
@ -0,0 +1,273 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import pickle
|
||||||
|
import signal
|
||||||
|
import sqlite3
|
||||||
|
import time
|
||||||
|
from os import path
|
||||||
|
from pprint import pprint
|
||||||
|
from urllib.parse import unquote
|
||||||
|
|
||||||
|
from aioupnp import upnp, fault as upnpfault
|
||||||
|
from aiohttp import web
|
||||||
|
|
||||||
|
from lbry.wallet.network import ClientSession
|
||||||
|
from lbry.schema.result import Outputs
|
||||||
|
from lbry.wallet.transaction import Transaction
|
||||||
|
from binascii import hexlify, unhexlify
|
||||||
|
from lbry.dht import node, peer
|
||||||
|
|
||||||
|
log = logging.getLogger("lbry")
|
||||||
|
log.addHandler(logging.StreamHandler())
|
||||||
|
log.setLevel(logging.INFO)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
data_dir = "/home/grin/code/lbry/sdk"
|
||||||
|
state_dir = data_dir + '/nodestate/'
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
try:
|
||||||
|
loop.add_signal_handler(signal.SIGINT, shutdown)
|
||||||
|
loop.add_signal_handler(signal.SIGTERM, shutdown)
|
||||||
|
except NotImplementedError:
|
||||||
|
pass # Not implemented on Windows
|
||||||
|
|
||||||
|
db = sqlite3.connect(data_dir + "/tracker.sqlite3")
|
||||||
|
db.execute('CREATE TABLE IF NOT EXISTS announce (local_id TEXT, hash TEXT, node_id TEXT, ip TEXT, port INT, timestamp INT)')
|
||||||
|
db.execute('CREATE UNIQUE INDEX IF NOT EXISTS node_id_hash_idx ON announce (node_id, hash)')
|
||||||
|
|
||||||
|
spv_host = 'spv13.lbry.com'
|
||||||
|
wallet_client = ClientSession(network=None, server=(spv_host, 50001))
|
||||||
|
await wallet_client.create_connection()
|
||||||
|
|
||||||
|
asyncio.create_task(run_web_api(loop, db, wallet_client))
|
||||||
|
|
||||||
|
num_nodes = 128
|
||||||
|
u = await upnp.UPnP.discover()
|
||||||
|
external_ip = await u.get_external_ip()
|
||||||
|
|
||||||
|
try:
|
||||||
|
nodes = await start_nodes(loop, num_nodes, external_ip, state_dir)
|
||||||
|
|
||||||
|
await asyncio.gather(*map(lambda n: n.started_listening.wait(), nodes), loop=loop)
|
||||||
|
print("joined")
|
||||||
|
|
||||||
|
queue = asyncio.Queue(maxsize=100*num_nodes)
|
||||||
|
for n in nodes:
|
||||||
|
asyncio.create_task(drain_events(n, queue))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
(n, node_id, ip, method, args) = await queue.get()
|
||||||
|
local_node_id = bytes.hex(n.protocol.node_id)
|
||||||
|
if method != b'store':
|
||||||
|
# print(f"{local_node_id[:8]}: {method} from {bytes.hex(node_id)} ({ip})")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if len(args)< 5:
|
||||||
|
print(f'malformed args to Store')
|
||||||
|
pprint(args)
|
||||||
|
continue
|
||||||
|
|
||||||
|
blob_hash, token, port, original_publisher_id, age = args[:5]
|
||||||
|
print(f"STORE to {local_node_id[:8]} from {bytes.hex(node_id)[:8]} ({ip}) for blob {bytes.hex(blob_hash)[:8]}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cur = db.cursor()
|
||||||
|
cur.execute(
|
||||||
|
'''
|
||||||
|
INSERT INTO announce (local_id, hash, node_id, ip, port, timestamp) VALUES (?,?,?,?,?,?)
|
||||||
|
ON CONFLICT (node_id, hash) DO UPDATE SET
|
||||||
|
local_id=excluded.local_id, ip=excluded.ip, port=excluded.port, timestamp=excluded.timestamp
|
||||||
|
''',
|
||||||
|
(local_node_id, bytes.hex(blob_hash), bytes.hex(node_id), ip, port, int(time.time()))
|
||||||
|
)
|
||||||
|
db.commit()
|
||||||
|
cur.close()
|
||||||
|
except sqlite3.Error as err:
|
||||||
|
print("failed sqlite insert", err)
|
||||||
|
finally:
|
||||||
|
print("shutting down")
|
||||||
|
for n in nodes:
|
||||||
|
node_id = bytes.hex(n.protocol.node_id)
|
||||||
|
n.stop()
|
||||||
|
# print(f'deleting upnp port mapping {n.protocol.udp_port}')
|
||||||
|
try:
|
||||||
|
await u.delete_port_mapping(n.protocol.udp_port, "UDP")
|
||||||
|
except upnpfault.UPnPError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
state = n.get_state()
|
||||||
|
# keep existing rt if there is one
|
||||||
|
if len(state.routing_table_peers) == 0 and path.exists(state_dir + node_id):
|
||||||
|
with open(state_dir + node_id, 'rb') as f:
|
||||||
|
existing_state = pickle.load(f)
|
||||||
|
if len(existing_state.routing_table_peers) > 0:
|
||||||
|
state.routing_table_peers = existing_state.routing_table_peers
|
||||||
|
print(f'rt empty on save, but old rt was recovered ({len(state.routing_table_peers)} peers)')
|
||||||
|
with open(state_dir + node_id, 'wb') as f:
|
||||||
|
# pprint(state.routing_table_peers)
|
||||||
|
# pprint(state.datastore)
|
||||||
|
print(f'{node_id[:8]}: saved {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
|
||||||
|
pickle.dump(state, f)
|
||||||
|
await wallet_client.close()
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
class ShutdownErr(BaseException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def shutdown():
|
||||||
|
print("got interrupt signal...")
|
||||||
|
raise ShutdownErr()
|
||||||
|
|
||||||
|
|
||||||
|
def make_node_id(i: int, n: int) -> str:
|
||||||
|
"""
|
||||||
|
split dht address space into N chunks and return the first id of the i'th chunk
|
||||||
|
make_node_id(0,n) returns 000...000 for any n
|
||||||
|
"""
|
||||||
|
if not 0 <= i < n:
|
||||||
|
raise ValueError("i must be between 0 (inclusive) and n (exclusive)")
|
||||||
|
bytes_in_id = 48
|
||||||
|
return "{0:0{1}x}".format(i * ((2**8)**bytes_in_id // n), bytes_in_id*2)
|
||||||
|
|
||||||
|
|
||||||
|
async def start_nodes(loop, num_nodes, external_ip, state_dir):
|
||||||
|
start_port = 4445
|
||||||
|
known_node_urls = [("lbrynet1.lbry.com", 4444), ("lbrynet2.lbry.com", 4444), ("lbrynet3.lbry.com", 4444)]
|
||||||
|
peer_manager = peer.PeerManager(loop)
|
||||||
|
|
||||||
|
nodes = []
|
||||||
|
for i in range(num_nodes):
|
||||||
|
node_id = make_node_id(i, num_nodes)
|
||||||
|
# pprint(node_id)
|
||||||
|
|
||||||
|
port = start_port + i
|
||||||
|
# await u.get_next_mapping(port, "UDP", "lbry dht tracker") # not necessary, i just opened ports in router
|
||||||
|
|
||||||
|
n = node.Node(loop, peer_manager, node_id=bytes.fromhex(node_id), external_ip=external_ip,
|
||||||
|
udp_port=port, internal_udp_port=port, peer_port=3333)
|
||||||
|
|
||||||
|
persisted_peers = []
|
||||||
|
if path.exists(state_dir + node_id):
|
||||||
|
with open(state_dir + node_id, 'rb') as f:
|
||||||
|
state = pickle.load(f)
|
||||||
|
# pprint(state.routing_table_peers)
|
||||||
|
# pprint(state.datastore)
|
||||||
|
print(f'{node_id[:8]}: loaded {len(state.routing_table_peers)} rt peers, {len(state.datastore)} in store')
|
||||||
|
n.load_state(state)
|
||||||
|
persisted_peers = state.routing_table_peers
|
||||||
|
# if len(persisted_peers) == 0 and len(state.datastore) > 0:
|
||||||
|
# peers_to_import = map(lambda p: (p[0], p[1], p[2], p[3]), n.get_state().datastore)
|
||||||
|
# persisted_peers.extend(peers_to_import)
|
||||||
|
# print(f'{node_id[:8]}: rt is empty but we recovered {len(state.datastore)} '
|
||||||
|
# f'peers from the datastore. {len(peers_to_import)} of those were recent enough to import')
|
||||||
|
|
||||||
|
n.start("0.0.0.0", known_node_urls, persisted_peers)
|
||||||
|
nodes.append(n)
|
||||||
|
return nodes
|
||||||
|
|
||||||
|
|
||||||
|
async def drain_events(n, q):
|
||||||
|
# print(f'drain started on {bytes.hex(n.protocol.node_id)[:8]}')
|
||||||
|
while True:
|
||||||
|
(node_id, ip, method, args) = await n.protocol.event_queue.get()
|
||||||
|
try:
|
||||||
|
q.put_nowait((n, node_id, ip, method, args))
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def run_web_api(loop, db, wallet_client):
|
||||||
|
try:
|
||||||
|
app = web.Application(loop=loop)
|
||||||
|
app['db'] = db
|
||||||
|
app['wallet_client'] = wallet_client
|
||||||
|
app.add_routes([
|
||||||
|
web.get('/', api_handler),
|
||||||
|
web.get('/seeds/hash/{hash}', seeds_handler),
|
||||||
|
web.get('/seeds/url/{url}', url_handler),
|
||||||
|
])
|
||||||
|
# server = web.Server(api_handler, loop=loop)
|
||||||
|
# runner = web.ServerRunner(server)
|
||||||
|
runner = web.AppRunner(app)
|
||||||
|
await runner.setup()
|
||||||
|
site = web.TCPSite(runner, 'localhost', 8080)
|
||||||
|
await site.start()
|
||||||
|
except Exception as err:
|
||||||
|
pprint(err)
|
||||||
|
await runner.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
async def seeds_handler(request):
|
||||||
|
blobhash = request.match_info['hash']
|
||||||
|
db = request.app['db']
|
||||||
|
try:
|
||||||
|
cur = db.cursor()
|
||||||
|
c = cur.execute("""
|
||||||
|
select count(distinct(node_id)) from announce where hash = ? and timestamp > strftime('%s','now','-1 day')
|
||||||
|
""", (blobhash,)).fetchone()[0]
|
||||||
|
cur.close()
|
||||||
|
return web.Response(text=json.dumps({'seeds': c})+"\n")
|
||||||
|
except Exception as err:
|
||||||
|
return web.Response(text=json.dumps({'error': err})+"\n")
|
||||||
|
|
||||||
|
|
||||||
|
async def url_handler(request):
|
||||||
|
url = unquote(request.match_info['url'])
|
||||||
|
log.warning(url)
|
||||||
|
db = request.app['db']
|
||||||
|
wallet_client = request.app['wallet_client']
|
||||||
|
|
||||||
|
try:
|
||||||
|
sd_hash = await get_sd_hash(wallet_client, url)
|
||||||
|
if sd_hash is None:
|
||||||
|
return web.Response(text=json.dumps({'error': 'Could not get sd hash for url', 'url': url})+"\n")
|
||||||
|
seeds = get_seeds(db, sd_hash)
|
||||||
|
return web.Response(text=json.dumps({'url': url, 'sd_hash': sd_hash, 'seeds': seeds})+"\n")
|
||||||
|
except Exception as err:
|
||||||
|
return web.Response(text=json.dumps({'error': err})+"\n")
|
||||||
|
|
||||||
|
|
||||||
|
def get_seeds(db, blobhash):
|
||||||
|
cur = db.cursor()
|
||||||
|
c = cur.execute(
|
||||||
|
"select count(distinct(node_id)) from announce where hash = ? and timestamp > strftime('%s','now','-1 day')",
|
||||||
|
(blobhash,)
|
||||||
|
).fetchone()[0]
|
||||||
|
cur.close()
|
||||||
|
return c
|
||||||
|
|
||||||
|
|
||||||
|
async def get_sd_hash(wallet_client, url):
|
||||||
|
try:
|
||||||
|
resolved_txos = Outputs.from_base64(await wallet_client.send_request('blockchain.claimtrie.resolve', [url]))
|
||||||
|
if not resolved_txos.txos:
|
||||||
|
return None
|
||||||
|
raw_txs = await wallet_client.send_request('blockchain.transaction.get_batch', [txid for (txid, height) in resolved_txos.txs])
|
||||||
|
txo_proto = resolved_txos.txos[0]
|
||||||
|
txid = txo_proto.tx_hash[::-1].hex()
|
||||||
|
raw_tx_hex, _ = raw_txs[txid]
|
||||||
|
txo = Transaction(bytes.fromhex(raw_tx_hex)).outputs[txo_proto.nout]
|
||||||
|
return txo.claim.stream.source.sd_hash
|
||||||
|
except Exception as err: # claim is not a stream, stream has no source, protobuf err, etc
|
||||||
|
if isinstance(err, asyncio.CancelledError):
|
||||||
|
raise err
|
||||||
|
log.exception("failed to get sd_hash")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def api_handler(request):
|
||||||
|
return web.Response(text="tracker OK")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except ShutdownErr:
|
||||||
|
pass
|
Loading…
Reference in a new issue