Merge pull request #1862 from lbryio/fix_announce
adds announce tests on stream creation/assemble, refactor calls to set announce
This commit is contained in:
commit
43d424978d
7 changed files with 39 additions and 22 deletions
|
@ -111,9 +111,9 @@ class BlobFile:
|
|||
if self.verified.is_set():
|
||||
return
|
||||
await self.loop.run_in_executor(None, _save_verified)
|
||||
self.verified.set()
|
||||
if self.blob_completed_callback:
|
||||
await self.blob_completed_callback(self)
|
||||
self.verified.set()
|
||||
|
||||
def open_for_writing(self) -> HashBlobWriter:
|
||||
if os.path.exists(self.file_path):
|
||||
|
|
|
@ -59,9 +59,6 @@ class BlobFileManager:
|
|||
blobs = [self.get_blob(b) for b in blob_hashes]
|
||||
return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
||||
|
||||
async def set_should_announce(self, blob_hash: str, should_announce: int):
|
||||
return await self.storage.set_should_announce(blob_hash, should_announce)
|
||||
|
||||
async def get_all_verified_blobs(self) -> typing.List[str]:
|
||||
blob_hashes = await self.storage.get_all_blob_hashes()
|
||||
return self.check_completed_blobs(blob_hashes)
|
||||
|
|
|
@ -208,12 +208,6 @@ class SQLiteStorage(SQLiteMixin):
|
|||
log.debug("Adding a completed blob. blob_hash=%s", blob_hash)
|
||||
return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, ))
|
||||
|
||||
def set_should_announce(self, blob_hash: str, should_announce: int):
|
||||
return self.db.execute(
|
||||
"update blob set next_announce_time=?, should_announce=? where blob_hash=?",
|
||||
(int(self.time_getter()), should_announce, blob_hash)
|
||||
)
|
||||
|
||||
def get_blob_status(self, blob_hash: str):
|
||||
return self.run_and_return_one_or_none(
|
||||
"select status from blob where blob_hash=?", blob_hash
|
||||
|
@ -690,7 +684,7 @@ class SQLiteStorage(SQLiteMixin):
|
|||
if success:
|
||||
return self.db.execute(
|
||||
"insert or replace into reflected_stream values (?, ?, ?)",
|
||||
(sd_hash, reflector_address, time.time())
|
||||
(sd_hash, reflector_address, self.time_getter())
|
||||
)
|
||||
return self.db.execute(
|
||||
"delete from reflected_stream where sd_hash=? and reflector_address=?",
|
||||
|
|
|
@ -59,6 +59,7 @@ class StreamAssembler:
|
|||
self.wrote_bytes_event.set()
|
||||
|
||||
await self.loop.run_in_executor(None, _decrypt_and_write)
|
||||
return True
|
||||
|
||||
async def setup(self):
|
||||
pass
|
||||
|
@ -74,24 +75,25 @@ class StreamAssembler:
|
|||
raise OSError(f"output directory does not exist: '{output_dir}' '{output_file_name}'")
|
||||
await self.setup()
|
||||
self.sd_blob = await self.get_blob(self.sd_hash)
|
||||
await self.blob_manager.blob_completed(self.sd_blob)
|
||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
|
||||
self.sd_blob)
|
||||
await self.after_got_descriptor()
|
||||
self.output_path = await get_next_available_file_name(self.loop, output_dir,
|
||||
output_file_name or self.descriptor.suggested_file_name)
|
||||
if not self.got_descriptor.is_set():
|
||||
self.got_descriptor.set()
|
||||
await self.after_got_descriptor()
|
||||
await self.blob_manager.storage.store_stream(
|
||||
self.sd_blob, self.descriptor
|
||||
)
|
||||
await self.blob_manager.blob_completed(self.sd_blob)
|
||||
with open(self.output_path, 'wb') as stream_handle:
|
||||
self.stream_handle = stream_handle
|
||||
for blob_info in self.descriptor.blobs[:-1]:
|
||||
while True:
|
||||
while not stream_handle.closed:
|
||||
try:
|
||||
blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
|
||||
await self._decrypt_blob(blob, blob_info, self.descriptor.key)
|
||||
if await self._decrypt_blob(blob, blob_info, self.descriptor.key):
|
||||
await self.blob_manager.blob_completed(blob)
|
||||
break
|
||||
except FileNotFoundError:
|
||||
log.debug("stream assembler stopped")
|
||||
|
|
|
@ -47,11 +47,9 @@ class StreamDownloader(StreamAssembler):
|
|||
async def after_got_descriptor(self):
|
||||
self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
|
||||
log.info("added head blob to search")
|
||||
await self.blob_manager.set_should_announce(self.sd_hash, 1)
|
||||
|
||||
async def after_finished(self):
|
||||
log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
|
||||
await self.blob_manager.set_should_announce(self.descriptor.blobs[0].blob_hash, 1)
|
||||
await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -160,8 +160,6 @@ class ManagedStream:
|
|||
await blob_manager.blob_completed(sd_blob)
|
||||
for blob in descriptor.blobs[:-1]:
|
||||
await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length))
|
||||
await blob_manager.set_should_announce(sd_blob.blob_hash, 1)
|
||||
await blob_manager.set_should_announce(descriptor.blobs[0].blob_hash, 1)
|
||||
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
||||
status=cls.STATUS_FINISHED)
|
||||
|
||||
|
|
|
@ -2,13 +2,15 @@ import os
|
|||
import asyncio
|
||||
import tempfile
|
||||
import shutil
|
||||
|
||||
from torba.testcase import AsyncioTestCase
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.blob.blob_file import MAX_BLOB_SIZE
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.stream.assembler import StreamAssembler
|
||||
from lbrynet.stream.descriptor import StreamDescriptor
|
||||
from lbrynet.stream.stream_manager import StreamManager
|
||||
|
||||
|
||||
class TestStreamAssembler(AsyncioTestCase):
|
||||
|
@ -20,7 +22,7 @@ class TestStreamAssembler(AsyncioTestCase):
|
|||
async def test_create_and_decrypt_one_blob_stream(self):
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
|
||||
self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
|
||||
self.storage = SQLiteStorage(Config(), ":memory:")
|
||||
await self.storage.open()
|
||||
self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage)
|
||||
|
||||
|
@ -55,6 +57,12 @@ class TestStreamAssembler(AsyncioTestCase):
|
|||
decrypted = f.read()
|
||||
self.assertEqual(decrypted, self.cleartext)
|
||||
self.assertEqual(True, self.blob_manager.get_blob(sd_hash).get_is_verified())
|
||||
self.assertEqual(True, self.blob_manager.get_blob(descriptor.blobs[0].blob_hash).get_is_verified())
|
||||
# its all blobs + sd blob - last blob, which is the same size as descriptor.blobs
|
||||
self.assertEqual(len(descriptor.blobs), len(await downloader_storage.get_all_finished_blobs()))
|
||||
self.assertEqual(
|
||||
[descriptor.sd_hash, descriptor.blobs[0].blob_hash], await downloader_storage.get_blobs_to_announce()
|
||||
)
|
||||
|
||||
await downloader_storage.close()
|
||||
await self.storage.close()
|
||||
|
@ -75,3 +83,23 @@ class TestStreamAssembler(AsyncioTestCase):
|
|||
async def test_create_and_decrypt_random(self):
|
||||
self.cleartext = os.urandom(20000000)
|
||||
await self.test_create_and_decrypt_one_blob_stream()
|
||||
|
||||
async def test_create_managed_stream_announces(self):
|
||||
# setup a blob manager
|
||||
storage = SQLiteStorage(Config(), ":memory:")
|
||||
await storage.open()
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(lambda: shutil.rmtree(tmp_dir))
|
||||
blob_manager = BlobFileManager(self.loop, tmp_dir, storage)
|
||||
stream_manager = StreamManager(self.loop, Config(), blob_manager, None, storage, None)
|
||||
# create the stream
|
||||
download_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(lambda: shutil.rmtree(download_dir))
|
||||
file_path = os.path.join(download_dir, "test_file")
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(b'testtest')
|
||||
|
||||
stream = await stream_manager.create_stream(file_path)
|
||||
self.assertEqual(
|
||||
[stream.sd_hash, stream.descriptor.blobs[0].blob_hash],
|
||||
await storage.get_blobs_to_announce())
|
||||
|
|
Loading…
Reference in a new issue