forked from LBRYCommunity/lbry-sdk
move concurreny control to lower layer
This commit is contained in:
parent
3855db6c66
commit
28fdd62945
1 changed files with 6 additions and 6 deletions
|
@ -64,14 +64,16 @@ class UDPTrackerClientProtocol(asyncio.DatagramProtocol):
|
||||||
self.transport = None
|
self.transport = None
|
||||||
self.data_queue = {}
|
self.data_queue = {}
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
|
self.semaphore = asyncio.Semaphore(10)
|
||||||
|
|
||||||
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
|
def connection_made(self, transport: asyncio.DatagramTransport) -> None:
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
|
|
||||||
async def request(self, obj, tracker_ip, tracker_port):
|
async def request(self, obj, tracker_ip, tracker_port):
|
||||||
self.data_queue[obj.transaction_id] = asyncio.get_running_loop().create_future()
|
self.data_queue[obj.transaction_id] = asyncio.get_running_loop().create_future()
|
||||||
self.transport.sendto(encode(obj), (tracker_ip, tracker_port))
|
|
||||||
try:
|
try:
|
||||||
|
async with self.semaphore:
|
||||||
|
self.transport.sendto(encode(obj), (tracker_ip, tracker_port))
|
||||||
return await asyncio.wait_for(self.data_queue[obj.transaction_id], self.timeout)
|
return await asyncio.wait_for(self.data_queue[obj.transaction_id], self.timeout)
|
||||||
finally:
|
finally:
|
||||||
self.data_queue.pop(obj.transaction_id, None)
|
self.data_queue.pop(obj.transaction_id, None)
|
||||||
|
@ -129,7 +131,6 @@ class TrackerClient:
|
||||||
self.announce_port = announce_port
|
self.announce_port = announce_port
|
||||||
self.servers = servers
|
self.servers = servers
|
||||||
self.results = {} # we can't probe the server before the interval, so we keep the result here until it expires
|
self.results = {} # we can't probe the server before the interval, so we keep the result here until it expires
|
||||||
self.semaphore = asyncio.Semaphore(10)
|
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
|
self.transport, _ = await asyncio.get_running_loop().create_datagram_endpoint(
|
||||||
|
@ -163,7 +164,6 @@ class TrackerClient:
|
||||||
return result
|
return result
|
||||||
try:
|
try:
|
||||||
tracker_ip = await resolve_host(tracker_host, tracker_port, 'udp')
|
tracker_ip = await resolve_host(tracker_host, tracker_port, 'udp')
|
||||||
async with self.semaphore:
|
|
||||||
result = await self.client.announce(
|
result = await self.client.announce(
|
||||||
info_hash, self.node_id, self.announce_port, tracker_ip, tracker_port, stopped)
|
info_hash, self.node_id, self.announce_port, tracker_ip, tracker_port, stopped)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
|
Loading…
Reference in a new issue