verify_will_announce_head_and_sd_blobs
This commit is contained in:
parent
cdfb7f4cb4
commit
81de5fbbf4
5 changed files with 78 additions and 104 deletions
|
@ -15,14 +15,16 @@ at anytime.
|
|||
### Fixed
|
||||
* fixed the inconsistencies in API and CLI docstrings
|
||||
* `blob_announce` error when announcing a single blob
|
||||
* `blob_list` error when looking up blobs by stream or sd hash
|
||||
|
||||
### Deprecated
|
||||
* `report_bug` jsonrpc command
|
||||
*
|
||||
|
||||
### Changed
|
||||
*
|
||||
*
|
||||
* reflector server to periodically check and set `should_announce` for sd and head blobs instead of during each request
|
||||
* reflector server to use `SQLiteStorage` to find needed blob hashes for a stream
|
||||
|
||||
### Added
|
||||
* scripts to autogenerate documentation
|
||||
*
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import logging
|
||||
import os
|
||||
from sqlite3 import IntegrityError
|
||||
from twisted.internet import threads, defer, reactor
|
||||
from twisted.internet import threads, defer, reactor, task
|
||||
from lbrynet import conf
|
||||
from lbrynet.blob.blob_file import BlobFile
|
||||
from lbrynet.blob.creator import BlobFileCreator
|
||||
|
@ -29,10 +29,18 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
self.blobs = {}
|
||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
||||
|
||||
self.check_should_announce_lc = None
|
||||
if conf.settings['run_reflector_server']:
|
||||
self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs)
|
||||
|
||||
def setup(self):
|
||||
if self.check_should_announce_lc and not self.check_should_announce_lc.running:
|
||||
self.check_should_announce_lc.start(600)
|
||||
return defer.succeed(True)
|
||||
|
||||
def stop(self):
|
||||
if self.check_should_announce_lc and self.check_should_announce_lc.running:
|
||||
self.check_should_announce_lc.stop()
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_blob(self, blob_hash, length=None):
|
||||
|
|
|
@ -331,6 +331,7 @@ class Session(object):
|
|||
|
||||
self.rate_limiter.start()
|
||||
d = self.storage.setup()
|
||||
d.addCallback(lambda _: self.blob_manager.setup())
|
||||
d.addCallback(lambda _: self.wallet.start())
|
||||
d.addCallback(lambda _: self.blob_tracker.start())
|
||||
return d
|
||||
|
|
|
@ -261,12 +261,12 @@ class SQLiteStorage(object):
|
|||
|
||||
def count_should_announce_blobs(self):
|
||||
return self.run_and_return_one_or_none(
|
||||
"select count(*) from blob where should_announce=1 and status=?", "finished"
|
||||
"select count(*) from blob where should_announce=1 and status='finished'"
|
||||
)
|
||||
|
||||
def get_all_should_announce_blobs(self):
|
||||
return self.run_and_return_list(
|
||||
"select blob_hash from blob where should_announce=1 and status=?", "finished"
|
||||
"select blob_hash from blob where should_announce=1 and status='finished'"
|
||||
)
|
||||
|
||||
def get_blobs_to_announce(self, hash_announcer):
|
||||
|
@ -275,12 +275,13 @@ class SQLiteStorage(object):
|
|||
if conf.settings['announce_head_blobs_only']:
|
||||
r = transaction.execute(
|
||||
"select blob_hash from blob "
|
||||
"where blob_hash is not null and should_announce=1 and next_announce_time<?",
|
||||
"where blob_hash is not null and should_announce=1 and next_announce_time<? and status='finished'",
|
||||
(timestamp,)
|
||||
)
|
||||
else:
|
||||
r = transaction.execute(
|
||||
"select blob_hash from blob where blob_hash is not null and next_announce_time<?", (timestamp,)
|
||||
"select blob_hash from blob where blob_hash is not null "
|
||||
"and next_announce_time<? and status='finished'", (timestamp,)
|
||||
)
|
||||
|
||||
blobs = [b for b, in r.fetchall()]
|
||||
|
@ -319,6 +320,30 @@ class SQLiteStorage(object):
|
|||
if blob_info.get('blob_hash') and blob_info['length']:
|
||||
yield self.add_known_blob(blob_info['blob_hash'], blob_info['length'])
|
||||
|
||||
def verify_will_announce_head_and_sd_blobs(self, stream_hash):
|
||||
# fix should_announce for imported head and sd blobs
|
||||
return self.db.runOperation(
|
||||
"update blob set should_announce=1 "
|
||||
"where should_announce=0 and "
|
||||
"blob.blob_hash in "
|
||||
" (select b.blob_hash from blob b inner join stream s on b.blob_hash=s.sd_hash and s.stream_hash=?) "
|
||||
"or blob.blob_hash in "
|
||||
" (select b.blob_hash from blob b "
|
||||
" inner join stream_blob s2 on b.blob_hash=s2.blob_hash and s2.position=0 and s2.stream_hash=?)",
|
||||
(stream_hash, stream_hash)
|
||||
)
|
||||
|
||||
def verify_will_announce_all_head_and_sd_blobs(self):
|
||||
return self.db.runOperation(
|
||||
"update blob set should_announce=1 "
|
||||
"where should_announce=0 and "
|
||||
"blob.blob_hash in "
|
||||
" (select b.blob_hash from blob b inner join stream s on b.blob_hash=s.sd_hash) "
|
||||
"or blob.blob_hash in "
|
||||
" (select b.blob_hash from blob b "
|
||||
" inner join stream_blob s2 on b.blob_hash=s2.blob_hash and s2.position=0)"
|
||||
)
|
||||
|
||||
# # # # # # # # # stream functions # # # # # # # # #
|
||||
|
||||
def store_stream(self, stream_hash, sd_hash, stream_name, stream_key, suggested_file_name,
|
||||
|
@ -392,25 +417,38 @@ class SQLiteStorage(object):
|
|||
def get_blobs_for_stream(self, stream_hash, only_completed=False):
|
||||
def _get_blobs_for_stream(transaction):
|
||||
crypt_blob_infos = []
|
||||
stream_blobs = transaction.execute(
|
||||
"select blob_hash, position, iv from stream_blob where stream_hash=?", (stream_hash, )
|
||||
).fetchall()
|
||||
if only_completed:
|
||||
query = "select blob_hash, position, iv from stream_blob where stream_hash=? and status='finished'"
|
||||
lengths = transaction.execute(
|
||||
"select b.blob_hash, b.blob_length from blob b "
|
||||
"inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished'"
|
||||
).fetchall()
|
||||
else:
|
||||
query = "select blob_hash, position, iv from stream_blob where stream_hash=?"
|
||||
stream_blobs = transaction.execute(query, (stream_hash, )).fetchall()
|
||||
if stream_blobs:
|
||||
for blob_hash, position, iv in stream_blobs:
|
||||
if blob_hash is not None:
|
||||
blob_length = transaction.execute("select blob_length from blob "
|
||||
"where blob_hash=?",
|
||||
(blob_hash,)).fetchone()
|
||||
blob_length = 0 if not blob_length else blob_length[0]
|
||||
crypt_blob_infos.append(CryptBlobInfo(blob_hash, position, blob_length, iv))
|
||||
else:
|
||||
crypt_blob_infos.append(CryptBlobInfo(None, position, 0, iv))
|
||||
crypt_blob_infos = sorted(crypt_blob_infos, key=lambda info: info.blob_num)
|
||||
lengths = transaction.execute(
|
||||
"select b.blob_hash, b.blob_length from blob b "
|
||||
"inner join stream_blob s ON b.blob_hash=s.blob_hash"
|
||||
).fetchall()
|
||||
|
||||
blob_length_dict = {}
|
||||
for blob_hash, length in lengths:
|
||||
blob_length_dict[blob_hash] = length
|
||||
|
||||
for blob_hash, position, iv in stream_blobs:
|
||||
blob_length = blob_length_dict.get(blob_hash, 0)
|
||||
crypt_blob_infos.append(CryptBlobInfo(blob_hash, position, blob_length, iv))
|
||||
crypt_blob_infos = sorted(crypt_blob_infos, key=lambda info: info.blob_num)
|
||||
return crypt_blob_infos
|
||||
return self.db.runInteraction(_get_blobs_for_stream)
|
||||
|
||||
def get_pending_blobs_for_stream(self, stream_hash):
|
||||
return self.run_and_return_list(
|
||||
"select s.blob_hash from stream_blob s where stream_hash=? "
|
||||
"inner join blob b on b.blob_hash=s.blob_hash and b.status='pending'",
|
||||
stream_hash
|
||||
)
|
||||
|
||||
def get_stream_of_blob(self, blob_hash):
|
||||
return self.run_and_return_one_or_none(
|
||||
"select stream_hash from stream_blob where blob_hash=?", blob_hash
|
||||
|
|
|
@ -4,7 +4,7 @@ from twisted.python import failure
|
|||
from twisted.internet import error, defer
|
||||
from twisted.internet.protocol import Protocol, ServerFactory
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
|
||||
from lbrynet.core.StreamDescriptor import save_sd_info
|
||||
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||
|
@ -65,36 +65,6 @@ class ReflectorServer(Protocol):
|
|||
else:
|
||||
log.exception(err)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_head_blob_announce(self, stream_hash):
|
||||
head_blob_hash = yield self.storage.get_stream_blob_by_position(stream_hash, 0)
|
||||
if head_blob_hash in self.blob_manager.blobs:
|
||||
head_blob = self.blob_manager.blobs[head_blob_hash]
|
||||
if head_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(head_blob_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(head_blob_hash, 1)
|
||||
log.info("Discovered previously completed head blob (%s), "
|
||||
"setting it to be announced", head_blob_hash[:8])
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_sd_blob_announce(self, sd_hash):
|
||||
if sd_hash in self.blob_manager.blobs:
|
||||
sd_blob = self.blob_manager.blobs[sd_hash]
|
||||
if sd_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(sd_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(sd_hash, 1)
|
||||
log.info("Discovered previously completed sd blob (%s), "
|
||||
"setting it to be announced", sd_hash[:8])
|
||||
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||
if not stream_hash:
|
||||
log.info("Adding blobs to stream")
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
yield save_sd_info(self.blob_manager, sd_hash, sd_info)
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_completed_blob(self, blob, response_key):
|
||||
yield self.blob_manager.blob_completed(blob, should_announce=False)
|
||||
|
@ -103,27 +73,15 @@ class ReflectorServer(Protocol):
|
|||
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
|
||||
yield save_sd_info(self.blob_manager, blob.blob_hash, sd_info)
|
||||
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
|
||||
|
||||
# if we already have the head blob, set it to be announced now that we know it's
|
||||
# a head blob
|
||||
d = self.check_head_blob_announce(sd_info['stream_hash'])
|
||||
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
stream_hash = yield self.storage.get_stream_of_blob(blob.blob_hash)
|
||||
if stream_hash is not None:
|
||||
blob_num = yield self.storage.get_blob_num_by_hash(stream_hash,
|
||||
blob.blob_hash)
|
||||
if blob_num == 0:
|
||||
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
|
||||
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
|
||||
|
||||
# if we already have the sd blob, set it to be announced now that we know it's
|
||||
# a sd blob
|
||||
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
|
||||
|
||||
yield self.close_blob()
|
||||
yield d
|
||||
log.info("Received %s", blob)
|
||||
yield self.send_response({response_key: True})
|
||||
|
||||
|
@ -238,7 +196,6 @@ class ReflectorServer(Protocol):
|
|||
if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
|
||||
raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
|
||||
|
||||
|
||||
self.peer_version = int(request_dict[VERSION])
|
||||
log.debug('Handling handshake for client version %i', self.peer_version)
|
||||
self.received_handshake = True
|
||||
|
@ -292,17 +249,9 @@ class ReflectorServer(Protocol):
|
|||
@defer.inlineCallbacks
|
||||
def get_descriptor_response(self, sd_blob):
|
||||
if sd_blob.get_is_verified():
|
||||
# if we already have the sd blob being offered, make sure we have it and the head blob
|
||||
# marked as such for announcement now that we know it's an sd blob that we have.
|
||||
yield self.check_sd_blob_announce(sd_blob.blob_hash)
|
||||
try:
|
||||
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(
|
||||
sd_blob.blob_hash)
|
||||
except NoSuchSDHash:
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
stream_hash = sd_info['stream_hash']
|
||||
yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info)
|
||||
yield self.check_head_blob_announce(stream_hash)
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info)
|
||||
yield self.storage.verify_will_announce_head_and_sd_blobs(sd_info['stream_hash'])
|
||||
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
|
||||
else:
|
||||
self.incoming_blob = sd_blob
|
||||
|
@ -311,35 +260,11 @@ class ReflectorServer(Protocol):
|
|||
response = {SEND_SD_BLOB: True}
|
||||
defer.returnValue(response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def request_needed_blobs(self, response, sd_blob):
|
||||
def _add_needed_blobs_to_response(needed_blobs):
|
||||
response.update({NEEDED_BLOBS: needed_blobs})
|
||||
return response
|
||||
|
||||
d = self.determine_missing_blobs(sd_blob)
|
||||
d.addCallback(_add_needed_blobs_to_response)
|
||||
return d
|
||||
|
||||
def determine_missing_blobs(self, sd_blob):
|
||||
reader = sd_blob.open_for_reading()
|
||||
sd_blob_data = reader.read()
|
||||
reader.close()
|
||||
decoded_sd_blob = json.loads(sd_blob_data)
|
||||
return self.get_unvalidated_blobs_in_stream(decoded_sd_blob)
|
||||
|
||||
def get_unvalidated_blobs_in_stream(self, sd_blob):
|
||||
dl = defer.DeferredList(list(self._iter_unvalidated_blobs_in_stream(sd_blob)),
|
||||
consumeErrors=True)
|
||||
dl.addCallback(lambda needed: [blob[1] for blob in needed if blob[1]])
|
||||
return dl
|
||||
|
||||
def _iter_unvalidated_blobs_in_stream(self, sd_blob):
|
||||
for blob in sd_blob['blobs']:
|
||||
if 'blob_hash' in blob and 'length' in blob:
|
||||
blob_hash, blob_len = blob['blob_hash'], blob['length']
|
||||
d = self.blob_manager.get_blob(blob_hash, blob_len)
|
||||
d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None)
|
||||
yield d
|
||||
needed_blobs = yield self.storage.get_pending_blobs_for_stream(sd_blob.blob_hash)
|
||||
response.update({NEEDED_BLOBS: needed_blobs})
|
||||
defer.returnValue(response)
|
||||
|
||||
def handle_blob_request(self, request_dict):
|
||||
"""
|
||||
|
|
Loading…
Reference in a new issue