From 4a11cf007f50de03fbd4461158b1c7bbe96c7208 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sun, 27 Jan 2019 19:26:18 -0500 Subject: [PATCH 1/4] remove unused settings and constants --- lbrynet/conf.py | 38 ------------------------ lbrynet/extras/daemon/analytics.py | 5 +++- tests/unit/lbrynet_daemon/test_Daemon.py | 17 ----------- 3 files changed, 4 insertions(+), 56 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 8c1e16396..c220b3e8d 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -16,29 +16,11 @@ log = logging.getLogger(__name__) NOT_SET = type(str('NOT_SET'), (object,), {}) T = typing.TypeVar('T') -KB = 2 ** 10 -MB = 2 ** 20 - -ANALYTICS_ENDPOINT = 'https://api.segment.io/v1' -ANALYTICS_TOKEN = 'Ax5LZzR1o3q3Z3WjATASDwR5rKyHH0qOIRIbLmMXn2H=' -API_ADDRESS = 'lbryapi' -APP_NAME = 'LBRY' -BLOBFILES_DIR = 'blobfiles' -CRYPTSD_FILE_EXTENSION = '.cryptsd' CURRENCIES = { 'BTC': {'type': 'crypto'}, 'LBC': {'type': 'crypto'}, 'USD': {'type': 'fiat'}, } -ICON_PATH = 'icons' if 'win' in sys.platform else 'app.icns' -LOG_FILE_NAME = 'lbrynet.log' -LOG_POST_URL = 'https://lbry.io/log-upload' -MAX_BLOB_REQUEST_SIZE = 64 * KB -MAX_HANDSHAKE_SIZE = 64 * KB -MAX_REQUEST_SIZE = 64 * KB -MAX_RESPONSE_INFO_SIZE = 64 * KB -MAX_BLOB_INFOS_TO_REQUEST = 20 -PROTOCOL_PREFIX = 'lbry' SLACK_WEBHOOK = ( 'nUE0pUZ6Yl9bo29epl5moTSwnl5wo20ip2IlqzywMKZiIQSFZR5' 'AHx4mY0VmF0WQZ1ESEP9kMHZlp1WzJwWOoKN3ImR1M2yUAaMyqGZ=' @@ -471,7 +453,6 @@ class CLIConfig(BaseConfig): class Config(CLIConfig): - data_dir = Path("Directory path to store blobs.", metavar='DIR') download_dir = Path( "Directory path to place assembled files downloaded from LBRY.", @@ -486,20 +467,12 @@ class Config(CLIConfig): "Whether to share usage stats and diagnostic info with LBRY.", True, previous_names=['upload_log', 'upload_log', 'share_debug_info'] ) - - # claims set to expire within this many blocks will be - # automatically renewed after startup (if set to 0, renews - # will not be made automatically) - auto_renew_claim_height_delta = Integer("", 0) cache_time = Integer("", 150) - data_rate = Float("points/megabyte", .0001) - delete_blobs_on_remove = Toggle("", True) dht_node_port = Integer("", 4444) download_timeout = Float("", 30.0) blob_download_timeout = Float("", 20.0) peer_connect_timeout = Float("", 3.0) node_rpc_timeout = Float("", constants.rpc_timeout) - is_generous_host = Toggle("", True) announce_head_blobs_only = Toggle("", True) concurrent_announcers = Integer("", 10) known_dht_nodes = Servers("", [ @@ -509,26 +482,15 @@ class Config(CLIConfig): ('lbrynet4.lbry.io', 4444) # ASIA ]) max_connections_per_stream = Integer("", 5) - seek_head_blob_first = Toggle("", True) max_key_fee = MaxKeyFee("", {'currency': 'USD', 'amount': 50.0}) - min_info_rate = Float("points/1000 infos", .02) - min_valuable_hash_rate = Float("points/1000 infos", .05) - min_valuable_info_rate = Float("points/1000 infos", .05) peer_port = Integer("", 3333) - pointtrader_server = String("", 'http://127.0.0.1:2424') - reflector_port = Integer("", 5566) # if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the # event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0) reflect_uploads = Toggle("", True) - auto_re_reflect_interval = Integer("set to 0 to disable", 86400) reflector_servers = Servers("", [ ('reflector.lbry.io', 5566) ]) - run_reflector_server = Toggle("adds reflector to components_to_skip unless True", False) - sd_download_timeout = Integer("", 3) - peer_search_timeout = Integer("", 60) use_upnp = Toggle("", True) - use_keyring = Toggle("", False) blockchain_name = String("", 'lbrycrd_main') lbryum_servers = Servers("", [ ('lbryumx1.lbry.io', 50001), diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index 1e9c607d8..3ac005b6a 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -5,9 +5,12 @@ import logging import aiohttp from lbrynet import utils -from lbrynet.conf import Config, ANALYTICS_ENDPOINT, ANALYTICS_TOKEN +from lbrynet.conf import Config from lbrynet.extras import system_info +ANALYTICS_ENDPOINT = 'https://api.segment.io/v1' +ANALYTICS_TOKEN = 'Ax5LZzR1o3q3Z3WjATASDwR5rKyHH0qOIRIbLmMXn2H=' + # Things We Track SERVER_STARTUP = 'Server Startup' SERVER_STARTUP_SUCCESS = 'Server Startup Success' diff --git a/tests/unit/lbrynet_daemon/test_Daemon.py b/tests/unit/lbrynet_daemon/test_Daemon.py index adb47c262..2a2918ffd 100644 --- a/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/tests/unit/lbrynet_daemon/test_Daemon.py @@ -84,15 +84,6 @@ class TestCostEst(unittest.TestCase): result = yield f2d(daemon.get_est_cost("test", size)) self.assertEqual(result, correct_result) - def test_fee_and_ungenerous_data(self): - conf = Config(is_generous_host=False) - size = 10000000 - fake_fee_amount = 4.5 - correct_result = size / 10 ** 6 * conf.data_rate + fake_fee_amount - daemon = get_test_daemon(conf, with_fee=True) - result = yield f2d(daemon.get_est_cost("test", size)) - self.assertEqual(result, round(correct_result, 1)) - def test_generous_data_and_no_fee(self): size = 10000000 correct_result = 0.0 @@ -100,14 +91,6 @@ class TestCostEst(unittest.TestCase): result = yield f2d(daemon.get_est_cost("test", size)) self.assertEqual(result, correct_result) - def test_ungenerous_data_and_no_fee(self): - conf = Config(is_generous_host=False) - size = 10000000 - correct_result = size / 10 ** 6 * conf.data_rate - daemon = get_test_daemon(conf) - result = yield f2d(daemon.get_est_cost("test", size)) - self.assertEqual(result, round(correct_result, 1)) - @unittest.SkipTest class TestJsonRpc(unittest.TestCase): From 778d3826ab1c0abac66999518d40ada6645d4ddc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 28 Jan 2019 09:51:02 -0500 Subject: [PATCH 2/4] clean up settings and use them --- lbrynet/blob_exchange/downloader.py | 6 +- lbrynet/conf.py | 83 +++++++++++++++-------- lbrynet/dht/node.py | 45 ++++++++---- lbrynet/extras/daemon/Components.py | 21 +++--- lbrynet/extras/daemon/Daemon.py | 4 +- lbrynet/extras/daemon/storage.py | 4 +- lbrynet/stream/downloader.py | 2 +- lbrynet/stream/stream_manager.py | 17 +++-- tests/integration/wallet/test_commands.py | 2 +- 9 files changed, 113 insertions(+), 71 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 53f458b5e..5316e70ec 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -31,9 +31,9 @@ class BlobDownloader: # TODO: refactor to be the base class used by StreamDownl self.blob: 'BlobFile' = None self.blob_queue = asyncio.Queue(loop=self.loop) - self.blob_download_timeout = config.get('blob_download_timeout') - self.peer_connect_timeout = config.get('peer_connect_timeout') - self.max_connections = config.get('max_connections_per_stream') + self.blob_download_timeout = config.blob_download_timeout + self.peer_connect_timeout = config.peer_connect_timeout + self.max_connections = config.max_connections_per_download async def _request_blob(self, peer: 'KademliaPeer'): if self.blob.get_is_verified(): diff --git a/lbrynet/conf.py b/lbrynet/conf.py index c220b3e8d..68ce3dc8c 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -453,6 +453,7 @@ class CLIConfig(BaseConfig): class Config(CLIConfig): + # directories data_dir = Path("Directory path to store blobs.", metavar='DIR') download_dir = Path( "Directory path to place assembled files downloaded from LBRY.", @@ -463,41 +464,69 @@ class Config(CLIConfig): previous_names=['lbryum_wallet_dir'], metavar='DIR' ) - share_usage_data = Toggle( - "Whether to share usage stats and diagnostic info with LBRY.", True, - previous_names=['upload_log', 'upload_log', 'share_debug_info'] + # network + use_upnp = Toggle( + "Use UPnP to setup temporary port redirects for the DHT and the hosting of blobs. If you manually forward" + "ports or have firewall rules you likely want to disable this.", True ) - cache_time = Integer("", 150) - dht_node_port = Integer("", 4444) - download_timeout = Float("", 30.0) - blob_download_timeout = Float("", 20.0) - peer_connect_timeout = Float("", 3.0) - node_rpc_timeout = Float("", constants.rpc_timeout) - announce_head_blobs_only = Toggle("", True) - concurrent_announcers = Integer("", 10) - known_dht_nodes = Servers("", [ + udp_port = Integer("UDP port for communicating on the LBRY DHT", 4444, previous_names=['dht_node_port']) + tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port']) + network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0') + + # protocol timeouts + download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) + blob_download_timeout = Float("Timeout to download a blob from a peer", 20.0) + peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) + node_rpc_timeout = Float("Timeout when making a DHT request", constants.rpc_timeout) + + # blob announcement and download + announce_head_and_sd_only = Toggle( + "Announce only the descriptor and first (rather than all) data blob for a stream to the DHT", True, + previous_names=['announce_head_blobs_only'] + ) + concurrent_blob_announcers = Integer( + "Number of blobs to iteratively announce at once", 10, previous_names=['concurrent_announcers'] + ) + max_connections_per_download = Integer( + "Maximum number of peers to connect to while downloading a blob", 5, + previous_names=['max_connections_per_stream'] + ) + max_key_fee = MaxKeyFee( + "Don't download streams with fees exceeding this amount", {'currency': 'USD', 'amount': 50.0} + ) # TODO: use this + + # reflector settings + reflect_streams = Toggle( + "Upload completed streams (published and downloaded) reflector in order to re-host them", True, + previous_names=['reflect_uploads'] + ) + + # servers + reflector_servers = Servers("Reflector re-hosting servers", [ + ('reflector.lbry.io', 5566) + ]) + lbryum_servers = Servers("SPV wallet servers", [ + ('lbryumx1.lbry.io', 50001), + ('lbryumx2.lbry.io', 50001) + ]) + known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ ('lbrynet1.lbry.io', 4444), # US EAST ('lbrynet2.lbry.io', 4444), # US WEST ('lbrynet3.lbry.io', 4444), # EU ('lbrynet4.lbry.io', 4444) # ASIA ]) - max_connections_per_stream = Integer("", 5) - max_key_fee = MaxKeyFee("", {'currency': 'USD', 'amount': 50.0}) - peer_port = Integer("", 3333) - # if reflect_uploads is True, send files to reflector after publishing (as well as a periodic check in the - # event the initial upload failed or was disconnected part way through, provided the auto_re_reflect_interval > 0) - reflect_uploads = Toggle("", True) - reflector_servers = Servers("", [ - ('reflector.lbry.io', 5566) - ]) - use_upnp = Toggle("", True) - blockchain_name = String("", 'lbrycrd_main') - lbryum_servers = Servers("", [ - ('lbryumx1.lbry.io', 50001), - ('lbryumx2.lbry.io', 50001) - ]) + + # blockchain + blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main') s3_headers_depth = Integer("download headers from s3 when the local height is more than 10 chunks behind", 96 * 10) + cache_time = Integer("Time to cache resolved claims", 150) # TODO: use this + + # daemon components_to_skip = Strings("components which will be skipped during start-up of daemon", []) + share_usage_data = Toggle( + "Whether to share usage stats and diagnostic info with LBRY.", True, + previous_names=['upload_log', 'upload_log', 'share_debug_info'] + ) def __init__(self, **kwargs): super().__init__(**kwargs) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c8524617d..d4f773ae9 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -20,10 +20,10 @@ log = logging.getLogger(__name__) class Node: def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, - internal_udp_port: int, peer_port: int, external_ip: str): + internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: typing.Optional[float] = 5.0): self.loop = loop self.internal_udp_port = internal_udp_port - self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port) + self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout) self.listening_port: asyncio.DatagramTransport = None self.joined = asyncio.Event(loop=self.loop) self._join_task: asyncio.Task = None @@ -123,7 +123,10 @@ class Node: self.protocol.ping_queue.start() self._refresh_task = self.loop.create_task(self.refresh_node()) + # resolve the known node urls known_node_addresses = known_node_addresses or [] + url_to_addr = {} + if known_node_urls: for host, port in known_node_urls: info = await self.loop.getaddrinfo( @@ -132,23 +135,35 @@ class Node: ) if (info[0][4][0], port) not in known_node_addresses: known_node_addresses.append((info[0][4][0], port)) - futs = [] - for address, port in known_node_addresses: - peer = self.protocol.get_rpc_peer(KademliaPeer(self.loop, address, udp_port=port)) - futs.append(peer.ping()) - if futs: - await asyncio.wait(futs, loop=self.loop) + url_to_addr[info[0][4][0]] = host - async with self.peer_search_junction(self.protocol.node_id, max_results=16) as junction: - async for peers in junction: - for peer in peers: + if known_node_addresses: + while not self.protocol.routing_table.get_peers(): + success = False + # ping the seed nodes, this will set their node ids (since we don't know them ahead of time) + for address, port in known_node_addresses: + peer = self.protocol.get_rpc_peer(KademliaPeer(self.loop, address, udp_port=port)) try: - await self.protocol.get_rpc_peer(peer).ping() - except (asyncio.TimeoutError, RemoteException): - pass - self.joined.set() + await peer.ping() + success = True + except asyncio.TimeoutError: + log.warning("seed node (%s:%i) timed out in %s", url_to_addr.get(address, address), port, + round(self.protocol.rpc_timeout, 2)) + if success: + break + # now that we have the seed nodes in routing, to an iterative lookup of our own id to populate the buckets + # in the routing table with good peers who are near us + async with self.peer_search_junction(self.protocol.node_id, max_results=16) as junction: + async for peers in junction: + for peer in peers: + try: + await self.protocol.get_rpc_peer(peer).ping() + except (asyncio.TimeoutError, RemoteException): + pass + log.info("Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), self.protocol.routing_table.buckets_with_contacts()) + self.joined.set() def start(self, interface: str, known_node_urls: typing.List[typing.Tuple[str, int]]): self._join_task = self.loop.create_task( diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index c1afdc65b..fb850c2dd 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -376,8 +376,8 @@ class DHTComponent(Component): async def start(self): log.info("start the dht") self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) - self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.peer_port) - self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.dht_node_port) + self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port) + self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.udp_port) external_ip = self.upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") @@ -389,13 +389,14 @@ class DHTComponent(Component): asyncio.get_event_loop(), self.component_manager.peer_manager, node_id=self.get_node_id(), - internal_udp_port=self.conf.dht_node_port, + internal_udp_port=self.conf.udp_port, udp_port=self.external_udp_port, external_ip=external_ip, - peer_port=self.external_peer_port + peer_port=self.external_peer_port, + rpc_timeout=self.conf.node_rpc_timeout ) self.dht_node.start( - interface='0.0.0.0', known_node_urls=self.conf.known_dht_nodes + interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes ) log.info("Started the dht") @@ -419,7 +420,7 @@ class HashAnnouncerComponent(Component): storage = self.component_manager.get_component(DATABASE_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT) self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage) - self.hash_announcer.start(self.conf.concurrent_announcers) + self.hash_announcer.start(self.conf.concurrent_blob_announcers) log.info("Started blob announcer") async def stop(self): @@ -492,10 +493,10 @@ class PeerProtocolServerComponent(Component): upnp = self.component_manager.get_component(UPNP_COMPONENT) blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT) wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT) - peer_port = upnp.upnp_redirects.get("TCP", self.conf.peer_port) + peer_port = upnp.upnp_redirects.get("TCP", self.conf.tcp_port) address = await wallet.get_unused_address() self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address) - self.blob_server.start_server(peer_port, interface='0.0.0.0') + self.blob_server.start_server(peer_port, interface=self.conf.network_interface) await self.blob_server.started_listening.wait() async def stop(self): @@ -508,8 +509,8 @@ class UPnPComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self._int_peer_port = self.conf.peer_port - self._int_dht_node_port = self.conf.dht_node_port + self._int_peer_port = self.conf.tcp_port + self._int_dht_node_port = self.conf.udp_port self.use_upnp = self.conf.use_upnp self.upnp = None self.upnp_redirects = {} diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index d736a44bc..ce2e3ef03 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1551,8 +1551,6 @@ class Daemon(metaclass=JSONRPCServerType): } """ - timeout = timeout if timeout is not None else self.conf.download_timeout - parsed_uri = parse_lbry_uri(uri) if parsed_uri.is_channel: raise Exception("cannot download a channel claim, specify a /path") @@ -1584,7 +1582,7 @@ class Daemon(metaclass=JSONRPCServerType): stream = existing[0] else: stream = await self.stream_manager.download_stream_from_claim( - self.dht_node, self.conf.download_dir, resolved, file_name, timeout, fee_amount, fee_address + self.dht_node, self.conf, resolved, file_name, timeout, fee_amount, fee_address ) if stream: return stream.as_dict() diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 48586f321..c6092d89f 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -275,7 +275,7 @@ class SQLiteStorage(SQLiteMixin): def get_blobs_to_announce(self): def get_and_update(transaction): timestamp = self.loop.time() - if self.conf.announce_head_blobs_only: + if self.conf.announce_head_and_sd_only: r = transaction.execute( "select blob_hash from blob " "where blob_hash is not null and " @@ -694,5 +694,5 @@ class SQLiteStorage(SQLiteMixin): "select s.sd_hash from stream s " "left outer join reflected_stream r on s.sd_hash=r.sd_hash " "where r.timestamp is null or r.timestamp < ?", - self.loop.time() - self.conf.auto_re_reflect_interval + self.loop.time() - 86400 ) diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index 4e8eabd19..18b1a8804 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -83,7 +83,7 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t else: log.info("downloader idle...") for peer in to_add: - if len(self.running_download_requests) >= 8: + if len(self.running_download_requests) >= self.max_connections_per_stream: break task = self.loop.create_task(self._request_blob(peer)) self.requested_from[self.current_blob.blob_hash][peer] = task diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 7f584f7ab..1f4395d23 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -9,6 +9,7 @@ from lbrynet.stream.managed_stream import ManagedStream from lbrynet.schema.claim import ClaimDict from lbrynet.extras.daemon.storage import StoredStreamClaim, lbc_to_dewies if typing.TYPE_CHECKING: + from lbrynet.conf import Config from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.dht.peer import KademliaPeer from lbrynet.dht.node import Node @@ -166,16 +167,14 @@ class StreamManager: ) async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, - file_name: typing.Optional[str] = None, data_rate: typing.Optional[int] = 0, - sd_blob_timeout: typing.Optional[float] = 60 - ) -> typing.Optional[ManagedStream]: + file_name: typing.Optional[str] = None) -> typing.Optional[ManagedStream]: claim = ClaimDict.load_dict(claim_info['value']) downloader = StreamDownloader(self.loop, self.blob_manager, claim.source_hash.decode(), self.peer_timeout, self.peer_connect_timeout, download_directory, file_name, self.fixed_peers) try: downloader.download(node) - await asyncio.wait_for(downloader.got_descriptor.wait(), sd_blob_timeout) + await downloader.got_descriptor.wait() log.info("got descriptor %s for %s", claim.source_hash.decode(), claim_info['name']) except (asyncio.TimeoutError, asyncio.CancelledError): log.info("stream timeout") @@ -187,7 +186,7 @@ class StreamManager: if not await self.blob_manager.storage.file_exists(downloader.sd_hash): await self.blob_manager.storage.save_downloaded_file( downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory, - data_rate + 0.0 ) await self.blob_manager.storage.save_content_claim( downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" @@ -210,9 +209,9 @@ class StreamManager: except asyncio.CancelledError: await downloader.stop() - async def download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict, + async def download_stream_from_claim(self, node: 'Node', config: 'Config', claim_info: typing.Dict, file_name: typing.Optional[str] = None, - sd_blob_timeout: typing.Optional[float] = 60, + timeout: typing.Optional[float] = 60, fee_amount: typing.Optional[float] = 0.0, fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]: log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id']) @@ -229,10 +228,10 @@ class StreamManager: self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop) stream_task = self.loop.create_task( - self._download_stream_from_claim(node, download_directory, claim_info, file_name, 0, sd_blob_timeout) + self._download_stream_from_claim(node, config.download_dir, claim_info, file_name) ) try: - await asyncio.wait_for(stream_task, sd_blob_timeout) + await asyncio.wait_for(stream_task, timeout or config.download_timeout) stream = await stream_task self.starting_streams[sd_hash].set_result(stream) if fee_address and fee_amount: diff --git a/tests/integration/wallet/test_commands.py b/tests/integration/wallet/test_commands.py index 4d3fd10ee..c6ac863c1 100644 --- a/tests/integration/wallet/test_commands.py +++ b/tests/integration/wallet/test_commands.py @@ -41,7 +41,7 @@ class CommandTestCase(IntegrationTestCase): conf.download_dir = self.wallet_node.data_path conf.share_usage_data = False conf.use_upnp = False - conf.reflect_uploads = False + conf.reflect_streams = False conf.blockchain_name = 'lbrycrd_regtest' conf.lbryum_servers = [('localhost', 50001)] conf.known_dht_nodes = [] From 330862e4878793fc6dd36e18d9e4826219f30680 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 28 Jan 2019 16:00:37 -0500 Subject: [PATCH 3/4] fix https://github.com/lbryio/lbry/issues/1297 --- lbrynet/conf.py | 3 ++- lbrynet/dht/blob_announcer.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 68ce3dc8c..0e13577d8 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -485,7 +485,8 @@ class Config(CLIConfig): previous_names=['announce_head_blobs_only'] ) concurrent_blob_announcers = Integer( - "Number of blobs to iteratively announce at once", 10, previous_names=['concurrent_announcers'] + "Number of blobs to iteratively announce at once, set to 0 to disable", 10, + previous_names=['concurrent_announcers'] ) max_connections_per_download = Integer( "Maximum number of peers to connect to while downloading a blob", 5, diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py index 2a209e65e..fa4d5cc06 100644 --- a/lbrynet/dht/blob_announcer.py +++ b/lbrynet/dht/blob_announcer.py @@ -19,6 +19,8 @@ class BlobAnnouncer: self.announce_queue: typing.List[str] = [] async def _announce(self, batch_size: typing.Optional[int] = 10): + if not batch_size: + return if not self.node.joined.is_set(): await self.node.joined.wait() blob_hashes = await self.storage.get_blobs_to_announce() From 71f9f8ae9cc0d8a01a509cf21b48ffbb6493a954 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 28 Jan 2019 16:51:11 -0500 Subject: [PATCH 4/4] fix race condition in reflector server --- lbrynet/stream/reflector/client.py | 3 ++- lbrynet/stream/reflector/server.py | 12 +++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/lbrynet/stream/reflector/client.py b/lbrynet/stream/reflector/client.py index ce4d7528e..908f0608f 100644 --- a/lbrynet/stream/reflector/client.py +++ b/lbrynet/stream/reflector/client.py @@ -15,6 +15,7 @@ log = logging.getLogger(__name__) class StreamReflectorClient(asyncio.Protocol): def __init__(self, blob_manager: 'BlobFileManager', descriptor: 'StreamDescriptor'): + self.loop = asyncio.get_event_loop() self.transport: asyncio.StreamWriter = None self.blob_manager = blob_manager self.descriptor = descriptor @@ -45,7 +46,7 @@ class StreamReflectorClient(asyncio.Protocol): msg = json.dumps(request_dict) self.transport.write(msg.encode()) try: - self.pending_request = asyncio.get_event_loop().create_task(self.response_queue.get()) + self.pending_request = self.loop.create_task(self.response_queue.get()) return await self.pending_request finally: self.pending_request = None diff --git a/lbrynet/stream/reflector/server.py b/lbrynet/stream/reflector/server.py index 3547f0c8a..f403fa329 100644 --- a/lbrynet/stream/reflector/server.py +++ b/lbrynet/stream/reflector/server.py @@ -37,7 +37,8 @@ class ReflectorServerProtocol(asyncio.Protocol): try: self.writer.write(data) except IOError as err: - log.error("error downloading blob: %s", err) + log.error("error receiving blob: %s", err) + self.transport.close() return try: request = json.loads(data.decode()) @@ -67,28 +68,33 @@ class ReflectorServerProtocol(asyncio.Protocol): self.send_response({"send_sd_blob": True}) try: await asyncio.wait_for(self.sd_blob.finished_writing.wait(), 30, loop=self.loop) - self.send_response({"received_sd_blob": True}) self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( self.loop, self.blob_manager.blob_dir, self.sd_blob ) self.incoming.clear() self.writer.close_handle() self.writer = None + self.send_response({"received_sd_blob": True}) except (asyncio.TimeoutError, asyncio.CancelledError): - self.send_response({"received_sd_blob": False}) self.incoming.clear() self.writer.close_handle() self.writer = None self.transport.close() + self.send_response({"received_sd_blob": False}) return else: self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( self.loop, self.blob_manager.blob_dir, self.sd_blob ) + self.incoming.clear() + if self.writer: + self.writer.close_handle() + self.writer = None self.send_response({"send_sd_blob": False, 'needed': [ blob.blob_hash for blob in self.descriptor.blobs[:-1] if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified() ]}) + return return elif self.descriptor: if 'blob_hash' not in request: