forked from LBRYCommunity/lbry-sdk
async lbrynet.stream
This commit is contained in:
parent
69b259c285
commit
4bf7742c19
24 changed files with 1085 additions and 2118 deletions
|
@ -1,221 +0,0 @@
|
|||
import logging
|
||||
from binascii import unhexlify
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.p2p.client.BlobRequester import BlobRequester
|
||||
from lbrynet.p2p.client.ConnectionManager import ConnectionManager
|
||||
from lbrynet.p2p.client.DownloadManager import DownloadManager
|
||||
from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
|
||||
from lbrynet.blob.client.CryptBlobHandler import CryptBlobHandler
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StartFailedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AlreadyRunningError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AlreadyStoppedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CurrentlyStoppingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CurrentlyStartingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CryptStreamDownloader:
|
||||
|
||||
#implements(IStreamDownloader)
|
||||
|
||||
def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet,
|
||||
key, stream_name):
|
||||
"""Initialize a CryptStreamDownloader
|
||||
|
||||
@param peer_finder: An object which implements the IPeerFinder
|
||||
interface. Used to look up peers by a hashsum.
|
||||
|
||||
@param rate_limiter: An object which implements the IRateLimiter interface
|
||||
|
||||
@param blob_manager: A BlobManager object
|
||||
|
||||
@param payment_rate_manager: A NegotiatedPaymentRateManager object
|
||||
|
||||
@param wallet: An object which implements the IWallet interface
|
||||
|
||||
@return:
|
||||
|
||||
"""
|
||||
self.conf = conf
|
||||
self.peer_finder = peer_finder
|
||||
self.rate_limiter = rate_limiter
|
||||
self.blob_manager = blob_manager
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
self.wallet = wallet
|
||||
self.key = unhexlify(key)
|
||||
self.stream_name = unhexlify(stream_name).decode()
|
||||
self.completed = False
|
||||
self.stopped = True
|
||||
self.stopping = False
|
||||
self.starting = False
|
||||
self.download_manager = None
|
||||
self.finished_deferred = None
|
||||
self.points_paid = 0.0
|
||||
self.blob_requester = None
|
||||
|
||||
def __str__(self):
|
||||
return str(self.stream_name)
|
||||
|
||||
def toggle_running(self):
|
||||
if self.stopped is True:
|
||||
return self.start()
|
||||
else:
|
||||
return self.stop()
|
||||
|
||||
def start(self):
|
||||
if self.starting is True:
|
||||
raise CurrentlyStartingError()
|
||||
if self.stopping is True:
|
||||
raise CurrentlyStoppingError()
|
||||
if self.stopped is False:
|
||||
raise AlreadyRunningError()
|
||||
assert self.download_manager is None
|
||||
self.starting = True
|
||||
self.completed = False
|
||||
self.finished_deferred = defer.Deferred()
|
||||
d = self._start()
|
||||
d.addCallback(lambda _: self.finished_deferred)
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop(self, err=None):
|
||||
if self.stopped is True:
|
||||
raise AlreadyStoppedError()
|
||||
if self.stopping is True:
|
||||
raise CurrentlyStoppingError()
|
||||
assert self.download_manager is not None
|
||||
self.stopping = True
|
||||
success = yield self.download_manager.stop_downloading()
|
||||
self.stopping = False
|
||||
if success is True:
|
||||
self.stopped = True
|
||||
self._remove_download_manager()
|
||||
yield self._fire_completed_deferred(err)
|
||||
|
||||
def _start_failed(self):
|
||||
|
||||
def set_stopped():
|
||||
self.stopped = True
|
||||
self.stopping = False
|
||||
self.starting = False
|
||||
|
||||
if self.download_manager is not None:
|
||||
d = self.download_manager.stop_downloading()
|
||||
d.addCallback(lambda _: self._remove_download_manager())
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
d.addCallback(lambda _: set_stopped())
|
||||
d.addCallback(lambda _: Failure(StartFailedError()))
|
||||
return d
|
||||
|
||||
def _start(self):
|
||||
|
||||
def check_start_succeeded(success):
|
||||
if success:
|
||||
self.starting = False
|
||||
self.stopped = False
|
||||
self.completed = False
|
||||
return True
|
||||
else:
|
||||
return self._start_failed()
|
||||
|
||||
self.download_manager = self._get_download_manager()
|
||||
d = self.download_manager.start_downloading()
|
||||
d.addCallbacks(check_start_succeeded)
|
||||
return d
|
||||
|
||||
def _get_download_manager(self):
|
||||
assert self.blob_requester is None
|
||||
download_manager = DownloadManager(self.blob_manager)
|
||||
# TODO: can we get rid of these circular references. I'm not
|
||||
# smart enough to handle thinking about the interactions
|
||||
# between them and have hope that there is a simpler way
|
||||
# to accomplish what we want
|
||||
download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
|
||||
download_manager.progress_manager = self._get_progress_manager(download_manager)
|
||||
download_manager.blob_handler = self._get_blob_handler(download_manager)
|
||||
download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
|
||||
# blob_requester needs to be set before the connection manager is setup
|
||||
self.blob_requester = self._get_blob_requester(download_manager)
|
||||
download_manager.connection_manager = self._get_connection_manager(download_manager)
|
||||
return download_manager
|
||||
|
||||
def _remove_download_manager(self):
|
||||
self.download_manager.blob_info_finder = None
|
||||
self.download_manager.progress_manager = None
|
||||
self.download_manager.blob_handler = None
|
||||
self.download_manager.wallet_info_exchanger = None
|
||||
self.blob_requester = None
|
||||
self.download_manager.connection_manager = None
|
||||
self.download_manager = None
|
||||
|
||||
def _get_primary_request_creators(self, download_manager):
|
||||
return [self.blob_requester]
|
||||
|
||||
def _get_secondary_request_creators(self, download_manager):
|
||||
return [download_manager.wallet_info_exchanger]
|
||||
|
||||
def _get_metadata_handler(self, download_manager):
|
||||
pass
|
||||
|
||||
def _get_blob_requester(self, download_manager):
|
||||
return BlobRequester(self.blob_manager, self.peer_finder,
|
||||
self.payment_rate_manager, self.wallet,
|
||||
download_manager)
|
||||
|
||||
def _get_progress_manager(self, download_manager):
|
||||
return FullStreamProgressManager(self._finished_downloading,
|
||||
self.blob_manager, download_manager)
|
||||
|
||||
def _get_write_func(self):
|
||||
pass
|
||||
|
||||
def _get_blob_handler(self, download_manager):
|
||||
return CryptBlobHandler(self.key, self._get_write_func())
|
||||
|
||||
def _get_connection_manager(self, download_manager):
|
||||
return ConnectionManager(self, self.rate_limiter,
|
||||
self._get_primary_request_creators(download_manager),
|
||||
self._get_secondary_request_creators(download_manager))
|
||||
|
||||
def _fire_completed_deferred(self, err=None):
|
||||
self.finished_deferred, d = None, self.finished_deferred
|
||||
if d is not None:
|
||||
if err is not None:
|
||||
d.errback(err)
|
||||
else:
|
||||
value = self._get_finished_deferred_callback_value()
|
||||
d.callback(value)
|
||||
else:
|
||||
log.debug("Not firing the completed deferred because d is None")
|
||||
|
||||
def _get_finished_deferred_callback_value(self):
|
||||
return None
|
||||
|
||||
def _finished_downloading(self, finished):
|
||||
if finished is True:
|
||||
self.completed = True
|
||||
return self.stop()
|
||||
|
||||
def insufficient_funds(self, err):
|
||||
return self.stop(err=err)
|
|
@ -1,199 +0,0 @@
|
|||
import os
|
||||
import logging
|
||||
import traceback
|
||||
from binascii import hexlify, unhexlify
|
||||
from twisted.internet import defer, threads
|
||||
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.p2p.StreamDescriptor import save_sd_info
|
||||
from lbrynet.blob.client.CryptStreamDownloader import CryptStreamDownloader
|
||||
from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
|
||||
from lbrynet.p2p.Error import FileOpenError
|
||||
from lbrynet.blob.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EncryptedFileDownloader(CryptStreamDownloader):
|
||||
"""Classes which inherit from this class download LBRY files"""
|
||||
|
||||
def __init__(self, conf: Config, stream_hash, peer_finder, rate_limiter, blob_manager,
|
||||
storage, payment_rate_manager, wallet, key, stream_name, file_name):
|
||||
super().__init__(conf, peer_finder, rate_limiter, blob_manager,
|
||||
payment_rate_manager, wallet, key, stream_name)
|
||||
self.stream_hash = stream_hash
|
||||
self.storage = storage
|
||||
self.file_name = os.path.basename(unhexlify(file_name).decode())
|
||||
self._calculated_total_bytes = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_data(self):
|
||||
crypt_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash))
|
||||
blob_hashes = [b.blob_hash for b in crypt_infos if b.blob_hash]
|
||||
sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(self.stream_hash))
|
||||
blob_hashes.append(sd_hash)
|
||||
yield self.blob_manager.delete_blobs(blob_hashes)
|
||||
|
||||
def stop(self, err=None):
|
||||
self._close_output()
|
||||
return super().stop(err=err)
|
||||
|
||||
def _get_progress_manager(self, download_manager):
|
||||
return FullStreamProgressManager(self._finished_downloading,
|
||||
self.blob_manager, download_manager)
|
||||
|
||||
def _start(self):
|
||||
def check_start_succeeded(success):
|
||||
if success:
|
||||
self.starting = False
|
||||
self.stopped = False
|
||||
self.completed = False
|
||||
return True
|
||||
else:
|
||||
return self._start_failed()
|
||||
|
||||
self.download_manager = self._get_download_manager()
|
||||
d = self._setup_output()
|
||||
d.addCallback(lambda _: self.download_manager.start_downloading())
|
||||
d.addCallbacks(check_start_succeeded)
|
||||
return d
|
||||
|
||||
def _setup_output(self):
|
||||
pass
|
||||
|
||||
def _close_output(self):
|
||||
pass
|
||||
|
||||
async def get_total_bytes(self):
|
||||
blobs = await self.storage.get_blobs_for_stream(self.stream_hash)
|
||||
return sum([b.length for b in blobs])
|
||||
|
||||
def get_total_bytes_cached(self):
|
||||
if self._calculated_total_bytes is None or self._calculated_total_bytes == 0:
|
||||
if self.download_manager is None:
|
||||
return 0
|
||||
else:
|
||||
self._calculated_total_bytes = self.download_manager.calculate_total_bytes()
|
||||
return self._calculated_total_bytes
|
||||
|
||||
def get_bytes_left_to_output(self):
|
||||
if self.download_manager is not None:
|
||||
return self.download_manager.calculate_bytes_left_to_output()
|
||||
else:
|
||||
return 0
|
||||
|
||||
def get_bytes_left_to_download(self):
|
||||
if self.download_manager is not None:
|
||||
return self.download_manager.calculate_bytes_left_to_download()
|
||||
else:
|
||||
return 0
|
||||
|
||||
def _get_metadata_handler(self, download_manager):
|
||||
return EncryptedFileMetadataHandler(self.stream_hash,
|
||||
self.storage, download_manager)
|
||||
|
||||
|
||||
class EncryptedFileDownloaderFactory:
|
||||
#implements(IStreamDownloaderFactory)
|
||||
|
||||
def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, storage, wallet):
|
||||
self.conf = conf
|
||||
self.peer_finder = peer_finder
|
||||
self.rate_limiter = rate_limiter
|
||||
self.blob_manager = blob_manager
|
||||
self.storage = storage
|
||||
self.wallet = wallet
|
||||
|
||||
def can_download(self, sd_validator):
|
||||
return True
|
||||
|
||||
def make_downloader(self, metadata, options, payment_rate_manager, **kwargs):
|
||||
assert len(options) == 1
|
||||
data_rate = options[0]
|
||||
payment_rate_manager.min_blob_data_payment_rate = data_rate
|
||||
|
||||
def save_source_if_blob(stream_hash):
|
||||
return defer.succeed(metadata.source_blob_hash)
|
||||
|
||||
def create_downloader(stream_hash):
|
||||
downloader = self._make_downloader(stream_hash, payment_rate_manager,
|
||||
metadata.validator.raw_info)
|
||||
return defer.succeed(downloader)
|
||||
|
||||
d = save_sd_info(self.blob_manager, metadata.source_blob_hash, metadata.validator.raw_info)
|
||||
d.addCallback(save_source_if_blob)
|
||||
d.addCallback(create_downloader)
|
||||
return d
|
||||
|
||||
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info):
|
||||
pass
|
||||
|
||||
|
||||
class EncryptedFileSaver(EncryptedFileDownloader):
|
||||
def __init__(self, conf: Config, stream_hash, peer_finder, rate_limiter, blob_manager, storage,
|
||||
payment_rate_manager, wallet, download_directory, key, stream_name, file_name):
|
||||
super().__init__(conf, stream_hash, peer_finder, rate_limiter,
|
||||
blob_manager, storage, payment_rate_manager,
|
||||
wallet, key, stream_name, file_name)
|
||||
self.download_directory = unhexlify(download_directory).decode()
|
||||
self.file_written_to = os.path.join(self.download_directory, unhexlify(file_name).decode())
|
||||
self.file_handle = None
|
||||
|
||||
def __str__(self):
|
||||
return str(self.file_written_to)
|
||||
|
||||
def _get_progress_manager(self, download_manager):
|
||||
return FullStreamProgressManager(self._finished_downloading,
|
||||
self.blob_manager,
|
||||
download_manager)
|
||||
|
||||
def _setup_output(self):
|
||||
def open_file():
|
||||
if self.file_handle is None:
|
||||
file_written_to = os.path.join(self.download_directory, self.file_name)
|
||||
try:
|
||||
self.file_handle = open(file_written_to, 'wb')
|
||||
self.file_written_to = file_written_to
|
||||
except IOError:
|
||||
log.error(traceback.format_exc())
|
||||
raise FileOpenError(
|
||||
"Failed to open %s. Make sure you have permission to save files to that"
|
||||
" location." % file_written_to
|
||||
)
|
||||
return threads.deferToThread(open_file)
|
||||
|
||||
def _close_output(self):
|
||||
self.file_handle, file_handle = None, self.file_handle
|
||||
if file_handle is not None:
|
||||
name = file_handle.name
|
||||
file_handle.close()
|
||||
if self.completed is False:
|
||||
os.remove(name)
|
||||
|
||||
def _get_write_func(self):
|
||||
def write_func(data):
|
||||
if self.stopped is False and self.file_handle is not None:
|
||||
self.file_handle.write(data)
|
||||
return write_func
|
||||
|
||||
|
||||
class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
|
||||
def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, storage, wallet, download_directory):
|
||||
super().__init__(conf, peer_finder, rate_limiter, blob_manager, storage, wallet)
|
||||
self.download_directory = hexlify(download_directory.encode())
|
||||
|
||||
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info):
|
||||
stream_name = stream_info.raw_info['stream_name']
|
||||
key = stream_info.raw_info['key']
|
||||
suggested_file_name = stream_info.raw_info['suggested_file_name']
|
||||
return EncryptedFileSaver(
|
||||
self.conf, stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.storage,
|
||||
payment_rate_manager, self.wallet, self.download_directory, key=key, stream_name=stream_name,
|
||||
file_name=suggested_file_name
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_description():
|
||||
return "Save"
|
|
@ -1,40 +0,0 @@
|
|||
import logging
|
||||
from twisted.internet import defer
|
||||
from lbrynet.extras.compat import f2d
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EncryptedFileMetadataHandler:
|
||||
|
||||
def __init__(self, stream_hash, storage, download_manager):
|
||||
self.stream_hash = stream_hash
|
||||
self.storage = storage
|
||||
self.download_manager = download_manager
|
||||
self._final_blob_num = None
|
||||
|
||||
######### IMetadataHandler #########
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_initial_blobs(self):
|
||||
blob_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash))
|
||||
return self._format_initial_blobs_for_download_manager(blob_infos)
|
||||
|
||||
def final_blob_num(self):
|
||||
return self._final_blob_num
|
||||
|
||||
######### internal calls #########
|
||||
|
||||
def _format_initial_blobs_for_download_manager(self, blob_infos):
|
||||
infos = []
|
||||
for i, crypt_blob in enumerate(blob_infos):
|
||||
if crypt_blob.blob_hash is not None and crypt_blob.length:
|
||||
infos.append(crypt_blob)
|
||||
else:
|
||||
if i != len(blob_infos) - 1:
|
||||
raise Exception("Invalid stream terminator: %i of %i" %
|
||||
(i, len(blob_infos) - 1))
|
||||
log.debug("Setting _final_blob_num to %s", str(crypt_blob.blob_num - 1))
|
||||
self._final_blob_num = crypt_blob.blob_num - 1
|
||||
return infos
|
|
@ -1,47 +0,0 @@
|
|||
from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamType
|
||||
from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamDescriptorValidator
|
||||
from lbrynet.p2p.DownloadOption import DownloadOption, DownloadOptionChoice
|
||||
|
||||
|
||||
def add_lbry_file_to_sd_identifier(sd_identifier):
|
||||
sd_identifier.add_stream_type(EncryptedFileStreamType, EncryptedFileStreamDescriptorValidator,
|
||||
EncryptedFileOptions())
|
||||
|
||||
|
||||
class EncryptedFileOptions:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def get_downloader_options(self, sd_validator, payment_rate_manager):
|
||||
prm = payment_rate_manager
|
||||
|
||||
def get_default_data_rate_description():
|
||||
if prm.base.min_blob_data_payment_rate is None:
|
||||
return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)
|
||||
else:
|
||||
return "%f LBC/MB" % prm.base.min_blob_data_payment_rate
|
||||
|
||||
rate_choices = []
|
||||
rate_choices.append(DownloadOptionChoice(
|
||||
prm.base.min_blob_data_payment_rate,
|
||||
"No change - %s" % get_default_data_rate_description(),
|
||||
"No change - %s" % get_default_data_rate_description()))
|
||||
if prm.base.min_blob_data_payment_rate is not None:
|
||||
rate_choices.append(DownloadOptionChoice(
|
||||
None,
|
||||
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate),
|
||||
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)))
|
||||
rate_choices.append(DownloadOptionChoice(float,
|
||||
"Enter rate in LBC/MB",
|
||||
"Enter rate in LBC/MB"))
|
||||
|
||||
options = [
|
||||
DownloadOption(
|
||||
rate_choices,
|
||||
"Rate which will be paid for data",
|
||||
"data payment rate",
|
||||
prm.base.min_blob_data_payment_rate,
|
||||
get_default_data_rate_description()
|
||||
),
|
||||
]
|
||||
return options
|
|
@ -1,209 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
from twisted.internet import defer
|
||||
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.schema.fee import Fee
|
||||
|
||||
from lbrynet.p2p.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError
|
||||
from lbrynet.p2p.Error import DownloadDataTimeout, DownloadCanceledError
|
||||
from lbrynet.p2p.StreamDescriptor import download_sd_blob
|
||||
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||
from torba.client.constants import COIN
|
||||
from lbrynet.extras.wallet.dewies import dewies_to_lbc
|
||||
from lbrynet.extras.compat import f2d
|
||||
|
||||
INITIALIZING_CODE = 'initializing'
|
||||
DOWNLOAD_METADATA_CODE = 'downloading_metadata'
|
||||
DOWNLOAD_TIMEOUT_CODE = 'timeout'
|
||||
DOWNLOAD_RUNNING_CODE = 'running'
|
||||
DOWNLOAD_STOPPED_CODE = 'stopped'
|
||||
STREAM_STAGES = [
|
||||
(INITIALIZING_CODE, 'Initializing'),
|
||||
(DOWNLOAD_METADATA_CODE, 'Downloading metadata'),
|
||||
(DOWNLOAD_RUNNING_CODE, 'Started stream'),
|
||||
(DOWNLOAD_STOPPED_CODE, 'Paused stream'),
|
||||
(DOWNLOAD_TIMEOUT_CODE, 'Stream timed out')
|
||||
]
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GetStream:
|
||||
def __init__(self, conf: Config, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder,
|
||||
rate_limiter, payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None,
|
||||
timeout=None, reactor=None):
|
||||
if not reactor:
|
||||
from twisted.internet import reactor
|
||||
self.conf = conf
|
||||
self.reactor = reactor
|
||||
self.timeout = timeout or conf.download_timeout
|
||||
self.data_rate = data_rate or conf.data_rate
|
||||
self.max_key_fee = max_key_fee or conf.max_key_fee
|
||||
self.disable_max_key_fee = disable_max_key_fee or conf.disable_max_key_fee
|
||||
self.download_directory = conf.download_dir
|
||||
self.timeout_counter = 0
|
||||
self.code = None
|
||||
self.sd_hash = None
|
||||
self.blob_manager = blob_manager
|
||||
self.peer_finder = peer_finder
|
||||
self.rate_limiter = rate_limiter
|
||||
self.wallet = wallet
|
||||
self.exchange_rate_manager = exchange_rate_manager
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
self.sd_identifier = sd_identifier
|
||||
self.storage = storage
|
||||
self.downloader = None
|
||||
|
||||
# fired when the download is complete
|
||||
self.finished_deferred = None
|
||||
# fired after the metadata and the first data blob have been downloaded
|
||||
self.data_downloading_deferred = defer.Deferred(None)
|
||||
self.wrote_data = False
|
||||
|
||||
@property
|
||||
def download_path(self):
|
||||
return os.path.join(self.download_directory, self.downloader.file_name)
|
||||
|
||||
def convert_max_fee(self):
|
||||
currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount']
|
||||
return self.exchange_rate_manager.convert_currency(currency, "LBC", amount)
|
||||
|
||||
def set_status(self, status, name):
|
||||
log.info("Download lbry://%s status changed to %s" % (name, status))
|
||||
self.code = next(s for s in STREAM_STAGES if s[0] == status)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_fee_and_convert(self, fee):
|
||||
max_key_fee_amount = self.convert_max_fee()
|
||||
converted_fee_amount = self.exchange_rate_manager.convert_currency(fee.currency, "LBC",
|
||||
fee.amount)
|
||||
if converted_fee_amount > (yield f2d(self.wallet.default_account.get_balance())):
|
||||
raise InsufficientFundsError('Unable to pay the key fee of %s' % converted_fee_amount)
|
||||
if converted_fee_amount > max_key_fee_amount and not self.disable_max_key_fee:
|
||||
raise KeyFeeAboveMaxAllowed('Key fee {} above max allowed {}'.format(converted_fee_amount,
|
||||
max_key_fee_amount))
|
||||
converted_fee = {
|
||||
'currency': 'LBC',
|
||||
'amount': converted_fee_amount,
|
||||
'address': fee.address
|
||||
}
|
||||
return Fee(converted_fee)
|
||||
|
||||
def get_downloader_factory(self, factories):
|
||||
for factory in factories:
|
||||
if isinstance(factory, ManagedEncryptedFileDownloaderFactory):
|
||||
return factory
|
||||
raise Exception(f'No suitable factory was found in {factories}')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_downloader(self, factory, stream_metadata, file_name=None):
|
||||
# TODO: we should use stream_metadata.options.get_downloader_options
|
||||
# instead of hard-coding the options to be [self.data_rate]
|
||||
downloader = yield factory.make_downloader(
|
||||
stream_metadata,
|
||||
self.data_rate,
|
||||
self.payment_rate_manager,
|
||||
self.download_directory,
|
||||
file_name=file_name
|
||||
)
|
||||
defer.returnValue(downloader)
|
||||
|
||||
def _pay_key_fee(self, address, fee_lbc, name):
|
||||
log.info("Pay key fee %s --> %s", dewies_to_lbc(fee_lbc), address)
|
||||
reserved_points = self.wallet.reserve_points(address, fee_lbc)
|
||||
if reserved_points is None:
|
||||
raise InsufficientFundsError(
|
||||
'Unable to pay the key fee of {} for {}'.format(dewies_to_lbc(fee_lbc), name)
|
||||
)
|
||||
return f2d(self.wallet.send_points_to_address(reserved_points, fee_lbc))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def pay_key_fee(self, fee, name):
|
||||
if fee is not None:
|
||||
yield self._pay_key_fee(fee.address.decode(), int(fee.amount * COIN), name)
|
||||
else:
|
||||
defer.returnValue(None)
|
||||
|
||||
def finish(self, results, name):
|
||||
self.set_status(DOWNLOAD_STOPPED_CODE, name)
|
||||
log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6],
|
||||
self.download_path)
|
||||
return defer.succeed(self.download_path)
|
||||
|
||||
def fail(self, err):
|
||||
raise err
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _initialize(self, stream_info):
|
||||
# Set sd_hash and return key_fee from stream_info
|
||||
self.sd_hash = stream_info.source_hash.decode()
|
||||
key_fee = None
|
||||
if stream_info.has_fee:
|
||||
key_fee = yield self.check_fee_and_convert(stream_info.source_fee)
|
||||
defer.returnValue(key_fee)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _create_downloader(self, sd_blob, file_name=None):
|
||||
stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob)
|
||||
factory = self.get_downloader_factory(stream_metadata.factories)
|
||||
downloader = yield self.get_downloader(factory, stream_metadata, file_name)
|
||||
defer.returnValue(downloader)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download_sd_blob(self):
|
||||
sd_blob = yield download_sd_blob(
|
||||
self.conf, self.sd_hash, self.blob_manager, self.peer_finder, self.rate_limiter, self.payment_rate_manager,
|
||||
self.wallet, self.timeout, self.conf.download_mirrors
|
||||
)
|
||||
defer.returnValue(sd_blob)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download(self, sd_blob, name, key_fee, txid, nout, file_name=None):
|
||||
self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
|
||||
yield self.pay_key_fee(key_fee, name)
|
||||
yield f2d(self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)))
|
||||
self.finished_deferred = self.downloader.start()
|
||||
self.downloader.download_manager.progress_manager.wrote_first_data.addCallback(
|
||||
self.data_downloading_deferred.callback
|
||||
)
|
||||
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start(self, stream_info, name, txid, nout, file_name=None):
|
||||
"""
|
||||
Start download
|
||||
|
||||
Returns:
|
||||
(tuple) Tuple containing (downloader, finished_deferred)
|
||||
|
||||
downloader - instance of ManagedEncryptedFileDownloader
|
||||
finished_deferred - deferred callbacked when download is finished
|
||||
"""
|
||||
self.set_status(INITIALIZING_CODE, name)
|
||||
key_fee = yield self._initialize(stream_info)
|
||||
self.set_status(DOWNLOAD_METADATA_CODE, name)
|
||||
try:
|
||||
sd_blob = yield self._download_sd_blob()
|
||||
yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
|
||||
self.set_status(DOWNLOAD_RUNNING_CODE, name)
|
||||
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
|
||||
self.data_downloading_deferred.addTimeout(self.timeout, self.reactor)
|
||||
try:
|
||||
yield self.data_downloading_deferred
|
||||
self.wrote_data = True
|
||||
except defer.TimeoutError:
|
||||
raise DownloadDataTimeout("data download timed out")
|
||||
except (DownloadDataTimeout, InvalidStreamDescriptorError) as err:
|
||||
raise err
|
||||
|
||||
defer.returnValue((self.downloader, self.finished_deferred))
|
||||
|
||||
def cancel(self, reason=None):
|
||||
if reason:
|
||||
msg = "download stream cancelled: %s" % reason
|
||||
else:
|
||||
msg = "download stream cancelled"
|
||||
if self.data_downloading_deferred and not self.data_downloading_deferred.called:
|
||||
self.data_downloading_deferred.errback(DownloadCanceledError(msg))
|
|
@ -1,77 +0,0 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
from lbrynet.blob.EncryptedFileCreator import create_lbry_file
|
||||
from lbrynet.extras.daemon.mime_types import guess_media_type
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def d2f(d):
|
||||
return d.asFuture(asyncio.get_event_loop())
|
||||
|
||||
|
||||
class Publisher:
|
||||
def __init__(self, account, blob_manager, payment_rate_manager, storage,
|
||||
lbry_file_manager, wallet, certificate):
|
||||
self.account = account
|
||||
self.blob_manager = blob_manager
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
self.storage = storage
|
||||
self.lbry_file_manager = lbry_file_manager
|
||||
self.wallet = wallet
|
||||
self.certificate = certificate
|
||||
self.lbry_file = None
|
||||
|
||||
async def create_and_publish_stream(self, name, bid, claim_dict, file_path, holding_address=None):
|
||||
"""Create lbry file and make claim"""
|
||||
log.info('Starting publish for %s', name)
|
||||
if not os.path.isfile(file_path):
|
||||
raise Exception(f"File {file_path} not found")
|
||||
if os.path.getsize(file_path) == 0:
|
||||
raise Exception(f"Cannot publish empty file {file_path}")
|
||||
|
||||
file_name = os.path.basename(file_path)
|
||||
with open(file_path, 'rb') as read_handle:
|
||||
self.lbry_file = await d2f(create_lbry_file(
|
||||
self.blob_manager, self.storage, self.payment_rate_manager, self.lbry_file_manager, file_name,
|
||||
read_handle
|
||||
))
|
||||
|
||||
if 'source' not in claim_dict['stream']:
|
||||
claim_dict['stream']['source'] = {}
|
||||
claim_dict['stream']['source']['source'] = self.lbry_file.sd_hash
|
||||
claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash'
|
||||
claim_dict['stream']['source']['contentType'] = guess_media_type(file_path)
|
||||
claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here
|
||||
tx = await self.wallet.claim_name(
|
||||
self.account, name, bid, claim_dict, self.certificate, holding_address
|
||||
)
|
||||
|
||||
# check if we have a file already for this claim (if this is a publish update with a new stream)
|
||||
old_stream_hashes = await self.storage.get_old_stream_hashes_for_claim_id(
|
||||
tx.outputs[0].claim_id, self.lbry_file.stream_hash
|
||||
)
|
||||
if old_stream_hashes:
|
||||
for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
|
||||
list(self.lbry_file_manager.lbry_files)):
|
||||
await d2f(self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False))
|
||||
log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)
|
||||
|
||||
await self.storage.save_content_claim(
|
||||
self.lbry_file.stream_hash, tx.outputs[0].id
|
||||
)
|
||||
return tx
|
||||
|
||||
async def publish_stream(self, name, bid, claim_dict, stream_hash, holding_address=None):
|
||||
"""Make a claim without creating a lbry file"""
|
||||
tx = await self.wallet.claim_name(
|
||||
self.account, name, bid, claim_dict, self.certificate, holding_address
|
||||
)
|
||||
if stream_hash: # the stream_hash returned from the db will be None if this isn't a stream we have
|
||||
await self.storage.save_content_claim(
|
||||
stream_hash, tx.outputs[0].id
|
||||
)
|
||||
self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
|
||||
return tx
|
|
@ -1,21 +0,0 @@
|
|||
class DownloadOptionChoice:
|
||||
"""A possible choice that can be picked for some option.
|
||||
|
||||
An option can have one or more choices that can be picked from.
|
||||
"""
|
||||
def __init__(self, value, short_description, long_description, bool_options_description=None):
|
||||
self.value = value
|
||||
self.short_description = short_description
|
||||
self.long_description = long_description
|
||||
self.bool_options_description = bool_options_description
|
||||
|
||||
|
||||
class DownloadOption:
|
||||
"""An option for a user to select a value from several different choices."""
|
||||
def __init__(self, option_types, long_description, short_description, default_value,
|
||||
default_value_description):
|
||||
self.option_types = option_types
|
||||
self.long_description = long_description
|
||||
self.short_description = short_description
|
||||
self.default_value = default_value
|
||||
self.default_value_description = default_value_description
|
|
@ -1,190 +0,0 @@
|
|||
import logging
|
||||
import treq
|
||||
from random import choice
|
||||
from twisted.internet import defer, task
|
||||
from twisted.internet.error import ConnectingCancelledError
|
||||
from twisted.web._newclient import ResponseNeverReceived
|
||||
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.p2p.Error import DownloadCanceledError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HTTPBlobDownloader:
|
||||
'''
|
||||
A downloader that is able to get blobs from HTTP mirrors.
|
||||
Note that when a blob gets downloaded from a mirror or from a peer, BlobManager will mark it as completed
|
||||
and cause any other type of downloader to progress to the next missing blob. Also, BlobFile is naturally able
|
||||
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
|
||||
different types of downloaders.
|
||||
'''
|
||||
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True,
|
||||
clock=None):
|
||||
if not clock:
|
||||
from twisted.internet import reactor
|
||||
self.clock = reactor
|
||||
else:
|
||||
self.clock = clock
|
||||
self.blob_manager = blob_manager
|
||||
self.servers = servers or []
|
||||
self.client = client or treq
|
||||
self.blob_hashes = blob_hashes or []
|
||||
self.missing_blob_hashes = []
|
||||
self.downloaded_blob_hashes = []
|
||||
self.sd_hashes = sd_hashes or []
|
||||
self.head_blob_hashes = []
|
||||
self.max_failures = 3
|
||||
self.semaphore = defer.DeferredSemaphore(2)
|
||||
self.deferreds = []
|
||||
self.writers = []
|
||||
self.retry = retry
|
||||
self.looping_call = task.LoopingCall(self._download_lc)
|
||||
self.looping_call.clock = self.clock
|
||||
self.finished_deferred = defer.Deferred()
|
||||
self.finished_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||
self.short_delay = 30
|
||||
self.long_delay = 600
|
||||
self.delay = self.short_delay
|
||||
self.last_missing = 10000000
|
||||
self.lc_deferred = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start(self):
|
||||
if not self.looping_call.running:
|
||||
self.lc_deferred = self.looping_call.start(self.short_delay, now=True)
|
||||
self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||
yield self.finished_deferred
|
||||
|
||||
def stop(self):
|
||||
for d in reversed(self.deferreds):
|
||||
d.cancel()
|
||||
while self.writers:
|
||||
writer = self.writers.pop()
|
||||
writer.close(DownloadCanceledError())
|
||||
self.blob_hashes = []
|
||||
if self.looping_call.running:
|
||||
self.looping_call.stop()
|
||||
if self.lc_deferred and not self.lc_deferred.called:
|
||||
self.lc_deferred.cancel()
|
||||
if not self.finished_deferred.called:
|
||||
self.finished_deferred.cancel()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download_lc(self):
|
||||
delay = yield self._download_and_get_retry_delay()
|
||||
log.debug("delay: %s, missing: %i, downloaded from mirror: %i", delay, len(self.missing_blob_hashes),
|
||||
len(self.downloaded_blob_hashes))
|
||||
while self.missing_blob_hashes:
|
||||
self.blob_hashes.append(self.missing_blob_hashes.pop())
|
||||
if not delay:
|
||||
if self.looping_call.running:
|
||||
self.looping_call.stop()
|
||||
if not self.finished_deferred.called:
|
||||
log.debug("mirror finished")
|
||||
self.finished_deferred.callback(None)
|
||||
elif delay and delay != self.delay:
|
||||
if delay == self.long_delay:
|
||||
log.debug("mirror not making progress, trying less frequently")
|
||||
elif delay == self.short_delay:
|
||||
log.debug("queueing retry of %i blobs", len(self.missing_blob_hashes))
|
||||
if self.looping_call.running:
|
||||
self.looping_call.stop()
|
||||
self.delay = delay
|
||||
self.looping_call = task.LoopingCall(self._download_lc)
|
||||
self.looping_call.clock = self.clock
|
||||
self.lc_deferred = self.looping_call.start(self.delay, now=False)
|
||||
self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||
yield self.finished_deferred
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download_and_get_retry_delay(self):
|
||||
if self.blob_hashes and self.servers:
|
||||
if self.sd_hashes:
|
||||
log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8])
|
||||
else:
|
||||
log.debug("trying to download %i blobs from mirror", len(self.blob_hashes))
|
||||
blobs = {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
|
||||
self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
|
||||
yield defer.DeferredList(self.deferreds)
|
||||
if self.retry and self.missing_blob_hashes:
|
||||
if not self.downloaded_blob_hashes:
|
||||
defer.returnValue(self.long_delay)
|
||||
if len(self.missing_blob_hashes) < self.last_missing:
|
||||
self.last_missing = len(self.missing_blob_hashes)
|
||||
defer.returnValue(self.short_delay)
|
||||
if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing:
|
||||
defer.returnValue(self.long_delay)
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download_blob(self, blob):
|
||||
for _ in range(self.max_failures):
|
||||
writer, finished_deferred = blob.open_for_writing('mirror')
|
||||
self.writers.append(writer)
|
||||
try:
|
||||
downloaded = yield self._write_blob(writer, blob)
|
||||
if downloaded:
|
||||
yield finished_deferred # yield for verification errors, so we log them
|
||||
if blob.verified:
|
||||
log.info('Mirror completed download for %s', blob.blob_hash)
|
||||
should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes
|
||||
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
||||
self.downloaded_blob_hashes.append(blob.blob_hash)
|
||||
break
|
||||
except (IOError, Exception, defer.CancelledError, ConnectingCancelledError, ResponseNeverReceived) as e:
|
||||
if isinstance(
|
||||
e, (DownloadCanceledError, defer.CancelledError, ConnectingCancelledError,
|
||||
ResponseNeverReceived)
|
||||
) or 'closed file' in str(e):
|
||||
# some other downloader finished first or it was simply cancelled
|
||||
log.info("Mirror download cancelled: %s", blob.blob_hash)
|
||||
break
|
||||
else:
|
||||
log.exception('Mirror failed downloading')
|
||||
finally:
|
||||
finished_deferred.addBoth(lambda _: None) # suppress echoed errors
|
||||
if 'mirror' in blob.writers:
|
||||
writer.close()
|
||||
self.writers.remove(writer)
|
||||
|
||||
def download_blob(self, blob):
|
||||
if not blob.verified:
|
||||
d = self.semaphore.run(self._download_blob, blob)
|
||||
d.addErrback(lambda err: err.trap(defer.TimeoutError, defer.CancelledError))
|
||||
return d
|
||||
return defer.succeed(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _write_blob(self, writer, blob):
|
||||
response = yield self.client.get(url_for('{}:{}'.format(*choice(self.servers)), blob.blob_hash))
|
||||
if response.code != 200:
|
||||
log.debug('Missing a blob: %s', blob.blob_hash)
|
||||
if blob.blob_hash in self.blob_hashes:
|
||||
self.blob_hashes.remove(blob.blob_hash)
|
||||
if blob.blob_hash not in self.missing_blob_hashes:
|
||||
self.missing_blob_hashes.append(blob.blob_hash)
|
||||
defer.returnValue(False)
|
||||
|
||||
log.debug('Download started: %s', blob.blob_hash)
|
||||
blob.set_length(response.length)
|
||||
yield self.client.collect(response, writer.write)
|
||||
defer.returnValue(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def download_stream(self, stream_hash, sd_hash):
|
||||
stream_crypt_blobs = yield f2d(self.blob_manager.storage.get_blobs_for_stream(stream_hash))
|
||||
self.blob_hashes.extend([
|
||||
b.blob_hash for b in stream_crypt_blobs
|
||||
if b.blob_hash and b.blob_hash not in self.blob_hashes
|
||||
])
|
||||
if sd_hash not in self.sd_hashes:
|
||||
self.sd_hashes.append(sd_hash)
|
||||
head_blob_hash = stream_crypt_blobs[0].blob_hash
|
||||
if head_blob_hash not in self.head_blob_hashes:
|
||||
self.head_blob_hashes.append(head_blob_hash)
|
||||
yield self.start()
|
||||
|
||||
|
||||
def url_for(server, blob_hash=''):
|
||||
return f'http://{server}/{blob_hash}'
|
|
@ -1,62 +0,0 @@
|
|||
from decimal import Decimal
|
||||
|
||||
|
||||
class Offer:
|
||||
"""A rate offer to download blobs from a host."""
|
||||
|
||||
RATE_ACCEPTED = "RATE_ACCEPTED"
|
||||
RATE_TOO_LOW = "RATE_TOO_LOW"
|
||||
RATE_UNSET = "RATE_UNSET"
|
||||
|
||||
def __init__(self, offer):
|
||||
self._state = None
|
||||
self.rate = None
|
||||
if isinstance(offer, Decimal):
|
||||
self.rate = round(offer, 5)
|
||||
elif isinstance(offer, float):
|
||||
self.rate = round(Decimal(offer), 5)
|
||||
if self.rate is None or self.rate < Decimal(0.0):
|
||||
self.unset()
|
||||
|
||||
@property
|
||||
def is_accepted(self):
|
||||
return self._state is Offer.RATE_ACCEPTED
|
||||
|
||||
@property
|
||||
def is_too_low(self):
|
||||
return self._state is Offer.RATE_TOO_LOW
|
||||
|
||||
@property
|
||||
def is_unset(self):
|
||||
return self._state is Offer.RATE_UNSET
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
if self.is_accepted:
|
||||
return Offer.RATE_ACCEPTED
|
||||
elif self.is_too_low:
|
||||
return Offer.RATE_TOO_LOW
|
||||
elif self.is_unset:
|
||||
return Offer.RATE_UNSET
|
||||
return None
|
||||
|
||||
def accept(self):
|
||||
if self.is_unset or self._state is None:
|
||||
self._state = Offer.RATE_ACCEPTED
|
||||
|
||||
def reject(self):
|
||||
if self.is_unset or self._state is None:
|
||||
self._state = Offer.RATE_TOO_LOW
|
||||
|
||||
def unset(self):
|
||||
self._state = Offer.RATE_UNSET
|
||||
|
||||
def handle(self, reply_message):
|
||||
if reply_message == Offer.RATE_TOO_LOW:
|
||||
self.reject()
|
||||
elif reply_message == Offer.RATE_ACCEPTED:
|
||||
self.accept()
|
||||
elif reply_message == Offer.RATE_UNSET:
|
||||
self.unset()
|
||||
else:
|
||||
raise Exception("Unknown offer reply %s" % str(reply_message))
|
|
@ -1,126 +0,0 @@
|
|||
from decimal import Decimal
|
||||
from lbrynet.p2p.Strategy import get_default_strategy, OnlyFreeStrategy
|
||||
|
||||
|
||||
class BasePaymentRateManager:
|
||||
def __init__(self, rate, info_rate):
|
||||
self.min_blob_data_payment_rate = rate
|
||||
self.min_blob_info_payment_rate = info_rate
|
||||
|
||||
|
||||
class PaymentRateManager:
|
||||
def __init__(self, base, rate=None):
|
||||
"""
|
||||
@param base: a BasePaymentRateManager
|
||||
|
||||
@param rate: the min blob data payment rate
|
||||
"""
|
||||
self.base = base
|
||||
self.min_blob_data_payment_rate = rate
|
||||
self.points_paid = 0.0
|
||||
|
||||
def get_rate_blob_data(self, peer):
|
||||
return self.get_effective_min_blob_data_payment_rate()
|
||||
|
||||
def accept_rate_blob_data(self, peer, payment_rate):
|
||||
return payment_rate >= self.get_effective_min_blob_data_payment_rate()
|
||||
|
||||
def get_effective_min_blob_data_payment_rate(self):
|
||||
if self.min_blob_data_payment_rate is None:
|
||||
return self.base.min_blob_data_payment_rate
|
||||
return self.min_blob_data_payment_rate
|
||||
|
||||
def record_points_paid(self, amount):
|
||||
self.points_paid += amount
|
||||
|
||||
|
||||
class NegotiatedPaymentRateManager:
|
||||
def __init__(self, base, availability_tracker, generous):
|
||||
"""
|
||||
@param base: a BasePaymentRateManager
|
||||
@param availability_tracker: a BlobAvailabilityTracker
|
||||
@param rate: the min blob data payment rate
|
||||
"""
|
||||
|
||||
self.base = base
|
||||
self.min_blob_data_payment_rate = base.min_blob_data_payment_rate
|
||||
self.points_paid = 0.0
|
||||
self.blob_tracker = availability_tracker
|
||||
self.generous = generous
|
||||
self.strategy = get_default_strategy(
|
||||
self.blob_tracker, self.base.min_blob_data_payment_rate, generous
|
||||
)
|
||||
|
||||
def get_rate_blob_data(self, peer, blobs):
|
||||
response = self.strategy.make_offer(peer, blobs)
|
||||
return response.rate
|
||||
|
||||
def accept_rate_blob_data(self, peer, blobs, offer):
|
||||
offer = self.strategy.respond_to_offer(offer, peer, blobs)
|
||||
self.strategy.update_accepted_offers(peer, offer)
|
||||
return offer.is_accepted
|
||||
|
||||
def reply_to_offer(self, peer, blobs, offer):
|
||||
reply = self.strategy.respond_to_offer(offer, peer, blobs)
|
||||
self.strategy.update_accepted_offers(peer, reply)
|
||||
return reply
|
||||
|
||||
def get_rate_for_peer(self, peer):
|
||||
return self.strategy.accepted_offers.get(peer, False)
|
||||
|
||||
def record_points_paid(self, amount):
|
||||
self.points_paid += amount
|
||||
|
||||
def record_offer_reply(self, peer, offer):
|
||||
self.strategy.update_accepted_offers(peer, offer)
|
||||
|
||||
def price_limit_reached(self, peer):
|
||||
if peer in self.strategy.pending_sent_offers:
|
||||
offer = self.strategy.pending_sent_offers[peer]
|
||||
return (offer.is_too_low and
|
||||
round(Decimal.from_float(offer.rate), 5) >= round(self.strategy.max_rate, 5))
|
||||
return False
|
||||
|
||||
|
||||
class OnlyFreePaymentsManager:
|
||||
def __init__(self, **kwargs):
|
||||
"""
|
||||
A payment rate manager that will only ever accept and offer a rate of 0.0,
|
||||
Used for testing
|
||||
"""
|
||||
|
||||
self.base = BasePaymentRateManager(0.0, 0.0)
|
||||
self.points_paid = 0.0
|
||||
self.min_blob_data_payment_rate = 0.0
|
||||
self.generous = True
|
||||
self.strategy = OnlyFreeStrategy()
|
||||
|
||||
def get_rate_blob_data(self, peer, blobs):
|
||||
response = self.strategy.make_offer(peer, blobs)
|
||||
return response.rate
|
||||
|
||||
def accept_rate_blob_data(self, peer, blobs, offer):
|
||||
offer = self.strategy.respond_to_offer(offer, peer, blobs)
|
||||
self.strategy.update_accepted_offers(peer, offer)
|
||||
return offer.is_accepted
|
||||
|
||||
def reply_to_offer(self, peer, blobs, offer):
|
||||
reply = self.strategy.respond_to_offer(offer, peer, blobs)
|
||||
self.strategy.update_accepted_offers(peer, reply)
|
||||
return reply
|
||||
|
||||
def get_rate_for_peer(self, peer):
|
||||
return self.strategy.accepted_offers.get(peer, False)
|
||||
|
||||
def record_points_paid(self, amount):
|
||||
self.points_paid += amount
|
||||
|
||||
def record_offer_reply(self, peer, offer):
|
||||
self.strategy.update_accepted_offers(peer, offer)
|
||||
|
||||
def price_limit_reached(self, peer):
|
||||
if peer in self.strategy.pending_sent_offers:
|
||||
offer = self.strategy.pending_sent_offers[peer]
|
||||
if offer.rate > 0.0:
|
||||
return True
|
||||
return False
|
|
@ -1,468 +0,0 @@
|
|||
import string
|
||||
import json
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from binascii import unhexlify
|
||||
from twisted.internet import threads, defer
|
||||
|
||||
from lbrynet.extras.compat import f2d
|
||||
from lbrynet.cryptoutils import get_lbry_hash_obj
|
||||
from lbrynet.p2p.client.StandaloneBlobDownloader import StandaloneBlobDownloader
|
||||
from lbrynet.p2p.Error import UnknownStreamTypeError, InvalidStreamDescriptorError
|
||||
from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JSONBytesEncoder(json.JSONEncoder):
|
||||
def default(self, obj): # pylint: disable=E0202
|
||||
if isinstance(obj, bytes):
|
||||
return obj.decode()
|
||||
return super().default(obj)
|
||||
|
||||
|
||||
class StreamDescriptorReader:
|
||||
"""Classes which derive from this class read a stream descriptor file return
|
||||
a dictionary containing the fields in the file"""
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def _get_raw_data(self):
|
||||
"""This method must be overridden by subclasses. It should return a deferred
|
||||
which fires with the raw data in the stream descriptor"""
|
||||
|
||||
def get_info(self):
|
||||
"""Return the fields contained in the file"""
|
||||
d = self._get_raw_data()
|
||||
d.addCallback(json.loads)
|
||||
return d
|
||||
|
||||
|
||||
class PlainStreamDescriptorReader(StreamDescriptorReader):
|
||||
"""Read a stream descriptor file which is not a blob but a regular file"""
|
||||
def __init__(self, stream_descriptor_filename):
|
||||
super().__init__()
|
||||
self.stream_descriptor_filename = stream_descriptor_filename
|
||||
|
||||
def _get_raw_data(self):
|
||||
|
||||
def get_data():
|
||||
with open(self.stream_descriptor_filename) as file_handle:
|
||||
raw_data = file_handle.read()
|
||||
return raw_data
|
||||
|
||||
return threads.deferToThread(get_data)
|
||||
|
||||
|
||||
class BlobStreamDescriptorReader(StreamDescriptorReader):
|
||||
"""Read a stream descriptor file which is a blob"""
|
||||
def __init__(self, blob):
|
||||
super().__init__()
|
||||
self.blob = blob
|
||||
|
||||
def _get_raw_data(self):
|
||||
|
||||
def get_data():
|
||||
f = self.blob.open_for_reading()
|
||||
if f is not None:
|
||||
raw_data = f.read()
|
||||
f.close()
|
||||
return raw_data
|
||||
else:
|
||||
raise ValueError("Could not open the blob for reading")
|
||||
|
||||
return threads.deferToThread(get_data)
|
||||
|
||||
|
||||
class StreamDescriptorWriter:
|
||||
"""Classes which derive from this class write fields from a dictionary
|
||||
of fields to a stream descriptor"""
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def create_descriptor(self, sd_info):
|
||||
return self._write_stream_descriptor(
|
||||
json.dumps(sd_info, sort_keys=True).encode()
|
||||
)
|
||||
|
||||
def _write_stream_descriptor(self, raw_data):
|
||||
"""This method must be overridden by subclasses to write raw data to
|
||||
the stream descriptor
|
||||
"""
|
||||
|
||||
|
||||
class PlainStreamDescriptorWriter(StreamDescriptorWriter):
|
||||
def __init__(self, sd_file_name):
|
||||
super().__init__()
|
||||
self.sd_file_name = sd_file_name
|
||||
|
||||
def _write_stream_descriptor(self, raw_data):
|
||||
|
||||
def write_file():
|
||||
log.info("Writing the sd file to disk")
|
||||
with open(self.sd_file_name, 'w') as sd_file:
|
||||
sd_file.write(raw_data)
|
||||
return self.sd_file_name
|
||||
|
||||
return threads.deferToThread(write_file)
|
||||
|
||||
|
||||
class BlobStreamDescriptorWriter(StreamDescriptorWriter):
|
||||
def __init__(self, blob_manager):
|
||||
super().__init__()
|
||||
self.blob_manager = blob_manager
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _write_stream_descriptor(self, raw_data):
|
||||
log.debug("Creating the new blob for the stream descriptor")
|
||||
blob_creator = self.blob_manager.get_blob_creator()
|
||||
blob_creator.write(raw_data)
|
||||
log.debug("Wrote the data to the new blob")
|
||||
sd_hash = yield blob_creator.close()
|
||||
yield self.blob_manager.creator_finished(blob_creator, should_announce=True)
|
||||
defer.returnValue(sd_hash)
|
||||
|
||||
|
||||
class StreamMetadata:
|
||||
FROM_BLOB = 1
|
||||
FROM_PLAIN = 2
|
||||
|
||||
def __init__(self, validator, options, factories):
|
||||
self.validator = validator
|
||||
self.options = options
|
||||
self.factories = factories
|
||||
self.metadata_source = None
|
||||
self.source_blob_hash = None
|
||||
self.source_file = None
|
||||
|
||||
|
||||
class StreamDescriptorIdentifier:
|
||||
"""Tries to determine the type of stream described by the stream descriptor using the
|
||||
'stream_type' field. Keeps a list of StreamDescriptorValidators and StreamDownloaderFactorys
|
||||
and returns the appropriate ones based on the type of the stream descriptor given
|
||||
"""
|
||||
def __init__(self):
|
||||
# {stream_type: IStreamDescriptorValidator}
|
||||
self._sd_info_validators = {}
|
||||
# {stream_type: IStreamOptions
|
||||
self._stream_options = {}
|
||||
# {stream_type: [IStreamDownloaderFactory]}
|
||||
self._stream_downloader_factories = defaultdict(list)
|
||||
|
||||
def add_stream_type(self, stream_type, sd_info_validator, stream_options):
|
||||
"""This is how the StreamDescriptorIdentifier learns about new types of stream descriptors.
|
||||
|
||||
There can only be one StreamDescriptorValidator for each type of stream.
|
||||
|
||||
@param stream_type: A string representing the type of stream
|
||||
descriptor. This must be unique to this stream descriptor.
|
||||
|
||||
@param sd_info_validator: A class implementing the
|
||||
IStreamDescriptorValidator interface. This class's
|
||||
constructor will be passed the raw metadata in the stream
|
||||
descriptor file and its 'validate' method will then be
|
||||
called. If the validation step fails, an exception will be
|
||||
thrown, preventing the stream descriptor from being
|
||||
further processed.
|
||||
|
||||
@param stream_options: A class implementing the IStreamOptions
|
||||
interface. This class's constructor will be passed the
|
||||
sd_info_validator object containing the raw metadata from
|
||||
the stream descriptor file.
|
||||
|
||||
@return: None
|
||||
|
||||
"""
|
||||
self._sd_info_validators[stream_type] = sd_info_validator
|
||||
self._stream_options[stream_type] = stream_options
|
||||
|
||||
def add_stream_downloader_factory(self, stream_type, factory):
|
||||
"""Register a stream downloader factory with the StreamDescriptorIdentifier.
|
||||
|
||||
This is how the StreamDescriptorIdentifier determines what
|
||||
factories may be used to process different stream descriptor
|
||||
files. There must be at least one factory for each type of
|
||||
stream added via "add_stream_info_validator".
|
||||
|
||||
@param stream_type: A string representing the type of stream
|
||||
descriptor which the factory knows how to process.
|
||||
|
||||
@param factory: An object implementing the IStreamDownloaderFactory interface.
|
||||
|
||||
@return: None
|
||||
|
||||
"""
|
||||
self._stream_downloader_factories[stream_type].append(factory)
|
||||
|
||||
def _return_metadata(self, options_validator_factories, source_type, source):
|
||||
validator, options, factories = options_validator_factories
|
||||
m = StreamMetadata(validator, options, factories)
|
||||
m.metadata_source = source_type
|
||||
if source_type == StreamMetadata.FROM_BLOB:
|
||||
m.source_blob_hash = source
|
||||
if source_type == StreamMetadata.FROM_PLAIN:
|
||||
m.source_file = source
|
||||
return m
|
||||
|
||||
def get_metadata_for_sd_file(self, sd_path):
|
||||
sd_reader = PlainStreamDescriptorReader(sd_path)
|
||||
d = sd_reader.get_info()
|
||||
d.addCallback(self._return_options_and_validator_and_factories)
|
||||
d.addCallback(self._return_metadata, StreamMetadata.FROM_PLAIN, sd_path)
|
||||
return d
|
||||
|
||||
def get_metadata_for_sd_blob(self, sd_blob):
|
||||
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
||||
d = sd_reader.get_info()
|
||||
d.addCallback(self._return_options_and_validator_and_factories)
|
||||
d.addCallback(self._return_metadata, StreamMetadata.FROM_BLOB, sd_blob.blob_hash)
|
||||
return d
|
||||
|
||||
def _get_factories(self, stream_type):
|
||||
if not stream_type in self._stream_downloader_factories:
|
||||
raise UnknownStreamTypeError(stream_type)
|
||||
return self._stream_downloader_factories[stream_type]
|
||||
|
||||
def _get_validator(self, stream_type):
|
||||
if not stream_type in self._sd_info_validators:
|
||||
raise UnknownStreamTypeError(stream_type)
|
||||
return self._sd_info_validators[stream_type]
|
||||
|
||||
def _get_options(self, stream_type):
|
||||
if not stream_type in self._stream_downloader_factories:
|
||||
raise UnknownStreamTypeError(stream_type)
|
||||
return self._stream_options[stream_type]
|
||||
|
||||
def _return_options_and_validator_and_factories(self, sd_info):
|
||||
if not 'stream_type' in sd_info:
|
||||
raise InvalidStreamDescriptorError('No stream_type parameter in stream descriptor.')
|
||||
stream_type = sd_info['stream_type']
|
||||
validator = self._get_validator(stream_type)(sd_info)
|
||||
factories = [f for f in self._get_factories(stream_type) if f.can_download(validator)]
|
||||
|
||||
d = validator.validate()
|
||||
|
||||
def get_options():
|
||||
options = self._get_options(stream_type)
|
||||
return validator, options, factories
|
||||
|
||||
d.addCallback(lambda _: get_options())
|
||||
return d
|
||||
|
||||
|
||||
EncryptedFileStreamType = "lbryfile"
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def save_sd_info(blob_manager, sd_hash, sd_info):
|
||||
if not blob_manager.blobs.get(sd_hash) or not blob_manager.blobs[sd_hash].get_is_verified():
|
||||
descriptor_writer = BlobStreamDescriptorWriter(blob_manager)
|
||||
calculated_sd_hash = yield descriptor_writer.create_descriptor(sd_info)
|
||||
if calculated_sd_hash != sd_hash:
|
||||
raise InvalidStreamDescriptorError("%s does not match calculated %s" %
|
||||
(sd_hash, calculated_sd_hash))
|
||||
stream_hash = yield f2d(blob_manager.storage.get_stream_hash_for_sd_hash(sd_hash))
|
||||
if not stream_hash:
|
||||
log.debug("Saving info for %s", unhexlify(sd_info['stream_name']))
|
||||
stream_name = sd_info['stream_name']
|
||||
key = sd_info['key']
|
||||
stream_hash = sd_info['stream_hash']
|
||||
stream_blobs = sd_info['blobs']
|
||||
suggested_file_name = sd_info['suggested_file_name']
|
||||
yield f2d(blob_manager.storage.add_known_blobs(stream_blobs))
|
||||
yield f2d(blob_manager.storage.store_stream(
|
||||
stream_hash, sd_hash, stream_name, key, suggested_file_name, stream_blobs
|
||||
))
|
||||
defer.returnValue(stream_hash)
|
||||
|
||||
|
||||
def format_blobs(crypt_blob_infos):
|
||||
formatted_blobs = []
|
||||
for blob_info in crypt_blob_infos:
|
||||
blob = {}
|
||||
if blob_info.length != 0:
|
||||
blob['blob_hash'] = blob_info.blob_hash
|
||||
blob['blob_num'] = blob_info.blob_num
|
||||
blob['iv'] = blob_info.iv
|
||||
blob['length'] = blob_info.length
|
||||
formatted_blobs.append(blob)
|
||||
return formatted_blobs
|
||||
|
||||
|
||||
def format_sd_info(stream_type, stream_name, key, suggested_file_name, stream_hash, blobs):
|
||||
return {
|
||||
"stream_type": stream_type,
|
||||
"stream_name": stream_name,
|
||||
"key": key,
|
||||
"suggested_file_name": suggested_file_name,
|
||||
"stream_hash": stream_hash,
|
||||
"blobs": blobs
|
||||
}
|
||||
|
||||
|
||||
async def get_sd_info(storage, stream_hash, include_blobs):
|
||||
"""
|
||||
Get an sd info dictionary from storage
|
||||
|
||||
:param storage: (SQLiteStorage) storage instance
|
||||
:param stream_hash: (str) stream hash
|
||||
:param include_blobs: (bool) include stream blob infos
|
||||
|
||||
:return: {
|
||||
"stream_type": "lbryfile",
|
||||
"stream_name": <hex encoded stream name>,
|
||||
"key": <stream key>,
|
||||
"suggested_file_name": <hex encoded suggested file name>,
|
||||
"stream_hash": <stream hash>,
|
||||
"blobs": [
|
||||
{
|
||||
"blob_hash": <head blob_hash>,
|
||||
"blob_num": 0,
|
||||
"iv": <iv>,
|
||||
"length": <head blob length>
|
||||
}, ...
|
||||
{
|
||||
"blob_num": <stream length>,
|
||||
"iv": <iv>,
|
||||
"length": 0
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
stream_info = await storage.get_stream_info(stream_hash)
|
||||
blobs = []
|
||||
if include_blobs:
|
||||
blobs = await storage.get_blobs_for_stream(stream_hash)
|
||||
return format_sd_info(
|
||||
EncryptedFileStreamType, stream_info[0], stream_info[1],
|
||||
stream_info[2], stream_hash, format_blobs(blobs)
|
||||
)
|
||||
|
||||
|
||||
def get_blob_hashsum(b):
|
||||
length = b['length']
|
||||
if length != 0:
|
||||
blob_hash = b['blob_hash']
|
||||
else:
|
||||
blob_hash = None
|
||||
blob_num = b['blob_num']
|
||||
iv = b['iv']
|
||||
blob_hashsum = get_lbry_hash_obj()
|
||||
if length != 0:
|
||||
blob_hashsum.update(blob_hash.encode())
|
||||
blob_hashsum.update(str(blob_num).encode())
|
||||
blob_hashsum.update(iv.encode())
|
||||
blob_hashsum.update(str(length).encode())
|
||||
return blob_hashsum.digest()
|
||||
|
||||
|
||||
def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos):
|
||||
h = get_lbry_hash_obj()
|
||||
h.update(hex_stream_name.encode())
|
||||
h.update(key.encode())
|
||||
h.update(hex_suggested_file_name.encode())
|
||||
blobs_hashsum = get_lbry_hash_obj()
|
||||
for blob in blob_infos:
|
||||
blobs_hashsum.update(get_blob_hashsum(blob))
|
||||
h.update(blobs_hashsum.digest())
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def verify_hex(text, field_name):
|
||||
if not set(text).issubset(set(string.hexdigits)):
|
||||
raise InvalidStreamDescriptorError("%s is not a hex-encoded string" % field_name)
|
||||
|
||||
|
||||
def validate_descriptor(stream_info):
|
||||
try:
|
||||
hex_stream_name = stream_info['stream_name']
|
||||
key = stream_info['key']
|
||||
hex_suggested_file_name = stream_info['suggested_file_name']
|
||||
stream_hash = stream_info['stream_hash']
|
||||
blobs = stream_info['blobs']
|
||||
except KeyError as e:
|
||||
raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0]))
|
||||
if stream_info['blobs'][-1]['length'] != 0:
|
||||
raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
|
||||
if any([blob_info['length'] == 0 for blob_info in stream_info['blobs'][:-1]]):
|
||||
raise InvalidStreamDescriptorError("Contains zero-length data blob")
|
||||
if 'blob_hash' in stream_info['blobs'][-1]:
|
||||
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
|
||||
|
||||
verify_hex(key, "key")
|
||||
verify_hex(hex_suggested_file_name, "suggested file name")
|
||||
verify_hex(stream_hash, "stream_hash")
|
||||
|
||||
calculated_stream_hash = get_stream_hash(
|
||||
hex_stream_name, key, hex_suggested_file_name, blobs
|
||||
)
|
||||
if calculated_stream_hash != stream_hash:
|
||||
raise InvalidStreamDescriptorError("Stream hash does not match stream metadata")
|
||||
return True
|
||||
|
||||
|
||||
class EncryptedFileStreamDescriptorValidator:
|
||||
def __init__(self, raw_info):
|
||||
self.raw_info = raw_info
|
||||
|
||||
def validate(self):
|
||||
return defer.succeed(validate_descriptor(self.raw_info))
|
||||
|
||||
def info_to_show(self):
|
||||
info = []
|
||||
info.append(("stream_name", unhexlify(self.raw_info.get("stream_name"))))
|
||||
size_so_far = 0
|
||||
for blob_info in self.raw_info.get("blobs", []):
|
||||
size_so_far += int(blob_info['length'])
|
||||
info.append(("stream_size", str(self.get_length_of_stream())))
|
||||
suggested_file_name = self.raw_info.get("suggested_file_name", None)
|
||||
if suggested_file_name is not None:
|
||||
suggested_file_name = unhexlify(suggested_file_name)
|
||||
info.append(("suggested_file_name", suggested_file_name))
|
||||
return info
|
||||
|
||||
def get_length_of_stream(self):
|
||||
size_so_far = 0
|
||||
for blob_info in self.raw_info.get("blobs", []):
|
||||
size_so_far += int(blob_info['length'])
|
||||
return size_so_far
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def download_sd_blob(conf, blob_hash, blob_manager, peer_finder, rate_limiter, payment_rate_manager, wallet,
|
||||
timeout=None, download_mirrors=None):
|
||||
"""
|
||||
Downloads a single blob from the network
|
||||
|
||||
@param session:
|
||||
|
||||
@param blob_hash:
|
||||
|
||||
@param payment_rate_manager:
|
||||
|
||||
@return: An object of type HashBlob
|
||||
"""
|
||||
|
||||
downloader = StandaloneBlobDownloader(conf,
|
||||
blob_hash,
|
||||
blob_manager,
|
||||
peer_finder,
|
||||
rate_limiter,
|
||||
payment_rate_manager,
|
||||
wallet,
|
||||
timeout)
|
||||
mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash], retry=False)
|
||||
mirror.start()
|
||||
sd_blob = yield downloader.download()
|
||||
mirror.stop()
|
||||
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
||||
sd_info = yield sd_reader.get_info()
|
||||
try:
|
||||
validate_descriptor(sd_info)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
yield blob_manager.delete_blobs([blob_hash])
|
||||
raise err
|
||||
raw_sd = yield sd_reader._get_raw_data()
|
||||
yield f2d(blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)))
|
||||
yield save_sd_info(blob_manager, sd_blob.blob_hash, sd_info)
|
||||
defer.returnValue(sd_blob)
|
|
@ -1,7 +0,0 @@
|
|||
"""
|
||||
Classes and functions which can be used by any application wishing to make use of the LBRY network.
|
||||
|
||||
This includes classes for connecting to other peers and downloading blobs from them, listening for
|
||||
connections from peers and responding to their requests, managing locally stored blobs, sending
|
||||
and receiving payments, and locating peers in the DHT.
|
||||
"""
|
|
@ -1,227 +0,0 @@
|
|||
import random
|
||||
import logging
|
||||
from twisted.internet import defer, reactor
|
||||
from lbrynet import utils
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.p2p.client.ClientProtocol import ClientProtocolFactory
|
||||
from lbrynet.p2p.Error import InsufficientFundsError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PeerConnectionHandler:
|
||||
def __init__(self, request_creators, factory):
|
||||
self.request_creators = request_creators
|
||||
self.factory = factory
|
||||
self.connection = None
|
||||
|
||||
|
||||
class ConnectionManager:
|
||||
#implements(interfaces.IConnectionManager)
|
||||
MANAGE_CALL_INTERVAL_SEC = 5
|
||||
TCP_CONNECT_TIMEOUT = 15
|
||||
|
||||
def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators):
|
||||
|
||||
self.conf: Config = downloader.conf
|
||||
self.seek_head_blob_first = self.conf.seek_head_blob_first
|
||||
self.max_connections_per_stream = self.conf.max_connections_per_stream
|
||||
|
||||
self.downloader = downloader
|
||||
self.rate_limiter = rate_limiter
|
||||
self._primary_request_creators = primary_request_creators
|
||||
self._secondary_request_creators = secondary_request_creators
|
||||
self._peer_connections = {} # {Peer: PeerConnectionHandler}
|
||||
self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)}
|
||||
self._next_manage_call = None
|
||||
# a deferred that gets fired when a _manage call is set
|
||||
self._manage_deferred = None
|
||||
self.stopped = True
|
||||
log.debug("%s initialized", self._get_log_name())
|
||||
|
||||
# this identifies what the connection manager is for,
|
||||
# used for logging purposes only
|
||||
def _get_log_name(self):
|
||||
out = 'Connection Manager Unknown'
|
||||
if hasattr(self.downloader, 'stream_name'):
|
||||
out = 'Connection Manager '+self.downloader.stream_name
|
||||
elif hasattr(self.downloader, 'blob_hash'):
|
||||
out = 'Connection Manager '+self.downloader.blob_hash
|
||||
return out
|
||||
|
||||
def _start(self):
|
||||
self.stopped = False
|
||||
if self._next_manage_call is not None and self._next_manage_call.active() is True:
|
||||
self._next_manage_call.cancel()
|
||||
|
||||
def start(self):
|
||||
log.debug("%s starting", self._get_log_name())
|
||||
self._start()
|
||||
self._next_manage_call = utils.call_later(0, self.manage)
|
||||
return defer.succeed(True)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop(self):
|
||||
log.debug("%s stopping", self._get_log_name())
|
||||
self.stopped = True
|
||||
# wait for the current manage call to finish
|
||||
if self._manage_deferred:
|
||||
yield self._manage_deferred
|
||||
# in case we stopped between manage calls, cancel the next one
|
||||
if self._next_manage_call and self._next_manage_call.active():
|
||||
self._next_manage_call.cancel()
|
||||
self._next_manage_call = None
|
||||
yield self._close_peers()
|
||||
|
||||
def num_peer_connections(self):
|
||||
return len(self._peer_connections)
|
||||
|
||||
def _close_peers(self):
|
||||
def disconnect_peer(p):
|
||||
d = defer.Deferred()
|
||||
self._connections_closing[p] = d
|
||||
self._peer_connections[p].connection.disconnect()
|
||||
if p in self._peer_connections:
|
||||
del self._peer_connections[p]
|
||||
return d
|
||||
|
||||
def close_connection(p):
|
||||
log.debug("%s Abruptly closing a connection to %s due to downloading being paused",
|
||||
self._get_log_name(), p)
|
||||
if self._peer_connections[p].factory.p is not None:
|
||||
d = self._peer_connections[p].factory.p.cancel_requests()
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
d.addBoth(lambda _: disconnect_peer(p))
|
||||
return d
|
||||
|
||||
# fixme: stop modifying dict during iteration
|
||||
closing_deferreds = [close_connection(peer) for peer in list(self._peer_connections)]
|
||||
return defer.DeferredList(closing_deferreds)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_next_request(self, peer, protocol):
|
||||
log.debug("%s Trying to get the next request for peer %s", self._get_log_name(), peer)
|
||||
if not peer in self._peer_connections or self.stopped is True:
|
||||
log.debug("%s The peer %s has already been told to shut down.",
|
||||
self._get_log_name(), peer)
|
||||
defer.returnValue(False)
|
||||
requests = yield self._send_primary_requests(peer, protocol)
|
||||
have_request = any(r[1] for r in requests if r[0] is True)
|
||||
if have_request:
|
||||
yield self._send_secondary_requests(peer, protocol)
|
||||
defer.returnValue(have_request)
|
||||
|
||||
def _send_primary_requests(self, peer, protocol):
|
||||
def handle_error(err):
|
||||
err.trap(InsufficientFundsError)
|
||||
self.downloader.insufficient_funds(err)
|
||||
return False
|
||||
|
||||
def check_if_request_sent(request_sent, request_creator):
|
||||
if peer not in self._peer_connections:
|
||||
# This can happen if the connection is told to close
|
||||
return False
|
||||
if request_sent is False:
|
||||
if request_creator in self._peer_connections[peer].request_creators:
|
||||
self._peer_connections[peer].request_creators.remove(request_creator)
|
||||
else:
|
||||
if not request_creator in self._peer_connections[peer].request_creators:
|
||||
self._peer_connections[peer].request_creators.append(request_creator)
|
||||
return request_sent
|
||||
|
||||
ds = []
|
||||
for p_r_c in self._primary_request_creators:
|
||||
d = p_r_c.send_next_request(peer, protocol)
|
||||
d.addErrback(handle_error)
|
||||
d.addCallback(check_if_request_sent, p_r_c)
|
||||
ds.append(d)
|
||||
return defer.DeferredList(ds, fireOnOneErrback=True)
|
||||
|
||||
def _send_secondary_requests(self, peer, protocol):
|
||||
ds = [
|
||||
s_r_c.send_next_request(peer, protocol)
|
||||
for s_r_c in self._secondary_request_creators
|
||||
]
|
||||
return defer.DeferredList(ds)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def manage(self, schedule_next_call=True):
|
||||
self._manage_deferred = defer.Deferred()
|
||||
if len(self._peer_connections) < self.max_connections_per_stream:
|
||||
log.debug("%s have %d connections, looking for %d",
|
||||
self._get_log_name(), len(self._peer_connections),
|
||||
self.max_connections_per_stream)
|
||||
peers = yield self._get_new_peers()
|
||||
for peer in peers:
|
||||
self._connect_to_peer(peer)
|
||||
self._manage_deferred.callback(None)
|
||||
self._manage_deferred = None
|
||||
if not self.stopped and schedule_next_call:
|
||||
self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage)
|
||||
|
||||
def return_shuffled_peers_not_connected_to(self, peers, new_conns_needed):
|
||||
out = [peer for peer in peers if peer not in self._peer_connections]
|
||||
random.shuffle(out)
|
||||
return out[0:new_conns_needed]
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_peers(self):
|
||||
new_conns_needed = self.max_connections_per_stream - len(self._peer_connections)
|
||||
if new_conns_needed < 1:
|
||||
defer.returnValue([])
|
||||
# we always get the peer from the first request creator
|
||||
# must be a type BlobRequester...
|
||||
request_creator = self._primary_request_creators[0]
|
||||
log.debug("%s Trying to get a new peer to connect to", self._get_log_name())
|
||||
|
||||
# find peers for the head blob if configured to do so
|
||||
if self.seek_head_blob_first:
|
||||
try:
|
||||
peers = yield request_creator.get_new_peers_for_head_blob()
|
||||
peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
|
||||
except KeyError:
|
||||
log.warning("%s does not have a head blob", self._get_log_name())
|
||||
peers = []
|
||||
else:
|
||||
peers = []
|
||||
|
||||
# we didn't find any new peers on the head blob,
|
||||
# we have to look for the first unavailable blob
|
||||
if not peers:
|
||||
peers = yield request_creator.get_new_peers_for_next_unavailable()
|
||||
peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
|
||||
|
||||
log.debug("%s Got a list of peers to choose from: %s",
|
||||
self._get_log_name(), peers)
|
||||
log.debug("%s Current connections: %s",
|
||||
self._get_log_name(), self._peer_connections.keys())
|
||||
log.debug("%s List of connection states: %s", self._get_log_name(),
|
||||
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
|
||||
defer.returnValue(peers)
|
||||
|
||||
def _connect_to_peer(self, peer):
|
||||
if self.stopped:
|
||||
return
|
||||
|
||||
log.debug("%s Trying to connect to %s", self._get_log_name(), peer)
|
||||
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
||||
factory.connection_was_made_deferred.addCallback(
|
||||
lambda c_was_made: self._peer_disconnected(c_was_made, peer))
|
||||
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
||||
factory)
|
||||
connection = reactor.connectTCP(peer.host, peer.port, factory,
|
||||
timeout=self.TCP_CONNECT_TIMEOUT)
|
||||
self._peer_connections[peer].connection = connection
|
||||
|
||||
def _peer_disconnected(self, connection_was_made, peer):
|
||||
log.debug("%s protocol disconnected for %s",
|
||||
self._get_log_name(), peer)
|
||||
if peer in self._peer_connections:
|
||||
del self._peer_connections[peer]
|
||||
if peer in self._connections_closing:
|
||||
d = self._connections_closing[peer]
|
||||
del self._connections_closing[peer]
|
||||
d.callback(True)
|
||||
return connection_was_made
|
|
@ -1,83 +0,0 @@
|
|||
import logging
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloadManager:
|
||||
#implements(interfaces.IDownloadManager)
|
||||
|
||||
def __init__(self, blob_manager):
|
||||
self.blob_manager = blob_manager
|
||||
self.blob_info_finder = None
|
||||
self.progress_manager = None
|
||||
self.blob_handler = None
|
||||
self.connection_manager = None
|
||||
self.blobs = {}
|
||||
self.blob_infos = {}
|
||||
|
||||
######### IDownloadManager #########
|
||||
|
||||
def start_downloading(self):
|
||||
d = self.blob_info_finder.get_initial_blobs()
|
||||
log.debug("Requested the initial blobs from the info finder")
|
||||
d.addCallback(self.add_blobs_to_download)
|
||||
d.addCallback(lambda _: self.resume_downloading())
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def resume_downloading(self):
|
||||
yield self.connection_manager.start()
|
||||
yield self.progress_manager.start()
|
||||
return True
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop_downloading(self):
|
||||
yield self.progress_manager.stop()
|
||||
yield self.connection_manager.stop()
|
||||
defer.returnValue(True)
|
||||
|
||||
def add_blobs_to_download(self, blob_infos):
|
||||
log.debug("Adding %s blobs to blobs", len(blob_infos))
|
||||
for blob_info in blob_infos:
|
||||
if not blob_info.blob_num in self.blobs:
|
||||
self.blob_infos[blob_info.blob_num] = blob_info
|
||||
log.debug("Trying to get the blob associated with blob hash %s", blob_info.blob_hash)
|
||||
blob = self.blob_manager.get_blob(blob_info.blob_hash, blob_info.length)
|
||||
self.blobs[blob_info.blob_num] = blob
|
||||
log.debug("Added blob (hash: %s, number %s) to the list", blob.blob_hash, blob_info.blob_num)
|
||||
|
||||
def stream_position(self):
|
||||
return self.progress_manager.stream_position()
|
||||
|
||||
def needed_blobs(self):
|
||||
return self.progress_manager.needed_blobs()
|
||||
|
||||
def final_blob_num(self):
|
||||
return self.blob_info_finder.final_blob_num()
|
||||
|
||||
def handle_blob(self, blob_num):
|
||||
return self.blob_handler.handle_blob(self.blobs[blob_num], self.blob_infos[blob_num])
|
||||
|
||||
def calculate_total_bytes(self):
|
||||
return sum([bi.length for bi in self.blob_infos.values()])
|
||||
|
||||
def calculate_bytes_left_to_output(self):
|
||||
if not self.blobs:
|
||||
return self.calculate_total_bytes()
|
||||
else:
|
||||
to_be_outputted = [
|
||||
b for n, b in self.blobs.items()
|
||||
if n >= self.progress_manager.last_blob_outputted
|
||||
]
|
||||
return sum([b.length for b in to_be_outputted if b.length is not None])
|
||||
|
||||
def calculate_bytes_left_to_download(self):
|
||||
if not self.blobs:
|
||||
return self.calculate_total_bytes()
|
||||
else:
|
||||
return sum([b.length for b in self.needed_blobs() if b.length is not None])
|
||||
|
||||
def get_head_blob_hash(self):
|
||||
return self.blobs[0].blob_hash
|
|
@ -1,141 +0,0 @@
|
|||
import logging
|
||||
from twisted.internet import defer, task
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FullStreamProgressManager:
|
||||
def __init__(self, finished_callback, blob_manager, download_manager,
|
||||
delete_blob_after_finished: bool = False, reactor: task.Clock = None):
|
||||
if not reactor:
|
||||
from twisted.internet import reactor
|
||||
self.reactor = reactor
|
||||
self.finished_callback = finished_callback
|
||||
self.blob_manager = blob_manager
|
||||
self.delete_blob_after_finished = delete_blob_after_finished
|
||||
self.download_manager = download_manager
|
||||
self.provided_blob_nums = []
|
||||
self.last_blob_outputted = -1
|
||||
self.stopped = True
|
||||
self._next_try_to_output_call = None
|
||||
self.outputting_d = None
|
||||
self.wrote_first_data = defer.Deferred()
|
||||
|
||||
def start(self):
|
||||
self.stopped = False
|
||||
self._next_try_to_output_call = self.reactor.callLater(0, self._try_to_output)
|
||||
return defer.succeed(True)
|
||||
|
||||
def stop(self):
|
||||
self.stopped = True
|
||||
if self._next_try_to_output_call is not None and self._next_try_to_output_call.active():
|
||||
self._next_try_to_output_call.cancel()
|
||||
self._next_try_to_output_call = None
|
||||
return self._stop_outputting()
|
||||
|
||||
# def blob_downloaded(self, blob, blob_num):
|
||||
# if self.outputting_d is None:
|
||||
# self._output_loop()
|
||||
|
||||
def stream_position(self):
|
||||
blobs = self.download_manager.blobs
|
||||
if not blobs:
|
||||
return 0
|
||||
else:
|
||||
for i in range(max(blobs.keys())):
|
||||
if self._done(i, blobs):
|
||||
return i
|
||||
return max(blobs.keys()) + 1
|
||||
|
||||
def needed_blobs(self):
|
||||
blobs = self.download_manager.blobs
|
||||
return [
|
||||
b for n, b in blobs.items()
|
||||
if not b.get_is_verified() and not n in self.provided_blob_nums
|
||||
]
|
||||
|
||||
def _finished_outputting(self):
|
||||
self.finished_callback(True)
|
||||
|
||||
def _try_to_output(self):
|
||||
self._next_try_to_output_call = self.reactor.callLater(1, self._try_to_output)
|
||||
if self.outputting_d is None:
|
||||
self._output_loop()
|
||||
|
||||
def _stop_outputting(self):
|
||||
if self.outputting_d is not None:
|
||||
return self.outputting_d
|
||||
return defer.succeed(None)
|
||||
|
||||
def _finished_with_blob(self, blob_num: int) -> None:
|
||||
if blob_num == 0 and not self.wrote_first_data.called:
|
||||
self.wrote_first_data.callback(True)
|
||||
log.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
|
||||
if self.delete_blob_after_finished is True:
|
||||
log.debug("delete_blob_after_finished is True")
|
||||
blobs = self.download_manager.blobs
|
||||
if blob_num in blobs:
|
||||
log.debug("Telling the blob manager, %s, to delete blob %s",
|
||||
self.blob_manager, blobs[blob_num].blob_hash)
|
||||
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
|
||||
else:
|
||||
log.debug("Blob number %s was not in blobs", str(blob_num))
|
||||
else:
|
||||
log.debug("delete_blob_after_finished is False")
|
||||
|
||||
def _done(self, i: int, blobs: list) -> bool:
|
||||
"""Return true if `i` is a blob number we don't have"""
|
||||
return (
|
||||
i not in blobs or
|
||||
(
|
||||
not blobs[i].get_is_verified() and
|
||||
i not in self.provided_blob_nums
|
||||
)
|
||||
)
|
||||
|
||||
def _output_loop(self):
|
||||
if self.stopped:
|
||||
if self.outputting_d is not None:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
return
|
||||
|
||||
if self.outputting_d is None:
|
||||
self.outputting_d = defer.Deferred()
|
||||
blobs = self.download_manager.blobs
|
||||
|
||||
def finished_outputting_blob():
|
||||
self.last_blob_outputted += 1
|
||||
|
||||
def check_if_finished():
|
||||
final_blob_num = self.download_manager.final_blob_num()
|
||||
if final_blob_num is not None and final_blob_num == self.last_blob_outputted:
|
||||
self._finished_outputting()
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
else:
|
||||
self.reactor.callLater(0, self._output_loop)
|
||||
|
||||
current_blob_num = self.last_blob_outputted + 1
|
||||
|
||||
if current_blob_num in blobs and blobs[current_blob_num].get_is_verified():
|
||||
log.debug("Outputting blob %s", str(self.last_blob_outputted + 1))
|
||||
self.provided_blob_nums.append(self.last_blob_outputted + 1)
|
||||
d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
|
||||
d.addCallback(lambda _: finished_outputting_blob())
|
||||
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
|
||||
d.addCallback(lambda _: check_if_finished())
|
||||
|
||||
def log_error(err):
|
||||
log.warning("Error outputting blob %s: %s", blobs[current_blob_num].blob_hash,
|
||||
err.getErrorMessage())
|
||||
if self.outputting_d is not None and not self.outputting_d.called:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
self.stop()
|
||||
|
||||
d.addErrback(log_error)
|
||||
else:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
0
lbrynet/stream/__init__.py
Normal file
0
lbrynet/stream/__init__.py
Normal file
101
lbrynet/stream/assembler.py
Normal file
101
lbrynet/stream/assembler.py
Normal file
|
@ -0,0 +1,101 @@
|
|||
import os
|
||||
import binascii
|
||||
import logging
|
||||
import typing
|
||||
import asyncio
|
||||
from lbrynet.blob import MAX_BLOB_SIZE
|
||||
from lbrynet.stream.descriptor import StreamDescriptor
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.blob.blob_info import BlobInfo
|
||||
from lbrynet.blob.blob_file import BlobFile
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
|
||||
base_name, ext = os.path.splitext(file_name)
|
||||
i = 0
|
||||
while os.path.isfile(os.path.join(download_directory, file_name)):
|
||||
i += 1
|
||||
file_name = "%s_%i%s" % (base_name, i, ext)
|
||||
|
||||
return os.path.join(download_directory, file_name)
|
||||
|
||||
|
||||
async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str:
|
||||
return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
|
||||
|
||||
|
||||
class StreamAssembler:
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str):
|
||||
self.loop = loop
|
||||
self.blob_manager = blob_manager
|
||||
self.sd_hash = sd_hash
|
||||
self.sd_blob: 'BlobFile' = None
|
||||
self.descriptor: StreamDescriptor = None
|
||||
self.got_descriptor = asyncio.Event(loop=self.loop)
|
||||
self.wrote_bytes_event = asyncio.Event(loop=self.loop)
|
||||
self.stream_finished_event = asyncio.Event(loop=self.loop)
|
||||
self.output_path = ''
|
||||
self.stream_handle = None
|
||||
self.written_bytes: int = 0
|
||||
|
||||
async def _decrypt_blob(self, blob: 'BlobFile', blob_info: 'BlobInfo', key: str):
|
||||
offset = blob_info.blob_num * (MAX_BLOB_SIZE - 1)
|
||||
|
||||
def _decrypt_and_write():
|
||||
if self.stream_handle.closed:
|
||||
return False
|
||||
self.stream_handle.seek(offset)
|
||||
_decrypted = blob.decrypt(
|
||||
binascii.unhexlify(key), binascii.unhexlify(blob_info.iv.encode())
|
||||
)
|
||||
self.stream_handle.write(_decrypted)
|
||||
self.stream_handle.flush()
|
||||
self.written_bytes += len(_decrypted)
|
||||
return True
|
||||
|
||||
decrypted = await self.loop.run_in_executor(None, _decrypt_and_write)
|
||||
if decrypted:
|
||||
log.info("decrypted %s", blob.blob_hash[:8])
|
||||
return
|
||||
|
||||
async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None):
|
||||
if not os.path.isdir(output_dir):
|
||||
raise OSError(f"output directory does not exist: '{output_dir}' '{output_file_name}'")
|
||||
self.sd_blob = await self.get_blob(self.sd_hash)
|
||||
await self.blob_manager.blob_completed(self.sd_blob)
|
||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
|
||||
self.sd_blob)
|
||||
if not self.got_descriptor.is_set():
|
||||
self.got_descriptor.set()
|
||||
self.output_path = await get_next_available_file_name(self.loop, output_dir,
|
||||
output_file_name or self.descriptor.suggested_file_name)
|
||||
|
||||
self.stream_handle = open(self.output_path, 'wb')
|
||||
await self.blob_manager.storage.store_stream(
|
||||
self.sd_blob, self.descriptor
|
||||
)
|
||||
try:
|
||||
for blob_info in self.descriptor.blobs[:-1]:
|
||||
while True:
|
||||
try:
|
||||
blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
|
||||
await self._decrypt_blob(blob, blob_info, self.descriptor.key)
|
||||
break
|
||||
except ValueError as err:
|
||||
log.error("failed to decrypt blob %s for stream %s - %s", blob_info.blob_hash,
|
||||
self.descriptor.sd_hash, str(err))
|
||||
continue
|
||||
if not self.wrote_bytes_event.is_set():
|
||||
self.wrote_bytes_event.set()
|
||||
self.stream_finished_event.set()
|
||||
finally:
|
||||
self.stream_handle.close()
|
||||
|
||||
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||
f = asyncio.Future(loop=self.loop)
|
||||
f.set_result(self.blob_manager.get_blob(blob_hash, length))
|
||||
return await f
|
186
lbrynet/stream/descriptor.py
Normal file
186
lbrynet/stream/descriptor.py
Normal file
|
@ -0,0 +1,186 @@
|
|||
import os
|
||||
import json
|
||||
import binascii
|
||||
import logging
|
||||
import typing
|
||||
import asyncio
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
from lbrynet.blob import MAX_BLOB_SIZE
|
||||
from lbrynet.blob.blob_info import BlobInfo
|
||||
from lbrynet.blob.blob_file import BlobFile
|
||||
from lbrynet.cryptoutils import get_lbry_hash_obj
|
||||
from lbrynet.error import InvalidStreamDescriptorError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def format_sd_info(stream_name: str, key: str, suggested_file_name: str, stream_hash: str,
|
||||
blobs: typing.List[typing.Dict]) -> typing.Dict:
|
||||
return {
|
||||
"stream_type": "lbryfile",
|
||||
"stream_name": stream_name,
|
||||
"key": key,
|
||||
"suggested_file_name": suggested_file_name,
|
||||
"stream_hash": stream_hash,
|
||||
"blobs": blobs
|
||||
}
|
||||
|
||||
|
||||
def random_iv_generator() -> typing.Generator[bytes, None, None]:
|
||||
while 1:
|
||||
yield os.urandom(AES.block_size // 8)
|
||||
|
||||
|
||||
def file_reader(file_path: str):
|
||||
length = int(os.stat(file_path).st_size)
|
||||
offset = 0
|
||||
|
||||
with open(file_path, 'rb') as stream_file:
|
||||
while offset < length:
|
||||
bytes_to_read = min((length - offset), MAX_BLOB_SIZE - 1)
|
||||
if not bytes_to_read:
|
||||
break
|
||||
blob_bytes = stream_file.read(bytes_to_read)
|
||||
yield blob_bytes
|
||||
offset += bytes_to_read
|
||||
|
||||
|
||||
class StreamDescriptor:
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, stream_name: str, key: str,
|
||||
suggested_file_name: str, blobs: typing.List[BlobInfo], stream_hash: typing.Optional[str] = None,
|
||||
sd_hash: typing.Optional[str] = None):
|
||||
self.loop = loop
|
||||
self.blob_dir = blob_dir
|
||||
self.stream_name = stream_name
|
||||
self.key = key
|
||||
self.suggested_file_name = suggested_file_name
|
||||
self.blobs = blobs
|
||||
self.stream_hash = stream_hash or self.get_stream_hash()
|
||||
self.sd_hash = sd_hash
|
||||
|
||||
def get_stream_hash(self) -> str:
|
||||
return self.calculate_stream_hash(
|
||||
binascii.hexlify(self.stream_name.encode()), self.key.encode(),
|
||||
binascii.hexlify(self.suggested_file_name.encode()),
|
||||
[blob_info.as_dict() for blob_info in self.blobs]
|
||||
)
|
||||
|
||||
def calculate_sd_hash(self) -> str:
|
||||
h = get_lbry_hash_obj()
|
||||
h.update(self.as_json())
|
||||
return h.hexdigest()
|
||||
|
||||
def as_json(self) -> bytes:
|
||||
return json.dumps(
|
||||
format_sd_info(binascii.hexlify(self.stream_name.encode()).decode(), self.key,
|
||||
binascii.hexlify(self.suggested_file_name.encode()).decode(),
|
||||
self.stream_hash,
|
||||
[blob_info.as_dict() for blob_info in self.blobs]), sort_keys=True
|
||||
).encode()
|
||||
|
||||
async def make_sd_blob(self):
|
||||
sd_hash = self.calculate_sd_hash()
|
||||
sd_data = self.as_json()
|
||||
sd_blob = BlobFile(self.loop, self.blob_dir, sd_hash, len(sd_data))
|
||||
if not sd_blob.get_is_verified():
|
||||
writer = sd_blob.open_for_writing()
|
||||
writer.write(sd_data)
|
||||
await sd_blob.verified.wait()
|
||||
await sd_blob.close()
|
||||
return sd_blob
|
||||
|
||||
@classmethod
|
||||
def _from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str,
|
||||
blob: BlobFile) -> 'StreamDescriptor':
|
||||
assert os.path.isfile(blob.file_path)
|
||||
with open(blob.file_path, 'rb') as f:
|
||||
json_bytes = f.read()
|
||||
decoded = json.loads(json_bytes.decode())
|
||||
if decoded['blobs'][-1]['length'] != 0:
|
||||
raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
|
||||
if any([blob_info['length'] == 0 for blob_info in decoded['blobs'][:-1]]):
|
||||
raise InvalidStreamDescriptorError("Contains zero-length data blob")
|
||||
if 'blob_hash' in decoded['blobs'][-1]:
|
||||
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
|
||||
descriptor = cls(
|
||||
loop, blob_dir,
|
||||
binascii.unhexlify(decoded['stream_name']).decode(),
|
||||
decoded['key'],
|
||||
binascii.unhexlify(decoded['suggested_file_name']).decode(),
|
||||
[BlobInfo(info['blob_num'], info['length'], info['iv'], info.get('blob_hash'))
|
||||
for info in decoded['blobs']],
|
||||
decoded['stream_hash'],
|
||||
blob.blob_hash
|
||||
)
|
||||
if descriptor.get_stream_hash() != decoded['stream_hash']:
|
||||
raise InvalidStreamDescriptorError("Stream hash does not match stream metadata")
|
||||
return descriptor
|
||||
|
||||
@classmethod
|
||||
async def from_stream_descriptor_blob(cls, loop: asyncio.BaseEventLoop, blob_dir: str,
|
||||
blob: BlobFile) -> 'StreamDescriptor':
|
||||
return await loop.run_in_executor(None, lambda: cls._from_stream_descriptor_blob(loop, blob_dir, blob))
|
||||
|
||||
@staticmethod
|
||||
def get_blob_hashsum(b: typing.Dict):
|
||||
length = b['length']
|
||||
if length != 0:
|
||||
blob_hash = b['blob_hash']
|
||||
else:
|
||||
blob_hash = None
|
||||
blob_num = b['blob_num']
|
||||
iv = b['iv']
|
||||
blob_hashsum = get_lbry_hash_obj()
|
||||
if length != 0:
|
||||
blob_hashsum.update(blob_hash.encode())
|
||||
blob_hashsum.update(str(blob_num).encode())
|
||||
blob_hashsum.update(iv.encode())
|
||||
blob_hashsum.update(str(length).encode())
|
||||
return blob_hashsum.digest()
|
||||
|
||||
@staticmethod
|
||||
def calculate_stream_hash(hex_stream_name: bytes, key: bytes, hex_suggested_file_name: bytes,
|
||||
blob_infos: typing.List[typing.Dict]) -> str:
|
||||
h = get_lbry_hash_obj()
|
||||
h.update(hex_stream_name)
|
||||
h.update(key)
|
||||
h.update(hex_suggested_file_name)
|
||||
blobs_hashsum = get_lbry_hash_obj()
|
||||
for blob in blob_infos:
|
||||
blobs_hashsum.update(StreamDescriptor.get_blob_hashsum(blob))
|
||||
h.update(blobs_hashsum.digest())
|
||||
return h.hexdigest()
|
||||
|
||||
@classmethod
|
||||
async def create_stream(cls, loop: asyncio.BaseEventLoop, blob_dir: str,
|
||||
file_path: str, key: typing.Optional[bytes] = None,
|
||||
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None
|
||||
) -> 'StreamDescriptor':
|
||||
|
||||
blobs: typing.List[BlobInfo] = []
|
||||
|
||||
iv_generator = iv_generator or random_iv_generator()
|
||||
key = key or os.urandom(AES.block_size // 8)
|
||||
blob_num = -1
|
||||
for blob_bytes in file_reader(file_path):
|
||||
blob_num += 1
|
||||
blob_info = await BlobFile.create_from_unencrypted(
|
||||
loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num
|
||||
)
|
||||
blobs.append(blob_info)
|
||||
blobs.append(
|
||||
BlobInfo(len(blobs), 0, binascii.hexlify(next(iv_generator)).decode())) # add the stream terminator
|
||||
descriptor = cls(
|
||||
loop, blob_dir, os.path.basename(file_path), binascii.hexlify(key).decode(), os.path.basename(file_path),
|
||||
blobs
|
||||
)
|
||||
sd_blob = await descriptor.make_sd_blob()
|
||||
descriptor.sd_hash = sd_blob.blob_hash
|
||||
return descriptor
|
||||
|
||||
def lower_bound_decrypted_length(self) -> int:
|
||||
length = sum((blob.length - 1 for blob in self.blobs[:-2]))
|
||||
return length + self.blobs[-2].length - (AES.block_size // 8)
|
||||
|
||||
def upper_bound_decrypted_length(self) -> int:
|
||||
return self.lower_bound_decrypted_length() + (AES.block_size // 8)
|
235
lbrynet/stream/downloader.py
Normal file
235
lbrynet/stream/downloader.py
Normal file
|
@ -0,0 +1,235 @@
|
|||
import os
|
||||
import asyncio
|
||||
import typing
|
||||
import logging
|
||||
from lbrynet import conf
|
||||
from lbrynet.utils import drain_tasks, cancel_task
|
||||
from lbrynet.stream.assembler import StreamAssembler
|
||||
from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbrynet.dht.node import Node
|
||||
from lbrynet.dht.peer import KademliaPeer
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.blob.blob_file import BlobFile
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def drain_into(a: list, b: list):
|
||||
while a:
|
||||
b.append(a.pop())
|
||||
|
||||
|
||||
class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor to inherit BlobDownloader
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str,
|
||||
peer_timeout: float, peer_connect_timeout: float, output_dir: typing.Optional[str] = None,
|
||||
output_file_name: typing.Optional[str] = None,
|
||||
fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||
super().__init__(loop, blob_manager, sd_hash)
|
||||
self.peer_timeout = peer_timeout
|
||||
self.peer_connect_timeout = peer_connect_timeout
|
||||
self.current_blob: 'BlobFile' = None
|
||||
|
||||
self.download_task: asyncio.Task = None
|
||||
self.accumulate_connections_task: asyncio.Task = None
|
||||
self.new_peer_event = asyncio.Event(loop=self.loop)
|
||||
self.active_connections: typing.Dict['KademliaPeer', BlobExchangeClientProtocol] = {}
|
||||
self.running_download_requests: typing.List[asyncio.Task] = []
|
||||
self.requested_from: typing.Dict[str, typing.Dict['KademliaPeer', asyncio.Task]] = {}
|
||||
self.output_dir = output_dir or os.getcwd()
|
||||
self.output_file_name = output_file_name
|
||||
self._lock = asyncio.Lock(loop=self.loop)
|
||||
self.max_connections_per_stream = 8 if not conf.settings else conf.settings['max_connections_per_stream']
|
||||
self.fixed_peers = fixed_peers or []
|
||||
|
||||
async def _update_current_blob(self, blob: 'BlobFile'):
|
||||
async with self._lock:
|
||||
drain_tasks(self.running_download_requests)
|
||||
self.current_blob = blob
|
||||
if not blob.get_is_verified():
|
||||
self._update_requests()
|
||||
|
||||
async def _request_blob(self, peer: 'KademliaPeer'):
|
||||
if self.current_blob.get_is_verified():
|
||||
log.info("already verified")
|
||||
return
|
||||
if peer not in self.active_connections:
|
||||
log.warning("not active, adding: %s", str(peer))
|
||||
self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.peer_timeout)
|
||||
protocol = self.active_connections[peer]
|
||||
success, keep_connection = await request_blob(self.loop, self.current_blob, protocol,
|
||||
peer.address, peer.tcp_port, self.peer_connect_timeout)
|
||||
await protocol.close()
|
||||
if not keep_connection:
|
||||
log.info("drop peer %s:%i", peer.address, peer.tcp_port)
|
||||
if peer in self.active_connections:
|
||||
async with self._lock:
|
||||
del self.active_connections[peer]
|
||||
return
|
||||
log.info("keep peer %s:%i", peer.address, peer.tcp_port)
|
||||
|
||||
def _update_requests(self):
|
||||
self.new_peer_event.clear()
|
||||
if self.current_blob.blob_hash not in self.requested_from:
|
||||
self.requested_from[self.current_blob.blob_hash] = {}
|
||||
to_add = []
|
||||
for peer in self.active_connections.keys():
|
||||
if peer not in self.requested_from[self.current_blob.blob_hash] and peer not in to_add:
|
||||
to_add.append(peer)
|
||||
if to_add or self.running_download_requests:
|
||||
log.info("adding download probes for %i peers to %i already active",
|
||||
min(len(to_add), 8 - len(self.running_download_requests)),
|
||||
len(self.running_download_requests))
|
||||
else:
|
||||
log.info("downloader idle...")
|
||||
for peer in to_add:
|
||||
if len(self.running_download_requests) >= 8:
|
||||
break
|
||||
task = self.loop.create_task(self._request_blob(peer))
|
||||
self.requested_from[self.current_blob.blob_hash][peer] = task
|
||||
self.running_download_requests.append(task)
|
||||
|
||||
async def wait_for_download_or_new_peer(self) -> typing.Optional['BlobFile']:
|
||||
async with self._lock:
|
||||
if len(self.running_download_requests) < self.max_connections_per_stream:
|
||||
# update the running download requests
|
||||
self._update_requests()
|
||||
|
||||
# drain the tasks into a temporary list
|
||||
download_tasks = []
|
||||
drain_into(self.running_download_requests, download_tasks)
|
||||
|
||||
got_new_peer = self.loop.create_task(self.new_peer_event.wait())
|
||||
|
||||
# wait for a new peer to be added or for a download attempt to finish
|
||||
await asyncio.wait([got_new_peer] + download_tasks, return_when='FIRST_COMPLETED',
|
||||
loop=self.loop)
|
||||
if got_new_peer and not got_new_peer.done():
|
||||
got_new_peer.cancel()
|
||||
async with self._lock:
|
||||
if self.current_blob.get_is_verified():
|
||||
if got_new_peer and not got_new_peer.done():
|
||||
got_new_peer.cancel()
|
||||
drain_tasks(download_tasks)
|
||||
return self.current_blob
|
||||
else:
|
||||
for task in download_tasks:
|
||||
if task and not task.done():
|
||||
self.running_download_requests.append(task)
|
||||
return
|
||||
|
||||
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||
blob = self.blob_manager.get_blob(blob_hash, length)
|
||||
await self._update_current_blob(blob)
|
||||
if blob.get_is_verified():
|
||||
return blob
|
||||
|
||||
# the blob must be downloaded
|
||||
try:
|
||||
while not self.current_blob.get_is_verified():
|
||||
if not self.active_connections: # wait for a new connection
|
||||
await self.new_peer_event.wait()
|
||||
continue
|
||||
blob = await self.wait_for_download_or_new_peer()
|
||||
if blob:
|
||||
drain_tasks(self.running_download_requests)
|
||||
return blob
|
||||
return blob
|
||||
except asyncio.CancelledError:
|
||||
drain_tasks(self.running_download_requests)
|
||||
raise
|
||||
|
||||
def _add_peer_protocols(self, peers: typing.List['KademliaPeer']):
|
||||
added = 0
|
||||
for peer in peers:
|
||||
if peer not in self.active_connections:
|
||||
self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.peer_timeout)
|
||||
added += 1
|
||||
if added:
|
||||
if not self.new_peer_event.is_set():
|
||||
log.info("added %i new peers", len(peers))
|
||||
self.new_peer_event.set()
|
||||
|
||||
async def _accumulate_connections(self, node: 'Node'):
|
||||
blob_queue = asyncio.Queue(loop=self.loop)
|
||||
blob_queue.put_nowait(self.sd_hash)
|
||||
task = asyncio.create_task(self.got_descriptor.wait())
|
||||
added_peers = asyncio.Event(loop=self.loop)
|
||||
add_fixed_peers_timer: typing.Optional[asyncio.Handle] = None
|
||||
|
||||
if self.fixed_peers:
|
||||
def check_added_peers():
|
||||
if not added_peers.is_set():
|
||||
self._add_peer_protocols(self.fixed_peers)
|
||||
log.info("no dht peers for download yet, adding fixed peer")
|
||||
added_peers.set()
|
||||
|
||||
add_fixed_peers_timer = self.loop.call_later(2, check_added_peers)
|
||||
|
||||
def got_descriptor(f):
|
||||
try:
|
||||
f.result()
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
log.info("add head blob hash to peer search")
|
||||
blob_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
|
||||
|
||||
task.add_done_callback(got_descriptor)
|
||||
try:
|
||||
async with node.stream_peer_search_junction(blob_queue) as search_junction:
|
||||
async for peers in search_junction:
|
||||
if not isinstance(peers, list): # TODO: what's up with this?
|
||||
log.error("not a list: %s %s", peers, str(type(peers)))
|
||||
else:
|
||||
self._add_peer_protocols(peers)
|
||||
if not added_peers.is_set():
|
||||
added_peers.set()
|
||||
return
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
log.info("cancelled head blob task")
|
||||
if add_fixed_peers_timer and not add_fixed_peers_timer.cancelled():
|
||||
add_fixed_peers_timer.cancel()
|
||||
|
||||
async def stop(self):
|
||||
cancel_task(self.accumulate_connections_task)
|
||||
self.accumulate_connections_task = None
|
||||
drain_tasks(self.running_download_requests)
|
||||
|
||||
while self.requested_from:
|
||||
_, peer_task_dict = self.requested_from.popitem()
|
||||
while peer_task_dict:
|
||||
peer, task = peer_task_dict.popitem()
|
||||
try:
|
||||
cancel_task(task)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
while self.active_connections:
|
||||
_, client = self.active_connections.popitem()
|
||||
if client:
|
||||
await client.close()
|
||||
log.info("stopped downloader")
|
||||
|
||||
async def _download(self):
|
||||
try:
|
||||
|
||||
log.info("download and decrypt stream")
|
||||
await self.assemble_decrypted_stream(self.output_dir, self.output_file_name)
|
||||
log.info(
|
||||
"downloaded stream %s -> %s", self.sd_hash, self.output_path
|
||||
)
|
||||
await self.blob_manager.storage.change_file_status(
|
||||
self.descriptor.stream_hash, 'finished'
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
await self.stop()
|
||||
|
||||
def download(self, node: 'Node'):
|
||||
self.accumulate_connections_task = self.loop.create_task(self._accumulate_connections(node))
|
||||
self.download_task = self.loop.create_task(self._download())
|
151
lbrynet/stream/managed_stream.py
Normal file
151
lbrynet/stream/managed_stream.py
Normal file
|
@ -0,0 +1,151 @@
|
|||
import os
|
||||
import asyncio
|
||||
import typing
|
||||
import logging
|
||||
from lbrynet.extras.daemon.mime_types import guess_media_type
|
||||
from lbrynet.stream.downloader import StreamDownloader
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbrynet.extras.daemon.storage import StoredStreamClaim
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.stream.descriptor import StreamDescriptor
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ManagedStream:
|
||||
STATUS_RUNNING = "running"
|
||||
STATUS_STOPPED = "stopped"
|
||||
STATUS_FINISHED = "finished"
|
||||
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', descriptor: 'StreamDescriptor',
|
||||
download_directory: str, file_name: str, downloader: typing.Optional[StreamDownloader] = None,
|
||||
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional['StoredStreamClaim'] = None):
|
||||
self.loop = loop
|
||||
self.blob_manager = blob_manager
|
||||
self.download_directory = download_directory
|
||||
self.file_name = file_name
|
||||
self.descriptor = descriptor
|
||||
self.downloader = downloader
|
||||
self.stream_hash = descriptor.stream_hash
|
||||
self.stream_claim_info = claim
|
||||
self._status = status
|
||||
self._store_after_finished: asyncio.Task = None
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
return self._status
|
||||
|
||||
def update_status(self, status: str):
|
||||
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
|
||||
self._status = status
|
||||
|
||||
@property
|
||||
def finished(self) -> bool:
|
||||
return self.status == self.STATUS_FINISHED
|
||||
|
||||
@property
|
||||
def running(self) -> bool:
|
||||
return self.status == self.STATUS_RUNNING
|
||||
|
||||
@property
|
||||
def claim_id(self) -> typing.Optional[str]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.claim_id
|
||||
|
||||
@property
|
||||
def txid(self) -> typing.Optional[str]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.txid
|
||||
|
||||
@property
|
||||
def nout(self) -> typing.Optional[int]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.nout
|
||||
|
||||
@property
|
||||
def outpoint(self) -> typing.Optional[str]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.outpoint
|
||||
|
||||
@property
|
||||
def claim_height(self) -> typing.Optional[int]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.height
|
||||
|
||||
@property
|
||||
def channel_claim_id(self) -> typing.Optional[str]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
|
||||
|
||||
@property
|
||||
def channel_name(self) -> typing.Optional[str]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.channel_name
|
||||
|
||||
@property
|
||||
def claim_name(self) -> typing.Optional[str]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.claim_name
|
||||
|
||||
@property
|
||||
def metadata(self) ->typing.Optional[typing.Dict]:
|
||||
return None if not self.stream_claim_info else self.stream_claim_info.claim.claim_dict['stream']['metadata']
|
||||
|
||||
@property
|
||||
def blobs_completed(self) -> int:
|
||||
return sum([1 if self.blob_manager.get_blob(b.blob_hash).get_is_verified() else 0
|
||||
for b in self.descriptor.blobs[:-1]])
|
||||
|
||||
@property
|
||||
def blobs_in_stream(self) -> int:
|
||||
return len(self.descriptor.blobs) - 1
|
||||
|
||||
@property
|
||||
def sd_hash(self):
|
||||
return self.descriptor.sd_hash
|
||||
|
||||
def as_dict(self) -> typing.Dict:
|
||||
full_path = os.path.join(self.download_directory, self.file_name)
|
||||
if not os.path.exists(full_path):
|
||||
full_path = None
|
||||
mime_type = guess_media_type(os.path.basename(self.file_name))
|
||||
return {
|
||||
'completed': self.finished,
|
||||
'file_name': self.file_name,
|
||||
'download_directory': self.download_directory,
|
||||
'points_paid': 0.0,
|
||||
'stopped': not self.running,
|
||||
'stream_hash': self.stream_hash,
|
||||
'stream_name': self.descriptor.stream_name,
|
||||
'suggested_file_name': self.descriptor.suggested_file_name,
|
||||
'sd_hash': self.descriptor.sd_hash,
|
||||
'download_path': full_path,
|
||||
'mime_type': mime_type,
|
||||
'key': self.descriptor.key,
|
||||
'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
|
||||
'total_bytes': self.descriptor.upper_bound_decrypted_length(),
|
||||
'written_bytes': None if not full_path else self.downloader.written_bytes or os.stat(full_path).st_size,
|
||||
'blobs_completed': self.blobs_completed,
|
||||
'blobs_in_stream': self.blobs_in_stream,
|
||||
'status': self.status,
|
||||
'claim_id': self.claim_id,
|
||||
'txid': self.txid,
|
||||
'nout': self.nout,
|
||||
'outpoint': self.outpoint,
|
||||
'metadata': self.metadata,
|
||||
'channel_claim_id': self.channel_claim_id,
|
||||
'channel_name': self.channel_name,
|
||||
'claim_name': self.claim_name
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def create(cls, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager',
|
||||
file_path: str) -> 'ManagedStream':
|
||||
descriptor = await StreamDescriptor.create_stream(
|
||||
loop, blob_manager.blob_dir, file_path
|
||||
)
|
||||
sd_blob = blob_manager.get_blob(descriptor.sd_hash)
|
||||
await blob_manager.blob_completed(sd_blob)
|
||||
await blob_manager.storage.store_stream(
|
||||
blob_manager.get_blob(descriptor.sd_hash), descriptor
|
||||
)
|
||||
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
||||
status=cls.STATUS_FINISHED)
|
||||
|
||||
async def stop_download(self):
|
||||
if self.downloader:
|
||||
await self.downloader.stop()
|
||||
if not self.finished:
|
||||
self.update_status(self.STATUS_STOPPED)
|
255
lbrynet/stream/stream_manager.py
Normal file
255
lbrynet/stream/stream_manager.py
Normal file
|
@ -0,0 +1,255 @@
|
|||
import os
|
||||
import asyncio
|
||||
import typing
|
||||
import binascii
|
||||
import logging
|
||||
from lbrynet.stream.downloader import StreamDownloader
|
||||
from lbrynet.stream.managed_stream import ManagedStream
|
||||
from lbrynet.schema.claim import ClaimDict
|
||||
from lbrynet.extras.daemon.storage import StoredStreamClaim, lbc_to_dewies
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.dht.peer import KademliaPeer
|
||||
from lbrynet.dht.node import Node
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.extras.wallet import LbryWalletManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
filter_fields = [
|
||||
'status',
|
||||
'file_name',
|
||||
'sd_hash',
|
||||
'stream_hash',
|
||||
'claim_name',
|
||||
'claim_height',
|
||||
'claim_id',
|
||||
'outpoint',
|
||||
'txid',
|
||||
'nout',
|
||||
'channel_claim_id',
|
||||
'channel_name',
|
||||
]
|
||||
|
||||
comparison_operators = {
|
||||
'eq': lambda a, b: a == b,
|
||||
'ne': lambda a, b: a != b,
|
||||
'g': lambda a, b: a > b,
|
||||
'l': lambda a, b: a < b,
|
||||
'ge': lambda a, b: a >= b,
|
||||
'le': lambda a, b: a <= b,
|
||||
}
|
||||
|
||||
|
||||
class StreamManager:
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', wallet: 'LbryWalletManager',
|
||||
storage: 'SQLiteStorage', node: 'Node', peer_timeout: float, peer_connect_timeout: float,
|
||||
fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||
self.loop = loop
|
||||
self.blob_manager = blob_manager
|
||||
self.wallet = wallet
|
||||
self.storage = storage
|
||||
self.node = node
|
||||
self.peer_timeout = peer_timeout
|
||||
self.peer_connect_timeout = peer_connect_timeout
|
||||
self.streams: typing.Set[ManagedStream] = set()
|
||||
self.starting_streams: typing.Dict[str, asyncio.Future] = {}
|
||||
self.resume_downloading_task: asyncio.Task = None
|
||||
self.update_stream_finished_futs: typing.List[asyncio.Future] = []
|
||||
self.fixed_peers = fixed_peers
|
||||
|
||||
async def load_streams_from_database(self):
|
||||
infos = await self.storage.get_all_lbry_files()
|
||||
for file_info in infos:
|
||||
sd_blob = self.blob_manager.get_blob(file_info['sd_hash'])
|
||||
if sd_blob.get_is_verified():
|
||||
descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
|
||||
downloader = StreamDownloader(
|
||||
self.loop, self.blob_manager, descriptor.sd_hash, self.peer_timeout,
|
||||
self.peer_connect_timeout, binascii.unhexlify(file_info['download_directory']).decode(),
|
||||
binascii.unhexlify(file_info['file_name']).decode(), self.fixed_peers
|
||||
)
|
||||
stream = ManagedStream(
|
||||
self.loop, self.blob_manager, descriptor,
|
||||
binascii.unhexlify(file_info['download_directory']).decode(),
|
||||
binascii.unhexlify(file_info['file_name']).decode(),
|
||||
downloader, file_info['status'], file_info['claim']
|
||||
)
|
||||
self.streams.add(stream)
|
||||
|
||||
async def resume(self):
|
||||
await self.node.joined.wait()
|
||||
resumed = 0
|
||||
for stream in self.streams:
|
||||
if stream.status == ManagedStream.STATUS_RUNNING:
|
||||
resumed += 1
|
||||
stream.downloader.download(self.node)
|
||||
self.wait_for_stream_finished(stream)
|
||||
if resumed:
|
||||
log.info("resuming %i downloads", resumed)
|
||||
|
||||
async def start(self):
|
||||
await self.load_streams_from_database()
|
||||
self.resume_downloading_task = self.loop.create_task(self.resume())
|
||||
|
||||
async def stop(self):
|
||||
if self.resume_downloading_task and not self.resume_downloading_task.done():
|
||||
self.resume_downloading_task.cancel()
|
||||
while self.streams:
|
||||
stream = self.streams.pop()
|
||||
await stream.stop_download()
|
||||
while self.update_stream_finished_futs:
|
||||
self.update_stream_finished_futs.pop().cancel()
|
||||
|
||||
async def create_stream(self, file_path: str) -> ManagedStream:
|
||||
stream = await ManagedStream.create(self.loop, self.blob_manager, file_path)
|
||||
self.streams.add(stream)
|
||||
return stream
|
||||
|
||||
async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):
|
||||
await stream.stop_download()
|
||||
self.streams.remove(stream)
|
||||
await self.storage.delete_stream(stream.descriptor)
|
||||
|
||||
blob_hashes = [stream.sd_hash]
|
||||
for blob_info in stream.descriptor.blobs[:-1]:
|
||||
blob_hashes.append(blob_info.blob_hash)
|
||||
for blob_hash in blob_hashes:
|
||||
blob = self.blob_manager.get_blob(blob_hash)
|
||||
if blob.get_is_verified():
|
||||
await blob.delete()
|
||||
|
||||
if delete_file:
|
||||
path = os.path.join(stream.download_directory, stream.file_name)
|
||||
if os.path.isfile(path):
|
||||
os.remove(path)
|
||||
|
||||
def wait_for_stream_finished(self, stream: ManagedStream):
|
||||
async def _wait_for_stream_finished():
|
||||
if stream.downloader and stream.running:
|
||||
try:
|
||||
await stream.downloader.stream_finished_event.wait()
|
||||
stream.update_status(ManagedStream.STATUS_FINISHED)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
task = self.loop.create_task(_wait_for_stream_finished())
|
||||
self.update_stream_finished_futs.append(task)
|
||||
task.add_done_callback(
|
||||
lambda _: None if task not in self.update_stream_finished_futs else
|
||||
self.update_stream_finished_futs.remove(task)
|
||||
)
|
||||
|
||||
async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict,
|
||||
file_name: typing.Optional[str] = None, data_rate: typing.Optional[int] = 0,
|
||||
sd_blob_timeout: typing.Optional[float] = 60
|
||||
) -> typing.Optional[ManagedStream]:
|
||||
|
||||
claim = ClaimDict.load_dict(claim_info['value'])
|
||||
downloader = StreamDownloader(self.loop, self.blob_manager, claim.source_hash.decode(), self.peer_timeout,
|
||||
self.peer_connect_timeout, download_directory, file_name, self.fixed_peers)
|
||||
try:
|
||||
downloader.download(node)
|
||||
await asyncio.wait_for(downloader.got_descriptor.wait(), sd_blob_timeout)
|
||||
log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name'])
|
||||
except (asyncio.TimeoutError, asyncio.CancelledError):
|
||||
log.info("stream timeout")
|
||||
await downloader.stop()
|
||||
log.info("stopped stream")
|
||||
return
|
||||
if not await self.blob_manager.storage.stream_exists(downloader.sd_hash):
|
||||
await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor)
|
||||
if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
|
||||
await self.blob_manager.storage.save_downloaded_file(
|
||||
downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory,
|
||||
data_rate
|
||||
)
|
||||
await self.blob_manager.storage.save_content_claim(
|
||||
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
||||
)
|
||||
|
||||
stored_claim = StoredStreamClaim(
|
||||
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
|
||||
claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'],
|
||||
claim.certificate_id, claim_info['address'], claim_info['claim_sequence'],
|
||||
claim_info.get('channel_name')
|
||||
)
|
||||
stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
|
||||
os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING,
|
||||
stored_claim)
|
||||
self.streams.add(stream)
|
||||
try:
|
||||
await stream.downloader.wrote_bytes_event.wait()
|
||||
self.wait_for_stream_finished(stream)
|
||||
return stream
|
||||
except asyncio.CancelledError:
|
||||
await downloader.stop()
|
||||
|
||||
async def download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict,
|
||||
file_name: typing.Optional[str] = None,
|
||||
sd_blob_timeout: typing.Optional[float] = 60,
|
||||
fee_amount: typing.Optional[float] = 0.0,
|
||||
fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
|
||||
log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
|
||||
claim = ClaimDict.load_dict(claim_info['value'])
|
||||
if fee_address and fee_amount:
|
||||
if fee_amount > await self.wallet.default_account.get_balance():
|
||||
raise Exception("not enough funds")
|
||||
sd_hash = claim.source_hash.decode()
|
||||
if sd_hash in self.starting_streams:
|
||||
return await self.starting_streams[sd_hash]
|
||||
already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams))
|
||||
if already_started:
|
||||
return already_started[0]
|
||||
|
||||
self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
|
||||
stream_task = self.loop.create_task(
|
||||
self._download_stream_from_claim(node, download_directory, claim_info, file_name, 0, sd_blob_timeout)
|
||||
)
|
||||
try:
|
||||
await asyncio.wait_for(stream_task, sd_blob_timeout)
|
||||
stream = await stream_task
|
||||
self.starting_streams[sd_hash].set_result(stream)
|
||||
if fee_address and fee_amount:
|
||||
await self.wallet.send_amount_to_address(lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
|
||||
return stream
|
||||
except (asyncio.TimeoutError, asyncio.CancelledError):
|
||||
return
|
||||
finally:
|
||||
if sd_hash in self.starting_streams:
|
||||
del self.starting_streams[sd_hash]
|
||||
log.info("returned from get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
|
||||
|
||||
def get_filtered_streams(self, sort_by: typing.Optional[str] = None, reverse: typing.Optional[bool] = False,
|
||||
comparison: typing.Optional[str] = None,
|
||||
**search_by) -> typing.List[ManagedStream]:
|
||||
"""
|
||||
Get a list of filtered and sorted ManagedStream objects
|
||||
|
||||
:param sort_by: field to sort by
|
||||
:param reverse: reverse sorting
|
||||
:param comparison: comparison operator used for filtering
|
||||
:param search_by: fields and values to filter by
|
||||
"""
|
||||
if sort_by and sort_by not in filter_fields:
|
||||
raise ValueError(f"'{sort_by}' is not a valid field to sort by")
|
||||
if comparison and comparison not in comparison_operators:
|
||||
raise ValueError(f"'{comparison}' is not a valid comparison")
|
||||
for search in search_by.keys():
|
||||
if search not in filter_fields:
|
||||
raise ValueError(f"'{search}' is not a valid search operation")
|
||||
if search_by:
|
||||
comparison = comparison or 'eq'
|
||||
streams = []
|
||||
for stream in self.streams:
|
||||
for search, val in search_by.items():
|
||||
if comparison_operators[comparison](getattr(stream, search), val):
|
||||
streams.append(stream)
|
||||
break
|
||||
else:
|
||||
streams = list(self.streams)
|
||||
if sort_by:
|
||||
streams.sort(key=lambda s: getattr(s, sort_by))
|
||||
if reverse:
|
||||
streams.reverse()
|
||||
return streams
|
0
tests/unit/stream/__init__.py
Normal file
0
tests/unit/stream/__init__.py
Normal file
76
tests/unit/stream/test_assembler.py
Normal file
76
tests/unit/stream/test_assembler.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
import os
|
||||
import asyncio
|
||||
import tempfile
|
||||
import shutil
|
||||
from torba.testcase import AsyncioTestCase
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.stream.descriptor import StreamDescriptor
|
||||
from lbrynet.stream.assembler import StreamAssembler
|
||||
|
||||
|
||||
class TestStreamAssembler(AsyncioTestCase):
|
||||
def setUp(self):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.key = b'deadbeef' * 4
|
||||
self.cleartext = b'test'
|
||||
|
||||
async def test_create_and_decrypt_one_blob_stream(self):
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
|
||||
self.storage = SQLiteStorage(os.path.join(tmp_dir, "lbrynet.sqlite"))
|
||||
await self.storage.open()
|
||||
self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage)
|
||||
|
||||
download_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(lambda: shutil.rmtree(download_dir))
|
||||
|
||||
# create the stream
|
||||
file_path = os.path.join(tmp_dir, "test_file")
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(self.cleartext)
|
||||
|
||||
sd = await StreamDescriptor.create_stream(self.loop, tmp_dir, file_path, key=self.key)
|
||||
|
||||
# copy blob files
|
||||
sd_hash = sd.calculate_sd_hash()
|
||||
shutil.copy(os.path.join(tmp_dir, sd_hash), os.path.join(download_dir, sd_hash))
|
||||
for blob_info in sd.blobs:
|
||||
if blob_info.blob_hash:
|
||||
shutil.copy(os.path.join(tmp_dir, blob_info.blob_hash), os.path.join(download_dir, blob_info.blob_hash))
|
||||
downloader_storage = SQLiteStorage(os.path.join(download_dir, "lbrynet.sqlite"))
|
||||
await downloader_storage.open()
|
||||
|
||||
# add the blobs to the blob table (this would happen upon a blob download finishing)
|
||||
downloader_blob_manager = BlobFileManager(self.loop, download_dir, downloader_storage)
|
||||
descriptor = await downloader_blob_manager.get_stream_descriptor(sd_hash)
|
||||
|
||||
# assemble the decrypted file
|
||||
assembler = StreamAssembler(self.loop, downloader_blob_manager, descriptor.sd_hash)
|
||||
await assembler.assemble_decrypted_stream(download_dir)
|
||||
|
||||
with open(os.path.join(download_dir, "test_file"), "rb") as f:
|
||||
decrypted = f.read()
|
||||
self.assertEqual(decrypted, self.cleartext)
|
||||
self.assertEqual(True, self.blob_manager.get_blob(sd_hash).get_is_verified())
|
||||
|
||||
await downloader_storage.close()
|
||||
await self.storage.close()
|
||||
|
||||
async def test_create_and_decrypt_multi_blob_stream(self):
|
||||
self.cleartext = b'test\n' * 20000000
|
||||
await self.test_create_and_decrypt_one_blob_stream()
|
||||
|
||||
async def test_create_and_decrypt_padding(self):
|
||||
for i in range(16):
|
||||
self.cleartext = os.urandom((MAX_BLOB_SIZE*2) + i)
|
||||
await self.test_create_and_decrypt_one_blob_stream()
|
||||
|
||||
for i in range(16):
|
||||
self.cleartext = os.urandom((MAX_BLOB_SIZE*2) - i)
|
||||
await self.test_create_and_decrypt_one_blob_stream()
|
||||
|
||||
async def test_create_and_decrypt_random(self):
|
||||
self.cleartext = os.urandom(20000000)
|
||||
await self.test_create_and_decrypt_one_blob_stream()
|
81
tests/unit/stream/test_downloader.py
Normal file
81
tests/unit/stream/test_downloader.py
Normal file
|
@ -0,0 +1,81 @@
|
|||
import os
|
||||
import mock
|
||||
import asyncio
|
||||
import contextlib
|
||||
from lbrynet.stream.descriptor import StreamDescriptor
|
||||
from lbrynet.stream.downloader import StreamDownloader
|
||||
from lbrynet.dht.node import Node
|
||||
from lbrynet.dht.peer import KademliaPeer
|
||||
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
|
||||
from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
|
||||
|
||||
|
||||
class TestStreamDownloader(BlobExchangeTestBase):
|
||||
async def setup_stream(self, blob_count: int = 10):
|
||||
self.stream_bytes = b''
|
||||
for _ in range(blob_count):
|
||||
self.stream_bytes += os.urandom((MAX_BLOB_SIZE - 1))
|
||||
# create the stream
|
||||
file_path = os.path.join(self.server_dir, "test_file")
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(self.stream_bytes)
|
||||
descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path)
|
||||
self.sd_hash = descriptor.calculate_sd_hash()
|
||||
self.downloader = StreamDownloader(self.loop, self.client_blob_manager, self.sd_hash, 3, 3, self.client_dir)
|
||||
|
||||
async def _test_transfer_stream(self, blob_count: int, mock_peer_search=None):
|
||||
await self.setup_stream(blob_count)
|
||||
|
||||
mock_node = mock.Mock(spec=Node)
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def _mock_peer_search(*_):
|
||||
async def _gen():
|
||||
yield [self.server_from_client]
|
||||
return
|
||||
|
||||
yield _gen()
|
||||
|
||||
mock_node.stream_peer_search_junction = mock_peer_search or _mock_peer_search
|
||||
|
||||
self.downloader.download(mock_node)
|
||||
await self.downloader.stream_finished_event.wait()
|
||||
await self.downloader.stop()
|
||||
self.assertTrue(os.path.isfile(self.downloader.output_path))
|
||||
with open(self.downloader.output_path, 'rb') as f:
|
||||
self.assertEqual(f.read(), self.stream_bytes)
|
||||
|
||||
async def test_transfer_stream(self):
|
||||
await self._test_transfer_stream(10)
|
||||
|
||||
async def test_transfer_hundred_blob_stream(self):
|
||||
await self._test_transfer_stream(100)
|
||||
|
||||
async def test_transfer_stream_bad_first_peer_good_second(self):
|
||||
await self.setup_stream(2)
|
||||
|
||||
mock_node = mock.Mock(spec=Node)
|
||||
|
||||
bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334)
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def mock_peer_search(*_):
|
||||
async def _gen():
|
||||
await asyncio.sleep(0.05, loop=self.loop)
|
||||
yield [bad_peer]
|
||||
await asyncio.sleep(0.1, loop=self.loop)
|
||||
yield [self.server_from_client]
|
||||
return
|
||||
|
||||
yield _gen()
|
||||
|
||||
mock_node.stream_peer_search_junction = mock_peer_search
|
||||
|
||||
self.downloader.download(mock_node)
|
||||
await self.downloader.stream_finished_event.wait()
|
||||
await self.downloader.stop()
|
||||
self.assertTrue(os.path.isfile(self.downloader.output_path))
|
||||
with open(self.downloader.output_path, 'rb') as f:
|
||||
self.assertEqual(f.read(), self.stream_bytes)
|
||||
# self.assertIs(self.server_from_client.tcp_last_down, None)
|
||||
# self.assertIsNot(bad_peer.tcp_last_down, None)
|
Loading…
Reference in a new issue