forked from LBRYCommunity/lbry-sdk
clean up settings and use them
This commit is contained in:
parent
4a11cf007f
commit
778d3826ab
9 changed files with 113 additions and 71 deletions
|
@ -31,9 +31,9 @@ class BlobDownloader: # TODO: refactor to be the base class used by StreamDownl
|
|||
self.blob: 'BlobFile' = None
|
||||
self.blob_queue = asyncio.Queue(loop=self.loop)
|
||||
|
||||
self.blob_download_timeout = config.get('blob_download_timeout')
|
||||
self.peer_connect_timeout = config.get('peer_connect_timeout')
|
||||
self.max_connections = config.get('max_connections_per_stream')
|
||||
self.blob_download_timeout = config.blob_download_timeout
|
||||
self.peer_connect_timeout = config.peer_connect_timeout
|
||||
self.max_connections = config.max_connections_per_download
|
||||
|
||||
async def _request_blob(self, peer: 'KademliaPeer'):
|
||||
if self.blob.get_is_verified():
|
||||
|
|
|
@ -453,6 +453,7 @@ class CLIConfig(BaseConfig):
|
|||
|
||||
|
||||
class Config(CLIConfig):
|
||||
# directories
|
||||
data_dir = Path("Directory path to store blobs.", metavar='DIR')
|
||||
download_dir = Path(
|
||||
"Directory path to place assembled files downloaded from LBRY.",
|
||||
|
@ -463,41 +464,69 @@ class Config(CLIConfig):
|
|||
previous_names=['lbryum_wallet_dir'], metavar='DIR'
|
||||
)
|
||||
|
||||
share_usage_data = Toggle(
|
||||
"Whether to share usage stats and diagnostic info with LBRY.", True,
|
||||
previous_names=['upload_log', 'upload_log', 'share_debug_info']
|
||||
# network
|
||||
use_upnp = Toggle(
|
||||
"Use UPnP to setup temporary port redirects for the DHT and the hosting of blobs. If you manually forward"
|
||||
"ports or have firewall rules you likely want to disable this.", True
|
||||
)
|
||||
cache_time = Integer("", 150)
|
||||
dht_node_port = Integer("", 4444)
|
||||
download_timeout = Float("", 30.0)
|
||||
blob_download_timeout = Float("", 20.0)
|
||||
peer_connect_timeout = Float("", 3.0)
|
||||
node_rpc_timeout = Float("", constants.rpc_timeout)
|
||||
announce_head_blobs_only = Toggle("", True)
|
||||
concurrent_announcers = Integer("", 10)
|
||||
known_dht_nodes = Servers("", [
|
||||
udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port'])
|
||||
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
|
||||
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
|
||||
|
||||
# protocol timeouts
|
||||
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
|
||||
blob_download_timeout = Float("Timeout to download a blob from a peer", 20.0)
|
||||
peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0)
|
||||
node_rpc_timeout = Float("Timeout when making a DHT request", constants.rpc_timeout)
|
||||
|
||||
# blob announcement and download
|
||||
announce_head_and_sd_only = Toggle(
|
||||
"Announce only the descriptor and first (rather than all) data blob for a stream to the DHT", True,
|
||||
previous_names=['announce_head_blobs_only']
|
||||
)
|
||||
concurrent_blob_announcers = Integer(
|
||||
"Number of blobs to iteratively announce at once", 10, previous_names=['concurrent_announcers']
|
||||
)
|
||||
max_connections_per_download = Integer(
|
||||
"Maximum number of peers to connect to while downloading a blob", 5,
|
||||
previous_names=['max_connections_per_stream']
|
||||
)
|
||||
max_key_fee = MaxKeyFee(
|
||||
"Don't download streams with fees exceeding this amount", {'currency': 'USD', 'amount': 50.0}
|
||||
) # TODO: use this
|
||||
|
||||
# reflector settings
|
||||
reflect_streams = Toggle(
|
||||
"Upload completed streams (published and downloaded) reflector in order to re-host them", True,
|
||||
previous_names=['reflect_uploads']
|
||||
)
|
||||
|
||||
# servers
|
||||
reflector_servers = Servers("Reflector re-hosting servers", [
|
||||
('reflector.lbry.io', 5566)
|
||||
])
|
||||
lbryum_servers = Servers("SPV wallet servers", [
|
||||
('lbryumx1.lbry.io', 50001),
|
||||
('lbryumx2.lbry.io', 50001)
|
||||
])
|
||||
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
|
||||
('lbrynet1.lbry.io', 4444), # US EAST
|
||||
('lbrynet2.lbry.io', 4444), # US WEST
|
||||
('lbrynet3.lbry.io', 4444), # EU
|
||||
('lbrynet4.lbry.io', 4444) # ASIA
|
||||
])
|
||||
max_connections_per_stream = Integer("", 5)
|
||||
max_key_fee = MaxKeyFee("", {'currency': 'USD', 'amount': 50.0})
|
||||
peer_port = Integer("", 3333)
|
||||
# if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the
|
||||
# event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0)
|
||||
reflect_uploads = Toggle("", True)
|
||||
reflector_servers = Servers("", [
|
||||
('reflector.lbry.io', 5566)
|
||||
])
|
||||
use_upnp = Toggle("", True)
|
||||
blockchain_name = String("", 'lbrycrd_main')
|
||||
lbryum_servers = Servers("", [
|
||||
('lbryumx1.lbry.io', 50001),
|
||||
('lbryumx2.lbry.io', 50001)
|
||||
])
|
||||
|
||||
# blockchain
|
||||
blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main')
|
||||
s3_headers_depth = Integer("download headers from s3 when the local height is more than 10 chunks behind", 96 * 10)
|
||||
cache_time = Integer("Time to cache resolved claims", 150) # TODO: use this
|
||||
|
||||
# daemon
|
||||
components_to_skip = Strings("components which will be skipped during start-up of daemon", [])
|
||||
share_usage_data = Toggle(
|
||||
"Whether to share usage stats and diagnostic info with LBRY.", True,
|
||||
previous_names=['upload_log', 'upload_log', 'share_debug_info']
|
||||
)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
|
|
@ -20,10 +20,10 @@ log = logging.getLogger(__name__)
|
|||
|
||||
class Node:
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
||||
internal_udp_port: int, peer_port: int, external_ip: str):
|
||||
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: typing.Optional[float] = 5.0):
|
||||
self.loop = loop
|
||||
self.internal_udp_port = internal_udp_port
|
||||
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port)
|
||||
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout)
|
||||
self.listening_port: asyncio.DatagramTransport = None
|
||||
self.joined = asyncio.Event(loop=self.loop)
|
||||
self._join_task: asyncio.Task = None
|
||||
|
@ -123,7 +123,10 @@ class Node:
|
|||
self.protocol.ping_queue.start()
|
||||
self._refresh_task = self.loop.create_task(self.refresh_node())
|
||||
|
||||
# resolve the known node urls
|
||||
known_node_addresses = known_node_addresses or []
|
||||
url_to_addr = {}
|
||||
|
||||
if known_node_urls:
|
||||
for host, port in known_node_urls:
|
||||
info = await self.loop.getaddrinfo(
|
||||
|
@ -132,13 +135,24 @@ class Node:
|
|||
)
|
||||
if (info[0][4][0], port) not in known_node_addresses:
|
||||
known_node_addresses.append((info[0][4][0], port))
|
||||
futs = []
|
||||
url_to_addr[info[0][4][0]] = host
|
||||
|
||||
if known_node_addresses:
|
||||
while not self.protocol.routing_table.get_peers():
|
||||
success = False
|
||||
# ping the seed nodes, this will set their node ids (since we don't know them ahead of time)
|
||||
for address, port in known_node_addresses:
|
||||
peer = self.protocol.get_rpc_peer(KademliaPeer(self.loop, address, udp_port=port))
|
||||
futs.append(peer.ping())
|
||||
if futs:
|
||||
await asyncio.wait(futs, loop=self.loop)
|
||||
|
||||
try:
|
||||
await peer.ping()
|
||||
success = True
|
||||
except asyncio.TimeoutError:
|
||||
log.warning("seed node (%s:%i) timed out in %s", url_to_addr.get(address, address), port,
|
||||
round(self.protocol.rpc_timeout, 2))
|
||||
if success:
|
||||
break
|
||||
# now that we have the seed nodes in routing, to an iterative lookup of our own id to populate the buckets
|
||||
# in the routing table with good peers who are near us
|
||||
async with self.peer_search_junction(self.protocol.node_id, max_results=16) as junction:
|
||||
async for peers in junction:
|
||||
for peer in peers:
|
||||
|
@ -146,9 +160,10 @@ class Node:
|
|||
await self.protocol.get_rpc_peer(peer).ping()
|
||||
except (asyncio.TimeoutError, RemoteException):
|
||||
pass
|
||||
self.joined.set()
|
||||
|
||||
log.info("Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()),
|
||||
self.protocol.routing_table.buckets_with_contacts())
|
||||
self.joined.set()
|
||||
|
||||
def start(self, interface: str, known_node_urls: typing.List[typing.Tuple[str, int]]):
|
||||
self._join_task = self.loop.create_task(
|
||||
|
|
|
@ -376,8 +376,8 @@ class DHTComponent(Component):
|
|||
async def start(self):
|
||||
log.info("start the dht")
|
||||
self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
|
||||
self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.peer_port)
|
||||
self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.dht_node_port)
|
||||
self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port)
|
||||
self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.udp_port)
|
||||
external_ip = self.upnp_component.external_ip
|
||||
if not external_ip:
|
||||
log.warning("UPnP component failed to get external ip")
|
||||
|
@ -389,13 +389,14 @@ class DHTComponent(Component):
|
|||
asyncio.get_event_loop(),
|
||||
self.component_manager.peer_manager,
|
||||
node_id=self.get_node_id(),
|
||||
internal_udp_port=self.conf.dht_node_port,
|
||||
internal_udp_port=self.conf.udp_port,
|
||||
udp_port=self.external_udp_port,
|
||||
external_ip=external_ip,
|
||||
peer_port=self.external_peer_port
|
||||
peer_port=self.external_peer_port,
|
||||
rpc_timeout=self.conf.node_rpc_timeout
|
||||
)
|
||||
self.dht_node.start(
|
||||
interface='0.0.0.0', known_node_urls=self.conf.known_dht_nodes
|
||||
interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes
|
||||
)
|
||||
log.info("Started the dht")
|
||||
|
||||
|
@ -419,7 +420,7 @@ class HashAnnouncerComponent(Component):
|
|||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||
self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage)
|
||||
self.hash_announcer.start(self.conf.concurrent_announcers)
|
||||
self.hash_announcer.start(self.conf.concurrent_blob_announcers)
|
||||
log.info("Started blob announcer")
|
||||
|
||||
async def stop(self):
|
||||
|
@ -492,10 +493,10 @@ class PeerProtocolServerComponent(Component):
|
|||
upnp = self.component_manager.get_component(UPNP_COMPONENT)
|
||||
blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||
wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT)
|
||||
peer_port = upnp.upnp_redirects.get("TCP", self.conf.peer_port)
|
||||
peer_port = upnp.upnp_redirects.get("TCP", self.conf.tcp_port)
|
||||
address = await wallet.get_unused_address()
|
||||
self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address)
|
||||
self.blob_server.start_server(peer_port, interface='0.0.0.0')
|
||||
self.blob_server.start_server(peer_port, interface=self.conf.network_interface)
|
||||
await self.blob_server.started_listening.wait()
|
||||
|
||||
async def stop(self):
|
||||
|
@ -508,8 +509,8 @@ class UPnPComponent(Component):
|
|||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self._int_peer_port = self.conf.peer_port
|
||||
self._int_dht_node_port = self.conf.dht_node_port
|
||||
self._int_peer_port = self.conf.tcp_port
|
||||
self._int_dht_node_port = self.conf.udp_port
|
||||
self.use_upnp = self.conf.use_upnp
|
||||
self.upnp = None
|
||||
self.upnp_redirects = {}
|
||||
|
|
|
@ -1551,8 +1551,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
}
|
||||
"""
|
||||
|
||||
timeout = timeout if timeout is not None else self.conf.download_timeout
|
||||
|
||||
parsed_uri = parse_lbry_uri(uri)
|
||||
if parsed_uri.is_channel:
|
||||
raise Exception("cannot download a channel claim, specify a /path")
|
||||
|
@ -1584,7 +1582,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
stream = existing[0]
|
||||
else:
|
||||
stream = await self.stream_manager.download_stream_from_claim(
|
||||
self.dht_node, self.conf.download_dir, resolved, file_name, timeout, fee_amount, fee_address
|
||||
self.dht_node, self.conf, resolved, file_name, timeout, fee_amount, fee_address
|
||||
)
|
||||
if stream:
|
||||
return stream.as_dict()
|
||||
|
|
|
@ -275,7 +275,7 @@ class SQLiteStorage(SQLiteMixin):
|
|||
def get_blobs_to_announce(self):
|
||||
def get_and_update(transaction):
|
||||
timestamp = self.loop.time()
|
||||
if self.conf.announce_head_blobs_only:
|
||||
if self.conf.announce_head_and_sd_only:
|
||||
r = transaction.execute(
|
||||
"select blob_hash from blob "
|
||||
"where blob_hash is not null and "
|
||||
|
@ -694,5 +694,5 @@ class SQLiteStorage(SQLiteMixin):
|
|||
"select s.sd_hash from stream s "
|
||||
"left outer join reflected_stream r on s.sd_hash=r.sd_hash "
|
||||
"where r.timestamp is null or r.timestamp < ?",
|
||||
self.loop.time() - self.conf.auto_re_reflect_interval
|
||||
self.loop.time() - 86400
|
||||
)
|
||||
|
|
|
@ -83,7 +83,7 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t
|
|||
else:
|
||||
log.info("downloader idle...")
|
||||
for peer in to_add:
|
||||
if len(self.running_download_requests) >= 8:
|
||||
if len(self.running_download_requests) >= self.max_connections_per_stream:
|
||||
break
|
||||
task = self.loop.create_task(self._request_blob(peer))
|
||||
self.requested_from[self.current_blob.blob_hash][peer] = task
|
||||
|
|
|
@ -9,6 +9,7 @@ from lbrynet.stream.managed_stream import ManagedStream
|
|||
from lbrynet.schema.claim import ClaimDict
|
||||
from lbrynet.extras.daemon.storage import StoredStreamClaim, lbc_to_dewies
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbrynet.conf import Config
|
||||
from lbrynet.blob.blob_manager import BlobFileManager
|
||||
from lbrynet.dht.peer import KademliaPeer
|
||||
from lbrynet.dht.node import Node
|
||||
|
@ -166,16 +167,14 @@ class StreamManager:
|
|||
)
|
||||
|
||||
async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict,
|
||||
file_name: typing.Optional[str] = None, data_rate: typing.Optional[int] = 0,
|
||||
sd_blob_timeout: typing.Optional[float] = 60
|
||||
) -> typing.Optional[ManagedStream]:
|
||||
file_name: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
|
||||
|
||||
claim = ClaimDict.load_dict(claim_info['value'])
|
||||
downloader = StreamDownloader(self.loop, self.blob_manager, claim.source_hash.decode(), self.peer_timeout,
|
||||
self.peer_connect_timeout, download_directory, file_name, self.fixed_peers)
|
||||
try:
|
||||
downloader.download(node)
|
||||
await asyncio.wait_for(downloader.got_descriptor.wait(), sd_blob_timeout)
|
||||
await downloader.got_descriptor.wait()
|
||||
log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name'])
|
||||
except (asyncio.TimeoutError, asyncio.CancelledError):
|
||||
log.info("stream timeout")
|
||||
|
@ -187,7 +186,7 @@ class StreamManager:
|
|||
if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
|
||||
await self.blob_manager.storage.save_downloaded_file(
|
||||
downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory,
|
||||
data_rate
|
||||
0.0
|
||||
)
|
||||
await self.blob_manager.storage.save_content_claim(
|
||||
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
||||
|
@ -210,9 +209,9 @@ class StreamManager:
|
|||
except asyncio.CancelledError:
|
||||
await downloader.stop()
|
||||
|
||||
async def download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict,
|
||||
async def download_stream_from_claim(self, node: 'Node', config: 'Config', claim_info: typing.Dict,
|
||||
file_name: typing.Optional[str] = None,
|
||||
sd_blob_timeout: typing.Optional[float] = 60,
|
||||
timeout: typing.Optional[float] = 60,
|
||||
fee_amount: typing.Optional[float] = 0.0,
|
||||
fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
|
||||
log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
|
||||
|
@ -229,10 +228,10 @@ class StreamManager:
|
|||
|
||||
self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
|
||||
stream_task = self.loop.create_task(
|
||||
self._download_stream_from_claim(node, download_directory, claim_info, file_name, 0, sd_blob_timeout)
|
||||
self._download_stream_from_claim(node, config.download_dir, claim_info, file_name)
|
||||
)
|
||||
try:
|
||||
await asyncio.wait_for(stream_task, sd_blob_timeout)
|
||||
await asyncio.wait_for(stream_task, timeout or config.download_timeout)
|
||||
stream = await stream_task
|
||||
self.starting_streams[sd_hash].set_result(stream)
|
||||
if fee_address and fee_amount:
|
||||
|
|
|
@ -41,7 +41,7 @@ class CommandTestCase(IntegrationTestCase):
|
|||
conf.download_dir = self.wallet_node.data_path
|
||||
conf.share_usage_data = False
|
||||
conf.use_upnp = False
|
||||
conf.reflect_uploads = False
|
||||
conf.reflect_streams = False
|
||||
conf.blockchain_name = 'lbrycrd_regtest'
|
||||
conf.lbryum_servers = [('localhost', 50001)]
|
||||
conf.known_dht_nodes = []
|
||||
|
|
Loading…
Reference in a new issue