Merge pull request #402 from lbryio/fix-needed-blobs
Bug Fix for ConnectionManager
This commit is contained in:
commit
3bed768d10
7 changed files with 91 additions and 73 deletions
|
@ -46,7 +46,7 @@ class BlobRequester(object):
|
||||||
self.peer_finder = peer_finder
|
self.peer_finder = peer_finder
|
||||||
self.payment_rate_manager = payment_rate_manager
|
self.payment_rate_manager = payment_rate_manager
|
||||||
self.wallet = wallet
|
self.wallet = wallet
|
||||||
self.download_manager = download_manager
|
self._download_manager = download_manager
|
||||||
self._peers = defaultdict(int) # {Peer: score}
|
self._peers = defaultdict(int) # {Peer: score}
|
||||||
self._available_blobs = defaultdict(list) # {Peer: [blob_hash]}
|
self._available_blobs = defaultdict(list) # {Peer: [blob_hash]}
|
||||||
self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}}
|
self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}}
|
||||||
|
@ -159,12 +159,12 @@ class BlobRequester(object):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _blobs_to_download(self):
|
def _blobs_to_download(self):
|
||||||
needed_blobs = self.download_manager.needed_blobs()
|
needed_blobs = self._download_manager.needed_blobs()
|
||||||
return sorted(needed_blobs, key=lambda b: b.is_downloading())
|
return sorted(needed_blobs, key=lambda b: b.is_downloading())
|
||||||
|
|
||||||
def _blobs_without_sources(self):
|
def _blobs_without_sources(self):
|
||||||
return [
|
return [
|
||||||
b for b in self.download_manager.needed_blobs()
|
b for b in self._download_manager.needed_blobs()
|
||||||
if not self._hash_available(b.blob_hash)
|
if not self._hash_available(b.blob_hash)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,8 @@ class ConnectionManager(object):
|
||||||
self._peer_connections = {} # {Peer: PeerConnectionHandler}
|
self._peer_connections = {} # {Peer: PeerConnectionHandler}
|
||||||
self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)}
|
self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)}
|
||||||
self._next_manage_call = None
|
self._next_manage_call = None
|
||||||
|
# a deferred that gets fired when a _manage call is set
|
||||||
|
self._manage_deferred = None
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
|
@ -41,35 +43,38 @@ class ConnectionManager(object):
|
||||||
self._next_manage_call = reactor.callLater(0, self._manage)
|
self._next_manage_call = reactor.callLater(0, self._manage)
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.stopped = True
|
self.stopped = True
|
||||||
if self._next_manage_call is not None and self._next_manage_call.active() is True:
|
# wait for the current manage call to finish
|
||||||
|
if self._manage_deferred:
|
||||||
|
yield self._manage_deferred
|
||||||
|
# in case we stopped between manage calls, cancel the next one
|
||||||
|
if self._next_manage_call and self._next_manage_call.active():
|
||||||
self._next_manage_call.cancel()
|
self._next_manage_call.cancel()
|
||||||
self._next_manage_call = None
|
self._next_manage_call = None
|
||||||
closing_deferreds = []
|
yield self._close_peers()
|
||||||
for peer in self._peer_connections.keys():
|
|
||||||
|
|
||||||
def close_connection(p):
|
def _close_peers(self):
|
||||||
log.info(
|
|
||||||
"Abruptly closing a connection to %s due to downloading being paused", p)
|
|
||||||
|
|
||||||
if self._peer_connections[p].factory.p is not None:
|
def disconnect_peer(p):
|
||||||
d = self._peer_connections[p].factory.p.cancel_requests()
|
d = defer.Deferred()
|
||||||
else:
|
self._connections_closing[p] = d
|
||||||
d = defer.succeed(True)
|
self._peer_connections[p].connection.disconnect()
|
||||||
|
if p in self._peer_connections:
|
||||||
|
del self._peer_connections[p]
|
||||||
|
return d
|
||||||
|
|
||||||
def disconnect_peer():
|
def close_connection(p):
|
||||||
d = defer.Deferred()
|
log.info("Abruptly closing a connection to %s due to downloading being paused", p)
|
||||||
self._connections_closing[p] = d
|
if self._peer_connections[p].factory.p is not None:
|
||||||
self._peer_connections[p].connection.disconnect()
|
d = self._peer_connections[p].factory.p.cancel_requests()
|
||||||
if p in self._peer_connections:
|
else:
|
||||||
del self._peer_connections[p]
|
d = defer.succeed(True)
|
||||||
return d
|
d.addBoth(lambda _: disconnect_peer(p))
|
||||||
|
return d
|
||||||
|
|
||||||
d.addBoth(lambda _: disconnect_peer())
|
closing_deferreds = [close_connection(peer) for peer in self._peer_connections.keys()]
|
||||||
return d
|
|
||||||
|
|
||||||
closing_deferreds.append(close_connection(peer))
|
|
||||||
return defer.DeferredList(closing_deferreds)
|
return defer.DeferredList(closing_deferreds)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -126,6 +131,25 @@ class ConnectionManager(object):
|
||||||
del self._connections_closing[peer]
|
del self._connections_closing[peer]
|
||||||
d.callback(True)
|
d.callback(True)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _manage(self):
|
||||||
|
self._manage_deferred = defer.Deferred()
|
||||||
|
|
||||||
|
from twisted.internet import reactor
|
||||||
|
if len(self._peer_connections) < conf.settings.max_connections_per_stream:
|
||||||
|
try:
|
||||||
|
ordered_request_creators = self._rank_request_creator_connections()
|
||||||
|
peers = yield self._get_new_peers(ordered_request_creators)
|
||||||
|
peer = self._pick_best_peer(peers)
|
||||||
|
yield self._connect_to_peer(peer)
|
||||||
|
except Exception:
|
||||||
|
# log this otherwise it will just end up as an unhandled error in deferred
|
||||||
|
log.exception('Something bad happened picking a peer')
|
||||||
|
self._manage_deferred.callback(None)
|
||||||
|
self._manage_deferred = None
|
||||||
|
if not self.stopped:
|
||||||
|
self._next_manage_call = reactor.callLater(1, self._manage)
|
||||||
|
|
||||||
def _rank_request_creator_connections(self):
|
def _rank_request_creator_connections(self):
|
||||||
"""Returns an ordered list of our request creators, ranked according
|
"""Returns an ordered list of our request creators, ranked according
|
||||||
to which has the least number of connections open that it
|
to which has the least number of connections open that it
|
||||||
|
@ -138,33 +162,6 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
return sorted(self._primary_request_creators, key=count_peers)
|
return sorted(self._primary_request_creators, key=count_peers)
|
||||||
|
|
||||||
def _connect_to_peer(self, peer):
|
|
||||||
if peer is None or self.stopped:
|
|
||||||
return
|
|
||||||
|
|
||||||
from twisted.internet import reactor
|
|
||||||
|
|
||||||
log.debug("Trying to connect to %s", peer)
|
|
||||||
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
|
||||||
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
|
||||||
factory)
|
|
||||||
connection = reactor.connectTCP(peer.host, peer.port, factory)
|
|
||||||
self._peer_connections[peer].connection = connection
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _manage(self):
|
|
||||||
from twisted.internet import reactor
|
|
||||||
if len(self._peer_connections) < conf.settings.max_connections_per_stream:
|
|
||||||
try:
|
|
||||||
ordered_request_creators = self._rank_request_creator_connections()
|
|
||||||
peers = yield self._get_new_peers(ordered_request_creators)
|
|
||||||
peer = self._pick_best_peer(peers)
|
|
||||||
yield self._connect_to_peer(peer)
|
|
||||||
except Exception:
|
|
||||||
# log this otherwise it will just end up as an unhandled error in deferred
|
|
||||||
log.exception('Something bad happened picking a peer')
|
|
||||||
self._next_manage_call = reactor.callLater(1, self._manage)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_new_peers(self, request_creators):
|
def _get_new_peers(self, request_creators):
|
||||||
log.debug("Trying to get a new peer to connect to")
|
log.debug("Trying to get a new peer to connect to")
|
||||||
|
@ -188,3 +185,16 @@ class ConnectionManager(object):
|
||||||
return peer
|
return peer
|
||||||
log.debug("Couldn't find a good peer to connect to")
|
log.debug("Couldn't find a good peer to connect to")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _connect_to_peer(self, peer):
|
||||||
|
if peer is None or self.stopped:
|
||||||
|
return
|
||||||
|
|
||||||
|
from twisted.internet import reactor
|
||||||
|
|
||||||
|
log.debug("Trying to connect to %s", peer)
|
||||||
|
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
||||||
|
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
||||||
|
factory)
|
||||||
|
connection = reactor.connectTCP(peer.host, peer.port, factory)
|
||||||
|
self._peer_connections[peer].connection = connection
|
||||||
|
|
|
@ -18,7 +18,6 @@ class DownloadManager(object):
|
||||||
self.progress_manager = None
|
self.progress_manager = None
|
||||||
self.blob_handler = None
|
self.blob_handler = None
|
||||||
self.connection_manager = None
|
self.connection_manager = None
|
||||||
|
|
||||||
self.blobs = {}
|
self.blobs = {}
|
||||||
self.blob_infos = {}
|
self.blob_infos = {}
|
||||||
|
|
||||||
|
@ -59,8 +58,8 @@ class DownloadManager(object):
|
||||||
d1.addBoth(check_stop, "progress manager")
|
d1.addBoth(check_stop, "progress manager")
|
||||||
d2 = self.connection_manager.stop()
|
d2 = self.connection_manager.stop()
|
||||||
d2.addBoth(check_stop, "connection manager")
|
d2.addBoth(check_stop, "connection manager")
|
||||||
dl = defer.DeferredList([d1, d2])
|
dl = defer.DeferredList([d1, d2], consumeErrors=True)
|
||||||
dl.addCallback(lambda xs: False not in xs)
|
dl.addCallback(lambda results: all([success for success, val in results]))
|
||||||
return dl
|
return dl
|
||||||
|
|
||||||
def add_blobs_to_download(self, blob_infos):
|
def add_blobs_to_download(self, blob_infos):
|
||||||
|
|
|
@ -147,6 +147,10 @@ class CryptStreamDownloader(object):
|
||||||
def _get_download_manager(self):
|
def _get_download_manager(self):
|
||||||
assert self.blob_requester is None
|
assert self.blob_requester is None
|
||||||
download_manager = DownloadManager(self.blob_manager, self.upload_allowed)
|
download_manager = DownloadManager(self.blob_manager, self.upload_allowed)
|
||||||
|
# TODO: can we get rid of these circular references. I'm not
|
||||||
|
# smart enough to handle thinking about the interactions
|
||||||
|
# between them and have hope that there is a simpler way
|
||||||
|
# to accomplish what we want
|
||||||
download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
|
download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
|
||||||
download_manager.progress_manager = self._get_progress_manager(download_manager)
|
download_manager.progress_manager = self._get_progress_manager(download_manager)
|
||||||
download_manager.blob_handler = self._get_blob_handler(download_manager)
|
download_manager.blob_handler = self._get_blob_handler(download_manager)
|
||||||
|
@ -161,6 +165,7 @@ class CryptStreamDownloader(object):
|
||||||
self.download_manager.progress_manager = None
|
self.download_manager.progress_manager = None
|
||||||
self.download_manager.blob_handler = None
|
self.download_manager.blob_handler = None
|
||||||
self.download_manager.wallet_info_exchanger = None
|
self.download_manager.wallet_info_exchanger = None
|
||||||
|
self.blob_requester = None
|
||||||
self.download_manager.connection_manager = None
|
self.download_manager.connection_manager = None
|
||||||
self.download_manager = None
|
self.download_manager = None
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
self.claim_id = None
|
self.claim_id = None
|
||||||
self.rowid = rowid
|
self.rowid = rowid
|
||||||
self.lbry_file_manager = lbry_file_manager
|
self.lbry_file_manager = lbry_file_manager
|
||||||
self.saving_status = False
|
self._saving_status = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def saving_status(self):
|
||||||
|
return self._saving_status
|
||||||
|
|
||||||
def restore(self):
|
def restore(self):
|
||||||
d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)
|
d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)
|
||||||
|
@ -96,18 +100,13 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
reflector_server = random.choice(conf.settings.reflector_servers)
|
reflector_server = random.choice(conf.settings.reflector_servers)
|
||||||
return reupload.check_and_restore_availability(self, reflector_server)
|
return reupload.check_and_restore_availability(self, reflector_server)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def stop(self, err=None, change_status=True):
|
def stop(self, err=None, change_status=True):
|
||||||
|
log.debug('Stopping download for %s', self.sd_hash)
|
||||||
def set_saving_status_done():
|
|
||||||
self.saving_status = False
|
|
||||||
|
|
||||||
# EncryptedFileSaver deletes metadata when it's stopped. We don't want that here.
|
# EncryptedFileSaver deletes metadata when it's stopped. We don't want that here.
|
||||||
d = EncryptedFileDownloader.stop(self, err=err)
|
yield EncryptedFileDownloader.stop(self, err=err)
|
||||||
if change_status is True:
|
if change_status is True:
|
||||||
self.saving_status = True
|
status = yield self._save_status()
|
||||||
d.addCallback(lambda _: self._save_status())
|
|
||||||
d.addCallback(lambda _: set_saving_status_done())
|
|
||||||
return d
|
|
||||||
|
|
||||||
def status(self):
|
def status(self):
|
||||||
def find_completed_blobhashes(blobs):
|
def find_completed_blobhashes(blobs):
|
||||||
|
@ -158,14 +157,17 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
else:
|
else:
|
||||||
return "Download stopped"
|
return "Download stopped"
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def _save_status(self):
|
def _save_status(self):
|
||||||
|
self._saving_status = True
|
||||||
if self.completed is True:
|
if self.completed is True:
|
||||||
s = ManagedEncryptedFileDownloader.STATUS_FINISHED
|
status = ManagedEncryptedFileDownloader.STATUS_FINISHED
|
||||||
elif self.stopped is True:
|
elif self.stopped is True:
|
||||||
s = ManagedEncryptedFileDownloader.STATUS_STOPPED
|
status = ManagedEncryptedFileDownloader.STATUS_STOPPED
|
||||||
else:
|
else:
|
||||||
s = ManagedEncryptedFileDownloader.STATUS_RUNNING
|
status = ManagedEncryptedFileDownloader.STATUS_RUNNING
|
||||||
return self.lbry_file_manager.change_lbry_file_status(self, s)
|
yield self.lbry_file_manager.change_lbry_file_status(self, status)
|
||||||
|
self._saving_status = False
|
||||||
|
|
||||||
def _get_progress_manager(self, download_manager):
|
def _get_progress_manager(self, download_manager):
|
||||||
return FullStreamProgressManager(self._finished_downloading,
|
return FullStreamProgressManager(self._finished_downloading,
|
||||||
|
|
|
@ -1677,7 +1677,8 @@ class Daemon(AuthJSONRPCServer):
|
||||||
msg = (
|
msg = (
|
||||||
"File was already being seeded" if status == 'start' else "File was already stopped"
|
"File was already being seeded" if status == 'start' else "File was already stopped"
|
||||||
)
|
)
|
||||||
defer.returnValue(self._render_response(msg))
|
response = yield self._render_response(msg)
|
||||||
|
defer.returnValue(response)
|
||||||
|
|
||||||
@AuthJSONRPCServer.auth_required
|
@AuthJSONRPCServer.auth_required
|
||||||
def jsonrpc_delete_lbry_file(self, p):
|
def jsonrpc_delete_lbry_file(self, p):
|
||||||
|
|
|
@ -85,9 +85,10 @@ class GetStream(object):
|
||||||
return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount
|
return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount
|
||||||
|
|
||||||
def start(self, stream_info, name):
|
def start(self, stream_info, name):
|
||||||
def _cause_timeout(err):
|
def _cancel(err):
|
||||||
log.info('Cancelling download')
|
if self.checker:
|
||||||
self.timeout_counter = self.timeout * 2
|
self.checker.stop()
|
||||||
|
self.finished.errback(err)
|
||||||
|
|
||||||
def _set_status(x, status):
|
def _set_status(x, status):
|
||||||
log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status))
|
log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status))
|
||||||
|
@ -138,7 +139,7 @@ class GetStream(object):
|
||||||
self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
|
self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
|
||||||
self._d.addCallback(get_downloader_factory)
|
self._d.addCallback(get_downloader_factory)
|
||||||
self._d.addCallback(make_downloader)
|
self._d.addCallback(make_downloader)
|
||||||
self._d.addCallbacks(self._start_download, _cause_timeout)
|
self._d.addCallbacks(self._start_download, _cancel)
|
||||||
self._d.callback(None)
|
self._d.callback(None)
|
||||||
|
|
||||||
return self.finished
|
return self.finished
|
||||||
|
|
Loading…
Reference in a new issue