exploring dht

This commit is contained in:
Alex Grintsvayg 2017-04-10 10:51:49 -04:00
parent 54c29d4a8d
commit 8db7c37fa7
11 changed files with 119 additions and 142 deletions

View file

@ -12,7 +12,6 @@ from lbrynet.core.PaymentRateManager import BasePaymentRateManager, NegotiatedPa
from lbrynet.core.BlobAvailability import BlobAvailabilityTracker
from twisted.internet import threads, defer
log = logging.getLogger(__name__)
@ -36,6 +35,7 @@ class Session(object):
upnp, which opens holes in compatible firewalls so that remote
peers can connect to this peer.
"""
def __init__(self, blob_data_payment_rate, db_dir=None,
lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None,
@ -251,10 +251,7 @@ class Session(object):
from twisted.internet import reactor
log.debug("Starting the dht")
def match_port(h, p):
return h, p
log.info("Starting DHT")
def join_resolved_addresses(result):
addresses = []
@ -272,7 +269,7 @@ class Session(object):
ds = []
for host, port in self.known_dht_nodes:
d = reactor.resolve(host)
d.addCallback(match_port, port)
# Pair the resolved host with *this* iteration's port. reactor.resolve()
# returns a Deferred that usually fires after the loop has finished, so a
# plain closure over `port` would see only the last port in
# known_dht_nodes; binding it as a default argument captures it now.
d.addCallback(lambda h, p=port: (h, p))  # match host to port
ds.append(d)
self.dht_node = self.dht_node_class(
@ -323,6 +320,7 @@ class Session(object):
def _unset_upnp(self):
log.info("Unsetting upnp for %s", self)
def threaded_unset_upnp():
u = miniupnpc.UPnP()
num_devices_found = u.discover()

View file

@ -44,7 +44,7 @@ def datetime_obj(*args, **kwargs):
def call_later(delay, func, *args, **kwargs):
# Import here to ensure that it gets called after installing a reator
# Import here to ensure that it gets called after installing a reactor
# see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html
from twisted.internet import reactor
return reactor.callLater(delay, func, *args, **kwargs)
@ -87,7 +87,7 @@ def obfuscate(plain):
return base64.b64encode(plain).encode('rot13')
def check_connection(server="www.lbry.io", port=80):
def check_connection(server="lbry.io", port=80):
"""Attempts to open a socket to server:port and returns True if successful."""
try:
log.debug('Checking connection to %s:%s', server, port)

View file

@ -46,7 +46,10 @@ peer_request_timeout = 10
checkRefreshInterval = refreshTimeout / 5
#: Max size of a single UDP datagram, in bytes. If a message is larger than this, it will
#: be spread accross several UDP packets.
#: be spread across several UDP packets.
udpDatagramMaxSize = 8192 # 8 KB
key_bits = 384
from lbrynet.core.cryptoutils import get_lbry_hash_obj
h = get_lbry_hash_obj()
key_bits = h.digest_size * 8 # 384 bits

View file

@ -7,6 +7,7 @@
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
class DecodeError(Exception):
""" Should be raised by an C{Encoding} implementation if decode operation
fails

View file

@ -2,8 +2,8 @@ from collections import Counter
import datetime
class HashWatcher():
def __init__(self, ttl=600):
class HashWatcher(object):
def __init__(self):
self.ttl = 600
self.hashes = []
self.next_tick = None

View file

@ -9,7 +9,7 @@
import hashlib
import random
from lbrynet.core.utils import generate_id
class Message(object):
""" Base class for messages - all "unknown" messages use this class """
@ -24,9 +24,7 @@ class RequestMessage(Message):
def __init__(self, nodeID, method, methodArgs, rpcID=None):
if rpcID == None:
hash = hashlib.sha384()
hash.update(str(random.getrandbits(255)))
rpcID = hash.digest()
rpcID = generate_id()
Message.__init__(self, rpcID, nodeID)
self.request = method
self.args = methodArgs

View file

