From 29e59e783a4b38d359eaf23ad32b5b25ca193023 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 16 Dec 2016 13:35:33 -0600 Subject: [PATCH] scripts to test reflector --- scripts/download_blob_from_peer.py | 147 +++++++++++++++++++++++ scripts/download_blobs_from_reflector.py | 80 ++++++++++++ 2 files changed, 227 insertions(+) create mode 100644 scripts/download_blob_from_peer.py create mode 100644 scripts/download_blobs_from_reflector.py diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py new file mode 100644 index 000000000..1f8467c9d --- /dev/null +++ b/scripts/download_blob_from_peer.py @@ -0,0 +1,147 @@ +"""A simple script that attempts to directly download a single blob from a given peer""" +import argparse +import logging +import sys +import tempfile + +from twisted.internet import defer +from twisted.internet import reactor +from zope.interface import implements + +from lbrynet import interfaces +from lbrynet import conf +from lbrynet.core import log_support +from lbrynet.core import BlobManager +from lbrynet.core import HashAnnouncer +from lbrynet.core import HashBlob +from lbrynet.core import RateLimiter +from lbrynet.core import Peer +from lbrynet.core import Wallet +from lbrynet.core.client import BlobRequester +from lbrynet.core.client import ConnectionManager + + +log = logging.getLogger() +SUCCESS = False + + +def main(args=None): + parser = argparse.ArgumentParser() + parser.add_argument('--timeout', type=int, default=30) + parser.add_argument('peer') + parser.add_argument('blob_hash') + args = parser.parse_args(args) + + log_support.configure_console(level='DEBUG') + + announcer = HashAnnouncer.DummyHashAnnouncer() + blob_manager = MyBlobManager(announcer) + blob = HashBlob.TempBlob(args.blob_hash, False) + download_manager = SingleBlobDownloadManager(blob) + peer = Peer.Peer(*conf.server_port(args.peer)) + payment_rate_manager = DumbPaymentRateManager() + wallet = getWallet() + requester = SingleBlobRequester( + peer, blob_manager, payment_rate_manager, wallet, download_manager) + rate_limiter = RateLimiter.DummyRateLimiter() + downloader = SingleBlobDownloader() + connection_manager = ConnectionManager.ConnectionManager( + downloader, rate_limiter, [requester], [wallet.get_info_exchanger()]) + reactor.callLater(args.timeout, reactor.stop) + d = connection_manager.start() + d.addErrback(log_support.failure, 'Something bad happened: %s') + reactor.run() + + if SUCCESS: + sys.exit(0) + else: + sys.exit(1) + + +class MyBlobManager(BlobManager.BlobManager): + def blob_completed(self, blob): + global SUCCESS + log.info('Blob has been downloaded, we can stop') + # this feels pretty hacky, but its as good of a stopping point as any + SUCCESS = True + reactor.stop() + + +def getWallet(): + config = {'auto_connect': True} + if conf.settings.lbryum_wallet_dir: + config['lbryum_path'] = conf.settings.lbryum_wallet_dir + db_dir = tempfile.mkdtemp() + return Wallet.LBRYumWallet(db_dir, config) + + +class SingleBlobDownloader(object): + def insufficientfunds(self, err): + pass + + +class SingleBlobDownloadManager(object): + def __init__(self, blob): + self.blob = blob + + def needed_blobs(self): + if self.blob.verified: + return [] + else: + return [self.blob] + + +class NullStrategy(object): + def __init__(self): + self.pending_sent_offers = {} + + +class DumbPaymentRateManager(object): + def __init__(self): + self.strategy = NullStrategy() + + def price_limit_reached(self, peer): + return False + + def get_rate_blob_data(self, *args): + return 0.0 + + def record_offer_reply(self, peer, offer): + pass + + +class FreeDownload(BlobRequester.DownloadRequest): + def _pay_peer(self, *args): + # TODO: somewhere I missed the part that is supposed to get + # and address from the remote server for where to send + # data fees to so we can't make payments. Probably has + # to do with the wallet_info_exchanger + pass + + +class SingleBlobRequester(BlobRequester.BlobRequester): + implements(interfaces.IRequestCreator) + DownloadRequest = FreeDownload + + def __init__(self, peer, blob_manager, payment_rate_manager, wallet, download_manager): + self.peer = peer + self.sent = False + BlobRequester.BlobRequester.__init__( + self, blob_manager, None, payment_rate_manager, wallet, download_manager) + + def __repr__(self): + return 'SingleBlobRequestor({!r})'.format(self.peer) + + def get_new_peers(self): + if self.sent: + return defer.succeed([]) + else: + self.sent = True + return defer.succeed([self.peer]) + + def send_next_request(self, peer, protocol): + return self._send_next_request(peer, protocol) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/scripts/download_blobs_from_reflector.py b/scripts/download_blobs_from_reflector.py new file mode 100644 index 000000000..19a0ac219 --- /dev/null +++ b/scripts/download_blobs_from_reflector.py @@ -0,0 +1,80 @@ +"""A test script that downloads blobs from a reflector server""" +import argparse +import itertools +import json +import random +import subprocess +import sys + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('reflector_ip') + parser.add_argument('--ssh-key') + parser.add_argument('--size', type=int, default=100) + parser.add_argument('--batch', type=int, default=10) + parser.add_argument('--timeout', type=int, default=30) + parser.add_argument('--hashes', help='file listing hashes in json') + args = parser.parse_args() + + if args.hashes: + hashes = readHashes(args.hashes) + else: + hashes = getHashes(args.reflector_ip, args.ssh_key) + if len(hashes) > args.size: + selected_hashes = random.sample(hashes, args.size) + else: + print 'Only {} hashes are available'.format(hashes) + selected_hashes = hashes + + successes = 0 + for hashes in grouper(selected_hashes, args.batch): + hashes = filter(None, hashes) + successes += downloadHashes(args.reflector_ip, hashes, args.timeout) + print 'Downloaded {} / {}'.format(successes, len(selected_hashes)) + + +def grouper(iterable, n, fillvalue=None): + "Collect data into fixed-length chunks or blocks" + # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx + args = [iter(iterable)] * n + return itertools.izip_longest(fillvalue=fillvalue, *args) + + +def readHashes(hash_file): + with open(hash_file) as f: + return json.load(f) + + +def getHashes(ip, key=None): + key = ['-i', key] if key else [] + hashes = subprocess.check_output(['ssh'] + key + + ['lbry@{}'.format(ip), '/opt/venvs/lbrynet/bin/lbrynet-cli', 'get_blob_hashes']) + return json.loads(hashes) + + +def downloadHashes(ip, blob_hashes, timeout=30): + processes = [ + subprocess.Popen( + [ + 'python', + 'download_blob_from_peer.py', + '--timeout', str(timeout), '{}:3333'.format(ip), blob_hash, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + for blob_hash in blob_hashes + ] + for p, h in zip(processes, blob_hashes): + stdout, stderr = p.communicate() + print p.returncode, h + if p.returncode != 0: + print 'Failed to download', h + print stdout + print stderr + return sum(1 for p in processes if p.returncode == 0) + + +if __name__ == '__main__': + sys.exit(main())