forked from LBRYCommunity/lbry-sdk
reflect lbry_files in looping call in EncryptedFileManager
-remove ManagedEncryptedFileDownloader._reupload -clean up reflector functions in Daemon, move to reflector.reupload -check ConnectionLost in reflector client -close sd blob file handle when it wont be sent (otherwise read handle stays open) -log reflector sd info -give reflector client factory a lbry file
This commit is contained in:
parent
d137528f67
commit
7720724ec0
7 changed files with 127 additions and 113 deletions
|
@ -1,7 +1,6 @@
|
|||
"""
|
||||
Download LBRY Files from LBRYnet and save them to disk.
|
||||
"""
|
||||
import random
|
||||
import logging
|
||||
|
||||
from zope.interface import implements
|
||||
|
@ -14,8 +13,6 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileDownloa
|
|||
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
|
||||
from lbrynet.interfaces import IStreamDownloaderFactory
|
||||
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
|
||||
from lbrynet.reflector import reupload
|
||||
from lbrynet import conf
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -71,14 +68,6 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
else:
|
||||
raise Exception("Unknown status for stream %s: %s", self.stream_hash, status)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _reupload(self):
|
||||
if not conf.settings['reflector_reupload']:
|
||||
defer.returnValue(None)
|
||||
else:
|
||||
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||
yield reupload.check_and_restore_availability(self, reflector_server)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop(self, err=None, change_status=True):
|
||||
log.debug('Stopping download for %s', self.sd_hash)
|
||||
|
|
|
@ -9,6 +9,7 @@ from twisted.enterprise import adbapi
|
|||
from twisted.internet import defer, task, reactor
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from lbrynet.reflector.reupload import reflect_stream
|
||||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
||||
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||
|
@ -21,6 +22,16 @@ from lbrynet.core.sqlite_helpers import rerun_if_locked
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def safe_start_looping_call(looping_call, seconds=3600):
|
||||
if not looping_call.running:
|
||||
looping_call.start(seconds)
|
||||
|
||||
|
||||
def safe_stop_looping_call(looping_call):
|
||||
if looping_call.running:
|
||||
looping_call.stop()
|
||||
|
||||
|
||||
class EncryptedFileManager(object):
|
||||
"""Keeps track of currently opened LBRY Files, their options, and
|
||||
their LBRY File specific metadata.
|
||||
|
@ -37,6 +48,7 @@ class EncryptedFileManager(object):
|
|||
self.download_directory = download_directory
|
||||
else:
|
||||
self.download_directory = os.getcwd()
|
||||
self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
|
||||
log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -44,6 +56,7 @@ class EncryptedFileManager(object):
|
|||
yield self._open_db()
|
||||
yield self._add_to_sd_identifier()
|
||||
yield self._start_lbry_files()
|
||||
safe_start_looping_call(self.lbry_file_reflector)
|
||||
|
||||
def get_lbry_file_status(self, lbry_file):
|
||||
return self._get_lbry_file_status(lbry_file.rowid)
|
||||
|
@ -97,8 +110,7 @@ class EncryptedFileManager(object):
|
|||
|
||||
stream_hashes = yield self.stream_info_manager.get_all_streams()
|
||||
log.debug("Checking %s streams", len(stream_hashes))
|
||||
check_streams = yield defer.DeferredList(list(_iter_streams(stream_hashes)))
|
||||
defer.returnValue(check_streams)
|
||||
yield defer.DeferredList(list(_iter_streams(stream_hashes)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _restore_lbry_file(self, lbry_file):
|
||||
|
@ -137,8 +149,8 @@ class EncryptedFileManager(object):
|
|||
self, payment_rate_manager, self.session.wallet,
|
||||
download_directory, upload_allowed,
|
||||
file_name=file_name)
|
||||
self.lbry_files.append(lbry_file)
|
||||
yield lbry_file.set_stream_info()
|
||||
self.lbry_files.append(lbry_file)
|
||||
defer.returnValue(lbry_file)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -207,8 +219,17 @@ class EncryptedFileManager(object):
|
|||
else:
|
||||
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
|
||||
|
||||
def _reflect_lbry_files(self):
|
||||
for lbry_file in self.lbry_files:
|
||||
yield reflect_stream(lbry_file)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def reflect_lbry_files(self):
|
||||
yield defer.DeferredList(list(self._reflect_lbry_files()))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def stop(self):
|
||||
safe_stop_looping_call(self.lbry_file_reflector)
|
||||
yield defer.DeferredList(list(self._stop_lbry_files()))
|
||||
yield self.sql_db.close()
|
||||
self.sql_db = None
|
||||
|
|
|
@ -2,7 +2,6 @@ import binascii
|
|||
import logging.handlers
|
||||
import mimetypes
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import base58
|
||||
import requests
|
||||
|
@ -20,8 +19,10 @@ from twisted.python.failure import Failure
|
|||
# TODO: importing this when internet is disabled raises a socket.gaierror
|
||||
from lbryum.version import LBRYUM_VERSION as lbryum_version
|
||||
from lbrynet import __version__ as lbrynet_version
|
||||
from lbrynet import conf, reflector, analytics
|
||||
from lbrynet import conf, analytics
|
||||
from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET
|
||||
from lbrynet.reflector import reupload
|
||||
from lbrynet.reflector import ServerFactory as reflector_server_factory
|
||||
from lbrynet.metadata.Fee import FeeValidator
|
||||
from lbrynet.metadata.Metadata import verify_name_characters
|
||||
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
|
||||
|
@ -437,13 +438,13 @@ class Daemon(AuthJSONRPCServer):
|
|||
if self.run_reflector_server:
|
||||
log.info("Starting reflector server")
|
||||
if self.reflector_port is not None:
|
||||
reflector_factory = reflector.ServerFactory(
|
||||
reflector_factory = reflector_server_factory(
|
||||
self.session.peer_manager,
|
||||
self.session.blob_manager
|
||||
)
|
||||
try:
|
||||
self.reflector_server_port = reactor.listenTCP(
|
||||
self.reflector_port, reflector_factory)
|
||||
self.reflector_server_port = reactor.listenTCP(self.reflector_port,
|
||||
reflector_factory)
|
||||
log.info('Started reflector on port %s', self.reflector_port)
|
||||
except error.CannotListenError as e:
|
||||
log.exception("Couldn't bind reflector to port %d", self.reflector_port)
|
||||
|
@ -785,7 +786,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
claim_out = yield publisher.update_stream(name, bid, metadata)
|
||||
else:
|
||||
claim_out = yield publisher.publish_stream(name, file_path, bid, metadata)
|
||||
yield threads.deferToThread(self._reflect, publisher.lbry_file)
|
||||
yield threads.deferToThread(reupload.reflect_stream, publisher.lbry_file)
|
||||
|
||||
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
|
||||
claim_out['nout'])
|
||||
|
@ -991,29 +992,6 @@ class Daemon(AuthJSONRPCServer):
|
|||
])
|
||||
return d
|
||||
|
||||
def _reflect(self, lbry_file):
|
||||
if not lbry_file:
|
||||
return defer.fail(Exception("no lbry file given to reflect"))
|
||||
if lbry_file.stream_hash is None:
|
||||
return defer.fail(Exception("no stream hash"))
|
||||
factory = reflector.ClientFactory(
|
||||
self.session.blob_manager,
|
||||
self.lbry_file_manager.stream_info_manager,
|
||||
lbry_file.stream_hash,
|
||||
lbry_file.uri
|
||||
)
|
||||
return run_reflector_factory(factory)
|
||||
|
||||
def _reflect_blobs(self, blob_hashes):
|
||||
if not blob_hashes:
|
||||
return defer.fail(Exception("no lbry file given to reflect"))
|
||||
log.info("Reflecting %i blobs" % len(blob_hashes))
|
||||
factory = reflector.BlobClientFactory(
|
||||
self.session.blob_manager,
|
||||
blob_hashes
|
||||
)
|
||||
return run_reflector_factory(factory)
|
||||
|
||||
############################################################################
|
||||
# #
|
||||
# JSON-RPC API methods start here #
|
||||
|
@ -2239,7 +2217,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
"""
|
||||
|
||||
d = self.session.blob_manager.get_all_verified_blobs()
|
||||
d.addCallback(self._reflect_blobs)
|
||||
d.addCallback(reupload.reflect_blob_hashes, self.session.blob_manager)
|
||||
d.addCallback(lambda r: self._render_response(r))
|
||||
return d
|
||||
|
||||
|
@ -2659,13 +2637,3 @@ def get_lbry_file_search_value(search_fields):
|
|||
if value:
|
||||
return searchtype, value
|
||||
raise NoValidSearch('{} is missing a valid search type'.format(search_fields))
|
||||
|
||||
|
||||
def run_reflector_factory(factory):
|
||||
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||
reflector_address, reflector_port = reflector_server
|
||||
log.info("Start reflector client")
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
return d
|
||||
|
|
|
@ -16,7 +16,6 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
# Protocol stuff
|
||||
def connectionMade(self):
|
||||
log.debug("Connected to reflector")
|
||||
self.blob_manager = self.factory.blob_manager
|
||||
self.response_buff = ''
|
||||
self.outgoing_buff = ''
|
||||
self.blob_hashes_to_send = []
|
||||
|
@ -25,8 +24,6 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
self.read_handle = None
|
||||
self.sent_stream_info = False
|
||||
self.received_descriptor_response = False
|
||||
self.protocol_version = self.factory.protocol_version
|
||||
self.lbry_uri = "lbry://%s" % self.factory.lbry_uri
|
||||
self.received_server_version = False
|
||||
self.server_version = None
|
||||
self.stream_descriptor = None
|
||||
|
@ -41,6 +38,26 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
d.addErrback(
|
||||
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
|
||||
|
||||
@property
|
||||
def lbry_uri(self):
|
||||
return "lbry://%s" % self.factory.lbry_uri
|
||||
|
||||
@property
|
||||
def blob_manager(self):
|
||||
return self.factory.blob_manager
|
||||
|
||||
@property
|
||||
def protocol_version(self):
|
||||
return self.factory.protocol_version
|
||||
|
||||
@property
|
||||
def stream_info_manager(self):
|
||||
return self.factory.stream_info_manager
|
||||
|
||||
@property
|
||||
def stream_hash(self):
|
||||
return self.factory.stream_hash
|
||||
|
||||
def dataReceived(self, data):
|
||||
self.response_buff += data
|
||||
try:
|
||||
|
@ -56,16 +73,22 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
def connectionLost(self, reason):
|
||||
if reason.check(error.ConnectionDone):
|
||||
if not self.needed_blobs:
|
||||
log.info("Reflector has all blobs for %s", self.lbry_uri)
|
||||
log.info("Reflector has all blobs for %s (%s)",
|
||||
self.lbry_uri, self.stream_descriptor)
|
||||
elif not self.reflected_blobs:
|
||||
log.info("No more completed blobs for %s to reflect, %i are still needed",
|
||||
self.lbry_uri, len(self.needed_blobs))
|
||||
log.info("No more completed blobs for %s (%s) to reflect, %i are still needed",
|
||||
self.lbry_uri, self.stream_descriptor, len(self.needed_blobs))
|
||||
else:
|
||||
log.info('Finished sending reflector %i blobs for %s',
|
||||
len(self.reflected_blobs), self.lbry_uri)
|
||||
log.info('Finished sending reflector %i blobs for %s (%s)',
|
||||
len(self.reflected_blobs), self.lbry_uri, self.stream_descriptor)
|
||||
self.factory.finished_deferred.callback(self.reflected_blobs)
|
||||
elif reason.check(error.ConnectionLost):
|
||||
log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.lbry_uri,
|
||||
self.stream_descriptor, len(self.reflected_blobs))
|
||||
self.factory.finished_deferred.callback(self.reflected_blobs)
|
||||
else:
|
||||
log.info('Reflector finished for %s: %s', self.lbry_uri, reason)
|
||||
log.info('Reflector finished for %s (%s): %s', self.lbry_uri, self.stream_descriptor,
|
||||
reason)
|
||||
self.factory.finished_deferred.callback(reason)
|
||||
|
||||
# IConsumer stuff
|
||||
|
@ -118,6 +141,7 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
[blob for blob in filtered if blob.blob_hash in self.needed_blobs])
|
||||
d.addCallback(_show_missing_blobs)
|
||||
d.addCallback(self.set_blobs_to_send)
|
||||
d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading())
|
||||
return d
|
||||
|
||||
def send_request(self, request_dict):
|
||||
|
@ -158,8 +182,9 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
self.next_blob_to_send.close_read_handle(self.read_handle)
|
||||
self.read_handle = None
|
||||
self.next_blob_to_send = None
|
||||
self.file_sender.stopProducing()
|
||||
self.file_sender = None
|
||||
if self.file_sender is not None:
|
||||
self.file_sender.stopProducing()
|
||||
self.file_sender = None
|
||||
return defer.succeed(None)
|
||||
|
||||
def start_transfer(self):
|
||||
|
@ -292,15 +317,31 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
class EncryptedFileReflectorClientFactory(ClientFactory):
|
||||
protocol = EncryptedFileReflectorClient
|
||||
|
||||
def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri):
|
||||
self.protocol_version = REFLECTOR_V2
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.stream_hash = stream_hash
|
||||
self.lbry_uri = lbry_uri
|
||||
def __init__(self, lbry_file):
|
||||
self._lbry_file = lbry_file
|
||||
self.p = None
|
||||
self.finished_deferred = defer.Deferred()
|
||||
|
||||
@property
|
||||
def blob_manager(self):
|
||||
return self._lbry_file.blob_manager
|
||||
|
||||
@property
|
||||
def stream_info_manager(self):
|
||||
return self._lbry_file.stream_info_manager
|
||||
|
||||
@property
|
||||
def stream_hash(self):
|
||||
return self._lbry_file.stream_hash
|
||||
|
||||
@property
|
||||
def lbry_uri(self):
|
||||
return self._lbry_file.uri
|
||||
|
||||
@property
|
||||
def protocol_version(self):
|
||||
return REFLECTOR_V2
|
||||
|
||||
def buildProtocol(self, addr):
|
||||
p = self.protocol()
|
||||
p.factory = self
|
||||
|
|
|
@ -1,45 +1,33 @@
|
|||
import logging
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet.error import ConnectionLost, ConnectionDone
|
||||
from lbrynet.reflector import BlobClientFactory, ClientFactory
|
||||
import random
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _check_if_reflector_has_stream(lbry_file, reflector_server):
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
factory = BlobClientFactory(
|
||||
lbry_file.blob_manager,
|
||||
[lbry_file.sd_hash]
|
||||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
d.addCallback(lambda _: not factory.sent_blobs)
|
||||
return d
|
||||
from twisted.internet import reactor, defer
|
||||
from lbrynet import conf
|
||||
from lbrynet.reflector import ClientFactory, BlobClientFactory
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _reflect_stream(lbry_file, reflector_server):
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
factory = ClientFactory(
|
||||
lbry_file.blob_manager,
|
||||
lbry_file.stream_info_manager,
|
||||
lbry_file.stream_hash,
|
||||
lbry_file.uri
|
||||
)
|
||||
d = reactor.resolve(reflector_address)
|
||||
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
|
||||
d.addCallback(lambda _: factory.finished_deferred)
|
||||
return d
|
||||
factory = ClientFactory(lbry_file)
|
||||
ip = yield reactor.resolve(reflector_address)
|
||||
yield reactor.connectTCP(ip, reflector_port, factory)
|
||||
yield factory.finished_deferred
|
||||
|
||||
|
||||
def _catch_error(err, uri):
|
||||
msg = "An error occurred while checking availability for lbry://%s: %s"
|
||||
log.error(msg, uri, err.getTraceback())
|
||||
@defer.inlineCallbacks
|
||||
def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
|
||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||
factory = BlobClientFactory(blob_manager, blob_hashes)
|
||||
ip = yield reactor.resolve(reflector_address)
|
||||
yield reactor.connectTCP(ip, reflector_port, factory)
|
||||
yield factory.finished_deferred
|
||||
|
||||
|
||||
def check_and_restore_availability(lbry_file, reflector_server):
|
||||
d = _reflect_stream(lbry_file, reflector_server)
|
||||
d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost))
|
||||
d.addErrback(_catch_error, lbry_file.uri)
|
||||
return d
|
||||
def reflect_stream(lbry_file):
|
||||
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||
return _reflect_stream(lbry_file, reflector_server)
|
||||
|
||||
|
||||
def reflect_blob_hashes(blob_hashes, blob_manager):
|
||||
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||
return _reflect_blobs(blob_manager, blob_hashes, reflector_server)
|
||||
|
|
|
@ -170,12 +170,10 @@ class TestReflector(unittest.TestCase):
|
|||
return d
|
||||
|
||||
def send_to_server():
|
||||
factory = reflector.ClientFactory(
|
||||
self.session.blob_manager,
|
||||
self.stream_info_manager,
|
||||
self.stream_hash,
|
||||
"fake_uri"
|
||||
)
|
||||
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
|
||||
self.stream_info_manager,
|
||||
self.stream_hash)
|
||||
factory = reflector.ClientFactory(fake_lbry_file)
|
||||
|
||||
from twisted.internet import reactor
|
||||
reactor.connectTCP('localhost', self.port, factory)
|
||||
|
|
|
@ -10,6 +10,15 @@ from lbrynet import conf
|
|||
|
||||
KB = 2**10
|
||||
|
||||
|
||||
class FakeLBRYFile(object):
|
||||
def __init__(self, blob_manager, stream_info_manager, stream_hash, uri="fake_uri"):
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.stream_hash = stream_hash
|
||||
self.uri = "fake_uri"
|
||||
|
||||
|
||||
class Node(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
|
Loading…
Reference in a new issue