from unittest import skipIf import asyncio import os from binascii import hexlify from lbry.schema import Claim from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.testcase import CommandTestCase from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT from lbry.wallet import Transaction class FileCommands(CommandTestCase): def __init__(self, *a, **kw): super().__init__(*a, **kw) self.skip_libtorrent = False async def initialize_torrent(self, tx_to_update=None): if not hasattr(self, 'seeder_session'): self.seeder_session = TorrentSession(self.loop, None) self.addCleanup(self.seeder_session.stop) await self.seeder_session.bind(port=4040) btih = await self.seeder_session.add_fake_torrent() address = await self.account.receiving.get_or_create_usable_address() if not tx_to_update: claim = Claim() claim.stream.update(bt_infohash=btih) tx = await Transaction.claim_create( 'torrent', claim, 1, address, [self.account], self.account ) else: claim = tx_to_update.outputs[0].claim claim.stream.update(bt_infohash=btih) tx = await Transaction.claim_update( tx_to_update.outputs[0], claim, 1, address, [self.account], self.account ) await tx.sign([self.account]) await self.broadcast(tx) await self.confirm_tx(tx.id) self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session self.client_session._session.add_dht_node(('localhost', 4040)) self.client_session.wait_start = False # fixme: this is super slow on tests return tx, btih @skipIf(TorrentSession is None, "libtorrent not installed") async def test_download_torrent(self): tx, btih = await self.initialize_torrent() self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) # second call, see its there and move on self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih) self.assertIn(btih, self.client_session._handles) tx, new_btih = await self.initialize_torrent(tx) self.assertNotEqual(btih, new_btih) # claim now points to another torrent, update to it self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) self.assertIn(new_btih, self.client_session._handles) self.assertNotIn(btih, self.client_session._handles) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) await self.daemon.jsonrpc_file_delete(delete_all=True) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) self.assertNotIn(new_btih, self.client_session._handles) async def create_streams_in_range(self, *args, **kwargs): self.stream_claim_ids = [] for i in range(*args, **kwargs): t = await self.stream_create(f'Stream_{i}', '0.00001') self.stream_claim_ids.append(t['outputs'][0]['claim_id']) async def test_file_reflect(self): tx = await self.stream_create('mirror', '0.01') sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] self.assertEqual([], await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash)) all_except_sd = [ blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash ] await self.reflector.blob_manager.delete_blobs(all_except_sd) self.assertEqual(all_except_sd, await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash)) async def test_file_management(self): await self.stream_create('foo', '0.01') await self.stream_create('foo2', '0.01') file1, file2 = await self.file_list('claim_name') self.assertEqual(file1['claim_name'], 'foo') self.assertEqual(file2['claim_name'], 'foo2') self.assertItemCount(await self.daemon.jsonrpc_file_list(claim_id=[file1['claim_id'], file2['claim_id']]), 2) self.assertItemCount(await self.daemon.jsonrpc_file_list(claim_id=file1['claim_id']), 1) self.assertItemCount(await self.daemon.jsonrpc_file_list(outpoint=[file1['outpoint'], file2['outpoint']]), 2) self.assertItemCount(await self.daemon.jsonrpc_file_list(outpoint=file1['outpoint']), 1) await self.daemon.jsonrpc_file_delete(claim_name='foo') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) await self.daemon.jsonrpc_file_delete(claim_name='foo2') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) await self.daemon.jsonrpc_get('lbry://foo') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) async def test_announces(self): # announces on publish self.assertEqual(await self.daemon.storage.get_blobs_to_announce(), []) await self.stream_create('foo', '0.01') stream = (await self.daemon.jsonrpc_file_list())["items"][0] self.assertSetEqual(set(await self.daemon.storage.get_blobs_to_announce()), {stream.sd_hash}) self.assertTrue(await self.daemon.jsonrpc_file_delete(delete_all=True)) # announces on download self.assertEqual(await self.daemon.storage.get_blobs_to_announce(), []) stream = await self.daemon.jsonrpc_get('foo') self.assertSetEqual(set(await self.daemon.storage.get_blobs_to_announce()), {stream.sd_hash}) async def _purge_file(self, claim_name, full_path): self.assertTrue( await self.daemon.jsonrpc_file_delete(claim_name=claim_name, delete_from_download_dir=True) ) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) self.assertFalse(os.path.isfile(full_path)) async def test_publish_with_illegal_chars(self): def check_prefix_suffix(name, prefix, suffix): self.assertTrue(name.startswith(prefix)) self.assertTrue(name.endswith(suffix)) # Stream a file with file name containing invalid chars claim_name = 'lolwindows' prefix, suffix = 'derp?', '.ext.' san_prefix, san_suffix = 'derp', '.ext' tx = await self.stream_create(claim_name, '0.01', prefix=prefix, suffix=suffix) stream = (await self.daemon.jsonrpc_file_list())["items"][0] claim_id = self.get_claim_id(tx) # Assert that file list and source contains the local unsanitized name, but suggested name is sanitized full_path = (await self.daemon.jsonrpc_get('lbry://' + claim_name)).full_path stream_file_name = os.path.basename(full_path) source_file_name = tx['outputs'][0]['value']['source']['name'] file_list_name = stream.file_name suggested_file_name = stream.descriptor.suggested_file_name self.assertTrue(os.path.isfile(full_path)) check_prefix_suffix(stream_file_name, prefix, suffix) self.assertEqual(stream_file_name, source_file_name) self.assertEqual(stream_file_name, file_list_name) check_prefix_suffix(suggested_file_name, san_prefix, san_suffix) await self._purge_file(claim_name, full_path) # Re-download deleted file and assert that the file name is sanitized full_path = (await self.daemon.jsonrpc_get('lbry://' + claim_name, save_file=True)).full_path stream_file_name = os.path.basename(full_path) stream = (await self.daemon.jsonrpc_file_list())["items"][0] file_list_name = stream.file_name suggested_file_name = stream.descriptor.suggested_file_name self.assertTrue(os.path.isfile(full_path)) check_prefix_suffix(stream_file_name, san_prefix, san_suffix) self.assertEqual(stream_file_name, file_list_name) self.assertEqual(stream_file_name, suggested_file_name) await self._purge_file(claim_name, full_path) # Assert that the downloaded file name is not sanitized when user provides custom file name custom_name = 'cust*m_name' full_path = (await self.daemon.jsonrpc_get( 'lbry://' + claim_name, file_name=custom_name, save_file=True)).full_path file_name_on_disk = os.path.basename(full_path) self.assertTrue(os.path.isfile(full_path)) self.assertEqual(custom_name, file_name_on_disk) # Update the stream and assert the file name is not sanitized, but the suggested file name is prefix, suffix = 'derpyderp?', '.ext.' san_prefix, san_suffix = 'derpyderp', '.ext' tx = await self.stream_update(claim_id, data=b'amazing content', prefix=prefix, suffix=suffix) full_path = (await self.daemon.jsonrpc_get('lbry://' + claim_name, save_file=True)).full_path updated_stream = (await self.daemon.jsonrpc_file_list())["items"][0] stream_file_name = os.path.basename(full_path) source_file_name = tx['outputs'][0]['value']['source']['name'] file_list_name = updated_stream.file_name suggested_file_name = updated_stream.descriptor.suggested_file_name self.assertTrue(os.path.isfile(full_path)) check_prefix_suffix(stream_file_name, prefix, suffix) self.assertEqual(stream_file_name, source_file_name) self.assertEqual(stream_file_name, file_list_name) check_prefix_suffix(suggested_file_name, san_prefix, san_suffix) async def test_file_list_fields(self): await self.stream_create('foo', '0.01') file_list = await self.file_list() self.assertEqual( file_list[0]['timestamp'], self.ledger.headers.estimated_timestamp(file_list[0]['height']) ) self.assertEqual(file_list[0]['confirmations'], -1) await self.daemon.jsonrpc_resolve('foo') file_list = await self.file_list() self.assertEqual( file_list[0]['timestamp'], self.ledger.headers.estimated_timestamp(file_list[0]['height']) ) self.assertEqual(file_list[0]['confirmations'], 1) async def test_get_doesnt_touch_user_written_files_between_calls(self): await self.stream_create('foo', '0.01', data=bytes([0] * (2 << 23))) self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo')) first_path = (await self.daemon.jsonrpc_get('lbry://foo', save_file=True)).full_path await self.wait_files_to_complete() self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo')) with open(first_path, 'wb') as f: f.write(b' ') f.flush() second_path = await self.daemon.jsonrpc_get('lbry://foo', save_file=True) await self.wait_files_to_complete() self.assertNotEqual(first_path, second_path) async def test_file_list_updated_metadata_on_resolve(self): await self.stream_create('foo', '0.01') txo = (await self.daemon.resolve(self.wallet.accounts, ['lbry://foo']))['lbry://foo'] claim = txo.claim await self.daemon.jsonrpc_file_delete(claim_name='foo') txid = await self.blockchain_claim_name('bar', hexlify(claim.to_bytes()).decode(), '0.01') await self.daemon.jsonrpc_get('lbry://bar') claim.stream.description = "fix typos, fix the world" await self.blockchain_update_name(txid, hexlify(claim.to_bytes()).decode(), '0.01') await self.daemon.jsonrpc_resolve('lbry://bar') file_list = (await self.daemon.jsonrpc_file_list())['items'] self.assertEqual(file_list[0].stream_claim_info.claim.stream.description, claim.stream.description) async def test_sourceless_content(self): # claim has no source, then it has one tx = await self.stream_create('foo', '0.01', data=None) claim_id = self.get_claim_id(tx) await self.daemon.jsonrpc_file_delete(claim_name='foo') response = await self.out(self.daemon.jsonrpc_get('lbry://foo')) self.assertIn('error', response) self.assertIn('nothing to download', response['error']) # source is set (there isn't a way to clear the source field, so we stop here for now) await self.stream_update(claim_id, data=b'surpriiiiiiiise') response = await self.out(self.daemon.jsonrpc_get('lbry://foo')) self.assertNotIn('error', response) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) async def test_file_list_paginated_output(self): await self.create_streams_in_range(0, 20) page = await self.file_list(page_size=20) page_claim_ids = [item['claim_id'] for item in page] self.assertListEqual(page_claim_ids, self.stream_claim_ids) page = await self.file_list(page_size=6) page_claim_ids = [item['claim_id'] for item in page] self.assertListEqual(page_claim_ids, self.stream_claim_ids[:6]) page = await self.file_list(page_size=6, page=2) page_claim_ids = [item['claim_id'] for item in page] self.assertListEqual(page_claim_ids, self.stream_claim_ids[6:12]) out_of_bounds = await self.file_list(page=5, page_size=6) self.assertEqual(out_of_bounds, []) complete = await self.daemon.jsonrpc_file_list() self.assertEqual(complete['total_pages'], 1) self.assertEqual(complete['total_items'], 20) page = await self.daemon.jsonrpc_file_list(page_size=10, page=1) self.assertEqual(page['total_pages'], 2) self.assertEqual(page['total_items'], 20) self.assertEqual(page['page'], 1) full = await self.out(self.daemon.jsonrpc_file_list(page_size=20, page=1)) page1 = await self.file_list(page=1, page_size=10) page2 = await self.file_list(page=2, page_size=10) self.assertEqual(page1 + page2, full['items']) async def test_download_different_timeouts(self): tx = await self.stream_create('foo', '0.01') sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] await self.daemon.jsonrpc_file_delete(claim_name='foo') all_except_sd = [ blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash ] await self.server.blob_manager.delete_blobs(all_except_sd) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) self.assertEqual('Failed to download data blobs for sd hash %s within timeout.' % sd_hash, resp['error']) self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file") await self.server.blob_manager.delete_blobs([sd_hash]) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) self.assertEqual('Failed to download sd blob %s within timeout.' % sd_hash, resp['error']) async def wait_files_to_complete(self): while await self.file_list(status='running'): await asyncio.sleep(0.01) async def test_filename_conflicts_management_on_resume_download(self): await self.stream_create('foo', '0.01', data=bytes([0] * (1 << 23))) file_info = (await self.file_list())[0] original_path = os.path.join(self.daemon.conf.download_dir, file_info['file_name']) await self.daemon.jsonrpc_file_delete(claim_name='foo') await self.daemon.jsonrpc_get('lbry://foo') with open(original_path, 'wb') as handle: handle.write(b'some other stuff was there instead') self.daemon.file_manager.stop() await self.daemon.file_manager.start() await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed # check that internal state got through up to the file list API stream = self.daemon.file_manager.get_filtered(stream_hash=file_info['stream_hash'])[0] file_info = (await self.file_list())[0] self.assertEqual(stream.file_name, file_info['file_name']) # checks if what the API shows is what he have at the very internal level. self.assertEqual(stream.full_path, file_info['download_path']) async def test_incomplete_downloads_erases_output_file_on_stop(self): tx = await self.stream_create('foo', '0.01', data=b'deadbeef' * 1000000) sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] file_info = (await self.file_list())[0] blobs = await self.daemon.storage.get_blobs_for_stream( await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash) ) await self.daemon.jsonrpc_file_delete(claim_name='foo') self.assertEqual(5, len(blobs)) all_except_sd_and_head = [ blob.blob_hash for blob in blobs[1:-1] ] await self.server.blob_manager.delete_blobs(all_except_sd_and_head) path = os.path.join(self.daemon.conf.download_dir, file_info['file_name']) self.assertFalse(os.path.isfile(path)) resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2)) self.assertNotIn('error', resp) self.assertTrue(os.path.isfile(path)) self.daemon.file_manager.stop() await asyncio.sleep(0.01, loop=self.loop) # FIXME: this sleep should not be needed self.assertFalse(os.path.isfile(path)) async def test_incomplete_downloads_retry(self): tx = await self.stream_create('foo', '0.01', data=b'deadbeef' * 1000000) sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] blobs = await self.daemon.storage.get_blobs_for_stream( await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash) ) self.assertEqual(5, len(blobs)) await self.daemon.jsonrpc_file_delete(claim_name='foo') all_except_sd_and_head = [ blob.blob_hash for blob in blobs[1:-1] ] # backup server blobs for blob_hash in all_except_sd_and_head: blob = self.server_blob_manager.get_blob(blob_hash) os.rename(blob.file_path, blob.file_path + '__') # erase all except sd blob await self.server.blob_manager.delete_blobs(all_except_sd_and_head) # start the download resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2)) self.assertNotIn('error', resp) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual('running', (await self.file_list())[0]['status']) # recover blobs for blob_hash in all_except_sd_and_head: blob = self.server_blob_manager.get_blob(blob_hash) os.rename(blob.file_path + '__', blob.file_path) self.server_blob_manager.blobs.clear() await self.server_blob_manager.blob_completed(self.server_blob_manager.get_blob(blob_hash)) await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) file_info = (await self.file_list())[0] self.assertEqual(file_info['blobs_completed'], file_info['blobs_in_stream']) self.assertEqual('finished', file_info['status']) async def test_paid_download(self): target_address = await self.blockchain.get_raw_change_address() # FAIL: beyond available balance await self.stream_create( 'expensive', '0.01', data=b'pay me if you can', fee_currency='LBC', fee_amount='11.0', fee_address=target_address, claim_address=target_address ) await self.daemon.jsonrpc_file_delete(claim_name='expensive') response = await self.out(self.daemon.jsonrpc_get('lbry://expensive')) self.assertEqual(response['error'], 'Not enough funds to cover this transaction.') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) # FAIL: beyond maximum key fee await self.stream_create( 'maxkey', '0.01', data=b'no pay me, no', fee_currency='LBC', fee_amount='111.0', fee_address=target_address, claim_address=target_address ) await self.daemon.jsonrpc_file_delete(claim_name='maxkey') response = await self.out(self.daemon.jsonrpc_get('lbry://maxkey')) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) self.assertEqual( response['error'], 'Purchase price of 111.0 LBC exceeds maximum configured price of 100.0 LBC (50.0 USD).' ) # PASS: purchase is successful await self.stream_create( 'icanpay', '0.01', data=b'I got the power!', fee_currency='LBC', fee_amount='1.0', fee_address=target_address, claim_address=target_address ) await self.daemon.jsonrpc_file_delete(claim_name='icanpay') await self.assertBalance(self.account, '9.925679') response = await self.daemon.jsonrpc_get('lbry://icanpay') raw_content_fee = response.content_fee.raw await self.ledger.wait(response.content_fee) await self.assertBalance(self.account, '8.925538') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) await asyncio.wait_for(self.wait_files_to_complete(), timeout=1) # check that the fee was received starting_balance = float(await self.blockchain.get_balance()) await self.generate(1) block_reward_and_claim_fee = 2.0 self.assertEqual( float(await self.blockchain.get_balance()), starting_balance + block_reward_and_claim_fee ) # restart the daemon and make sure the fee is still there self.daemon.file_manager.stop() await self.daemon.file_manager.start() self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee) await self.daemon.jsonrpc_file_delete(claim_name='icanpay') # PASS: no fee address --> use the claim address to pay tx = await self.stream_create( 'nofeeaddress', '0.01', data=b'free stuff?', ) await self.__raw_value_update_no_fee_address( tx, fee_amount='2.0', fee_currency='LBC', claim_address=target_address ) await self.daemon.jsonrpc_file_delete(claim_name='nofeeaddress') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) response = await self.out(self.daemon.jsonrpc_get('lbry://nofeeaddress')) self.assertIsNone((await self.daemon.jsonrpc_file_list())['items'][0].stream_claim_info.claim.stream.fee.address) self.assertIsNotNone(response['content_fee']) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual(response['content_fee']['outputs'][0]['amount'], '2.0') self.assertEqual(response['content_fee']['outputs'][0]['address'], target_address) async def test_null_max_key_fee(self): target_address = await self.blockchain.get_raw_change_address() self.daemon.conf.max_key_fee = None await self.stream_create( 'somename', '0.5', data=b'Yes, please', fee_currency='LBC', fee_amount='1.0', fee_address=target_address, claim_address=target_address ) self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='somename')) # Assert the fee and bid are subtracted await self.assertBalance(self.account, '9.483893') response = await self.daemon.jsonrpc_get('lbry://somename') await self.ledger.wait(response.content_fee) await self.assertBalance(self.account, '8.483752') # Assert the file downloads await asyncio.wait_for(self.wait_files_to_complete(), timeout=1) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) # Assert the transaction is recorded to the blockchain starting_balance = float(await self.blockchain.get_balance()) await self.generate(1) block_reward_and_claim_fee = 2.0 self.assertEqual( float(await self.blockchain.get_balance()), starting_balance + block_reward_and_claim_fee ) async def test_null_fee(self): target_address = await self.blockchain.get_raw_change_address() tx = await self.stream_create( 'nullfee', '0.01', data=b'no pay me, no', fee_currency='LBC', fee_address=target_address, fee_amount='1.0' ) await self.__raw_value_update_no_fee_amount(tx, target_address) await self.daemon.jsonrpc_file_delete(claim_name='nullfee') response = await self.daemon.jsonrpc_get('lbry://nullfee') self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertIsNone(response.content_fee) self.assertTrue(response.stream_claim_info.claim.stream.has_fee) self.assertDictEqual( response.stream_claim_info.claim.stream.to_dict()['fee'], {'currency': 'LBC', 'address': target_address} ) await self.daemon.jsonrpc_file_delete(claim_name='nullfee') async def __raw_value_update_no_fee_address(self, tx, claim_address, **kwargs): tx = await self.daemon.jsonrpc_stream_update( self.get_claim_id(tx), preview=True, claim_address=claim_address, **kwargs ) tx.outputs[0].claim.stream.fee.address_bytes = b'' tx.outputs[0].script.generate() await tx.sign([self.account]) await self.broadcast(tx) await self.confirm_tx(tx.id) async def __raw_value_update_no_fee_amount(self, tx, claim_address): tx = await self.daemon.jsonrpc_stream_update( self.get_claim_id(tx), preview=True, fee_currency='LBC', fee_amount='1.0', fee_address=claim_address, claim_address=claim_address ) tx.outputs[0].claim.stream.fee.message.ClearField('amount') tx.outputs[0].script.generate() await tx.sign([self.account]) await self.broadcast(tx) await self.confirm_tx(tx.id) class DiskSpaceManagement(CommandTestCase): async def get_referenced_blobs(self, tx): sd_hash = tx['outputs'][0]['value']['source']['sd_hash'] stream_hash = await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash) return tx['outputs'][0]['value']['source']['sd_hash'], set(await self.blob_list( stream_hash=stream_hash )) async def test_file_management(self): status = await self.status() self.assertIn('disk_space', status) self.assertEqual(0, status['disk_space']['total_used_mb']) self.assertEqual(True, status['disk_space']['running']) sd_hash1, blobs1 = await self.get_referenced_blobs( await self.stream_create('foo1', '0.01', data=('0' * 2 * 1024 * 1024).encode()) ) sd_hash2, blobs2 = await self.get_referenced_blobs( await self.stream_create('foo2', '0.01', data=('0' * 3 * 1024 * 1024).encode()) ) sd_hash3, blobs3 = await self.get_referenced_blobs( await self.stream_create('foo3', '0.01', data=('0' * 3 * 1024 * 1024).encode()) ) sd_hash4, blobs4 = await self.get_referenced_blobs( await self.stream_create('foo4', '0.01', data=('0' * 2 * 1024 * 1024).encode()) ) await self.daemon.storage.update_blob_ownership(sd_hash1, False) await self.daemon.storage.update_blob_ownership(sd_hash3, False) await self.daemon.storage.update_blob_ownership(sd_hash4, False) await self.blob_clean() # just to refresh caches, has no effect self.assertEqual(7, (await self.status())['disk_space']['content_blobs_storage_used_mb']) self.assertEqual(10, (await self.status())['disk_space']['total_used_mb']) self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list())) await self.blob_clean() self.assertEqual(10, (await self.status())['disk_space']['total_used_mb']) self.assertEqual(7, (await self.status())['disk_space']['content_blobs_storage_used_mb']) self.assertEqual(3, (await self.status())['disk_space']['published_blobs_storage_used_mb']) self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list())) self.daemon.conf.blob_storage_limit = 6 await self.blob_clean() self.assertEqual(5, (await self.status())['disk_space']['total_used_mb']) self.assertEqual(2, (await self.status())['disk_space']['content_blobs_storage_used_mb']) self.assertEqual(3, (await self.status())['disk_space']['published_blobs_storage_used_mb']) blobs = set(await self.blob_list()) self.assertFalse(blobs1.issubset(blobs)) self.assertTrue(blobs2.issubset(blobs)) self.assertFalse(blobs3.issubset(blobs)) self.assertTrue(blobs4.issubset(blobs)) # check that added_on gets set on downloads (was a bug) self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob")) await self.daemon.jsonrpc_file_delete(delete_all=True) await self.daemon.jsonrpc_get("foo4", save_file=False) self.assertLess(0, await self.daemon.storage.run_and_return_one_or_none("select min(added_on) from blob")) class TestBackgroundDownloaderComponent(CommandTestCase): async def get_blobs_from_sd_blob(self, sd_blob): descriptor = await StreamDescriptor.from_stream_descriptor_blob( asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob ) return descriptor.blobs async def assertBlobs(self, *sd_hashes, no_files=True): # checks that we have ony the finished blobs needed for the the referenced streams seen = set(sd_hashes) for sd_hash in sd_hashes: sd_blob = self.daemon.blob_manager.get_blob(sd_hash) self.assertTrue(sd_blob.get_is_verified()) blobs = await self.get_blobs_from_sd_blob(sd_blob) for blob in blobs[:-1]: self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified()) seen.update(blob.blob_hash for blob in blobs if blob.blob_hash) if no_files: self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes) self.assertEqual(0, len(await self.file_list())) async def clear(self): await self.daemon.jsonrpc_file_delete(delete_all=True) self.assertEqual(0, len(await self.file_list())) await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) async def test_download(self): content1 = await self.stream_create('content1', '0.01', data=bytes([0] * 32 * 1024 * 1024)) content1 = content1['outputs'][0]['value']['source']['sd_hash'] content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024)) content2 = content2['outputs'][0]['value']['source']['sd_hash'] self.assertEqual(48, (await self.status())['disk_space']['published_blobs_storage_used_mb']) self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) background_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager) self.daemon.conf.network_storage_limit = 32 await self.clear() await self.blob_clean() self.assertEqual(0, (await self.status())['disk_space']['total_used_mb']) await background_downloader.download_blobs(content1) await self.assertBlobs(content1) await self.blob_clean() self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) self.assertEqual(32, (await self.status())['disk_space']['seed_blobs_storage_used_mb']) self.daemon.conf.network_storage_limit = 48 await background_downloader.download_blobs(content2) await self.assertBlobs(content1, content2) await self.blob_clean() self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) self.assertEqual(48, (await self.status())['disk_space']['seed_blobs_storage_used_mb']) await self.clear() await background_downloader.download_blobs(content2) await self.assertBlobs(content2) await self.blob_clean() self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) self.assertEqual(16, (await self.status())['disk_space']['seed_blobs_storage_used_mb']) # tests that an attempt to download something that isn't a sd blob will download the single blob and stop blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1)) await self.clear() await background_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) # test that disk space manager doesn't delete orphan network blobs await background_downloader.download_blobs(content1) await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning await self.daemon.jsonrpc_get("content2", save_file=False) while (await self.file_list())[0]['status'] != 'stopped': await asyncio.sleep(0.5) await self.assertBlobs(content1, no_files=False) self.daemon.conf.blob_storage_limit = 1 await self.blob_clean() await self.assertBlobs(content1, no_files=False) self.daemon.conf.network_storage_limit = 0 await self.blob_clean() self.assertEqual(0, (await self.status())['disk_space']['seed_blobs_storage_used_mb'])