diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index ca0a30532..cbc3d6be1 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -16,6 +16,7 @@ if typing.TYPE_CHECKING: from lbrynet.blob.blob_info import BlobInfo from lbrynet.dht.node import Node from lbrynet.extras.daemon.analytics import AnalyticsManager + from lbrynet.wallet.transaction import Transaction log = logging.getLogger(__name__) @@ -44,6 +45,7 @@ class ManagedStream: status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None, download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None, descriptor: typing.Optional[StreamDescriptor] = None, + content_fee: typing.Optional['Transaction'] = None, analytics_manager: typing.Optional['AnalyticsManager'] = None): self.loop = loop self.config = config @@ -56,6 +58,7 @@ class ManagedStream: self.download_id = download_id or binascii.hexlify(generate_id()).decode() self.rowid = rowid self.written_bytes = 0 + self.content_fee = content_fee self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.analytics_manager = analytics_manager self.fully_reflected = asyncio.Event(loop=self.loop) @@ -126,7 +129,7 @@ class ManagedStream: return None if not self.stream_claim_info else self.stream_claim_info.claim_name @property - def metadata(self) ->typing.Optional[typing.Dict]: + def metadata(self) -> typing.Optional[typing.Dict]: return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict() @property @@ -158,16 +161,13 @@ class ManagedStream: @property def mime_type(self): - return guess_media_type(os.path.basename(self.descriptor.suggested_file_name)) + return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0] def as_dict(self) -> typing.Dict: - full_path = self.full_path if self.output_file_exists else None - mime_type = guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0] - if self.written_bytes: written_bytes = self.written_bytes - elif full_path: - written_bytes = os.stat(full_path).st_size + elif self.output_file_exists: + written_bytes = os.stat(self.full_path).st_size else: written_bytes = None return { @@ -180,7 +180,7 @@ class ManagedStream: 'stream_name': self.descriptor.stream_name, 'suggested_file_name': self.descriptor.suggested_file_name, 'sd_hash': self.descriptor.sd_hash, - 'download_path': full_path, + 'download_path': self.full_path, 'mime_type': self.mime_type, 'key': self.descriptor.key, 'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(), @@ -198,7 +198,8 @@ class ManagedStream: 'protobuf': self.metadata_protobuf, 'channel_claim_id': self.channel_claim_id, 'channel_name': self.channel_name, - 'claim_name': self.claim_name + 'claim_name': self.claim_name, + 'content_fee': self.content_fee # TODO: this isn't in the database } @classmethod @@ -228,12 +229,12 @@ class ManagedStream: self.rowid = self.blob_manager.storage.save_downloaded_file( self.stream_hash, None, None, 0.0 ) + self.update_status(ManagedStream.STATUS_RUNNING) + await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING) self.update_delayed_stop() else: await self.save_file(file_name, download_directory) await self.started_writing.wait() - self.update_status(ManagedStream.STATUS_RUNNING) - await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING) def update_delayed_stop(self): def _delayed_stop(): @@ -261,6 +262,7 @@ class ManagedStream: raise async def _save_file(self, output_path: str): + log.debug("save file %s -> %s", self.sd_hash, output_path) self.saving.set() self.finished_writing.clear() self.started_writing.clear() @@ -316,6 +318,8 @@ class ManagedStream: await self.blob_manager.storage.change_file_download_dir_and_file_name( self.stream_hash, self.download_directory, self.file_name ) + self.update_status(ManagedStream.STATUS_RUNNING) + await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING) self.written_bytes = 0 self.file_output_task = self.loop.create_task(self._save_file(self.full_path)) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index d339aa9a5..730df8078 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -337,15 +337,17 @@ class StreamManager: if 'error' in resolved: raise ResolveError(f"error resolving stream: {resolved['error']}") - claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) - outpoint = f"{resolved['txid']}:{resolved['nout']}" - resolved_time = self.loop.time() - start_time + claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) + outpoint = f"{resolved['txid']}:{resolved['nout']}" + resolved_time = self.loop.time() - start_time # resume or update an existing stream, if the stream changed download it and delete the old one after updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim) if updated_stream: return updated_stream + content_fee = None + # check that the fee is payable if not to_replace and claim.stream.has_fee: fee_amount = round(exchange_rate_manager.convert_currency( @@ -364,9 +366,11 @@ class StreamManager: log.warning(msg) raise InsufficientFundsError(msg) fee_address = claim.stream.fee.address - await self.wallet.send_amount_to_address( + + content_fee = await self.wallet.send_amount_to_address( lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1') ) + log.info("paid fee of %s for %s", fee_amount, uri) download_directory = download_directory or self.config.download_dir @@ -374,7 +378,8 @@ class StreamManager: download_dir, file_name = None, None stream = ManagedStream( self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, - file_name, ManagedStream.STATUS_RUNNING, analytics_manager=self.analytics_manager + file_name, ManagedStream.STATUS_RUNNING, content_fee=content_fee, + analytics_manager=self.analytics_manager ) try: await asyncio.wait_for(stream.setup( diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py index 7d4e41307..5fbc9eaff 100644 --- a/tests/integration/test_file_commands.py +++ b/tests/integration/test_file_commands.py @@ -36,12 +36,12 @@ class FileCommands(CommandTestCase): await self.server.blob_manager.delete_blobs(all_except_sd) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) self.assertIn('error', resp) - self.assertEquals('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error']) + self.assertEqual('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error']) await self.daemon.jsonrpc_file_delete(claim_name='foo') await self.server.blob_manager.delete_blobs([sd_hash]) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) self.assertIn('error', resp) - self.assertEquals('Failed to download sd blob %s within timeout' % sd_hash, resp['error']) + self.assertEqual('Failed to download sd blob %s within timeout' % sd_hash, resp['error']) async def wait_files_to_complete(self): while self.sout(self.daemon.jsonrpc_file_list(status='running')): @@ -66,7 +66,7 @@ class FileCommands(CommandTestCase): self.assertEqual(stream.full_path, file_info['download_path']) async def test_incomplete_downloads_erases_output_file_on_stop(self): - tx = await self.stream_create('foo', '0.01') + tx = await self.stream_create('foo', '0.01', data=b'deadbeef' * 1000000) sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] file_info = self.sout(self.daemon.jsonrpc_file_list())[0] await self.daemon.jsonrpc_file_delete(claim_name='foo') @@ -74,7 +74,7 @@ class FileCommands(CommandTestCase): await self.server_storage.get_stream_hash_for_sd_hash(sd_hash) ) all_except_sd_and_head = [ - blob.blob_hash for blob in blobs[1:] if blob.blob_hash + blob.blob_hash for blob in blobs[1:-1] ] await self.server.blob_manager.delete_blobs(all_except_sd_and_head) self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name']))) @@ -82,17 +82,18 @@ class FileCommands(CommandTestCase): self.assertNotIn('error', resp) self.assertTrue(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name']))) self.daemon.stream_manager.stop() + await asyncio.sleep(0, loop=self.loop) self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name']))) async def test_incomplete_downloads_retry(self): - tx = await self.stream_create('foo', '0.01') + tx = await self.stream_create('foo', '0.01', data=b'deadbeef' * 1000000) sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] await self.daemon.jsonrpc_file_delete(claim_name='foo') blobs = await self.server_storage.get_blobs_for_stream( await self.server_storage.get_stream_hash_for_sd_hash(sd_hash) ) all_except_sd_and_head = [ - blob.blob_hash for blob in blobs[1:] if blob.blob_hash + blob.blob_hash for blob in blobs[1:-1] ] # backup server blobs @@ -175,7 +176,7 @@ class FileCommands(CommandTestCase): await self.assertBalance(self.account, '9.925679') response = await self.out(self.daemon.jsonrpc_get('lbry://icanpay')) self.assertNotIn('error', response) - await self.on_transaction_dict(response['tx']) + await self.on_transaction_dict(response['content_fee']) await self.assertBalance(self.account, '8.925555') self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)