diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 78907de6a..686ee8c48 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.3.19 +current_version = 0.3.20 commit = True tag = True message = Bump version: {current_version} -> {new_version} diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index d83b7602f..3a92417c3 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,2 +1,2 @@ -__version__ = "0.3.19" +__version__ = "0.3.20" version = tuple(__version__.split('.')) \ No newline at end of file diff --git a/lbrynet/conf.py b/lbrynet/conf.py index f348fc85d..118a52f1f 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -28,6 +28,8 @@ SEARCH_SERVERS = ["http://lighthouse1.lbry.io:50005", "http://lighthouse2.lbry.io:50005", "http://lighthouse3.lbry.io:50005"] +REFLECTOR_SERVERS = [("reflector.lbry.io", 5566)] + LOG_FILE_NAME = "lbrynet.log" LOG_POST_URL = "https://lbry.io/log-upload" diff --git a/lbrynet/core/LBRYWallet.py b/lbrynet/core/LBRYWallet.py index 0bc584aca..7d275278b 100644 --- a/lbrynet/core/LBRYWallet.py +++ b/lbrynet/core/LBRYWallet.py @@ -489,6 +489,10 @@ class LBRYWallet(object): d.addCallback(self._get_decoded_tx) return d + def get_history(self): + d = self._get_history() + return d + def get_name_and_validity_for_sd_hash(self, sd_hash): d = self._get_claim_metadata_for_sd_hash(sd_hash) d.addCallback(lambda name_txid: self._get_status_of_claim(name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None) @@ -688,6 +692,9 @@ class LBRYWallet(object): def _get_balance_for_address(self, address): return defer.fail(NotImplementedError()) + def _get_history(self): + return defer.fail(NotImplementedError()) + def _start(self): pass @@ -823,6 +830,9 @@ class LBRYcrdWallet(LBRYWallet): def _get_value_for_name(self, name): return threads.deferToThread(self._get_value_for_name_rpc, name) + def _get_history(self): + return threads.deferToThread(self._list_transactions_rpc) + def _get_rpc_conn(self): return AuthServiceProxy(self.rpc_conn_string) @@ -1007,6 +1017,11 @@ class LBRYcrdWallet(LBRYWallet): rpc_conn = self._get_rpc_conn() return rpc_conn.getbestblockhash() + @_catch_connection_error + def _list_transactions_rpc(self): + rpc_conn = self._get_rpc_conn() + return rpc_conn.listtransactions() + @_catch_connection_error def _stop_rpc(self): # check if our lbrycrdd is actually running, or if we connected to one that was already @@ -1294,7 +1309,7 @@ class LBRYumWallet(LBRYWallet): func = getattr(self.cmd_runner, cmd.name) return threads.deferToThread(func) - def get_history(self): + def _get_history(self): cmd = known_commands['history'] func = getattr(self.cmd_runner, cmd.name) return threads.deferToThread(func) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index aa4381e3d..31aa86b74 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -73,7 +73,7 @@ class ClientProtocol(Protocol): return defer.fail(failure.Failure(ValueError("There is already a request for that response active"))) self._next_request.update(request.request_dict) d = defer.Deferred() - log.debug("Adding a request. Request: %s", str(request)) + log.debug("Adding a request. Request: %s", str(request.request_dict)) self._response_deferreds[request.response_identifier] = d return d diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 683935036..f82f19a20 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -172,7 +172,7 @@ class ConnectionManager(object): def pick_best_peer(peers): # TODO: Eventually rank them based on past performance/reputation. For now # TODO: just pick the first to which we don't have an open connection - log.debug("Got a list of peers to choose from: %s", str(peers)) + log.debug("Got a list of peers to choose from: %s", str(["%s:%i" % (p.host, p.port) for p in peers])) if peers is None: return None for peer in peers: diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index cc6bba682..c2f652e74 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -77,6 +77,10 @@ def disable_third_party_loggers(): def disable_noisy_loggers(): logging.getLogger('lbrynet.dht').setLevel(logging.INFO) + logging.getLogger('lbrynet.core.client.ConnectionManager').setLevel(logging.INFO) + logging.getLogger('lbrynet.core.client.BlobRequester').setLevel(logging.INFO) + logging.getLogger('lbrynet.core.client.ClientProtocol').setLevel(logging.INFO) + logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO) @_log_decorator diff --git a/lbrynet/core/server/BlobAvailabilityHandler.py b/lbrynet/core/server/BlobAvailabilityHandler.py index 5203f72d2..a5d550bdf 100644 --- a/lbrynet/core/server/BlobAvailabilityHandler.py +++ b/lbrynet/core/server/BlobAvailabilityHandler.py @@ -44,6 +44,7 @@ class BlobAvailabilityHandler(object): d = self._get_available_blobs(queries[self.query_identifiers[0]]) def set_field(available_blobs): + log.debug("available blobs: %s", str(available_blobs)) return {'available_blobs': available_blobs} d.addCallback(set_field) diff --git a/lbrynet/lbryfile/__init__.py b/lbrynet/lbryfile/__init__.py index e69de29bb..8cd10066a 100644 --- a/lbrynet/lbryfile/__init__.py +++ b/lbrynet/lbryfile/__init__.py @@ -0,0 +1,2 @@ +from lbrynet.lbryfile.StreamDescriptor import get_sd_info +from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 587e43733..753aeb157 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -49,7 +49,7 @@ from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, \ KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE, DEFAULT_WALLET, \ DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, \ - LOG_POST_URL, LOG_FILE_NAME + LOG_POST_URL, LOG_FILE_NAME, REFLECTOR_SERVERS from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT from lbrynet.conf import DEFAULT_TIMEOUT from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader @@ -58,7 +58,7 @@ from lbrynet.core.PTCWallet import PTCWallet from lbrynet.core.LBRYWallet import LBRYcrdWallet, LBRYumWallet from lbrynet.lbryfilemanager.LBRYFileManager import LBRYFileManager from lbrynet.lbryfile.LBRYFileMetadataManager import DBLBRYFileMetadataManager, TempLBRYFileMetadataManager -# from lbryum import LOG_PATH as lbryum_log +from lbrynet import reflector # TODO: this code snippet is everywhere. Make it go away @@ -216,9 +216,11 @@ class LBRYDaemon(jsonrpc.JSONRPC): 'delete_blobs_on_remove': True, 'peer_port': 3333, 'dht_node_port': 4444, + 'reflector_port': 5566, 'use_upnp': True, 'start_lbrycrdd': True, 'requested_first_run_credits': False, + 'run_reflector_server': False, 'cache_time': DEFAULT_CACHE_TIME, 'startup_scripts': [], 'last_version': {'lbrynet': lbrynet_version, 'lbryum': lbryum_version} @@ -278,6 +280,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): self.search_timeout = self.session_settings['search_timeout'] self.download_timeout = self.session_settings['download_timeout'] self.max_search_results = self.session_settings['max_search_results'] + self.run_reflector_server = self.session_settings['run_reflector_server'] #### # # Ignore the saved wallet type. Some users will have their wallet type @@ -304,6 +307,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): #### self.delete_blobs_on_remove = self.session_settings['delete_blobs_on_remove'] self.peer_port = self.session_settings['peer_port'] + self.reflector_port = self.session_settings['reflector_port'] self.dht_node_port = self.session_settings['dht_node_port'] self.use_upnp = self.session_settings['use_upnp'] self.start_lbrycrdd = self.session_settings['start_lbrycrdd'] @@ -688,10 +692,10 @@ class LBRYDaemon(jsonrpc.JSONRPC): def _start_server(self): if self.peer_port is not None: - server_factory = ServerProtocolFactory(self.session.rate_limiter, self.query_handlers, self.session.peer_manager) + try: self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) except error.CannotListenError as e: @@ -700,6 +704,33 @@ class LBRYDaemon(jsonrpc.JSONRPC): raise ValueError("%s lbrynet may already be running on your computer.", str(e)) return defer.succeed(True) + def _start_reflector(self): + if self.run_reflector_server: + log.info("Starting reflector server") + if self.reflector_port is not None: + reflector_factory = reflector.ServerFactory( + self.session.peer_manager, + self.session.blob_manager + ) + try: + self.reflector_server_port = reactor.listenTCP(self.reflector_port, reflector_factory) + log.info('Started reflector on port %s', self.reflector_port) + except error.CannotListenError as e: + log.exception("Couldn't bind reflector to port %d", self.reflector_port) + raise ValueError("{} lbrynet may already be running on your computer.".format(e)) + return defer.succeed(True) + + def _stop_reflector(self): + if self.run_reflector_server: + log.info("Stopping reflector server") + try: + if self.reflector_server_port is not None: + self.reflector_server_port, p = None, self.reflector_server_port + return defer.maybeDeferred(p.stopListening) + except AttributeError: + return defer.succeed(True) + return defer.succeed(True) + def _stop_server(self): try: if self.lbry_server_port is not None: @@ -713,7 +744,8 @@ class LBRYDaemon(jsonrpc.JSONRPC): def _setup_server(self): def restore_running_status(running): if running is True: - return self._start_server() + d = self._start_server() + d.addCallback(lambda _: self._start_reflector()) return defer.succeed(True) self.startup_status = STARTUP_STAGES[4] @@ -813,6 +845,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = self._upload_log(log_type="close", exclude_previous=False if self.first_run else True) d.addCallback(lambda _: self._stop_server()) + d.addCallback(lambda _: self._stop_reflector()) d.addErrback(lambda err: True) d.addCallback(lambda _: self.lbry_file_manager.stop()) d.addErrback(lambda err: True) @@ -1312,6 +1345,30 @@ class LBRYDaemon(jsonrpc.JSONRPC): d = defer.DeferredList([self._get_lbry_file('sd_hash', l.sd_hash) for l in self.lbry_file_manager.lbry_files]) return d + def _reflect(self, lbry_file): + if not lbry_file: + return defer.fail(Exception("no lbry file given to reflect")) + + stream_hash = lbry_file.stream_hash + + if stream_hash is None: + return defer.fail(Exception("no stream hash")) + + log.info("Reflecting stream: %s" % stream_hash) + + reflector_server = random.choice(REFLECTOR_SERVERS) + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + log.info("Start reflector client") + factory = reflector.ClientFactory( + self.session.blob_manager, + self.lbry_file_manager.stream_info_manager, + stream_hash + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + def _log_to_slack(self, msg): URL = "https://hooks.slack.com/services/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" msg = platform.platform() + ": " + base58.b58encode(self.lbryid)[:20] + ", " + msg @@ -1886,6 +1943,12 @@ class LBRYDaemon(jsonrpc.JSONRPC): m['fee'][currency]['address'] = address return m + def _reflect_if_possible(sd_hash, txid): + d = self._get_lbry_file('sd_hash', sd_hash, return_json=False) + d.addCallback(self._reflect) + d.addCallback(lambda _: txid) + return d + name = p['name'] log.info("Publish: ") @@ -1902,8 +1965,10 @@ class LBRYDaemon(jsonrpc.JSONRPC): try: metadata = Metadata(p['metadata']) make_lbry_file = False + sd_hash = metadata['sources']['lbry_sd_hash'] except AssertionError: make_lbry_file = True + sd_hash = None metadata = p['metadata'] file_path = p['file_path'] @@ -1927,6 +1992,9 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda meta: pub.start(name, file_path, bid, meta)) else: d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta)) + if sd_hash: + d.addCallback(lambda txid: _reflect_if_possible(sd_hash, txid)) + d.addCallback(lambda txid: self._add_to_pending_claims(name, txid)) d.addCallback(lambda r: self._render_response(r, OK_CODE)) @@ -2356,6 +2424,45 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d + def jsonrpc_announce_all_blobs_to_dht(self): + """ + Announce all blobs to the dht + + Args: + None + Returns: + + """ + + d = self.session.blob_manager.immediate_announce_all_blobs() + d.addCallback(lambda _: self._render_response("Announced", OK_CODE)) + return d + + def jsonrpc_reflect(self, p): + """ + Reflect a stream + + Args: + sd_hash + Returns: + True or traceback + """ + + sd_hash = p['sd_hash'] + d = self._get_lbry_file('sd_hash', sd_hash, return_json=False) + d.addCallback(self._reflect) + d.addCallbacks(lambda _: self._render_response(True, OK_CODE), lambda err: self._render_response(err.getTraceback(), OK_CODE)) + return d + + def jsonrpc_get_blobs(self): + """ + return all blobs + """ + + d = defer.succeed(self.session.blob_manager.blobs) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" diff --git a/lbrynet/lbrynet_daemon/LBRYDaemonCLI.py b/lbrynet/lbrynet_daemon/LBRYDaemonCLI.py index 116fd32cb..1d9733b19 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemonCLI.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemonCLI.py @@ -4,7 +4,7 @@ import json from lbrynet.conf import API_CONNECTION_STRING from jsonrpc.proxy import JSONRPCProxy -help_msg = "Useage: lbrynet-cli method json-args\n" \ +help_msg = "Usage: lbrynet-cli method json-args\n" \ + "Examples: " \ + "lbrynet-cli resolve_name '{\"name\": \"what\"}'\n" \ + "lbrynet-cli get_balance\n" \ diff --git a/lbrynet/lbrynet_daemon/LBRYPublisher.py b/lbrynet/lbrynet_daemon/LBRYPublisher.py index 75ceb6093..04b9bdaab 100644 --- a/lbrynet/lbrynet_daemon/LBRYPublisher.py +++ b/lbrynet/lbrynet_daemon/LBRYPublisher.py @@ -2,6 +2,7 @@ import logging import mimetypes import os import sys +import random from appdirs import user_data_dir @@ -11,8 +12,9 @@ from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob from lbrynet.core.PaymentRateManager import PaymentRateManager from lbrynet.core.LBRYMetadata import Metadata, CURRENT_METADATA_VERSION from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader -from lbrynet.conf import LOG_FILE_NAME -from twisted.internet import threads, defer +from lbrynet import reflector +from lbrynet.conf import LOG_FILE_NAME, REFLECTOR_SERVERS +from twisted.internet import threads, defer, reactor if sys.platform != "darwin": log_dir = os.path.join(os.path.expanduser("~"), ".lbrynet") @@ -40,12 +42,14 @@ class Publisher(object): self.lbry_file = None self.txid = None self.stream_hash = None + reflector_server = random.choice(REFLECTOR_SERVERS) + self.reflector_server, self.reflector_port = reflector_server[0], reflector_server[1] self.metadata = {} def start(self, name, file_path, bid, metadata): - + log.info('Starting publish for %s', name) def _show_result(): - log.info("Published %s --> lbry://%s txid: %s", self.file_name, self.publish_name, self.txid) + log.info("Success! Published %s --> lbry://%s txid: %s", self.file_name, self.publish_name, self.txid) return defer.succeed(self.txid) self.publish_name = name @@ -60,10 +64,25 @@ class Publisher(object): d.addCallback(lambda _: self._create_sd_blob()) d.addCallback(lambda _: self._claim_name()) d.addCallback(lambda _: self.set_status()) + d.addCallback(lambda _: self.start_reflector()) d.addCallbacks(lambda _: _show_result(), self._show_publish_error) return d + def start_reflector(self): + reflector_server = random.choice(REFLECTOR_SERVERS) + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + log.info("Reflecting new publication") + factory = reflector.ClientFactory( + self.session.blob_manager, + self.lbry_file_manager.stream_info_manager, + self.stream_hash + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + def _check_file_path(self, file_path): def check_file_threaded(): f = open(file_path) @@ -84,10 +103,13 @@ class Publisher(object): return d def _create_sd_blob(self): - d = publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, + log.debug('Creating stream descriptor blob') + d = publish_sd_blob(self.lbry_file_manager.stream_info_manager, + self.session.blob_manager, self.lbry_file.stream_hash) def set_sd_hash(sd_hash): + log.debug('stream descriptor hash: %s', sd_hash) if 'sources' not in self.metadata: self.metadata['sources'] = {} self.metadata['sources']['lbry_sd_hash'] = sd_hash @@ -96,23 +118,29 @@ class Publisher(object): return d def set_status(self): + log.debug('Setting status') d = self.lbry_file_manager.change_lbry_file_status(self.lbry_file, ManagedLBRYFileDownloader.STATUS_FINISHED) d.addCallback(lambda _: self.lbry_file.restore()) return d def _claim_name(self): - self.metadata['content-type'] = mimetypes.guess_type(os.path.join(self.lbry_file.download_directory, - self.lbry_file.file_name))[0] - self.metadata['ver'] = CURRENT_METADATA_VERSION + log.debug('Claiming name') + self._update_metadata() m = Metadata(self.metadata) def set_tx_hash(txid): + log.debug('Name claimed using txid: %s', txid) self.txid = txid d = self.wallet.claim_name(self.publish_name, self.bid_amount, m) d.addCallback(set_tx_hash) return d + def _update_metadata(self): + filename = os.path.join(self.lbry_file.download_directory, self.lbry_file.file_name) + self.metadata['content-type'] = get_content_type(filename) + self.metadata['ver'] = CURRENT_METADATA_VERSION + def _show_publish_error(self, err): log.info(err.getTraceback()) message = "An error occurred publishing %s to %s. Error: %s." @@ -125,3 +153,7 @@ class Publisher(object): log.error(message, str(self.file_name), str(self.publish_name), err.getTraceback()) return defer.fail(Exception("Publish failed")) + + +def get_content_type(filename): + return mimetypes.guess_type(filename)[0] diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py new file mode 100644 index 000000000..10e8292e7 --- /dev/null +++ b/lbrynet/reflector/__init__.py @@ -0,0 +1,2 @@ +from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory +from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory as ClientFactory diff --git a/tests/lbrynet/__init__.py b/lbrynet/reflector/client/__init__.py similarity index 100% rename from tests/lbrynet/__init__.py rename to lbrynet/reflector/client/__init__.py diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py new file mode 100644 index 000000000..0fe2c8ce1 --- /dev/null +++ b/lbrynet/reflector/client/client.py @@ -0,0 +1,269 @@ +""" +The reflector protocol (all dicts encoded in json): + +Client Handshake (sent once per connection, at the start of the connection): + +{ + 'version': 0, +} + + +Server Handshake (sent once per connection, after receiving the client handshake): + +{ + 'version': 0, +} + + +Client Info Request: + +{ + 'blob_hash': "", + 'blob_size': +} + + +Server Info Response (sent in response to Client Info Request): + +{ + 'send_blob': True|False +} + +If response is 'YES', client may send a Client Blob Request or a Client Info Request. +If response is 'NO', client may only send a Client Info Request + + +Client Blob Request: + +{} # Yes, this is an empty dictionary, in case something needs to go here in the future + # this blob data must match the info sent in the most recent Client Info Request + + +Server Blob Response (sent in response to Client Blob Request): +{ + 'received_blob': True +} + +Client may now send another Client Info Request + +""" +import json +import logging +from twisted.protocols.basic import FileSender +from twisted.internet.protocol import Protocol, ClientFactory +from twisted.internet import defer, error + + +log = logging.getLogger(__name__) + + +class IncompleteResponseError(Exception): + pass + + +class LBRYFileReflectorClient(Protocol): + + # Protocol stuff + + def connectionMade(self): + self.blob_manager = self.factory.blob_manager + self.response_buff = '' + self.outgoing_buff = '' + self.blob_hashes_to_send = [] + self.next_blob_to_send = None + self.blob_read_handle = None + self.received_handshake_response = False + self.protocol_version = None + self.file_sender = None + self.producer = None + self.streaming = False + d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) + d.addCallback(lambda _: self.send_handshake()) + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) + + def dataReceived(self, data): + log.debug('Recieved %s', data) + self.response_buff += data + try: + msg = self.parse_response(self.response_buff) + except IncompleteResponseError: + pass + else: + self.response_buff = '' + d = self.handle_response(msg) + d.addCallback(lambda _: self.send_next_request()) + d.addErrback(self.response_failure_handler) + + def connectionLost(self, reason): + if reason.check(error.ConnectionDone): + log.debug('Finished sending data via reflector') + self.factory.finished_deferred.callback(True) + else: + log.debug('reflector finished: %s', reason) + self.factory.finished_deferred.callback(reason) + + # IConsumer stuff + + def registerProducer(self, producer, streaming): + self.producer = producer + self.streaming = streaming + if self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.producer = None + + def write(self, data): + self.transport.write(data) + if self.producer is not None and self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) + + def get_blobs_to_send(self, stream_info_manager, stream_hash): + log.debug('Getting blobs from stream hash: %s', stream_hash) + d = stream_info_manager.get_blobs_for_stream(stream_hash) + + def set_blobs(blob_hashes): + for blob_hash, position, iv, length in blob_hashes: + log.info("Preparing to send %s", blob_hash) + if blob_hash is not None: + self.blob_hashes_to_send.append(blob_hash) + + d.addCallback(set_blobs) + + d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash)) + + def set_sd_blobs(sd_blob_hashes): + for sd_blob_hash in sd_blob_hashes: + self.blob_hashes_to_send.append(sd_blob_hash) + + d.addCallback(set_sd_blobs) + return d + + def send_handshake(self): + log.debug('Sending handshake') + self.write(json.dumps({'version': 0})) + + def parse_response(self, buff): + try: + return json.loads(buff) + except ValueError: + raise IncompleteResponseError() + + def response_failure_handler(self, err): + log.warning("An error occurred handling the response: %s", err.getTraceback()) + + def handle_response(self, response_dict): + if self.received_handshake_response is False: + return self.handle_handshake_response(response_dict) + else: + return self.handle_normal_response(response_dict) + + def set_not_uploading(self): + if self.next_blob_to_send is not None: + self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle = None + self.next_blob_to_send = None + self.file_sender = None + return defer.succeed(None) + + def start_transfer(self): + self.write(json.dumps({})) + assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" + d = self.file_sender.beginFileTransfer(self.read_handle, self) + return d + + def handle_handshake_response(self, response_dict): + if 'version' not in response_dict: + raise ValueError("Need protocol version number!") + self.protocol_version = int(response_dict['version']) + if self.protocol_version != 0: + raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) + self.received_handshake_response = True + return defer.succeed(True) + + def handle_normal_response(self, response_dict): + if self.file_sender is None: # Expecting Server Info Response + if 'send_blob' not in response_dict: + raise ValueError("I don't know whether to send the blob or not!") + if response_dict['send_blob'] is True: + self.file_sender = FileSender() + return defer.succeed(True) + else: + return self.set_not_uploading() + else: # Expecting Server Blob Response + if 'received_blob' not in response_dict: + raise ValueError("I don't know if the blob made it to the intended destination!") + else: + return self.set_not_uploading() + + def open_blob_for_reading(self, blob): + if blob.is_validated(): + read_handle = blob.open_for_reading() + if read_handle is not None: + log.debug('Getting ready to send %s', blob.blob_hash) + self.next_blob_to_send = blob + self.read_handle = read_handle + return None + raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) + + def send_blob_info(self): + log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) + assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" + log.debug('sending blob info') + self.write(json.dumps({ + 'blob_hash': self.next_blob_to_send.blob_hash, + 'blob_size': self.next_blob_to_send.length + })) + + def send_next_request(self): + if self.file_sender is not None: + # send the blob + log.debug('Sending the blob') + return self.start_transfer() + elif self.blob_hashes_to_send: + # open the next blob to send + blob_hash = self.blob_hashes_to_send[0] + log.debug('No current blob, sending the next one: %s', blob_hash) + self.blob_hashes_to_send = self.blob_hashes_to_send[1:] + d = self.blob_manager.get_blob(blob_hash, True) + d.addCallback(self.open_blob_for_reading) + # send the server the next blob hash + length + d.addCallback(lambda _: self.send_blob_info()) + return d + else: + # close connection + log.debug('No more blob hashes, closing connection') + self.transport.loseConnection() + + +class LBRYFileReflectorClientFactory(ClientFactory): + protocol = LBRYFileReflectorClient + + def __init__(self, blob_manager, stream_info_manager, stream_hash): + self.blob_manager = blob_manager + self.stream_info_manager = stream_info_manager + self.stream_hash = stream_hash + self.p = None + self.finished_deferred = defer.Deferred() + + def buildProtocol(self, addr): + p = self.protocol() + p.factory = self + self.p = p + return p + + def startFactory(self): + log.debug('Starting reflector factory') + ClientFactory.startFactory(self) + + def startedConnecting(self, connector): + log.debug('Started connecting') + + def clientConnectionLost(self, connector, reason): + """If we get disconnected, reconnect to server.""" + log.debug("connection lost: %s", reason) + + def clientConnectionFailed(self, connector, reason): + log.debug("connection failed: %s", reason) diff --git a/tests/lbrynet/core/__init__.py b/lbrynet/reflector/server/__init__.py similarity index 100% rename from tests/lbrynet/core/__init__.py rename to lbrynet/reflector/server/__init__.py diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py new file mode 100644 index 000000000..38127dce2 --- /dev/null +++ b/lbrynet/reflector/server/server.py @@ -0,0 +1,148 @@ +import logging +from twisted.python import failure +from twisted.internet import error, defer +from twisted.internet.protocol import Protocol, ServerFactory +import json + +from lbrynet.core.utils import is_valid_blobhash + + +log = logging.getLogger(__name__) + + +class ReflectorServer(Protocol): + + def connectionMade(self): + peer_info = self.transport.getPeer() + log.debug('Connection made to %s', peer_info) + self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) + self.blob_manager = self.factory.blob_manager + self.received_handshake = False + self.peer_version = None + self.receiving_blob = False + self.incoming_blob = None + self.blob_write = None + self.blob_finished_d = None + self.cancel_write = None + self.request_buff = "" + + def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): + log.info("Reflector upload from %s finished" % self.peer.host) + + def dataReceived(self, data): + if self.receiving_blob: + # log.debug('Writing data to blob') + self.blob_write(data) + else: + log.debug('Not yet recieving blob, data needs further processing') + self.request_buff += data + msg, extra_data = self._get_valid_response(self.request_buff) + if msg is not None: + self.request_buff = '' + d = self.handle_request(msg) + d.addCallbacks(self.send_response, self.handle_error) + if self.receiving_blob and extra_data: + log.debug('Writing extra data to blob') + self.blob_write(extra_data) + + def _get_valid_response(self, response_msg): + extra_data = None + response = None + curr_pos = 0 + while True: + next_close_paren = response_msg.find('}', curr_pos) + if next_close_paren != -1: + curr_pos = next_close_paren + 1 + try: + response = json.loads(response_msg[:curr_pos]) + except ValueError: + if curr_pos > 100: + raise Exception("error decoding response") + else: + pass + else: + extra_data = response_msg[curr_pos:] + break + else: + break + return response, extra_data + + def handle_request(self, request_dict): + if self.received_handshake is False: + return self.handle_handshake(request_dict) + else: + return self.handle_normal_request(request_dict) + + def handle_handshake(self, request_dict): + log.debug('Handling handshake') + if 'version' not in request_dict: + raise ValueError("Client should send version") + self.peer_version = int(request_dict['version']) + if self.peer_version != 0: + raise ValueError("I don't know that version!") + self.received_handshake = True + return defer.succeed({'version': 0}) + + def determine_blob_needed(self, blob): + if blob.is_validated(): + return {'send_blob': False} + else: + self.incoming_blob = blob + self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + self.blob_finished_d.addCallback(lambda _ :self.blob_manager.blob_completed(blob)) + return {'send_blob': True} + + def close_blob(self): + self.blob_finished_d = None + self.blob_write = None + self.cancel_write = None + self.incoming_blob = None + self.receiving_blob = False + + def handle_normal_request(self, request_dict): + if self.blob_write is None: + # we haven't opened a blob yet, meaning we must be waiting for the + # next message containing a blob hash and a length. this message + # should be it. if it's one we want, open the blob for writing, and + # return a nice response dict (in a Deferred) saying go ahead + if not 'blob_hash' in request_dict or not 'blob_size' in request_dict: + raise ValueError("Expected a blob hash and a blob size") + if not is_valid_blobhash(request_dict['blob_hash']): + raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash'])) + log.debug('Recieved info for blob: %s', request_dict['blob_hash']) + d = self.blob_manager.get_blob( + request_dict['blob_hash'], + True, + int(request_dict['blob_size']) + ) + d.addCallback(self.determine_blob_needed) + else: + # we have a blob open already, so this message should have nothing + # important in it. to the deferred that fires when the blob is done, + # add a callback which returns a nice response dict saying to keep + # sending, and then return that deferred + log.debug('blob is already open') + self.receiving_blob = True + d = self.blob_finished_d + d.addCallback(lambda _: self.close_blob()) + d.addCallback(lambda _: {'received_blob': True}) + return d + + def send_response(self, response_dict): + self.transport.write(json.dumps(response_dict)) + + def handle_error(self, err): + log.error(err.getTraceback()) + self.transport.loseConnection() + + +class ReflectorServerFactory(ServerFactory): + protocol = ReflectorServer + + def __init__(self, peer_manager, blob_manager): + self.peer_manager = peer_manager + self.blob_manager = blob_manager + + def buildProtocol(self, addr): + log.debug('Creating a protocol for %s', addr) + return ServerFactory.buildProtocol(self, addr) diff --git a/packaging/ubuntu/lbry.desktop b/packaging/ubuntu/lbry.desktop index 061299934..1af5a588c 100644 --- a/packaging/ubuntu/lbry.desktop +++ b/packaging/ubuntu/lbry.desktop @@ -1,5 +1,5 @@ [Desktop Entry] -Version=0.3.19 +Version=0.3.20 Name=LBRY Comment=The world's first user-owned content marketplace Icon=lbry diff --git a/tests/lbrynet/core/server/__init__.py b/tests/functional/__init__.py similarity index 100% rename from tests/lbrynet/core/server/__init__.py rename to tests/functional/__init__.py diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py new file mode 100644 index 000000000..691653eae --- /dev/null +++ b/tests/functional/test_reflector.py @@ -0,0 +1,190 @@ +import os +import shutil + +from twisted.internet import defer, threads, error +from twisted.trial import unittest + +from lbrynet import conf +from lbrynet import lbryfile +from lbrynet import reflector +from lbrynet.core import BlobManager +from lbrynet.core import PeerManager +from lbrynet.core import RateLimiter +from lbrynet.core import Session +from lbrynet.core import StreamDescriptor +from lbrynet.lbryfile import LBRYFileMetadataManager +from lbrynet.lbryfile.client import LBRYFileOptions +from lbrynet.lbryfilemanager import LBRYFileCreator +from lbrynet.lbryfilemanager import LBRYFileManager + +from tests import mocks + + +class TestReflector(unittest.TestCase): + def setUp(self): + self.session = None + self.stream_info_manager = None + self.lbry_file_manager = None + self.server_blob_manager = None + self.reflector_port = None + self.addCleanup(self.take_down_env) + + def take_down_env(self): + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + if self.server_blob_manager is not None: + d.addCallback(lambda _: self.server_blob_manager.stop()) + if self.reflector_port is not None: + d.addCallback(lambda _: self.reflector_port.stopListening()) + + def delete_test_env(): + shutil.rmtree('client') + + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + return d + + def test_reflector(self): + wallet = mocks.Wallet() + peer_manager = PeerManager.PeerManager() + peer_finder = mocks.PeerFinder(5553, peer_manager, 2) + hash_announcer = mocks.Announcer() + rate_limiter = RateLimiter.DummyRateLimiter() + sd_identifier = StreamDescriptor.StreamDescriptorIdentifier() + + self.expected_blobs = [ + ( + 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b' + '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', + 2097152 + ), + ( + 'f4067522c1b49432a2a679512e3917144317caa1abba0c04' + '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', + 2097152 + ), + ( + '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1' + 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', + 1015056 + ), + ] + + db_dir = "client" + os.mkdir(db_dir) + + self.session = Session.LBRYSession( + conf.MIN_BLOB_DATA_PAYMENT_RATE, + db_dir=db_dir, + lbryid="abcd", + peer_finder=peer_finder, + hash_announcer=hash_announcer, + blob_dir=None, + peer_port=5553, + use_upnp=False, + rate_limiter=rate_limiter, + wallet=wallet + ) + + self.stream_info_manager = LBRYFileMetadataManager.TempLBRYFileMetadataManager() + + self.lbry_file_manager = LBRYFileManager.LBRYFileManager( + self.session, self.stream_info_manager, sd_identifier) + + self.server_blob_manager = BlobManager.TempBlobManager(hash_announcer) + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: LBRYFileOptions.add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: self.server_blob_manager.setup()) + + def verify_equal(sd_info): + self.assertEqual(mocks.create_stream_sd_file, sd_info) + + def save_sd_blob_hash(sd_hash): + self.expected_blobs.append((sd_hash, 923)) + + def verify_stream_descriptor_file(stream_hash): + d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) + d.addCallback(verify_equal) + d.addCallback( + lambda _: lbryfile.publish_sd_blob( + self.lbry_file_manager.stream_info_manager, + self.session.blob_manager, stream_hash + ) + ) + d.addCallback(save_sd_blob_hash) + d.addCallback(lambda _: stream_hash) + return d + + def create_stream(): + test_file = mocks.GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) + d = LBRYFileCreator.create_lbry_file( + self.session, + self.lbry_file_manager, + "test_file", + test_file, + key="0123456701234567", + iv_generator=iv_generator() + ) + return d + + def start_server(): + server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager) + from twisted.internet import reactor + port = 8943 + while self.reflector_port is None: + try: + self.reflector_port = reactor.listenTCP(port, server_factory) + except error.CannotListenError: + port += 1 + return defer.succeed(port) + + def send_to_server(port, stream_hash): + factory = reflector.ClientFactory( + self.session.blob_manager, + self.stream_info_manager, + stream_hash + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def upload_to_reflector(stream_hash): + d = start_server() + d.addCallback(lambda port: send_to_server(port, stream_hash)) + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + d.addCallback(upload_to_reflector) + return d + + +def iv_generator(): + iv = 0 + while True: + iv += 1 + yield "%016d" % iv diff --git a/tests/functional_tests.py b/tests/functional_tests.py index 76af2b5ed..c9e062d1c 100644 --- a/tests/functional_tests.py +++ b/tests/functional_tests.py @@ -26,7 +26,7 @@ from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.StreamDescriptor import get_sd_info -from twisted.internet import defer, threads, task +from twisted.internet import defer, threads, task, error from twisted.trial.unittest import TestCase from twisted.python.failure import Failure import os @@ -38,6 +38,10 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier +from lbrynet.core.BlobManager import TempBlobManager +from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory +from lbrynet.reflector.server.server import ReflectorServerFactory +from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob log_format = "%(funcName)s(): %(message)s" @@ -1369,4 +1373,147 @@ class TestStreamify(TestCase): d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: create_stream()) d.addCallback(combine_stream) + return d + + +class TestReflector(TestCase): + + def setUp(self): + self.session = None + self.stream_info_manager = None + self.lbry_file_manager = None + self.server_blob_manager = None + self.reflector_port = None + self.addCleanup(self.take_down_env) + + def take_down_env(self): + + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + if self.server_blob_manager is not None: + d.addCallback(lambda _: self.server_blob_manager.stop()) + if self.reflector_port is not None: + d.addCallback(lambda _: self.reflector_port.stopListening()) + + def delete_test_env(): + shutil.rmtree('client') + + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + return d + + def test_reflector(self): + + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 2) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + self.expected_blobs = [ + ('dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b' + '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', 2097152), + ('f4067522c1b49432a2a679512e3917144317caa1abba0c04' + '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', 2097152), + ('305486c434260484fcb2968ce0e963b72f81ba56c11b08b1' + 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', 1015056), + ] + + db_dir = "client" + os.mkdir(db_dir) + + self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=None, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + + self.stream_info_manager = TempLBRYFileMetadataManager() + + self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier) + + self.server_blob_manager = TempBlobManager(hash_announcer) + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: self.server_blob_manager.setup()) + + def verify_equal(sd_info): + self.assertEqual(sd_info, test_create_stream_sd_file) + + def save_sd_blob_hash(sd_hash): + self.expected_blobs.append((sd_hash, 923)) + + def verify_stream_descriptor_file(stream_hash): + d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) + d.addCallback(verify_equal) + d.addCallback(lambda _: publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, stream_hash)) + d.addCallback(save_sd_blob_hash) + d.addCallback(lambda _: stream_hash) + return d + + def iv_generator(): + iv = 0 + while 1: + iv += 1 + yield "%016d" % iv + + def create_stream(): + test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) + d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, + key="0123456701234567", iv_generator=iv_generator()) + return d + + def start_server(): + server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager) + from twisted.internet import reactor + port = 8943 + while self.reflector_port is None: + try: + self.reflector_port = reactor.listenTCP(port, server_factory) + except error.CannotListenError: + port += 1 + return defer.succeed(port) + + def send_to_server(port, stream_hash): + factory = LBRYFileReflectorClientFactory( + self.session.blob_manager, + self.stream_info_manager, + stream_hash + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def upload_to_reflector(stream_hash): + d = start_server() + d.addCallback(lambda port: send_to_server(port, stream_hash)) + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + d.addCallback(upload_to_reflector) return d \ No newline at end of file diff --git a/tests/mocks.py b/tests/mocks.py new file mode 100644 index 000000000..6ea183195 --- /dev/null +++ b/tests/mocks.py @@ -0,0 +1,164 @@ +import io + +from Crypto.PublicKey import RSA +from twisted.internet import defer, threads, task, error + +from lbrynet.core import PTCWallet + + +class Node(object): + def __init__(self, *args, **kwargs): + pass + + def joinNetwork(self, *args): + pass + + def stop(self): + pass + + +class Wallet(object): + def __init__(self): + self.private_key = RSA.generate(1024) + self.encoded_public_key = self.private_key.publickey().exportKey() + + def start(self): + return defer.succeed(True) + + def stop(self): + return defer.succeed(True) + + def get_info_exchanger(self): + return PTCWallet.PointTraderKeyExchanger(self) + + def get_wallet_info_query_handler_factory(self): + return PTCWallet.PointTraderKeyQueryHandlerFactory(self) + + def reserve_points(self, *args): + return True + + def cancel_point_reservation(self, *args): + pass + + def send_points(self, *args): + return defer.succeed(True) + + def add_expected_payment(self, *args): + pass + + def get_balance(self): + return defer.succeed(1000) + + def set_public_key_for_peer(self, peer, public_key): + pass + + +class PeerFinder(object): + def __init__(self, start_port, peer_manager, num_peers): + self.start_port = start_port + self.peer_manager = peer_manager + self.num_peers = num_peers + self.count = 0 + + def find_peers_for_blob(self, *args): + peer_port = self.start_port + self.count + self.count += 1 + if self.count >= self.num_peers: + self.count = 0 + return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)]) + + def run_manage_loop(self): + pass + + def stop(self): + pass + + +class Announcer(object): + def __init__(self, *args): + pass + + def add_supplier(self, supplier): + pass + + def immediate_announce(self, *args): + pass + + def run_manage_loop(self): + pass + + def stop(self): + pass + + +class GenFile(io.RawIOBase): + def __init__(self, size, pattern): + io.RawIOBase.__init__(self) + self.size = size + self.pattern = pattern + self.read_so_far = 0 + self.buff = b'' + self.last_offset = 0 + + def readable(self): + return True + + def writable(self): + return False + + def read(self, n=-1): + if n > -1: + bytes_to_read = min(n, self.size - self.read_so_far) + else: + bytes_to_read = self.size - self.read_so_far + output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] + bytes_to_read -= len(output) + while bytes_to_read > 0: + self.buff = self._generate_chunk() + new_output, self.buff = self.buff[:bytes_to_read], self.buff[bytes_to_read:] + bytes_to_read -= len(new_output) + output += new_output + self.read_so_far += len(output) + return output + + def readall(self): + return self.read() + + def _generate_chunk(self, n=2**10): + output = self.pattern[self.last_offset:self.last_offset + n] + n_left = n - len(output) + whole_patterns = n_left / len(self.pattern) + output += self.pattern * whole_patterns + self.last_offset = n - len(output) + output += self.pattern[:self.last_offset] + return output + + +create_stream_sd_file = { + 'stream_name': '746573745f66696c65', + 'blobs': [ + { + 'length': 2097152, + 'blob_num': 0, + 'blob_hash': 'dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', + 'iv': '30303030303030303030303030303031' + }, + { + 'length': 2097152, + 'blob_num': 1, + 'blob_hash': 'f4067522c1b49432a2a679512e3917144317caa1abba0c041e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', + 'iv': '30303030303030303030303030303032' + }, + { + 'length': 1015056, + 'blob_num': 2, + 'blob_hash': '305486c434260484fcb2968ce0e963b72f81ba56c11b08b1af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', + 'iv': '30303030303030303030303030303033' + }, + {'length': 0, 'blob_num': 3, 'iv': '30303030303030303030303030303034'} + ], + 'stream_type': 'lbryfile', + 'key': '30313233343536373031323334353637', + 'suggested_file_name': '746573745f66696c65', + 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f' +} diff --git a/tests/lbrynet/lbrynet_daemon/__init__.py b/tests/unit/__init__.py similarity index 100% rename from tests/lbrynet/lbrynet_daemon/__init__.py rename to tests/unit/__init__.py diff --git a/tests/unit/core/__init__.py b/tests/unit/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/core/server/__init__.py b/tests/unit/core/server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lbrynet/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py similarity index 100% rename from tests/lbrynet/core/server/test_BlobRequestHandler.py rename to tests/unit/core/server/test_BlobRequestHandler.py diff --git a/tests/lbrynet/core/test_LBRYExchangeRateManager.py b/tests/unit/core/test_LBRYExchangeRateManager.py similarity index 82% rename from tests/lbrynet/core/test_LBRYExchangeRateManager.py rename to tests/unit/core/test_LBRYExchangeRateManager.py index 2a6457536..e07b49194 100644 --- a/tests/lbrynet/core/test_LBRYExchangeRateManager.py +++ b/tests/unit/core/test_LBRYExchangeRateManager.py @@ -19,12 +19,10 @@ class LBRYFeeFormatTest(unittest.TestCase): class LBRYFeeTest(unittest.TestCase): def setUp(self): - self.patcher = mock.patch('time.time') - self.time = self.patcher.start() + patcher = mock.patch('time.time') + self.time = patcher.start() self.time.return_value = 0 - - def tearDown(self): - self.time.stop() + self.addCleanup(patcher.stop) def test_fee_converts_to_lbc(self): fee_dict = { @@ -35,4 +33,4 @@ class LBRYFeeTest(unittest.TestCase): } rates = {'BTCLBC': {'spot': 3.0, 'ts': 2}, 'USDBTC': {'spot': 2.0, 'ts': 3}} manager = LBRYExchangeRateManager.DummyExchangeRateManager(rates) - self.assertEqual(60.0, manager.to_lbc(fee_dict).amount) \ No newline at end of file + self.assertEqual(60.0, manager.to_lbc(fee_dict).amount) diff --git a/tests/lbrynet/core/test_LBRYMetadata.py b/tests/unit/core/test_LBRYMetadata.py similarity index 100% rename from tests/lbrynet/core/test_LBRYMetadata.py rename to tests/unit/core/test_LBRYMetadata.py diff --git a/tests/lbrynet/core/test_utils.py b/tests/unit/core/test_utils.py similarity index 100% rename from tests/lbrynet/core/test_utils.py rename to tests/unit/core/test_utils.py diff --git a/tests/unit/lbrynet_daemon/__init__.py b/tests/unit/lbrynet_daemon/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lbrynet/lbrynet_daemon/test_LBRYDaemon.py b/tests/unit/lbrynet_daemon/test_LBRYDaemon.py similarity index 100% rename from tests/lbrynet/lbrynet_daemon/test_LBRYDaemon.py rename to tests/unit/lbrynet_daemon/test_LBRYDaemon.py