discover stream info for uploads via reflector v1
This commit is contained in:
parent
474ce21a51
commit
4b8700268f
2 changed files with 98 additions and 11 deletions
|
@ -88,6 +88,18 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
def hashes_to_announce(self):
|
||||
return self._get_blobs_to_announce()
|
||||
|
||||
def set_should_announce(self, blob_hash, should_announce):
|
||||
if blob_hash in self.blobs:
|
||||
blob = self.blobs[blob_hash]
|
||||
if blob.get_is_verified():
|
||||
return self._set_should_announce(blob_hash,
|
||||
self.get_next_announce_time(),
|
||||
should_announce)
|
||||
return defer.succeed(False)
|
||||
|
||||
def get_should_announce(self, blob_hash):
|
||||
return self._should_announce(blob_hash)
|
||||
|
||||
def creator_finished(self, blob_creator, should_announce):
|
||||
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
|
||||
if blob_creator.blob_hash is None:
|
||||
|
@ -170,14 +182,29 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
|
||||
log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
|
||||
should_announce = 1 if should_announce else 0
|
||||
d = self.db_conn.runQuery(
|
||||
"insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+
|
||||
"values (?, ?, ?, ?)",
|
||||
(blob_hash, length, next_announce_time, should_announce)
|
||||
)
|
||||
d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, "
|
||||
"should_announce) values (?, ?, ?, ?)", (blob_hash, length,
|
||||
next_announce_time,
|
||||
should_announce))
|
||||
# TODO: why is this here?
|
||||
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
@defer.inlineCallbacks
|
||||
def _set_should_announce(self, blob_hash, next_announce_time, should_announce):
|
||||
yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? "
|
||||
"where blob_hash=?", (next_announce_time, should_announce,
|
||||
blob_hash))
|
||||
defer.returnValue(True)
|
||||
|
||||
@rerun_if_locked
|
||||
@defer.inlineCallbacks
|
||||
def _should_announce(self, blob_hash):
|
||||
result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?",
|
||||
(blob_hash,))
|
||||
defer.returnValue(result[0][0])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _completed_blobs(self, blobhashes_to_check):
|
||||
"""Returns of the blobhashes_to_check, which are valid"""
|
||||
|
@ -192,7 +219,6 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
|
||||
@rerun_if_locked
|
||||
def _get_blobs_to_announce(self):
|
||||
|
||||
def get_and_update(transaction):
|
||||
timestamp = time.time()
|
||||
if self.announce_head_blobs_only is True:
|
||||
|
|
|
@ -4,7 +4,7 @@ from twisted.python import failure
|
|||
from twisted.internet import error, defer
|
||||
from twisted.internet.protocol import Protocol, ServerFactory
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
|
||||
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
|
||||
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||
|
@ -64,6 +64,41 @@ class ReflectorServer(Protocol):
|
|||
else:
|
||||
log.exception(err)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_head_blob_announce(self, stream_hash):
|
||||
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
|
||||
blob_hash, blob_num, blob_iv, blob_length = blob_infos[0]
|
||||
if blob_hash in self.blob_manager.blobs:
|
||||
head_blob = self.blob_manager.blobs[blob_hash]
|
||||
if head_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(blob_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(blob_hash, 1)
|
||||
log.info("Discovered previously completed head blob (%s), "
|
||||
"setting it to be announced", blob_hash[:8])
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_sd_blob_announce(self, sd_hash):
|
||||
if sd_hash in self.blob_manager.blobs:
|
||||
sd_blob = self.blob_manager.blobs[sd_hash]
|
||||
if sd_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(sd_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(sd_hash, 1)
|
||||
log.info("Discovered previously completed sd blob (%s), "
|
||||
"setting it to be announced", sd_hash[:8])
|
||||
try:
|
||||
yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
|
||||
except NoSuchSDHash:
|
||||
log.info("Adding blobs to stream")
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(
|
||||
sd_info['stream_hash'],
|
||||
sd_hash)
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_completed_blob(self, blob, response_key):
|
||||
should_announce = False
|
||||
|
@ -74,16 +109,29 @@ class ReflectorServer(Protocol):
|
|||
blob.blob_hash)
|
||||
should_announce = True
|
||||
|
||||
# if we already have the head blob, set it to be announced now that we know it's
|
||||
# a head blob
|
||||
d = self.check_head_blob_announce(sd_info['stream_hash'])
|
||||
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash)
|
||||
if stream_hash is not None:
|
||||
blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash,
|
||||
blob.blob_hash)
|
||||
if blob_num == 0:
|
||||
should_announce = True
|
||||
sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(
|
||||
stream_hash)
|
||||
|
||||
# if we already have the sd blob, set it to be announced now that we know it's
|
||||
# a sd blob
|
||||
for sd_hash in sd_hashes:
|
||||
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
|
||||
|
||||
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
||||
yield self.close_blob()
|
||||
yield d
|
||||
log.info("Received %s", blob)
|
||||
yield self.send_response({response_key: True})
|
||||
|
||||
|
@ -249,16 +297,29 @@ class ReflectorServer(Protocol):
|
|||
d = self.blob_finished_d
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_descriptor_response(self, sd_blob):
|
||||
if sd_blob.get_is_verified():
|
||||
d = defer.succeed({SEND_SD_BLOB: False})
|
||||
d.addCallback(self.request_needed_blobs, sd_blob)
|
||||
# if we already have the sd blob being offered, make sure we have it and the head blob
|
||||
# marked as such for announcement now that we know it's an sd blob that we have.
|
||||
yield self.check_sd_blob_announce(sd_blob.blob_hash)
|
||||
try:
|
||||
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(
|
||||
sd_blob.blob_hash)
|
||||
except NoSuchSDHash:
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
stream_hash = sd_info['stream_hash']
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
|
||||
sd_blob.blob_hash)
|
||||
yield self.check_head_blob_announce(stream_hash)
|
||||
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
|
||||
else:
|
||||
self.incoming_blob = sd_blob
|
||||
self.receiving_blob = True
|
||||
self.handle_incoming_blob(RECEIVED_SD_BLOB)
|
||||
d = defer.succeed({SEND_SD_BLOB: True})
|
||||
return d
|
||||
response = {SEND_SD_BLOB: True}
|
||||
defer.returnValue(response)
|
||||
|
||||
def request_needed_blobs(self, response, sd_blob):
|
||||
def _add_needed_blobs_to_response(needed_blobs):
|
||||
|
|
Loading…
Add table
Reference in a new issue