during disk clean your own sd blob is now kept and file status of deleted files is set to stopped
This commit is contained in:
parent
bbcdc881cd
commit
c66b1646a6
7 changed files with 36 additions and 22 deletions
|
@ -356,7 +356,7 @@ class BlobFile(AbstractBlob):
|
|||
@classmethod
|
||||
async def create_from_unencrypted(
|
||||
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
|
||||
unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
|
||||
unencrypted: bytes, blob_num: int, added_on: float, is_mine: bool,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
|
||||
) -> BlobInfo:
|
||||
if not blob_dir or not os.path.isdir(blob_dir):
|
||||
|
|
|
@ -36,30 +36,30 @@ class BlobManager:
|
|||
self.config.blob_lru_cache_size)
|
||||
self.connection_manager = ConnectionManager(loop)
|
||||
|
||||
def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
|
||||
def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None, is_mine: bool = False):
|
||||
if self.config.save_blobs or (
|
||||
is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))):
|
||||
return BlobFile(
|
||||
self.loop, blob_hash, length, self.blob_completed, self.blob_dir
|
||||
self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
|
||||
)
|
||||
return BlobBuffer(
|
||||
self.loop, blob_hash, length, self.blob_completed, self.blob_dir
|
||||
self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
|
||||
)
|
||||
|
||||
def get_blob(self, blob_hash, length: typing.Optional[int] = None):
|
||||
def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool = False):
|
||||
if blob_hash in self.blobs:
|
||||
if self.config.save_blobs and isinstance(self.blobs[blob_hash], BlobBuffer):
|
||||
buffer = self.blobs.pop(blob_hash)
|
||||
if blob_hash in self.completed_blob_hashes:
|
||||
self.completed_blob_hashes.remove(blob_hash)
|
||||
self.blobs[blob_hash] = self._get_blob(blob_hash, length)
|
||||
self.blobs[blob_hash] = self._get_blob(blob_hash, length, is_mine)
|
||||
if buffer.is_readable():
|
||||
with buffer.reader_context() as reader:
|
||||
self.blobs[blob_hash].write_blob(reader.read())
|
||||
if length and self.blobs[blob_hash].length is None:
|
||||
self.blobs[blob_hash].set_length(length)
|
||||
else:
|
||||
self.blobs[blob_hash] = self._get_blob(blob_hash, length)
|
||||
self.blobs[blob_hash] = self._get_blob(blob_hash, length, is_mine)
|
||||
return self.blobs[blob_hash]
|
||||
|
||||
def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
|
||||
|
|
|
@ -34,6 +34,7 @@ class DiskSpaceManager:
|
|||
if available > 0:
|
||||
break
|
||||
if delete:
|
||||
await self.db.stop_all_files()
|
||||
await self.blob_manager.delete_blobs(delete, delete_from_db=True)
|
||||
return len(delete)
|
||||
|
||||
|
|
|
@ -457,7 +457,7 @@ class SQLiteStorage(SQLiteMixin):
|
|||
await self.db.execute_fetchall(
|
||||
"update blob set is_mine = ? where blob_hash in ("
|
||||
" select blob_hash from blob natural join stream_blob natural join stream where sd_hash = ?"
|
||||
")", (is_mine, sd_hash)
|
||||
") OR blob_hash = ?", (is_mine, sd_hash, sd_hash)
|
||||
)
|
||||
|
||||
def sync_missing_blobs(self, blob_files: typing.Set[str]) -> typing.Awaitable[typing.Set[str]]:
|
||||
|
@ -595,6 +595,10 @@ class SQLiteStorage(SQLiteMixin):
|
|||
log.debug("update file status %s -> %s", stream_hash, new_status)
|
||||
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
||||
|
||||
def stop_all_files(self):
|
||||
log.debug("stopping all files")
|
||||
return self.db.execute_fetchall("update file set status=?", ("stopped",))
|
||||
|
||||
async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
|
||||
file_name: typing.Optional[str]):
|
||||
if not file_name or not download_dir:
|
||||
|
|
|
@ -153,15 +153,19 @@ class StreamDescriptor:
|
|||
h.update(self.old_sort_json())
|
||||
return h.hexdigest()
|
||||
|
||||
async def make_sd_blob(self, blob_file_obj: typing.Optional[AbstractBlob] = None,
|
||||
old_sort: typing.Optional[bool] = False,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None):
|
||||
async def make_sd_blob(
|
||||
self, blob_file_obj: typing.Optional[AbstractBlob] = None, old_sort: typing.Optional[bool] = False,
|
||||
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
|
||||
added_on: float = None, is_mine: bool = False
|
||||
):
|
||||
sd_hash = self.calculate_sd_hash() if not old_sort else self.calculate_old_sort_sd_hash()
|
||||
if not old_sort:
|
||||
sd_data = self.as_json()
|
||||
else:
|
||||
sd_data = self.old_sort_json()
|
||||
sd_blob = blob_file_obj or BlobFile(self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir)
|
||||
sd_blob = blob_file_obj or BlobFile(
|
||||
self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir, added_on, is_mine
|
||||
)
|
||||
if blob_file_obj:
|
||||
blob_file_obj.set_length(len(sd_data))
|
||||
if not sd_blob.get_is_verified():
|
||||
|
@ -269,7 +273,9 @@ class StreamDescriptor:
|
|||
descriptor = cls(
|
||||
loop, blob_dir, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
|
||||
)
|
||||
sd_blob = await descriptor.make_sd_blob(old_sort=old_sort, blob_completed_callback=blob_completed_callback)
|
||||
sd_blob = await descriptor.make_sd_blob(
|
||||
old_sort=old_sort, blob_completed_callback=blob_completed_callback, added_on=added_on, is_mine=True
|
||||
)
|
||||
descriptor.sd_hash = sd_blob.blob_hash
|
||||
return descriptor
|
||||
|
||||
|
|
|
@ -236,7 +236,7 @@ class StreamManager(SourceManager):
|
|||
blob_completed_callback=self.blob_manager.blob_completed
|
||||
)
|
||||
await self.storage.store_stream(
|
||||
self.blob_manager.get_blob(descriptor.sd_hash), descriptor
|
||||
self.blob_manager.get_blob(descriptor.sd_hash, is_mine=True), descriptor
|
||||
)
|
||||
row_id = await self.storage.save_published_file(
|
||||
descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0
|
||||
|
|
|
@ -528,13 +528,13 @@ class DiskSpaceManagement(CommandTestCase):
|
|||
self.assertEqual('0', status['disk_space']['space_used'])
|
||||
self.assertEqual(True, status['disk_space']['running'])
|
||||
sd_hash1, blobs1 = await self.get_referenced_blobs(
|
||||
await self.stream_create('foo1', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
||||
await self.stream_create('foo1', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||
)
|
||||
sd_hash2, blobs2 = await self.get_referenced_blobs(
|
||||
await self.stream_create('foo2', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||
await self.stream_create('foo2', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
||||
)
|
||||
sd_hash3, blobs3 = await self.get_referenced_blobs(
|
||||
await self.stream_create('foo3', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||
await self.stream_create('foo3', '0.01', data=('0' * 3 * 1024 * 1024).encode())
|
||||
)
|
||||
sd_hash4, blobs4 = await self.get_referenced_blobs(
|
||||
await self.stream_create('foo4', '0.01', data=('0' * 2 * 1024 * 1024).encode())
|
||||
|
@ -544,17 +544,20 @@ class DiskSpaceManagement(CommandTestCase):
|
|||
await self.daemon.storage.update_blob_ownership(sd_hash3, False)
|
||||
await self.daemon.storage.update_blob_ownership(sd_hash4, False)
|
||||
|
||||
self.assertEqual('9', (await self.status())['disk_space']['space_used'])
|
||||
self.assertEqual('10', (await self.status())['disk_space']['space_used'])
|
||||
self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
|
||||
|
||||
await self.blob_clean()
|
||||
|
||||
self.assertEqual('9', (await self.status())['disk_space']['space_used'])
|
||||
self.assertEqual('10', (await self.status())['disk_space']['space_used'])
|
||||
self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))
|
||||
|
||||
self.daemon.conf.blob_storage_limit = 5
|
||||
self.daemon.conf.blob_storage_limit = 6
|
||||
await self.blob_clean()
|
||||
|
||||
self.assertEqual('4', (await self.status())['disk_space']['space_used'])
|
||||
self.assertEqual('5', (await self.status())['disk_space']['space_used'])
|
||||
blobs = set(await self.blob_list())
|
||||
self.assertEqual(blobs2 | blobs4, set(await self.blob_list()))
|
||||
self.assertFalse(blobs1.issubset(blobs))
|
||||
self.assertTrue(blobs2.issubset(blobs))
|
||||
self.assertFalse(blobs3.issubset(blobs))
|
||||
self.assertTrue(blobs4.issubset(blobs))
|
||||
|
|
Loading…
Add table
Reference in a new issue