diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py index edcad4066..06dd0b4e9 100644 --- a/lbrynet/reflector/__init__.py +++ b/lbrynet/reflector/__init__.py @@ -1,4 +1,4 @@ -""" +__doc__ = """ Reflector is a protocol to re-host lbry blobs and streams Client queries and server responses follow, all dicts are encoded as json diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 85f72b785..d70e531a2 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -50,10 +50,6 @@ class EncryptedFileReflectorClient(Protocol): def protocol_version(self): return self.factory.protocol_version - @property - def stream_info_manager(self): - return self.factory.stream_info_manager - @property def stream_hash(self): return self.factory.stream_hash @@ -113,9 +109,9 @@ class EncryptedFileReflectorClient(Protocol): def get_validated_blobs(self, blobs_in_stream): def get_blobs(blobs): - for (blob, _, _, blob_len) in blobs: - if blob and blob_len: - yield self.blob_manager.get_blob(blob, blob_len) + for crypt_blob in blobs: + if crypt_blob.blob_hash and crypt_blob.length: + yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()]) @@ -135,7 +131,7 @@ class EncryptedFileReflectorClient(Protocol): len(filtered)) return filtered - d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash) d.addCallback(self.get_validated_blobs) if not self.descriptor_needed: d.addCallback(lambda filtered: @@ -155,8 +151,8 @@ class EncryptedFileReflectorClient(Protocol): def _save_descriptor_blob(sd_blob): self.stream_descriptor = sd_blob - d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash) - d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0])) + d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash) + d.addCallback(self.factory.blob_manager.get_blob) d.addCallback(_save_descriptor_blob) return d @@ -326,10 +322,6 @@ class EncryptedFileReflectorClientFactory(ClientFactory): def blob_manager(self): return self._lbry_file.blob_manager - @property - def stream_info_manager(self): - return self._lbry_file.stream_info_manager - @property def stream_hash(self): return self._lbry_file.stream_hash diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 74e457c1d..6d0e56656 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -6,7 +6,7 @@ from twisted.internet.protocol import Protocol, ServerFactory from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader -from lbrynet.lbry_file.StreamDescriptor import save_sd_info +from lbrynet.core.StreamDescriptor import save_sd_info from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError @@ -32,7 +32,7 @@ class ReflectorServer(Protocol): log.debug('Connection made to %s', peer_info) self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) self.blob_manager = self.factory.blob_manager - self.stream_info_manager = self.factory.stream_info_manager + self.storage = self.factory.blob_manager.storage self.lbry_file_manager = self.factory.lbry_file_manager self.protocol_version = self.factory.protocol_version self.received_handshake = False @@ -67,16 +67,15 @@ class ReflectorServer(Protocol): @defer.inlineCallbacks def check_head_blob_announce(self, stream_hash): - blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) - blob_hash, blob_num, blob_iv, blob_length = blob_infos[0] - if blob_hash in self.blob_manager.blobs: - head_blob = self.blob_manager.blobs[blob_hash] + head_blob_hash = yield self.storage.get_stream_blob_by_position(stream_hash, 0) + if head_blob_hash in self.blob_manager.blobs: + head_blob = self.blob_manager.blobs[head_blob_hash] if head_blob.get_is_verified(): - should_announce = yield self.blob_manager.get_should_announce(blob_hash) + should_announce = yield self.blob_manager.get_should_announce(head_blob_hash) if should_announce == 0: - yield self.blob_manager.set_should_announce(blob_hash, 1) + yield self.blob_manager.set_should_announce(head_blob_hash, 1) log.info("Discovered previously completed head blob (%s), " - "setting it to be announced", blob_hash[:8]) + "setting it to be announced", head_blob_hash[:8]) defer.returnValue(None) @defer.inlineCallbacks @@ -89,27 +88,21 @@ class ReflectorServer(Protocol): yield self.blob_manager.set_should_announce(sd_hash, 1) log.info("Discovered previously completed sd blob (%s), " "setting it to be announced", sd_hash[:8]) - try: - yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) - except NoSuchSDHash: + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + if not stream_hash: log.info("Adding blobs to stream") sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() - yield save_sd_info(self.stream_info_manager, sd_info) - yield self.stream_info_manager.save_sd_blob_hash_to_stream( - sd_info['stream_hash'], - sd_hash) + yield save_sd_info(self.blob_manager, sd_hash, sd_info) defer.returnValue(None) @defer.inlineCallbacks def _on_completed_blob(self, blob, response_key): - should_announce = False + yield self.blob_manager.blob_completed(blob, should_announce=False) + if response_key == RECEIVED_SD_BLOB: sd_info = yield BlobStreamDescriptorReader(blob).get_info() - yield save_sd_info(self.stream_info_manager, sd_info) - yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'], - blob.blob_hash) - yield self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'], blob.blob_hash) - should_announce = True + yield save_sd_info(self.blob_manager, blob.blob_hash, sd_info) + yield self.blob_manager.set_should_announce(blob.blob_hash, True) # if we already have the head blob, set it to be announced now that we know it's # a head blob @@ -117,21 +110,18 @@ class ReflectorServer(Protocol): else: d = defer.succeed(None) - stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash) + stream_hash = yield self.storage.get_stream_of_blob(blob.blob_hash) if stream_hash is not None: - blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash, - blob.blob_hash) + blob_num = yield self.storage.get_blob_num_by_hash(stream_hash, + blob.blob_hash) if blob_num == 0: - should_announce = True - sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream( - stream_hash) + sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) + yield self.blob_manager.set_should_announce(blob.blob_hash, True) # if we already have the sd blob, set it to be announced now that we know it's # a sd blob - for sd_hash in sd_hashes: - d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash)) + d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash)) - yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.close_blob() yield d log.info("Received %s", blob) @@ -306,14 +296,12 @@ class ReflectorServer(Protocol): # marked as such for announcement now that we know it's an sd blob that we have. yield self.check_sd_blob_announce(sd_blob.blob_hash) try: - stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash( + stream_hash = yield self.storage.get_stream_hash_for_sd_hash( sd_blob.blob_hash) except NoSuchSDHash: sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() stream_hash = sd_info['stream_hash'] - yield save_sd_info(self.stream_info_manager, sd_info) - yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, - sd_blob.blob_hash) + yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info) yield self.check_head_blob_announce(stream_hash) response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob) else: @@ -401,10 +389,9 @@ class ReflectorServer(Protocol): class ReflectorServerFactory(ServerFactory): protocol = ReflectorServer - def __init__(self, peer_manager, blob_manager, stream_info_manager, lbry_file_manager): + def __init__(self, peer_manager, blob_manager, lbry_file_manager): self.peer_manager = peer_manager self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager self.lbry_file_manager = lbry_file_manager self.protocol_version = REFLECTOR_V2