Merge pull request #1905 from lbryio/faster-file-delete
Faster file delete
Commit 19fdcad5af
5 changed files with 43 additions and 37 deletions
@@ -144,9 +144,8 @@ class BlobFile:
         while self.writers:
             self.writers.pop().finished.cancel()
 
-    async def delete(self):
+    def delete(self):
         self.close()
-        async with self.blob_write_lock:
         self.saved_verified_blob = False
         if os.path.isfile(self.file_path):
             os.remove(self.file_path)

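The hunk above makes BlobFile.delete() a plain synchronous method: it no longer takes blob_write_lock, it just cancels pending writers via close(), clears the verified flag, and unlinks the file. A minimal standalone sketch of that pattern (a hypothetical _SketchBlob, not the SDK's BlobFile) looks like this:

import os
import asyncio
import typing


class _SketchBlob:
    """Illustration of the synchronous delete pattern adopted above."""

    def __init__(self, file_path: str):
        self.file_path = file_path
        self.saved_verified_blob = False
        self.writers: typing.List[asyncio.Future] = []

    def close(self):
        # Cancel any in-flight writers, as the first hunk does.
        while self.writers:
            self.writers.pop().cancel()

    def delete(self):
        self.close()
        self.saved_verified_blob = False
        if os.path.isfile(self.file_path):
            os.remove(self.file_path)

Call sites that previously did `await blob.delete()` must drop the await, which is what the blob manager hunks below do.
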
@@ -2,7 +2,6 @@ import os
 import typing
 import asyncio
 import logging
-from sqlite3 import IntegrityError
 from lbrynet.extras.daemon.storage import SQLiteStorage
 from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash
 from lbrynet.stream.descriptor import StreamDescriptor

@@ -63,23 +62,21 @@ class BlobFileManager:
         blobs = [self.get_blob(b) for b in blob_hashes]
         return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
 
-    async def delete_blob(self, blob_hash: str):
-        try:
-            blob = self.get_blob(blob_hash)
-            await blob.delete()
-        except Exception as e:
-            log.warning("Failed to delete blob file. Reason: %s", e)
+    def delete_blob(self, blob_hash: str):
+        if not is_valid_blobhash(blob_hash):
+            raise Exception("invalid blob hash to delete")
+        if blob_hash not in self.blobs:
+            if os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
+                os.remove(os.path.join(self.blob_dir, blob_hash))
+        else:
+            self.blobs.pop(blob_hash).delete()
         if blob_hash in self.completed_blob_hashes:
             self.completed_blob_hashes.remove(blob_hash)
-        if blob_hash in self.blobs:
-            del self.blobs[blob_hash]
 
     async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True):
-        await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop)
+        bh_to_delete_from_db = []
+        for blob_hash in blob_hashes:
+            self.delete_blob(blob_hash)
 
         if delete_from_db:
-            try:
-                await self.storage.delete_blobs_from_db(blob_hashes)
-            except IntegrityError as err:
-                if str(err) != "FOREIGN KEY constraint failed":
-                    raise err
+            await self.storage.delete_blobs_from_db(bh_to_delete_from_db)

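Taken together, the blob manager hunks make delete_blob() synchronous and self-validating, and turn delete_blobs() into a plain loop followed by a single database call instead of an asyncio.gather over coroutines. A rough, self-contained sketch of that control flow (a hypothetical _SketchBlobManager, not the SDK class; the diff above additionally collects hashes into bh_to_delete_from_db before the storage call, while this sketch passes the input list directly) is:

import os
import typing


class _SketchBlobManager:
    """Illustration of the delete flow shown in the hunks above."""

    def __init__(self, blob_dir: str, storage):
        self.blob_dir = blob_dir
        self.storage = storage  # assumed to expose an async delete_blobs_from_db()
        self.blobs: typing.Dict[str, typing.Any] = {}
        self.completed_blob_hashes: typing.Set[str] = set()

    def delete_blob(self, blob_hash: str):
        # Synchronous: either drop the in-memory blob object or unlink a
        # stray file that was never loaded into self.blobs.
        if blob_hash not in self.blobs:
            file_path = os.path.join(self.blob_dir, blob_hash)
            if os.path.isfile(file_path):
                os.remove(file_path)
        else:
            self.blobs.pop(blob_hash).delete()
        self.completed_blob_hashes.discard(blob_hash)

    async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: bool = True):
        for blob_hash in blob_hashes:
            self.delete_blob(blob_hash)
        if delete_from_db:
            await self.storage.delete_blobs_from_db(blob_hashes)
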
@@ -154,6 +154,16 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
     return files
 
 
+def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
+    blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]]
+    blob_hashes.append((descriptor.sd_hash, ))
+    transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,))
+    transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash,))
+    transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash,))
+    transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash,))
+    transaction.executemany("delete from blob where blob_hash=?", blob_hashes)
+
+
 class SQLiteStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
         pragma foreign_keys=on;

@@ -336,7 +346,7 @@ class SQLiteStorage(SQLiteMixin):
             transaction.executemany(
                 "delete from blob where blob_hash=?;", [(blob_hash,) for blob_hash in blob_hashes]
             )
-        return self.db.run(delete_blobs)
+        return self.db.run_with_foreign_keys_disabled(delete_blobs)
 
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")

@@ -425,15 +435,7 @@ class SQLiteStorage(SQLiteMixin):
         )
 
     def delete_stream(self, descriptor: 'StreamDescriptor'):
-        def _delete_stream(transaction: sqlite3.Connection):
-            transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,))
-            transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash, ))
-            transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash, ))
-            transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash, ))
-            transaction.execute("delete from blob where blob_hash=?", (descriptor.sd_hash, ))
-            transaction.executemany("delete from blob where blob_hash=?",
-                                    [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]])
-        return self.db.run(_delete_stream)
+        return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor)
 
     # # # # # # # # # file stuff # # # # # # # # #
 
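The storage hunks move stream deletion into a module-level delete_stream(transaction, descriptor) helper and run it, along with delete_blobs, through run_with_foreign_keys_disabled, so blob rows can be removed in bulk without hitting "FOREIGN KEY constraint failed" for rows still referenced by stream_blob. The helper's implementation is not part of this diff; a generic sketch of the pattern using plain sqlite3 (an assumption about its behaviour, not the SDK's actual API) might look like:

import sqlite3


def run_with_foreign_keys_disabled(db_path: str, fun, *args):
    # Generic illustration: turn enforcement off, run the writes in one
    # transaction, then restore the pragma. The SDK's own helper wraps its
    # async database layer rather than using sqlite3 directly.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("pragma foreign_keys=off")
        with conn:  # commits on success, rolls back on error
            fun(conn, *args)
    finally:
        conn.execute("pragma foreign_keys=on")
        conn.close()

Called as run_with_foreign_keys_disabled("lbrynet.sqlite", delete_stream, descriptor) (the database path here is only an example), this mirrors the self.db.run_with_foreign_keys_disabled(delete_stream, descriptor) call in the hunk above.
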
@@ -65,6 +65,7 @@ class StreamManager:
         self.resume_downloading_task: asyncio.Task = None
         self.re_reflect_task: asyncio.Task = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
+        self.running_reflector_uploads: typing.List[asyncio.Task] = []
 
     async def _update_content_claim(self, stream: ManagedStream):
         claim_info = await self.storage.get_content_claim(stream.stream_hash)

@@ -200,6 +201,8 @@ class StreamManager:
             stream.stop_download()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
+        while self.running_reflector_uploads:
+            self.running_reflector_uploads.pop().cancel()
 
     async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
                             iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:

@@ -208,7 +211,12 @@ class StreamManager:
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
         if self.config.reflect_streams and self.config.reflector_servers:
             host, port = random.choice(self.config.reflector_servers)
-            self.loop.create_task(stream.upload_to_reflector(host, port))
+            task = self.loop.create_task(stream.upload_to_reflector(host, port))
+            self.running_reflector_uploads.append(task)
+            task.add_done_callback(
+                lambda _: None
+                if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task)
+            )
         return stream
 
     async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):

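The stream manager hunks keep a reference to every reflector upload task so the tasks can be cancelled on shutdown, and use a done callback to drop each finished task from the list. The conditional-expression lambda in the diff is terse; an equivalent, more conventional version of the same bookkeeping (a sketch, not the SDK's code) is:

import asyncio
import typing


class _SketchUploads:
    """Track fire-and-forget tasks so they can be cancelled on shutdown."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.loop = loop
        self.running: typing.List[asyncio.Task] = []

    def start(self, coro: typing.Coroutine) -> asyncio.Task:
        task = self.loop.create_task(coro)
        self.running.append(task)

        def _discard(_):
            # Remove the finished task unless shutdown already popped it.
            if task in self.running:
                self.running.remove(task)

        task.add_done_callback(_discard)
        return task

    def cancel_all(self):
        while self.running:
            self.running.pop().cancel()
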
@@ -37,7 +37,7 @@ class FileCommands(CommandTestCase):
         self.assertIn('error', resp)
         self.assertEquals('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error'])
         await self.daemon.jsonrpc_file_delete(claim_name='foo')
-        await self.server.blob_manager.delete_blob(sd_hash)
+        await self.server.blob_manager.delete_blobs([sd_hash])
         resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
         self.assertIn('error', resp)
         self.assertEquals('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])