Merge pull request #350 from lbryio/reflector-test
scripts to test reflector
This commit is contained in:
commit
a289886365
2 changed files with 227 additions and 0 deletions
147
scripts/download_blob_from_peer.py
Normal file
147
scripts/download_blob_from_peer.py
Normal file
|
@ -0,0 +1,147 @@
|
||||||
|
"""A simple script that attempts to directly download a single blob from a given peer"""
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
from twisted.internet import reactor
|
||||||
|
from zope.interface import implements
|
||||||
|
|
||||||
|
from lbrynet import interfaces
|
||||||
|
from lbrynet import conf
|
||||||
|
from lbrynet.core import log_support
|
||||||
|
from lbrynet.core import BlobManager
|
||||||
|
from lbrynet.core import HashAnnouncer
|
||||||
|
from lbrynet.core import HashBlob
|
||||||
|
from lbrynet.core import RateLimiter
|
||||||
|
from lbrynet.core import Peer
|
||||||
|
from lbrynet.core import Wallet
|
||||||
|
from lbrynet.core.client import BlobRequester
|
||||||
|
from lbrynet.core.client import ConnectionManager
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger()
|
||||||
|
SUCCESS = False
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('--timeout', type=int, default=30)
|
||||||
|
parser.add_argument('peer')
|
||||||
|
parser.add_argument('blob_hash')
|
||||||
|
args = parser.parse_args(args)
|
||||||
|
|
||||||
|
log_support.configure_console(level='DEBUG')
|
||||||
|
|
||||||
|
announcer = HashAnnouncer.DummyHashAnnouncer()
|
||||||
|
blob_manager = MyBlobManager(announcer)
|
||||||
|
blob = HashBlob.TempBlob(args.blob_hash, False)
|
||||||
|
download_manager = SingleBlobDownloadManager(blob)
|
||||||
|
peer = Peer.Peer(*conf.server_port(args.peer))
|
||||||
|
payment_rate_manager = DumbPaymentRateManager()
|
||||||
|
wallet = getWallet()
|
||||||
|
requester = SingleBlobRequester(
|
||||||
|
peer, blob_manager, payment_rate_manager, wallet, download_manager)
|
||||||
|
rate_limiter = RateLimiter.DummyRateLimiter()
|
||||||
|
downloader = SingleBlobDownloader()
|
||||||
|
connection_manager = ConnectionManager.ConnectionManager(
|
||||||
|
downloader, rate_limiter, [requester], [wallet.get_info_exchanger()])
|
||||||
|
reactor.callLater(args.timeout, reactor.stop)
|
||||||
|
d = connection_manager.start()
|
||||||
|
d.addErrback(log_support.failure, 'Something bad happened: %s')
|
||||||
|
reactor.run()
|
||||||
|
|
||||||
|
if SUCCESS:
|
||||||
|
sys.exit(0)
|
||||||
|
else:
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
class MyBlobManager(BlobManager.BlobManager):
|
||||||
|
def blob_completed(self, blob):
|
||||||
|
global SUCCESS
|
||||||
|
log.info('Blob has been downloaded, we can stop')
|
||||||
|
# this feels pretty hacky, but its as good of a stopping point as any
|
||||||
|
SUCCESS = True
|
||||||
|
reactor.stop()
|
||||||
|
|
||||||
|
|
||||||
|
def getWallet():
|
||||||
|
config = {'auto_connect': True}
|
||||||
|
if conf.settings.lbryum_wallet_dir:
|
||||||
|
config['lbryum_path'] = conf.settings.lbryum_wallet_dir
|
||||||
|
db_dir = tempfile.mkdtemp()
|
||||||
|
return Wallet.LBRYumWallet(db_dir, config)
|
||||||
|
|
||||||
|
|
||||||
|
class SingleBlobDownloader(object):
|
||||||
|
def insufficientfunds(self, err):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SingleBlobDownloadManager(object):
|
||||||
|
def __init__(self, blob):
|
||||||
|
self.blob = blob
|
||||||
|
|
||||||
|
def needed_blobs(self):
|
||||||
|
if self.blob.verified:
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
return [self.blob]
|
||||||
|
|
||||||
|
|
||||||
|
class NullStrategy(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.pending_sent_offers = {}
|
||||||
|
|
||||||
|
|
||||||
|
class DumbPaymentRateManager(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.strategy = NullStrategy()
|
||||||
|
|
||||||
|
def price_limit_reached(self, peer):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_rate_blob_data(self, *args):
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
def record_offer_reply(self, peer, offer):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FreeDownload(BlobRequester.DownloadRequest):
|
||||||
|
def _pay_peer(self, *args):
|
||||||
|
# TODO: somewhere I missed the part that is supposed to get
|
||||||
|
# and address from the remote server for where to send
|
||||||
|
# data fees to so we can't make payments. Probably has
|
||||||
|
# to do with the wallet_info_exchanger
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SingleBlobRequester(BlobRequester.BlobRequester):
|
||||||
|
implements(interfaces.IRequestCreator)
|
||||||
|
DownloadRequest = FreeDownload
|
||||||
|
|
||||||
|
def __init__(self, peer, blob_manager, payment_rate_manager, wallet, download_manager):
|
||||||
|
self.peer = peer
|
||||||
|
self.sent = False
|
||||||
|
BlobRequester.BlobRequester.__init__(
|
||||||
|
self, blob_manager, None, payment_rate_manager, wallet, download_manager)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'SingleBlobRequestor({!r})'.format(self.peer)
|
||||||
|
|
||||||
|
def get_new_peers(self):
|
||||||
|
if self.sent:
|
||||||
|
return defer.succeed([])
|
||||||
|
else:
|
||||||
|
self.sent = True
|
||||||
|
return defer.succeed([self.peer])
|
||||||
|
|
||||||
|
def send_next_request(self, peer, protocol):
|
||||||
|
return self._send_next_request(peer, protocol)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
sys.exit(main())
|
80
scripts/download_blobs_from_reflector.py
Normal file
80
scripts/download_blobs_from_reflector.py
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
"""A test script that downloads blobs from a reflector server"""
|
||||||
|
import argparse
|
||||||
|
import itertools
|
||||||
|
import json
|
||||||
|
import random
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('reflector_ip')
|
||||||
|
parser.add_argument('--ssh-key')
|
||||||
|
parser.add_argument('--size', type=int, default=100)
|
||||||
|
parser.add_argument('--batch', type=int, default=10)
|
||||||
|
parser.add_argument('--timeout', type=int, default=30)
|
||||||
|
parser.add_argument('--hashes', help='file listing hashes in json')
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.hashes:
|
||||||
|
hashes = readHashes(args.hashes)
|
||||||
|
else:
|
||||||
|
hashes = getHashes(args.reflector_ip, args.ssh_key)
|
||||||
|
if len(hashes) > args.size:
|
||||||
|
selected_hashes = random.sample(hashes, args.size)
|
||||||
|
else:
|
||||||
|
print 'Only {} hashes are available'.format(hashes)
|
||||||
|
selected_hashes = hashes
|
||||||
|
|
||||||
|
successes = 0
|
||||||
|
for hashes in grouper(selected_hashes, args.batch):
|
||||||
|
hashes = filter(None, hashes)
|
||||||
|
successes += downloadHashes(args.reflector_ip, hashes, args.timeout)
|
||||||
|
print 'Downloaded {} / {}'.format(successes, len(selected_hashes))
|
||||||
|
|
||||||
|
|
||||||
|
def grouper(iterable, n, fillvalue=None):
|
||||||
|
"Collect data into fixed-length chunks or blocks"
|
||||||
|
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
|
||||||
|
args = [iter(iterable)] * n
|
||||||
|
return itertools.izip_longest(fillvalue=fillvalue, *args)
|
||||||
|
|
||||||
|
|
||||||
|
def readHashes(hash_file):
|
||||||
|
with open(hash_file) as f:
|
||||||
|
return json.load(f)
|
||||||
|
|
||||||
|
|
||||||
|
def getHashes(ip, key=None):
|
||||||
|
key = ['-i', key] if key else []
|
||||||
|
hashes = subprocess.check_output(['ssh'] + key +
|
||||||
|
['lbry@{}'.format(ip), '/opt/venvs/lbrynet/bin/lbrynet-cli', 'get_blob_hashes'])
|
||||||
|
return json.loads(hashes)
|
||||||
|
|
||||||
|
|
||||||
|
def downloadHashes(ip, blob_hashes, timeout=30):
|
||||||
|
processes = [
|
||||||
|
subprocess.Popen(
|
||||||
|
[
|
||||||
|
'python',
|
||||||
|
'download_blob_from_peer.py',
|
||||||
|
'--timeout', str(timeout), '{}:3333'.format(ip), blob_hash,
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
)
|
||||||
|
for blob_hash in blob_hashes
|
||||||
|
]
|
||||||
|
for p, h in zip(processes, blob_hashes):
|
||||||
|
stdout, stderr = p.communicate()
|
||||||
|
print p.returncode, h
|
||||||
|
if p.returncode != 0:
|
||||||
|
print 'Failed to download', h
|
||||||
|
print stdout
|
||||||
|
print stderr
|
||||||
|
return sum(1 for p in processes if p.returncode == 0)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
sys.exit(main())
|
Loading…
Reference in a new issue