From a2eb0cad33ef2830d884905781649118f879facf Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 6 Apr 2017 19:56:36 -0400 Subject: [PATCH] delete live lbrylive livestreaming code --- lbrynet/lbrylive/LiveBlob.py | 24 -- lbrynet/lbrylive/LiveStreamCreator.py | 176 -------- lbrynet/lbrylive/LiveStreamMetadataManager.py | 390 ------------------ lbrynet/lbrylive/PaymentRateManager.py | 53 --- lbrynet/lbrylive/StdinUploader.py | 122 ------ lbrynet/lbrylive/StdoutDownloader.py | 102 ----- lbrynet/lbrylive/StreamDescriptor.py | 138 ------- lbrynet/lbrylive/__init__.py | 0 .../lbrylive/client/LiveStreamDownloader.py | 180 -------- .../client/LiveStreamMetadataHandler.py | 347 ---------------- lbrynet/lbrylive/client/LiveStreamOptions.py | 74 ---- .../client/LiveStreamProgressManager.py | 91 ---- lbrynet/lbrylive/client/__init__.py | 0 .../server/LiveBlobInfoQueryHandler.py | 184 --------- lbrynet/lbrylive/server/__init__.py | 0 tests/functional/test_misc.py | 231 ----------- 16 files changed, 2112 deletions(-) delete mode 100644 lbrynet/lbrylive/LiveBlob.py delete mode 100644 lbrynet/lbrylive/LiveStreamCreator.py delete mode 100644 lbrynet/lbrylive/LiveStreamMetadataManager.py delete mode 100644 lbrynet/lbrylive/PaymentRateManager.py delete mode 100644 lbrynet/lbrylive/StdinUploader.py delete mode 100644 lbrynet/lbrylive/StdoutDownloader.py delete mode 100644 lbrynet/lbrylive/StreamDescriptor.py delete mode 100644 lbrynet/lbrylive/__init__.py delete mode 100644 lbrynet/lbrylive/client/LiveStreamDownloader.py delete mode 100644 lbrynet/lbrylive/client/LiveStreamMetadataHandler.py delete mode 100644 lbrynet/lbrylive/client/LiveStreamOptions.py delete mode 100644 lbrynet/lbrylive/client/LiveStreamProgressManager.py delete mode 100644 lbrynet/lbrylive/client/__init__.py delete mode 100644 lbrynet/lbrylive/server/LiveBlobInfoQueryHandler.py delete mode 100644 lbrynet/lbrylive/server/__init__.py diff --git a/lbrynet/lbrylive/LiveBlob.py b/lbrynet/lbrylive/LiveBlob.py deleted file mode 100644 index 46bf54f7a..000000000 --- a/lbrynet/lbrylive/LiveBlob.py +++ /dev/null @@ -1,24 +0,0 @@ -# pylint: skip-file -from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker, CryptBlobInfo -import binascii - - -class LiveBlobInfo(CryptBlobInfo): - def __init__(self, blob_hash, blob_num, length, iv, revision, signature): - CryptBlobInfo.__init__(self, blob_hash, blob_num, length, iv) - self.revision = revision - self.signature = signature - - -class LiveStreamBlobMaker(CryptStreamBlobMaker): - def __init__(self, key, iv, blob_num, blob): - CryptStreamBlobMaker.__init__(self, key, iv, blob_num, blob) - # The following is a placeholder for a currently unimplemented feature. - # In the future it may be possible for the live stream creator to overwrite a blob - # with a newer revision. If that happens, the 0 will be incremented to the - # actual revision count - self.revision = 0 - - def _return_info(self, blob_hash): - return LiveBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv), - self.revision, None) diff --git a/lbrynet/lbrylive/LiveStreamCreator.py b/lbrynet/lbrylive/LiveStreamCreator.py deleted file mode 100644 index f0710c5fc..000000000 --- a/lbrynet/lbrylive/LiveStreamCreator.py +++ /dev/null @@ -1,176 +0,0 @@ -# pylint: skip-file -from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter -from lbrynet.lbrylive.StreamDescriptor import get_sd_info -from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator -from lbrynet.lbrylive.LiveBlob import LiveStreamBlobMaker -from lbrynet.core.cryptoutils import get_lbry_hash_obj, get_pub_key, sign_with_pass_phrase -from Crypto import Random -import binascii -import logging -from lbrynet import conf -from twisted.internet import interfaces, defer -from twisted.protocols.basic import FileSender -from zope.interface import implements - - -log = logging.getLogger(__name__) - - -class LiveStreamCreator(CryptStreamCreator): - def __init__(self, blob_manager, stream_info_manager, name=None, key=None, iv_generator=None, - delete_after_num=None, secret_pass_phrase=None): - CryptStreamCreator.__init__(self, blob_manager, name, key, iv_generator) - self.stream_hash = None - self.stream_info_manager = stream_info_manager - self.delete_after_num = delete_after_num - self.secret_pass_phrase = secret_pass_phrase - self.file_extension = conf.settings['CRYPTSD_FILE_EXTENSION'] - self.finished_blob_hashes = {} - - def _save_stream(self): - d = self.stream_info_manager.save_stream(self.stream_hash, get_pub_key(self.secret_pass_phrase), - binascii.hexlify(self.name), binascii.hexlify(self.key), - []) - return d - - def _blob_finished(self, blob_info): - log.debug("In blob_finished") - log.debug("length: %s", str(blob_info.length)) - sig_hash = get_lbry_hash_obj() - sig_hash.update(self.stream_hash) - if blob_info.length != 0: - sig_hash.update(blob_info.blob_hash) - sig_hash.update(str(blob_info.blob_num)) - sig_hash.update(str(blob_info.revision)) - sig_hash.update(blob_info.iv) - sig_hash.update(str(blob_info.length)) - signature = sign_with_pass_phrase(sig_hash.digest(), self.secret_pass_phrase) - blob_info.signature = signature - self.finished_blob_hashes[blob_info.blob_num] = blob_info.blob_hash - if self.delete_after_num is not None: - self._delete_old_blobs(blob_info.blob_num) - d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, [blob_info]) - - def log_add_error(err): - log.error("An error occurred adding a blob info to the stream info manager: %s", err.getErrorMessage()) - return err - - d.addErrback(log_add_error) - log.debug("returning from blob_finished") - return d - - def setup(self): - """Create the secret pass phrase if it wasn't provided, compute the stream hash, - save the stream to the stream info manager, and return the stream hash - """ - if self.secret_pass_phrase is None: - self.secret_pass_phrase = Random.new().read(512) - - d = CryptStreamCreator.setup(self) - - def make_stream_hash(): - hashsum = get_lbry_hash_obj() - hashsum.update(binascii.hexlify(self.name)) - hashsum.update(get_pub_key(self.secret_pass_phrase)) - hashsum.update(binascii.hexlify(self.key)) - self.stream_hash = hashsum.hexdigest() - return self.stream_hash - - d.addCallback(lambda _: make_stream_hash()) - d.addCallback(lambda _: self._save_stream()) - d.addCallback(lambda _: self.stream_hash) - return d - - def publish_stream_descriptor(self): - descriptor_writer = BlobStreamDescriptorWriter(self.blob_manager) - d = get_sd_info(self.stream_info_manager, self.stream_hash, False) - d.addCallback(descriptor_writer.create_descriptor) - return d - - def _delete_old_blobs(self, newest_blob_num): - assert self.delete_after_num is not None, "_delete_old_blobs called with delete_after_num=None" - oldest_to_keep = newest_blob_num - self.delete_after_num + 1 - nums_to_delete = [num for num in self.finished_blob_hashes.iterkeys() if num < oldest_to_keep] - for num in nums_to_delete: - self.blob_manager.delete_blobs([self.finished_blob_hashes[num]]) - del self.finished_blob_hashes[num] - - def _get_blob_maker(self, iv, blob_creator): - return LiveStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) - - -class StdOutLiveStreamCreator(LiveStreamCreator): - def __init__(self, stream_name, blob_manager, stream_info_manager): - LiveStreamCreator.__init__(self, blob_manager, stream_info_manager, stream_name, - delete_after_num=20) - - def start_streaming(self): - stdin_producer = StdinStreamProducer(self) - d = stdin_producer.begin_producing() - - def stop_stream(): - d = self.stop() - return d - - d.addCallback(lambda _: stop_stream()) - return d - - -class FileLiveStreamCreator(LiveStreamCreator): - def __init__(self, blob_manager, stream_info_manager, file_name, file_handle, - secret_pass_phrase=None, key=None, iv_generator=None, stream_name=None): - if stream_name is None: - stream_name = file_name - LiveStreamCreator.__init__(self, blob_manager, stream_info_manager, stream_name, - secret_pass_phrase, key, iv_generator) - self.file_name = file_name - self.file_handle = file_handle - - def start_streaming(self): - file_sender = FileSender() - d = file_sender.beginFileTransfer(self.file_handle, self) - - def stop_stream(): - d = self.stop() - return d - - d.addCallback(lambda _: stop_stream()) - return d - - -class StdinStreamProducer(object): - """This class reads data from standard in and sends it to a stream creator""" - - implements(interfaces.IPushProducer) - - def __init__(self, consumer): - self.consumer = consumer - self.reader = None - self.finished_deferred = None - - def begin_producing(self): - - self.finished_deferred = defer.Deferred() - self.consumer.registerProducer(self, True) - self.resumeProducing() - return self.finished_deferred - - def resumeProducing(self): - if self.reader is not None: - self.reader.resumeProducing() - - def stopProducing(self): - if self.reader is not None: - self.reader.stopReading() - self.consumer.unregisterProducer() - self.finished_deferred.callback(True) - - def pauseProducing(self): - if self.reader is not None: - self.reader.pauseProducing() - - def childDataReceived(self, fd, data): - self.consumer.write(data) - - def childConnectionLost(self, fd, reason): - self.stopProducing() diff --git a/lbrynet/lbrylive/LiveStreamMetadataManager.py b/lbrynet/lbrylive/LiveStreamMetadataManager.py deleted file mode 100644 index b809148cd..000000000 --- a/lbrynet/lbrylive/LiveStreamMetadataManager.py +++ /dev/null @@ -1,390 +0,0 @@ -# pylint: skip-file -import time -import logging -from twisted.enterprise import adbapi -import os -import sqlite3 -from twisted.internet import defer -from twisted.python.failure import Failure -from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier -from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash -from lbrynet.core.sqlite_helpers import rerun_if_locked - - -log = logging.getLogger(__name__) - - -class DBLiveStreamMetadataManager(DHTHashSupplier): - """This class stores all stream info in a leveldb database stored in the same directory as the blobfiles""" - - def __init__(self, db_dir, hash_announcer): - DHTHashSupplier.__init__(self, hash_announcer) - self.db_dir = db_dir - self.db_conn = None - - def setup(self): - return self._open_db() - - def stop(self): - self.db_conn = None - return defer.succeed(True) - - def get_all_streams(self): - return self._get_all_streams() - - def save_stream(self, stream_hash, pub_key, file_name, key, blobs): - next_announce_time = time.time() + self.hash_reannounce_time - d = self._store_stream(stream_hash, pub_key, file_name, key, - next_announce_time=next_announce_time) - - def save_blobs(): - return self.add_blobs_to_stream(stream_hash, blobs) - - def announce_have_stream(): - if self.hash_announcer is not None: - self.hash_announcer.immediate_announce([stream_hash]) - return stream_hash - - d.addCallback(lambda _: save_blobs()) - d.addCallback(lambda _: announce_have_stream()) - return d - - def get_stream_info(self, stream_hash): - return self._get_stream_info(stream_hash) - - def check_if_stream_exists(self, stream_hash): - return self._check_if_stream_exists(stream_hash) - - def delete_stream(self, stream_hash): - return self._delete_stream(stream_hash) - - def add_blobs_to_stream(self, stream_hash, blobs): - return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) - - def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): - log.info("Getting blobs for a stream. Count is %s", str(count)) - - def get_positions_of_start_and_end(): - if start_blob is not None: - d1 = self._get_blob_num_by_hash(stream_hash, start_blob) - else: - d1 = defer.succeed(None) - if end_blob is not None: - d2 = self._get_blob_num_by_hash(stream_hash, end_blob) - else: - d2 = defer.succeed(None) - - dl = defer.DeferredList([d1, d2]) - - def get_positions(results): - start_num = None - end_num = None - if results[0][0] is True: - start_num = results[0][1] - if results[1][0] is True: - end_num = results[1][1] - return start_num, end_num - - dl.addCallback(get_positions) - return dl - - def get_blob_infos(nums): - start_num, end_num = nums - return self._get_further_blob_infos(stream_hash, start_num, end_num, - count, reverse) - - d = get_positions_of_start_and_end() - d.addCallback(get_blob_infos) - return d - - def get_stream_of_blob(self, blob_hash): - return self._get_stream_of_blobhash(blob_hash) - - def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) - - def get_sd_blob_hashes_for_stream(self, stream_hash): - return self._get_sd_blob_hashes_for_stream(stream_hash) - - def hashes_to_announce(self): - next_announce_time = time.time() + self.hash_reannounce_time - return self._get_streams_to_announce(next_announce_time) - - ######### database calls ######### - - def _open_db(self): - # check_same_thread=False is solely to quiet a spurious error that appears to be due - # to a bug in twisted, where the connection is closed by a different thread than the - # one that opened it. The individual connections in the pool are not used in multiple - # threads. - self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "live_stream.db"), - check_same_thread=False) - - def create_tables(transaction): - transaction.execute("create table if not exists live_streams (" + - " stream_hash text primary key, " + - " public_key text, " + - " key text, " + - " stream_name text, " + - " next_announce_time real" + - ")") - transaction.execute("create table if not exists live_stream_blobs (" + - " blob_hash text, " + - " stream_hash text, " + - " position integer, " + - " revision integer, " + - " iv text, " + - " length integer, " + - " signature text, " + - " foreign key(stream_hash) references live_streams(stream_hash)" + - ")") - transaction.execute("create table if not exists live_stream_descriptors (" + - " sd_blob_hash TEXT PRIMARY KEY, " + - " stream_hash TEXT, " + - " foreign key(stream_hash) references live_streams(stream_hash)" + - ")") - - return self.db_conn.runInteraction(create_tables) - - @rerun_if_locked - def _delete_stream(self, stream_hash): - - d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHash(stream_hash))) - - def do_delete(transaction, s_h): - transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) - transaction.execute("delete from live_stream_blobs where stream_hash = ?", (s_h,)) - transaction.execute("delete from live_stream_descriptors where stream_hash = ?", (s_h,)) - - d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) - return d - - @rerun_if_locked - def _store_stream(self, stream_hash, public_key, name, key, next_announce_time=None): - d = self.db_conn.runQuery("insert into live_streams values (?, ?, ?, ?, ?)", - (stream_hash, public_key, key, name, next_announce_time)) - - def check_duplicate(err): - if err.check(sqlite3.IntegrityError): - raise DuplicateStreamHashError(stream_hash) - return err - - d.addErrback(check_duplicate) - return d - - @rerun_if_locked - def _get_all_streams(self): - d = self.db_conn.runQuery("select stream_hash from live_streams") - d.addCallback(lambda results: [r[0] for r in results]) - return d - - @rerun_if_locked - def _get_stream_info(self, stream_hash): - d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHash(stream_hash))) - return d - - @rerun_if_locked - def _check_if_stream_exists(self, stream_hash): - d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) - d.addCallback(lambda r: True if len(r) else False) - return d - - @rerun_if_locked - def _get_streams_to_announce(self, next_announce_time): - - def get_and_update(transaction): - timestamp = time.time() - r = transaction.execute("select stream_hash from live_streams where" + - " (next_announce_time is null or next_announce_time < ?) " + - " and stream_hash is not null", (timestamp, )) - s_hs = [s_h for s_h, in r.fetchall()] - transaction.execute("update live_streams set next_announce_time = ? where " + - " (next_announce_time is null or next_announce_time < ?)", - (next_announce_time, timestamp)) - return s_hs - - return self.db_conn.runInteraction(get_and_update) - - @rerun_if_locked - def _get_blob_num_by_hash(self, stream_hash, blob_hash): - d = self.db_conn.runQuery("select position from live_stream_blobs where stream_hash = ? and blob_hash = ?", - (stream_hash, blob_hash)) - d.addCallback(lambda r: r[0][0] if len(r) else None) - return d - - @rerun_if_locked - def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - params = [] - q_string = "select * from (" - q_string += " select blob_hash, position, revision, iv, length, signature from live_stream_blobs " - q_string += " where stream_hash = ? " - params.append(stream_hash) - if start_num is not None: - q_string += " and position > ? " - params.append(start_num) - if end_num is not None: - q_string += " and position < ? " - params.append(end_num) - q_string += " order by position " - if reverse is True: - q_string += " DESC " - if count is not None: - q_string += " limit ? " - params.append(count) - q_string += ") order by position" - # Order by position is done twice so that it always returns them from lowest position to - # greatest, but the limit by clause can select the 'count' greatest or 'count' least - return self.db_conn.runQuery(q_string, tuple(params)) - - @rerun_if_locked - def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False): - - def add_blobs(transaction): - for blob_info in blob_infos: - try: - transaction.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)", - (blob_info.blob_hash, stream_hash, blob_info.blob_num, - blob_info.revision, blob_info.iv, blob_info.length, - blob_info.signature)) - except sqlite3.IntegrityError: - if ignore_duplicate_error is False: - raise - - return self.db_conn.runInteraction(add_blobs) - - @rerun_if_locked - def _get_stream_of_blobhash(self, blob_hash): - d = self.db_conn.runQuery("select stream_hash from live_stream_blobs where blob_hash = ?", - (blob_hash,)) - d.addCallback(lambda r: r[0][0] if len(r) else None) - return d - - @rerun_if_locked - def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - return self.db_conn.runQuery("insert into live_stream_descriptors values (?, ?)", - (sd_blob_hash, stream_hash)) - - @rerun_if_locked - def _get_sd_blob_hashes_for_stream(self, stream_hash): - d = self.db_conn.runQuery("select sd_blob_hash from live_stream_descriptors where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda results: [r[0] for r in results]) - return d - - -class TempLiveStreamMetadataManager(DHTHashSupplier): - - def __init__(self, hash_announcer): - DHTHashSupplier.__init__(self, hash_announcer) - self.streams = {} - self.stream_blobs = {} - self.stream_desc = {} - - def setup(self): - return defer.succeed(True) - - def stop(self): - return defer.succeed(True) - - def get_all_streams(self): - return defer.succeed(self.streams.keys()) - - def save_stream(self, stream_hash, pub_key, file_name, key, blobs): - next_announce_time = time.time() + self.hash_reannounce_time - self.streams[stream_hash] = {'public_key': pub_key, 'stream_name': file_name, - 'key': key, 'next_announce_time': next_announce_time} - d = self.add_blobs_to_stream(stream_hash, blobs) - - def announce_have_stream(): - if self.hash_announcer is not None: - self.hash_announcer.immediate_announce([stream_hash]) - return stream_hash - - d.addCallback(lambda _: announce_have_stream()) - return d - - def get_stream_info(self, stream_hash): - if stream_hash in self.streams: - stream_info = self.streams[stream_hash] - return defer.succeed([stream_info['public_key'], stream_info['key'], stream_info['stream_name']]) - return defer.succeed(None) - - def delete_stream(self, stream_hash): - if stream_hash in self.streams: - del self.streams[stream_hash] - for (s_h, b_h) in self.stream_blobs.keys(): - if s_h == stream_hash: - del self.stream_blobs[(s_h, b_h)] - return defer.succeed(True) - - def add_blobs_to_stream(self, stream_hash, blobs): - assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known" - for blob in blobs: - info = {} - info['blob_num'] = blob.blob_num - info['length'] = blob.length - info['iv'] = blob.iv - info['revision'] = blob.revision - info['signature'] = blob.signature - self.stream_blobs[(stream_hash, blob.blob_hash)] = info - return defer.succeed(True) - - def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): - - if start_blob is not None: - start_num = self._get_blob_num_by_hash(stream_hash, start_blob) - else: - start_num = None - if end_blob is not None: - end_num = self._get_blob_num_by_hash(stream_hash, end_blob) - else: - end_num = None - return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse) - - def get_stream_of_blob(self, blob_hash): - for (s_h, b_h) in self.stream_blobs.iterkeys(): - if b_h == blob_hash: - return defer.succeed(s_h) - return defer.succeed(None) - - def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - blob_infos = [] - for (s_h, b_h), info in self.stream_blobs.iteritems(): - if stream_hash == s_h: - position = info['blob_num'] - length = info['length'] - iv = info['iv'] - revision = info['revision'] - signature = info['signature'] - if (start_num is None) or (position > start_num): - if (end_num is None) or (position < end_num): - blob_infos.append((b_h, position, revision, iv, length, signature)) - blob_infos.sort(key=lambda i: i[1], reverse=reverse) - if count is not None: - blob_infos = blob_infos[:count] - return defer.succeed(blob_infos) - - def _get_blob_num_by_hash(self, stream_hash, blob_hash): - if (stream_hash, blob_hash) in self.stream_blobs: - return self.stream_blobs[(stream_hash, blob_hash)]['blob_num'] - - def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - self.stream_desc[sd_blob_hash] = stream_hash - return defer.succeed(True) - - def get_sd_blob_hashes_for_stream(self, stream_hash): - return defer.succeed([sd_hash for sd_hash, s_h in self.stream_desc.iteritems() if s_h == stream_hash]) - - def hashes_to_announce(self): - next_announce_time = time.time() + self.hash_reannounce_time - stream_hashes = [] - current_time = time.time() - for stream_hash, stream_info in self.streams.iteritems(): - announce_time = stream_info['announce_time'] - if announce_time < current_time: - self.streams[stream_hash]['announce_time'] = next_announce_time - stream_hashes.append(stream_hash) - return stream_hashes diff --git a/lbrynet/lbrylive/PaymentRateManager.py b/lbrynet/lbrylive/PaymentRateManager.py deleted file mode 100644 index 8b87ea6e3..000000000 --- a/lbrynet/lbrylive/PaymentRateManager.py +++ /dev/null @@ -1,53 +0,0 @@ -# pylint: skip-file -class BaseLiveStreamPaymentRateManager(object): - def __init__(self, blob_info_rate, blob_data_rate=None): - self.min_live_blob_info_payment_rate = blob_info_rate - self.min_blob_data_payment_rate = blob_data_rate - - -class LiveStreamPaymentRateManager(object): - def __init__(self, base_live_stream_payment_rate_manager, payment_rate_manager, - blob_info_rate=None, blob_data_rate=None): - self._base_live_stream_payment_rate_manager = base_live_stream_payment_rate_manager - self._payment_rate_manager = payment_rate_manager - self.min_live_blob_info_payment_rate = blob_info_rate - self.min_blob_data_payment_rate = blob_data_rate - self.points_paid = 0.0 - - def get_rate_live_blob_info(self, peer): - return self.get_effective_min_live_blob_info_payment_rate() - - def accept_rate_live_blob_info(self, peer, payment_rate): - return payment_rate >= self.get_effective_min_live_blob_info_payment_rate() - - def get_rate_blob_data(self, peer, blobs): - response = self._payment_rate_manager.strategy.make_offer(peer, blobs) - return response.rate - - def accept_rate_blob_data(self, peer, blobs, offer): - response = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs) - return response.accepted - - def reply_to_offer(self, peer, blobs, offer): - reply = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs) - self._payment_rate_manager.strategy.offer_accepted(peer, reply) - return reply - - def get_effective_min_blob_data_payment_rate(self): - rate = self.min_blob_data_payment_rate - if rate is None: - rate = self._payment_rate_manager.min_blob_data_payment_rate - if rate is None: - rate = self._base_live_stream_payment_rate_manager.min_blob_data_payment_rate - if rate is None: - rate = self._payment_rate_manager.get_effective_min_blob_data_payment_rate() - return rate - - def get_effective_min_live_blob_info_payment_rate(self): - rate = self.min_live_blob_info_payment_rate - if rate is None: - rate = self._base_live_stream_payment_rate_manager.min_live_blob_info_payment_rate - return rate - - def record_points_paid(self, amount): - self.points_paid += amount diff --git a/lbrynet/lbrylive/StdinUploader.py b/lbrynet/lbrylive/StdinUploader.py deleted file mode 100644 index 232871524..000000000 --- a/lbrynet/lbrylive/StdinUploader.py +++ /dev/null @@ -1,122 +0,0 @@ -# pylint: skip-file -# pylint: skip-file -# This file is not maintained, but might be used in the future -# -import logging -import sys -from lbrynet.lbrylive.LiveStreamCreator import StdOutLiveStreamCreator -from lbrynet.core.BlobManager import TempBlobManager -from lbrynet.core.Session import Session -from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory -from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory -from lbrynet.core.server.ServerProtocol import ServerProtocolFactory -from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager -from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager -from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory -from lbrynet.dht.node import Node -from twisted.internet import defer, task - - -class StdinUploader(): - """This class reads from standard in, creates a stream, and makes it available on the network.""" - def __init__(self, peer_port, dht_node_port, known_dht_nodes, - stream_info_manager_class=DBLiveStreamMetadataManager, blob_manager_class=TempBlobManager): - """ - @param peer_port: the network port on which to listen for peers - - @param dht_node_port: the network port on which to listen for nodes in the DHT - - @param known_dht_nodes: a list of (ip_address, dht_port) which will be used to join the DHT network - """ - self.peer_port = peer_port - self.lbry_server_port = None - self.session = Session(blob_manager_class=blob_manager_class, - stream_info_manager_class=stream_info_manager_class, - dht_node_class=Node, dht_node_port=dht_node_port, - known_dht_nodes=known_dht_nodes, peer_port=self.peer_port, - use_upnp=False) - self.payment_rate_manager = BaseLiveStreamPaymentRateManager() - - def start(self): - """Initialize the session and start listening on the peer port""" - d = self.session.setup() - d.addCallback(lambda _: self._start()) - - return d - - def _start(self): - self._start_server() - return True - - def _start_server(self): - query_handler_factories = [ - CryptBlobInfoQueryHandlerFactory(self.stream_info_manager, self.session.wallet, - self.payment_rate_manager), - BlobAvailabilityHandlerFactory(self.session.blob_manager), - BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet, - self.payment_rate_manager), - self.session.wallet.get_wallet_info_query_handler_factory() - ] - - self.server_factory = ServerProtocolFactory(self.session.rate_limiter, - query_handler_factories, - self.session.peer_manager) - from twisted.internet import reactor - self.lbry_server_port = reactor.listenTCP(self.peer_port, self.server_factory) - - def start_live_stream(self, stream_name): - """Create the stream and start reading from stdin - - @param stream_name: a string, the suggested name of this stream - """ - stream_creator_helper = StdOutLiveStreamCreator(stream_name, self.session.blob_manager, - self.stream_info_manager) - d = stream_creator_helper.create_and_publish_stream_descriptor() - - def print_sd_hash(sd_hash): - print "Stream descriptor hash:", sd_hash - - d.addCallback(print_sd_hash) - d.addCallback(lambda _: stream_creator_helper.start_streaming()) - return d - - def shut_down(self): - """End the session and stop listening on the server port""" - d = self.session.shut_down() - d.addCallback(lambda _: self._shut_down()) - return d - - def _shut_down(self): - if self.lbry_server_port is not None: - d = defer.maybeDeferred(self.lbry_server_port.stopListening) - else: - d = defer.succeed(True) - return d - - -def launch_stdin_uploader(): - - from twisted.internet import reactor - - logging.basicConfig(level=logging.WARNING, filename="ul.log") - if len(sys.argv) == 4: - uploader = StdinUploader(int(sys.argv[2]), int(sys.argv[3]), []) - elif len(sys.argv) == 6: - uploader = StdinUploader(int(sys.argv[2]), int(sys.argv[3]), [(sys.argv[4], int(sys.argv[5]))]) - else: - print "Usage: lbrynet-stdin-uploader " \ - " [ ]" - sys.exit(1) - - def start_stdin_uploader(): - return uploader.start_live_stream(sys.argv[1]) - - def shut_down(): - logging.debug("Telling the reactor to stop in 60 seconds") - reactor.callLater(60, reactor.stop) - - d = task.deferLater(reactor, 0, uploader.start) - d.addCallback(lambda _: start_stdin_uploader()) - d.addCallback(lambda _: shut_down()) - reactor.addSystemEventTrigger('before', 'shutdown', uploader.shut_down) - reactor.run() diff --git a/lbrynet/lbrylive/StdoutDownloader.py b/lbrynet/lbrylive/StdoutDownloader.py deleted file mode 100644 index 86aad2f7c..000000000 --- a/lbrynet/lbrylive/StdoutDownloader.py +++ /dev/null @@ -1,102 +0,0 @@ -# pylint: skip-file -# pylint: skip-file -# This file is not maintained, but might be used in the future -# -import logging -import sys - -from lbrynet.lbrylive.client.LiveStreamDownloader import LiveStreamDownloader -from lbrynet.core.BlobManager import TempBlobManager -from lbrynet.core.Session import Session -from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader -from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader -from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager -from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager -from lbrynet.lbrylive.StreamDescriptor import save_sd_info -from lbrynet.dht.node import Node -from twisted.internet import task - - -class StdoutDownloader(): - """This class downloads a live stream from the network and outputs it to standard out.""" - def __init__(self, dht_node_port, known_dht_nodes, - stream_info_manager_class=DBLiveStreamMetadataManager, blob_manager_class=TempBlobManager): - """ - @param dht_node_port: the network port on which to listen for DHT node requests - - @param known_dht_nodes: a list of (ip_address, dht_port) which will be used to join the DHT network - - """ - - self.session = Session(blob_manager_class=blob_manager_class, - stream_info_manager_class=stream_info_manager_class, - dht_node_class=Node, dht_node_port=dht_node_port, known_dht_nodes=known_dht_nodes, - use_upnp=False) - self.payment_rate_manager = BaseLiveStreamPaymentRateManager() - - def start(self): - """Initialize the session""" - d = self.session.setup() - return d - - def read_sd_file(self, sd_blob): - reader = BlobStreamDescriptorReader(sd_blob) - return save_sd_info(self.stream_info_manager, reader, ignore_duplicate=True) - - def download_sd_file_from_hash(self, sd_hash): - downloader = StandaloneBlobDownloader(sd_hash, self.session.blob_manager, - self.session.peer_finder, self.session.rate_limiter, - self.session.wallet) - d = downloader.download() - return d - - def start_download(self, sd_hash): - """Start downloading the stream from the network and outputting it to standard out""" - d = self.download_sd_file_from_hash(sd_hash) - d.addCallbacks(self.read_sd_file) - - def start_stream(stream_hash): - consumer = LiveStreamDownloader(stream_hash, self.session.peer_finder, - self.session.rate_limiter, self.session.blob_manager, - self.stream_info_manager, self.payment_rate_manager, - self.session.wallet) - return consumer.start() - - d.addCallback(start_stream) - return d - - def shut_down(self): - """End the session""" - d = self.session.shut_down() - return d - - -def launch_stdout_downloader(): - - from twisted.internet import reactor - - logging.basicConfig(level=logging.WARNING, filename="dl.log") - if len(sys.argv) == 3: - downloader = StdoutDownloader(int(sys.argv[2]), []) - elif len(sys.argv) == 5: - downloader = StdoutDownloader(int(sys.argv[2]), [(sys.argv[3], int(sys.argv[4]))]) - else: - print "Usage: lbrynet-stdout-downloader " \ - " [ ]" - sys.exit(1) - - def start_stdout_downloader(): - return downloader.start_download(sys.argv[1]) - - def print_error(err): - logging.warning(err.getErrorMessage()) - - def shut_down(): - reactor.stop() - - d = task.deferLater(reactor, 0, downloader.start) - d.addCallback(lambda _: start_stdout_downloader()) - d.addErrback(print_error) - d.addCallback(lambda _: shut_down()) - reactor.addSystemEventTrigger('before', 'shutdown', downloader.shut_down) - reactor.run() diff --git a/lbrynet/lbrylive/StreamDescriptor.py b/lbrynet/lbrylive/StreamDescriptor.py deleted file mode 100644 index 1977c263c..000000000 --- a/lbrynet/lbrylive/StreamDescriptor.py +++ /dev/null @@ -1,138 +0,0 @@ -# pylint: skip-file -import binascii -import logging -from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature -from twisted.internet import defer, threads -from lbrynet.core.Error import DuplicateStreamHashError, InvalidStreamDescriptorError -from lbrynet.lbrylive.LiveBlob import LiveBlobInfo -from lbrynet.interfaces import IStreamDescriptorValidator -from zope.interface import implements - - -log = logging.getLogger(__name__) - - -LiveStreamType = "lbrylive" - - -def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False): - log.debug("Saving info for %s", str(sd_info['stream_name'])) - hex_stream_name = sd_info['stream_name'] - public_key = sd_info['public_key'] - key = sd_info['key'] - stream_hash = sd_info['stream_hash'] - raw_blobs = sd_info['blobs'] - crypt_blobs = [] - for blob in raw_blobs: - length = blob['length'] - if length != 0: - blob_hash = blob['blob_hash'] - else: - blob_hash = None - blob_num = blob['blob_num'] - revision = blob['revision'] - iv = blob['iv'] - signature = blob['signature'] - crypt_blobs.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature)) - log.debug("Trying to save stream info for %s", str(hex_stream_name)) - d = stream_info_manager.save_stream(stream_hash, public_key, hex_stream_name, - key, crypt_blobs) - - def check_if_duplicate(err): - if ignore_duplicate is True: - err.trap(DuplicateStreamHashError) - - d.addErrback(check_if_duplicate) - - d.addCallback(lambda _: stream_hash) - return d - - -def get_sd_info(stream_info_manager, stream_hash, include_blobs): - d = stream_info_manager.get_stream_info(stream_hash) - - def format_info(stream_info): - fields = {} - fields['stream_type'] = LiveStreamType - fields['stream_name'] = stream_info[2] - fields['public_key'] = stream_info[0] - fields['key'] = stream_info[1] - fields['stream_hash'] = stream_hash - - def format_blobs(blobs): - formatted_blobs = [] - for blob_hash, blob_num, revision, iv, length, signature in blobs: - blob = {} - if length != 0: - blob['blob_hash'] = blob_hash - blob['blob_num'] = blob_num - blob['revision'] = revision - blob['iv'] = iv - blob['length'] = length - blob['signature'] = signature - formatted_blobs.append(blob) - fields['blobs'] = formatted_blobs - return fields - - if include_blobs is True: - d = stream_info_manager.get_blobs_for_stream(stream_hash) - else: - d = defer.succeed([]) - d.addCallback(format_blobs) - return d - - d.addCallback(format_info) - return d - - -class LiveStreamDescriptorValidator(object): - implements(IStreamDescriptorValidator) - - def __init__(self, raw_info): - self.raw_info = raw_info - - def validate(self): - log.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name'])) - hex_stream_name = self.raw_info['stream_name'] - public_key = self.raw_info['public_key'] - key = self.raw_info['key'] - stream_hash = self.raw_info['stream_hash'] - h = get_lbry_hash_obj() - h.update(hex_stream_name) - h.update(public_key) - h.update(key) - if h.hexdigest() != stream_hash: - raise InvalidStreamDescriptorError("Stream hash does not match stream metadata") - blobs = self.raw_info['blobs'] - - def check_blob_signatures(): - for blob in blobs: - length = blob['length'] - if length != 0: - blob_hash = blob['blob_hash'] - else: - blob_hash = None - blob_num = blob['blob_num'] - revision = blob['revision'] - iv = blob['iv'] - signature = blob['signature'] - hashsum = get_lbry_hash_obj() - hashsum.update(stream_hash) - if length != 0: - hashsum.update(blob_hash) - hashsum.update(str(blob_num)) - hashsum.update(str(revision)) - hashsum.update(iv) - hashsum.update(str(length)) - if not verify_signature(hashsum.digest(), signature, public_key): - raise InvalidStreamDescriptorError("Invalid signature in stream descriptor") - - return threads.deferToThread(check_blob_signatures) - - def info_to_show(self): - info = [] - info.append(("stream_name", binascii.unhexlify(self.raw_info.get("stream_name")))) - return info - - def get_length_of_stream(self): - return None diff --git a/lbrynet/lbrylive/__init__.py b/lbrynet/lbrylive/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lbrynet/lbrylive/client/LiveStreamDownloader.py b/lbrynet/lbrylive/client/LiveStreamDownloader.py deleted file mode 100644 index a59eff1cf..000000000 --- a/lbrynet/lbrylive/client/LiveStreamDownloader.py +++ /dev/null @@ -1,180 +0,0 @@ -# pylint: skip-file -import binascii -from lbrynet.core.StreamDescriptor import StreamMetadata -from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader -from zope.interface import implements -from lbrynet.lbrylive.client.LiveStreamMetadataHandler import LiveStreamMetadataHandler -from lbrynet.lbrylive.client.LiveStreamProgressManager import LiveStreamProgressManager -import os -from lbrynet.lbrylive.StreamDescriptor import save_sd_info -from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager -from twisted.internet import defer, threads # , process -from lbrynet.interfaces import IStreamDownloaderFactory -from lbrynet.lbrylive.StreamDescriptor import LiveStreamType - - -class _LiveStreamDownloader(CryptStreamDownloader): - - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet): - CryptStreamDownloader.__init__(self, peer_finder, rate_limiter, blob_manager, - payment_rate_manager, wallet) - self.stream_hash = stream_hash - self.stream_info_manager = stream_info_manager - self.public_key = None - - def set_stream_info(self): - if self.public_key is None and self.key is None: - - d = self.stream_info_manager.get_stream_info(self.stream_hash) - - def set_stream_info(stream_info): - public_key, key, stream_name = stream_info - self.public_key = public_key - self.key = binascii.unhexlify(key) - self.stream_name = binascii.unhexlify(stream_name) - - d.addCallback(set_stream_info) - return d - else: - return defer.succeed(True) - - -class LiveStreamDownloader(_LiveStreamDownloader): - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet): - _LiveStreamDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, - stream_info_manager, payment_rate_manager, wallet) - - - def _get_metadata_handler(self, download_manager): - return LiveStreamMetadataHandler(self.stream_hash, self.stream_info_manager, - self.peer_finder, self.public_key, False, - self.payment_rate_manager, self.wallet, download_manager, 10) - - def _get_progress_manager(self, download_manager): - return LiveStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager, - delete_blob_after_finished=True, download_whole=False, - max_before_skip_ahead=10) - - def _get_write_func(self): - def write_func(data): - if self.stopped is False: - pass - return write_func - - -class FullLiveStreamDownloader(_LiveStreamDownloader): - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet): - _LiveStreamDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, - blob_manager, stream_info_manager, payment_rate_manager, - wallet) - self.file_handle = None - self.file_name = None - - def set_stream_info(self): - d = _LiveStreamDownloader.set_stream_info(self) - - def set_file_name_if_unset(): - if not self.file_name: - if not self.stream_name: - self.stream_name = "_" - self.file_name = os.path.basename(self.stream_name) - - d.addCallback(lambda _: set_file_name_if_unset()) - return d - - def stop(self, err=None): - d = self._close_file() - d.addBoth(lambda _: _LiveStreamDownloader.stop(self, err)) - return d - - def _start(self): - if self.file_handle is None: - d = self._open_file() - else: - d = defer.succeed(True) - d.addCallback(lambda _: _LiveStreamDownloader._start(self)) - return d - - def _open_file(self): - def open_file(): - self.file_handle = open(self.file_name, 'wb') - return threads.deferToThread(open_file) - - def _get_metadata_handler(self, download_manager): - return LiveStreamMetadataHandler(self.stream_hash, self.stream_info_manager, - self.peer_finder, self.public_key, True, - self.payment_rate_manager, self.wallet, download_manager) - - def _get_primary_request_creators(self, download_manager): - return [download_manager.blob_requester, download_manager.blob_info_finder] - - def _get_write_func(self): - def write_func(data): - if self.stopped is False: - self.file_handle.write(data) - return write_func - - def _close_file(self): - def close_file(): - if self.file_handle is not None: - self.file_handle.close() - self.file_handle = None - return threads.deferToThread(close_file) - - -class FullLiveStreamDownloaderFactory(object): - - implements(IStreamDownloaderFactory) - - def __init__(self, peer_finder, rate_limiter, blob_manager, stream_info_manager, wallet, - default_payment_rate_manager): - self.peer_finder = peer_finder - self.rate_limiter = rate_limiter - self.blob_manager = blob_manager - self.stream_info_manager = stream_info_manager - self.wallet = wallet - self.default_payment_rate_manager = default_payment_rate_manager - - def can_download(self, sd_validator): - return True - - def make_downloader(self, metadata, options, payment_rate_manager): - # TODO: check options for payment rate manager parameters - prm = LiveStreamPaymentRateManager(self.default_payment_rate_manager, - payment_rate_manager) - - def save_source_if_blob(stream_hash): - if metadata.metadata_source == StreamMetadata.FROM_BLOB: - d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash) - else: - d = defer.succeed(True) - d.addCallback(lambda _: stream_hash) - return d - - d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info) - d.addCallback(save_source_if_blob) - - def create_downloader(stream_hash): - stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter, - self.blob_manager, self.stream_info_manager, - prm, self.wallet) - d = stream_downloader.set_stream_info() - d.addCallback(lambda _: stream_downloader) - return d - - d.addCallback(create_downloader) - return d - - -def add_full_live_stream_downloader_to_sd_identifier(session, stream_info_manager, sd_identifier, - base_live_stream_payment_rate_manager): - downloader_factory = FullLiveStreamDownloaderFactory(session.peer_finder, - session.rate_limiter, - session.blob_manager, - stream_info_manager, - session.wallet, - base_live_stream_payment_rate_manager) - sd_identifier.add_stream_downloader_factory(LiveStreamType, downloader_factory) diff --git a/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py b/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py deleted file mode 100644 index 921c27dc7..000000000 --- a/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py +++ /dev/null @@ -1,347 +0,0 @@ -# pylint: skip-file -from collections import defaultdict -import logging -from zope.interface import implements -from twisted.internet import defer -from twisted.python.failure import Failure -from lbrynet import conf -from lbrynet.core.client.ClientRequest import ClientRequest, ClientPaidRequest -from lbrynet.lbrylive.LiveBlob import LiveBlobInfo -from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature -from lbrynet.interfaces import IRequestCreator, IMetadataHandler -from lbrynet.core.Error import InsufficientFundsError, InvalidResponseError, RequestCanceledError -from lbrynet.core.Error import NoResponseError, ConnectionClosedBeforeResponseError - - -log = logging.getLogger(__name__) - - -class LiveStreamMetadataHandler(object): - implements(IRequestCreator, IMetadataHandler) - - def __init__(self, stream_hash, stream_info_manager, peer_finder, stream_pub_key, download_whole, - payment_rate_manager, wallet, download_manager, max_before_skip_ahead=None): - self.stream_hash = stream_hash - self.stream_info_manager = stream_info_manager - self.payment_rate_manager = payment_rate_manager - self.wallet = wallet - self.peer_finder = peer_finder - self.stream_pub_key = stream_pub_key - self.download_whole = download_whole - self.max_before_skip_ahead = max_before_skip_ahead - if self.download_whole is False: - assert self.max_before_skip_ahead is not None, \ - "If download whole is False, max_before_skip_ahead must be set" - self.download_manager = download_manager - self._peers = defaultdict(int) # {Peer: score} - self._protocol_prices = {} - self._final_blob_num = None - self._price_disagreements = [] # [Peer] - self._incompatible_peers = [] # [Peer] - - ######### IMetadataHandler ######### - - def get_initial_blobs(self): - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - d.addCallback(self._format_initial_blobs_for_download_manager) - return d - - def final_blob_num(self): - return self._final_blob_num - - ######## IRequestCreator ######### - - def send_next_request(self, peer, protocol): - if self._finished_discovery() is False and self._should_send_request_to(peer) is True: - p_r = None - if not self._price_settled(protocol): - p_r = self._get_price_request(peer, protocol) - d_r = self._get_discover_request(peer) - reserved_points = self._reserve_points(peer, protocol, d_r.max_pay_units) - if reserved_points is not None: - d1 = protocol.add_request(d_r) - d1.addCallback(self._handle_discover_response, peer, d_r) - d1.addBoth(self._pay_or_cancel_payment, protocol, reserved_points) - d1.addErrback(self._request_failed, peer) - if p_r is not None: - d2 = protocol.add_request(p_r) - d2.addCallback(self._handle_price_response, peer, p_r, protocol) - d2.addErrback(self._request_failed, peer) - return defer.succeed(True) - else: - return defer.fail(InsufficientFundsError()) - return defer.succeed(False) - - def get_new_peers(self): - d = self._get_hash_for_peer_search() - d.addCallback(self._find_peers_for_hash) - return d - - ######### internal calls ######### - - def _get_hash_for_peer_search(self): - r = None - if self._finished_discovery() is False: - r = self.stream_hash - log.debug("Info finder peer search response for stream %s: %s", str(self.stream_hash), str(r)) - return defer.succeed(r) - - def _find_peers_for_hash(self, h): - if h is None: - return None - else: - d = self.peer_finder.find_peers_for_blob(h) - - def choose_best_peers(peers): - bad_peers = self._get_bad_peers() - return [p for p in peers if not p in bad_peers] - - d.addCallback(choose_best_peers) - return d - - def _format_initial_blobs_for_download_manager(self, blob_infos): - infos = [] - for blob_hash, blob_num, revision, iv, length, signature in blob_infos: - if blob_hash is not None: - infos.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature)) - else: - log.debug("Setting _final_blob_num to %s", str(blob_num - 1)) - self._final_blob_num = blob_num - 1 - return infos - - def _should_send_request_to(self, peer): - if self._peers[peer] < -5.0: - return False - if peer in self._price_disagreements: - return False - return True - - def _get_bad_peers(self): - return [p for p in self._peers.iterkeys() if not self._should_send_request_to(p)] - - def _finished_discovery(self): - if self._get_discovery_params() is None: - return True - return False - - def _get_discover_request(self, peer): - discovery_params = self._get_discovery_params() - if discovery_params: - further_blobs_request = {} - reference, start, end, count = discovery_params - further_blobs_request['reference'] = reference - if start is not None: - further_blobs_request['start'] = start - if end is not None: - further_blobs_request['end'] = end - if count is not None: - further_blobs_request['count'] = count - else: - further_blobs_request['count'] = conf.settings['MAX_BLOB_INFOS_TO_REQUEST'] - log.debug("Requesting %s blob infos from %s", str(further_blobs_request['count']), str(peer)) - r_dict = {'further_blobs': further_blobs_request} - response_identifier = 'further_blobs' - request = ClientPaidRequest(r_dict, response_identifier, further_blobs_request['count']) - return request - return None - - def _get_discovery_params(self): - log.debug("In _get_discovery_params") - stream_position = self.download_manager.stream_position() - blobs = self.download_manager.blobs - if blobs: - last_blob_num = max(blobs.iterkeys()) - else: - last_blob_num = -1 - final_blob_num = self.final_blob_num() - if final_blob_num is not None: - last_blob_num = final_blob_num - if self.download_whole is False: - log.debug("download_whole is False") - if final_blob_num is not None: - for i in xrange(stream_position, final_blob_num + 1): - if not i in blobs: - count = min(self.max_before_skip_ahead, (final_blob_num - i + 1)) - return self.stream_hash, None, 'end', count - return None - else: - if blobs: - for i in xrange(stream_position, last_blob_num + 1): - if not i in blobs: - if i == 0: - return self.stream_hash, 'beginning', 'end', -1 * self.max_before_skip_ahead - else: - return self.stream_hash, blobs[i-1].blob_hash, 'end', -1 * self.max_before_skip_ahead - return self.stream_hash, blobs[last_blob_num].blob_hash, 'end', -1 * self.max_before_skip_ahead - else: - return self.stream_hash, None, 'end', -1 * self.max_before_skip_ahead - log.debug("download_whole is True") - beginning = None - end = None - for i in xrange(stream_position, last_blob_num + 1): - if not i in blobs: - if beginning is None: - if i == 0: - beginning = 'beginning' - else: - beginning = blobs[i-1].blob_hash - else: - if beginning is not None: - end = blobs[i].blob_hash - break - if beginning is None: - if final_blob_num is not None: - log.debug("Discovery is finished. stream_position: %s, last_blob_num + 1: %s", str(stream_position), - str(last_blob_num + 1)) - return None - else: - log.debug("Discovery is not finished. final blob num is unknown.") - if last_blob_num != -1: - return self.stream_hash, blobs[last_blob_num].blob_hash, None, None - else: - return self.stream_hash, 'beginning', None, None - else: - log.info("Discovery is not finished. Not all blobs are known.") - return self.stream_hash, beginning, end, None - - def _price_settled(self, protocol): - if protocol in self._protocol_prices: - return True - return False - - def _update_local_score(self, peer, amount): - self._peers[peer] += amount - - def _reserve_points(self, peer, protocol, max_infos): - assert protocol in self._protocol_prices - point_amount = 1.0 * max_infos * self._protocol_prices[protocol] / 1000.0 - return self.wallet.reserve_points(peer, point_amount) - - def _pay_or_cancel_payment(self, arg, protocol, reserved_points): - if isinstance(arg, Failure) or arg == 0: - self._cancel_points(reserved_points) - else: - self._pay_peer(protocol, arg, reserved_points) - return arg - - def _pay_peer(self, protocol, num_infos, reserved_points): - assert num_infos != 0 - assert protocol in self._protocol_prices - point_amount = 1.0 * num_infos * self._protocol_prices[protocol] / 1000.0 - self.wallet.send_points(reserved_points, point_amount) - self.payment_rate_manager.record_points_paid(point_amount) - - def _cancel_points(self, reserved_points): - return self.wallet.cancel_point_reservation(reserved_points) - - def _get_price_request(self, peer, protocol): - self._protocol_prices[protocol] = self.payment_rate_manager.get_rate_live_blob_info(peer) - request_dict = {'blob_info_payment_rate': self._protocol_prices[protocol]} - request = ClientRequest(request_dict, 'blob_info_payment_rate') - return request - - def _handle_price_response(self, response_dict, peer, request, protocol): - if not request.response_identifier in response_dict: - return InvalidResponseError("response identifier not in response") - assert protocol in self._protocol_prices - response = response_dict[request.response_identifier] - if response == "RATE_ACCEPTED": - return True - else: - log.info("Rate offer has been rejected by %s", str(peer)) - del self._protocol_prices[protocol] - self._price_disagreements.append(peer) - return True - - def _handle_discover_response(self, response_dict, peer, request): - if not request.response_identifier in response_dict: - return InvalidResponseError("response identifier not in response") - response = response_dict[request.response_identifier] - blob_infos = [] - if 'error' in response: - if response['error'] == 'RATE_UNSET': - return defer.succeed(0) - else: - return InvalidResponseError("Got an unknown error from the peer: %s" % - (response['error'],)) - if not 'blob_infos' in response: - return InvalidResponseError("Missing the required field 'blob_infos'") - raw_blob_infos = response['blob_infos'] - log.info("Handling %s further blobs from %s", str(len(raw_blob_infos)), str(peer)) - log.debug("blobs: %s", str(raw_blob_infos)) - for raw_blob_info in raw_blob_infos: - length = raw_blob_info['length'] - if length != 0: - blob_hash = raw_blob_info['blob_hash'] - else: - blob_hash = None - num = raw_blob_info['blob_num'] - revision = raw_blob_info['revision'] - iv = raw_blob_info['iv'] - signature = raw_blob_info['signature'] - blob_info = LiveBlobInfo(blob_hash, num, length, iv, revision, signature) - log.debug("Learned about a potential blob: %s", str(blob_hash)) - if self._verify_blob(blob_info): - if blob_hash is None: - log.info("Setting _final_blob_num to %s", str(num - 1)) - self._final_blob_num = num - 1 - else: - blob_infos.append(blob_info) - else: - raise ValueError("Peer sent an invalid blob info") - d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, blob_infos) - - def add_blobs_to_download_manager(): - blob_nums = [b.blob_num for b in blob_infos] - log.info("Adding the following blob nums to the download manager: %s", str(blob_nums)) - self.download_manager.add_blobs_to_download(blob_infos) - - d.addCallback(lambda _: add_blobs_to_download_manager()) - - def pay_or_penalize_peer(): - if len(blob_infos): - self._update_local_score(peer, len(blob_infos)) - peer.update_stats('downloaded_crypt_blob_infos', len(blob_infos)) - peer.update_score(len(blob_infos)) - else: - self._update_local_score(peer, -.0001) - return len(blob_infos) - - d.addCallback(lambda _: pay_or_penalize_peer()) - - return d - - def _verify_blob(self, blob): - log.debug("Got an unverified blob to check:") - log.debug("blob_hash: %s", blob.blob_hash) - log.debug("blob_num: %s", str(blob.blob_num)) - log.debug("revision: %s", str(blob.revision)) - log.debug("iv: %s", blob.iv) - log.debug("length: %s", str(blob.length)) - hashsum = get_lbry_hash_obj() - hashsum.update(self.stream_hash) - if blob.length != 0: - hashsum.update(blob.blob_hash) - hashsum.update(str(blob.blob_num)) - hashsum.update(str(blob.revision)) - hashsum.update(blob.iv) - hashsum.update(str(blob.length)) - log.debug("hexdigest to be verified: %s", hashsum.hexdigest()) - if verify_signature(hashsum.digest(), blob.signature, self.stream_pub_key): - log.debug("Blob info is valid") - return True - else: - log.debug("The blob info is invalid") - return False - - def _request_failed(self, reason, peer): - if reason.check(RequestCanceledError): - return - if reason.check(NoResponseError): - self._incompatible_peers.append(peer) - log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage()) - self._update_local_score(peer, -5.0) - peer.update_score(-10.0) - if reason.check(ConnectionClosedBeforeResponseError): - return - return reason diff --git a/lbrynet/lbrylive/client/LiveStreamOptions.py b/lbrynet/lbrylive/client/LiveStreamOptions.py deleted file mode 100644 index 21961c746..000000000 --- a/lbrynet/lbrylive/client/LiveStreamOptions.py +++ /dev/null @@ -1,74 +0,0 @@ -# pylint: skip-file -from lbrynet.lbrylive.StreamDescriptor import LiveStreamType, LiveStreamDescriptorValidator -from lbrynet.core.DownloadOption import DownloadOption, DownloadOptionChoice - - -def add_live_stream_to_sd_identifier(sd_identifier, base_live_stream_payment_rate_manager): - sd_identifier.add_stream_type(LiveStreamType, LiveStreamDescriptorValidator, - LiveStreamOptions(base_live_stream_payment_rate_manager)) - - -class LiveStreamOptions(object): - def __init__(self, base_live_stream_payment_rate_manager): - self.base_live_stream_prm = base_live_stream_payment_rate_manager - - def get_downloader_options(self, sd_validator, payment_rate_manager): - prm = payment_rate_manager - - def get_default_data_rate_description(): - if prm.min_blob_data_payment_rate is None: - return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate) - else: - return "%f LBC/MB" % prm.min_blob_data_payment_rate - - options = [ - DownloadOption( - [ - DownloadOptionChoice(None, - "No change", - "No change"), - DownloadOptionChoice(None, - "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate), - "Default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)), - DownloadOptionChoice(float, - "Rate in LBC/MB", - "Rate in LBC/MB") - ], - "rate which will be paid for data", - "data payment rate", - prm.min_blob_data_payment_rate, - get_default_data_rate_description() - ), - DownloadOption( - [ - DownloadOptionChoice(None, - "No change", - "No change"), - DownloadOptionChoice(None, - "Application default (%s LBC/MB)" % str(self.base_live_stream_prm.min_live_blob_info_payment_rate), - "Default (%s LBC/MB)" % str(self.base_live_stream_prm.min_live_blob_info_payment_rate)), - DownloadOptionChoice(float, - "Rate in LBC/MB", - "Rate in LBC/MB") - ], - "rate which will be paid for metadata", - "metadata payment rate", - None, - "Application default (%s LBC/MB)" % str(self.base_live_stream_prm.min_live_blob_info_payment_rate) - ), - DownloadOption( - [ - DownloadOptionChoice(True, - "Allow reuploading data downloaded for this file", - "Allow reuploading"), - DownloadOptionChoice(False, - "Disallow reuploading data downloaded for this file", - "Disallow reuploading") - ], - "allow reuploading data downloaded for this file", - "allow upload", - True, - "Allow" - ), - ] - return options diff --git a/lbrynet/lbrylive/client/LiveStreamProgressManager.py b/lbrynet/lbrylive/client/LiveStreamProgressManager.py deleted file mode 100644 index 2869ada61..000000000 --- a/lbrynet/lbrylive/client/LiveStreamProgressManager.py +++ /dev/null @@ -1,91 +0,0 @@ -# pylint: skip-file -import logging -from lbrynet.core.client.StreamProgressManager import StreamProgressManager -from twisted.internet import defer - - -log = logging.getLogger(__name__) - - -class LiveStreamProgressManager(StreamProgressManager): - def __init__(self, finished_callback, blob_manager, download_manager, delete_blob_after_finished=False, - download_whole=True, max_before_skip_ahead=5): - self.download_whole = download_whole - self.max_before_skip_ahead = max_before_skip_ahead - StreamProgressManager.__init__(self, finished_callback, blob_manager, download_manager, - delete_blob_after_finished) - - ######### IProgressManager ######### - - def stream_position(self): - blobs = self.download_manager.blobs - if not blobs: - return 0 - else: - newest_known_blobnum = max(blobs.iterkeys()) - position = newest_known_blobnum - oldest_relevant_blob_num = (max(0, newest_known_blobnum - self.max_before_skip_ahead + 1)) - for i in xrange(newest_known_blobnum, oldest_relevant_blob_num - 1, -1): - if i in blobs and (not blobs[i].is_validated() and not i in self.provided_blob_nums): - position = i - return position - - def needed_blobs(self): - blobs = self.download_manager.blobs - stream_position = self.stream_position() - if blobs: - newest_known_blobnum = max(blobs.iterkeys()) - else: - newest_known_blobnum = -1 - blobs_needed = [] - for i in xrange(stream_position, newest_known_blobnum + 1): - if i in blobs and not blobs[i].is_validated() and not i in self.provided_blob_nums: - blobs_needed.append(blobs[i]) - return blobs_needed - - ######### internal ######### - - def _output_loop(self): - - from twisted.internet import reactor - - if self.stopped is True: - if self.outputting_d is not None: - self.outputting_d.callback(True) - self.outputting_d = None - return - - blobs = self.download_manager.blobs - log.info("In _output_loop. last_blob_outputted: %s", str(self.last_blob_outputted)) - if blobs: - log.debug("Newest blob number: %s", str(max(blobs.iterkeys()))) - if self.outputting_d is None: - self.outputting_d = defer.Deferred() - - current_blob_num = self.last_blob_outputted + 1 - - def finished_outputting_blob(): - self.last_blob_outputted += 1 - final_blob_num = self.download_manager.final_blob_num() - if final_blob_num is not None and final_blob_num == self.last_blob_outputted: - self._finished_outputting() - self.outputting_d.callback(True) - self.outputting_d = None - else: - reactor.callLater(0, self._output_loop) - - if current_blob_num in blobs and blobs[current_blob_num].is_validated(): - log.info("Outputting blob %s", str(current_blob_num)) - self.provided_blob_nums.append(current_blob_num) - d = self.download_manager.handle_blob(current_blob_num) - d.addCallback(lambda _: finished_outputting_blob()) - d.addCallback(lambda _: self._finished_with_blob(current_blob_num)) - elif blobs and max(blobs.iterkeys()) > self.last_blob_outputted + self.max_before_skip_ahead - 1: - self.last_blob_outputted += 1 - log.info("Skipping blob number %s due to knowing about blob number %s", - str(self.last_blob_outputted), str(max(blobs.iterkeys()))) - self._finished_with_blob(current_blob_num) - reactor.callLater(0, self._output_loop) - else: - self.outputting_d.callback(True) - self.outputting_d = None diff --git a/lbrynet/lbrylive/client/__init__.py b/lbrynet/lbrylive/client/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lbrynet/lbrylive/server/LiveBlobInfoQueryHandler.py b/lbrynet/lbrylive/server/LiveBlobInfoQueryHandler.py deleted file mode 100644 index 2fce90b6b..000000000 --- a/lbrynet/lbrylive/server/LiveBlobInfoQueryHandler.py +++ /dev/null @@ -1,184 +0,0 @@ -# pylint: skip-file -import logging -from twisted.internet import defer -from zope.interface import implements -from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler - - -log = logging.getLogger(__name__) - - -class CryptBlobInfoQueryHandlerFactory(object): - implements(IQueryHandlerFactory) - - def __init__(self, stream_info_manager, wallet, payment_rate_manager): - self.stream_info_manager = stream_info_manager - self.wallet = wallet - self.payment_rate_manager = payment_rate_manager - - ######### IQueryHandlerFactory ######### - - def build_query_handler(self): - q_h = CryptBlobInfoQueryHandler(self.stream_info_manager, self.wallet, self.payment_rate_manager) - return q_h - - def get_primary_query_identifier(self): - return 'further_blobs' - - def get_description(self): - return ("Stream Blob Information - blob hashes that are associated with streams," - " and the blobs' associated metadata") - - -class CryptBlobInfoQueryHandler(object): - implements(IQueryHandler) - - def __init__(self, stream_info_manager, wallet, payment_rate_manager): - self.stream_info_manager = stream_info_manager - self.wallet = wallet - self.payment_rate_manager = payment_rate_manager - self.query_identifiers = ['blob_info_payment_rate', 'further_blobs'] - self.blob_info_payment_rate = None - self.peer = None - - ######### IQueryHandler ######### - - def register_with_request_handler(self, request_handler, peer): - self.peer = peer - request_handler.register_query_handler(self, self.query_identifiers) - - def handle_queries(self, queries): - response = {} - - if self.query_identifiers[0] in queries: - if not self.handle_blob_info_payment_rate(queries[self.query_identifiers[0]]): - return defer.succeed({'blob_info_payment_rate': 'RATE_TOO_LOW'}) - else: - response['blob_info_payment_rate'] = "RATE_ACCEPTED" - - if self.query_identifiers[1] in queries: - further_blobs_request = queries[self.query_identifiers[1]] - log.debug("Received the client's request for additional blob information") - - if self.blob_info_payment_rate is None: - response['further_blobs'] = {'error': 'RATE_UNSET'} - return defer.succeed(response) - - def count_and_charge(blob_infos): - if len(blob_infos) != 0: - log.info("Responding with %s infos", str(len(blob_infos))) - expected_payment = 1.0 * len(blob_infos) * self.blob_info_payment_rate / 1000.0 - self.wallet.add_expected_payment(self.peer, expected_payment) - self.peer.update_stats('uploaded_crypt_blob_infos', len(blob_infos)) - return blob_infos - - def set_field(further_blobs): - response['further_blobs'] = {'blob_infos': further_blobs} - return response - - def get_further_blobs(stream_hash): - if stream_hash is None: - response['further_blobs'] = {'error': 'REFERENCE_HASH_UNKNOWN'} - return defer.succeed(response) - start = further_blobs_request.get("start") - end = further_blobs_request.get("end") - count = further_blobs_request.get("count") - if count is not None: - try: - count = int(count) - except ValueError: - response['further_blobs'] = {'error': 'COUNT_NON_INTEGER'} - return defer.succeed(response) - - if len([x for x in [start, end, count] if x is not None]) < 2: - response['further_blobs'] = {'error': 'TOO_FEW_PARAMETERS'} - return defer.succeed(response) - - inner_d = self.get_further_blobs(stream_hash, start, end, count) - - inner_d.addCallback(count_and_charge) - inner_d.addCallback(self.format_blob_infos) - inner_d.addCallback(set_field) - return inner_d - - if 'reference' in further_blobs_request: - d = self.get_stream_hash_from_reference(further_blobs_request['reference']) - d.addCallback(get_further_blobs) - return d - else: - response['further_blobs'] = {'error': 'NO_REFERENCE_SENT'} - return defer.succeed(response) - else: - return defer.succeed({}) - - ######### internal ######### - - def handle_blob_info_payment_rate(self, requested_payment_rate): - if not self.payment_rate_manager.accept_rate_live_blob_info(self.peer, requested_payment_rate): - return False - else: - self.blob_info_payment_rate = requested_payment_rate - return True - - def format_blob_infos(self, blobs): - blob_infos = [] - for blob_hash, blob_num, revision, iv, length, signature in blobs: - blob_info = {} - if length != 0: - blob_info['blob_hash'] = blob_hash - blob_info['blob_num'] = blob_num - blob_info['revision'] = revision - blob_info['iv'] = iv - blob_info['length'] = length - blob_info['signature'] = signature - blob_infos.append(blob_info) - return blob_infos - - def get_stream_hash_from_reference(self, reference): - d = self.stream_info_manager.check_if_stream_exists(reference) - - def check_if_stream_found(result): - if result is True: - return reference - else: - return self.stream_info_manager.get_stream_of_blob(reference) - - d.addCallback(check_if_stream_found) - return d - - def get_further_blobs(self, stream_hash, start, end, count): - ds = [] - if start is not None and start != "beginning": - ds.append(self.stream_info_manager.get_stream_of_blob(start)) - if end is not None and end != 'end': - ds.append(self.stream_info_manager.get_stream_of_blob(end)) - dl = defer.DeferredList(ds, fireOnOneErrback=True) - - def ensure_streams_match(results): - for success, stream_of_blob in results: - if stream_of_blob != stream_hash: - raise ValueError("Blob does not match stream") - return True - - def get_blob_infos(): - reverse = False - count_to_use = count - if start is None: - reverse = True - elif end is not None and count_to_use is not None and count_to_use < 0: - reverse = True - if count_to_use is not None and count_to_use < 0: - count_to_use *= -1 - if start == "beginning" or start is None: - s = None - else: - s = start - if end == "end" or end is None: - e = None - else: - e = end - return self.stream_info_manager.get_blobs_for_stream(stream_hash, s, e, count_to_use, reverse) - - dl.addCallback(ensure_streams_match) - dl.addCallback(lambda _: get_blob_infos()) - return dl diff --git a/lbrynet/lbrylive/server/__init__.py b/lbrynet/lbrylive/server/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 5421f10a8..69f35f820 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -12,15 +12,9 @@ from Crypto.PublicKey import RSA from Crypto import Random from Crypto.Hash import MD5 from lbrynet import conf -from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator -from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager -from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, \ DBEncryptedFileMetadataManager from lbrynet import analytics -from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator -from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager -from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager @@ -44,10 +38,6 @@ from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory -from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory -from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier -from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier - from tests import mocks @@ -308,132 +298,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, reactor.run() -def start_live_server(sd_hash_queue, kill_event, dead_event): - use_epoll_on_linux() - from twisted.internet import reactor - - logging.debug("In start_server.") - - Random.atfork() - - r = random.Random() - r.seed("start_live_server") - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - db_dir = "server" - os.mkdir(db_dir) - - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1]) - stream_info_manager = DBLiveStreamMetadataManager(session.db_dir, hash_announcer) - - logging.debug("Created the session") - - server_port = [] - - def start_listening(): - logging.debug("Starting the server protocol") - query_handler_factories = { - 1: CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet, - session.payment_rate_manager), - 2: BlobRequestHandlerFactory(session.blob_manager, session.wallet, - session.payment_rate_manager, - analytics.Track()), - 3: session.wallet.get_wallet_info_query_handler_factory() - } - - server_factory = ServerProtocolFactory(session.rate_limiter, - query_handler_factories, - session.peer_manager) - server_port.append(reactor.listenTCP(5553, server_factory)) - logging.debug("Server protocol has started") - - def create_stream(): - logging.debug("Making the live stream") - test_file = GenFile(5209343, b''.join([chr(i + 2) for i in xrange(0, 64, 6)])) - stream_creator_helper = FileLiveStreamCreator(session.blob_manager, stream_info_manager, - "test_file", test_file) - d = stream_creator_helper.setup() - d.addCallback(lambda _: stream_creator_helper.publish_stream_descriptor()) - d.addCallback(put_sd_hash_on_queue) - d.addCallback(lambda _: stream_creator_helper.start_streaming()) - return d - - def put_sd_hash_on_queue(sd_hash): - logging.debug("Telling the client to start running. Stream hash: %s", str(sd_hash)) - sd_hash_queue.put(sd_hash) - logging.debug("sd hash has been added to the queue") - - def set_dead_event(): - logging.debug("Setting the dead event") - dead_event.set() - - def print_error(err): - logging.debug("An error occurred during shutdown: %s", err.getTraceback()) - - def stop_reactor(): - logging.debug("Server is stopping its reactor") - reactor.stop() - - def shut_down(arg): - logging.debug("Shutting down") - if isinstance(arg, Failure): - logging.error("Shut down is due to an error: %s", arg.getTraceback()) - d = defer.maybeDeferred(server_port[0].stopListening) - d.addErrback(print_error) - d.addCallback(lambda _: session.shut_down()) - d.addCallback(lambda _: stream_info_manager.stop()) - d.addErrback(print_error) - d.addCallback(lambda _: set_dead_event()) - d.addErrback(print_error) - d.addCallback(lambda _: reactor.callLater(0, stop_reactor)) - d.addErrback(print_error) - return d - - def wait_for_kill_event(): - - d = defer.Deferred() - - def check_for_kill(): - if kill_event.is_set(): - logging.debug("Kill event has been found set") - kill_check.stop() - d.callback(True) - - kill_check = task.LoopingCall(check_for_kill) - kill_check.start(1.0) - - return d - - def enable_live_stream(): - add_live_stream_to_sd_identifier(sd_identifier, session.base_payment_rate_manager) - add_full_live_stream_downloader_to_sd_identifier(session, stream_info_manager, sd_identifier, - session.base_payment_rate_manager) - - def run_server(): - d = session.setup() - d.addCallback(lambda _: stream_info_manager.setup()) - d.addCallback(lambda _: enable_live_stream()) - d.addCallback(lambda _: start_listening()) - d.addCallback(lambda _: create_stream()) - d.addCallback(lambda _: wait_for_kill_event()) - d.addBoth(shut_down) - return d - - reactor.callLater(1, run_server) - if not reactor.running: - reactor.run() - - def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False): use_epoll_on_linux() from twisted.internet import reactor @@ -704,101 +568,6 @@ class TestTransfer(TestCase): return d - @unittest.skip("Sadly skipping failing test instead of fixing it") - def test_live_transfer(self): - - sd_hash_queue = Queue() - kill_event = Event() - dead_event = Event() - server_args = (sd_hash_queue, kill_event, dead_event) - server = Process(target=start_live_server, args=server_args) - server.start() - self.server_processes.append(server) - - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 1) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() - - db_dir = "client" - os.mkdir(db_dir) - - self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None, - peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node - ) - - self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer) - - d = self.wait_for_hash_from_queue(sd_hash_queue) - - def create_downloader(metadata, prm): - info_validator = metadata.validator - options = metadata.options - factories = metadata.factories - chosen_options = [ - o.default_value for o in options.get_downloader_options(info_validator, prm)] - return factories[0].make_downloader(metadata, chosen_options, prm) - - def start_lbry_file(lbry_file): - lbry_file = lbry_file - return lbry_file.start() - - def download_stream(sd_blob_hash): - prm = self.session.payment_rate_manager - d = download_sd_blob(self.session, sd_blob_hash, prm) - d.addCallback(sd_identifier.get_metadata_for_sd_blob) - d.addCallback(create_downloader, prm) - d.addCallback(start_lbry_file) - return d - - def do_download(sd_blob_hash): - logging.debug("Starting the download") - - d = self.session.setup() - d.addCallback(lambda _: enable_live_stream()) - d.addCallback(lambda _: download_stream(sd_blob_hash)) - return d - - def enable_live_stream(): - add_live_stream_to_sd_identifier(sd_identifier, self.session.payment_rate_manager) - add_full_live_stream_downloader_to_sd_identifier(self.session, self.stream_info_manager, - sd_identifier, - self.session.payment_rate_manager) - - d.addCallback(do_download) - - def check_md5_sum(): - f = open('test_file') - hashsum = MD5.new() - hashsum.update(f.read()) - self.assertEqual(hashsum.hexdigest(), "215b177db8eed86d028b37e5cbad55c7") - - d.addCallback(lambda _: check_md5_sum()) - - def stop(arg): - if isinstance(arg, Failure): - logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback()) - else: - logging.debug("Client is stopping normally.") - kill_event.set() - logging.debug("Set the kill event") - d = self.wait_for_event(dead_event, 15) - - def print_shutting_down(): - logging.info("Client is shutting down") - - d.addCallback(lambda _: print_shutting_down()) - d.addCallback(lambda _: arg) - return d - - d.addBoth(stop) - return d - def test_last_blob_retrieval(self): kill_event = Event() dead_event_1 = Event()