Merge pull request #201 from lbryio/reflect-my-unavailable-streams
Upload unavailable streams to reflector on startup
This commit is contained in:
commit
0f4fee475f
4 changed files with 65 additions and 7 deletions
|
@ -1,16 +1,20 @@
|
|||
"""
|
||||
Download LBRY Files from LBRYnet and save them to disk.
|
||||
"""
|
||||
import random
|
||||
import logging
|
||||
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
|
||||
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
|
||||
from lbrynet.core.StreamDescriptor import StreamMetadata
|
||||
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver, EncryptedFileDownloader
|
||||
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
|
||||
from lbrynet.interfaces import IStreamDownloaderFactory
|
||||
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
|
||||
from twisted.internet import defer
|
||||
import logging
|
||||
from lbrynet.reflector import reupload
|
||||
from lbrynet.conf import settings
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -63,8 +67,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid))
|
||||
return d
|
||||
|
||||
reflector_server = random.choice(settings.reflector_servers)
|
||||
|
||||
d.addCallback(_save_sd_hash)
|
||||
d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None)
|
||||
d.addCallback(lambda _: reupload.check_and_restore_availability(self, reflector_server))
|
||||
d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self))
|
||||
|
||||
def restore_status(status):
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory
|
||||
from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory
|
||||
from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory
|
||||
from lbrynet.reflector import reupload
|
|
@ -126,7 +126,7 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
|
||||
def set_blobs(blob_hashes):
|
||||
for blob_hash, position, iv, length in blob_hashes:
|
||||
log.info("Preparing to send %s", blob_hash)
|
||||
log.debug("Preparing to send %s", blob_hash)
|
||||
if blob_hash is not None:
|
||||
self.blob_hashes_to_send.append(blob_hash)
|
||||
|
||||
|
@ -209,7 +209,7 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
||||
|
||||
def send_blob_info(self):
|
||||
log.info("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
||||
log.debug('sending blob info')
|
||||
self.write(json.dumps({
|
||||
|
@ -284,6 +284,7 @@ class BlobReflectorClient(Protocol):
|
|||
self.file_sender = None
|
||||
self.producer = None
|
||||
self.streaming = False
|
||||
self.sent_blobs = False
|
||||
d = self.send_handshake()
|
||||
d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
|
@ -302,10 +303,12 @@ class BlobReflectorClient(Protocol):
|
|||
|
||||
def connectionLost(self, reason):
|
||||
if reason.check(error.ConnectionDone):
|
||||
log.debug('Finished sending data via reflector')
|
||||
self.factory.sent_blobs = self.sent_blobs
|
||||
if self.factory.sent_blobs:
|
||||
log.info('Finished sending data via reflector')
|
||||
self.factory.finished_deferred.callback(True)
|
||||
else:
|
||||
log.debug('reflector finished: %s', reason)
|
||||
log.info('Reflector finished: %s', reason)
|
||||
self.factory.finished_deferred.callback(reason)
|
||||
|
||||
# IConsumer stuff
|
||||
|
@ -355,6 +358,7 @@ class BlobReflectorClient(Protocol):
|
|||
return defer.succeed(None)
|
||||
|
||||
def start_transfer(self):
|
||||
self.sent_blobs = True
|
||||
self.write(json.dumps({}))
|
||||
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
|
||||
d = self.file_sender.beginFileTransfer(self.read_handle, self)
|
||||
|
@ -395,7 +399,7 @@ class BlobReflectorClient(Protocol):
|
|||
raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))
|
||||
|
||||
def send_blob_info(self):
|
||||
log.info("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
|
||||
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
|
||||
log.debug('sending blob info')
|
||||
self.write(json.dumps({
|
||||
|
@ -431,6 +435,7 @@ class BlobReflectorClientFactory(ClientFactory):
|
|||
self.blob_manager = blob_manager
|
||||
self.blobs = blobs
|
||||
self.p = None
|
||||
self.sent_blobs = False
|
||||
self.finished_deferred = defer.Deferred()
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
|
|
45
lbrynet/reflector/reupload.py
Normal file
45
lbrynet/reflector/reupload.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
import logging
|
||||
from twisted.internet import reactor, defer
|
||||
from lbrynet.reflector import BlobClientFactory, ClientFactory
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_if_reflector_has_stream(lbry_file, reflector_server):
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
factory = BlobClientFactory(
|
||||
lbry_file.blob_manager,
|
||||
[lbry_file.sd_hash]
|
||||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
d.addCallback(lambda _: not factory.sent_blobs)
|
||||
return d
|
||||
|
||||
|
||||
def _reflect_stream(lbry_file, reflector_server):
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
factory = ClientFactory(
|
||||
lbry_file.blob_manager,
|
||||
lbry_file.stream_info_manager,
|
||||
lbry_file.stream_hash
|
||||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
return d
|
||||
|
||||
|
||||
def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
|
||||
if reflector_has_stream:
|
||||
log.info("lbry://%s is available", lbry_file.uri)
|
||||
return defer.succeed(False)
|
||||
log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
|
||||
return _reflect_stream(lbry_file, reflector_server)
|
||||
|
||||
|
||||
def check_and_restore_availability(lbry_file, reflector_server):
|
||||
d = _check_if_reflector_has_stream(lbry_file, reflector_server)
|
||||
d.addCallback(lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server))
|
||||
return d
|
Loading…
Add table
Reference in a new issue