fix file reflect and add test

This commit is contained in:
Victor Shyba 2021-09-16 18:38:56 -03:00
parent 1004a83dae
commit 3d7ece91eb
3 changed files with 13 additions and 2 deletions

View file

@ -5068,8 +5068,8 @@ class Daemon(metaclass=JSONRPCServerType):
else: else:
server, port = random.choice(self.conf.reflector_servers) server, port = random.choice(self.conf.reflector_servers)
reflected = await asyncio.gather(*[ reflected = await asyncio.gather(*[
self.file_manager['stream'].reflect_stream(stream, server, port) self.file_manager.source_managers['stream'].reflect_stream(stream, server, port)
for stream in self.file_manager.get_filtered_streams(**kwargs) for stream in self.file_manager.get_filtered(**kwargs)
]) ])
total = [] total = []
for reflected_for_stream in reflected: for reflected_for_stream in reflected:

View file

@ -228,6 +228,7 @@ class StreamManager(SourceManager):
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0: while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0 stream.reflector_progress = 0
sent = await stream.upload_to_reflector(host, port) sent = await stream.upload_to_reflector(host, port)
return sent
async def create(self, file_path: str, key: Optional[bytes] = None, async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:

View file

@ -69,6 +69,16 @@ class FileCommands(CommandTestCase):
t = await self.stream_create(f'Stream_{i}', '0.00001') t = await self.stream_create(f'Stream_{i}', '0.00001')
self.stream_claim_ids.append(t['outputs'][0]['claim_id']) self.stream_claim_ids.append(t['outputs'][0]['claim_id'])
async def test_file_reflect(self):
tx = await self.stream_create('mirror', '0.01')
sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
self.assertEqual([], await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash))
all_except_sd = [
blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash
]
await self.reflector.blob_manager.delete_blobs(all_except_sd)
self.assertEqual(all_except_sd, await self.daemon.jsonrpc_file_reflect(sd_hash=sd_hash))
async def test_file_management(self): async def test_file_management(self):
await self.stream_create('foo', '0.01') await self.stream_create('foo', '0.01')
await self.stream_create('foo2', '0.01') await self.stream_create('foo2', '0.01')