diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 16501d2e3..3dec521ab 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -984,14 +984,13 @@ class Daemon(AuthJSONRPCServer): def _reflect(self, lbry_file): if not lbry_file: return defer.fail(Exception("no lbry file given to reflect")) - stream_hash = lbry_file.stream_hash - if stream_hash is None: + if lbry_file.stream_hash is None: return defer.fail(Exception("no stream hash")) - log.info("Reflecting stream: %s" % stream_hash) factory = reflector.ClientFactory( self.session.blob_manager, self.lbry_file_manager.stream_info_manager, - stream_hash + lbry_file.stream_hash, + lbry_file.uri ) return run_reflector_factory(factory) diff --git a/lbrynet/lbrynet_daemon/Publisher.py b/lbrynet/lbrynet_daemon/Publisher.py index c422194bd..39dcb98e6 100644 --- a/lbrynet/lbrynet_daemon/Publisher.py +++ b/lbrynet/lbrynet_daemon/Publisher.py @@ -90,7 +90,8 @@ class Publisher(object): factory = reflector.ClientFactory( self.session.blob_manager, self.lbry_file_manager.stream_info_manager, - self.stream_hash + self.stream_hash, + self.publish_name ) d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 9b2bd94bc..a42c49b71 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -25,6 +25,7 @@ class EncryptedFileReflectorClient(Protocol): self.sent_stream_info = False self.received_descriptor_response = False self.protocol_version = self.factory.protocol_version + self.lbry_uri = "lbry://%s" % self.factory.lbry_uri self.received_server_version = False self.server_version = None self.stream_descriptor = None @@ -99,7 +100,7 @@ class EncryptedFileReflectorClient(Protocol): log.info("Reflector needs %s%i blobs for %s", needs_desc, len(filtered), - str(self.stream_descriptor)[:16]) + self.lbry_uri) return filtered d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) @@ -186,10 +187,10 @@ class EncryptedFileReflectorClient(Protocol): self.received_descriptor_response = True if response_dict['received_sd_blob']: self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16]) + log.info("Sent reflector descriptor %s", self.next_blob_to_send) else: - log.warning("Reflector failed to receive descriptor %s, trying again later", - self.next_blob_to_send.blob_hash[:16]) + log.warning("Reflector failed to receive descriptor %s for %s", + self.next_blob_to_send, self.lbry_uri) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() @@ -201,7 +202,8 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = FileSender() return defer.succeed(True) else: - log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16]) + log.warning("Reflector already has %s for %s", self.next_blob_to_send, + self.lbry_uri) return self.set_not_uploading() else: # Expecting Server Blob Response if 'received_blob' not in response_dict: @@ -209,10 +211,11 @@ class EncryptedFileReflectorClient(Protocol): else: if response_dict['received_blob']: self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16]) + log.info("Sent reflector blob %s for %s", self.next_blob_to_send, + self.lbry_uri) else: - log.warning("Reflector failed to receive blob %s, trying again later", - self.next_blob_to_send.blob_hash[:16]) + log.warning("Reflector failed to receive blob %s for %s", + self.next_blob_to_send, self.lbry_uri) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() @@ -274,11 +277,12 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient - def __init__(self, blob_manager, stream_info_manager, stream_hash): + def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri): self.protocol_version = REFLECTOR_V2 self.blob_manager = blob_manager self.stream_info_manager = stream_info_manager self.stream_hash = stream_hash + self.lbry_uri = lbry_uri self.p = None self.finished_deferred = defer.Deferred() diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 9008e113f..55b4d1789 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -19,19 +19,25 @@ def _check_if_reflector_has_stream(lbry_file, reflector_server): return d +def log_result(result, uri): + if len(result) == 0: + log.info("Reflector has all blobs for lbry://%s", uri) + else: + log.info("Reflected %i blobs for lbry://%s", len(result), uri) + + def _reflect_stream(lbry_file, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] factory = ClientFactory( lbry_file.blob_manager, lbry_file.stream_info_manager, - lbry_file.stream_hash + lbry_file.stream_hash, + lbry_file.uri ) d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s", - len(reflected_blobs), - lbry_file.uri)) + d.addCallback(log_result, lbry_file.uri) return d diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 9eb21e915..4d88842ab 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -173,7 +173,8 @@ class TestReflector(unittest.TestCase): factory = reflector.ClientFactory( self.session.blob_manager, self.stream_info_manager, - self.stream_hash + self.stream_hash, + "fake_uri" ) from twisted.internet import reactor