Merge branch 'fix-reflector'

This commit is contained in:
Jack Robison 2018-03-08 16:49:28 -05:00
commit de0b60aa9a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 80 additions and 105 deletions

View file

@ -15,14 +15,16 @@ at anytime.
### Fixed
* fixed the inconsistencies in API and CLI docstrings
* `blob_announce` error when announcing a single blob
* `blob_list` error when looking up blobs by stream or sd hash
### Deprecated
* `report_bug` jsonrpc command
*
### Changed
*
*
* reflector server to periodically check and set `should_announce` for sd and head blobs instead of during each request
* reflector server to use `SQLiteStorage` to find needed blob hashes for a stream
### Added
* scripts to autogenerate documentation
*

View file

@ -1,7 +1,7 @@
import logging
import os
from sqlite3 import IntegrityError
from twisted.internet import threads, defer, reactor
from twisted.internet import threads, defer, reactor, task
from lbrynet import conf
from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator
@ -29,10 +29,18 @@ class DiskBlobManager(DHTHashSupplier):
self.blobs = {}
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
self.check_should_announce_lc = None
if conf.settings['run_reflector_server']:
self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs)
def setup(self):
    """Start the blob manager; kicks off the periodic should_announce check
    when this node is configured as a reflector server.

    Returns:
        Deferred that fires with True once startup is complete.
    """
    lc = self.check_should_announce_lc
    if lc and not lc.running:
        # Re-verify head/sd blob announce flags every ten minutes.
        lc.start(600)
    return defer.succeed(True)
def stop(self):
    """Stop the blob manager, halting the periodic should_announce check
    if it was started.

    Returns:
        Deferred that fires with True once shutdown is complete.
    """
    lc = self.check_should_announce_lc
    if lc and lc.running:
        lc.stop()
    return defer.succeed(True)
def get_blob(self, blob_hash, length=None):

View file

@ -331,6 +331,7 @@ class Session(object):
self.rate_limiter.start()
d = self.storage.setup()
d.addCallback(lambda _: self.blob_manager.setup())
d.addCallback(lambda _: self.wallet.start())
d.addCallback(lambda _: self.blob_tracker.start())
return d

View file

@ -3031,7 +3031,8 @@ class Daemon(AuthJSONRPCServer):
stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash)
sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash)
if stream_hash:
blobs = yield self.session.storage.get_blobs_for_stream(stream_hash)
crypt_blobs = yield self.session.storage.get_blobs_for_stream(stream_hash)
blobs = [self.session.blob_manager.blobs[crypt_blob.blob_hash] for crypt_blob in crypt_blobs]
else:
blobs = []
# get_blobs_for_stream does not include the sd blob, so we'll add it manually

View file

@ -261,12 +261,12 @@ class SQLiteStorage(object):
def count_should_announce_blobs(self):
    """Return the number of finished blobs flagged for DHT announcement.

    Returns:
        Deferred firing with the count (or None if the query yields no row).
    """
    # Diff residue removed: the stale parameterized form of this query was
    # left in alongside the literal form, which would have passed the old
    # SQL string as the query and the new one as a bind parameter.
    return self.run_and_return_one_or_none(
        "select count(*) from blob where should_announce=1 and status='finished'"
    )
def get_all_should_announce_blobs(self):
    """Return the hashes of all finished blobs flagged for DHT announcement.

    Returns:
        Deferred firing with a list of blob hashes.
    """
    # Diff residue removed: the stale parameterized form of this query was
    # left in alongside the literal form, which would have passed the old
    # SQL string as the query and the new one as a bind parameter.
    return self.run_and_return_list(
        "select blob_hash from blob where should_announce=1 and status='finished'"
    )
def get_blobs_to_announce(self, hash_announcer):
@ -275,12 +275,13 @@ class SQLiteStorage(object):
if conf.settings['announce_head_blobs_only']:
r = transaction.execute(
"select blob_hash from blob "
"where blob_hash is not null and should_announce=1 and next_announce_time<?",
"where blob_hash is not null and should_announce=1 and next_announce_time<? and status='finished'",
(timestamp,)
)
else:
r = transaction.execute(
"select blob_hash from blob where blob_hash is not null and next_announce_time<?", (timestamp,)
"select blob_hash from blob where blob_hash is not null "
"and next_announce_time<? and status='finished'", (timestamp,)
)
blobs = [b for b, in r.fetchall()]
@ -319,6 +320,30 @@ class SQLiteStorage(object):
if blob_info.get('blob_hash') and blob_info['length']:
yield self.add_known_blob(blob_info['blob_hash'], blob_info['length'])
def verify_will_announce_head_and_sd_blobs(self, stream_hash):
    """Fix should_announce for the given stream's imported head and sd blobs.

    Args:
        stream_hash: hash of the stream whose sd blob and head blob
            (position 0) should be flagged for announcement.

    Returns:
        Deferred from the underlying UPDATE operation.
    """
    # Parentheses matter: SQL's AND binds tighter than OR, so without them
    # the should_announce=0 guard applied only to the sd-hash branch and the
    # head-blob branch matched rows unconditionally.
    return self.db.runOperation(
        "update blob set should_announce=1 "
        "where should_announce=0 and "
        "(blob.blob_hash in "
        " (select b.blob_hash from blob b inner join stream s on b.blob_hash=s.sd_hash and s.stream_hash=?) "
        "or blob.blob_hash in "
        " (select b.blob_hash from blob b "
        "  inner join stream_blob s2 on b.blob_hash=s2.blob_hash and s2.position=0 and s2.stream_hash=?))",
        (stream_hash, stream_hash)
    )
