log traceback on failed blob upload
-move status message on connectionDone to client, indicate if blobs were sent or not (and how many blobs reflector still needs, if any) -only try uploading failed blob once after first failure, to prevent indefinite retries
This commit is contained in:
parent
832a32474f
commit
777419b4a9
2 changed files with 18 additions and 12 deletions
|
@ -20,6 +20,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.response_buff = ''
|
self.response_buff = ''
|
||||||
self.outgoing_buff = ''
|
self.outgoing_buff = ''
|
||||||
self.blob_hashes_to_send = []
|
self.blob_hashes_to_send = []
|
||||||
|
self.failed_blob_hashes = []
|
||||||
self.next_blob_to_send = None
|
self.next_blob_to_send = None
|
||||||
self.read_handle = None
|
self.read_handle = None
|
||||||
self.sent_stream_info = False
|
self.sent_stream_info = False
|
||||||
|
@ -54,10 +55,17 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
if reason.check(error.ConnectionDone):
|
if reason.check(error.ConnectionDone):
|
||||||
log.debug('Finished sending data via reflector')
|
if not self.needed_blobs:
|
||||||
|
log.info("Reflector has all blobs for %s", self.lbry_uri)
|
||||||
|
elif not self.reflected_blobs:
|
||||||
|
log.info("No more completed blobs for %s to reflect, %i are still needed",
|
||||||
|
self.lbry_uri, len(self.needed_blobs))
|
||||||
|
else:
|
||||||
|
log.info('Finished sending reflector %i blobs for %s',
|
||||||
|
len(self.reflected_blobs), self.lbry_uri)
|
||||||
self.factory.finished_deferred.callback(self.reflected_blobs)
|
self.factory.finished_deferred.callback(self.reflected_blobs)
|
||||||
else:
|
else:
|
||||||
log.debug('Reflector finished: %s', reason)
|
log.info('Reflector finished for %s: %s', self.lbry_uri, reason)
|
||||||
self.factory.finished_deferred.callback(reason)
|
self.factory.finished_deferred.callback(reason)
|
||||||
|
|
||||||
# IConsumer stuff
|
# IConsumer stuff
|
||||||
|
@ -248,9 +256,15 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.send_request(r)
|
self.send_request(r)
|
||||||
|
|
||||||
def skip_missing_blob(self, err, blob_hash):
|
def skip_missing_blob(self, err, blob_hash):
|
||||||
log.warning("Can't reflect blob %s", str(blob_hash)[:16])
|
|
||||||
err.trap(ValueError)
|
err.trap(ValueError)
|
||||||
self.blob_hashes_to_send.append(blob_hash)
|
if blob_hash not in self.failed_blob_hashes:
|
||||||
|
log.warning("Failed to reflect blob %s for %s, reason: %s",
|
||||||
|
str(blob_hash)[:16], self.lbry_uri, err.getTraceback())
|
||||||
|
self.blob_hashes_to_send.append(blob_hash)
|
||||||
|
self.failed_blob_hashes.append(blob_hash)
|
||||||
|
else:
|
||||||
|
log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s",
|
||||||
|
str(blob_hash)[:16], self.lbry_uri, err.getTraceback())
|
||||||
|
|
||||||
def send_next_request(self):
|
def send_next_request(self):
|
||||||
if self.file_sender is not None:
|
if self.file_sender is not None:
|
||||||
|
|
|
@ -19,13 +19,6 @@ def _check_if_reflector_has_stream(lbry_file, reflector_server):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
def log_result(result, uri):
|
|
||||||
if len(result) == 0:
|
|
||||||
log.info("Reflector has all blobs for lbry://%s", uri)
|
|
||||||
else:
|
|
||||||
log.info("Reflected %i blobs for lbry://%s", len(result), uri)
|
|
||||||
|
|
||||||
|
|
||||||
def _reflect_stream(lbry_file, reflector_server):
|
def _reflect_stream(lbry_file, reflector_server):
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
factory = ClientFactory(
|
factory = ClientFactory(
|
||||||
|
@ -37,7 +30,6 @@ def _reflect_stream(lbry_file, reflector_server):
|
||||||
d = reactor.resolve(reflector_address)
|
d = reactor.resolve(reflector_address)
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
d.addCallback(lambda _: factory.finished_deferred)
|
||||||
d.addCallback(log_result, lbry_file.uri)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue