make a helper function to announce

This commit is contained in:
Victor Shyba 2022-03-05 02:42:58 -03:00
parent 4ea858fdd3
commit 2df8a1d99d
2 changed files with 26 additions and 1 deletions

View file

@ -4,6 +4,8 @@ import asyncio
import logging
from collections import namedtuple
from lbry.utils import resolve_host
log = logging.getLogger(__name__)
# see: http://bittorrent.org/beps/bep_0015.html and http://xbtt.sourceforge.net/udp_tracker_protocol.html
ConnectRequest = namedtuple("ConnectRequest", ["connection_id", "action", "transaction_id"])
@ -108,3 +110,20 @@ class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
def connection_lost(self, exc: Exception = None) -> None:
self.transport = None
async def get_peer_list(info_hash, node_id, port, tracker_ip, tracker_port):
node_id = node_id or random.getrandbits(160).to_bytes(20, "big", signed=False)
tracker_ip = await resolve_host(tracker_ip, tracker_port, 'udp')
proto = UDPTrackerClientProtocol()
transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(lambda: proto, local_addr=("0.0.0.0", 0))
try:
reply, _ = await proto.announce(info_hash, node_id, port, tracker_ip, tracker_port)
return reply.peers
except asyncio.CancelledError:
raise
except Exception as exc:
log.warning("Error fetching from tracker: %s", exc.args)
return []
finally:
transport.close()

View file

@ -4,7 +4,7 @@ from functools import reduce
from lbry.testcase import AsyncioTestCase
from lbry.torrent.tracker import UDPTrackerClientProtocol, encode, decode, CompactIPv4Peer, ConnectRequest, \
ConnectResponse, AnnounceRequest, ErrorResponse, AnnounceResponse
ConnectResponse, AnnounceRequest, ErrorResponse, AnnounceResponse, get_peer_list
class UDPTrackerServerProtocol(asyncio.DatagramProtocol): # for testing. Not suitable for production
@ -54,6 +54,12 @@ class UDPTrackerClientTestCase(AsyncioTestCase):
self.assertEqual(announcement.peers,
[CompactIPv4Peer(int.from_bytes(bytes([127, 0, 0, 1]), "big", signed=False), 4444)])
async def test_announce_using_helper_function(self):
info_hash = random.getrandbits(160).to_bytes(20, "big", signed=False)
peers = await get_peer_list(info_hash, None, 4444, "127.0.0.1", 59900)
self.assertEqual(len(peers), 1)
self.assertEqual(peers, [CompactIPv4Peer(int.from_bytes(bytes([127, 0, 0, 1]), "big", signed=False), 4444)])
async def test_error(self):
info_hash = random.getrandbits(160).to_bytes(20, "big", signed=False)
peer_id = random.getrandbits(160).to_bytes(20, "big", signed=False)