From a58a81a8413d08120af0f60e5e167679a732a66b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 7 Feb 2017 14:27:05 -0500 Subject: [PATCH 1/5] stopProducing in reflector client file_sender when uploading is done this fixes exceptions.ValueError: I/O operation on closed file in twisted.protocols.basic.ResumeProducing --- lbrynet/reflector/client/client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 31eda8d31..9b2bd94bc 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -145,9 +145,11 @@ class EncryptedFileReflectorClient(Protocol): def set_not_uploading(self): if self.next_blob_to_send is not None: + log.debug("Close %s", self.next_blob_to_send) self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle = None self.next_blob_to_send = None + self.file_sender.stopProducing() self.file_sender = None return defer.succeed(None) From d053db8dfd345f12989de50de0731084652d0062 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 7 Feb 2017 14:27:38 -0500 Subject: [PATCH 2/5] add uri to stream reflector for better logging --- lbrynet/lbrynet_daemon/Daemon.py | 7 +++---- lbrynet/lbrynet_daemon/Publisher.py | 3 ++- lbrynet/reflector/client/client.py | 22 +++++++++++++--------- lbrynet/reflector/reupload.py | 14 ++++++++++---- tests/functional/test_reflector.py | 3 ++- 5 files changed, 30 insertions(+), 19 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 16501d2e3..3dec521ab 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -984,14 +984,13 @@ class Daemon(AuthJSONRPCServer): def _reflect(self, lbry_file): if not lbry_file: return defer.fail(Exception("no lbry file given to reflect")) - stream_hash = lbry_file.stream_hash - if stream_hash is None: + if lbry_file.stream_hash is None: return defer.fail(Exception("no stream hash")) - log.info("Reflecting stream: %s" % stream_hash) factory = reflector.ClientFactory( self.session.blob_manager, self.lbry_file_manager.stream_info_manager, - stream_hash + lbry_file.stream_hash, + lbry_file.uri ) return run_reflector_factory(factory) diff --git a/lbrynet/lbrynet_daemon/Publisher.py b/lbrynet/lbrynet_daemon/Publisher.py index c422194bd..39dcb98e6 100644 --- a/lbrynet/lbrynet_daemon/Publisher.py +++ b/lbrynet/lbrynet_daemon/Publisher.py @@ -90,7 +90,8 @@ class Publisher(object): factory = reflector.ClientFactory( self.session.blob_manager, self.lbry_file_manager.stream_info_manager, - self.stream_hash + self.stream_hash, + self.publish_name ) d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 9b2bd94bc..a42c49b71 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -25,6 +25,7 @@ class EncryptedFileReflectorClient(Protocol): self.sent_stream_info = False self.received_descriptor_response = False self.protocol_version = self.factory.protocol_version + self.lbry_uri = "lbry://%s" % self.factory.lbry_uri self.received_server_version = False self.server_version = None self.stream_descriptor = None @@ -99,7 +100,7 @@ class EncryptedFileReflectorClient(Protocol): log.info("Reflector needs %s%i blobs for %s", needs_desc, len(filtered), - str(self.stream_descriptor)[:16]) + self.lbry_uri) return filtered d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) @@ -186,10 +187,10 @@ class EncryptedFileReflectorClient(Protocol): self.received_descriptor_response = True if response_dict['received_sd_blob']: self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16]) + log.info("Sent reflector descriptor %s", self.next_blob_to_send) else: - log.warning("Reflector failed to receive descriptor %s, trying again later", - self.next_blob_to_send.blob_hash[:16]) + log.warning("Reflector failed to receive descriptor %s for %s", + self.next_blob_to_send, self.lbry_uri) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() @@ -201,7 +202,8 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = FileSender() return defer.succeed(True) else: - log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16]) + log.warning("Reflector already has %s for %s", self.next_blob_to_send, + self.lbry_uri) return self.set_not_uploading() else: # Expecting Server Blob Response if 'received_blob' not in response_dict: @@ -209,10 +211,11 @@ class EncryptedFileReflectorClient(Protocol): else: if response_dict['received_blob']: self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16]) + log.info("Sent reflector blob %s for %s", self.next_blob_to_send, + self.lbry_uri) else: - log.warning("Reflector failed to receive blob %s, trying again later", - self.next_blob_to_send.blob_hash[:16]) + log.warning("Reflector failed to receive blob %s for %s", + self.next_blob_to_send, self.lbry_uri) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() @@ -274,11 +277,12 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient - def __init__(self, blob_manager, stream_info_manager, stream_hash): + def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri): self.protocol_version = REFLECTOR_V2 self.blob_manager = blob_manager self.stream_info_manager = stream_info_manager self.stream_hash = stream_hash + self.lbry_uri = lbry_uri self.p = None self.finished_deferred = defer.Deferred() diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 9008e113f..55b4d1789 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -19,19 +19,25 @@ def _check_if_reflector_has_stream(lbry_file, reflector_server): return d +def log_result(result, uri): + if len(result) == 0: + log.info("Reflector has all blobs for lbry://%s", uri) + else: + log.info("Reflected %i blobs for lbry://%s", len(result), uri) + + def _reflect_stream(lbry_file, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] factory = ClientFactory( lbry_file.blob_manager, lbry_file.stream_info_manager, - lbry_file.stream_hash + lbry_file.stream_hash, + lbry_file.uri ) d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s", - len(reflected_blobs), - lbry_file.uri)) + d.addCallback(log_result, lbry_file.uri) return d diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 9eb21e915..4d88842ab 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -173,7 +173,8 @@ class TestReflector(unittest.TestCase): factory = reflector.ClientFactory( self.session.blob_manager, self.stream_info_manager, - self.stream_hash + self.stream_hash, + "fake_uri" ) from twisted.internet import reactor From f2ddc9bd98206e684bb54580dca6b1b7c221ddfe Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 7 Feb 2017 16:09:44 -0500 Subject: [PATCH 3/5] fix recursion depth bug upon failed blob --- lbrynet/reflector/client/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index a42c49b71..8cae3b66f 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -250,7 +250,7 @@ class EncryptedFileReflectorClient(Protocol): def skip_missing_blob(self, err, blob_hash): log.warning("Can't reflect blob %s", str(blob_hash)[:16]) err.trap(ValueError) - return self.send_next_request() + self.blob_hashes_to_send.append(blob_hash) def send_next_request(self): if self.file_sender is not None: @@ -260,7 +260,8 @@ class EncryptedFileReflectorClient(Protocol): # open the sd blob to send blob = self.stream_descriptor d = self.open_blob_for_reading(blob) - d.addCallback(lambda _: self.send_descriptor_info()) + d.addCallbacks(lambda _: self.send_descriptor_info(), + lambda err: self.skip_missing_blob(err, blob.blob_hash)) return d elif self.blob_hashes_to_send: # open the next blob to send From 832a32474f7895529df7e20eed9f2a6cca5afea8 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Feb 2017 14:46:18 -0500 Subject: [PATCH 4/5] update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cbf834c3..a61198fc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ can and probably will change functionality and break backwards compatability at anytime. ## [Unreleased] +### Changed + * add uri to stream reflector to de-obfuscate reflector logs +### Fixed + * fix recursion depth error upon failed blob + * call stopProducing in reflector client file_sender when uploading is done ## [0.8.1] - 2017-02-01 ### Changed From 777419b4a98404cc69358824f2053eef9e1f8dcc Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Feb 2017 16:18:34 -0500 Subject: [PATCH 5/5] log traceback on failed blob upload -move status message on connectionDone to client, indicate if blobs were sent or not (and how many blobs reflector still needs, if any) -only try uploading failed blob once after first failure, to prevent indefinite retries --- lbrynet/reflector/client/client.py | 22 ++++++++++++++++++---- lbrynet/reflector/reupload.py | 8 -------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 8cae3b66f..e433c22dc 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -20,6 +20,7 @@ class EncryptedFileReflectorClient(Protocol): self.response_buff = '' self.outgoing_buff = '' self.blob_hashes_to_send = [] + self.failed_blob_hashes = [] self.next_blob_to_send = None self.read_handle = None self.sent_stream_info = False @@ -54,10 +55,17 @@ class EncryptedFileReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - log.debug('Finished sending data via reflector') + if not self.needed_blobs: + log.info("Reflector has all blobs for %s", self.lbry_uri) + elif not self.reflected_blobs: + log.info("No more completed blobs for %s to reflect, %i are still needed", + self.lbry_uri, len(self.needed_blobs)) + else: + log.info('Finished sending reflector %i blobs for %s', + len(self.reflected_blobs), self.lbry_uri) self.factory.finished_deferred.callback(self.reflected_blobs) else: - log.debug('Reflector finished: %s', reason) + log.info('Reflector finished for %s: %s', self.lbry_uri, reason) self.factory.finished_deferred.callback(reason) # IConsumer stuff @@ -248,9 +256,15 @@ class EncryptedFileReflectorClient(Protocol): self.send_request(r) def skip_missing_blob(self, err, blob_hash): - log.warning("Can't reflect blob %s", str(blob_hash)[:16]) err.trap(ValueError) - self.blob_hashes_to_send.append(blob_hash) + if blob_hash not in self.failed_blob_hashes: + log.warning("Failed to reflect blob %s for %s, reason: %s", + str(blob_hash)[:16], self.lbry_uri, err.getTraceback()) + self.blob_hashes_to_send.append(blob_hash) + self.failed_blob_hashes.append(blob_hash) + else: + log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s", + str(blob_hash)[:16], self.lbry_uri, err.getTraceback()) def send_next_request(self): if self.file_sender is not None: diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 55b4d1789..1a64f8a86 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -19,13 +19,6 @@ def _check_if_reflector_has_stream(lbry_file, reflector_server): return d -def log_result(result, uri): - if len(result) == 0: - log.info("Reflector has all blobs for lbry://%s", uri) - else: - log.info("Reflected %i blobs for lbry://%s", len(result), uri) - - def _reflect_stream(lbry_file, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] factory = ClientFactory( @@ -37,7 +30,6 @@ def _reflect_stream(lbry_file, reflector_server): d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(log_result, lbry_file.uri) return d