Merge pull request #467 from lbryio/fix-reflector-bugs

Fix reflector bugs
This commit is contained in:
Jack Robison 2017-02-08 16:40:57 -05:00 committed by GitHub
commit c527c32e5f
6 changed files with 49 additions and 24 deletions

View file

@ -8,6 +8,11 @@ can and probably will change functionality and break backwards compatability
at anytime. at anytime.
## [Unreleased] ## [Unreleased]
### Changed
* add uri to stream reflector to de-obfuscate reflector logs
### Fixed
* fix recursion depth error upon failed blob
* call stopProducing in reflector client file_sender when uploading is done
## [0.8.1] - 2017-02-01 ## [0.8.1] - 2017-02-01
### Changed ### Changed

View file

@ -984,14 +984,13 @@ class Daemon(AuthJSONRPCServer):
def _reflect(self, lbry_file): def _reflect(self, lbry_file):
if not lbry_file: if not lbry_file:
return defer.fail(Exception("no lbry file given to reflect")) return defer.fail(Exception("no lbry file given to reflect"))
stream_hash = lbry_file.stream_hash if lbry_file.stream_hash is None:
if stream_hash is None:
return defer.fail(Exception("no stream hash")) return defer.fail(Exception("no stream hash"))
log.info("Reflecting stream: %s" % stream_hash)
factory = reflector.ClientFactory( factory = reflector.ClientFactory(
self.session.blob_manager, self.session.blob_manager,
self.lbry_file_manager.stream_info_manager, self.lbry_file_manager.stream_info_manager,
stream_hash lbry_file.stream_hash,
lbry_file.uri
) )
return run_reflector_factory(factory) return run_reflector_factory(factory)

View file

@ -90,7 +90,8 @@ class Publisher(object):
factory = reflector.ClientFactory( factory = reflector.ClientFactory(
self.session.blob_manager, self.session.blob_manager,
self.lbry_file_manager.stream_info_manager, self.lbry_file_manager.stream_info_manager,
self.stream_hash self.stream_hash,
self.publish_name
) )
d = reactor.resolve(reflector_address) d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))

View file

