forked from LBRYCommunity/lbry-sdk
Merge branch 'self-store'
This commit is contained in:
commit
396542dc26
7 changed files with 47 additions and 8 deletions
|
@ -22,8 +22,8 @@ at anytime.
|
|||
*
|
||||
|
||||
### Changed
|
||||
*
|
||||
*
|
||||
* include all of our own blobs in the local dht datastore (as if we had announced them to ourselves)
|
||||
* ignore dht `store` token validation errors for the first expiration-time after startup (fixes failed `store` requests after a restart)
|
||||
|
||||
### Added
|
||||
*
|
||||
|
|
|
@ -10,7 +10,7 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class DiskBlobManager(object):
|
||||
def __init__(self, blob_dir, storage):
|
||||
def __init__(self, blob_dir, storage, node_datastore=None):
|
||||
"""
|
||||
This class stores blobs on the hard disk
|
||||
|
||||
|
@ -19,6 +19,7 @@ class DiskBlobManager(object):
|
|||
"""
|
||||
self.storage = storage
|
||||
self.blob_dir = blob_dir
|
||||
self._node_datastore = node_datastore
|
||||
self.blob_creator_type = BlobFileCreator
|
||||
# TODO: consider using an LRU for blobs as there could potentially
|
||||
# be thousands of blobs loaded up, many stale
|
||||
|
@ -29,10 +30,14 @@ class DiskBlobManager(object):
|
|||
if conf.settings['run_reflector_server']: # TODO: move this looping call to SQLiteStorage
|
||||
self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def setup(self):
|
||||
if self.check_should_announce_lc and not self.check_should_announce_lc.running:
|
||||
self.check_should_announce_lc.start(600)
|
||||
return defer.succeed(True)
|
||||
if self._node_datastore is not None:
|
||||
raw_blob_hashes = yield self.storage.get_all_finished_blobs()
|
||||
self._node_datastore.completed_blobs.update(raw_blob_hashes)
|
||||
defer.returnValue(True)
|
||||
|
||||
def stop(self):
|
||||
if self.check_should_announce_lc and self.check_should_announce_lc.running:
|
||||
|
@ -63,6 +68,8 @@ class DiskBlobManager(object):
|
|||
yield self.storage.add_completed_blob(
|
||||
blob.blob_hash, blob.length, next_announce_time, should_announce
|
||||
)
|
||||
if self._node_datastore is not None:
|
||||
self._node_datastore.completed_blobs.add(blob.blob_hash.decode('hex'))
|
||||
|
||||
def completed_blobs(self, blobhashes_to_check):
|
||||
return self._completed_blobs(blobhashes_to_check)
|
||||
|
@ -98,6 +105,11 @@ class DiskBlobManager(object):
|
|||
def delete_blobs(self, blob_hashes):
|
||||
bh_to_delete_from_db = []
|
||||
for blob_hash in blob_hashes:
|
||||
if self._node_datastore is not None:
|
||||
try:
|
||||
self._node_datastore.completed_blobs.remove(blob_hash.decode('hex'))
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
blob = yield self.get_blob(blob_hash)
|
||||
yield blob.delete()
|
||||
|
|
|
@ -242,7 +242,7 @@ class Session(object):
|
|||
raise Exception(
|
||||
"TempBlobManager is no longer supported, specify BlobManager or db_dir")
|
||||
else:
|
||||
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
|
||||
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
|
||||
|
||||
# if self.blob_tracker is None:
|
||||
# self.blob_tracker = self.blob_tracker_class(
|
||||
|
|
|
@ -245,6 +245,13 @@ class SQLiteStorage(object):
|
|||
"select blob_hash from blob where should_announce=1 and status='finished'"
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_all_finished_blobs(self):
|
||||
blob_hashes = yield self.run_and_return_list(
|
||||
"select blob_hash from blob where status='finished'"
|
||||
)
|
||||
defer.returnValue([blob_hash.decode('hex') for blob_hash in blob_hashes])
|
||||
|
||||
def update_last_announced_blob(self, blob_hash, last_announced):
|
||||
return self.db.runOperation(
|
||||
"update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?",
|
||||
|
|
|
@ -16,6 +16,7 @@ class DictDataStore(UserDict.DictMixin):
|
|||
from twisted.internet import reactor
|
||||
getTime = reactor.seconds
|
||||
self._getTime = getTime
|
||||
self.completed_blobs = set()
|
||||
|
||||
def keys(self):
|
||||
""" Return a list of the keys in this data store """
|
||||
|
|
|
@ -518,7 +518,9 @@ class Node(MockKademliaHelper):
|
|||
if originalPublisherID is None:
|
||||
originalPublisherID = rpc_contact.id
|
||||
compact_ip = rpc_contact.compact_ip()
|
||||
if not self.verify_token(token, compact_ip):
|
||||
if self.clock.seconds() - self._protocol.started_listening_time < constants.tokenSecretChangeInterval:
|
||||
pass
|
||||
elif not self.verify_token(token, compact_ip):
|
||||
raise ValueError("Invalid token")
|
||||
if 0 <= port <= 65536:
|
||||
compact_port = str(struct.pack('>H', port))
|
||||
|
@ -577,8 +579,23 @@ class Node(MockKademliaHelper):
|
|||
if self._protocol._protocolVersion:
|
||||
response['protocolVersion'] = self._protocol._protocolVersion
|
||||
|
||||
if self._dataStore.hasPeersForBlob(key):
|
||||
response[key] = self._dataStore.getPeersForBlob(key)
|
||||
# get peers we have stored for this blob
|
||||
has_other_peers = self._dataStore.hasPeersForBlob(key)
|
||||
peers = []
|
||||
if has_other_peers:
|
||||
peers.extend(self._dataStore.getPeersForBlob(key))
|
||||
|
||||
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
||||
if len(peers) < constants.k and key in self._dataStore.completed_blobs:
|
||||
compact_ip = str(
|
||||
reduce(lambda buff, x: buff + bytearray([int(x)]), self.externalIP.split('.'), bytearray())
|
||||
)
|
||||
compact_port = str(struct.pack('>H', self.peerPort))
|
||||
compact_address = compact_ip + compact_port + self.node_id
|
||||
peers.append(compact_address)
|
||||
|
||||
if peers:
|
||||
response[key] = peers
|
||||
else:
|
||||
response['contacts'] = self.findNode(rpc_contact, key)
|
||||
return response
|
||||
|
|
|
@ -103,6 +103,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
self._listening = defer.Deferred(None)
|
||||
self._ping_queue = PingQueue(self._node)
|
||||
self._protocolVersion = constants.protocolVersion
|
||||
self.started_listening_time = 0
|
||||
|
||||
def _migrate_incoming_rpc_args(self, contact, method, *args):
|
||||
if method == 'store' and contact.protocolVersion == 0:
|
||||
|
@ -202,6 +203,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
if self._listening.called:
|
||||
self._listening = defer.Deferred()
|
||||
self._listening.callback(True)
|
||||
self.started_listening_time = self._node.clock.seconds()
|
||||
return self._ping_queue.start()
|
||||
|
||||
def datagramReceived(self, datagram, address):
|
||||
|
|
Loading…
Reference in a new issue