def verify_will_announce_all_head_and_sd_blobs(self):
    """Fix should_announce for every known stream's sd blob and head blob.

    Run periodically by the reflector server (see DiskBlobManager) instead
    of per-request.

    Returns:
        Deferred from the underlying UPDATE operation.
    """
    # Parentheses matter: SQL's AND binds tighter than OR, so without them
    # the should_announce=0 guard applied only to the sd-hash branch and the
    # head-blob branch matched rows unconditionally.
    return self.db.runOperation(
        "update blob set should_announce=1 "
        "where should_announce=0 and "
        "(blob.blob_hash in "
        " (select b.blob_hash from blob b inner join stream s on b.blob_hash=s.sd_hash) "
        "or blob.blob_hash in "
        " (select b.blob_hash from blob b "
        "  inner join stream_blob s2 on b.blob_hash=s2.blob_hash and s2.position=0))"
    )
# # # # # # # # # stream functions # # # # # # # # #
def store_stream(self, stream_hash, sd_hash, stream_name, stream_key, suggested_file_name,
@ -392,25 +417,38 @@ class SQLiteStorage(object):
def get_blobs_for_stream(self, stream_hash, only_completed=False):
    """Return CryptBlobInfo entries for a stream, sorted by blob number.

    Args:
        stream_hash: hash of the stream to look up.
        only_completed: when True, only report lengths for blobs whose
            status is 'finished'; other blobs fall back to length 0.

    Returns:
        Deferred firing with a sorted list of CryptBlobInfo objects.
    """
    # Diff residue removed: the old per-blob "select blob_length" loop was
    # interleaved with the new single batched lengths query; only the
    # batched form is kept.
    def _get_blobs_for_stream(transaction):
        crypt_blob_infos = []
        stream_blobs = transaction.execute(
            "select blob_hash, position, iv from stream_blob where stream_hash=?", (stream_hash, )
        ).fetchall()
        # NOTE(review): these lengths queries are not filtered by
        # stream_hash, so they fetch lengths for all known blobs; correctness
        # is preserved by the dict lookup below, but this may be wasteful —
        # confirm intent.
        if only_completed:
            lengths = transaction.execute(
                "select b.blob_hash, b.blob_length from blob b "
                "inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished'"
            ).fetchall()
        else:
            lengths = transaction.execute(
                "select b.blob_hash, b.blob_length from blob b "
                "inner join stream_blob s ON b.blob_hash=s.blob_hash"
            ).fetchall()
        blob_length_dict = {}
        for blob_hash, length in lengths:
            blob_length_dict[blob_hash] = length
        for blob_hash, position, iv in stream_blobs:
            # Unknown (or null/terminator) blob hashes default to length 0.
            blob_length = blob_length_dict.get(blob_hash, 0)
            crypt_blob_infos.append(CryptBlobInfo(blob_hash, position, blob_length, iv))
        return sorted(crypt_blob_infos, key=lambda info: info.blob_num)
    return self.db.runInteraction(_get_blobs_for_stream)
def get_pending_blobs_for_stream(self, stream_hash):
    """Return the hashes of a stream's blobs that are still pending.

    Args:
        stream_hash: hash of the stream to look up.

    Returns:
        Deferred firing with a list of blob hashes with status 'pending'.
    """
    # Bug fix: the WHERE clause was placed before the INNER JOIN, which is
    # invalid SQL; it must follow the join.
    return self.run_and_return_list(
        "select s.blob_hash from stream_blob s "
        "inner join blob b on b.blob_hash=s.blob_hash and b.status='pending' "
        "where s.stream_hash=?",
        stream_hash
    )
def get_stream_of_blob(self, blob_hash):
return self.run_and_return_one_or_none(
"select stream_hash from stream_blob where blob_hash=?", blob_hash

View file

@ -4,7 +4,7 @@ from twisted.python import failure
from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ServerFactory
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.core.StreamDescriptor import save_sd_info
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
@ -65,36 +65,6 @@ class ReflectorServer(Protocol):
else:
log.exception(err)
@defer.inlineCallbacks
def check_head_blob_announce(self, stream_hash):
    """If the stream's head blob is already present and verified, flag it
    for DHT announcement.

    Args:
        stream_hash: hash of the stream whose head blob (position 0)
            should be checked.

    Returns:
        Deferred that fires with None.
    """
    head_blob_hash = yield self.storage.get_stream_blob_by_position(stream_hash, 0)
    # Only act on blobs the blob manager already tracks in memory.
    if head_blob_hash in self.blob_manager.blobs:
        head_blob = self.blob_manager.blobs[head_blob_hash]
        if head_blob.get_is_verified():
            should_announce = yield self.blob_manager.get_should_announce(head_blob_hash)
            # Flip the flag (and log) only when it wasn't already set.
            if should_announce == 0:
                yield self.blob_manager.set_should_announce(head_blob_hash, 1)
                log.info("Discovered previously completed head blob (%s), "
                         "setting it to be announced", head_blob_hash[:8])
    defer.returnValue(None)
@defer.inlineCallbacks
def check_sd_blob_announce(self, sd_hash):
    """If the sd blob is already present and verified, flag it for DHT
    announcement and make sure its stream is recorded in storage.

    Args:
        sd_hash: hash of the stream descriptor blob to check.

    Returns:
        Deferred that fires with None.
    """
    # Only act on blobs the blob manager already tracks in memory.
    if sd_hash in self.blob_manager.blobs:
        sd_blob = self.blob_manager.blobs[sd_hash]
        if sd_blob.get_is_verified():
            should_announce = yield self.blob_manager.get_should_announce(sd_hash)
            # Flip the flag (and log) only when it wasn't already set.
            if should_announce == 0:
                yield self.blob_manager.set_should_announce(sd_hash, 1)
                log.info("Discovered previously completed sd blob (%s), "
                         "setting it to be announced", sd_hash[:8])
            stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
            # No stream row yet: parse the descriptor and persist its blobs.
            if not stream_hash:
                log.info("Adding blobs to stream")
                sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
                yield save_sd_info(self.blob_manager, sd_hash, sd_info)
    defer.returnValue(None)
@defer.inlineCallbacks
def _on_completed_blob(self, blob, response_key):
yield self.blob_manager.blob_completed(blob, should_announce=False)
@ -103,27 +73,15 @@ class ReflectorServer(Protocol):
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
yield save_sd_info(self.blob_manager, blob.blob_hash, sd_info)
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
# if we already have the head blob, set it to be announced now that we know it's
# a head blob
d = self.check_head_blob_announce(sd_info['stream_hash'])
else:
d = defer.succeed(None)
stream_hash = yield self.storage.get_stream_of_blob(blob.blob_hash)
if stream_hash is not None:
blob_num = yield self.storage.get_blob_num_by_hash(stream_hash,
blob.blob_hash)
if blob_num == 0:
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
# if we already have the sd blob, set it to be announced now that we know it's
# a sd blob
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
yield self.close_blob()
yield d
log.info("Received %s", blob)
yield self.send_response({response_key: True})
@ -238,7 +196,6 @@ class ReflectorServer(Protocol):
if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
self.peer_version = int(request_dict[VERSION])
log.debug('Handling handshake for client version %i', self.peer_version)
self.received_handshake = True
@ -292,17 +249,9 @@ class ReflectorServer(Protocol):
@defer.inlineCallbacks
def get_descriptor_response(self, sd_blob):
if sd_blob.get_is_verified():
# if we already have the sd blob being offered, make sure we have it and the head blob
# marked as such for announcement now that we know it's an sd blob that we have.
yield self.check_sd_blob_announce(sd_blob.blob_hash)
try:
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(
sd_blob.blob_hash)
except NoSuchSDHash:
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
stream_hash = sd_info['stream_hash']
yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info)
yield self.check_head_blob_announce(stream_hash)
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info)
yield self.storage.verify_will_announce_head_and_sd_blobs(sd_info['stream_hash'])
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
else:
self.incoming_blob = sd_blob
@ -311,35 +260,11 @@ class ReflectorServer(Protocol):
response = {SEND_SD_BLOB: True}
defer.returnValue(response)
@defer.inlineCallbacks
def request_needed_blobs(self, response, sd_blob):
    """Fill in the NEEDED_BLOBS field of a reflector response.

    Args:
        response: dict to update with the NEEDED_BLOBS key.
        sd_blob: the stream descriptor blob being offered.

    Returns:
        Deferred firing with the updated response dict.
    """
    # Diff residue removed: the deleted helpers (determine_missing_blobs,
    # get_unvalidated_blobs_in_stream, _iter_unvalidated_blobs_in_stream),
    # which decoded the sd blob and probed each blob individually, were
    # still interleaved here; they are replaced by a single SQLiteStorage
    # query for the stream's pending blobs.
    needed_blobs = yield self.storage.get_pending_blobs_for_stream(sd_blob.blob_hash)
    response.update({NEEDED_BLOBS: needed_blobs})
    defer.returnValue(response)
def handle_blob_request(self, request_dict):
"""