214 lines
7.9 KiB
Python
214 lines
7.9 KiB
Python
import logging
|
|
import requests
|
|
import miniupnpc
|
|
import argparse
|
|
from copy import deepcopy
|
|
from twisted.internet import reactor, defer
|
|
from twisted.web import resource
|
|
from twisted.web.server import Site
|
|
|
|
from lbrynet import conf
|
|
from lbrynet.core.log_support import configure_console
|
|
from lbrynet.dht.error import TimeoutError
|
|
# Initialise lbrynet's settings at import time. This call sits ahead of the
# remaining lbrynet imports further down the file, so the order is kept.
conf.initialize_settings()

# Logger shared by every function in this script.
log = logging.getLogger("dht tool")
# Console logging is configured through lbrynet's helper first, then this
# tool's own logger is set to INFO.
configure_console()
log.setLevel(logging.INFO)
|
|
|
|
from lbrynet.dht.node import Node
|
|
from lbrynet.dht.contact import Contact
|
|
from lbrynet.daemon.auth.server import AuthJSONRPCServer
|
|
from lbrynet.core.utils import generate_id
|
|
|
|
def get_external_ip_and_setup_upnp(port=4444):
    """Set up a UPnP redirect for the DHT UDP port and return the external IP.

    The function first tries the local UPnP gateway: any existing UDP
    mapping for ``port`` is removed, a fresh mapping ``port -> lanaddr:port``
    is added, and the gateway's external address is returned.

    If anything in the UPnP path raises, the exception is logged and the
    address is looked up through https://api.ipify.org instead.

    :param port: UDP port to redirect; defaults to 4444, the value that was
        previously hard-coded, so existing zero-argument callers are unchanged.
    :return: the external IP address as reported by UPnP or by ipify.
    :raises: whatever ``requests`` raises if the ipify fallback also fails.
    """
    try:
        u = miniupnpc.UPnP()
        u.discoverdelay = 200
        u.discover()
        u.selectigd()

        # Drop a stale redirect first so addportmapping does not collide
        # with a mapping left over from a previous run.
        if u.getspecificportmapping(port, "UDP"):
            u.deleteportmapping(port, "UDP")
            log.info("Removed UPnP redirect for UDP %i.", port)
        u.addportmapping(port, 'UDP', u.lanaddr, port, 'LBRY DHT port', '')
        log.info("got external ip from upnp")
        return u.externalipaddress()
    except Exception:
        # Best effort: UPnP is optional, fall through to the HTTP lookup.
        log.exception("derp")

    # A timeout keeps a dead network from blocking startup forever; without
    # one, requests waits indefinitely for the server to answer.
    r = requests.get('https://api.ipify.org', {'format': 'json'}, timeout=10)
    log.info("got external ip from ipify.org")
    return r.json()['ip']
