move availability check and fix from ManagedEncryptedFileDownloader to ReflectorAvailabilityHelper

This commit is contained in:
Jack 2016-10-26 16:17:01 -04:00
parent 3dd99fdc92
commit 369cd516c0
4 changed files with 58 additions and 38 deletions

View file

@ -13,10 +13,9 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver,
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import save_sd_info from lbrynet.lbryfile.StreamDescriptor import save_sd_info
from lbrynet.reflector import BlobClientFactory, ClientFactory from lbrynet.reflector import BlobClientFactory, ClientFactory, ReflectorAvailabilityHelper
from lbrynet.conf import REFLECTOR_SERVERS from lbrynet.conf import REFLECTOR_SERVERS
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -68,41 +67,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid)) d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid))
return d return d
def _check_file_availability():
reflector_server = random.choice(REFLECTOR_SERVERS)
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = BlobClientFactory(
self.blob_manager,
[self.sd_hash]
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
d.addCallback(lambda _: _reflect_if_unavailable(factory.sent_blobs))
return d
def _reflect_if_unavailable(sent_blobs):
if not sent_blobs:
log.info("lbry://%s is available", self.uri)
return defer.succeed(True)
if self.stream_hash is None:
return defer.fail(Exception("no stream hash"))
log.info("Reflecting previously unavailable stream: %s" % self.stream_hash)
reflector_server = random.choice(REFLECTOR_SERVERS)
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = ClientFactory(
self.blob_manager,
self.stream_info_manager,
self.stream_hash
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
d.addCallback(_save_sd_hash) d.addCallback(_save_sd_hash)
d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None) d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None)
d.addCallback(lambda _: _check_file_availability())
d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self))
def restore_status(status): def restore_status(status):
@ -115,6 +83,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
return defer.succeed(True) return defer.succeed(True)
d.addCallback(restore_status) d.addCallback(restore_status)
reflector_server = random.choice(REFLECTOR_SERVERS)
d.addCallback(lambda _: ReflectorAvailabilityHelper.check_and_restore_availability(self, reflector_server))
return d return d
def stop(self, err=None, change_status=True): def stop(self, err=None, change_status=True):

View file

@ -1,3 +1,4 @@
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory
from lbrynet.reflector.util import ReflectorAvailabilityHelper

View file

@ -126,7 +126,7 @@ class EncryptedFileReflectorClient(Protocol):
def set_blobs(blob_hashes): def set_blobs(blob_hashes):
for blob_hash, position, iv, length in blob_hashes: for blob_hash, position, iv, length in blob_hashes:
log.info("Preparing to send %s", blob_hash) log.debug("Preparing to send %s", blob_hash)
if blob_hash is not None: if blob_hash is not None:
self.blob_hashes_to_send.append(blob_hash) self.blob_hashes_to_send.append(blob_hash)
@ -209,7 +209,7 @@ class EncryptedFileReflectorClient(Protocol):
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
def send_blob_info(self): def send_blob_info(self):
log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
log.debug('sending blob info') log.debug('sending blob info')
self.write(json.dumps({ self.write(json.dumps({
@ -284,7 +284,7 @@ class BlobReflectorClient(Protocol):
self.file_sender = None self.file_sender = None
self.producer = None self.producer = None
self.streaming = False self.streaming = False
self.blobs_where_sent = False self.sent_blobs = False
d = self.send_handshake() d = self.send_handshake()
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
@ -303,7 +303,7 @@ class BlobReflectorClient(Protocol):
def connectionLost(self, reason): def connectionLost(self, reason):
if reason.check(error.ConnectionDone): if reason.check(error.ConnectionDone):
self.factory.sent_blobs = self.blobs_where_sent self.factory.sent_blobs = self.sent_blobs
if self.factory.sent_blobs: if self.factory.sent_blobs:
log.info('Finished sending data via reflector') log.info('Finished sending data via reflector')
self.factory.finished_deferred.callback(True) self.factory.finished_deferred.callback(True)
@ -358,6 +358,7 @@ class BlobReflectorClient(Protocol):
return defer.succeed(None) return defer.succeed(None)
def start_transfer(self): def start_transfer(self):
self.sent_blobs = True
self.write(json.dumps({})) self.write(json.dumps({}))
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self) d = self.file_sender.beginFileTransfer(self.read_handle, self)
@ -377,7 +378,6 @@ class BlobReflectorClient(Protocol):
if 'send_blob' not in response_dict: if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!") raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True: if response_dict['send_blob'] is True:
self.blobs_where_sent = True
self.file_sender = FileSender() self.file_sender = FileSender()
return defer.succeed(True) return defer.succeed(True)
else: else:

47
lbrynet/reflector/util.py Normal file
View file

@ -0,0 +1,47 @@
import logging
from twisted.internet import reactor, defer
from lbrynet.reflector import BlobClientFactory, ClientFactory
log = logging.getLogger(__name__)
class ReflectorAvailabilityHelper(object):
@staticmethod
def _check_if_reflector_has_stream(lbry_file, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = BlobClientFactory(
lbry_file.blob_manager,
[lbry_file.sd_hash]
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
d.addCallback(lambda _: not factory.sent_blobs)
return d
@staticmethod
def _reflect_stream(lbry_file, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = ClientFactory(
lbry_file.blob_manager,
lbry_file.stream_info_manager,
lbry_file.stream_hash
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
@classmethod
def _reflect_if_unavailable(cls, reflector_has_stream, lbry_file, reflector_server):
if reflector_has_stream:
log.info("lbry://%s is available", lbry_file.uri)
return defer.succeed(False)
log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
return cls._reflect_stream(lbry_file, reflector_server)
@classmethod
def check_and_restore_availability(cls, lbry_file, reflector_server):
d = cls._check_if_reflector_has_stream(lbry_file, reflector_server)
d.addCallback(lambda send_stream: cls._reflect_if_unavailable(send_stream, lbry_file, reflector_server))
return d