fix integration test

-cancel running reflector uploads from publishes upon shutdown
-refactor blob.delete to be non-async
-delete blobs synchronously
This commit is contained in:
Jack Robison 2019-02-14 15:42:12 -05:00
parent 9c8593d88e
commit 56c41b2fea
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 25 additions and 27 deletions

View file

@ -144,14 +144,13 @@ class BlobFile:
while self.writers:
self.writers.pop().finished.cancel()
async def delete(self):
def delete(self):
self.close()
async with self.blob_write_lock:
self.saved_verified_blob = False
if os.path.isfile(self.file_path):
os.remove(self.file_path)
self.verified.clear()
self.finished_writing.clear()
self.saved_verified_blob = False
if os.path.isfile(self.file_path):
os.remove(self.file_path)
self.verified.clear()
self.finished_writing.clear()
def decrypt(self, key: bytes, iv: bytes) -> bytes:
"""

View file

@ -2,7 +2,6 @@ import os
import typing
import asyncio
import logging
from sqlite3 import IntegrityError
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash
from lbrynet.stream.descriptor import StreamDescriptor
@ -63,29 +62,21 @@ class BlobFileManager:
blobs = [self.get_blob(b) for b in blob_hashes]
return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
async def delete_blob(self, blob_hash: str):
def delete_blob(self, blob_hash: str):
if not is_valid_blobhash(blob_hash):
raise Exception("invalid blob hash to delete")
if blob_hash not in self.blobs:
if os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
os.remove(os.path.join(self.blob_dir, blob_hash))
else:
try:
blob = self.get_blob(blob_hash)
await blob.delete()
except Exception as e:
log.warning("Failed to delete blob file. Reason: %s", e)
if blob_hash in self.blobs:
del self.blobs[blob_hash]
if blob_hash in self.completed_blob_hashes:
self.completed_blob_hashes.remove(blob_hash)
self.blobs.pop(blob_hash).delete()
if blob_hash in self.completed_blob_hashes:
self.completed_blob_hashes.remove(blob_hash)
async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True):
bh_to_delete_from_db = []
await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop)
for blob_hash in blob_hashes:
self.delete_blob(blob_hash)
if delete_from_db:
try:
await self.storage.delete_blobs_from_db(bh_to_delete_from_db)
except IntegrityError as err:
if str(err) != "FOREIGN KEY constraint failed":
raise err
await self.storage.delete_blobs_from_db(blob_hashes)

View file

@ -353,7 +353,7 @@ class SQLiteStorage(SQLiteMixin):
transaction.executemany(
"delete from blob where blob_hash=?;", [(blob_hash,) for blob_hash in blob_hashes]
)
return self.db.run(delete_blobs)
return self.db.run_with_foreign_keys_disabled(delete_blobs)
def get_all_blob_hashes(self):
return self.run_and_return_list("select blob_hash from blob")

View file

@ -65,6 +65,7 @@ class StreamManager:
self.resume_downloading_task: asyncio.Task = None
self.re_reflect_task: asyncio.Task = None
self.update_stream_finished_futs: typing.List[asyncio.Future] = []
self.running_reflector_uploads: typing.List[asyncio.Task] = []
async def _update_content_claim(self, stream: ManagedStream):
claim_info = await self.storage.get_content_claim(stream.stream_hash)
@ -200,6 +201,8 @@ class StreamManager:
stream.stop_download()
while self.update_stream_finished_futs:
self.update_stream_finished_futs.pop().cancel()
while self.running_reflector_uploads:
self.running_reflector_uploads.pop().cancel()
async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
@ -208,7 +211,12 @@ class StreamManager:
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
if self.config.reflect_streams and self.config.reflector_servers:
host, port = random.choice(self.config.reflector_servers)
self.loop.create_task(stream.upload_to_reflector(host, port))
task = self.loop.create_task(stream.upload_to_reflector(host, port))
self.running_reflector_uploads.append(task)
task.add_done_callback(
lambda _: None
if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task)
)
return stream
async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):