forked from LBRYCommunity/lbry-sdk

fix redundant lbryid

parent 46e31d5b45
commit fcaca05a81

4 changed files with 29 additions and 33 deletions
@@ -37,7 +37,7 @@ class Session(object):
     """

     def __init__(self, blob_data_payment_rate, db_dir=None,
-                 lbryid=None, peer_manager=None, dht_node_port=None,
+                 node_id=None, peer_manager=None, dht_node_port=None,
                  known_dht_nodes=None, peer_finder=None,
                  hash_announcer=None, blob_dir=None,
                  blob_manager=None, peer_port=None, use_upnp=True,

@@ -48,7 +48,7 @@ class Session(object):

         @param db_dir: The directory in which levelDB files should be stored

-        @param lbryid: The unique ID of this node
+        @param node_id: The unique ID of this node

         @param peer_manager: An object which keeps track of all known
             peers. If None, a PeerManager will be created

@@ -101,7 +101,7 @@ class Session(object):
         """
         self.db_dir = db_dir

-        self.lbryid = lbryid
+        self.node_id = node_id

         self.peer_manager = peer_manager


@@ -142,8 +142,8 @@ class Session(object):

         log.debug("Starting session.")

-        if self.lbryid is None:
-            self.lbryid = generate_id()
+        if self.node_id is None:
+            self.node_id = generate_id()

         if self.wallet is None:
             from lbrynet.core.PTCWallet import PTCWallet

@@ -274,7 +274,7 @@ class Session(object):

         self.dht_node = self.dht_node_class(
             udpPort=self.dht_node_port,
-            lbryid=self.lbryid,
+            node_id=self.node_id,
             externalIP=self.external_ip
         )
         self.peer_finder = DHTPeerFinder(self.dht_node, self.peer_manager)
@@ -206,7 +206,7 @@ class Daemon(AuthJSONRPCServer):
         # of the daemon, but I don't want to deal with that now

         self.analytics_manager = analytics_manager
-        self.lbryid = conf.settings.node_id
+        self.node_id = conf.settings.node_id

         self.wallet_user = None
         self.wallet_password = None

@@ -562,7 +562,7 @@ class Daemon(AuthJSONRPCServer):
         self.session = Session(
             conf.settings['data_rate'],
             db_dir=self.db_dir,
-            lbryid=self.lbryid,
+            node_id=self.node_id,
             blob_dir=self.blobfile_dir,
             dht_node_port=self.dht_node_port,
             known_dht_nodes=conf.settings['known_dht_nodes'],

@@ -1054,7 +1054,7 @@ class Daemon(AuthJSONRPCServer):
         best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None

         response = {
-            'lbry_id': base58.b58encode(self.lbryid),
+            'lbry_id': base58.b58encode(self.node_id),
             'installation_id': conf.settings.installation_id,
             'is_running': self.announced_startup,
             'is_first_run': self.session.wallet.is_first_run if has_wallet else None,
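
Note that the hunk above keeps the external 'lbry_id' key in the status response even though the attribute is now node_id, so API consumers see no change. A minimal sketch of that encoding step, assuming the base58 PyPI package that the daemon imports and a hypothetical 48-byte node ID (the real value comes from generate_id and is not shown in this diff):

import base58  # same module the daemon uses in the status response

# Hypothetical raw node ID; the real one is produced by generate_id().
node_id = b"\x2a" * 48

# Only the Python attribute was renamed; the API field name stays 'lbry_id'.
status_fragment = {'lbry_id': base58.b58encode(node_id)}
print(status_fragment)
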
@@ -49,8 +49,8 @@ class Node(object):
     application is performed via this class (or a subclass).
     """

-    def __init__(self, id=None, udpPort=4000, dataStore=None,
-                 routingTableClass=None, networkProtocol=None, lbryid=None,
+    def __init__(self, node_id=None, udpPort=4000, dataStore=None,
+                 routingTableClass=None, networkProtocol=None,
                  externalIP=None):
         """
         @param dataStore: The data store to use. This must be class inheriting

@@ -74,11 +74,7 @@ class Node(object):
                                 being transmitted.
         @type networkProtocol: entangled.kademlia.protocol.KademliaProtocol
         """
-        if id != None:
-            self.id = id
-        else:
-            self.id = self._generateID()
-        self.lbryid = lbryid
+        self.node_id = node_id or self._generateID()
         self.port = udpPort
         self._listeningPort = None  # object implementing Twisted
         # IListeningPort This will contain a deferred created when
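
The four-line if/else above collapses into a single `or` fallback. A minimal sketch of the behaviour, with a stand-in _generateID (the real one lives on Node and differs); note that `or` also falls back for any falsy value such as b'', which is harmless here because generated IDs are non-empty:

import os

def _generateID():
    # Stand-in for Node._generateID(); the real implementation is not shown in this diff.
    return os.urandom(48)

def pick_node_id(node_id=None):
    # Equivalent to: use node_id if given, else generate one --
    # except that any falsy value (None, b'', 0) triggers generation.
    return node_id or _generateID()

assert pick_node_id(b'abc') == b'abc'
assert len(pick_node_id()) == 48
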
@@ -91,9 +87,9 @@ class Node(object):
         self.change_token_lc = task.LoopingCall(self.change_token)
         # Create k-buckets (for storing contacts)
         if routingTableClass is None:
-            self._routingTable = routingtable.OptimizedTreeRoutingTable(self.id)
+            self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id)
         else:
-            self._routingTable = routingTableClass(self.id)
+            self._routingTable = routingTableClass(self.node_id)

         # Initialize this node's network access mechanisms
         if networkProtocol is None:

@@ -110,7 +106,7 @@ class Node(object):
         # Try to restore the node's state...
         if 'nodeState' in self._dataStore:
             state = self._dataStore['nodeState']
-            self.id = state['id']
+            self.node_id = state['id']
             for contactTriple in state['closestNodes']:
                 contact = Contact(
                     contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)

@@ -166,7 +162,7 @@ class Node(object):
             self.change_token_lc.start(constants.tokenSecretChangeInterval)

         # Initiate the Kademlia joining sequence - perform a search for this node's own ID
-        self._joinDeferred = self._iterativeFind(self.id, bootstrapContacts)
+        self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
         # #TODO: Refresh all k-buckets further away than this node's closest neighbour
         # Start refreshing k-buckets periodically, if necessary
         self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,

@@ -186,7 +182,7 @@ class Node(object):
         # get the deepest bucket and the number of contacts in that bucket and multiply it
         # by the number of equivalently deep buckets in the whole DHT to get a really bad
         # estimate!
-        bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.id)]
+        bucket = self._routingTable._buckets[self._routingTable._kbucketIndex(self.node_id)]
         num_in_bucket = len(bucket._contacts)
         factor = (2 ** constants.key_bits) / (bucket.rangeMax - bucket.rangeMin)
         return num_in_bucket * factor
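
The estimate in the hunk above scales the contact count in the node's own bucket by how small a slice of the key space that bucket covers. A toy worked example with made-up numbers (the real constants.key_bits and bucket ranges are not shown in this diff):

# Toy numbers for readability: a 16-bit key space instead of the real one.
key_bits = 16
range_min, range_max = 0, 4096      # this bucket covers 1/16 of the key space
num_in_bucket = 5                   # contacts currently stored in that bucket

factor = (2 ** key_bits) / (range_max - range_min)  # 16.0
estimate = num_in_bucket * factor                   # ~80 nodes in the whole DHT
print(estimate)
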
@@ -202,7 +198,7 @@ class Node(object):
         return num_in_data_store * self.getApproximateTotalDHTNodes() / 8

     def announceHaveBlob(self, key, port):
-        return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.lbryid})
+        return self.iterativeAnnounceHaveBlob(key, {'port': port, 'lbryid': self.node_id})

     @defer.inlineCallbacks
     def getPeersForBlob(self, blob_hash):

@@ -211,7 +207,7 @@ class Node(object):
         if result:
             if blob_hash in result:
                 for peer in result[blob_hash]:
-                    if self.lbryid != peer[6:]:
+                    if self.node_id != peer[6:]:
                         host = ".".join([str(ord(d)) for d in peer[:4]])
                         if host == "127.0.0.1" and "from_peer" in result and result["from_peer"] != "self":
                             host = result["from_peer"]
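
Two things are visible in the hunks above: the DHT value announced for a blob still uses the wire key 'lbryid' (only the Python attribute changed, so on-the-wire compatibility is preserved), and returned peers are compact entries sliced as peer[:4] for the IPv4 address and peer[6:] for the announcing node's ID. A sketch of a decoder matching that slicing, assuming the two bytes at [4:6] are a big-endian port (not confirmed by this diff) and written for Python 3 bytes, whereas the original code is Python 2 and uses ord():

import struct

def decode_compact_peer(peer):
    # Assumed layout, inferred from the slices above: 4 bytes IPv4,
    # then (presumably) 2 bytes big-endian port, then the node ID.
    host = ".".join(str(b) for b in peer[:4])
    port = struct.unpack(">H", peer[4:6])[0]
    node_id = peer[6:]
    return host, port, node_id

sample = bytes([10, 0, 0, 7]) + struct.pack(">H", 3333) + b"\x01" * 48
print(decode_compact_peer(sample))
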
@@ -258,7 +254,7 @@ class Node(object):
                 result = responseMsg.response
                 if 'token' in result:
                     value['token'] = result['token']
-                d = n.store(blob_hash, value, self.id, 0)
+                d = n.store(blob_hash, value, self.node_id, 0)
                 d.addCallback(log_success)
                 d.addErrback(log_error, n)
             else:

@@ -267,12 +263,12 @@ class Node(object):

         def requestPeers(contacts):
             if self.externalIP is not None and len(contacts) >= constants.k:
-                is_closer = Distance(blob_hash).is_closer(self.id, contacts[-1].id)
+                is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
                 if is_closer:
                     contacts.pop()
-                    self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
+                    self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
             elif self.externalIP is not None:
-                self.store(blob_hash, value, self_store=True, originalPublisherID=self.id)
+                self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
             ds = []
             for contact in contacts:
                 known_nodes[contact.id] = contact

@@ -456,7 +452,7 @@ class Node(object):
                 raise TypeError, 'No NodeID given. Therefore we can\'t store this node'

         if self_store is True and self.externalIP:
-            contact = Contact(self.id, self.externalIP, self.port, None, None)
+            contact = Contact(self.node_id, self.externalIP, self.port, None, None)
             compact_ip = contact.compact_ip()
         elif '_rpcNodeContact' in kwargs:
             contact = kwargs['_rpcNodeContact']

@@ -583,7 +579,7 @@ class Node(object):

         if startupShortlist is None:
             shortlist = self._routingTable.findCloseNodes(key, constants.alpha)
-            if key != self.id:
+            if key != self.node_id:
                 # Update the "last accessed" timestamp for the appropriate k-bucket
                 self._routingTable.touchKBucket(key)
             if len(shortlist) == 0:

@@ -679,7 +675,7 @@ class _IterativeFindHelper(object):
         responseMsg = responseTuple[0]
         originAddress = responseTuple[1]  # tuple: (ip adress, udp port)
         # Make sure the responding node is valid, and abort the operation if it isn't
-        if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id:
+        if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.node_id:
             return responseMsg.nodeID

         # Mark this node as active
@@ -155,7 +155,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
                  C{ErrorMessage}).
         @rtype: twisted.internet.defer.Deferred
         """
-        msg = msgtypes.RequestMessage(self._node.id, method, args)
+        msg = msgtypes.RequestMessage(self._node.node_id, method, args)
         msgPrimitive = self._translator.toPrimitive(msg)
         encodedMsg = self._encoder.encode(msgPrimitive)


@@ -342,7 +342,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
     def _sendResponse(self, contact, rpcID, response):
         """ Send a RPC response to the specified contact
         """
-        msg = msgtypes.ResponseMessage(rpcID, self._node.id, response)
+        msg = msgtypes.ResponseMessage(rpcID, self._node.node_id, response)
         msgPrimitive = self._translator.toPrimitive(msg)
         encodedMsg = self._encoder.encode(msgPrimitive)
         self._send(encodedMsg, rpcID, (contact.address, contact.port))

@@ -350,7 +350,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
     def _sendError(self, contact, rpcID, exceptionType, exceptionMessage):
         """ Send an RPC error message to the specified contact
         """
-        msg = msgtypes.ErrorMessage(rpcID, self._node.id, exceptionType, exceptionMessage)
+        msg = msgtypes.ErrorMessage(rpcID, self._node.node_id, exceptionType, exceptionMessage)
         msgPrimitive = self._translator.toPrimitive(msg)
         encodedMsg = self._encoder.encode(msgPrimitive)
         self._send(encodedMsg, rpcID, (contact.address, contact.port))