forked from LBRYCommunity/lbry-sdk
remove repeated code for reflector
This commit is contained in:
parent
85dee27c92
commit
abc7b11e26
1 changed files with 12 additions and 20 deletions
|
@ -1022,44 +1022,26 @@ class Daemon(AuthJSONRPCServer):
|
||||||
def _reflect(self, lbry_file):
|
def _reflect(self, lbry_file):
|
||||||
if not lbry_file:
|
if not lbry_file:
|
||||||
return defer.fail(Exception("no lbry file given to reflect"))
|
return defer.fail(Exception("no lbry file given to reflect"))
|
||||||
|
|
||||||
stream_hash = lbry_file.stream_hash
|
stream_hash = lbry_file.stream_hash
|
||||||
|
|
||||||
if stream_hash is None:
|
if stream_hash is None:
|
||||||
return defer.fail(Exception("no stream hash"))
|
return defer.fail(Exception("no stream hash"))
|
||||||
|
|
||||||
log.info("Reflecting stream: %s" % stream_hash)
|
log.info("Reflecting stream: %s" % stream_hash)
|
||||||
|
|
||||||
reflector_server = random.choice(lbrynet_settings.reflector_servers)
|
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
|
||||||
log.info("Start reflector client")
|
|
||||||
factory = reflector.ClientFactory(
|
factory = reflector.ClientFactory(
|
||||||
self.session.blob_manager,
|
self.session.blob_manager,
|
||||||
self.lbry_file_manager.stream_info_manager,
|
self.lbry_file_manager.stream_info_manager,
|
||||||
stream_hash
|
stream_hash
|
||||||
)
|
)
|
||||||
d = reactor.resolve(reflector_address)
|
return run_reflector_factory(factory)
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _reflect_blobs(self, blob_hashes):
|
def _reflect_blobs(self, blob_hashes):
|
||||||
if not blob_hashes:
|
if not blob_hashes:
|
||||||
return defer.fail(Exception("no lbry file given to reflect"))
|
return defer.fail(Exception("no lbry file given to reflect"))
|
||||||
|
|
||||||
log.info("Reflecting %i blobs" % len(blob_hashes))
|
log.info("Reflecting %i blobs" % len(blob_hashes))
|
||||||
|
|
||||||
reflector_server = random.choice(lbrynet_settings.reflector_servers)
|
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
|
||||||
log.info("Start reflector client")
|
|
||||||
factory = reflector.BlobClientFactory(
|
factory = reflector.BlobClientFactory(
|
||||||
self.session.blob_manager,
|
self.session.blob_manager,
|
||||||
blob_hashes
|
blob_hashes
|
||||||
)
|
)
|
||||||
d = reactor.resolve(reflector_address)
|
return run_reflector_factory(factory)
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _log_to_slack(self, msg):
|
def _log_to_slack(self, msg):
|
||||||
URL = "https://hooks.slack.com/services/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"
|
URL = "https://hooks.slack.com/services/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"
|
||||||
|
@ -2701,3 +2683,13 @@ def handle_failure(err, msg):
|
||||||
#
|
#
|
||||||
# If so, maybe we should return something else.
|
# If so, maybe we should return something else.
|
||||||
return server.failure
|
return server.failure
|
||||||
|
|
||||||
|
|
||||||
|
def run_reflector_factory(factory):
|
||||||
|
reflector_server = random.choice(lbrynet_settings.reflector_servers)
|
||||||
|
reflector_address, reflector_port = reflector_server
|
||||||
|
log.info("Start reflector client")
|
||||||
|
d = reactor.resolve(reflector_address)
|
||||||
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
|
d.addCallback(lambda _: factory.finished_deferred)
|
||||||
|
return d
|
||||||
|
|
Loading…
Reference in a new issue