From 85f41887fd5f70d38ff016dff634f24bcefaa758 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 11 Feb 2019 18:27:14 -0500
Subject: [PATCH 1/5] don't create BlobFile object when deleting if it doesn't
 already exist

---
 lbrynet/blob/blob_manager.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py
index 73c2b0573..8e0a5d2e6 100644
--- a/lbrynet/blob/blob_manager.py
+++ b/lbrynet/blob/blob_manager.py
@@ -64,15 +64,21 @@ class BlobFileManager:
         return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
 
     async def delete_blob(self, blob_hash: str):
-        try:
-            blob = self.get_blob(blob_hash)
-            await blob.delete()
-        except Exception as e:
-            log.warning("Failed to delete blob file. Reason: %s", e)
+        if not is_valid_blobhash(blob_hash):
+            raise Exception("invalid blob hash to delete")
+        if blob_hash not in self.blobs:
+            if os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
+                os.remove(os.path.join(self.blob_dir, blob_hash))
+        else:
+            try:
+                blob = self.get_blob(blob_hash)
+                await blob.delete()
+            except Exception as e:
+                log.warning("Failed to delete blob file. Reason: %s", e)
+        if blob_hash in self.blobs:
+            del self.blobs[blob_hash]
         if blob_hash in self.completed_blob_hashes:
             self.completed_blob_hashes.remove(blob_hash)
-        if blob_hash in self.blobs:
-            del self.blobs[blob_hash]
 
     async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True):
         bh_to_delete_from_db = []

From 9c8593d88e0f455129e6f08d9736653a8af734c0 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Thu, 14 Feb 2019 12:33:48 -0500
Subject: [PATCH 2/5] fix file_delete being slow

---
 lbrynet/extras/daemon/storage.py | 27 ++++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index b37909240..6843b677b 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -105,6 +105,13 @@ def get_content_claim_from_outpoint(transaction: sqlite3.Connection,
     return StoredStreamClaim(*claim_fields)
 
 
+def batched_operation(transaction, query, parameters, batch_size=900):
+    for start_index in range(0, len(parameters), batch_size):
+        current_batch = parameters[start_index:start_index+batch_size]
+        bind = "({})".format(','.join(['?'] * len(current_batch)))
+        transaction.execute(query.format(bind), current_batch)
+
+
 def _batched_select(transaction, query, parameters, batch_size=900):
     for start_index in range(0, len(parameters), batch_size):
         current_batch = parameters[start_index:start_index+batch_size]
@@ -154,6 +161,16 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
     return files
 
 
+def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
+    blob_hashes = [blob.blob_hash for blob in descriptor.blobs[:-1]]
+    blob_hashes.append(descriptor.sd_hash)
+    transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,))
+    transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash,))
+    transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash,))
+    transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash,))
+    batched_operation(transaction, "delete from blob where blob_hash in {}", blob_hashes)
+
+
 class SQLiteStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
             pragma foreign_keys=on;
@@ -425,15 +442,7 @@ class SQLiteStorage(SQLiteMixin):
         )
 
     def delete_stream(self, descriptor: 'StreamDescriptor'):
-        def _delete_stream(transaction: sqlite3.Connection):
-            transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,))
-            transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash, ))
-            transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash, ))
-            transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash, ))
-            transaction.execute("delete from blob where blob_hash=?", (descriptor.sd_hash, ))
-            transaction.executemany("delete from blob where blob_hash=?",
-                                    [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]])
-        return self.db.run(_delete_stream)
+        return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor)
 
     # # # # # # # # # file stuff # # # # # # # # #
 

From 56c41b2fea180e9bb8e4716e64599fd7cc2d50eb Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Thu, 14 Feb 2019 15:42:12 -0500
Subject: [PATCH 3/5] fix integration test

-cancel running reflector uploads from publishes upon shutdown
-refactor blob.delete to be non-async
-delete blobs synchronously
---
 lbrynet/blob/blob_file.py        | 13 ++++++-------
 lbrynet/blob/blob_manager.py     | 27 +++++++++------------------
 lbrynet/extras/daemon/storage.py |  2 +-
 lbrynet/stream/stream_manager.py | 10 +++++++++-
 4 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py
index f99dbe768..db4fd5f88 100644
--- a/lbrynet/blob/blob_file.py
+++ b/lbrynet/blob/blob_file.py
@@ -144,14 +144,13 @@ class BlobFile:
         while self.writers:
             self.writers.pop().finished.cancel()
 
-    async def delete(self):
+    def delete(self):
         self.close()
