fully asyncio daemon api

This commit is contained in:
Lex Berezhny 2018-12-15 15:31:02 -05:00
parent 6c52450858
commit 86fb99a37c
12 changed files with 400 additions and 594 deletions

View file

@ -57,7 +57,7 @@ def start_daemon(settings: typing.Optional[typing.Dict] = None,
if check_connection():
daemon = Daemon()
asyncio.get_event_loop().run_until_complete(daemon.start_listening())
asyncio.get_event_loop().create_task(daemon.start_listening())
reactor.run()
else:
log.info("Not connected to internet, unable to start")

View file

@ -36,37 +36,35 @@ class Component(metaclass=ComponentType):
def running(self):
return self._running
def get_status(self):
async def get_status(self):
return
def start(self):
async def start(self):
raise NotImplementedError()
def stop(self):
async def stop(self):
raise NotImplementedError()
@property
def component(self):
raise NotImplementedError()
@defer.inlineCallbacks
def _setup(self):
async def _setup(self):
try:
result = yield defer.maybeDeferred(self.start)
result = await self.start()
self._running = True
defer.returnValue(result)
return result
except (defer.CancelledError, AlreadyQuit):
pass
except Exception as err:
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
raise err
@defer.inlineCallbacks
def _stop(self):
async def _stop(self):
try:
result = yield defer.maybeDeferred(self.stop)
result = await self.stop()
self._running = False
defer.returnValue(result)
return result
except (defer.CancelledError, AlreadyQuit):
pass
except Exception as err:

View file

@ -1,5 +1,5 @@
import asyncio
import logging
from twisted.internet import defer
from lbrynet.p2p.Error import ComponentStartConditionNotMet
from lbrynet.extras.daemon.PeerManager import PeerManager
from lbrynet.extras.daemon.PeerFinder import DHTPeerFinder
@ -110,13 +110,8 @@ class ComponentManager:
steps.reverse()
return steps
@defer.inlineCallbacks
def setup(self, **callbacks):
"""
Start Components in sequence sorted by requirements
:return: (defer.Deferred)
"""
async def setup(self, **callbacks):
""" Start Components in sequence sorted by requirements """
for component_name, cb in callbacks.items():
if component_name not in self.component_classes:
if component_name not in self.skip_components:
@ -124,19 +119,22 @@ class ComponentManager:
if not callable(cb):
raise ValueError("%s is not callable" % cb)
def _setup(component):
async def _setup(component):
await component._setup()
if component.component_name in callbacks:
d = component._setup()
d.addCallback(callbacks[component.component_name], component)
return d
return component._setup()
maybe_coro = callbacks[component.component_name](component)
if asyncio.iscoroutine(maybe_coro):
asyncio.create_task(maybe_coro)
stages = self.sort_components()
for stage in stages:
yield defer.DeferredList([_setup(component) for component in stage if not component.running])
needing_start = [
_setup(component) for component in stage if not component.running
]
if needing_start:
await asyncio.wait(needing_start)
@defer.inlineCallbacks
def stop(self):
async def stop(self):
"""
Stop Components in reversed startup order
@ -144,7 +142,11 @@ class ComponentManager:
"""
stages = self.sort_components(reverse=True)
for stage in stages:
yield defer.DeferredList([component._stop() for component in stage if component.running])
needing_stop = [
component._stop() for component in stage if component.running
]
if needing_stop:
await asyncio.wait(needing_stop)
def all_components_running(self, *component_names):
"""

View file

@ -1,13 +1,13 @@
import os
import asyncio
import aiohttp
import logging
import treq
import json
import math
import binascii
from hashlib import sha256
from types import SimpleNamespace
from twisted.internet import defer, threads, reactor, error, task
from twisted.internet import defer, reactor, error, task
from aioupnp import __version__ as aioupnp_version
from aioupnp.upnp import UPnP
@ -16,6 +16,7 @@ from aioupnp.fault import UPnPError
import lbrynet.schema
from lbrynet import conf
from lbrynet.extras.compat import d2f
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
@ -27,7 +28,7 @@ from lbrynet.extras.daemon.HashAnnouncer import DHTHashAnnouncer
from lbrynet.extras.reflector.server.server import ReflectorServerFactory
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.wallet import Network
from lbrynet.utils import DeferredDict, generate_id
from lbrynet.utils import generate_id
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.p2p.RateLimiter import RateLimiter
from lbrynet.p2p.BlobManager import DiskBlobManager
@ -55,22 +56,23 @@ RATE_LIMITER_COMPONENT = "rate_limiter"
PAYMENT_RATE_COMPONENT = "payment_rate_manager"
def from_future(coroutine: asyncio.coroutine) -> defer.Deferred:
return defer.Deferred.fromFuture(asyncio.ensure_future(coroutine))
async def gather_dict(tasks: dict):
async def wait_value(key, value):
return key, await value
return dict(await asyncio.gather(*(
wait_value(*kv) for kv in tasks.items()
)))
@defer.inlineCallbacks
def get_external_ip(): # used if upnp is disabled or non-functioning
async def get_external_ip(): # used if upnp is disabled or non-functioning
try:
buf = []
response = yield treq.get("https://api.lbry.io/ip")
yield treq.collect(response, buf.append)
parsed = json.loads(b"".join(buf).decode())
if parsed['success']:
return parsed['data']['ip']
return
except Exception as err:
return
async with aiohttp.ClientSession() as session:
async with session.get("https://api.lbry.io/ip") as resp:
response = await resp.json()
if response['success']:
return response['data']['ip']
except Exception as e:
pass
class DatabaseComponent(Component):
@ -97,8 +99,7 @@ class DatabaseComponent(Component):
with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision:
db_revision.write(str(version_num))
@defer.inlineCallbacks
def start(self):
async def start(self):
# check directories exist, create them if they don't
log.info("Loading databases")
@ -117,19 +118,19 @@ class DatabaseComponent(Component):
if old_revision < self.get_current_db_revision():
from lbrynet.extras.daemon.migrator import dbmigrator
log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision())
yield threads.deferToThread(
dbmigrator.migrate_db, conf.settings.data_dir, old_revision, self.get_current_db_revision()
await asyncio.get_event_loop().run_in_executor(
None, dbmigrator.migrate_db, conf.settings.data_dir, old_revision, self.get_current_db_revision()
)
self._write_db_revision_file(self.get_current_db_revision())
log.info("Finished upgrading the databases.")
# start SQLiteStorage
self.storage = SQLiteStorage(conf.settings.data_dir)
yield self.storage.setup()
self.storage = SQLiteStorage(
os.path.join(conf.settings.data_dir, "lbrynet.sqlite")
)
await self.storage.open()
@defer.inlineCallbacks
def stop(self):
yield self.storage.stop()
async def stop(self):
await self.storage.close()
self.storage = None
@ -250,33 +251,24 @@ class HeadersComponent(Component):
with open(self.headers_file, "rb+") as headers_file:
headers_file.truncate(checksum_length_in_bytes)
@defer.inlineCallbacks
def start(self):
async def start(self):
conf.settings.ensure_wallet_dir()
if not os.path.exists(self.headers_dir):
os.mkdir(self.headers_dir)
if os.path.exists(self.old_file):
log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
os.rename(self.old_file, self.headers_file)
self._downloading_headers = yield f2d(self.should_download_headers_from_s3())
self._downloading_headers = await self.should_download_headers_from_s3()
if self._downloading_headers:
try:
yield self.fetch_headers_from_s3()
await d2f(self.fetch_headers_from_s3())
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
finally:
self._downloading_headers = False
def stop(self):
return defer.succeed(None)
def d2f(deferred):
return deferred.asFuture(asyncio.get_event_loop())
def f2d(future):
return defer.Deferred.fromFuture(asyncio.ensure_future(future))
async def stop(self):
pass
class WalletComponent(Component):
@ -304,19 +296,17 @@ class WalletComponent(Component):
'is_locked': not self.wallet_manager.is_wallet_unlocked,
}
@defer.inlineCallbacks
def start(self):
async def start(self):
conf.settings.ensure_wallet_dir()
log.info("Starting torba wallet")
storage = self.component_manager.get_component(DATABASE_COMPONENT)
lbrynet.schema.BLOCKCHAIN_NAME = conf.settings['blockchain_name']
self.wallet_manager = yield f2d(LbryWalletManager.from_lbrynet_config(conf.settings, storage))
self.wallet_manager = await LbryWalletManager.from_lbrynet_config(conf.settings, storage)
self.wallet_manager.old_db = storage
yield f2d(self.wallet_manager.start())
await self.wallet_manager.start()
@defer.inlineCallbacks
def stop(self):
yield f2d(self.wallet_manager.stop())
async def stop(self):
await self.wallet_manager.stop()
self.wallet_manager = None
@ -345,14 +335,11 @@ class BlobComponent(Component):
def stop(self):
return self.blob_manager.stop()
@defer.inlineCallbacks
def get_status(self):
async def get_status(self):
count = 0
if self.blob_manager:
count = yield self.blob_manager.storage.count_finished_blobs()
defer.returnValue({
'finished_blobs': count
})
count = await self.blob_manager.storage.count_finished_blobs()
return {'finished_blobs': count}
class DHTComponent(Component):
@ -376,8 +363,7 @@ class DHTComponent(Component):
'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts)
}
@defer.inlineCallbacks
def start(self):
async def start(self):
self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", conf.settings["peer_port"])
self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", conf.settings["dht_node_port"])
@ -387,7 +373,7 @@ class DHTComponent(Component):
external_ip = self.upnp_component.external_ip
if not external_ip:
log.warning("UPnP component failed to get external ip")
external_ip = yield get_external_ip()
external_ip = await get_external_ip()
if not external_ip:
log.warning("failed to get external ip")
@ -399,12 +385,11 @@ class DHTComponent(Component):
peerPort=self.external_peer_port
)
yield self.dht_node.start(conf.settings['known_dht_nodes'], block_on_join=False)
await d2f(self.dht_node.start(conf.settings['known_dht_nodes'], block_on_join=False))
log.info("Started the dht")
@defer.inlineCallbacks
def stop(self):
yield self.dht_node.stop()
return d2f(self.dht_node.stop())
class HashAnnouncerComponent(Component):
@ -419,16 +404,14 @@ class HashAnnouncerComponent(Component):
def component(self):
return self.hash_announcer
@defer.inlineCallbacks
def start(self):
storage = self.component_manager.get_component(DATABASE_COMPONENT)
dht_node = self.component_manager.get_component(DHT_COMPONENT)
self.hash_announcer = DHTHashAnnouncer(dht_node, storage)
yield self.hash_announcer.start()
self.hash_announcer.start()
@defer.inlineCallbacks
def stop(self):
yield self.hash_announcer.stop()
self.hash_announcer.stop()
def get_status(self):
return {
@ -447,13 +430,11 @@ class RateLimiterComponent(Component):
def component(self):
return self.rate_limiter
def start(self):
async def start(self):
self.rate_limiter.start()
return defer.succeed(None)
def stop(self):
async def stop(self):
self.rate_limiter.stop()
return defer.succeed(None)
class PaymentRateComponent(Component):
@ -467,11 +448,11 @@ class PaymentRateComponent(Component):
def component(self):
return self.payment_rate_manager
def start(self):
return defer.succeed(None)
async def start(self):
pass
def stop(self):
return defer.succeed(None)
async def stop(self):
pass
class FileManagerComponent(Component):
@ -494,7 +475,6 @@ class FileManagerComponent(Component):
'managed_files': len(self.file_manager.lbry_files)
}
@defer.inlineCallbacks
def start(self):
rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
@ -511,18 +491,16 @@ class FileManagerComponent(Component):
wallet,
conf.settings.download_dir
)
yield sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT)
log.info('Starting the file manager')
self.file_manager = EncryptedFileManager(self.component_manager.peer_finder, rate_limiter, blob_manager, wallet,
payment_rate_manager, storage, sd_identifier)
yield self.file_manager.setup()
log.info('Done setting up file manager')
return self.file_manager.setup()
@defer.inlineCallbacks
def stop(self):
yield self.file_manager.stop()
return d2f(self.file_manager.stop())
class PeerProtocolServerComponent(Component):
@ -538,8 +516,7 @@ class PeerProtocolServerComponent(Component):
def component(self):
return self.lbry_server_port
@defer.inlineCallbacks
def start(self):
async def start(self):
wallet = self.component_manager.get_component(WALLET_COMPONENT)
upnp = self.component_manager.get_component(UPNP_COMPONENT)
peer_port = conf.settings['peer_port']
@ -562,7 +539,7 @@ class PeerProtocolServerComponent(Component):
try:
log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port,
upnp.upnp_redirects.get("TCP", peer_port))
self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory)
self.lbry_server_port = await d2f(reactor.listenTCP(peer_port, server_factory))
except error.CannotListenError as e:
import traceback
log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
@ -570,12 +547,11 @@ class PeerProtocolServerComponent(Component):
log.error("%s", traceback.format_exc())
raise ValueError("%s lbrynet may already be running on your computer." % str(e))
@defer.inlineCallbacks
def stop(self):
async def stop(self):
if self.lbry_server_port is not None:
self.lbry_server_port, old_port = None, self.lbry_server_port
log.info('Stop listening on port %s', old_port.port)
yield old_port.stopListening()
await d2f(old_port.stopListening())
class ReflectorComponent(Component):
@ -591,25 +567,23 @@ class ReflectorComponent(Component):
def component(self):
return self.reflector_server
@defer.inlineCallbacks
def start(self):
async def start(self):
log.info("Starting reflector server")
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
reflector_factory = ReflectorServerFactory(self.component_manager.peer_manager, blob_manager, file_manager)
try:
self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory)
self.reflector_server = await d2f(reactor.listenTCP(self.reflector_server_port, reflector_factory))
log.info('Started reflector on port %s', self.reflector_server_port)
except error.CannotListenError as e:
log.exception("Couldn't bind reflector to port %d", self.reflector_server_port)
raise ValueError(f"{e} lbrynet may already be running on your computer.")
@defer.inlineCallbacks
def stop(self):
async def stop(self):
if self.reflector_server is not None:
log.info("Stopping reflector server")
self.reflector_server, p = None, self.reflector_server
yield p.stopListening
await d2f(p.stopListening())
class UPnPComponent(Component):
@ -630,22 +604,11 @@ class UPnPComponent(Component):
def component(self):
return self
@defer.inlineCallbacks
def _setup_redirects(self):
d = {}
if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
d["TCP"] = from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port"))
if DHT_COMPONENT not in self.component_manager.skip_components:
d["UDP"] = from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port"))
upnp_redirects = yield DeferredDict(d)
self.upnp_redirects.update(upnp_redirects)
@defer.inlineCallbacks
def _maintain_redirects(self):
async def _maintain_redirects(self):
# setup the gateway if necessary
if not self.upnp:
try:
self.upnp = yield from_future(UPnP.discover())
self.upnp = await UPnP.discover()
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err:
log.warning("upnp discovery failed: %s", err)
@ -655,7 +618,7 @@ class UPnPComponent(Component):
external_ip = None
if self.upnp:
try:
external_ip = yield from_future(self.upnp.get_external_ip())
external_ip = await self.upnp.get_external_ip()
if external_ip != "0.0.0.0" and not self.external_ip:
log.info("got external ip from UPnP: %s", external_ip)
except (asyncio.TimeoutError, UPnPError):
@ -663,7 +626,7 @@ class UPnPComponent(Component):
if external_ip == "0.0.0.0" or not external_ip:
log.warning("unable to get external ip from UPnP, checking lbry.io fallback")
external_ip = yield get_external_ip()
external_ip = await get_external_ip()
if self.external_ip and self.external_ip != external_ip:
log.info("external ip changed from %s to %s", self.external_ip, external_ip)
self.external_ip = external_ip
@ -674,10 +637,10 @@ class UPnPComponent(Component):
log.info("add UPnP port mappings")
d = {}
if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
d["TCP"] = from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port"))
d["TCP"] = self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
if DHT_COMPONENT not in self.component_manager.skip_components:
d["UDP"] = from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port"))
upnp_redirects = yield DeferredDict(d)
d["UDP"] = self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
upnp_redirects = await gather_dict(d)
log.info("set up redirects: %s", upnp_redirects)
self.upnp_redirects.update(upnp_redirects)
except (asyncio.TimeoutError, UPnPError):
@ -685,7 +648,7 @@ class UPnPComponent(Component):
return self._maintain_redirects()
elif self.upnp: # check existing redirects are still active
found = set()
mappings = yield from_future(self.upnp.get_redirects())
mappings = await self.upnp.get_redirects()
for mapping in mappings:
proto = mapping['NewProtocol']
if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]:
@ -693,18 +656,14 @@ class UPnPComponent(Component):
found.add(proto)
if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components:
try:
udp_port = yield from_future(
self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
)
udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
self.upnp_redirects['UDP'] = udp_port
log.info("refreshed upnp redirect for dht port: %i", udp_port)
except (asyncio.TimeoutError, UPnPError):
del self.upnp_redirects['UDP']
if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
try:
tcp_port = yield from_future(
self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
)
tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
self.upnp_redirects['TCP'] = tcp_port
log.info("refreshed upnp redirect for peer port: %i", tcp_port)
except (asyncio.TimeoutError, UPnPError):
@ -715,14 +674,13 @@ class UPnPComponent(Component):
if self.upnp_redirects:
log.debug("upnp redirects are still active")
@defer.inlineCallbacks
def start(self):
async def start(self):
log.info("detecting external ip")
if not self.use_upnp:
self.external_ip = yield get_external_ip()
self.external_ip = await get_external_ip()
return
success = False
yield self._maintain_redirects()
await self._maintain_redirects()
if self.upnp:
if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in
(DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]):
@ -736,13 +694,11 @@ class UPnPComponent(Component):
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, self.get_status())
self._maintain_redirects_lc.start(360, now=False)
def stop(self):
if self._maintain_redirects_lc.running:
self._maintain_redirects_lc.stop()
return defer.DeferredList(
[from_future(self.upnp.delete_port_mapping(port, protocol))
for protocol, port in self.upnp_redirects.items()]
)
async def stop(self):
if self.upnp_redirects:
await asyncio.wait([
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
])
def get_status(self):
return {
@ -766,10 +722,8 @@ class ExchangeRateManagerComponent(Component):
def component(self):
return self.exchange_rate_manager
@defer.inlineCallbacks
def start(self):
yield self.exchange_rate_manager.start()
async def start(self):
self.exchange_rate_manager.start()
@defer.inlineCallbacks
def stop(self):
yield self.exchange_rate_manager.stop()
async def stop(self):
self.exchange_rate_manager.stop()

View file

@ -7,7 +7,6 @@ from typing import Callable, Optional, List
from operator import itemgetter
from binascii import hexlify, unhexlify
from copy import deepcopy
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from torba.client.baseaccount import SingleKey, HierarchicalDeterministic
@ -17,7 +16,7 @@ from lbrynet.dht.error import TimeoutError
from lbrynet.blob.blob_file import is_valid_blobhash
from lbrynet.extras import system_info
from lbrynet.extras.reflector import reupload
from lbrynet.extras.daemon.Components import d2f, f2d
from lbrynet.extras.daemon.Components import d2f
from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
from lbrynet.extras.daemon.Components import FILE_MANAGER_COMPONENT, RATE_LIMITER_COMPONENT
from lbrynet.extras.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, PAYMENT_RATE_COMPONENT, UPNP_COMPONENT
@ -48,22 +47,15 @@ from lbrynet.extras.daemon.json_response_encoder import JSONResponseEncoder
import asyncio
import logging
from urllib import parse as urlparse
import json
import inspect
import signal
from functools import wraps
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet.error import ConnectionDone, ConnectionLost
from txjsonrpc import jsonrpclib
from traceback import format_exc
from lbrynet import utils
from lbrynet.p2p.Error import InvalidAuthenticationToken
from lbrynet.extras.daemon.undecorated import undecorated
from twisted.web import server
from lbrynet import conf
from aiohttp import web
@ -376,14 +368,13 @@ class Daemon(metaclass=JSONRPCServerType):
self.component_manager = component_manager or ComponentManager(
analytics_manager=self.analytics_manager,
skip_components=to_skip or [],
reactor=reactor
)
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()}
self._use_authentication = use_authentication or conf.settings['use_auth_http']
self._use_https = use_https or conf.settings['use_https']
self.listening_port = None
self._component_setup_deferred = None
self._component_setup_task = None
self.announced_startup = False
self.sessions = {}
self.is_first_run = is_first_run()
@ -409,6 +400,8 @@ class Daemon(metaclass=JSONRPCServerType):
self.app = web.Application()
self.app.router.add_get('/lbryapi', self.handle_old_jsonrpc)
self.app.router.add_post('/lbryapi', self.handle_old_jsonrpc)
self.app.router.add_post('/', self.handle_old_jsonrpc)
self.handler = self.app.make_handler()
self.server = None
@ -423,32 +416,28 @@ class Daemon(metaclass=JSONRPCServerType):
except OSError:
log.error('lbrynet API failed to bind TCP %s:%i for listening. Daemon is already running or this port is '
'already in use by another application.', conf.settings['api_host'], conf.settings['api_port'])
reactor.fireSystemEvent("shutdown")
except defer.CancelledError:
log.info("shutting down before finished starting")
reactor.fireSystemEvent("shutdown")
except Exception as err:
self.analytics_manager.send_server_startup_error(str(err))
log.exception('Failed to start lbrynet-daemon')
reactor.fireSystemEvent("shutdown")
async def setup(self):
log.info("Starting lbrynet-daemon")
log.info("Platform: %s", json.dumps(system_info.get_platform()))
reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
if not self.analytics_manager.is_started:
self.analytics_manager.start()
self.analytics_manager.send_server_startup()
for lc_name, lc_time in self._looping_call_times.items():
self.looping_call_manager.start(lc_name, lc_time)
def update_attribute(setup_result, component):
def update_attribute(component):
setattr(self, self.component_attributes[component.component_name], component.component)
kwargs = {component: update_attribute for component in self.component_attributes.keys()}
self._component_setup_deferred = self.component_manager.setup(**kwargs)
await self._component_setup_deferred.asFuture(asyncio.get_event_loop())
self._component_setup_task = self.component_manager.setup(**kwargs)
await self._component_setup_task
log.info("Started lbrynet-daemon")
@ -463,7 +452,6 @@ class Daemon(metaclass=JSONRPCServerType):
signal.signal(signal.SIGTERM, self._already_shutting_down)
if self.listening_port:
self.listening_port.stopListening()
self.looping_call_manager.shutdown()
if self.server is not None:
self.server.close()
await self.server.wait_closed()
@ -473,15 +461,11 @@ class Daemon(metaclass=JSONRPCServerType):
if self.analytics_manager:
self.analytics_manager.shutdown()
try:
self._component_setup_deferred.cancel()
except (AttributeError, defer.CancelledError):
self._component_setup_task.cancel()
except (AttributeError, asyncio.CancelledError):
pass
if self.component_manager is not None:
d = self.component_manager.stop()
d.addErrback(log.fail(), 'Failure while shutting down')
else:
d = defer.succeed(None)
return d
await self.component_manager.stop()
async def handle_old_jsonrpc(self, request):
data = await request.json()
@ -518,7 +502,9 @@ class Daemon(metaclass=JSONRPCServerType):
log.warning(params_error_message)
raise web.HTTPBadRequest(text=params_error_message)
result = await fn(self, *_args, **_kwargs)
result = fn(self, *_args, **_kwargs)
if asyncio.iscoroutine(result):
result = await result
return web.Response(
text=jsonrpc_dumps_pretty(result, ledger=self.ledger),
@ -596,7 +582,7 @@ class Daemon(metaclass=JSONRPCServerType):
for sd_hash, stream in self.streams.items():
stream.cancel(reason="daemon shutdown")
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
async def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
"""
Download a blob
@ -615,13 +601,12 @@ class Daemon(metaclass=JSONRPCServerType):
blob_hash, self.blob_manager, self.component_manager.peer_finder, self.rate_limiter,
rate_manager, self.wallet_manager, timeout
)
return downloader.download()
return await d2f(downloader.download())
@defer.inlineCallbacks
def _get_stream_analytics_report(self, claim_dict):
async def _get_stream_analytics_report(self, claim_dict):
sd_hash = claim_dict.source_hash.decode()
try:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash)
except Exception:
stream_hash = None
report = {
@ -630,12 +615,12 @@ class Daemon(metaclass=JSONRPCServerType):
}
blobs = {}
try:
sd_host = yield self.blob_manager.get_host_downloaded_from(sd_hash)
sd_host = await d2f(self.blob_manager.get_host_downloaded_from(sd_hash))
except Exception:
sd_host = None
report["sd_blob"] = sd_host
if stream_hash:
blob_infos = yield self.storage.get_blobs_for_stream(stream_hash)
blob_infos = await self.storage.get_blobs_for_stream(stream_hash)
report["known_blobs"] = len(blob_infos)
else:
blob_infos = []
@ -648,32 +633,28 @@ class Daemon(metaclass=JSONRPCServerType):
# if host:
# blobs[blob_num] = host
# report["blobs"] = json.dumps(blobs)
defer.returnValue(report)
return report
@defer.inlineCallbacks
def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None):
async def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None):
"""
Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file
"""
@defer.inlineCallbacks
def _download_finished(download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
async def _download_finished(download_id, name, claim_dict):
report = await self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_finished(download_id, name, report, claim_dict)
self.analytics_manager.send_new_download_success(download_id, name, claim_dict)
@defer.inlineCallbacks
def _download_failed(error, download_id, name, claim_dict):
report = yield self._get_stream_analytics_report(claim_dict)
async def _download_failed(error, download_id, name, claim_dict):
report = await self._get_stream_analytics_report(claim_dict)
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report)
self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error)
if sd_hash in self.streams:
downloader = self.streams[sd_hash]
result = yield downloader.finished_deferred
defer.returnValue(result)
return await d2f(downloader.finished_deferred)
else:
download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, claim_dict)
@ -685,26 +666,26 @@ class Daemon(metaclass=JSONRPCServerType):
timeout
)
try:
lbry_file, finished_deferred = yield self.streams[sd_hash].start(
lbry_file, finished_deferred = await d2f(self.streams[sd_hash].start(
claim_dict, name, txid, nout, file_name
)
))
finished_deferred.addCallbacks(
lambda _: _download_finished(download_id, name, claim_dict),
lambda e: _download_failed(e, download_id, name, claim_dict)
)
result = yield self._get_lbry_file_dict(lbry_file)
result = await self._get_lbry_file_dict(lbry_file)
except Exception as err:
yield _download_failed(err, download_id, name, claim_dict)
await _download_failed(err, download_id, name, claim_dict)
if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)):
log.warning('Failed to get %s (%s)', name, err)
else:
log.error('Failed to get %s (%s)', name, err)
if self.streams[sd_hash].downloader and self.streams[sd_hash].code != 'running':
yield self.streams[sd_hash].downloader.stop(err)
await d2f(self.streams[sd_hash].downloader.stop(err))
result = {'error': str(err)}
finally:
del self.streams[sd_hash]
defer.returnValue(result)
return result
async def _publish_stream(self, account, name, bid, claim_dict, file_path=None, certificate=None,
claim_address=None, change_address=None):
@ -714,8 +695,8 @@ class Daemon(metaclass=JSONRPCServerType):
)
parse_lbry_uri(name)
if not file_path:
stream_hash = await d2f(self.storage.get_stream_hash_for_sd_hash(
claim_dict['stream']['source']['source']))
stream_hash = await self.storage.get_stream_hash_for_sd_hash(
claim_dict['stream']['source']['source'])
tx = await publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address)
else:
tx = await publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address)
@ -844,8 +825,7 @@ class Daemon(metaclass=JSONRPCServerType):
return self.get_est_cost_using_known_size(uri, size)
return self.get_est_cost_from_uri(uri)
@defer.inlineCallbacks
def _get_lbry_file_dict(self, lbry_file):
async def _get_lbry_file_dict(self, lbry_file):
key = hexlify(lbry_file.key) if lbry_file.key else None
full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name)
mime_type = guess_mime_type(lbry_file.file_name)
@ -856,13 +836,13 @@ class Daemon(metaclass=JSONRPCServerType):
else:
written_bytes = 0
size = yield lbry_file.get_total_bytes()
file_status = yield lbry_file.status()
size = await d2f(lbry_file.get_total_bytes())
file_status = await d2f(lbry_file.status())
num_completed = file_status.num_completed
num_known = file_status.num_known
status = file_status.running_status
result = {
return {
'completed': lbry_file.completed,
'file_name': lbry_file.file_name,
'download_directory': lbry_file.download_directory,
@ -889,10 +869,8 @@ class Daemon(metaclass=JSONRPCServerType):
'channel_name': lbry_file.channel_name,
'claim_name': lbry_file.claim_name
}
defer.returnValue(result)
@defer.inlineCallbacks
def _get_lbry_file(self, search_by, val, return_json=False):
async def _get_lbry_file(self, search_by, val, return_json=False):
lbry_file = None
if search_by in FileID:
for l_f in self.file_manager.lbry_files:
@ -902,11 +880,10 @@ class Daemon(metaclass=JSONRPCServerType):
else:
raise NoValidSearch(f'{search_by} is not a valid search operation')
if return_json and lbry_file:
lbry_file = yield self._get_lbry_file_dict(lbry_file)
defer.returnValue(lbry_file)
lbry_file = await self._get_lbry_file_dict(lbry_file)
return lbry_file
@defer.inlineCallbacks
def _get_lbry_files(self, return_json=False, **kwargs):
async def _get_lbry_files(self, return_json=False, **kwargs):
lbry_files = list(self.file_manager.lbry_files)
if kwargs:
for search_type, value in iter_lbry_file_search_values(kwargs):
@ -914,11 +891,11 @@ class Daemon(metaclass=JSONRPCServerType):
if return_json:
file_dicts = []
for lbry_file in lbry_files:
lbry_file_dict = yield self._get_lbry_file_dict(lbry_file)
lbry_file_dict = await self._get_lbry_file_dict(lbry_file)
file_dicts.append(lbry_file_dict)
lbry_files = file_dicts
log.debug("Collected %i lbry files", len(lbry_files))
defer.returnValue(lbry_files)
return lbry_files
def _sort_lbry_files(self, lbry_files, sort_by):
for field, direction in sort_by:
@ -946,18 +923,15 @@ class Daemon(metaclass=JSONRPCServerType):
downloader.setup(self.wallet_manager)
return downloader
@defer.inlineCallbacks
def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None):
async def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None):
if not downloader:
downloader = self._get_single_peer_downloader()
result = {}
search_timeout = search_timeout or conf.settings['peer_search_timeout']
blob_timeout = blob_timeout or conf.settings['sd_download_timeout']
is_available = False
reachable_peers = []
unreachable_peers = []
try:
peers = yield self.jsonrpc_peer_list(blob_hash, search_timeout)
peers = await d2f(self.jsonrpc_peer_list(blob_hash, search_timeout))
peer_infos = [{"peer": Peer(x['host'], x['port']),
"blob_hash": blob_hash,
"timeout": blob_timeout} for x in peers]
@ -968,8 +942,7 @@ class Daemon(metaclass=JSONRPCServerType):
d = downloader.download_temp_blob_from_peer(**peer_info)
dl.append(d)
dl_peers.append("%s:%i" % (peer_info['peer'].host, peer_info['peer'].port))
for dl_peer, (success, download_result) in zip(dl_peers,
(yield defer.DeferredList(dl))):
for dl_peer, (success, download_result) in zip(dl_peers, (await d2f(defer.DeferredList(dl)))):
if success:
if download_result:
reachable_peers.append(dl_peer)
@ -978,14 +951,13 @@ class Daemon(metaclass=JSONRPCServerType):
dl_results.append(download_result)
is_available = any(dl_results)
except Exception as err:
result['error'] = "Failed to get peers for blob: %s" % err
return {'error': "Failed to get peers for blob: %s" % err}
response = {
return {
'is_available': is_available,
'reachable_peers': reachable_peers,
'unreachable_peers': unreachable_peers,
}
defer.returnValue(response)
############################################################################
# #
@ -1011,11 +983,9 @@ class Daemon(metaclass=JSONRPCServerType):
(string) Shutdown message
"""
log.info("Shutting down lbrynet daemon")
reactor.callLater(0.1, reactor.fireSystemEvent, "shutdown")
return "Shutting down"
@defer.inlineCallbacks
def jsonrpc_status(self):
async def jsonrpc_status(self):
"""
Get daemon status
@ -1101,10 +1071,10 @@ class Daemon(metaclass=JSONRPCServerType):
},
}
for component in self.component_manager.components:
status = yield defer.maybeDeferred(component.get_status)
status = await d2f(defer.maybeDeferred(component.get_status))
if status:
response[component.component_name] = status
defer.returnValue(response)
return response
def jsonrpc_version(self):
"""
@ -1131,10 +1101,9 @@ class Daemon(metaclass=JSONRPCServerType):
'python_version': (str) python version,
}
"""
platform_info = system_info.get_platform()
log.info("Get version info: " + json.dumps(platform_info))
return self._render_response(platform_info)
return platform_info
def jsonrpc_report_bug(self, message=None):
"""
@ -1157,7 +1126,7 @@ class Daemon(metaclass=JSONRPCServerType):
platform_name,
__version__
)
return self._render_response(True)
return True
def jsonrpc_settings_get(self):
"""
@ -1173,7 +1142,7 @@ class Daemon(metaclass=JSONRPCServerType):
(dict) Dictionary of daemon settings
See ADJUSTABLE_SETTINGS in lbrynet/conf.py for full list of settings
"""
return self._render_response(conf.settings.get_adjustable_settings_dict())
return conf.settings.get_adjustable_settings_dict()
def jsonrpc_settings_set(self, **kwargs):
"""
@ -1260,7 +1229,7 @@ class Daemon(metaclass=JSONRPCServerType):
conf.settings.update({key: converted},
data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED))
conf.settings.save_conf_file_settings()
return self._render_response(conf.settings.get_adjustable_settings_dict())
return conf.settings.get_adjustable_settings_dict()
def jsonrpc_help(self, command=None):
"""
@ -1277,13 +1246,13 @@ class Daemon(metaclass=JSONRPCServerType):
"""
if command is None:
return self._render_response({
return {
'about': 'This is the LBRY JSON-RPC API',
'command_help': 'Pass a `command` parameter to this method to see ' +
'help for that command (e.g. `help command=resolve_name`)',
'command_list': 'Get a full list of commands using the `commands` method',
'more_info': 'Visit https://lbry.io/api for more info',
})
}
fn = self.callable_methods.get(command)
if fn is None:
@ -1291,9 +1260,9 @@ class Daemon(metaclass=JSONRPCServerType):
f"No help available for '{command}'. It is not a valid command."
)
return self._render_response({
return {
'help': textwrap.dedent(fn.__doc__ or '')
})
}
def jsonrpc_commands(self):
"""
@ -1308,7 +1277,7 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(list) list of available commands
"""
return self._render_response(sorted([command for command in self.callable_methods.keys()]))
return sorted([command for command in self.callable_methods.keys()])
@deprecated("account_balance")
def jsonrpc_wallet_balance(self, address=None):
@ -1882,8 +1851,7 @@ class Daemon(metaclass=JSONRPCServerType):
return self.get_account_or_default(account_id).receiving.get_or_create_usable_address()
@requires(FILE_MANAGER_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_file_list(self, sort=None, **kwargs):
async def jsonrpc_file_list(self, sort=None, **kwargs):
"""
List files limited by optional filters
@ -1943,13 +1911,11 @@ class Daemon(metaclass=JSONRPCServerType):
},
]
"""
result = yield self._get_lbry_files(return_json=True, **kwargs)
result = await self._get_lbry_files(return_json=True, **kwargs)
if sort:
sort_by = [self._parse_lbry_files_sort(s) for s in sort]
result = self._sort_lbry_files(result, sort_by)
response = yield self._render_response(result)
defer.returnValue(response)
return result
@requires(WALLET_COMPONENT)
async def jsonrpc_resolve_name(self, name, force=False):
@ -2179,7 +2145,7 @@ class Daemon(metaclass=JSONRPCServerType):
log.info("Already waiting on lbry://%s to start downloading", name)
await d2f(self.streams[sd_hash].data_downloading_deferred)
lbry_file = await d2f(self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False))
lbry_file = await self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
if lbry_file:
if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)):
@ -2188,15 +2154,14 @@ class Daemon(metaclass=JSONRPCServerType):
await d2f(lbry_file.start())
else:
log.info('Already have a file for %s', name)
result = await d2f(self._get_lbry_file_dict(lbry_file))
result = await self._get_lbry_file_dict(lbry_file)
else:
result = await d2f(self._download_name(name, claim_dict, sd_hash, txid, nout,
timeout=timeout, file_name=file_name))
result = await self._download_name(name, claim_dict, sd_hash, txid, nout,
timeout=timeout, file_name=file_name)
return result
@requires(FILE_MANAGER_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_file_set_status(self, status, **kwargs):
async def jsonrpc_file_set_status(self, status, **kwargs):
"""
Start or stop downloading a file
@ -2220,24 +2185,22 @@ class Daemon(metaclass=JSONRPCServerType):
raise Exception('Status must be "start" or "stop".')
search_type, value = get_lbry_file_search_value(kwargs)
lbry_file = yield self._get_lbry_file(search_type, value, return_json=False)
lbry_file = await self._get_lbry_file(search_type, value, return_json=False)
if not lbry_file:
raise Exception(f'Unable to find a file for {search_type}:{value}')
if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped:
yield self.file_manager.toggle_lbry_file_running(lbry_file)
await d2f(self.file_manager.toggle_lbry_file_running(lbry_file))
msg = "Started downloading file" if status == 'start' else "Stopped downloading file"
else:
msg = (
"File was already being downloaded" if status == 'start'
else "File was already stopped"
)
response = yield self._render_response(msg)
defer.returnValue(response)
return msg
@requires(FILE_MANAGER_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs):
async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs):
"""
Delete a LBRY file
@ -2268,33 +2231,28 @@ class Daemon(metaclass=JSONRPCServerType):
(bool) true if deletion was successful
"""
lbry_files = yield self._get_lbry_files(return_json=False, **kwargs)
lbry_files = await self._get_lbry_files(return_json=False, **kwargs)
if len(lbry_files) > 1:
if not delete_all:
log.warning("There are %i files to delete, use narrower filters to select one",
len(lbry_files))
response = yield self._render_response(False)
defer.returnValue(response)
return False
else:
log.warning("Deleting %i files",
len(lbry_files))
if not lbry_files:
log.warning("There is no file to delete")
result = False
return False
else:
for lbry_file in lbry_files:
file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash
if lbry_file.sd_hash in self.streams:
del self.streams[lbry_file.sd_hash]
yield self.file_manager.delete_lbry_file(lbry_file,
delete_file=delete_from_download_dir)
await d2f(self.file_manager.delete_lbry_file(lbry_file, delete_file=delete_from_download_dir))
log.info("Deleted file: %s", file_name)
result = True
response = yield self._render_response(result)
defer.returnValue(response)
return True
@requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
@ -2397,8 +2355,7 @@ class Daemon(metaclass=JSONRPCServerType):
)
@requires(WALLET_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_channel_export(self, claim_id):
async def jsonrpc_channel_export(self, claim_id):
"""
Export serialized channel signing information for a given certificate claim id
@ -2412,12 +2369,10 @@ class Daemon(metaclass=JSONRPCServerType):
(str) Serialized certificate information
"""
result = yield self.wallet_manager.export_certificate_info(claim_id)
defer.returnValue(result)
return await self.wallet_manager.export_certificate_info(claim_id)
@requires(WALLET_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_channel_import(self, serialized_certificate_info):
async def jsonrpc_channel_import(self, serialized_certificate_info):
"""
Import serialized channel signing information (to allow signing new claims to the channel)
@ -2431,8 +2386,7 @@ class Daemon(metaclass=JSONRPCServerType):
(dict) Result dictionary
"""
result = yield self.wallet_manager.import_certificate_info(serialized_certificate_info)
defer.returnValue(result)
return await self.wallet_manager.import_certificate_info(serialized_certificate_info)
@requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
conditions=[WALLET_IS_UNLOCKED])
@ -3120,8 +3074,7 @@ class Daemon(metaclass=JSONRPCServerType):
@requires(WALLET_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT,
conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None):
async def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None):
"""
Download and return a blob
@ -3150,7 +3103,9 @@ class Daemon(metaclass=JSONRPCServerType):
}
timeout = timeout or 30
blob = yield self._download_blob(blob_hash, rate_manager=self.payment_rate_manager, timeout=timeout)
blob = await self._download_blob(
blob_hash, rate_manager=self.payment_rate_manager, timeout=timeout
)
if encoding and encoding in decoders:
blob_file = blob.open_for_reading()
result = decoders[encoding](blob_file.read())
@ -3161,8 +3116,7 @@ class Daemon(metaclass=JSONRPCServerType):
return result
@requires(BLOB_COMPONENT, DATABASE_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_blob_delete(self, blob_hash):
async def jsonrpc_blob_delete(self, blob_hash):
"""
Delete a blob
@ -3179,16 +3133,15 @@ class Daemon(metaclass=JSONRPCServerType):
if blob_hash not in self.blob_manager.blobs:
return "Don't have that blob"
try:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(blob_hash)
yield self.storage.delete_stream(stream_hash)
stream_hash = await self.storage.get_stream_hash_for_sd_hash(blob_hash)
await self.storage.delete_stream(stream_hash)
except Exception as err:
pass
yield self.blob_manager.delete_blobs([blob_hash])
await d2f(self.blob_manager.delete_blobs([blob_hash]))
return "Deleted %s" % blob_hash
@requires(DHT_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_peer_list(self, blob_hash, timeout=None):
async def jsonrpc_peer_list(self, blob_hash, timeout=None):
"""
Get peers for blob hash
@ -3214,7 +3167,7 @@ class Daemon(metaclass=JSONRPCServerType):
finished_deferred.addTimeout(timeout or conf.settings['peer_search_timeout'], self.dht_node.clock)
finished_deferred.addErrback(trap_timeout)
peers = yield finished_deferred
peers = await d2f(finished_deferred)
results = [
{
"node_id": hexlify(node_id).decode(),
@ -3226,8 +3179,7 @@ class Daemon(metaclass=JSONRPCServerType):
return results
@requires(DATABASE_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):
async def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):
"""
Announce blobs to the DHT
@ -3245,7 +3197,6 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(bool) true if successful
"""
blob_hashes = []
if blob_hash:
blob_hashes.append(blob_hash)
@ -3253,17 +3204,16 @@ class Daemon(metaclass=JSONRPCServerType):
if sd_hash and stream_hash:
raise Exception("either the sd hash or the stream hash should be provided, not both")
if sd_hash:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True)
stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash)
blobs = await self.storage.get_blobs_for_stream(stream_hash, only_completed=True)
blob_hashes.extend(blob.blob_hash for blob in blobs if blob.blob_hash is not None)
else:
raise Exception('single argument must be specified')
yield self.storage.should_single_announce_blobs(blob_hashes, immediate=True)
await self.storage.should_single_announce_blobs(blob_hashes, immediate=True)
return True
@requires(FILE_MANAGER_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_file_reflect(self, **kwargs):
async def jsonrpc_file_reflect(self, **kwargs):
"""
Reflect all the blobs in a file matching the filter criteria
@ -3284,22 +3234,17 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(list) list of blobs reflected
"""
reflector_server = kwargs.get('reflector', None)
lbry_files = yield self._get_lbry_files(**kwargs)
lbry_files = await self._get_lbry_files(**kwargs)
if len(lbry_files) > 1:
raise Exception('Too many (%i) files found, need one' % len(lbry_files))
elif not lbry_files:
raise Exception('No file found')
lbry_file = lbry_files[0]
results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server)
return results
return await d2f(reupload.reflect_file(
lbry_files[0], reflector_server=kwargs.get('reflector', None)
))
@requires(BLOB_COMPONENT, WALLET_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None,
async def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None,
finished=None, page_size=None, page=None):
"""
Returns blob hashes. If not given filters, returns all blobs known by the blob manager
@ -3325,20 +3270,20 @@ class Daemon(metaclass=JSONRPCServerType):
"""
if uri or stream_hash or sd_hash:
if uri:
metadata = (yield f2d(self.wallet_manager.resolve(uri)))[uri]
metadata = (await self.wallet_manager.resolve(uri))[uri]
sd_hash = utils.get_sd_hash(metadata)
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash)
elif stream_hash:
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
sd_hash = await self.storage.get_sd_blob_hash_for_stream(stream_hash)
elif sd_hash:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash)
sd_hash = await self.storage.get_sd_blob_hash_for_stream(stream_hash)
if stream_hash:
crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash)
blobs = yield defer.gatherResults([
crypt_blobs = await self.storage.get_blobs_for_stream(stream_hash)
blobs = await d2f(defer.gatherResults([
self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None
])
]))
else:
blobs = []
# get_blobs_for_stream does not include the sd blob, so we'll add it manually
@ -3373,13 +3318,10 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(list) reflected blob hashes
"""
d = reupload.reflect_blob_hashes(blob_hashes, self.blob_manager, reflector_server)
d.addCallback(lambda r: self._render_response(r))
return d
return d2f(reupload.reflect_blob_hashes(blob_hashes, self.blob_manager, reflector_server))
@requires(BLOB_COMPONENT)
def jsonrpc_blob_reflect_all(self):
async def jsonrpc_blob_reflect_all(self):
"""
Reflects all saved blobs
@ -3392,15 +3334,11 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(bool) true if successful
"""
d = self.blob_manager.get_all_verified_blobs()
d.addCallback(reupload.reflect_blob_hashes, self.blob_manager)
d.addCallback(lambda r: self._render_response(r))
return d
blob_hashes = await d2f(self.blob_manager.get_all_verified_blobs())
return await d2f(reupload.reflect_blob_hashes(blob_hashes, self.blob_manager))
@requires(DHT_COMPONENT)
@defer.inlineCallbacks
def jsonrpc_peer_ping(self, node_id, address=None, port=None):
async def jsonrpc_peer_ping(self, node_id, address=None, port=None):
"""
Send a kademlia ping to the specified peer. If address and port are provided the peer is directly pinged,
if not provided the peer is located first.
@ -3416,7 +3354,6 @@ class Daemon(metaclass=JSONRPCServerType):
Returns:
(str) pong, or {'error': <error message>} if an error is encountered
"""
contact = None
if node_id and address and port:
contact = self.dht_node.contact_manager.get_contact(unhexlify(node_id), address, int(port))
@ -3426,16 +3363,15 @@ class Daemon(metaclass=JSONRPCServerType):
)
if not contact:
try:
contact = yield self.dht_node.findContact(unhexlify(node_id))
contact = await d2f(self.dht_node.findContact(unhexlify(node_id)))
except TimeoutError:
return {'error': 'timeout finding peer'}
if not contact:
return {'error': 'peer not found'}
try:
result = (yield contact.ping()).decode()
return (await d2f(contact.ping())).decode()
except TimeoutError:
result = {'error': 'ping timeout'}
return result
return {'error': 'ping timeout'}
@requires(DHT_COMPONENT)
def jsonrpc_routing_table_get(self):
@ -3495,7 +3431,7 @@ class Daemon(metaclass=JSONRPCServerType):
result['contacts'] = list(contact_set)
result['blob_hashes'] = list(blob_hashes)
result['node_id'] = hexlify(self.dht_node.node_id).decode()
return self._render_response(result)
return result
# the single peer downloader needs wallet access
@requires(DHT_COMPONENT, WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@ -3520,12 +3456,10 @@ class Daemon(metaclass=JSONRPCServerType):
"unreachable_peers": ["<ip>:<port>"]
}
"""
return self._blob_availability(blob_hash, search_timeout, blob_timeout)
@requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED])
@defer.inlineCallbacks
def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None):
async def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None):
"""
Get stream availability for lbry uri
@ -3576,26 +3510,26 @@ class Daemon(metaclass=JSONRPCServerType):
}
try:
resolved_result = (yield self.wallet_manager.resolve(uri))[uri]
resolved_result = (await self.wallet_manager.resolve(uri))[uri]
response['did_resolve'] = True
except UnknownNameError:
response['error'] = "Failed to resolve name"
defer.returnValue(response)
return response
except URIParseError:
response['error'] = "Invalid URI"
defer.returnValue(response)
return response
try:
claim_obj = smart_decode(resolved_result[uri]['claim']['hex'])
response['did_decode'] = True
except DecodeError:
response['error'] = "Failed to decode claim value"
defer.returnValue(response)
return response
response['is_stream'] = claim_obj.is_stream
if not claim_obj.is_stream:
response['error'] = "Claim for \"%s\" does not contain a stream" % uri
defer.returnValue(response)
return response
sd_hash = claim_obj.source_hash
response['sd_hash'] = sd_hash
@ -3603,28 +3537,23 @@ class Daemon(metaclass=JSONRPCServerType):
downloader = self._get_single_peer_downloader()
have_sd_blob = sd_hash in self.blob_manager.blobs
try:
sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout,
encoding="json")
sd_blob = await self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout, encoding="json")
if not have_sd_blob:
yield self.jsonrpc_blob_delete(sd_hash)
await self.jsonrpc_blob_delete(sd_hash)
if sd_blob and 'blobs' in sd_blob:
response['num_blobs_in_stream'] = len(sd_blob['blobs']) - 1
head_blob_hash = sd_blob['blobs'][0]['blob_hash']
head_blob_availability = yield self._blob_availability(head_blob_hash,
search_timeout,
blob_timeout,
downloader)
head_blob_availability = await self._blob_availability(
head_blob_hash, search_timeout, blob_timeout, downloader)
response['head_blob_availability'] = head_blob_availability
except Exception as err:
response['error'] = err
response['head_blob_hash'] = head_blob_hash
response['sd_blob_availability'] = yield self._blob_availability(sd_hash,
search_timeout,
blob_timeout,
downloader)
response['sd_blob_availability'] = await self._blob_availability(
sd_hash, search_timeout, blob_timeout, downloader)
response['is_available'] = response['sd_blob_availability'].get('is_available') and \
response['head_blob_availability'].get('is_available')
defer.returnValue(response)
return response
async def get_channel_or_error(
self, accounts: List[LBCAccount], channel_id: str = None, channel_name: str = None):

View file

@ -14,7 +14,6 @@ import aiohttp
import logging
from urllib.parse import urlparse
from lbrynet import conf
log = logging.getLogger(__name__)
USER_AGENT = "AuthServiceProxy/0.1"
@ -110,7 +109,7 @@ class AuthAPIClient:
@classmethod
async def get_client(cls, key_name=None):
api_key_name = key_name or "api"
keyring = Keyring.load_from_disk()
keyring = Keyring.load_from_disk() # pylint: disable=E0602
api_key = keyring.api_key
login_url = conf.settings.get_api_connection_string(api_key_name, api_key.secret)
@ -127,7 +126,7 @@ class AuthAPIClient:
async with session.post(login_url, headers=headers) as r:
cookies = r.cookies
uid = cookies.get(TWISTED_SECURE_SESSION if conf.settings['use_https'] else TWISTED_SESSION).value
api_key = APIKey.create(seed=uid.encode())
api_key = APIKey.create(seed=uid.encode()) # pylint: disable=E0602
return cls(api_key, session, cookies, url, login_url)

View file

@ -11,7 +11,7 @@ from lbrynet.p2p.StreamDescriptor import download_sd_blob
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from torba.client.constants import COIN
from lbrynet.extras.wallet.dewies import dewies_to_lbc
from lbrynet.extras.daemon.Components import f2d
from lbrynet.extras.compat import f2d
INITIALIZING_CODE = 'initializing'
DOWNLOAD_METADATA_CODE = 'downloading_metadata'

View file

@ -50,18 +50,18 @@ class Publisher:
)
# check if we have a file already for this claim (if this is a publish update with a new stream)
old_stream_hashes = await d2f(self.storage.get_old_stream_hashes_for_claim_id(
old_stream_hashes = await self.storage.get_old_stream_hashes_for_claim_id(
tx.outputs[0].claim_id, self.lbry_file.stream_hash
))
)
if old_stream_hashes:
for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
list(self.lbry_file_manager.lbry_files)):
await d2f(self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False))
log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)
await d2f(self.storage.save_content_claim(
await self.storage.save_content_claim(
self.lbry_file.stream_hash, tx.outputs[0].id
))
)
return tx
async def publish_stream(self, name, bid, claim_dict, stream_hash, holding_address=None):

View file

@ -1,17 +1,16 @@
import asyncio
import logging
import os
import sqlite3
import traceback
import typing
from binascii import hexlify, unhexlify
from twisted.internet import defer, task, threads
from twisted.enterprise import adbapi
from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbrynet import conf
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.decode import smart_decode
from lbrynet.blob.CryptBlob import CryptBlobInfo
from lbrynet.dht.constants import dataExpireTimeout
from torba.client.basedatabase import SQLiteMixin
log = logging.getLogger(__name__)
@ -44,63 +43,23 @@ def _open_file_for_writing(download_directory, suggested_file_name):
return os.path.basename(file_path)
def open_file_for_writing(download_directory, suggested_file_name):
"""
Used to touch the path of a file to be downloaded
:param download_directory: (str)
:param suggested_file_name: (str)
:return: (str) basename
"""
return threads.deferToThread(_open_file_for_writing, download_directory, suggested_file_name)
async def open_file_for_writing(download_directory: str, suggested_file_name: str) -> str:
""" Used to touch the path of a file to be downloaded. """
return await asyncio.get_event_loop().run_in_executor(
None, _open_file_for_writing, download_directory, suggested_file_name
)
def rerun_if_locked(f):
max_attempts = 5
def rerun(err, rerun_count, *args, **kwargs):
connection = args[0]
reactor = connection.reactor
log.debug("Failed to execute (%s): %s", err, args)
if err.check(sqlite3.OperationalError) and "database is locked" in str(err.value):
log.warning("database was locked. rerunning %s with args %s, kwargs %s",
str(f), str(args), str(kwargs))
if rerun_count < max_attempts:
delay = 2**rerun_count
return task.deferLater(reactor, delay, inner_wrapper, rerun_count + 1, *args, **kwargs)
raise err
def check_needed_rerun(result, rerun_count):
if rerun_count:
log.info("successfully reran database query")
return result
def inner_wrapper(rerun_count, *args, **kwargs):
d = f(*args, **kwargs)
d.addCallback(check_needed_rerun, rerun_count)
d.addErrback(rerun, rerun_count, *args, **kwargs)
return d
def wrapper(*args, **kwargs):
return inner_wrapper(0, *args, **kwargs)
return wrapper
async def looping_call(interval, fun):
while True:
try:
await fun()
except Exception as e:
log.exception('Looping call experienced exception:', exc_info=e)
await asyncio.sleep(interval)
class SqliteConnection(adbapi.ConnectionPool):
def __init__(self, db_path):
super().__init__('sqlite3', db_path, check_same_thread=False)
@rerun_if_locked
def runInteraction(self, interaction, *args, **kw):
return super().runInteraction(interaction, *args, **kw)
@classmethod
def set_reactor(cls, reactor):
cls.reactor = reactor
class SQLiteStorage:
class SQLiteStorage(SQLiteMixin):
CREATE_TABLES_QUERY = """
pragma foreign_keys=on;
@ -173,70 +132,45 @@ class SQLiteStorage:
);
"""
def __init__(self, db_dir, reactor=None):
if not reactor:
def __init__(self, path):
super().__init__(path)
from twisted.internet import reactor
self.db_dir = db_dir
self._db_path = os.path.join(db_dir, "lbrynet.sqlite")
log.info("connecting to database: %s", self._db_path)
self.db = SqliteConnection(self._db_path)
self.db.set_reactor(reactor)
self.clock = reactor
# used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a
# change to the associated content claim occurs. these are added by the file manager
# when it loads each file
self.content_claim_callbacks = {} # {<stream_hash>: <callable returning a deferred>}
self.content_claim_callbacks = {}
self.check_should_announce_lc = None
async def open(self):
await super().open()
if 'reflector' not in conf.settings['components_to_skip']:
self.check_should_announce_lc = task.LoopingCall(self.verify_will_announce_all_head_and_sd_blobs)
self.check_should_announce_lc = looping_call(
600, self.verify_will_announce_all_head_and_sd_blobs
)
@defer.inlineCallbacks
def setup(self):
def _create_tables(transaction):
transaction.executescript(self.CREATE_TABLES_QUERY)
yield self.db.runInteraction(_create_tables)
if self.check_should_announce_lc and not self.check_should_announce_lc.running:
self.check_should_announce_lc.start(600)
defer.returnValue(None)
async def close(self):
if self.check_should_announce_lc is not None:
self.check_should_announce_lc.close()
await super().close()
@defer.inlineCallbacks
def run_and_return_one_or_none(self, query, *args):
result = yield self.db.runQuery(query, args)
if result:
defer.returnValue(result[0][0])
else:
defer.returnValue(None)
async def run_and_return_one_or_none(self, query, *args):
for row in await self.db.execute_fetchall(query, args):
return row
@defer.inlineCallbacks
def run_and_return_list(self, query, *args):
result = yield self.db.runQuery(query, args)
if result:
defer.returnValue([i[0] for i in result])
else:
defer.returnValue([])
async def run_and_return_list(self, query, *args):
rows = list(await self.db.execute_fetchall(query, args))
return [col[0] for col in rows] if rows else []
def run_and_return_id(self, query, *args):
def do_save(t):
t.execute(query, args)
return t.lastrowid
return self.db.runInteraction(do_save)
def stop(self):
if self.check_should_announce_lc and self.check_should_announce_lc.running:
self.check_should_announce_lc.stop()
self.db.close()
return defer.succeed(True)
async def run_and_return_id(self, query, *args):
return (await self.db.execute(query, args)).lastrowid
# # # # # # # # # blob functions # # # # # # # # #
def add_completed_blob(self, blob_hash, length, next_announce_time, should_announce, status="finished"):
log.debug("Adding a completed blob. blob_hash=%s, length=%i", blob_hash, length)
values = (blob_hash, length, next_announce_time or 0, int(bool(should_announce)), status, 0, 0)
return self.db.runOperation("insert or replace into blob values (?, ?, ?, ?, ?, ?, ?)", values)
return self.db.execute("insert or replace into blob values (?, ?, ?, ?, ?, ?, ?)", values)
def set_should_announce(self, blob_hash, next_announce_time, should_announce):
return self.db.runOperation(
return self.db.execute(
"update blob set next_announce_time=?, should_announce=? where blob_hash=?",
(next_announce_time or 0, int(bool(should_announce)), blob_hash)
)
@ -246,9 +180,8 @@ class SQLiteStorage:
"select status from blob where blob_hash=?", blob_hash
)
@defer.inlineCallbacks
def add_known_blob(self, blob_hash, length):
yield self.db.runOperation(
return self.db.execute(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", (blob_hash, length, 0, 0, "pending", 0, 0)
)
@ -267,12 +200,11 @@ class SQLiteStorage:
"select blob_hash from blob where should_announce=1 and status='finished'"
)
@defer.inlineCallbacks
def get_all_finished_blobs(self):
blob_hashes = yield self.run_and_return_list(
async def get_all_finished_blobs(self):
blob_hashes = await self.run_and_return_list(
"select blob_hash from blob where status='finished'"
)
defer.returnValue([unhexlify(blob_hash) for blob_hash in blob_hashes])
return [unhexlify(blob_hash) for blob_hash in blob_hashes]
def count_finished_blobs(self):
return self.run_and_return_one_or_none(
@ -280,7 +212,7 @@ class SQLiteStorage:
)
def update_last_announced_blob(self, blob_hash, last_announced):
return self.db.runOperation(
return self.db.execute(
"update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?",
(int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash)
)
@ -298,7 +230,7 @@ class SQLiteStorage:
transaction.execute(
"update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash, )
)
return self.db.runInteraction(set_single_announce)
return self.db.run(set_single_announce)
def get_blobs_to_announce(self):
def get_and_update(transaction):
@ -317,13 +249,13 @@ class SQLiteStorage:
)
blobs = [b[0] for b in r.fetchall()]
return blobs
return self.db.runInteraction(get_and_update)
return self.db.run(get_and_update)
def delete_blobs_from_db(self, blob_hashes):
def delete_blobs(transaction):
for blob_hash in blob_hashes:
transaction.execute("delete from blob where blob_hash=?;", (blob_hash,))
return self.db.runInteraction(delete_blobs)
return self.db.run(delete_blobs)
def get_all_blob_hashes(self):
return self.run_and_return_list("select blob_hash from blob")
@ -336,17 +268,16 @@ class SQLiteStorage:
transaction.execute("insert into stream_blob values (?, ?, ?, ?)",
(stream_hash, blob_info.get('blob_hash', None),
blob_info['blob_num'], blob_info['iv']))
return self.db.runInteraction(_add_stream_blobs)
return self.db.run(_add_stream_blobs)
@defer.inlineCallbacks
def add_known_blobs(self, blob_infos):
async def add_known_blobs(self, blob_infos):
for blob_info in blob_infos:
if blob_info.get('blob_hash') and blob_info['length']:
yield self.add_known_blob(blob_info['blob_hash'], blob_info['length'])
await self.add_known_blob(blob_info['blob_hash'], blob_info['length'])
def verify_will_announce_head_and_sd_blobs(self, stream_hash):
# fix should_announce for imported head and sd blobs
return self.db.runOperation(
return self.db.execute(
"update blob set should_announce=1 "
"where should_announce=0 and "
"blob.blob_hash in "
@ -358,7 +289,7 @@ class SQLiteStorage:
)
def verify_will_announce_all_head_and_sd_blobs(self):
return self.db.runOperation(
return self.db.execute(
"update blob set should_announce=1 "
"where should_announce=0 and "
"blob.blob_hash in "
@ -383,23 +314,24 @@ class SQLiteStorage:
:param stream_blob_infos: (list) of blob info dictionaries
:return: (defer.Deferred)
"""
def _store_stream(transaction):
transaction.execute("insert into stream values (?, ?, ?, ?, ?);",
(stream_hash, sd_hash, stream_key, stream_name,
suggested_file_name))
transaction.execute(
"insert into stream values (?, ?, ?, ?, ?);", (
stream_hash, sd_hash, stream_key, stream_name, suggested_file_name
)
)
for blob_info in stream_blob_infos:
transaction.execute("insert into stream_blob values (?, ?, ?, ?)",
(stream_hash, blob_info.get('blob_hash', None),
blob_info['blob_num'], blob_info['iv']))
transaction.execute(
"insert into stream_blob values (?, ?, ?, ?)", (
stream_hash, blob_info.get('blob_hash', None),
blob_info['blob_num'], blob_info['iv']
)
)
return self.db.run(_store_stream)
return self.db.runInteraction(_store_stream)
@defer.inlineCallbacks
def delete_stream(self, stream_hash):
sd_hash = yield self.get_sd_blob_hash_for_stream(stream_hash)
stream_blobs = yield self.get_blobs_for_stream(stream_hash)
async def delete_stream(self, stream_hash):
sd_hash = await self.get_sd_blob_hash_for_stream(stream_hash)
stream_blobs = await self.get_blobs_for_stream(stream_hash)
blob_hashes = [b.blob_hash for b in stream_blobs if b.blob_hash is not None]
def _delete_stream(transaction):
@ -407,24 +339,28 @@ class SQLiteStorage:
transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
transaction.execute("delete from blob where blob_hash=?", sd_hash)
for blob_hash in blob_hashes:
transaction.execute("delete from blob where blob_hash=?;", (blob_hash, ))
yield self.db.runInteraction(_delete_stream)
await self.db.run(_delete_stream)
def get_all_streams(self):
return self.run_and_return_list("select stream_hash from stream")
def get_stream_info(self, stream_hash):
d = self.db.runQuery("select stream_name, stream_key, suggested_filename, sd_hash from stream "
"where stream_hash=?", (stream_hash, ))
d.addCallback(lambda r: None if not r else r[0])
return d
return self.run_and_return_one_or_none(
"select stream_name, stream_key, suggested_filename, sd_hash from stream "
"where stream_hash=?", stream_hash
)
def check_if_stream_exists(self, stream_hash):
d = self.db.runQuery("select stream_hash from stream where stream_hash=?", (stream_hash, ))
d.addCallback(lambda r: bool(len(r)))
return d
async def check_if_stream_exists(self, stream_hash):
row = await self.run_and_return_one_or_none(
"select stream_hash from stream where stream_hash=?", stream_hash
)
if row is not None:
return bool(len(row))
return False
def get_blob_num_by_hash(self, stream_hash, blob_hash):
return self.run_and_return_one_or_none(
@ -466,7 +402,7 @@ class SQLiteStorage:
crypt_blob_infos.append(CryptBlobInfo(blob_hash, position, blob_length, iv))
crypt_blob_infos = sorted(crypt_blob_infos, key=lambda info: info.blob_num)
return crypt_blob_infos
return self.db.runInteraction(_get_blobs_for_stream)
return self.db.run(_get_blobs_for_stream)
def get_pending_blobs_for_stream(self, stream_hash):
return self.run_and_return_list(
@ -493,14 +429,12 @@ class SQLiteStorage:
# # # # # # # # # file stuff # # # # # # # # #
@defer.inlineCallbacks
def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate):
async def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate):
# touch the closest available file to the file name
file_name = yield open_file_for_writing(unhexlify(download_directory).decode(), unhexlify(file_name).decode())
result = yield self.save_published_file(
file_name = await open_file_for_writing(unhexlify(download_directory).decode(), unhexlify(file_name).decode())
return await self.save_published_file(
stream_hash, hexlify(file_name.encode()), download_directory, data_payment_rate
)
defer.returnValue(result)
def save_published_file(self, stream_hash, file_name, download_directory, data_payment_rate, status="stopped"):
return self.run_and_return_id(
@ -509,7 +443,9 @@ class SQLiteStorage:
)
def get_filename_for_rowid(self, rowid):
return self.run_and_return_one_or_none("select file_name from file where rowid=?", rowid)
return self.run_and_return_one_or_none(
"select file_name from file where rowid=?", rowid
)
def get_all_lbry_files(self):
def _lbry_file_dict(rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
@ -535,13 +471,11 @@ class SQLiteStorage:
).fetchall()
]
d = self.db.runInteraction(_get_all_files)
return d
return self.db.run(_get_all_files)
def change_file_status(self, rowid, new_status):
d = self.db.runQuery("update file set status=? where rowid=?", (new_status, rowid))
d.addCallback(lambda _: new_status)
return d
async def change_file_status(self, rowid, new_status):
await self.db.execute("update file set status=? where rowid=?", (new_status, rowid))
return new_status
def get_lbry_file_status(self, rowid):
return self.run_and_return_one_or_none(
@ -565,7 +499,7 @@ class SQLiteStorage:
("%s:%i" % (support['txid'], support['nout']), claim_id, lbc_to_dewies(support['amount']),
support.get('address', ""))
)
return self.db.runInteraction(_save_support)
return self.db.run(_save_support)
def get_supports(self, *claim_ids):
def _format_support(outpoint, supported_id, amount, address):
@ -587,15 +521,16 @@ class SQLiteStorage:
)
]
return self.db.runInteraction(_get_supports)
return self.db.run(_get_supports)
# # # # # # # # # claim functions # # # # # # # # #
@defer.inlineCallbacks
def save_claims(self, claim_infos):
async def save_claims(self, claim_infos):
support_callbacks = []
update_file_callbacks = []
def _save_claims(transaction):
content_claims_to_update = []
support_callbacks = []
for claim_info in claim_infos:
outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
claim_id = claim_info['claim_id']
@ -622,7 +557,7 @@ class SQLiteStorage:
)
if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing
# support info
support_callbacks.append(self.save_supports(claim_id, claim_info['supports']))
support_callbacks.append((claim_id, claim_info['supports']))
if not source_hash:
continue
stream_hash = transaction.execute(
@ -644,18 +579,18 @@ class SQLiteStorage:
content_claims_to_update.append((stream_hash, outpoint))
elif known_outpoint != outpoint:
content_claims_to_update.append((stream_hash, outpoint))
update_file_callbacks = []
for stream_hash, outpoint in content_claims_to_update:
self._save_content_claim(transaction, outpoint, stream_hash)
if stream_hash in self.content_claim_callbacks:
update_file_callbacks.append(self.content_claim_callbacks[stream_hash]())
return update_file_callbacks, support_callbacks
content_dl, support_dl = yield self.db.runInteraction(_save_claims)
if content_dl:
yield defer.DeferredList(content_dl)
if support_dl:
yield defer.DeferredList(support_dl)
await self.db.run(_save_claims)
if update_file_callbacks:
await asyncio.wait(update_file_callbacks)
if support_callbacks:
await asyncio.wait([
self.save_supports(*args) for args in support_callbacks
])
def save_claims_for_resolve(self, claim_infos):
to_save = []
@ -718,16 +653,13 @@ class SQLiteStorage:
# update the claim associated to the file
transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
@defer.inlineCallbacks
def save_content_claim(self, stream_hash, claim_outpoint):
yield self.db.runInteraction(self._save_content_claim, claim_outpoint, stream_hash)
async def save_content_claim(self, stream_hash, claim_outpoint):
await self.db.run(self._save_content_claim, claim_outpoint, stream_hash)
# update corresponding ManagedEncryptedFileDownloader object
if stream_hash in self.content_claim_callbacks:
file_callback = self.content_claim_callbacks[stream_hash]
yield file_callback()
await self.content_claim_callbacks[stream_hash]()
@defer.inlineCallbacks
def get_content_claim(self, stream_hash, include_supports=True):
async def get_content_claim(self, stream_hash, include_supports=True):
def _get_claim_from_stream_hash(transaction):
claim_info = transaction.execute(
"select c.*, "
@ -745,15 +677,13 @@ class SQLiteStorage:
result['channel_name'] = channel_name
return result
result = yield self.db.runInteraction(_get_claim_from_stream_hash)
result = await self.db.run(_get_claim_from_stream_hash)
if result and include_supports:
supports = yield self.get_supports(result['claim_id'])
result['supports'] = supports
result['effective_amount'] = calculate_effective_amount(result['amount'], supports)
defer.returnValue(result)
result['supports'] = await self.get_supports(result['claim_id'])
result['effective_amount'] = calculate_effective_amount(result['amount'], result['supports'])
return result
@defer.inlineCallbacks
def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
async def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
def _batch_get_claim(transaction):
results = {}
claim_infos = _batched_select(
@ -781,10 +711,10 @@ class SQLiteStorage:
results[stream_hash]['channel_name'] = channel_name
return results
claims = yield self.db.runInteraction(_batch_get_claim)
claims = await self.db.run(_batch_get_claim)
if include_supports:
all_supports = {}
for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])):
for support in await self.get_supports(*[claim['claim_id'] for claim in claims.values()]):
all_supports.setdefault(support['claim_id'], []).append(support)
for stream_hash in claims.keys():
claim = claims[stream_hash]
@ -792,28 +722,28 @@ class SQLiteStorage:
claim['supports'] = supports
claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
claims[stream_hash] = claim
defer.returnValue(claims)
return claims
@defer.inlineCallbacks
def get_claim(self, claim_outpoint, include_supports=True):
async def get_claim(self, claim_outpoint, include_supports=True):
def _get_claim(transaction):
claim_info = transaction.execute("select c.*, "
claim_info = transaction.execute(
"select c.*, "
"case when c.channel_claim_id is not null then "
"(select claim_name from claim where claim_id==c.channel_claim_id) "
"else null end as channel_name from claim c where claim_outpoint = ?",
(claim_outpoint,)).fetchone()
(claim_outpoint,)
).fetchone()
channel_name = claim_info[-1]
result = _format_claim_response(*claim_info[:-1])
if channel_name:
result['channel_name'] = channel_name
return result
result = yield self.db.runInteraction(_get_claim)
result = await self.db.run(_get_claim)
if include_supports:
supports = yield self.get_supports(result['claim_id'])
result['supports'] = supports
result['effective_amount'] = calculate_effective_amount(result['amount'], supports)
defer.returnValue(result)
result['supports'] = await self.get_supports(result['claim_id'])
result['effective_amount'] = calculate_effective_amount(result['amount'], result['supports'])
return result
def get_unknown_certificate_ids(self):
def _get_unknown_certificate_claim_ids(transaction):
@ -825,11 +755,10 @@ class SQLiteStorage:
"(select c2.claim_id from claim as c2)"
).fetchall()
]
return self.db.runInteraction(_get_unknown_certificate_claim_ids)
return self.db.run(_get_unknown_certificate_claim_ids)
@defer.inlineCallbacks
def get_pending_claim_outpoints(self):
claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1")
async def get_pending_claim_outpoints(self):
claim_outpoints = await self.run_and_return_list("select claim_outpoint from claim where height=-1")
results = {} # {txid: [nout, ...]}
for outpoint_str in claim_outpoints:
txid, nout = outpoint_str.split(":")
@ -838,7 +767,7 @@ class SQLiteStorage:
results[txid] = outputs
if results:
log.debug("missing transaction heights for %i claims", len(results))
defer.returnValue(results)
return results
def save_claim_tx_heights(self, claim_tx_heights):
def _save_claim_heights(transaction):
@ -847,17 +776,17 @@ class SQLiteStorage:
"update claim set height=? where claim_outpoint=? and height=-1",
(height, outpoint)
)
return self.db.runInteraction(_save_claim_heights)
return self.db.run(_save_claim_heights)
# # # # # # # # # reflector functions # # # # # # # # #
def update_reflected_stream(self, sd_hash, reflector_address, success=True):
if success:
return self.db.runOperation(
return self.db.execute(
"insert or replace into reflected_stream values (?, ?, ?)",
(sd_hash, reflector_address, self.clock.seconds())
)
return self.db.runOperation(
return self.db.execute(
"delete from reflected_stream where sd_hash=? and reflector_address=?",
(sd_hash, reflector_address)
)

View file

@ -277,7 +277,7 @@ class LbryWalletManager(BaseWalletManager):
if 'error' not in results:
await self.old_db.save_claims_for_resolve([
value for value in results.values() if 'error' not in value
]).asFuture(asyncio.get_event_loop())
])
return results
async def get_claims_for_name(self, name: str):
@ -432,7 +432,7 @@ class LbryWalletManager(BaseWalletManager):
await account.ledger.broadcast(tx)
await self.old_db.save_claims([self._old_get_temp_claim_info(
tx, tx.outputs[0], claim_address, claim_dict, name, dewies_to_lbc(amount)
)]).asFuture(asyncio.get_event_loop())
)])
# TODO: release reserved tx outputs in case anything fails by this point
return tx
@ -446,7 +446,7 @@ class LbryWalletManager(BaseWalletManager):
'address': holding_address,
'claim_id': claim_id,
'amount': dewies_to_lbc(amount)
}]).asFuture(asyncio.get_event_loop())
}])
return tx
async def tip_claim(self, amount, claim_id, account):
@ -461,7 +461,7 @@ class LbryWalletManager(BaseWalletManager):
'address': claim_to_tip['address'],
'claim_id': claim_id,
'amount': dewies_to_lbc(amount)
}]).asFuture(asyncio.get_event_loop())
}])
return tx
async def abandon_claim(self, claim_id, txid, nout, account):
@ -484,7 +484,7 @@ class LbryWalletManager(BaseWalletManager):
await self.old_db.save_claims([self._old_get_temp_claim_info(
tx, tx.outputs[0], address, cert, channel_name, dewies_to_lbc(amount)
)]).asFuture(asyncio.get_event_loop())
)])
return tx
def _old_get_temp_claim_info(self, tx, txo, address, claim_dict, name, bid):

View file

@ -131,9 +131,6 @@ class LBRYBlockProcessor(BlockProcessor):
super().backup_blocks(raw_blocks=raw_blocks)
self.db.batched_flush_claims()
def shutdown(self):
self.db.shutdown()
async def flush(self, flush_utxos):
self.db.batched_flush_claims()
return await super().flush(flush_utxos)

View file

@ -21,7 +21,7 @@ class LBRYDB(DB):
self.pending_abandons = {}
super().__init__(*args, **kwargs)
def shutdown(self):
def close(self):
self.batched_flush_claims()
self.claims_db.close()
self.names_db.close()
@ -29,9 +29,7 @@ class LBRYDB(DB):
self.outpoint_to_claim_id_db.close()
self.claim_undo_db.close()
self.utxo_db.close()
# electrumx ones
self.utxo_db.close()
self.history.close_db()
super().close()
async def _open_dbs(self, for_sync, compacting):
await super()._open_dbs(for_sync=for_sync, compacting=compacting)