load all finished blobs into the node datastore

Jack Robison 2018-06-29 12:01:46 -04:00
parent 03769b94b8
commit 7fe92d2df0
6 changed files with 42 additions and 7 deletions


@@ -22,8 +22,8 @@ at anytime.
   *
 
 ### Changed
-  *
-  *
+  * include all of our own blobs in the local dht datastore (as if we had announced them to ourselves)
+  * ignore dht `store` token validation errors for the first expiration-time after startup (fixes failed `store` requests after a restart)
 
 ### Added
   *
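
The first entry above is the substance of this commit. As a minimal sketch of the flow it describes (an illustration with hypothetical names and placeholder hashes, not the commit's code): the blob manager pushes the raw hashes of locally finished blobs into a set on the DHT datastore, which the node then consults when answering queries, as if it had announced those blobs to itself.

completed_blobs = set()                      # lives on the DHT datastore

def load_finished_blobs(raw_hashes):
    completed_blobs.update(raw_hashes)       # bulk load at startup

def knows_blob_locally(raw_hash):
    return raw_hash in completed_blobs       # consulted when answering a query

load_finished_blobs(['\x00' * 48])
assert knows_blob_locally('\x00' * 48)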


@@ -10,7 +10,7 @@ log = logging.getLogger(__name__)
 
 class DiskBlobManager(object):
-    def __init__(self, blob_dir, storage):
+    def __init__(self, blob_dir, storage, node_datastore=None):
         """
         This class stores blobs on the hard disk
@@ -19,6 +19,7 @@ class DiskBlobManager(object):
         """
         self.storage = storage
         self.blob_dir = blob_dir
+        self._node_datastore = node_datastore
         self.blob_creator_type = BlobFileCreator
         # TODO: consider using an LRU for blobs as there could potentially
         # be thousands of blobs loaded up, many stale
@@ -29,10 +30,14 @@ class DiskBlobManager(object):
         if conf.settings['run_reflector_server']:  # TODO: move this looping call to SQLiteStorage
             self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs)
 
+    @defer.inlineCallbacks
     def setup(self):
         if self.check_should_announce_lc and not self.check_should_announce_lc.running:
             self.check_should_announce_lc.start(600)
-        return defer.succeed(True)
+        if self._node_datastore is not None:
+            raw_blob_hashes = yield self.storage.get_all_finished_blobs()
+            self._node_datastore.completed_blobs.update(raw_blob_hashes)
+        defer.returnValue(True)
 
     def stop(self):
         if self.check_should_announce_lc and self.check_should_announce_lc.running:
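
A hedged sketch of the startup sequencing the hunk above enables: because setup() is now an inlineCallbacks generator, it returns a Deferred that fires only after the finished-blob hashes have been copied into the node datastore, so callers can chain on it. blob_manager here is an assumed, already-constructed DiskBlobManager.

from twisted.internet import defer

@defer.inlineCallbacks
def start_blob_manager(blob_manager):
    # fires after get_all_finished_blobs() has populated completed_blobs
    yield blob_manager.setup()
    defer.returnValue(blob_manager)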
@@ -63,6 +68,8 @@ class DiskBlobManager(object):
         yield self.storage.add_completed_blob(
             blob.blob_hash, blob.length, next_announce_time, should_announce
         )
+        if self._node_datastore is not None:
+            self._node_datastore.completed_blobs.add(blob.blob_hash.decode('hex'))
 
     def completed_blobs(self, blobhashes_to_check):
         return self._completed_blobs(blobhashes_to_check)
@@ -98,6 +105,11 @@ class DiskBlobManager(object):
     def delete_blobs(self, blob_hashes):
         bh_to_delete_from_db = []
         for blob_hash in blob_hashes:
+            if self._node_datastore is not None:
+                try:
+                    self._node_datastore.completed_blobs.remove(blob_hash.decode('hex'))
+                except KeyError:
+                    pass
             try:
                 blob = yield self.get_blob(blob_hash)
                 yield blob.delete()
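
An aside on the removal above: set.discard would have the same effect as remove() wrapped in try/except KeyError, since discard is a no-op for absent elements. A standalone illustration with placeholder hashes:

present = '\x01' * 48
absent = '\x02' * 48
hashes = set([present])
hashes.discard(absent)    # absent element: silently does nothing
hashes.discard(present)   # present element: removed
assert len(hashes) == 0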


@@ -242,7 +242,7 @@ class Session(object):
                 raise Exception(
                     "TempBlobManager is no longer supported, specify BlobManager or db_dir")
             else:
-                self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
+                self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
         # if self.blob_tracker is None:
         #     self.blob_tracker = self.blob_tracker_class(


@@ -245,6 +245,13 @@ class SQLiteStorage(object):
             "select blob_hash from blob where should_announce=1 and status='finished'"
         )
 
+    @defer.inlineCallbacks
+    def get_all_finished_blobs(self):
+        blob_hashes = yield self.run_and_return_list(
+            "select blob_hash from blob where status='finished'"
+        )
+        defer.returnValue([blob_hash.decode('hex') for blob_hash in blob_hashes])
+
     def update_last_announced_blob(self, blob_hash, last_announced):
         return self.db.runOperation(
             "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?",


@@ -16,6 +16,7 @@ class DictDataStore(UserDict.DictMixin):
             from twisted.internet import reactor
             getTime = reactor.seconds
         self._getTime = getTime
+        self.completed_blobs = set()
 
     def keys(self):
         """ Return a list of the keys in this data store """


@@ -579,8 +579,23 @@ class Node(MockKademliaHelper):
         if self._protocol._protocolVersion:
             response['protocolVersion'] = self._protocol._protocolVersion
-        if self._dataStore.hasPeersForBlob(key):
-            response[key] = self._dataStore.getPeersForBlob(key)
+        # get peers we have stored for this blob
+        has_other_peers = self._dataStore.hasPeersForBlob(key)
+        peers = []
+        if has_other_peers:
+            peers.extend(self._dataStore.getPeersForBlob(key))
+        # if we don't have k storing peers to return and we have this hash locally, include our contact information
+        if len(peers) < constants.k and key in self._dataStore.completed_blobs:
+            compact_ip = str(
+                reduce(lambda buff, x: buff + bytearray([int(x)]), self.externalIP.split('.'), bytearray())
+            )
+            compact_port = str(struct.pack('>H', self.peerPort))
+            compact_address = compact_ip + compact_port + self.node_id
+            peers.append(compact_address)
+        if peers:
+            response[key] = peers
         else:
             response['contacts'] = self.findNode(rpc_contact, key)
         return response
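
For reference, the compact contact string built above has a fixed layout: four IP octets, a two-byte big-endian port, then the 48-byte node id. A self-contained Python 2 sketch of the same encoding and its inverse (values are made up and the helper names are hypothetical):

import struct

def make_compact_address(ip, port, node_id):
    # equivalent to the reduce()/struct.pack('>H', ...) construction above
    compact_ip = ''.join(chr(int(octet)) for octet in ip.split('.'))
    return compact_ip + struct.pack('>H', port) + node_id

def parse_compact_address(compact):
    ip = '.'.join(str(ord(c)) for c in compact[:4])
    port = struct.unpack('>H', compact[4:6])[0]
    return ip, port, compact[6:]

example = make_compact_address('10.0.0.1', 4444, '\xaa' * 48)
assert parse_compact_address(example) == ('10.0.0.1', 4444, '\xaa' * 48)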