diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index 42ed3a856..8b7b9ee49 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -356,7 +356,7 @@ class BlobFile(AbstractBlob):
     @classmethod
     async def create_from_unencrypted(
             cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
-            unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
+            unencrypted: bytes, blob_num: int, added_on: float, is_mine: bool,
             blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
     ) -> BlobInfo:
         if not blob_dir or not os.path.isdir(blob_dir):
diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py
index c99a017b9..52441ecfb 100644
--- a/lbry/blob/blob_manager.py
+++ b/lbry/blob/blob_manager.py
@@ -36,30 +36,30 @@ class BlobManager:
             self.config.blob_lru_cache_size)
         self.connection_manager = ConnectionManager(loop)
 
-    def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
+    def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None, is_mine: bool = False):
         if self.config.save_blobs or (
                 is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))):
             return BlobFile(
-                self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+                self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
             )
         return BlobBuffer(
-            self.loop, blob_hash, length, self.blob_completed, self.blob_dir
+            self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
         )
 
-    def get_blob(self, blob_hash, length: typing.Optional[int] = None):
+    def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool = False):
         if blob_hash in self.blobs:
             if self.config.save_blobs and isinstance(self.blobs[blob_hash], BlobBuffer):
                 buffer = self.blobs.pop(blob_hash)
                 if blob_hash in self.completed_blob_hashes:
                     self.completed_blob_hashes.remove(blob_hash)
-                self.blobs[blob_hash] = self._get_blob(blob_hash, length)
+                self.blobs[blob_hash] = self._get_blob(blob_hash, length, is_mine)
                 if buffer.is_readable():
                     with buffer.reader_context() as reader:
                         self.blobs[blob_hash].write_blob(reader.read())
             if length and self.blobs[blob_hash].length is None:
                 self.blobs[blob_hash].set_length(length)
         else:
-            self.blobs[blob_hash] = self._get_blob(blob_hash, length)
+            self.blobs[blob_hash] = self._get_blob(blob_hash, length, is_mine)
         return self.blobs[blob_hash]
 
     def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py
index 4ed445129..7d00323a6 100644
--- a/lbry/blob/disk_space_manager.py
+++ b/lbry/blob/disk_space_manager.py
@@ -34,6 +34,7 @@ class DiskSpaceManager:
             if available > 0:
                 break
         if delete:
+            await self.db.stop_all_files()
             await self.blob_manager.delete_blobs(delete, delete_from_db=True)
         return len(delete)
 
diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index 4b51074ec..758c25970 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -457,7 +457,7 @@ class SQLiteStorage(SQLiteMixin):
         await self.db.execute_fetchall(
             "update blob set is_mine = ? where blob_hash in ("
             "    select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
-            ")", (is_mine, sd_hash)
+            ") OR blob_hash = ?", (is_mine, sd_hash, sd_hash)
         )
 
     def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
@@ -595,6 +595,10 @@ class SQLiteStorage(SQLiteMixin):
         log.debug("update file status %s -> %s", stream_hash, new_status)
         return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
 
+    def stop_all_files(self):
+        log.debug("stopping all files")
+        return self.db.execute_fetchall("update file set status=?", ("stopped",))
+
     async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
                                                      file_name: typing.Optional[str]):
         if not file_name or not download_dir:
diff --git a/lbry/stream/descriptor.py b/lbry/stream/descriptor.py
index bd0cd2d97..b68184433 100644
--- a/lbry/stream/descriptor.py
+++ b/lbry/stream/descriptor.py
@@ -153,15 +153,19 @@ class StreamDescriptor:
         h.update(self.old_sort_json())
         return h.hexdigest()
 
-    async def make_sd_blob(self, blob_file_obj: typing.Optional[AbstractBlob] = None,
-                           old_sort: typing.Optional[bool] = False,
-                           blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None):
+    async def make_sd_blob(
+            self, blob_file_obj: typing.Optional[AbstractBlob] = None, old_sort: typing.Optional[bool] = False,
+            blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
+            added_on: float = None, is_mine: bool = False
+    ):
         sd_hash = self.calculate_sd_hash() if not old_sort else self.calculate_old_sort_sd_hash()
         if not old_sort:
             sd_data = self.as_json()
         else:
             sd_data = self.old_sort_json()
-        sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir)
+        sd_blob = blob_file_obj or BlobFile(
+            self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir, added_on, is_mine
+        )
         if blob_file_obj:
             blob_file_obj.set_length(len(sd_data))
         if not sd_blob.get_is_verified():
@@ -269,7 +273,9 @@ class StreamDescriptor:
         descriptor = cls(
             loop, blob_dir, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
         )
-        sd_blob = await descriptor.make_sd_blob(old_sort=old_sort, blob_completed_callback=blob_completed_callback)
+        sd_blob = await descriptor.make_sd_blob(
+            old_sort=old_sort, blob_completed_callback=blob_completed_callback, added_on=added_on, is_mine=True
+        )
         descriptor.sd_hash = sd_blob.blob_hash
         return descriptor
 
diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py
index a9ce211e6..cce283473 100644
--- a/lbry/stream/stream_manager.py
+++ b/lbry/stream/stream_manager.py
@@ -236,7 +236,7 @@ class StreamManager(SourceManager):
             blob_completed_callback=self.blob_manager.blob_completed
         )
         await self.storage.store_stream(
-            self.blob_manager.get_blob(descriptor.sd_hash), descriptor
+            self.blob_manager.get_blob(descriptor.sd_hash, is_mine=True), descriptor
         )
         row_id = await self.storage.save_published_file(
             descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 994ad3506..e1e92aead 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -528,13 +528,13 @@ class DiskSpaceManagement(CommandTestCase):
         self.assertEqual('0', status['disk_space']['space_used'])
         self.assertEqual(True, status['disk_space']['running'])
         sd_hash1, blobs1 = await self.get_referenced_blobs(
-            await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
+            await self.stream_create('foo1', '0.01', data=('0' * 2 * 1024 * 1024).encode())
         )
         sd_hash2, blobs2 = await self.get_referenced_blobs(
-            await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+            await self.stream_create('foo2', '0.01', data=('0' * 3 * 1024 * 1024).encode())
         )
         sd_hash3, blobs3 = await self.get_referenced_blobs(
-            await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
+            await self.stream_create('foo3', '0.01', data=('0' * 3 * 1024 * 1024).encode())
         )
         sd_hash4, blobs4 = await self.get_referenced_blobs(
             await self.stream_create('foo4', '0.01', data=('0' * 2 * 1024 * 1024).encode())
@@ -544,17 +544,20 @@ class DiskSpaceManagement(CommandTestCase):
         await self.daemon.storage.update_blob_ownership(sd_hash3, False)
         await self.daemon.storage.update_blob_ownership(sd_hash4, False)
 
-        self.assertEqual('9', (await self.status())['disk_space']['space_used'])
+        self.assertEqual('10', (await self.status())['disk_space']['space_used'])
         self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
         await self.blob_clean()
-        self.assertEqual('9', (await self.status())['disk_space']['space_used'])
+        self.assertEqual('10', (await self.status())['disk_space']['space_used'])
         self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
-        self.daemon.conf.blob_storage_limit = 5
+        self.daemon.conf.blob_storage_limit = 6
         await self.blob_clean()
-        self.assertEqual('4', (await self.status())['disk_space']['space_used'])
+        self.assertEqual('5', (await self.status())['disk_space']['space_used'])
         blobs = set(await self.blob_list())
-        self.assertEqual(blobs2 | blobs4, set(await self.blob_list()))
+        self.assertFalse(blobs1.issubset(blobs))
+        self.assertTrue(blobs2.issubset(blobs))
+        self.assertFalse(blobs3.issubset(blobs))
+        self.assertTrue(blobs4.issubset(blobs))
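
A note on the `update_blob_ownership` change in storage.py: the sd blob itself has no `stream_blob` row, so the original subquery could never mark it as mine; the added `OR blob_hash = ?` clause covers it. The following is a minimal, self-contained sketch of that behavior against an in-memory SQLite database. The schema here is pared down to just the columns the query touches, purely for illustration; it is not the full lbry schema.

import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    create table blob (blob_hash text primary key, is_mine integer default 0);
    create table stream (stream_hash text primary key, sd_hash text);
    create table stream_blob (stream_hash text, blob_hash text);
""")
# One stream: an sd blob (which has no stream_blob row) plus one content blob.
db.execute("insert into blob values ('sd_abc', 0)")
db.execute("insert into blob values ('content_1', 0)")
db.execute("insert into stream values ('stream_1', 'sd_abc')")
db.execute("insert into stream_blob values ('stream_1', 'content_1')")

# The updated query: the subquery finds the stream's content blobs, and the
# OR clause catches the sd blob, which the natural joins can never reach.
db.execute(
    "update blob set is_mine = ? where blob_hash in ("
    "    select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
    ") OR blob_hash = ?", (True, "sd_abc", "sd_abc")
)
print(db.execute("select blob_hash, is_mine from blob").fetchall())
# [('sd_abc', 1), ('content_1', 1)] -- without the OR clause, 'sd_abc' stays 0.

This is also why stream_manager.py now calls `get_blob(descriptor.sd_hash, is_mine=True)` when storing a published stream, and why the test asserts whole-stream retention with `issubset` checks: ownership must cover every blob of a stream, sd blob included, for cleaning to skip it.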