Merge pull request #1432 from lbryio/get-block-on-bytes-written

[API] block get on bytes being written to the output file
This commit is contained in:
Jack Robison 2018-11-26 12:16:42 -05:00 committed by GitHub
commit e74c78b607
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 141 deletions

View file

@ -75,7 +75,7 @@ class StreamBlobDecryptor:
def finish_decrypt(): def finish_decrypt():
bytes_left = len(self.buff) % (AES.block_size // 8) bytes_left = len(self.buff) % (AES.block_size // 8)
if bytes_left != 0: if bytes_left != 0:
log.warning(self.buff[-1 * (AES.block_size // 8):].encode('hex')) log.warning(binascii.hexlify(self.buff[-1 * (AES.block_size // 8):]).decode())
raise Exception("blob %s has incorrect padding: %i bytes left" % raise Exception("blob %s has incorrect padding: %i bytes left" %
(self.blob.blob_hash, bytes_left)) (self.blob.blob_hash, bytes_left))
data_to_decrypt, self.buff = self.buff, b'' data_to_decrypt, self.buff = self.buff, b''

View file

@ -44,8 +44,19 @@ class EncryptedFileDownloader(CryptStreamDownloader):
self.blob_manager, download_manager) self.blob_manager, download_manager)
def _start(self): def _start(self):
def check_start_succeeded(success):
if success:
self.starting = False
self.stopped = False
self.completed = False
return True
else:
return self._start_failed()
self.download_manager = self._get_download_manager()
d = self._setup_output() d = self._setup_output()
d.addCallback(lambda _: CryptStreamDownloader._start(self)) d.addCallback(lambda _: self.download_manager.start_downloading())
d.addCallbacks(check_start_succeeded)
return d return d
def _setup_output(self): def _setup_output(self):

View file

@ -1,14 +1,12 @@
import logging import logging
import os import os
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet import conf from lbrynet import conf
from lbrynet.schema.fee import Fee from lbrynet.schema.fee import Fee
from lbrynet.p2p.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError from lbrynet.p2p.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError
from lbrynet.p2p.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout from lbrynet.p2p.Error import DownloadDataTimeout, DownloadCanceledError
from lbrynet.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet.p2p.StreamDescriptor import download_sd_blob from lbrynet.p2p.StreamDescriptor import download_sd_blob
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from torba.client.constants import COIN from torba.client.constants import COIN
@ -34,8 +32,11 @@ log = logging.getLogger(__name__)
class GetStream: class GetStream:
def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter, def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter,
payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None,
reactor=None):
if not reactor:
from twisted.internet import reactor
self.reactor = reactor
self.timeout = timeout or conf.settings['download_timeout'] self.timeout = timeout or conf.settings['download_timeout']
self.data_rate = data_rate or conf.settings['data_rate'] self.data_rate = data_rate or conf.settings['data_rate']
self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1]
@ -53,44 +54,17 @@ class GetStream:
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
self.storage = storage self.storage = storage
self.downloader = None self.downloader = None
self.checker = LoopingCall(self.check_status)
# fired when the download is complete # fired when the download is complete
self.finished_deferred = None self.finished_deferred = None
# fired after the metadata and the first data blob have been downloaded # fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None) self.data_downloading_deferred = defer.Deferred(None)
self.wrote_data = False
@property @property
def download_path(self): def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name) return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status):
if status.num_completed > 0 and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop_looping_call(self.checker)
else:
log.debug("Waiting for stream data (%i seconds)", self.timeout_counter)
def check_status(self):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter > self.timeout:
if not self.data_downloading_deferred.called:
if self.downloader:
err = DownloadDataTimeout(self.sd_hash)
else:
err = DownloadSDTimeout(self.sd_hash)
self.data_downloading_deferred.errback(err)
safe_stop_looping_call(self.checker)
elif self.downloader:
d = self.downloader.status()
d.addCallback(self._check_status)
else:
log.debug("Waiting for stream descriptor (%i seconds)", self.timeout_counter)
def convert_max_fee(self): def convert_max_fee(self):
currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount']
return self.exchange_rate_manager.convert_currency(currency, "LBC", amount) return self.exchange_rate_manager.convert_currency(currency, "LBC", amount)
@ -151,18 +125,13 @@ class GetStream:
else: else:
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks
def finish(self, results, name): def finish(self, results, name):
self.set_status(DOWNLOAD_STOPPED_CODE, name) self.set_status(DOWNLOAD_STOPPED_CODE, name)
log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6],
self.download_path) self.download_path)
safe_stop_looping_call(self.checker) return defer.succeed(self.download_path)
status = yield self.downloader.status()
self._check_status(status)
defer.returnValue(self.download_path)
def fail(self, err): def fail(self, err):
safe_stop_looping_call(self.checker)
raise err raise err
@defer.inlineCallbacks @defer.inlineCallbacks
@ -194,8 +163,10 @@ class GetStream:
self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
yield self.pay_key_fee(key_fee, name) yield self.pay_key_fee(key_fee, name)
yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.downloader.download_manager.progress_manager.wrote_first_data.addCallback(
self.data_downloading_deferred.callback
)
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -211,16 +182,19 @@ class GetStream:
""" """
self.set_status(INITIALIZING_CODE, name) self.set_status(INITIALIZING_CODE, name)
key_fee = yield self._initialize(stream_info) key_fee = yield self._initialize(stream_info)
safe_start_looping_call(self.checker, 1)
self.set_status(DOWNLOAD_METADATA_CODE, name) self.set_status(DOWNLOAD_METADATA_CODE, name)
try: try:
sd_blob = yield self._download_sd_blob() sd_blob = yield self._download_sd_blob()
yield self._download(sd_blob, name, key_fee, txid, nout, file_name) yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
self.set_status(DOWNLOAD_RUNNING_CODE, name) self.set_status(DOWNLOAD_RUNNING_CODE, name)
yield self.data_downloading_deferred log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.data_downloading_deferred.addTimeout(self.timeout, self.reactor)
try:
yield self.data_downloading_deferred
self.wrote_data = True
except defer.TimeoutError:
raise DownloadDataTimeout("data download timed out")
except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: except (DownloadDataTimeout, InvalidStreamDescriptorError) as err:
safe_stop_looping_call(self.checker)
raise err raise err
defer.returnValue((self.downloader, self.finished_deferred)) defer.returnValue((self.downloader, self.finished_deferred))

