diff --git a/lbrynet/blob/EncryptedFileDownloader.py b/lbrynet/blob/EncryptedFileDownloader.py index bf258a76c..76e52d851 100644 --- a/lbrynet/blob/EncryptedFileDownloader.py +++ b/lbrynet/blob/EncryptedFileDownloader.py @@ -149,7 +149,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): status = ManagedEncryptedFileDownloader.STATUS_RUNNING status = yield self.lbry_file_manager.change_lbry_file_status(self, status) self._saving_status = False - defer.returnValue(status) + return status def save_status(self): return self._save_status() diff --git a/lbrynet/blob/EncryptedFileManager.py b/lbrynet/blob/EncryptedFileManager.py index ece77bb99..7c6f3d7a3 100644 --- a/lbrynet/blob/EncryptedFileManager.py +++ b/lbrynet/blob/EncryptedFileManager.py @@ -236,7 +236,7 @@ class EncryptedFileManager: def reflect_lbry_files(self): sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) ds = [] - sd_hashes_to_reflect = yield self.storage.get_streams_to_re_reflect() + sd_hashes_to_reflect = yield f2d(self.storage.get_streams_to_re_reflect()) for lbry_file in self.lbry_files: if lbry_file.sd_hash in sd_hashes_to_reflect: ds.append(sem.run(reflect_file, lbry_file)) diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index 34b959dd9..54dab503b 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -57,7 +57,7 @@ def start_daemon(settings: typing.Optional[typing.Dict] = None, if check_connection(): daemon = Daemon() - asyncio.create_task(daemon.start_listening()) + reactor._asyncioEventloop.create_task(daemon.start_listening()) reactor.run() else: log.info("Not connected to internet, unable to start") diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index e9c65b84b..afc7f91b1 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -7,7 +7,7 @@ import math import binascii from hashlib import sha256 from types import SimpleNamespace -from twisted.internet import defer, reactor, error, task +from twisted.internet import defer, reactor, error from aioupnp import __version__ as aioupnp_version from aioupnp.upnp import UPnP @@ -404,7 +404,7 @@ class HashAnnouncerComponent(Component): def component(self): return self.hash_announcer - def start(self): + async def start(self): storage = self.component_manager.get_component(DATABASE_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT) self.hash_announcer = DHTHashAnnouncer(dht_node, storage) @@ -539,7 +539,7 @@ class PeerProtocolServerComponent(Component): try: log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port, upnp.upnp_redirects.get("TCP", peer_port)) - self.lbry_server_port = await d2f(reactor.listenTCP(peer_port, server_factory)) + self.lbry_server_port = reactor.listenTCP(peer_port, server_factory) except error.CannotListenError as e: import traceback log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" @@ -597,13 +597,18 @@ class UPnPComponent(Component): self.upnp = None self.upnp_redirects = {} self.external_ip = None - self._maintain_redirects_lc = task.LoopingCall(self._maintain_redirects) - self._maintain_redirects_lc.clock = self.component_manager.reactor + self._maintain_redirects_task = None @property def component(self): return self + async def _repeatedly_maintain_redirects(self, now=True): + while True: + if now: + await self._maintain_redirects() + await asyncio.sleep(360) + async def _maintain_redirects(self): # setup the gateway if necessary if not self.upnp: @@ -692,13 +697,15 @@ class UPnPComponent(Component): else: log.error("failed to setup upnp") self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status()) - self._maintain_redirects_lc.start(360, now=False) + self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False)) async def stop(self): if self.upnp_redirects: await asyncio.wait([ self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items() ]) + if self._maintain_redirects_task is not None and not self._maintain_redirects_task.done(): + self._maintain_redirects_task.cancel() async def get_status(self): return { diff --git a/lbrynet/extras/reflector/client/client.py b/lbrynet/extras/reflector/client/client.py index 213b33f69..4fd76fb2c 100644 --- a/lbrynet/extras/reflector/client/client.py +++ b/lbrynet/extras/reflector/client/client.py @@ -6,6 +6,7 @@ from twisted.protocols.basic import FileSender from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet import defer, error +from lbrynet.extras.compat import f2d from lbrynet.extras.reflector.common import IncompleteResponse, ReflectorRequestError from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2 @@ -61,7 +62,9 @@ class EncryptedFileReflectorClient(Protocol): else: reflected = False - d = self.blob_manager.storage.update_reflected_stream(self.sd_hash, self.transport.getPeer().host, reflected) + d = f2d(self.blob_manager.storage.update_reflected_stream( + self.sd_hash, self.transport.getPeer().host, reflected + )) d.addCallback(lambda _: result) return d @@ -113,10 +116,7 @@ class EncryptedFileReflectorClient(Protocol): for crypt_blob in blobs: if crypt_blob.blob_hash and crypt_blob.length: yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) - - dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) - dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()]) - return dl + return [blob for r, blob in get_blobs(blobs_in_stream) if r and blob.get_is_verified()] def set_blobs_to_send(self, blobs_to_send): for blob in blobs_to_send: @@ -132,7 +132,7 @@ class EncryptedFileReflectorClient(Protocol): len(filtered)) return filtered - d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash) + d = f2d(self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash)) d.addCallback(self.get_validated_blobs) if not self.descriptor_needed: d.addCallback(lambda filtered: diff --git a/lbrynet/p2p/HTTPBlobDownloader.py b/lbrynet/p2p/HTTPBlobDownloader.py index 28e6e2a4b..405dec7f3 100644 --- a/lbrynet/p2p/HTTPBlobDownloader.py +++ b/lbrynet/p2p/HTTPBlobDownloader.py @@ -5,7 +5,7 @@ from twisted.internet import defer, task from twisted.internet.error import ConnectingCancelledError from twisted.web._newclient import ResponseNeverReceived -from lbrynet.utils import DeferredDict +from lbrynet.extras.compat import f2d from lbrynet.p2p.Error import DownloadCanceledError log = logging.getLogger(__name__) @@ -104,9 +104,7 @@ class HTTPBlobDownloader: log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8]) else: log.debug("trying to download %i blobs from mirror", len(self.blob_hashes)) - blobs = yield DeferredDict( - {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} - ) + blobs = {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] yield defer.DeferredList(self.deferreds) if self.retry and self.missing_blob_hashes: @@ -175,7 +173,7 @@ class HTTPBlobDownloader: @defer.inlineCallbacks def download_stream(self, stream_hash, sd_hash): - stream_crypt_blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash) + stream_crypt_blobs = yield f2d(self.blob_manager.storage.get_blobs_for_stream(stream_hash)) self.blob_hashes.extend([ b.blob_hash for b in stream_crypt_blobs if b.blob_hash and b.blob_hash not in self.blob_hashes diff --git a/tests/mocks.py b/tests/mocks.py index d6bf2fb3f..e1bfcfc40 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -152,16 +152,15 @@ class PointTraderKeyQueryHandler: try: decode_rsa_key(new_encoded_pub_key) except (ValueError, TypeError, IndexError): - value_error = ValueError(f"Client sent an invalid public key: {new_encoded_pub_key}") - return defer.fail(Failure(value_error)) + raise ValueError(f"Client sent an invalid public key: {new_encoded_pub_key}") self.public_key = new_encoded_pub_key self.wallet.set_public_key_for_peer(self.peer, self.public_key) fields = {'public_key': self.wallet.encoded_public_key.decode()} - return defer.succeed(fields) + return fields if self.public_key is None: - return defer.fail(Failure(ValueError("Expected but did not receive a public key"))) + raise ValueError("Expected but did not receive a public key") else: - return defer.succeed({}) + return {} class Wallet: