From 74ec4192e2f338d18845cfe88c489dc049c96d20 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 15 Dec 2017 20:23:16 -0500
Subject: [PATCH] delete outdated scripts

---
 scripts/common.py                      |  23 ----
 scripts/name.py                        |  59 ---------
 scripts/pool.py                        |  50 -------
 scripts/query_available_blobs.py       | 174 -------------------------
 scripts/send_sd_blobs_to_lighthouse.py | 163 -----------------------
 scripts/track.py                       |  41 ------
 6 files changed, 510 deletions(-)
 delete mode 100644 scripts/common.py
 delete mode 100644 scripts/name.py
 delete mode 100644 scripts/pool.py
 delete mode 100644 scripts/query_available_blobs.py
 delete mode 100644 scripts/send_sd_blobs_to_lighthouse.py
 delete mode 100644 scripts/track.py

diff --git a/scripts/common.py b/scripts/common.py
deleted file mode 100644
index 9a8e6405f..000000000
--- a/scripts/common.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import logging
-
-from twisted.internet import defer
-
-
-log = logging.getLogger(__name__)
-
-
-@defer.inlineCallbacks
-def getNames(wallet, names=None):
-    if names:
-        defer.returnValue(names)
-    nametrie = yield wallet.get_nametrie()
-    defer.returnValue(list(getNameClaims(nametrie)))
-
-
-def getNameClaims(trie):
-    for x in trie:
-        if 'txid' in x:
-            try:
-                yield str(x['name'])
-            except UnicodeError:
-                log.warning('Skippin name %s as it is not ascii', x['name'])
diff --git a/scripts/name.py b/scripts/name.py
deleted file mode 100644
index 48888858e..000000000
--- a/scripts/name.py
+++ /dev/null
@@ -1,59 +0,0 @@
-import logging
-import random
-
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from lbrynet.core import Error
-from lbrynet.core import StreamDescriptor
-from lbrynet.metadata import Metadata
-
-
-log = logging.getLogger(__name__)
-
-
-class Name(object):
-    def __init__(self, name):
-        self.name = name
-        self.sd_hash = None
-        self.sd_blob = None
-
-    @defer.inlineCallbacks
-    def setSdHash(self, wallet):
-        try:
-            stream = yield wallet.get_stream_info_for_name(self.name)
-            metadata = Metadata.Metadata(stream)
-            self.sd_hash = _getSdHash(metadata)
-        except (Error.InvalidStreamInfoError, AssertionError):
-            pass
-        except Exception:
-            log.exception('What happened')
-
-    @defer.inlineCallbacks
-    def download_sd_blob(self, session):
-        print('Trying to get sd_blob for {} using {}'.format(self.name, self.sd_hash))
-        try:
-            blob = yield download_sd_blob_with_timeout(
-                session, self.sd_hash, session.payment_rate_manager)
-
-            self.sd_blob = blob
-            yield self._after_download(blob)
-            print('Downloaded sd_blob for {} using {}'.format(self.name, self.sd_hash))
-        except defer.TimeoutError:
-            print('Downloading sd_blob for {} timed-out'.format(self.name))
-            # swallow errors from the timeout
-            pass
-        except Exception:
-            log.exception('Failed to download {}'.format(self.name))
-
-    def _after_download(self, blob):
-        pass
-
-def _getSdHash(metadata):
-    return metadata['sources']['lbry_sd_hash']
-
-
-def download_sd_blob_with_timeout(session, sd_hash, payment_rate_manager):
-    d = StreamDescriptor.download_sd_blob(session, sd_hash, payment_rate_manager)
-    d.addTimeout(random.randint(10, 30), reactor)
-    return d
diff --git a/scripts/pool.py b/scripts/pool.py
deleted file mode 100644
index 8ba4199d8..000000000
--- a/scripts/pool.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import itertools
-import logging
-
-from twisted.internet import defer
-
-
-log = logging.getLogger(__name__)
-
-
-class DeferredPool(defer.Deferred):
-    def __init__(self, deferred_iter, pool_size):
-        self.deferred_iter = deferred_iter
-        self.pool_size = pool_size
-        # results are stored unordered
-        self.result_list = []
-        self.started_count = 0
-        self.total_count = None
-        defer.Deferred.__init__(self)
-
-        for deferred in itertools.islice(deferred_iter, pool_size):
-            self._start_one(deferred)
-
-    def _start_one(self, deferred):
-        deferred.addCallbacks(self._callback, self._callback,
-                              callbackArgs=(self.started_count, defer.SUCCESS),
-                              errbackArgs=(self.started_count, defer.FAILURE))
-        self.started_count += 1
-
-    def _callback(self, result, index, success):
-        self.result_list.append((index, success, result))
-        if self._done():
-            self._finish()
-        else:
-            self._process_next()
-        return result
-
-    def _done(self):
-        return self.total_count == len(self.result_list)
-
-    def _finish(self):
-        result_list = [(s, r) for i, s, r in sorted(self.result_list)]
-        self.callback(result_list)
-
-    def _process_next(self):
-        try:
-            deferred = next(self.deferred_iter)
-        except StopIteration:
-            self.total_count = self.started_count
-        else:
-            self._start_one(deferred)
diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py
deleted file mode 100644
index c2b08f944..000000000
--- a/scripts/query_available_blobs.py
+++ /dev/null
@@ -1,174 +0,0 @@
-from __future__ import print_function
-from lbrynet.core import log_support
-
-import argparse
-import collections
-import logging
-import os
-import random
-import shutil
-import sys
-import tempfile
-
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from lbrynet import analytics
-from lbrynet import conf
-from lbrynet.core import Wallet
-from lbrynet.core import BlobAvailability
-from lbrynet.core import Session
-from lbrynet.core import utils
-
-import common
-import name
-import pool
-import track
-
-
-log = logging.getLogger()
-
-
-def main(args=None):
-    conf.initialize_settings()
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--limit', type=int)
-    parser.add_argument('--download', action='store_true',
-                        help='Set flag to also download each sd_blob and report on success')
-    args = parser.parse_args(args)
-
-    log_support.configure_console()
-    log_support.configure_twisted()
-
-    # make a fresh dir or else we will include blobs that we've
-    # already downloaded but might not otherwise be available.
-    db_dir = tempfile.mkdtemp()
-    try:
-        blob_dir = os.path.join(db_dir, 'blobfiles')
-        os.makedirs(blob_dir)
-        storage = Wallet.InMemoryStorage()
-        wallet = Wallet.LBRYumWallet(storage)
-        session = Session.Session(
-            0,
-            db_dir=db_dir,
-            node_id=utils.generate_id(),
-            blob_dir=blob_dir,
-            dht_node_port=4444,
-            known_dht_nodes=conf.settings['known_dht_nodes'],
-            peer_port=3333,
-            use_upnp=False,
-            wallet=wallet
-        )
-        api = analytics.Api.new_instance(conf.settings['share_usage_data'])
-        run(args, session, api)
-        reactor.run()
-    finally:
-        shutil.rmtree(db_dir)
-
-
-@defer.inlineCallbacks
-def run(args, session, api):
-    try:
-        yield session.setup()
-        names = yield common.getNames(session.wallet)
-        if args.limit and len(names) > args.limit:
-            names = random.sample(list(names), args.limit)
-        names = [Name(n) for n in names]
-        blob_tracker = BlobAvailability.BlobAvailabilityTracker(
-            session.blob_manager, session.peer_finder, session.dht_node)
-
-        tracker = yield Tracker(session, names, blob_tracker)
-        yield tracker.processNameClaims(args.download)
-        event = makeEvent(tracker.stats)
-        if args.download and not args.limit:
-            api.track(event)
-        else:
-            # don't send event to analytics if it doesn't contain the full info
-            print(event)
-    except Exception:
-        log.exception('Something bad happened')
-    finally:
-        reactor.stop()
-
-
-class Tracker(track.Tracker):
-    def __init__(self, session, names, blob_tracker):
-        track.Tracker.__init__(self, session, names)
-        self.blob_tracker = blob_tracker
-
-    @defer.inlineCallbacks
-    def processNameClaims(self, download=False):
-        try:
-            yield self._getSdHashes()
-            yield self._filterNames('sd_hash')
-            yield self._checkAvailability()
-            yield self._filterNames('is_available')
-            yield self.print_attempts_counter()
-            if download:
-                yield self._downloadAllBlobs()
-                yield self._filterNames('sd_blob')
-        except Exception:
-            log.exception('Something bad happened')
-
-    def print_attempts_counter(self):
-        print(self.attempts_counter)
-
-    def attempts_counter(self):
-        return collections.Counter([n.availability_attempts for n in self.names])
-
-    def _checkAvailability(self):
-        return pool.DeferredPool(
-            (n.check_availability(self.blob_tracker) for n in self.names),
-            10
-        )
-
-
-class Name(name.Name):
-    # From experience, very few sd_blobs get found after the third attempt
-    MAX_ATTEMPTS = 6
-    def __init__(self, my_name):
-        name.Name.__init__(self, my_name)
-        self.is_available = None
-        self.availability_attempts = 0
-
-    @defer.inlineCallbacks
-    def _check_availability(self, blob_tracker):
-        b = yield blob_tracker.get_blob_availability(self.sd_hash)
-        peer_count = b[self.sd_hash]
-        self._setAvailable(peer_count)
-
-    @defer.inlineCallbacks
-    def check_availability(self, blob_tracker):
-        while not self.is_available and self.availability_attempts < self.MAX_ATTEMPTS:
-            self.availability_attempts += 1
-            log.info('Attempt %s to find %s', self.availability_attempts, self.name)
-            yield self._check_availability(blob_tracker)
-
-    def _setAvailable(self, peer_count):
-        self.is_available = peer_count > 0
-
-
-def makeEvent(stats):
-    return {
-        'userId': 'lbry',
-        'event': 'Content Availability',
-        'properties': {
-            'total_published': stats['sd_hash'],
-            'sd_blob_available_on_dht': stats['is_available'],
-            'sd_blob_available_for_download': stats['sd_blob'],
-        },
-        'context': {
-            'app': {
-                'name': 'Availability Tracker',
-                'version': 1,
-            },
-            'library': {
-                'name': 'lbrynet-analytics',
-                'version': '1.0.0'
-            },
-        },
-        'timestamp': utils.isonow()
-    }
-
-if __name__ == '__main__':
-    sys.exit(main())
diff --git a/scripts/send_sd_blobs_to_lighthouse.py b/scripts/send_sd_blobs_to_lighthouse.py
deleted file mode 100644
index aad1f21f9..000000000
--- a/scripts/send_sd_blobs_to_lighthouse.py
+++ /dev/null
@@ -1,163 +0,0 @@
-from __future__ import print_function
-from lbrynet.core import log_support
-
-import argparse
-import logging
-import os
-import random
-import sys
-
-import appdirs
-from twisted.internet import defer
-from twisted.internet import reactor
-
-from lbrynet import conf
-from lbrynet.core import Wallet
-from lbrynet.core import BlobManager
-from lbrynet.core import Session
-from lbrynet.core import utils
-
-from lbrynet import reflector
-
-import name
-import track
-
-
-log = logging.getLogger('main')
-
-
-def main(args=None):
-    conf.initialize_settings()
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument('destination', type=conf.server_port, nargs='+')
-    parser.add_argument('--names', nargs='*')
-    parser.add_argument('--limit', type=int)
-    args = parser.parse_args(args)
-
-    log_support.configure_console(level='INFO')
-
-    db_dir = appdirs.user_data_dir('lighthouse-uploader')
-    safe_makedirs(db_dir)
-    # no need to persist metadata info
-    storage = Wallet.InMemoryStorage()
-    wallet = Wallet.LBRYumWallet(storage)
-    blob_dir = os.path.join(db_dir, 'blobfiles')
-    safe_makedirs(blob_dir)
-    # Don't set a hash_announcer, we have no need to tell anyone we
-    # have these blobs
-    blob_manager = BlobManager.DiskBlobManager(None, blob_dir, db_dir)
-    # TODO: make it so that I can disable the BlobAvailabilityTracker
-    # or, in general, make the session more reusable for users
-    # that only want part of the functionality
-    session = Session.Session(
-        blob_data_payment_rate=0,
-        db_dir=db_dir,
-        node_id=utils.generate_id(),
-        blob_dir=blob_dir,
-        dht_node_port=4444,
-        known_dht_nodes=conf.settings['known_dht_nodes'],
-        peer_port=3333,
-        use_upnp=False,
-        wallet=wallet,
-        blob_manager=blob_manager,
-    )
-    assert session.wallet
-    run(session, args.destination, args.names, args.limit)
-    reactor.run()
-
-
-def safe_makedirs(directory):
-    try:
-        os.makedirs(directory)
-    except OSError:
-        pass
-
-
-@defer.inlineCallbacks
-def run(session, destinations, names, limit):
-    try:
-        yield session.setup()
-        while not session.wallet.network.is_connected():
-            log.info('Retrying wallet startup')
-            try:
-                yield session.wallet.start()
-            except ValueError:
-                pass
-        names = yield getNames(session.wallet, names)
-        if limit and limit < len(names):
-            names = random.sample(names, limit)
-        log.info('Processing %s names', len(names))
-        names = [Name(n, session.blob_manager) for n in names]
-        t = Tracker(session, destinations, names)
-        yield t.processNameClaims()
-    except Exception:
-        log.exception('Something bad happened')
-    finally:
-        log.warning('We are stopping the reactor gracefully')
-        reactor.stop()
-
-
-def logAndStop(err):
-    log_support.failure(err, log, 'This sucks: %s')
-    reactor.stop()
-
-
-def logAndRaise(err):
-    log_support.failure(err, log, 'This still sucks: %s')
-    return err
-
-
-class Tracker(track.Tracker):
-    def __init__(self, session, destinations, names):
-        self.destinations = destinations
-        track.Tracker.__init__(self, session, names)
-
-    @property
-    def blob_manager(self):
-        return self.session.blob_manager
-
-    @defer.inlineCallbacks
-    def processNameClaims(self):
-        yield super(Tracker, self).processNameClaims()
-        log.info('Sending the blobs')
-        yield self._sendSdBlobs()
-
-    @defer.inlineCallbacks
-    def _sendSdBlobs(self):
-        blobs = [n.sd_blob for n in self.names if n.sd_blob]
-        log.info('Sending %s blobs', len(blobs))
-        blob_hashes = [b.blob_hash for b in blobs]
-        for destination in self.destinations:
-            factory = reflector.BlobClientFactory(self.blob_manager, blob_hashes)
-            yield self._connect(destination, factory)
-
-    @defer.inlineCallbacks
-    def _connect(self, destination, factory):
-        url, port = destination
-        ip = yield reactor.resolve(url)
-        try:
-            print('Connecting to {}'.format(ip))
-            yield reactor.connectTCP(ip, port, factory)
-            #factory.finished_deferred.addTimeout(60, reactor)
-            value = yield factory.finished_deferred
-            if value:
-                print('Success!')
-            else:
-                print('Not success?', value)
-        except Exception:
-            log.exception('Somehow failed to send blobs')
-
-
-class Name(name.Name):
-    def __init__(self, my_name, blob_manager):
-        name.Name.__init__(self, my_name)
-        self.blob_manager = blob_manager
-
-    def _after_download(self, blob):
-        # keep the blob for future runs
-        self.blob_manager.blob_completed(blob)
-
-
-if __name__ == '__main__':
-    sys.exit(main())
diff --git a/scripts/track.py b/scripts/track.py
deleted file mode 100644
index 35aa8ee4b..000000000
--- a/scripts/track.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import logging
-
-from twisted.internet import defer
-
-import pool
-
-
-log = logging.getLogger(__name__)
-
-
-class Tracker(object):
-    def __init__(self, session, names):
-        self.session = session
-        self.names = names
-        self.stats = {}
-
-    @property
-    def wallet(self):
-        return self.session.wallet
-
-    @defer.inlineCallbacks
-    def processNameClaims(self):
-        try:
-            log.info('Starting to get name claims')
-            yield self._getSdHashes()
-            self._filterNames('sd_hash')
-            log.info('Downloading all of the blobs')
-            yield self._downloadAllBlobs()
-        except Exception:
-            log.exception('Something bad happened')
-
-    def _getSdHashes(self):
-        return pool.DeferredPool((n.setSdHash(self.wallet) for n in self.names), 10)
-
-    def _filterNames(self, attr):
-        self.names = [n for n in self.names if getattr(n, attr)]
-        self.stats[attr] = len(self.names)
-        print("We have {} names with attribute {}".format(len(self.names), attr))
-
-    def _downloadAllBlobs(self):
-        return pool.DeferredPool((n.download_sd_blob(self.session) for n in self.names), 10)
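
Of the helpers removed above, scripts/pool.py's DeferredPool is the one piece of generic machinery: it drained an iterator of Deferreds while keeping at most pool_size of them in flight, which is how track.py capped sd_hash lookups and blob downloads at 10 concurrent operations. For reference, here is a minimal sketch of the same bounded-concurrency pattern built on Twisted's stock DeferredSemaphore and DeferredList rather than the deleted class; check_one and the generated names are hypothetical stand-ins for illustration, not code from this repository.

from __future__ import print_function

from twisted.internet import defer, task


def check_one(reactor, a_name):
    # Hypothetical per-item job; pretend each lookup takes one second.
    return task.deferLater(reactor, 1.0, lambda: (a_name, True))


def main(reactor):
    names = ['name-%d' % i for i in range(30)]
    # At most 10 jobs run at once, like DeferredPool(iter, 10); sem.run
    # only invokes check_one once a token is free, so work starts lazily,
    # matching how DeferredPool pulled from its generator on demand.
    sem = defer.DeferredSemaphore(10)
    jobs = [sem.run(check_one, reactor, n) for n in names]
    # DeferredList fires with [(success, result), ...] pairs, much like
    # DeferredPool._finish, except ordered by input rather than completion.
    d = defer.DeferredList(jobs, consumeErrors=True)
    d.addCallback(print)
    return d


task.react(main)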