View file

@ -1,15 +1,16 @@
import logging import logging
from twisted.internet import defer from twisted.internet import defer, task
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class StreamProgressManager: class FullStreamProgressManager:
#implements(IProgressManager) def __init__(self, finished_callback, blob_manager, download_manager,
delete_blob_after_finished: bool = False, reactor: task.Clock = None):
def __init__(self, finished_callback, blob_manager, if not reactor:
download_manager, delete_blob_after_finished=False): from twisted.internet import reactor
self.reactor = reactor
self.finished_callback = finished_callback self.finished_callback = finished_callback
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.delete_blob_after_finished = delete_blob_after_finished self.delete_blob_after_finished = delete_blob_after_finished
@ -19,15 +20,11 @@ class StreamProgressManager:
self.stopped = True self.stopped = True
self._next_try_to_output_call = None self._next_try_to_output_call = None
self.outputting_d = None self.outputting_d = None
self.wrote_first_data = defer.Deferred()
######### IProgressManager #########
def start(self): def start(self):
from twisted.internet import reactor
self.stopped = False self.stopped = False
self._next_try_to_output_call = reactor.callLater(0, self._try_to_output) self._next_try_to_output_call = self.reactor.callLater(0, self._try_to_output)
return defer.succeed(True) return defer.succeed(True)
def stop(self): def stop(self):
@ -37,64 +34,9 @@ class StreamProgressManager:
self._next_try_to_output_call = None self._next_try_to_output_call = None
return self._stop_outputting() return self._stop_outputting()
def blob_downloaded(self, blob, blob_num): # def blob_downloaded(self, blob, blob_num):
if self.outputting_d is None: # if self.outputting_d is None:
self._output_loop() # self._output_loop()
######### internal #########
def _finished_outputting(self):
self.finished_callback(True)
def _try_to_output(self):
from twisted.internet import reactor
self._next_try_to_output_call = reactor.callLater(1, self._try_to_output)
if self.outputting_d is None:
self._output_loop()
def _output_loop(self):
pass
def _stop_outputting(self):
if self.outputting_d is not None:
return self.outputting_d
return defer.succeed(None)
def _finished_with_blob(self, blob_num):
log.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
if self.delete_blob_after_finished is True:
log.debug("delete_blob_after_finished is True")
blobs = self.download_manager.blobs
if blob_num in blobs:
log.debug("Telling the blob manager, %s, to delete blob %s",
self.blob_manager, blobs[blob_num].blob_hash)
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
else:
log.debug("Blob number %s was not in blobs", str(blob_num))
else:
log.debug("delete_blob_after_finished is False")
class FullStreamProgressManager(StreamProgressManager):
def __init__(self, finished_callback, blob_manager,
download_manager, delete_blob_after_finished=False):
super().__init__(finished_callback, blob_manager, download_manager,
delete_blob_after_finished)
self.outputting_d = None
######### IProgressManager #########
def _done(self, i, blobs):
"""Return true if `i` is a blob number we don't have"""
return (
i not in blobs or
(
not blobs[i].get_is_verified() and
i not in self.provided_blob_nums
)
)
def stream_position(self): def stream_position(self):
blobs = self.download_manager.blobs blobs = self.download_manager.blobs
@ -113,12 +55,46 @@ class FullStreamProgressManager(StreamProgressManager):
if not b.get_is_verified() and not n in self.provided_blob_nums if not b.get_is_verified() and not n in self.provided_blob_nums
] ]
######### internal ######### def _finished_outputting(self):
self.finished_callback(True)
def _try_to_output(self):
self._next_try_to_output_call = self.reactor.callLater(1, self._try_to_output)
if self.outputting_d is None:
self._output_loop()
def _stop_outputting(self):
if self.outputting_d is not None:
return self.outputting_d
return defer.succeed(None)
def _finished_with_blob(self, blob_num: int) -> None:
if blob_num == 0 and not self.wrote_first_data.called:
self.wrote_first_data.callback(True)
log.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
if self.delete_blob_after_finished is True:
log.debug("delete_blob_after_finished is True")
blobs = self.download_manager.blobs
if blob_num in blobs:
log.debug("Telling the blob manager, %s, to delete blob %s",
self.blob_manager, blobs[blob_num].blob_hash)
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
else:
log.debug("Blob number %s was not in blobs", str(blob_num))
else:
log.debug("delete_blob_after_finished is False")
def _done(self, i: int, blobs: list) -> bool:
"""Return true if `i` is a blob number we don't have"""
return (
i not in blobs or
(
not blobs[i].get_is_verified() and
i not in self.provided_blob_nums
)
)
def _output_loop(self): def _output_loop(self):
from twisted.internet import reactor
if self.stopped: if self.stopped:
if self.outputting_d is not None: if self.outputting_d is not None:
self.outputting_d.callback(True) self.outputting_d.callback(True)
@ -139,7 +115,7 @@ class FullStreamProgressManager(StreamProgressManager):
self.outputting_d.callback(True) self.outputting_d.callback(True)
self.outputting_d = None self.outputting_d = None
else: else:
reactor.callLater(0, self._output_loop) self.reactor.callLater(0, self._output_loop)
current_blob_num = self.last_blob_outputted + 1 current_blob_num = self.last_blob_outputted + 1

View file

@ -9,6 +9,7 @@ from lbrynet.p2p.Error import DownloadDataTimeout, DownloadSDTimeout
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.p2p.BlobManager import DiskBlobManager from lbrynet.p2p.BlobManager import DiskBlobManager
from lbrynet.p2p.RateLimiter import DummyRateLimiter from lbrynet.p2p.RateLimiter import DummyRateLimiter
from lbrynet.p2p.client.DownloadManager import DownloadManager
from lbrynet.extras.daemon import Downloader from lbrynet.extras.daemon import Downloader
from lbrynet.extras.daemon import ExchangeRateManager from lbrynet.extras.daemon import ExchangeRateManager
from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.extras.daemon.storage import SQLiteStorage
@ -24,7 +25,7 @@ class MocDownloader:
def __init__(self): def __init__(self):
self.finish_deferred = defer.Deferred(None) self.finish_deferred = defer.Deferred(None)
self.stop_called = False self.stop_called = False
self.file_name = 'test'
self.name = 'test' self.name = 'test'
self.num_completed = 0 self.num_completed = 0
self.num_known = 1 self.num_known = 1
@ -60,8 +61,10 @@ def moc_download(self, sd_blob, name, txid, nout, key_fee, file_name):
self.downloader.start() self.downloader.start()
def moc_pay_key_fee(self, key_fee, name): def moc_pay_key_fee(d):
self.pay_key_fee_called = True def _moc_pay_key_fee(self, key_fee, name):
d.callback(True)
return _moc_pay_key_fee
class GetStreamTests(unittest.TestCase): class GetStreamTests(unittest.TestCase):
@ -83,10 +86,7 @@ class GetStreamTests(unittest.TestCase):
sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, DummyRateLimiter(), prm, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, DummyRateLimiter(), prm,
storage, max_key_fee, disable_max_key_fee, timeout=3, data_rate=data_rate storage, max_key_fee, disable_max_key_fee, timeout=3, data_rate=data_rate
) )
getstream.pay_key_fee_called = False getstream.download_manager = mock.Mock(spec=DownloadManager)
self.clock = task.Clock()
getstream.checker.clock = self.clock
return getstream return getstream
@defer.inlineCallbacks @defer.inlineCallbacks
@ -112,16 +112,18 @@ class GetStreamTests(unittest.TestCase):
def download_sd_blob(self): def download_sd_blob(self):
raise DownloadSDTimeout(self) raise DownloadSDTimeout(self)
called_pay_fee = defer.Deferred()
getstream = self.init_getstream_with_mocs() getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream) getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream) getstream._download = types.MethodType(moc_download, getstream)
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream)
name = 'test' name = 'test'
stream_info = None stream_info = None
with self.assertRaises(DownloadSDTimeout): with self.assertRaises(DownloadSDTimeout):
yield getstream.start(stream_info, name, "deadbeef" * 12, 0) yield getstream.start(stream_info, name, "deadbeef" * 12, 0)
self.assertFalse(getstream.pay_key_fee_called) self.assertFalse(called_pay_fee.called)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_timeout(self): def test_timeout(self):
@ -129,20 +131,18 @@ class GetStreamTests(unittest.TestCase):
test that timeout (set to 3 here) exception is raised test that timeout (set to 3 here) exception is raised
when download times out while downloading first blob, and key fee is paid when download times out while downloading first blob, and key fee is paid
""" """
called_pay_fee = defer.Deferred()
getstream = self.init_getstream_with_mocs() getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream) getstream._download = types.MethodType(moc_download, getstream)
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream)
name = 'test' name = 'test'
stream_info = None stream_info = None
start = getstream.start(stream_info, name, "deadbeef" * 12, 0) start = getstream.start(stream_info, name, "deadbeef" * 12, 0)
self.clock.advance(1)
self.clock.advance(1)
self.clock.advance(1)
with self.assertRaises(DownloadDataTimeout): with self.assertRaises(DownloadDataTimeout):
yield start yield start
self.assertTrue(getstream.pay_key_fee_called)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_finish_one_blob(self): def test_finish_one_blob(self):
@ -150,20 +150,22 @@ class GetStreamTests(unittest.TestCase):
test that if we have 1 completed blob, start() returns test that if we have 1 completed blob, start() returns
and key fee is paid and key fee is paid
""" """
called_pay_fee = defer.Deferred()
getstream = self.init_getstream_with_mocs() getstream = self.init_getstream_with_mocs()
getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._initialize = types.MethodType(moc_initialize, getstream)
getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream)
getstream._download = types.MethodType(moc_download, getstream) getstream._download = types.MethodType(moc_download, getstream)
getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream)
name = 'test' name = 'test'
stream_info = None stream_info = None
start = getstream.start(stream_info, name, "deadbeef" * 12, 0) self.assertFalse(getstream.wrote_data)
getstream.downloader.num_completed = 1 getstream.data_downloading_deferred.callback(None)
self.clock.advance(1) yield getstream.start(stream_info, name, "deadbeef" * 12, 0)
self.assertTrue(getstream.wrote_data)
downloader, f_deferred = yield start # self.assertTrue(getstream.pay_key_fee_called)
self.assertTrue(getstream.pay_key_fee_called)
# @defer.inlineCallbacks # @defer.inlineCallbacks
# def test_finish_stopped_downloader(self): # def test_finish_stopped_downloader(self):