forked from LBRYCommunity/lbry-sdk
220 lines
7.3 KiB
Python
220 lines
7.3 KiB
Python
import logging
|
|
from binascii import unhexlify
|
|
from twisted.internet import defer
|
|
from twisted.python.failure import Failure
|
|
|
|
from lbrynet.p2p.client.BlobRequester import BlobRequester
|
|
from lbrynet.p2p.client.ConnectionManager import ConnectionManager
|
|
from lbrynet.p2p.client.DownloadManager import DownloadManager
|
|
from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
|
|
from lbrynet.blob.client.CryptBlobHandler import CryptBlobHandler
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class StartFailedError(Exception):
|
|
pass
|
|
|
|
|
|
class AlreadyRunningError(Exception):
|
|
pass
|
|
|
|
|
|
class AlreadyStoppedError(Exception):
|
|
pass
|
|
|
|
|
|
class CurrentlyStoppingError(Exception):
|
|
pass
|
|
|
|
|
|
class CurrentlyStartingError(Exception):
|
|
pass
|
|
|
|
|
|
class CryptStreamDownloader:
|
|
|
|
#implements(IStreamDownloader)
|
|
|
|
def __init__(self, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet,
|
|
key, stream_name):
|
|
"""Initialize a CryptStreamDownloader
|
|
|
|
@param peer_finder: An object which implements the IPeerFinder
|
|
interface. Used to look up peers by a hashsum.
|
|
|
|
@param rate_limiter: An object which implements the IRateLimiter interface
|
|
|
|
@param blob_manager: A BlobManager object
|
|
|
|
@param payment_rate_manager: A NegotiatedPaymentRateManager object
|
|
|
|
@param wallet: An object which implements the IWallet interface
|
|
|
|
@return:
|
|
|
|
"""
|
|
|
|
self.peer_finder = peer_finder
|
|
self.rate_limiter = rate_limiter
|
|
self.blob_manager = blob_manager
|
|
self.payment_rate_manager = payment_rate_manager
|
|
self.wallet = wallet
|
|
self.key = unhexlify(key)
|
|
self.stream_name = unhexlify(stream_name).decode()
|
|
self.completed = False
|
|
self.stopped = True
|
|
self.stopping = False
|
|
self.starting = False
|
|
self.download_manager = None
|
|
self.finished_deferred = None
|
|
self.points_paid = 0.0
|
|
self.blob_requester = None
|
|
|
|
def __str__(self):
|
|
return str(self.stream_name)
|
|
|
|
def toggle_running(self):
|
|
if self.stopped is True:
|
|
return self.start()
|
|
else:
|
|
return self.stop()
|
|
|
|
def start(self):
|
|
if self.starting is True:
|
|
raise CurrentlyStartingError()
|
|
if self.stopping is True:
|
|
raise CurrentlyStoppingError()
|
|
if self.stopped is False:
|
|
raise AlreadyRunningError()
|
|
assert self.download_manager is None
|
|
self.starting = True
|
|
self.completed = False
|
|
self.finished_deferred = defer.Deferred()
|
|
d = self._start()
|
|
d.addCallback(lambda _: self.finished_deferred)
|
|
return d
|
|
|
|
@defer.inlineCallbacks
|
|
def stop(self, err=None):
|
|
if self.stopped is True:
|
|
raise AlreadyStoppedError()
|
|
if self.stopping is True:
|
|
raise CurrentlyStoppingError()
|
|
assert self.download_manager is not None
|
|
self.stopping = True
|
|
success = yield self.download_manager.stop_downloading()
|
|
self.stopping = False
|
|
if success is True:
|
|
self.stopped = True
|
|
self._remove_download_manager()
|
|
yield self._fire_completed_deferred(err)
|
|
|
|
def _start_failed(self):
|
|
|
|
def set_stopped():
|
|
self.stopped = True
|
|
self.stopping = False
|
|
self.starting = False
|
|
|
|
if self.download_manager is not None:
|
|
d = self.download_manager.stop_downloading()
|
|
d.addCallback(lambda _: self._remove_download_manager())
|
|
else:
|
|
d = defer.succeed(True)
|
|
d.addCallback(lambda _: set_stopped())
|
|
d.addCallback(lambda _: Failure(StartFailedError()))
|
|
return d
|
|
|
|
def _start(self):
|
|
|
|
def check_start_succeeded(success):
|
|
if success:
|
|
self.starting = False
|
|
self.stopped = False
|
|
self.completed = False
|
|
return True
|
|
else:
|
|
return self._start_failed()
|
|
|
|
self.download_manager = self._get_download_manager()
|
|
d = self.download_manager.start_downloading()
|
|
d.addCallbacks(check_start_succeeded)
|
|
return d
|
|
|
|
def _get_download_manager(self):
|
|
assert self.blob_requester is None
|
|
download_manager = DownloadManager(self.blob_manager)
|
|
# TODO: can we get rid of these circular references. I'm not
|
|
# smart enough to handle thinking about the interactions
|
|
# between them and have hope that there is a simpler way
|
|
# to accomplish what we want
|
|
download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
|
|
download_manager.progress_manager = self._get_progress_manager(download_manager)
|
|
download_manager.blob_handler = self._get_blob_handler(download_manager)
|
|
download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
|
|
# blob_requester needs to be set before the connection manager is setup
|
|
self.blob_requester = self._get_blob_requester(download_manager)
|
|
download_manager.connection_manager = self._get_connection_manager(download_manager)
|
|
return download_manager
|
|
|
|
def _remove_download_manager(self):
|
|
self.download_manager.blob_info_finder = None
|
|
self.download_manager.progress_manager = None
|
|
self.download_manager.blob_handler = None
|
|
self.download_manager.wallet_info_exchanger = None
|
|
self.blob_requester = None
|
|
self.download_manager.connection_manager = None
|
|
self.download_manager = None
|
|
|
|
def _get_primary_request_creators(self, download_manager):
|
|
return [self.blob_requester]
|
|
|
|
def _get_secondary_request_creators(self, download_manager):
|
|
return [download_manager.wallet_info_exchanger]
|
|
|
|
def _get_metadata_handler(self, download_manager):
|
|
pass
|
|
|
|
def _get_blob_requester(self, download_manager):
|
|
return BlobRequester(self.blob_manager, self.peer_finder,
|
|
self.payment_rate_manager, self.wallet,
|
|
download_manager)
|
|
|
|
def _get_progress_manager(self, download_manager):
|
|
return FullStreamProgressManager(self._finished_downloading,
|
|
self.blob_manager, download_manager)
|
|
|
|
def _get_write_func(self):
|
|
pass
|
|
|
|
def _get_blob_handler(self, download_manager):
|
|
return CryptBlobHandler(self.key, self._get_write_func())
|
|
|
|
def _get_connection_manager(self, download_manager):
|
|
return ConnectionManager(self, self.rate_limiter,
|
|
self._get_primary_request_creators(download_manager),
|
|
self._get_secondary_request_creators(download_manager))
|
|
|
|
def _fire_completed_deferred(self, err=None):
|
|
self.finished_deferred, d = None, self.finished_deferred
|
|
if d is not None:
|
|
if err is not None:
|
|
d.errback(err)
|
|
else:
|
|
value = self._get_finished_deferred_callback_value()
|
|
d.callback(value)
|
|
else:
|
|
log.debug("Not firing the completed deferred because d is None")
|
|
|
|
def _get_finished_deferred_callback_value(self):
|
|
return None
|
|
|
|
def _finished_downloading(self, finished):
|
|
if finished is True:
|
|
self.completed = True
|
|
return self.stop()
|
|
|
|
def insufficient_funds(self, err):
|
|
return self.stop(err=err)
|