cancel reflector uploads upon file delete
- remove unnecessary db call in stream_update
parent 2d644bdfb0
commit 34eb856d09
2 changed files with 30 additions and 20 deletions
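
The change below swaps the stream manager's list of running reflector uploads for a dict keyed by sd_hash, so a repeated reflect request reuses the task already in flight and deleting a file can cancel its upload. What follows is a minimal standalone sketch of that pattern, not code from this repo; UploadRegistry and fake_upload are made-up names.

import asyncio
import typing


class UploadRegistry:
    """Tracks one in-flight upload task per sd_hash (illustrative only)."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.loop = loop
        self.running: typing.Dict[str, asyncio.Task] = {}

    def reflect(self, sd_hash: str, coro_factory) -> asyncio.Task:
        # reuse the running upload instead of starting a duplicate
        if sd_hash in self.running:
            return self.running[sd_hash]
        task = self.loop.create_task(coro_factory())
        self.running[sd_hash] = task
        # drop the registry entry once the task finishes or is cancelled
        task.add_done_callback(lambda _: self.running.pop(sd_hash, None))
        return task

    def delete(self, sd_hash: str):
        # a file delete cancels the upload that is still in flight
        if sd_hash in self.running:
            self.running[sd_hash].cancel()


async def fake_upload():
    await asyncio.sleep(10)  # stands in for stream.upload_to_reflector()


async def main():
    registry = UploadRegistry(asyncio.get_running_loop())
    first = registry.reflect("abcd", fake_upload)
    assert registry.reflect("abcd", fake_upload) is first  # deduplicated
    registry.delete("abcd")
    try:
        await first
    except asyncio.CancelledError:
        pass
    assert first.cancelled() and not registry.running


asyncio.run(main())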
@@ -3333,17 +3333,16 @@ class Daemon(metaclass=JSONRPCServerType):
         stream_hash = None
         if not preview:
-            old_stream_hash = await self.storage.get_stream_hash_for_sd_hash(old_txo.claim.stream.source.sd_hash)
+            old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None)
             if file_path is not None:
-                if old_stream_hash:
-                    stream_to_delete = self.stream_manager.get_stream_by_stream_hash(old_stream_hash)
-                    await self.stream_manager.delete_stream(stream_to_delete, delete_file=False)
+                if old_stream:
+                    await self.stream_manager.delete_stream(old_stream, delete_file=False)
                 file_stream = await self.stream_manager.create_stream(file_path)
                 new_txo.claim.stream.source.sd_hash = file_stream.sd_hash
                 new_txo.script.generate()
                 stream_hash = file_stream.stream_hash
-            else:
-                stream_hash = old_stream_hash
+            elif old_stream:
+                stream_hash = old_stream.stream_hash

         if channel:
             new_txo.sign(channel)
@@ -4390,7 +4389,7 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             server, port = random.choice(self.conf.reflector_servers)
         reflected = await asyncio.gather(*[
-            stream.upload_to_reflector(server, port)
+            self.stream_manager.reflect_stream(stream, server, port)
             for stream in self.stream_manager.get_filtered_streams(**kwargs)
         ])
         total = []
@@ -77,7 +77,7 @@ class StreamManager:
         self.resume_saving_task: Optional[asyncio.Task] = None
         self.re_reflect_task: Optional[asyncio.Task] = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
-        self.running_reflector_uploads: typing.List[asyncio.Task] = []
+        self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
         self.started = asyncio.Event(loop=self.loop)

     async def _update_content_claim(self, stream: ManagedStream):
@@ -185,10 +185,10 @@ class StreamManager:
                 batch = []
                 while sd_hashes:
                     stream = self.streams[sd_hashes.pop()]
-                    if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed:
-                        if not stream.fully_reflected.is_set():
-                            host, port = random.choice(self.config.reflector_servers)
-                            batch.append(stream.upload_to_reflector(host, port))
+                    if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed and \
+                            stream.sd_hash not in self.running_reflector_uploads and not \
+                            stream.fully_reflected.is_set():
+                        batch.append(self.reflect_stream(stream))
                     if len(batch) >= self.config.concurrent_reflector_uploads:
                         await asyncio.gather(*batch, loop=self.loop)
                         batch = []
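
The re-reflect loop above appends reflect_stream tasks to a batch and gathers them once the batch reaches the configured concurrent upload limit. Below is a rough self-contained sketch of that capped-batching idiom; do_upload and CONCURRENT_UPLOADS are placeholders, and the trailing flush of a partial batch is added for completeness.

import asyncio

CONCURRENT_UPLOADS = 3  # stand-in for config.concurrent_reflector_uploads


async def do_upload(sd_hash: str) -> str:
    await asyncio.sleep(0.01)  # stands in for a reflector upload
    return sd_hash


async def reflect_all(sd_hashes):
    batch = []
    while sd_hashes:
        batch.append(asyncio.ensure_future(do_upload(sd_hashes.pop())))
        if len(batch) >= CONCURRENT_UPLOADS:
            # wait for the current batch before queueing more uploads
            await asyncio.gather(*batch)
            batch = []
    if batch:
        # flush whatever did not fill a complete batch
        await asyncio.gather(*batch)


asyncio.run(reflect_all([f"hash{i}" for i in range(8)]))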
@@ -212,26 +212,37 @@ class StreamManager:
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
         while self.running_reflector_uploads:
-            self.running_reflector_uploads.pop().cancel()
+            _, t = self.running_reflector_uploads.popitem()
+            t.cancel()
         self.started.clear()
         log.info("finished stopping the stream manager")

+    def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None,
+                       port: Optional[int] = None) -> asyncio.Task:
+        if not server or not port:
+            server, port = random.choice(self.config.reflector_servers)
+        if stream.sd_hash in self.running_reflector_uploads:
+            return self.running_reflector_uploads[stream.sd_hash]
+        task = self.loop.create_task(stream.upload_to_reflector(server, port))
+        self.running_reflector_uploads[stream.sd_hash] = task
+        task.add_done_callback(
+            lambda _: None if stream.sd_hash not in self.running_reflector_uploads else
+            self.running_reflector_uploads.pop(stream.sd_hash)
+        )
+        return task
+
     async def create_stream(self, file_path: str, key: Optional[bytes] = None,
                             iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
         stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
         self.streams[stream.sd_hash] = stream
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
         if self.config.reflect_streams and self.config.reflector_servers:
-            host, port = random.choice(self.config.reflector_servers)
-            task = self.loop.create_task(stream.upload_to_reflector(host, port))
-            self.running_reflector_uploads.append(task)
-            task.add_done_callback(
-                lambda _: None
-                if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task)
-            )
+            self.reflect_stream(stream)
         return stream

     async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False):
+        if stream.sd_hash in self.running_reflector_uploads:
+            self.running_reflector_uploads[stream.sd_hash].cancel()
         stream.stop_tasks()
         if stream.sd_hash in self.streams:
             del self.streams[stream.sd_hash]
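
In stop() above, the registry is drained with popitem() before each task is cancelled, so a cancelled upload's done callback can fire after its key is already gone; the membership check in reflect_stream's callback appears to guard against exactly that. Here is a small sketch of the same interaction, using dict.pop(key, None) as the guard and made-up names.

import asyncio


async def main():
    loop = asyncio.get_running_loop()
    running = {}  # sd_hash -> asyncio.Task, like running_reflector_uploads

    async def upload():
        await asyncio.sleep(10)  # stands in for the reflector upload

    task = loop.create_task(upload())
    running["sd_hash"] = task
    # guarded cleanup: a no-op if stop() already removed the entry
    task.add_done_callback(lambda _: running.pop("sd_hash", None))

    # what stop() does: drain the registry, then cancel each task
    while running:
        _, pending = running.popitem()
        pending.cancel()

    try:
        await task
    except asyncio.CancelledError:
        pass
    assert task.cancelled() and not running


asyncio.run(main())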