diff --git a/CHANGELOG.md b/CHANGELOG.md index a04286501..3146ed558 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,8 +22,8 @@ at anytime. * ### Changed - * - * + * include all of our own blobs in the local dht datastore (as if we had announced them to ourselves) + * ignore dht `store` token validation errors for the first expiration-time after startup (fixes failed `store` requests after a restart) ### Added * diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index a5fe1e8ed..370a3ddeb 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -10,7 +10,7 @@ log = logging.getLogger(__name__) class DiskBlobManager(object): - def __init__(self, blob_dir, storage): + def __init__(self, blob_dir, storage, node_datastore=None): """ This class stores blobs on the hard disk @@ -19,6 +19,7 @@ class DiskBlobManager(object): """ self.storage = storage self.blob_dir = blob_dir + self._node_datastore = node_datastore self.blob_creator_type = BlobFileCreator # TODO: consider using an LRU for blobs as there could potentially # be thousands of blobs loaded up, many stale @@ -29,10 +30,14 @@ class DiskBlobManager(object): if conf.settings['run_reflector_server']: # TODO: move this looping call to SQLiteStorage self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs) + @defer.inlineCallbacks def setup(self): if self.check_should_announce_lc and not self.check_should_announce_lc.running: self.check_should_announce_lc.start(600) - return defer.succeed(True) + if self._node_datastore is not None: + raw_blob_hashes = yield self.storage.get_all_finished_blobs() + self._node_datastore.completed_blobs.update(raw_blob_hashes) + defer.returnValue(True) def stop(self): if self.check_should_announce_lc and self.check_should_announce_lc.running: @@ -63,6 +68,8 @@ class DiskBlobManager(object): yield self.storage.add_completed_blob( blob.blob_hash, blob.length, next_announce_time, should_announce ) + if self._node_datastore is not None: + self._node_datastore.completed_blobs.add(blob.blob_hash.decode('hex')) def completed_blobs(self, blobhashes_to_check): return self._completed_blobs(blobhashes_to_check) @@ -98,6 +105,11 @@ class DiskBlobManager(object): def delete_blobs(self, blob_hashes): bh_to_delete_from_db = [] for blob_hash in blob_hashes: + if self._node_datastore is not None: + try: + self._node_datastore.completed_blobs.remove(blob_hash.decode('hex')) + except KeyError: + pass try: blob = yield self.get_blob(blob_hash) yield blob.delete() diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 3e52b9649..d3a1febbc 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -242,7 +242,7 @@ class Session(object): raise Exception( "TempBlobManager is no longer supported, specify BlobManager or db_dir") else: - self.blob_manager = DiskBlobManager(self.blob_dir, self.storage) + self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore) # if self.blob_tracker is None: # self.blob_tracker = self.blob_tracker_class( diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 8241879cb..d2bbb5849 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -245,6 +245,13 @@ class SQLiteStorage(object): "select blob_hash from blob where should_announce=1 and status='finished'" ) + @defer.inlineCallbacks + def get_all_finished_blobs(self): + blob_hashes = yield self.run_and_return_list( + "select blob_hash from blob where status='finished'" + ) + defer.returnValue([blob_hash.decode('hex') for blob_hash in blob_hashes]) + def update_last_announced_blob(self, blob_hash, last_announced): return self.db.runOperation( "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?", diff --git a/lbrynet/dht/datastore.py b/lbrynet/dht/datastore.py index 72969a772..234eb3209 100644 --- a/lbrynet/dht/datastore.py +++ b/lbrynet/dht/datastore.py @@ -16,6 +16,7 @@ class DictDataStore(UserDict.DictMixin): from twisted.internet import reactor getTime = reactor.seconds self._getTime = getTime + self.completed_blobs = set() def keys(self): """ Return a list of the keys in this data store """ diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 39e8291b0..76763f9e5 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -518,7 +518,9 @@ class Node(MockKademliaHelper): if originalPublisherID is None: originalPublisherID = rpc_contact.id compact_ip = rpc_contact.compact_ip() - if not self.verify_token(token, compact_ip): + if self.clock.seconds() - self._protocol.started_listening_time < constants.tokenSecretChangeInterval: + pass + elif not self.verify_token(token, compact_ip): raise ValueError("Invalid token") if 0 <= port <= 65536: compact_port = str(struct.pack('>H', port)) @@ -577,8 +579,23 @@ class Node(MockKademliaHelper): if self._protocol._protocolVersion: response['protocolVersion'] = self._protocol._protocolVersion - if self._dataStore.hasPeersForBlob(key): - response[key] = self._dataStore.getPeersForBlob(key) + # get peers we have stored for this blob + has_other_peers = self._dataStore.hasPeersForBlob(key) + peers = [] + if has_other_peers: + peers.extend(self._dataStore.getPeersForBlob(key)) + + # if we don't have k storing peers to return and we have this hash locally, include our contact information + if len(peers) < constants.k and key in self._dataStore.completed_blobs: + compact_ip = str( + reduce(lambda buff, x: buff + bytearray([int(x)]), self.externalIP.split('.'), bytearray()) + ) + compact_port = str(struct.pack('>H', self.peerPort)) + compact_address = compact_ip + compact_port + self.node_id + peers.append(compact_address) + + if peers: + response[key] = peers else: response['contacts'] = self.findNode(rpc_contact, key) return response diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 73b81bc2c..40407d191 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -103,6 +103,7 @@ class KademliaProtocol(protocol.DatagramProtocol): self._listening = defer.Deferred(None) self._ping_queue = PingQueue(self._node) self._protocolVersion = constants.protocolVersion + self.started_listening_time = 0 def _migrate_incoming_rpc_args(self, contact, method, *args): if method == 'store' and contact.protocolVersion == 0: @@ -202,6 +203,7 @@ class KademliaProtocol(protocol.DatagramProtocol): if self._listening.called: self._listening = defer.Deferred() self._listening.callback(True) + self.started_listening_time = self._node.clock.seconds() return self._ping_queue.start() def datagramReceived(self, datagram, address):