@ -6,11 +6,9 @@
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import argparse
import binascii
import hashlib
import operator
import random
import struct
import time
@ -23,10 +21,13 @@ import protocol
import twisted.internet.reactor
import twisted.internet.threads
import twisted.python.log
from contact import Contact
from hashwatcher import HashWatcher
import logging
from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__)
@ -512,14 +513,14 @@ class Node(object):
"""
# Get the sender's ID (if any)
if '_rpcNodeID' in kwargs:
rpcSenderID = kwargs['_rpcNodeID']
rpc_sender_id = kwargs['_rpcNodeID']
else:
rpcSenderID = None
contacts = self._routingTable.findCloseNodes(key, constants.k, rpcSenderID)
contactTriples = []
rpc_sender_id = None
contacts = self._routingTable.findCloseNodes(key, constants.k, rpc_sender_id)
contact_triples = []
for contact in contacts:
contactTriples.append((contact.id, contact.address, contact.port))
return contactTriples
contact_triples.append((contact.id, contact.address, contact.port))
return contact_triples
@rpcmethod
def findValue(self, key, **kwargs):
@ -536,8 +537,8 @@ class Node(object):
if self._dataStore.hasPeersForBlob(key):
rval = {key: self._dataStore.getPeersForBlob(key)}
else:
contactTriples = self.findNode(key, **kwargs)
rval = {'contacts': contactTriples}
contact_triples = self.findNode(key, **kwargs)
rval = {'contacts': contact_triples}
if '_rpcNodeContact' in kwargs:
contact = kwargs['_rpcNodeContact']
compact_ip = contact.compact_ip()
@ -551,9 +552,7 @@ class Node(object):
@return: A globally unique n-bit pseudo-random identifier
@rtype: str
"""
hash = hashlib.sha384()
hash.update(str(random.getrandbits(255)))
return hash.digest()
return generate_id()
def _iterativeFind(self, key, startupShortlist=None, rpc='findNode'):
""" The basic Kademlia iterative lookup operation (for nodes/values)
@ -583,11 +582,8 @@ class Node(object):
return a list of the k closest nodes to the specified key
@rtype: twisted.internet.defer.Deferred
"""
if rpc != 'findNode':
findValue = True
else:
findValue = False
shortlist = []
findValue = rpc != 'findNode'
if startupShortlist == None:
shortlist = self._routingTable.findCloseNodes(key, constants.alpha)
if key != self.id:
@ -903,30 +899,3 @@ class ExpensiveSort(object):
def _removeValue(self):
for item in self.to_sort:
delattr(item, self.attr)
def main():
parser = argparse.ArgumentParser(description="Launch a dht node")
parser.add_argument("udp_port", help="The UDP port on which the node will listen",
type=int)
parser.add_argument("known_node_ip",
help="The IP of a known node to be used to bootstrap into the network",
nargs='?')
parser.add_argument("known_node_port",
help="The port of a known node to be used to bootstrap into the network",
nargs='?', default=4000, type=int)
args = parser.parse_args()
if args.known_node_ip:
known_nodes = [(args.known_node_ip, args.known_node_port)]
else:
known_nodes = []
node = Node(udpPort=args.udp_port)
node.joinNetwork(known_nodes)
twisted.internet.reactor.run()
if __name__ == '__main__':
main()

View file

@ -108,6 +108,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
msgPrimitive = self._translator.toPrimitive(msg)
encodedMsg = self._encoder.encode(msgPrimitive)
log.debug("DHT SEND: %s(%s)", method, args)
df = defer.Deferred()
if rawResponse:
df._rpcRawResponse = True
@ -163,7 +165,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
self._handleRPC(remoteContact, message.id, message.request, message.args)
elif isinstance(message, msgtypes.ResponseMessage):
# Find the message that triggered this response
if self._sentMessages.has_key(message.id):
if message.id in self._sentMessages:
# Cancel timeout timer for this RPC
df, timeoutCall = self._sentMessages[message.id][1:3]
timeoutCall.cancel()
@ -296,6 +298,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
func = getattr(self._node, method, None)
if callable(func) and hasattr(func, 'rpcmethod'):
# Call the exposed Node method and return the result to the deferred callback chain
log.debug("DHT RECV CALL %s with args %s", method, args)
try:
kwargs = {'_rpcNodeID': senderContact.id, '_rpcNodeContact': senderContact}
result = func(*args, **kwargs)
@ -351,7 +354,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
Will only be called once, after all ports are disconnected.
"""
log.info('Stopping dht')
log.info('Stopping DHT')
for delayed_call in self._call_later_list.values():
try:
delayed_call.cancel()
@ -363,3 +366,4 @@ class KademliaProtocol(protocol.DatagramProtocol):
# exceptions.AttributeError: 'Port' object has no attribute 'socket'
# to happen on shutdown
# reactor.iterate()
log.info('DHT stopped')

View file

@ -160,7 +160,6 @@ class TreeRoutingTable(RoutingTable):
# be dropped, and the new contact added to the tail of
# the k-bucket. This implementation follows section
# 2.2 regarding this point.
headContact = self._buckets[bucketIndex]._contacts[0]
def replaceContact(failure):
""" Callback for the deferred PING RPC to see if the head
@ -181,8 +180,8 @@ class TreeRoutingTable(RoutingTable):
self.addContact(contact)
# Ping the least-recently seen contact in this k-bucket
headContact = self._buckets[bucketIndex]._contacts[0]
df = headContact.ping()
head_contact = self._buckets[bucketIndex]._contacts[0]
df = head_contact.ping()
# If there's an error (i.e. timeout), remove the head
# contact, and append the new one
df.addErrback(replaceContact)

View file

