From 4b8700268ffa7dc79fd390240252f5590398a6f1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 20 Sep 2017 22:04:23 -0400 Subject: [PATCH] discover stream info for uploads via reflector v1 --- lbrynet/core/BlobManager.py | 38 +++++++++++++--- lbrynet/reflector/server/server.py | 71 +++++++++++++++++++++++++++--- 2 files changed, 98 insertions(+), 11 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index f73b6909f..f6c329dc1 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -88,6 +88,18 @@ class DiskBlobManager(DHTHashSupplier): def hashes_to_announce(self): return self._get_blobs_to_announce() + def set_should_announce(self, blob_hash, should_announce): + if blob_hash in self.blobs: + blob = self.blobs[blob_hash] + if blob.get_is_verified(): + return self._set_should_announce(blob_hash, + self.get_next_announce_time(), + should_announce) + return defer.succeed(False) + + def get_should_announce(self, blob_hash): + return self._should_announce(blob_hash) + def creator_finished(self, blob_creator, should_announce): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) if blob_creator.blob_hash is None: @@ -170,14 +182,29 @@ class DiskBlobManager(DHTHashSupplier): def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce): log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length)) should_announce = 1 if should_announce else 0 - d = self.db_conn.runQuery( - "insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+ - "values (?, ?, ?, ?)", - (blob_hash, length, next_announce_time, should_announce) - ) + d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, " + "should_announce) values (?, ?, ?, ?)", (blob_hash, length, + next_announce_time, + should_announce)) + # TODO: why is this here? d.addErrback(lambda err: err.trap(sqlite3.IntegrityError)) return d + @rerun_if_locked + @defer.inlineCallbacks + def _set_should_announce(self, blob_hash, next_announce_time, should_announce): + yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? " + "where blob_hash=?", (next_announce_time, should_announce, + blob_hash)) + defer.returnValue(True) + + @rerun_if_locked + @defer.inlineCallbacks + def _should_announce(self, blob_hash): + result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?", + (blob_hash,)) + defer.returnValue(result[0][0]) + @defer.inlineCallbacks def _completed_blobs(self, blobhashes_to_check): """Returns of the blobhashes_to_check, which are valid""" @@ -192,7 +219,6 @@ class DiskBlobManager(DHTHashSupplier): @rerun_if_locked def _get_blobs_to_announce(self): - def get_and_update(transaction): timestamp = time.time() if self.announce_head_blobs_only is True: diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index f03284b48..a22b93add 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -4,7 +4,7 @@ from twisted.python import failure from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ServerFactory from lbrynet.core.utils import is_valid_blobhash -from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError +from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader from lbrynet.lbry_file.StreamDescriptor import save_sd_info from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 @@ -64,6 +64,41 @@ class ReflectorServer(Protocol): else: log.exception(err) + @defer.inlineCallbacks + def check_head_blob_announce(self, stream_hash): + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + blob_hash, blob_num, blob_iv, blob_length = blob_infos[0] + if blob_hash in self.blob_manager.blobs: + head_blob = self.blob_manager.blobs[blob_hash] + if head_blob.get_is_verified(): + should_announce = yield self.blob_manager.get_should_announce(blob_hash) + if should_announce == 0: + yield self.blob_manager.set_should_announce(blob_hash, 1) + log.info("Discovered previously completed head blob (%s), " + "setting it to be announced", blob_hash[:8]) + defer.returnValue(None) + + @defer.inlineCallbacks + def check_sd_blob_announce(self, sd_hash): + if sd_hash in self.blob_manager.blobs: + sd_blob = self.blob_manager.blobs[sd_hash] + if sd_blob.get_is_verified(): + should_announce = yield self.blob_manager.get_should_announce(sd_hash) + if should_announce == 0: + yield self.blob_manager.set_should_announce(sd_hash, 1) + log.info("Discovered previously completed sd blob (%s), " + "setting it to be announced", sd_hash[:8]) + try: + yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except NoSuchSDHash: + log.info("Adding blobs to stream") + sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream( + sd_info['stream_hash'], + sd_hash) + defer.returnValue(None) + @defer.inlineCallbacks def _on_completed_blob(self, blob, response_key): should_announce = False @@ -74,16 +109,29 @@ class ReflectorServer(Protocol): blob.blob_hash) should_announce = True + # if we already have the head blob, set it to be announced now that we know it's + # a head blob + d = self.check_head_blob_announce(sd_info['stream_hash']) + else: + d = defer.succeed(None) stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash) if stream_hash is not None: blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash, blob.blob_hash) if blob_num == 0: should_announce = True + sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream( + stream_hash) + + # if we already have the sd blob, set it to be announced now that we know it's + # a sd blob + for sd_hash in sd_hashes: + d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash)) yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.close_blob() + yield d log.info("Received %s", blob) yield self.send_response({response_key: True}) @@ -249,16 +297,29 @@ class ReflectorServer(Protocol): d = self.blob_finished_d return d + @defer.inlineCallbacks def get_descriptor_response(self, sd_blob): if sd_blob.get_is_verified(): - d = defer.succeed({SEND_SD_BLOB: False}) - d.addCallback(self.request_needed_blobs, sd_blob) + # if we already have the sd blob being offered, make sure we have it and the head blob + # marked as such for announcement now that we know it's an sd blob that we have. + yield self.check_sd_blob_announce(sd_blob.blob_hash) + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash( + sd_blob.blob_hash) + except NoSuchSDHash: + sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() + stream_hash = sd_info['stream_hash'] + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, + sd_blob.blob_hash) + yield self.check_head_blob_announce(stream_hash) + response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob) else: self.incoming_blob = sd_blob self.receiving_blob = True self.handle_incoming_blob(RECEIVED_SD_BLOB) - d = defer.succeed({SEND_SD_BLOB: True}) - return d + response = {SEND_SD_BLOB: True} + defer.returnValue(response) def request_needed_blobs(self, response, sd_blob): def _add_needed_blobs_to_response(needed_blobs):