lbry-sdk/lbrynet/blob/client/CryptStreamDownloader.py

220 lines
7.3 KiB
Python
Raw Normal View History

2018-08-10 06:55:28 +02:00
from binascii import unhexlify
import logging
from lbrynet.p2p.client.BlobRequester import BlobRequester
from lbrynet.p2p.client.ConnectionManager import ConnectionManager
from lbrynet.p2p.client.DownloadManager import DownloadManager
from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.blob.client.CryptBlobHandler import CryptBlobHandler
2015-08-20 17:27:15 +02:00
from twisted.internet import defer
from twisted.python.failure import Failure
log = logging.getLogger(__name__)
2015-08-20 17:27:15 +02:00
class StartFailedError(Exception):
pass
class AlreadyRunningError(Exception):
pass
class AlreadyStoppedError(Exception):
pass
class CurrentlyStoppingError(Exception):
pass
class CurrentlyStartingError(Exception):
pass
class CryptStreamDownloader:
2015-08-20 17:27:15 +02:00
#implements(IStreamDownloader)
2015-08-20 17:27:15 +02:00
def __init__(self, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet,
key, stream_name):
2016-11-30 21:20:45 +01:00
"""Initialize a CryptStreamDownloader
2015-08-20 17:27:15 +02:00
2016-11-30 21:20:45 +01:00
@param peer_finder: An object which implements the IPeerFinder
interface. Used to look up peers by a hashsum.
2015-08-20 17:27:15 +02:00
@param rate_limiter: An object which implements the IRateLimiter interface
@param blob_manager: A BlobManager object
@param payment_rate_manager: A NegotiatedPaymentRateManager object
2015-08-20 17:27:15 +02:00
@param wallet: An object which implements the IWallet interface
2015-08-20 17:27:15 +02:00
@return:
2016-11-30 21:20:45 +01:00
2015-08-20 17:27:15 +02:00
"""
self.peer_finder = peer_finder
self.rate_limiter = rate_limiter
self.blob_manager = blob_manager
self.payment_rate_manager = payment_rate_manager
self.wallet = wallet
2018-08-10 06:55:28 +02:00
self.key = unhexlify(key)
self.stream_name = unhexlify(stream_name).decode()
2015-08-20 17:27:15 +02:00
self.completed = False
self.stopped = True
self.stopping = False
self.starting = False
self.download_manager = None
self.finished_deferred = None
self.points_paid = 0.0
self.blob_requester = None
2015-08-20 17:27:15 +02:00
def __str__(self):
return str(self.stream_name)
2015-08-20 17:27:15 +02:00
def toggle_running(self):
if self.stopped is True:
return self.start()
else:
return self.stop()
def start(self):
if self.starting is True:
raise CurrentlyStartingError()
if self.stopping is True:
raise CurrentlyStoppingError()
if self.stopped is False:
raise AlreadyRunningError()
assert self.download_manager is None
self.starting = True
self.completed = False
self.finished_deferred = defer.Deferred()
2015-08-20 17:27:15 +02:00
d = self._start()
2016-12-30 07:51:03 +01:00
d.addCallback(lambda _: self.finished_deferred)
2015-08-20 17:27:15 +02:00
return d
2016-12-30 17:47:34 +01:00
@defer.inlineCallbacks
def stop(self, err=None):
2015-08-20 17:27:15 +02:00
if self.stopped is True:
raise AlreadyStoppedError()
if self.stopping is True:
raise CurrentlyStoppingError()
assert self.download_manager is not None
self.stopping = True
2016-12-30 17:47:34 +01:00
success = yield self.download_manager.stop_downloading()
self.stopping = False
if success is True:
self.stopped = True
self._remove_download_manager()
yield self._fire_completed_deferred(err)
2015-08-20 17:27:15 +02:00
def _start_failed(self):
def set_stopped():
self.stopped = True
self.stopping = False
self.starting = False
if self.download_manager is not None:
d = self.download_manager.stop_downloading()
d.addCallback(lambda _: self._remove_download_manager())
else:
d = defer.succeed(True)
d.addCallback(lambda _: set_stopped())
d.addCallback(lambda _: Failure(StartFailedError()))
return d
def _start(self):
def check_start_succeeded(success):
if success:
self.starting = False
self.stopped = False
self.completed = False
return True
else:
return self._start_failed()
self.download_manager = self._get_download_manager()
d = self.download_manager.start_downloading()
d.addCallbacks(check_start_succeeded)
return d
def _get_download_manager(self):
assert self.blob_requester is None
download_manager = DownloadManager(self.blob_manager)
2017-01-11 18:52:38 +01:00
# TODO: can we get rid of these circular references. I'm not
# smart enough to handle thinking about the interactions
# between them and have hope that there is a simpler way
# to accomplish what we want
2015-08-20 17:27:15 +02:00
download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
download_manager.progress_manager = self._get_progress_manager(download_manager)
download_manager.blob_handler = self._get_blob_handler(download_manager)
download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
# blob_requester needs to be set before the connection manager is setup
self.blob_requester = self._get_blob_requester(download_manager)
2015-08-20 17:27:15 +02:00
download_manager.connection_manager = self._get_connection_manager(download_manager)
return download_manager
def _remove_download_manager(self):
self.download_manager.blob_info_finder = None
self.download_manager.progress_manager = None
self.download_manager.blob_handler = None
self.download_manager.wallet_info_exchanger = None
self.blob_requester = None
2015-08-20 17:27:15 +02:00
self.download_manager.connection_manager = None
self.download_manager = None
def _get_primary_request_creators(self, download_manager):
return [self.blob_requester]
2015-08-20 17:27:15 +02:00
def _get_secondary_request_creators(self, download_manager):
return [download_manager.wallet_info_exchanger]
def _get_metadata_handler(self, download_manager):
pass
def _get_blob_requester(self, download_manager):
2016-11-30 21:20:45 +01:00
return BlobRequester(self.blob_manager, self.peer_finder,
self.payment_rate_manager, self.wallet,
2015-08-20 17:27:15 +02:00
download_manager)
def _get_progress_manager(self, download_manager):
2016-11-30 21:20:45 +01:00
return FullStreamProgressManager(self._finished_downloading,
self.blob_manager, download_manager)
2015-08-20 17:27:15 +02:00
def _get_write_func(self):
pass
def _get_blob_handler(self, download_manager):
return CryptBlobHandler(self.key, self._get_write_func())
def _get_connection_manager(self, download_manager):
return ConnectionManager(self, self.rate_limiter,
self._get_primary_request_creators(download_manager),
self._get_secondary_request_creators(download_manager))
def _fire_completed_deferred(self, err=None):
2015-08-20 17:27:15 +02:00
self.finished_deferred, d = None, self.finished_deferred
if d is not None:
if err is not None:
d.errback(err)
else:
2016-12-30 17:47:34 +01:00
value = self._get_finished_deferred_callback_value()
d.callback(value)
else:
log.debug("Not firing the completed deferred because d is None")
2015-08-20 17:27:15 +02:00
def _get_finished_deferred_callback_value(self):
return None
def _finished_downloading(self, finished):
if finished is True:
self.completed = True
return self.stop()
def insufficient_funds(self, err):
2016-11-30 21:20:45 +01:00
return self.stop(err=err)