-        async with self.blob_write_lock:
-            self.saved_verified_blob = False
-            if os.path.isfile(self.file_path):
-                os.remove(self.file_path)
-            self.verified.clear()
-            self.finished_writing.clear()
+        self.saved_verified_blob = False
+        if os.path.isfile(self.file_path):
+            os.remove(self.file_path)
+        self.verified.clear()
+        self.finished_writing.clear()
 
     def decrypt(self, key: bytes, iv: bytes) -> bytes:
         """
diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py
index 8e0a5d2e6..84d0daf59 100644
--- a/lbrynet/blob/blob_manager.py
+++ b/lbrynet/blob/blob_manager.py
@@ -2,7 +2,6 @@ import os
 import typing
 import asyncio
 import logging
-from sqlite3 import IntegrityError
 from lbrynet.extras.daemon.storage import SQLiteStorage
 from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash
 from lbrynet.stream.descriptor import StreamDescriptor
@@ -63,29 +62,21 @@
         blobs = [self.get_blob(b) for b in blob_hashes]
         return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
 
-    async def delete_blob(self, blob_hash: str):
+    def delete_blob(self, blob_hash: str):
         if not is_valid_blobhash(blob_hash):
             raise Exception("invalid blob hash to delete")
+
         if blob_hash not in self.blobs:
             if os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
                 os.remove(os.path.join(self.blob_dir, blob_hash))
         else:
-            try:
-                blob = self.get_blob(blob_hash)
-                await blob.delete()
-            except Exception as e:
-                log.warning("Failed to delete blob file. Reason: %s", e)
-            if blob_hash in self.blobs:
-                del self.blobs[blob_hash]
-        if blob_hash in self.completed_blob_hashes:
-            self.completed_blob_hashes.remove(blob_hash)
+            self.blobs.pop(blob_hash).delete()
+            if blob_hash in self.completed_blob_hashes:
+                self.completed_blob_hashes.remove(blob_hash)
 
     async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True):
-        bh_to_delete_from_db = []
-        await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop)
+        for blob_hash in blob_hashes:
+            self.delete_blob(blob_hash)
+
         if delete_from_db:
-            try:
-                await self.storage.delete_blobs_from_db(bh_to_delete_from_db)
-            except IntegrityError as err:
-                if str(err) != "FOREIGN KEY constraint failed":
-                    raise err
+            await self.storage.delete_blobs_from_db(blob_hashes)
diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index 6843b677b..1b4f9e167 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -353,7 +353,7 @@ class SQLiteStorage(SQLiteMixin):
             transaction.executemany(
                 "delete from blob where blob_hash=?;", [(blob_hash,) for blob_hash in blob_hashes]
             )
-        return self.db.run(delete_blobs)
+        return self.db.run_with_foreign_keys_disabled(delete_blobs)
 
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index b704da285..e911381e6 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -65,6 +65,7 @@
         self.resume_downloading_task: asyncio.Task = None
         self.re_reflect_task: asyncio.Task = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
+        self.running_reflector_uploads: typing.List[asyncio.Task] = []
 
     async def _update_content_claim(self, stream: ManagedStream):
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
@@ -200,6 +201,8 @@
             stream.stop_download()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
+        while self.running_reflector_uploads:
+            self.running_reflector_uploads.pop().cancel()
 
     async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
                             iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
@@ -208,7 +211,12 @@
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
         if self.config.reflect_streams and self.config.reflector_servers:
             host, port = random.choice(self.config.reflector_servers)
-            self.loop.create_task(stream.upload_to_reflector(host, port))
+            task = self.loop.create_task(stream.upload_to_reflector(host, port))
+            self.running_reflector_uploads.append(task)
+            task.add_done_callback(
+                lambda _: None
+                if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task)
+            )
         return stream
 
     async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):

From 9d89f9b834dc14f31e8537493af67532807c4de9 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Thu, 14 Feb 2019 15:57:24 -0500
Subject: [PATCH 4/5] use executemany instead of batched_operation

---
 lbrynet/extras/daemon/storage.py | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index 1b4f9e167..68e93145c 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -105,13 +105,6 @@ def get_content_claim_from_outpoint(transaction: sqlite3.Connection,
     return StoredStreamClaim(*claim_fields)
 
 
-def batched_operation(transaction, query, parameters, batch_size=900):
-    for start_index in range(0, len(parameters), batch_size):
-        current_batch = parameters[start_index:start_index+batch_size]
-        bind = "({})".format(','.join(['?'] * len(current_batch)))
-        transaction.execute(query.format(bind), current_batch)
-
-
 def _batched_select(transaction, query, parameters, batch_size=900):
     for start_index in range(0, len(parameters), batch_size):
         current_batch = parameters[start_index:start_index+batch_size]
@@ -162,13 +155,13 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
 
 
 def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
-    blob_hashes = [blob.blob_hash for blob in descriptor.blobs[:-1]]
-    blob_hashes.append(descriptor.sd_hash)
+    blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]]
+    blob_hashes.append((descriptor.sd_hash, ))
     transaction.execute("delete from content_claim where stream_hash=? ", (descriptor.stream_hash,))
     transaction.execute("delete from file where stream_hash=? ", (descriptor.stream_hash,))
     transaction.execute("delete from stream_blob where stream_hash=?", (descriptor.stream_hash,))
     transaction.execute("delete from stream where stream_hash=? ", (descriptor.stream_hash,))
-    batched_operation(transaction, "delete from blob where blob_hash in {}", blob_hashes)
+    transaction.executemany("delete from blob where blob_hash=?", blob_hashes)
 
 
 class SQLiteStorage(SQLiteMixin):

From 1734e082971e199fad8c6ed4225bada6a993bccf Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Thu, 14 Feb 2019 16:34:37 -0500
Subject: [PATCH 5/5] fix test

---
 tests/integration/test_file_commands.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py
index 9f660d051..b9f501f5f 100644
--- a/tests/integration/test_file_commands.py
+++ b/tests/integration/test_file_commands.py
@@ -37,7 +37,7 @@ class FileCommands(CommandTestCase):
         self.assertIn('error', resp)
         self.assertEquals('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error'])
         await self.daemon.jsonrpc_file_delete(claim_name='foo')
-        await self.server.blob_manager.delete_blob(sd_hash)
+        await self.server.blob_manager.delete_blobs([sd_hash])
         resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
         self.assertIn('error', resp)
         self.assertEquals('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])