diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index d6f72662f..536b302e9 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -3524,6 +3524,7 @@ class Daemon(metaclass=JSONRPCServerType): await self.storage.save_claims_for_resolve([ value for value in results.values() if 'error' not in value ]) + await asyncio.create_task(self.stream_manager.check_from_resolve_response(results)) return results async def get_claims_for_name(self, name: str): diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 6aeb13f6b..f828ffdb7 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -5,6 +5,9 @@ import binascii import logging import random from decimal import Decimal + +from google.protobuf.message import DecodeError + from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError from lbrynet.error import DownloadSDTimeout, DownloadDataTimeout, ResolveTimeout from lbrynet.utils import cache_concurrent @@ -286,6 +289,21 @@ class StreamManager: streams.reverse() return streams + async def check_from_resolve_response(self, resolutions: dict): + futs = [] + for uri, resolution in resolutions.items(): + resolved = resolution if 'value' in resolution else resolution.get('claim') + if not resolved: + continue + outpoint = f"{resolved['txid']}:{resolved['nout']}" + try: + claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) + except DecodeError: + log.warning("Got an update for %s, but it doesn't decode. Skipping it.") + continue + futs.append(self._check_update_or_replace(outpoint, resolved['claim_id'], claim)) + return await asyncio.gather(*futs) + async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim) -> typing.Tuple[ typing.Optional[ManagedStream], typing.Optional[ManagedStream]]: existing = self.get_filtered_streams(outpoint=outpoint)