update tests and scripts
parent 9919fd06c6
commit e9fd8eb096
12 changed files with 332 additions and 93 deletions
@@ -218,7 +218,8 @@ class Node(object):
                 for peer in result[blob_hash]:
                     if self.node_id != peer[6:]:
                         host = ".".join([str(ord(d)) for d in peer[:4]])
-                        if host == "127.0.0.1" and "from_peer" in result and result["from_peer"] != "self":
+                        if host == "127.0.0.1" and "from_peer" in result \
+                                and result["from_peer"] != "self":
                             host = result["from_peer"]
                         port, = struct.unpack('>H', peer[4:6])
                         if (host, port) not in expanded_peers:
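Note: the compact peer value parsed in this hunk packs an IPv4 address into bytes 0-3 (one octet per raw byte), a big-endian unsigned short port into bytes 4-5, and the announcing peer's node id into the rest. A standalone sketch of the same decoding (Python 2, matching the codebase; the function name is ours, not lbrynet's):

    import struct

    def decode_compact_peer(peer):
        # bytes 0-3: IPv4 address, one octet per raw byte
        host = ".".join(str(ord(d)) for d in peer[:4])
        # bytes 4-5: port, unsigned 16-bit big-endian
        port, = struct.unpack('>H', peer[4:6])
        # bytes 6+: node id of the peer that announced the blob
        return host, port, peer[6:]

    print decode_compact_peer('\x7f\x00\x00\x01\x15\xb1' + '\xaa' * 48)
    # -> ('127.0.0.1', 5553, '\xaa\xaa...')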
@@ -46,6 +46,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
 log_format = "%(funcName)s(): %(message)s"
 logging.basicConfig(level=logging.CRITICAL, format=log_format)
 
+
 def require_system(system):
     def wrapper(fn):
         return fn
@@ -115,10 +116,10 @@ class LbryUploader(object):
 
         self.session = Session(
             conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir,
-            lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
+            node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
             peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
             blob_tracker_class=DummyBlobAvailabilityTracker,
-            dht_node_class=Node, is_generous=self.is_generous)
+            dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
         stream_info_manager = TempEncryptedFileMetadataManager()
         self.lbry_file_manager = EncryptedFileManager(
             self.session, stream_info_manager, self.sd_identifier)
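Note: the same two-part change repeats through the rest of these test hunks: the Session kwarg lbryid becomes node_id, and a new external_ip kwarg (always "127.0.0.1" in the tests) is threaded in. A hypothetical shim, not part of lbrynet, showing how older call sites could be bridged during such a rename:

    def session_kwargs(**kwargs):
        if 'lbryid' in kwargs:
            kwargs['node_id'] = kwargs.pop('lbryid')  # old kwarg -> new kwarg
        kwargs.setdefault('external_ip', "127.0.0.1")  # tests talk to localhost
        return kwargs

    print session_kwargs(lbryid="abcd")
    # -> {'node_id': 'abcd', 'external_ip': '127.0.0.1'} (key order may vary)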
@@ -218,12 +219,13 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
 
     db_dir, blob_dir = mk_db_and_blob_dir()
     session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
-                      lbryid="abcd" + str(n),
+                      node_id="abcd" + str(n),
                       peer_finder=peer_finder, hash_announcer=hash_announcer,
                       blob_dir=blob_dir, peer_port=peer_port,
                       use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
                       blob_tracker_class=DummyBlobAvailabilityTracker,
-                      is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
+                      is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
+                      external_ip="127.0.0.1")
 
     stream_info_manager = TempEncryptedFileMetadataManager()
 
@@ -330,12 +332,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
 
     db_dir, blob_dir = mk_db_and_blob_dir()
 
-    session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="efgh",
+    session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh",
                       peer_finder=peer_finder, hash_announcer=hash_announcer,
                       blob_dir=blob_dir, peer_port=peer_port,
                       use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
                       blob_tracker_class=DummyBlobAvailabilityTracker,
-                      is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
+                      is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
+                      external_ip="127.0.0.1")
 
     if slow is True:
         session.rate_limiter.set_ul_limit(2 ** 11)
@@ -508,11 +511,11 @@ class TestTransfer(TestCase):
         db_dir, blob_dir = mk_db_and_blob_dir()
         self.session = Session(
             conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
-            lbryid="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
+            node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
             blob_dir=blob_dir, peer_port=5553,
             use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
             blob_tracker_class=DummyBlobAvailabilityTracker,
-            dht_node_class=Node, is_generous=self.is_generous)
+            dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1")
 
         self.stream_info_manager = TempEncryptedFileMetadataManager()
 
@@ -599,12 +602,12 @@ class TestTransfer(TestCase):
 
         db_dir, blob_dir = mk_db_and_blob_dir()
         self.session = Session(
-            conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
+            conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
             peer_finder=peer_finder, hash_announcer=hash_announcer,
             blob_dir=blob_dir, peer_port=5553,
             use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
             blob_tracker_class=DummyBlobAvailabilityTracker,
-            is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
+            is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1")
 
         d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
         d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@@ -678,11 +681,12 @@ class TestTransfer(TestCase):
 
         db_dir, blob_dir = mk_db_and_blob_dir()
         self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
-                               lbryid="abcd", peer_finder=peer_finder,
+                               node_id="abcd", peer_finder=peer_finder,
                                hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
                                use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
                                blob_tracker_class=DummyBlobAvailabilityTracker,
-                               is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
+                               is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
+                               external_ip="127.0.0.1")
 
         self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
         self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager,
@@ -800,11 +804,12 @@ class TestTransfer(TestCase):
 
         db_dir, blob_dir = mk_db_and_blob_dir()
         self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
-                               lbryid="abcd", peer_finder=peer_finder,
+                               node_id="abcd", peer_finder=peer_finder,
                                hash_announcer=hash_announcer, blob_dir=blob_dir,
                                peer_port=5553, use_upnp=False, rate_limiter=rate_limiter,
                                wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
-                               is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
+                               is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
+                               external_ip="127.0.0.1")
 
         self.stream_info_manager = TempEncryptedFileMetadataManager()
 
@@ -9,7 +9,6 @@ from lbrynet.core import PeerManager
 from lbrynet.core import RateLimiter
 from lbrynet.core import Session
 from lbrynet.core import StreamDescriptor
-from lbrynet.dht.node import Node
 from lbrynet.lbry_file import EncryptedFileMetadataManager
 from lbrynet.lbry_file.client import EncryptedFileOptions
 from lbrynet.file_manager import EncryptedFileCreator
@@ -18,6 +17,7 @@ from lbrynet.file_manager import EncryptedFileManager
 from lbrynet.tests import mocks
 from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir
 
+
 class TestReflector(unittest.TestCase):
     def setUp(self):
         mocks.mock_conf_settings(self)
@@ -57,7 +57,7 @@ class TestReflector(unittest.TestCase):
         self.session = Session.Session(
             conf.settings['data_rate'],
             db_dir=self.db_dir,
-            lbryid="abcd",
+            node_id="abcd",
             peer_finder=peer_finder,
             hash_announcer=hash_announcer,
             blob_dir=self.blob_dir,
@@ -66,7 +66,7 @@ class TestReflector(unittest.TestCase):
             rate_limiter=rate_limiter,
             wallet=wallet,
             blob_tracker_class=mocks.BlobAvailabilityTracker,
-            dht_node_class=Node
+            external_ip="127.0.0.1"
         )
 
         self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(
@@ -72,12 +72,12 @@ class TestStreamify(TestCase):
         os.mkdir(blob_dir)
 
         self.session = Session(
-            conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
+            conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
             peer_finder=peer_finder, hash_announcer=hash_announcer,
             blob_dir=blob_dir, peer_port=5553,
             use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
             blob_tracker_class=DummyBlobAvailabilityTracker,
-            is_generous=self.is_generous
+            is_generous=self.is_generous, external_ip="127.0.0.1"
         )
 
         self.stream_info_manager = TempEncryptedFileMetadataManager()
@@ -128,11 +128,11 @@ class TestStreamify(TestCase):
         os.mkdir(blob_dir)
 
         self.session = Session(
-            conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
+            conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
             peer_finder=peer_finder, hash_announcer=hash_announcer,
             blob_dir=blob_dir, peer_port=5553,
             use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
-            blob_tracker_class=DummyBlobAvailabilityTracker
+            blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1"
         )
 
         self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
@@ -18,6 +18,7 @@ def shell_command(command):
     FNULL = open(os.devnull, 'w')
     p = subprocess.Popen(command,shell=False,stdout=FNULL,stderr=subprocess.STDOUT)
 
+
 def lbrynet_cli(commands):
     cli_cmd=['lbrynet-cli']
     for cmd in commands:
@@ -65,7 +66,6 @@ class TestIntegration(unittest.TestCase):
         out = json.loads(out)
         self.assertTrue(out['is_running'])
 
-
     def test_cli_docopts(self):
         out,err = lbrynet_cli(['cli_test_command'])
         self.assertEqual('',out)
@@ -83,7 +83,6 @@ class TestIntegration(unittest.TestCase):
         out = json.loads(out)
         self.assertEqual([1,[],1,None,False,False], out)
 
-
         out,err = lbrynet_cli(['cli_test_command','1', '--pos_arg2=2','--pos_arg3=3'])
         out = json.loads(out)
         self.assertEqual([1,[],2,3,False,False], out)
@@ -93,7 +92,6 @@ class TestIntegration(unittest.TestCase):
         # TODO: variable length arguments don't have guess_type() on them
         self.assertEqual([1,['2','3'],None,None,False,False], out)
 
-
         out,err = lbrynet_cli(['cli_test_command','1','-a'])
         out = json.loads(out)
         self.assertEqual([1,[],None,None,True,False], out)
@@ -102,13 +100,10 @@ class TestIntegration(unittest.TestCase):
         out = json.loads(out)
         self.assertEqual([1,[],None,None,True,False], out)
 
-
         out,err = lbrynet_cli(['cli_test_command','1','-a','-b'])
         out = json.loads(out)
         self.assertEqual([1,[],None,None,True,True], out)
 
-
-
     def test_status(self):
         out = lbrynet.status()
         self.assertTrue(out['is_running'])
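Note: these integration tests drive the packaged lbrynet-cli binary and read its JSON output. The helper is truncated in the hunk above; a hedged sketch of the call pattern the assertions rely on (an out, err tuple from a spawned process; the binary is assumed to be on PATH):

    import subprocess

    def lbrynet_cli(commands):
        # assemble ['lbrynet-cli', arg1, arg2, ...] and capture both streams
        cli_cmd = ['lbrynet-cli']
        for cmd in commands:
            cli_cmd.append(cmd)
        p = subprocess.Popen(cli_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = p.communicate()
        return out, err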
@@ -15,6 +15,10 @@ from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailability
 from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager
 from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed
 
+import logging
+logging.getLogger("lbryum").setLevel(logging.WARNING)
+
+
 def get_test_daemon(data_rate=None, generous=True, with_fee=False):
     if data_rate is None:
         data_rate = conf.ADJUSTABLE_SETTINGS['data_rate'][1]
@@ -68,7 +72,6 @@ class TestCostEst(unittest.TestCase):
         size = 10000000
         correct_result = 4.5
         daemon = get_test_daemon(generous=True, with_fee=True)
-        print daemon.get_est_cost("test", size)
         self.assertEquals(daemon.get_est_cost("test", size).result, correct_result)
 
     def test_fee_and_ungenerous_data(self):
scripts/dht_monitor.py (new file, 103 lines)
@@ -0,0 +1,103 @@
+import curses
+import time
+from jsonrpc.proxy import JSONRPCProxy
+import logging
+
+log = logging.getLogger(__name__)
+log.addHandler(logging.FileHandler("dht contacts.log"))
+# log.addHandler(logging.StreamHandler())
+log.setLevel(logging.INFO)
+stdscr = curses.initscr()
+
+api = JSONRPCProxy.from_url("http://localhost:5280")
+
+
+def init_curses():
+    curses.noecho()
+    curses.cbreak()
+    stdscr.nodelay(1)
+    stdscr.keypad(1)
+
+
+def teardown_curses():
+    curses.nocbreak()
+    stdscr.keypad(0)
+    curses.echo()
+    curses.endwin()
+
+
+def refresh(last_contacts, last_blobs):
+    height, width = stdscr.getmaxyx()
+
+    try:
+        routing_table_info = api.routing_table_get()
+        node_id = routing_table_info['node id']
+    except:
+        node_id = "UNKNOWN"
+        routing_table_info = {
+            'buckets': {},
+            'contacts': [],
+            'blob hashes': []
+        }
+    for y in range(height):
+        stdscr.addstr(y, 0, " " * (width - 1))
+
+    buckets = routing_table_info['buckets']
+    stdscr.addstr(0, 0, "node id: %s" % node_id)
+    stdscr.addstr(1, 0, "%i buckets, %i contacts, %i blobs" %
+                  (len(buckets), len(routing_table_info['contacts']),
+                   len(routing_table_info['blob hashes'])))
+
+    y = 3
+    for i in sorted(buckets.keys()):
+        stdscr.addstr(y, 0, "bucket %s" % i)
+        y += 1
+        for h in sorted(buckets[i], key=lambda x: x['id'].decode('hex')):
+            stdscr.addstr(y, 0, '%s (%s) - %i blobs' % (h['id'], h['address'], len(h['blobs'])))
+            y += 1
+        y += 1
+
+    new_contacts = set(routing_table_info['contacts']) - last_contacts
+    lost_contacts = last_contacts - set(routing_table_info['contacts'])
+
+    if new_contacts:
+        for c in new_contacts:
+            log.debug("added contact %s", c)
+    if lost_contacts:
+        for c in lost_contacts:
+            log.info("lost contact %s", c)
+
+    new_blobs = set(routing_table_info['blob hashes']) - last_blobs
+    lost_blobs = last_blobs - set(routing_table_info['blob hashes'])
+
+    if new_blobs:
+        for c in new_blobs:
+            log.debug("added blob %s", c)
+    if lost_blobs:
+        for c in lost_blobs:
+            log.info("lost blob %s", c)
+
+    stdscr.addstr(y + 1, 0, str(time.time()))
+    stdscr.refresh()
+    return set(routing_table_info['contacts']), set(routing_table_info['blob hashes'])
+
+
+def do_main():
+    c = None
+    last_contacts, last_blobs = set(), set()
+    while c not in [ord('q'), ord('Q')]:
+        last_contacts, last_blobs = refresh(last_contacts, last_blobs)
+        c = stdscr.getch()
+        time.sleep(0.1)
+
+
+def main():
+    try:
+        init_curses()
+        do_main()
+    finally:
+        teardown_curses()
+
+
+if __name__ == "__main__":
+    main()
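Note: dht_monitor.py is a thin curses front end over the jsonrpc API exposed by the NodeRPC class added later in this commit. Assuming a node is listening on the default rpc port (5280, as in the script above), the same query can be issued by hand:

    from jsonrpc.proxy import JSONRPCProxy

    api = JSONRPCProxy.from_url("http://localhost:5280")
    info = api.routing_table_get()
    print info['node id']
    print len(info['contacts']), len(info['blob hashes'])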
@@ -22,7 +22,7 @@ def join_network(udp_port, known_nodes):
     lbryid = generate_id()
 
     log.info('Creating node')
-    node = Node(udpPort=udp_port, lbryid=lbryid)
+    node = Node(udpPort=udp_port, node_id=lbryid)
 
     log.info('Joining network')
     yield node.joinNetwork(known_nodes)
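Note: for scripts that embed a DHT node directly, this commit amounts to the same constructor kwarg rename. A minimal sketch, assuming lbrynet is importable:

    from lbrynet.core.utils import generate_id
    from lbrynet.dht.node import Node

    node = Node(udpPort=4444, node_id=generate_id())  # formerly lbryid=generate_id()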
@@ -150,7 +150,7 @@ if __name__ == '__main__':
     # If you wish to have a pure Kademlia network, use the
     # entangled.kademlia.node.Node class instead
     print 'Creating Node'
-    node = Node(udpPort=int(sys.argv[1]), lbryid=lbryid)
+    node = Node(udpPort=int(sys.argv[1]), node_id=lbryid)
 
     # Schedule the node to join the Kademlia/Entangled DHT
     node.joinNetwork(knownNodes)
@@ -51,7 +51,7 @@ def main(args=None):
     session = Session.Session(
         0,
         db_dir=db_dir,
-        lbryid=utils.generate_id(),
+        node_id=utils.generate_id(),
        blob_dir=blob_dir,
         dht_node_port=4444,
         known_dht_nodes=conf.settings['known_dht_nodes'],
@@ -1,82 +1,214 @@
 #!/usr/bin/env python
 #
 # This library is free software, distributed under the terms of
 # the GNU Lesser General Public License Version 3, or any later version.
 # See the COPYING file included in this archive
 #
 
 # Thanks to Paul Cannon for IP-address resolution functions (taken from aspn.activestate.com)
 
 
 """
 Launch a DHT node which can respond to RPC commands.
 """
 
 import logging
+import requests
+import miniupnpc
 import argparse
-from lbrynet.dht.node import Node
-from txjsonrpc.web import jsonrpc
-from twisted.web import server
+from copy import deepcopy
 from twisted.internet import reactor, defer
+from twisted.web import resource
+from twisted.web.server import Site
+
+from lbrynet import conf
+from lbrynet.core.log_support import configure_console
+from lbrynet.dht.error import TimeoutError
+conf.initialize_settings()
+
+log = logging.getLogger("dht tool")
+configure_console()
+log.setLevel(logging.INFO)
+
+from lbrynet.dht.node import Node
+from lbrynet.dht.contact import Contact
+from lbrynet.daemon.auth.server import AuthJSONRPCServer
+from lbrynet.core.utils import generate_id
+
+
+def get_external_ip_and_setup_upnp():
+    try:
+        u = miniupnpc.UPnP()
+        u.discoverdelay = 200
+        u.discover()
+        u.selectigd()
+
+        if u.getspecificportmapping(4444, "UDP"):
+            u.deleteportmapping(4444, "UDP")
+            log.info("Removed UPnP redirect for UDP 4444.")
+        u.addportmapping(4444, 'UDP', u.lanaddr, 4444, 'LBRY DHT port', '')
+        log.info("got external ip from upnp")
+        return u.externalipaddress()
+    except Exception:
+        log.exception("derp")
+        r = requests.get('https://api.ipify.org', {'format': 'json'})
+        log.info("got external ip from ipify.org")
+        return r.json()['ip']
 
 
-class RPCNode(jsonrpc.JSONRPC):
-    def __init__(self, node, shut_down_cb):
-        jsonrpc.JSONRPC.__init__(self)
-        self.node = node
-        self.shut_down_cb = shut_down_cb
+class NodeRPC(AuthJSONRPCServer):
+    def __init__(self, lbryid, seeds, node_port, rpc_port):
+        AuthJSONRPCServer.__init__(self, False)
+        self.root = None
+        self.port = None
+        self.seeds = seeds
+        self.node_port = node_port
+        self.rpc_port = rpc_port
+        if lbryid:
+            lbryid = lbryid.decode('hex')
+        else:
+            lbryid = generate_id()
+        self.node_id = lbryid
+        self.external_ip = get_external_ip_and_setup_upnp()
+        self.node_port = node_port
 
-    def jsonrpc_total_dht_nodes(self):
-        return self.node.getApproximateTotalDHTNodes()
+    @defer.inlineCallbacks
+    def setup(self):
+        self.node = Node(node_id=self.node_id, udpPort=self.node_port,
+                         externalIP=self.external_ip)
+        hosts = []
+        for hostname, hostport in self.seeds:
+            host_ip = yield reactor.resolve(hostname)
+            hosts.append((host_ip, hostport))
+        log.info("connecting to dht")
+        yield self.node.joinNetwork(tuple(hosts))
+        log.info("connected to dht")
+        if not self.announced_startup:
+            self.announced_startup = True
+            self.start_api()
+        log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'), len(self.node.node_id))
 
-    def jsonrpc_total_dht_hashes(self):
-        return self.node.getApproximateTotalHashes()
+    def start_api(self):
+        root = resource.Resource()
+        root.putChild('', self)
+        self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost')
+        log.info("started jsonrpc server")
 
-    def jsonrpc_stop(self):
-        self.shut_down_cb()
-        return "fine"
+    @defer.inlineCallbacks
+    def jsonrpc_node_id_set(self, node_id):
+        old_id = self.node.node_id
+        self.node.stop()
+        del self.node
+        self.node_id = node_id.decode('hex')
+        yield self.setup()
+        msg = "changed dht id from %s to %s" % (old_id.encode('hex'),
+                                                self.node.node_id.encode('hex'))
+        defer.returnValue(msg)
+
+    def jsonrpc_node_id_get(self):
+        return self._render_response(self.node.node_id.encode('hex'))
+
+    @defer.inlineCallbacks
+    def jsonrpc_peer_find(self, node_id):
+        node_id = node_id.decode('hex')
+        contact = yield self.node.findContact(node_id)
+        result = None
+        if contact:
+            result = (contact.address, contact.port)
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def jsonrpc_peer_list_for_blob(self, blob_hash):
+        peers = yield self.node.getPeersForBlob(blob_hash.decode('hex'))
+        defer.returnValue(peers)
+
+    @defer.inlineCallbacks
+    def jsonrpc_ping(self, node_id):
+        contact_host = yield self.jsonrpc_peer_find(node_id=node_id)
+        if not contact_host:
+            defer.returnValue("failed to find node")
+        contact_ip, contact_port = contact_host
+        contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol)
+        try:
+            result = yield contact.ping()
+        except TimeoutError:
+            self.node.removeContact(contact.id)
+            self.node._dataStore.removePeer(contact.id)
+            result = {'error': 'timeout'}
+        defer.returnValue(result)
+
+    def get_routing_table(self):
+        result = {}
+        data_store = deepcopy(self.node._dataStore._dict)
+        datastore_len = len(data_store)
+        hosts = {}
+        missing_contacts = []
+        if datastore_len:
+            for k, v in data_store.iteritems():
+                for value, lastPublished, originallyPublished, originalPublisherID in v:
+                    try:
+                        contact = self.node._routingTable.getContact(originalPublisherID)
+                    except ValueError:
+                        if originalPublisherID.encode('hex') not in missing_contacts:
+                            missing_contacts.append(originalPublisherID.encode('hex'))
+                        continue
+                    if contact in hosts:
+                        blobs = hosts[contact]
+                    else:
+                        blobs = []
+                    blobs.append(k.encode('hex'))
+                    hosts[contact] = blobs
+
+        contact_set = []
+        blob_hashes = []
+        result['buckets'] = {}
+
+        for i in range(len(self.node._routingTable._buckets)):
+            for contact in self.node._routingTable._buckets[i]._contacts:
+                contacts = result['buckets'].get(i, [])
+                if contact in hosts:
+                    blobs = hosts[contact]
+                    del hosts[contact]
+                else:
+                    blobs = []
+                host = {
+                    "address": contact.address,
+                    "id": contact.id.encode("hex"),
+                    "blobs": blobs,
+                }
+                for blob_hash in blobs:
+                    if blob_hash not in blob_hashes:
+                        blob_hashes.append(blob_hash)
+                contacts.append(host)
+                result['buckets'][i] = contacts
+                contact_set.append(contact.id.encode("hex"))
+
+        if hosts:
+            result['datastore extra'] = [
+                {
+                    "id": host.id.encode('hex'),
+                    "blobs": hosts[host],
+                }
+                for host in hosts]
+        result['missing contacts'] = missing_contacts
+        result['contacts'] = contact_set
+        result['blob hashes'] = blob_hashes
+        result['node id'] = self.node_id.encode('hex')
+        return result
+
+    def jsonrpc_routing_table_get(self):
+        return self._render_response(self.get_routing_table())
 
 
 def main():
     parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands")
 
-    parser.add_argument("node_port",
+    parser.add_argument("--node_port",
                         help=("The UDP port on which the node will listen for connections "
                               "from other dht nodes"),
-                        type=int)
-    parser.add_argument("rpc_port",
+                        type=int, default=4444)
+    parser.add_argument("--rpc_port",
                         help="The TCP port on which the node will listen for rpc commands",
-                        type=int)
-    parser.add_argument("dht_bootstrap_host",
+                        type=int, default=5280)
+    parser.add_argument("--bootstrap_host",
                         help="The IP of a DHT node to be used to bootstrap into the network",
-                        nargs='?')
-    parser.add_argument("dht_bootstrap_port",
+                        default='lbrynet1.lbry.io')
+    parser.add_argument("--node_id",
+                        help="The IP of a DHT node to be used to bootstrap into the network",
+                        default=None)
+    parser.add_argument("--bootstrap_port",
                         help="The port of a DHT node to be used to bootstrap into the network",
-                        nargs='?', default=4000, type=int)
-    parser.add_argument("--rpc_ip_address",
-                        help="The network interface on which to listen for rpc connections",
-                        default="127.0.0.1")
+                        default=4444, type=int)
 
     args = parser.parse_args()
 
-    def start_rpc():
-        rpc_node = RPCNode(node, shut_down)
-        reactor.listenTCP(args.rpc_port, server.Site(rpc_node), interface=args.rpc_ip_address)
-
-    def shut_down():
-        d = defer.maybeDeferred(node.stop)
-        d.addBoth(lambda _: reactor.stop())
-        return d
-
-    known_nodes = []
-    if args.dht_bootstrap_host:
-        known_nodes.append((args.dht_bootstrap_host, args.dht_bootstrap_port))
-
-    node = Node(udpPort=args.node_port)
-    node.joinNetwork(known_nodes)
-    d = node._joinDeferred
-    d.addCallback(lambda _: start_rpc())
+    seeds = [(args.bootstrap_host, args.bootstrap_port)]
+    server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port)
+    reactor.addSystemEventTrigger('after', 'startup', server.setup)
     reactor.run()
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
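Note: with the positional arguments converted to --flags, the node script can now start with no arguments at all. A standalone sketch mirroring the new parser (defaults read off the diff above; the script's path is not shown in this view) to check what an empty command line produces:

    import argparse

    parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands")
    parser.add_argument("--node_port", type=int, default=4444)
    parser.add_argument("--rpc_port", type=int, default=5280)
    parser.add_argument("--bootstrap_host", default='lbrynet1.lbry.io')
    parser.add_argument("--node_id", default=None)
    parser.add_argument("--bootstrap_port", default=4444, type=int)

    args = parser.parse_args([])  # empty argv: every flag falls back to its default
    print args.node_port, args.rpc_port, args.bootstrap_host
    # -> 4444 5280 lbrynet1.lbry.io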
@@ -53,7 +53,7 @@ def main(args=None):
     session = Session.Session(
         blob_data_payment_rate=0,
         db_dir=db_dir,
-        lbryid=utils.generate_id(),
+        node_id=utils.generate_id(),
         blob_dir=blob_dir,
         dht_node_port=4444,
         known_dht_nodes=conf.settings['known_dht_nodes'],