|
|
|
|
|
|
class NodeRPC(AuthJSONRPCServer):
    """JSON-RPC front end for a single DHT node.

    The server owns one ``Node`` (created in :meth:`setup`) and exposes
    ``jsonrpc_*`` methods to inspect and steer it: read or change the node
    id, find a peer, list peers for a blob, ping a node and dump the
    routing table.
    """

    def __init__(self, lbryid, seeds, node_port, rpc_port):
        """Store the configuration and determine the external IP.

        :param lbryid: hex-encoded node id, or a falsy value to have one
            generated with ``generate_id()``.
        :param seeds: iterable of ``(hostname, port)`` bootstrap nodes.
        :param node_port: UDP port the DHT node listens on.
        :param rpc_port: TCP port the JSON-RPC server listens on.
        """
        AuthJSONRPCServer.__init__(self, False)
        self.root = None
        self.port = None
        self.seeds = seeds
        self.node_port = node_port
        self.rpc_port = rpc_port
        self.node_id = lbryid.decode('hex') if lbryid else generate_id()
        # NOTE(review): the UPnP helper is called without arguments, so the
        # redirect it creates is for UDP 4444 even when node_port differs.
        self.external_ip = get_external_ip_and_setup_upnp()

    @defer.inlineCallbacks
    def setup(self):
        """Create the DHT node, join the network and start the API once.

        Every seed hostname is resolved through the reactor before the node
        joins the network. The JSON-RPC listener is started only on the
        first call; later calls (made when the node id changes) reuse it.
        """
        self.node = Node(node_id=self.node_id, udpPort=self.node_port,
                         externalIP=self.external_ip)
        hosts = []
        for hostname, hostport in self.seeds:
            host_ip = yield reactor.resolve(hostname)
            hosts.append((host_ip, hostport))
        log.info("connecting to dht")
        yield self.node.joinNetwork(tuple(hosts))
        log.info("connected to dht")
        # announced_startup is not assigned in this class; presumably it is
        # initialised by AuthJSONRPCServer.__init__ -- TODO confirm.
        if not self.announced_startup:
            self.announced_startup = True
            self.start_api()
        log.info("lbry id: %s (%i bytes)", self.node.node_id.encode('hex'),
                 len(self.node.node_id))

    def start_api(self):
        """Serve this resource over HTTP on ``localhost:rpc_port``."""
        root = resource.Resource()
        root.putChild('', self)
        self.port = reactor.listenTCP(self.rpc_port, Site(root), interface='localhost')
        log.info("started jsonrpc server")

    @defer.inlineCallbacks
    def jsonrpc_node_id_set(self, node_id):
        """Restart the DHT node under a new id.

        :param node_id: the new node id, hex encoded.
        :return: (via Deferred) a message naming the old and the new id.
        :raises TypeError: if ``node_id`` is not valid hex (Python 2 hex codec).
        """
        # Decode before touching the running node: a malformed id must not
        # leave the server with its node stopped and deleted.
        new_id = node_id.decode('hex')
        old_id = self.node.node_id
        self.node.stop()
        del self.node
        self.node_id = new_id
        yield self.setup()
        msg = "changed dht id from %s to %s" % (old_id.encode('hex'),
                                                self.node.node_id.encode('hex'))
        defer.returnValue(msg)

    def jsonrpc_node_id_get(self):
        """Return the current node id, hex encoded."""
        return self._render_response(self.node.node_id.encode('hex'))

    @defer.inlineCallbacks
    def jsonrpc_peer_find(self, node_id):
        """Look up a contact by its hex-encoded node id.

        :return: (via Deferred) ``(address, port)`` of the contact, or
            ``None`` when the lookup yields nothing truthy.
        """
        node_id = node_id.decode('hex')
        contact = yield self.node.findContact(node_id)
        result = None
        if contact:
            result = (contact.address, contact.port)
        defer.returnValue(result)

    @defer.inlineCallbacks
    def jsonrpc_peer_list_for_blob(self, blob_hash):
        """Return (via Deferred) the node's peers for a hex-encoded blob hash."""
        peers = yield self.node.getPeersForBlob(blob_hash.decode('hex'))
        defer.returnValue(peers)

    @defer.inlineCallbacks
    def jsonrpc_ping(self, node_id):
        """Ping the node with the given hex-encoded id.

        :return: (via Deferred) the ping result, the string
            ``"failed to find node"`` when the node cannot be located, or
            ``{'error': 'timeout'}`` when the ping times out. On timeout the
            contact is also removed from the node and from its data store.
        """
        contact_host = yield self.jsonrpc_peer_find(node_id=node_id)
        if not contact_host:
            # returnValue exits the generator, so nothing below runs.
            defer.returnValue("failed to find node")
        contact_ip, contact_port = contact_host
        contact = Contact(node_id.decode('hex'), contact_ip, contact_port, self.node._protocol)
        try:
            result = yield contact.ping()
        except TimeoutError:
            self.node.removeContact(contact.id)
            self.node._dataStore.removePeer(contact.id)
            result = {'error': 'timeout'}
        defer.returnValue(result)

    def get_routing_table(self):
        """Build a JSON-friendly snapshot of the routing table and data store.

        :return: dict with the keys

            * ``'buckets'`` -- bucket index -> list of
              ``{"address", "id", "blobs"}`` dicts, one per contact; buckets
              without contacts are omitted,
            * ``'contacts'`` -- hex ids of all contacts found in the buckets,
            * ``'blob hashes'`` -- distinct hex blob hashes attributed to
              those contacts, in first-seen order,
            * ``'missing contacts'`` -- distinct hex ids of publishers the
              routing table could not return (``getContact`` raised
              ``ValueError``),
            * ``'datastore extra'`` -- only present when some publisher was
              returned by ``getContact`` but is in no bucket's contact list,
            * ``'node id'`` -- this server's node id, hex encoded.
        """
        routing_table = self.node._routingTable
        # Work on a copy so the live data store cannot change under us.
        data_store = deepcopy(self.node._dataStore._dict)

        # Group the stored blob hashes by the contact that published them.
        hosts = {}
        missing_contacts = []
        seen_missing = set()
        for blob_key, entries in data_store.iteritems():
            blob_hex = blob_key.encode('hex')
            for _value, _last_published, _originally_published, publisher_id in entries:
                try:
                    contact = routing_table.getContact(publisher_id)
                except ValueError:
                    publisher_hex = publisher_id.encode('hex')
                    if publisher_hex not in seen_missing:
                        seen_missing.add(publisher_hex)
                        missing_contacts.append(publisher_hex)
                    continue
                hosts.setdefault(contact, []).append(blob_hex)

        buckets = {}
        contact_set = []
        blob_hashes = []
        seen_blobs = set()
        for index, bucket in enumerate(routing_table._buckets):
            for contact in bucket._contacts:
                # pop() claims the contact's blobs, so whatever is left in
                # hosts afterwards belongs to publishers outside the buckets.
                blobs = hosts.pop(contact, [])
                for blob_hash in blobs:
                    if blob_hash not in seen_blobs:
                        seen_blobs.add(blob_hash)
                        blob_hashes.append(blob_hash)
                contact_hex = contact.id.encode("hex")
                buckets.setdefault(index, []).append({
                    "address": contact.address,
                    "id": contact_hex,
                    "blobs": blobs,
                })
                contact_set.append(contact_hex)

        result = {'buckets': buckets}
        if hosts:
            result['datastore extra'] = [
                {
                    "id": host.id.encode('hex'),
                    "blobs": host_blobs,
                }
                for host, host_blobs in hosts.iteritems()]
        result['missing contacts'] = missing_contacts
        result['contacts'] = contact_set
        result['blob hashes'] = blob_hashes
        result['node id'] = self.node_id.encode('hex')
        return result

    def jsonrpc_routing_table_get(self):
        """Return the snapshot produced by :meth:`get_routing_table`."""
        return self._render_response(self.get_routing_table())
