From 5dbf9034ec65cc324908d239776358011fa88e5a Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer
Date: Fri, 6 Jan 2017 08:27:18 -0600
Subject: [PATCH] Add scripts related to querying / downloading sd blobs

---
 lbrynet/core/HashBlob.py               |   2 +-
 lbrynet/metadata/Metadata.py           |   2 +-
 scripts/common.py                      |  23 +++
 scripts/name.py                        |  59 ++++++++
 scripts/pool.py                        |  50 +++++++
 scripts/query_available_blobs.py       | 186 +++++++++++++++++++++++++
 scripts/send_sd_blobs_to_lighthouse.py | 176 +++++++++++++++++++++++
 scripts/track.py                       |  44 ++++++
 8 files changed, 540 insertions(+), 2 deletions(-)
 create mode 100644 scripts/common.py
 create mode 100644 scripts/name.py
 create mode 100644 scripts/pool.py
 create mode 100644 scripts/query_available_blobs.py
 create mode 100644 scripts/send_sd_blobs_to_lighthouse.py
 create mode 100644 scripts/track.py

diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py
index cf4c085d0..28a3aa4e0 100644
--- a/lbrynet/core/HashBlob.py
+++ b/lbrynet/core/HashBlob.py
@@ -194,7 +194,7 @@ class HashBlob(object):
         return self.blob_hash[:16]
 
     def __repr__(self):
-        return str(self)
+        return '<{}({})>'.format(self.__class__.__name__, str(self))
 
 
 class BlobFile(HashBlob):
diff --git a/lbrynet/metadata/Metadata.py b/lbrynet/metadata/Metadata.py
index 56e94ec30..b1b7de8b0 100644
--- a/lbrynet/metadata/Metadata.py
+++ b/lbrynet/metadata/Metadata.py
@@ -38,7 +38,7 @@ class Metadata(StructuredDict):
 
     def __init__(self, metadata, migrate=True, target_version=None):
         if not isinstance(metadata, dict):
-            raise TypeError("metadata is not a dictionary")
+            raise TypeError("{} is not a dictionary".format(metadata))
         starting_version = metadata.get('ver', '0.0.1')
 
         StructuredDict.__init__(self, metadata, starting_version, migrate, target_version)
diff --git a/scripts/common.py b/scripts/common.py
new file mode 100644
index 000000000..9a8e6405f
--- /dev/null
+++ b/scripts/common.py
@@ -0,0 +1,23 @@
+import logging
+
+from twisted.internet import defer
+
+
+log = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def getNames(wallet, names=None):
+    if names:
+        defer.returnValue(names)
+    nametrie = yield wallet.get_nametrie()
+    defer.returnValue(list(getNameClaims(nametrie)))
+
+
+def getNameClaims(trie):
+    for x in trie:
+        if 'txid' in x:
+            try:
+                yield str(x['name'])
+            except UnicodeError:
+                log.warning('Skipping name %s as it is not ASCII', x['name'])
diff --git a/scripts/name.py b/scripts/name.py
new file mode 100644
index 000000000..48888858e
--- /dev/null
+++ b/scripts/name.py
@@ -0,0 +1,59 @@
+import logging
+import random
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from lbrynet.core import Error
+from lbrynet.core import StreamDescriptor
+from lbrynet.metadata import Metadata
+
+
+log = logging.getLogger(__name__)
+
+
+class Name(object):
+    def __init__(self, name):
+        self.name = name
+        self.sd_hash = None
+        self.sd_blob = None
+
+    @defer.inlineCallbacks
+    def setSdHash(self, wallet):
+        try:
+            stream = yield wallet.get_stream_info_for_name(self.name)
+            metadata = Metadata.Metadata(stream)
+            self.sd_hash = _getSdHash(metadata)
+        except (Error.InvalidStreamInfoError, AssertionError):
+            # no valid stream / metadata for this name; skip it
+            pass
+        except Exception:
+            log.exception('Failed to get sd hash for %s', self.name)
+
+    @defer.inlineCallbacks
+    def download_sd_blob(self, session):
+        print('Trying to get sd_blob for {} using {}'.format(self.name, self.sd_hash))
+        try:
+            blob = yield download_sd_blob_with_timeout(
+                session, self.sd_hash, session.payment_rate_manager)
+
+            self.sd_blob = blob
+            yield self._after_download(blob)
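+            # (_after_download is a no-op hook on this base class; subclasses,
+            # e.g. the lighthouse uploader's Name, override it to keep the blob)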
+            print('Downloaded sd_blob for {} using {}'.format(self.name, self.sd_hash))
+        except defer.TimeoutError:
+            # a timeout just means the blob wasn't found in time; report and move on
+            print('Downloading sd_blob for {} timed out'.format(self.name))
+        except Exception:
+            log.exception('Failed to download %s', self.name)
+
+    def _after_download(self, blob):
+        pass
+
+
+def _getSdHash(metadata):
+    return metadata['sources']['lbry_sd_hash']
+
+
+def download_sd_blob_with_timeout(session, sd_hash, payment_rate_manager):
+    d = StreamDescriptor.download_sd_blob(session, sd_hash, payment_rate_manager)
+    d.addTimeout(random.randint(10, 30), reactor)
+    return d
diff --git a/scripts/pool.py b/scripts/pool.py
new file mode 100644
index 000000000..8ba4199d8
--- /dev/null
+++ b/scripts/pool.py
@@ -0,0 +1,50 @@
+import itertools
+import logging
+
+from twisted.internet import defer
+
+
+log = logging.getLogger(__name__)
+
+
+class DeferredPool(defer.Deferred):
+    """A Deferred that fires with every result from deferred_iter,
+    keeping at most pool_size deferreds in flight at a time."""
+    def __init__(self, deferred_iter, pool_size):
+        self.deferred_iter = deferred_iter
+        self.pool_size = pool_size
+        # results are stored unordered
+        self.result_list = []
+        self.started_count = 0
+        self.total_count = None
+        defer.Deferred.__init__(self)
+
+        for deferred in itertools.islice(deferred_iter, pool_size):
+            self._start_one(deferred)
+        if self.started_count == 0:
+            # nothing to process, so fire immediately
+            self.total_count = 0
+            self._finish()
+
+    def _start_one(self, deferred):
+        deferred.addCallbacks(self._callback, self._callback,
+                              callbackArgs=(self.started_count, defer.SUCCESS),
+                              errbackArgs=(self.started_count, defer.FAILURE))
+        self.started_count += 1
+
+    def _callback(self, result, index, success):
+        self.result_list.append((index, success, result))
+        if self._done():
+            self._finish()
+        else:
+            self._process_next()
+        return result
+
+    def _done(self):
+        return self.total_count == len(self.result_list)
+
+    def _finish(self):
+        result_list = [(s, r) for i, s, r in sorted(self.result_list)]
+        self.callback(result_list)
+
+    def _process_next(self):
+        try:
+            deferred = next(self.deferred_iter)
+        except StopIteration:
+            self.total_count = self.started_count
+            # the iterator may be exhausted by the last outstanding deferred's
+            # callback, so completion has to be re-checked here too
+            if self._done():
+                self._finish()
+        else:
+            self._start_one(deferred)
diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py
new file mode 100644
index 000000000..603b687cc
--- /dev/null
+++ b/scripts/query_available_blobs.py
@@ -0,0 +1,186 @@
+from __future__ import print_function
+from lbrynet.core import log_support
+
+import argparse
+import collections
+import itertools
+import logging
+import os
+import random
+import shutil
+import sys
+import tempfile
+
+import appdirs
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import protocol
+from twisted.internet import endpoints
+
+from lbrynet import analytics
+from lbrynet import conf
+from lbrynet.core import Error
+from lbrynet.core import Wallet
+from lbrynet.core import BlobAvailability
+from lbrynet.core import BlobManager
+from lbrynet.core import HashAnnouncer
+from lbrynet.core import PeerManager
+from lbrynet.core import Session
+from lbrynet.core import utils
+from lbrynet.core.client import DHTPeerFinder
+from lbrynet.dht import node
+from lbrynet.metadata import Metadata
+from lbrynet.core import StreamDescriptor as sd
+
+import common
+import name
+import pool
+import track
+
+
+log = logging.getLogger()
+
+
+def main(args=None):
+    conf.initialize_settings()
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--limit', type=int)
+    parser.add_argument('--download', action='store_true',
+                        help='Set flag to also download each sd_blob and report on success')
+    args = parser.parse_args(args)
+
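+    # route lbrynet and twisted log output to the console before any
+    # network activity starts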
+    log_support.configure_console()
+    log_support.configure_twisted()
+
+    # use a fresh dir, or else we will include blobs that we've
+    # already downloaded but that might not otherwise be available
+    db_dir = tempfile.mkdtemp()
+    try:
+        blob_dir = os.path.join(db_dir, 'blobfiles')
+        os.makedirs(blob_dir)
+        storage = Wallet.InMemoryStorage()
+        wallet = Wallet.LBRYumWallet(storage)
+        session = Session.Session(
+            0,
+            db_dir=db_dir,
+            lbryid=utils.generate_id(),
+            blob_dir=blob_dir,
+            dht_node_port=4444,
+            known_dht_nodes=conf.settings.known_dht_nodes,
+            peer_port=3333,
+            use_upnp=False,
+            wallet=wallet
+        )
+        api = analytics.Api.new_instance()
+        run(args, session, api)
+        reactor.run()
+    finally:
+        shutil.rmtree(db_dir)
+
+
+@defer.inlineCallbacks
+def run(args, session, api):
+    try:
+        yield session.setup()
+        names = yield common.getNames(session.wallet)
+        if args.limit and len(names) > args.limit:
+            names = random.sample(list(names), args.limit)
+        names = [Name(n) for n in names]
+        blob_tracker = BlobAvailability.BlobAvailabilityTracker(
+            session.blob_manager, session.peer_finder, session.dht_node)
+
+        tracker = Tracker(session, names, blob_tracker)
+        yield tracker.processNameClaims(args.download)
+        event = makeEvent(tracker.stats)
+        if args.download and not args.limit:
+            api.track(event)
+        else:
+            # don't send the event to analytics if it doesn't contain the full info
+            print(event)
+    except Exception:
+        log.exception('Something bad happened')
+    finally:
+        reactor.stop()
+
+
+class Tracker(track.Tracker):
+    def __init__(self, session, names, blob_tracker):
+        track.Tracker.__init__(self, session, names)
+        self.blob_tracker = blob_tracker
+
+    @defer.inlineCallbacks
+    def processNameClaims(self, download=False):
+        try:
+            yield self._getSdHashes()
+            yield self._filterNames('sd_hash')
+            yield self._checkAvailability()
+            yield self._filterNames('is_available')
+            self.print_attempts_counter()
+            if download:
+                yield self._downloadAllBlobs()
+                yield self._filterNames('sd_blob')
+        except Exception:
+            log.exception('Something bad happened')
+
+    def print_attempts_counter(self):
+        print(self.attempts_counter())
+
+    def attempts_counter(self):
+        return collections.Counter([n.availability_attempts for n in self.names])
+
+    def _checkAvailability(self):
+        return pool.DeferredPool(
+            (n.check_availability(self.blob_tracker) for n in self.names),
+            10
+        )
+
+
+class Name(name.Name):
+    # from experience, very few sd_blobs are found after the third attempt
+    MAX_ATTEMPTS = 6
+
+    def __init__(self, my_name):
+        name.Name.__init__(self, my_name)
+        self.is_available = None
+        self.availability_attempts = 0
+
+    @defer.inlineCallbacks
+    def _check_availability(self, blob_tracker):
+        b = yield blob_tracker.get_blob_availability(self.sd_hash)
+        peer_count = b[self.sd_hash]
+        self._setAvailable(peer_count)
+
+    @defer.inlineCallbacks
+    def check_availability(self, blob_tracker):
+        while not self.is_available and self.availability_attempts < self.MAX_ATTEMPTS:
+            self.availability_attempts += 1
+            log.info('Attempt %s to find %s', self.availability_attempts, self.name)
+            yield self._check_availability(blob_tracker)
+
+    def _setAvailable(self, peer_count):
+        self.is_available = peer_count > 0
+
+
+def makeEvent(stats):
+    return {
+        'userId': 'lbry',
+        'event': 'Content Availability',
+        'properties': {
+            'total_published': stats['sd_hash'],
+            'sd_blob_available_on_dht': stats['is_available'],
+            'sd_blob_available_for_download': stats['sd_blob'],
+        },
+        'context': {
+            'app': {
+                'name': 'Availability Tracker',
+                'version': 1,
+            },
+            'library': {
+                'name': 'lbrynet-analytics',
+                'version': '1.0.0'
+            },
+        },
+        'timestamp': utils.isonow()
+    }
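+
+# A produced event looks roughly like this (counts are illustrative):
+#   {'userId': 'lbry', 'event': 'Content Availability',
+#    'properties': {'total_published': 900,
+#                   'sd_blob_available_on_dht': 600,
+#                   'sd_blob_available_for_download': 450}, ...}
+# It is only submitted to analytics when --download ran over the full,
+# unlimited name set; otherwise it is just printed.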
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/send_sd_blobs_to_lighthouse.py b/scripts/send_sd_blobs_to_lighthouse.py
new file mode 100644
index 000000000..405348d85
--- /dev/null
+++ b/scripts/send_sd_blobs_to_lighthouse.py
@@ -0,0 +1,176 @@
+from __future__ import print_function
+from lbrynet.core import log_support
+
+import argparse
+import collections
+import itertools
+import logging
+import os
+import random
+import sys
+
+import appdirs
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import protocol
+from twisted.internet import endpoints
+
+from lbrynet import conf
+from lbrynet.core import Error
+from lbrynet.core import Wallet
+from lbrynet.core import BlobAvailability
+from lbrynet.core import BlobManager
+from lbrynet.core import HashAnnouncer
+from lbrynet.core import PeerManager
+from lbrynet.core import Session
+from lbrynet.core import utils
+from lbrynet.core.client import DHTPeerFinder
+from lbrynet.dht import node
+
+from lbrynet.metadata import Metadata
+from lbrynet.core import StreamDescriptor as sd
+from lbrynet import reflector
+
+import common
+import name
+import pool
+import track
+
+
+log = logging.getLogger('main')
+
+
+def main(args=None):
+    conf.initialize_settings()
+    parser = argparse.ArgumentParser()
+    parser.add_argument('destination', type=conf.server_port, nargs='+')
+    parser.add_argument('--names', nargs='*')
+    parser.add_argument('--limit', type=int)
+    args = parser.parse_args(args)
+
+    log_support.configure_console(level='INFO')
+
+    db_dir = appdirs.user_data_dir('lighthouse-uploader')
+    safe_makedirs(db_dir)
+    # no need to persist metadata info
+    storage = Wallet.InMemoryStorage()
+    wallet = Wallet.LBRYumWallet(storage)
+    blob_dir = os.path.join(db_dir, 'blobfiles')
+    safe_makedirs(blob_dir)
+    # don't set a hash_announcer; we have no need to tell anyone
+    # that we have these blobs
+    blob_manager = BlobManager.DiskBlobManager(None, blob_dir, db_dir)
+    # TODO: make it possible to disable the BlobAvailabilityTracker
+    # or, in general, make the session more reusable for users
+    # that only want part of its functionality
+    session = Session.Session(
+        blob_data_payment_rate=0,
+        db_dir=db_dir,
+        lbryid=utils.generate_id(),
+        blob_dir=blob_dir,
+        dht_node_port=4444,
+        known_dht_nodes=conf.settings.known_dht_nodes,
+        peer_port=3333,
+        use_upnp=False,
+        wallet=wallet,
+        blob_manager=blob_manager,
+    )
+    assert session.wallet
+    run(session, args.destination, args.names, args.limit)
+    reactor.run()
+
+
+def safe_makedirs(directory):
+    try:
+        os.makedirs(directory)
+    except OSError:
+        # the directory probably already exists
+        pass
+
+
+@defer.inlineCallbacks
+def run(session, destinations, names, limit):
+    try:
+        yield session.setup()
+        while not session.wallet.network.is_connected():
+            log.info('Wallet not connected yet; retrying startup')
+            try:
+                yield session.wallet.start()
+            except ValueError:
+                pass
+        names = yield common.getNames(session.wallet, names)
+        if limit and limit < len(names):
+            names = random.sample(names, limit)
+        log.info('Processing %s names', len(names))
+        names = [Name(n, session.blob_manager) for n in names]
+        t = Tracker(session, destinations, names)
+        yield t.processNameClaims()
+    except Exception:
+        log.exception('Something bad happened')
+    finally:
+        log.info('Stopping the reactor')
+        reactor.stop()
+
+
+def logAndStop(err):
+    log_support.failure(err, log, 'Stopping due to error: %s')
+    reactor.stop()
+
+
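+# Like logAndStop, but returns the failure so downstream errbacks still see it: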
+def logAndRaise(err):
+    log_support.failure(err, log, 'Encountered an error: %s')
+    return err
+
+
+class Tracker(track.Tracker):
+    def __init__(self, session, destinations, names):
+        self.destinations = destinations
+        track.Tracker.__init__(self, session, names)
+
+    @property
+    def blob_manager(self):
+        return self.session.blob_manager
+
+    @defer.inlineCallbacks
+    def processNameClaims(self):
+        yield super(Tracker, self).processNameClaims()
+        log.info('Sending the blobs')
+        yield self._sendSdBlobs()
+
+    @defer.inlineCallbacks
+    def _sendSdBlobs(self):
+        blobs = [n.sd_blob for n in self.names if n.sd_blob]
+        log.info('Sending %s blobs', len(blobs))
+        blob_hashes = [b.blob_hash for b in blobs]
+        for destination in self.destinations:
+            factory = reflector.BlobClientFactory(self.blob_manager, blob_hashes)
+            yield self._connect(destination, factory)
+
+    @defer.inlineCallbacks
+    def _connect(self, destination, factory):
+        url, port = destination
+        ip = yield reactor.resolve(url)
+        try:
+            print('Connecting to {}'.format(ip))
+            reactor.connectTCP(ip, port, factory)
+            #factory.finished_deferred.addTimeout(60, reactor)
+            value = yield factory.finished_deferred
+            if value:
+                print('Success!')
+            else:
+                print('Not success?', value)
+        except Exception:
+            log.exception('Failed to send blobs to %s', url)
+
+
+class Name(name.Name):
+    def __init__(self, my_name, blob_manager):
+        name.Name.__init__(self, my_name)
+        self.blob_manager = blob_manager
+
+    def _after_download(self, blob):
+        # keep the blob for future runs
+        self.blob_manager.blob_completed(blob)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/track.py b/scripts/track.py
new file mode 100644
index 000000000..2cf7e66f1
--- /dev/null
+++ b/scripts/track.py
@@ -0,0 +1,44 @@
+import logging
+import random
+
+from twisted.internet import defer
+
+from lbrynet.core import Error
+
+import pool
+
+
+log = logging.getLogger(__name__)
+
+
+class Tracker(object):
+    def __init__(self, session, names):
+        self.session = session
+        self.names = names
+        self.stats = {}
+
+    @property
+    def wallet(self):
+        return self.session.wallet
+
+    @defer.inlineCallbacks
+    def processNameClaims(self):
+        try:
+            log.info('Starting to get name claims')
+            yield self._getSdHashes()
+            self._filterNames('sd_hash')
+            log.info('Downloading all of the blobs')
+            yield self._downloadAllBlobs()
+        except Exception:
+            log.exception('Something bad happened')
+
+    def _getSdHashes(self):
+        return pool.DeferredPool((n.setSdHash(self.wallet) for n in self.names), 10)
+
+    def _filterNames(self, attr):
+        self.names = [n for n in self.names if getattr(n, attr)]
+        self.stats[attr] = len(self.names)
+        print("We have {} names with attribute {}".format(len(self.names), attr))
+
+    def _downloadAllBlobs(self):
+        return pool.DeferredPool((n.download_sd_blob(self.session) for n in self.names), 10)
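+
+
+# Hypothetical wiring, assuming a started Session and a list of name.Name
+# objects (see query_available_blobs.py for a complete driver):
+#
+#   tracker = Tracker(session, names)
+#   d = tracker.processNameClaims()
+#   d.addBoth(lambda _: reactor.stop())
+#   reactor.run()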