Merge pull request #440 from lbryio/reflect-missing-blobs
Reflect missing blobs
This commit is contained in:
commit
74b4e4508a
3 changed files with 23 additions and 14 deletions
|
@ -117,6 +117,7 @@ def disable_third_party_loggers():
|
||||||
logging.getLogger('urllib3').setLevel(logging.WARNING)
|
logging.getLogger('urllib3').setLevel(logging.WARNING)
|
||||||
logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
|
logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
|
||||||
logging.getLogger('lbryum').setLevel(logging.WARNING)
|
logging.getLogger('lbryum').setLevel(logging.WARNING)
|
||||||
|
logging.getLogger('twisted').setLevel(logging.WARNING)
|
||||||
|
|
||||||
|
|
||||||
@_log_decorator
|
@_log_decorator
|
||||||
|
|
|
@ -94,7 +94,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
if reason.check(error.ConnectionDone):
|
if reason.check(error.ConnectionDone):
|
||||||
log.debug('Finished sending data via reflector')
|
log.info('Finished sending data via reflector')
|
||||||
self.factory.finished_deferred.callback(True)
|
self.factory.finished_deferred.callback(True)
|
||||||
else:
|
else:
|
||||||
log.debug('Reflector finished: %s', reason)
|
log.debug('Reflector finished: %s', reason)
|
||||||
|
@ -124,9 +124,9 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
|
|
||||||
def set_blobs(blob_hashes):
|
def set_blobs(blob_hashes):
|
||||||
for blob_hash, position, iv, length in blob_hashes:
|
for blob_hash, position, iv, length in blob_hashes:
|
||||||
log.debug("Preparing to send %s", blob_hash)
|
|
||||||
if blob_hash is not None:
|
if blob_hash is not None:
|
||||||
self.blob_hashes_to_send.append(blob_hash)
|
self.blob_hashes_to_send.append(blob_hash)
|
||||||
|
log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send))
|
||||||
|
|
||||||
d.addCallback(set_blobs)
|
d.addCallback(set_blobs)
|
||||||
|
|
||||||
|
@ -135,6 +135,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
def set_sd_blobs(sd_blob_hashes):
|
def set_sd_blobs(sd_blob_hashes):
|
||||||
for sd_blob_hash in sd_blob_hashes:
|
for sd_blob_hash in sd_blob_hashes:
|
||||||
self.blob_hashes_to_send.append(sd_blob_hash)
|
self.blob_hashes_to_send.append(sd_blob_hash)
|
||||||
|
log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes))
|
||||||
|
|
||||||
d.addCallback(set_sd_blobs)
|
d.addCallback(set_sd_blobs)
|
||||||
return d
|
return d
|
||||||
|
@ -190,11 +191,13 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.file_sender = FileSender()
|
self.file_sender = FileSender()
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
else:
|
else:
|
||||||
|
log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16])
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
else: # Expecting Server Blob Response
|
else: # Expecting Server Blob Response
|
||||||
if 'received_blob' not in response_dict:
|
if 'received_blob' not in response_dict:
|
||||||
raise ValueError("I don't know if the blob made it to the intended destination!")
|
raise ValueError("I don't know if the blob made it to the intended destination!")
|
||||||
else:
|
else:
|
||||||
|
log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16])
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
def open_blob_for_reading(self, blob):
|
def open_blob_for_reading(self, blob):
|
||||||
|
@ -205,22 +208,27 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.next_blob_to_send = blob
|
self.next_blob_to_send = blob
|
||||||
self.read_handle = read_handle
|
self.read_handle = read_handle
|
||||||
return None
|
return None
|
||||||
raise ValueError(
|
else:
|
||||||
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16])
|
||||||
|
return defer.fail(ValueError(
|
||||||
|
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)))
|
||||||
|
|
||||||
def send_blob_info(self):
|
def send_blob_info(self):
|
||||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
|
||||||
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
||||||
log.debug('sending blob info')
|
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||||
self.write(json.dumps({
|
self.write(json.dumps({
|
||||||
'blob_hash': self.next_blob_to_send.blob_hash,
|
'blob_hash': self.next_blob_to_send.blob_hash,
|
||||||
'blob_size': self.next_blob_to_send.length
|
'blob_size': self.next_blob_to_send.length
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
def skip_missing_blob(self, err, blob_hash):
|
||||||
|
err.trap(ValueError)
|
||||||
|
return self.send_next_request()
|
||||||
|
|
||||||
def send_next_request(self):
|
def send_next_request(self):
|
||||||
if self.file_sender is not None:
|
if self.file_sender is not None:
|
||||||
# send the blob
|
# send the blob
|
||||||
log.debug('Sending the blob')
|
log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16])
|
||||||
return self.start_transfer()
|
return self.start_transfer()
|
||||||
elif self.blob_hashes_to_send:
|
elif self.blob_hashes_to_send:
|
||||||
# open the next blob to send
|
# open the next blob to send
|
||||||
|
@ -230,7 +238,8 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
d = self.blob_manager.get_blob(blob_hash, True)
|
d = self.blob_manager.get_blob(blob_hash, True)
|
||||||
d.addCallback(self.open_blob_for_reading)
|
d.addCallback(self.open_blob_for_reading)
|
||||||
# send the server the next blob hash + length
|
# send the server the next blob hash + length
|
||||||
d.addCallback(lambda _: self.send_blob_info())
|
d.addCallbacks(lambda _: self.send_blob_info(),
|
||||||
|
lambda err: self.skip_missing_blob(err, blob_hash))
|
||||||
return d
|
return d
|
||||||
else:
|
else:
|
||||||
# close connection
|
# close connection
|
||||||
|
|
|
@ -27,6 +27,7 @@ def _reflect_stream(lbry_file, reflector_server):
|
||||||
)
|
)
|
||||||
d = reactor.resolve(reflector_address)
|
d = reactor.resolve(reflector_address)
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
|
d.addCallback(lambda _: log.info("Connected to %s", reflector_address))
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
d.addCallback(lambda _: factory.finished_deferred)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -40,13 +41,11 @@ def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
|
||||||
|
|
||||||
|
|
||||||
def _catch_error(err, uri):
|
def _catch_error(err, uri):
|
||||||
log.error("An error occurred while checking availability for lbry://%s", uri)
|
msg = "An error occurred while checking availability for lbry://%s: %s"
|
||||||
log.debug("Traceback: %s", err.getTraceback())
|
log.error(msg, uri, err.getTraceback())
|
||||||
|
|
||||||
|
|
||||||
def check_and_restore_availability(lbry_file, reflector_server):
|
def check_and_restore_availability(lbry_file, reflector_server):
|
||||||
d = _check_if_reflector_has_stream(lbry_file, reflector_server)
|
d = _reflect_stream(lbry_file, reflector_server)
|
||||||
d.addCallbacks(
|
d.addErrback(_catch_error, lbry_file.uri)
|
||||||
lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server),
|
|
||||||
lambda err: _catch_error(err, lbry_file.uri))
|
|
||||||
return d
|
return d
|
||||||
|
|
Loading…
Reference in a new issue