From 43ac928f0b7212b6354dd62ef0407d47335884f1 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 7 Feb 2019 20:09:48 -0300
Subject: [PATCH] remove dht locks

---
 lbrynet/dht/node.py              |  2 +-
 lbrynet/dht/protocol/protocol.py | 79 +++++++++++++++-----------------
 2 files changed, 38 insertions(+), 43 deletions(-)

diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index 2cb4c3cf3..45d73cb53 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -64,7 +64,7 @@ class Node:
             # ping the set of peers; upon success/failure the routing table and last replied/failed time will be updated
             to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True]
             if to_ping:
-                await self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0)
+                self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0)

             fut = asyncio.Future(loop=self.loop)
             self.loop.call_later(constants.refresh_interval, fut.set_result, None)
diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py
index ec3cd8dd0..913cac83b 100644
--- a/lbrynet/dht/protocol/protocol.py
+++ b/lbrynet/dht/protocol/protocol.py
@@ -192,23 +192,21 @@ class PingQueue:
         self._process_task: asyncio.Task = None
         self._next_task: asyncio.Future = None
         self._next_timer: asyncio.TimerHandle = None
-        self._lock = asyncio.Lock()
         self._running = False

     @property
     def running(self):
         return self._running

-    async def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
+    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
         delay = constants.check_refresh_interval if delay is None else delay
-        async with self._lock:
-            for peer in peers:
-                if delay and peer not in self._enqueued_contacts:
-                    self._pending_contacts[peer] = self._loop.time() + delay
-                elif peer not in self._enqueued_contacts:
-                    self._enqueued_contacts.append(peer)
-                    if peer in self._pending_contacts:
-                        del self._pending_contacts[peer]
+        for peer in peers:
+            if delay and peer not in self._enqueued_contacts:
+                self._pending_contacts[peer] = self._loop.time() + delay
+            elif peer not in self._enqueued_contacts:
+                self._enqueued_contacts.append(peer)
+                if peer in self._pending_contacts:
+                    del self._pending_contacts[peer]

     async def _process(self):
         async def _ping(p: 'KademliaPeer'):
@@ -223,17 +221,16 @@ class PingQueue:
         while True:
             tasks = []

-            async with self._lock:
-                if self._enqueued_contacts or self._pending_contacts:
-                    now = self._loop.time()
-                    scheduled = [k for k, d in self._pending_contacts.items() if now >= d]
-                    for k in scheduled:
-                        del self._pending_contacts[k]
-                        if k not in self._enqueued_contacts:
-                            self._enqueued_contacts.append(k)
-                    while self._enqueued_contacts:
-                        peer = self._enqueued_contacts.pop()
-                        tasks.append(self._loop.create_task(_ping(peer)))
+            if self._enqueued_contacts or self._pending_contacts:
+                now = self._loop.time()
+                scheduled = [k for k, d in self._pending_contacts.items() if now >= d]
+                for k in scheduled:
+                    del self._pending_contacts[k]
+                    if k not in self._enqueued_contacts:
+                        self._enqueued_contacts.append(k)
+                while self._enqueued_contacts:
+                    peer = self._enqueued_contacts.pop()
+                    tasks.append(self._loop.create_task(_ping(peer)))

             if tasks:
                 await asyncio.wait(tasks, loop=self._loop)
@@ -282,7 +279,6 @@ class KademliaProtocol(DatagramProtocol):
         self.data_store = DictDataStore(self.loop, self.peer_manager)
         self.ping_queue = PingQueue(self.loop, self)
         self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
-        self.lock = asyncio.Lock(loop=self.loop)
         self.rpc_timeout = rpc_timeout
         self._split_lock = asyncio.Lock(loop=self.loop)
@@ -424,7 +420,7 @@ class KademliaProtocol(DatagramProtocol):
         # will be added to our routing table if successful
         is_good = self.peer_manager.peer_is_good(peer)
         if is_good is None:
-            await self.ping_queue.enqueue_maybe_ping(peer)
+            self.ping_queue.enqueue_maybe_ping(peer)
         elif is_good is True:
             await self.add_peer(peer)
@@ -553,26 +549,25 @@ class KademliaProtocol(DatagramProtocol):
             if message.rpc_id in self.sent_messages:
                 self.sent_messages.pop(message.rpc_id)

-        async with self.lock:
+        if isinstance(message, RequestDatagram):
+            response_fut = self.loop.create_future()
+            response_fut.add_done_callback(pop_from_sent_messages)
+            self.sent_messages[message.rpc_id] = (peer, response_fut, message)
+        try:
+            self.transport.sendto(data, (peer.address, peer.udp_port))
+        except OSError as err:
+            # TODO: handle ENETUNREACH
+            if err.errno == socket.EWOULDBLOCK:
+                # i'm scared this may swallow important errors, but i get a million of these
+                # on Linux and it doesn't seem to affect anything -grin
+                log.warning("Can't send data to dht: EWOULDBLOCK")
+            else:
+                log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)",
+                          len(data), peer.address, peer.udp_port, str(err), err.errno)
             if isinstance(message, RequestDatagram):
-                response_fut = self.loop.create_future()
-                response_fut.add_done_callback(pop_from_sent_messages)
-                self.sent_messages[message.rpc_id] = (peer, response_fut, message)
-            try:
-                self.transport.sendto(data, (peer.address, peer.udp_port))
-            except OSError as err:
-                # TODO: handle ENETUNREACH
-                if err.errno == socket.EWOULDBLOCK:
-                    # i'm scared this may swallow important errors, but i get a million of these
-                    # on Linux and it doesn't seem to affect anything -grin
-                    log.warning("Can't send data to dht: EWOULDBLOCK")
-                else:
-                    log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)",
-                              len(data), peer.address, peer.udp_port, str(err), err.errno)
-                if isinstance(message, RequestDatagram):
-                    self.sent_messages[message.rpc_id][1].set_exception(err)
-                else:
-                    raise err
+                self.sent_messages[message.rpc_id][1].set_exception(err)
+            else:
+                raise err
         if isinstance(message, RequestDatagram):
             self.peer_manager.report_last_sent(peer.address, peer.udp_port)
         elif isinstance(message, ErrorDatagram):
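
Reviewer note, not part of the patch: these locks can simply be deleted
because everything they guarded is synchronous. asyncio runs all of this
code on one thread and only switches tasks at an await, so a block that
checks and mutates _enqueued_contacts / _pending_contacts / sent_messages
without awaiting in between is already atomic with respect to other
coroutines. A minimal standalone sketch of that invariant (ToyPingQueue,
drain and main are made-up names for illustration, not code from this repo):

    import asyncio


    class ToyPingQueue:
        """Toy model of PingQueue: one event loop, shared state, no lock."""

        def __init__(self, loop: asyncio.AbstractEventLoop):
            self._loop = loop
            self._pending = {}  # peer -> earliest time it may be pinged

        def enqueue_maybe_ping(self, *peers: str, delay: float = 0.0):
            # A plain `def` with no awaits: once entered, it runs to
            # completion before any coroutine can observe self._pending.
            for peer in peers:
                if peer not in self._pending:
                    self._pending[peer] = self._loop.time() + delay

        async def drain(self):
            # The only suspension point is the sleep; the check-then-mutate
            # on self._pending between sleeps is atomic on a single loop.
            while self._pending:
                now = self._loop.time()
                due = [p for p, t in self._pending.items() if now >= t]
                for p in due:
                    del self._pending[p]
                    print("ping", p)
                await asyncio.sleep(0.05)


    async def main():
        q = ToyPingQueue(asyncio.get_running_loop())
        q.enqueue_maybe_ping("1.2.3.4:4444")              # no await needed
        q.enqueue_maybe_ping("5.6.7.8:4444", delay=0.1)
        await q.drain()


    asyncio.run(main())

This is also why enqueue_maybe_ping can drop its async qualifier: callers in
node.py and protocol.py no longer await it, they just call it.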
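
A related note on what the patch keeps: _split_lock stays, and the no-await
argument above is presumably exactly why. That argument only covers critical
sections that never suspend; a section that mutates shared state, awaits,
then mutates again can be interleaved at the await and still needs a lock.
A sketch of that hazard (ToyRoutingTable and _probe are hypothetical stand-ins,
not the real routing-table code):

    import asyncio


    class ToyRoutingTable:
        """Why _split_lock survives: this critical section awaits mid-mutation."""

        def __init__(self):
            self._split_lock = asyncio.Lock()
            self.buckets = [["peer-a", "peer-b", "peer-c", "peer-d"]]

        async def _probe(self, bucket):
            await asyncio.sleep(0)  # stand-in for awaited pings/RPCs

        async def split_bucket(self, bucket):
            async with self._split_lock:
                self.buckets.remove(bucket)  # shared state is half-updated...
                await self._probe(bucket)    # ...and we suspend here; without
                                             # the lock another coroutine could
                                             # now see the bucket missing or
                                             # start an overlapping split
                mid = len(bucket) // 2
                self.buckets.extend([bucket[:mid], bucket[mid:]])


    async def main():
        table = ToyRoutingTable()
        await table.split_bucket(table.buckets[0])
        print(table.buckets)  # [['peer-a', 'peer-b'], ['peer-c', 'peer-d']]


    asyncio.run(main())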