forked from LBRYCommunity/lbry-sdk
Merge branch 'reflector_announce_rework'
This commit is contained in:
commit
43e4b3f7d4
6 changed files with 247 additions and 19 deletions
|
@ -12,6 +12,10 @@ at anytime.
|
|||
*
|
||||
*
|
||||
|
||||
### Added
|
||||
* Added ability for reflector to store stream information for head blob announce
|
||||
*
|
||||
|
||||
### Fixed
|
||||
* Fixed handling cancelled blob and availability requests
|
||||
* Fixed redundant blob requests to a peer
|
||||
|
@ -21,7 +25,7 @@ at anytime.
|
|||
*
|
||||
|
||||
### Changed
|
||||
*
|
||||
* Announcing by head blob is turned on by default
|
||||
*
|
||||
|
||||
### Added
|
||||
|
|
|
@ -251,7 +251,7 @@ ADJUSTABLE_SETTINGS = {
|
|||
'download_directory': (str, default_download_dir),
|
||||
'download_timeout': (int, 180),
|
||||
'is_generous_host': (bool, True),
|
||||
'announce_head_blobs_only': (bool, False),
|
||||
'announce_head_blobs_only': (bool, True),
|
||||
'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),
|
||||
'lbryum_wallet_dir': (str, default_lbryum_dir),
|
||||
'max_connections_per_stream': (int, 5),
|
||||
|
|
|
@ -88,6 +88,18 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
def hashes_to_announce(self):
|
||||
return self._get_blobs_to_announce()
|
||||
|
||||
def set_should_announce(self, blob_hash, should_announce):
|
||||
if blob_hash in self.blobs:
|
||||
blob = self.blobs[blob_hash]
|
||||
if blob.get_is_verified():
|
||||
return self._set_should_announce(blob_hash,
|
||||
self.get_next_announce_time(),
|
||||
should_announce)
|
||||
return defer.succeed(False)
|
||||
|
||||
def get_should_announce(self, blob_hash):
|
||||
return self._should_announce(blob_hash)
|
||||
|
||||
def creator_finished(self, blob_creator, should_announce):
|
||||
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
|
||||
if blob_creator.blob_hash is None:
|
||||
|
@ -170,14 +182,29 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
|
||||
log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
|
||||
should_announce = 1 if should_announce else 0
|
||||
d = self.db_conn.runQuery(
|
||||
"insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+
|
||||
"values (?, ?, ?, ?)",
|
||||
(blob_hash, length, next_announce_time, should_announce)
|
||||
)
|
||||
d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, "
|
||||
"should_announce) values (?, ?, ?, ?)", (blob_hash, length,
|
||||
next_announce_time,
|
||||
should_announce))
|
||||
# TODO: why is this here?
|
||||
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
@defer.inlineCallbacks
|
||||
def _set_should_announce(self, blob_hash, next_announce_time, should_announce):
|
||||
yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? "
|
||||
"where blob_hash=?", (next_announce_time, should_announce,
|
||||
blob_hash))
|
||||
defer.returnValue(True)
|
||||
|
||||
@rerun_if_locked
|
||||
@defer.inlineCallbacks
|
||||
def _should_announce(self, blob_hash):
|
||||
result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?",
|
||||
(blob_hash,))
|
||||
defer.returnValue(result[0][0])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _completed_blobs(self, blobhashes_to_check):
|
||||
"""Returns of the blobhashes_to_check, which are valid"""
|
||||
|
@ -192,7 +219,6 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
|
||||
@rerun_if_locked
|
||||
def _get_blobs_to_announce(self):
|
||||
|
||||
def get_and_update(transaction):
|
||||
timestamp = time.time()
|
||||
if self.announce_head_blobs_only is True:
|
||||
|
@ -230,6 +256,14 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
d = self.db_conn.runQuery("select blob_hash from blobs")
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
@defer.inlineCallbacks
|
||||
def _get_all_should_announce_blob_hashes(self):
|
||||
# return a list of blob hashes where should_announce is True
|
||||
blob_hashes = yield self.db_conn.runQuery(
|
||||
"select blob_hash from blobs where should_announce = 1")
|
||||
defer.returnValue([d[0] for d in blob_hashes])
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_verified_blob_hashes(self):
|
||||
d = self._get_all_blob_hashes()
|
||||
|
|
|
@ -316,7 +316,8 @@ class Daemon(AuthJSONRPCServer):
|
|||
if self.reflector_port is not None:
|
||||
reflector_factory = reflector_server_factory(
|
||||
self.session.peer_manager,
|
||||
self.session.blob_manager
|
||||
self.session.blob_manager,
|
||||
self.stream_info_manager
|
||||
)
|
||||
try:
|
||||
self.reflector_server_port = reactor.listenTCP(self.reflector_port,
|
||||
|
|
|
@ -4,7 +4,9 @@ from twisted.python import failure
|
|||
from twisted.internet import error, defer
|
||||
from twisted.internet.protocol import Protocol, ServerFactory
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
|
||||
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
|
||||
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||
from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
|
||||
|
||||
|
@ -30,6 +32,7 @@ class ReflectorServer(Protocol):
|
|||
log.debug('Connection made to %s', peer_info)
|
||||
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.stream_info_manager = self.factory.stream_info_manager
|
||||
self.protocol_version = self.factory.protocol_version
|
||||
self.received_handshake = False
|
||||
self.peer_version = None
|
||||
|
@ -61,10 +64,74 @@ class ReflectorServer(Protocol):
|
|||
else:
|
||||
log.exception(err)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_head_blob_announce(self, stream_hash):
|
||||
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
|
||||
blob_hash, blob_num, blob_iv, blob_length = blob_infos[0]
|
||||
if blob_hash in self.blob_manager.blobs:
|
||||
head_blob = self.blob_manager.blobs[blob_hash]
|
||||
if head_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(blob_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(blob_hash, 1)
|
||||
log.info("Discovered previously completed head blob (%s), "
|
||||
"setting it to be announced", blob_hash[:8])
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_sd_blob_announce(self, sd_hash):
|
||||
if sd_hash in self.blob_manager.blobs:
|
||||
sd_blob = self.blob_manager.blobs[sd_hash]
|
||||
if sd_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(sd_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(sd_hash, 1)
|
||||
log.info("Discovered previously completed sd blob (%s), "
|
||||
"setting it to be announced", sd_hash[:8])
|
||||
try:
|
||||
yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
|
||||
except NoSuchSDHash:
|
||||
log.info("Adding blobs to stream")
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(
|
||||
sd_info['stream_hash'],
|
||||
sd_hash)
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_completed_blob(self, blob, response_key):
|
||||
yield self.blob_manager.blob_completed(blob)
|
||||
should_announce = False
|
||||
if response_key == RECEIVED_SD_BLOB:
|
||||
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'],
|
||||
blob.blob_hash)
|
||||
should_announce = True
|
||||
|
||||
# if we already have the head blob, set it to be announced now that we know it's
|
||||
# a head blob
|
||||
d = self.check_head_blob_announce(sd_info['stream_hash'])
|
||||
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash)
|
||||
if stream_hash is not None:
|
||||
blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash,
|
||||
blob.blob_hash)
|
||||
if blob_num == 0:
|
||||
should_announce = True
|
||||
sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(
|
||||
stream_hash)
|
||||
|
||||
# if we already have the sd blob, set it to be announced now that we know it's
|
||||
# a sd blob
|
||||
for sd_hash in sd_hashes:
|
||||
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
|
||||
|
||||
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
||||
yield self.close_blob()
|
||||
yield d
|
||||
log.info("Received %s", blob)
|
||||
yield self.send_response({response_key: True})
|
||||
|
||||
|
@ -230,16 +297,29 @@ class ReflectorServer(Protocol):
|
|||
d = self.blob_finished_d
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_descriptor_response(self, sd_blob):
|
||||
if sd_blob.get_is_verified():
|
||||
d = defer.succeed({SEND_SD_BLOB: False})
|
||||
d.addCallback(self.request_needed_blobs, sd_blob)
|
||||
# if we already have the sd blob being offered, make sure we have it and the head blob
|
||||
# marked as such for announcement now that we know it's an sd blob that we have.
|
||||
yield self.check_sd_blob_announce(sd_blob.blob_hash)
|
||||
try:
|
||||
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(
|
||||
sd_blob.blob_hash)
|
||||
except NoSuchSDHash:
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
stream_hash = sd_info['stream_hash']
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
|
||||
sd_blob.blob_hash)
|
||||
yield self.check_head_blob_announce(stream_hash)
|
||||
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
|
||||
else:
|
||||
self.incoming_blob = sd_blob
|
||||
self.receiving_blob = True
|
||||
self.handle_incoming_blob(RECEIVED_SD_BLOB)
|
||||
d = defer.succeed({SEND_SD_BLOB: True})
|
||||
return d
|
||||
response = {SEND_SD_BLOB: True}
|
||||
defer.returnValue(response)
|
||||
|
||||
def request_needed_blobs(self, response, sd_blob):
|
||||
def _add_needed_blobs_to_response(needed_blobs):
|
||||
|
@ -318,9 +398,10 @@ class ReflectorServer(Protocol):
|
|||
class ReflectorServerFactory(ServerFactory):
|
||||
protocol = ReflectorServer
|
||||
|
||||
def __init__(self, peer_manager, blob_manager):
|
||||
def __init__(self, peer_manager, blob_manager, stream_info_manager):
|
||||
self.peer_manager = peer_manager
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.protocol_version = REFLECTOR_V2
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
|
|
|
@ -81,17 +81,21 @@ class TestReflector(unittest.TestCase):
|
|||
self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir()
|
||||
self.server_blob_manager = BlobManager.DiskBlobManager(
|
||||
hash_announcer, self.server_blob_dir, self.server_db_dir)
|
||||
self.server_stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir)
|
||||
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.setup())
|
||||
d.addCallback(lambda _: self.server_blob_manager.setup())
|
||||
d.addCallback(lambda _: self.server_stream_info_manager.setup())
|
||||
|
||||
def verify_equal(sd_info):
|
||||
self.assertEqual(mocks.create_stream_sd_file, sd_info)
|
||||
|
||||
def save_sd_blob_hash(sd_hash):
|
||||
self.sd_hash = sd_hash
|
||||
self.expected_blobs.append((sd_hash, 923))
|
||||
|
||||
def verify_stream_descriptor_file(stream_hash):
|
||||
|
@ -120,7 +124,7 @@ class TestReflector(unittest.TestCase):
|
|||
return d
|
||||
|
||||
def start_server():
|
||||
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager)
|
||||
server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager, self.server_stream_info_manager)
|
||||
from twisted.internet import reactor
|
||||
port = 8943
|
||||
while self.reflector_port is None:
|
||||
|
@ -160,12 +164,34 @@ class TestReflector(unittest.TestCase):
|
|||
return d
|
||||
|
||||
def test_stream_reflector(self):
|
||||
def verify_data_on_reflector():
|
||||
def verify_blob_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||
return defer.DeferredList(check_blob_ds)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def verify_stream_on_reflector():
|
||||
# check stream_info_manager has all the right information
|
||||
streams = yield self.server_stream_info_manager.get_all_streams()
|
||||
self.assertEqual(1, len(streams))
|
||||
self.assertEqual(self.stream_hash, streams[0])
|
||||
|
||||
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
blob_hashes = [b[0] for b in blobs if b[0] is not None]
|
||||
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
|
||||
self.assertEqual(expected_blob_hashes, blob_hashes)
|
||||
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
|
||||
self.assertEqual(1, len(sd_hashes))
|
||||
expected_sd_hash = self.expected_blobs[-1][0]
|
||||
self.assertEqual(self.sd_hash, sd_hashes[0])
|
||||
|
||||
# check should_announce blobs on blob_manager
|
||||
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
|
||||
self.assertEqual(2, len(blob_hashes))
|
||||
self.assertTrue(self.sd_hash in blob_hashes)
|
||||
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
|
||||
|
||||
def verify_have_blob(blob_hash, blob_size):
|
||||
d = self.server_blob_manager.get_blob(blob_hash)
|
||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||
|
@ -187,7 +213,8 @@ class TestReflector(unittest.TestCase):
|
|||
return
|
||||
|
||||
d = send_to_server()
|
||||
d.addCallback(lambda _: verify_data_on_reflector())
|
||||
d.addCallback(lambda _: verify_blob_on_reflector())
|
||||
d.addCallback(lambda _: verify_stream_on_reflector())
|
||||
return d
|
||||
|
||||
def test_blob_reflector(self):
|
||||
|
@ -221,6 +248,15 @@ class TestReflector(unittest.TestCase):
|
|||
return d
|
||||
|
||||
def test_blob_reflector_v1(self):
|
||||
@defer.inlineCallbacks
|
||||
def verify_stream_on_reflector():
|
||||
# this protocol should not have any impact on stream info manager
|
||||
streams = yield self.server_stream_info_manager.get_all_streams()
|
||||
self.assertEqual(0, len(streams))
|
||||
# there should be no should announce blobs here
|
||||
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
|
||||
self.assertEqual(0, len(blob_hashes))
|
||||
|
||||
def verify_data_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
|
@ -251,6 +287,78 @@ class TestReflector(unittest.TestCase):
|
|||
d.addCallback(lambda _: verify_data_on_reflector())
|
||||
return d
|
||||
|
||||
# test case when we reflect blob, and than that same blob
|
||||
# is reflected as stream
|
||||
def test_blob_reflect_and_stream(self):
|
||||
|
||||
def verify_blob_on_reflector():
|
||||
check_blob_ds = []
|
||||
for blob_hash, blob_size in self.expected_blobs:
|
||||
check_blob_ds.append(verify_have_blob(blob_hash, blob_size))
|
||||
return defer.DeferredList(check_blob_ds)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def verify_stream_on_reflector():
|
||||
# check stream_info_manager has all the right information
|
||||
|
||||
streams = yield self.server_stream_info_manager.get_all_streams()
|
||||
self.assertEqual(1, len(streams))
|
||||
self.assertEqual(self.stream_hash, streams[0])
|
||||
|
||||
blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
blob_hashes = [b[0] for b in blobs if b[0] is not None]
|
||||
expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None]
|
||||
self.assertEqual(expected_blob_hashes, blob_hashes)
|
||||
sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
|
||||
self.assertEqual(1, len(sd_hashes))
|
||||
expected_sd_hash = self.expected_blobs[-1][0]
|
||||
self.assertEqual(self.sd_hash, sd_hashes[0])
|
||||
|
||||
# check should_announce blobs on blob_manager
|
||||
blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes()
|
||||
self.assertEqual(2, len(blob_hashes))
|
||||
self.assertTrue(self.sd_hash in blob_hashes)
|
||||
self.assertTrue(expected_blob_hashes[0] in blob_hashes)
|
||||
|
||||
def verify_have_blob(blob_hash, blob_size):
|
||||
d = self.server_blob_manager.get_blob(blob_hash)
|
||||
d.addCallback(lambda blob: verify_blob_completed(blob, blob_size))
|
||||
return d
|
||||
|
||||
def send_to_server_as_blobs(blob_hashes_to_send):
|
||||
factory = reflector.BlobClientFactory(
|
||||
self.session.blob_manager,
|
||||
blob_hashes_to_send
|
||||
)
|
||||
factory.protocol_version = 0
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
def send_to_server_as_stream(result):
|
||||
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
|
||||
self.stream_info_manager,
|
||||
self.stream_hash)
|
||||
factory = reflector.ClientFactory(fake_lbry_file)
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
return factory.finished_deferred
|
||||
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
|
||||
# Modify this to change which blobs to send
|
||||
blobs_to_send = self.expected_blobs
|
||||
|
||||
d = send_to_server_as_blobs([x[0] for x in self.expected_blobs])
|
||||
d.addCallback(send_to_server_as_stream)
|
||||
d.addCallback(lambda _: verify_blob_on_reflector())
|
||||
d.addCallback(lambda _: verify_stream_on_reflector())
|
||||
return d
|
||||
|
||||
def iv_generator():
|
||||
iv = 0
|
||||
|
|
Loading…
Add table
Reference in a new issue