scripts to test reflector
This commit is contained in:
parent
9ee95d57fc
commit
29e59e783a
2 changed files with 227 additions and 0 deletions
147
scripts/download_blob_from_peer.py
Normal file
147
scripts/download_blob_from_peer.py
Normal file
|
@ -0,0 +1,147 @@
|
|||
"""A simple script that attempts to directly download a single blob from a given peer"""
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
from zope.interface import implements
|
||||
|
||||
from lbrynet import interfaces
|
||||
from lbrynet import conf
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.core import BlobManager
|
||||
from lbrynet.core import HashAnnouncer
|
||||
from lbrynet.core import HashBlob
|
||||
from lbrynet.core import RateLimiter
|
||||
from lbrynet.core import Peer
|
||||
from lbrynet.core import Wallet
|
||||
from lbrynet.core.client import BlobRequester
|
||||
from lbrynet.core.client import ConnectionManager
|
||||
|
||||
|
||||
log = logging.getLogger()
|
||||
SUCCESS = False
|
||||
|
||||
|
||||
def main(args=None):
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--timeout', type=int, default=30)
|
||||
parser.add_argument('peer')
|
||||
parser.add_argument('blob_hash')
|
||||
args = parser.parse_args(args)
|
||||
|
||||
log_support.configure_console(level='DEBUG')
|
||||
|
||||
announcer = HashAnnouncer.DummyHashAnnouncer()
|
||||
blob_manager = MyBlobManager(announcer)
|
||||
blob = HashBlob.TempBlob(args.blob_hash, False)
|
||||
download_manager = SingleBlobDownloadManager(blob)
|
||||
peer = Peer.Peer(*conf.server_port(args.peer))
|
||||
payment_rate_manager = DumbPaymentRateManager()
|
||||
wallet = getWallet()
|
||||
requester = SingleBlobRequester(
|
||||
peer, blob_manager, payment_rate_manager, wallet, download_manager)
|
||||
rate_limiter = RateLimiter.DummyRateLimiter()
|
||||
downloader = SingleBlobDownloader()
|
||||
connection_manager = ConnectionManager.ConnectionManager(
|
||||
downloader, rate_limiter, [requester], [wallet.get_info_exchanger()])
|
||||
reactor.callLater(args.timeout, reactor.stop)
|
||||
d = connection_manager.start()
|
||||
d.addErrback(log_support.failure, 'Something bad happened: %s')
|
||||
reactor.run()
|
||||
|
||||
if SUCCESS:
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
class MyBlobManager(BlobManager.BlobManager):
|
||||
def blob_completed(self, blob):
|
||||
global SUCCESS
|
||||
log.info('Blob has been downloaded, we can stop')
|
||||
# this feels pretty hacky, but its as good of a stopping point as any
|
||||
SUCCESS = True
|
||||
reactor.stop()
|
||||
|
||||
|
||||
def getWallet():
|
||||
config = {'auto_connect': True}
|
||||
if conf.settings.lbryum_wallet_dir:
|
||||
config['lbryum_path'] = conf.settings.lbryum_wallet_dir
|
||||
db_dir = tempfile.mkdtemp()
|
||||
return Wallet.LBRYumWallet(db_dir, config)
|
||||
|
||||
|
||||
class SingleBlobDownloader(object):
|
||||
def insufficientfunds(self, err):
|
||||
pass
|
||||
|
||||
|
||||
class SingleBlobDownloadManager(object):
|
||||
def __init__(self, blob):
|
||||
self.blob = blob
|
||||
|
||||
def needed_blobs(self):
|
||||
if self.blob.verified:
|
||||
return []
|
||||
else:
|
||||
return [self.blob]
|
||||
|
||||
|
||||
class NullStrategy(object):
|
||||
def __init__(self):
|
||||
self.pending_sent_offers = {}
|
||||
|
||||
|
||||
class DumbPaymentRateManager(object):
|
||||
def __init__(self):
|
||||
self.strategy = NullStrategy()
|
||||
|
||||
def price_limit_reached(self, peer):
|
||||
return False
|
||||
|
||||
def get_rate_blob_data(self, *args):
|
||||
return 0.0
|
||||
|
||||
def record_offer_reply(self, peer, offer):
|
||||
pass
|
||||
|
||||
|
||||
class FreeDownload(BlobRequester.DownloadRequest):
|
||||
def _pay_peer(self, *args):
|
||||
# TODO: somewhere I missed the part that is supposed to get
|
||||
# and address from the remote server for where to send
|
||||
# data fees to so we can't make payments. Probably has
|
||||
# to do with the wallet_info_exchanger
|
||||
pass
|
||||
|
||||
|
||||
class SingleBlobRequester(BlobRequester.BlobRequester):
|
||||
implements(interfaces.IRequestCreator)
|
||||
DownloadRequest = FreeDownload
|
||||
|
||||
def __init__(self, peer, blob_manager, payment_rate_manager, wallet, download_manager):
|
||||
self.peer = peer
|
||||
self.sent = False
|
||||
BlobRequester.BlobRequester.__init__(
|
||||
self, blob_manager, None, payment_rate_manager, wallet, download_manager)
|
||||
|
||||
def __repr__(self):
|
||||
return 'SingleBlobRequestor({!r})'.format(self.peer)
|
||||
|
||||
def get_new_peers(self):
|
||||
if self.sent:
|
||||
return defer.succeed([])
|
||||
else:
|
||||
self.sent = True
|
||||
return defer.succeed([self.peer])
|
||||
|
||||
def send_next_request(self, peer, protocol):
|
||||
return self._send_next_request(peer, protocol)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
80
scripts/download_blobs_from_reflector.py
Normal file
80
scripts/download_blobs_from_reflector.py
Normal file
|
@ -0,0 +1,80 @@
|
|||
"""A test script that downloads blobs from a reflector server"""
|
||||
import argparse
|
||||
import itertools
|
||||
import json
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('reflector_ip')
|
||||
parser.add_argument('--ssh-key')
|
||||
parser.add_argument('--size', type=int, default=100)
|
||||
parser.add_argument('--batch', type=int, default=10)
|
||||
parser.add_argument('--timeout', type=int, default=30)
|
||||
parser.add_argument('--hashes', help='file listing hashes in json')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.hashes:
|
||||
hashes = readHashes(args.hashes)
|
||||
else:
|
||||
hashes = getHashes(args.reflector_ip, args.ssh_key)
|
||||
if len(hashes) > args.size:
|
||||
selected_hashes = random.sample(hashes, args.size)
|
||||
else:
|
||||
print 'Only {} hashes are available'.format(hashes)
|
||||
selected_hashes = hashes
|
||||
|
||||
successes = 0
|
||||
for hashes in grouper(selected_hashes, args.batch):
|
||||
hashes = filter(None, hashes)
|
||||
successes += downloadHashes(args.reflector_ip, hashes, args.timeout)
|
||||
print 'Downloaded {} / {}'.format(successes, len(selected_hashes))
|
||||
|
||||
|
||||
def grouper(iterable, n, fillvalue=None):
|
||||
"Collect data into fixed-length chunks or blocks"
|
||||
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
|
||||
args = [iter(iterable)] * n
|
||||
return itertools.izip_longest(fillvalue=fillvalue, *args)
|
||||
|
||||
|
||||
def readHashes(hash_file):
|
||||
with open(hash_file) as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def getHashes(ip, key=None):
|
||||
key = ['-i', key] if key else []
|
||||
hashes = subprocess.check_output(['ssh'] + key +
|
||||
['lbry@{}'.format(ip), '/opt/venvs/lbrynet/bin/lbrynet-cli', 'get_blob_hashes'])
|
||||
return json.loads(hashes)
|
||||
|
||||
|
||||
def downloadHashes(ip, blob_hashes, timeout=30):
|
||||
processes = [
|
||||
subprocess.Popen(
|
||||
[
|
||||
'python',
|
||||
'download_blob_from_peer.py',
|
||||
'--timeout', str(timeout), '{}:3333'.format(ip), blob_hash,
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
for blob_hash in blob_hashes
|
||||
]
|
||||
for p, h in zip(processes, blob_hashes):
|
||||
stdout, stderr = p.communicate()
|
||||
print p.returncode, h
|
||||
if p.returncode != 0:
|
||||
print 'Failed to download', h
|
||||
print stdout
|
||||
print stderr
|
||||
return sum(1 for p in processes if p.returncode == 0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
Loading…
Reference in a new issue