forked from LBRYCommunity/lbry-sdk
improve timeout handling
This commit is contained in:
parent
9e9a64d989
commit
888e9918a6
2 changed files with 12 additions and 7 deletions
|
@ -730,7 +730,7 @@ class TrackerAnnouncerComponent(Component):
|
||||||
|
|
||||||
async def announce_forever(self):
|
async def announce_forever(self):
|
||||||
while True:
|
while True:
|
||||||
to_sleep = 60 * 10
|
to_sleep = 60 * 1
|
||||||
for file in self.file_manager.get_filtered():
|
for file in self.file_manager.get_filtered():
|
||||||
if not file.downloader:
|
if not file.downloader:
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -59,9 +59,10 @@ def encode(obj):
|
||||||
|
|
||||||
|
|
||||||
class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
|
class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
|
||||||
def __init__(self):
|
def __init__(self, timeout = 30.0):
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self.data_queue = {}
|
self.data_queue = {}
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
|
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
|
@ -70,7 +71,7 @@ class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
|
||||||
self.data_queue[obj.transaction_id] = asyncio.get_running_loop().create_future()
|
self.data_queue[obj.transaction_id] = asyncio.get_running_loop().create_future()
|
||||||
self.transport.sendto(encode(obj), (tracker_ip, tracker_port))
|
self.transport.sendto(encode(obj), (tracker_ip, tracker_port))
|
||||||
try:
|
try:
|
||||||
return await asyncio.wait_for(self.data_queue[obj.transaction_id], 3.0)
|
return await asyncio.wait_for(self.data_queue[obj.transaction_id], self.timeout)
|
||||||
finally:
|
finally:
|
||||||
self.data_queue.pop(obj.transaction_id, None)
|
self.data_queue.pop(obj.transaction_id, None)
|
||||||
|
|
||||||
|
@ -144,13 +145,17 @@ class TrackerClient:
|
||||||
async def get_peer_list(self, info_hash, stopped=False):
|
async def get_peer_list(self, info_hash, stopped=False):
|
||||||
found = []
|
found = []
|
||||||
for done in asyncio.as_completed([self._probe_server(info_hash, *server, stopped) for server in self.servers]):
|
for done in asyncio.as_completed([self._probe_server(info_hash, *server, stopped) for server in self.servers]):
|
||||||
found.append(await done)
|
found.extend(await done)
|
||||||
return found
|
return found
|
||||||
|
|
||||||
async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False):
|
async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False):
|
||||||
|
try:
|
||||||
tracker_ip = await resolve_host(tracker_host, tracker_port, 'udp')
|
tracker_ip = await resolve_host(tracker_host, tracker_port, 'udp')
|
||||||
result = await self.client.announce(
|
result = await self.client.announce(
|
||||||
info_hash, self.node_id, self.announce_port, tracker_ip, tracker_port, stopped)
|
info_hash, self.node_id, self.announce_port, tracker_ip, tracker_port, stopped)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
log.info("Tracker timed out: %s:%d", tracker_host, tracker_port)
|
||||||
|
return []
|
||||||
log.info("Announced to tracker. Found %d peers for %s on %s",
|
log.info("Announced to tracker. Found %d peers for %s on %s",
|
||||||
len(result.peers), info_hash.hex()[:8], tracker_host)
|
len(result.peers), info_hash.hex()[:8], tracker_host)
|
||||||
self.EVENT_CONTROLLER.add((info_hash, result))
|
self.EVENT_CONTROLLER.add((info_hash, result))
|
||||||
|
|
Loading…
Reference in a new issue