forked from LBRYCommunity/lbry-sdk
update reflector client and server to use SQLiteStorage
This commit is contained in:
parent
68542f3ae1
commit
9a8cac20e7
3 changed files with 31 additions and 52 deletions
|
@ -1,4 +1,4 @@
|
|||
"""
|
||||
__doc__ = """
|
||||
Reflector is a protocol to re-host lbry blobs and streams
|
||||
Client queries and server responses follow, all dicts are encoded as json
|
||||
|
||||
|
|
|
@ -50,10 +50,6 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
def protocol_version(self):
|
||||
return self.factory.protocol_version
|
||||
|
||||
@property
|
||||
def stream_info_manager(self):
|
||||
return self.factory.stream_info_manager
|
||||
|
||||
@property
|
||||
def stream_hash(self):
|
||||
return self.factory.stream_hash
|
||||
|
@ -113,9 +109,9 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
|
||||
def get_validated_blobs(self, blobs_in_stream):
|
||||
def get_blobs(blobs):
|
||||
for (blob, _, _, blob_len) in blobs:
|
||||
if blob and blob_len:
|
||||
yield self.blob_manager.get_blob(blob, blob_len)
|
||||
for crypt_blob in blobs:
|
||||
if crypt_blob.blob_hash and crypt_blob.length:
|
||||
yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length)
|
||||
|
||||
dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
|
||||
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()])
|
||||
|
@ -135,7 +131,7 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
len(filtered))
|
||||
return filtered
|
||||
|
||||
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
|
||||
d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash)
|
||||
d.addCallback(self.get_validated_blobs)
|
||||
if not self.descriptor_needed:
|
||||
d.addCallback(lambda filtered:
|
||||
|
@ -155,8 +151,8 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
def _save_descriptor_blob(sd_blob):
|
||||
self.stream_descriptor = sd_blob
|
||||
|
||||
d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash)
|
||||
d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0]))
|
||||
d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash)
|
||||
d.addCallback(self.factory.blob_manager.get_blob)
|
||||
d.addCallback(_save_descriptor_blob)
|
||||
return d
|
||||
|
||||
|
@ -326,10 +322,6 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
|
|||
def blob_manager(self):
|
||||
return self._lbry_file.blob_manager
|
||||
|
||||
@property
|
||||
def stream_info_manager(self):
|
||||
return self._lbry_file.stream_info_manager
|
||||
|
||||
@property
|
||||
def stream_hash(self):
|
||||
return self._lbry_file.stream_hash
|
||||
|
|
|
@ -6,7 +6,7 @@ from twisted.internet.protocol import Protocol, ServerFactory
|
|||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
|
||||
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
|
||||
from lbrynet.core.StreamDescriptor import save_sd_info
|
||||
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
|
||||
from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
|
||||
|
||||
|
@ -32,7 +32,7 @@ class ReflectorServer(Protocol):
|
|||
log.debug('Connection made to %s', peer_info)
|
||||
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.stream_info_manager = self.factory.stream_info_manager
|
||||
self.storage = self.factory.blob_manager.storage
|
||||
self.lbry_file_manager = self.factory.lbry_file_manager
|
||||
self.protocol_version = self.factory.protocol_version
|
||||
self.received_handshake = False
|
||||
|
@ -67,16 +67,15 @@ class ReflectorServer(Protocol):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def check_head_blob_announce(self, stream_hash):
|
||||
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
|
||||
blob_hash, blob_num, blob_iv, blob_length = blob_infos[0]
|
||||
if blob_hash in self.blob_manager.blobs:
|
||||
head_blob = self.blob_manager.blobs[blob_hash]
|
||||
head_blob_hash = yield self.storage.get_stream_blob_by_position(stream_hash, 0)
|
||||
if head_blob_hash in self.blob_manager.blobs:
|
||||
head_blob = self.blob_manager.blobs[head_blob_hash]
|
||||
if head_blob.get_is_verified():
|
||||
should_announce = yield self.blob_manager.get_should_announce(blob_hash)
|
||||
should_announce = yield self.blob_manager.get_should_announce(head_blob_hash)
|
||||
if should_announce == 0:
|
||||
yield self.blob_manager.set_should_announce(blob_hash, 1)
|
||||
yield self.blob_manager.set_should_announce(head_blob_hash, 1)
|
||||
log.info("Discovered previously completed head blob (%s), "
|
||||
"setting it to be announced", blob_hash[:8])
|
||||
"setting it to be announced", head_blob_hash[:8])
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -89,27 +88,21 @@ class ReflectorServer(Protocol):
|
|||
yield self.blob_manager.set_should_announce(sd_hash, 1)
|
||||
log.info("Discovered previously completed sd blob (%s), "
|
||||
"setting it to be announced", sd_hash[:8])
|
||||
try:
|
||||
yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
|
||||
except NoSuchSDHash:
|
||||
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||
if not stream_hash:
|
||||
log.info("Adding blobs to stream")
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(
|
||||
sd_info['stream_hash'],
|
||||
sd_hash)
|
||||
yield save_sd_info(self.blob_manager, sd_hash, sd_info)
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _on_completed_blob(self, blob, response_key):
|
||||
should_announce = False
|
||||
yield self.blob_manager.blob_completed(blob, should_announce=False)
|
||||
|
||||
if response_key == RECEIVED_SD_BLOB:
|
||||
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'],
|
||||
blob.blob_hash)
|
||||
yield self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'], blob.blob_hash)
|
||||
should_announce = True
|
||||
yield save_sd_info(self.blob_manager, blob.blob_hash, sd_info)
|
||||
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
|
||||
|
||||
# if we already have the head blob, set it to be announced now that we know it's
|
||||
# a head blob
|
||||
|
@ -117,21 +110,18 @@ class ReflectorServer(Protocol):
|
|||
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash)
|
||||
stream_hash = yield self.storage.get_stream_of_blob(blob.blob_hash)
|
||||
if stream_hash is not None:
|
||||
blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash,
|
||||
blob.blob_hash)
|
||||
blob_num = yield self.storage.get_blob_num_by_hash(stream_hash,
|
||||
blob.blob_hash)
|
||||
if blob_num == 0:
|
||||
should_announce = True
|
||||
sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(
|
||||
stream_hash)
|
||||
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash)
|
||||
yield self.blob_manager.set_should_announce(blob.blob_hash, True)
|
||||
|
||||
# if we already have the sd blob, set it to be announced now that we know it's
|
||||
# a sd blob
|
||||
for sd_hash in sd_hashes:
|
||||
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
|
||||
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
|
||||
|
||||
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
||||
yield self.close_blob()
|
||||
yield d
|
||||
log.info("Received %s", blob)
|
||||
|
@ -306,14 +296,12 @@ class ReflectorServer(Protocol):
|
|||
# marked as such for announcement now that we know it's an sd blob that we have.
|
||||
yield self.check_sd_blob_announce(sd_blob.blob_hash)
|
||||
try:
|
||||
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(
|
||||
stream_hash = yield self.storage.get_stream_hash_for_sd_hash(
|
||||
sd_blob.blob_hash)
|
||||
except NoSuchSDHash:
|
||||
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
|
||||
stream_hash = sd_info['stream_hash']
|
||||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
|
||||
sd_blob.blob_hash)
|
||||
yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info)
|
||||
yield self.check_head_blob_announce(stream_hash)
|
||||
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
|
||||
else:
|
||||
|
@ -401,10 +389,9 @@ class ReflectorServer(Protocol):
|
|||
class ReflectorServerFactory(ServerFactory):
|
||||
protocol = ReflectorServer
|
||||
|
||||
def __init__(self, peer_manager, blob_manager, stream_info_manager, lbry_file_manager):
|
||||
def __init__(self, peer_manager, blob_manager, lbry_file_manager):
|
||||
self.peer_manager = peer_manager
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.lbry_file_manager = lbry_file_manager
|
||||
self.protocol_version = REFLECTOR_V2
|
||||
|
||||
|
|
Loading…
Reference in a new issue