diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 3e19ea766..de6612751 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -94,7 +94,7 @@ class EncryptedFileReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - log.debug('Finished sending data via reflector') + log.info('Finished sending data via reflector') self.factory.finished_deferred.callback(True) else: log.debug('Reflector finished: %s', reason) @@ -124,9 +124,9 @@ class EncryptedFileReflectorClient(Protocol): def set_blobs(blob_hashes): for blob_hash, position, iv, length in blob_hashes: - log.debug("Preparing to send %s", blob_hash) if blob_hash is not None: self.blob_hashes_to_send.append(blob_hash) + log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send)) d.addCallback(set_blobs) @@ -135,6 +135,7 @@ class EncryptedFileReflectorClient(Protocol): def set_sd_blobs(sd_blob_hashes): for sd_blob_hash in sd_blob_hashes: self.blob_hashes_to_send.append(sd_blob_hash) + log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes)) d.addCallback(set_sd_blobs) return d @@ -190,11 +191,13 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = FileSender() return defer.succeed(True) else: + log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16]) return self.set_not_uploading() else: # Expecting Server Blob Response if 'received_blob' not in response_dict: raise ValueError("I don't know if the blob made it to the intended destination!") else: + log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16]) return self.set_not_uploading() def open_blob_for_reading(self, blob): @@ -205,22 +208,27 @@ class EncryptedFileReflectorClient(Protocol): self.next_blob_to_send = blob self.read_handle = read_handle return None - raise ValueError( - "Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) + else: + log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16]) + return defer.fail(ValueError( + "Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))) def send_blob_info(self): - log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" - log.debug('sending blob info') + log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) self.write(json.dumps({ 'blob_hash': self.next_blob_to_send.blob_hash, 'blob_size': self.next_blob_to_send.length })) + def skip_missing_blob(self, err, blob_hash): + err.trap(ValueError) + return self.send_next_request() + def send_next_request(self): if self.file_sender is not None: # send the blob - log.debug('Sending the blob') + log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16]) return self.start_transfer() elif self.blob_hashes_to_send: # open the next blob to send @@ -230,7 +238,7 @@ class EncryptedFileReflectorClient(Protocol): d = self.blob_manager.get_blob(blob_hash, True) d.addCallback(self.open_blob_for_reading) # send the server the next blob hash + length - d.addCallback(lambda _: self.send_blob_info()) + d.addCallbacks(lambda _: self.send_blob_info(), lambda err: self.skip_missing_blob(err, blob_hash)) return d else: # close connection diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 1656fe3ef..86bfea2cf 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -27,6 +27,7 @@ def _reflect_stream(lbry_file, reflector_server): ) d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: log.info("Connected to %s", reflector_address)) d.addCallback(lambda _: factory.finished_deferred) return d @@ -40,13 +41,10 @@ def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server): def _catch_error(err, uri): - log.error("An error occurred while checking availability for lbry://%s", uri) - log.debug("Traceback: %s", err.getTraceback()) + log.error("An error occurred while checking availability for lbry://%s: %s", uri, err.getTraceback()) def check_and_restore_availability(lbry_file, reflector_server): - d = _check_if_reflector_has_stream(lbry_file, reflector_server) - d.addCallbacks( - lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server), - lambda err: _catch_error(err, lbry_file.uri)) + d = _reflect_stream(lbry_file, reflector_server) + d.addErrback(_catch_error, lbry_file.uri) return d