@ -1,11 +1,15 @@
import binascii
import logging
from lbrynet.core import log_support
import logging.handlers
import sys
import traceback
from lbrynet.dht.node import Node
from twisted.internet import reactor, task
from twisted.internet import reactor, defer
from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__)
@ -13,90 +17,91 @@ def print_usage():
print "Usage:\n%s UDP_PORT KNOWN_NODE_IP KNOWN_NODE_PORT HASH"
@defer.inlineCallbacks
def join_network(udp_port, known_nodes):
lbryid = generate_id()
log.info('Creating Node')
log.info('Creating node')
node = Node(udpPort=udp_port, lbryid=lbryid)
log.info('Joining network')
d = node.joinNetwork(known_nodes)
yield node.joinNetwork(known_nodes)
def log_network_size():
defer.returnValue(node)
@defer.inlineCallbacks
def get_hosts(node, h):
log.info("Looking up %s", h)
hosts = yield node.getPeersForBlob(h.decode("hex"))
log.info("Hosts returned from the DHT: %s", hosts)
@defer.inlineCallbacks
def announce_hash(node, h):
results = yield node.announceHaveBlob(h, 34567)
for success, result in results:
if success:
log.info("Succeeded: %s", str(result))
else:
log.info("Failed: %s", str(result.getErrorMessage()))
# def get_args():
# if len(sys.argv) < 5:
# print_usage()
# sys.exit(1)
# udp_port = int(sys.argv[1])
# known_nodes = [(sys.argv[2], int(sys.argv[3]))]
# h = binascii.unhexlify(sys.argv[4])
# return udp_port, known_nodes, h
@defer.inlineCallbacks
def connect(port=None):
try:
if port is None:
raise Exception("need a port")
known_nodes = [('54.236.227.82', 4444)] # lbrynet1
node = yield join_network(port, known_nodes)
log.info("joined")
reactor.callLater(3, find, node)
except Exception:
log.error("CAUGHT EXCEPTION")
traceback.print_exc()
log.info("Stopping reactor")
yield reactor.stop()
@defer.inlineCallbacks
def find(node):
try:
log.info("Approximate number of nodes in DHT: %s", str(node.getApproximateTotalDHTNodes()))
log.info("Approximate number of blobs in DHT: %s", str(node.getApproximateTotalHashes()))
d.addCallback(lambda _: log_network_size())
h = "578f5e82da7db97bfe0677826d452cc0c65406a8e986c9caa126af4ecdbf4913daad2f7f5d1fb0ffec17d0bf8f187f5a"
peersFake = yield node.getPeersForBlob(h.decode("hex"))
print peersFake
peers = yield node.getPeersForBlob(h.decode("hex"))
print peers
d.addCallback(lambda _: node)
# yield get_hosts(node, h)
except Exception:
log.error("CAUGHT EXCEPTION")
traceback.print_exc()
return d
log.info("Stopping reactor")
yield reactor.stop()
def get_hosts(node, h):
def print_hosts(hosts):
print "Hosts returned from the DHT: "
print hosts
log.info("Looking up %s", h)
d = node.getPeersForBlob(h)
d.addCallback(print_hosts)
return d
def announce_hash(node, h):
d = node.announceHaveBlob(h, 34567)
def log_results(results):
for success, result in results:
if success:
log.info("Succeeded: %s", str(result))
else:
log.info("Failed: %s", str(result.getErrorMessage()))
d.addCallback(log_results)
return d
def get_args():
if len(sys.argv) < 5:
print_usage()
sys.exit(1)
udp_port = int(sys.argv[1])
known_nodes = [(sys.argv[2], int(sys.argv[3]))]
h = binascii.unhexlify(sys.argv[4])
return udp_port, known_nodes, h
def run_dht_script(dht_func):
log_format = "(%(asctime)s)[%(filename)s:%(lineno)s] %(funcName)s(): %(message)s"
logging.basicConfig(level=logging.DEBUG, format=log_format)
udp_port, known_nodes, h = get_args()
d = task.deferLater(reactor, 0, join_network, udp_port, known_nodes)
def run_dht_func(node):
return dht_func(node, h)
d.addCallback(run_dht_func)
def log_err(err):
log.error("An error occurred: %s", err.getTraceback())
return err
def shut_down():
log.info("Shutting down")
reactor.stop()
d.addErrback(log_err)
d.addBoth(lambda _: shut_down())
def main():
log_support.configure_console(level='DEBUG')
log_support.configure_twisted()
reactor.callLater(0, connect, port=10001)
log.info("Running reactor")
reactor.run()
def get_hosts_for_hash_in_dht():
run_dht_script(get_hosts)
def announce_hash_to_dht():
run_dht_script(announce_hash)
if __name__ == '__main__':
sys.exit(main())

View file

@ -22,18 +22,18 @@
import binascii
import hashlib
import random
import twisted.internet.reactor
from lbrynet.dht.node import Node
from lbrynet.core.cryptoutils import get_lbry_hash_obj
# The Entangled DHT node; instantiated in the main() method
node = None
# The key to use for this example when storing/retrieving data
hash = hashlib.sha384()
hash.update("key")
KEY = hash.digest()
h = get_lbry_hash_obj()
h.update("key")
KEY = h.digest()
# The value to store
VALUE = random.randint(10000, 20000)