Merge pull request #2503 from lbryio/reuse-loggly
Reuse loggly connection
This commit is contained in:
commit
7055d250d0
2 changed files with 23 additions and 34 deletions
|
@ -532,7 +532,7 @@ class KademliaProtocol(DatagramProtocol):
|
|||
elif error_datagram.response not in old_protocol_errors:
|
||||
log.warning(error_msg)
|
||||
else:
|
||||
log.warning("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
||||
log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
||||
peer.address, peer.udp_port, old_protocol_errors[error_datagram.response])
|
||||
df.set_exception(remote_exception)
|
||||
return
|
||||
|
@ -542,7 +542,7 @@ class KademliaProtocol(DatagramProtocol):
|
|||
f"pending request: {str(remote_exception)}"
|
||||
log.warning(msg)
|
||||
else:
|
||||
log.warning("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
||||
log.debug("known dht protocol backwards compatibility error with %s:%i (lbrynet v%s)",
|
||||
address[0], address[1], old_protocol_errors[error_datagram.response])
|
||||
|
||||
def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> None:
|
||||
|
@ -646,7 +646,8 @@ class KademliaProtocol(DatagramProtocol):
|
|||
return False
|
||||
return True
|
||||
|
||||
async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer') -> typing.Tuple[bytes, bool]:
|
||||
async def store_to_peer(self, hash_value: bytes, peer: 'KademliaPeer',
|
||||
retry: bool = True) -> typing.Tuple[bytes, bool]:
|
||||
async def __store():
|
||||
res = await self.get_rpc_peer(peer).store(hash_value)
|
||||
if res != b"OK":
|
||||
|
@ -658,34 +659,15 @@ class KademliaProtocol(DatagramProtocol):
|
|||
return await __store()
|
||||
except asyncio.TimeoutError:
|
||||
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
||||
return peer.node_id, False
|
||||
except ValueError as err:
|
||||
log.error("Unexpected response: %s" % err)
|
||||
return peer.node_id, False
|
||||
except RemoteException as err:
|
||||
if 'Invalid token' in str(err):
|
||||
self.peer_manager.clear_token(peer.node_id)
|
||||
try:
|
||||
return await __store()
|
||||
except (ValueError, asyncio.TimeoutError, RemoteException):
|
||||
return peer.node_id, False
|
||||
else:
|
||||
if 'Invalid token' not in str(err):
|
||||
log.exception("Unexpected error while storing blob_hash")
|
||||
return peer.node_id, False
|
||||
|
||||
def _write(self, data: bytes, address: typing.Tuple[str, int]):
|
||||
if self.transport:
|
||||
try:
|
||||
self.transport.sendto(data, address)
|
||||
except OSError as err:
|
||||
if err.errno == socket.EWOULDBLOCK:
|
||||
# i'm scared this may swallow important errors, but i get a million of these
|
||||
# on Linux and it doesn't seem to affect anything -grin
|
||||
log.warning("Can't send data to dht: EWOULDBLOCK")
|
||||
# elif err.errno == socket.ENETUNREACH:
|
||||
# # this should probably try to retransmit when the network connection is back
|
||||
# log.error("Network is unreachable")
|
||||
else:
|
||||
log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)",
|
||||
len(data), address[0], address[1], str(err), err.errno)
|
||||
raise err
|
||||
else:
|
||||
raise TransportNotConnected()
|
||||
return peer.node_id, False
|
||||
self.peer_manager.clear_token(peer.node_id)
|
||||
if not retry:
|
||||
return peer.node_id, False
|
||||
return await self.store_to_peer(hash_value, peer, retry=False)
|
||||
|
|
|
@ -3,6 +3,7 @@ from aiohttp.client_exceptions import ClientError
|
|||
import json
|
||||
import logging.handlers
|
||||
import traceback
|
||||
import aiohttp
|
||||
from lbry import utils, __version__
|
||||
|
||||
|
||||
|
@ -42,6 +43,8 @@ class HTTPSLogglyHandler(logging.Handler):
|
|||
self.url = "https://logs-01.loggly.com/inputs/{token}/tag/{tag}".format(
|
||||
token=utils.deobfuscate(loggly_token), tag='lbrynet-' + __version__
|
||||
)
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self._session = aiohttp.ClientSession()
|
||||
|
||||
def get_full_message(self, record):
|
||||
if record.exc_info:
|
||||
|
@ -49,13 +52,17 @@ class HTTPSLogglyHandler(logging.Handler):
|
|||
else:
|
||||
return record.getMessage()
|
||||
|
||||
async def _emit(self, record):
|
||||
async def _emit(self, record, retry=True):
|
||||
data = self.format(record).encode()
|
||||
try:
|
||||
async with utils.aiohttp_request('post', self.url, data=self.format(record).encode(),
|
||||
cookies=self.cookies) as response:
|
||||
async with self._session.post(self.url, data=data,
|
||||
cookies=self.cookies) as response:
|
||||
self.cookies.update(response.cookies)
|
||||
except ClientError:
|
||||
pass
|
||||
if self._loop.is_running() and retry:
|
||||
await self._session.close()
|
||||
self._session = aiohttp.ClientSession()
|
||||
return await self._emit(record, retry=False)
|
||||
|
||||
def emit(self, record):
|
||||
asyncio.ensure_future(self._emit(record))
|
||||
|
|
Loading…
Reference in a new issue