@ -20,11 +20,13 @@ class EncryptedFileReflectorClient(Protocol):
self.response_buff = '' self.response_buff = ''
self.outgoing_buff = '' self.outgoing_buff = ''
self.blob_hashes_to_send = [] self.blob_hashes_to_send = []
self.failed_blob_hashes = []
self.next_blob_to_send = None self.next_blob_to_send = None
self.read_handle = None self.read_handle = None
self.sent_stream_info = False self.sent_stream_info = False
self.received_descriptor_response = False self.received_descriptor_response = False
self.protocol_version = self.factory.protocol_version self.protocol_version = self.factory.protocol_version
self.lbry_uri = "lbry://%s" % self.factory.lbry_uri
self.received_server_version = False self.received_server_version = False
self.server_version = None self.server_version = None
self.stream_descriptor = None self.stream_descriptor = None
@ -53,10 +55,17 @@ class EncryptedFileReflectorClient(Protocol):
def connectionLost(self, reason): def connectionLost(self, reason):
if reason.check(error.ConnectionDone): if reason.check(error.ConnectionDone):
log.debug('Finished sending data via reflector') if not self.needed_blobs:
log.info("Reflector has all blobs for %s", self.lbry_uri)
elif not self.reflected_blobs:
log.info("No more completed blobs for %s to reflect, %i are still needed",
self.lbry_uri, len(self.needed_blobs))
else:
log.info('Finished sending reflector %i blobs for %s',
len(self.reflected_blobs), self.lbry_uri)
self.factory.finished_deferred.callback(self.reflected_blobs) self.factory.finished_deferred.callback(self.reflected_blobs)
else: else:
log.debug('Reflector finished: %s', reason) log.info('Reflector finished for %s: %s', self.lbry_uri, reason)
self.factory.finished_deferred.callback(reason) self.factory.finished_deferred.callback(reason)
# IConsumer stuff # IConsumer stuff
@ -99,7 +108,7 @@ class EncryptedFileReflectorClient(Protocol):
log.info("Reflector needs %s%i blobs for %s", log.info("Reflector needs %s%i blobs for %s",
needs_desc, needs_desc,
len(filtered), len(filtered),
str(self.stream_descriptor)[:16]) self.lbry_uri)
return filtered return filtered
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
@ -145,9 +154,11 @@ class EncryptedFileReflectorClient(Protocol):
def set_not_uploading(self): def set_not_uploading(self):
if self.next_blob_to_send is not None: if self.next_blob_to_send is not None:
log.debug("Close %s", self.next_blob_to_send)
self.next_blob_to_send.close_read_handle(self.read_handle) self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None self.read_handle = None
self.next_blob_to_send = None self.next_blob_to_send = None
self.file_sender.stopProducing()
self.file_sender = None self.file_sender = None
return defer.succeed(None) return defer.succeed(None)
@ -184,10 +195,10 @@ class EncryptedFileReflectorClient(Protocol):
self.received_descriptor_response = True self.received_descriptor_response = True
if response_dict['received_sd_blob']: if response_dict['received_sd_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash) self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16]) log.info("Sent reflector descriptor %s", self.next_blob_to_send)
else: else:
log.warning("Reflector failed to receive descriptor %s, trying again later", log.warning("Reflector failed to receive descriptor %s for %s",
self.next_blob_to_send.blob_hash[:16]) self.next_blob_to_send, self.lbry_uri)
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading() return self.set_not_uploading()
@ -199,7 +210,8 @@ class EncryptedFileReflectorClient(Protocol):
self.file_sender = FileSender() self.file_sender = FileSender()
return defer.succeed(True) return defer.succeed(True)
else: else:
log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16]) log.warning("Reflector already has %s for %s", self.next_blob_to_send,
self.lbry_uri)
return self.set_not_uploading() return self.set_not_uploading()
else: # Expecting Server Blob Response else: # Expecting Server Blob Response
if 'received_blob' not in response_dict: if 'received_blob' not in response_dict:
@ -207,10 +219,11 @@ class EncryptedFileReflectorClient(Protocol):
else: else:
if response_dict['received_blob']: if response_dict['received_blob']:
self.reflected_blobs.append(self.next_blob_to_send.blob_hash) self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16]) log.info("Sent reflector blob %s for %s", self.next_blob_to_send,
self.lbry_uri)
else: else:
log.warning("Reflector failed to receive blob %s, trying again later", log.warning("Reflector failed to receive blob %s for %s",
self.next_blob_to_send.blob_hash[:16]) self.next_blob_to_send, self.lbry_uri)
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
return self.set_not_uploading() return self.set_not_uploading()
@ -243,9 +256,15 @@ class EncryptedFileReflectorClient(Protocol):
self.send_request(r) self.send_request(r)
def skip_missing_blob(self, err, blob_hash): def skip_missing_blob(self, err, blob_hash):
log.warning("Can't reflect blob %s", str(blob_hash)[:16])
err.trap(ValueError) err.trap(ValueError)
return self.send_next_request() if blob_hash not in self.failed_blob_hashes:
log.warning("Failed to reflect blob %s for %s, reason: %s",
str(blob_hash)[:16], self.lbry_uri, err.getTraceback())
self.blob_hashes_to_send.append(blob_hash)
self.failed_blob_hashes.append(blob_hash)
else:
log.warning("Failed second try reflecting blob %s for %s, giving up, reason: %s",
str(blob_hash)[:16], self.lbry_uri, err.getTraceback())
def send_next_request(self): def send_next_request(self):
if self.file_sender is not None: if self.file_sender is not None:
@ -255,7 +274,8 @@ class EncryptedFileReflectorClient(Protocol):
# open the sd blob to send # open the sd blob to send
blob = self.stream_descriptor blob = self.stream_descriptor
d = self.open_blob_for_reading(blob) d = self.open_blob_for_reading(blob)
d.addCallback(lambda _: self.send_descriptor_info()) d.addCallbacks(lambda _: self.send_descriptor_info(),
lambda err: self.skip_missing_blob(err, blob.blob_hash))
return d return d
elif self.blob_hashes_to_send: elif self.blob_hashes_to_send:
# open the next blob to send # open the next blob to send
@ -272,11 +292,12 @@ class EncryptedFileReflectorClient(Protocol):
class EncryptedFileReflectorClientFactory(ClientFactory): class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient protocol = EncryptedFileReflectorClient
def __init__(self, blob_manager, stream_info_manager, stream_hash): def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri):
self.protocol_version = REFLECTOR_V2 self.protocol_version = REFLECTOR_V2
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash self.stream_hash = stream_hash
self.lbry_uri = lbry_uri
self.p = None self.p = None
self.finished_deferred = defer.Deferred() self.finished_deferred = defer.Deferred()

View file

@ -24,14 +24,12 @@ def _reflect_stream(lbry_file, reflector_server):
factory = ClientFactory( factory = ClientFactory(
lbry_file.blob_manager, lbry_file.blob_manager,
lbry_file.stream_info_manager, lbry_file.stream_info_manager,
lbry_file.stream_hash lbry_file.stream_hash,
lbry_file.uri
) )
d = reactor.resolve(reflector_address) d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred) d.addCallback(lambda _: factory.finished_deferred)
d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s",
len(reflected_blobs),
lbry_file.uri))
return d return d

View file

@ -173,7 +173,8 @@ class TestReflector(unittest.TestCase):
factory = reflector.ClientFactory( factory = reflector.ClientFactory(
self.session.blob_manager, self.session.blob_manager,
self.stream_info_manager, self.stream_info_manager,
self.stream_hash self.stream_hash,
"fake_uri"
) )
from twisted.internet import reactor from twisted.internet import reactor