diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index c64b2dfb4..97b9aad8c 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -57,7 +57,7 @@ def start_daemon(settings: typing.Optional[typing.Dict] = None, if check_connection(): daemon = Daemon() - asyncio.get_event_loop().run_until_complete(daemon.start_listening()) + asyncio.get_event_loop().create_task(daemon.start_listening()) reactor.run() else: log.info("Not connected to internet, unable to start") diff --git a/lbrynet/extras/daemon/Component.py b/lbrynet/extras/daemon/Component.py index 8de53a6ae..b58a5802c 100644 --- a/lbrynet/extras/daemon/Component.py +++ b/lbrynet/extras/daemon/Component.py @@ -36,37 +36,35 @@ class Component(metaclass=ComponentType): def running(self): return self._running - def get_status(self): + async def get_status(self): return - def start(self): + async def start(self): raise NotImplementedError() - def stop(self): + async def stop(self): raise NotImplementedError() @property def component(self): raise NotImplementedError() - @defer.inlineCallbacks - def _setup(self): + async def _setup(self): try: - result = yield defer.maybeDeferred(self.start) + result = await self.start() self._running = True - defer.returnValue(result) + return result except (defer.CancelledError, AlreadyQuit): pass except Exception as err: log.exception("Error setting up %s", self.component_name or self.__class__.__name__) raise err - @defer.inlineCallbacks - def _stop(self): + async def _stop(self): try: - result = yield defer.maybeDeferred(self.stop) + result = await self.stop() self._running = False - defer.returnValue(result) + return result except (defer.CancelledError, AlreadyQuit): pass except Exception as err: diff --git a/lbrynet/extras/daemon/ComponentManager.py b/lbrynet/extras/daemon/ComponentManager.py index 9af07fd8d..8233a3edd 100644 --- a/lbrynet/extras/daemon/ComponentManager.py +++ b/lbrynet/extras/daemon/ComponentManager.py @@ -1,5 +1,5 @@ +import asyncio import logging -from twisted.internet import defer from lbrynet.p2p.Error import ComponentStartConditionNotMet from lbrynet.extras.daemon.PeerManager import PeerManager from lbrynet.extras.daemon.PeerFinder import DHTPeerFinder @@ -110,13 +110,8 @@ class ComponentManager: steps.reverse() return steps - @defer.inlineCallbacks - def setup(self, **callbacks): - """ - Start Components in sequence sorted by requirements - - :return: (defer.Deferred) - """ + async def setup(self, **callbacks): + """ Start Components in sequence sorted by requirements """ for component_name, cb in callbacks.items(): if component_name not in self.component_classes: if component_name not in self.skip_components: @@ -124,19 +119,22 @@ class ComponentManager: if not callable(cb): raise ValueError("%s is not callable" % cb) - def _setup(component): + async def _setup(component): + await component._setup() if component.component_name in callbacks: - d = component._setup() - d.addCallback(callbacks[component.component_name], component) - return d - return component._setup() + maybe_coro = callbacks[component.component_name](component) + if asyncio.iscoroutine(maybe_coro): + asyncio.create_task(maybe_coro) stages = self.sort_components() for stage in stages: - yield defer.DeferredList([_setup(component) for component in stage if not component.running]) + needing_start = [ + _setup(component) for component in stage if not component.running + ] + if needing_start: + await asyncio.wait(needing_start) - @defer.inlineCallbacks - def stop(self): + async def stop(self): """ Stop Components in reversed startup order @@ -144,7 +142,11 @@ class ComponentManager: """ stages = self.sort_components(reverse=True) for stage in stages: - yield defer.DeferredList([component._stop() for component in stage if component.running]) + needing_stop = [ + component._stop() for component in stage if component.running + ] + if needing_stop: + await asyncio.wait(needing_stop) def all_components_running(self, *component_names): """ diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 5514a4cae..d145eedb1 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -1,13 +1,13 @@ import os import asyncio +import aiohttp import logging import treq -import json import math import binascii from hashlib import sha256 from types import SimpleNamespace -from twisted.internet import defer, threads, reactor, error, task +from twisted.internet import defer, reactor, error, task from aioupnp import __version__ as aioupnp_version from aioupnp.upnp import UPnP @@ -16,6 +16,7 @@ from aioupnp.fault import UPnPError import lbrynet.schema from lbrynet import conf +from lbrynet.extras.compat import d2f from lbrynet.blob.EncryptedFileManager import EncryptedFileManager from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier @@ -27,7 +28,7 @@ from lbrynet.extras.daemon.HashAnnouncer import DHTHashAnnouncer from lbrynet.extras.reflector.server.server import ReflectorServerFactory from lbrynet.extras.wallet import LbryWalletManager from lbrynet.extras.wallet import Network -from lbrynet.utils import DeferredDict, generate_id +from lbrynet.utils import generate_id from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.p2p.RateLimiter import RateLimiter from lbrynet.p2p.BlobManager import DiskBlobManager @@ -55,22 +56,23 @@ RATE_LIMITER_COMPONENT = "rate_limiter" PAYMENT_RATE_COMPONENT = "payment_rate_manager" -def from_future(coroutine: asyncio.coroutine) -> defer.Deferred: - return defer.Deferred.fromFuture(asyncio.ensure_future(coroutine)) +async def gather_dict(tasks: dict): + async def wait_value(key, value): + return key, await value + return dict(await asyncio.gather(*( + wait_value(*kv) for kv in tasks.items() + ))) -@defer.inlineCallbacks -def get_external_ip(): # used if upnp is disabled or non-functioning +async def get_external_ip(): # used if upnp is disabled or non-functioning try: - buf = [] - response = yield treq.get("https://api.lbry.io/ip") - yield treq.collect(response, buf.append) - parsed = json.loads(b"".join(buf).decode()) - if parsed['success']: - return parsed['data']['ip'] - return - except Exception as err: - return + async with aiohttp.ClientSession() as session: + async with session.get("https://api.lbry.io/ip") as resp: + response = await resp.json() + if response['success']: + return response['data']['ip'] + except Exception as e: + pass class DatabaseComponent(Component): @@ -97,8 +99,7 @@ class DatabaseComponent(Component): with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision: db_revision.write(str(version_num)) - @defer.inlineCallbacks - def start(self): + async def start(self): # check directories exist, create them if they don't log.info("Loading databases") @@ -117,19 +118,19 @@ class DatabaseComponent(Component): if old_revision < self.get_current_db_revision(): from lbrynet.extras.daemon.migrator import dbmigrator log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision()) - yield threads.deferToThread( - dbmigrator.migrate_db, conf.settings.data_dir, old_revision, self.get_current_db_revision() + await asyncio.get_event_loop().run_in_executor( + None, dbmigrator.migrate_db, conf.settings.data_dir, old_revision, self.get_current_db_revision() ) self._write_db_revision_file(self.get_current_db_revision()) log.info("Finished upgrading the databases.") - # start SQLiteStorage - self.storage = SQLiteStorage(conf.settings.data_dir) - yield self.storage.setup() + self.storage = SQLiteStorage( + os.path.join(conf.settings.data_dir, "lbrynet.sqlite") + ) + await self.storage.open() - @defer.inlineCallbacks - def stop(self): - yield self.storage.stop() + async def stop(self): + await self.storage.close() self.storage = None @@ -250,33 +251,24 @@ class HeadersComponent(Component): with open(self.headers_file, "rb+") as headers_file: headers_file.truncate(checksum_length_in_bytes) - @defer.inlineCallbacks - def start(self): + async def start(self): conf.settings.ensure_wallet_dir() if not os.path.exists(self.headers_dir): os.mkdir(self.headers_dir) if os.path.exists(self.old_file): log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file) os.rename(self.old_file, self.headers_file) - self._downloading_headers = yield f2d(self.should_download_headers_from_s3()) + self._downloading_headers = await self.should_download_headers_from_s3() if self._downloading_headers: try: - yield self.fetch_headers_from_s3() + await d2f(self.fetch_headers_from_s3()) except Exception as err: log.error("failed to fetch headers from s3: %s", err) finally: self._downloading_headers = False - def stop(self): - return defer.succeed(None) - - -def d2f(deferred): - return deferred.asFuture(asyncio.get_event_loop()) - - -def f2d(future): - return defer.Deferred.fromFuture(asyncio.ensure_future(future)) + async def stop(self): + pass class WalletComponent(Component): @@ -304,19 +296,17 @@ class WalletComponent(Component): 'is_locked': not self.wallet_manager.is_wallet_unlocked, } - @defer.inlineCallbacks - def start(self): + async def start(self): conf.settings.ensure_wallet_dir() log.info("Starting torba wallet") storage = self.component_manager.get_component(DATABASE_COMPONENT) lbrynet.schema.BLOCKCHAIN_NAME = conf.settings['blockchain_name'] - self.wallet_manager = yield f2d(LbryWalletManager.from_lbrynet_config(conf.settings, storage)) + self.wallet_manager = await LbryWalletManager.from_lbrynet_config(conf.settings, storage) self.wallet_manager.old_db = storage - yield f2d(self.wallet_manager.start()) + await self.wallet_manager.start() - @defer.inlineCallbacks - def stop(self): - yield f2d(self.wallet_manager.stop()) + async def stop(self): + await self.wallet_manager.stop() self.wallet_manager = None @@ -345,14 +335,11 @@ class BlobComponent(Component): def stop(self): return self.blob_manager.stop() - @defer.inlineCallbacks - def get_status(self): + async def get_status(self): count = 0 if self.blob_manager: - count = yield self.blob_manager.storage.count_finished_blobs() - defer.returnValue({ - 'finished_blobs': count - }) + count = await self.blob_manager.storage.count_finished_blobs() + return {'finished_blobs': count} class DHTComponent(Component): @@ -376,8 +363,7 @@ class DHTComponent(Component): 'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts) } - @defer.inlineCallbacks - def start(self): + async def start(self): self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", conf.settings["peer_port"]) self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", conf.settings["dht_node_port"]) @@ -387,7 +373,7 @@ class DHTComponent(Component): external_ip = self.upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") - external_ip = yield get_external_ip() + external_ip = await get_external_ip() if not external_ip: log.warning("failed to get external ip") @@ -399,12 +385,11 @@ class DHTComponent(Component): peerPort=self.external_peer_port ) - yield self.dht_node.start(conf.settings['known_dht_nodes'], block_on_join=False) + await d2f(self.dht_node.start(conf.settings['known_dht_nodes'], block_on_join=False)) log.info("Started the dht") - @defer.inlineCallbacks def stop(self): - yield self.dht_node.stop() + return d2f(self.dht_node.stop()) class HashAnnouncerComponent(Component): @@ -419,16 +404,14 @@ class HashAnnouncerComponent(Component): def component(self): return self.hash_announcer - @defer.inlineCallbacks def start(self): storage = self.component_manager.get_component(DATABASE_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT) self.hash_announcer = DHTHashAnnouncer(dht_node, storage) - yield self.hash_announcer.start() + self.hash_announcer.start() - @defer.inlineCallbacks def stop(self): - yield self.hash_announcer.stop() + self.hash_announcer.stop() def get_status(self): return { @@ -447,13 +430,11 @@ class RateLimiterComponent(Component): def component(self): return self.rate_limiter - def start(self): + async def start(self): self.rate_limiter.start() - return defer.succeed(None) - def stop(self): + async def stop(self): self.rate_limiter.stop() - return defer.succeed(None) class PaymentRateComponent(Component): @@ -467,11 +448,11 @@ class PaymentRateComponent(Component): def component(self): return self.payment_rate_manager - def start(self): - return defer.succeed(None) + async def start(self): + pass - def stop(self): - return defer.succeed(None) + async def stop(self): + pass class FileManagerComponent(Component): @@ -494,7 +475,6 @@ class FileManagerComponent(Component): 'managed_files': len(self.file_manager.lbry_files) } - @defer.inlineCallbacks def start(self): rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) blob_manager = self.component_manager.get_component(BLOB_COMPONENT) @@ -511,18 +491,16 @@ class FileManagerComponent(Component): wallet, conf.settings.download_dir ) - yield sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory) + sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory) payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) log.info('Starting the file manager') self.file_manager = EncryptedFileManager(self.component_manager.peer_finder, rate_limiter, blob_manager, wallet, payment_rate_manager, storage, sd_identifier) - yield self.file_manager.setup() - log.info('Done setting up file manager') + return self.file_manager.setup() - @defer.inlineCallbacks def stop(self): - yield self.file_manager.stop() + return d2f(self.file_manager.stop()) class PeerProtocolServerComponent(Component): @@ -538,8 +516,7 @@ class PeerProtocolServerComponent(Component): def component(self): return self.lbry_server_port - @defer.inlineCallbacks - def start(self): + async def start(self): wallet = self.component_manager.get_component(WALLET_COMPONENT) upnp = self.component_manager.get_component(UPNP_COMPONENT) peer_port = conf.settings['peer_port'] @@ -562,7 +539,7 @@ class PeerProtocolServerComponent(Component): try: log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port, upnp.upnp_redirects.get("TCP", peer_port)) - self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory) + self.lbry_server_port = await d2f(reactor.listenTCP(peer_port, server_factory)) except error.CannotListenError as e: import traceback log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" @@ -570,12 +547,11 @@ class PeerProtocolServerComponent(Component): log.error("%s", traceback.format_exc()) raise ValueError("%s lbrynet may already be running on your computer." % str(e)) - @defer.inlineCallbacks - def stop(self): + async def stop(self): if self.lbry_server_port is not None: self.lbry_server_port, old_port = None, self.lbry_server_port log.info('Stop listening on port %s', old_port.port) - yield old_port.stopListening() + await d2f(old_port.stopListening()) class ReflectorComponent(Component): @@ -591,25 +567,23 @@ class ReflectorComponent(Component): def component(self): return self.reflector_server - @defer.inlineCallbacks - def start(self): + async def start(self): log.info("Starting reflector server") blob_manager = self.component_manager.get_component(BLOB_COMPONENT) file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) reflector_factory = ReflectorServerFactory(self.component_manager.peer_manager, blob_manager, file_manager) try: - self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory) + self.reflector_server = await d2f(reactor.listenTCP(self.reflector_server_port, reflector_factory)) log.info('Started reflector on port %s', self.reflector_server_port) except error.CannotListenError as e: log.exception("Couldn't bind reflector to port %d", self.reflector_server_port) raise ValueError(f"{e} lbrynet may already be running on your computer.") - @defer.inlineCallbacks - def stop(self): + async def stop(self): if self.reflector_server is not None: log.info("Stopping reflector server") self.reflector_server, p = None, self.reflector_server - yield p.stopListening + await d2f(p.stopListening()) class UPnPComponent(Component): @@ -630,22 +604,11 @@ class UPnPComponent(Component): def component(self): return self - @defer.inlineCallbacks - def _setup_redirects(self): - d = {} - if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components: - d["TCP"] = from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")) - if DHT_COMPONENT not in self.component_manager.skip_components: - d["UDP"] = from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")) - upnp_redirects = yield DeferredDict(d) - self.upnp_redirects.update(upnp_redirects) - - @defer.inlineCallbacks - def _maintain_redirects(self): + async def _maintain_redirects(self): # setup the gateway if necessary if not self.upnp: try: - self.upnp = yield from_future(UPnP.discover()) + self.upnp = await UPnP.discover() log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string) except Exception as err: log.warning("upnp discovery failed: %s", err) @@ -655,7 +618,7 @@ class UPnPComponent(Component): external_ip = None if self.upnp: try: - external_ip = yield from_future(self.upnp.get_external_ip()) + external_ip = await self.upnp.get_external_ip() if external_ip != "0.0.0.0" and not self.external_ip: log.info("got external ip from UPnP: %s", external_ip) except (asyncio.TimeoutError, UPnPError): @@ -663,7 +626,7 @@ class UPnPComponent(Component): if external_ip == "0.0.0.0" or not external_ip: log.warning("unable to get external ip from UPnP, checking lbry.io fallback") - external_ip = yield get_external_ip() + external_ip = await get_external_ip() if self.external_ip and self.external_ip != external_ip: log.info("external ip changed from %s to %s", self.external_ip, external_ip) self.external_ip = external_ip @@ -674,10 +637,10 @@ class UPnPComponent(Component): log.info("add UPnP port mappings") d = {} if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components: - d["TCP"] = from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")) + d["TCP"] = self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port") if DHT_COMPONENT not in self.component_manager.skip_components: - d["UDP"] = from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")) - upnp_redirects = yield DeferredDict(d) + d["UDP"] = self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port") + upnp_redirects = await gather_dict(d) log.info("set up redirects: %s", upnp_redirects) self.upnp_redirects.update(upnp_redirects) except (asyncio.TimeoutError, UPnPError): @@ -685,7 +648,7 @@ class UPnPComponent(Component): return self._maintain_redirects() elif self.upnp: # check existing redirects are still active found = set() - mappings = yield from_future(self.upnp.get_redirects()) + mappings = await self.upnp.get_redirects() for mapping in mappings: proto = mapping['NewProtocol'] if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]: @@ -693,18 +656,14 @@ class UPnPComponent(Component): found.add(proto) if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components: try: - udp_port = yield from_future( - self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port") - ) + udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port") self.upnp_redirects['UDP'] = udp_port log.info("refreshed upnp redirect for dht port: %i", udp_port) except (asyncio.TimeoutError, UPnPError): del self.upnp_redirects['UDP'] if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components: try: - tcp_port = yield from_future( - self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port") - ) + tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port") self.upnp_redirects['TCP'] = tcp_port log.info("refreshed upnp redirect for peer port: %i", tcp_port) except (asyncio.TimeoutError, UPnPError): @@ -715,14 +674,13 @@ class UPnPComponent(Component): if self.upnp_redirects: log.debug("upnp redirects are still active") - @defer.inlineCallbacks - def start(self): + async def start(self): log.info("detecting external ip") if not self.use_upnp: - self.external_ip = yield get_external_ip() + self.external_ip = await get_external_ip() return success = False - yield self._maintain_redirects() + await self._maintain_redirects() if self.upnp: if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in (DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]): @@ -736,13 +694,11 @@ class UPnPComponent(Component): self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, self.get_status()) self._maintain_redirects_lc.start(360, now=False) - def stop(self): - if self._maintain_redirects_lc.running: - self._maintain_redirects_lc.stop() - return defer.DeferredList( - [from_future(self.upnp.delete_port_mapping(port, protocol)) - for protocol, port in self.upnp_redirects.items()] - ) + async def stop(self): + if self.upnp_redirects: + await asyncio.wait([ + self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items() + ]) def get_status(self): return { @@ -766,10 +722,8 @@ class ExchangeRateManagerComponent(Component): def component(self): return self.exchange_rate_manager - @defer.inlineCallbacks - def start(self): - yield self.exchange_rate_manager.start() + async def start(self): + self.exchange_rate_manager.start() - @defer.inlineCallbacks - def stop(self): - yield self.exchange_rate_manager.stop() + async def stop(self): + self.exchange_rate_manager.stop() diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 7f08251f2..ed8dd03c6 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -7,7 +7,6 @@ from typing import Callable, Optional, List from operator import itemgetter from binascii import hexlify, unhexlify from copy import deepcopy -from twisted.internet import reactor from twisted.internet.task import LoopingCall from torba.client.baseaccount import SingleKey, HierarchicalDeterministic @@ -17,7 +16,7 @@ from lbrynet.dht.error import TimeoutError from lbrynet.blob.blob_file import is_valid_blobhash from lbrynet.extras import system_info from lbrynet.extras.reflector import reupload -from lbrynet.extras.daemon.Components import d2f, f2d +from lbrynet.extras.daemon.Components import d2f from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT from lbrynet.extras.daemon.Components import FILE_MANAGER_COMPONENT, RATE_LIMITER_COMPONENT from lbrynet.extras.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, PAYMENT_RATE_COMPONENT, UPNP_COMPONENT @@ -48,22 +47,15 @@ from lbrynet.extras.daemon.json_response_encoder import JSONResponseEncoder import asyncio import logging -from urllib import parse as urlparse import json import inspect import signal from functools import wraps from twisted.internet import defer -from twisted.internet.defer import Deferred from twisted.python.failure import Failure -from twisted.internet.error import ConnectionDone, ConnectionLost -from txjsonrpc import jsonrpclib -from traceback import format_exc from lbrynet import utils -from lbrynet.p2p.Error import InvalidAuthenticationToken from lbrynet.extras.daemon.undecorated import undecorated -from twisted.web import server from lbrynet import conf from aiohttp import web @@ -376,14 +368,13 @@ class Daemon(metaclass=JSONRPCServerType): self.component_manager = component_manager or ComponentManager( analytics_manager=self.analytics_manager, skip_components=to_skip or [], - reactor=reactor ) self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()}) self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()} self._use_authentication = use_authentication or conf.settings['use_auth_http'] self._use_https = use_https or conf.settings['use_https'] self.listening_port = None - self._component_setup_deferred = None + self._component_setup_task = None self.announced_startup = False self.sessions = {} self.is_first_run = is_first_run() @@ -409,6 +400,8 @@ class Daemon(metaclass=JSONRPCServerType): self.app = web.Application() self.app.router.add_get('/lbryapi', self.handle_old_jsonrpc) + self.app.router.add_post('/lbryapi', self.handle_old_jsonrpc) + self.app.router.add_post('/', self.handle_old_jsonrpc) self.handler = self.app.make_handler() self.server = None @@ -423,32 +416,28 @@ class Daemon(metaclass=JSONRPCServerType): except OSError: log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is ' 'already in use by another application.', conf.settings['api_host'], conf.settings['api_port']) - reactor.fireSystemEvent("shutdown") except defer.CancelledError: log.info("shutting down before finished starting") - reactor.fireSystemEvent("shutdown") except Exception as err: self.analytics_manager.send_server_startup_error(str(err)) log.exception('Failed to start lbrynet-daemon') - reactor.fireSystemEvent("shutdown") async def setup(self): log.info("Starting lbrynet-daemon") log.info("Platform: %s", json.dumps(system_info.get_platform())) - reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown) if not self.analytics_manager.is_started: self.analytics_manager.start() self.analytics_manager.send_server_startup() for lc_name, lc_time in self._looping_call_times.items(): self.looping_call_manager.start(lc_name, lc_time) - def update_attribute(setup_result, component): + def update_attribute(component): setattr(self, self.component_attributes[component.component_name], component.component) kwargs = {component: update_attribute for component in self.component_attributes.keys()} - self._component_setup_deferred = self.component_manager.setup(**kwargs) - await self._component_setup_deferred.asFuture(asyncio.get_event_loop()) + self._component_setup_task = self.component_manager.setup(**kwargs) + await self._component_setup_task log.info("Started lbrynet-daemon") @@ -463,7 +452,6 @@ class Daemon(metaclass=JSONRPCServerType): signal.signal(signal.SIGTERM, self._already_shutting_down) if self.listening_port: self.listening_port.stopListening() - self.looping_call_manager.shutdown() if self.server is not None: self.server.close() await self.server.wait_closed() @@ -473,15 +461,11 @@ class Daemon(metaclass=JSONRPCServerType): if self.analytics_manager: self.analytics_manager.shutdown() try: - self._component_setup_deferred.cancel() - except (AttributeError, defer.CancelledError): + self._component_setup_task.cancel() + except (AttributeError, asyncio.CancelledError): pass if self.component_manager is not None: - d = self.component_manager.stop() - d.addErrback(log.fail(), 'Failure while shutting down') - else: - d = defer.succeed(None) - return d + await self.component_manager.stop() async def handle_old_jsonrpc(self, request): data = await request.json() @@ -518,7 +502,9 @@ class Daemon(metaclass=JSONRPCServerType): log.warning(params_error_message) raise web.HTTPBadRequest(text=params_error_message) - result = await fn(self, *_args, **_kwargs) + result = fn(self, *_args, **_kwargs) + if asyncio.iscoroutine(result): + result = await result return web.Response( text=jsonrpc_dumps_pretty(result, ledger=self.ledger), @@ -596,7 +582,7 @@ class Daemon(metaclass=JSONRPCServerType): for sd_hash, stream in self.streams.items(): stream.cancel(reason="daemon shutdown") - def _download_blob(self, blob_hash, rate_manager=None, timeout=None): + async def _download_blob(self, blob_hash, rate_manager=None, timeout=None): """ Download a blob @@ -615,13 +601,12 @@ class Daemon(metaclass=JSONRPCServerType): blob_hash, self.blob_manager, self.component_manager.peer_finder, self.rate_limiter, rate_manager, self.wallet_manager, timeout ) - return downloader.download() + return await d2f(downloader.download()) - @defer.inlineCallbacks - def _get_stream_analytics_report(self, claim_dict): + async def _get_stream_analytics_report(self, claim_dict): sd_hash = claim_dict.source_hash.decode() try: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash) except Exception: stream_hash = None report = { @@ -630,12 +615,12 @@ class Daemon(metaclass=JSONRPCServerType): } blobs = {} try: - sd_host = yield self.blob_manager.get_host_downloaded_from(sd_hash) + sd_host = await d2f(self.blob_manager.get_host_downloaded_from(sd_hash)) except Exception: sd_host = None report["sd_blob"] = sd_host if stream_hash: - blob_infos = yield self.storage.get_blobs_for_stream(stream_hash) + blob_infos = await self.storage.get_blobs_for_stream(stream_hash) report["known_blobs"] = len(blob_infos) else: blob_infos = [] @@ -648,32 +633,28 @@ class Daemon(metaclass=JSONRPCServerType): # if host: # blobs[blob_num] = host # report["blobs"] = json.dumps(blobs) - defer.returnValue(report) + return report - @defer.inlineCallbacks - def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None): + async def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file """ - @defer.inlineCallbacks - def _download_finished(download_id, name, claim_dict): - report = yield self._get_stream_analytics_report(claim_dict) + async def _download_finished(download_id, name, claim_dict): + report = await self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) self.analytics_manager.send_new_download_success(download_id, name, claim_dict) - @defer.inlineCallbacks - def _download_failed(error, download_id, name, claim_dict): - report = yield self._get_stream_analytics_report(claim_dict) + async def _download_failed(error, download_id, name, claim_dict): + report = await self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, report) self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error) if sd_hash in self.streams: downloader = self.streams[sd_hash] - result = yield downloader.finished_deferred - defer.returnValue(result) + return await d2f(downloader.finished_deferred) else: download_id = utils.random_string() self.analytics_manager.send_download_started(download_id, name, claim_dict) @@ -685,26 +666,26 @@ class Daemon(metaclass=JSONRPCServerType): timeout ) try: - lbry_file, finished_deferred = yield self.streams[sd_hash].start( + lbry_file, finished_deferred = await d2f(self.streams[sd_hash].start( claim_dict, name, txid, nout, file_name - ) + )) finished_deferred.addCallbacks( lambda _: _download_finished(download_id, name, claim_dict), lambda e: _download_failed(e, download_id, name, claim_dict) ) - result = yield self._get_lbry_file_dict(lbry_file) + result = await self._get_lbry_file_dict(lbry_file) except Exception as err: - yield _download_failed(err, download_id, name, claim_dict) + await _download_failed(err, download_id, name, claim_dict) if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)): log.warning('Failed to get %s (%s)', name, err) else: log.error('Failed to get %s (%s)', name, err) if self.streams[sd_hash].downloader and self.streams[sd_hash].code != 'running': - yield self.streams[sd_hash].downloader.stop(err) + await d2f(self.streams[sd_hash].downloader.stop(err)) result = {'error': str(err)} finally: del self.streams[sd_hash] - defer.returnValue(result) + return result async def _publish_stream(self, account, name, bid, claim_dict, file_path=None, certificate=None, claim_address=None, change_address=None): @@ -714,8 +695,8 @@ class Daemon(metaclass=JSONRPCServerType): ) parse_lbry_uri(name) if not file_path: - stream_hash = await d2f(self.storage.get_stream_hash_for_sd_hash( - claim_dict['stream']['source']['source'])) + stream_hash = await self.storage.get_stream_hash_for_sd_hash( + claim_dict['stream']['source']['source']) tx = await publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address) else: tx = await publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address) @@ -844,8 +825,7 @@ class Daemon(metaclass=JSONRPCServerType): return self.get_est_cost_using_known_size(uri, size) return self.get_est_cost_from_uri(uri) - @defer.inlineCallbacks - def _get_lbry_file_dict(self, lbry_file): + async def _get_lbry_file_dict(self, lbry_file): key = hexlify(lbry_file.key) if lbry_file.key else None full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name) mime_type = guess_mime_type(lbry_file.file_name) @@ -856,13 +836,13 @@ class Daemon(metaclass=JSONRPCServerType): else: written_bytes = 0 - size = yield lbry_file.get_total_bytes() - file_status = yield lbry_file.status() + size = await d2f(lbry_file.get_total_bytes()) + file_status = await d2f(lbry_file.status()) num_completed = file_status.num_completed num_known = file_status.num_known status = file_status.running_status - result = { + return { 'completed': lbry_file.completed, 'file_name': lbry_file.file_name, 'download_directory': lbry_file.download_directory, @@ -889,10 +869,8 @@ class Daemon(metaclass=JSONRPCServerType): 'channel_name': lbry_file.channel_name, 'claim_name': lbry_file.claim_name } - defer.returnValue(result) - @defer.inlineCallbacks - def _get_lbry_file(self, search_by, val, return_json=False): + async def _get_lbry_file(self, search_by, val, return_json=False): lbry_file = None if search_by in FileID: for l_f in self.file_manager.lbry_files: @@ -902,11 +880,10 @@ class Daemon(metaclass=JSONRPCServerType): else: raise NoValidSearch(f'{search_by} is not a valid search operation') if return_json and lbry_file: - lbry_file = yield self._get_lbry_file_dict(lbry_file) - defer.returnValue(lbry_file) + lbry_file = await self._get_lbry_file_dict(lbry_file) + return lbry_file - @defer.inlineCallbacks - def _get_lbry_files(self, return_json=False, **kwargs): + async def _get_lbry_files(self, return_json=False, **kwargs): lbry_files = list(self.file_manager.lbry_files) if kwargs: for search_type, value in iter_lbry_file_search_values(kwargs): @@ -914,11 +891,11 @@ class Daemon(metaclass=JSONRPCServerType): if return_json: file_dicts = [] for lbry_file in lbry_files: - lbry_file_dict = yield self._get_lbry_file_dict(lbry_file) + lbry_file_dict = await self._get_lbry_file_dict(lbry_file) file_dicts.append(lbry_file_dict) lbry_files = file_dicts log.debug("Collected %i lbry files", len(lbry_files)) - defer.returnValue(lbry_files) + return lbry_files def _sort_lbry_files(self, lbry_files, sort_by): for field, direction in sort_by: @@ -946,18 +923,15 @@ class Daemon(metaclass=JSONRPCServerType): downloader.setup(self.wallet_manager) return downloader - @defer.inlineCallbacks - def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None): + async def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None): if not downloader: downloader = self._get_single_peer_downloader() - result = {} search_timeout = search_timeout or conf.settings['peer_search_timeout'] blob_timeout = blob_timeout or conf.settings['sd_download_timeout'] - is_available = False reachable_peers = [] unreachable_peers = [] try: - peers = yield self.jsonrpc_peer_list(blob_hash, search_timeout) + peers = await d2f(self.jsonrpc_peer_list(blob_hash, search_timeout)) peer_infos = [{"peer": Peer(x['host'], x['port']), "blob_hash": blob_hash, "timeout": blob_timeout} for x in peers] @@ -968,8 +942,7 @@ class Daemon(metaclass=JSONRPCServerType): d = downloader.download_temp_blob_from_peer(**peer_info) dl.append(d) dl_peers.append("%s:%i" % (peer_info['peer'].host, peer_info['peer'].port)) - for dl_peer, (success, download_result) in zip(dl_peers, - (yield defer.DeferredList(dl))): + for dl_peer, (success, download_result) in zip(dl_peers, (await d2f(defer.DeferredList(dl)))): if success: if download_result: reachable_peers.append(dl_peer) @@ -978,14 +951,13 @@ class Daemon(metaclass=JSONRPCServerType): dl_results.append(download_result) is_available = any(dl_results) except Exception as err: - result['error'] = "Failed to get peers for blob: %s" % err + return {'error': "Failed to get peers for blob: %s" % err} - response = { + return { 'is_available': is_available, 'reachable_peers': reachable_peers, 'unreachable_peers': unreachable_peers, } - defer.returnValue(response) ############################################################################ # # @@ -1011,11 +983,9 @@ class Daemon(metaclass=JSONRPCServerType): (string) Shutdown message """ log.info("Shutting down lbrynet daemon") - reactor.callLater(0.1, reactor.fireSystemEvent, "shutdown") return "Shutting down" - @defer.inlineCallbacks - def jsonrpc_status(self): + async def jsonrpc_status(self): """ Get daemon status @@ -1101,10 +1071,10 @@ class Daemon(metaclass=JSONRPCServerType): }, } for component in self.component_manager.components: - status = yield defer.maybeDeferred(component.get_status) + status = await d2f(defer.maybeDeferred(component.get_status)) if status: response[component.component_name] = status - defer.returnValue(response) + return response def jsonrpc_version(self): """ @@ -1131,10 +1101,9 @@ class Daemon(metaclass=JSONRPCServerType): 'python_version': (str) python version, } """ - platform_info = system_info.get_platform() log.info("Get version info: " + json.dumps(platform_info)) - return self._render_response(platform_info) + return platform_info def jsonrpc_report_bug(self, message=None): """ @@ -1157,7 +1126,7 @@ class Daemon(metaclass=JSONRPCServerType): platform_name, __version__ ) - return self._render_response(True) + return True def jsonrpc_settings_get(self): """ @@ -1173,7 +1142,7 @@ class Daemon(metaclass=JSONRPCServerType): (dict) Dictionary of daemon settings See ADJUSTABLE_SETTINGS in lbrynet/conf.py for full list of settings """ - return self._render_response(conf.settings.get_adjustable_settings_dict()) + return conf.settings.get_adjustable_settings_dict() def jsonrpc_settings_set(self, **kwargs): """ @@ -1260,7 +1229,7 @@ class Daemon(metaclass=JSONRPCServerType): conf.settings.update({key: converted}, data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) conf.settings.save_conf_file_settings() - return self._render_response(conf.settings.get_adjustable_settings_dict()) + return conf.settings.get_adjustable_settings_dict() def jsonrpc_help(self, command=None): """ @@ -1277,13 +1246,13 @@ class Daemon(metaclass=JSONRPCServerType): """ if command is None: - return self._render_response({ + return { 'about': 'This is the LBRY JSON-RPC API', 'command_help': 'Pass a `command` parameter to this method to see ' + 'help for that command (e.g. `help command=resolve_name`)', 'command_list': 'Get a full list of commands using the `commands` method', 'more_info': 'Visit https://lbry.io/api for more info', - }) + } fn = self.callable_methods.get(command) if fn is None: @@ -1291,9 +1260,9 @@ class Daemon(metaclass=JSONRPCServerType): f"No help available for '{command}'. It is not a valid command." ) - return self._render_response({ + return { 'help': textwrap.dedent(fn.__doc__ or '') - }) + } def jsonrpc_commands(self): """ @@ -1308,7 +1277,7 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (list) list of available commands """ - return self._render_response(sorted([command for command in self.callable_methods.keys()])) + return sorted([command for command in self.callable_methods.keys()]) @deprecated("account_balance") def jsonrpc_wallet_balance(self, address=None): @@ -1882,8 +1851,7 @@ class Daemon(metaclass=JSONRPCServerType): return self.get_account_or_default(account_id).receiving.get_or_create_usable_address() @requires(FILE_MANAGER_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_file_list(self, sort=None, **kwargs): + async def jsonrpc_file_list(self, sort=None, **kwargs): """ List files limited by optional filters @@ -1943,13 +1911,11 @@ class Daemon(metaclass=JSONRPCServerType): }, ] """ - - result = yield self._get_lbry_files(return_json=True, **kwargs) + result = await self._get_lbry_files(return_json=True, **kwargs) if sort: sort_by = [self._parse_lbry_files_sort(s) for s in sort] result = self._sort_lbry_files(result, sort_by) - response = yield self._render_response(result) - defer.returnValue(response) + return result @requires(WALLET_COMPONENT) async def jsonrpc_resolve_name(self, name, force=False): @@ -2179,7 +2145,7 @@ class Daemon(metaclass=JSONRPCServerType): log.info("Already waiting on lbry://%s to start downloading", name) await d2f(self.streams[sd_hash].data_downloading_deferred) - lbry_file = await d2f(self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)) + lbry_file = await self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) if lbry_file: if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)): @@ -2188,15 +2154,14 @@ class Daemon(metaclass=JSONRPCServerType): await d2f(lbry_file.start()) else: log.info('Already have a file for %s', name) - result = await d2f(self._get_lbry_file_dict(lbry_file)) + result = await self._get_lbry_file_dict(lbry_file) else: - result = await d2f(self._download_name(name, claim_dict, sd_hash, txid, nout, - timeout=timeout, file_name=file_name)) + result = await self._download_name(name, claim_dict, sd_hash, txid, nout, + timeout=timeout, file_name=file_name) return result @requires(FILE_MANAGER_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_file_set_status(self, status, **kwargs): + async def jsonrpc_file_set_status(self, status, **kwargs): """ Start or stop downloading a file @@ -2220,24 +2185,22 @@ class Daemon(metaclass=JSONRPCServerType): raise Exception('Status must be "start" or "stop".') search_type, value = get_lbry_file_search_value(kwargs) - lbry_file = yield self._get_lbry_file(search_type, value, return_json=False) + lbry_file = await self._get_lbry_file(search_type, value, return_json=False) if not lbry_file: raise Exception(f'Unable to find a file for {search_type}:{value}') if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped: - yield self.file_manager.toggle_lbry_file_running(lbry_file) + await d2f(self.file_manager.toggle_lbry_file_running(lbry_file)) msg = "Started downloading file" if status == 'start' else "Stopped downloading file" else: msg = ( "File was already being downloaded" if status == 'start' else "File was already stopped" ) - response = yield self._render_response(msg) - defer.returnValue(response) + return msg @requires(FILE_MANAGER_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): + async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): """ Delete a LBRY file @@ -2268,33 +2231,28 @@ class Daemon(metaclass=JSONRPCServerType): (bool) true if deletion was successful """ - lbry_files = yield self._get_lbry_files(return_json=False, **kwargs) + lbry_files = await self._get_lbry_files(return_json=False, **kwargs) if len(lbry_files) > 1: if not delete_all: log.warning("There are %i files to delete, use narrower filters to select one", len(lbry_files)) - response = yield self._render_response(False) - defer.returnValue(response) + return False else: log.warning("Deleting %i files", len(lbry_files)) if not lbry_files: log.warning("There is no file to delete") - result = False + return False else: for lbry_file in lbry_files: file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash if lbry_file.sd_hash in self.streams: del self.streams[lbry_file.sd_hash] - yield self.file_manager.delete_lbry_file(lbry_file, - delete_file=delete_from_download_dir) + await d2f(self.file_manager.delete_lbry_file(lbry_file, delete_file=delete_from_download_dir)) log.info("Deleted file: %s", file_name) - result = True - - response = yield self._render_response(result) - defer.returnValue(response) + return True @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, @@ -2397,8 +2355,7 @@ class Daemon(metaclass=JSONRPCServerType): ) @requires(WALLET_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_channel_export(self, claim_id): + async def jsonrpc_channel_export(self, claim_id): """ Export serialized channel signing information for a given certificate claim id @@ -2412,12 +2369,10 @@ class Daemon(metaclass=JSONRPCServerType): (str) Serialized certificate information """ - result = yield self.wallet_manager.export_certificate_info(claim_id) - defer.returnValue(result) + return await self.wallet_manager.export_certificate_info(claim_id) @requires(WALLET_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_channel_import(self, serialized_certificate_info): + async def jsonrpc_channel_import(self, serialized_certificate_info): """ Import serialized channel signing information (to allow signing new claims to the channel) @@ -2431,8 +2386,7 @@ class Daemon(metaclass=JSONRPCServerType): (dict) Result dictionary """ - result = yield self.wallet_manager.import_certificate_info(serialized_certificate_info) - defer.returnValue(result) + return await self.wallet_manager.import_certificate_info(serialized_certificate_info) @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @@ -3120,8 +3074,7 @@ class Daemon(metaclass=JSONRPCServerType): @requires(WALLET_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - @defer.inlineCallbacks - def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): + async def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): """ Download and return a blob @@ -3150,7 +3103,9 @@ class Daemon(metaclass=JSONRPCServerType): } timeout = timeout or 30 - blob = yield self._download_blob(blob_hash, rate_manager=self.payment_rate_manager, timeout=timeout) + blob = await self._download_blob( + blob_hash, rate_manager=self.payment_rate_manager, timeout=timeout + ) if encoding and encoding in decoders: blob_file = blob.open_for_reading() result = decoders[encoding](blob_file.read()) @@ -3161,8 +3116,7 @@ class Daemon(metaclass=JSONRPCServerType): return result @requires(BLOB_COMPONENT, DATABASE_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_blob_delete(self, blob_hash): + async def jsonrpc_blob_delete(self, blob_hash): """ Delete a blob @@ -3179,16 +3133,15 @@ class Daemon(metaclass=JSONRPCServerType): if blob_hash not in self.blob_manager.blobs: return "Don't have that blob" try: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(blob_hash) - yield self.storage.delete_stream(stream_hash) + stream_hash = await self.storage.get_stream_hash_for_sd_hash(blob_hash) + await self.storage.delete_stream(stream_hash) except Exception as err: pass - yield self.blob_manager.delete_blobs([blob_hash]) + await d2f(self.blob_manager.delete_blobs([blob_hash])) return "Deleted %s" % blob_hash @requires(DHT_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_peer_list(self, blob_hash, timeout=None): + async def jsonrpc_peer_list(self, blob_hash, timeout=None): """ Get peers for blob hash @@ -3214,7 +3167,7 @@ class Daemon(metaclass=JSONRPCServerType): finished_deferred.addTimeout(timeout or conf.settings['peer_search_timeout'], self.dht_node.clock) finished_deferred.addErrback(trap_timeout) - peers = yield finished_deferred + peers = await d2f(finished_deferred) results = [ { "node_id": hexlify(node_id).decode(), @@ -3226,8 +3179,7 @@ class Daemon(metaclass=JSONRPCServerType): return results @requires(DATABASE_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): + async def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): """ Announce blobs to the DHT @@ -3245,7 +3197,6 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (bool) true if successful """ - blob_hashes = [] if blob_hash: blob_hashes.append(blob_hash) @@ -3253,17 +3204,16 @@ class Daemon(metaclass=JSONRPCServerType): if sd_hash and stream_hash: raise Exception("either the sd hash or the stream hash should be provided, not both") if sd_hash: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) - blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True) + stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash) + blobs = await self.storage.get_blobs_for_stream(stream_hash, only_completed=True) blob_hashes.extend(blob.blob_hash for blob in blobs if blob.blob_hash is not None) else: raise Exception('single argument must be specified') - yield self.storage.should_single_announce_blobs(blob_hashes, immediate=True) + await self.storage.should_single_announce_blobs(blob_hashes, immediate=True) return True @requires(FILE_MANAGER_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_file_reflect(self, **kwargs): + async def jsonrpc_file_reflect(self, **kwargs): """ Reflect all the blobs in a file matching the filter criteria @@ -3284,22 +3234,17 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (list) list of blobs reflected """ - - reflector_server = kwargs.get('reflector', None) - lbry_files = yield self._get_lbry_files(**kwargs) - + lbry_files = await self._get_lbry_files(**kwargs) if len(lbry_files) > 1: raise Exception('Too many (%i) files found, need one' % len(lbry_files)) elif not lbry_files: raise Exception('No file found') - lbry_file = lbry_files[0] - - results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) - return results + return await d2f(reupload.reflect_file( + lbry_files[0], reflector_server=kwargs.get('reflector', None) + )) @requires(BLOB_COMPONENT, WALLET_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, + async def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, finished=None, page_size=None, page=None): """ Returns blob hashes. If not given filters, returns all blobs known by the blob manager @@ -3325,20 +3270,20 @@ class Daemon(metaclass=JSONRPCServerType): """ if uri or stream_hash or sd_hash: if uri: - metadata = (yield f2d(self.wallet_manager.resolve(uri)))[uri] + metadata = (await self.wallet_manager.resolve(uri))[uri] sd_hash = utils.get_sd_hash(metadata) - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash) elif stream_hash: - sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) + sd_hash = await self.storage.get_sd_blob_hash_for_stream(stream_hash) elif sd_hash: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) - sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) + stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash) + sd_hash = await self.storage.get_sd_blob_hash_for_stream(stream_hash) if stream_hash: - crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash) - blobs = yield defer.gatherResults([ + crypt_blobs = await self.storage.get_blobs_for_stream(stream_hash) + blobs = await d2f(defer.gatherResults([ self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None - ]) + ])) else: blobs = [] # get_blobs_for_stream does not include the sd blob, so we'll add it manually @@ -3373,13 +3318,10 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (list) reflected blob hashes """ - - d = reupload.reflect_blob_hashes(blob_hashes, self.blob_manager, reflector_server) - d.addCallback(lambda r: self._render_response(r)) - return d + return d2f(reupload.reflect_blob_hashes(blob_hashes, self.blob_manager, reflector_server)) @requires(BLOB_COMPONENT) - def jsonrpc_blob_reflect_all(self): + async def jsonrpc_blob_reflect_all(self): """ Reflects all saved blobs @@ -3392,15 +3334,11 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (bool) true if successful """ - - d = self.blob_manager.get_all_verified_blobs() - d.addCallback(reupload.reflect_blob_hashes, self.blob_manager) - d.addCallback(lambda r: self._render_response(r)) - return d + blob_hashes = await d2f(self.blob_manager.get_all_verified_blobs()) + return await d2f(reupload.reflect_blob_hashes(blob_hashes, self.blob_manager)) @requires(DHT_COMPONENT) - @defer.inlineCallbacks - def jsonrpc_peer_ping(self, node_id, address=None, port=None): + async def jsonrpc_peer_ping(self, node_id, address=None, port=None): """ Send a kademlia ping to the specified peer. If address and port are provided the peer is directly pinged, if not provided the peer is located first. @@ -3416,7 +3354,6 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (str) pong, or {'error': } if an error is encountered """ - contact = None if node_id and address and port: contact = self.dht_node.contact_manager.get_contact(unhexlify(node_id), address, int(port)) @@ -3426,16 +3363,15 @@ class Daemon(metaclass=JSONRPCServerType): ) if not contact: try: - contact = yield self.dht_node.findContact(unhexlify(node_id)) + contact = await d2f(self.dht_node.findContact(unhexlify(node_id))) except TimeoutError: return {'error': 'timeout finding peer'} if not contact: return {'error': 'peer not found'} try: - result = (yield contact.ping()).decode() + return (await d2f(contact.ping())).decode() except TimeoutError: - result = {'error': 'ping timeout'} - return result + return {'error': 'ping timeout'} @requires(DHT_COMPONENT) def jsonrpc_routing_table_get(self): @@ -3495,7 +3431,7 @@ class Daemon(metaclass=JSONRPCServerType): result['contacts'] = list(contact_set) result['blob_hashes'] = list(blob_hashes) result['node_id'] = hexlify(self.dht_node.node_id).decode() - return self._render_response(result) + return result # the single peer downloader needs wallet access @requires(DHT_COMPONENT, WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @@ -3520,12 +3456,10 @@ class Daemon(metaclass=JSONRPCServerType): "unreachable_peers": [":"] } """ - return self._blob_availability(blob_hash, search_timeout, blob_timeout) @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - @defer.inlineCallbacks - def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): + async def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): """ Get stream availability for lbry uri @@ -3576,26 +3510,26 @@ class Daemon(metaclass=JSONRPCServerType): } try: - resolved_result = (yield self.wallet_manager.resolve(uri))[uri] + resolved_result = (await self.wallet_manager.resolve(uri))[uri] response['did_resolve'] = True except UnknownNameError: response['error'] = "Failed to resolve name" - defer.returnValue(response) + return response except URIParseError: response['error'] = "Invalid URI" - defer.returnValue(response) + return response try: claim_obj = smart_decode(resolved_result[uri]['claim']['hex']) response['did_decode'] = True except DecodeError: response['error'] = "Failed to decode claim value" - defer.returnValue(response) + return response response['is_stream'] = claim_obj.is_stream if not claim_obj.is_stream: response['error'] = "Claim for \"%s\" does not contain a stream" % uri - defer.returnValue(response) + return response sd_hash = claim_obj.source_hash response['sd_hash'] = sd_hash @@ -3603,28 +3537,23 @@ class Daemon(metaclass=JSONRPCServerType): downloader = self._get_single_peer_downloader() have_sd_blob = sd_hash in self.blob_manager.blobs try: - sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout, - encoding="json") + sd_blob = await self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout, encoding="json") if not have_sd_blob: - yield self.jsonrpc_blob_delete(sd_hash) + await self.jsonrpc_blob_delete(sd_hash) if sd_blob and 'blobs' in sd_blob: response['num_blobs_in_stream'] = len(sd_blob['blobs']) - 1 head_blob_hash = sd_blob['blobs'][0]['blob_hash'] - head_blob_availability = yield self._blob_availability(head_blob_hash, - search_timeout, - blob_timeout, - downloader) + head_blob_availability = await self._blob_availability( + head_blob_hash, search_timeout, blob_timeout, downloader) response['head_blob_availability'] = head_blob_availability except Exception as err: response['error'] = err response['head_blob_hash'] = head_blob_hash - response['sd_blob_availability'] = yield self._blob_availability(sd_hash, - search_timeout, - blob_timeout, - downloader) + response['sd_blob_availability'] = await self._blob_availability( + sd_hash, search_timeout, blob_timeout, downloader) response['is_available'] = response['sd_blob_availability'].get('is_available') and \ response['head_blob_availability'].get('is_available') - defer.returnValue(response) + return response async def get_channel_or_error( self, accounts: List[LBCAccount], channel_id: str = None, channel_name: str = None): diff --git a/lbrynet/extras/daemon/DaemonConsole.py b/lbrynet/extras/daemon/DaemonConsole.py index 274c68507..4735667b0 100644 --- a/lbrynet/extras/daemon/DaemonConsole.py +++ b/lbrynet/extras/daemon/DaemonConsole.py @@ -14,7 +14,6 @@ import aiohttp import logging from urllib.parse import urlparse -from lbrynet import conf log = logging.getLogger(__name__) USER_AGENT = "AuthServiceProxy/0.1" @@ -110,7 +109,7 @@ class AuthAPIClient: @classmethod async def get_client(cls, key_name=None): api_key_name = key_name or "api" - keyring = Keyring.load_from_disk() + keyring = Keyring.load_from_disk() # pylint: disable=E0602 api_key = keyring.api_key login_url = conf.settings.get_api_connection_string(api_key_name, api_key.secret) @@ -127,7 +126,7 @@ class AuthAPIClient: async with session.post(login_url, headers=headers) as r: cookies = r.cookies uid = cookies.get(TWISTED_SECURE_SESSION if conf.settings['use_https'] else TWISTED_SESSION).value - api_key = APIKey.create(seed=uid.encode()) + api_key = APIKey.create(seed=uid.encode()) # pylint: disable=E0602 return cls(api_key, session, cookies, url, login_url) diff --git a/lbrynet/extras/daemon/Downloader.py b/lbrynet/extras/daemon/Downloader.py index 955cb900a..9c7d4fc80 100644 --- a/lbrynet/extras/daemon/Downloader.py +++ b/lbrynet/extras/daemon/Downloader.py @@ -11,7 +11,7 @@ from lbrynet.p2p.StreamDescriptor import download_sd_blob from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from torba.client.constants import COIN from lbrynet.extras.wallet.dewies import dewies_to_lbc -from lbrynet.extras.daemon.Components import f2d +from lbrynet.extras.compat import f2d INITIALIZING_CODE = 'initializing' DOWNLOAD_METADATA_CODE = 'downloading_metadata' diff --git a/lbrynet/extras/daemon/Publisher.py b/lbrynet/extras/daemon/Publisher.py index bd35f7eb3..5846cb9a4 100644 --- a/lbrynet/extras/daemon/Publisher.py +++ b/lbrynet/extras/daemon/Publisher.py @@ -50,18 +50,18 @@ class Publisher: ) # check if we have a file already for this claim (if this is a publish update with a new stream) - old_stream_hashes = await d2f(self.storage.get_old_stream_hashes_for_claim_id( + old_stream_hashes = await self.storage.get_old_stream_hashes_for_claim_id( tx.outputs[0].claim_id, self.lbry_file.stream_hash - )) + ) if old_stream_hashes: for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes, list(self.lbry_file_manager.lbry_files)): await d2f(self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)) log.info("Removed old stream for claim update: %s", lbry_file.stream_hash) - await d2f(self.storage.save_content_claim( + await self.storage.save_content_claim( self.lbry_file.stream_hash, tx.outputs[0].id - )) + ) return tx async def publish_stream(self, name, bid, claim_dict, stream_hash, holding_address=None): diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index f817c691c..62725f437 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -1,17 +1,16 @@ +import asyncio import logging import os -import sqlite3 import traceback import typing from binascii import hexlify, unhexlify -from twisted.internet import defer, task, threads -from twisted.enterprise import adbapi from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies from lbrynet import conf from lbrynet.schema.claim import ClaimDict from lbrynet.schema.decode import smart_decode from lbrynet.blob.CryptBlob import CryptBlobInfo from lbrynet.dht.constants import dataExpireTimeout +from torba.client.basedatabase import SQLiteMixin log = logging.getLogger(__name__) @@ -44,63 +43,23 @@ def _open_file_for_writing(download_directory, suggested_file_name): return os.path.basename(file_path) -def open_file_for_writing(download_directory, suggested_file_name): - """ - Used to touch the path of a file to be downloaded - - :param download_directory: (str) - :param suggested_file_name: (str) - :return: (str) basename - """ - return threads.deferToThread(_open_file_for_writing, download_directory, suggested_file_name) +async def open_file_for_writing(download_directory: str, suggested_file_name: str) -> str: + """ Used to touch the path of a file to be downloaded. """ + return await asyncio.get_event_loop().run_in_executor( + None, _open_file_for_writing, download_directory, suggested_file_name + ) -def rerun_if_locked(f): - max_attempts = 5 - - def rerun(err, rerun_count, *args, **kwargs): - connection = args[0] - reactor = connection.reactor - log.debug("Failed to execute (%s): %s", err, args) - if err.check(sqlite3.OperationalError) and "database is locked" in str(err.value): - log.warning("database was locked. rerunning %s with args %s, kwargs %s", - str(f), str(args), str(kwargs)) - if rerun_count < max_attempts: - delay = 2**rerun_count - return task.deferLater(reactor, delay, inner_wrapper, rerun_count + 1, *args, **kwargs) - raise err - - def check_needed_rerun(result, rerun_count): - if rerun_count: - log.info("successfully reran database query") - return result - - def inner_wrapper(rerun_count, *args, **kwargs): - d = f(*args, **kwargs) - d.addCallback(check_needed_rerun, rerun_count) - d.addErrback(rerun, rerun_count, *args, **kwargs) - return d - - def wrapper(*args, **kwargs): - return inner_wrapper(0, *args, **kwargs) - - return wrapper +async def looping_call(interval, fun): + while True: + try: + await fun() + except Exception as e: + log.exception('Looping call experienced exception:', exc_info=e) + await asyncio.sleep(interval) -class SqliteConnection(adbapi.ConnectionPool): - def __init__(self, db_path): - super().__init__('sqlite3', db_path, check_same_thread=False) - - @rerun_if_locked - def runInteraction(self, interaction, *args, **kw): - return super().runInteraction(interaction, *args, **kw) - - @classmethod - def set_reactor(cls, reactor): - cls.reactor = reactor - - -class SQLiteStorage: +class SQLiteStorage(SQLiteMixin): CREATE_TABLES_QUERY = """ pragma foreign_keys=on; @@ -173,70 +132,45 @@ class SQLiteStorage: ); """ - def __init__(self, db_dir, reactor=None): - if not reactor: - from twisted.internet import reactor - self.db_dir = db_dir - self._db_path = os.path.join(db_dir, "lbrynet.sqlite") - log.info("connecting to database: %s", self._db_path) - self.db = SqliteConnection(self._db_path) - self.db.set_reactor(reactor) + def __init__(self, path): + super().__init__(path) + from twisted.internet import reactor self.clock = reactor - - # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a - # change to the associated content claim occurs. these are added by the file manager - # when it loads each file - self.content_claim_callbacks = {} # {: } + self.content_claim_callbacks = {} self.check_should_announce_lc = None + + async def open(self): + await super().open() if 'reflector' not in conf.settings['components_to_skip']: - self.check_should_announce_lc = task.LoopingCall(self.verify_will_announce_all_head_and_sd_blobs) + self.check_should_announce_lc = looping_call( + 600, self.verify_will_announce_all_head_and_sd_blobs + ) - @defer.inlineCallbacks - def setup(self): - def _create_tables(transaction): - transaction.executescript(self.CREATE_TABLES_QUERY) - yield self.db.runInteraction(_create_tables) - if self.check_should_announce_lc and not self.check_should_announce_lc.running: - self.check_should_announce_lc.start(600) - defer.returnValue(None) + async def close(self): + if self.check_should_announce_lc is not None: + self.check_should_announce_lc.close() + await super().close() - @defer.inlineCallbacks - def run_and_return_one_or_none(self, query, *args): - result = yield self.db.runQuery(query, args) - if result: - defer.returnValue(result[0][0]) - else: - defer.returnValue(None) + async def run_and_return_one_or_none(self, query, *args): + for row in await self.db.execute_fetchall(query, args): + return row - @defer.inlineCallbacks - def run_and_return_list(self, query, *args): - result = yield self.db.runQuery(query, args) - if result: - defer.returnValue([i[0] for i in result]) - else: - defer.returnValue([]) + async def run_and_return_list(self, query, *args): + rows = list(await self.db.execute_fetchall(query, args)) + return [col[0] for col in rows] if rows else [] - def run_and_return_id(self, query, *args): - def do_save(t): - t.execute(query, args) - return t.lastrowid - return self.db.runInteraction(do_save) - - def stop(self): - if self.check_should_announce_lc and self.check_should_announce_lc.running: - self.check_should_announce_lc.stop() - self.db.close() - return defer.succeed(True) + async def run_and_return_id(self, query, *args): + return (await self.db.execute(query, args)).lastrowid # # # # # # # # # blob functions # # # # # # # # # def add_completed_blob(self, blob_hash, length, next_announce_time, should_announce, status="finished"): log.debug("Adding a completed blob. blob_hash=%s, length=%i", blob_hash, length) values = (blob_hash, length, next_announce_time or 0, int(bool(should_announce)), status, 0, 0) - return self.db.runOperation("insert or replace into blob values (?, ?, ?, ?, ?, ?, ?)", values) + return self.db.execute("insert or replace into blob values (?, ?, ?, ?, ?, ?, ?)", values) def set_should_announce(self, blob_hash, next_announce_time, should_announce): - return self.db.runOperation( + return self.db.execute( "update blob set next_announce_time=?, should_announce=? where blob_hash=?", (next_announce_time or 0, int(bool(should_announce)), blob_hash) ) @@ -246,9 +180,8 @@ class SQLiteStorage: "select status from blob where blob_hash=?", blob_hash ) - @defer.inlineCallbacks def add_known_blob(self, blob_hash, length): - yield self.db.runOperation( + return self.db.execute( "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", (blob_hash, length, 0, 0, "pending", 0, 0) ) @@ -267,12 +200,11 @@ class SQLiteStorage: "select blob_hash from blob where should_announce=1 and status='finished'" ) - @defer.inlineCallbacks - def get_all_finished_blobs(self): - blob_hashes = yield self.run_and_return_list( + async def get_all_finished_blobs(self): + blob_hashes = await self.run_and_return_list( "select blob_hash from blob where status='finished'" ) - defer.returnValue([unhexlify(blob_hash) for blob_hash in blob_hashes]) + return [unhexlify(blob_hash) for blob_hash in blob_hashes] def count_finished_blobs(self): return self.run_and_return_one_or_none( @@ -280,10 +212,10 @@ class SQLiteStorage: ) def update_last_announced_blob(self, blob_hash, last_announced): - return self.db.runOperation( - "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?", - (int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash) - ) + return self.db.execute( + "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?", + (int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash) + ) def should_single_announce_blobs(self, blob_hashes, immediate=False): def set_single_announce(transaction): @@ -298,7 +230,7 @@ class SQLiteStorage: transaction.execute( "update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash, ) ) - return self.db.runInteraction(set_single_announce) + return self.db.run(set_single_announce) def get_blobs_to_announce(self): def get_and_update(transaction): @@ -317,13 +249,13 @@ class SQLiteStorage: ) blobs = [b[0] for b in r.fetchall()] return blobs - return self.db.runInteraction(get_and_update) + return self.db.run(get_and_update) def delete_blobs_from_db(self, blob_hashes): def delete_blobs(transaction): for blob_hash in blob_hashes: transaction.execute("delete from blob where blob_hash=?;", (blob_hash,)) - return self.db.runInteraction(delete_blobs) + return self.db.run(delete_blobs) def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") @@ -336,17 +268,16 @@ class SQLiteStorage: transaction.execute("insert into stream_blob values (?, ?, ?, ?)", (stream_hash, blob_info.get('blob_hash', None), blob_info['blob_num'], blob_info['iv'])) - return self.db.runInteraction(_add_stream_blobs) + return self.db.run(_add_stream_blobs) - @defer.inlineCallbacks - def add_known_blobs(self, blob_infos): + async def add_known_blobs(self, blob_infos): for blob_info in blob_infos: if blob_info.get('blob_hash') and blob_info['length']: - yield self.add_known_blob(blob_info['blob_hash'], blob_info['length']) + await self.add_known_blob(blob_info['blob_hash'], blob_info['length']) def verify_will_announce_head_and_sd_blobs(self, stream_hash): # fix should_announce for imported head and sd blobs - return self.db.runOperation( + return self.db.execute( "update blob set should_announce=1 " "where should_announce=0 and " "blob.blob_hash in " @@ -358,7 +289,7 @@ class SQLiteStorage: ) def verify_will_announce_all_head_and_sd_blobs(self): - return self.db.runOperation( + return self.db.execute( "update blob set should_announce=1 " "where should_announce=0 and " "blob.blob_hash in " @@ -383,23 +314,24 @@ class SQLiteStorage: :param stream_blob_infos: (list) of blob info dictionaries :return: (defer.Deferred) """ - def _store_stream(transaction): - transaction.execute("insert into stream values (?, ?, ?, ?, ?);", - (stream_hash, sd_hash, stream_key, stream_name, - suggested_file_name)) - + transaction.execute( + "insert into stream values (?, ?, ?, ?, ?);", ( + stream_hash, sd_hash, stream_key, stream_name, suggested_file_name + ) + ) for blob_info in stream_blob_infos: - transaction.execute("insert into stream_blob values (?, ?, ?, ?)", - (stream_hash, blob_info.get('blob_hash', None), - blob_info['blob_num'], blob_info['iv'])) + transaction.execute( + "insert into stream_blob values (?, ?, ?, ?)", ( + stream_hash, blob_info.get('blob_hash', None), + blob_info['blob_num'], blob_info['iv'] + ) + ) + return self.db.run(_store_stream) - return self.db.runInteraction(_store_stream) - - @defer.inlineCallbacks - def delete_stream(self, stream_hash): - sd_hash = yield self.get_sd_blob_hash_for_stream(stream_hash) - stream_blobs = yield self.get_blobs_for_stream(stream_hash) + async def delete_stream(self, stream_hash): + sd_hash = await self.get_sd_blob_hash_for_stream(stream_hash) + stream_blobs = await self.get_blobs_for_stream(stream_hash) blob_hashes = [b.blob_hash for b in stream_blobs if b.blob_hash is not None] def _delete_stream(transaction): @@ -407,24 +339,28 @@ class SQLiteStorage: transaction.execute("delete from file where stream_hash=? ", (stream_hash, )) transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, )) transaction.execute("delete from stream where stream_hash=? ", (stream_hash, )) - transaction.execute("delete from blob where blob_hash=?", (sd_hash, )) + transaction.execute("delete from blob where blob_hash=?", sd_hash) for blob_hash in blob_hashes: transaction.execute("delete from blob where blob_hash=?;", (blob_hash, )) - yield self.db.runInteraction(_delete_stream) + + await self.db.run(_delete_stream) def get_all_streams(self): return self.run_and_return_list("select stream_hash from stream") def get_stream_info(self, stream_hash): - d = self.db.runQuery("select stream_name, stream_key, suggested_filename, sd_hash from stream " - "where stream_hash=?", (stream_hash, )) - d.addCallback(lambda r: None if not r else r[0]) - return d + return self.run_and_return_one_or_none( + "select stream_name, stream_key, suggested_filename, sd_hash from stream " + "where stream_hash=?", stream_hash + ) - def check_if_stream_exists(self, stream_hash): - d = self.db.runQuery("select stream_hash from stream where stream_hash=?", (stream_hash, )) - d.addCallback(lambda r: bool(len(r))) - return d + async def check_if_stream_exists(self, stream_hash): + row = await self.run_and_return_one_or_none( + "select stream_hash from stream where stream_hash=?", stream_hash + ) + if row is not None: + return bool(len(row)) + return False def get_blob_num_by_hash(self, stream_hash, blob_hash): return self.run_and_return_one_or_none( @@ -466,7 +402,7 @@ class SQLiteStorage: crypt_blob_infos.append(CryptBlobInfo(blob_hash, position, blob_length, iv)) crypt_blob_infos = sorted(crypt_blob_infos, key=lambda info: info.blob_num) return crypt_blob_infos - return self.db.runInteraction(_get_blobs_for_stream) + return self.db.run(_get_blobs_for_stream) def get_pending_blobs_for_stream(self, stream_hash): return self.run_and_return_list( @@ -493,14 +429,12 @@ class SQLiteStorage: # # # # # # # # # file stuff # # # # # # # # # - @defer.inlineCallbacks - def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate): + async def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate): # touch the closest available file to the file name - file_name = yield open_file_for_writing(unhexlify(download_directory).decode(), unhexlify(file_name).decode()) - result = yield self.save_published_file( + file_name = await open_file_for_writing(unhexlify(download_directory).decode(), unhexlify(file_name).decode()) + return await self.save_published_file( stream_hash, hexlify(file_name.encode()), download_directory, data_payment_rate ) - defer.returnValue(result) def save_published_file(self, stream_hash, file_name, download_directory, data_payment_rate, status="stopped"): return self.run_and_return_id( @@ -509,7 +443,9 @@ class SQLiteStorage: ) def get_filename_for_rowid(self, rowid): - return self.run_and_return_one_or_none("select file_name from file where rowid=?", rowid) + return self.run_and_return_one_or_none( + "select file_name from file where rowid=?", rowid + ) def get_all_lbry_files(self): def _lbry_file_dict(rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key, @@ -535,13 +471,11 @@ class SQLiteStorage: ).fetchall() ] - d = self.db.runInteraction(_get_all_files) - return d + return self.db.run(_get_all_files) - def change_file_status(self, rowid, new_status): - d = self.db.runQuery("update file set status=? where rowid=?", (new_status, rowid)) - d.addCallback(lambda _: new_status) - return d + async def change_file_status(self, rowid, new_status): + await self.db.execute("update file set status=? where rowid=?", (new_status, rowid)) + return new_status def get_lbry_file_status(self, rowid): return self.run_and_return_one_or_none( @@ -565,7 +499,7 @@ class SQLiteStorage: ("%s:%i" % (support['txid'], support['nout']), claim_id, lbc_to_dewies(support['amount']), support.get('address', "")) ) - return self.db.runInteraction(_save_support) + return self.db.run(_save_support) def get_supports(self, *claim_ids): def _format_support(outpoint, supported_id, amount, address): @@ -587,15 +521,16 @@ class SQLiteStorage: ) ] - return self.db.runInteraction(_get_supports) + return self.db.run(_get_supports) # # # # # # # # # claim functions # # # # # # # # # - @defer.inlineCallbacks - def save_claims(self, claim_infos): + async def save_claims(self, claim_infos): + support_callbacks = [] + update_file_callbacks = [] + def _save_claims(transaction): content_claims_to_update = [] - support_callbacks = [] for claim_info in claim_infos: outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout']) claim_id = claim_info['claim_id'] @@ -622,7 +557,7 @@ class SQLiteStorage: ) if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing # support info - support_callbacks.append(self.save_supports(claim_id, claim_info['supports'])) + support_callbacks.append((claim_id, claim_info['supports'])) if not source_hash: continue stream_hash = transaction.execute( @@ -644,18 +579,18 @@ class SQLiteStorage: content_claims_to_update.append((stream_hash, outpoint)) elif known_outpoint != outpoint: content_claims_to_update.append((stream_hash, outpoint)) - update_file_callbacks = [] for stream_hash, outpoint in content_claims_to_update: self._save_content_claim(transaction, outpoint, stream_hash) if stream_hash in self.content_claim_callbacks: update_file_callbacks.append(self.content_claim_callbacks[stream_hash]()) - return update_file_callbacks, support_callbacks - content_dl, support_dl = yield self.db.runInteraction(_save_claims) - if content_dl: - yield defer.DeferredList(content_dl) - if support_dl: - yield defer.DeferredList(support_dl) + await self.db.run(_save_claims) + if update_file_callbacks: + await asyncio.wait(update_file_callbacks) + if support_callbacks: + await asyncio.wait([ + self.save_supports(*args) for args in support_callbacks + ]) def save_claims_for_resolve(self, claim_infos): to_save = [] @@ -718,16 +653,13 @@ class SQLiteStorage: # update the claim associated to the file transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)) - @defer.inlineCallbacks - def save_content_claim(self, stream_hash, claim_outpoint): - yield self.db.runInteraction(self._save_content_claim, claim_outpoint, stream_hash) + async def save_content_claim(self, stream_hash, claim_outpoint): + await self.db.run(self._save_content_claim, claim_outpoint, stream_hash) # update corresponding ManagedEncryptedFileDownloader object if stream_hash in self.content_claim_callbacks: - file_callback = self.content_claim_callbacks[stream_hash] - yield file_callback() + await self.content_claim_callbacks[stream_hash]() - @defer.inlineCallbacks - def get_content_claim(self, stream_hash, include_supports=True): + async def get_content_claim(self, stream_hash, include_supports=True): def _get_claim_from_stream_hash(transaction): claim_info = transaction.execute( "select c.*, " @@ -745,15 +677,13 @@ class SQLiteStorage: result['channel_name'] = channel_name return result - result = yield self.db.runInteraction(_get_claim_from_stream_hash) + result = await self.db.run(_get_claim_from_stream_hash) if result and include_supports: - supports = yield self.get_supports(result['claim_id']) - result['supports'] = supports - result['effective_amount'] = calculate_effective_amount(result['amount'], supports) - defer.returnValue(result) + result['supports'] = await self.get_supports(result['claim_id']) + result['effective_amount'] = calculate_effective_amount(result['amount'], result['supports']) + return result - @defer.inlineCallbacks - def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True): + async def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True): def _batch_get_claim(transaction): results = {} claim_infos = _batched_select( @@ -781,10 +711,10 @@ class SQLiteStorage: results[stream_hash]['channel_name'] = channel_name return results - claims = yield self.db.runInteraction(_batch_get_claim) + claims = await self.db.run(_batch_get_claim) if include_supports: all_supports = {} - for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])): + for support in await self.get_supports(*[claim['claim_id'] for claim in claims.values()]): all_supports.setdefault(support['claim_id'], []).append(support) for stream_hash in claims.keys(): claim = claims[stream_hash] @@ -792,28 +722,28 @@ class SQLiteStorage: claim['supports'] = supports claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports) claims[stream_hash] = claim - defer.returnValue(claims) + return claims - @defer.inlineCallbacks - def get_claim(self, claim_outpoint, include_supports=True): + async def get_claim(self, claim_outpoint, include_supports=True): def _get_claim(transaction): - claim_info = transaction.execute("select c.*, " - "case when c.channel_claim_id is not null then " - "(select claim_name from claim where claim_id==c.channel_claim_id) " - "else null end as channel_name from claim c where claim_outpoint = ?", - (claim_outpoint,)).fetchone() + claim_info = transaction.execute( + "select c.*, " + "case when c.channel_claim_id is not null then " + "(select claim_name from claim where claim_id==c.channel_claim_id) " + "else null end as channel_name from claim c where claim_outpoint = ?", + (claim_outpoint,) + ).fetchone() channel_name = claim_info[-1] result = _format_claim_response(*claim_info[:-1]) if channel_name: result['channel_name'] = channel_name return result - result = yield self.db.runInteraction(_get_claim) + result = await self.db.run(_get_claim) if include_supports: - supports = yield self.get_supports(result['claim_id']) - result['supports'] = supports - result['effective_amount'] = calculate_effective_amount(result['amount'], supports) - defer.returnValue(result) + result['supports'] = await self.get_supports(result['claim_id']) + result['effective_amount'] = calculate_effective_amount(result['amount'], result['supports']) + return result def get_unknown_certificate_ids(self): def _get_unknown_certificate_claim_ids(transaction): @@ -825,11 +755,10 @@ class SQLiteStorage: "(select c2.claim_id from claim as c2)" ).fetchall() ] - return self.db.runInteraction(_get_unknown_certificate_claim_ids) + return self.db.run(_get_unknown_certificate_claim_ids) - @defer.inlineCallbacks - def get_pending_claim_outpoints(self): - claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1") + async def get_pending_claim_outpoints(self): + claim_outpoints = await self.run_and_return_list("select claim_outpoint from claim where height=-1") results = {} # {txid: [nout, ...]} for outpoint_str in claim_outpoints: txid, nout = outpoint_str.split(":") @@ -838,7 +767,7 @@ class SQLiteStorage: results[txid] = outputs if results: log.debug("missing transaction heights for %i claims", len(results)) - defer.returnValue(results) + return results def save_claim_tx_heights(self, claim_tx_heights): def _save_claim_heights(transaction): @@ -847,17 +776,17 @@ class SQLiteStorage: "update claim set height=? where claim_outpoint=? and height=-1", (height, outpoint) ) - return self.db.runInteraction(_save_claim_heights) + return self.db.run(_save_claim_heights) # # # # # # # # # reflector functions # # # # # # # # # def update_reflected_stream(self, sd_hash, reflector_address, success=True): if success: - return self.db.runOperation( + return self.db.execute( "insert or replace into reflected_stream values (?, ?, ?)", (sd_hash, reflector_address, self.clock.seconds()) ) - return self.db.runOperation( + return self.db.execute( "delete from reflected_stream where sd_hash=? and reflector_address=?", (sd_hash, reflector_address) ) diff --git a/lbrynet/extras/wallet/manager.py b/lbrynet/extras/wallet/manager.py index 038e7dd87..1517b6ac6 100644 --- a/lbrynet/extras/wallet/manager.py +++ b/lbrynet/extras/wallet/manager.py @@ -277,7 +277,7 @@ class LbryWalletManager(BaseWalletManager): if 'error' not in results: await self.old_db.save_claims_for_resolve([ value for value in results.values() if 'error' not in value - ]).asFuture(asyncio.get_event_loop()) + ]) return results async def get_claims_for_name(self, name: str): @@ -432,7 +432,7 @@ class LbryWalletManager(BaseWalletManager): await account.ledger.broadcast(tx) await self.old_db.save_claims([self._old_get_temp_claim_info( tx, tx.outputs[0], claim_address, claim_dict, name, dewies_to_lbc(amount) - )]).asFuture(asyncio.get_event_loop()) + )]) # TODO: release reserved tx outputs in case anything fails by this point return tx @@ -446,7 +446,7 @@ class LbryWalletManager(BaseWalletManager): 'address': holding_address, 'claim_id': claim_id, 'amount': dewies_to_lbc(amount) - }]).asFuture(asyncio.get_event_loop()) + }]) return tx async def tip_claim(self, amount, claim_id, account): @@ -461,7 +461,7 @@ class LbryWalletManager(BaseWalletManager): 'address': claim_to_tip['address'], 'claim_id': claim_id, 'amount': dewies_to_lbc(amount) - }]).asFuture(asyncio.get_event_loop()) + }]) return tx async def abandon_claim(self, claim_id, txid, nout, account): @@ -484,7 +484,7 @@ class LbryWalletManager(BaseWalletManager): await self.old_db.save_claims([self._old_get_temp_claim_info( tx, tx.outputs[0], address, cert, channel_name, dewies_to_lbc(amount) - )]).asFuture(asyncio.get_event_loop()) + )]) return tx def _old_get_temp_claim_info(self, tx, txo, address, claim_dict, name, bid): diff --git a/lbrynet/extras/wallet/server/block_processor.py b/lbrynet/extras/wallet/server/block_processor.py index eae699d67..614bc0c50 100644 --- a/lbrynet/extras/wallet/server/block_processor.py +++ b/lbrynet/extras/wallet/server/block_processor.py @@ -131,9 +131,6 @@ class LBRYBlockProcessor(BlockProcessor): super().backup_blocks(raw_blocks=raw_blocks) self.db.batched_flush_claims() - def shutdown(self): - self.db.shutdown() - async def flush(self, flush_utxos): self.db.batched_flush_claims() return await super().flush(flush_utxos) diff --git a/lbrynet/extras/wallet/server/db.py b/lbrynet/extras/wallet/server/db.py index de2ad15b2..b92bcad67 100644 --- a/lbrynet/extras/wallet/server/db.py +++ b/lbrynet/extras/wallet/server/db.py @@ -21,7 +21,7 @@ class LBRYDB(DB): self.pending_abandons = {} super().__init__(*args, **kwargs) - def shutdown(self): + def close(self): self.batched_flush_claims() self.claims_db.close() self.names_db.close() @@ -29,9 +29,7 @@ class LBRYDB(DB): self.outpoint_to_claim_id_db.close() self.claim_undo_db.close() self.utxo_db.close() - # electrumx ones - self.utxo_db.close() - self.history.close_db() + super().close() async def _open_dbs(self, for_sync, compacting): await super()._open_dbs(for_sync=for_sync, compacting=compacting)