try reflecting all the blobs in a stream

previously only the sd blob was reflected, if the server indicated it
needed the blob then the rest of the stream would follow. this allowed
for many streams to be partially reflected, where for whatever reason
the connection was broken before the full upload was completed. this
meant that on a subsequent run, the client would falsely believe
reflector had the whole stream when it actually only had some portion
of it.

this solution isn’t ideal, I’m most of the way done with a better one,
but this can be deployed now.
This commit is contained in:
Jack Robison 2017-01-25 13:03:48 -05:00
parent b80a1f66da
commit 5fa2dfeca7
2 changed files with 20 additions and 14 deletions

View file

@ -94,7 +94,7 @@ class EncryptedFileReflectorClient(Protocol):
def connectionLost(self, reason): def connectionLost(self, reason):
if reason.check(error.ConnectionDone): if reason.check(error.ConnectionDone):
log.debug('Finished sending data via reflector') log.info('Finished sending data via reflector')
self.factory.finished_deferred.callback(True) self.factory.finished_deferred.callback(True)
else: else:
log.debug('Reflector finished: %s', reason) log.debug('Reflector finished: %s', reason)
@ -124,9 +124,9 @@ class EncryptedFileReflectorClient(Protocol):
def set_blobs(blob_hashes): def set_blobs(blob_hashes):
for blob_hash, position, iv, length in blob_hashes: for blob_hash, position, iv, length in blob_hashes:
log.debug("Preparing to send %s", blob_hash)
if blob_hash is not None: if blob_hash is not None:
self.blob_hashes_to_send.append(blob_hash) self.blob_hashes_to_send.append(blob_hash)
log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send))
d.addCallback(set_blobs) d.addCallback(set_blobs)
@ -135,6 +135,7 @@ class EncryptedFileReflectorClient(Protocol):
def set_sd_blobs(sd_blob_hashes): def set_sd_blobs(sd_blob_hashes):
for sd_blob_hash in sd_blob_hashes: for sd_blob_hash in sd_blob_hashes:
self.blob_hashes_to_send.append(sd_blob_hash) self.blob_hashes_to_send.append(sd_blob_hash)
log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes))
d.addCallback(set_sd_blobs) d.addCallback(set_sd_blobs)
return d return d
@ -190,11 +191,13 @@ class EncryptedFileReflectorClient(Protocol):
self.file_sender = FileSender() self.file_sender = FileSender()
return defer.succeed(True) return defer.succeed(True)
else: else:
log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16])
return self.set_not_uploading() return self.set_not_uploading()
else: # Expecting Server Blob Response else: # Expecting Server Blob Response
if 'received_blob' not in response_dict: if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!") raise ValueError("I don't know if the blob made it to the intended destination!")
else: else:
log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16])
return self.set_not_uploading() return self.set_not_uploading()
def open_blob_for_reading(self, blob): def open_blob_for_reading(self, blob):
@ -205,22 +208,27 @@ class EncryptedFileReflectorClient(Protocol):
self.next_blob_to_send = blob self.next_blob_to_send = blob
self.read_handle = read_handle self.read_handle = read_handle
return None return None
raise ValueError( else:
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16])
return defer.fail(ValueError(
"Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)))
def send_blob_info(self): def send_blob_info(self):
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug('sending blob info') log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
self.write(json.dumps({ self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash, 'blob_hash': self.next_blob_to_send.blob_hash,
'blob_size': self.next_blob_to_send.length 'blob_size': self.next_blob_to_send.length
})) }))
def skip_missing_blob(self, err, blob_hash):
err.trap(ValueError)
return self.send_next_request()
def send_next_request(self): def send_next_request(self):
if self.file_sender is not None: if self.file_sender is not None:
# send the blob # send the blob
log.debug('Sending the blob') log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16])
return self.start_transfer() return self.start_transfer()
elif self.blob_hashes_to_send: elif self.blob_hashes_to_send:
# open the next blob to send # open the next blob to send
@ -230,7 +238,7 @@ class EncryptedFileReflectorClient(Protocol):
d = self.blob_manager.get_blob(blob_hash, True) d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading) d.addCallback(self.open_blob_for_reading)
# send the server the next blob hash + length # send the server the next blob hash + length
d.addCallback(lambda _: self.send_blob_info()) d.addCallbacks(lambda _: self.send_blob_info(), lambda err: self.skip_missing_blob(err, blob_hash))
return d return d
else: else:
# close connection # close connection

View file

@ -27,6 +27,7 @@ def _reflect_stream(lbry_file, reflector_server):
) )
d = reactor.resolve(reflector_address) d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: log.info("Connected to %s", reflector_address))
d.addCallback(lambda _: factory.finished_deferred) d.addCallback(lambda _: factory.finished_deferred)
return d return d
@ -40,13 +41,10 @@ def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
def _catch_error(err, uri): def _catch_error(err, uri):
log.error("An error occurred while checking availability for lbry://%s", uri) log.error("An error occurred while checking availability for lbry://%s: %s", uri, err.getTraceback())
log.debug("Traceback: %s", err.getTraceback())
def check_and_restore_availability(lbry_file, reflector_server): def check_and_restore_availability(lbry_file, reflector_server):
d = _check_if_reflector_has_stream(lbry_file, reflector_server) d = _reflect_stream(lbry_file, reflector_server)
d.addCallbacks( d.addErrback(_catch_error, lbry_file.uri)
lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server),
lambda err: _catch_error(err, lbry_file.uri))
return d return d