fix tests
This commit is contained in:
parent
7a160f7335
commit
287b89db66
3 changed files with 33 additions and 23 deletions
|
@ -16,6 +16,7 @@ if typing.TYPE_CHECKING:
|
||||||
from lbrynet.blob.blob_info import BlobInfo
|
from lbrynet.blob.blob_info import BlobInfo
|
||||||
from lbrynet.dht.node import Node
|
from lbrynet.dht.node import Node
|
||||||
from lbrynet.extras.daemon.analytics import AnalyticsManager
|
from lbrynet.extras.daemon.analytics import AnalyticsManager
|
||||||
|
from lbrynet.wallet.transaction import Transaction
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -44,6 +45,7 @@ class ManagedStream:
|
||||||
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
|
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
|
||||||
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
|
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
|
||||||
descriptor: typing.Optional[StreamDescriptor] = None,
|
descriptor: typing.Optional[StreamDescriptor] = None,
|
||||||
|
content_fee: typing.Optional['Transaction'] = None,
|
||||||
analytics_manager: typing.Optional['AnalyticsManager'] = None):
|
analytics_manager: typing.Optional['AnalyticsManager'] = None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.config = config
|
self.config = config
|
||||||
|
@ -56,6 +58,7 @@ class ManagedStream:
|
||||||
self.download_id = download_id or binascii.hexlify(generate_id()).decode()
|
self.download_id = download_id or binascii.hexlify(generate_id()).decode()
|
||||||
self.rowid = rowid
|
self.rowid = rowid
|
||||||
self.written_bytes = 0
|
self.written_bytes = 0
|
||||||
|
self.content_fee = content_fee
|
||||||
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
|
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
|
||||||
self.analytics_manager = analytics_manager
|
self.analytics_manager = analytics_manager
|
||||||
self.fully_reflected = asyncio.Event(loop=self.loop)
|
self.fully_reflected = asyncio.Event(loop=self.loop)
|
||||||
|
@ -126,7 +129,7 @@ class ManagedStream:
|
||||||
return None if not self.stream_claim_info else self.stream_claim_info.claim_name
|
return None if not self.stream_claim_info else self.stream_claim_info.claim_name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def metadata(self) ->typing.Optional[typing.Dict]:
|
def metadata(self) -> typing.Optional[typing.Dict]:
|
||||||
return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
|
return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -158,16 +161,13 @@ class ManagedStream:
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def mime_type(self):
|
def mime_type(self):
|
||||||
return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))
|
return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
|
||||||
|
|
||||||
def as_dict(self) -> typing.Dict:
|
def as_dict(self) -> typing.Dict:
|
||||||
full_path = self.full_path if self.output_file_exists else None
|
|
||||||
mime_type = guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
|
|
||||||
|
|
||||||
if self.written_bytes:
|
if self.written_bytes:
|
||||||
written_bytes = self.written_bytes
|
written_bytes = self.written_bytes
|
||||||
elif full_path:
|
elif self.output_file_exists:
|
||||||
written_bytes = os.stat(full_path).st_size
|
written_bytes = os.stat(self.full_path).st_size
|
||||||
else:
|
else:
|
||||||
written_bytes = None
|
written_bytes = None
|
||||||
return {
|
return {
|
||||||
|
@ -180,7 +180,7 @@ class ManagedStream:
|
||||||
'stream_name': self.descriptor.stream_name,
|
'stream_name': self.descriptor.stream_name,
|
||||||
'suggested_file_name': self.descriptor.suggested_file_name,
|
'suggested_file_name': self.descriptor.suggested_file_name,
|
||||||
'sd_hash': self.descriptor.sd_hash,
|
'sd_hash': self.descriptor.sd_hash,
|
||||||
'download_path': full_path,
|
'download_path': self.full_path,
|
||||||
'mime_type': self.mime_type,
|
'mime_type': self.mime_type,
|
||||||
'key': self.descriptor.key,
|
'key': self.descriptor.key,
|
||||||
'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
|
'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
|
||||||
|
@ -198,7 +198,8 @@ class ManagedStream:
|
||||||
'protobuf': self.metadata_protobuf,
|
'protobuf': self.metadata_protobuf,
|
||||||
'channel_claim_id': self.channel_claim_id,
|
'channel_claim_id': self.channel_claim_id,
|
||||||
'channel_name': self.channel_name,
|
'channel_name': self.channel_name,
|
||||||
'claim_name': self.claim_name
|
'claim_name': self.claim_name,
|
||||||
|
'content_fee': self.content_fee # TODO: this isn't in the database
|
||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -228,12 +229,12 @@ class ManagedStream:
|
||||||
self.rowid = self.blob_manager.storage.save_downloaded_file(
|
self.rowid = self.blob_manager.storage.save_downloaded_file(
|
||||||
self.stream_hash, None, None, 0.0
|
self.stream_hash, None, None, 0.0
|
||||||
)
|
)
|
||||||
|
self.update_status(ManagedStream.STATUS_RUNNING)
|
||||||
|
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
|
||||||
self.update_delayed_stop()
|
self.update_delayed_stop()
|
||||||
else:
|
else:
|
||||||
await self.save_file(file_name, download_directory)
|
await self.save_file(file_name, download_directory)
|
||||||
await self.started_writing.wait()
|
await self.started_writing.wait()
|
||||||
self.update_status(ManagedStream.STATUS_RUNNING)
|
|
||||||
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
|
|
||||||
|
|
||||||
def update_delayed_stop(self):
|
def update_delayed_stop(self):
|
||||||
def _delayed_stop():
|
def _delayed_stop():
|
||||||
|
@ -261,6 +262,7 @@ class ManagedStream:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def _save_file(self, output_path: str):
|
async def _save_file(self, output_path: str):
|
||||||
|
log.debug("save file %s -> %s", self.sd_hash, output_path)
|
||||||
self.saving.set()
|
self.saving.set()
|
||||||
self.finished_writing.clear()
|
self.finished_writing.clear()
|
||||||
self.started_writing.clear()
|
self.started_writing.clear()
|
||||||
|
@ -316,6 +318,8 @@ class ManagedStream:
|
||||||
await self.blob_manager.storage.change_file_download_dir_and_file_name(
|
await self.blob_manager.storage.change_file_download_dir_and_file_name(
|
||||||
self.stream_hash, self.download_directory, self.file_name
|
self.stream_hash, self.download_directory, self.file_name
|
||||||
)
|
)
|
||||||
|
self.update_status(ManagedStream.STATUS_RUNNING)
|
||||||
|
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
|
||||||
self.written_bytes = 0
|
self.written_bytes = 0
|
||||||
self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
|
self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
|
||||||
|
|
||||||
|
|
|
@ -337,15 +337,17 @@ class StreamManager:
|
||||||
if 'error' in resolved:
|
if 'error' in resolved:
|
||||||
raise ResolveError(f"error resolving stream: {resolved['error']}")
|
raise ResolveError(f"error resolving stream: {resolved['error']}")
|
||||||
|
|
||||||
claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
|
claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
|
||||||
outpoint = f"{resolved['txid']}:{resolved['nout']}"
|
outpoint = f"{resolved['txid']}:{resolved['nout']}"
|
||||||
resolved_time = self.loop.time() - start_time
|
resolved_time = self.loop.time() - start_time
|
||||||
|
|
||||||
# resume or update an existing stream, if the stream changed download it and delete the old one after
|
# resume or update an existing stream, if the stream changed download it and delete the old one after
|
||||||
updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
|
updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
|
||||||
if updated_stream:
|
if updated_stream:
|
||||||
return updated_stream
|
return updated_stream
|
||||||
|
|
||||||
|
content_fee = None
|
||||||
|
|
||||||
# check that the fee is payable
|
# check that the fee is payable
|
||||||
if not to_replace and claim.stream.has_fee:
|
if not to_replace and claim.stream.has_fee:
|
||||||
fee_amount = round(exchange_rate_manager.convert_currency(
|
fee_amount = round(exchange_rate_manager.convert_currency(
|
||||||
|
@ -364,9 +366,11 @@ class StreamManager:
|
||||||
log.warning(msg)
|
log.warning(msg)
|
||||||
raise InsufficientFundsError(msg)
|
raise InsufficientFundsError(msg)
|
||||||
fee_address = claim.stream.fee.address
|
fee_address = claim.stream.fee.address
|
||||||
await self.wallet.send_amount_to_address(
|
|
||||||
|
content_fee = await self.wallet.send_amount_to_address(
|
||||||
lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
|
lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info("paid fee of %s for %s", fee_amount, uri)
|
log.info("paid fee of %s for %s", fee_amount, uri)
|
||||||
|
|
||||||
download_directory = download_directory or self.config.download_dir
|
download_directory = download_directory or self.config.download_dir
|
||||||
|
@ -374,7 +378,8 @@ class StreamManager:
|
||||||
download_dir, file_name = None, None
|
download_dir, file_name = None, None
|
||||||
stream = ManagedStream(
|
stream = ManagedStream(
|
||||||
self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
|
self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
|
||||||
file_name, ManagedStream.STATUS_RUNNING, analytics_manager=self.analytics_manager
|
file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee,
|
||||||
|
analytics_manager=self.analytics_manager
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(stream.setup(
|
await asyncio.wait_for(stream.setup(
|
||||||
|
|
|
@ -36,12 +36,12 @@ class FileCommands(CommandTestCase):
|
||||||
await self.server.blob_manager.delete_blobs(all_except_sd)
|
await self.server.blob_manager.delete_blobs(all_except_sd)
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
||||||
self.assertIn('error', resp)
|
self.assertIn('error', resp)
|
||||||
self.assertEquals('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error'])
|
self.assertEqual('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error'])
|
||||||
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
||||||
await self.server.blob_manager.delete_blobs([sd_hash])
|
await self.server.blob_manager.delete_blobs([sd_hash])
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
||||||
self.assertIn('error', resp)
|
self.assertIn('error', resp)
|
||||||
self.assertEquals('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])
|
self.assertEqual('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])
|
||||||
|
|
||||||
async def wait_files_to_complete(self):
|
async def wait_files_to_complete(self):
|
||||||
while self.sout(self.daemon.jsonrpc_file_list(status='running')):
|
while self.sout(self.daemon.jsonrpc_file_list(status='running')):
|
||||||
|
@ -66,7 +66,7 @@ class FileCommands(CommandTestCase):
|
||||||
self.assertEqual(stream.full_path, file_info['download_path'])
|
self.assertEqual(stream.full_path, file_info['download_path'])
|
||||||
|
|
||||||
async def test_incomplete_downloads_erases_output_file_on_stop(self):
|
async def test_incomplete_downloads_erases_output_file_on_stop(self):
|
||||||
tx = await self.stream_create('foo', '0.01')
|
tx = await self.stream_create('foo', '0.01', data=b'deadbeef' * 1000000)
|
||||||
sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
|
sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
|
||||||
file_info = self.sout(self.daemon.jsonrpc_file_list())[0]
|
file_info = self.sout(self.daemon.jsonrpc_file_list())[0]
|
||||||
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
||||||
|
@ -74,7 +74,7 @@ class FileCommands(CommandTestCase):
|
||||||
await self.server_storage.get_stream_hash_for_sd_hash(sd_hash)
|
await self.server_storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
)
|
)
|
||||||
all_except_sd_and_head = [
|
all_except_sd_and_head = [
|
||||||
blob.blob_hash for blob in blobs[1:] if blob.blob_hash
|
blob.blob_hash for blob in blobs[1:-1]
|
||||||
]
|
]
|
||||||
await self.server.blob_manager.delete_blobs(all_except_sd_and_head)
|
await self.server.blob_manager.delete_blobs(all_except_sd_and_head)
|
||||||
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
||||||
|
@ -82,17 +82,18 @@ class FileCommands(CommandTestCase):
|
||||||
self.assertNotIn('error', resp)
|
self.assertNotIn('error', resp)
|
||||||
self.assertTrue(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
self.assertTrue(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
||||||
self.daemon.stream_manager.stop()
|
self.daemon.stream_manager.stop()
|
||||||
|
await asyncio.sleep(0, loop=self.loop)
|
||||||
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
||||||
|
|
||||||
async def test_incomplete_downloads_retry(self):
|
async def test_incomplete_downloads_retry(self):
|
||||||
tx = await self.stream_create('foo', '0.01')
|
tx = await self.stream_create('foo', '0.01', data=b'deadbeef' * 1000000)
|
||||||
sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
|
sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
|
||||||
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
||||||
blobs = await self.server_storage.get_blobs_for_stream(
|
blobs = await self.server_storage.get_blobs_for_stream(
|
||||||
await self.server_storage.get_stream_hash_for_sd_hash(sd_hash)
|
await self.server_storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
)
|
)
|
||||||
all_except_sd_and_head = [
|
all_except_sd_and_head = [
|
||||||
blob.blob_hash for blob in blobs[1:] if blob.blob_hash
|
blob.blob_hash for blob in blobs[1:-1]
|
||||||
]
|
]
|
||||||
|
|
||||||
# backup server blobs
|
# backup server blobs
|
||||||
|
@ -175,7 +176,7 @@ class FileCommands(CommandTestCase):
|
||||||
await self.assertBalance(self.account, '9.925679')
|
await self.assertBalance(self.account, '9.925679')
|
||||||
response = await self.out(self.daemon.jsonrpc_get('lbry://icanpay'))
|
response = await self.out(self.daemon.jsonrpc_get('lbry://icanpay'))
|
||||||
self.assertNotIn('error', response)
|
self.assertNotIn('error', response)
|
||||||
await self.on_transaction_dict(response['tx'])
|
await self.on_transaction_dict(response['content_fee'])
|
||||||
await self.assertBalance(self.account, '8.925555')
|
await self.assertBalance(self.account, '8.925555')
|
||||||
self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)
|
self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue