From 7720724ec00cc8ffc3ce3903e6c9a853c5bd967b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 10 Feb 2017 10:56:22 -0500 Subject: [PATCH] reflect lbry_files in looping call in EncryptedFileManager -remove ManagedEncryptedFileDownloader._reupload -clean up reflector functions in Daemon, move to reflector.reupload -check ConnectionLost in reflector client -close sd blob file handle when it wont be sent (otherwise read handle stays open) -log reflector sd info -give reflector client factory a lbry file --- .../EncryptedFileDownloader.py | 11 --- .../lbryfilemanager/EncryptedFileManager.py | 27 ++++++- lbrynet/lbrynet_daemon/Daemon.py | 48 ++---------- lbrynet/reflector/client/client.py | 75 ++++++++++++++----- lbrynet/reflector/reupload.py | 60 ++++++--------- tests/functional/test_reflector.py | 10 +-- tests/mocks.py | 9 +++ 7 files changed, 127 insertions(+), 113 deletions(-) diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 4c5f1938d..83676c1b3 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -1,7 +1,6 @@ """ Download LBRY Files from LBRYnet and save them to disk. """ -import random import logging from zope.interface import implements @@ -14,8 +13,6 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileDownloa from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import save_sd_info -from lbrynet.reflector import reupload -from lbrynet import conf log = logging.getLogger(__name__) @@ -71,14 +68,6 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): else: raise Exception("Unknown status for stream %s: %s", self.stream_hash, status) - @defer.inlineCallbacks - def _reupload(self): - if not conf.settings['reflector_reupload']: - defer.returnValue(None) - else: - reflector_server = random.choice(conf.settings['reflector_servers']) - yield reupload.check_and_restore_availability(self, reflector_server) - @defer.inlineCallbacks def stop(self, err=None, change_status=True): log.debug('Stopping download for %s', self.sd_hash) diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index a34d8c760..1813231d2 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -9,6 +9,7 @@ from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure +from lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -21,6 +22,16 @@ from lbrynet.core.sqlite_helpers import rerun_if_locked log = logging.getLogger(__name__) +def safe_start_looping_call(looping_call, seconds=3600): + if not looping_call.running: + looping_call.start(seconds) + + +def safe_stop_looping_call(looping_call): + if looping_call.running: + looping_call.stop() + + class EncryptedFileManager(object): """Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata. @@ -37,6 +48,7 @@ class EncryptedFileManager(object): self.download_directory = download_directory else: self.download_directory = os.getcwd() + self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files) log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory)) @defer.inlineCallbacks @@ -44,6 +56,7 @@ class EncryptedFileManager(object): yield self._open_db() yield self._add_to_sd_identifier() yield self._start_lbry_files() + safe_start_looping_call(self.lbry_file_reflector) def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -97,8 +110,7 @@ class EncryptedFileManager(object): stream_hashes = yield self.stream_info_manager.get_all_streams() log.debug("Checking %s streams", len(stream_hashes)) - check_streams = yield defer.DeferredList(list(_iter_streams(stream_hashes))) - defer.returnValue(check_streams) + yield defer.DeferredList(list(_iter_streams(stream_hashes))) @defer.inlineCallbacks def _restore_lbry_file(self, lbry_file): @@ -137,8 +149,8 @@ class EncryptedFileManager(object): self, payment_rate_manager, self.session.wallet, download_directory, upload_allowed, file_name=file_name) - self.lbry_files.append(lbry_file) yield lbry_file.set_stream_info() + self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @defer.inlineCallbacks @@ -207,8 +219,17 @@ class EncryptedFileManager(object): else: return defer.fail(Failure(ValueError("Could not find that LBRY file"))) + def _reflect_lbry_files(self): + for lbry_file in self.lbry_files: + yield reflect_stream(lbry_file) + + @defer.inlineCallbacks + def reflect_lbry_files(self): + yield defer.DeferredList(list(self._reflect_lbry_files())) + @defer.inlineCallbacks def stop(self): + safe_stop_looping_call(self.lbry_file_reflector) yield defer.DeferredList(list(self._stop_lbry_files())) yield self.sql_db.close() self.sql_db = None diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 483f51935..3dad9195c 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -2,7 +2,6 @@ import binascii import logging.handlers import mimetypes import os -import random import re import base58 import requests @@ -20,8 +19,10 @@ from twisted.python.failure import Failure # TODO: importing this when internet is disabled raises a socket.gaierror from lbryum.version import LBRYUM_VERSION as lbryum_version from lbrynet import __version__ as lbrynet_version -from lbrynet import conf, reflector, analytics +from lbrynet import conf, analytics from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET +from lbrynet.reflector import reupload +from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.metadata.Fee import FeeValidator from lbrynet.metadata.Metadata import verify_name_characters from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory @@ -437,13 +438,13 @@ class Daemon(AuthJSONRPCServer): if self.run_reflector_server: log.info("Starting reflector server") if self.reflector_port is not None: - reflector_factory = reflector.ServerFactory( + reflector_factory = reflector_server_factory( self.session.peer_manager, self.session.blob_manager ) try: - self.reflector_server_port = reactor.listenTCP( - self.reflector_port, reflector_factory) + self.reflector_server_port = reactor.listenTCP(self.reflector_port, + reflector_factory) log.info('Started reflector on port %s', self.reflector_port) except error.CannotListenError as e: log.exception("Couldn't bind reflector to port %d", self.reflector_port) @@ -785,7 +786,7 @@ class Daemon(AuthJSONRPCServer): claim_out = yield publisher.update_stream(name, bid, metadata) else: claim_out = yield publisher.publish_stream(name, file_path, bid, metadata) - yield threads.deferToThread(self._reflect, publisher.lbry_file) + yield threads.deferToThread(reupload.reflect_stream, publisher.lbry_file) log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], claim_out['nout']) @@ -991,29 +992,6 @@ class Daemon(AuthJSONRPCServer): ]) return d - def _reflect(self, lbry_file): - if not lbry_file: - return defer.fail(Exception("no lbry file given to reflect")) - if lbry_file.stream_hash is None: - return defer.fail(Exception("no stream hash")) - factory = reflector.ClientFactory( - self.session.blob_manager, - self.lbry_file_manager.stream_info_manager, - lbry_file.stream_hash, - lbry_file.uri - ) - return run_reflector_factory(factory) - - def _reflect_blobs(self, blob_hashes): - if not blob_hashes: - return defer.fail(Exception("no lbry file given to reflect")) - log.info("Reflecting %i blobs" % len(blob_hashes)) - factory = reflector.BlobClientFactory( - self.session.blob_manager, - blob_hashes - ) - return run_reflector_factory(factory) - ############################################################################ # # # JSON-RPC API methods start here # @@ -2239,7 +2217,7 @@ class Daemon(AuthJSONRPCServer): """ d = self.session.blob_manager.get_all_verified_blobs() - d.addCallback(self._reflect_blobs) + d.addCallback(reupload.reflect_blob_hashes, self.session.blob_manager) d.addCallback(lambda r: self._render_response(r)) return d @@ -2659,13 +2637,3 @@ def get_lbry_file_search_value(search_fields): if value: return searchtype, value raise NoValidSearch('{} is missing a valid search type'.format(search_fields)) - - -def run_reflector_factory(factory): - reflector_server = random.choice(conf.settings['reflector_servers']) - reflector_address, reflector_port = reflector_server - log.info("Start reflector client") - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index bc9b2e728..b67ca1b1a 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -16,7 +16,6 @@ class EncryptedFileReflectorClient(Protocol): # Protocol stuff def connectionMade(self): log.debug("Connected to reflector") - self.blob_manager = self.factory.blob_manager self.response_buff = '' self.outgoing_buff = '' self.blob_hashes_to_send = [] @@ -25,8 +24,6 @@ class EncryptedFileReflectorClient(Protocol): self.read_handle = None self.sent_stream_info = False self.received_descriptor_response = False - self.protocol_version = self.factory.protocol_version - self.lbry_uri = "lbry://%s" % self.factory.lbry_uri self.received_server_version = False self.server_version = None self.stream_descriptor = None @@ -41,6 +38,26 @@ class EncryptedFileReflectorClient(Protocol): d.addErrback( lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) + @property + def lbry_uri(self): + return "lbry://%s" % self.factory.lbry_uri + + @property + def blob_manager(self): + return self.factory.blob_manager + + @property + def protocol_version(self): + return self.factory.protocol_version + + @property + def stream_info_manager(self): + return self.factory.stream_info_manager + + @property + def stream_hash(self): + return self.factory.stream_hash + def dataReceived(self, data): self.response_buff += data try: @@ -56,16 +73,22 @@ class EncryptedFileReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): if not self.needed_blobs: - log.info("Reflector has all blobs for %s", self.lbry_uri) + log.info("Reflector has all blobs for %s (%s)", + self.lbry_uri, self.stream_descriptor) elif not self.reflected_blobs: - log.info("No more completed blobs for %s to reflect, %i are still needed", - self.lbry_uri, len(self.needed_blobs)) + log.info("No more completed blobs for %s (%s) to reflect, %i are still needed", + self.lbry_uri, self.stream_descriptor, len(self.needed_blobs)) else: - log.info('Finished sending reflector %i blobs for %s', - len(self.reflected_blobs), self.lbry_uri) + log.info('Finished sending reflector %i blobs for %s (%s)', + len(self.reflected_blobs), self.lbry_uri, self.stream_descriptor) + self.factory.finished_deferred.callback(self.reflected_blobs) + elif reason.check(error.ConnectionLost): + log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.lbry_uri, + self.stream_descriptor, len(self.reflected_blobs)) self.factory.finished_deferred.callback(self.reflected_blobs) else: - log.info('Reflector finished for %s: %s', self.lbry_uri, reason) + log.info('Reflector finished for %s (%s): %s', self.lbry_uri, self.stream_descriptor, + reason) self.factory.finished_deferred.callback(reason) # IConsumer stuff @@ -118,6 +141,7 @@ class EncryptedFileReflectorClient(Protocol): [blob for blob in filtered if blob.blob_hash in self.needed_blobs]) d.addCallback(_show_missing_blobs) d.addCallback(self.set_blobs_to_send) + d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading()) return d def send_request(self, request_dict): @@ -158,8 +182,9 @@ class EncryptedFileReflectorClient(Protocol): self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle = None self.next_blob_to_send = None - self.file_sender.stopProducing() - self.file_sender = None + if self.file_sender is not None: + self.file_sender.stopProducing() + self.file_sender = None return defer.succeed(None) def start_transfer(self): @@ -292,15 +317,31 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient - def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri): - self.protocol_version = REFLECTOR_V2 - self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager - self.stream_hash = stream_hash - self.lbry_uri = lbry_uri + def __init__(self, lbry_file): + self._lbry_file = lbry_file self.p = None self.finished_deferred = defer.Deferred() + @property + def blob_manager(self): + return self._lbry_file.blob_manager + + @property + def stream_info_manager(self): + return self._lbry_file.stream_info_manager + + @property + def stream_hash(self): + return self._lbry_file.stream_hash + + @property + def lbry_uri(self): + return self._lbry_file.uri + + @property + def protocol_version(self): + return REFLECTOR_V2 + def buildProtocol(self, addr): p = self.protocol() p.factory = self diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 1a64f8a86..9c7432f48 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -1,45 +1,33 @@ -import logging -from twisted.internet import reactor -from twisted.internet.error import ConnectionLost, ConnectionDone -from lbrynet.reflector import BlobClientFactory, ClientFactory +import random -log = logging.getLogger(__name__) - - -def _check_if_reflector_has_stream(lbry_file, reflector_server): - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = BlobClientFactory( - lbry_file.blob_manager, - [lbry_file.sd_hash] - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(lambda _: not factory.sent_blobs) - return d +from twisted.internet import reactor, defer +from lbrynet import conf +from lbrynet.reflector import ClientFactory, BlobClientFactory +@defer.inlineCallbacks def _reflect_stream(lbry_file, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory( - lbry_file.blob_manager, - lbry_file.stream_info_manager, - lbry_file.stream_hash, - lbry_file.uri - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d + factory = ClientFactory(lbry_file) + ip = yield reactor.resolve(reflector_address) + yield reactor.connectTCP(ip, reflector_port, factory) + yield factory.finished_deferred -def _catch_error(err, uri): - msg = "An error occurred while checking availability for lbry://%s: %s" - log.error(msg, uri, err.getTraceback()) +@defer.inlineCallbacks +def _reflect_blobs(blob_manager, blob_hashes, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = BlobClientFactory(blob_manager, blob_hashes) + ip = yield reactor.resolve(reflector_address) + yield reactor.connectTCP(ip, reflector_port, factory) + yield factory.finished_deferred -def check_and_restore_availability(lbry_file, reflector_server): - d = _reflect_stream(lbry_file, reflector_server) - d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost)) - d.addErrback(_catch_error, lbry_file.uri) - return d +def reflect_stream(lbry_file): + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_stream(lbry_file, reflector_server) + + +def reflect_blob_hashes(blob_hashes, blob_manager): + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_blobs(blob_manager, blob_hashes, reflector_server) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 4d88842ab..7067d9776 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -170,12 +170,10 @@ class TestReflector(unittest.TestCase): return d def send_to_server(): - factory = reflector.ClientFactory( - self.session.blob_manager, - self.stream_info_manager, - self.stream_hash, - "fake_uri" - ) + fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, + self.stream_info_manager, + self.stream_hash) + factory = reflector.ClientFactory(fake_lbry_file) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) diff --git a/tests/mocks.py b/tests/mocks.py index 96845ad43..69f89b15d 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -10,6 +10,15 @@ from lbrynet import conf KB = 2**10 + +class FakeLBRYFile(object): + def __init__(self, blob_manager, stream_info_manager, stream_hash, uri="fake_uri"): + self.blob_manager = blob_manager + self.stream_info_manager = stream_info_manager + self.stream_hash = stream_hash + self.uri = "fake_uri" + + class Node(object): def __init__(self, *args, **kwargs): pass