2022-03-05 02:12:11 -03:00
import random
2022-03-10 21:15:39 -03:00
import socket
2022-03-10 21:10:44 -03:00
import string
2022-03-05 02:12:11 -03:00
import struct
import asyncio
import logging
2022-03-08 00:58:18 -03:00
import time
2022-04-03 23:20:02 -03:00
import ipaddress
2022-03-05 02:12:11 -03:00
from collections import namedtuple
2022-03-12 02:17:37 -03:00
from functools import reduce
2022-03-10 21:10:44 -03:00
from typing import Optional
2022-03-05 02:12:11 -03:00
2022-04-03 23:20:02 -03:00
from lbry.dht.node import get_kademlia_peers_from_hosts
2022-03-09 20:11:30 -03:00
from lbry.utils import resolve_host, async_timed_cache, cache_concurrent
2022-03-07 23:35:12 -03:00
from lbry.wallet.stream import StreamController
2022-03-10 21:10:44 -03:00
from lbry import version
2022-03-05 02:42:58 -03:00
2022-03-05 02:12:11 -03:00
log = logging.getLogger(__name__)
2022-03-09 20:16:18 -03:00
2022-03-12 02:16:18 -03:00
PREFIX = 'LB' # todo: PR BEP20 to add ourselves
2022-05-11 14:35:15 -03:00
2022-03-05 02:21:15 -03:00
# see: http://bittorrent.org/beps/bep_0015.html and http://xbtt.sourceforge.net/udp_tracker_protocol.html
2022-03-05 02:12:11 -03:00
ConnectRequest = namedtuple("ConnectRequest", ["connection_id", "action", "transaction_id"])
ConnectResponse = namedtuple("ConnectResponse", ["action", "transaction_id", "connection_id"])
AnnounceRequest = namedtuple("AnnounceRequest",
["connection_id", "action", "transaction_id", "info_hash", "peer_id", "downloaded", "left",
"uploaded", "event", "ip_addr", "key", "num_want", "port"])
AnnounceResponse = namedtuple("AnnounceResponse",
["action", "transaction_id", "interval", "leechers", "seeders", "peers"])
CompactIPv4Peer = namedtuple("CompactPeer", ["address", "port"])
ScrapeRequest = namedtuple("ScrapeRequest", ["connection_id", "action", "transaction_id", "infohashes"])
ScrapeResponse = namedtuple("ScrapeResponse", ["action", "transaction_id", "items"])
ScrapeResponseItem = namedtuple("ScrapeResponseItem", ["seeders", "completed", "leechers"])
ErrorResponse = namedtuple("ErrorResponse", ["action", "transaction_id", "message"])
2022-05-06 04:01:01 -03:00
structs = {
2022-03-05 02:12:11 -03:00
ConnectRequest: struct.Struct(">QII"),
ConnectResponse: struct.Struct(">IIQ"),
AnnounceRequest: struct.Struct(">QII20s20sQQQIIIiH"),
AnnounceResponse: struct.Struct(">IIIII"),
CompactIPv4Peer: struct.Struct(">IH"),
ScrapeRequest: struct.Struct(">QII"),
ScrapeResponse: struct.Struct(">II"),
ScrapeResponseItem: struct.Struct(">III"),
ErrorResponse: struct.Struct(">II")
def decode(cls, data, offset=0):
2022-05-06 04:01:01 -03:00
decoder = structs[cls]
if cls is AnnounceResponse:
2022-03-05 02:12:11 -03:00
return AnnounceResponse(*decoder.unpack_from(data, offset),
peers=[decode(CompactIPv4Peer, data, index) for index in range(20, len(data), 6)])
2022-05-06 04:01:01 -03:00
elif cls is ScrapeResponse:
2022-03-05 02:12:11 -03:00
return ScrapeResponse(*decoder.unpack_from(data, offset),
items=[decode(ScrapeResponseItem, data, index) for index in range(8, len(data), 12)])
2022-05-06 04:01:01 -03:00
elif cls is ErrorResponse:
2022-03-05 02:12:11 -03:00
return ErrorResponse(*decoder.unpack_from(data, offset), data[decoder.size:])
return cls(*decoder.unpack_from(data, offset))
def encode(obj):
if isinstance(obj, ScrapeRequest):
2022-05-06 04:01:01 -03:00
return structs[ScrapeRequest].pack(*obj[:-1]) + b''.join(obj.infohashes)
2022-03-05 02:12:11 -03:00
elif isinstance(obj, ErrorResponse):
2022-05-06 04:01:01 -03:00
return structs[ErrorResponse].pack(*obj[:-1]) + obj.message
2022-03-05 02:12:11 -03:00
elif isinstance(obj, AnnounceResponse):
2022-05-06 04:01:01 -03:00
return structs[AnnounceResponse].pack(*obj[:-1]) + b''.join([encode(peer) for peer in obj.peers])
return structs[type(obj)].pack(*obj)
2022-03-05 02:12:11 -03:00
2022-03-10 21:10:44 -03:00
def make_peer_id(random_part: Optional[str] = None) -> bytes:
# see https://wiki.theory.org/BitTorrentSpecification#peer_id and https://www.bittorrent.org/beps/bep_0020.html
# not to confuse with node id; peer id identifies uniquely the software, version and instance
random_part = random_part or ''.join(random.choice(string.ascii_letters) for _ in range(20))
return f"{PREFIX}-{'-'.join(map(str, version))}-{random_part}"[:20].encode()
2022-03-05 02:12:11 -03:00
class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
2022-05-11 14:35:15 -03:00
def __init__(self, timeout: float = DEFAULT_TIMEOUT_SECONDS):
2022-03-05 02:12:11 -03:00
self.transport = None
self.data_queue = {}
2022-03-07 23:44:08 -03:00
self.timeout = timeout
2022-05-11 14:35:15 -03:00
self.semaphore = asyncio.Semaphore(DEFAULT_CONCURRENCY_LIMIT)
2022-03-05 02:12:11 -03:00
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
self.transport = transport
async def request(self, obj, tracker_ip, tracker_port):
self.data_queue[obj.transaction_id] = asyncio.get_running_loop().create_future()
2022-03-08 17:25:03 -03:00
async with self.semaphore:
self.transport.sendto(encode(obj), (tracker_ip, tracker_port))
return await asyncio.wait_for(self.data_queue[obj.transaction_id], self.timeout)
2022-03-05 02:12:11 -03:00
self.data_queue.pop(obj.transaction_id, None)
async def connect(self, tracker_ip, tracker_port):
transaction_id = random.getrandbits(32)
return decode(ConnectResponse,
await self.request(ConnectRequest(0x41727101980, 0, transaction_id), tracker_ip, tracker_port))
2022-03-09 20:11:30 -03:00
2022-03-07 23:35:12 -03:00
async def ensure_connection_id(self, peer_id, tracker_ip, tracker_port):
# peer_id is just to ensure cache coherency
return (await self.connect(tracker_ip, tracker_port)).connection_id
async def announce(self, info_hash, peer_id, port, tracker_ip, tracker_port, stopped=False):
connection_id = await self.ensure_connection_id(peer_id, tracker_ip, tracker_port)
2022-03-05 02:12:11 -03:00
# this should make the key deterministic but unique per info hash + peer id
key = int.from_bytes(info_hash[:4], "big") ^ int.from_bytes(peer_id[:4], "big") ^ port
transaction_id = random.getrandbits(32)
2022-03-05 02:55:19 -03:00
req = AnnounceRequest(
connection_id, 1, transaction_id, info_hash, peer_id, 0, 0, 0, 3 if stopped else 1, 0, key, -1, port)
2022-03-07 23:35:12 -03:00
return decode(AnnounceResponse, await self.request(req, tracker_ip, tracker_port))
2022-03-05 02:12:11 -03:00
async def scrape(self, infohashes, tracker_ip, tracker_port, connection_id=None):
2022-03-09 20:11:30 -03:00
connection_id = await self.ensure_connection_id(None, tracker_ip, tracker_port)
2022-03-05 02:12:11 -03:00
transaction_id = random.getrandbits(32)
reply = await self.request(
ScrapeRequest(connection_id, 2, transaction_id, infohashes), tracker_ip, tracker_port)
return decode(ScrapeResponse, reply), connection_id
def datagram_received(self, data: bytes, addr: (str, int)) -> None:
if len(data) < 8:
transaction_id = int.from_bytes(data[4:8], byteorder="big", signed=False)
if transaction_id in self.data_queue:
if not self.data_queue[transaction_id].done():
if data[3] == 3:
return self.data_queue[transaction_id].set_exception(Exception(decode(ErrorResponse, data).message))
return self.data_queue[transaction_id].set_result(data)
2022-03-07 23:35:12 -03:00
log.debug("unexpected packet (can be a response for a previously timed out request): %s", data.hex())
2022-03-05 02:12:11 -03:00
def connection_lost(self, exc: Exception = None) -> None:
self.transport = None
2022-03-05 02:42:58 -03:00
2022-03-07 23:35:12 -03:00
class TrackerClient:
2022-05-06 04:01:01 -03:00
event_controller = StreamController()
2022-03-08 00:58:18 -03:00
2022-03-12 02:15:45 -03:00
def __init__(self, node_id, announce_port, get_servers, timeout=10.0):
2022-03-09 17:07:16 -03:00
self.client = UDPTrackerClientProtocol(timeout=timeout)
2022-03-07 23:35:12 -03:00
self.transport = None
2022-03-10 21:10:44 -03:00
self.peer_id = make_peer_id(node_id.hex() if node_id else None)
2022-03-07 23:35:12 -03:00
self.announce_port = announce_port
2022-03-12 02:15:45 -03:00
self._get_servers = get_servers
2022-03-08 00:58:18 -03:00
self.results = {} # we can't probe the server before the interval, so we keep the result here until it expires
2022-03-08 17:32:35 -03:00
self.tasks = {}
2022-03-07 23:35:12 -03:00
async def start(self):
self.transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: self.client, local_addr=("", 0))
2022-05-06 04:01:01 -03:00
2022-03-09 17:47:23 -03:00
lambda request: self.on_hash(request[1], request[2]) if request[0] == 'search' else None)
2022-03-07 23:35:12 -03:00
def stop(self):
2022-03-09 20:01:17 -03:00
while self.tasks:
2022-03-07 23:35:12 -03:00
if self.transport is not None:
self.client = None
self.transport = None
2022-05-06 04:01:01 -03:00
2022-03-07 23:35:12 -03:00
2022-03-09 17:47:23 -03:00
def on_hash(self, info_hash, on_announcement=None):
2022-03-08 17:32:35 -03:00
if info_hash not in self.tasks:
2022-03-09 17:47:23 -03:00
task = asyncio.create_task(self.get_peer_list(info_hash, on_announcement=on_announcement))
2022-03-09 19:59:30 -03:00
task.add_done_callback(lambda *_: self.tasks.pop(info_hash, None))
2022-03-09 14:24:16 -03:00
self.tasks[info_hash] = task
2022-03-07 23:35:12 -03:00
2022-03-09 19:59:30 -03:00
async def announce_many(self, *info_hashes, stopped=False):
await asyncio.gather(
2022-03-12 02:15:45 -03:00
*[self._announce_many(server, info_hashes, stopped=stopped) for server in self._get_servers()],
2022-03-09 19:59:30 -03:00
async def _announce_many(self, server, info_hashes, stopped=False):
tracker_ip = await resolve_host(*server, 'udp')
still_good_info_hashes = {
info_hash for (info_hash, (next_announcement, _)) in self.results.get(tracker_ip, {}).items()
if time.time() < next_announcement
results = await asyncio.gather(
*[self._probe_server(info_hash, tracker_ip, server[1], stopped=stopped)
for info_hash in info_hashes if info_hash not in still_good_info_hashes],
if results:
errors = sum([1 for result in results if result is None or isinstance(result, Exception)])
log.info("Tracker: finished announcing %d files to %s:%d, %d errors", len(results), *server, errors)
2022-04-04 00:09:20 -03:00
async def get_peer_list(self, info_hash, stopped=False, on_announcement=None, no_port=False):
2022-03-07 23:35:12 -03:00
found = []
2022-04-04 00:09:20 -03:00
probes = [self._probe_server(info_hash, *server, stopped, no_port) for server in self._get_servers()]
for done in asyncio.as_completed(probes):
2022-03-08 00:58:18 -03:00
result = await done
if result is not None:
2022-03-09 17:47:23 -03:00
await asyncio.gather(*filter(asyncio.iscoroutine, [on_announcement(result)] if on_announcement else []))
2022-03-08 00:58:18 -03:00
2022-03-07 23:35:12 -03:00
return found
2022-04-03 23:20:02 -03:00
async def get_kademlia_peer_list(self, info_hash):
2022-04-04 00:09:20 -03:00
responses = await self.get_peer_list(info_hash, no_port=True)
2022-04-04 23:53:38 -03:00
return await announcement_to_kademlia_peers(*responses)
2022-04-03 23:20:02 -03:00
2022-04-04 00:09:20 -03:00
async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False, no_port=False):
2022-03-08 00:58:18 -03:00
result = None
2022-03-10 21:15:39 -03:00
tracker_host = await resolve_host(tracker_host, tracker_port, 'udp')
except socket.error:
log.warning("DNS failure while resolving tracker host: %s, skipping.", tracker_host)
2022-03-12 02:18:15 -03:00
self.results.setdefault(tracker_host, {})
2022-03-09 19:59:30 -03:00
if info_hash in self.results[tracker_host]:
next_announcement, result = self.results[tracker_host][info_hash]
2022-03-08 00:58:18 -03:00
if time.time() < next_announcement:
return result
2022-03-07 23:44:08 -03:00
2022-03-08 17:25:03 -03:00
result = await self.client.announce(
2022-04-04 00:09:20 -03:00
info_hash, self.peer_id, 0 if no_port else self.announce_port, tracker_host, tracker_port, stopped)
2022-03-09 19:59:30 -03:00
self.results[tracker_host][info_hash] = (time.time() + result.interval, result)
2022-03-09 12:57:35 -03:00
except asyncio.TimeoutError: # todo: this is UDP, timeout is common, we need a better metric for failures
2022-03-09 19:59:30 -03:00
self.results[tracker_host][info_hash] = (time.time() + 60.0, result)
2022-03-09 12:57:35 -03:00
log.debug("Tracker timed out: %s:%d", tracker_host, tracker_port)
2022-03-08 00:58:18 -03:00
return None
2022-03-09 16:42:20 -03:00
log.debug("Announced: %s found %d peers for %s", tracker_host, len(result.peers), info_hash.hex()[:8])
2022-03-07 23:35:12 -03:00
return result
2022-04-04 23:53:38 -03:00
def enqueue_tracker_search(info_hash: bytes, peer_q: asyncio.Queue):
async def on_announcement(announcement: AnnounceResponse):
peers = await announcement_to_kademlia_peers(announcement)
log.info("Found %d peers from tracker for %s", len(peers), info_hash.hex()[:8])
2022-05-06 04:01:01 -03:00
TrackerClient.event_controller.add(('search', info_hash, on_announcement))
2022-04-04 23:53:38 -03:00
def announcement_to_kademlia_peers(*announcements: AnnounceResponse):
peers = [
(str(ipaddress.ip_address(peer.address)), peer.port)
for announcement in announcements for peer in announcement.peers if peer.port > 1024 # no privileged or 0
return get_kademlia_peers_from_hosts(peers)
2022-03-12 02:17:37 -03:00
class UDPTrackerServerProtocol(asyncio.DatagramProtocol): # for testing. Not suitable for production
def __init__(self):
self.transport = None
self.known_conns = set()
self.peers = {}
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
self.transport = transport
def add_peer(self, info_hash, ip_address: str, port: int):
self.peers.setdefault(info_hash, [])
self.peers[info_hash].append(encode_peer(ip_address, port))
2022-03-12 02:51:13 -03:00
def datagram_received(self, data: bytes, addr: (str, int)) -> None:
2022-03-12 02:17:37 -03:00
if len(data) < 16:
action = int.from_bytes(data[8:12], "big", signed=False)
if action == 0:
req = decode(ConnectRequest, data)
connection_id = random.getrandbits(32)
2022-03-12 02:51:13 -03:00
return self.transport.sendto(encode(ConnectResponse(0, req.transaction_id, connection_id)), addr)
2022-03-12 02:17:37 -03:00
elif action == 1:
req = decode(AnnounceRequest, data)
if req.connection_id not in self.known_conns:
resp = encode(ErrorResponse(3, req.transaction_id, b'Connection ID missmatch.\x00'))
2022-03-12 02:51:13 -03:00
compact_address = encode_peer(addr[0], req.port)
2022-03-12 02:17:37 -03:00
if req.event != 3:
2022-03-12 02:51:13 -03:00
self.add_peer(req.info_hash, addr[0], req.port)
2022-03-12 02:17:37 -03:00
elif compact_address in self.peers.get(req.info_hash, []):
peers = [decode(CompactIPv4Peer, peer) for peer in self.peers[req.info_hash]]
resp = encode(AnnounceResponse(1, req.transaction_id, 1700, 0, len(peers), peers))
2022-03-12 02:51:13 -03:00
return self.transport.sendto(resp, addr)
2022-03-12 02:17:37 -03:00
def encode_peer(ip_address: str, port: int):
compact_ip = reduce(lambda buff, x: buff + bytearray([int(x)]), ip_address.split('.'), bytearray())
return compact_ip + port.to_bytes(2, "big", signed=False)