From fc1b4658fc5d253e1127cc00f6900130d82d52c0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 4 Oct 2019 09:40:41 -0400 Subject: [PATCH 1/3] re-use loggly connection --- lbry/lbry/extras/daemon/loggly_handler.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lbry/lbry/extras/daemon/loggly_handler.py b/lbry/lbry/extras/daemon/loggly_handler.py index 91108bc73..f59e54baa 100644 --- a/lbry/lbry/extras/daemon/loggly_handler.py +++ b/lbry/lbry/extras/daemon/loggly_handler.py @@ -3,6 +3,7 @@ from aiohttp.client_exceptions import ClientError import json import logging.handlers import traceback +import aiohttp from lbry import utils, __version__ @@ -42,6 +43,8 @@ class HTTPSLogglyHandler(logging.Handler): self.url = "https://logs-01.loggly.com/inputs/{token}/tag/{tag}".format( token=utils.deobfuscate(loggly_token), tag='lbrynet-' + __version__ ) + self._loop = asyncio.get_event_loop() + self._session = aiohttp.ClientSession() def get_full_message(self, record): if record.exc_info: @@ -49,13 +52,17 @@ class HTTPSLogglyHandler(logging.Handler): else: return record.getMessage() - async def _emit(self, record): + async def _emit(self, record, retry=True): + data = self.format(record).encode() try: - async with utils.aiohttp_request('post', self.url, data=self.format(record).encode(), - cookies=self.cookies) as response: + async with self._session.post(self.url, data=data, + cookies=self.cookies) as response: self.cookies.update(response.cookies) except ClientError: - pass + if self._loop.is_running() and retry: + await self._session.close() + self._session = aiohttp.ClientSession() + return await self._emit(record, retry=False) def emit(self, record): asyncio.ensure_future(self._emit(record)) From 43b83bec832d4e53e7f3e9a62df9e0c248221f97 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 4 Oct 2019 09:41:28 -0400 Subject: [PATCH 2/3] remove dead code --- lbry/lbry/dht/protocol/protocol.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/lbry/lbry/dht/protocol/protocol.py b/lbry/lbry/dht/protocol/protocol.py index 838afb2db..8f452bbfa 100644 --- a/lbry/lbry/dht/protocol/protocol.py +++ b/lbry/lbry/dht/protocol/protocol.py @@ -671,21 +671,3 @@ class KademliaProtocol(DatagramProtocol): log.exception("Unexpected error while storing blob_hash") return peer.node_id, False - def _write(self, data: bytes, address: typing.Tuple[str, int]): - if self.transport: - try: - self.transport.sendto(data, address) - except OSError as err: - if err.errno == socket.EWOULDBLOCK: - # i'm scared this may swallow important errors, but i get a million of these - # on Linux and it doesn't seem to affect anything -grin - log.warning("Can't send data to dht: EWOULDBLOCK") - # elif err.errno == socket.ENETUNREACH: - # # this should probably try to retransmit when the network connection is back - # log.error("Network is unreachable") - else: - log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)", - len(data), address[0], address[1], str(err), err.errno) - raise err - else: - raise TransportNotConnected() From f99aacd9dcfd5ebda03a28945c5e89a28b53d78b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 4 Oct 2019 09:45:28 -0400 Subject: [PATCH 3/3] improve store_to_peer retry, reduce error verbosity --- lbry/lbry/dht/protocol/protocol.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lbry/lbry/dht/protocol/protocol.py b/lbry/lbry/dht/protocol/protocol.py index 8f452bbfa..809ebd0e1 100644 --- a/lbry/lbry/dht/protocol/protocol.py +++ b/lbry/lbry/dht/protocol/protocol.py @@ -532,7 +532,7 @@ class KademliaProtocol(DatagramProtocol): elif error_datagram.response not in old_protocol_errors: log.warning(error_msg) else: - log.warning("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", + log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", peer.address, peer.udp_port, old_protocol_errors[error_datagram.response]) df.set_exception(remote_exception) return @@ -542,7 +542,7 @@ class KademliaProtocol(DatagramProtocol): f"pending request: {str(remote_exception)}" log.warning(msg) else: - log.warning("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", + log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)", address[0], address[1], old_protocol_errors[error_datagram.response]) def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None: @@ -646,7 +646,8 @@ class KademliaProtocol(DatagramProtocol): return False return True - async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer') -> typing.Tuple[bytes, bool]: + async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer', + retry: bool = True) -> typing.Tuple[bytes, bool]: async def __store(): res = await self.get_rpc_peer(peer).store(hash_value) if res != b"OK": @@ -658,16 +659,15 @@ class KademliaProtocol(DatagramProtocol): return await __store() except asyncio.TimeoutError: log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer) + return peer.node_id, False except ValueError as err: log.error("Unexpected response: %s" % err) + return peer.node_id, False except RemoteException as err: - if 'Invalid token' in str(err): - self.peer_manager.clear_token(peer.node_id) - try: - return await __store() - except (ValueError, asyncio.TimeoutError, RemoteException): - return peer.node_id, False - else: + if 'Invalid token' not in str(err): log.exception("Unexpected error while storing blob_hash") - return peer.node_id, False - + return peer.node_id, False + self.peer_manager.clear_token(peer.node_id) + if not retry: + return peer.node_id, False + return await self.store_to_peer(hash_value, peer, retry=False)