|
|
|
|
|
|
def main():
    """Parse the command line, create the RPC node and run the reactor.

    The node's ``setup`` is scheduled to run right after reactor startup;
    ``reactor.run()`` then blocks until the reactor is stopped.
    """
    parser = argparse.ArgumentParser(description="Launch a dht node which responds to rpc commands")
    parser.add_argument("--node_port",
                        help=("The UDP port on which the node will listen for connections "
                              "from other dht nodes"),
                        type=int, default=4444)
    parser.add_argument("--rpc_port",
                        help="The TCP port on which the node will listen for rpc commands",
                        type=int, default=5280)
    # The value is resolved with reactor.resolve, so a hostname (such as the
    # default) is accepted as well as an IP.
    parser.add_argument("--bootstrap_host",
                        help=("The hostname or IP of a DHT node to be used to bootstrap "
                              "into the network"),
                        default='lbrynet1.lbry.io')
    # The help text used to be a copy of the --bootstrap_host text.
    parser.add_argument("--node_id",
                        help=("The hex encoded id to use for this DHT node; "
                              "a new id is generated if omitted"),
                        default=None)
    parser.add_argument("--bootstrap_port",
                        help="The port of a DHT node to be used to bootstrap into the network",
                        default=4444, type=int)

    args = parser.parse_args()
    seeds = [(args.bootstrap_host, args.bootstrap_port)]
    server = NodeRPC(args.node_id, seeds, args.node_port, args.rpc_port)
    reactor.addSystemEventTrigger('after', 'startup', server.setup)
    reactor.run()
|
|
|
|
|
|
# Start the node only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|