forked from LBRYCommunity/lbry-sdk
add uri to stream reflector for better logging
This commit is contained in:
parent
a58a81a841
commit
d053db8dfd
|
@ -984,14 +984,13 @@ class Daemon(AuthJSONRPCServer):
|
||||||
def _reflect(self, lbry_file):
|
def _reflect(self, lbry_file):
|
||||||
if not lbry_file:
|
if not lbry_file:
|
||||||
return defer.fail(Exception("no lbry file given to reflect"))
|
return defer.fail(Exception("no lbry file given to reflect"))
|
||||||
stream_hash = lbry_file.stream_hash
|
if lbry_file.stream_hash is None:
|
||||||
if stream_hash is None:
|
|
||||||
return defer.fail(Exception("no stream hash"))
|
return defer.fail(Exception("no stream hash"))
|
||||||
log.info("Reflecting stream: %s" % stream_hash)
|
|
||||||
factory = reflector.ClientFactory(
|
factory = reflector.ClientFactory(
|
||||||
self.session.blob_manager,
|
self.session.blob_manager,
|
||||||
self.lbry_file_manager.stream_info_manager,
|
self.lbry_file_manager.stream_info_manager,
|
||||||
stream_hash
|
lbry_file.stream_hash,
|
||||||
|
lbry_file.uri
|
||||||
)
|
)
|
||||||
return run_reflector_factory(factory)
|
return run_reflector_factory(factory)
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,8 @@ class Publisher(object):
|
||||||
factory = reflector.ClientFactory(
|
factory = reflector.ClientFactory(
|
||||||
self.session.blob_manager,
|
self.session.blob_manager,
|
||||||
self.lbry_file_manager.stream_info_manager,
|
self.lbry_file_manager.stream_info_manager,
|
||||||
self.stream_hash
|
self.stream_hash,
|
||||||
|
self.publish_name
|
||||||
)
|
)
|
||||||
d = reactor.resolve(reflector_address)
|
d = reactor.resolve(reflector_address)
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
|
|
|
@ -25,6 +25,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.sent_stream_info = False
|
self.sent_stream_info = False
|
||||||
self.received_descriptor_response = False
|
self.received_descriptor_response = False
|
||||||
self.protocol_version = self.factory.protocol_version
|
self.protocol_version = self.factory.protocol_version
|
||||||
|
self.lbry_uri = "lbry://%s" % self.factory.lbry_uri
|
||||||
self.received_server_version = False
|
self.received_server_version = False
|
||||||
self.server_version = None
|
self.server_version = None
|
||||||
self.stream_descriptor = None
|
self.stream_descriptor = None
|
||||||
|
@ -99,7 +100,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
log.info("Reflector needs %s%i blobs for %s",
|
log.info("Reflector needs %s%i blobs for %s",
|
||||||
needs_desc,
|
needs_desc,
|
||||||
len(filtered),
|
len(filtered),
|
||||||
str(self.stream_descriptor)[:16])
|
self.lbry_uri)
|
||||||
return filtered
|
return filtered
|
||||||
|
|
||||||
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
|
d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
|
||||||
|
@ -186,10 +187,10 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.received_descriptor_response = True
|
self.received_descriptor_response = True
|
||||||
if response_dict['received_sd_blob']:
|
if response_dict['received_sd_blob']:
|
||||||
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||||
log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16])
|
log.info("Sent reflector descriptor %s", self.next_blob_to_send)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector failed to receive descriptor %s, trying again later",
|
log.warning("Reflector failed to receive descriptor %s for %s",
|
||||||
self.next_blob_to_send.blob_hash[:16])
|
self.next_blob_to_send, self.lbry_uri)
|
||||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
|
@ -201,7 +202,8 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.file_sender = FileSender()
|
self.file_sender = FileSender()
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16])
|
log.warning("Reflector already has %s for %s", self.next_blob_to_send,
|
||||||
|
self.lbry_uri)
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
else: # Expecting Server Blob Response
|
else: # Expecting Server Blob Response
|
||||||
if 'received_blob' not in response_dict:
|
if 'received_blob' not in response_dict:
|
||||||
|
@ -209,10 +211,11 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
else:
|
else:
|
||||||
if response_dict['received_blob']:
|
if response_dict['received_blob']:
|
||||||
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
|
||||||
log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16])
|
log.info("Sent reflector blob %s for %s", self.next_blob_to_send,
|
||||||
|
self.lbry_uri)
|
||||||
else:
|
else:
|
||||||
log.warning("Reflector failed to receive blob %s, trying again later",
|
log.warning("Reflector failed to receive blob %s for %s",
|
||||||
self.next_blob_to_send.blob_hash[:16])
|
self.next_blob_to_send, self.lbry_uri)
|
||||||
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
|
||||||
return self.set_not_uploading()
|
return self.set_not_uploading()
|
||||||
|
|
||||||
|
@ -274,11 +277,12 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
class EncryptedFileReflectorClientFactory(ClientFactory):
|
class EncryptedFileReflectorClientFactory(ClientFactory):
|
||||||
protocol = EncryptedFileReflectorClient
|
protocol = EncryptedFileReflectorClient
|
||||||
|
|
||||||
def __init__(self, blob_manager, stream_info_manager, stream_hash):
|
def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri):
|
||||||
self.protocol_version = REFLECTOR_V2
|
self.protocol_version = REFLECTOR_V2
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.stream_info_manager = stream_info_manager
|
self.stream_info_manager = stream_info_manager
|
||||||
self.stream_hash = stream_hash
|
self.stream_hash = stream_hash
|
||||||
|
self.lbry_uri = lbry_uri
|
||||||
self.p = None
|
self.p = None
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
|
|
||||||
|
|
|
@ -19,19 +19,25 @@ def _check_if_reflector_has_stream(lbry_file, reflector_server):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def log_result(result, uri):
|
||||||
|
if len(result) == 0:
|
||||||
|
log.info("Reflector has all blobs for lbry://%s", uri)
|
||||||
|
else:
|
||||||
|
log.info("Reflected %i blobs for lbry://%s", len(result), uri)
|
||||||
|
|
||||||
|
|
||||||
def _reflect_stream(lbry_file, reflector_server):
|
def _reflect_stream(lbry_file, reflector_server):
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
factory = ClientFactory(
|
factory = ClientFactory(
|
||||||
lbry_file.blob_manager,
|
lbry_file.blob_manager,
|
||||||
lbry_file.stream_info_manager,
|
lbry_file.stream_info_manager,
|
||||||
lbry_file.stream_hash
|
lbry_file.stream_hash,
|
||||||
|
lbry_file.uri
|
||||||
)
|
)
|
||||||
d = reactor.resolve(reflector_address)
|
d = reactor.resolve(reflector_address)
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
d.addCallback(lambda _: factory.finished_deferred)
|
||||||
d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s",
|
d.addCallback(log_result, lbry_file.uri)
|
||||||
len(reflected_blobs),
|
|
||||||
lbry_file.uri))
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -173,7 +173,8 @@ class TestReflector(unittest.TestCase):
|
||||||
factory = reflector.ClientFactory(
|
factory = reflector.ClientFactory(
|
||||||
self.session.blob_manager,
|
self.session.blob_manager,
|
||||||
self.stream_info_manager,
|
self.stream_info_manager,
|
||||||
self.stream_hash
|
self.stream_hash,
|
||||||
|
"fake_uri"
|
||||||
)
|
)
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
|
|
Loading…
Reference in a new issue