lbry-sdk/lbry/extras/daemon/components.py

761 lines
30 KiB
Python
Raw Normal View History

import math
import os
2018-10-16 17:03:56 +02:00
import asyncio
import logging
2018-08-03 15:36:03 +02:00
import binascii
2019-01-22 23:44:17 +01:00
import typing
2021-10-18 09:59:55 +02:00
2019-01-23 16:41:34 +01:00
import base58
2018-11-09 20:02:03 +01:00
from aioupnp import __version__ as aioupnp_version
2018-10-17 16:57:10 +02:00
from aioupnp.upnp import UPnP
from aioupnp.fault import UPnPError
2018-11-09 20:02:03 +01:00
2019-06-21 02:55:47 +02:00
from lbry import utils
from lbry.dht.node import Node
2019-11-29 21:28:41 +01:00
from lbry.dht.peer import is_valid_public_ipv4
2019-06-21 02:55:47 +02:00
from lbry.dht.blob_announcer import BlobAnnouncer
from lbry.blob.blob_manager import BlobManager
2021-08-06 16:44:57 +02:00
from lbry.blob.disk_space_manager import DiskSpaceManager
2019-06-21 02:55:47 +02:00
from lbry.blob_exchange.server import BlobServer
from lbry.stream.background_downloader import BackgroundDownloader
2019-06-21 02:55:47 +02:00
from lbry.stream.stream_manager import StreamManager
2020-01-27 06:10:55 +01:00
from lbry.file.file_manager import FileManager
from lbry.extras.daemon.component import Component
2019-06-21 02:55:47 +02:00
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.extras.daemon.storage import SQLiteStorage
2020-02-05 16:29:26 +01:00
from lbry.torrent.torrent_manager import TorrentManager
2020-01-03 04:18:49 +01:00
from lbry.wallet import WalletManager
from lbry.wallet.usage_payment import WalletServerPayer
from lbry.torrent.tracker import TrackerClient
2020-01-24 14:58:01 +01:00
# The torrent session module is optional: when it cannot be imported the name
# is bound to None, and the components below test it before using torrents.
try:
    from lbry.torrent.session import TorrentSession
except ImportError:
    TorrentSession = None

log = logging.getLogger(__name__)

# settings must be initialized before this file is imported

# Names under which the components in this module register themselves
# (``component_name``) and refer to one another (``depends_on``).

# storage and wallet
DATABASE_COMPONENT = "database"
BLOB_COMPONENT = "blob_manager"
WALLET_COMPONENT = "wallet"
WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments"

# DHT, files and disk usage
DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
FILE_MANAGER_COMPONENT = "file_manager"
DISK_SPACE_COMPONENT = "disk_space"
BACKGROUND_DOWNLOADER_COMPONENT = "background_downloader"

# networking and auxiliary services
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
TRACKER_ANNOUNCER_COMPONENT = "tracker_announcer_component"
LIBTORRENT_COMPONENT = "libtorrent_component"
2018-07-24 18:26:29 +02:00
class DatabaseComponent(Component):
    """Opens the SQLite storage, migrating the database first when it is behind."""

    component_name = DATABASE_COMPONENT

    def __init__(self, component_manager):
        super().__init__(component_manager)
        # Assigned by start() and reset to None by stop().
        self.storage = None

    @property
    def component(self):
        """The storage object created by start(), or None."""
        return self.storage

    @staticmethod
    def get_current_db_revision():
        """Return the database revision this code expects."""
        return 15

    @property
    def revision_filename(self):
        """Path of the ``db_revision`` file inside the configured data directory."""
        return os.path.join(self.conf.data_dir, 'db_revision')

    def _write_db_revision_file(self, version_num):
        """Replace the contents of the revision file with ``version_num`` as text."""
        with open(self.revision_filename, mode='w') as revision_file:
            revision_file.write(str(version_num))

    async def start(self):
        """Check the revision recorded on disk, migrate if needed, then open the storage.

        Raises:
            Exception: when the revision on disk is newer than the expected one.
        """
        log.info("Loading databases")

        revision_path = self.revision_filename
        if not os.path.exists(revision_path):
            log.info("db_revision file not found. Creating it")
            self._write_db_revision_file(self.get_current_db_revision())

        # read the revision that is recorded on disk
        with open(revision_path, "r") as revision_file:
            stored_revision = int(revision_file.read().strip())

        expected_revision = self.get_current_db_revision()
        if stored_revision > expected_revision:
            template = ('This version of lbrynet is not compatible with the database\n'
                        'Your database is revision %i, expected %i')
            raise Exception(template % (stored_revision, expected_revision))
        if stored_revision < expected_revision:
            from lbry.extras.daemon.migrator import dbmigrator  # pylint: disable=import-outside-toplevel
            log.info("Upgrading your databases (revision %i to %i)", stored_revision, expected_revision)
            # the migration is blocking, so it is run in the default executor
            event_loop = asyncio.get_event_loop()
            await event_loop.run_in_executor(
                None, dbmigrator.migrate_db, self.conf, stored_revision, expected_revision
            )
            self._write_db_revision_file(expected_revision)
            log.info("Finished upgrading the databases.")

        database_path = os.path.join(self.conf.data_dir, "lbrynet.sqlite")
        self.storage = SQLiteStorage(self.conf, database_path)
        await self.storage.open()

    async def stop(self):
        """Close the storage and drop the reference to it."""
        await self.storage.close()
        self.storage = None
class WalletComponent(Component):
    """Starts the wallet manager and reports its connection and header sync status."""

    component_name = WALLET_COMPONENT
    depends_on = [DATABASE_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        # Assigned by start() and reset to None by stop().
        self.wallet_manager = None

    @property
    def component(self):
        """The wallet manager created by start(), or None."""
        return self.wallet_manager

    async def get_status(self):
        """Return a status dict, or None while there is no wallet manager.

        The sync related keys are only added when the network reports a
        remote height.
        """
        manager = self.wallet_manager
        if manager is None:
            return None
        ledger = manager.ledger
        network = ledger.network

        is_connected = network.is_connected
        connected = None
        sessions = []
        if is_connected:
            client = network.client
            addr, port = client.server
            connected = f"{addr}:{port}"
            sessions.append(client)

        features = network.server_features
        servers = []
        for session in sessions:
            servers.append({
                'host': session.server[0],
                'port': session.server[1],
                'latency': session.connection_latency,
                'availability': session.available,
            })
        result = {
            'connected': connected,
            'connected_features': features,
            'servers': servers,
            'known_servers': len(network.known_hubs),
            'available_servers': int(bool(is_connected)),
        }

        if network.remote_height:
            local_height = ledger.local_height_including_downloaded_height
            disk_height = len(ledger.headers)
            remote_height = network.remote_height
            downloaded = local_height - disk_height
            target = remote_height - disk_height
            progress = 100
            if target > 0:
                # percentage of the missing headers fetched so far, clamped to 0..100
                percent = math.ceil(float(downloaded) / float(target) * 100)
                progress = min(max(percent, 0), 100)
            best_hash = await manager.get_best_blockhash()
            result['headers_synchronization_progress'] = progress
            result['blocks'] = max(local_height, 0)
            result['blocks_behind'] = max(remote_height - local_height, 0)
            result['best_blockhash'] = best_hash

        return result

    async def start(self):
        """Build the wallet manager from the daemon configuration and start it."""
        log.info("Starting wallet")
        self.wallet_manager = await WalletManager.from_lbrynet_config(self.conf)
        await self.wallet_manager.start()

    async def stop(self):
        """Stop the wallet manager and drop the reference to it."""
        await self.wallet_manager.stop()
        self.wallet_manager = None
2018-07-24 18:26:29 +02:00
class WalletServerPaymentsComponent(Component):
    """Wraps a ``WalletServerPayer`` that is started with the wallet's ledger and default wallet."""

    component_name = WALLET_SERVER_PAYMENTS_COMPONENT
    depends_on = [WALLET_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        max_fee = self.conf.max_wallet_server_fee
        analytics = self.component_manager.analytics_manager
        self.usage_payment_service = WalletServerPayer(max_fee=max_fee, analytics_manager=analytics)

    @property
    def component(self) -> typing.Optional[WalletServerPayer]:
        """The payer service created in ``__init__``."""
        return self.usage_payment_service

    async def start(self):
        """Start the payer with the ledger and default wallet of the wallet component."""
        wallet = self.component_manager.get_component(WALLET_COMPONENT)
        await self.usage_payment_service.start(wallet.ledger, wallet.default_wallet)

    async def stop(self):
        """Stop the payer service."""
        await self.usage_payment_service.stop()

    async def get_status(self):
        """Return the payer's ``max_fee`` and ``running`` values."""
        service = self.usage_payment_service
        return {
            'max_fee': service.max_fee,
            'running': service.running,
        }
class BlobComponent(Component):
    """Creates the blob directory and the ``BlobManager`` working on it."""

    component_name = BLOB_COMPONENT
    depends_on = [DATABASE_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        # Assigned by start(); stays None until then.
        self.blob_manager: typing.Optional[BlobManager] = None

    @property
    def component(self) -> typing.Optional[BlobManager]:
        """The blob manager created by start(), or None."""
        return self.blob_manager

    async def start(self):
        """Create ``<data_dir>/blobfiles`` if needed and set up the blob manager.

        The DHT node's data store is handed to the blob manager when the DHT
        component is not skipped and already provides a node.

        Returns:
            Whatever ``BlobManager.setup()`` returns.
        """
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
        data_store = None
        if DHT_COMPONENT not in self.component_manager.skip_components:
            dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
            if dht_node:
                data_store = dht_node.protocol.data_store
        blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
        # exist_ok avoids the check-then-create race of isdir() followed by mkdir()
        os.makedirs(blob_dir, exist_ok=True)
        self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
        return await self.blob_manager.setup()

    async def stop(self):
        """Stop the blob manager; does nothing when start() never created one."""
        if self.blob_manager is not None:
            self.blob_manager.stop()

    async def get_status(self):
        """Return the number of completed blobs and the connection manager status."""
        if not self.blob_manager:
            return {'finished_blobs': 0, 'connections': {}}
        return {
            'finished_blobs': len(self.blob_manager.completed_blob_hashes),
            'connections': self.blob_manager.connection_manager.status,
        }
2018-08-02 23:33:56 +02:00
2018-07-24 18:26:29 +02:00
class DHTComponent(Component):
    """Creates and starts the DHT ``Node`` using ports and external IP from the UPnP component."""

    component_name = DHT_COMPONENT
    depends_on = [UPNP_COMPONENT, DATABASE_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.dht_node: typing.Optional[Node] = None
        self.external_udp_port = None
        self.external_peer_port = None

    @property
    def component(self) -> typing.Optional[Node]:
        """The node created by start(), or None."""
        return self.dht_node

    async def get_status(self):
        """Return the hex node id and routing table size (None / 0 without a node)."""
        node = self.dht_node
        if not node:
            return {'node_id': None, 'peers_in_routing_table': 0}
        return {
            'node_id': binascii.hexlify(node.protocol.node_id),
            'peers_in_routing_table': len(node.protocol.routing_table.get_peers()),
        }

    def get_node_id(self):
        """Return the node id kept in ``<data_dir>/node_id``, generating and saving one if absent.

        The file holds the id base58 encoded.
        """
        id_path = os.path.join(self.conf.data_dir, "node_id")
        if not os.path.isfile(id_path):
            fresh_id = utils.generate_id()
            with open(id_path, "w") as id_file:
                id_file.write(base58.b58encode(fresh_id).decode())
            return fresh_id
        with open(id_path, "r") as id_file:
            encoded_id = str(id_file.read()).strip()
        return base58.b58decode(encoded_id)

    async def start(self):
        """Resolve external ports and IP, then create and start the node."""
        log.info("start the dht")
        conf = self.conf
        manager = self.component_manager
        upnp = manager.get_component(UPNP_COMPONENT)
        # use the mapped external ports when there are redirects, the configured ports otherwise
        self.external_peer_port = upnp.upnp_redirects.get("TCP", conf.tcp_port)
        self.external_udp_port = upnp.upnp_redirects.get("UDP", conf.udp_port)
        external_ip = upnp.external_ip
        storage = manager.get_component(DATABASE_COMPONENT)
        if not external_ip:
            external_ip, _ = await utils.get_external_ip(conf.lbryum_servers)
            if not external_ip:
                log.warning("failed to get external ip")

        self.dht_node = Node(
            manager.loop,
            manager.peer_manager,
            node_id=self.get_node_id(),
            internal_udp_port=conf.udp_port,
            udp_port=self.external_udp_port,
            external_ip=external_ip,
            peer_port=self.external_peer_port,
            rpc_timeout=conf.node_rpc_timeout,
            split_buckets_under_index=conf.split_buckets_under_index,
            is_bootstrap_node=conf.is_bootstrap_node,
            storage=storage
        )
        self.dht_node.start(conf.network_interface, conf.known_dht_nodes)
        log.info("Started the dht")

    async def stop(self):
        """Stop the node."""
        self.dht_node.stop()
2018-07-24 18:26:29 +02:00
class HashAnnouncerComponent(Component):
    """Runs a ``BlobAnnouncer`` on top of the DHT node and the storage."""

    component_name = HASH_ANNOUNCER_COMPONENT
    depends_on = [DHT_COMPONENT, DATABASE_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.hash_announcer: typing.Optional[BlobAnnouncer] = None

    @property
    def component(self) -> typing.Optional[BlobAnnouncer]:
        """The announcer created by start(), or None."""
        return self.hash_announcer

    async def start(self):
        """Create the announcer and start it with the configured concurrency."""
        manager = self.component_manager
        storage = manager.get_component(DATABASE_COMPONENT)
        dht_node = manager.get_component(DHT_COMPONENT)
        announcer = BlobAnnouncer(manager.loop, dht_node, storage)
        self.hash_announcer = announcer
        announcer.start(self.conf.concurrent_blob_announcers)
        log.info("Started blob announcer")

    async def stop(self):
        """Stop the announcer."""
        self.hash_announcer.stop()
        log.info("Stopped blob announcer")

    async def get_status(self):
        """Return the length of the announce queue (0 without an announcer)."""
        announcer = self.hash_announcer
        queue_size = len(announcer.announce_queue) if announcer else 0
        return {'announce_queue_size': queue_size}
2018-07-24 18:26:29 +02:00
class FileManagerComponent(Component):
    """Builds the ``FileManager`` and registers its stream (and optional torrent) source managers."""

    component_name = FILE_MANAGER_COMPONENT
    depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.file_manager: typing.Optional[FileManager] = None

    @property
    def component(self) -> typing.Optional[FileManager]:
        """The file manager created by start(), or None."""
        return self.file_manager

    async def get_status(self):
        """Return the number of managed files, or None without a file manager."""
        if not self.file_manager:
            return None
        managed = self.file_manager.get_filtered()
        return {
            'managed_files': len(managed),
        }

    async def start(self):
        """Collect the needed components, build the file manager and start it."""
        manager = self.component_manager
        blob_manager = manager.get_component(BLOB_COMPONENT)
        storage = manager.get_component(DATABASE_COMPONENT)
        wallet = manager.get_component(WALLET_COMPONENT)

        # the DHT node is optional
        node = None
        if manager.has_component(DHT_COMPONENT):
            node = manager.get_component(DHT_COMPONENT)

        # the torrent session is optional as well; a NameError from the lookup is tolerated
        torrent = None
        if TorrentSession:
            try:
                torrent = manager.get_component(LIBTORRENT_COMPONENT)
            except NameError:
                torrent = None

        log.info('Starting the file manager')
        loop = asyncio.get_event_loop()
        analytics = manager.analytics_manager
        self.file_manager = FileManager(loop, self.conf, wallet, storage, analytics)
        sources = self.file_manager.source_managers
        sources['stream'] = StreamManager(loop, self.conf, blob_manager, wallet, storage, node)
        if TorrentSession and LIBTORRENT_COMPONENT not in self.conf.components_to_skip:
            sources['torrent'] = TorrentManager(
                loop, self.conf, torrent, storage, manager.analytics_manager
            )
        await self.file_manager.start()
        log.info('Done setting up file manager')

    async def stop(self):
        """Stop the file manager."""
        self.file_manager.stop()
2018-07-24 18:26:29 +02:00
class BackgroundDownloaderComponent(Component):
    """Periodically downloads blobs known to the DHT node whose hash prefix is close to the node id."""

    # minimum number of prefix bits a blob hash must share with the node id
    MIN_PREFIX_COLLIDING_BITS = 8
    component_name = BACKGROUND_DOWNLOADER_COMPONENT
    depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.background_task: typing.Optional[asyncio.Task] = None
        self.download_loop_delay_seconds = 60
        self.ongoing_download: typing.Optional[asyncio.Task] = None
        self.space_manager: typing.Optional[DiskSpaceManager] = None
        self.blob_manager: typing.Optional[BlobManager] = None
        self.background_downloader: typing.Optional[BackgroundDownloader] = None
        self.dht_node: typing.Optional[Node] = None
        self.space_available: typing.Optional[int] = None

    @property
    def is_busy(self):
        """True while a download task exists and has not finished."""
        download = self.ongoing_download
        return bool(download and not download.done())

    @property
    def component(self) -> 'BackgroundDownloaderComponent':
        return self

    async def get_status(self):
        """Report whether the loop runs, the last free space reading and the download state."""
        loop_task = self.background_task
        return {
            'running': loop_task is not None and not loop_task.done(),
            'available_free_space_mb': self.space_available,
            'ongoing_download': self.is_busy,
        }

    async def download_blobs_in_background(self):
        """Forever: refresh the free space reading, maybe start one download, then sleep."""
        while True:
            self.space_available = await self.space_manager.get_free_space_mb(True)
            may_download = not self.is_busy and self.space_available > 10
            if may_download:
                self._download_next_close_blob_hash()
            await asyncio.sleep(self.download_loop_delay_seconds)

    def _download_next_close_blob_hash(self):
        """Start a download task for the first stored blob hash that is not completed and close enough."""
        own_id = self.dht_node.protocol.node_id
        for candidate in self.dht_node.stored_blob_hashes:
            already_have = candidate.hex() in self.blob_manager.completed_blob_hashes
            if already_have:
                continue
            shared_bits = utils.get_colliding_prefix_bits(own_id, candidate)
            if shared_bits >= self.MIN_PREFIX_COLLIDING_BITS:
                download = self.background_downloader.download_blobs(candidate.hex())
                self.ongoing_download = asyncio.create_task(download)
                return

    async def start(self):
        """Wire up the collaborators and launch the loop; without a DHT component only the space manager is set."""
        manager = self.component_manager
        self.space_manager = manager.get_component(DISK_SPACE_COMPONENT)
        if not manager.has_component(DHT_COMPONENT):
            return
        self.dht_node = manager.get_component(DHT_COMPONENT)
        self.blob_manager = manager.get_component(BLOB_COMPONENT)
        storage = manager.get_component(DATABASE_COMPONENT)
        self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node)
        self.background_task = asyncio.create_task(self.download_blobs_in_background())

    async def stop(self):
        """Cancel the running download, if any, and the background loop."""
        if self.is_busy:
            self.ongoing_download.cancel()
        if self.background_task:
            self.background_task.cancel()
2021-08-06 16:44:57 +02:00
class DiskSpaceComponent(Component):
    """Runs the ``DiskSpaceManager`` and exposes its usage figures."""

    component_name = DISK_SPACE_COMPONENT
    depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.disk_space_manager: typing.Optional[DiskSpaceManager] = None

    @property
    def component(self) -> typing.Optional[DiskSpaceManager]:
        """The disk space manager created by start(), or None."""
        return self.disk_space_manager

    async def get_status(self):
        """Return the cached space usage per category, or a placeholder dict without a manager."""
        if not self.disk_space_manager:
            return {'space_used': '0', 'network_seeding_space_used': '0', 'running': False}
        usage = await self.disk_space_manager.get_space_used_mb(cached=True)
        status = {
            'total_used_mb': usage['total'],
            'published_blobs_storage_used_mb': usage['private_storage'],
            'content_blobs_storage_used_mb': usage['content_storage'],
            'seed_blobs_storage_used_mb': usage['network_storage'],
        }
        status['running'] = self.disk_space_manager.running
        return status

    async def start(self):
        """Create the disk space manager from storage and blob manager, then start it."""
        manager = self.component_manager
        storage = manager.get_component(DATABASE_COMPONENT)
        blobs = manager.get_component(BLOB_COMPONENT)
        self.disk_space_manager = DiskSpaceManager(
            self.conf, storage, blobs, analytics=manager.analytics_manager
        )
        await self.disk_space_manager.start()

    async def stop(self):
        """Stop the disk space manager."""
        await self.disk_space_manager.stop()
2021-08-06 16:44:57 +02:00
2020-01-24 14:58:01 +01:00
class TorrentComponent(Component):
    """Holds a ``TorrentSession`` when the optional torrent session class is available."""

    component_name = LIBTORRENT_COMPONENT

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.torrent_session = None

    @property
    def component(self) -> typing.Optional[TorrentSession]:
        """The session created by start(), or None."""
        return self.torrent_session

    async def get_status(self):
        """Return ``{'running': True}`` once a session exists, None otherwise."""
        if self.torrent_session:
            return {
                'running': True,  # TODO: what to return here?
            }
        return None

    async def start(self):
        """Create and bind a session; does nothing when TorrentSession is unavailable."""
        if not TorrentSession:
            return
        self.torrent_session = TorrentSession(asyncio.get_event_loop(), None)
        await self.torrent_session.bind()  # TODO: specify host/port

    async def stop(self):
        """Pause the session if one was created."""
        session = self.torrent_session
        if session:
            await session.pause()
2018-07-24 18:26:29 +02:00
class PeerProtocolServerComponent(Component):
    """Serves blobs to peers through a ``BlobServer`` listening on the configured TCP port."""

    component_name = PEER_PROTOCOL_SERVER_COMPONENT
    depends_on = [UPNP_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.blob_server: typing.Optional[BlobServer] = None

    @property
    def component(self) -> typing.Optional[BlobServer]:
        """The blob server created by start(), or None."""
        return self.blob_server

    async def start(self):
        """Create the blob server with an unused wallet address and wait until it listens."""
        log.info("start blob server")
        manager = self.component_manager
        blob_manager: BlobManager = manager.get_component(BLOB_COMPONENT)
        wallet: WalletManager = manager.get_component(WALLET_COMPONENT)
        listen_port = self.conf.tcp_port
        address = await wallet.get_unused_address()
        server = BlobServer(asyncio.get_event_loop(), blob_manager, address)
        self.blob_server = server
        server.start_server(listen_port, interface=self.conf.network_interface)
        await self.blob_server.started_listening.wait()

    async def stop(self):
        """Stop the blob server if one was created."""
        server = self.blob_server
        if server:
            server.stop_server()
2018-07-24 18:26:29 +02:00
class UPnPComponent(Component):
    """Finds a UPnP gateway, tracks the external IP and keeps port redirects alive.

    ``upnp_redirects`` maps a protocol name ("TCP" / "UDP") to the external
    port mapped for it; ``external_ip`` holds the last address that was found.
    """

    component_name = UPNP_COMPONENT

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self._int_peer_port = self.conf.tcp_port
        self._int_dht_node_port = self.conf.udp_port
        self.use_upnp = self.conf.use_upnp
        self.upnp: typing.Optional[UPnP] = None
        self.upnp_redirects = {}
        self.external_ip: typing.Optional[str] = None
        self._maintain_redirects_task = None

    @property
    def component(self) -> 'UPnPComponent':
        return self

    async def _repeatedly_maintain_redirects(self, now=True):
        """Maintain the redirects every 360 seconds, forever.

        Args:
            now: when false, the first maintenance run is skipped and the loop
                begins with the sleep. Later iterations always run.
        """
        while True:
            if now:
                await self._maintain_redirects()
            # no ``loop=`` argument: it was removed from asyncio.sleep in Python 3.10
            await asyncio.sleep(360)
            # ``now`` only controls the first iteration; without this reset a
            # call with now=False would sleep forever and never maintain anything
            now = True

    async def _maintain_redirects(self):
        """Run one maintenance pass: gateway discovery, external ip, port redirects."""
        await self._discover_gateway()
        await self._update_external_ip()
        if not self.upnp:
            return
        if not self.upnp_redirects:
            await self._setup_missing_redirects()
        else:
            await self._refresh_redirects()

    async def _discover_gateway(self):
        """Look for a gateway unless one is already known; failures leave ``upnp`` as None."""
        if self.upnp:
            return
        try:
            self.upnp = await UPnP.discover(loop=self.component_manager.loop)
            log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
        except Exception as err:
            # discovery is best effort, but a cancellation must still propagate
            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
                raise
            log.warning("upnp discovery failed: %s", err)
            self.upnp = None

    async def _update_external_ip(self):
        """Ask the gateway for the external ip, store it and pass it on to the DHT node."""
        external_ip = None
        if self.upnp:
            try:
                external_ip = await self.upnp.get_external_ip()
                if external_ip != "0.0.0.0" and not self.external_ip:
                    log.info("got external ip from UPnP: %s", external_ip)
            except (asyncio.TimeoutError, UPnPError, NotImplementedError):
                pass
            if external_ip and not is_valid_public_ipv4(external_ip):
                log.warning("UPnP returned a private/reserved ip - %s, checking lbry.com fallback", external_ip)
                external_ip, _ = await utils.get_external_ip(self.conf.lbryum_servers)
        if self.external_ip and self.external_ip != external_ip:
            log.info("external ip changed from %s to %s", self.external_ip, external_ip)
        if external_ip:
            self.external_ip = external_ip
            dht_component = self.component_manager.get_component(DHT_COMPONENT)
            if dht_component:
                # The other callers in this module use the result of
                # get_component(DHT_COMPONENT) directly as the Node, so accept
                # both a wrapper exposing ``.component`` and the node itself.
                # NOTE(review): confirm which of the two get_component returns.
                dht_node = getattr(dht_component, 'component', dht_component)
                if dht_node:
                    dht_node.protocol.external_ip = external_ip
        # assert self.external_ip is not None   # TODO: handle going/starting offline

    async def _setup_missing_redirects(self):
        """Create the TCP / UDP mappings for every component that is not skipped."""
        log.info("add UPnP port mappings")
        skipped = self.component_manager.skip_components
        upnp_redirects = {}
        if PEER_PROTOCOL_SERVER_COMPONENT not in skipped:
            try:
                upnp_redirects["TCP"] = await self.upnp.get_next_mapping(
                    self._int_peer_port, "TCP", "LBRY peer port", self._int_peer_port
                )
            except (UPnPError, asyncio.TimeoutError, NotImplementedError):
                pass
        if DHT_COMPONENT not in skipped:
            try:
                upnp_redirects["UDP"] = await self.upnp.get_next_mapping(
                    self._int_dht_node_port, "UDP", "LBRY DHT port", self._int_dht_node_port
                )
            except (UPnPError, asyncio.TimeoutError, NotImplementedError):
                pass
        if upnp_redirects:
            log.info("set up redirects: %s", upnp_redirects)
            self.upnp_redirects.update(upnp_redirects)

    async def _refresh_redirects(self):
        """Check that the known redirects still exist on the gateway and re-create missing ones."""
        skipped = self.component_manager.skip_components
        found = set()
        for mapping in await self.upnp.get_redirects():
            proto = mapping.protocol
            if (proto in self.upnp_redirects
                    and mapping.external_port == self.upnp_redirects[proto]
                    and mapping.lan_address == self.upnp.lan_address):
                found.add(proto)
        if 'UDP' not in found and DHT_COMPONENT not in skipped:
            try:
                udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
                self.upnp_redirects['UDP'] = udp_port
                log.info("refreshed upnp redirect for dht port: %i", udp_port)
            except (asyncio.TimeoutError, UPnPError, NotImplementedError):
                # pop() instead of del: the key is absent when this protocol was never mapped
                self.upnp_redirects.pop('UDP', None)
        if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in skipped:
            try:
                tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
                self.upnp_redirects['TCP'] = tcp_port
                log.info("refreshed upnp redirect for peer port: %i", tcp_port)
            except (asyncio.TimeoutError, UPnPError, NotImplementedError):
                self.upnp_redirects.pop('TCP', None)
        tcp_active = 'TCP' in self.upnp_redirects and PEER_PROTOCOL_SERVER_COMPONENT not in skipped
        udp_active = 'UDP' in self.upnp_redirects and DHT_COMPONENT not in skipped
        if tcp_active and udp_active:
            log.debug("upnp redirects are still active")

    async def start(self):
        """Detect the external ip, set up redirects when UPnP is enabled and schedule maintenance.

        With ``use_upnp`` disabled only the external ip lookup is done.
        """
        log.info("detecting external ip")
        if not self.use_upnp:
            self.external_ip, _ = await utils.get_external_ip(self.conf.lbryum_servers)
            return
        success = False
        await self._maintain_redirects()
        if self.upnp:
            # having no redirects is only a failure if at least one of the
            # components needing a redirect is not skipped
            if not self.upnp_redirects and not all(
                    x in self.component_manager.skip_components
                    for x in (DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)
            ):
                log.error("failed to setup upnp")
            else:
                success = True
                if self.upnp_redirects:
                    log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
        else:
            log.error("failed to setup upnp")
        if not self.external_ip:
            self.external_ip, probed_url = await utils.get_external_ip(self.conf.lbryum_servers)
            if self.external_ip:
                log.info("detected external ip using %s fallback", probed_url)
        if self.component_manager.analytics_manager:
            self.component_manager.loop.create_task(
                self.component_manager.analytics_manager.send_upnp_setup_success_fail(
                    success, await self.get_status()
                )
            )
        # the redirects were just maintained above, so the loop starts with its sleep
        self._maintain_redirects_task = self.component_manager.loop.create_task(
            self._repeatedly_maintain_redirects(now=False)
        )

    async def stop(self):
        """Cancel the maintenance loop and remove the port mappings from the gateway."""
        # cancel first so the loop cannot re-create mappings while they are being removed
        if self._maintain_redirects_task and not self._maintain_redirects_task.done():
            self._maintain_redirects_task.cancel()
        if self.upnp_redirects and self.upnp:
            log.info("Removing upnp redirects: %s", self.upnp_redirects)
            # gather() replaces asyncio.wait(coroutines, loop=...): the loop
            # argument was removed in Python 3.10 and bare coroutines are
            # rejected by wait() since 3.11. return_exceptions keeps a failed
            # removal from raising out of stop(), as wait() did.
            await asyncio.gather(
                *(self.upnp.delete_port_mapping(port, protocol)
                  for protocol, port in self.upnp_redirects.items()),
                return_exceptions=True
            )

    async def get_status(self):
        """Return the aioupnp version, redirects, gateway description and external ip."""
        return {
            'aioupnp_version': aioupnp_version,
            'redirects': self.upnp_redirects,
            'gateway': 'No gateway found' if not self.upnp else self.upnp.gateway.manufacturer_string,
            'dht_redirect_set': 'UDP' in self.upnp_redirects,
            'peer_redirect_set': 'TCP' in self.upnp_redirects,
            'external_ip': self.external_ip
        }
2018-07-24 18:26:29 +02:00
class ExchangeRateManagerComponent(Component):
    """Owns an ``ExchangeRateManager`` and starts / stops it with the daemon."""

    component_name = EXCHANGE_RATE_MANAGER_COMPONENT

    def __init__(self, component_manager):
        super().__init__(component_manager)
        # created eagerly, so ``component`` is available before start()
        self.exchange_rate_manager = ExchangeRateManager()

    @property
    def component(self) -> ExchangeRateManager:
        """The exchange rate manager created in ``__init__``."""
        return self.exchange_rate_manager

    async def start(self):
        """Start the exchange rate manager."""
        manager = self.exchange_rate_manager
        manager.start()

    async def stop(self):
        """Stop the exchange rate manager."""
        manager = self.exchange_rate_manager
        manager.stop()
2022-03-05 08:36:26 +01:00
class TrackerAnnouncerComponent(Component):
    """Announces the sd hashes of files that have a downloader to the configured trackers."""

    component_name = TRACKER_ANNOUNCER_COMPONENT
    depends_on = [FILE_MANAGER_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        self.file_manager = None
        self.announce_task = None
        self.tracker_client: typing.Optional[TrackerClient] = None

    @property
    def component(self):
        """The tracker client created by start(), or None."""
        return self.tracker_client

    @property
    def running(self):
        """True while the component is flagged as running and the announce task is alive."""
        # ``_running`` is not set in this class; presumably it comes from Component
        return bool(self._running and self.announce_task and not self.announce_task.done())

    async def announce_forever(self):
        """Every 60 seconds, announce the sd hash of each managed file that has a downloader."""
        sleep_seconds = 60.0
        while True:
            announce_sd_hashes = [
                bytes.fromhex(file.sd_hash)
                for file in self.file_manager.get_filtered()
                if file.downloader
            ]
            await self.tracker_client.announce_many(*announce_sd_hashes)
            await asyncio.sleep(sleep_seconds)

    async def start(self):
        """Create and start the tracker client, then launch the announce loop.

        The DHT node id is handed to the client when a DHT component provides
        a node; otherwise None is passed.
        """
        node = self.component_manager.get_component(DHT_COMPONENT) \
            if self.component_manager.has_component(DHT_COMPONENT) else None
        node_id = node.protocol.node_id if node else None
        # the server list is passed as a callable, so it is read from the config on each call
        self.tracker_client = TrackerClient(node_id, self.conf.tcp_port, lambda: self.conf.tracker_servers)
        await self.tracker_client.start()
        self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
        self.announce_task = asyncio.create_task(self.announce_forever())

    async def stop(self):
        """Cancel the announce loop and stop the tracker client, if they were created."""
        self.file_manager = None
        if self.announce_task and not self.announce_task.done():
            self.announce_task.cancel()
        self.announce_task = None
        # start() may never have created the client (e.g. it failed early)
        if self.tracker_client is not None:
            self.tracker_client.stop()