forked from LBRYCommunity/lbry-sdk
Merge pull request #441 from lbryio/connection_manager_logging
Better Connection manager and Client Protocol logging
This commit is contained in:
commit
f54d42b8fa
2 changed files with 46 additions and 21 deletions
|
@ -27,6 +27,7 @@ class ClientProtocol(Protocol):
|
||||||
######### Protocol #########
|
######### Protocol #########
|
||||||
|
|
||||||
def connectionMade(self):
|
def connectionMade(self):
|
||||||
|
log.debug("Connection made to %s", self.factory.peer)
|
||||||
self._connection_manager = self.factory.connection_manager
|
self._connection_manager = self.factory.connection_manager
|
||||||
self._rate_limiter = self.factory.rate_limiter
|
self._rate_limiter = self.factory.rate_limiter
|
||||||
self.peer = self.factory.peer
|
self.peer = self.factory.peer
|
||||||
|
@ -43,13 +44,15 @@ class ClientProtocol(Protocol):
|
||||||
self._ask_for_request()
|
self._ask_for_request()
|
||||||
|
|
||||||
def dataReceived(self, data):
|
def dataReceived(self, data):
|
||||||
|
log.debug("Data receieved from %s", self.peer)
|
||||||
self._rate_limiter.report_dl_bytes(len(data))
|
self._rate_limiter.report_dl_bytes(len(data))
|
||||||
if self._downloading_blob is True:
|
if self._downloading_blob is True:
|
||||||
self._blob_download_request.write(data)
|
self._blob_download_request.write(data)
|
||||||
else:
|
else:
|
||||||
self._response_buff += data
|
self._response_buff += data
|
||||||
if len(self._response_buff) > conf.settings['MAX_RESPONSE_INFO_SIZE']:
|
if len(self._response_buff) > conf.settings['MAX_RESPONSE_INFO_SIZE']:
|
||||||
log.warning("Response is too large. Size %s", len(self._response_buff))
|
log.warning("Response is too large from %s. Size %s",
|
||||||
|
self.peer, len(self._response_buff))
|
||||||
self.transport.loseConnection()
|
self.transport.loseConnection()
|
||||||
response, extra_data = self._get_valid_response(self._response_buff)
|
response, extra_data = self._get_valid_response(self._response_buff)
|
||||||
if response is not None:
|
if response is not None:
|
||||||
|
@ -59,6 +62,7 @@ class ClientProtocol(Protocol):
|
||||||
self._blob_download_request.write(extra_data)
|
self._blob_download_request.write(extra_data)
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
|
log.debug("Connection lost to %s: %s", self.peer, reason)
|
||||||
self.connection_closed = True
|
self.connection_closed = True
|
||||||
if reason.check(error.ConnectionDone):
|
if reason.check(error.ConnectionDone):
|
||||||
err = failure.Failure(ConnectionClosedBeforeResponseError())
|
err = failure.Failure(ConnectionClosedBeforeResponseError())
|
||||||
|
@ -78,7 +82,7 @@ class ClientProtocol(Protocol):
|
||||||
raise ValueError("There is already a request for that response active")
|
raise ValueError("There is already a request for that response active")
|
||||||
self._next_request.update(request.request_dict)
|
self._next_request.update(request.request_dict)
|
||||||
d = defer.Deferred()
|
d = defer.Deferred()
|
||||||
log.debug("Adding a request. Request: %s", str(request.request_dict))
|
log.debug("Adding a request for %s. Request: %s", self.peer, request.request_dict)
|
||||||
self._response_deferreds[request.response_identifier] = d
|
self._response_deferreds[request.response_identifier] = d
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -116,9 +120,6 @@ class ClientProtocol(Protocol):
|
||||||
self.transport.loseConnection()
|
self.transport.loseConnection()
|
||||||
|
|
||||||
def _ask_for_request(self):
|
def _ask_for_request(self):
|
||||||
|
|
||||||
log.debug("In _ask_for_request")
|
|
||||||
|
|
||||||
if self.connection_closed is True or self.connection_closing is True:
|
if self.connection_closed is True or self.connection_closing is True:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -205,20 +206,20 @@ class ClientProtocol(Protocol):
|
||||||
failed = True
|
failed = True
|
||||||
if not isinstance(result.value, DownloadCanceledError):
|
if not isinstance(result.value, DownloadCanceledError):
|
||||||
log.info(result.value)
|
log.info(result.value)
|
||||||
log.info("The connection is closing due to an error: %s",
|
log.info("The connection to %s is closing due to an error: %s",
|
||||||
result.getTraceback())
|
self.peer, result.getTraceback())
|
||||||
if failed is False:
|
if failed is False:
|
||||||
log.debug("Asking for another request.")
|
log.debug("Asking for another request from %s", self.peer)
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
reactor.callLater(0, self._ask_for_request)
|
reactor.callLater(0, self._ask_for_request)
|
||||||
else:
|
else:
|
||||||
log.debug("Not asking for another request.")
|
log.debug("Not asking for another request from %s", self.peer)
|
||||||
self.transport.loseConnection()
|
self.transport.loseConnection()
|
||||||
|
|
||||||
dl.addCallback(get_next_request)
|
dl.addCallback(get_next_request)
|
||||||
|
|
||||||
def _downloading_finished(self, arg):
|
def _downloading_finished(self, arg):
|
||||||
log.debug("The blob has finished downloading")
|
log.debug("The blob has finished downloading from %s", self.peer)
|
||||||
self._blob_download_request = None
|
self._blob_download_request = None
|
||||||
self._downloading_blob = False
|
self._downloading_blob = False
|
||||||
return arg
|
return arg
|
||||||
|
@ -230,7 +231,7 @@ class ClientProtocol(Protocol):
|
||||||
# TODO: of telling the server it wants the download to stop. It would be great if the
|
# TODO: of telling the server it wants the download to stop. It would be great if the
|
||||||
# TODO: protocol had such a mechanism.
|
# TODO: protocol had such a mechanism.
|
||||||
log.debug("Closing the connection to %s because the download of blob %s was canceled",
|
log.debug("Closing the connection to %s because the download of blob %s was canceled",
|
||||||
str(self.peer), str(self._blob_download_request.blob))
|
self.peer, self._blob_download_request.blob)
|
||||||
return err
|
return err
|
||||||
|
|
||||||
######### IRateLimited #########
|
######### IRateLimited #########
|
||||||
|
@ -258,6 +259,7 @@ class ClientProtocolFactory(ClientFactory):
|
||||||
self.p = None
|
self.p = None
|
||||||
|
|
||||||
def clientConnectionFailed(self, connector, reason):
|
def clientConnectionFailed(self, connector, reason):
|
||||||
|
log.debug("Connection failed to %s: %s", self.peer, reason)
|
||||||
self.peer.report_down()
|
self.peer.report_down()
|
||||||
self.connection_manager.protocol_disconnected(self.peer, connector)
|
self.connection_manager.protocol_disconnected(self.peer, connector)
|
||||||
|
|
||||||
|
|
|
@ -32,8 +32,20 @@ class ConnectionManager(object):
|
||||||
# a deferred that gets fired when a _manage call is set
|
# a deferred that gets fired when a _manage call is set
|
||||||
self._manage_deferred = None
|
self._manage_deferred = None
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
|
log.info("%s initialized", self._get_log_name())
|
||||||
|
|
||||||
|
# this identifies what the connection manager is for,
|
||||||
|
# used for logging purposes only
|
||||||
|
def _get_log_name(self):
|
||||||
|
out = 'Connection Manager Unknown'
|
||||||
|
if hasattr(self.downloader, 'stream_name'):
|
||||||
|
out = 'Connection Manager '+self.downloader.stream_name
|
||||||
|
elif hasattr(self.downloader, 'blob_hash'):
|
||||||
|
out = 'Connection Manager '+self.downloader.blob_hash
|
||||||
|
return out
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
|
log.debug("%s starting", self._get_log_name())
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
|
|
||||||
self.stopped = False
|
self.stopped = False
|
||||||
|
@ -45,6 +57,7 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
log.debug("%s stopping", self._get_log_name())
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
# wait for the current manage call to finish
|
# wait for the current manage call to finish
|
||||||
if self._manage_deferred:
|
if self._manage_deferred:
|
||||||
|
@ -66,7 +79,8 @@ class ConnectionManager(object):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def close_connection(p):
|
def close_connection(p):
|
||||||
log.info("Abruptly closing a connection to %s due to downloading being paused", p)
|
log.debug("%s Abruptly closing a connection to %s due to downloading being paused",
|
||||||
|
self._get_log_name(), p)
|
||||||
if self._peer_connections[p].factory.p is not None:
|
if self._peer_connections[p].factory.p is not None:
|
||||||
d = self._peer_connections[p].factory.p.cancel_requests()
|
d = self._peer_connections[p].factory.p.cancel_requests()
|
||||||
else:
|
else:
|
||||||
|
@ -79,9 +93,10 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_next_request(self, peer, protocol):
|
def get_next_request(self, peer, protocol):
|
||||||
log.debug("Trying to get the next request for peer %s", peer)
|
log.debug("%s Trying to get the next request for peer %s", self._get_log_name(), peer)
|
||||||
if not peer in self._peer_connections or self.stopped is True:
|
if not peer in self._peer_connections or self.stopped is True:
|
||||||
log.debug("The peer has already been told to shut down.")
|
log.debug("%s The peer %s has already been told to shut down.",
|
||||||
|
self._get_log_name(), peer)
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
requests = yield self._send_primary_requests(peer, protocol)
|
requests = yield self._send_primary_requests(peer, protocol)
|
||||||
have_request = any(r[1] for r in requests if r[0] is True)
|
have_request = any(r[1] for r in requests if r[0] is True)
|
||||||
|
@ -90,7 +105,6 @@ class ConnectionManager(object):
|
||||||
defer.returnValue(have_request)
|
defer.returnValue(have_request)
|
||||||
|
|
||||||
def _send_primary_requests(self, peer, protocol):
|
def _send_primary_requests(self, peer, protocol):
|
||||||
|
|
||||||
def handle_error(err):
|
def handle_error(err):
|
||||||
err.trap(InsufficientFundsError)
|
err.trap(InsufficientFundsError)
|
||||||
self.downloader.insufficient_funds(err)
|
self.downloader.insufficient_funds(err)
|
||||||
|
@ -124,6 +138,8 @@ class ConnectionManager(object):
|
||||||
return defer.DeferredList(ds)
|
return defer.DeferredList(ds)
|
||||||
|
|
||||||
def protocol_disconnected(self, peer, protocol):
|
def protocol_disconnected(self, peer, protocol):
|
||||||
|
log.debug("%s protocol %s disconnected for %s",
|
||||||
|
self._get_log_name(), type(protocol).__name__, peer)
|
||||||
if peer in self._peer_connections:
|
if peer in self._peer_connections:
|
||||||
del self._peer_connections[peer]
|
del self._peer_connections[peer]
|
||||||
if peer in self._connections_closing:
|
if peer in self._connections_closing:
|
||||||
|
@ -137,6 +153,9 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
if len(self._peer_connections) < conf.settings['max_connections_per_stream']:
|
if len(self._peer_connections) < conf.settings['max_connections_per_stream']:
|
||||||
|
log.debug("%s have %d connections, looking for %d",
|
||||||
|
self._get_log_name(), len(self._peer_connections),
|
||||||
|
conf.settings['max_connections_per_stream'])
|
||||||
try:
|
try:
|
||||||
ordered_request_creators = self._rank_request_creator_connections()
|
ordered_request_creators = self._rank_request_creator_connections()
|
||||||
peers = yield self._get_new_peers(ordered_request_creators)
|
peers = yield self._get_new_peers(ordered_request_creators)
|
||||||
|
@ -164,10 +183,9 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_new_peers(self, request_creators):
|
def _get_new_peers(self, request_creators):
|
||||||
log.debug("Trying to get a new peer to connect to")
|
log.debug("%s Trying to get a new peer to connect to", self._get_log_name())
|
||||||
if not request_creators:
|
if not request_creators:
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
log.debug("Got a creator to check: %s", request_creators[0])
|
|
||||||
new_peers = yield request_creators[0].get_new_peers()
|
new_peers = yield request_creators[0].get_new_peers()
|
||||||
if not new_peers:
|
if not new_peers:
|
||||||
new_peers = yield self._get_new_peers(request_creators[1:])
|
new_peers = yield self._get_new_peers(request_creators[1:])
|
||||||
|
@ -176,14 +194,19 @@ class ConnectionManager(object):
|
||||||
def _pick_best_peer(self, peers):
|
def _pick_best_peer(self, peers):
|
||||||
# TODO: Eventually rank them based on past performance/reputation. For now
|
# TODO: Eventually rank them based on past performance/reputation. For now
|
||||||
# TODO: just pick the first to which we don't have an open connection
|
# TODO: just pick the first to which we don't have an open connection
|
||||||
log.debug("Got a list of peers to choose from: %s", peers)
|
log.debug("%s Got a list of peers to choose from: %s",
|
||||||
|
self._get_log_name(), peers)
|
||||||
|
log.debug("%s Current connections: %s",
|
||||||
|
self._get_log_name(), self._peer_connections.keys())
|
||||||
|
log.debug("%s List of connection states: %s", self._get_log_name(),
|
||||||
|
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
|
||||||
if peers is None:
|
if peers is None:
|
||||||
return None
|
return None
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
if not peer in self._peer_connections:
|
if not peer in self._peer_connections:
|
||||||
log.debug("Got a good peer. Returning peer %s", peer)
|
log.debug("%s Got a good peer %s", self._get_log_name(), peer)
|
||||||
return peer
|
return peer
|
||||||
log.debug("Couldn't find a good peer to connect to")
|
log.debug("%s Couldn't find a good peer to connect to", self._get_log_name())
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _connect_to_peer(self, peer):
|
def _connect_to_peer(self, peer):
|
||||||
|
@ -192,7 +215,7 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
|
|
||||||
log.debug("Trying to connect to %s", peer)
|
log.debug("%s Trying to connect to %s", self._get_log_name(), peer)
|
||||||
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
||||||
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
||||||
factory)
|
factory)
|
||||||
|
|
Loading…
Reference in a new issue