diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 53f458b5e..5316e70ec 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -31,9 +31,9 @@ class BlobDownloader: # TODO: refactor to be the base class used by StreamDownl self.blob: 'BlobFile' = None self.blob_queue = asyncio.Queue(loop=self.loop) - self.blob_download_timeout = config.get('blob_download_timeout') - self.peer_connect_timeout = config.get('peer_connect_timeout') - self.max_connections = config.get('max_connections_per_stream') + self.blob_download_timeout = config.blob_download_timeout + self.peer_connect_timeout = config.peer_connect_timeout + self.max_connections = config.max_connections_per_download async def _request_blob(self, peer: 'KademliaPeer'): if self.blob.get_is_verified(): diff --git a/lbrynet/conf.py b/lbrynet/conf.py index c220b3e8d..68ce3dc8c 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -453,6 +453,7 @@ class CLIConfig(BaseConfig): class Config(CLIConfig): + # directories data_dir = Path("Directory path to store blobs.", metavar='DIR') download_dir = Path( "Directory path to place assembled files downloaded from LBRY.", @@ -463,41 +464,69 @@ class Config(CLIConfig): previous_names=['lbryum_wallet_dir'], metavar='DIR' ) - share_usage_data = Toggle( - "Whether to share usage stats and diagnostic info with LBRY.", True, - previous_names=['upload_log', 'upload_log', 'share_debug_info'] + # network + use_upnp = Toggle( + "Use UPnP to setup temporary port redirects for the DHT and the hosting of blobs. 
If you manually forward" + "ports or have firewall rules you likely want to disable this.", True ) - cache_time = Integer("", 150) - dht_node_port = Integer("", 4444) - download_timeout = Float("", 30.0) - blob_download_timeout = Float("", 20.0) - peer_connect_timeout = Float("", 3.0) - node_rpc_timeout = Float("", constants.rpc_timeout) - announce_head_blobs_only = Toggle("", True) - concurrent_announcers = Integer("", 10) - known_dht_nodes = Servers("", [ + udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port']) + tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port']) + network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0') + + # protocol timeouts + download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) + blob_download_timeout = Float("Timeout to download a blob from a peer", 20.0) + peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) + node_rpc_timeout = Float("Timeout when making a DHT request", constants.rpc_timeout) + + # blob announcement and download + announce_head_and_sd_only = Toggle( + "Announce only the descriptor and first (rather than all) data blob for a stream to the DHT", True, + previous_names=['announce_head_blobs_only'] + ) + concurrent_blob_announcers = Integer( + "Number of blobs to iteratively announce at once", 10, previous_names=['concurrent_announcers'] + ) + max_connections_per_download = Integer( + "Maximum number of peers to connect to while downloading a blob", 5, + previous_names=['max_connections_per_stream'] + ) + max_key_fee = MaxKeyFee( + "Don't download streams with fees exceeding this amount", {'currency': 'USD', 'amount': 50.0} + ) # TODO: use this + + # reflector settings + reflect_streams = Toggle( + "Upload completed streams (published and downloaded) reflector in order to re-host them", True, + 
previous_names=['reflect_uploads'] + ) + + # servers + reflector_servers = Servers("Reflector re-hosting servers", [ + ('reflector.lbry.io', 5566) + ]) + lbryum_servers = Servers("SPV wallet servers", [ + ('lbryumx1.lbry.io', 50001), + ('lbryumx2.lbry.io', 50001) + ]) + known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ ('lbrynet1.lbry.io', 4444), # US EAST ('lbrynet2.lbry.io', 4444), # US WEST ('lbrynet3.lbry.io', 4444), # EU ('lbrynet4.lbry.io', 4444) # ASIA ]) - max_connections_per_stream = Integer("", 5) - max_key_fee = MaxKeyFee("", {'currency': 'USD', 'amount': 50.0}) - peer_port = Integer("", 3333) - # if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the - # event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0) - reflect_uploads = Toggle("", True) - reflector_servers = Servers("", [ - ('reflector.lbry.io', 5566) - ]) - use_upnp = Toggle("", True) - blockchain_name = String("", 'lbrycrd_main') - lbryum_servers = Servers("", [ - ('lbryumx1.lbry.io', 50001), - ('lbryumx2.lbry.io', 50001) - ]) + + # blockchain + blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main') s3_headers_depth = Integer("download headers from s3 when the local height is more than 10 chunks behind", 96 * 10) + cache_time = Integer("Time to cache resolved claims", 150) # TODO: use this + + # daemon components_to_skip = Strings("components which will be skipped during start-up of daemon", []) + share_usage_data = Toggle( + "Whether to share usage stats and diagnostic info with LBRY.", True, + previous_names=['upload_log', 'upload_log', 'share_debug_info'] + ) def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c8524617d..d4f773ae9 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -20,10 +20,10 @@ log = 
logging.getLogger(__name__) class Node: def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, - internal_udp_port: int, peer_port: int, external_ip: str): + internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: typing.Optional[float] = 5.0): self.loop = loop self.internal_udp_port = internal_udp_port - self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port) + self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout) self.listening_port: asyncio.DatagramTransport = None self.joined = asyncio.Event(loop=self.loop) self._join_task: asyncio.Task = None @@ -123,7 +123,10 @@ class Node: self.protocol.ping_queue.start() self._refresh_task = self.loop.create_task(self.refresh_node()) + # resolve the known node urls known_node_addresses = known_node_addresses or [] + url_to_addr = {} + if known_node_urls: for host, port in known_node_urls: info = await self.loop.getaddrinfo( @@ -132,23 +135,35 @@ class Node: ) if (info[0][4][0], port) not in known_node_addresses: known_node_addresses.append((info[0][4][0], port)) - futs = [] - for address, port in known_node_addresses: - peer = self.protocol.get_rpc_peer(KademliaPeer(self.loop, address, udp_port=port)) - futs.append(peer.ping()) - if futs: - await asyncio.wait(futs, loop=self.loop) + url_to_addr[info[0][4][0]] = host - async with self.peer_search_junction(self.protocol.node_id, max_results=16) as junction: - async for peers in junction: - for peer in peers: + if known_node_addresses: + while not self.protocol.routing_table.get_peers(): + success = False + # ping the seed nodes, this will set their node ids (since we don't know them ahead of time) + for address, port in known_node_addresses: + peer = self.protocol.get_rpc_peer(KademliaPeer(self.loop, address, udp_port=port)) try: - await self.protocol.get_rpc_peer(peer).ping() - except (asyncio.TimeoutError, 
RemoteException): - pass - self.joined.set() + await peer.ping() + success = True + except asyncio.TimeoutError: + log.warning("seed node (%s:%i) timed out in %s", url_to_addr.get(address, address), port, + round(self.protocol.rpc_timeout, 2)) + if success: + break + # now that we have the seed nodes in routing, do an iterative lookup of our own id to populate the buckets + # in the routing table with good peers who are near us + async with self.peer_search_junction(self.protocol.node_id, max_results=16) as junction: + async for peers in junction: + for peer in peers: + try: + await self.protocol.get_rpc_peer(peer).ping() + except (asyncio.TimeoutError, RemoteException): + pass + log.info("Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), self.protocol.routing_table.buckets_with_contacts()) + self.joined.set() def start(self, interface: str, known_node_urls: typing.List[typing.Tuple[str, int]]): self._join_task = self.loop.create_task( diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index c1afdc65b..fb850c2dd 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -376,8 +376,8 @@ class DHTComponent(Component): async def start(self): log.info("start the dht") self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) - self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.peer_port) - self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.dht_node_port) + self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port) + self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.udp_port) external_ip = self.upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") @@ -389,13 +389,14 @@ class DHTComponent(Component): asyncio.get_event_loop(), self.component_manager.peer_manager, 
node_id=self.get_node_id(), - internal_udp_port=self.conf.dht_node_port, + internal_udp_port=self.conf.udp_port, udp_port=self.external_udp_port, external_ip=external_ip, - peer_port=self.external_peer_port + peer_port=self.external_peer_port, + rpc_timeout=self.conf.node_rpc_timeout ) self.dht_node.start( - interface='0.0.0.0', known_node_urls=self.conf.known_dht_nodes + interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes ) log.info("Started the dht") @@ -419,7 +420,7 @@ class HashAnnouncerComponent(Component): storage = self.component_manager.get_component(DATABASE_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT) self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage) - self.hash_announcer.start(self.conf.concurrent_announcers) + self.hash_announcer.start(self.conf.concurrent_blob_announcers) log.info("Started blob announcer") async def stop(self): @@ -492,10 +493,10 @@ class PeerProtocolServerComponent(Component): upnp = self.component_manager.get_component(UPNP_COMPONENT) blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT) wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT) - peer_port = upnp.upnp_redirects.get("TCP", self.conf.peer_port) + peer_port = upnp.upnp_redirects.get("TCP", self.conf.tcp_port) address = await wallet.get_unused_address() self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address) - self.blob_server.start_server(peer_port, interface='0.0.0.0') + self.blob_server.start_server(peer_port, interface=self.conf.network_interface) await self.blob_server.started_listening.wait() async def stop(self): @@ -508,8 +509,8 @@ class UPnPComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self._int_peer_port = self.conf.peer_port - self._int_dht_node_port = self.conf.dht_node_port + self._int_peer_port = self.conf.tcp_port + self._int_dht_node_port 
= self.conf.udp_port self.use_upnp = self.conf.use_upnp self.upnp = None self.upnp_redirects = {} diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index d736a44bc..ce2e3ef03 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1551,8 +1551,6 @@ class Daemon(metaclass=JSONRPCServerType): } """ - timeout = timeout if timeout is not None else self.conf.download_timeout - parsed_uri = parse_lbry_uri(uri) if parsed_uri.is_channel: raise Exception("cannot download a channel claim, specify a /path") @@ -1584,7 +1582,7 @@ class Daemon(metaclass=JSONRPCServerType): stream = existing[0] else: stream = await self.stream_manager.download_stream_from_claim( - self.dht_node, self.conf.download_dir, resolved, file_name, timeout, fee_amount, fee_address + self.dht_node, self.conf, resolved, file_name, timeout, fee_amount, fee_address ) if stream: return stream.as_dict() diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 48586f321..c6092d89f 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -275,7 +275,7 @@ class SQLiteStorage(SQLiteMixin): def get_blobs_to_announce(self): def get_and_update(transaction): timestamp = self.loop.time() - if self.conf.announce_head_blobs_only: + if self.conf.announce_head_and_sd_only: r = transaction.execute( "select blob_hash from blob " "where blob_hash is not null and " @@ -694,5 +694,5 @@ class SQLiteStorage(SQLiteMixin): "select s.sd_hash from stream s " "left outer join reflected_stream r on s.sd_hash=r.sd_hash " "where r.timestamp is null or r.timestamp < ?", - self.loop.time() - self.conf.auto_re_reflect_interval + self.loop.time() - 86400 ) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 4e8eabd19..18b1a8804 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -83,7 +83,7 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, 
refactor t else: log.info("downloader idle...") for peer in to_add: - if len(self.running_download_requests) >= 8: + if len(self.running_download_requests) >= self.max_connections_per_stream: break task = self.loop.create_task(self._request_blob(peer)) self.requested_from[self.current_blob.blob_hash][peer] = task diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 7f584f7ab..1f4395d23 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -9,6 +9,7 @@ from lbrynet.stream.managed_stream import ManagedStream from lbrynet.schema.claim import ClaimDict from lbrynet.extras.daemon.storage import StoredStreamClaim, lbc_to_dewies if typing.TYPE_CHECKING: + from lbrynet.conf import Config from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.dht.peer import KademliaPeer from lbrynet.dht.node import Node @@ -166,16 +167,14 @@ class StreamManager: ) async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, - file_name: typing.Optional[str] = None, data_rate: typing.Optional[int] = 0, - sd_blob_timeout: typing.Optional[float] = 60 - ) -> typing.Optional[ManagedStream]: + file_name: typing.Optional[str] = None) -> typing.Optional[ManagedStream]: claim = ClaimDict.load_dict(claim_info['value']) downloader = StreamDownloader(self.loop, self.blob_manager, claim.source_hash.decode(), self.peer_timeout, self.peer_connect_timeout, download_directory, file_name, self.fixed_peers) try: downloader.download(node) - await asyncio.wait_for(downloader.got_descriptor.wait(), sd_blob_timeout) + await downloader.got_descriptor.wait() log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name']) except (asyncio.TimeoutError, asyncio.CancelledError): log.info("stream timeout") @@ -187,7 +186,7 @@ class StreamManager: if not await self.blob_manager.storage.file_exists(downloader.sd_hash): await self.blob_manager.storage.save_downloaded_file( 
downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory, - data_rate + 0.0 ) await self.blob_manager.storage.save_content_claim( downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" @@ -210,9 +209,9 @@ class StreamManager: except asyncio.CancelledError: await downloader.stop() - async def download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, + async def download_stream_from_claim(self, node: 'Node', config: 'Config', claim_info: typing.Dict, file_name: typing.Optional[str] = None, - sd_blob_timeout: typing.Optional[float] = 60, + timeout: typing.Optional[float] = 60, fee_amount: typing.Optional[float] = 0.0, fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]: log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) @@ -229,10 +228,10 @@ class StreamManager: self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop) stream_task = self.loop.create_task( - self._download_stream_from_claim(node, download_directory, claim_info, file_name, 0, sd_blob_timeout) + self._download_stream_from_claim(node, config.download_dir, claim_info, file_name) ) try: - await asyncio.wait_for(stream_task, sd_blob_timeout) + await asyncio.wait_for(stream_task, timeout or config.download_timeout) stream = await stream_task self.starting_streams[sd_hash].set_result(stream) if fee_address and fee_amount: diff --git a/tests/integration/wallet/test_commands.py b/tests/integration/wallet/test_commands.py index 4d3fd10ee..c6ac863c1 100644 --- a/tests/integration/wallet/test_commands.py +++ b/tests/integration/wallet/test_commands.py @@ -41,7 +41,7 @@ class CommandTestCase(IntegrationTestCase): conf.download_dir = self.wallet_node.data_path conf.share_usage_data = False conf.use_upnp = False - conf.reflect_uploads = False + conf.reflect_streams = False conf.blockchain_name = 'lbrycrd_regtest' conf.lbryum_servers = [('localhost', 50001)] 
conf.known_dht_nodes = []