diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index d82dc79fa..bf1436f16 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -218,7 +218,8 @@ class Node(object): for peer in result[blob_hash]: if self.node_id != peer[6:]: host = ".".join([str(ord(d)) for d in peer[:4]]) - if host == "127.0.0.1" and "from_peer" in result and result["from_peer"] != "self": + if host == "127.0.0.1" and "from_peer" in result \ + and result["from_peer"] != "self": host = result["from_peer"] port, = struct.unpack('>H', peer[4:6]) if (host, port) not in expanded_peers: diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index 1fa2b2c26..02630821b 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -46,6 +46,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" logging.basicConfig(level=logging.CRITICAL, format=log_format) + def require_system(system): def wrapper(fn): return fn @@ -115,10 +116,10 @@ class LbryUploader(object): self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir, - lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") stream_info_manager = TempEncryptedFileMetadataManager() self.lbry_file_manager = EncryptedFileManager( self.session, stream_info_manager, self.sd_identifier) @@ -218,12 +219,13 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd" + str(n), + node_id="abcd" + str(n), peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") stream_info_manager = TempEncryptedFileMetadataManager() @@ -330,12 +332,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero db_dir, blob_dir = mk_db_and_blob_dir() - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="efgh", + session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") if slow is True: session.rate_limiter.set_ul_limit(2 ** 11) @@ -508,11 +511,11 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=Node, is_generous=self.is_generous) + dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -599,12 +602,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) d2 = self.wait_for_hash_from_queue(blob_hash_queue_2) @@ -678,11 +681,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, @@ -800,11 +804,12 @@ class TestTransfer(TestCase): db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - lbryid="abcd", peer_finder=peer_finder, + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) + is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], + external_ip="127.0.0.1") self.stream_info_manager = TempEncryptedFileMetadataManager() diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 8e32d451d..d252986a2 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -9,7 +9,6 @@ from lbrynet.core import PeerManager from lbrynet.core import RateLimiter from lbrynet.core import Session from lbrynet.core import StreamDescriptor -from lbrynet.dht.node import Node from lbrynet.lbry_file import EncryptedFileMetadataManager from lbrynet.lbry_file.client import EncryptedFileOptions from lbrynet.file_manager import EncryptedFileCreator @@ -18,6 +17,7 @@ from lbrynet.file_manager import EncryptedFileManager from lbrynet.tests import mocks from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir + class TestReflector(unittest.TestCase): def setUp(self): mocks.mock_conf_settings(self) @@ -57,7 +57,7 @@ class TestReflector(unittest.TestCase): self.session = Session.Session( conf.settings['data_rate'], db_dir=self.db_dir, - lbryid="abcd", + node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=self.blob_dir, @@ -66,7 +66,7 @@ class TestReflector(unittest.TestCase): rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, - dht_node_class=Node + external_ip="127.0.0.1" ) self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager( diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index 54b69fc1d..afd3b029c 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -72,12 +72,12 @@ class TestStreamify(TestCase): os.mkdir(blob_dir) self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=self.is_generous + is_generous=self.is_generous, external_ip="127.0.0.1" ) self.stream_info_manager = TempEncryptedFileMetadataManager() @@ -128,11 +128,11 @@ class TestStreamify(TestCase): os.mkdir(blob_dir) self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker + blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1" ) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) diff --git a/lbrynet/tests/integration/test_integration.py b/lbrynet/tests/integration/test_integration.py index 521d93844..2036a9730 100644 --- a/lbrynet/tests/integration/test_integration.py +++ b/lbrynet/tests/integration/test_integration.py @@ -18,6 +18,7 @@ def shell_command(command): FNULL = open(os.devnull, 'w') p = subprocess.Popen(command,shell=False,stdout=FNULL,stderr=subprocess.STDOUT) + def lbrynet_cli(commands): cli_cmd=['lbrynet-cli'] for cmd in commands: @@ -65,7 +66,6 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertTrue(out['is_running']) - def test_cli_docopts(self): out,err = lbrynet_cli(['cli_test_command']) self.assertEqual('',out) @@ -83,7 +83,6 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertEqual([1,[],1,None,False,False], out) - out,err = lbrynet_cli(['cli_test_command','1', '--pos_arg2=2','--pos_arg3=3']) out = json.loads(out) self.assertEqual([1,[],2,3,False,False], out) @@ -93,7 +92,6 @@ class TestIntegration(unittest.TestCase): # TODO: variable length arguments don't have guess_type() on them self.assertEqual([1,['2','3'],None,None,False,False], out) - out,err = lbrynet_cli(['cli_test_command','1','-a']) out = json.loads(out) self.assertEqual([1,[],None,None,True,False], out) @@ -102,13 +100,10 @@ class TestIntegration(unittest.TestCase): out = json.loads(out) self.assertEqual([1,[],None,None,True,False], out) - out,err = lbrynet_cli(['cli_test_command','1','-a','-b']) out = json.loads(out) self.assertEqual([1,[],None,None,True,True], out) - - def test_status(self): out = lbrynet.status() self.assertTrue(out['is_running']) diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py index 0fffb7b4a..a285cbfa8 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py @@ -15,6 +15,10 @@ from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailability from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed +import logging +logging.getLogger("lbryum").setLevel(logging.WARNING) + + def get_test_daemon(data_rate=None, generous=True, with_fee=False): if data_rate is None: data_rate = conf.ADJUSTABLE_SETTINGS['data_rate'][1] @@ -68,7 +72,6 @@ class TestCostEst(unittest.TestCase): size = 10000000 correct_result = 4.5 daemon = get_test_daemon(generous=True, with_fee=True) - print daemon.get_est_cost("test", size) self.assertEquals(daemon.get_est_cost("test", size).result, correct_result) def test_fee_and_ungenerous_data(self): diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py new file mode 100644 index 000000000..70a93fea7 --- /dev/null +++ b/scripts/dht_monitor.py @@ -0,0 +1,103 @@ +import curses +import time +from jsonrpc.proxy import JSONRPCProxy +import logging + +log = logging.getLogger(__name__) +log.addHandler(logging.FileHandler("dht contacts.log")) +# log.addHandler(logging.StreamHandler()) +log.setLevel(logging.INFO) +stdscr = curses.initscr() + +api = JSONRPCProxy.from_url("http://localhost:5280") + + +def init_curses(): + curses.noecho() + curses.cbreak() + stdscr.nodelay(1) + stdscr.keypad(1) + + +def teardown_curses(): + curses.nocbreak() + stdscr.keypad(0) + curses.echo() + curses.endwin() + + +def refresh(last_contacts, last_blobs): + height, width = stdscr.getmaxyx() + + try: + routing_table_info = api.routing_table_get() + node_id = routing_table_info['node id'] + except: + node_id = "UNKNOWN" + routing_table_info = { + 'buckets': {}, + 'contacts': [], + 'blob hashes': [] + } + for y in range(height): + stdscr.addstr(y, 0, " " * (width - 1)) + + buckets = routing_table_info['buckets'] + stdscr.addstr(0, 0, "node id: %s" % node_id) + stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" % + (len(buckets), len(routing_table_info['contacts']), + len(routing_table_info['blob hashes']))) + + y = 3 + for i in sorted(buckets.keys()): + stdscr.addstr(y, 0, "bucket %s" % i) + y += 1 + for h in sorted(buckets[i], key=lambda x: x['id'].decode('hex')): + stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['id'], h['address'], len(h['blobs']))) + y += 1 + y += 1 + + new_contacts = set(routing_table_info['contacts']) - last_contacts + lost_contacts = last_contacts - set(routing_table_info['contacts']) + + if new_contacts: + for c in new_contacts: + log.debug("added contact %s", c) + if lost_contacts: + for c in lost_contacts: + log.info("lost contact %s", c) + + new_blobs = set(routing_table_info['blob hashes']) - last_blobs + lost_blobs = last_blobs - set(routing_table_info['blob hashes']) + + if new_blobs: + for c in new_blobs: + log.debug("added blob %s", c) + if lost_blobs: + for c in lost_blobs: + log.info("lost blob %s", c) + + stdscr.addstr(y + 1, 0, str(time.time())) + stdscr.refresh() + return set(routing_table_info['contacts']), set(routing_table_info['blob hashes']) + + +def do_main(): + c = None + last_contacts, last_blobs = set(), set() + while c not in [ord('q'), ord('Q')]: + last_contacts, last_blobs = refresh(last_contacts, last_blobs) + c = stdscr.getch() + time.sleep(0.1) + + +def main(): + try: + init_curses() + do_main() + finally: + teardown_curses() + + +if __name__ == "__main__": + main() diff --git a/scripts/dht_scripts.py b/scripts/dht_scripts.py index 657a5d7e0..b3a5cafe0 100644 --- a/scripts/dht_scripts.py +++ b/scripts/dht_scripts.py @@ -22,7 +22,7 @@ def join_network(udp_port, known_nodes): lbryid = generate_id() log.info('Creating node') - node = Node(udpPort=udp_port, lbryid=lbryid) + node = Node(udpPort=udp_port, node_id=lbryid) log.info('Joining network') yield node.joinNetwork(known_nodes) diff --git a/scripts/dhttest.py b/scripts/dhttest.py index a188030dc..fe0a0af7f 100644 --- a/scripts/dhttest.py +++ b/scripts/dhttest.py @@ -150,7 +150,7 @@ if __name__ == '__main__': # If you wish to have a pure Kademlia network, use the # entangled.kademlia.node.Node class instead print 'Creating Node' - node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid) + node = Node(udpPort=int(sys.argv[1]), node_id=lbryid) # Schedule the node to join the Kademlia/Entangled DHT node.joinNetwork(knownNodes) diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py index 39e1f406f..c2b08f944 100644 --- a/scripts/query_available_blobs.py +++ b/scripts/query_available_blobs.py @@ -51,7 +51,7 @@ def main(args=None): session = Session.Session( 0, db_dir=db_dir, - lbryid=utils.generate_id(), + node_id=utils.generate_id(), blob_dir=blob_dir, dht_node_port=4444, known_dht_nodes=conf.settings['known_dht_nodes'], diff --git a/scripts/rpc_node.py b/scripts/rpc_node.py index ced4fc6e8..40d69b8e7 100644 --- a/scripts/rpc_node.py +++ b/scripts/rpc_node.py @@ -1,82 +1,214 @@ -#!/usr/bin/env python -# -# This library is free software, distributed under the terms of -# the GNU Lesser General Public License Version 3, or any later version. -# See the COPYING file included in this archive -# - -# Thanks to Paul Cannon for IP-address resolution functions (taken from aspn.activestate.com) - - -""" -Launch a DHT node which can respond to RPC commands. -""" - +import logging +import requests +import miniupnpc import argparse -from lbrynet.dht.node import Node -from txjsonrpc.web import jsonrpc -from twisted.web import server +from copy import deepcopy from twisted.internet import reactor, defer +from twisted.web import resource +from twisted.web.server import Site + +from lbrynet import conf +from lbrynet.core.log_support import configure_console +from lbrynet.dht.error import TimeoutError +conf.initialize_settings() + +log = logging.getLogger("dht tool") +configure_console() +log.setLevel(logging.INFO) + +from lbrynet.dht.node import Node +from lbrynet.dht.contact import Contact +from lbrynet.daemon.auth.server import AuthJSONRPCServer +from lbrynet.core.utils import generate_id + +def get_external_ip_and_setup_upnp(): + try: + u = miniupnpc.UPnP() + u.discoverdelay = 200 + u.discover() + u.selectigd() + + if u.getspecificportmapping(4444, "UDP"): + u.deleteportmapping(4444, "UDP") + log.info("Removed UPnP redirect for UDP 4444.") + u.addportmapping(4444, 'UDP', u.lanaddr, 4444, 'LBRY DHT port', '') + log.info("got external ip from upnp") + return u.externalipaddress() + except Exception: + log.exception("derp") + r = requests.get('https://api.ipify.org', {'format': 'json'}) + log.info("got external ip from ipify.org") + return r.json()['ip'] -class RPCNode(jsonrpc.JSONRPC): - def __init__(self, node, shut_down_cb): - jsonrpc.JSONRPC.__init__(self) - self.node = node - self.shut_down_cb = shut_down_cb +class NodeRPC(AuthJSONRPCServer): + def __init__(self, lbryid, seeds, node_port, rpc_port): + AuthJSONRPCServer.__init__(self, False) + self.root = None + self.port = None + self.seeds = seeds + self.node_port = node_port + self.rpc_port = rpc_port + if lbryid: + lbryid = lbryid.decode('hex') + else: + lbryid = generate_id() + self.node_id = lbryid + self.external_ip = get_external_ip_and_setup_upnp() + self.node_port = node_port - def jsonrpc_total_dht_nodes(self): - return self.node.getApproximateTotalDHTNodes() + @defer.inlineCallbacks + def setup(self): + self.node = Node(node_id=self.node_id, udpPort=self.node_port, + externalIP=self.external_ip) + hosts = [] + for hostname, hostport in self.seeds: + host_ip = yield reactor.resolve(hostname) + hosts.append((host_ip, hostport)) + log.info("connecting to dht") + yield self.node.joinNetwork(tuple(hosts)) + log.info("connected to dht") + if not self.announced_startup: + self.announced_startup = True + self.start_api() + log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'), len(self.node.node_id)) - def jsonrpc_total_dht_hashes(self): - return self.node.getApproximateTotalHashes() + def start_api(self): + root = resource.Resource() + root.putChild('', self) + self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost') + log.info("started jsonrpc server") - def jsonrpc_stop(self): - self.shut_down_cb() - return "fine" + @defer.inlineCallbacks + def jsonrpc_node_id_set(self, node_id): + old_id = self.node.node_id + self.node.stop() + del self.node + self.node_id = node_id.decode('hex') + yield self.setup() + msg = "changed dht id from %s to %s" % (old_id.encode('hex'), + self.node.node_id.encode('hex')) + defer.returnValue(msg) + + def jsonrpc_node_id_get(self): + return self._render_response(self.node.node_id.encode('hex')) + + @defer.inlineCallbacks + def jsonrpc_peer_find(self, node_id): + node_id = node_id.decode('hex') + contact = yield self.node.findContact(node_id) + result = None + if contact: + result = (contact.address, contact.port) + defer.returnValue(result) + + @defer.inlineCallbacks + def jsonrpc_peer_list_for_blob(self, blob_hash): + peers = yield self.node.getPeersForBlob(blob_hash.decode('hex')) + defer.returnValue(peers) + + @defer.inlineCallbacks + def jsonrpc_ping(self, node_id): + contact_host = yield self.jsonrpc_peer_find(node_id=node_id) + if not contact_host: + defer.returnValue("failed to find node") + contact_ip, contact_port = contact_host + contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol) + try: + result = yield contact.ping() + except TimeoutError: + self.node.removeContact(contact.id) + self.node._dataStore.removePeer(contact.id) + result = {'error': 'timeout'} + defer.returnValue(result) + + def get_routing_table(self): + result = {} + data_store = deepcopy(self.node._dataStore._dict) + datastore_len = len(data_store) + hosts = {} + missing_contacts = [] + if datastore_len: + for k, v in data_store.iteritems(): + for value, lastPublished, originallyPublished, originalPublisherID in v: + try: + contact = self.node._routingTable.getContact(originalPublisherID) + except ValueError: + if originalPublisherID.encode('hex') not in missing_contacts: + missing_contacts.append(originalPublisherID.encode('hex')) + continue + if contact in hosts: + blobs = hosts[contact] + else: + blobs = [] + blobs.append(k.encode('hex')) + hosts[contact] = blobs + + contact_set = [] + blob_hashes = [] + result['buckets'] = {} + + for i in range(len(self.node._routingTable._buckets)): + for contact in self.node._routingTable._buckets[i]._contacts: + contacts = result['buckets'].get(i, []) + if contact in hosts: + blobs = hosts[contact] + del hosts[contact] + else: + blobs = [] + host = { + "address": contact.address, + "id": contact.id.encode("hex"), + "blobs": blobs, + } + for blob_hash in blobs: + if blob_hash not in blob_hashes: + blob_hashes.append(blob_hash) + contacts.append(host) + result['buckets'][i] = contacts + contact_set.append(contact.id.encode("hex")) + if hosts: + result['datastore extra'] = [ + { + "id": host.id.encode('hex'), + "blobs": hosts[host], + } + for host in hosts] + result['missing contacts'] = missing_contacts + result['contacts'] = contact_set + result['blob hashes'] = blob_hashes + result['node id'] = self.node_id.encode('hex') + return result + + def jsonrpc_routing_table_get(self): + return self._render_response(self.get_routing_table()) def main(): parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands") - - parser.add_argument("node_port", + parser.add_argument("--node_port", help=("The UDP port on which the node will listen for connections " "from other dht nodes"), - type=int) - parser.add_argument("rpc_port", + type=int, default=4444) + parser.add_argument("--rpc_port", help="The TCP port on which the node will listen for rpc commands", - type=int) - parser.add_argument("dht_bootstrap_host", + type=int, default=5280) + parser.add_argument("--bootstrap_host", help="The IP of a DHT node to be used to bootstrap into the network", - nargs='?') - parser.add_argument("dht_bootstrap_port", + default='lbrynet1.lbry.io') + parser.add_argument("--node_id", + help="The IP of a DHT node to be used to bootstrap into the network", + default=None) + parser.add_argument("--bootstrap_port", help="The port of a DHT node to be used to bootstrap into the network", - nargs='?', default=4000, type=int) - parser.add_argument("--rpc_ip_address", - help="The network interface on which to listen for rpc connections", - default="127.0.0.1") + default=4444, type=int) args = parser.parse_args() - - def start_rpc(): - rpc_node = RPCNode(node, shut_down) - reactor.listenTCP(args.rpc_port, server.Site(rpc_node), interface=args.rpc_ip_address) - - def shut_down(): - d = defer.maybeDeferred(node.stop) - d.addBoth(lambda _: reactor.stop()) - return d - - known_nodes = [] - if args.dht_bootstrap_host: - known_nodes.append((args.dht_bootstrap_host, args.dht_bootstrap_port)) - - node = Node(udpPort=args.node_port) - node.joinNetwork(known_nodes) - d = node._joinDeferred - d.addCallback(lambda _: start_rpc()) + seeds = [(args.bootstrap_host, args.bootstrap_port)] + server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port) + reactor.addSystemEventTrigger('after', 'startup', server.setup) reactor.run() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/send_sd_blobs_to_lighthouse.py b/scripts/send_sd_blobs_to_lighthouse.py index de40356cd..aad1f21f9 100644 --- a/scripts/send_sd_blobs_to_lighthouse.py +++ b/scripts/send_sd_blobs_to_lighthouse.py @@ -53,7 +53,7 @@ def main(args=None): session = Session.Session( blob_data_payment_rate=0, db_dir=db_dir, - lbryid=utils.generate_id(), + node_id=utils.generate_id(), blob_dir=blob_dir, dht_node_port=4444, known_dht_nodes=conf.settings['known_dht_nodes'],