lbry-sdk/lbrynet/extras/daemon/Components.py

656 lines
26 KiB
Python
Raw Normal View History

import os
2018-10-16 17:03:56 +02:00
import asyncio
2018-12-15 21:31:02 +01:00
import aiohttp
import logging
import math
2018-08-03 15:36:03 +02:00
import binascii
2019-01-22 23:44:17 +01:00
import typing
import socket
from hashlib import sha256
from types import SimpleNamespace
2019-01-23 16:41:34 +01:00
import base58
2018-11-09 20:02:03 +01:00
from aioupnp import __version__ as aioupnp_version
2018-10-17 16:57:10 +02:00
from aioupnp.upnp import UPnP
from aioupnp.fault import UPnPError
2018-11-09 20:02:03 +01:00
import lbrynet.schema
2019-01-23 16:41:34 +01:00
from lbrynet import utils
2019-01-21 21:55:50 +01:00
from lbrynet.conf import HEADERS_FILE_SHA256_CHECKSUM
2018-11-09 20:02:03 +01:00
from lbrynet.dht.node import Node
2019-01-22 23:44:17 +01:00
from lbrynet.dht.peer import KademliaPeer
from lbrynet.dht.blob_announcer import BlobAnnouncer
from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.blob_exchange.server import BlobServer
from lbrynet.stream.stream_manager import StreamManager
2018-11-09 20:02:03 +01:00
from lbrynet.extras.daemon.Component import Component
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
2018-11-09 20:02:03 +01:00
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.wallet import Network
2018-11-07 21:15:05 +01:00
log = logging.getLogger(__name__)
# settings must be initialized before this file is imported
DATABASE_COMPONENT = "database"
BLOB_COMPONENT = "blob_manager"
HEADERS_COMPONENT = "blockchain_headers"
2018-07-24 18:26:29 +02:00
WALLET_COMPONENT = "wallet"
DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
2019-01-22 23:44:17 +01:00
STREAM_MANAGER_COMPONENT = "stream_manager"
2018-07-24 18:26:29 +02:00
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
2018-12-15 21:31:02 +01:00
async def gather_dict(tasks: dict):
async def wait_value(key, value):
return key, await value
return dict(await asyncio.gather(*(
wait_value(*kv) for kv in tasks.items()
)))
2018-10-17 16:57:10 +02:00
2018-12-15 21:31:02 +01:00
async def get_external_ip(): # used if upnp is disabled or non-functioning
try:
2018-12-15 21:31:02 +01:00
async with aiohttp.ClientSession() as session:
async with session.get("https://api.lbry.io/ip") as resp:
response = await resp.json()
if response['success']:
return response['data']['ip']
except Exception as e:
pass
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
async def resolve_host(loop: asyncio.BaseEventLoop, url: str):
info = await loop.getaddrinfo(
url, 'https',
proto=socket.IPPROTO_TCP,
)
return info[0][4][0]
2018-07-24 18:26:29 +02:00
class DatabaseComponent(Component):
component_name = DATABASE_COMPONENT
def __init__(self, component_manager):
super().__init__(component_manager)
2018-07-24 18:26:29 +02:00
self.storage = None
@property
def component(self):
return self.storage
@staticmethod
def get_current_db_revision():
2018-07-24 18:26:29 +02:00
return 9
2019-01-21 21:55:50 +01:00
@property
def revision_filename(self):
2019-01-23 16:41:34 +01:00
return os.path.join(self.conf.data_dir, 'db_revision')
2019-01-21 21:55:50 +01:00
def _write_db_revision_file(self, version_num):
with open(self.revision_filename, mode='w') as db_revision:
db_revision.write(str(version_num))
2018-12-15 21:31:02 +01:00
async def start(self):
# check directories exist, create them if they don't
log.info("Loading databases")
2018-07-24 18:26:29 +02:00
2019-01-21 21:55:50 +01:00
if not os.path.exists(self.revision_filename):
log.warning("db_revision file not found. Creating it")
2018-07-24 18:26:29 +02:00
self._write_db_revision_file(self.get_current_db_revision())
# check the db migration and run any needed migrations
2019-01-21 21:55:50 +01:00
with open(self.revision_filename, "r") as revision_read_handle:
old_revision = int(revision_read_handle.read().strip())
2018-07-24 18:26:29 +02:00
if old_revision > self.get_current_db_revision():
raise Exception('This version of lbrynet is not compatible with the database\n'
'Your database is revision %i, expected %i' %
2018-07-24 18:26:29 +02:00
(old_revision, self.get_current_db_revision()))
if old_revision < self.get_current_db_revision():
from lbrynet.extras.daemon.migrator import dbmigrator
2018-07-24 18:26:29 +02:00
log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision())
2018-12-15 21:31:02 +01:00
await asyncio.get_event_loop().run_in_executor(
2019-01-21 21:55:50 +01:00
None, dbmigrator.migrate_db, self.conf.data_dir, old_revision, self.get_current_db_revision()
)
2018-07-24 18:26:29 +02:00
self._write_db_revision_file(self.get_current_db_revision())
log.info("Finished upgrading the databases.")
2018-12-15 21:31:02 +01:00
self.storage = SQLiteStorage(
2019-01-21 21:55:50 +01:00
self.conf, os.path.join(self.conf.data_dir, "lbrynet.sqlite")
2018-12-15 21:31:02 +01:00
)
await self.storage.open()
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def stop(self):
await self.storage.close()
2018-07-24 18:26:29 +02:00
self.storage = None
HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
HEADER_SIZE = 112
class HeadersComponent(Component):
component_name = HEADERS_COMPONENT
2018-07-24 18:26:29 +02:00
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-21 21:55:50 +01:00
self.headers_dir = os.path.join(self.conf.wallet_dir, 'lbc_mainnet')
self.headers_file = os.path.join(self.headers_dir, 'headers')
2019-01-21 21:55:50 +01:00
self.old_file = os.path.join(self.conf.wallet_dir, 'blockchain_headers')
self._downloading_headers = None
self._headers_progress_percent = 0
2018-07-24 18:26:29 +02:00
@property
def component(self):
return self
async def get_status(self):
2018-08-03 15:36:03 +02:00
return {} if not self._downloading_headers else {
'downloading_headers': self._downloading_headers,
'download_progress': self._headers_progress_percent
}
2019-01-22 23:44:17 +01:00
async def fetch_headers_from_s3(self):
def collector(d, h_file):
h_file.write(d)
local_size = float(h_file.tell())
final_size = float(final_size_after_download)
self._headers_progress_percent = math.ceil(local_size / final_size * 100)
local_header_size = self.local_header_file_size()
resume_header = {"Range": f"bytes={local_header_size}-"}
2019-01-22 23:44:17 +01:00
async with aiohttp.request('get', HEADERS_URL, headers=resume_header) as response:
got_406 = response.status == 406 # our file is bigger
final_size_after_download = response.content_length + local_header_size
if got_406:
log.warning("s3 is more out of date than we are")
# should have something to download and a final length divisible by the header size
elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
s3_height = (final_size_after_download / HEADER_SIZE) - 1
local_height = self.local_header_file_height()
if s3_height > local_height:
data = await response.read()
if local_header_size:
log.info("Resuming download of %i bytes from s3", response.content_length)
with open(self.headers_file, "a+b") as headers_file:
collector(data, headers_file)
else:
with open(self.headers_file, "wb") as headers_file:
collector(data, headers_file)
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.",
s3_height)
self._check_header_file_integrity()
else:
2019-01-22 23:44:17 +01:00
log.warning("s3 is more out of date than we are")
else:
2019-01-22 23:44:17 +01:00
log.error("invalid size for headers from s3")
def local_header_file_height(self):
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
def local_header_file_size(self):
if os.path.isfile(self.headers_file):
return os.stat(self.headers_file).st_size
return 0
2018-10-16 17:57:43 +02:00
async def get_remote_height(self):
ledger = SimpleNamespace()
ledger.config = {
2019-01-21 21:55:50 +01:00
'default_servers': self.conf.lbryum_servers,
'data_path': self.conf.wallet_dir
}
net = Network(ledger)
2018-10-16 17:57:43 +02:00
first_connection = net.on_connected.first
asyncio.ensure_future(net.start())
await first_connection
remote_height = await net.get_server_height()
await net.stop()
return remote_height
async def should_download_headers_from_s3(self):
2019-01-21 21:55:50 +01:00
if self.conf.blockchain_name != "lbrycrd_main":
2018-10-16 17:57:43 +02:00
return False
self._check_header_file_integrity()
2019-01-21 21:55:50 +01:00
s3_headers_depth = self.conf.s3_headers_depth
if not s3_headers_depth:
2018-10-16 17:57:43 +02:00
return False
local_height = self.local_header_file_height()
2018-10-16 17:57:43 +02:00
remote_height = await self.get_remote_height()
log.info("remote height: %i, local height: %i", remote_height, local_height)
if remote_height > (local_height + s3_headers_depth):
2018-10-16 17:57:43 +02:00
return True
return False
def _check_header_file_integrity(self):
# TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity
2019-01-21 21:55:50 +01:00
if self.conf.blockchain_name != "lbrycrd_main":
return
hashsum = sha256()
2019-01-21 21:55:50 +01:00
checksum_height, checksum = HEADERS_FILE_SHA256_CHECKSUM
checksum_length_in_bytes = checksum_height * HEADER_SIZE
if self.local_header_file_size() < checksum_length_in_bytes:
return
with open(self.headers_file, "rb") as headers_file:
hashsum.update(headers_file.read(checksum_length_in_bytes))
current_checksum = hashsum.hexdigest()
if current_checksum != checksum:
msg = f"Expected checksum {checksum}, got {current_checksum}"
log.warning("Wallet file corrupted, checksum mismatch. " + msg)
log.warning("Deleting header file so it can be downloaded again.")
os.unlink(self.headers_file)
elif (self.local_header_file_size() % HEADER_SIZE) != 0:
log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.")
with open(self.headers_file, "rb+") as headers_file:
headers_file.truncate(checksum_length_in_bytes)
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def start(self):
if not os.path.exists(self.headers_dir):
os.mkdir(self.headers_dir)
if os.path.exists(self.old_file):
2018-08-05 02:35:04 +02:00
log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
os.rename(self.old_file, self.headers_file)
2018-12-15 21:31:02 +01:00
self._downloading_headers = await self.should_download_headers_from_s3()
if self._downloading_headers:
try:
2019-01-22 23:44:17 +01:00
await self.fetch_headers_from_s3()
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
finally:
self._downloading_headers = False
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def stop(self):
pass
2018-10-16 17:03:56 +02:00
class WalletComponent(Component):
component_name = WALLET_COMPONENT
depends_on = [DATABASE_COMPONENT, HEADERS_COMPONENT]
2018-07-24 18:26:29 +02:00
def __init__(self, component_manager):
super().__init__(component_manager)
self.wallet_manager = None
2018-07-24 18:26:29 +02:00
@property
def component(self):
return self.wallet_manager
async def get_status(self):
2018-11-09 16:44:53 +01:00
if self.wallet_manager and self.running:
local_height = self.wallet_manager.network.get_local_height()
remote_height = self.wallet_manager.network.get_server_height()
2018-10-16 22:00:12 +02:00
best_hash = self.wallet_manager.get_best_blockhash()
return {
'blocks': max(local_height, 0),
'blocks_behind': max(remote_height - local_height, 0),
2018-08-03 15:36:03 +02:00
'best_blockhash': best_hash,
'is_encrypted': self.wallet_manager.wallet.use_encryption,
'is_locked': not self.wallet_manager.is_wallet_unlocked,
2018-10-16 22:00:12 +02:00
}
2018-12-15 21:31:02 +01:00
async def start(self):
log.info("Starting torba wallet")
storage = self.component_manager.get_component(DATABASE_COMPONENT)
2019-01-21 21:55:50 +01:00
lbrynet.schema.BLOCKCHAIN_NAME = self.conf.blockchain_name
self.wallet_manager = await LbryWalletManager.from_lbrynet_config(self.conf, storage)
self.wallet_manager.old_db = storage
2018-12-15 21:31:02 +01:00
await self.wallet_manager.start()
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def stop(self):
await self.wallet_manager.stop()
self.wallet_manager = None
2018-07-24 18:26:29 +02:00
class BlobComponent(Component):
component_name = BLOB_COMPONENT
depends_on = [DATABASE_COMPONENT]
2018-07-24 18:26:29 +02:00
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-22 23:44:17 +01:00
self.blob_manager: BlobFileManager = None
2018-07-24 18:26:29 +02:00
@property
2019-01-22 23:44:17 +01:00
def component(self) -> typing.Optional[BlobFileManager]:
return self.blob_manager
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
async def start(self):
storage = self.component_manager.get_component(DATABASE_COMPONENT)
2019-01-22 23:44:17 +01:00
data_store = None
if DHT_COMPONENT not in self.component_manager.skip_components:
2019-01-22 23:44:17 +01:00
dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
if dht_node:
2019-01-22 23:44:17 +01:00
data_store = dht_node.protocol.data_store
self.blob_manager = BlobFileManager(asyncio.get_event_loop(), os.path.join(self.conf.data_dir, "blobfiles"),
storage, data_store)
return await self.blob_manager.setup()
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
async def stop(self):
while self.blob_manager and self.blob_manager.blobs:
_, blob = self.blob_manager.blobs.popitem()
await blob.close()
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def get_status(self):
2018-08-03 15:36:03 +02:00
count = 0
if self.blob_manager:
2019-01-22 23:44:17 +01:00
count = len(self.blob_manager.completed_blob_hashes)
2018-12-15 21:31:02 +01:00
return {'finished_blobs': count}
2018-08-02 23:33:56 +02:00
2018-07-24 18:26:29 +02:00
class DHTComponent(Component):
component_name = DHT_COMPONENT
depends_on = [UPNP_COMPONENT]
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-22 23:44:17 +01:00
self.dht_node: Node = None
2018-07-24 18:26:29 +02:00
self.upnp_component = None
self.external_udp_port = None
self.external_peer_port = None
2018-07-24 18:26:29 +02:00
@property
2019-01-22 23:44:17 +01:00
def component(self) -> typing.Optional[Node]:
2018-07-24 18:26:29 +02:00
return self.dht_node
async def get_status(self):
return {
2019-01-23 16:41:34 +01:00
'node_id': binascii.hexlify(self.dht_node.protocol.node_id),
2019-01-22 23:44:17 +01:00
'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.protocol.routing_table.get_peers())
}
2019-01-23 16:41:34 +01:00
def get_node_id(self):
node_id_filename = os.path.join(self.conf.data_dir, "node_id")
if os.path.isfile(node_id_filename):
with open(node_id_filename, "r") as node_id_file:
return base58.b58decode(str(node_id_file.read()).strip())
node_id = utils.generate_id()
with open(node_id_filename, "w") as node_id_file:
node_id_file.write(base58.b58encode(node_id).decode())
return node_id
2018-12-15 21:31:02 +01:00
async def start(self):
2019-01-22 23:44:17 +01:00
log.info("start the dht")
2018-07-24 18:26:29 +02:00
self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
2019-01-28 15:51:02 +01:00
self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port)
self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.udp_port)
external_ip = self.upnp_component.external_ip
if not external_ip:
log.warning("UPnP component failed to get external ip")
2018-12-15 21:31:02 +01:00
external_ip = await get_external_ip()
if not external_ip:
log.warning("failed to get external ip")
2018-07-24 18:26:29 +02:00
2018-11-07 21:15:05 +01:00
self.dht_node = Node(
2019-01-22 23:44:17 +01:00
asyncio.get_event_loop(),
self.component_manager.peer_manager,
2019-01-23 16:41:34 +01:00
node_id=self.get_node_id(),
2019-01-28 15:51:02 +01:00
internal_udp_port=self.conf.udp_port,
2019-01-22 23:44:17 +01:00
udp_port=self.external_udp_port,
external_ip=external_ip,
2019-01-28 15:51:02 +01:00
peer_port=self.external_peer_port,
rpc_timeout=self.conf.node_rpc_timeout
2019-01-22 23:44:17 +01:00
)
self.dht_node.start(
2019-01-28 15:51:02 +01:00
interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes
2018-07-24 18:26:29 +02:00
)
log.info("Started the dht")
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
async def stop(self):
self.dht_node.stop()
2018-07-24 18:26:29 +02:00
class HashAnnouncerComponent(Component):
component_name = HASH_ANNOUNCER_COMPONENT
depends_on = [DHT_COMPONENT, DATABASE_COMPONENT]
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-22 23:44:17 +01:00
self.hash_announcer: BlobAnnouncer = None
2018-07-24 18:26:29 +02:00
@property
2019-01-22 23:44:17 +01:00
def component(self) -> typing.Optional[BlobAnnouncer]:
2018-07-24 18:26:29 +02:00
return self.hash_announcer
async def start(self):
2018-07-24 18:26:29 +02:00
storage = self.component_manager.get_component(DATABASE_COMPONENT)
dht_node = self.component_manager.get_component(DHT_COMPONENT)
2019-01-22 23:44:17 +01:00
self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage)
2019-01-28 15:51:02 +01:00
self.hash_announcer.start(self.conf.concurrent_blob_announcers)
2019-01-22 23:44:17 +01:00
log.info("Started blob announcer")
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
async def stop(self):
2018-12-15 21:31:02 +01:00
self.hash_announcer.stop()
2019-01-22 23:44:17 +01:00
log.info("Stopped blob announcer")
2018-07-24 18:26:29 +02:00
async def get_status(self):
2018-08-02 23:33:56 +02:00
return {
2019-01-22 23:44:17 +01:00
'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.announce_queue)
2018-08-02 23:33:56 +02:00
}
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
class StreamManagerComponent(Component):
component_name = STREAM_MANAGER_COMPONENT
depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
2018-07-24 18:26:29 +02:00
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-22 23:44:17 +01:00
self.stream_manager: StreamManager = None
2018-07-24 18:26:29 +02:00
@property
2019-01-22 23:44:17 +01:00
def component(self) -> typing.Optional[StreamManager]:
return self.stream_manager
2018-07-24 18:26:29 +02:00
async def get_status(self):
2019-01-22 23:44:17 +01:00
if not self.stream_manager:
return
return {
2019-01-22 23:44:17 +01:00
'managed_files': len(self.stream_manager.streams)
}
2019-01-22 23:44:17 +01:00
async def start(self):
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
storage = self.component_manager.get_component(DATABASE_COMPONENT)
wallet = self.component_manager.get_component(WALLET_COMPONENT)
try:
node = self.component_manager.get_component(DHT_COMPONENT)
except NameError:
node = None
2018-07-24 18:26:29 +02:00
log.info('Starting the file manager')
2019-01-22 23:44:17 +01:00
loop = asyncio.get_event_loop()
self.stream_manager = StreamManager(
loop, blob_manager, wallet, storage, node, self.conf.blob_download_timeout,
self.conf.peer_connect_timeout, [
KademliaPeer(loop, address=(await resolve_host(loop, url)), tcp_port=port + 1)
for url, port in self.conf.reflector_servers
2019-01-25 21:05:22 +01:00
], self.conf.reflector_servers
2019-01-21 21:55:50 +01:00
)
2019-01-22 23:44:17 +01:00
await self.stream_manager.start()
log.info('Done setting up file manager')
2018-07-24 18:26:29 +02:00
2019-01-22 23:44:17 +01:00
async def stop(self):
await self.stream_manager.stop()
2018-07-24 18:26:29 +02:00
class PeerProtocolServerComponent(Component):
component_name = PEER_PROTOCOL_SERVER_COMPONENT
2019-01-22 23:44:17 +01:00
depends_on = [UPNP_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT]
2018-07-24 18:26:29 +02:00
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-22 23:44:17 +01:00
self.blob_server: BlobServer = None
2018-07-24 18:26:29 +02:00
@property
2019-01-22 23:44:17 +01:00
def component(self) -> typing.Optional[BlobServer]:
return self.blob_server
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def start(self):
2019-01-22 23:44:17 +01:00
log.info("start blob server")
upnp = self.component_manager.get_component(UPNP_COMPONENT)
2019-01-22 23:44:17 +01:00
blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT)
wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT)
2019-01-28 15:51:02 +01:00
peer_port = upnp.upnp_redirects.get("TCP", self.conf.tcp_port)
2019-01-22 23:44:17 +01:00
address = await wallet.get_unused_address()
self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address)
2019-01-28 15:51:02 +01:00
self.blob_server.start_server(peer_port, interface=self.conf.network_interface)
2019-01-22 23:44:17 +01:00
await self.blob_server.started_listening.wait()
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def stop(self):
2019-01-22 23:44:17 +01:00
if self.blob_server:
self.blob_server.stop_server()
2018-07-24 18:26:29 +02:00
class UPnPComponent(Component):
component_name = UPNP_COMPONENT
def __init__(self, component_manager):
super().__init__(component_manager)
2019-01-28 15:51:02 +01:00
self._int_peer_port = self.conf.tcp_port
self._int_dht_node_port = self.conf.udp_port
2019-01-21 21:55:50 +01:00
self.use_upnp = self.conf.use_upnp
self.upnp = None
2018-07-30 23:58:17 +02:00
self.upnp_redirects = {}
self.external_ip = None
self._maintain_redirects_task = None
2018-07-24 18:26:29 +02:00
@property
2019-01-22 23:44:17 +01:00
def component(self) -> 'UPnPComponent':
2018-07-24 18:26:29 +02:00
return self
async def _repeatedly_maintain_redirects(self, now=True):
while True:
if now:
await self._maintain_redirects()
await asyncio.sleep(360)
2018-12-15 21:31:02 +01:00
async def _maintain_redirects(self):
# setup the gateway if necessary
if not self.upnp:
try:
2018-12-15 21:31:02 +01:00
self.upnp = await UPnP.discover()
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
except Exception as err:
log.warning("upnp discovery failed: %s", err)
self.upnp = None
# update the external ip
external_ip = None
if self.upnp:
try:
2018-12-15 21:31:02 +01:00
external_ip = await self.upnp.get_external_ip()
2018-11-12 20:47:11 +01:00
if external_ip != "0.0.0.0" and not self.external_ip:
log.info("got external ip from UPnP: %s", external_ip)
except (asyncio.TimeoutError, UPnPError):
pass
if external_ip == "0.0.0.0" or not external_ip:
log.warning("unable to get external ip from UPnP, checking lbry.io fallback")
2018-12-15 21:31:02 +01:00
external_ip = await get_external_ip()
if self.external_ip and self.external_ip != external_ip:
log.info("external ip changed from %s to %s", self.external_ip, external_ip)
self.external_ip = external_ip
assert self.external_ip is not None # TODO: handle going/starting offline
if not self.upnp_redirects and self.upnp: # setup missing redirects
try:
log.info("add UPnP port mappings")
d = {}
if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
2018-12-15 21:31:02 +01:00
d["TCP"] = self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
if DHT_COMPONENT not in self.component_manager.skip_components:
2018-12-15 21:31:02 +01:00
d["UDP"] = self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
upnp_redirects = await gather_dict(d)
log.info("set up redirects: %s", upnp_redirects)
self.upnp_redirects.update(upnp_redirects)
except (asyncio.TimeoutError, UPnPError):
self.upnp = None
return self._maintain_redirects()
elif self.upnp: # check existing redirects are still active
found = set()
2018-12-15 21:31:02 +01:00
mappings = await self.upnp.get_redirects()
for mapping in mappings:
proto = mapping['NewProtocol']
if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]:
if mapping['NewInternalClient'] == self.upnp.lan_address:
found.add(proto)
if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components:
try:
2018-12-15 21:31:02 +01:00
udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
self.upnp_redirects['UDP'] = udp_port
log.info("refreshed upnp redirect for dht port: %i", udp_port)
except (asyncio.TimeoutError, UPnPError):
del self.upnp_redirects['UDP']
if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
try:
2018-12-15 21:31:02 +01:00
tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
self.upnp_redirects['TCP'] = tcp_port
log.info("refreshed upnp redirect for peer port: %i", tcp_port)
except (asyncio.TimeoutError, UPnPError):
del self.upnp_redirects['TCP']
if ('TCP' in self.upnp_redirects
and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components) and (
'UDP' in self.upnp_redirects and DHT_COMPONENT not in self.component_manager.skip_components):
if self.upnp_redirects:
log.debug("upnp redirects are still active")
2018-12-15 21:31:02 +01:00
async def start(self):
log.info("detecting external ip")
2018-08-05 19:12:39 +02:00
if not self.use_upnp:
2018-12-15 21:31:02 +01:00
self.external_ip = await get_external_ip()
2018-08-05 19:12:39 +02:00
return
success = False
2018-12-15 21:31:02 +01:00
await self._maintain_redirects()
if self.upnp:
if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in
(DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]):
log.error("failed to setup upnp, debugging infomation: %s", self.upnp.zipped_debugging_info)
else:
success = True
if self.upnp_redirects:
log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
else:
log.error("failed to setup upnp")
2019-01-22 23:44:17 +01:00
if self.component_manager.analytics_manager:
self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status())
self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False))
2018-07-24 18:26:29 +02:00
2018-12-15 21:31:02 +01:00
async def stop(self):
if self.upnp_redirects:
await asyncio.wait([
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
])
2019-01-22 23:44:17 +01:00
if self._maintain_redirects_task and not self._maintain_redirects_task.done():
self._maintain_redirects_task.cancel()
2018-07-24 18:26:29 +02:00
async def get_status(self):
return {
'aioupnp_version': aioupnp_version,
'redirects': self.upnp_redirects,
'gateway': 'No gateway found' if not self.upnp else self.upnp.gateway.manufacturer_string,
'dht_redirect_set': 'UDP' in self.upnp_redirects,
'peer_redirect_set': 'TCP' in self.upnp_redirects,
'external_ip': self.external_ip
}
2018-07-24 18:26:29 +02:00
class ExchangeRateManagerComponent(Component):
component_name = EXCHANGE_RATE_MANAGER_COMPONENT
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.exchange_rate_manager = ExchangeRateManager()
@property
2019-01-22 23:44:17 +01:00
def component(self) -> ExchangeRateManager:
2018-07-24 18:26:29 +02:00
return self.exchange_rate_manager
2018-12-15 21:31:02 +01:00
async def start(self):
self.exchange_rate_manager.start()
2018-12-15 21:31:02 +01:00
async def stop(self):
self.exchange_rate_manager.stop()