From 5dbf9034ec65cc324908d239776358011fa88e5a Mon Sep 17 00:00:00 2001
From: Job Evers-Meltzer <jobevers@users.noreply.github.com>
Date: Fri, 6 Jan 2017 08:27:18 -0600
Subject: [PATCH] Add scripts related to querying / downloading sd blobs

---
 lbrynet/core/HashBlob.py               |   2 +-
 lbrynet/metadata/Metadata.py           |   2 +-
 scripts/common.py                      |  28 +++
 scripts/name.py                        |  65 +++++++
 scripts/pool.py                        |  65 +++++++
 scripts/query_available_blobs.py       | 199 ++++++++++++++++++++++++
 scripts/send_sd_blobs_to_lighthouse.py | 188 ++++++++++++++++++++++
 scripts/track.py                       |  48 +++++
 8 files changed, 595 insertions(+), 2 deletions(-)
 create mode 100644 scripts/common.py
 create mode 100644 scripts/name.py
 create mode 100644 scripts/pool.py
 create mode 100644 scripts/query_available_blobs.py
 create mode 100644 scripts/send_sd_blobs_to_lighthouse.py
 create mode 100644 scripts/track.py

diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py
index cf4c085d0..28a3aa4e0 100644
--- a/lbrynet/core/HashBlob.py
+++ b/lbrynet/core/HashBlob.py
@@ -194,7 +194,7 @@ class HashBlob(object):
         return self.blob_hash[:16]
 
     def __repr__(self):
-        return str(self)
+        return '<{}({})>'.format(self.__class__.__name__, str(self))
 
 
 class BlobFile(HashBlob):
diff --git a/lbrynet/metadata/Metadata.py b/lbrynet/metadata/Metadata.py
index 56e94ec30..b1b7de8b0 100644
--- a/lbrynet/metadata/Metadata.py
+++ b/lbrynet/metadata/Metadata.py
@@ -38,7 +38,7 @@ class Metadata(StructuredDict):
 
     def __init__(self, metadata, migrate=True, target_version=None):
         if not isinstance(metadata, dict):
-            raise TypeError("metadata is not a dictionary")
+            raise TypeError("{} is not a dictionary".format(metadata))
         starting_version = metadata.get('ver', '0.0.1')
 
         StructuredDict.__init__(self, metadata, starting_version, migrate, target_version)
diff --git a/scripts/common.py b/scripts/common.py
new file mode 100644
index 000000000..9a8e6405f
--- /dev/null
+++ b/scripts/common.py
@@ -0,0 +1,28 @@
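+"""Helpers shared by the sd-blob scripts.
+
+getNames resolves which names to process: the explicit list when one is
+given, otherwise every claimed name in the wallet's claimtrie.
+"""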
+import logging
+
+from twisted.internet import defer
+
+
+log = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def getNames(wallet, names=None):
+    if names:
+        defer.returnValue(names)
+    nametrie = yield wallet.get_nametrie()
+    defer.returnValue(list(getNameClaims(nametrie)))
+
+
+def getNameClaims(trie):
+    for x in trie:
+        if 'txid' in x:
+            try:
+                yield str(x['name'])
+            except UnicodeError:
+                log.warning('Skipping name %s as it is not ASCII', x['name'])
diff --git a/scripts/name.py b/scripts/name.py
new file mode 100644
index 000000000..48888858e
--- /dev/null
+++ b/scripts/name.py
@@ -0,0 +1,65 @@
+import logging
+import random
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from lbrynet.core import Error
+from lbrynet.core import StreamDescriptor
+from lbrynet.metadata import Metadata
+
+
+log = logging.getLogger(__name__)
+
+
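+# Tracks a single claimed name: resolve its stream metadata to an
+# sd_hash, then try to download the sd blob itself.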
+class Name(object):
+    def __init__(self, name):
+        self.name = name
+        self.sd_hash = None
+        self.sd_blob = None
+
+    @defer.inlineCallbacks
+    def setSdHash(self, wallet):
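+        # names with no valid stream metadata are left with sd_hash=None
+        # and filtered out later by Tracker._filterNames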
+        try:
+            stream = yield wallet.get_stream_info_for_name(self.name)
+            metadata = Metadata.Metadata(stream)
+            self.sd_hash = _getSdHash(metadata)
+        except (Error.InvalidStreamInfoError, AssertionError):
+            pass
+        except Exception:
+            log.exception('Failed to get sd_hash for %s', self.name)
+
+    @defer.inlineCallbacks
+    def download_sd_blob(self, session):
+        print('Trying to get sd_blob for {} using {}'.format(self.name, self.sd_hash))
+        try:
+            blob = yield download_sd_blob_with_timeout(
+                session, self.sd_hash, session.payment_rate_manager)
+
+            self.sd_blob = blob
+            yield self._after_download(blob)
+            print('Downloaded sd_blob for {} using {}'.format(self.name, self.sd_hash))
+        except defer.TimeoutError:
+            # timeouts are expected; report and move on
+            print('Downloading sd_blob for {} timed-out'.format(self.name))
+        except Exception:
+            log.exception('Failed to download %s', self.name)
+
+    def _after_download(self, blob):
+        pass
+
+
+def _getSdHash(metadata):
+    return metadata['sources']['lbry_sd_hash']
+
+
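+# a randomized timeout, presumably so a big batch of stalled downloads
+# doesn't give up all at the same instant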
+def download_sd_blob_with_timeout(session, sd_hash, payment_rate_manager):
+    d = StreamDescriptor.download_sd_blob(session, sd_hash, payment_rate_manager)
+    d.addTimeout(random.randint(10, 30), reactor)
+    return d
diff --git a/scripts/pool.py b/scripts/pool.py
new file mode 100644
index 000000000..8ba4199d8
--- /dev/null
+++ b/scripts/pool.py
@@ -0,0 +1,65 @@
+import itertools
+import logging
+
+from twisted.internet import defer
+
+
+log = logging.getLogger(__name__)
+
+
+class DeferredPool(defer.Deferred):
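+    """A Deferred that fires once every deferred produced by deferred_iter
+    has fired, running at most pool_size of them concurrently.
+
+    The result is a list of (success, result) tuples in start order, much
+    like DeferredList. A minimal sketch of use from an inlineCallbacks
+    function (fetch and urls are hypothetical):
+
+        results = yield DeferredPool((fetch(u) for u in urls), 10)
+    """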
+    def __init__(self, deferred_iter, pool_size):
+        self.deferred_iter = deferred_iter
+        self.pool_size = pool_size
+        # results are stored unordered
+        self.result_list = []
+        self.started_count = 0
+        self.total_count = None
+        defer.Deferred.__init__(self)
+
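+        # prime the pool with the first pool_size deferreds; each result
+        # that comes in starts the next one (see _process_next)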
+        for deferred in itertools.islice(deferred_iter, pool_size):
+            self._start_one(deferred)
+
+    def _start_one(self, deferred):
+        deferred.addCallbacks(self._callback, self._callback,
+                              callbackArgs=(self.started_count, defer.SUCCESS),
+                              errbackArgs=(self.started_count, defer.FAILURE))
+        self.started_count += 1
+
+    def _callback(self, result, index, success):
+        self.result_list.append((index, success, result))
+        if self._done():
+            self._finish()
+        else:
+            self._process_next()
+        return result
+
+    def _done(self):
+        return self.total_count == len(self.result_list)
+
+    def _finish(self):
+        result_list = [(s, r) for i, s, r in sorted(self.result_list)]
+        self.callback(result_list)
+
+    def _process_next(self):
+        try:
+            deferred = next(self.deferred_iter)
+        except StopIteration:
+            self.total_count = self.started_count
+            # the iterator can run dry just as the last result arrives;
+            # re-check completion here or the pool would never fire
+            if self._done():
+                self._finish()
+        else:
+            self._start_one(deferred)
diff --git a/scripts/query_available_blobs.py b/scripts/query_available_blobs.py
new file mode 100644
index 000000000..603b687cc
--- /dev/null
+++ b/scripts/query_available_blobs.py
@@ -0,0 +1,199 @@
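+"""Check how available published content is on the DHT.
+
+For every claimed name, resolve its sd_hash, ask the DHT how many peers
+announce that sd blob and, with --download, try to fetch it. Aggregate
+counts are printed, or sent as an analytics event on a full run.
+
+Usage sketch: python query_available_blobs.py --limit 100 --download
+"""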
+from __future__ import print_function
+from lbrynet.core import log_support
+
+import argparse
+import collections
+import itertools
+import logging
+import os
+import random
+import shutil
+import sys
+import tempfile
+
+import appdirs
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import protocol
+from twisted.internet import endpoints
+
+from lbrynet import analytics
+from lbrynet import conf
+from lbrynet.core import Error
+from lbrynet.core import Wallet
+from lbrynet.core import BlobAvailability
+from lbrynet.core import BlobManager
+from lbrynet.core import HashAnnouncer
+from lbrynet.core import PeerManager
+from lbrynet.core import Session
+from lbrynet.core import utils
+from lbrynet.core.client import DHTPeerFinder
+from lbrynet.dht import node
+from lbrynet.metadata import Metadata
+from lbrynet.core import StreamDescriptor as sd
+
+import common
+import name
+import pool
+import track
+
+
+log = logging.getLogger()
+
+
+def main(args=None):
+    conf.initialize_settings()
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--limit', type=int)
+    parser.add_argument('--download', action='store_true',
+                        help='Set flag to also download each sd_blob and report on success')
+    args = parser.parse_args(args)
+
+    log_support.configure_console()
+    log_support.configure_twisted()
+
+    # make a fresh dir or else we will include blobs that we've
+    # already downloaded but might not otherwise be available.
+    db_dir = tempfile.mkdtemp()
+    try:
+        blob_dir = os.path.join(db_dir, 'blobfiles')
+        os.makedirs(blob_dir)
+        storage = Wallet.InMemoryStorage()
+        wallet = Wallet.LBRYumWallet(storage)
+        session = Session.Session(
+            0,
+            db_dir=db_dir,
+            lbryid=utils.generate_id(),
+            blob_dir=blob_dir,
+            dht_node_port=4444,
+            known_dht_nodes=conf.settings.known_dht_nodes,
+            peer_port=3333,
+            use_upnp=False,
+            wallet=wallet
+        )
+        api = analytics.Api.new_instance()
+        run(args, session, api)
+        reactor.run()
+    finally:
+        shutil.rmtree(db_dir)
+
+
+@defer.inlineCallbacks
+def run(args, session, api):
+    try:
+        yield session.setup()
+        names = yield common.getNames(session.wallet)
+        if args.limit and len(names) > args.limit:
+            names = random.sample(names, args.limit)
+        names = [Name(n) for n in names]
+        blob_tracker = BlobAvailability.BlobAvailabilityTracker(
+            session.blob_manager, session.peer_finder, session.dht_node)
+
+        tracker = Tracker(session, names, blob_tracker)
+        yield tracker.processNameClaims(args.download)
+        event = makeEvent(tracker.stats)
+        if args.download and not args.limit:
+            api.track(event)
+        else:
+            # don't send event to analytics if it doesn't contain the full info
+            print(event)
+    except Exception:
+        log.exception('Failed to check blob availability')
+    finally:
+        reactor.stop()
+
+
+class Tracker(track.Tracker):
+    def __init__(self, session, names, blob_tracker):
+        track.Tracker.__init__(self, session, names)
+        self.blob_tracker = blob_tracker
+
+    @defer.inlineCallbacks
+    def processNameClaims(self, download=False):
+        try:
+            yield self._getSdHashes()
+            yield self._filterNames('sd_hash')
+            yield self._checkAvailability()
+            yield self._filterNames('is_available')
+            self.print_attempts_counter()
+            if download:
+                yield self._downloadAllBlobs()
+                yield self._filterNames('sd_blob')
+        except Exception:
+            log.exception('Failed to process name claims')
+
+    def print_attempts_counter(self):
+        print(self.attempts_counter())
+
+    def attempts_counter(self):
+        return collections.Counter(n.availability_attempts for n in self.names)
+
+    def _checkAvailability(self):
+        return pool.DeferredPool(
+            (n.check_availability(self.blob_tracker) for n in self.names),
+            10
+        )
+
+
+class Name(name.Name):
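+    """A Name that also tracks whether its sd blob is findable on the DHT."""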
+    # From experience, very few sd_blobs get found after the third attempt
+    MAX_ATTEMPTS = 6
+
+    def __init__(self, my_name):
+        name.Name.__init__(self, my_name)
+        self.is_available = None
+        self.availability_attempts = 0
+
+    @defer.inlineCallbacks
+    def _check_availability(self, blob_tracker):
+        b = yield blob_tracker.get_blob_availability(self.sd_hash)
+        peer_count = b[self.sd_hash]
+        self._setAvailable(peer_count)
+
+    @defer.inlineCallbacks
+    def check_availability(self, blob_tracker):
+        while not self.is_available and self.availability_attempts < self.MAX_ATTEMPTS:
+            self.availability_attempts += 1
+            log.info('Attempt %s to find %s', self.availability_attempts, self.name)
+            yield self._check_availability(blob_tracker)
+
+    def _setAvailable(self, peer_count):
+        self.is_available = peer_count > 0
+
+
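+# Aggregate counts from Tracker.stats, formatted as a track-style
+# analytics event.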
+def makeEvent(stats):
+    return {
+        'userId': 'lbry',
+        'event': 'Content Availability',
+        'properties': {
+            'total_published': stats['sd_hash'],
+            'sd_blob_available_on_dht': stats['is_available'],
+            'sd_blob_available_for_download': stats['sd_blob'],
+        },
+        'context': {
+            'app': {
+                'name': 'Availability Tracker',
+                'version': 1,
+            },
+            'library': {
+                'name': 'lbrynet-analytics',
+                'version': '1.0.0'
+            },
+        },
+        'timestamp': utils.isonow()
+    }
+
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/send_sd_blobs_to_lighthouse.py b/scripts/send_sd_blobs_to_lighthouse.py
new file mode 100644
index 000000000..405348d85
--- /dev/null
+++ b/scripts/send_sd_blobs_to_lighthouse.py
@@ -0,0 +1,188 @@
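+"""Download sd blobs and reflect them to one or more servers.
+
+For each claimed name, resolve and download the sd blob, keep it in the
+local blob manager, and then send the collected blobs to every
+destination given on the command line.
+
+Usage sketch (hypothetical host): send_sd_blobs_to_lighthouse.py host:5566
+"""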
+from __future__ import print_function
+from lbrynet.core import log_support
+
+import argparse
+import collections
+import itertools
+import logging
+import os
+import random
+import sys
+
+import appdirs
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import protocol
+from twisted.internet import endpoints
+
+from lbrynet import conf
+from lbrynet.core import Error
+from lbrynet.core import Wallet
+from lbrynet.core import BlobAvailability
+from lbrynet.core import BlobManager
+from lbrynet.core import HashAnnouncer
+from lbrynet.core import PeerManager
+from lbrynet.core import Session
+from lbrynet.core import utils
+from lbrynet.core.client import DHTPeerFinder
+from lbrynet.dht import node
+
+from lbrynet.metadata import Metadata
+from lbrynet.core import StreamDescriptor as sd
+from lbrynet import reflector
+
+import common
+import name
+import pool
+import track
+
+
+log = logging.getLogger('main')
+
+
+def main(args=None):
+    conf.initialize_settings()
+    parser = argparse.ArgumentParser()
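+    # destinations are host:port pairs parsed by conf.server_port,
+    # e.g. reflector.example.com:5566 (hypothetical host)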
+    parser.add_argument('destination', type=conf.server_port, nargs='+')
+    parser.add_argument('--names', nargs='*')
+    parser.add_argument('--limit', type=int)
+    args = parser.parse_args(args)
+
+    log_support.configure_console(level='INFO')
+
+    db_dir = appdirs.user_data_dir('lighthouse-uploader')
+    safe_makedirs(db_dir)
+    # no need to persist metadata info
+    storage = Wallet.InMemoryStorage()
+    wallet = Wallet.LBRYumWallet(storage)
+    blob_dir = os.path.join(db_dir, 'blobfiles')
+    safe_makedirs(blob_dir)
+    # Don't set a hash_announcer; we have no need to tell anyone we
+    # have these blobs
+    blob_manager = BlobManager.DiskBlobManager(None, blob_dir, db_dir)
+    # TODO: make it so that I can disable the BlobAvailabilityTracker
+    #       or, in general, make the session more reusable for users
+    #       that only want part of the functionality
+    session = Session.Session(
+        blob_data_payment_rate=0,
+        db_dir=db_dir,
+        lbryid=utils.generate_id(),
+        blob_dir=blob_dir,
+        dht_node_port=4444,
+        known_dht_nodes=conf.settings.known_dht_nodes,
+        peer_port=3333,
+        use_upnp=False,
+        wallet=wallet,
+        blob_manager=blob_manager,
+    )
+    assert session.wallet
+    run(session, args.destination, args.names, args.limit)
+    reactor.run()
+
+
+def safe_makedirs(directory):
+    try:
+        os.makedirs(directory)
+    except OSError:
+        pass
+
+
+@defer.inlineCallbacks
+def run(session, destinations, names, limit):
+    try:
+        yield session.setup()
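+        # the wallet can come up without a network connection;
+        # keep retrying startup until it is actually connected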
+        while not session.wallet.network.is_connected():
+            log.info('Retrying wallet startup')
+            try:
+                yield session.wallet.start()
+            except ValueError:
+                pass
+        names = yield common.getNames(session.wallet, names)
+        if limit and limit < len(names):
+            names = random.sample(names, limit)
+        log.info('Processing %s names', len(names))
+        names = [Name(n, session.blob_manager) for n in names]
+        t = Tracker(session, destinations, names)
+        yield t.processNameClaims()
+    except Exception:
+        log.exception('Failed to process or send name claims')
+    finally:
+        log.info('Stopping the reactor')
+        reactor.stop()
+
+
+def logAndStop(err):
+    log_support.failure(err, log, 'Stopping on failure: %s')
+    reactor.stop()
+
+
+def logAndRaise(err):
+    log_support.failure(err, log, 'Re-raising failure: %s')
+    return err
+
+
+class Tracker(track.Tracker):
+    def __init__(self, session, destinations, names):
+        self.destinations = destinations
+        track.Tracker.__init__(self, session, names)
+
+    @property
+    def blob_manager(self):
+        return self.session.blob_manager
+
+    @defer.inlineCallbacks
+    def processNameClaims(self):
+        yield super(Tracker, self).processNameClaims()
+        log.info('Sending the blobs')
+        yield self._sendSdBlobs()
+
+    @defer.inlineCallbacks
+    def _sendSdBlobs(self):
+        blobs = [n.sd_blob for n in self.names if n.sd_blob]
+        log.info('Sending %s blobs', len(blobs))
+        blob_hashes = [b.blob_hash for b in blobs]
+        for destination in self.destinations:
+            factory = reflector.BlobClientFactory(self.blob_manager, blob_hashes)
+            yield self._connect(destination, factory)
+
+    @defer.inlineCallbacks
+    def _connect(self, destination, factory):
+        url, port = destination
+        ip = yield reactor.resolve(url)
+        try:
+            print('Connecting to {}'.format(ip))
+            reactor.connectTCP(ip, port, factory)
+            #factory.finished_deferred.addTimeout(60, reactor)
+            value = yield factory.finished_deferred
+            if value:
+                print('Success!')
+            else:
+                print('Unexpected response: {}'.format(value))
+        except Exception:
+            log.exception('Failed to send blobs to %s:%s', url, port)
+
+
+class Name(name.Name):
+    def __init__(self, my_name, blob_manager):
+        name.Name.__init__(self, my_name)
+        self.blob_manager = blob_manager
+
+    def _after_download(self, blob):
+        # keep the blob for future runs
+        return self.blob_manager.blob_completed(blob)
+
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/scripts/track.py b/scripts/track.py
new file mode 100644
index 000000000..2cf7e66f1
--- /dev/null
+++ b/scripts/track.py
@@ -0,0 +1,48 @@
+import logging
+import random
+
+from twisted.internet import defer
+
+from lbrynet.core import Error
+
+import pool
+
+
+log = logging.getLogger(__name__)
+
+
+class Tracker(object):
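+    """Base pipeline shared by the scripts: resolve an sd_hash for every
+    name, drop names that have none, then download each sd blob, at most
+    ten at a time via pool.DeferredPool.
+    """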
+    def __init__(self, session, names):
+        self.session = session
+        self.names = names
+        self.stats = {}
+
+    @property
+    def wallet(self):
+        return self.session.wallet
+
+    @defer.inlineCallbacks
+    def processNameClaims(self):
+        try:
+            log.info('Starting to get name claims')
+            yield self._getSdHashes()
+            self._filterNames('sd_hash')
+            log.info('Downloading all of the blobs')
+            yield self._downloadAllBlobs()
+        except Exception:
+            log.exception('Failed to process name claims')
+
+    def _getSdHashes(self):
+        return pool.DeferredPool((n.setSdHash(self.wallet) for n in self.names), 10)
+
+    def _filterNames(self, attr):
+        self.names = [n for n in self.names if getattr(n, attr)]
+        self.stats[attr] = len(self.names)
+        print("We have {} names with attribute {}".format(len(self.names), attr))
+
+    def _downloadAllBlobs(self):
+        return pool.DeferredPool((n.download_sd_blob(self.session) for n in self.names), 10)