lbry-sdk/lbrynet/core/client/ConnectionManager.py

193 lines
7.5 KiB
Python
Raw Normal View History

2015-08-20 17:27:15 +02:00
import logging
from twisted.internet import defer
from zope.interface import implements
from lbrynet import interfaces
from lbrynet.conf import MAX_CONNECTIONS_PER_STREAM
from lbrynet.core.client.ClientProtocol import ClientProtocolFactory
from lbrynet.core.Error import InsufficientFundsError
log = logging.getLogger(__name__)
class PeerConnectionHandler(object):
def __init__(self, request_creators, factory):
self.request_creators = request_creators
self.factory = factory
self.connection = None
2015-08-20 17:27:15 +02:00
class ConnectionManager(object):
implements(interfaces.IConnectionManager)
def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators):
self.downloader = downloader
self.rate_limiter = rate_limiter
self._primary_request_creators = primary_request_creators
self._secondary_request_creators = secondary_request_creators
self._peer_connections = {} # {Peer: PeerConnectionHandler}
self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)}
self._next_manage_call = None
self.stopped = True
2015-08-20 17:27:15 +02:00
def start(self):
from twisted.internet import reactor
self.stopped = False
if self._next_manage_call is not None and self._next_manage_call.active() is True:
self._next_manage_call.cancel()
self._next_manage_call = reactor.callLater(0, self._manage)
2015-08-20 17:27:15 +02:00
return defer.succeed(True)
def stop(self):
self.stopped = True
if self._next_manage_call is not None and self._next_manage_call.active() is True:
self._next_manage_call.cancel()
self._next_manage_call = None
2015-08-20 17:27:15 +02:00
closing_deferreds = []
for peer in self._peer_connections.keys():
2015-08-20 17:27:15 +02:00
def close_connection(p):
log.info(
"Abruptly closing a connection to %s due to downloading being paused", p)
2015-08-20 17:27:15 +02:00
if self._peer_connections[p].factory.p is not None:
d = self._peer_connections[p].factory.p.cancel_requests()
2015-08-20 17:27:15 +02:00
else:
d = defer.succeed(True)
def disconnect_peer():
d = defer.Deferred()
self._connections_closing[p] = d
self._peer_connections[p].connection.disconnect()
if p in self._peer_connections:
del self._peer_connections[p]
2015-08-20 17:27:15 +02:00
return d
d.addBoth(lambda _: disconnect_peer())
return d
closing_deferreds.append(close_connection(peer))
return defer.DeferredList(closing_deferreds)
def get_next_request(self, peer, protocol):
log.debug("Trying to get the next request for peer %s", peer)
2015-08-20 17:27:15 +02:00
if not peer in self._peer_connections or self.stopped is True:
log.debug("The peer has already been told to shut down.")
2015-08-20 17:27:15 +02:00
return defer.succeed(False)
def handle_error(err):
err.trap(InsufficientFundsError)
self.downloader.insufficient_funds(err)
return False
2015-08-20 17:27:15 +02:00
def check_if_request_sent(request_sent, request_creator):
if peer not in self._peer_connections:
# This can happen if the connection is told to close
return False
2015-08-20 17:27:15 +02:00
if request_sent is False:
if request_creator in self._peer_connections[peer].request_creators:
self._peer_connections[peer].request_creators.remove(request_creator)
2015-08-20 17:27:15 +02:00
else:
if not request_creator in self._peer_connections[peer].request_creators:
self._peer_connections[peer].request_creators.append(request_creator)
2015-08-20 17:27:15 +02:00
return request_sent
def check_requests(requests):
have_request = True in [r[1] for r in requests if r[0] is True]
return have_request
def get_secondary_requests_if_necessary(have_request):
if have_request is True:
ds = []
for s_r_c in self._secondary_request_creators:
2015-08-20 17:27:15 +02:00
d = s_r_c.send_next_request(peer, protocol)
ds.append(d)
dl = defer.DeferredList(ds)
else:
dl = defer.succeed(None)
dl.addCallback(lambda _: have_request)
return dl
ds = []
for p_r_c in self._primary_request_creators:
2015-08-20 17:27:15 +02:00
d = p_r_c.send_next_request(peer, protocol)
d.addErrback(handle_error)
d.addCallback(check_if_request_sent, p_r_c)
ds.append(d)
dl = defer.DeferredList(ds, fireOnOneErrback=True)
dl.addCallback(check_requests)
dl.addCallback(get_secondary_requests_if_necessary)
return dl
def protocol_disconnected(self, peer, protocol):
if peer in self._peer_connections:
del self._peer_connections[peer]
if peer in self._connections_closing:
d = self._connections_closing[peer]
del self._connections_closing[peer]
2015-08-20 17:27:15 +02:00
d.callback(True)
def _rank_request_creator_connections(self):
"""
@return: an ordered list of our request creators, ranked according to which has the least number of
connections open that it likes
"""
def count_peers(request_creator):
return len([p for p in self._peer_connections.itervalues() if request_creator in p.request_creators])
2015-08-20 17:27:15 +02:00
return sorted(self._primary_request_creators, key=count_peers)
2015-08-20 17:27:15 +02:00
def _connect_to_peer(self, peer):
from twisted.internet import reactor
if peer is not None and self.stopped is False:
log.debug("Trying to connect to %s", peer)
2015-08-20 17:27:15 +02:00
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
factory)
2015-08-20 17:27:15 +02:00
connection = reactor.connectTCP(peer.host, peer.port, factory)
self._peer_connections[peer].connection = connection
2015-08-20 17:27:15 +02:00
def _manage(self):
from twisted.internet import reactor
def get_new_peers(request_creators):
log.debug("Trying to get a new peer to connect to")
2015-08-20 17:27:15 +02:00
if len(request_creators) > 0:
log.debug("Got a creator to check: %s", request_creators[0])
2015-08-20 17:27:15 +02:00
d = request_creators[0].get_new_peers()
d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:]))
return d
else:
return defer.succeed(None)
def pick_best_peer(peers):
# TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection
log.debug("Got a list of peers to choose from: %s", peers)
2015-08-20 17:27:15 +02:00
if peers is None:
return None
for peer in peers:
if not peer in self._peer_connections:
log.debug("Got a good peer. Returning peer %s", peer)
2015-08-20 17:27:15 +02:00
return peer
log.debug("Couldn't find a good peer to connect to")
2015-08-20 17:27:15 +02:00
return None
if len(self._peer_connections) < MAX_CONNECTIONS_PER_STREAM:
2015-08-20 17:27:15 +02:00
ordered_request_creators = self._rank_request_creator_connections()
d = get_new_peers(ordered_request_creators)
d.addCallback(pick_best_peer)
d.addCallback(self._connect_to_peer)
2016-09-19 21:07:47 +02:00
self._next_manage_call = reactor.callLater(1, self._manage)