feedback from job
This commit is contained in:
parent
d4785849e1
commit
035a1cf758
4 changed files with 48 additions and 50 deletions
|
@ -13,7 +13,7 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver,
|
||||||
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
|
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
|
||||||
from lbrynet.interfaces import IStreamDownloaderFactory
|
from lbrynet.interfaces import IStreamDownloaderFactory
|
||||||
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
|
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
|
||||||
from lbrynet.reflector import ReflectorAvailabilityHelper
|
from lbrynet.reflector import reupload
|
||||||
from lbrynet.conf import REFLECTOR_SERVERS
|
from lbrynet.conf import REFLECTOR_SERVERS
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -86,7 +86,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
|
|
||||||
reflector_server = random.choice(REFLECTOR_SERVERS)
|
reflector_server = random.choice(REFLECTOR_SERVERS)
|
||||||
|
|
||||||
d.addCallback(lambda _: ReflectorAvailabilityHelper.check_and_restore_availability(self, reflector_server))
|
d.addCallback(lambda _: reupload.check_and_restore_availability(self, reflector_server))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def stop(self, err=None, change_status=True):
|
def stop(self, err=None, change_status=True):
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
|
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
|
||||||
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
|
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
|
||||||
from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory
|
from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory
|
||||||
from lbrynet.reflector.util import ReflectorAvailabilityHelper
|
from lbrynet.reflector import reupload
|
45
lbrynet/reflector/reupload.py
Normal file
45
lbrynet/reflector/reupload.py
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
import logging
|
||||||
|
from twisted.internet import reactor, defer
|
||||||
|
from lbrynet.reflector import BlobClientFactory, ClientFactory
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_if_reflector_has_stream(lbry_file, reflector_server):
|
||||||
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
|
factory = BlobClientFactory(
|
||||||
|
lbry_file.blob_manager,
|
||||||
|
[lbry_file.sd_hash]
|
||||||
|
)
|
||||||
|
d = reactor.resolve(reflector_address)
|
||||||
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
|
d.addCallback(lambda _: factory.finished_deferred)
|
||||||
|
d.addCallback(lambda _: not factory.sent_blobs)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def _reflect_stream(lbry_file, reflector_server):
|
||||||
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
|
factory = ClientFactory(
|
||||||
|
lbry_file.blob_manager,
|
||||||
|
lbry_file.stream_info_manager,
|
||||||
|
lbry_file.stream_hash
|
||||||
|
)
|
||||||
|
d = reactor.resolve(reflector_address)
|
||||||
|
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||||
|
d.addCallback(lambda _: factory.finished_deferred)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
|
||||||
|
if reflector_has_stream:
|
||||||
|
log.info("lbry://%s is available", lbry_file.uri)
|
||||||
|
return defer.succeed(False)
|
||||||
|
log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
|
||||||
|
return _reflect_stream(lbry_file, reflector_server)
|
||||||
|
|
||||||
|
|
||||||
|
def check_and_restore_availability(cls, lbry_file, reflector_server):
|
||||||
|
d = cls._check_if_reflector_has_stream(lbry_file, reflector_server)
|
||||||
|
d.addCallback(lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server))
|
||||||
|
return d
|
|
@ -1,47 +0,0 @@
|
||||||
import logging
|
|
||||||
from twisted.internet import reactor, defer
|
|
||||||
from lbrynet.reflector import BlobClientFactory, ClientFactory
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class ReflectorAvailabilityHelper(object):
|
|
||||||
@staticmethod
|
|
||||||
def _check_if_reflector_has_stream(lbry_file, reflector_server):
|
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
|
||||||
factory = BlobClientFactory(
|
|
||||||
lbry_file.blob_manager,
|
|
||||||
[lbry_file.sd_hash]
|
|
||||||
)
|
|
||||||
d = reactor.resolve(reflector_address)
|
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
|
||||||
d.addCallback(lambda _: not factory.sent_blobs)
|
|
||||||
return d
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _reflect_stream(lbry_file, reflector_server):
|
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
|
||||||
factory = ClientFactory(
|
|
||||||
lbry_file.blob_manager,
|
|
||||||
lbry_file.stream_info_manager,
|
|
||||||
lbry_file.stream_hash
|
|
||||||
)
|
|
||||||
d = reactor.resolve(reflector_address)
|
|
||||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
|
||||||
d.addCallback(lambda _: factory.finished_deferred)
|
|
||||||
return d
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _reflect_if_unavailable(cls, reflector_has_stream, lbry_file, reflector_server):
|
|
||||||
if reflector_has_stream:
|
|
||||||
log.info("lbry://%s is available", lbry_file.uri)
|
|
||||||
return defer.succeed(False)
|
|
||||||
log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
|
|
||||||
return cls._reflect_stream(lbry_file, reflector_server)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def check_and_restore_availability(cls, lbry_file, reflector_server):
|
|
||||||
d = cls._check_if_reflector_has_stream(lbry_file, reflector_server)
|
|
||||||
d.addCallback(lambda send_stream: cls._reflect_if_unavailable(send_stream, lbry_file, reflector_server))
|
|
||||||
return d
|
|
Loading…
Reference in a new issue