forked from LBRYCommunity/lbry-sdk
update daemon and components
This commit is contained in:
parent
ea15674e62
commit
494917158c
9 changed files with 590 additions and 1350 deletions
|
@ -2,22 +2,63 @@ import sys
|
|||
import json
|
||||
import asyncio
|
||||
import argparse
|
||||
import logging
|
||||
import typing
|
||||
import logging.handlers
|
||||
import aiohttp
|
||||
from docopt import docopt
|
||||
from textwrap import dedent
|
||||
|
||||
import aiohttp
|
||||
from lbrynet.extras.compat import force_asyncioreactor_install
|
||||
force_asyncioreactor_install()
|
||||
|
||||
from lbrynet import log_support, __name__ as lbrynet_name, __version__ as lbrynet_version
|
||||
from lbrynet.extras.daemon.loggly_handler import get_loggly_handler
|
||||
from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version
|
||||
from lbrynet.conf import Config, CLIConfig
|
||||
from lbrynet.utils import check_connection
|
||||
from lbrynet.extras.daemon.Daemon import Daemon
|
||||
from lbrynet.extras.daemon.loggly_handler import get_loggly_handler
|
||||
|
||||
log = logging.getLogger(lbrynet_name)
|
||||
log.addHandler(logging.NullHandler())
|
||||
default_formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s")
|
||||
|
||||
optional_path_getter_type = typing.Optional[typing.Callable[[], str]]
|
||||
|
||||
|
||||
async def start_daemon(conf: Config, args):
|
||||
file_handler = logging.handlers.RotatingFileHandler(conf.log_file_path,
|
||||
maxBytes=2097152, backupCount=5)
|
||||
file_handler.setFormatter(default_formatter)
|
||||
log.addHandler(file_handler)
|
||||
|
||||
if not args.quiet:
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(default_formatter)
|
||||
log.addHandler(handler)
|
||||
|
||||
# mostly disable third part logging
|
||||
logging.getLogger('urllib3').setLevel(logging.CRITICAL)
|
||||
logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
|
||||
logging.getLogger('aioupnp').setLevel(logging.WARNING)
|
||||
logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
|
||||
|
||||
if args.verbose:
|
||||
log.setLevel(logging.DEBUG)
|
||||
else:
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
if conf.share_usage_data:
|
||||
loggly_handler = get_loggly_handler()
|
||||
loggly_handler.setLevel(logging.ERROR)
|
||||
log.addHandler(loggly_handler)
|
||||
|
||||
log.info("Starting lbrynet-daemon from command line")
|
||||
daemon = Daemon(conf)
|
||||
|
||||
try:
|
||||
await daemon.start_listening()
|
||||
except (OSError, asyncio.CancelledError):
|
||||
return 1
|
||||
try:
|
||||
await daemon.server.wait_closed()
|
||||
except (KeyboardInterrupt, asyncio.CancelledError):
|
||||
await daemon.shutdown()
|
||||
return 0
|
||||
|
||||
|
||||
def display(data):
|
||||
|
@ -179,55 +220,27 @@ def main(argv=None):
|
|||
argv = argv or sys.argv[1:]
|
||||
parser = get_argument_parser()
|
||||
args, command_args = parser.parse_known_args(argv)
|
||||
|
||||
conf = Config.create_from_arguments(args)
|
||||
|
||||
if args.cli_version:
|
||||
print(f"{lbrynet_name} {lbrynet_version}")
|
||||
return 0
|
||||
|
||||
elif args.command == 'start':
|
||||
|
||||
if args.help:
|
||||
args.start_parser.print_help()
|
||||
return 0
|
||||
|
||||
log_support.configure_logging(conf.log_file_path, not args.quiet, args.verbose)
|
||||
|
||||
if conf.share_usage_data:
|
||||
loggly_handler = get_loggly_handler()
|
||||
loggly_handler.setLevel(logging.ERROR)
|
||||
log.addHandler(loggly_handler)
|
||||
|
||||
log.debug('Final Settings: %s', conf.settings_dict)
|
||||
log.info("Starting lbrynet-daemon from command line")
|
||||
|
||||
daemon = Daemon(conf)
|
||||
|
||||
if check_connection():
|
||||
from twisted.internet import reactor
|
||||
reactor._asyncioEventloop.create_task(daemon.start())
|
||||
reactor.run()
|
||||
else:
|
||||
log.info("Not connected to internet, unable to start")
|
||||
|
||||
return asyncio.run(start_daemon(conf, args))
|
||||
elif args.command is not None:
|
||||
|
||||
doc = args.doc
|
||||
api_method_name = args.api_method_name
|
||||
if args.replaced_by:
|
||||
print(f"{args.api_method_name} is deprecated, using {args.replaced_by['api_method_name']}.")
|
||||
doc = args.replaced_by['doc']
|
||||
api_method_name = args.replaced_by['api_method_name']
|
||||
|
||||
if args.help:
|
||||
print(doc)
|
||||
return 0
|
||||
else:
|
||||
parsed = docopt(doc, command_args)
|
||||
params = set_kwargs(parsed)
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(execute_command(conf, api_method_name, params))
|
||||
|
||||
asyncio.run(execute_command(conf, api_method_name, params))
|
||||
elif args.group is not None:
|
||||
args.group_parser.print_help()
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from twisted.internet import defer
|
||||
from twisted._threads import AlreadyQuit
|
||||
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.extras.daemon.ComponentManager import ComponentManager
|
||||
|
||||
|
@ -57,7 +55,7 @@ class Component(metaclass=ComponentType):
|
|||
result = await self.start()
|
||||
self._running = True
|
||||
return result
|
||||
except (defer.CancelledError, AlreadyQuit):
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as err:
|
||||
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
|
||||
|
@ -68,7 +66,7 @@ class Component(metaclass=ComponentType):
|
|||
result = await self.stop()
|
||||
self._running = False
|
||||
return result
|
||||
except (defer.CancelledError, AlreadyQuit):
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as err:
|
||||
log.exception("Error stopping %s", self.__class__.__name__)
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from lbrynet.p2p.Error import ComponentStartConditionNotMet
|
||||
from lbrynet.extras.daemon.PeerManager import PeerManager
|
||||
from lbrynet.extras.daemon.PeerFinder import DHTPeerFinder
|
||||
import asyncio
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.error import ComponentStartConditionNotMet
|
||||
from lbrynet.dht.peer import PeerManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -35,16 +34,16 @@ class RequiredCondition(metaclass=RequiredConditionType):
|
|||
class ComponentManager:
|
||||
default_component_classes = {}
|
||||
|
||||
def __init__(self, conf: Config, reactor=None, analytics_manager=None, skip_components=None,
|
||||
peer_manager=None, peer_finder=None, **override_components):
|
||||
def __init__(self, conf: Config, analytics_manager=None, skip_components=None,
|
||||
peer_manager=None, **override_components):
|
||||
self.conf = conf
|
||||
self.skip_components = skip_components or []
|
||||
self.reactor = reactor
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.analytics_manager = analytics_manager
|
||||
self.component_classes = {}
|
||||
self.components = set()
|
||||
self.analytics_manager = analytics_manager
|
||||
self.peer_manager = peer_manager or PeerManager()
|
||||
self.peer_finder = peer_finder or DHTPeerFinder(self)
|
||||
self.started = asyncio.Event(loop=self.loop)
|
||||
self.peer_manager = peer_manager or PeerManager(asyncio.get_event_loop_policy().get_event_loop())
|
||||
|
||||
for component_name, component_class in self.default_component_classes.items():
|
||||
if component_name in override_components:
|
||||
|
@ -127,7 +126,7 @@ class ComponentManager:
|
|||
if component.component_name in callbacks:
|
||||
maybe_coro = callbacks[component.component_name](component)
|
||||
if asyncio.iscoroutine(maybe_coro):
|
||||
asyncio.create_task(maybe_coro)
|
||||
await asyncio.create_task(maybe_coro)
|
||||
|
||||
stages = self.sort_components()
|
||||
for stage in stages:
|
||||
|
@ -136,12 +135,11 @@ class ComponentManager:
|
|||
]
|
||||
if needing_start:
|
||||
await asyncio.wait(needing_start)
|
||||
self.started.set()
|
||||
|
||||
async def stop(self):
|
||||
"""
|
||||
Stop Components in reversed startup order
|
||||
|
||||
:return: (defer.Deferred)
|
||||
"""
|
||||
stages = self.sort_components(reverse=True)
|
||||
for stage in stages:
|
||||
|
@ -149,7 +147,7 @@ class ComponentManager:
|
|||
component._stop() for component in stage if component.running
|
||||
]
|
||||
if needing_stop:
|
||||
await asyncio.wait(needing_stop)
|
||||
await asyncio.wait(needing_stop, loop=self.loop)
|
||||
|
||||
def all_components_running(self, *component_names):
|
||||
"""
|
||||
|
|
|
@ -2,39 +2,30 @@ import os
|
|||
import asyncio
|
||||
import aiohttp
|
||||
import logging
|
||||
import treq
|
||||
import math
|
||||
import binascii
|
||||
import typing
|
||||
import socket
|
||||
from hashlib import sha256
|
||||
from types import SimpleNamespace
|
||||
from twisted.internet import defer, reactor, error
|
||||
|
||||
from aioupnp import __version__ as aioupnp_version
|
||||
from aioupnp.upnp import UPnP
|
||||
from aioupnp.fault import UPnPError
|
||||
|
||||
import lbrynet.schema
|
||||
|
||||
from lbrynet.conf import HEADERS_FILE_SHA256_CHECKSUM
|
||||
from lbrynet.extras.compat import d2f
|
||||
from lbrynet.blob.EncryptedFileManager import EncryptedFileManager
|
||||
from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaverFactory
|
||||
from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
|
||||
from lbrynet.dht.node import Node
|
||||
from lbrynet.dht.peer import KademliaPeer
|
||||
from lbrynet.dht.blob_announcer import BlobAnnouncer
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.blob_exchange.server import BlobServer
|
||||
from lbrynet.stream.stream_manager import StreamManager
|
||||
from lbrynet.extras.daemon.Component import Component
|
||||
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||
from lbrynet.extras.daemon.HashAnnouncer import DHTHashAnnouncer
|
||||
from lbrynet.extras.reflector.server.server import ReflectorServerFactory
|
||||
from lbrynet.extras.wallet import LbryWalletManager
|
||||
from lbrynet.extras.wallet import Network
|
||||
from lbrynet.utils import generate_id
|
||||
from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager
|
||||
from lbrynet.p2p.RateLimiter import RateLimiter
|
||||
from lbrynet.p2p.BlobManager import DiskBlobManager
|
||||
from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType
|
||||
from lbrynet.p2p.server.BlobRequestHandler import BlobRequestHandlerFactory
|
||||
from lbrynet.p2p.server.ServerProtocol import ServerProtocolFactory
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -47,13 +38,10 @@ HEADERS_COMPONENT = "blockchain_headers"
|
|||
WALLET_COMPONENT = "wallet"
|
||||
DHT_COMPONENT = "dht"
|
||||
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
|
||||
FILE_MANAGER_COMPONENT = "file_manager"
|
||||
STREAM_MANAGER_COMPONENT = "stream_manager"
|
||||
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
|
||||
REFLECTOR_COMPONENT = "reflector"
|
||||
UPNP_COMPONENT = "upnp"
|
||||
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
|
||||
RATE_LIMITER_COMPONENT = "rate_limiter"
|
||||
PAYMENT_RATE_COMPONENT = "payment_rate_manager"
|
||||
|
||||
|
||||
async def gather_dict(tasks: dict):
|
||||
|
@ -75,6 +63,14 @@ async def get_external_ip(): # used if upnp is disabled or non-functioning
|
|||
pass
|
||||
|
||||
|
||||
async def resolve_host(loop: asyncio.BaseEventLoop, url: str):
|
||||
info = await loop.getaddrinfo(
|
||||
url, 'https',
|
||||
proto=socket.IPPROTO_TCP,
|
||||
)
|
||||
return info[0][4][0]
|
||||
|
||||
|
||||
class DatabaseComponent(Component):
|
||||
component_name = DATABASE_COMPONENT
|
||||
|
||||
|
@ -158,39 +154,41 @@ class HeadersComponent(Component):
|
|||
'download_progress': self._headers_progress_percent
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def fetch_headers_from_s3(self):
|
||||
def collector(data, h_file):
|
||||
h_file.write(data)
|
||||
async def fetch_headers_from_s3(self):
|
||||
def collector(d, h_file):
|
||||
h_file.write(d)
|
||||
local_size = float(h_file.tell())
|
||||
final_size = float(final_size_after_download)
|
||||
self._headers_progress_percent = math.ceil(local_size / final_size * 100)
|
||||
|
||||
local_header_size = self.local_header_file_size()
|
||||
resume_header = {"Range": f"bytes={local_header_size}-"}
|
||||
response = yield treq.get(HEADERS_URL, headers=resume_header)
|
||||
got_406 = response.code == 406 # our file is bigger
|
||||
final_size_after_download = response.length + local_header_size
|
||||
if got_406:
|
||||
log.warning("s3 is more out of date than we are")
|
||||
# should have something to download and a final length divisible by the header size
|
||||
elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
|
||||
s3_height = (final_size_after_download / HEADER_SIZE) - 1
|
||||
local_height = self.local_header_file_height()
|
||||
if s3_height > local_height:
|
||||
if local_header_size:
|
||||
log.info("Resuming download of %i bytes from s3", response.length)
|
||||
with open(self.headers_file, "a+b") as headers_file:
|
||||
yield treq.collect(response, lambda d: collector(d, headers_file))
|
||||
else:
|
||||
with open(self.headers_file, "wb") as headers_file:
|
||||
yield treq.collect(response, lambda d: collector(d, headers_file))
|
||||
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height)
|
||||
self._check_header_file_integrity()
|
||||
else:
|
||||
async with aiohttp.request('get', HEADERS_URL, headers=resume_header) as response:
|
||||
got_406 = response.status == 406 # our file is bigger
|
||||
final_size_after_download = response.content_length + local_header_size
|
||||
if got_406:
|
||||
log.warning("s3 is more out of date than we are")
|
||||
else:
|
||||
log.error("invalid size for headers from s3")
|
||||
# should have something to download and a final length divisible by the header size
|
||||
elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
|
||||
s3_height = (final_size_after_download / HEADER_SIZE) - 1
|
||||
local_height = self.local_header_file_height()
|
||||
if s3_height > local_height:
|
||||
data = await response.read()
|
||||
|
||||
if local_header_size:
|
||||
log.info("Resuming download of %i bytes from s3", response.content_length)
|
||||
with open(self.headers_file, "a+b") as headers_file:
|
||||
collector(data, headers_file)
|
||||
else:
|
||||
with open(self.headers_file, "wb") as headers_file:
|
||||
collector(data, headers_file)
|
||||
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.",
|
||||
s3_height)
|
||||
self._check_header_file_integrity()
|
||||
else:
|
||||
log.warning("s3 is more out of date than we are")
|
||||
else:
|
||||
log.error("invalid size for headers from s3")
|
||||
|
||||
def local_header_file_height(self):
|
||||
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
|
||||
|
@ -259,7 +257,7 @@ class HeadersComponent(Component):
|
|||
self._downloading_headers = await self.should_download_headers_from_s3()
|
||||
if self._downloading_headers:
|
||||
try:
|
||||
await d2f(self.fetch_headers_from_s3())
|
||||
await self.fetch_headers_from_s3()
|
||||
except Exception as err:
|
||||
log.error("failed to fetch headers from s3: %s", err)
|
||||
finally:
|
||||
|
@ -313,29 +311,32 @@ class BlobComponent(Component):
|
|||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.blob_manager = None
|
||||
self.blob_manager: BlobFileManager = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
def component(self) -> typing.Optional[BlobFileManager]:
|
||||
return self.blob_manager
|
||||
|
||||
def start(self):
|
||||
async def start(self):
|
||||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||
datastore = None
|
||||
data_store = None
|
||||
if DHT_COMPONENT not in self.component_manager.skip_components:
|
||||
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||
dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
|
||||
if dht_node:
|
||||
datastore = dht_node._dataStore
|
||||
self.blob_manager = DiskBlobManager(os.path.join(self.conf.data_dir, "blobfiles"), storage, datastore)
|
||||
return self.blob_manager.setup()
|
||||
data_store = dht_node.protocol.data_store
|
||||
self.blob_manager = BlobFileManager(asyncio.get_event_loop(), os.path.join(self.conf.data_dir, "blobfiles"),
|
||||
storage, data_store)
|
||||
return await self.blob_manager.setup()
|
||||
|
||||
def stop(self):
|
||||
return self.blob_manager.stop()
|
||||
async def stop(self):
|
||||
while self.blob_manager and self.blob_manager.blobs:
|
||||
_, blob = self.blob_manager.blobs.popitem()
|
||||
await blob.close()
|
||||
|
||||
async def get_status(self):
|
||||
count = 0
|
||||
if self.blob_manager:
|
||||
count = await self.blob_manager.storage.count_finished_blobs()
|
||||
count = len(self.blob_manager.completed_blob_hashes)
|
||||
return {'finished_blobs': count}
|
||||
|
||||
|
||||
|
@ -345,28 +346,27 @@ class DHTComponent(Component):
|
|||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.dht_node = None
|
||||
self.dht_node: Node = None
|
||||
self.upnp_component = None
|
||||
self.external_udp_port = None
|
||||
self.external_peer_port = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
def component(self) -> typing.Optional[Node]:
|
||||
return self.dht_node
|
||||
|
||||
async def get_status(self):
|
||||
return {
|
||||
'node_id': binascii.hexlify(self.component_manager.daemon.node_id),
|
||||
'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts)
|
||||
'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.protocol.routing_table.get_peers())
|
||||
}
|
||||
|
||||
async def start(self):
|
||||
log.info("start the dht")
|
||||
self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
|
||||
self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.peer_port)
|
||||
self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.dht_node_port)
|
||||
node_id = self.component_manager.daemon.node_id
|
||||
if node_id is None:
|
||||
node_id = generate_id()
|
||||
external_ip = self.upnp_component.external_ip
|
||||
if not external_ip:
|
||||
log.warning("UPnP component failed to get external ip")
|
||||
|
@ -375,18 +375,21 @@ class DHTComponent(Component):
|
|||
log.warning("failed to get external ip")
|
||||
|
||||
self.dht_node = Node(
|
||||
asyncio.get_event_loop(),
|
||||
self.component_manager.peer_manager,
|
||||
node_id=node_id,
|
||||
udpPort=self.conf.dht_node_port,
|
||||
externalUDPPort=self.external_udp_port,
|
||||
externalIP=external_ip,
|
||||
peerPort=self.external_peer_port
|
||||
internal_udp_port=self.conf.dht_node_port,
|
||||
udp_port=self.external_udp_port,
|
||||
external_ip=external_ip,
|
||||
peer_port=self.external_peer_port
|
||||
)
|
||||
self.dht_node.start(
|
||||
interface='0.0.0.0', known_node_urls=self.conf.known_dht_nodes
|
||||
)
|
||||
|
||||
await d2f(self.dht_node.start(self.conf.known_dht_nodes, block_on_join=False))
|
||||
log.info("Started the dht")
|
||||
|
||||
def stop(self):
|
||||
return d2f(self.dht_node.stop())
|
||||
async def stop(self):
|
||||
self.dht_node.stop()
|
||||
|
||||
|
||||
class HashAnnouncerComponent(Component):
|
||||
|
@ -395,195 +398,96 @@ class HashAnnouncerComponent(Component):
|
|||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.hash_announcer = None
|
||||
self.hash_announcer: BlobAnnouncer = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
def component(self) -> typing.Optional[BlobAnnouncer]:
|
||||
return self.hash_announcer
|
||||
|
||||
async def start(self):
|
||||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||
self.hash_announcer = DHTHashAnnouncer(self.conf, dht_node, storage)
|
||||
self.hash_announcer.start()
|
||||
self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage)
|
||||
self.hash_announcer.start(self.conf.concurrent_announcers)
|
||||
log.info("Started blob announcer")
|
||||
|
||||
def stop(self):
|
||||
async def stop(self):
|
||||
self.hash_announcer.stop()
|
||||
log.info("Stopped blob announcer")
|
||||
|
||||
async def get_status(self):
|
||||
return {
|
||||
'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.hash_queue)
|
||||
'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.announce_queue)
|
||||
}
|
||||
|
||||
|
||||
class RateLimiterComponent(Component):
|
||||
component_name = RATE_LIMITER_COMPONENT
|
||||
class StreamManagerComponent(Component):
|
||||
component_name = STREAM_MANAGER_COMPONENT
|
||||
depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT]
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.rate_limiter = RateLimiter()
|
||||
self.stream_manager: StreamManager = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self.rate_limiter
|
||||
|
||||
async def start(self):
|
||||
self.rate_limiter.start()
|
||||
|
||||
async def stop(self):
|
||||
self.rate_limiter.stop()
|
||||
|
||||
|
||||
class PaymentRateComponent(Component):
|
||||
component_name = PAYMENT_RATE_COMPONENT
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.payment_rate_manager = OnlyFreePaymentsManager()
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self.payment_rate_manager
|
||||
|
||||
async def start(self):
|
||||
pass
|
||||
|
||||
async def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class FileManagerComponent(Component):
|
||||
component_name = FILE_MANAGER_COMPONENT
|
||||
depends_on = [RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT,
|
||||
PAYMENT_RATE_COMPONENT]
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.file_manager = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self.file_manager
|
||||
def component(self) -> typing.Optional[StreamManager]:
|
||||
return self.stream_manager
|
||||
|
||||
async def get_status(self):
|
||||
if not self.file_manager:
|
||||
if not self.stream_manager:
|
||||
return
|
||||
return {
|
||||
'managed_files': len(self.file_manager.lbry_files)
|
||||
'managed_files': len(self.stream_manager.streams)
|
||||
}
|
||||
|
||||
def start(self):
|
||||
rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
|
||||
async def start(self):
|
||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
||||
node = self.component_manager.get_component(DHT_COMPONENT)
|
||||
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
add_lbry_file_to_sd_identifier(sd_identifier)
|
||||
file_saver_factory = EncryptedFileSaverFactory(
|
||||
self.conf,
|
||||
self.component_manager.peer_finder,
|
||||
rate_limiter,
|
||||
blob_manager,
|
||||
storage,
|
||||
wallet,
|
||||
self.conf.download_dir
|
||||
)
|
||||
sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
|
||||
|
||||
payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT)
|
||||
log.info('Starting the file manager')
|
||||
self.file_manager = EncryptedFileManager(
|
||||
self.conf, self.component_manager.peer_finder, rate_limiter,
|
||||
blob_manager, wallet, payment_rate_manager, storage, sd_identifier
|
||||
loop = asyncio.get_event_loop()
|
||||
self.stream_manager = StreamManager(
|
||||
loop, blob_manager, wallet, storage, node, self.conf.blob_download_timeout,
|
||||
self.conf.peer_connect_timeout, [
|
||||
KademliaPeer(loop, address=(await resolve_host(loop, url)), tcp_port=port + 1)
|
||||
for url, port in self.conf.reflector_servers
|
||||
]
|
||||
)
|
||||
return self.file_manager.setup()
|
||||
await self.stream_manager.start()
|
||||
log.info('Done setting up file manager')
|
||||
|
||||
def stop(self):
|
||||
return d2f(self.file_manager.stop())
|
||||
async def stop(self):
|
||||
await self.stream_manager.stop()
|
||||
|
||||
|
||||
class PeerProtocolServerComponent(Component):
|
||||
component_name = PEER_PROTOCOL_SERVER_COMPONENT
|
||||
depends_on = [UPNP_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT,
|
||||
PAYMENT_RATE_COMPONENT]
|
||||
depends_on = [UPNP_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT]
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.lbry_server_port = None
|
||||
self.blob_server: BlobServer = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self.lbry_server_port
|
||||
def component(self) -> typing.Optional[BlobServer]:
|
||||
return self.blob_server
|
||||
|
||||
async def start(self):
|
||||
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
||||
log.info("start blob server")
|
||||
upnp = self.component_manager.get_component(UPNP_COMPONENT)
|
||||
peer_port = self.conf.peer_port
|
||||
query_handlers = {
|
||||
handler.get_primary_query_identifier(): handler for handler in [
|
||||
BlobRequestHandlerFactory(
|
||||
self.component_manager.get_component(BLOB_COMPONENT),
|
||||
wallet,
|
||||
self.component_manager.get_component(PAYMENT_RATE_COMPONENT),
|
||||
self.component_manager.analytics_manager
|
||||
),
|
||||
wallet.get_wallet_info_query_handler_factory(),
|
||||
]
|
||||
}
|
||||
server_factory = ServerProtocolFactory(
|
||||
self.component_manager.get_component(RATE_LIMITER_COMPONENT), query_handlers,
|
||||
self.component_manager.peer_manager
|
||||
)
|
||||
|
||||
try:
|
||||
log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port,
|
||||
upnp.upnp_redirects.get("TCP", peer_port))
|
||||
self.lbry_server_port = reactor.listenTCP(peer_port, server_factory)
|
||||
except error.CannotListenError as e:
|
||||
import traceback
|
||||
log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
|
||||
" more details.", peer_port)
|
||||
log.error("%s", traceback.format_exc())
|
||||
raise ValueError("%s lbrynet may already be running on your computer." % str(e))
|
||||
blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||
wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT)
|
||||
peer_port = upnp.upnp_redirects.get("TCP", self.conf.settings["peer_port"])
|
||||
address = await wallet.get_unused_address()
|
||||
self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address)
|
||||
self.blob_server.start_server(peer_port, interface='0.0.0.0')
|
||||
await self.blob_server.started_listening.wait()
|
||||
|
||||
async def stop(self):
|
||||
if self.lbry_server_port is not None:
|
||||
self.lbry_server_port, old_port = None, self.lbry_server_port
|
||||
log.info('Stop listening on port %s', old_port.port)
|
||||
await d2f(old_port.stopListening())
|
||||
|
||||
|
||||
class ReflectorComponent(Component):
|
||||
component_name = REFLECTOR_COMPONENT
|
||||
depends_on = [BLOB_COMPONENT, FILE_MANAGER_COMPONENT]
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.reflector_server_port = self.conf.reflector_port
|
||||
self.reflector_server = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self.reflector_server
|
||||
|
||||
async def start(self):
|
||||
log.info("Starting reflector server")
|
||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||
file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
|
||||
reflector_factory = ReflectorServerFactory(self.component_manager.peer_manager, blob_manager, file_manager)
|
||||
try:
|
||||
self.reflector_server = await d2f(reactor.listenTCP(self.reflector_server_port, reflector_factory))
|
||||
log.info('Started reflector on port %s', self.reflector_server_port)
|
||||
except error.CannotListenError as e:
|
||||
log.exception("Couldn't bind reflector to port %d", self.reflector_server_port)
|
||||
raise ValueError(f"{e} lbrynet may already be running on your computer.")
|
||||
|
||||
async def stop(self):
|
||||
if self.reflector_server is not None:
|
||||
log.info("Stopping reflector server")
|
||||
self.reflector_server, p = None, self.reflector_server
|
||||
await d2f(p.stopListening())
|
||||
if self.blob_server:
|
||||
self.blob_server.stop_server()
|
||||
|
||||
|
||||
class UPnPComponent(Component):
|
||||
|
@ -600,7 +504,7 @@ class UPnPComponent(Component):
|
|||
self._maintain_redirects_task = None
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
def component(self) -> 'UPnPComponent':
|
||||
return self
|
||||
|
||||
async def _repeatedly_maintain_redirects(self, now=True):
|
||||
|
@ -696,7 +600,8 @@ class UPnPComponent(Component):
|
|||
log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
|
||||
else:
|
||||
log.error("failed to setup upnp")
|
||||
await self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
|
||||
if self.component_manager.analytics_manager:
|
||||
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
|
||||
self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False))
|
||||
|
||||
async def stop(self):
|
||||
|
@ -704,7 +609,7 @@ class UPnPComponent(Component):
|
|||
await asyncio.wait([
|
||||
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
|
||||
])
|
||||
if self._maintain_redirects_task is not None and not self._maintain_redirects_task.done():
|
||||
if self._maintain_redirects_task and not self._maintain_redirects_task.done():
|
||||
self._maintain_redirects_task.cancel()
|
||||
|
||||
async def get_status(self):
|
||||
|
@ -726,7 +631,7 @@ class ExchangeRateManagerComponent(Component):
|
|||
self.exchange_rate_manager = ExchangeRateManager()
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
def component(self) -> ExchangeRateManager:
|
||||
return self.exchange_rate_manager
|
||||
|
||||
async def start(self):
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -160,28 +160,6 @@ class Manager:
|
|||
async def _send_heartbeat(self):
|
||||
await self.track(self._event(HEARTBEAT))
|
||||
|
||||
async def _update_tracked_metrics(self):
|
||||
should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED)
|
||||
if should_send:
|
||||
await self.track(self._metric_event(BLOB_BYTES_UPLOADED, value))
|
||||
|
||||
def add_observation(self, metric, value):
|
||||
self._tracked_data[metric].append(value)
|
||||
|
||||
def summarize_and_reset(self, metric, op=sum):
|
||||
"""Apply `op` on the current values for `metric`.
|
||||
|
||||
This operation also resets the metric.
|
||||
|
||||
Returns:
|
||||
a tuple (should_send, value)
|
||||
"""
|
||||
try:
|
||||
values = self._tracked_data.pop(metric)
|
||||
return True, op(values)
|
||||
except KeyError:
|
||||
return False, None
|
||||
|
||||
def _event(self, event, event_properties=None):
|
||||
return {
|
||||
'userId': 'lbry',
|
||||
|
|
58
lbrynet/extras/daemon/client.py
Normal file
58
lbrynet/extras/daemon/client.py
Normal file
|
@ -0,0 +1,58 @@
|
|||
from lbrynet import conf
|
||||
import aiohttp
|
||||
import logging
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
USER_AGENT = "AuthServiceProxy/0.1"
|
||||
TWISTED_SECURE_SESSION = "TWISTED_SECURE_SESSION"
|
||||
TWISTED_SESSION = "TWISTED_SESSION"
|
||||
LBRY_SECRET = "LBRY_SECRET"
|
||||
HTTP_TIMEOUT = 30
|
||||
|
||||
|
||||
class JSONRPCException(Exception):
|
||||
def __init__(self, rpc_error):
|
||||
super().__init__()
|
||||
self.error = rpc_error
|
||||
|
||||
|
||||
class UnAuthAPIClient:
|
||||
def __init__(self, host, port, session):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.session = session
|
||||
|
||||
def __getattr__(self, method):
|
||||
async def f(*args, **kwargs):
|
||||
return await self.call(method, [args, kwargs])
|
||||
|
||||
return f
|
||||
|
||||
@classmethod
|
||||
async def from_url(cls, url):
|
||||
url_fragment = urlparse(url)
|
||||
host = url_fragment.hostname
|
||||
port = url_fragment.port
|
||||
connector = aiohttp.TCPConnector()
|
||||
session = aiohttp.ClientSession(connector=connector)
|
||||
return cls(host, port, session)
|
||||
|
||||
async def call(self, method, params=None):
|
||||
message = {'method': method, 'params': params}
|
||||
async with self.session.get(conf.settings.get_api_connection_string(), json=message) as resp:
|
||||
response_dict = await resp.json()
|
||||
if 'error' in response_dict:
|
||||
raise JSONRPCException(response_dict['error'])
|
||||
else:
|
||||
return response_dict['result']
|
||||
|
||||
|
||||
class LBRYAPIClient:
|
||||
@staticmethod
|
||||
def get_client(conf_path=None):
|
||||
conf.conf_file = conf_path
|
||||
if not conf.settings:
|
||||
conf.initialize_settings()
|
||||
return UnAuthAPIClient.from_url(conf.settings.get_api_connection_string())
|
|
@ -1,10 +1,9 @@
|
|||
import sqlite3
|
||||
import logging
|
||||
import os
|
||||
|
||||
from lbrynet.p2p.Error import InvalidStreamDescriptorError
|
||||
from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor
|
||||
from lbrynet.blob.CryptBlob import CryptBlobInfo
|
||||
import asyncio
|
||||
from lbrynet.blob.blob_info import BlobInfo
|
||||
from lbrynet.stream.descriptor import StreamDescriptor
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -22,18 +21,13 @@ def do_migration(conf):
|
|||
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
|
||||
blobs_by_stream = {}
|
||||
for stream_hash, position, iv, blob_hash, blob_length in blobs:
|
||||
blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv))
|
||||
blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, blob_hash))
|
||||
|
||||
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
|
||||
sd_info = format_sd_info(
|
||||
EncryptedFileStreamType, stream_name, stream_key,
|
||||
suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash])
|
||||
)
|
||||
try:
|
||||
validate_descriptor(sd_info)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||
sd_hash, err.message)
|
||||
sd = StreamDescriptor(asyncio.get_event_loop(), blob_dir, stream_name, stream_key, suggested_filename,
|
||||
blobs_by_stream[stream_hash], stream_hash, sd_hash)
|
||||
if sd_hash != sd.calculate_sd_hash():
|
||||
log.warning("Stream for descriptor %s is invalid, cleaning it up", sd_hash)
|
||||
blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]]
|
||||
delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir)
|
||||
|
||||
|
|
115
lbrynet/utils.py
115
lbrynet/utils.py
|
@ -5,15 +5,14 @@ import random
|
|||
import socket
|
||||
import string
|
||||
import json
|
||||
import traceback
|
||||
import functools
|
||||
import typing
|
||||
import asyncio
|
||||
import logging
|
||||
import pkg_resources
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.internet import defer
|
||||
from lbrynet.schema.claim import ClaimDict
|
||||
from lbrynet.cryptoutils import get_lbry_hash_obj
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -43,21 +42,6 @@ def datetime_obj(*args, **kwargs):
|
|||
return datetime.datetime(*args, **kwargs)
|
||||
|
||||
|
||||
def call_later(delay, func, *args, **kwargs):
|
||||
# Import here to ensure that it gets called after installing a reactor
|
||||
# see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html
|
||||
from twisted.internet import reactor
|
||||
return reactor.callLater(delay, func, *args, **kwargs)
|
||||
|
||||
|
||||
def safe_start_looping_call(looping_call, interval_sec):
|
||||
if not looping_call.running:
|
||||
looping_call.start(interval_sec)
|
||||
|
||||
def safe_stop_looping_call(looping_call):
|
||||
if looping_call.running:
|
||||
looping_call.stop()
|
||||
|
||||
def generate_id(num=None):
|
||||
h = get_lbry_hash_obj()
|
||||
if num is not None:
|
||||
|
@ -139,91 +123,16 @@ def json_dumps_pretty(obj, **kwargs):
|
|||
return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
|
||||
|
||||
|
||||
class DeferredLockContextManager:
|
||||
def __init__(self, lock):
|
||||
self._lock = lock
|
||||
|
||||
def __enter__(self):
|
||||
yield self._lock.acquire()
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
yield self._lock.release()
|
||||
def cancel_task(task: typing.Optional[asyncio.Task]):
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def DeferredDict(d, consumeErrors=False):
|
||||
keys = []
|
||||
dl = []
|
||||
response = {}
|
||||
for k, v in d.items():
|
||||
keys.append(k)
|
||||
dl.append(v)
|
||||
results = yield defer.DeferredList(dl, consumeErrors=consumeErrors)
|
||||
for k, (success, result) in zip(keys, results):
|
||||
if success:
|
||||
response[k] = result
|
||||
defer.returnValue(response)
|
||||
def cancel_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]):
|
||||
for task in tasks:
|
||||
cancel_task(task)
|
||||
|
||||
|
||||
class DeferredProfiler:
|
||||
def __init__(self):
|
||||
self.profile_results = {}
|
||||
|
||||
def add_result(self, fn, start_time, finished_time, stack, success):
|
||||
self.profile_results[fn].append((start_time, finished_time, stack, success))
|
||||
|
||||
def show_profile_results(self, fn):
|
||||
profile_results = list(self.profile_results[fn])
|
||||
call_counts = {
|
||||
caller: [(start, finished, finished - start, success)
|
||||
for (start, finished, _caller, success) in profile_results
|
||||
if _caller == caller]
|
||||
for caller in {result[2] for result in profile_results}
|
||||
}
|
||||
|
||||
log.info("called %s %i times from %i sources\n", fn.__name__, len(profile_results), len(call_counts))
|
||||
for caller in sorted(list(call_counts.keys()), key=lambda c: len(call_counts[c]), reverse=True):
|
||||
call_info = call_counts[caller]
|
||||
times = [r[2] for r in call_info]
|
||||
own_time = sum(times)
|
||||
times.sort()
|
||||
longest = 0 if not times else times[-1]
|
||||
shortest = 0 if not times else times[0]
|
||||
log.info(
|
||||
"%i successes and %i failures\nlongest %f, shortest %f, avg %f\ncaller:\n%s",
|
||||
len([r for r in call_info if r[3]]),
|
||||
len([r for r in call_info if not r[3]]),
|
||||
longest, shortest, own_time / float(len(call_info)), caller
|
||||
)
|
||||
|
||||
def profiled_deferred(self, reactor=None):
|
||||
if not reactor:
|
||||
from twisted.internet import reactor
|
||||
|
||||
def _cb(result, fn, start, caller_info):
|
||||
got_error = isinstance(result, (Failure, Exception))
|
||||
self.add_result(fn, start, reactor.seconds(), caller_info, not got_error)
|
||||
if got_error:
|
||||
raise result
|
||||
else:
|
||||
return result
|
||||
|
||||
def _profiled_deferred(fn):
|
||||
reactor.addSystemEventTrigger("after", "shutdown", self.show_profile_results, fn)
|
||||
self.profile_results[fn] = []
|
||||
|
||||
@functools.wraps(fn)
|
||||
def _wrapper(*args, **kwargs):
|
||||
caller_info = "".join(traceback.format_list(traceback.extract_stack()[-3:-1]))
|
||||
start = reactor.seconds()
|
||||
d = defer.maybeDeferred(fn, *args, **kwargs)
|
||||
d.addBoth(_cb, fn, start, caller_info)
|
||||
return d
|
||||
|
||||
return _wrapper
|
||||
|
||||
return _profiled_deferred
|
||||
|
||||
|
||||
_profiler = DeferredProfiler()
|
||||
profile_deferred = _profiler.profiled_deferred
|
||||
def drain_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]):
|
||||
while tasks:
|
||||
cancel_task(tasks.pop())
|
||||
|
|
Loading…
Reference in a new issue