# lbry-sdk/lbrynet/blob/client/CryptStreamDownloader.py
# 2018-11-09 16:52:42 -05:00
#
# 221 lines
# 7.3 KiB
# Python

import logging
from binascii import unhexlify
from twisted.internet import defer
from twisted.python.failure import Failure
from lbrynet.p2p.client.BlobRequester import BlobRequester
from lbrynet.p2p.client.ConnectionManager import ConnectionManager
from lbrynet.p2p.client.DownloadManager import DownloadManager
from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.blob.client.CryptBlobHandler import CryptBlobHandler
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class StartFailedError(Exception):
    """Starting the download did not succeed.

    CryptStreamDownloader._start_failed wraps an instance of this in a
    Failure after it has reset the downloader's state.
    """
class AlreadyRunningError(Exception):
    """Raised by CryptStreamDownloader.start when the downloader is not stopped."""
class AlreadyStoppedError(Exception):
    """Raised by CryptStreamDownloader.stop when the downloader is already stopped."""
class CurrentlyStoppingError(Exception):
    """Raised by CryptStreamDownloader.start and .stop while a stop is in progress."""
class CurrentlyStartingError(Exception):
    """Raised by CryptStreamDownloader.start while a previous start is in progress."""
class CryptStreamDownloader:
    """Coordinate the download of a single encrypted stream.

    The downloader owns at most one DownloadManager at a time and tracks
    its own lifecycle with four boolean flags:

    - ``stopped``: True initially, False after a successful start, and
      True again after a successful stop or a failed start.
    - ``starting`` / ``stopping``: True while the matching transition is
      in flight.
    - ``completed``: set by ``_finished_downloading(True)``; cleared on
      every start.

    ``_get_metadata_handler`` and ``_get_write_func`` return None here;
    presumably subclasses override them (not visible in this file).
    """

    # implements(IStreamDownloader)

    def __init__(self, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet,
                 key, stream_name):
        """Initialize a CryptStreamDownloader

        @param peer_finder: An object which implements the IPeerFinder
            interface. Used to look up peers by a hashsum.
        @param rate_limiter: An object which implements the IRateLimiter interface
        @param blob_manager: A BlobManager object
        @param payment_rate_manager: A NegotiatedPaymentRateManager object
        @param wallet: An object which implements the IWallet interface
        @param key: Hex-encoded key; stored unhexlified and later handed
            to the CryptBlobHandler.
        @param stream_name: Hex-encoded stream name; stored unhexlified
            and decoded to str.
        @return:
        """
        self.peer_finder = peer_finder
        self.rate_limiter = rate_limiter
        self.blob_manager = blob_manager
        self.payment_rate_manager = payment_rate_manager
        self.wallet = wallet
        self.key = unhexlify(key)
        self.stream_name = unhexlify(stream_name).decode()

        # Lifecycle flags: a fresh downloader is stopped and idle.
        self.completed = False
        self.stopped = True
        self.stopping = False
        self.starting = False

        # Created by start(), torn down by _remove_download_manager().
        self.download_manager = None
        self.finished_deferred = None
        self.points_paid = 0.0
        self.blob_requester = None

    def __str__(self):
        """Return the decoded stream name."""
        return str(self.stream_name)

    def toggle_running(self):
        """Start the downloader if it is stopped, otherwise stop it.

        @return: whatever start() or stop() returns (a Deferred).
        """
        if self.stopped is True:
            return self.start()
        else:
            return self.stop()

    def start(self):
        """Start downloading the stream.

        The state checks raise synchronously, before any Deferred exists.

        @raise CurrentlyStartingError: a previous start is still in flight
        @raise CurrentlyStoppingError: a stop is still in flight
        @raise AlreadyRunningError: the downloader is not stopped
        @return: a Deferred which, once startup has succeeded, waits on
            finished_deferred (fired by _fire_completed_deferred). If
            startup does not succeed it fails with StartFailedError.
        """
        if self.starting is True:
            raise CurrentlyStartingError()
        if self.stopping is True:
            raise CurrentlyStoppingError()
        if self.stopped is False:
            raise AlreadyRunningError()

        assert self.download_manager is None
        self.starting = True
        self.completed = False
        self.finished_deferred = defer.Deferred()
        d = self._start()
        # Returning a Deferred from a callback makes `d` wait for it, so
        # callers are notified when the whole download finishes.
        d.addCallback(lambda _: self.finished_deferred)
        return d

    @defer.inlineCallbacks
    def stop(self, err=None):
        """Stop downloading and fire the finished Deferred.

        Because this is an inlineCallbacks generator, the state-check
        exceptions below are delivered as failures on the returned
        Deferred rather than raised to the caller.

        @param err: if given, finished_deferred is errbacked with it
            instead of being called back.
        @raise AlreadyStoppedError: the downloader is already stopped
        @raise CurrentlyStoppingError: a stop is still in flight
        @return: a Deferred that fires once stopping has been attempted.
        """
        if self.stopped is True:
            raise AlreadyStoppedError()
        if self.stopping is True:
            raise CurrentlyStoppingError()
        assert self.download_manager is not None
        self.stopping = True
        success = yield self.download_manager.stop_downloading()
        self.stopping = False
        if success is True:
            self.stopped = True
            self._remove_download_manager()
        # The finished Deferred is fired whether or not the stop succeeded.
        yield self._fire_completed_deferred(err)

    def _start_failed(self):
        """Undo a failed start and report it.

        Stops and detaches the download manager if one exists, resets the
        lifecycle flags to the stopped state, and ends the chain with a
        Failure wrapping StartFailedError.

        @return: a Deferred that fails with StartFailedError.
        """

        def set_stopped():
            self.stopped = True
            self.stopping = False
            self.starting = False

        if self.download_manager is not None:
            d = self.download_manager.stop_downloading()
            d.addCallback(lambda _: self._remove_download_manager())
        else:
            d = defer.succeed(True)
        d.addCallback(lambda _: set_stopped())
        # Returning a Failure from a callback switches the chain to errback.
        d.addCallback(lambda _: Failure(StartFailedError()))
        return d

    def _start(self):
        """Build the download manager and ask it to start downloading.

        @return: a Deferred firing True when the manager reported success,
            or failing with StartFailedError when it reported a falsy
            result.
        """

        def check_start_succeeded(success):
            if success:
                self.starting = False
                self.stopped = False
                self.completed = False
                return True
            else:
                return self._start_failed()

        self.download_manager = self._get_download_manager()
        d = self.download_manager.start_downloading()
        # NOTE(review): only the success path is handled. If
        # start_downloading() errbacks, `starting` stays True and the
        # download manager stays attached - confirm whether that can happen.
        d.addCallback(check_start_succeeded)
        return d

    def _get_download_manager(self):
        """Create a DownloadManager and wire its collaborators together.

        Also sets self.blob_requester as a side effect.

        @return: the configured DownloadManager.
        """
        assert self.blob_requester is None
        download_manager = DownloadManager(self.blob_manager)
        # TODO: can we get rid of these circular references. I'm not
        #       smart enough to handle thinking about the interactions
        #       between them and have hope that there is a simpler way
        #       to accomplish what we want
        download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
        download_manager.progress_manager = self._get_progress_manager(download_manager)
        download_manager.blob_handler = self._get_blob_handler(download_manager)
        download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
        # blob_requester needs to be set before the connection manager is
        # set up: _get_connection_manager reads it through
        # _get_primary_request_creators.
        self.blob_requester = self._get_blob_requester(download_manager)
        download_manager.connection_manager = self._get_connection_manager(download_manager)
        return download_manager

    def _remove_download_manager(self):
        """Clear the references set up by _get_download_manager and drop the manager."""
        self.download_manager.blob_info_finder = None
        self.download_manager.progress_manager = None
        self.download_manager.blob_handler = None
        self.download_manager.wallet_info_exchanger = None
        self.blob_requester = None
        self.download_manager.connection_manager = None
        self.download_manager = None

    def _get_primary_request_creators(self, download_manager):
        """Return the primary request creators: a list holding the blob requester."""
        return [self.blob_requester]

    def _get_secondary_request_creators(self, download_manager):
        """Return the secondary request creators: the manager's wallet info exchanger."""
        return [download_manager.wallet_info_exchanger]

    def _get_metadata_handler(self, download_manager):
        """Return the blob info finder for the download manager.

        Returns None here; presumably overridden by subclasses.
        """
        pass

    def _get_blob_requester(self, download_manager):
        """Create the BlobRequester used to request blobs for this download."""
        return BlobRequester(self.blob_manager, self.peer_finder,
                             self.payment_rate_manager, self.wallet,
                             download_manager)

    def _get_progress_manager(self, download_manager):
        """Create the progress manager, with _finished_downloading as its callback."""
        return FullStreamProgressManager(self._finished_downloading,
                                         self.blob_manager, download_manager)

    def _get_write_func(self):
        """Return the write function handed to the CryptBlobHandler.

        Returns None here; presumably overridden by subclasses.
        """
        pass

    def _get_blob_handler(self, download_manager):
        """Create the CryptBlobHandler from the stream key and the write function."""
        return CryptBlobHandler(self.key, self._get_write_func())

    def _get_connection_manager(self, download_manager):
        """Create the ConnectionManager with the primary and secondary request creators."""
        return ConnectionManager(self, self.rate_limiter,
                                 self._get_primary_request_creators(download_manager),
                                 self._get_secondary_request_creators(download_manager))

    def _fire_completed_deferred(self, err=None):
        """Fire finished_deferred at most once.

        The attribute is cleared before firing, so a second call finds
        None and only logs.

        @param err: if not None, the Deferred is errbacked with it;
            otherwise it is called back with
            _get_finished_deferred_callback_value().
        """
        self.finished_deferred, d = None, self.finished_deferred
        if d is not None:
            if err is not None:
                d.errback(err)
            else:
                value = self._get_finished_deferred_callback_value()
                d.callback(value)
        else:
            log.debug("Not firing the completed deferred because d is None")

    def _get_finished_deferred_callback_value(self):
        """Return the value finished_deferred is called back with (None here)."""
        return None

    def _finished_downloading(self, finished):
        """Callback given to the progress manager.

        Marks the stream completed when `finished` is True, and requests a
        stop in either case.

        @return: the Deferred returned by stop().
        """
        if finished is True:
            self.completed = True
        return self.stop()

    def insufficient_funds(self, err):
        """Stop the download, failing the finished Deferred with `err`."""
        return self.stop(err=err)